GitHub概要
valkey-io/valkey
A flexible distributed key-value database that is optimized for caching and other realtime workloads.
ホームページ:https://valkey.io
スター22,372
ウォッチ119
フォーク952
作成日:2024年3月22日
言語:C
ライセンス:Other
トピックス
cachedatabasekey-valuekey-value-storenosqlredisvalkeyvalkey-client
スター履歴
データ取得日時: 2025/7/30 02:37
データベース
Valkey + Vector Search
概要
Valkeyは、Redisのオープンソースフォークで、Linux Foundationの下で開発されています。Redis同様にインメモリデータストアとして高速な処理を提供し、拡張モジュールによりベクトル検索機能も利用可能です。Redis互換性を保ちながら、コミュニティ主導の開発により新機能が追加されています。
詳細
Valkeyは2024年にRedisのライセンス変更を受けて、Linux Foundation傘下のプロジェクトとして誕生しました。Redis 7.2.4をベースにフォークされ、Redis互換性を維持しながら独自の進化を遂げています。ベクトル検索機能は、Redisと同様のモジュールシステムを通じて提供され、高速なインメモリベクトル検索を実現します。
Valkeyベクトル検索の主な特徴:
- Redis互換のインメモリベクトル検索
- フラットインデックスとHNSWアルゴリズム
- リアルタイムインデックス更新
- ハイブリッド検索(ベクトル + テキスト + 数値)
- 複数の距離メトリクス
- 水平スケーリング対応
- 高可用性とレプリケーション
- Pub/Subによるリアルタイム通知
- トランザクションサポート
- オープンソース(BSD-3-Clause)
アーキテクチャの特徴
- シングルスレッドイベントループ
- 非同期I/O
- レプリケーションとフェイルオーバー
- クラスタモードでの分散
メリット・デメリット
メリット
- 完全オープンソース: BSD-3-Clauseライセンスで商用利用も自由
- Redis互換: 既存のRedisアプリケーションがそのまま動作
- 高速パフォーマンス: インメモリ処理による低レイテンシ
- コミュニティ主導: Linux Foundation下での透明な開発
- 将来性: 大手企業(AWS、Google、Oracle等)のサポート
- 柔軟なライセンス: ライセンス変更の心配なし
デメリット
- 新しいプロジェクト: エコシステムがまだ発展途上
- メモリコスト: 全データをメモリに保持
- モジュール互換性: 一部Redisモジュールの移植待ち
- ドキュメント: Redisと比較してドキュメントが少ない
- 企業サポート: 商用サポートオプションが限定的
主要リンク
書き方の例
セットアップとインストール
# Dockerでの実行
docker run -d --name valkey \
-p 6379:6379 \
valkey/valkey:latest
# ベクトル検索モジュールのインストール(例:VectorSearch互換モジュール)
docker exec valkey valkey-cli MODULE LOAD /path/to/vector_module.so
Pythonでの基本操作
import redis
import numpy as np
import json
# Valkey接続(Redis互換)
client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# ベクトルインデックスの作成
def create_vector_index():
try:
client.execute_command(
'FT.CREATE', 'vector_idx',
'ON', 'HASH',
'PREFIX', '1', 'doc:',
'SCHEMA',
'title', 'TEXT',
'content', 'TEXT',
'embedding', 'VECTOR', 'HNSW', '6',
'TYPE', 'FLOAT32',
'DIM', '768',
'DISTANCE_METRIC', 'COSINE',
'M', '16',
'EF_CONSTRUCTION', '200'
)
print("Vector index created successfully")
except Exception as e:
print(f"Index already exists or error: {e}")
# ドキュメントの挿入
def insert_document(doc_id, title, content, embedding):
# ベクトルをバイナリ形式に変換
embedding_bytes = np.array(embedding, dtype=np.float32).tobytes()
client.hset(
f'doc:{doc_id}',
mapping={
'title': title,
'content': content,
'embedding': embedding_bytes
}
)
# ベクトル検索
def vector_search(query_vector, limit=10):
query_bytes = np.array(query_vector, dtype=np.float32).tobytes()
results = client.execute_command(
'FT.SEARCH', 'vector_idx',
f'*=>[KNN {limit} @embedding $vec AS score]',
'PARAMS', '2', 'vec', query_bytes,
'SORTBY', 'score',
'RETURN', '3', 'title', 'content', 'score',
'DIALECT', '2'
)
# 結果の解析
documents = []
if results[0] > 0:
for i in range(1, len(results), 2):
doc_id = results[i]
fields = results[i + 1]
doc = {
'id': doc_id,
'title': fields[fields.index('title') + 1],
'content': fields[fields.index('content') + 1],
'score': float(fields[fields.index('score') + 1])
}
documents.append(doc)
return documents
# 使用例
create_vector_index()
# サンプルデータの挿入
embedding = np.random.rand(768).astype(np.float32)
insert_document(
'1',
'Valkeyベクトル検索',
'オープンソースのインメモリベクトル検索',
embedding
)
# 検索実行
query_embedding = np.random.rand(768).astype(np.float32)
results = vector_search(query_embedding)
for doc in results:
print(f"Title: {doc['title']}, Score: {doc['score']:.4f}")
ハイブリッド検索とフィルタリング
# テキスト検索とベクトル検索の組み合わせ
def hybrid_search(text_query, query_vector, category=None, limit=10):
query_bytes = np.array(query_vector, dtype=np.float32).tobytes()
# フィルタ条件の構築
filter_clause = f"@content:{text_query}"
if category:
filter_clause += f" @category:{{{category}}}"
results = client.execute_command(
'FT.SEARCH', 'vector_idx',
f'({filter_clause})=>[KNN {limit} @embedding $vec AS score]',
'PARAMS', '2', 'vec', query_bytes,
'SORTBY', 'score',
'RETURN', '4', 'title', 'content', 'category', 'score',
'DIALECT', '2'
)
return parse_search_results(results)
# リアルタイム更新とPub/Sub
def setup_realtime_notifications():
pubsub = client.pubsub()
pubsub.subscribe('vector_updates')
def handle_updates():
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
print(f"Vector updated: {data['doc_id']}")
# 必要に応じてインデックスを更新
return handle_updates
# バッチ処理
def batch_insert_vectors(documents):
pipeline = client.pipeline()
for doc in documents:
doc_id = doc['id']
embedding_bytes = np.array(doc['embedding'], dtype=np.float32).tobytes()
pipeline.hset(
f'doc:{doc_id}',
mapping={
'title': doc['title'],
'content': doc['content'],
'embedding': embedding_bytes,
'category': doc.get('category', 'general')
}
)
# バッチ実行
pipeline.execute()
パフォーマンス最適化
# メモリ使用量の最適化
def optimize_memory_usage():
# メモリ使用状況の確認
info = client.info('memory')
print(f"Used memory: {info['used_memory_human']}")
# 不要なキーの削除
client.execute_command('MEMORY', 'DOCTOR')
# エビクションポリシーの設定
client.config_set('maxmemory-policy', 'allkeys-lru')
client.config_set('maxmemory', '4gb')
# インデックスの再構築
def rebuild_index():
# 既存インデックスの削除
try:
client.execute_command('FT.DROPINDEX', 'vector_idx', 'DD')
except:
pass
# 新しいインデックスの作成(最適化されたパラメータ)
create_vector_index()
# 既存データの再インデックス
cursor = '0'
while cursor != 0:
cursor, keys = client.scan(cursor, match='doc:*', count=100)
# 各ドキュメントの処理
# クラスタ設定(将来のバージョンで強化予定)
def setup_cluster_config():
# レプリケーション設定
client.execute_command('REPLICAOF', 'master-host', '6379')
# 永続化設定
client.config_set('save', '900 1 300 10 60 10000')
client.config_set('appendonly', 'yes')