Apache Kafka

Distributed streaming platform. Processes high-throughput real-time data streams. Integrates publish-subscribe messaging and stream processing.

Message Broker, Streaming, Distributed, Scalable, High Throughput, Real-time, Event, Log

Overview

Apache Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming applications that handle high-throughput, fault-tolerant data feeds. Originally developed by LinkedIn, Kafka has evolved into the industry standard for event streaming, message queuing, and building event-driven architectures at scale. With its commit log-based architecture, Kafka provides durable message storage, horizontal scalability, and the ability to process millions of messages per second with low latency, making it essential for modern data-intensive applications and microservices architectures.

Details

As of 2025, Apache Kafka is a mature event streaming platform. KRaft (Kafka Raft) mode removes the ZooKeeper dependency, which significantly simplifies deployment and operations. Versions 3.8 and later bring further performance, security, and cloud-native integration improvements. Kafka's distributed, log-based architecture scales horizontally through partitioning, replicates partitions automatically for fault tolerance, and supports exactly-once semantics for critical applications. The ecosystem includes Kafka Connect for data integration and Kafka Streams for real-time processing. Schema Registry, a separate Confluent component, is commonly added for data governance. With container images and Kubernetes operators available, Kafka also fits into modern containerized environments while keeping its reliability and performance characteristics.
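To make the partitioning idea concrete, here is a minimal sketch of how a record key can be mapped to a partition. It is an illustration only: Kafka's default partitioner hashes keys with murmur2, while this sketch substitutes CRC32 from the Python standard library, and the function name `choose_partition` is hypothetical.

```python
import zlib


def choose_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a stable hash.

    Assumption: CRC32 is a stand-in here; Kafka's default partitioner
    uses murmur2, so real partition numbers will differ.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Records with the same key always land on the same partition,
# which is what preserves per-key ordering.
for key in ["order-1", "order-2", "order-1", "order-3", "order-2"]:
    print(key, "->", choose_partition(key, 3))
```

Because the mapping depends only on the key and the partition count, all events for one key are read in the order they were written, while different keys spread across partitions and brokers.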

Key Features

  • High Throughput: Process millions of messages per second with low latency
  • Horizontal Scalability: Dynamic cluster scaling through partitioning and broker addition
  • Fault Tolerance: Automatic replication, failover, and distributed commit log architecture
  • Durable Storage: Persistent message storage with configurable retention policies
  • Stream Processing: Built-in Kafka Streams API for real-time data processing
  • KRaft Mode: ZooKeeper-free operation for simplified deployment and management
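The durable storage and commit log features can be pictured with a toy append-only log. This is a sketch under simplifying assumptions (in memory, single partition, no retention), and the class name `PartitionLog` is hypothetical rather than part of any Kafka API.

```python
class PartitionLog:
    """Toy append-only log for one partition (in memory, no retention)."""

    def __init__(self):
        self._records = []

    def append(self, value) -> int:
        """Append a record and return the offset it was assigned."""
        self._records.append(value)
        return len(self._records) - 1

    def read(self, offset: int, max_records: int = 10) -> list:
        """Return up to max_records starting at the given offset."""
        return self._records[offset:offset + max_records]


log = PartitionLog()
for event in ["created", "paid", "shipped"]:
    log.append(event)

# Each reader tracks its own position; reading does not remove records.
positions = {"billing": 0, "analytics": 2}
for reader, offset in positions.items():
    print(reader, log.read(offset))
```

The point of the design is that consumption is just a read at an offset, so several independent readers can replay the same data at their own pace.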

Pros and Cons

Pros

  • Industry standard for high-throughput event streaming and messaging
  • Exceptional horizontal scalability and performance characteristics
  • Strong durability guarantees with configurable replication and persistence
  • Rich ecosystem including Kafka Connect, Streams, and Schema Registry
  • Excellent integration with big data and analytics platforms
  • Active community and comprehensive documentation

Cons

  • Complexity in initial setup and cluster management
  • Requires careful capacity planning and monitoring for optimal performance
  • Learning curve for understanding partitioning and consumer group concepts
  • Resource intensive requiring significant memory and disk space
  • Network latency sensitive in multi-datacenter deployments
  • Operational overhead for maintaining large clusters
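The partitioning and consumer group concepts mentioned above are easier to reason about with a small model. The following sketch assigns partitions to group members round-robin. It is a simplification, not the logic of Kafka's built-in assignors, and `assign_partitions` is a hypothetical helper.

```python
def assign_partitions(partitions, consumers):
    """Distribute partitions across consumers round-robin.

    Each partition is owned by exactly one consumer in the group.
    """
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Two consumers share three partitions.
print(assign_partitions(range(3), ["c1", "c2"]))

# With more consumers than partitions, the extra consumer stays idle.
print(assign_partitions(range(3), ["c1", "c2", "c3", "c4"]))
```

This is why the partition count caps the useful parallelism of a consumer group, and why it belongs in capacity planning.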

Code Examples

Installation and Basic Setup

# Java environment setup
sudo apt update
sudo apt install openjdk-17-jdk

# Set JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
echo 'export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64' >> ~/.bashrc

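# Download and install Kafka (the release directory is named after the Kafka version, 3.8.0;
# 2.13 in the file name is the Scala version)
wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz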
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0

# Set environment variables
export KAFKA_HOME=$(pwd)
export PATH=$PATH:$KAFKA_HOME/bin

# Generate cluster ID (KRaft mode)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

# Format storage
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

# Start Kafka server
bin/kafka-server-start.sh config/kraft/server.properties

Docker Deployment

# Docker Compose configuration for Kafka
cat << EOF > docker-compose.yml
version: '3.8'
services:
  kafka:
    image: apache/kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:19093'
      KAFKA_LISTENERS: 'CONTROLLER://:19093,PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka_data:/tmp/kraft-combined-logs

volumes:
  kafka_data:
EOF

docker-compose up -d

# Verify Kafka is running
docker logs kafka

Basic Topic Management

# Create topic
bin/kafka-topics.sh --create \
    --topic my-topic \
    --bootstrap-server localhost:9092 \
    --partitions 3 \
    --replication-factor 1

# List topics
bin/kafka-topics.sh --list \
    --bootstrap-server localhost:9092

# Describe topic
bin/kafka-topics.sh --describe \
    --topic my-topic \
    --bootstrap-server localhost:9092

# Modify topic (increase partitions)
bin/kafka-topics.sh --alter \
    --topic my-topic \
    --bootstrap-server localhost:9092 \
    --partitions 6

# Delete topic
bin/kafka-topics.sh --delete \
    --topic my-topic \
    --bootstrap-server localhost:9092
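One consequence of increasing the partition count with `--alter` is that keyed records may start landing on different partitions than before, so per-key ordering across the change is not preserved. The sketch below illustrates this with a toy hash. CRC32 is an assumption standing in for Kafka's murmur2-based partitioner, and `toy_partition` is a hypothetical name.

```python
import zlib


def toy_partition(key: str, num_partitions: int) -> int:
    # Toy stand-in hash; Kafka's default partitioner uses murmur2, not CRC32.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"order-{i}" for i in range(1000)]

# Keys whose partition changes when the topic grows from 3 to 6 partitions.
moved = [k for k in keys if toy_partition(k, 3) != toy_partition(k, 6)]
print(f"{len(moved)} of {len(keys)} keys map to a different partition after 3 -> 6")
```

Existing records are not moved when partitions are added, so a consumer that relies on per-key ordering may see older and newer records for the same key on different partitions.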

Producer and Consumer Operations

# Console producer
bin/kafka-console-producer.sh \
    --topic my-topic \
    --bootstrap-server localhost:9092

# Producer with key-value pairs
bin/kafka-console-producer.sh \
    --topic my-topic \
    --bootstrap-server localhost:9092 \
    --property "parse.key=true" \
    --property "key.separator=:"

# Producer from file
bin/kafka-console-producer.sh \
    --topic my-topic \
    --bootstrap-server localhost:9092 < messages.txt

# Console consumer
bin/kafka-console-consumer.sh \
    --topic my-topic \
    --bootstrap-server localhost:9092 \
    --from-beginning

# Consumer with key display
bin/kafka-console-consumer.sh \
    --topic my-topic \
    --bootstrap-server localhost:9092 \
    --property print.key=true \
    --property key.separator="-" \
    --from-beginning

# Consumer group
bin/kafka-console-consumer.sh \
    --topic my-topic \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --from-beginning

# Consumer group management
bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --list

bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --describe

# Reset consumer group offset
bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --reset-offsets \
    --to-earliest \
    --topic my-topic \
    --execute
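The `--describe` output for a consumer group reports a current offset, a log end offset, and a lag per partition. As a rough sketch of that arithmetic, with made-up offsets and a hypothetical `consumer_lag` helper:

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Lag per partition = log end offset minus committed offset."""
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in log_end_offsets.items()
    }


# Hypothetical offsets for a three-partition topic.
log_end = {0: 120, 1: 95, 2: 130}
committed = {0: 120, 1: 80, 2: 100}
print(consumer_lag(log_end, committed))

# After a reset to the earliest offset (assumed to be 0 here, which only
# holds if retention has not removed old records), lag equals the full log.
after_reset = {partition: 0 for partition in log_end}
print(consumer_lag(log_end, after_reset))
```

A steadily growing lag usually means the group is consuming more slowly than producers are writing.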

Java Producer Implementation

// Maven dependencies
// <dependency>
//   <groupId>org.apache.kafka</groupId>
//   <artifactId>kafka-clients</artifactId>
//   <version>3.8.0</version>
// </dependency>

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerExample {
    private static final String TOPIC_NAME = "orders";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    
    public static void main(String[] args) {
        // Producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        // Reliability and batching settings
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // Wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        
        // Create producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        try {
            // Send messages
            for (int i = 0; i < 1000; i++) {
                String key = "order-" + i;
                String value = generateOrderJson(i);
                
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>(TOPIC_NAME, key, value);
                
                // Asynchronous send with callback
                Future<RecordMetadata> future = producer.send(record, 
                    (metadata, exception) -> {
                        if (exception != null) {
                            System.err.println("Failed to send message: " + exception.getMessage());
                        } else {
                            System.out.printf("Message sent successfully: " +
                                "key=%s, partition=%d, offset=%d, timestamp=%d%n",
                                key, metadata.partition(), metadata.offset(), metadata.timestamp());
                        }
                    });
                
                // Synchronous send (optional)
                // RecordMetadata metadata = future.get();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
    
    private static String generateOrderJson(int orderId) {
        return String.format(
            "{\"orderId\":%d,\"customerId\":%d,\"amount\":%.2f,\"timestamp\":%d}",
            orderId, orderId % 100, Math.random() * 1000, System.currentTimeMillis()
        );
    }
}

// Transaction support
public class TransactionalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        try {
            producer.initTransactions();
            
            producer.beginTransaction();
            
            // Send multiple messages as a transaction
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>("transaction-topic", "key-" + i, "value-" + i);
                producer.send(record);
            }
            
            producer.commitTransaction();
            System.out.println("Transaction committed successfully");
            
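        } catch (org.apache.kafka.common.errors.ProducerFencedException
                 | org.apache.kafka.common.errors.OutOfOrderSequenceException
                 | org.apache.kafka.common.errors.AuthorizationException e) {
            // Fatal errors: the producer cannot recover, so skip the abort and just close it
            System.err.println("Fatal transaction error: " + e.getMessage());
        } catch (org.apache.kafka.common.KafkaException e) {
            // Recoverable error: abort so the transaction can be retried
            producer.abortTransaction();
            System.err.println("Transaction aborted: " + e.getMessage());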
        } finally {
            producer.close();
        }
    }
}
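Idempotent and transactional producers rely on the broker recognizing retried sends. A minimal model of that idea, assuming one partition and in-order sequence numbers, is sketched below. The class `ToyBroker` is hypothetical, and a real broker tracks sequences per producer and partition and rejects gaps rather than only dropping duplicates.

```python
class ToyBroker:
    """Toy broker that drops retried records using sequence numbers."""

    def __init__(self):
        self.log = []
        self.last_sequence = {}  # producer_id -> last accepted sequence number

    def append(self, producer_id: int, sequence: int, value: str) -> bool:
        """Append the record unless this sequence number was already accepted."""
        if sequence <= self.last_sequence.get(producer_id, -1):
            return False  # duplicate caused by a retry; nothing is written
        self.last_sequence[producer_id] = sequence
        self.log.append(value)
        return True


broker = ToyBroker()
print(broker.append(producer_id=1, sequence=0, value="order-0"))
# The acknowledgement was lost, so the producer retries the same record.
print(broker.append(producer_id=1, sequence=0, value="order-0"))
print(broker.append(producer_id=1, sequence=1, value="order-1"))
print(broker.log)
```

The retry is acknowledged without being written twice, which is what lets a producer retry safely without introducing duplicates.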

Java Consumer Implementation

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "orders";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "order-processing-group";
    
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // Manual commit
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        
        // Create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        try {
            // Subscribe to topic
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));
            
            // Message processing loop
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Process message
                        processOrder(record.key(), record.value());
                        
                        System.out.printf("Processed message: " +
                            "key=%s, value=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                        
                    } catch (Exception e) {
                        System.err.printf("Failed to process message: key=%s, error=%s%n", 
                            record.key(), e.getMessage());
                        // Handle error (dead letter queue, retry, etc.)
                    }
                }
                
                // Manual commit after processing batch
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
    
    private static void processOrder(String key, String value) {
        // Business logic implementation
        System.out.println("Processing order: " + key);
        
        // Simulate processing time
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Manual partition assignment example
public class ManualPartitionConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        try {
            // Manual partition assignment
            TopicPartition partition0 = new TopicPartition("orders", 0);
            TopicPartition partition1 = new TopicPartition("orders", 1);
            consumer.assign(Arrays.asList(partition0, partition1));
            
            // Seek to specific offset
            consumer.seekToBeginning(Arrays.asList(partition0, partition1));
            // consumer.seek(partition0, 1000);  // Seek to specific offset
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
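Committing offsets manually after processing, as `KafkaConsumerExample` does, gives at-least-once delivery: a crash between processing and commit means the same records are read again. The simulation below sketches that behavior with plain lists. The `consume` function and its `crash_after` parameter are hypothetical and do not use any Kafka client.

```python
def consume(records, committed, crash_after=None):
    """Process records from the committed offset onward.

    Returns (processed_records, new_committed_offset). The commit happens
    only after the whole batch is processed, so a crash leaves the
    committed offset unchanged.
    """
    processed = []
    for offset in range(committed, len(records)):
        processed.append(records[offset])
        if crash_after is not None and len(processed) == crash_after:
            return processed, committed  # crashed before the commit
    return processed, len(records)


records = ["o1", "o2", "o3"]

first_run, committed = consume(records, committed=0, crash_after=2)
second_run, committed = consume(records, committed)

print(first_run, second_run, committed)
```

Records `o1` and `o2` are processed twice across the two runs, so processing logic paired with this commit pattern should be idempotent or otherwise tolerate duplicates.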

Python Producer and Consumer

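# pip install kafka-python python-snappy
# (python-snappy is needed because the producer below uses compression_type='snappy')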

from kafka import KafkaProducer, KafkaConsumer
import json
import time
from datetime import datetime

# Python Producer
class OrderProducer:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',
            retries=3,
            batch_size=16384,
            linger_ms=1,
            compression_type='snappy'
        )
    
    def send_order(self, order_id, customer_id, amount):
        order = {
            'order_id': order_id,
            'customer_id': customer_id,
            'amount': amount,
            'timestamp': datetime.now().isoformat(),
            'status': 'pending'
        }
        
        try:
            # Send message
            future = self.producer.send(
                'orders',
                key=f'order-{order_id}',
                value=order
            )
            
            # Get metadata (synchronous)
            record_metadata = future.get(timeout=10)
            
            print(f"Order sent: {order_id}, partition: {record_metadata.partition}, "
                  f"offset: {record_metadata.offset}")
            
        except Exception as e:
            print(f"Failed to send order {order_id}: {e}")
    
    def close(self):
        self.producer.flush()
        self.producer.close()

# Python Consumer
class OrderConsumer:
    def __init__(self, group_id='order-processor', bootstrap_servers=['localhost:9092']):
        self.consumer = KafkaConsumer(
            'orders',
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            max_poll_records=500,
            session_timeout_ms=30000,
            heartbeat_interval_ms=3000
        )
    
    def process_orders(self):
        try:
            for message in self.consumer:
                try:
                    order = message.value
                    order_id = order.get('order_id')
                    
                    # Process order
                    self.process_order(order)
                    
                    print(f"Processed order: {order_id}, partition: {message.partition}, "
                          f"offset: {message.offset}")
                    
                    # Manual commit
                    self.consumer.commit()
                    
                except Exception as e:
                    print(f"Failed to process message: {e}")
                    # Handle error (skip, retry, dead letter queue)
                    
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()
    
    def process_order(self, order):
        # Business logic implementation
        order_id = order.get('order_id')
        amount = order.get('amount')
        
        print(f"Processing order {order_id} for amount ${amount}")
        
        # Simulate processing time
        time.sleep(0.01)

# Usage example
if __name__ == "__main__":
    # Producer example
    producer = OrderProducer()
    
    for i in range(100):
        producer.send_order(
            order_id=i + 1,
            customer_id=(i % 10) + 1,
            amount=round(50 + (i * 10.5), 2)
        )
    
    producer.close()
    
    # Consumer example
    # consumer = OrderConsumer()
    # consumer.process_orders()

Kafka Streams Processing

// Maven dependency for Kafka Streams
// <dependency>
//   <groupId>org.apache.kafka</groupId>
//   <artifactId>kafka-streams</artifactId>
//   <version>3.8.0</version>
// </dependency>

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.Arrays;
import java.util.Properties;

public class OrderProcessingStream {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Input stream
        KStream<String, String> orders = builder.stream("orders");
        
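        // Process and transform orders
        // Note: this substring match assumes compact JSON that includes a "status" field
        // (no space after the colon); adjust it to match what your producer emits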
        KStream<String, String> processedOrders = orders
            .filter((key, value) -> value.contains("\"status\":\"pending\""))
            .mapValues(value -> value.replace("pending", "processing"))
            .peek((key, value) -> System.out.println("Processing order: " + key));
        
        // Output to processed orders topic
        processedOrders.to("processed-orders");
        
        // Word count example
        KStream<String, String> textLines = builder.stream("text-input");
        
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .count();
        
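        // Counts are Long values, so set the serdes explicitly instead of relying on the String default
        wordCounts.toStream().to("word-count-output",
            Produced.with(org.apache.kafka.common.serialization.Serdes.String(),
                          org.apache.kafka.common.serialization.Serdes.Long()));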
        
        // Start streaming application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        
        streams.start();
        
        System.out.println("Kafka Streams application started");
    }
}

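// Advanced stream processing with windowing (separate file: AdvancedStreamProcessing.java)
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;

public class AdvancedStreamProcessing {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "advanced-stream-processing");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes for String keys and values, as in the previous example
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        KStream<String, String> orders = builder.stream("orders");
        
        // Windowed aggregation: count orders per key in 5-minute tumbling windows
        // (TimeWindows.of is deprecated since Kafka 3.0)
        orders
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count()
            .toStream()
            .map((windowedKey, count) -> 
                KeyValue.pair(windowedKey.key(), "Count in 5min window: " + count))
            .to("order-counts");
        
        // Join streams: match orders and payments with the same key
        // whose timestamps are at most 10 minutes apart
        KStream<String, String> payments = builder.stream("payments");
        
        KStream<String, String> enrichedOrders = orders.join(
            payments,
            (orderValue, paymentValue) -> "Order: " + orderValue + ", Payment: " + paymentValue,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10))
        );
        
        enrichedOrders.to("enriched-orders");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Close the application cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        
        streams.start();
    }
}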
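The windowed count above groups records into fixed, non-overlapping 5-minute windows aligned to the epoch. The bucketing arithmetic can be sketched without Kafka as follows. The helper names `window_start` and `windowed_counts` are hypothetical, and the timestamps are made up.

```python
from collections import Counter


def window_start(timestamp_ms: int, size_ms: int) -> int:
    """Start of the tumbling window that contains the timestamp."""
    return timestamp_ms - (timestamp_ms % size_ms)


def windowed_counts(events, size_ms):
    """Count events per (key, window start); events are (key, timestamp_ms) pairs."""
    counts = Counter()
    for key, timestamp_ms in events:
        counts[(key, window_start(timestamp_ms, size_ms))] += 1
    return dict(counts)


FIVE_MINUTES_MS = 5 * 60 * 1000

events = [
    ("order-1", 10_000),
    ("order-1", 299_999),   # last millisecond of the first window
    ("order-1", 300_000),   # first millisecond of the second window
    ("order-2", 310_000),
]
print(windowed_counts(events, FIVE_MINUTES_MS))
```

Each record belongs to exactly one window, so a key's counts restart at every window boundary.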

Cluster Configuration and High Availability

# Three-node cluster configuration

# Node 1 (server-1.properties)
cat > config/kraft/server-1.properties << 'EOF'
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
listeners=PLAINTEXT://node1:9092,CONTROLLER://node1:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://node1:9092
controller.listener.names=CONTROLLER
log.dirs=/var/lib/kafka-logs-1
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
EOF

# Node 2 (server-2.properties)
cat > config/kraft/server-2.properties << 'EOF'
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
listeners=PLAINTEXT://node2:9092,CONTROLLER://node2:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://node2:9092
controller.listener.names=CONTROLLER
log.dirs=/var/lib/kafka-logs-2
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
EOF

# Node 3 (server-3.properties)
cat > config/kraft/server-3.properties << 'EOF'
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
listeners=PLAINTEXT://node3:9092,CONTROLLER://node3:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://node3:9092
controller.listener.names=CONTROLLER
log.dirs=/var/lib/kafka-logs-3
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
EOF

# Cluster startup script
cat > start-cluster.sh << 'EOF'
#!/bin/bash

# Generate cluster ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo "Cluster ID: $KAFKA_CLUSTER_ID"

# Format storage for every node with the same cluster ID.
# In a multi-host deployment, run each format and start command on its own host
# (node1, node2, node3) using that node's properties file.
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server-1.properties
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server-2.properties
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server-3.properties

# Start brokers
echo "Starting Kafka brokers..."
mkdir -p logs  # the output redirects below need this directory to exist
bin/kafka-server-start.sh config/kraft/server-1.properties > logs/server-1.log 2>&1 &
bin/kafka-server-start.sh config/kraft/server-2.properties > logs/server-2.log 2>&1 &
bin/kafka-server-start.sh config/kraft/server-3.properties > logs/server-3.log 2>&1 &

sleep 10

# Verify cluster
bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
EOF

chmod +x start-cluster.sh
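The settings `default.replication.factor=3`, `min.insync.replicas=2`, and a three-voter controller quorum determine how many node failures the cluster can absorb. A small worked calculation, using hypothetical helper names, might look like this:

```python
def tolerated_replica_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """Replicas of a partition that can be down while acks=all writes still succeed."""
    return replication_factor - min_insync_replicas


def tolerated_controller_failures(voters: int) -> int:
    """Controllers that can fail while a majority of the quorum remains."""
    return (voters - 1) // 2


print(tolerated_replica_failures(replication_factor=3, min_insync_replicas=2))
print(tolerated_controller_failures(voters=3))
```

With the values from the configuration above, one broker can be lost without blocking producers that use `acks=all`, and one controller can be lost without losing the metadata quorum.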

Performance Tuning and Monitoring

# Performance testing
# Producer performance test
bin/kafka-producer-perf-test.sh \
    --topic performance-test \
    --num-records 1000000 \
    --record-size 1024 \
    --throughput 50000 \
    --producer-props bootstrap.servers=localhost:9092 \
                     acks=all \
                     batch.size=32768 \
                     linger.ms=5 \
                     compression.type=snappy

# Consumer performance test
# (the --threads option is deprecated and ignored in Kafka 3.x, so it is omitted)
bin/kafka-consumer-perf-test.sh \
    --topic performance-test \
    --messages 1000000 \
    --bootstrap-server localhost:9092 \
    --group perf-test-group

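# JMX monitoring setup
# Unauthenticated, unencrypted JMX as configured here is suitable for local testing only
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true \
                       -Dcom.sun.management.jmxremote.authenticate=false \
                       -Dcom.sun.management.jmxremote.ssl=false \
                       -Dcom.sun.management.jmxremote.port=9999"

# Use the KRaft configuration, consistent with the setup steps above
bin/kafka-server-start.sh config/kraft/server.properties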

# Key metrics to monitor
echo "Important Kafka metrics:
- kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
- kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec  
- kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
- kafka.controller:type=KafkaController,name=OfflinePartitionsCount"

# Log monitoring script
cat > monitor-kafka.sh << 'EOF'
#!/bin/bash

echo "=== Kafka Cluster Status $(date) ==="

# Check broker status
echo "=== Broker Status ==="
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# Check topic details
echo "=== Topic Summary ==="
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | wc -l | xargs echo "Total topics:"

# Check consumer groups
echo "=== Consumer Groups ==="
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list | wc -l | xargs echo "Total consumer groups:"

# Check disk usage
echo "=== Disk Usage ==="
du -sh /var/lib/kafka-logs*

# Check recent errors
echo "=== Recent Errors ==="
tail -n 20 logs/server.log | grep -i error || echo "No recent errors"

echo "=== Monitoring Complete ==="
EOF

chmod +x monitor-kafka.sh
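Before running the producer performance test above, it can help to work out what its parameters imply. The following calculation uses the same numbers (1,000,000 records of 1,024 bytes, throttled to 50,000 records per second). The function name `perf_test_plan` is hypothetical, and the result is a lower bound on duration, since `--throughput` is an upper limit rather than a guarantee.

```python
def perf_test_plan(num_records: int, record_size_bytes: int, throughput_per_sec: int) -> dict:
    """Derive the minimum duration and data volume implied by perf-test parameters."""
    mib = 1024 * 1024
    return {
        "min_duration_s": num_records / throughput_per_sec,
        "target_mib_per_s": throughput_per_sec * record_size_bytes / mib,
        "total_mib": num_records * record_size_bytes / mib,
    }


plan = perf_test_plan(num_records=1_000_000, record_size_bytes=1024, throughput_per_sec=50_000)
print(plan)
# {'min_duration_s': 20.0, 'target_mib_per_s': 48.828125, 'total_mib': 976.5625}
```

If the measured run takes much longer than 20 seconds, the cluster or the producer settings, not the throttle, are the limiting factor.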