---
name: building-with-kafka-strimzi
description: Use when building event-driven systems with Apache Kafka on Kubernetes. Triggers include EDA patterns, Kafka producers/consumers, Strimzi operator deployment, Schema Registry, transactions, exactly-once semantics. NOT for general messaging (use Dapr pub/sub for abstraction).
---
Building Event-Driven Systems with Kafka & Strimzi
Production-ready event streaming on Kubernetes using Apache Kafka with the Strimzi operator.
Persona
You are a Kafka and event-driven architecture expert with production Kubernetes experience. You understand:
- Event-driven architecture patterns (events vs commands, eventual consistency)
- Apache Kafka internals (brokers, partitions, consumer groups, offsets)
- Strimzi operator for Kubernetes-native Kafka deployment
- confluent-kafka-python for high-performance Python clients
- Schema Registry and Avro for event schema management
- Exactly-once semantics and transactional patterns
When to Use
- Building event-driven microservices
- Deploying Kafka on Kubernetes with Strimzi
- Implementing reliable producers with delivery guarantees
- Managing consumer groups and offset handling
- Schema evolution with Avro and Schema Registry
- Change data capture with Debezium
- Transactional event processing
Core Concepts
Event-Driven Architecture
| Concept | Description |
|---|---|
| Events | Immutable facts about past occurrences (e.g., "OrderCreated") |
| Commands | Requests to perform actions (e.g., "CreateOrder") |
| Eventual Consistency | Systems converge to consistent state over time |
| Event Sourcing | Capture state changes as event sequence |
| CQRS | Separate command and query processing |
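To make the events-vs-commands distinction concrete, here is an illustrative pair of payloads (field names are hypothetical, not part of a fixed contract): a command asks the system to do something and may still be rejected, while an event records a fact that has already happened.

```python
# Command: a request that the task service may validate and reject
create_task_command = {"type": "CreateTask", "title": "Buy groceries", "requested_by": "user-42"}

# Event: an immutable fact, published only after the task was persisted
task_created_event = {
    "type": "TaskCreated",
    "task_id": "task-123",
    "title": "Buy groceries",
    "occurred_at": "2025-01-01T10:00:00Z",
}
```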
Kafka Architecture
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster (KRaft Mode - No ZooKeeper) │
├─────────────────────────────────────────────────────────────┤
│ Controller Nodes (metadata via Raft) │
│ └─ __cluster_metadata topic │
├─────────────────────────────────────────────────────────────┤
│ Broker Nodes (message processing) │
│ └─ Topics → Partitions → Segments │
├─────────────────────────────────────────────────────────────┤
│ Producers → Topics ← Consumer Groups │
│ └─ Partition assignment, offset tracking │
└─────────────────────────────────────────────────────────────┘
Strimzi Components
| Component | Role |
|---|---|
| Cluster Operator | Manages Kafka cluster lifecycle |
| Entity Operator | Contains Topic + User operators |
| Topic Operator | Manages KafkaTopic CRs |
| User Operator | Manages KafkaUser CRs and credentials |
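Once the operators are running, the custom resources they manage can be inspected directly; a quick check, assuming the `kafka` namespace used in the deployment steps below:

```bash
# Custom resources reconciled by the Cluster, Topic, and User operators
kubectl get kafka,kafkanodepool,kafkatopic,kafkauser -n kafka
```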
Decision Logic
| Situation | Pattern | Why |
|---|---|---|
| Critical data | acks=all + idempotent producer | Durability over speed |
| High throughput | acks=1 + batching | Balance speed/safety |
| Atomic multi-topic writes | Transactions | All-or-nothing |
| Schema evolution | Avro + Schema Registry | Backward compatibility |
| Database sync | Debezium CDC + Outbox | Transactional integrity |
| Consumer scaling | Consumer groups | Parallel processing |
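The Debezium row in this table has no example later in the skill, so here is a minimal sketch of what it looks like with Strimzi: a KafkaConnect cluster (not shown) plus a KafkaConnector resource capturing changes from a hypothetical Postgres outbox table. The connection details, cluster name, and Debezium 2.x config keys (e.g. topic.prefix) are assumptions to adapt for your environment.

```yaml
# Illustrative only: assumes a KafkaConnect cluster named "task-connect"
# whose image already includes the Debezium Postgres connector plugin.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: task-db-source
  labels:
    strimzi.io/cluster: task-connect
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: postgres
    database.port: 5432
    database.user: debezium
    database.password: changeme        # use Secrets/externalConfiguration in production
    database.dbname: tasks
    topic.prefix: taskdb               # Debezium 2.x; older versions use database.server.name
    table.include.list: public.outbox
```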
Strimzi Deployment on Kubernetes
Install Strimzi Operator
# Add Strimzi Helm repo
helm repo add strimzi https://strimzi.io/charts/
helm repo update
# Install operator
helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
--namespace kafka --create-namespace
Create Kafka Cluster (KRaft Mode)
# kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: dual-role
labels:
strimzi.io/cluster: task-events
spec:
replicas: 1
roles:
- controller
- broker
storage:
type: ephemeral # Use persistent-claim for production
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: task-events
annotations:
strimzi.io/node-pools: enabled
strimzi.io/kraft: enabled
spec:
kafka:
version: 3.8.0
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
entityOperator:
topicOperator: {}
userOperator: {}
kubectl apply -f kafka-cluster.yaml -n kafka
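Provisioning takes a minute or two; you can block until the cluster reports Ready before creating topics or users:

```bash
# Wait for the Cluster Operator to finish reconciling the Kafka cluster
kubectl wait kafka/task-events --for=condition=Ready --timeout=300s -n kafka
```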
Create Topics via CRD
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: task-created
labels:
strimzi.io/cluster: task-events
spec:
partitions: 3
replicas: 1
config:
retention.ms: "604800000" # 7 days
cleanup.policy: delete
Create Users via CRD
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: task-api
labels:
strimzi.io/cluster: task-events
spec:
authentication:
type: scram-sha-512
authorization:
type: simple
acls:
      - resource:
          type: topic
          name: task-            # with patternType prefix, matches every topic starting with "task-"
          patternType: prefix
        operations: [Read, Write, Describe]
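The User Operator writes the generated credentials into a Secret named after the KafkaUser (here task-api, key password). Below is a minimal sketch of wiring those credentials into the Python client; it assumes a listener configured with authentication type scram-sha-512 on the TLS port 9093 (the Kafka CR above does not enable listener authentication, so adjust it first), and the placeholder password would come from the Secret.

```python
from confluent_kafka import Producer

# Password from the Strimzi-managed Secret, e.g.:
#   kubectl get secret task-api -n kafka -o jsonpath='{.data.password}' | base64 -d
producer = Producer({
    'bootstrap.servers': 'task-events-kafka-bootstrap:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': 'task-api',
    'sasl.password': '<from-secret>',
    'ssl.ca.location': 'ca.crt',  # cluster CA, from the task-events-cluster-ca-cert Secret
})
```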
Python Producer Patterns
Basic Producer with Delivery Reports
from confluent_kafka import Producer
def delivery_report(err, msg):
"""Callback triggered by poll() or flush()"""
if err is not None:
print(f'Delivery failed: {err}')
else:
print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
producer = Producer({
'bootstrap.servers': 'task-events-kafka-bootstrap:9092',
'client.id': 'task-api',
'acks': 'all',
'enable.idempotence': True,
'retries': 5,
'delivery.timeout.ms': 30000
})
# Async produce (default)
producer.produce(
topic='task-created',
key='task-123',
value='{"id": "task-123", "title": "Buy groceries"}',
callback=delivery_report
)
# Serve delivery report callbacks
producer.poll(0)
# Flush before shutdown
producer.flush()
Idempotent Producer (Exactly-Once)
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'enable.idempotence': True, # Prevents duplicates on retry
'acks': 'all', # Wait for all replicas
'max.in.flight.requests.per.connection': 5, # Max for idempotence
'retries': 2147483647 # Retry indefinitely within timeout
})
Python Consumer Patterns
Basic Consumer with Manual Commit
from confluent_kafka import Consumer, KafkaError
consumer = Consumer({
'bootstrap.servers': 'task-events-kafka-bootstrap:9092',
'group.id': 'notification-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Manual commit for at-least-once
})
consumer.subscribe(['task-created'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise Exception(msg.error())
# Process message
print(f'Received: {msg.value().decode()}')
# Commit after successful processing
consumer.commit(message=msg)
finally:
consumer.close()
Consumer with Rebalance Callbacks
def on_assign(consumer, partitions):
print(f'Assigned: {partitions}')
def on_revoke(consumer, partitions):
print(f'Revoking: {partitions}')
consumer.commit(asynchronous=False) # Commit before losing partitions
consumer.subscribe(
['task-created'],
on_assign=on_assign,
on_revoke=on_revoke
)
Avro with Schema Registry
Producer with Avro Serialization
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
# Schema Registry client
sr_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})
# Avro schema
task_schema = """
{
"type": "record",
"name": "TaskCreated",
"namespace": "com.example.events",
"fields": [
{"name": "id", "type": "string"},
{"name": "title", "type": "string"},
{"name": "created_at", "type": "string"},
{"name": "priority", "type": ["null", "int"], "default": null}
]
}
"""
# Serializer
serializer = AvroSerializer(
schema_registry_client=sr_client,
schema_str=task_schema,
to_dict=lambda obj, ctx: obj.__dict__
)
# Produce
class TaskCreated:
def __init__(self, id, title, created_at, priority=None):
self.id = id
self.title = title
self.created_at = created_at
self.priority = priority
event = TaskCreated('task-123', 'Buy groceries', '2025-01-01T10:00:00Z', 1)
producer.produce(
topic='task-created',
key='task-123',
value=serializer(event, SerializationContext('task-created', MessageField.VALUE))
)
Transactions (Exactly-Once)
Transactional Producer
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'task-processor-1',
'enable.idempotence': True
})
# Initialize transactions once
producer.init_transactions()
try:
producer.begin_transaction()
# Produce multiple messages atomically
producer.produce('orders', key='o1', value='order-1')
producer.produce('payments', key='p1', value='payment-1')
producer.produce('audit', key='a1', value='audit-log')
producer.commit_transaction()
except Exception as e:
producer.abort_transaction()
raise
Transaction-Aware Consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'txn-consumer',
'isolation.level': 'read_committed', # Only read committed messages
'enable.auto.commit': False
})
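The standard exactly-once processing loop ties the two together: consumed offsets are committed through the producer's transaction rather than by the consumer, so output records and input offsets commit (or abort) atomically. A sketch, assuming the transactional producer (after init_transactions) and the read_committed consumer configured above:

```python
consumer.subscribe(['task-created'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    producer.begin_transaction()
    try:
        # Transform the input and produce the result
        producer.produce('task-notifications', key=msg.key(), value=msg.value())

        # Commit the consumed offsets inside the same transaction
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata()
        )
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
```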
FastAPI Integration
Async Producer with Lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI
from confluent_kafka import Producer
import json
import uuid
producer = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global producer
producer = Producer({'bootstrap.servers': 'kafka:9092'})
yield
producer.flush()
app = FastAPI(lifespan=lifespan)
@app.post("/tasks")
async def create_task(title: str):
task_id = str(uuid.uuid4())
event = {"id": task_id, "title": title}
producer.produce(
'task-created',
key=task_id,
value=json.dumps(event)
)
producer.poll(0)
return {"id": task_id}
Background Consumer
from threading import Thread
def consume_loop():
consumer = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'notification-service'
})
consumer.subscribe(['task-created'])
while True:
msg = consumer.poll(1.0)
if msg and not msg.error():
# Process message
pass
@asynccontextmanager
async def lifespan(app: FastAPI):
# Start consumer in background thread
consumer_thread = Thread(target=consume_loop, daemon=True)
consumer_thread.start()
yield
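A daemon thread dies abruptly with the process, which skips consumer.close() and slows down the next rebalance (see the guardrails below). Here is a sketch of a variant that shuts down cleanly using a threading.Event, reusing the FastAPI lifespan imports from the producer example above:

```python
from threading import Event, Thread
from confluent_kafka import Consumer

stop = Event()

def consume_loop():
    consumer = Consumer({
        'bootstrap.servers': 'kafka:9092',
        'group.id': 'notification-service',
        'enable.auto.commit': False
    })
    consumer.subscribe(['task-created'])
    try:
        while not stop.is_set():
            msg = consumer.poll(1.0)
            if msg is None or msg.error():
                continue
            # Process message, then commit
            consumer.commit(message=msg)
    finally:
        consumer.close()  # leave the group cleanly to avoid rebalance delays

@asynccontextmanager
async def lifespan(app: FastAPI):
    thread = Thread(target=consume_loop)
    thread.start()
    yield
    stop.set()
    thread.join(timeout=10)
```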
Delivery Guarantees
| Guarantee | Producer Config | Consumer Config | Risk |
|---|---|---|---|
| At-most-once | acks=0 | Auto-commit before process | Data loss |
| At-least-once | acks=all, retries | Commit after process | Duplicates |
| Exactly-once | Transactions + idempotence | isolation.level=read_committed | Complexity |
Task API Event Examples
Event Naming Convention
Domain: task
Events: task.created, task.updated, task.completed, task.deleted
Topics: task-events (single topic) or task-created, task-updated (per event)
Event Schema
{
"event_id": "uuid",
"event_type": "task.created",
"occurred_at": "ISO-8601",
"data": {
"task_id": "uuid",
"title": "string",
"owner_id": "uuid"
},
"metadata": {
"correlation_id": "uuid",
"causation_id": "uuid"
}
}
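A small helper that stamps the envelope fields before producing keeps these events consistent across services; a sketch (the helper name and defaults are illustrative, and it assumes the JSON producer shown earlier):

```python
import json
import uuid
from datetime import datetime, timezone

def publish_event(producer, topic, event_type, data, correlation_id=None, causation_id=None):
    """Wrap domain data in the standard event envelope and produce it, keyed by task_id."""
    envelope = {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "occurred_at": datetime.now(timezone.utc).isoformat(),
        "data": data,
        "metadata": {
            "correlation_id": correlation_id or str(uuid.uuid4()),
            "causation_id": causation_id,
        },
    }
    producer.produce(topic, key=data["task_id"], value=json.dumps(envelope))
    producer.poll(0)

# publish_event(producer, 'task-events', 'task.created',
#               {"task_id": "task-123", "title": "Buy groceries", "owner_id": "user-42"})
```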
Safety & Guardrails
NEVER
- Use acks=0 for critical data
- Set max.in.flight.requests > 5 with idempotence
- Skip consumer.close() (causes rebalance delays)
- Store offsets before successful processing
- Expose broker addresses externally without TLS
ALWAYS
- Use acks=all for important events
- Enable idempotence for exactly-once
- Handle KafkaError._PARTITION_EOF gracefully
- Use Schema Registry for production
- Set appropriate retention.ms for event topics
- Monitor consumer lag (see the lag check sketch below)
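One way to spot-check consumer lag without extra tooling is the kafka-consumer-groups.sh script that ships in the broker image; the pod name below assumes the dual-role node pool defined earlier, and the LAG column shows how far each partition is behind.

```bash
kubectl exec -n kafka task-events-dual-role-0 -- \
  bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe --group notification-service
```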
Common Errors
| Error | Cause | Fix |
|---|---|---|
| NOT_ENOUGH_REPLICAS | ISR below min.insync.replicas | Check broker health |
| COORDINATOR_NOT_AVAILABLE | Consumer group coordinator missing | Wait, retry |
| REBALANCE_IN_PROGRESS | Consumer group rebalancing | Wait for completion |
| OFFSET_OUT_OF_RANGE | Requested offset doesn't exist | Check auto.offset.reset |
| UNKNOWN_TOPIC_OR_PARTITION | Topic doesn't exist | Create topic first |