kafka在python中的使用及结束kafka消费者

2021/12/6 17:17:14

本文主要是介绍kafka在python中的使用及结束kafka消费者,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

先说下问题:
正常使用kafka消费者,接收消息时,会出现消息循环无法结束问题,增加参数 consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka。(例子如下)

consumer.py文件:

from kafka import KafkaProducer, KafkaConsumer
import time


class KafkaClient(object):
	topic = "topic"  # 使用的kafka的topic
	client = "0.0.0.0:19823"  # kafka所在的服务地址
	group_id = "test_consumer_group"  # kafka组信息
	
	@staticmethod
	def log(log_str):
		t = time.strftime(r"%Y-%m-%d_%H:%M:%S", time.localtime())
		print("[%s]%s" % (t, log_str))
		
	def  info_send(self, key, info_str):
		"""key: 发送信息的key;info_str:要发送的信息内容"""
		producer = KafkaProducer(bootstrap_servers=[self.client])
		producer.send(self.topic, key=key.encode("utf-8"), value=info_str.encode("utf-8"))
		# 批量提交可以使用 producer.flush()
		producer.close()
		
	def message_consumer():
		# consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka
		consumer = KafkaConsumer(self.topic, group_id=self.group_id, bootstrap_servers=[self.client], consumer_timeout_ms=3000)
		for msg in consumer:
			# partition:消息所在的分区,offset:消息所在分区的位置,key:消息的key,value:消息的内容
			print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)


这篇关于kafka在python中的使用及结束kafka消费者的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程