当前位置 博文首页 > Kafka 入门(四)-- Python Kafka Client 性能测试

    Kafka 入门(四)-- Python Kafka Client 性能测试

    作者:TM0831 时间:2021-01-16 14:06

    一、前言

      由于工作原因使用到了 Kafka,而现有的代码并不能满足性能需求,所以需要开发高效读写 Kafka 的工具,本文是一个 Python Kafka Client 的性能测试记录,通过本次测试,可以知道选用什么第三方库的性能最高,选用什么编程模型开发出来的工具效率最高。

     

    二、第三方库性能测试

    1.第三方库

      此次测试的是三个主要的 Python Kafka Client:pykafka、kafka-python 和 confluent-kafka,具体介绍见官网:

    • pykafka:https://pypi.org/project/pykafka/
    • kafka-python:https://pypi.org/project/kafka-python/
    • confluent_kafka:https://pypi.org/project/confluent-kafka/

    2.测试环境

           此次测试使用的 Python 版本是2.7,第三方库的版本为:

    • pykafka:2.8.0
    • kafka-python:2.0.2
    • confluent-kafka:1.5.0

           使用的数据总量有50万,每条数据大小为2KB,总共为966MB。

    3.测试过程

    (1)Kafka Producer 测试

      分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Producer 对象,然后调用相应的 produce 方法将数据推送给 Kafka,数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以推送的数据条数和大小,比较得出性能最优的。

      代码示例(以 pykafka 为例):

     1 import sys
     2 from datetime import datetime
     3 from pykafka import KafkaClient
     4 
     5 
     6 class KafkaProducerTool():
     7     def __init__(self, broker, topic):
     8         client = KafkaClient(hosts=broker)
     9         self.topic = client.topics[topic]
    10         self.producer = self.topic.get_producer()
    11 
    12     def send_msg(self, msg):
    13         self.producer.produce(msg)
    14 
    15 
    16 if __name__ == '__main__':
    17     producer = KafkaProducerTool(broker, topic)
    18     print(datetime.now())
    19     for line in sys.stdin:
    20         producer.send_msg(line.strip())
    21     producer.producer.stop()
    22     print(datetime.now())

    (2)Kafka Consumer 测试

      分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Consumer 对象,然后调用相应的 consume 方法从 Kafka 中消费数据,要消费下来的数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以消费的数据条数和大小,比较得出性能最优的。

      代码示例(以 pykafka 为例):

     1 from datetime import datetime
     2 from pykafka import KafkaClient
     3 
     4 
     5 class KafkaConsumerTool():
     6     def __init__(self, broker, topic):
     7         client = KafkaClient(hosts=broker)
     8         self.topic = client.topics[topic]
     9         self.consumer = self.topic.get_simple_consumer()
    10 
    11     def receive_msg(self):
    12         count = 0
    13         print(datetime.now())
    14         while True:
    15             msg = self.consumer.consume()
    16             if msg:
    17                 count += 1
    18             if count == 500000:
    19                 print(datetime.now())
    20                 return
    21 
    22 
    23 if __name__ == '__main__':
    24     consumer = KafkaConsumerTool(broker, topic)
    25     consumer.receive_msg()
    26     consumer.consumer.stop()

    4.测试结果

    • Kafka Producer 测试结果:
      总耗时/秒 每秒数据量/MB 每秒数据条数
    confluent_kafka 35 27.90 14285.71
    pykafka 50 19.53 10000
    kafka-python 532 1.83 939.85
    • Kafka Consumer 测试结果:
      总耗时/秒 每秒数据量/MB 每秒数据条数
    confluent_kafka 39 25.04 12820.51
    kafka-python 52 18.78 9615.38
    pykafka 335 2.92 1492.54

    5.测试结论

      经过测试,在此次测试的三个库中,生产消息的效率排名是:confluent-kafka > pykafka > kafka-python,消费消息的效率排名是:confluent-kafka > kafka-python > pykafka,由此可见 confluent-kafka 的性能是其中最优的,因而选用这个库进行后续开发。

     

    三、多线程模型性能测试

    1.编程模型

      经过前面的测试已经知道 confluent-kafka 这个库的性能是很优秀的了,但如果还需要更高的效率,应该怎么办呢?当单线程(或者单进程)不能满足需求时,我们很容易想到使用多线程(或者多进程)来增加并发提高效率,考虑到线程的资源消耗比进程少,所以打算选用多线程来进行开发。那么多线程消费 Kafka 有什么实现方式呢?我想到的有两种:

    1. 一个线程实现一个 Kafka Consumer,最多可以有 n 个线程同时消费 Topic(其中 n 是该 Topic 下的分区数量);
    2. 多个线程共用一个 Kafka Consumer,此时也可以实例化多个 Consumer 同时消费。

        

      对比这两种多线程模型:

    • 模型1实现方便,可以保证每个分区有序消费,但 Partition 数量会限制消费能力;
    • 模型2并发度高,可扩展能力强,消费能力不受 Partition 限制。

     2.测试过程

    (1)多线程模型1

      测试代码:

     1 import time
     2 from threading import Thread
     3 from datetime import datetime
     4 from confluent_kafka import Consumer
     5 
     6 
     7 class ChildThread(Thread):
     8     def __init__(self, name, broker, topic):
     9         Thread.__init__(self, name=name)
    10         self.con = KafkaConsumerTool(broker, topic)
    11 
    12     def run(self):
    13         self.con.receive_msg()
    14 
    15 
    16 class KafkaConsumerTool:
    17     def __init__(self, broker, topic):
    18         config = {
    19             'bootstrap.servers': broker,
    20             'session.timeout.ms': 30000,
    21             'auto.offset.reset': 'earliest',
    22             'api.version.request': False,
    23             'broker.version.fallback': '2.6.0',
    24             'group.id': 'test'
    25         }
    26         self.consumer = Consumer(config)
    27         self.topic = topic
    28 
    29     def receive_msg(self):
    30         self.consumer.subscribe([self.topic])
    31         print(datetime.now())
    32         while True:
    33             msg = self.consumer.poll(timeout=30.0)
    34             print(msg)
    35 
    36 
    37 if __name__ == '__main__':
    38     thread_num = 10
    39     threads = [ChildThread("thread_" + str(i + 1), broker, topic) for i in range(thread_num)]
    40 
    41     for i in range(thread_num):
    42         threads[i].setDaemon(True)
    43     for i in range(thread_num):
    44         threads[i].start()

      因为我使用的 Topic 共有8个分区,所以我分别测试了线程数在5个、8个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。

    (2)多线程模型2

      测试代码:

     1 import time
     2 from datetime import datetime
     3 from confluent_kafka import Consumer
     4 from threadpool import ThreadPool, makeRequests
     5 
     6 
     7 class KafkaConsumerTool:
     8     def __init__(self, broker, topic):
     9         config = {
    10             'bootstrap.servers': broker,
    11             'session.timeout.ms': 30000,
    12             'auto.offset.reset': 'earliest',
    13             'api.version.request': False,
    14             'broker.version.fallback': '2.6.0',
    15             'group.id': 'mini-spider'
    16         }
    17         self.consumer = Consumer(config)
    18         self.topic = topic
    19 
    20     def receive_msg(self, x):
    21         self.consumer.subscribe([self.topic])
    22         print(datetime.now())
    23         while True:
    24             msg = self.consumer.poll(timeout=30.0)
    25             print(msg)
    26 
    27 
    28 if __name__ == '__main__':
    29     thread_num = 10
    30     consumer = KafkaConsumerTool(broker, topic)
    31     pool = ThreadPool(thread_num)
    32     for r in makeRequests(consumer.receive_msg, [i for i in range(thread_num)]):
    33         pool.putRequest(r)
    34     pool.wait()

      主要使用 threadpool 这个第三方库来实现线程池,此处当然也可以使用其他库来实现,这里我分别测试了线程数量在5个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。

    3.测试结果

    • 多线程模型1
     总数据量/万 线程数量 总耗时/秒 每秒数据条数
    50 5 27 18518.51
    50 8 24 20833.33
    50 10 26 19230.76
    • 多线程模型2
      总数据量/万 线程数量 总耗时/秒 每秒数据条数
    50 5 17 29411.76
    50 10 13 38461.53

    4.测试结论

      使用多线程可以有效提高 Kafka 的 Consumer 消费数据的效率,而选用线程池共用一个 KafkaConsumer 的消费方式的消费效率更高。

     

    上一篇:Objective-C Runtime
    下一篇:没有了