pyrkafka is a fast, lightweight Python client for Apache Kafka, built on top of librdkafka via the Rust rdkafka crate. It provides a simple, Pythonic API for producing and consuming Kafka messages with native Rust performance through PyO3.
- Rust-powered performance — wraps librdkafka through Rust, avoiding the overhead of pure-Python implementations
- GIL release during polling — consumer polling releases the Python GIL, allowing other threads to run while waiting for messages
- Simple Pythonic API — producer and consumer with familiar Python patterns (for and async for when consuming)
- Full librdkafka configuration — pass any librdkafka config option via a dict
- SSL and compression built-in — compiled with SSL (vendored OpenSSL) and zstd compression support
- Cross-platform wheels — pre-built for Linux (x86_64, aarch64), macOS (x86_64, Apple Silicon), and Windows
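
Because polling releases the GIL, a blocking consumer loop can run in a background thread while the main thread keeps working. The sketch below shows one way to arrange that. It is not part of pyrkafka: `start_consumer_thread` is a hypothetical helper that drains any iterable of messages into a `queue.Queue`. It assumes the consumer is used through plain iteration and that each message is a `bytes` object. A plain iterator stands in for the consumer so the example runs without a broker.

```python
import queue
import threading
from collections.abc import Iterable


def start_consumer_thread(
    messages: Iterable[bytes],
    out: "queue.Queue[bytes]",
) -> threading.Thread:
    """Drain `messages` into `out` on a daemon thread (hypothetical helper)."""

    def drain() -> None:
        # With a real consumer this loop blocks while polling; per the feature
        # list, the GIL is released during that wait, so other threads keep running.
        for message in messages:
            out.put(message)

    worker = threading.Thread(target=drain, daemon=True)
    worker.start()
    return worker


if __name__ == "__main__":
    # Stand-in for a real consumer object, so the sketch runs without Kafka.
    fake_consumer = iter([b"first", b"second"])
    inbox: "queue.Queue[bytes]" = queue.Queue()
    worker = start_consumer_thread(fake_consumer, inbox)
    worker.join(timeout=5)
    while not inbox.empty():
        print(inbox.get().decode())
```

With a real consumer the iterator normally never ends, so the main thread would read from the queue with `inbox.get(timeout=...)` rather than joining the worker.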
pip install pyrkafka

Requires Python 3.13+. Pre-built wheels are available for all major platforms, so no Rust toolchain is needed for installation.
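
A quick way to confirm that an environment meets the Python requirement stated above before installing. This is a minimal sketch; `meets_requirement` and `MIN_PYTHON` are hypothetical names, not something pyrkafka ships.

```python
import sys

MIN_PYTHON = (3, 13)  # taken from the "Requires Python 3.13+" note


def meets_requirement(version: tuple[int, ...] = tuple(sys.version_info[:2])) -> bool:
    """Return True if `version` (major, minor, ...) is at least MIN_PYTHON."""
    return tuple(version[:2]) >= MIN_PYTHON


if __name__ == "__main__":
    if meets_requirement():
        print("Python version OK for pyrkafka")
    else:
        found = ".".join(map(str, sys.version_info[:2]))
        print(f"pyrkafka needs Python 3.13+, found {found}")
```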
from pyrkafka import PyrKafkaProducer
producer = PyrKafkaProducer("localhost:9092")
# Send a message (partitioned round-robin)
producer.produce("my_topic", b"Hello, Kafka!")
# Send with a key (messages with the same key go to the same partition)
producer.produce_with_key("my_topic", b"Hello, Kafka!", "my_key")
# Flush pending messages (also happens automatically when the producer is dropped)
producer.flush()

from pyrkafka import PyrKafkaConsumer
consumer = PyrKafkaConsumer("localhost:9092", "my_topic", "my_group")
for message in consumer:
    print(message.decode())

The consumer also supports async for, for use in asyncio applications. The blocking Kafka poll is automatically offloaded to a thread so the event loop stays responsive:
import asyncio
from pyrkafka import PyrKafkaConsumer
async def main():
    consumer = PyrKafkaConsumer("localhost:9092", "my_topic", "my_group")
    async for message in consumer:
        print(message.decode())

asyncio.run(main())

Both producer and consumer accept an optional config dict for additional librdkafka configuration:
producer = PyrKafkaProducer("localhost:9092", config={
    "message.timeout.ms": "5000",
    "compression.type": "zstd",
})
consumer = PyrKafkaConsumer("localhost:9092", "my_topic", "my_group", config={
    "auto.offset.reset": "latest",
    "enable.auto.commit": "false",
})

License: MIT