当前位置 博文首页 > Shockang的博客:Kafka 的 Java 消费者如何管理 TCP 连接?

    Shockang的博客:Kafka 的 Java 消费者如何管理 TCP 连接?

    作者:[db:作者] 时间:2021-08-13 09:52

    前言

    本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

    本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

    正文

    Kafka 的 Consumer 程序

    可以参考我的这篇博客——Kafka 的客户端代码怎么写?

    何时创建 TCP 连接?

    TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。

    1.发起 FindCoordinator 请求时

    消费者端协调者(Coordinator)驻留在 Broker 端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。

    当消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者。

    在这一步,消费者会创建一个 Socket 连接。

    2.连接协调者时

    Broker 处理完上一步发送的 FindCoordinator 请求之后,会返还对应的响应结果(Response),显式地告诉消费者哪个 Broker 是真正的协调者,因此在这一步,消费者知晓了真正的协调者后,会创建连向该 Broker 的 Socket 连接。

    只有成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。

    3.消费数据时

    消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。

    举个例子,假设消费者要消费 5 个分区的数据,这 5 个分区各自的领导者副本分布在 4 台 Broker 上,那么该消费者在消费时会创建与这 4 台 Broker 的 Socket 连接。

    何时关闭 TCP 连接?

    和生产者类似,消费者关闭 Socket 也分为主动关闭和 Kafka 自动关闭。

    主动关闭是指你显式地调用消费者 API 的方法去关闭消费者,具体方式就是手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令,不论是 Kill -2 还是 Kill -9;

    而 Kafka 自动关闭是由消费者端参数 connection.max.idle.ms 控制的,该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接。

    不过,和生产者有些不同的是,如果在编写消费者程序时,你使用了循环的方式来调用 poll 方法消费消息,那么上面提到的所有请求都会被定期发送到 Broker,因此这些 Socket 连接上总是能保证有请求在发送,从而也就实现了“长连接”的效果。

    针对上面提到的三类 TCP 连接,你需要注意的是,当第三类 TCP(消费数据时) 连接成功创建后,消费者程序就会废弃第一类 TCP(发起 FindCoordinator 请求时) 连接,之后在定期请求元数据时,它会改为使用第三类 TCP 连接。

    也就是说,最终你会发现,第一类 TCP 连接会在后台被默默地关闭掉。对一个运行了一段时间的消费者程序来说,只会有后面两类 TCP 连接存在。

    “僵尸”连接

    在实际场景中,很多人将 connection.max.idle.ms 设置成 -1,即禁用定时关闭的案例,如果是这样的话,这些 TCP 连接将不会被定期清除,只会成为永久的“僵尸”连接。

    cs