GitHub概要

cockroachdb/cockroach

CockroachDB — the cloud native, distributed SQL database designed for high availability, effortless scale, and control over data placement.

スター31,127
ウォッチ678
フォーク3,940
作成日:2014年2月6日
言語:Go
ライセンス:Other

トピックス

cockroachdbdatabasedistributed-databasegohacktoberfestsql

スター履歴

cockroachdb/cockroach Star History
データ取得日時: 2025/7/30 02:37

データベース

CockroachDB + pgvector

概要

CockroachDBは、PostgreSQL互換の分散SQLデータベースで、グローバルに分散されたアプリケーションのために設計されています。pgvector拡張機能を使用することで、ベクトル検索機能を追加でき、分散環境でのベクトルデータの保存と検索が可能になります。強い一貫性と高可用性を維持しながら、AIワークロードを実行できます。

詳細

CockroachDBは2015年にCockroach Labs社によって開発され、Google Spannerにインスパイアされた設計となっています。PostgreSQL互換のため、pgvectorを含む多くのPostgreSQL拡張機能が利用可能です。分散トランザクション、自動レプリケーション、地理的分散などの機能により、グローバルスケールでのベクトル検索アプリケーションを構築できます。

CockroachDB + pgvectorの主な特徴:

  • 分散環境でのベクトル検索
  • PostgreSQL互換によるpgvectorサポート
  • 強い一貫性(ACID準拠)
  • 自動レプリケーションとシャーディング
  • 地理的に分散したクラスタ
  • ゼロダウンタイムでのスケーリング
  • 自動障害復旧
  • マルチリージョン対応
  • SQLによる使い慣れたインターフェース
  • エンタープライズグレードのセキュリティ

アーキテクチャの特徴

  • Raftコンセンサスアルゴリズム
  • 分散トランザクション(2PC)
  • レンジベースのデータ分散
  • マルチバージョン同時実行制御(MVCC)

メリット・デメリット

メリット

  • 地理的分散: グローバルに分散したデータとベクトル検索
  • 強い一貫性: ACIDトランザクションの完全サポート
  • 高可用性: 自動フェイルオーバーとゼロダウンタイム
  • PostgreSQL互換: 既存のツールとライブラリの活用
  • 自動管理: シャーディングとレプリケーションの自動化
  • 水平スケーリング: ノード追加による簡単なスケールアウト

デメリット

  • レイテンシ: 分散環境でのコンセンサスによる遅延
  • 複雑性: 分散システムの管理と最適化
  • リソース要求: 最小3ノードからの構成
  • pgvectorの制限: ネイティブサポートではない
  • コスト: エンタープライズ機能のライセンス費用

主要リンク

書き方の例

セットアップとテーブル作成

-- pgvector拡張機能のインストール
CREATE EXTENSION IF NOT EXISTS vector;

-- ベクトルデータテーブルの作成
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title TEXT NOT NULL,
    content TEXT,
    embedding vector(768),
    category TEXT,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT now()
);

-- ベクトルインデックスの作成
CREATE INDEX documents_embedding_idx ON documents 
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- 地理的パーティショニング(マルチリージョン)
ALTER TABLE documents 
SET LOCALITY REGIONAL BY ROW AS "region";

Pythonでの基本操作

import psycopg2
from psycopg2.extras import RealDictCursor
import numpy as np
from typing import List, Dict

# 接続
conn = psycopg2.connect(
    "postgresql://username:password@localhost:26257/vectordb?sslmode=require"
)

# ドキュメントの挿入
def insert_document(title: str, content: str, embedding: np.ndarray):
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO documents (title, content, embedding, category, metadata)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
        """, (
            title,
            content,
            embedding.tolist(),
            'technology',
            {'source': 'manual', 'version': 1}
        ))
        doc_id = cur.fetchone()[0]
        conn.commit()
        return doc_id

# ベクトル検索
def vector_search(query_vector: np.ndarray, limit: int = 10) -> List[Dict]:
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT id, title, content, 
                   1 - (embedding <=> %s::vector) as similarity
            FROM documents
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """, (query_vector.tolist(), query_vector.tolist(), limit))
        
        return cur.fetchall()

# 使用例
embedding = np.random.rand(768).astype(np.float32)
doc_id = insert_document(
    "CockroachDBベクトル検索",
    "分散SQLデータベースでのベクトル検索実装",
    embedding
)

# 検索実行
query_embedding = np.random.rand(768).astype(np.float32)
results = vector_search(query_embedding)

for result in results:
    print(f"Title: {result['title']}, Similarity: {result['similarity']:.4f}")

ハイブリッド検索とフィルタリング

# テキスト検索とベクトル検索の組み合わせ
def hybrid_search(
    query_vector: np.ndarray, 
    text_query: str, 
    category: str = None,
    limit: int = 10
) -> List[Dict]:
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        query = """
            SELECT id, title, content,
                   1 - (embedding <=> %s::vector) as vector_score,
                   ts_rank(to_tsvector('english', content), 
                          plainto_tsquery('english', %s)) as text_score,
                   (1 - (embedding <=> %s::vector)) * 0.7 + 
                   ts_rank(to_tsvector('english', content), 
                          plainto_tsquery('english', %s)) * 0.3 as combined_score
            FROM documents
            WHERE (%s IS NULL OR category = %s)
            ORDER BY combined_score DESC
            LIMIT %s
        """
        
        cur.execute(query, (
            query_vector.tolist(),
            text_query,
            query_vector.tolist(),
            text_query,
            category,
            category,
            limit
        ))
        
        return cur.fetchall()

# 地理的に最適化されたクエリ
def region_optimized_search(
    query_vector: np.ndarray,
    region: str,
    limit: int = 10
) -> List[Dict]:
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT id, title, content,
                   1 - (embedding <=> %s::vector) as similarity
            FROM documents
            WHERE region = %s
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """, (
            query_vector.tolist(),
            region,
            query_vector.tolist(),
            limit
        ))
        
        return cur.fetchall()

バッチ処理と最適化

# バッチ挿入
def batch_insert_documents(documents: List[Dict]):
    with conn.cursor() as cur:
        # COPY文を使用した高速挿入
        cur.execute("BEGIN")
        
        for doc in documents:
            cur.execute("""
                INSERT INTO documents (title, content, embedding, category, metadata)
                VALUES (%s, %s, %s, %s, %s)
            """, (
                doc['title'],
                doc['content'],
                doc['embedding'].tolist(),
                doc.get('category', 'general'),
                doc.get('metadata', {})
            ))
        
        cur.execute("COMMIT")

# インデックスの再構築
def rebuild_vector_index():
    with conn.cursor() as cur:
        # 既存インデックスの削除
        cur.execute("DROP INDEX IF EXISTS documents_embedding_idx")
        
        # 新しいインデックスの作成(最適化されたパラメータ)
        cur.execute("""
            CREATE INDEX documents_embedding_idx ON documents 
            USING ivfflat (embedding vector_cosine_ops)
            WITH (lists = 200)
        """)
        
        conn.commit()

# 統計情報の取得
def get_cluster_stats():
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT 
                count(*) as total_documents,
                avg(octet_length(embedding::text)) as avg_embedding_size,
                count(DISTINCT category) as categories
            FROM documents
        """)
        
        return cur.fetchone()