GitHub概要

valkey-io/valkey

A flexible distributed key-value database that is optimized for caching and other realtime workloads.

ホームページ:https://valkey.io
スター22,372
ウォッチ119
フォーク952
作成日:2024年3月22日
言語:C
ライセンス:Other

トピックス

cachedatabasekey-valuekey-value-storenosqlredisvalkeyvalkey-client

スター履歴

valkey-io/valkey Star History
データ取得日時: 2025/7/30 02:37

データベース

Valkey + Vector Search

概要

Valkeyは、Redisのオープンソースフォークで、Linux Foundationの下で開発されています。Redis同様にインメモリデータストアとして高速な処理を提供し、拡張モジュールによりベクトル検索機能も利用可能です。Redis互換性を保ちながら、コミュニティ主導の開発により新機能が追加されています。

詳細

Valkeyは2024年にRedisのライセンス変更を受けて、Linux Foundation傘下のプロジェクトとして誕生しました。Redis 7.2.4をベースにフォークされ、Redis互換性を維持しながら独自の進化を遂げています。ベクトル検索機能は、Redisと同様のモジュールシステムを通じて提供され、高速なインメモリベクトル検索を実現します。

Valkeyベクトル検索の主な特徴:

  • Redis互換のインメモリベクトル検索
  • フラットインデックスとHNSWアルゴリズム
  • リアルタイムインデックス更新
  • ハイブリッド検索(ベクトル + テキスト + 数値)
  • 複数の距離メトリクス
  • 水平スケーリング対応
  • 高可用性とレプリケーション
  • Pub/Subによるリアルタイム通知
  • トランザクションサポート
  • オープンソース(BSD-3-Clause)

アーキテクチャの特徴

  • シングルスレッドイベントループ
  • 非同期I/O
  • レプリケーションとフェイルオーバー
  • クラスタモードでの分散

メリット・デメリット

メリット

  • 完全オープンソース: BSD-3-Clauseライセンスで商用利用も自由
  • Redis互換: 既存のRedisアプリケーションがそのまま動作
  • 高速パフォーマンス: インメモリ処理による低レイテンシ
  • コミュニティ主導: Linux Foundation下での透明な開発
  • 将来性: 大手企業(AWS、Google、Oracle等)のサポート
  • 柔軟なライセンス: ライセンス変更の心配なし

デメリット

  • 新しいプロジェクト: エコシステムがまだ発展途上
  • メモリコスト: 全データをメモリに保持
  • モジュール互換性: 一部Redisモジュールの移植待ち
  • ドキュメント: Redisと比較してドキュメントが少ない
  • 企業サポート: 商用サポートオプションが限定的

主要リンク

書き方の例

セットアップとインストール

# Dockerでの実行
docker run -d --name valkey \
  -p 6379:6379 \
  valkey/valkey:latest

# ベクトル検索モジュールのインストール(例:VectorSearch互換モジュール)
docker exec valkey valkey-cli MODULE LOAD /path/to/vector_module.so

Pythonでの基本操作

import redis
import numpy as np
import json

# Valkey接続(Redis互換)
client = redis.Redis(host='localhost', port=6379, decode_responses=True)

# ベクトルインデックスの作成
def create_vector_index():
    try:
        client.execute_command(
            'FT.CREATE', 'vector_idx',
            'ON', 'HASH',
            'PREFIX', '1', 'doc:',
            'SCHEMA',
            'title', 'TEXT',
            'content', 'TEXT',
            'embedding', 'VECTOR', 'HNSW', '6',
            'TYPE', 'FLOAT32',
            'DIM', '768',
            'DISTANCE_METRIC', 'COSINE',
            'M', '16',
            'EF_CONSTRUCTION', '200'
        )
        print("Vector index created successfully")
    except Exception as e:
        print(f"Index already exists or error: {e}")

# ドキュメントの挿入
def insert_document(doc_id, title, content, embedding):
    # ベクトルをバイナリ形式に変換
    embedding_bytes = np.array(embedding, dtype=np.float32).tobytes()
    
    client.hset(
        f'doc:{doc_id}',
        mapping={
            'title': title,
            'content': content,
            'embedding': embedding_bytes
        }
    )

# ベクトル検索
def vector_search(query_vector, limit=10):
    query_bytes = np.array(query_vector, dtype=np.float32).tobytes()
    
    results = client.execute_command(
        'FT.SEARCH', 'vector_idx',
        f'*=>[KNN {limit} @embedding $vec AS score]',
        'PARAMS', '2', 'vec', query_bytes,
        'SORTBY', 'score',
        'RETURN', '3', 'title', 'content', 'score',
        'DIALECT', '2'
    )
    
    # 結果の解析
    documents = []
    if results[0] > 0:
        for i in range(1, len(results), 2):
            doc_id = results[i]
            fields = results[i + 1]
            doc = {
                'id': doc_id,
                'title': fields[fields.index('title') + 1],
                'content': fields[fields.index('content') + 1],
                'score': float(fields[fields.index('score') + 1])
            }
            documents.append(doc)
    
    return documents

# 使用例
create_vector_index()

# サンプルデータの挿入
embedding = np.random.rand(768).astype(np.float32)
insert_document(
    '1',
    'Valkeyベクトル検索',
    'オープンソースのインメモリベクトル検索',
    embedding
)

# 検索実行
query_embedding = np.random.rand(768).astype(np.float32)
results = vector_search(query_embedding)

for doc in results:
    print(f"Title: {doc['title']}, Score: {doc['score']:.4f}")

ハイブリッド検索とフィルタリング

# テキスト検索とベクトル検索の組み合わせ
def hybrid_search(text_query, query_vector, category=None, limit=10):
    query_bytes = np.array(query_vector, dtype=np.float32).tobytes()
    
    # フィルタ条件の構築
    filter_clause = f"@content:{text_query}"
    if category:
        filter_clause += f" @category:{{{category}}}"
    
    results = client.execute_command(
        'FT.SEARCH', 'vector_idx',
        f'({filter_clause})=>[KNN {limit} @embedding $vec AS score]',
        'PARAMS', '2', 'vec', query_bytes,
        'SORTBY', 'score',
        'RETURN', '4', 'title', 'content', 'category', 'score',
        'DIALECT', '2'
    )
    
    return parse_search_results(results)

# リアルタイム更新とPub/Sub
def setup_realtime_notifications():
    pubsub = client.pubsub()
    pubsub.subscribe('vector_updates')
    
    def handle_updates():
        for message in pubsub.listen():
            if message['type'] == 'message':
                data = json.loads(message['data'])
                print(f"Vector updated: {data['doc_id']}")
                # 必要に応じてインデックスを更新
    
    return handle_updates

# バッチ処理
def batch_insert_vectors(documents):
    pipeline = client.pipeline()
    
    for doc in documents:
        doc_id = doc['id']
        embedding_bytes = np.array(doc['embedding'], dtype=np.float32).tobytes()
        
        pipeline.hset(
            f'doc:{doc_id}',
            mapping={
                'title': doc['title'],
                'content': doc['content'],
                'embedding': embedding_bytes,
                'category': doc.get('category', 'general')
            }
        )
    
    # バッチ実行
    pipeline.execute()

パフォーマンス最適化

# メモリ使用量の最適化
def optimize_memory_usage():
    # メモリ使用状況の確認
    info = client.info('memory')
    print(f"Used memory: {info['used_memory_human']}")
    
    # 不要なキーの削除
    client.execute_command('MEMORY', 'DOCTOR')
    
    # エビクションポリシーの設定
    client.config_set('maxmemory-policy', 'allkeys-lru')
    client.config_set('maxmemory', '4gb')

# インデックスの再構築
def rebuild_index():
    # 既存インデックスの削除
    try:
        client.execute_command('FT.DROPINDEX', 'vector_idx', 'DD')
    except:
        pass
    
    # 新しいインデックスの作成(最適化されたパラメータ)
    create_vector_index()
    
    # 既存データの再インデックス
    cursor = '0'
    while cursor != 0:
        cursor, keys = client.scan(cursor, match='doc:*', count=100)
        # 各ドキュメントの処理

# クラスタ設定(将来のバージョンで強化予定)
def setup_cluster_config():
    # レプリケーション設定
    client.execute_command('REPLICAOF', 'master-host', '6379')
    
    # 永続化設定
    client.config_set('save', '900 1 300 10 60 10000')
    client.config_set('appendonly', 'yes')