12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
ADADADADAD
电脑知识 时间:2024-12-03 15:01:46
作者:文/会员上传
12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
12-09
要拉取某段时间的数据,可以使用Kafka的Consumer API来实现。以下是一些步骤和示例代码可供参考:创建一个Consumer实例,并指定要订阅的topic:Properties props = new Properties(
以下为本文的正文内容,内容仅供参考!本站为公益性网站,复制本文以及下载DOC文档全部免费。
要拉取某段时间的数据,可以使用Kafka的Consumer API来实现。以下是一些步骤和示例代码可供参考:
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-consumer-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic-name"));
// 设置要拉取数据的起始时间点long timestamp = new Date().getTime() - 24 * 60 * 60 * 1000; // 24小时前的时间点Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();timestampsToSearch.put(new TopicPartition("topic-name", 0), timestamp);// 从指定时间点开始拉取数据Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {if (entry.getValue() != null) {consumer.seek(entry.getKey(), entry.getValue().offset());}}
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());}}
通过这些步骤,您可以使用Kafka Consumer API从指定时间点开始拉取数据并进行处理。请注意,在设置offset时,需要根据分区来设置,并且可能需要处理一些异常情况例如某些分区不存在或者指定时间点之前没有数据等。
11-20
11-19
11-20
11-20
11-20
11-19
11-20
11-20
11-19
11-20
11-19
11-19
11-19
11-19
11-19
11-19