kafka在python中的使用及结束kafka消费者
2021/12/6 17:17:14
本文主要是介绍kafka在python中的使用及结束kafka消费者,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
先说下问题:
正常使用kafka消费者,接收消息时,会出现消息循环无法结束问题,增加参数 consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka。(例子如下)
consumer.py文件:
from kafka import KafkaProducer, KafkaConsumer import time class KafkaClient(object): topic = "topic" # 使用的kafka的topic client = "0.0.0.0:19823" # kafka所在的服务地址 group_id = "test_consumer_group" # kafka组信息 @staticmethod def log(log_str): t = time.strftime(r"%Y-%m-%d_%H:%M:%S", time.localtime()) print("[%s]%s" % (t, log_str)) def info_send(self, key, info_str): """key: 发送信息的key;info_str:要发送的信息内容""" producer = KafkaProducer(bootstrap_servers=[self.client]) producer.send(self.topic, key=key.encode("utf-8"), value=info_str.encode("utf-8")) # 批量提交可以使用 producer.flush() producer.close() def message_consumer(): # consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka consumer = KafkaConsumer(self.topic, group_id=self.group_id, bootstrap_servers=[self.client], consumer_timeout_ms=3000) for msg in consumer: # partition:消息所在的分区,offset:消息所在分区的位置,key:消息的key,value:消息的内容 print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)
这篇关于kafka在python中的使用及结束kafka消费者的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-08有遇到过吗?同样的规则 Excel 中 比Python 结果大
- 2024-03-30开始python成长之路
- 2024-03-29python optparse
- 2024-03-29python map 函数
- 2024-03-20invalid format specifier python
- 2024-03-18pool.map python
- 2024-03-18threads in python
- 2024-03-14python Ai 应用开发基础训练,字符串,字典,文件
- 2024-03-13id3 algorithm python
- 2024-03-13sum array elements python