GitHub概要
apache/cassandra
Apache Cassandra®
スター9,296
ウォッチ443
フォーク3,723
作成日:2009年5月21日
言語:Java
ライセンス:Apache License 2.0
トピックス
cassandradatabasejava
スター履歴
データ取得日時: 2025/7/30 02:37
データベース
Apache Cassandra + Vector Search
概要
Apache Cassandraは、高いスケーラビリティと可用性を特徴とする分散NoSQLデータベースです。DataStax社の拡張により、CassandraにVector Search機能が追加され、大規模なベクトルデータの保存と検索が可能になりました。分散アーキテクチャにより、ペタバイト級のデータでもベクトル検索を実行できます。
詳細
Cassandraは2008年にFacebookで開発され、その後Apache Software Foundationに寄贈されました。DataStax Astra DBやDataStax Enterprise (DSE)では、ベクトル検索機能が統合されており、既存のCassandraインフラストラクチャでAIワークロードを実行できます。
Cassandraベクトル検索の主な特徴:
- 大規模分散環境でのベクトル検索
- ANNアルゴリズム(Approximate Nearest Neighbors)
- 高次元ベクトル(最大8192次元)のサポート
- CQLによるベクトル操作
- 線形スケーラビリティ
- マルチデータセンターレプリケーション
- タプル整合性とイベンチュアル整合性
- 高可用性(単一障害点なし)
- 時系列データとの統合
- DataStax Astra DBでのマネージドサービス
アーキテクチャの特徴
- ピアツーピア分散システム
- 一貫性ハッシュによるデータ分散
- ゴシッププロトコルによるノード間通信
- SSTables(Sorted String Tables)によるストレージ
メリット・デメリット
メリット
- 高いスケーラビリティ: 線形スケーリングにより大規模データセットに対応
- 高可用性: 単一障害点がなく、常に利用可能
- 地理的分散: マルチデータセンターレプリケーション
- 書き込み性能: 高速な書き込み処理
- 柔軟な一貫性: アプリケーション要件に応じた一貫性レベル
- 運用実績: 多くの大規模システムでの採用実績
デメリット
- 複雑な運用: クラスタ管理とチューニングが必要
- 結果整合性: 強一貫性が必要な場合は性能が低下
- クエリの制限: セカンダリインデックスの制約
- メモリ使用量: 大量のメモリを必要とする
- 学習曲線: CQLとデータモデリングの理解が必要
主要リンク
書き方の例
セットアップとテーブル作成
-- ベクトル検索対応テーブルの作成
CREATE TABLE IF NOT EXISTS vector_data (
id UUID PRIMARY KEY,
title TEXT,
content TEXT,
embedding VECTOR<FLOAT, 768>,
category TEXT,
created_at TIMESTAMP
);
-- ベクトルインデックスの作成
CREATE CUSTOM INDEX IF NOT EXISTS embedding_index
ON vector_data(embedding)
USING 'org.apache.cassandra.index.sai.StorageAttachedIndex'
WITH OPTIONS = {
'similarity_function': 'cosine'
};
Pythonでの基本操作
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import numpy as np
import uuid
from datetime import datetime
# 接続設定
auth_provider = PlainTextAuthProvider(
username='cassandra',
password='cassandra'
)
cluster = Cluster(
['127.0.0.1'],
auth_provider=auth_provider
)
session = cluster.connect('vector_keyspace')
# ドキュメントの挿入
def insert_vector_data(title, content, embedding):
query = """
INSERT INTO vector_data (id, title, content, embedding, category, created_at)
VALUES (?, ?, ?, ?, ?, ?)
"""
session.execute(query, (
uuid.uuid4(),
title,
content,
embedding.tolist(),
'technology',
datetime.now()
))
# ベクトルの挿入
embedding = np.random.rand(768).astype(np.float32)
insert_vector_data(
"Cassandraベクトル検索",
"分散環境でのベクトル検索実装",
embedding
)
# ベクトル検索
def vector_search(query_vector, limit=10):
query = """
SELECT id, title, content,
similarity_cosine(embedding, ?) as similarity
FROM vector_data
ORDER BY embedding ANN OF ?
LIMIT ?
"""
results = session.execute(query, (
query_vector.tolist(),
query_vector.tolist(),
limit
))
return results
# 検索実行
query_embedding = np.random.rand(768).astype(np.float32)
results = vector_search(query_embedding)
for row in results:
print(f"Title: {row.title}, Similarity: {row.similarity}")
バッチ処理と最適化
from cassandra.concurrent import execute_concurrent_with_args
# バッチ挿入
def batch_insert_vectors(documents):
query = """
INSERT INTO vector_data (id, title, content, embedding, category, created_at)
VALUES (?, ?, ?, ?, ?, ?)
"""
# データの準備
parameters = []
for doc in documents:
parameters.append((
uuid.uuid4(),
doc['title'],
doc['content'],
doc['embedding'].tolist(),
doc['category'],
datetime.now()
))
# 並行実行
execute_concurrent_with_args(
session,
query,
parameters,
concurrency=50
)
# フィルタリング付き検索
def filtered_vector_search(query_vector, category, limit=10):
query = """
SELECT id, title, content,
similarity_cosine(embedding, ?) as similarity
FROM vector_data
WHERE category = ?
ORDER BY embedding ANN OF ?
LIMIT ?
ALLOW FILTERING
"""
results = session.execute(query, (
query_vector.tolist(),
category,
query_vector.tolist(),
limit
))
return results
DataStax Astra DB での実装
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
# Astra DB接続
cloud_config = {
'secure_connect_bundle': '/path/to/secure-connect-bundle.zip'
}
with open('/path/to/token.json') as f:
secrets = json.load(f)
auth_provider = PlainTextAuthProvider(
secrets['clientId'],
secrets['secret']
)
cluster = Cluster(
cloud=cloud_config,
auth_provider=auth_provider
)
session = cluster.connect()
# ベクトル検索(Astra DB)
def astra_vector_search(query_vector, collection_name):
query = f"""
SELECT * FROM {collection_name}
WHERE vector_embedding ANN OF ?
LIMIT 10
"""
results = session.execute(query, [query_vector.tolist()])
return results