
Event-driven architecture replaces batch polling with real-time event streams. Instead of asking "are there new orders?" every five minutes, your systems react to each order the moment it happens. Kafka is the backbone for this pattern at scale, but the architecture decisions — topic design, schema governance, consumer patterns — determine whether it works reliably or becomes an operational nightmare.
This article covers the full production stack: topic design principles, schema registry with backward compatibility, producer and consumer patterns, exactly-once semantics, monitoring, and the move from ZooKeeper to KRaft mode.
Topic Design: One Topic Per Entity, Not Per Consumer
The most common mistake in Kafka architecture is creating topics for consumers instead of entities. A topic called orders-for-analytics and another called orders-for-billing means you are duplicating data, maintaining two producer code paths, and guaranteeing they will drift. Instead, create a single orders topic that all consumers read from. Each consumer group tracks its own offset and filters events it cares about.
The general rules: one topic per entity type (orders, users, payments), use event_type fields within messages to distinguish subtypes (order_created, order_shipped, order_cancelled), and partition by the entity's natural key (order_id, user_id) to guarantee ordering within an entity.
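Partitioning by the natural key is what delivers the per-entity ordering guarantee: the client hashes the key bytes and takes the result modulo the partition count, so a given order_id always lands on the same partition. A simplified sketch of that mapping (the Java client uses murmur2 and librdkafka a crc32-based hash by default; crc32 here stands in for either):

```python
import zlib

def partition_for_key(key: str, num_partitions: int) -> int:
    # Deterministic hash of the key bytes, then modulo the partition count.
    # Same key -> same partition -> all events for that entity stay ordered.
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

This also shows why repartitioning is painful: changing num_partitions changes the mapping for every existing key, so historical ordering per entity no longer lines up with new writes.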
```python
# Topic creation with proper partition count and replication
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "kafka:9092"})

topics = [
    NewTopic(
        topic="orders",
        num_partitions=24,
        replication_factor=3,
        config={
            "retention.ms": str(7 * 24 * 60 * 60 * 1000),  # 7 days
            "cleanup.policy": "delete",
            "min.insync.replicas": "2",
            "compression.type": "zstd",
        },
    ),
    NewTopic(
        topic="orders-dlq",
        num_partitions=6,
        replication_factor=3,
        config={
            "retention.ms": str(30 * 24 * 60 * 60 * 1000),  # 30 days
            "cleanup.policy": "delete",
        },
    ),
]

futures = admin.create_topics(topics)
for topic, future in futures.items():
    try:
        future.result()
        print(f"Created topic: {topic}")
    except Exception as e:
        print(f"Failed to create {topic}: {e}")
```

Partition count determines your maximum consumer parallelism. 24 partitions means you can have up to 24 consumers in a single consumer group processing in parallel. Start with 6x the number of consumer instances you plan to run — it is easy to add consumers later but painful to repartition an existing topic.
Schema Registry: Your Contract Layer
Without a schema registry, producers can change the event shape and silently break every consumer downstream. Confluent Schema Registry enforces compatibility rules on every schema change. Use BACKWARD compatibility as your default: a consumer on the new schema must still be able to read events written with the old one, which in practice means every added field needs a default and renames are breaking. If you also need old consumers to keep reading new events, step up to FULL compatibility.
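To make the rule concrete, here is a hypothetical backward-compatible evolution of the OrderEvent schema used below: the added coupon_code field carries a default, so a consumer on the new schema can still decode events written before the field existed (coupon_code is an invented field for illustration only):

```python
import json

# Version 2 of the OrderEvent schema: identical to v1 plus one optional field.
# Under BACKWARD compatibility the registry accepts it because the default
# fills the gap when reading v1 data; a new required field without a default
# would be rejected.
v2_schema = json.loads("""
{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "occurred_at", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "total_cents", "type": "long"},
    {"name": "currency", "type": "string"},
    {"name": "items", "type": {"type": "array", "items": "string"}},
    {"name": "metadata", "type": ["null", "string"], "default": null},
    {"name": "coupon_code", "type": ["null", "string"], "default": null}
  ]
}
""")

added = [f for f in v2_schema["fields"] if f["name"] == "coupon_code"]
```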
```python
# Producer with Avro serialization and Schema Registry
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from datetime import datetime, timezone

schema_registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})

order_event_schema = """
{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "occurred_at", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "total_cents", "type": "long"},
    {"name": "currency", "type": "string"},
    {"name": "items", "type": {"type": "array", "items": "string"}},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}
"""

def delivery_callback(err, msg):
    if err:
        print(f"DELIVERY FAILED for {msg.key()}: {err}")

producer = SerializingProducer({
    "bootstrap.servers": "kafka:9092",
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": AvroSerializer(
        schema_registry, order_event_schema,
        conf={"auto.register.schemas": False}
    ),
    "acks": "all",
    "enable.idempotence": True,
})

def publish_order_event(order_id, event_type, customer_id, total_cents, currency, items):
    event = {
        "order_id": order_id,
        "event_type": event_type,
        "occurred_at": datetime.now(timezone.utc).isoformat(),
        "customer_id": customer_id,
        "total_cents": total_cents,
        "currency": currency,
        "items": items,
        "metadata": None,
    }
    producer.produce(topic="orders", key=order_id, value=event, on_delivery=delivery_callback)
    producer.poll(0)

publish_order_event("ord-1234", "order_created", "cust-99", 4999, "USD", ["SKU-A", "SKU-B"])
producer.flush()
```

The idempotent producer setting (enable.idempotence=True) ensures that retries due to transient network errors do not produce duplicate messages. Combined with acks=all, this gives you the strongest single-partition delivery guarantee available without transactions.
Consumer Patterns: Idempotent Processing and Dead-Letter Queues
Kafka guarantees at-least-once delivery by default. During consumer group rebalances, offset commits can lag behind actual processing, causing messages to be delivered twice. Your processing logic must be idempotent — applying the same event twice should produce the same result as applying it once.
```python
# Consumer with idempotent processing and dead-letter queue.
# schema_registry and order_event_schema are defined in the producer example;
# db is your database client and publish_to_dead_letter a small DLQ helper.
from confluent_kafka import DeserializingConsumer, KafkaError
from confluent_kafka.serialization import StringDeserializer
from confluent_kafka.schema_registry.avro import AvroDeserializer
import traceback

consumer = DeserializingConsumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "analytics-pipeline",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "key.deserializer": StringDeserializer("utf_8"),
    "value.deserializer": AvroDeserializer(schema_registry, order_event_schema),
})

consumer.subscribe(["orders"])

def process_event(event):
    # Upsert keyed on (order_id, event_type): replaying the same event is a no-op
    db.execute("""
        INSERT INTO analytics.order_events (order_id, event_type, occurred_at, total_cents)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (order_id, event_type)
        DO UPDATE SET total_cents = EXCLUDED.total_cents, updated_at = NOW()
    """, (event["order_id"], event["event_type"], event["occurred_at"], event["total_cents"]))

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(f"Consumer error: {msg.error()}")
            continue
        try:
            process_event(msg.value())
            consumer.commit(message=msg)
        except Exception:
            publish_to_dead_letter("orders-dlq", msg, traceback.format_exc())
            consumer.commit(message=msg)
finally:
    consumer.close()
```

The dead-letter queue (DLQ) pattern is critical. When a message cannot be processed — bad data, downstream service unavailable, schema mismatch — send it to a DLQ topic with full context. This prevents a single bad message from blocking the entire consumer group.
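The publish_to_dead_letter helper used above is not part of the client library; a minimal sketch, assuming a separate plain Producer for the DLQ and a JSON envelope (the envelope field names are our own convention, not a Kafka standard):

```python
import json
from datetime import datetime, timezone

def build_dlq_record(msg, error_text):
    """Wrap a failed message in an envelope that records where it came from."""
    envelope = {
        "original_topic": msg.topic(),
        "original_partition": msg.partition(),
        "original_offset": msg.offset(),
        "failed_at": datetime.now(timezone.utc).isoformat(),
        "error": error_text,
        "payload": msg.value(),
    }
    return msg.key(), json.dumps(envelope, default=str)

def publish_to_dead_letter(dlq_topic, msg, error_text, producer=None):
    key, value = build_dlq_record(msg, error_text)
    if producer is not None:  # producer injected so the envelope logic is testable offline
        producer.produce(dlq_topic, key=key, value=value)
        producer.poll(0)
```

Keeping the original topic, partition, and offset in the envelope lets an operator replay the exact source message once the downstream issue is fixed.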
Monitoring: Consumer Lag, Throughput, and Error Rate
A Kafka architecture without monitoring is a ticking time bomb. Three metrics matter above all others: consumer lag (difference between latest produced offset and latest committed consumer offset), throughput (events per second per partition), and error rate (events sent to dead-letter topics).
```yaml
# Prometheus alerting rules for Kafka monitoring
groups:
  - name: kafka_consumer_alerts
    interval: 30s
    rules:
      - alert: ConsumerLagCritical
        expr: kafka_consumergroup_lag_sum{group="analytics-pipeline"} > 100000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Consumer group {{ $labels.group }} lag exceeds 100K"

      - alert: DLQMessageRate
        expr: rate(kafka_topic_messages_in_total{topic=~".*-dlq"}[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Dead-letter queue {{ $labels.topic }} receiving messages"
```

Exactly-Once Semantics with Kafka Transactions
Exactly-once semantics in Apache Kafka combine idempotent producers, transactional writes, and read_committed isolation so downstream readers only see messages that belong to successfully committed transactions. A transactional producer sets a stable transactional.id, which lets the broker fence zombie instances after failovers and guarantees atomic writes across multiple topic partitions. The consume-transform-produce pattern wraps consumption, transformation, and production in a single transaction: offsets for consumed partitions are written to the __consumer_offsets topic in the same transaction as output records, so either both commit or neither does. Consumers that should not observe aborted or partially written data must set isolation.level to read_committed. This model adds latency compared to at-least-once defaults, so reserve it for workflows where duplicate side effects are unacceptable, such as debiting accounts or updating regulatory ledgers.
```python
from confluent_kafka import Producer

conf = {
    "bootstrap.servers": "kafka:9092",
    "transactional.id": "payments-writer-01",
    "enable.idempotence": True,
    "acks": "all",
    "linger.ms": 5,
    "compression.type": "zstd",
}

def delivery_report(err, msg):
    if err is not None:
        raise RuntimeError(err)

p = Producer(conf)
p.init_transactions()
p.begin_transaction()

p.produce("payments", key=b"acct-42", value=b'{"amt": 19.99}', callback=delivery_report)
p.commit_transaction()
```

Kafka Connect: Bridging External Systems
Kafka Connect provides a scalable integration layer between Kafka and databases, warehouses, object stores, and SaaS APIs without hand-rolling bespoke ingest jobs for every system. Source connectors continuously tail external changelogs or tables and publish well-typed events to topics, while sink connectors subscribe to topics and push data into destinations. Debezium is the de facto standard for change data capture from relational databases: it reads the write-ahead log, captures inserts, updates, and deletes as structured events, and preserves ordering per table or primary key. On the sink side, the S3 connector lands files partitioned by time or key for lakehouse ingestion. Treat connector configs as infrastructure code: pin plugin versions, externalize secrets, and validate throughput against rebalance budgets.
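Connector configs like the JSON below are deployed through the Connect REST API rather than broker configuration files; a sketch using only the standard library (the Connect host and port are assumptions):

```python
import json
import urllib.request

def deploy_connector(connect_url, name, config):
    # PUT /connectors/{name}/config creates the connector if it is missing or
    # updates it in place, which makes the call idempotent and CI-friendly.
    req = urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return resp.status  # 201 on create, 200 on update
```

Pairing this with pinned configs in version control gives you the "connector configs as infrastructure code" workflow described above.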
```json
{
  "name": "pg-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "cdc_user",
    "database.dbname": "appdb",
    "plugin.name": "pgoutput",
    "topic.prefix": "appdb",
    "table.include.list": "public.orders,public.order_items",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
```

Consumer Group Rebalancing Strategies
The classic eager rebalancer revokes all partitions from every member before redistributing work, which causes stop-the-world pauses during rolling deploys. The cooperative sticky assignor incrementally migrates only the partitions that must move, reducing churn. Static group membership lets you reuse stable group.instance.id values so that routine restarts do not trigger full rebalances when session timeouts are tuned appropriately. Session timeouts, heartbeat intervals, and max.poll.interval.ms must be coherent: if processing loops exceed max.poll.interval.ms, the coordinator evicts the member and partitions shuffle.
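These timeout relationships can be encoded as a deploy-time sanity check (a hypothetical helper; the factor of three reflects the common guidance that heartbeats should fire several times per session window):

```python
def check_consumer_timeouts(session_timeout_ms, heartbeat_interval_ms,
                            max_poll_interval_ms, worst_case_batch_ms):
    """Return a list of misconfiguration warnings (empty means coherent)."""
    problems = []
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append("heartbeat.interval.ms should be <= session.timeout.ms / 3")
    if worst_case_batch_ms >= max_poll_interval_ms:
        problems.append("worst-case batch processing exceeds max.poll.interval.ms; "
                        "the coordinator will evict this member")
    return problems
```

Running a check like this in CI catches the classic failure mode where someone raises batch size without raising max.poll.interval.ms to match.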
```python
from confluent_kafka import Consumer

c = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "orders-svc",
    "client.id": "orders-svc-7",
    "partition.assignment.strategy": "cooperative-sticky",
    "group.instance.id": "orders-svc-pod-7",
    "session.timeout.ms": 45000,
    "heartbeat.interval.ms": 3000,
    "max.poll.interval.ms": 300000,
    "enable.auto.commit": False,
})
```

Back-Pressure and Flow Control
Back-pressure is how your pipeline signals that one tier cannot keep up with another. Producers accumulate records in buffer.memory until batch.size and linger.ms thresholds are met; setting linger.ms too high increases latency but improves compression and broker efficiency, while a tiny buffer with aggressive throughput leads to buffer full errors. On the consumer side, max.poll.records caps how much work you dequeue per iteration; oversized batches plus slow deserialization can exceed the interval and trigger eviction. Misaligned timeouts between slow processing, long GC pauses, and short session windows produce rebalance storms where partitions ping-pong among members. Instrument end-to-end latency, buffer utilization, consumer lag, and time-between-polls to detect pressure before it becomes an outage.
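Time-between-polls is the metric that predicts eviction; a minimal sketch of tracking it around the consumer loop (the 80% warning threshold is our assumption, not a Kafka default):

```python
import time

class PollGapMonitor:
    """Warn when the gap between poll() calls approaches max.poll.interval.ms."""

    def __init__(self, max_poll_interval_ms, warn_ratio=0.8):
        self.limit_s = max_poll_interval_ms / 1000.0
        self.warn_ratio = warn_ratio
        self.last_poll = None

    def record_poll(self, now=None):
        """Call once per loop iteration; returns True if the previous gap was risky."""
        now = time.monotonic() if now is None else now
        breached = (self.last_poll is not None
                    and now - self.last_poll > self.limit_s * self.warn_ratio)
        self.last_poll = now
        return breached
```

Emitting this as a gauge alongside consumer lag gives you the "pressure before outage" signal the paragraph above calls for: lag tells you the backlog, poll gap tells you whether the member is about to be evicted.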
```properties
# producer
buffer.memory=67108864
linger.ms=20
batch.size=65536
compression.type=zstd
acks=all

# consumer
max.poll.interval.ms=600000
max.poll.records=200
fetch.min.bytes=1048576
fetch.max.wait.ms=500
session.timeout.ms=60000
heartbeat.interval.ms=3000
```

KRaft Mode: Replacing ZooKeeper
Kafka 4.0 (released early 2025) removed ZooKeeper entirely. KRaft (Kafka Raft) moves metadata management into the Kafka brokers themselves. The practical benefits: fewer processes to manage, millions of partitions per cluster versus hundreds of thousands with ZooKeeper, and millisecond controller failover instead of seconds. For new clusters, use KRaft from day one. For existing clusters, the migration must be completed on a 3.x bridge release, because 4.0 brokers cannot connect to ZooKeeper at all; it involves introducing KRaft controller nodes, reassigning metadata leadership, and retiring ZooKeeper only after the controllers confirm metadata has been fully migrated. Configure controllers with process.roles=controller, set zookeeper.metadata.migration.enable=true on brokers and controllers, then follow the migration procedure for your distribution.
```bash
#!/usr/bin/env bash
set -euo pipefail

# Format storage for the KRaft controllers (safe to re-run with --ignore-formatted).
# NOTE: in a ZooKeeper migration you must reuse the existing cluster ID rather
# than generating a fresh one; random-uuid is only correct for brand-new clusters.
kafka-storage format -t "$(kafka-storage random-uuid)" \
  -c /etc/kafka/controller.properties --ignore-formatted

# With zookeeper.metadata.migration.enable=true set on brokers and controllers,
# the controller quorum copies metadata out of ZooKeeper after restart.
systemctl restart kafka-controller
systemctl restart kafka-broker

# Verify the KRaft controller quorum is healthy before retiring ZooKeeper
kafka-metadata-quorum --bootstrap-server kafka-1:9092 describe --status
```

A well-designed Kafka architecture has seven layers: topic design (one per entity, proper partitioning), schema governance (registry with backward compatibility), producers (idempotent, structured events, delivery callbacks), consumers (idempotent processing, dead-letter queues, manual offset commits), exactly-once transactions (for critical financial paths), Kafka Connect (for system integration), and monitoring (lag, throughput, error rate alerting). Start with one entity, implement all layers, and expand from there.




