• ADADADADAD

    kafka手动提交偏移量怎么实现[ 电脑知识 ]

    电脑知识 时间:2024-12-03 10:16:50

    作者:文/会员上传

    简介:

    Kafka 提供了两种方式来手动提交偏移量:使用 commitSync() 方法同步提交偏移量:import org.apache.kafka.clients.consumer.*;Properties props = new Properties();props.put

    以下为本文的正文内容,内容仅供参考!本站为公益性网站,复制本文以及下载DOC文档全部免费。

    Kafka 提供了两种方式来手动提交偏移量:

      使用 commitSync() 方法同步提交偏移量:
    import org.apache.kafka.clients.consumer.*;Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "false"); // 关闭自动提交KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息}consumer.commitSync(); // 手动提交偏移量}} finally {consumer.close();}
      使用 commitAsync() 方法异步提交偏移量:
    import org.apache.kafka.clients.consumer.*;Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "false"); // 关闭自动提交KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息}consumer.commitAsync(); // 异步提交偏移量}} finally {consumer.close();}

    在这两种方式中,commitSync() 方法会一直阻塞直到偏移量提交成功或发生错误。而 commitAsync() 方法则会在提交请求发送后立即返回,不会等待确认。如果发生错误,可以在 commitAsync() 方法的回调函数中处理。

    kafka手动提交偏移量怎么实现.docx

    将本文的Word文档下载到电脑

    推荐度:

    下载
    热门标签: Kafka