Apache Cassandra
高可用性と線形スケーラビリティを提供する分散NoSQLデータベース。ビッグデータ処理に特化。単一障害点のない分散アーキテクチャ。
データベースサーバー
Apache Cassandra
概要
Apache Cassandraは、大規模な分散システム向けに設計されたオープンソースのNoSQLデータベースです。Facebookで開発され、現在はApache Software Foundationで管理されています。線形スケーラビリティと高い可用性を実現し、単一障害点がないアーキテクチャにより、ミッションクリティカルなアプリケーションで大量のデータを処理できます。Netflix、Apple、Instagram、Uberなど多くの大手企業が採用しており、特にWebスケールアプリケーションと時系列データ処理で圧倒的な強みを発揮します。
詳細
Apache Cassandra 2025年版は、数年にわたる成熟化により分散データベースの決定版として確立されています。最新の4.1系列では、仮想テーブルによるクラスター監視の強化、SAI(Storage Attached Indexing)による高速検索機能、改良されたcompaction戦略(UCS:Unified Compaction Strategy)が導入されています。CQLクエリ言語はSQLライクな構文により習得しやすく、ノード間の自動データ複製、Gossipプロトコルによる障害検出、チューナブル一貫性による性能・整合性のトレードオフ制御が可能です。Storage Attached IndexingとMaterialized Viewにより複雑なクエリパターンにも対応し、大規模環境でのパフォーマンス最適化が大幅に改善されています。
主な特徴
- 線形スケーラビリティ: ノード追加により性能が比例的に向上
- 高可用性: 単一障害点なし、自動フェイルオーバー
- カラム指向データモデル: 柔軟なスキーマ設計と効率的なデータ圧縮
- チューナブル一貫性: 用途に応じた整合性レベルの選択
- 分散アーキテクチャ: マスター・スレーブ構成なし、すべてのノードが対等
- 地理的分散: データセンターをまたいだレプリケーション対応
メリット・デメリット
メリット
- 大規模分散環境における圧倒的な性能と可用性の実績
- ノード数に比例したスケーラビリティと単一障害点の排除
- オープンソースで豊富なエコシステムとコミュニティサポート
- 地理的分散とマルチデータセンター構成の標準サポート
- 時系列データとリアルタイムアナリティクスに最適化
- NetflixやUberなど大手企業での実証済みの運用実績
デメリット
- 複雑な分散システムのため運用・保守に高度な専門知識が必要
- JOINやサブクエリなど複雑なクエリ機能の制約
- データモデル設計がクエリパターンに強く依存し設計変更が困難
- メモリ使用量が多く、ハードウェアリソースの要求が高い
- 強い整合性が必要なアプリケーションには不向き
- 初期設定とクラスター管理の複雑さ
参考ページ
書き方の例
インストールと基本セットアップ
# Java 11以上のインストール(Ubuntu/Debian)
sudo apt update
sudo apt install openjdk-11-jdk
# Cassandraリポジトリの追加
curl https://downloads.apache.org/cassandra/KEYS | sudo apt-key add -
echo "deb https://debian.cassandra.apache.org 41x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
# Cassandraのインストール
sudo apt update
sudo apt install cassandra
# CentOS/RHEL/Fedoraでのインストール
sudo dnf install java-11-openjdk
sudo tee /etc/yum.repos.d/cassandra.repo << 'EOF'
[cassandra]
name=Apache Cassandra
baseurl=https://redhat.cassandra.apache.org/41x/
gpgcheck=1
repo_gpgcheck=1
gpgkey=https://downloads.apache.org/cassandra/KEYS
EOF
sudo dnf install cassandra
# Dockerでの実行
docker run --name cassandra-node1 \
-p 9042:9042 \
-d cassandra:4.1
# 複数ノードクラスター(Docker Compose)
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
cassandra-node1:
image: cassandra:4.1
ports:
- "9042:9042"
environment:
- CASSANDRA_CLUSTER_NAME=MyCluster
- CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch
- CASSANDRA_DC=datacenter1
- CASSANDRA_RACK=rack1
volumes:
- cassandra1-data:/var/lib/cassandra
cassandra-node2:
image: cassandra:4.1
environment:
- CASSANDRA_CLUSTER_NAME=MyCluster
- CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch
- CASSANDRA_DC=datacenter1
- CASSANDRA_RACK=rack1
- CASSANDRA_SEEDS=cassandra-node1
volumes:
- cassandra2-data:/var/lib/cassandra
depends_on:
- cassandra-node1
cassandra-node3:
image: cassandra:4.1
environment:
- CASSANDRA_CLUSTER_NAME=MyCluster
- CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch
- CASSANDRA_DC=datacenter1
- CASSANDRA_RACK=rack2
- CASSANDRA_SEEDS=cassandra-node1
volumes:
- cassandra3-data:/var/lib/cassandra
depends_on:
- cassandra-node1
volumes:
cassandra1-data:
cassandra2-data:
cassandra3-data:
EOF
docker-compose up -d
# サービス開始・状態確認
sudo systemctl enable cassandra
sudo systemctl start cassandra
sudo systemctl status cassandra
# クラスター状態確認
nodetool status
nodetool info
基本的なCQLクエリとデータ操作
-- CQLShellへの接続
cqlsh localhost 9042
-- キースペース作成(データベース相当)
CREATE KEYSPACE IF NOT EXISTS myapp
WITH REPLICATION = {
'class': 'NetworkTopologyStrategy',
'datacenter1': 3
};
-- 単純レプリケーション戦略(開発用)
CREATE KEYSPACE IF NOT EXISTS myapp_dev
WITH REPLICATION = {
'class': 'SimpleStrategy',
'replication_factor': 1
};
USE myapp;
-- ユーザーテーブル作成
CREATE TABLE IF NOT EXISTS users (
user_id UUID PRIMARY KEY,
username TEXT,
email TEXT,
first_name TEXT,
last_name TEXT,
created_at TIMESTAMP,
updated_at TIMESTAMP,
is_active BOOLEAN,
profile MAP<TEXT, TEXT>
);
-- セカンダリインデックス作成
CREATE INDEX IF NOT EXISTS users_username_idx ON users (username);
CREATE INDEX IF NOT EXISTS users_email_idx ON users (email);
-- データ挿入
INSERT INTO users (user_id, username, email, first_name, last_name, created_at, updated_at, is_active, profile)
VALUES (uuid(), 'john_doe', '[email protected]', 'John', 'Doe', toTimestamp(now()), toTimestamp(now()), true,
{'bio': 'Software Engineer', 'location': 'Tokyo', 'company': 'Tech Corp'});
INSERT INTO users (user_id, username, email, first_name, last_name, created_at, updated_at, is_active, profile)
VALUES (uuid(), 'jane_smith', '[email protected]', 'Jane', 'Smith', toTimestamp(now()), toTimestamp(now()), true,
{'bio': 'Data Scientist', 'location': 'Osaka', 'company': 'Analytics Inc'});
-- データ取得
SELECT * FROM users;
SELECT user_id, username, email FROM users WHERE username = 'john_doe';
SELECT * FROM users WHERE email = '[email protected]';
-- 複合プライマリキーのテーブル(時系列データ向け)
CREATE TABLE IF NOT EXISTS user_activities (
user_id UUID,
activity_date DATE,
activity_time TIMESTAMP,
activity_type TEXT,
details MAP<TEXT, TEXT>,
metadata FROZEN<MAP<TEXT, TEXT>>,
PRIMARY KEY ((user_id, activity_date), activity_time)
) WITH CLUSTERING ORDER BY (activity_time DESC);
-- 時系列データ挿入
INSERT INTO user_activities (user_id, activity_date, activity_time, activity_type, details)
VALUES (uuid(), '2024-01-15', toTimestamp(now()), 'login', {'ip_address': '192.168.1.100', 'user_agent': 'Chrome'});
INSERT INTO user_activities (user_id, activity_date, activity_time, activity_type, details)
VALUES (uuid(), '2024-01-15', toTimestamp(now()), 'page_view', {'page': '/dashboard', 'referrer': '/login'});
-- パーティション内クエリ
SELECT * FROM user_activities
WHERE user_id = uuid() AND activity_date = '2024-01-15'
ORDER BY activity_time DESC
LIMIT 10;
-- バッチ文による複数操作
BEGIN BATCH
INSERT INTO users (user_id, username, email, first_name, last_name, created_at, is_active)
VALUES (uuid(), 'bob_wilson', '[email protected]', 'Bob', 'Wilson', toTimestamp(now()), true);
INSERT INTO user_activities (user_id, activity_date, activity_time, activity_type, details)
VALUES (uuid(), '2024-01-15', toTimestamp(now()), 'registration', {'method': 'email', 'source': 'website'});
APPLY BATCH;
-- TTL(Time To Live)の使用
INSERT INTO users (user_id, username, email, first_name, last_name, created_at, is_active)
VALUES (uuid(), 'temp_user', '[email protected]', 'Temp', 'User', toTimestamp(now()), true)
USING TTL 3600; -- 1時間後に自動削除
-- カウンターテーブル
CREATE TABLE IF NOT EXISTS page_view_counts (
page_url TEXT PRIMARY KEY,
view_count COUNTER
);
UPDATE page_view_counts SET view_count = view_count + 1 WHERE page_url = '/dashboard';
UPDATE page_view_counts SET view_count = view_count + 5 WHERE page_url = '/profile';
SELECT * FROM page_view_counts;
高度なデータモデリングとクエリ最適化
-- 商品カタログとレビューシステム
CREATE TABLE IF NOT EXISTS products (
product_id UUID PRIMARY KEY,
name TEXT,
description TEXT,
category TEXT,
price DECIMAL,
tags SET<TEXT>,
specifications MAP<TEXT, TEXT>,
created_at TIMESTAMP,
updated_at TIMESTAMP
);
-- カテゴリ別商品検索用テーブル
CREATE TABLE IF NOT EXISTS products_by_category (
category TEXT,
product_id UUID,
name TEXT,
price DECIMAL,
created_at TIMESTAMP,
PRIMARY KEY (category, created_at, product_id)
) WITH CLUSTERING ORDER BY (created_at DESC, product_id ASC);
-- 商品レビューテーブル(ユーザーと商品での複合パーティション)
CREATE TABLE IF NOT EXISTS product_reviews (
product_id UUID,
user_id UUID,
review_id TIMEUUID,
rating INT,
title TEXT,
content TEXT,
helpful_votes INT,
created_at TIMESTAMP,
PRIMARY KEY ((product_id, user_id), review_id)
) WITH CLUSTERING ORDER BY (review_id DESC);
-- 商品別レビュー集計用テーブル
CREATE TABLE IF NOT EXISTS product_review_summary (
product_id UUID PRIMARY KEY,
total_reviews COUNTER,
total_rating COUNTER,
five_star_count COUNTER,
four_star_count COUNTER,
three_star_count COUNTER,
two_star_count COUNTER,
one_star_count COUNTER
);
-- データ挿入とクエリパターン
-- 商品データ挿入
INSERT INTO products (product_id, name, description, category, price, tags, specifications, created_at, updated_at)
VALUES (
uuid(),
'Gaming Laptop Pro',
'High-performance gaming laptop',
'Electronics',
1299.99,
{'gaming', 'laptop', 'high-performance'},
{'cpu': 'Intel i7-12700H', 'gpu': 'RTX 3070', 'memory': '32GB', 'storage': '1TB SSD'},
toTimestamp(now()),
toTimestamp(now())
);
-- カテゴリ別商品テーブルにも挿入(非正規化)
INSERT INTO products_by_category (category, product_id, name, price, created_at)
VALUES ('Electronics', uuid(), 'Gaming Laptop Pro', 1299.99, toTimestamp(now()));
-- レビュー挿入
INSERT INTO product_reviews (product_id, user_id, review_id, rating, title, content, helpful_votes, created_at)
VALUES (
uuid(), -- product_id
uuid(), -- user_id
now(), -- review_id (TIMEUUID)
5,
'Excellent gaming laptop!',
'Great performance for gaming and work. Highly recommended.',
0,
toTimestamp(now())
);
-- レビュー集計カウンター更新
UPDATE product_review_summary
SET total_reviews = total_reviews + 1,
total_rating = total_rating + 5,
five_star_count = five_star_count + 1
WHERE product_id = uuid();
-- 効率的なクエリパターン
-- カテゴリ別商品取得(パーティション内ソート済み)
SELECT * FROM products_by_category
WHERE category = 'Electronics'
LIMIT 20;
-- 特定商品のレビュー取得
SELECT * FROM product_reviews
WHERE product_id = uuid() AND user_id = uuid()
ORDER BY review_id DESC;
-- UDT(User Defined Type)の使用
CREATE TYPE IF NOT EXISTS address (
street TEXT,
city TEXT,
state TEXT,
zip_code TEXT,
country TEXT
);
CREATE TABLE IF NOT EXISTS users_with_address (
user_id UUID PRIMARY KEY,
username TEXT,
email TEXT,
home_address FROZEN<address>,
work_address FROZEN<address>,
created_at TIMESTAMP
);
-- UDTデータ挿入
INSERT INTO users_with_address (user_id, username, email, home_address, created_at)
VALUES (
uuid(),
'alice_johnson',
'[email protected]',
{street: '123 Main St', city: 'Tokyo', state: 'Tokyo', zip_code: '100-0001', country: 'Japan'},
toTimestamp(now())
);
レプリケーション戦略とクラスター管理
# クラスター情報確認
nodetool status
nodetool info
nodetool describecluster
nodetool ring
# ノード追加準備
# 新ノードのcassandra.yaml設定
sudo tee -a /etc/cassandra/cassandra.yaml << 'EOF'
cluster_name: 'MyProductionCluster'
num_tokens: 256
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "192.168.1.10,192.168.1.11,192.168.1.12"
listen_address: 192.168.1.15
rpc_address: 192.168.1.15
endpoint_snitch: GossipingPropertyFileSnitch
EOF
# 新ノードをクラスターに追加
sudo systemctl start cassandra
# クラスター状態監視
nodetool status
nodetool netstats
# データセンター設定(cassandra-rackdc.properties)
sudo tee /etc/cassandra/cassandra-rackdc.properties << 'EOF'
dc=datacenter1
rack=rack1
prefer_local=true
EOF
# マルチデータセンター設定用キースペース
CREATE KEYSPACE IF NOT EXISTS myapp_multidc
WITH REPLICATION = {
'class': 'NetworkTopologyStrategy',
'datacenter1': 3,
'datacenter2': 2
};
# レプリケーション情報確認
DESCRIBE KEYSPACE myapp_multidc;
# ノード修復(定期メンテナンス)
nodetool repair -pr # Primary range repair
# 特定キースペースの修復
nodetool repair myapp
# 圧縮(Compaction)強制実行
nodetool compact myapp users
# スナップショット作成
nodetool snapshot myapp --tag snapshot_$(date +%Y%m%d_%H%M%S)
# スナップショット確認
nodetool listsnapshots
# ガベージコレクション実行
nodetool gc
# クラスター統計
nodetool tablestats myapp.users
nodetool cfstats myapp.users
Storage Attached Index(SAI)と検索機能
-- SAIインデックス作成(高速検索用)
CREATE CUSTOM INDEX IF NOT EXISTS users_first_name_sai
ON users (first_name)
USING 'StorageAttachedIndex';
CREATE CUSTOM INDEX IF NOT EXISTS users_profile_sai
ON users (profile)
USING 'StorageAttachedIndex';
-- 範囲検索対応SAIインデックス
CREATE CUSTOM INDEX IF NOT EXISTS users_created_at_sai
ON users (created_at)
USING 'StorageAttachedIndex';
-- テキスト検索用SAIインデックス
CREATE CUSTOM INDEX IF NOT EXISTS products_description_sai
ON products (description)
USING 'StorageAttachedIndex'
WITH OPTIONS = {'similarity_function': 'cosine'};
-- SAIを使用したクエリ例
-- 部分一致検索
SELECT * FROM users WHERE first_name LIKE 'J%';
-- MAP型データの検索
SELECT * FROM users WHERE profile['location'] = 'Tokyo';
-- 範囲クエリ
SELECT * FROM users
WHERE created_at >= '2024-01-01' AND created_at < '2024-02-01';
-- 複合条件
SELECT * FROM users
WHERE first_name = 'John' AND created_at >= '2024-01-01';
-- テキスト検索
SELECT * FROM products
WHERE description LIKE '%gaming%';
-- Materialized View(物理的なビュー)
CREATE MATERIALIZED VIEW IF NOT EXISTS users_by_email AS
SELECT user_id, username, email, first_name, last_name, created_at
FROM users
WHERE email IS NOT NULL AND user_id IS NOT NULL
PRIMARY KEY (email, user_id);
-- Materialized Viewを使用したクエリ
SELECT * FROM users_by_email WHERE email = '[email protected]';
-- 集約関数とグループ化
SELECT category, COUNT(*) as product_count
FROM products_by_category
WHERE category IN ('Electronics', 'Books', 'Clothing')
GROUP BY category;
パフォーマンス監視と最適化
# JMXメトリクス確認
nodetool tpstats # Thread pool statistics
nodetool compactionstats # Compaction statistics
nodetool proxyhistograms # Latency histograms
# テーブル統計
nodetool tablestats myapp.users
nodetool tablehistograms myapp users
# ノードパフォーマンス監視
nodetool gossipinfo
nodetool getcompactionthreshold myapp users
# 設定変更
nodetool setcompactionthreshold myapp users 4 32
nodetool sethintedhandoffthrottlekb 1024
# ログ監視
sudo tail -f /var/log/cassandra/system.log
sudo tail -f /var/log/cassandra/debug.log
# GC統計
nodetool gcstats
# Prometheus監視設定(JMXエクスポーター)
cat > /opt/cassandra/conf/jmx_prometheus.yml << 'EOF'
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
whitelistObjectNames:
- "org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency"
- "org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency"
- "org.apache.cassandra.metrics:type=Storage,name=Load"
- "org.apache.cassandra.metrics:type=Storage,name=Exceptions"
rules:
- pattern: 'org.apache.cassandra.metrics<type=(.+), name=(.+)><>Value'
name: cassandra_$1_$2
type: GAUGE
EOF
# JMXエクスポーター起動
java -jar jmx_prometheus_javaagent.jar 8080:/opt/cassandra/conf/jmx_prometheus.yml
# クエリトレーシング有効化
TRACING ON;
SELECT * FROM users WHERE username = 'john_doe';
TRACING OFF;
# スロークエリログ設定
ALTER TABLE system_views.slow_queries WITH gc_grace_seconds = 86400;
アプリケーション統合とベストプラクティス
# Python(cassandra-driver)を使用した例
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import SimpleStatement, PreparedStatement
import uuid
from datetime import datetime
class CassandraConnection:
def __init__(self, hosts=['127.0.0.1'], keyspace='myapp'):
# 本番環境での推奨設定
auth_provider = PlainTextAuthProvider(
username='cassandra',
password='cassandra'
)
# ロードバランシングポリシー
load_balancing_policy = TokenAwarePolicy(
DCAwareRoundRobinPolicy(local_dc='datacenter1')
)
self.cluster = Cluster(
hosts,
auth_provider=auth_provider,
load_balancing_policy=load_balancing_policy,
port=9042,
protocol_version=4
)
self.session = self.cluster.connect()
self.session.set_keyspace(keyspace)
# プリペアドステートメント準備
self.prepared_statements = self._prepare_statements()
def _prepare_statements(self):
return {
'insert_user': self.session.prepare("""
INSERT INTO users (user_id, username, email, first_name, last_name, created_at, is_active)
VALUES (?, ?, ?, ?, ?, ?, ?)
"""),
'get_user_by_username': self.session.prepare("""
SELECT * FROM users WHERE username = ?
"""),
'insert_activity': self.session.prepare("""
INSERT INTO user_activities (user_id, activity_date, activity_time, activity_type, details)
VALUES (?, ?, ?, ?, ?)
"""),
'get_activities': self.session.prepare("""
SELECT * FROM user_activities
WHERE user_id = ? AND activity_date = ?
ORDER BY activity_time DESC
LIMIT ?
""")
}
def create_user(self, username, email, first_name, last_name):
"""ユーザー作成"""
user_id = uuid.uuid4()
self.session.execute(
self.prepared_statements['insert_user'],
[user_id, username, email, first_name, last_name, datetime.now(), True]
)
return user_id
def get_user_by_username(self, username):
"""ユーザー名によるユーザー取得"""
result = self.session.execute(
self.prepared_statements['get_user_by_username'],
[username]
)
return result.one() if result else None
def log_user_activity(self, user_id, activity_type, details=None):
"""ユーザーアクティビティ記録"""
from datetime import date
self.session.execute(
self.prepared_statements['insert_activity'],
[user_id, date.today(), datetime.now(), activity_type, details or {}]
)
def get_user_activities(self, user_id, activity_date, limit=50):
"""ユーザーアクティビティ取得"""
result = self.session.execute(
self.prepared_statements['get_activities'],
[user_id, activity_date, limit]
)
return list(result)
def batch_operations(self, operations):
"""バッチ操作実行"""
from cassandra.query import BatchStatement
batch = BatchStatement()
for statement, parameters in operations:
batch.add(statement, parameters)
self.session.execute(batch)
def close(self):
"""接続クローズ"""
self.cluster.shutdown()
# 使用例
def main():
# Cassandra接続
db = CassandraConnection(['192.168.1.10', '192.168.1.11', '192.168.1.12'])
try:
# ユーザー作成
user_id = db.create_user(
username='alice_cooper',
email='[email protected]',
first_name='Alice',
last_name='Cooper'
)
print(f"Created user: {user_id}")
# ユーザー取得
user = db.get_user_by_username('alice_cooper')
print(f"Retrieved user: {user}")
# アクティビティ記録
db.log_user_activity(
user_id=user_id,
activity_type='login',
details={'ip_address': '192.168.1.100', 'user_agent': 'Chrome/91.0'}
)
# バッチ操作例
from datetime import date
batch_ops = [
(db.prepared_statements['insert_activity'],
[user_id, date.today(), datetime.now(), 'page_view', {'page': '/dashboard'}]),
(db.prepared_statements['insert_activity'],
[user_id, date.today(), datetime.now(), 'click', {'element': 'nav-profile'}])
]
db.batch_operations(batch_ops)
# アクティビティ取得
activities = db.get_user_activities(user_id, date.today())
print(f"User activities: {len(activities)} records")
finally:
db.close()
if __name__ == "__main__":
main()
# Spring Boot統合例(Java)
"""
// application.yml
spring:
cassandra:
contact-points: 192.168.1.10,192.168.1.11,192.168.1.12
port: 9042
keyspace-name: myapp
username: cassandra
password: cassandra
local-datacenter: datacenter1
request:
timeout: 10s
consistency: LOCAL_QUORUM
// CassandraConfig.java
@Configuration
@EnableCassandraRepositories
public class CassandraConfig extends AbstractCassandraConfiguration {
@Override
protected String getKeyspaceName() {
return "myapp";
}
@Override
protected String getContactPoints() {
return "192.168.1.10,192.168.1.11,192.168.1.12";
}
@Override
protected int getPort() {
return 9042;
}
@Override
protected String getLocalDataCenter() {
return "datacenter1";
}
}
// User.java (Entity)
@Table("users")
public class User {
@PrimaryKey
private UUID userId;
@Column("username")
private String username;
@Column("email")
private String email;
@Column("first_name")
private String firstName;
@Column("last_name")
private String lastName;
@Column("created_at")
private Instant createdAt;
@Column("is_active")
private Boolean isActive;
// getters and setters
}
// UserRepository.java
@Repository
public interface UserRepository extends CassandraRepository<User, UUID> {
@Query("SELECT * FROM users WHERE username = ?0")
Optional<User> findByUsername(String username);
@Query("SELECT * FROM users WHERE email = ?0")
Optional<User> findByEmail(String email);
}
"""