Apache Kafka
分散ストリーミングプラットフォーム。高スループットのリアルタイムデータストリームを処理。パブリッシュ・サブスクライブメッセージングとストリーム処理を統合。
Apache Kafka
Apache Kafkaは、分散ストリーミングプラットフォームです。高スループットでフォルトトレラントなリアルタイムデータフィードを構築するために設計され、大規模なメッセージストリーミング、イベント処理、データパイプラインの構築に使用されます。
主な特徴
高性能・高スループット
- 毎秒数百万メッセージの処理能力
- 低レイテンシーでの配信保証
- 水平スケーリング対応
- パーティション並列処理
耐障害性と永続性
- レプリケーションによる高可用性
- データの永続化とログベースストレージ
- 自動フェイルオーバー
- 分散コミットログアーキテクチャ
スケーラビリティ
- クラスターの動的スケーリング
- パーティション分散
- Broker の追加・削除対応
- 複数データセンター対応
インストール
Java環境の準備
# Java 11 または 17 のインストール
sudo apt update
sudo apt install openjdk-17-jdk
# JAVA_HOME の設定
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
echo 'export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64' >> ~/.bashrc
Kafka バイナリインストール
# Kafka のダウンロード
wget https://downloads.apache.org/kafka/2.13-3.8.0/kafka_2.13-3.8.0.tgz
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0
# 環境変数設定
export KAFKA_HOME=$(pwd)
export PATH=$PATH:$KAFKA_HOME/bin
Docker での起動
# Docker Compose ファイル例
cat << EOF > docker-compose.yml
version: '3.8'
services:
kafka:
image: apache/kafka:latest
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:19093'
KAFKA_LISTENERS: 'CONTROLLER://:19093,PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
volumes:
- kafka_data:/tmp/kraft-combined-logs
volumes:
kafka_data:
EOF
docker-compose up -d
基本設定
KRaft モード(Kafka 3.0+)
# クラスターID生成
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# ストレージフォーマット
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
# Kafka サーバー起動
bin/kafka-server-start.sh config/kraft/server.properties
server.properties 設定例
# ブローカー設定
broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=/var/lib/kafka-logs
# レプリケーション設定
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# ログ保持設定
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# パーティション設定
num.partitions=1
default.replication.factor=1
min.insync.replicas=1
# クリーンアップ設定
log.cleanup.policy=delete
# ZooKeeper接続(KRaftモードでは不要)
# zookeeper.connect=localhost:2181
# zookeeper.connection.timeout.ms=18000
# KRaftモード設定
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
log.dirs=/tmp/kraft-combined-logs
基本操作
トピック管理
# トピック作成
bin/kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# トピック一覧
bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092
# トピック詳細確認
bin/kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092
# トピック削除
bin/kafka-topics.sh --delete \
--topic my-topic \
--bootstrap-server localhost:9092
プロデューサー操作
# コンソールプロデューサー
bin/kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092
# キー付きメッセージ送信
bin/kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--property "parse.key=true" \
--property "key.separator=:"
# ファイルからメッセージ送信
bin/kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 < messages.txt
コンシューマー操作
# コンソールコンシューマー
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--from-beginning
# キー付きメッセージ受信
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--property print.key=true \
--property key.separator="-" \
--from-beginning
# コンシューマーグループ指定
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--group my-group \
--from-beginning
コンシューマーグループ管理
# コンシューマーグループ一覧
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
# コンシューマーグループ詳細
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe
# オフセットリセット
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--reset-offsets \
--to-earliest \
--topic my-topic \
--execute
プログラミング例
Java Producer
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// プロデューサー作成
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// メッセージ送信
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key-" + i, "message-" + i);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.printf("Sent message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), metadata.partition(), metadata.offset());
}
});
}
} finally {
producer.close();
}
}
}
Java Consumer
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// コンシューマー作成
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
// トピック購読
consumer.subscribe(Collections.singletonList("my-topic"));
// メッセージ受信
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} finally {
consumer.close();
}
}
}
Python Producer (kafka-python)
from kafka import KafkaProducer
import json
# プロデューサー設定
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
# メッセージ送信
for i in range(100):
message = {'id': i, 'message': f'Hello Kafka {i}'}
future = producer.send('my-topic', key=f'key-{i}', value=message)
# 同期送信
record_metadata = future.get(timeout=10)
print(f"Sent: topic={record_metadata.topic}, partition={record_metadata.partition}, offset={record_metadata.offset}")
producer.flush()
producer.close()
Python Consumer (kafka-python)
from kafka import KafkaConsumer
import json
# コンシューマー設定
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None,
auto_offset_reset='earliest'
)
# メッセージ受信
for message in consumer:
print(f"Received: key={message.key}, value={message.value}, partition={message.partition}, offset={message.offset}")
クラスター構成
3ノードクラスター設定
# ノード1設定 (server-1.properties)
broker.id=1
listeners=PLAINTEXT://node1:9092,CONTROLLER://node1:9093
advertised.listeners=PLAINTEXT://node1:9092
node.id=1
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
log.dirs=/var/lib/kafka-logs-1
# ノード2設定 (server-2.properties)
broker.id=2
listeners=PLAINTEXT://node2:9092,CONTROLLER://node2:9093
advertised.listeners=PLAINTEXT://node2:9092
node.id=2
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
log.dirs=/var/lib/kafka-logs-2
# ノード3設定 (server-3.properties)
broker.id=3
listeners=PLAINTEXT://node3:9092,CONTROLLER://node3:9093
advertised.listeners=PLAINTEXT://node3:9092
node.id=3
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
log.dirs=/var/lib/kafka-logs-3
クラスター起動
# 各ノードでストレージフォーマット
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# ノード1
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server-1.properties
bin/kafka-server-start.sh config/kraft/server-1.properties
# ノード2
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server-2.properties
bin/kafka-server-start.sh config/kraft/server-2.properties
# ノード3
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server-3.properties
bin/kafka-server-start.sh config/kraft/server-3.properties
Kafka Streams
ストリーム処理例
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class WordCountStream {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
"org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
"org.apache.kafka.common.serialization.Serdes$StringSerde");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("text-input");
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count()
.toStream()
.to("word-count-output");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// シャットダウンフック
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
パフォーマンス最適化
プロデューサー最適化
# スループット重視設定
batch.size=32768
linger.ms=5
compression.type=snappy
acks=1
# レイテンシー重視設定
batch.size=1
linger.ms=0
compression.type=none
acks=1
コンシューマー最適化
# バッチサイズ調整
fetch.min.bytes=1024
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
# セッション管理
session.timeout.ms=30000
heartbeat.interval.ms=3000
ブローカー最適化
# ネットワーク設定
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# メモリ設定
replica.fetch.max.bytes=1048576
message.max.bytes=1000000
# ディスクI/O設定
log.flush.interval.messages=10000
log.flush.interval.ms=1000
監視と運用
JMX メトリクス確認
# JMXポート設定
export JMX_PORT=9999
bin/kafka-server-start.sh config/server.properties
# メトリクス取得例
echo "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec" | \
java -jar jmxterm.jar -l localhost:9999
ログレベル変更
# ログレベル確認
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 \
--describe --json
# 動的ログレベル変更
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type brokers \
--entity-name 1 \
--alter \
--add-config 'log4j.logger.kafka.server.KafkaRequestHandler=DEBUG'
パフォーマンステスト
# プロデューサーテスト
bin/kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 1000000 \
--record-size 1024 \
--throughput 10000 \
--producer-props bootstrap.servers=localhost:9092
# コンシューマーテスト
bin/kafka-consumer-perf-test.sh \
--topic test-topic \
--messages 1000000 \
--threads 1 \
--bootstrap-server localhost:9092
セキュリティ
SSL/TLS設定
# server.properties
listeners=SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
SASL認証設定
# server.properties
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
Apache Kafkaは、その高いスループット、スケーラビリティ、耐障害性により、大規模なリアルタイムデータ処理、ログ集約、メッセージング、イベント駆動アーキテクチャの構築において業界標準として広く採用されています。