当前位置 博文首页 > Shockang的博客:Kafka 的 Java 生产者如何管理 TCP 连接?

    Shockang的博客:Kafka 的 Java 生产者如何管理 TCP 连接?

    作者:[db:作者] 时间:2021-08-13 09:52

    前言

    本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

    本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

    正文

    Kafka 的 Producer 程序

    可以参考我的这篇博客——Kafka 的客户端代码怎么写?

    何时创建 TCP 连接?

    创建 KafkaProducer 实例

    KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。

    更新元数据后

    当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。

    Producer 更新集群元数据信息的两个场景

    场景一

    当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。

    此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。

    场景二

    Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。

    该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。

    消息发送时

    当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。

    何时关闭 TCP 连接?

    主动关闭

    这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。当然最推荐的方式还是调用 producer.close() 方法来关闭。

    自动关闭

    Kafka 自动帮你关闭,这与 Producer 端参数 connections.max.idle.ms 的值有关。

    默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。

    用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。

    一旦被设置成 -1,TCP 连接将成为永久长连接。

    当然这只是软件层面的“长连接”机制,由于 Kafka 创建的这些 Socket 连接都开启了 keepalive,因此 keepalive 探活机制还是会遵守的。

    TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景。
    被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。

    cs