@panaversity/building-with-kafka-strimzi

Use when building event-driven systems with Apache Kafka on Kubernetes. Triggers include EDA patterns, Kafka producers/consumers, Strimzi operator deployment, Schema Registry, transactions, exactly-once semantics. NOT for general messaging (use Dapr pub/sub for abstraction).

Installation

$ skills install @panaversity/building-with-kafka-strimzi

Supported assistants: Claude Code, Cursor, Copilot, Codex, Antigravity

Details

Path: .claude/skills/building-with-kafka-strimzi/SKILL.md
Branch: main
Scoped Name: @panaversity/building-with-kafka-strimzi

Usage

After installing, this skill will be available to your AI coding assistant.

Verify installation:

skills list

Skill Instructions


name: building-with-kafka-strimzi
description: Use when building event-driven systems with Apache Kafka on Kubernetes. Triggers include EDA patterns, Kafka producers/consumers, Strimzi operator deployment, Schema Registry, transactions, exactly-once semantics. NOT for general messaging (use Dapr pub/sub for abstraction).

Building Event-Driven Systems with Kafka & Strimzi

Production-ready event streaming on Kubernetes using Apache Kafka with Strimzi operator.

Persona

You are a Kafka and event-driven architecture expert with production Kubernetes experience. You understand:

  • Event-driven architecture patterns (events vs commands, eventual consistency)
  • Apache Kafka internals (brokers, partitions, consumer groups, offsets)
  • Strimzi operator for Kubernetes-native Kafka deployment
  • confluent-kafka-python for high-performance Python clients
  • Schema Registry and Avro for event schema management
  • Exactly-once semantics and transactional patterns

When to Use

  • Building event-driven microservices
  • Deploying Kafka on Kubernetes with Strimzi
  • Implementing reliable producers with delivery guarantees
  • Managing consumer groups and offset handling
  • Schema evolution with Avro and Schema Registry
  • Change data capture with Debezium
  • Transactional event processing

Core Concepts

Event-Driven Architecture

Concept              | Description
Events               | Immutable facts about past occurrences (e.g., "OrderCreated")
Commands             | Requests to perform actions (e.g., "CreateOrder")
Eventual Consistency | Systems converge to a consistent state over time
Event Sourcing       | Capture state changes as a sequence of events
CQRS                 | Separate command and query processing
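
To make the first two rows concrete, a minimal sketch (hypothetical payloads, not part of the skill) contrasts an event, which records a fact in the past tense, with a command, which requests an action that may still be rejected:

from datetime import datetime, timezone

# Event: an immutable fact about something that already happened
order_created_event = {
    "type": "OrderCreated",
    "order_id": "order-42",
    "occurred_at": datetime.now(timezone.utc).isoformat(),
    "total": 19.99
}

# Command: a request to perform an action; the receiver may reject it
create_order_command = {
    "type": "CreateOrder",
    "order_id": "order-42",
    "items": [{"sku": "SKU-1", "qty": 2}]
}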

Kafka Architecture

┌─────────────────────────────────────────────────────────────┐
│  Kafka Cluster (KRaft Mode - No ZooKeeper)                  │
├─────────────────────────────────────────────────────────────┤
│  Controller Nodes (metadata via Raft)                       │
│   └─ __cluster_metadata topic                               │
├─────────────────────────────────────────────────────────────┤
│  Broker Nodes (message processing)                          │
│   └─ Topics → Partitions → Segments                         │
├─────────────────────────────────────────────────────────────┤
│  Producers → Topics ← Consumer Groups                       │
│   └─ Partition assignment, offset tracking                  │
└─────────────────────────────────────────────────────────────┘

Strimzi Components

Component        | Role
Cluster Operator | Manages the Kafka cluster lifecycle
Entity Operator  | Contains the Topic and User operators
Topic Operator   | Manages KafkaTopic CRs
User Operator    | Manages KafkaUser CRs and credentials

Decision Logic

Situation                 | Pattern                        | Why
Critical data             | acks=all + idempotent producer | Durability over speed
High throughput           | acks=1 + batching              | Balance speed/safety
Atomic multi-topic writes | Transactions                   | All-or-nothing
Schema evolution          | Avro + Schema Registry         | Backward compatibility
Database sync             | Debezium CDC + outbox          | Transactional integrity
Consumer scaling          | Consumer groups                | Parallel processing
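
The first two rows map to different confluent-kafka-python producer configurations; the sketch below is illustrative (broker address, batch size, and linger values are assumptions):

from confluent_kafka import Producer

# Critical data: wait for all in-sync replicas and deduplicate retries
durable_producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'enable.idempotence': True
})

# High throughput: leader-only acks, larger batches, short linger to coalesce sends
throughput_producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': '1',
    'linger.ms': 20,
    'batch.size': 131072
})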

Strimzi Deployment on Kubernetes

Install Strimzi Operator

# Add Strimzi Helm repo
helm repo add strimzi https://strimzi.io/charts/
helm repo update

# Install operator
helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
  --namespace kafka --create-namespace

Create Kafka Cluster (KRaft Mode)

# kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  labels:
    strimzi.io/cluster: task-events
spec:
  replicas: 1
  roles:
    - controller
    - broker
  storage:
    type: ephemeral  # Use persistent-claim for production
---

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: task-events
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.8.0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  entityOperator:
    topicOperator: {}
    userOperator: {}

Apply the manifest:

kubectl apply -f kafka-cluster.yaml -n kafka

Create Topics via CRD

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: task-created
  labels:
    strimzi.io/cluster: task-events
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: "604800000"  # 7 days
    cleanup.policy: delete

Create Users via CRD

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: task-api
  labels:
    strimzi.io/cluster: task-events
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: task-  # prefix match: all topics whose names start with "task-"
          patternType: prefix
        operations: [Read, Write, Describe]
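
Connect as the KafkaUser from Python

The User Operator writes the generated SCRAM password into a Secret named after the KafkaUser (here: task-api). The sketch below is illustrative; it assumes a listener with scram-sha-512 authentication has been added to the Kafka CR (the listeners above use no authentication) and that the password is injected via an environment variable:

import os
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'task-events-kafka-bootstrap:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': 'task-api',
    'sasl.password': os.environ['KAFKA_PASSWORD'],  # from the task-api Secret
    # Trust the Strimzi cluster CA, e.g. mounted from the <cluster>-cluster-ca-cert Secret
    'ssl.ca.location': '/etc/kafka/ca.crt'
})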

Python Producer Patterns

Basic Producer with Delivery Reports

from confluent_kafka import Producer

def delivery_report(err, msg):
    """Callback triggered by poll() or flush()"""
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer = Producer({
    'bootstrap.servers': 'task-events-kafka-bootstrap:9092',
    'client.id': 'task-api',
    'acks': 'all',
    'enable.idempotence': True,
    'retries': 5,
    'delivery.timeout.ms': 30000
})

# Async produce (default)
producer.produce(
    topic='task-created',
    key='task-123',
    value='{"id": "task-123", "title": "Buy groceries"}',
    callback=delivery_report
)

# Serve queued delivery report callbacks
producer.poll(0)

# Flush before shutdown
producer.flush()

Idempotent Producer (Exactly-Once)

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,  # Prevents duplicates on retry
    'acks': 'all',               # Wait for all replicas
    'max.in.flight.requests.per.connection': 5,  # Max for idempotence
    'retries': 2147483647        # Retry indefinitely within timeout
})

Python Consumer Patterns

Basic Consumer with Manual Commit

from confluent_kafka import Consumer, KafkaError

consumer = Consumer({
    'bootstrap.servers': 'task-events-kafka-bootstrap:9092',
    'group.id': 'notification-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False  # Manual commit for at-least-once
})

consumer.subscribe(['task-created'])

try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise Exception(msg.error())

        # Process message
        print(f'Received: {msg.value().decode()}')

        # Commit after successful processing
        consumer.commit(message=msg)

finally:
    consumer.close()

Consumer with Rebalance Callbacks

def on_assign(consumer, partitions):
    print(f'Assigned: {partitions}')

def on_revoke(consumer, partitions):
    print(f'Revoking: {partitions}')
    consumer.commit(asynchronous=False)  # Commit before losing partitions

consumer.subscribe(
    ['task-created'],
    on_assign=on_assign,
    on_revoke=on_revoke
)

Avro with Schema Registry

Producer with Avro Serialization

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Schema Registry client
sr_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})

# Avro schema
task_schema = """
{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.example.events",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "created_at", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}
"""

# Serializer
serializer = AvroSerializer(
    schema_registry_client=sr_client,
    schema_str=task_schema,
    to_dict=lambda obj, ctx: obj.__dict__
)

# Produce
class TaskCreated:
    def __init__(self, id, title, created_at, priority=None):
        self.id = id
        self.title = title
        self.created_at = created_at
        self.priority = priority

event = TaskCreated('task-123', 'Buy groceries', '2025-01-01T10:00:00Z', 1)
producer.produce(
    topic='task-created',
    key='task-123',
    value=serializer(event, SerializationContext('task-created', MessageField.VALUE))
)
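
Consumer with Avro Deserialization

On the consuming side, an AvroDeserializer built from the same Schema Registry client turns the binary payload back into a dict. A minimal sketch, reusing sr_client and task_schema from above (the group id is an assumption):

from confluent_kafka import Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

deserializer = AvroDeserializer(
    schema_registry_client=sr_client,
    schema_str=task_schema
)

consumer = Consumer({
    'bootstrap.servers': 'task-events-kafka-bootstrap:9092',
    'group.id': 'task-avro-consumer',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['task-created'])

msg = consumer.poll(10.0)
if msg is not None and not msg.error():
    task = deserializer(
        msg.value(),
        SerializationContext(msg.topic(), MessageField.VALUE)
    )
    print(task['id'], task['title'])

consumer.close()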

Transactions (Exactly-Once)

Transactional Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'task-processor-1',
    'enable.idempotence': True
})

# Initialize transactions once
producer.init_transactions()

try:
    producer.begin_transaction()

    # Produce multiple messages atomically
    producer.produce('orders', key='o1', value='order-1')
    producer.produce('payments', key='p1', value='payment-1')
    producer.produce('audit', key='a1', value='audit-log')

    producer.commit_transaction()
except Exception as e:
    producer.abort_transaction()
    raise

Transaction-Aware Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'txn-consumer',
    'isolation.level': 'read_committed',  # Only read committed messages
    'enable.auto.commit': False
})
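
Consume-Transform-Produce Loop

For end-to-end exactly-once processing, the consumer's offsets are committed inside the producer's transaction with send_offsets_to_transaction. A sketch reusing the transactional producer (already initialized with init_transactions above) and the read_committed consumer (topic names are illustrative):

consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    producer.begin_transaction()
    try:
        # Transform and forward the record atomically
        producer.produce('orders-enriched', key=msg.key(), value=msg.value())

        # Commit the consumer position as part of the same transaction
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata()
        )
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()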

FastAPI Integration

Async Producer with Lifespan

import json
import uuid
from contextlib import asynccontextmanager

from fastapi import FastAPI
from confluent_kafka import Producer

producer = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global producer
    producer = Producer({'bootstrap.servers': 'kafka:9092'})
    yield
    producer.flush()

app = FastAPI(lifespan=lifespan)

@app.post("/tasks")
async def create_task(title: str):
    task_id = str(uuid.uuid4())
    event = {"id": task_id, "title": title}

    producer.produce(
        'task-created',
        key=task_id,
        value=json.dumps(event)
    )
    producer.poll(0)

    return {"id": task_id}

Background Consumer

from threading import Thread

from confluent_kafka import Consumer

def consume_loop():
    consumer = Consumer({
        'bootstrap.servers': 'kafka:9092',
        'group.id': 'notification-service'
    })
    consumer.subscribe(['task-created'])

    while True:
        msg = consumer.poll(1.0)
        if msg and not msg.error():
            # Process message
            pass

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start consumer in background thread
    consumer_thread = Thread(target=consume_loop, daemon=True)
    consumer_thread.start()
    yield
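
Graceful Consumer Shutdown

The daemon thread above never calls consumer.close(), which the guardrails below warn against. One option is to signal the loop with a threading.Event and close the consumer on exit; a sketch reusing the same broker, topic, and group names:

from contextlib import asynccontextmanager
from threading import Event, Thread

from fastapi import FastAPI
from confluent_kafka import Consumer

stop_event = Event()

def consume_loop():
    consumer = Consumer({
        'bootstrap.servers': 'kafka:9092',
        'group.id': 'notification-service'
    })
    consumer.subscribe(['task-created'])
    try:
        while not stop_event.is_set():
            msg = consumer.poll(1.0)
            if msg and not msg.error():
                pass  # Process message
    finally:
        consumer.close()  # Leave the group cleanly to avoid rebalance delays

@asynccontextmanager
async def lifespan(app: FastAPI):
    thread = Thread(target=consume_loop)
    thread.start()
    yield
    stop_event.set()        # Ask the loop to exit
    thread.join(timeout=5)  # Wait briefly for the consumer to close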

Delivery Guarantees

Guarantee     | Producer Config            | Consumer Config                | Risk
At-most-once  | acks=0                     | Auto-commit before processing  | Data loss
At-least-once | acks=all, retries          | Commit after processing        | Duplicates
Exactly-once  | Transactions + idempotence | isolation.level=read_committed | Complexity

Task API Event Examples

Event Naming Convention

Domain: task
Events: task.created, task.updated, task.completed, task.deleted
Topics: task-events (single topic) or task-created, task-updated (per event)

Event Schema

{
  "event_id": "uuid",
  "event_type": "task.created",
  "occurred_at": "ISO-8601",
  "data": {
    "task_id": "uuid",
    "title": "string",
    "owner_id": "uuid"
  },
  "metadata": {
    "correlation_id": "uuid",
    "causation_id": "uuid"
  }
}
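
A small helper keeps this envelope consistent across event types; the sketch below is illustrative (the helper name and correlation-id default are assumptions):

import json
import uuid
from datetime import datetime, timezone

def build_event(event_type, data, correlation_id=None, causation_id=None):
    """Wrap domain data in the standard event envelope and return JSON."""
    return json.dumps({
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "occurred_at": datetime.now(timezone.utc).isoformat(),
        "data": data,
        "metadata": {
            "correlation_id": correlation_id or str(uuid.uuid4()),
            "causation_id": causation_id
        }
    })

# Usage: publish a task.created event wrapped in the envelope
payload = build_event("task.created", {"task_id": "task-123", "title": "Buy groceries", "owner_id": "user-1"})
producer.produce("task-created", key="task-123", value=payload)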

Safety & Guardrails

NEVER

  • Use acks=0 for critical data
  • Set max.in.flight.requests.per.connection above 5 with idempotence enabled
  • Skip consumer.close() (causes rebalance delays)
  • Store offsets before successful processing
  • Expose broker addresses externally without TLS

ALWAYS

  • Use acks=all for important events
  • Enable idempotence for exactly-once
  • Handle KafkaError._PARTITION_EOF gracefully
  • Use Schema Registry for production
  • Set appropriate retention.ms for event topics
  • Monitor consumer lag (a lag-check sketch follows below)
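
A quick way to spot consumer lag without external tooling is to compare committed offsets with the partition high watermarks. A minimal sketch (group id, topic, and partition count are assumptions):

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': 'task-events-kafka-bootstrap:9092',
    'group.id': 'notification-service',
    'enable.auto.commit': False
})

partitions = [TopicPartition('task-created', p) for p in range(3)]
committed = consumer.committed(partitions, timeout=10)

for tp in committed:
    low, high = consumer.get_watermark_offsets(tp, timeout=10)
    # If nothing has been committed yet, fall back to the low watermark
    position = tp.offset if tp.offset >= 0 else low
    print(f'{tp.topic}[{tp.partition}] lag = {high - position}')

consumer.close()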

Common Errors

Error                      | Cause                              | Fix
NOT_ENOUGH_REPLICAS        | ISR below min.insync.replicas      | Check broker health
COORDINATOR_NOT_AVAILABLE  | Consumer group coordinator missing | Wait and retry
REBALANCE_IN_PROGRESS      | Consumer group rebalancing         | Wait for completion
OFFSET_OUT_OF_RANGE        | Requested offset doesn't exist     | Check auto.offset.reset
UNKNOWN_TOPIC_OR_PARTITION | Topic doesn't exist                | Create the topic first

References