Kafka: max.poll.interval.ms set too short
2021/10/27 6:11:39
This article walks through what happens when Kafka's max.poll.interval.ms is set too short. It should be a useful reference for anyone hitting this problem; follow along to see the failure reproduced and fixed.
max.poll.interval.ms is the maximum allowed interval between two consecutive calls to poll(). If the consumer does not poll again within this interval, the group coordinator considers it dead, evicts it from the group, and triggers a rebalance that reassigns its partitions to other members. The following demo reproduces this:
```java
public ConsumerDemo(String topicName) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", GROUPID);
    props.put("enable.auto.commit", "false");   // disable auto-commit; offsets are committed manually
    props.put("max.poll.interval.ms", "1000");  // deliberately short: 1 second
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<String, String>(props);
    this.topic = topicName;
    this.consumer.subscribe(Arrays.asList(topic));
}
```
Auto-commit is disabled so that offsets are committed manually, and max.poll.interval.ms is set to just 1 second.
```java
public void receiveMsg() {
    int messageNo = 1;
    System.out.println("---------start consuming---------");
    try {
        for (;;) {
            msgList = consumer.poll(1000);
            System.out.println("start sleep " + System.currentTimeMillis() / 1000);
            Thread.sleep(10000); // simulate 10 seconds of message processing
            System.out.println("end sleep " + System.currentTimeMillis() / 1000);
            if (null != msgList && msgList.count() > 0) {
                for (ConsumerRecord<String, String> record : msgList) {
                    System.out.println(messageNo + "=======receive: key = " + record.key()
                            + ", value = " + record.value() + " offset===" + record.offset());
                }
            } else {
                Thread.sleep(1000);
            }
            consumer.commitSync();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        consumer.close();
    }
}
```
The Thread.sleep(10000) call simulates 10 seconds of message processing, far longer than the 1-second max.poll.interval.ms, so the manual commit fails with:
```
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1211)
	at com.gxf.kafka.ConsumerDemo.receiveMsg(ConsumerDemo.java:49)
	at com.gxf.kafka.ConsumerDemo.main(ConsumerDemo.java:59)
```
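The advice in the error message can be checked with rough arithmetic: the number of records one poll() returns, times the per-record processing time, must stay below max.poll.interval.ms. A minimal sketch (the record counts and per-record times are illustrative assumptions, not measurements from the demo):

```java
public class PollBudget {
    // Rough check: total processing time for one batch must stay
    // under max.poll.interval.ms, or the next poll() comes too late.
    public static boolean fitsBudget(int maxPollRecords, long msPerRecord, long maxPollIntervalMs) {
        return (long) maxPollRecords * msPerRecord < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 500 records at 20 ms each = 10 000 ms of processing per batch
        System.out.println(fitsBudget(500, 20, 300000)); // fits the 5-minute client default
        System.out.println(fitsBudget(500, 20, 1000));   // blows the 1-second demo setting
    }
}
```

With the demo's 1-second interval even a single 10-second sleep exceeds the budget, which is exactly why the rebalance fires.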
To fix this, configure max.poll.interval.ms somewhat larger, shorten the per-batch processing time, fetch fewer records per poll (max.poll.records), or hand records off to asynchronous processing so the poll loop itself stays fast.
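As a sketch of the first two tuning options, the constructor's properties could be adjusted like this. Assumptions: 300000 ms is the Kafka client's default max.poll.interval.ms, max.poll.records defaults to 500, and 100 here is just an illustrative smaller batch size:

```java
import java.util.Properties;

public class TunedConsumerConfig {
    // Build consumer properties with a realistic poll interval and a smaller batch size.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "false");
        // Allow up to 5 minutes between poll() calls (the client default).
        props.put("max.poll.interval.ms", "300000");
        // Fetch fewer records per poll so each batch finishes well inside the interval.
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build();
        System.out.println(p.getProperty("max.poll.interval.ms"));
        System.out.println(p.getProperty("max.poll.records"));
    }
}
```

The right values depend on the actual per-record processing time; the budget is simply records-per-poll times time-per-record, kept below the interval with some headroom.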
That concludes this article on Kafka's max.poll.interval.ms being set too short. We hope the articles we recommend are helpful, and thank you for supporting 为之网!