当前位置 博文首页 > Shockang的博客:Kafka 的 Java 消费者如何管理 TCP 连接?
本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系
可以参考我的这篇博客——Kafka 的客户端代码怎么写?
TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。
消费者端协调者(Coordinator)驻留在 Broker 端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。
当消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者。
在这一步,消费者会创建一个 Socket 连接。
Broker 处理完上一步发送的 FindCoordinator 请求之后,会返还对应的响应结果(Response),显式地告诉消费者哪个 Broker 是真正的协调者,因此在这一步,消费者知晓了真正的协调者后,会创建连向该 Broker 的 Socket 连接。
只有成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。
消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。
举个例子,假设消费者要消费 5 个分区的数据,这 5 个分区各自的领导者副本分布在 4 台 Broker 上,那么该消费者在消费时会创建与这 4 台 Broker 的 Socket 连接。
和生产者类似,消费者关闭 Socket 也分为主动关闭和 Kafka 自动关闭。
主动关闭是指你显式地调用消费者 API 的方法去关闭消费者,具体方式就是手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令,不论是 Kill -2 还是 Kill -9;
而 Kafka 自动关闭是由消费者端参数 connection.max.idle.ms 控制的,该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接。
不过,和生产者有些不同的是,如果在编写消费者程序时,你使用了循环的方式来调用 poll 方法消费消息,那么上面提到的所有请求都会被定期发送到 Broker,因此这些 Socket 连接上总是能保证有请求在发送,从而也就实现了“长连接”的效果。
针对上面提到的三类 TCP 连接,你需要注意的是,当第三类 TCP(消费数据时) 连接成功创建后,消费者程序就会废弃第一类 TCP(发起 FindCoordinator 请求时) 连接,之后在定期请求元数据时,它会改为使用第三类 TCP 连接。
也就是说,最终你会发现,第一类 TCP 连接会在后台被默默地关闭掉。对一个运行了一段时间的消费者程序来说,只会有后面两类 TCP 连接存在。
在实际场景中,很多人将 connection.max.idle.ms 设置成 -1,即禁用定时关闭的案例,如果是这样的话,这些 TCP 连接将不会被定期清除,只会成为永久的“僵尸”连接。
cs