Apache Kafka
Distributed streaming platform. Processes high-throughput real-time data streams. Integrates publish-subscribe messaging and stream processing.
Overview
Apache Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming applications that handle high-throughput, fault-tolerant data feeds. Originally developed by LinkedIn, Kafka has evolved into the industry standard for event streaming, message queuing, and building event-driven architectures at scale. With its commit log-based architecture, Kafka provides durable message storage, horizontal scalability, and the ability to process millions of messages per second with low latency, making it essential for modern data-intensive applications and microservices architectures.
Details
As of 2025, Apache Kafka represents the maturation of event streaming platforms, with KRaft (Kafka Raft) mode eliminating the ZooKeeper dependency and significantly simplifying deployment and operations. Version 3.8 and later provide enhanced performance, improved security features, and better cloud-native integration. Kafka's distributed log-based architecture enables massive horizontal scaling through partitioning, automatic replication for fault tolerance, and exactly-once semantics for critical applications. The ecosystem includes Kafka Connect for data integration, Kafka Streams for real-time processing, and Schema Registry for data governance. With native cloud support and Kubernetes operators, Kafka integrates smoothly into modern containerized environments while maintaining enterprise-grade reliability and performance.
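Kafka Connect, mentioned above, can be tried locally with the file connectors bundled in the distribution; a minimal standalone sketch using the sample configs that ship under config/ (note that since Kafka 3.2 the FileStream connectors are no longer on the default classpath, so plugin.path in the worker config must point at libs/connect-file-*.jar):
# Run a single-process Connect worker with the bundled file source connector;
# it tails the file named in config/connect-file-source.properties and publishes each line to the configured topic
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties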
Key Features
- High Throughput: Process millions of messages per second with low latency
- Horizontal Scalability: Dynamic cluster scaling through partitioning and broker addition
- Fault Tolerance: Automatic replication, failover, and distributed commit log architecture
- Durable Storage: Persistent message storage with configurable retention policies (see the retention example after this list)
- Stream Processing: Built-in Kafka Streams API for real-time data processing
- KRaft Mode: ZooKeeper-free operation for simplified deployment and management
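For example, the configurable retention behind the Durable Storage feature above can be adjusted per topic with the stock kafka-configs.sh tool; a quick sketch assuming a broker on localhost:9092 and a topic named my-topic:
# Keep messages for 7 days (604800000 ms) on a single topic
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name my-topic \
--alter --add-config retention.ms=604800000
# Inspect the topic's current configuration overrides
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name my-topic --describe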
Pros and Cons
Pros
- Industry standard for high-throughput event streaming and messaging
- Exceptional horizontal scalability and performance characteristics
- Strong durability guarantees with configurable replication and persistence
- Rich ecosystem including Kafka Connect, Streams, and Schema Registry
- Excellent integration with big data and analytics platforms
- Active community and comprehensive documentation
Cons
- Complexity in initial setup and cluster management
- Requires careful capacity planning and monitoring for optimal performance
- Learning curve for understanding partitioning and consumer group concepts
- Resource intensive, requiring significant memory and disk space
- Sensitive to network latency in multi-datacenter deployments
- Operational overhead for maintaining large clusters
Code Examples
Installation and Basic Setup
# Java environment setup
sudo apt update
sudo apt install openjdk-17-jdk
# Set JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
echo 'export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64' >> ~/.bashrc
# Download and install Kafka
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0
# Set environment variables
export KAFKA_HOME=$(pwd)
export PATH=$PATH:$KAFKA_HOME/bin
# Generate cluster ID (KRaft mode)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# Format storage
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
# Start Kafka server
bin/kafka-server-start.sh config/kraft/server.properties
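The server command above runs in the foreground; from a second terminal, a quick way to confirm the broker is reachable (using the same tool shown later in the monitoring section) is:
# Verify the broker responds (run from the Kafka directory in another terminal)
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092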
Docker Deployment
# Docker Compose configuration for Kafka
cat << EOF > docker-compose.yml
version: '3.8'
services:
  kafka:
    image: apache/kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:19093'
      KAFKA_LISTENERS: 'CONTROLLER://:19093,PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka_data:/tmp/kraft-combined-logs
volumes:
  kafka_data:
EOF
docker-compose up -d
# Verify Kafka is running
docker logs kafka
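The Kafka CLI tools can also be run inside the container; a sketch assuming the apache/kafka image's default install location of /opt/kafka:
# Create and list topics from inside the container (path assumes the apache/kafka image layout)
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic demo-topic --partitions 3 --replication-factor 1
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list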
Basic Topic Management
# Create topic
bin/kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# List topics
bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092
# Describe topic
bin/kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092
# Modify topic (increase partitions)
bin/kafka-topics.sh --alter \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 6
# Delete topic
bin/kafka-topics.sh --delete \
--topic my-topic \
--bootstrap-server localhost:9092
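When replication factors are greater than one, the same tool can surface partitions that have lost replicas or leaders; a quick health-check sketch:
# Show only partitions whose in-sync replica set has fallen below the full replica count
bin/kafka-topics.sh --describe \
--under-replicated-partitions \
--bootstrap-server localhost:9092
# Show partitions with no active leader
bin/kafka-topics.sh --describe \
--unavailable-partitions \
--bootstrap-server localhost:9092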
Producer and Consumer Operations
# Console producer
bin/kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092
# Producer with key-value pairs
bin/kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--property "parse.key=true" \
--property "key.separator=:"
# Producer from file
bin/kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 < messages.txt
# Console consumer
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--from-beginning
# Consumer with key display
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--property print.key=true \
--property key.separator="-" \
--from-beginning
# Consumer group
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--from-beginning
# Consumer group management
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--describe
# Reset consumer group offset
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--reset-offsets \
--to-earliest \
--topic my-topic \
--execute
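Because an offset reset rewinds the whole consumer group, it is worth previewing the change first; the same command accepts --dry-run in place of --execute:
# Preview the reset without applying it
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--reset-offsets \
--to-earliest \
--topic my-topic \
--dry-run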
Java Producer Implementation
// Maven dependencies
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-clients</artifactId>
// <version>3.8.0</version>
// </dependency>
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerExample {
private static final String TOPIC_NAME = "orders";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// Producer configuration
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Performance tuning
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all replicas
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// Create producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// Send messages
for (int i = 0; i < 1000; i++) {
String key = "order-" + i;
String value = generateOrderJson(i);
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC_NAME, key, value);
// Asynchronous send with callback
Future<RecordMetadata> future = producer.send(record,
(metadata, exception) -> {
if (exception != null) {
System.err.println("Failed to send message: " + exception.getMessage());
} else {
System.out.printf("Message sent successfully: " +
"key=%s, partition=%d, offset=%d, timestamp=%d%n",
key, metadata.partition(), metadata.offset(), metadata.timestamp());
}
});
// Synchronous send (optional)
// RecordMetadata metadata = future.get();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
private static String generateOrderJson(int orderId) {
return String.format(
"{\"orderId\":%d,\"customerId\":%d,\"amount\":%.2f,\"timestamp\":%d}",
orderId, orderId % 100, Math.random() * 1000, System.currentTimeMillis()
);
}
}
// Transaction support
public class TransactionalProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
producer.initTransactions();
producer.beginTransaction();
// Send multiple messages as a transaction
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>("transaction-topic", "key-" + i, "value-" + i);
producer.send(record);
}
producer.commitTransaction();
System.out.println("Transaction committed successfully");
} catch (Exception e) {
producer.abortTransaction();
System.err.println("Transaction aborted: " + e.getMessage());
} finally {
producer.close();
}
}
}
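Records written by the transactional producer above only become visible to consumers that read committed data; with the console consumer this means setting the isolation level explicitly (the default is read_uncommitted):
# Consume only committed transactional messages
bin/kafka-console-consumer.sh \
--topic transaction-topic \
--bootstrap-server localhost:9092 \
--isolation-level read_committed \
--from-beginning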
Java Consumer Implementation
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "orders";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "order-processing-group";
public static void main(String[] args) {
// Consumer configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
// Create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
// Subscribe to topic
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
// Message processing loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// Process message
processOrder(record.key(), record.value());
System.out.printf("Processed message: " +
"key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
} catch (Exception e) {
System.err.printf("Failed to process message: key=%s, error=%s%n",
record.key(), e.getMessage());
// Handle error (dead letter queue, retry, etc.)
}
}
// Manual commit after processing batch
consumer.commitSync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
private static void processOrder(String key, String value) {
// Business logic implementation
System.out.println("Processing order: " + key);
// Simulate processing time
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// Manual partition assignment example
public class ManualPartitionConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
// Manual partition assignment
TopicPartition partition0 = new TopicPartition("orders", 0);
TopicPartition partition1 = new TopicPartition("orders", 1);
consumer.assign(Arrays.asList(partition0, partition1));
// Seek to specific offset
consumer.seekToBeginning(Arrays.asList(partition0, partition1));
// consumer.seek(partition0, 1000); // Seek to specific offset
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
Python Producer and Consumer
# pip install kafka-python
from kafka import KafkaProducer, KafkaConsumer
import json
import time
from datetime import datetime
# Python Producer
class OrderProducer:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',
            retries=3,
            batch_size=16384,
            linger_ms=1,
            compression_type='snappy'
        )

    def send_order(self, order_id, customer_id, amount):
        order = {
            'order_id': order_id,
            'customer_id': customer_id,
            'amount': amount,
            'timestamp': datetime.now().isoformat(),
            'status': 'pending'
        }
        try:
            # Send message
            future = self.producer.send(
                'orders',
                key=f'order-{order_id}',
                value=order
            )
            # Get metadata (synchronous)
            record_metadata = future.get(timeout=10)
            print(f"Order sent: {order_id}, partition: {record_metadata.partition}, "
                  f"offset: {record_metadata.offset}")
        except Exception as e:
            print(f"Failed to send order {order_id}: {e}")

    def close(self):
        self.producer.flush()
        self.producer.close()

# Python Consumer
class OrderConsumer:
    def __init__(self, group_id='order-processor', bootstrap_servers=['localhost:9092']):
        self.consumer = KafkaConsumer(
            'orders',
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            max_poll_records=500,
            session_timeout_ms=30000,
            heartbeat_interval_ms=3000
        )

    def process_orders(self):
        try:
            for message in self.consumer:
                try:
                    order = message.value
                    order_id = order.get('order_id')
                    # Process order
                    self.process_order(order)
                    print(f"Processed order: {order_id}, partition: {message.partition}, "
                          f"offset: {message.offset}")
                    # Manual commit
                    self.consumer.commit()
                except Exception as e:
                    print(f"Failed to process message: {e}")
                    # Handle error (skip, retry, dead letter queue)
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()

    def process_order(self, order):
        # Business logic implementation
        order_id = order.get('order_id')
        amount = order.get('amount')
        print(f"Processing order {order_id} for amount ${amount}")
        # Simulate processing time
        time.sleep(0.01)

# Usage example
if __name__ == "__main__":
    # Producer example
    producer = OrderProducer()
    for i in range(100):
        producer.send_order(
            order_id=i + 1,
            customer_id=(i % 10) + 1,
            amount=round(50 + (i * 10.5), 2)
        )
    producer.close()

    # Consumer example
    # consumer = OrderConsumer()
    # consumer.process_orders()
Kafka Streams Processing
// Maven dependency for Kafka Streams
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-streams</artifactId>
// <version>3.8.0</version>
// </dependency>
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Arrays;
import java.util.Properties;
public class OrderProcessingStream {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-stream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
"org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
"org.apache.kafka.common.serialization.Serdes$StringSerde");
StreamsBuilder builder = new StreamsBuilder();
// Input stream
KStream<String, String> orders = builder.stream("orders");
// Process and transform orders
KStream<String, String> processedOrders = orders
.filter((key, value) -> value.contains("\"status\":\"pending\""))
.mapValues(value -> value.replace("pending", "processing"))
.peek((key, value) -> System.out.println("Processing order: " + key));
// Output to processed orders topic
processedOrders.to("processed-orders");
// Word count example
KStream<String, String> textLines = builder.stream("text-input");
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count();
wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long())); // Long values need an explicit serde
// Start streaming application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
System.out.println("Kafka Streams application started");
}
}
// Advanced stream processing with windowing (separate source file; also requires
// imports of java.time.Duration, org.apache.kafka.streams.KeyValue,
// org.apache.kafka.common.serialization.Serdes and the kstream package)
public class AdvancedStreamProcessing {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "advanced-stream-processing");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Default serdes are needed for the aggregations and joins below
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orders = builder.stream("orders");
// Windowed aggregation
orders
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count()
.toStream()
.map((windowedKey, count) ->
KeyValue.pair(windowedKey.key(), "Count in 5min window: " + count))
.to("order-counts");
// Join streams
KStream<String, String> payments = builder.stream("payments");
KStream<String, String> enrichedOrders = orders.join(
payments,
(orderValue, paymentValue) -> "Order: " + orderValue + ", Payment: " + paymentValue,
JoinWindows.of(Duration.ofMinutes(10))
);
enrichedOrders.to("enriched-orders");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
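The streams applications above assume their input and output topics already exist (or that automatic topic creation is enabled); a sketch creating them up front with the standard CLI:
# Create the topics referenced by the example streams applications
for topic in orders processed-orders text-input word-count-output payments order-counts enriched-orders; do
bin/kafka-topics.sh --create --if-not-exists \
--topic "$topic" \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1
done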
Cluster Configuration and High Availability
# Three-node cluster configuration
# Node 1 (server-1.properties)
cat > config/kraft/server-1.properties << 'EOF'
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
listeners=PLAINTEXT://node1:9092,CONTROLLER://node1:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://node1:9092
controller.listener.names=CONTROLLER
log.dirs=/var/lib/kafka-logs-1
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
EOF
# Node 2 (server-2.properties)
cat > config/kraft/server-2.properties << 'EOF'
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
listeners=PLAINTEXT://node2:9092,CONTROLLER://node2:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://node2:9092
controller.listener.names=CONTROLLER
log.dirs=/var/lib/kafka-logs-2
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
EOF
# Node 3 (server-3.properties)
cat > config/kraft/server-3.properties << 'EOF'
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
listeners=PLAINTEXT://node3:9092,CONTROLLER://node3:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://node3:9092
controller.listener.names=CONTROLLER
log.dirs=/var/lib/kafka-logs-3
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
EOF
# Cluster startup script
cat > start-cluster.sh << 'EOF'
#!/bin/bash
# Generate cluster ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo "Cluster ID: $KAFKA_CLUSTER_ID"
# Format storage on all nodes
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server-1.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server-2.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server-3.properties
# Start brokers
echo "Starting Kafka brokers..."
bin/kafka-server-start.sh config/kraft/server-1.properties > logs/server-1.log 2>&1 &
bin/kafka-server-start.sh config/kraft/server-2.properties > logs/server-2.log 2>&1 &
bin/kafka-server-start.sh config/kraft/server-3.properties > logs/server-3.log 2>&1 &
sleep 10
# Verify cluster
bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
EOF
chmod +x start-cluster.sh
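Once the three brokers are up, the health of the KRaft controller quorum can be checked with the bundled metadata quorum tool (available since Kafka 3.3); a quick sketch:
# Show quorum leader, voters, and replication lag
bin/kafka-metadata-quorum.sh --bootstrap-server node1:9092 describe --status
bin/kafka-metadata-quorum.sh --bootstrap-server node1:9092 describe --replication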
Performance Tuning and Monitoring
# Performance testing
# Producer performance test
bin/kafka-producer-perf-test.sh \
--topic performance-test \
--num-records 1000000 \
--record-size 1024 \
--throughput 50000 \
--producer-props bootstrap.servers=localhost:9092 \
acks=all \
batch.size=32768 \
linger.ms=5 \
compression.type=snappy
# Consumer performance test
bin/kafka-consumer-perf-test.sh \
--topic performance-test \
--messages 1000000 \
--threads 4 \
--bootstrap-server localhost:9092 \
--group perf-test-group
# JMX monitoring setup
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.port=9999"
bin/kafka-server-start.sh config/kraft/server.properties
# Key metrics to monitor
echo "Important Kafka metrics:
- kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
- kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
- kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
- kafka.controller:type=KafkaController,name=OfflinePartitionsCount"
# Log monitoring script
cat > monitor-kafka.sh << 'EOF'
#!/bin/bash
echo "=== Kafka Cluster Status $(date) ==="
# Check broker status
echo "=== Broker Status ==="
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# Check topic details
echo "=== Topic Summary ==="
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | wc -l | xargs echo "Total topics:"
# Check consumer groups
echo "=== Consumer Groups ==="
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list | wc -l | xargs echo "Total consumer groups:"
# Check disk usage
echo "=== Disk Usage ==="
du -sh /var/lib/kafka-logs*
# Check recent errors
echo "=== Recent Errors ==="
tail -n 20 logs/server.log | grep -i error || echo "No recent errors"
echo "=== Monitoring Complete ==="
EOF
chmod +x monitor-kafka.sh