Kafka生产者源码初识
2021/6/27 14:17:15
本文主要是介绍Kafka生产者源码初识,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Kafka生产者整体架构
- 整个生产者客户端主要有两个线程:主线程和Sender线程
- RecordAccumulator用来缓存消息,如果创建消息的速度过快,超过sender发给Kafka服务器的速度,会导致缓存空间不足
实现上述架构图的源码
- 在初始化生产者时,会初始化一个Sender线程并启动(下截图为KafkaProducer构造方法)
- 在发送前先经过拦截器过滤
- 序列化消息、选择分区以及添加消息累加器
3.1 在选择分区时,需要注意分区信息是通过元数据中获取(下面为详细分析)
分区消息源码分析
- 根据主题topic查询对应的分区,初始化时没有对应的分区,需要sender线程获取到后才会进行填充
- 没有获取到分区信息,需要等待更新
2.1 通过上图发现,需要等待updateVersion发生变化后才会结束等待,否则直到超时
Sender获取元数据
- KafkaProducer通过Sender进行相应的IO操作,而Sender又调用NetworkClient进行IO操作,NetworkClient底层是对Java NIO进行相应的封装
- Sender的整体流程
2.1 连接
(1)通过SocketChannel连接节点
(2)将soecketChannel向Selector注册连接事件SelectionKey.OP_CONNECT
(3)为连接事件Key附属信息KafkaChannel
2.2 循环Selector上的可读的key
2.3 读取Selector上的可读的key
(1)将数据从socketChannel读到ByteBuffer中
(2)解析响应ByteBuffe - 处理元数据响应:NetworkClient#handleCompletedMetadataResponse
3.1 更新版本号:this.updateVersion += 1;
3.2 将响应的主题的分区信息放在Cluster属性中
Sender的整体流程伪代码
连接
SocketChannel socketChannel = SocketChannel.open(); 1. 配置SocketChannel socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true); if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setSendBufferSize(sendBufferSize); if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setReceiveBufferSize(receiveBufferSize); socket.setTcpNoDelay(true); 2. 连接node channel.connect(address); 3. 注册事件 SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);//构造自定义附属信息 key.attach(channel);//附属key信息
轮询可读Key
int numReadyKeys = this.nioSelector.select(timeoutMs); if (numReadyKeys > 0){ Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys(); for (SelectionKey key:readyKeys){ KafkaChannel channel = (KafkaChannel) key.attachment(); if (channel.ready()){ while ((networkReceive = channel.read()) != null) { addToStagedReceives(channel, networkReceive); } } } }
这篇关于Kafka生产者源码初识的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-08「布道师系列文章」解析 AutoMQ 对象存储中的文件存储格式
- 2024-05-08「布道师系列文章」小红书黄章衡:AutoMQ Serverless 基石-秒级分区迁移
- 2024-05-08AutoMQ 系统测试体系揭秘
- 2024-03-14AutoMQ 携手阿里云共同发布新一代云原生 Kafka,帮助得物有效压缩 85% Kafka 云支出!
- 2024-02-22kafka partitioner
- 2024-01-24AutoMQ生态集成 - 将数据从 AutoMQ Kafka 导入 RisingWave 数据库
- 2024-01-13消息队列面试题:为什么要使用消息队列?
- 2024-01-08"基于 XHAMQ 的消息队列系统实现"
- 2023-11-24全网最全图解Kafka适用场景
- 2023-09-19RabbitMQ 消息应答