新增分区对 producer 的影响 - 源码解析
/ 8 min read
📑 目录
概述
题图为 8月14号晚上拍摄的英仙座流星
在分区扩容时,比较担心扩容对 client 的影响, 本节主要分析下对 producer 的影响
这里先说下小结
- producer 与 server 的 TCP 连接是在创建 KafkaProducer 实例时建立的
- producer 内部会定期向 broker 发送 metadata 请求, 更新主题分区的关系, 时长metadata.max.age.ms 控制
- 当 Producer 尝试给一个不存在的主题发送消息时, broker 会告诉 producer 主题不存在, producer 会发送 metadata 请求, 尝试获取最新的元数据信息
而更新了元数据后, 相当于 producer 会根据你的 key -> partition 的规则, 分发数据到不同分区
所以默认情况下, 相同 key 的数据可能由于分区扩容导致 前后数据处于不同分区
所以建议**在建立 topic 时就初始化足够多的分区 ,**避免之后分区扩展的麻烦
源码分析 (基于 kafka-clients-2.4.1)
producer 获取分区信息跟 metadata 请求有着密切的联系, 有兴趣可以自己调试下源码, 下面的分析也可以不用看
先看下 metadata 请求/响应的主要调用栈, 方便你们自己查看调试
- 发送 metadata 请求
- 主要方法是NetworkClients.sendInternalMetadataRequest
- 收到 metadata 响应
- 主要方法是ProducerMetadata.updata
Producer 发送数据时, 什么时候去拿分区信息的?
Producer 获取 分区信息, 我这边分为 两部分来讲 , 一个是主线程, 一个是 producer 子线程
这两个线程主要通过 ProducerMetadata 来共享数据, 并通过 synchronized 来做同步 (通知)
主线程部分
-
主线程调用 KafkaProducer 构造函数时会创建一个
kafka-producer-network-thread | producer-1的线程, 请注意和 main 主线程做区分
-
当主线程第一次调用 producer.send 后, 会等待版本初始化更新
- 主要调用了KafkaProducer.waitOnMetadata方法
-
接着调用了ProducerMetadata.awaitUpdate方法
- 注意这里的 SystemTime 将 this (producerMetadata 对象本身) 传了进去, 作为同步块的同步对象
- 然后开始 wait
- 这个伏笔 this 会在之后, ProducerMetadata 更新时被唤醒
-
继续看到 time.waitObject , 这个方法其实是个死循环, 知道条件满足, 才会返回
- 其实就是等待, 直到元数据被拉取下来
producer 线程部分
-
与此同时,在 ProducerClient的构造函数中, 创建并启动 producer 线程, 然后会循环调用 poll 方法
@Overridepublic List<ClientResponse> poll(long timeout, long now) {// 省略了一些状态判断的代码long metadataTimeout = metadataUpdater.maybeUpdate(now);try {this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));} catch (IOException e) {log.error("Unexpected error during I/O", e);}long updatedNow = this.time.milliseconds();List<ClientResponse> responses = new ArrayList<>();handleCompletedSends(responses, updatedNow);handleCompletedReceives(responses, updatedNow);handleDisconnections(responses, updatedNow);handleConnections();handleInitiateApiVersionRequests(updatedNow);handleTimedOutRequests(responses, updatedNow);completeResponses(responses);return responses;} -
producer 首先需要连接到 kafka 服务器
private void initiateConnect(Node node, long now) {String nodeConnectionId = node.idString();try {connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);InetAddress address = connectionStates.currentAddress(nodeConnectionId);log.debug("Initiating connection to node {} using address {}", node, address);selector.connect(nodeConnectionId,new InetSocketAddress(address, node.port()),this.socketSendBuffer,this.socketReceiveBuffer);} catch (IOException e) {log.warn("Error connecting to node {}", node, e);// Attempt failed, we'll try again after the backoffconnectionStates.disconnected(nodeConnectionId, now);// Notify metadata updater of the connection failuremetadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());}} -
在连接到 kafka 服务器后, handleConnections 将 node 加进 nodesNeedingApiVersionsFetch 一个 map 之中
- 这里的3,4,5步其实和本节探讨的问题没有关联, 直接跳到第6步看就行
private void handleConnections() {for (String node : this.selector.connected()) {// We are now connected. Note that we might not still be able to send requests. For instance,// if SSL is enabled, the SSL handshake happens after the connection is established.// Therefore, it is still necessary to check isChannelReady before attempting to send on this// connection.if (discoverBrokerVersions) {this.connectionStates.checkingApiVersions(node);nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());log.debug("Completed connection to node {}. Fetching API versions.", node);} else {this.connectionStates.ready(node);log.debug("Completed connection to node {}. Ready.", node);}}} -
在poll循环过程中, handleInitiateApiVersionRequests 会向服务器发送 ApiVersionRequest
-
handleInitiateApiVersionRequests 会取出 nodesNeedingApiVersionsFetch 中的拉取请求, 并发送正式的客户端请求给 服务器
private void handleInitiateApiVersionRequests(long now) {Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter = nodesNeedingApiVersionsFetch.entrySet().iterator();while (iter.hasNext()) {Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();String node = entry.getKey();if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {log.debug("Initiating API versions fetch from node {}.", node);ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);doSend(clientRequest, true, now);iter.remove();}}} -
当连接上服务器后, maybeUpdata 方法中会发送内部元数据请求
-
收到元数据请求的响应时, 更新元数据 Metadata.update
-
ProducerMetadata.notifyAll 唤醒条件不满足的线程, 于是主线程, 继续执行, 拿到了 metadata
- 这里其实就是一个典型的 MESA 管程应用
- 另外, 这里要写成 notifyAll() , 否则多个线程 同时使用 一个 producer 发送数据时, 可能会导致有一些线程拿不到锁, 饿死…
如何确定消息发给的分区
- Producer.partition 有以下代码
- 我们可以通过实现 Partitioner接口, 自定义选择分区
- 但我想如果自己实现一个 key -> partition 永久绑定, 得有一个存储中心来记录 key -> partition 的关系, 这样实现的话, 好像比较麻烦, 对性能/内存要求也很高, 每次一个新的 key 传进来, 都要查询下存储; 并且如果出现不同 producer 对分区扩容感应的时间不同, 可能出现 key -> partition 冲突的情况, 不过仔细一想这点在实现上可以避免
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);}小结
从上面的源码分析来看 , 分区增加对于 producer 的影响可能就是会将相同 key 的消息发送到不同的分区, 可能导致一些数据处理上的问题
所以建议对可能会增加吞吐量的主题进行过度分区(在建立 topic 时就初始化足够多的分区),防止在之后进行分区扩容时, 数据被分发到不同分区上。
其他参考
-
kafka New topic or Increase partition count
-
开启 org.apache.kafka.clients.Metadata 的 debug 日志
# 我这里用的是 log4j2.yaml# 下面是部分配置, 完整请查看 todoLoggers:logger:-name: org.apache.kafka.clients.Metadatalevel: debugAppenderRef:ref: STDOUT- 新增 分区时 producer 客户端 metadata 类的日志内容示例