Python Kafka Client Benchmarking

Kafka is an incredibly powerful service that can help you process huge streams of data. It is written in Scala and has been undergoing lots of changes. Historically, the JVM clients have been better supported than those in the Python ecosystem. However, this doesn't need to be the case! The Kafka binary protocol has largely solidified, and many people in the open source community are working to provide first-class support for non-JVM languages.

In this post we will benchmark the three main Python Kafka clients: pykafka, kafka-python and the newest arrival, confluent-kafka-python. This will also be useful as an introduction to the different clients in the Python ecosystem and their high-level APIs.

Setup and Caveat

We are going to keep the setup simple and run a single broker that will auto-create topics with one partition. Please note this is a contrived setup. Any production deployment would be multi-broker and would likely use more partitions, but for simplicity's sake we are going to use one.

The Plan

  • Spin up a single Kafka 0.9 broker

  • We are going to produce and consume 1 million 100-byte messages with each client.

  • We will require a producer acks of 1, meaning that only the leader (we only have one broker anyway) needs to ack a message. Increasing this will ensure your data is not lost due to broker failure but will slow down production.
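
For reference, the acks requirement above is spelled differently in each client. The sketch below only collects the settings in plain dictionaries. The option names are my understanding of each client (`required_acks` for pykafka's `get_producer`, `acks` for kafka-python's `KafkaProducer`, and the librdkafka topic property `request.required.acks` for confluent-kafka-python), so treat them as assumptions and check them against the versions you install.

```python
# Hypothetical summary of where "acks = 1" is set in each client.
# The option names are assumptions; verify them against each client's docs.
REQUIRED_ACKS = 1  # only the partition leader must acknowledge a write

acks_settings = {
    # pykafka: keyword argument to topic.get_producer(...)
    "pykafka": {"required_acks": REQUIRED_ACKS},
    # kafka-python: keyword argument to KafkaProducer(...)
    "kafka-python": {"acks": REQUIRED_ACKS},
    # confluent-kafka-python: librdkafka topic-level property
    "confluent-kafka-python": {"request.required.acks": REQUIRED_ACKS},
}

for client_name, settings in acks_settings.items():
    print(client_name, settings)
```

The benchmark code in this post leaves these settings at whatever each client defaults to.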

I am using the following versions:

pykafka 2.3.1

kafka-python 1.1.1

confluent-kafka-python 0.9.1

I ran these tests within Vagrant hosted on a MacBook Pro with a 2.2 GHz i7.

Caveat

Like all benchmarks, take this with a grain of salt. A single broker on a local machine is hardly a production deployment. All settings are largely left at their defaults. Also, the amount of file caching the broker does really helps the client consumption speed. To help combat this, we will run each consumption test twice and report the second run, so that every client reads from an equally warm cache.
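
The warm-up idea can be sketched as a small helper. This is only an illustration: `measure_after_warmup` and `fake_consumer_benchmark` are hypothetical names, and the timings returned by the stand-in benchmark are made up. The notebook itself simply calls each consumer function twice and keeps the second result.

```python
def measure_after_warmup(benchmark, warmup_runs=1):
    """Call `benchmark` `warmup_runs` times and discard the results,
    then return the result of one final, reported call."""
    for _ in range(warmup_runs):
        benchmark()      # discarded: only there to warm the cache
    return benchmark()   # the run we actually report


# Stand-in benchmark for illustration: pretends the first (cold) read is slow.
calls = []

def fake_consumer_benchmark():
    calls.append(1)
    return 30.0 if len(calls) == 1 else 6.0  # made-up timings in seconds

reported = measure_after_warmup(fake_consumer_benchmark)
print(reported)
```

Any of the consumer benchmark functions defined later in this post could be passed in place of the stand-in, since each of them returns its own timing.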

Even with all the normal stipulations, I hope you find this informative.

Installing the clients can be complicated by the fact that some of them rely on C extensions. We are big fans of conda and maintain python3 linux builds of some of these clients here with recipes here.

The easiest way is to use conda to install the clients:

 conda create -n kafka-benchmark python=3 ipython jupyter pandas seaborn -y
 source activate kafka-benchmark
 conda install -c activisiongamescience confluent-kafka pykafka -y # will also get librdkafka
 pip install kafka-python # pure python version is easy to install

If you would like to run this notebook please find the full repo here

Included in this repo is a docker-compose file that will spin up a single kafka 0.9 broker and zookeeper instance locally. We can shell out and start it with docker compose.

!docker-compose up -d
Creating network "pythonkafkabenchmark_default" with the default driver
Creating pythonkafkabenchmark_zookeeper_1
Creating pythonkafkabenchmark_kafka_1
msg_count = 1000000
msg_size = 100
msg_payload = ('kafkatest' * 20).encode()[:msg_size]
print(msg_payload)
print(len(msg_payload))
b'kafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestk'
100
bootstrap_servers = 'localhost:9092' # change if your brokers live elsewhere
import time

producer_timings = {}
consumer_timings = {}
def calculate_thoughput(timing, n_messages=1000000, msg_size=100):
    print("Processed {0} messsages in {1:.2f} seconds".format(n_messages, timing))
    print("{0:.2f} MB/s".format((msg_size * n_messages) / timing / (1024*1024)))
    print("{0:.2f} Msgs/s".format(n_messages / timing))

pykafka

The first client we will examine is pykafka. It is the client we have the most experience with and have used with success. It tries less hard to replicate the existing Java client API. It also has partition balancing code for Kafka broker version 0.8.2.

from pykafka import KafkaClient

def pykafka_producer_performance(use_rdkafka=False):
    
    # Setup client
    client = KafkaClient(hosts=bootstrap_servers)
    topic = client.topics[b'pykafka-test-topic']
    producer = topic.get_producer(use_rdkafka=use_rdkafka)

    msgs_produced = 0
    produce_start = time.time()
    for i in range(msg_count):
        # Start producing
        producer.produce(msg_payload)
                     
    producer.stop() # Will flush background queue
 
    return time.time() - produce_start
producer_timings['pykafka_producer'] = pykafka_producer_performance()
calculate_thoughput(producer_timings['pykafka_producer'])
Processed 1000000 messsages in 57.32 seconds
1.66 MB/s
17446.37 Msgs/s

If you are monitoring this function, you will notice that the produce loop completes and then the function stalls at producer.stop(). This is because the producer is asynchronous and batches produce calls to Kafka. Kafka's speed comes from the ability to batch many messages together. To take advantage of this, the client keeps a buffer of messages in the background and batches them. So, when you call producer.produce you are performing no external I/O. The message is queued in an in-memory buffer and the method returns immediately. This means we are able to load the in-memory buffer faster than pykafka can send the messages to Kafka. producer.stop() will block until all messages are sent.

So when producing messages make sure you allow the producer to flush the remaining messages before you exit.
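
To make the buffering behaviour concrete, here is a toy producer built on the standard library only. It is a deliberately simplified sketch, not pykafka's actual implementation: `BufferedProducer`, `send_batch` and `batch_size` are hypothetical names, and the "network send" is just a function that records each batch.

```python
import queue
import threading

class BufferedProducer:
    """Toy producer: produce() only enqueues, a worker thread sends batches."""

    def __init__(self, send_batch, batch_size=100):
        self._send_batch = send_batch   # callable that does the "network" I/O
        self._batch_size = batch_size
        self._buffer = queue.Queue()
        self._stop = object()           # sentinel that tells the worker to exit
        self._worker = threading.Thread(target=self._run)
        self._worker.start()

    def produce(self, message):
        # No I/O here: the message is queued and the call returns immediately.
        self._buffer.put(message)

    def _run(self):
        batch = []
        while True:
            item = self._buffer.get()
            if item is self._stop:
                break
            batch.append(item)
            if len(batch) >= self._batch_size:
                self._send_batch(batch)
                batch = []
        if batch:
            self._send_batch(batch)     # flush whatever is left

    def stop(self):
        # Blocks until the worker has drained the buffer and sent everything.
        self._buffer.put(self._stop)
        self._worker.join()


sent_batches = []
toy_producer = BufferedProducer(send_batch=lambda b: sent_batches.append(list(b)))
for i in range(250):
    toy_producer.produce(b"kafkatest")
toy_producer.stop()
print(len(sent_batches), sum(len(b) for b in sent_batches))
```

If the program exited without calling stop(), the last partial batch would still be sitting in the buffer, which is exactly why the flush matters.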

Another way to ensure that the messages were produced is to check the topic offsets.

client = KafkaClient(hosts=bootstrap_servers)
topic = client.topics[b'pykafka-test-topic']
print(topic.earliest_available_offsets())
print(topic.latest_available_offsets())
{0: OffsetPartitionResponse(offset=[0], err=0)}
{0: OffsetPartitionResponse(offset=[1000000], err=0)}

Pykafka has an optional producer backend that wraps the librdkafka package. librdkafka is a pure C Kafka client and holds very impressive benchmarks. Let's rerun our pykafka producer test with rdkafka enabled.

producer_timings['pykafka_producer_rdkafka'] = pykafka_producer_performance(use_rdkafka=True)
calculate_thoughput(producer_timings['pykafka_producer_rdkafka'])
Processed 1000000 messsages in 15.72 seconds
6.06 MB/s
63595.38 Msgs/s
def pykafka_consumer_performance(use_rdkafka=False):
    # Setup client
    client = KafkaClient(hosts=bootstrap_servers)
    topic = client.topics[b'pykafka-test-topic']

    msg_consumed_count = 0
    
    consumer_start = time.time()
    # Consumer starts polling messages in background thread, need to start timer here
    consumer = topic.get_simple_consumer(use_rdkafka=use_rdkafka)

    while True:
        msg = consumer.consume()
        if msg:
            msg_consumed_count += 1

        if msg_consumed_count >= msg_count:
            break
                        
    consumer_timing = time.time() - consumer_start
    consumer.stop()    
    return consumer_timing
_ = pykafka_consumer_performance(use_rdkafka=False)
consumer_timings['pykafka_consumer'] = pykafka_consumer_performance(use_rdkafka=False)
calculate_thoughput(consumer_timings['pykafka_consumer'])
Processed 1000000 messsages in 29.43 seconds
3.24 MB/s
33976.94 Msgs/s
# run it once through to warm the cache
_ = pykafka_consumer_performance(use_rdkafka=True)
consumer_timings['pykafka_consumer_rdkafka'] = pykafka_consumer_performance(use_rdkafka=True)
calculate_thoughput(consumer_timings['pykafka_consumer_rdkafka'])
Processed 1000000 messsages in 6.09 seconds
15.67 MB/s
164311.50 Msgs/s

kafka-python

kafka-python aims to replicate the Java client API exactly. This is a key difference from pykafka, which tries to maintain a "pythonic" API. In earlier versions of Kafka, partition balancing was left to the client. Pykafka was the only Python client to implement this feature. However, with Kafka 0.9 the broker provides this, so the lack of support within kafka-python is less important.

from kafka import KafkaProducer

def python_kafka_producer_performance():
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    producer_start = time.time()
    topic = 'python-kafka-topic'
    for i in range(msg_count):
        producer.send(topic, msg_payload)
        
    producer.flush() # clear all local buffers and produce pending messages
        
    return time.time() - producer_start
producer_timings['python_kafka_producer'] = python_kafka_producer_performance()
calculate_thoughput(producer_timings['python_kafka_producer'])
Processed 1000000 messsages in 67.86 seconds
1.41 MB/s
14737.12 Msgs/s
from kafka import KafkaConsumer

def python_kafka_consumer_performance():
    topic = 'python-kafka-topic'

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset = 'earliest', # start at the earliest offset
        group_id = None # do not commit offsets
    )
    msg_consumed_count = 0
            
    consumer_start = time.time()
    consumer.subscribe([topic])
    for msg in consumer:
        msg_consumed_count += 1
        
        if msg_consumed_count >= msg_count:
            break
                    
    consumer_timing = time.time() - consumer_start
    consumer.close()    
    return consumer_timing
_ = python_kafka_consumer_performance()
consumer_timings['python_kafka_consumer'] = python_kafka_consumer_performance()
calculate_thoughput(consumer_timings['python_kafka_consumer'])
Processed 1000000 messsages in 26.55 seconds
3.59 MB/s
37667.97 Msgs/s

confluent-kafka-python

With the latest release of the Confluent platform, there is a new Python client on the scene. confluent-kafka-python is a Python wrapper around librdkafka and is largely built by the same author. The underlying library is the basis for most non-JVM clients out there. We already mentioned it earlier when looking at pykafka.

import confluent_kafka
topic = 'confluent-kafka-topic'

def confluent_kafka_producer_performance():
    
    topic = 'confluent-kafka-topic'
    conf = {'bootstrap.servers': bootstrap_servers}
    producer = confluent_kafka.Producer(**conf)
    messages_to_retry = 0

    producer_start = time.time()
    for i in range(msg_count):
        try:
            producer.produce(topic, value=msg_payload)      
        except BufferError as e:
            messages_to_retry += 1

    # hacky retry of messages that overfilled the local buffer
    for i in range(messages_to_retry):
        producer.poll(0)
        try:
            producer.produce(topic, value=msg_payload)
        except BufferError as e:
            producer.poll(0)
            producer.produce(topic, value=msg_payload)

    producer.flush()
            
    return time.time() - producer_start
producer_timings['confluent_kafka_producer'] = confluent_kafka_producer_performance()
calculate_thoughput(producer_timings['confluent_kafka_producer'])
Processed 1000000 messsages in 5.45 seconds
17.50 MB/s
183456.28 Msgs/s
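
As an alternative to counting failed messages and retrying them after the loop, the retry can happen at the point where the local queue fills up. The sketch below is not what was benchmarked above. `produce_with_backpressure` and `FakeProducer` are hypothetical names, and the helper only assumes a producer object with confluent-style `produce(topic, value=...)` and `poll(timeout)` methods that raises `BufferError` when its queue is full.

```python
def produce_with_backpressure(producer, topic, payload,
                              poll_timeout=0.1, max_attempts=100):
    """Enqueue one message, polling whenever the local queue is full.

    Returns the number of attempts it took to enqueue the message.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            producer.produce(topic, value=payload)
            return attempt
        except BufferError:
            # Local queue is full: give the client time to drain it, then retry.
            producer.poll(poll_timeout)
    raise BufferError("local queue still full after {0} attempts".format(max_attempts))


class FakeProducer:
    """Stand-in for illustration only: rejects the first two produce() calls."""

    def __init__(self):
        self.rejections_left = 2
        self.queued = []
        self.polls = 0

    def produce(self, topic, value=None):
        if self.rejections_left > 0:
            self.rejections_left -= 1
            raise BufferError("queue full")
        self.queued.append((topic, value))

    def poll(self, timeout):
        self.polls += 1


fake = FakeProducer()
attempts = produce_with_backpressure(fake, 'confluent-kafka-topic', b'kafkatest')
print(attempts, fake.polls, len(fake.queued))
```

With this approach no message is skipped in the main loop, at the cost of the loop occasionally blocking for up to `poll_timeout` seconds.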
client = KafkaClient(hosts=bootstrap_servers)
topic = client.topics[b'confluent-kafka-topic']
print(topic.earliest_available_offsets())
print(topic.latest_available_offsets())
{0: OffsetPartitionResponse(offset=[0], err=0)}
{0: OffsetPartitionResponse(offset=[1000000], err=0)}
import confluent_kafka
import uuid

def confluent_kafka_consumer_performance():
    
    topic = 'confluent-kafka-topic'
    msg_consumed_count = 0
    conf = {'bootstrap.servers': bootstrap_servers,
            'group.id': uuid.uuid1(),
            'session.timeout.ms': 6000,
            'default.topic.config': {
                'auto.offset.reset': 'earliest'
            }
    }

    consumer = confluent_kafka.Consumer(**conf)

    consumer_start = time.time()
    # This is the same as pykafka, subscribing to a topic will start a background thread
    consumer.subscribe([topic])

    while True:
        msg = consumer.poll(1)
        if msg:
            msg_consumed_count += 1
                         
        if msg_consumed_count >= msg_count:
            break
                    
    consumer_timing = time.time() - consumer_start
    consumer.close()    
    return consumer_timing
_ = confluent_kafka_consumer_performance() # Warm cache
consumer_timings['confluent_kafka_consumer'] = confluent_kafka_consumer_performance()
calculate_thoughput(consumer_timings['confluent_kafka_consumer'])
Processed 1000000 messsages in 3.83 seconds
24.93 MB/s
261407.91 Msgs/s

The confluent_kafka client is crushingly fast. It can consume over 250K messages a second from a single broker. Note that the raw C client has been benchmarked at over 3 million messages/sec, so you see how much overhead python adds. But on the side of developer speed, you don't have to code in C!
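
To put a rough number on that overhead, we can divide the two figures quoted above. Keep in mind that the 3 million messages/sec figure was not measured on this machine, so this is a ballpark illustration rather than a like-for-like comparison.

```python
python_msgs_per_sec = 261407.91  # confluent_kafka consumer result from this post
raw_c_msgs_per_sec = 3000000     # "over 3 million" quoted for the raw C client

slowdown = raw_c_msgs_per_sec / python_msgs_per_sec
print("raw C client is roughly {0:.1f}x faster".format(slowdown))
```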

Comparison

import pandas as pd
import seaborn as sns
%pylab inline
Populating the interactive namespace from numpy and matplotlib
consumer_df = pd.DataFrame.from_dict(consumer_timings, orient='index').rename(columns={0: 'time_in_seconds'})
producer_df = pd.DataFrame.from_dict(producer_timings, orient='index').rename(columns={0: 'time_in_seconds'})
consumer_df['MBs/s'] = (len(msg_payload) * msg_count) / consumer_df.time_in_seconds / (1024*1024)
producer_df['MBs/s'] = (len(msg_payload) * msg_count) / producer_df.time_in_seconds / (1024*1024)

consumer_df['Msgs/s'] = msg_count / consumer_df.time_in_seconds
producer_df['Msgs/s'] = msg_count / producer_df.time_in_seconds
producer_df.sort_index(inplace=True)
producer_df
                          time_in_seconds      MBs/s         Msgs/s
confluent_kafka_producer         5.450890  17.495754  183456.277455
pykafka_producer                57.318527   1.663815   17446.365994
pykafka_producer_rdkafka        15.724413   6.064928   63595.378094
python_kafka_producer           67.855882   1.405441   14737.115900
consumer_df.sort_index(inplace=True)
consumer_df
                          time_in_seconds      MBs/s         Msgs/s
confluent_kafka_consumer         3.825439  24.929801  261407.908007
pykafka_consumer                29.431728   3.240293   33976.938217
pykafka_consumer_rdkafka         6.086001  15.669966  164311.503412
python_kafka_consumer           26.547753   3.592298   37667.971237
producer_df.plot(kind='bar', subplots=True, figsize=(10, 10), title="Producer Comparison")
[Figure: "Producer Comparison" bar charts, one subplot each for time_in_seconds, MBs/s and Msgs/s]
consumer_df.plot(kind='bar', subplots=True, figsize=(10, 10), title="Consumer Comparison")
[Figure: "Consumer Comparison" bar charts, one subplot each for time_in_seconds, MBs/s and Msgs/s]

Limitations

This is not a complete overview of these clients. There are many trade-offs to consider before you jump in and pick one. First, the confluent client and the rdkafka version of pykafka are C backed and will not work with pypy. Also, packaging and distributing Python packages with C extensions can be a pain. We are big fans of conda to help alleviate this pain. We have packaged pykafka, confluent-kafka-python and librdkafka on our public conda channel.

Another limitation of this benchmark approach that negatively affects the performance of pykafka and confluent_kafka is the lack of any work other than blindly ripping through messages. Pykafka uses a background thread to consume messages before you ever call consume(). The vagaries of Python threading and the limitations of the GIL are beyond the scope of this post, but keep in mind that pykafka's background thread will continue to poll brokers while you do work in your program. This effect is more pronounced for any work that is I/O bound.
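
The effect can be illustrated with a toy prefetching consumer. This is a sketch using only the standard library, not pykafka's internals: `fetch_messages`, `consume_with_prefetch` and `slow_handle` are hypothetical names, and both the fetch latency and the I/O-bound work are simulated with `time.sleep`, which releases the GIL so the two can overlap.

```python
import queue
import threading
import time

def fetch_messages(n, fetch_latency):
    """Stand-in for a broker fetch: yields n messages, each after a delay."""
    for i in range(n):
        time.sleep(fetch_latency)
        yield i

def consume_with_prefetch(n, fetch_latency, handle):
    """A background thread fills a queue while the caller handles messages."""
    buffered = queue.Queue()
    done = object()  # sentinel marking the end of the stream

    def fill():
        for msg in fetch_messages(n, fetch_latency):
            buffered.put(msg)
        buffered.put(done)

    threading.Thread(target=fill, daemon=True).start()
    handled = []
    while True:
        msg = buffered.get()
        if msg is done:
            break
        handled.append(handle(msg))
    return handled

def slow_handle(msg):
    time.sleep(0.005)  # pretend I/O-bound work, e.g. a database write
    return msg * 2

results = consume_with_prefetch(n=20, fetch_latency=0.005, handle=slow_handle)
print(results[:5])
```

While `slow_handle` is sleeping, the background thread keeps fetching, so the next message is usually already waiting in the queue. A benchmark loop that does no work per message never gets to benefit from that overlap.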

The confluent_kafka client was released on May 25th, so while the underlying librdkafka is hardened and widely used, the Python client is very fresh. Also, some of the metadata APIs are not exposed to the client. Specifically, we have found issues with the Offset API and Offset Commit/Fetch API, so getting the starting and ending offsets for a partition and the current lag of a consumer group is not straightforward. Confluent seems committed to the client, so I'm sure a sane client metadata API will be added.
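
For context, the lag calculation itself is simple once both sets of offsets are available. The sketch below assumes you have already obtained them as plain `{partition_id: offset}` dictionaries by some other means. `consumer_lag` is a hypothetical helper, the committed offset is made up, and treating a partition with no committed offset as starting from 0 is a simplifying assumption.

```python
def consumer_lag(latest_offsets, committed_offsets):
    """Per-partition lag: latest offset in the log minus the group's committed offset.

    Partitions the group has never committed for are treated as offset 0.
    """
    return {
        partition: latest - committed_offsets.get(partition, 0)
        for partition, latest in latest_offsets.items()
    }

latest = {0: 1000000}    # e.g. taken from the topic offsets printed earlier
committed = {0: 750000}  # made-up committed offset for illustration
lag = consumer_lag(latest, committed)
print(lag, sum(lag.values()))
```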

Conclusion

Not to devolve this post into the ravings of a petulant language zealot, but why should the JVM get all the goodies first? It is really exciting to see the Python ecosystem around Kafka develop. Having first-class Python support for Kafka is huge for our work here at Activision and for anyone who is interested in stream processing and streaming analytics.

!docker-compose down # Cleanup
Stopping pythonkafkabenchmark_kafka_1 ... 
Stopping pythonkafkabenchmark_zookeeper_1 ... 
Removing pythonkafkabenchmark_kafka_1 ... 
Removing pythonkafkabenchmark_zookeeper_1 ... 
Removing network pythonkafkabenchmark_default