GitHub概要
redis/redis
For developers, who are building real-time data-driven applications, Redis is the preferred, fastest, and most feature-rich cache, data structure server, and document and vector query engine.
ホームページ:http://redis.io
スター70,233
ウォッチ2,517
フォーク24,144
作成日:2009年3月21日
言語:C
ライセンス:Other
トピックス
cachecachingdatabasedistributed-systemsin-memoryin-memory-databasejsonkey-valuekey-value-storemessage-brokermessage-queueno-sqlnosqlopen-sourcereal-timerealtimeredistime-seriesvector-databasesvector-search
スター履歴
データ取得日時: 2025/7/30 02:37
データベース
Redis + Vector Search
概要
Redisは高速なインメモリデータストアとして知られていますが、RediSearchモジュールやRedisJSON、RedisAIなどの拡張機能により、ベクトル検索機能も提供しています。Redis Stack(旧Redis Enterprise)では、これらの機能が統合され、リアルタイムのベクトル検索とキャッシングを組み合わせた高パフォーマンスなソリューションを実現できます。
詳細
Redisは2009年にSalvatore Sanfilippo氏によって開発され、現在はRedis Labs(現Redis社)が主導して開発を続けています。ベクトル検索機能は、RediSearchモジュールの一部として提供され、フラットインデックスとHNSWアルゴリズムの両方をサポートしています。インメモリデータベースの特性を活かし、ミリ秒単位の低レイテンシでベクトル検索を実行できます。
Redisベクトル検索の主な特徴:
- インメモリによる超高速検索
- フラットインデックスとHNSWアルゴリズムのサポート
- リアルタイムインデックス更新
- ハイブリッド検索(ベクトル + フルテキスト + 数値フィルタ)
- 複数の距離メトリクス(L2、内積、コサイン類似度)
- ベクトル範囲検索
- Redis Clusterによる水平スケーリング
- 永続化オプション(RDB、AOF)
- Pub/Subによるリアルタイム通知
- トランザクションサポート
アーキテクチャの特徴
- シングルスレッドイベントループによる高速処理
- 複製とフェイルオーバーによる高可用性
- Redis Sentinelによる自動フェイルオーバー
- メモリ効率的なデータ構造
メリット・デメリット
メリット
- 超高速パフォーマンス: インメモリ処理による極めて低いレイテンシ
- リアルタイム性: 即座のインデックス更新とクエリ応答
- 統合ソリューション: キャッシュ、セッション管理、ベクトル検索を一つのシステムで
- 豊富な機能: フルテキスト検索、地理空間検索、時系列データなど多機能
- シンプルな運用: 単一のRedisインスタンスで複数の用途に対応
- 成熟したエコシステム: 多くの言語でクライアントライブラリが利用可能
デメリット
- メモリコスト: 全データをメモリに保持するため高コスト
- スケーラビリティの制限: 単一ノードのメモリ容量に制限される
- 永続化のオーバーヘッド: ディスクへの永続化時にパフォーマンスが低下
- 大規模データセットには不向き: メモリサイズの制約により大規模データには向かない
- バックアップの複雑性: インメモリデータのバックアップ戦略が必要
主要リンク
書き方の例
インストール・セットアップ
# Docker でRedis Stackを実行
docker run -d --name redis-stack \
-p 6379:6379 \
-p 8001:8001 \
redis/redis-stack:latest
# Redis クライアントのインストール(Python)
pip install redis redis-py-search
# Node.js
npm install redis
ベクトル検索の基本操作
import redis
from redis.commands.search.field import VectorField, TextField, NumericField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query
import numpy as np
# 接続
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# インデックスの作成
schema = [
TextField("title"),
TextField("content"),
VectorField("embedding",
"HNSW", {
"TYPE": "FLOAT32",
"DIM": 768,
"DISTANCE_METRIC": "COSINE",
"M": 16,
"EF_CONSTRUCTION": 200
}
),
NumericField("year")
]
# インデックス定義
definition = IndexDefinition(
prefix=["doc:"],
index_type=IndexType.HASH
)
# インデックス作成
r.ft("documents").create_index(
fields=schema,
definition=definition
)
# ドキュメントの追加
embedding = np.random.rand(768).astype(np.float32)
doc_id = "doc:1"
r.hset(doc_id, mapping={
"title": "Redisベクトル検索",
"content": "Redisでリアルタイムベクトル検索を実装",
"embedding": embedding.tobytes(),
"year": 2024
})
# ベクトル検索
query_vector = np.random.rand(768).astype(np.float32)
q = Query("*=>[KNN 10 @embedding $vec AS score]")\
.sort_by("score")\
.return_fields("title", "content", "score")\
.paging(0, 10)\
.dialect(2)
results = r.ft("documents").search(
q,
query_params={"vec": query_vector.tobytes()}
)
for doc in results.docs:
print(f"Title: {doc.title}, Score: {doc.score}")
ハイブリッド検索
# テキストとベクトルの組み合わせ検索
hybrid_query = Query(
"(@content:Redis) => [KNN 5 @embedding $vec AS score]"
).sort_by("score").dialect(2)
results = r.ft("documents").search(
hybrid_query,
query_params={"vec": query_vector.tobytes()}
)
# フィルタリング付きベクトル検索
filtered_query = Query(
"(@year:[2023 2024]) => [KNN 10 @embedding $vec AS score]"
).sort_by("score").dialect(2)
results = r.ft("documents").search(
filtered_query,
query_params={"vec": query_vector.tobytes()}
)
バッチ処理とパイプライン
# パイプラインを使用した効率的なバッチ挿入
pipe = r.pipeline()
for i in range(1000):
doc_id = f"doc:{i}"
embedding = np.random.rand(768).astype(np.float32)
pipe.hset(doc_id, mapping={
"title": f"Document {i}",
"content": f"Content of document {i}",
"embedding": embedding.tobytes(),
"year": 2024
})
# バッチ実行
pipe.execute()
リアルタイム更新とPub/Sub
import threading
# ベクトル更新の監視
def vector_update_listener():
pubsub = r.pubsub()
pubsub.subscribe("vector_updates")
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Vector updated: {message['data']}")
# リスナーを別スレッドで起動
listener_thread = threading.Thread(target=vector_update_listener)
listener_thread.start()
# ベクトル更新と通知
new_embedding = np.random.rand(768).astype(np.float32)
r.hset("doc:1", "embedding", new_embedding.tobytes())
r.publish("vector_updates", "doc:1")
パフォーマンス最適化
# バッチクエリ
def batch_vector_search(query_vectors, k=10):
results = []
for vec in query_vectors:
q = Query("*=>[KNN {} @embedding $vec AS score]".format(k))\
.sort_by("score")\
.paging(0, k)\
.dialect(2)
result = r.ft("documents").search(
q,
query_params={"vec": vec.tobytes()}
)
results.append(result)
return results
# インデックス情報の取得
info = r.ft("documents").info()
print(f"インデックス内のドキュメント数: {info['num_docs']}")