原来kafka也有事务啊,再也不担心消息不一致了
2023/6/6 11:22:06
本文主要是介绍原来kafka也有事务啊,再也不担心消息不一致了,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
前言
现在假定这么一个业务场景,从kafka
中的topic
获取消息数据,经过一定加工处理后,发送到另外一个topic
中,要求整个过程消息不能丢失,也不能重复发送,即实现端到端的Exactly-Once
精确一次消息投递。这该如何实现呢?
kafka事务介绍
针对上面的业务场景,kafka已经替我们想到了,在kafka 0.11版本以后,引入了一个重大的特性:幂等性和事务。
幂等性
这里提到幂等性的原因,主要是因为事务的启用必须要先开启幂等性,那么什么是幂等性呢?
幂等性是指生产者无论向kafka broker
发送多少次重复的数据,broker
端只会持久化一条,保证数据不会重复。
幂等性通过生产者配置项enable.idempotence=true
开启,默认情况下为true。
幂等性实现原理
- 每条消息都有一个主键,这个主键由
<PID, Partition, SeqNumber>
组成。
-
PID
:ProducerID
,每个生产者启动时,Kafka 都会给它分配一个ID
,ProducerID
是生产者的唯一标识,需要注意的是,Kafka
重启也会重新分配PID
。 -
Partition
:消息需要发往的分区号。 -
SeqNumber
:生产者,他会记录自己所发送的消息,给他们分配一个自增的ID
,这个ID
就是SeqNumber
,是该消息的唯一标识,每发送一条消息,序列号加 1。
- 对于主键相同的数据,kafka 是不会重复持久化的,它只会接收一条。
幂等性缺点
根据幂等性的原理,我们发现它存在下面的缺点:
- 只能保证单分区、单会话内的数据不重复
- kafka 挂掉,重新给生产者分配了
PID
,还是有可能产生重复的数据
那么如何实现跨分区、kafka broker重启也能保证不重复呢?这就要使用事务了。
事务
所谓事务,就是要求保证原子性,要么全部成功,要么全部失败。那么具体该如何开启呢?
-
kafka
要想开启事务必须要启用幂等性,即生产者配置enable.idempotence=true
-
kafka
生产者需要配置唯一的事务idtransactional.id
, 最好为其设置一个有意义的名字。 -
kafka
消费端也有一个配置项isolation.level
和事务有很大关系。
-
read_uncommitted
:默认值,消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。 -
read_committed
:消费端应用只能消费到提交的事务内的消息。
kafka事务 API
现在我们用java的api来实现一下前面这个“消费-处理-生产“的例子吧。
- 引入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency>
- 创建事务的生产者
Properties prodcuerProps = new Properties(); // kafka地址 prodcuerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); // key序列化 prodcuerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 prodcuerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 启用幂等性 producerProps.put("enable.idempotence", "true"); // 设置事务id producerProps.put("transactional.id", "prod-1"); KafkaProducer<String, String> producer = new KafkaProducer(prodcuerProps);
-
enable.idempotence
配置项目为true - 设置
transactional.id
- 创建事务的消费者
Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "localhost:9092"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put("group.id", "my-group-id"); // 设置consumer手动提交 consumerProps.put("enable.auto.commit", "false"); // 设置隔离级别,读取事务已提交的消息 consumerProps.put("isolation.level", "read_committed"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps); //订阅主题 consumer.subscribe(Collections.singletonList("topic1"));
-
enable.auto.commit=false
,设置手动提交消费者offset
- 设置
isolation.level=read_committed
,消费事务已提交的消息
- 核心逻辑
// 初始化事务 producer.initTransactions(); while(true) { // 拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L)); if(!records.isEmpty()){ // 准备一个 hashmap 来记录:"分区-消费位移" 键值对 HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>(); // 开启事务 producer.beginTransaction(); try { // 获取本批消息中所有的分区 Set<TopicPartition> partitions = records.partitions(); // 遍历每个分区 for (TopicPartition partition : partitions) { // 获取该分区的消息 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); // 遍历每条消息 for (ConsumerRecord<String, String> record : partitionRecords) { // 执行数据的业务处理逻辑 ProducerRecord<String, String> outRecord = new ProducerRecord<>("topic2", record.key(), record.value().toUpperCase()); // 将处理结果写入 kafka producer.send(outRecord); } // 将处理完的本分区对应的消费位移记录到 hashmap 中 long offset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 事务提交的是即将到来的偏移量,这意味着我们需要加 1 offsetsMap.put(partition,new OffsetAndMetadata(offset+1)); } // 向事务管理器提交消费位移 producer.sendOffsetsToTransaction(offsetsMap,"groupid"); // 提交事务 producer.commitTransaction(); } catch(Exeception e) { e.printStackTrace(); // 终止事务 producer.abortTransaction(); } } }
-
initTransactions()
: 初始化事务 -
beginTransaction()
: 开启事务 -
sendOffsetsToTransaction()
: 在事务内提交已经消费的偏移量(主要用于消费者) -
commitTransaction()
: 提交事务 -
abortTransaction()
: 放弃事务
kafka事务实现原理
kafka事务的实现引入了事务协调器,如下图所示:
- 生产者使用事务必须配置事务id, kafka根据事务id计算分配事务协调器
- 事务协调器返回pid,前面的幂等性中需要
- 开始发送消息到topic中,不过这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息
- 当生产者事务内的消息发送完毕,会向事务协调器发送
commit
或abort
请求,等待 kafka 响应 - 事务协调器收到请求后先持久化到内置事务主题
__transaction_state
中,__transaction_state
默认有50个分区,每个分区负责一部分事务。事务划分是根据transactional.id
的hashcode
值%50
,计算出该事务属于哪个分区。 该分区Leader
副本所在的broker节点即为这个transactional.id
对应的Transaction Coordinator
节点,这也是上面第一步中的计算逻辑。 - 事务协调器后台会跟topic通信,告诉它们事务是成功还是失败的。
- 如果是成功,topic会汇报自己已经收到消息,协调者收到主题的回应便确认了事务完成,并持久化这一结果。
- 如果是失败的,主题会把这个事务内的消息丢弃,并汇报给协调者,协调者收到所有结果后再持久化这一信息,事务结束。
- 持久化第6步中的事务成功或者失败的信息, 如果
kafka broker
配置max.transaction.timeout.ms
之前既不提交也不中止事务,kafka broker
将中止事务本身。 此属性的默认值为 15 分钟。
总结
本文讲解了通过kafka事务可以实现端到端的精确一次的消息语义,通过事务机制,KAFKA 实现了对多个 topic
的多个 partition
的原子性的写入,通过一个例子了解了一下如何使用事物。同时也简单介绍了事务实现的原理,它底层必须要依赖kafka的幂等性机制,同时通过类似“二段提交”的方式保证事务的原子性。
欢迎关注个人公众号【JAVA旭阳】交流学习!
这篇关于原来kafka也有事务啊,再也不担心消息不一致了的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-03-14AutoMQ 携手阿里云共同发布新一代云原生 Kafka,帮助得物有效压缩 85% Kafka 云支出!
- 2024-02-22kafka partitioner
- 2024-01-24AutoMQ生态集成 - 将数据从 AutoMQ Kafka 导入 RisingWave 数据库
- 2024-01-13消息队列面试题:为什么要使用消息队列?
- 2024-01-08"基于 XHAMQ 的消息队列系统实现"
- 2023-11-24全网最全图解Kafka适用场景
- 2023-09-19RabbitMQ 消息应答
- 2023-09-18Flink Sink Kafka 和 Flink 端到端一致性建议
- 2023-07-18【RabbitMQ】当队列中消息数量超过最大长度的淘汰策略
- 2023-07-13【后端面经-架构】RabbitMQ简介