当前位置 主页 > 网站技术 > 代码类 >

    kafka-python 获取topic lag值方式

    栏目:代码类 时间:2019-12-23 12:08

    说真,这个问题看上去很简单,但“得益”与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码。

    直接上代码吧

    from kafka import SimpleClient, KafkaConsumer
    from kafka.common import OffsetRequestPayload, TopicPartition
    
    def get_topic_offset(brokers, topic):
      """
      获取一个topic的offset值的和
      """
      client = SimpleClient(brokers)
      partitions = client.topic_partitions[topic]
      offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
      offsets_responses = client.send_offset_request(offset_requests)
      return sum([r.offsets[0] for r in offsets_responses])
    
    
    def get_group_offset(brokers, group_id, topic):
      """
      获取一个topic特定group已经消费的offset值的和
      """
      consumer = KafkaConsumer(bootstrap_servers=brokers,
                   group_id=group_id,
                   )
      pts = [TopicPartition(topic=topic, partition=i) for i in
          consumer.partitions_for_topic(topic)]
      result = consumer._coordinator.fetch_committed_offsets(pts)
      return sum([r.offset for r in result.values()])
    
    
    if __name__ == '__main__':
      topic_offset = get_topic_offset("brokers", "topic")
      group_offset = get_group_offset("brokers", "group_id", "topic")
      lag = topic_offset - group_offset
    

    以上这篇kafka-python 获取topic lag值方式就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持IIS7站长之家。