データベース
TigerGraph
概要
TigerGraphは、リアルタイム分析と機械学習に特化した超高速グラフ分析プラットフォームです。独自のGSQLクエリ言語とNative Parallel Graph™(NPG)アーキテクチャにより、大規模グラフデータでの高速クエリ処理を実現します。HTAP(Hybrid Transactional/Analytical Processing)をサポートし、リアルタイムでのグラフ更新と分析処理を同時に実行できます。
詳細
TigerGraphは2012年に設立された企業が開発した、エンタープライズ向けグラフデータベースです。従来のグラフデータベースと異なり、ストレージからコンピュート層まで並列処理に最適化された設計を採用しています。数十テラバイト規模のデータで数兆のエッジを処理でき、不正検知、顧客360度分析、IoT、AI・機械学習アプリケーションで広く使用されています。
TigerGraphの主な特徴:
- Native Parallel Graph™(NPG)アーキテクチャ
- GSQL Turing完全なクエリ言語
- リアルタイムHTAP(ハイブリッド処理)
- 超高速並列計算エンジン
- 組み込みグラフデータサイエンスライブラリ
- リアルタイム増分グラフ更新
- スケーラブルクラウドネイティブ設計
- CoPilot AI による知識グラフ拡張
- エンタープライズセキュリティとガバナンス
- ビジュアル開発環境GraphStudio
メリット・デメリット
メリット
- 超高速処理: 数千万のエンティティ・関係性への亜秒級レスポンス
- リアルタイム分析: トランザクション処理と分析の同時実行
- スケーラビリティ: 数十TB、数兆エッジまでの線形スケーリング
- GSQL言語: SQLライクで学習しやすく、Turing完全
- 機械学習統合: グラフフィーチャー抽出と分析の組み込み
- 並列処理: あらゆる層での最適化された並列実行
- エンタープライズ対応: 高可用性、セキュリティ、ガバナンス
- AI拡張: CoPilot AIによる知識グラフの自動拡張
デメリット
- 学習コスト: GSQL言語とグラフ概念の習得が必要
- リソース要件: 高性能を得るために多くのコンピュートリソースが必要
- ライセンス費用: エンタープライズ向けのため高額なライセンス
- エコシステム: Neo4jと比較してサードパーティツールが限定的
- 運用複雑性: 大規模環境での運用管理の複雑さ
主要リンク
書き方の例
インストール・セットアップ
# Docker での実行
docker run -d -p 14022:22 -p 9000:9000 -p 14240:14240 \
--name tigergraph \
--ulimit nofile=1000000:1000000 \
tigergraph/tigergraph:latest
# TigerGraph Cloud の使用
# https://tgcloud.tigergraph.com/ でアカウント作成
# ローカルインストール(Linux)
curl -O https://dl.tigergraph.com/download.php
tar -xzf tigergraph-*.tar.gz
sudo ./install.sh
# Python Client インストール
pip install pyTigerGraph
# Java Client インストール(Maven)
# pom.xml に依存関係追加
基本操作(CRUD)
// スキーマ定義
CREATE GRAPH SocialNetwork()
USE GRAPH SocialNetwork
// 頂点タイプ定義
CREATE VERTEX Person (PRIMARY_ID name STRING, age INT, city STRING)
CREATE VERTEX Company (PRIMARY_ID company_name STRING, industry STRING)
// エッジタイプ定義
CREATE DIRECTED EDGE WORKS_FOR (FROM Person, TO Company, since INT, position STRING)
CREATE UNDIRECTED EDGE FRIENDS_WITH (FROM Person, TO Person, since STRING)
// データ挿入(Create)
INSERT INTO Person VALUES ("田中太郎", 30, "東京")
INSERT INTO Person VALUES ("佐藤花子", 28, "大阪")
INSERT INTO Company VALUES ("テック株式会社", "IT")
// 関係性挿入
INSERT INTO WORKS_FOR VALUES ("田中太郎", "テック株式会社", 2020, "エンジニア")
INSERT INTO FRIENDS_WITH VALUES ("田中太郎", "佐藤花子", "2019-05-15")
// バッチ挿入
INSERT INTO Person VALUES
("山田一郎", 25, "名古屋"),
("鈴木次郎", 32, "福岡"),
("高橋三郎", 29, "札幌")
// データ読み取り(Read)
SELECT * FROM Person
// 条件付きクエリ
SELECT * FROM Person WHERE age > 25
// JOIN クエリ(関係性を含む)
SELECT p.name, p.age, c.company_name, e.position
FROM Person p -(WORKS_FOR:e)-> Company c
WHERE p.city == "東京"
// データ更新(Update)
UPDATE Person SET age = 31 WHERE name == "田中太郎"
// データ削除(Delete)
DELETE FROM Person WHERE name == "田中太郎"
クエリ分析(高度なパターンマッチング)
// パスファインディング
CREATE QUERY findShortestPath(VERTEX<Person> source, VERTEX<Person> target) {
MinAccum<INT> @minDist = 999999;
OrAccum @visited = false;
source.@minDist = 0;
S = {source};
WHILE S.size() > 0 DO
S = SELECT v FROM S:s -(FRIENDS_WITH:e)- Person:v
WHERE v.@minDist > s.@minDist + 1
ACCUM v.@minDist += s.@minDist + 1;
END;
PRINT target.@minDist;
}
// 影響力分析(中心性計算)
CREATE QUERY calculateCentrality() {
SumAccum<INT> @degree_centrality;
Start = {Person.*};
Result = SELECT s FROM Start:s -(FRIENDS_WITH:e)- Person:t
ACCUM s.@degree_centrality += 1
ORDER BY s.@degree_centrality DESC;
PRINT Result[Result.name, Result.@degree_centrality];
}
// コミュニティ検出
CREATE QUERY detectCommunities() {
GroupByAccum<STRING city, SetAccum<STRING>> @@community_members;
Start = {Person.*};
Result = SELECT s FROM Start:s
ACCUM @@community_members += (s.city -> s.name);
PRINT @@community_members;
}
// リアルタイム推薦
CREATE QUERY realTimeRecommendation(VERTEX<Person> user) {
SumAccum<INT> @score;
// 協調フィルタリング
Friends = SELECT t FROM user:s -(FRIENDS_WITH:e)- Person:t;
Recommendations = SELECT c FROM Friends:f -(WORKS_FOR:e)- Company:c
WHERE c != ANY(SELECT comp FROM user -(WORKS_FOR:ew)- Company:comp)
ACCUM c.@score += 1
ORDER BY c.@score DESC
LIMIT 5;
PRINT Recommendations[Recommendations.company_name, Recommendations.@score];
}
高度な機能(機械学習・分析)
// グラフフィーチャー抽出
CREATE QUERY extractGraphFeatures(VERTEX<Person> target_person) {
SumAccum<INT> @neighbor_count;
AvgAccum @avg_neighbor_age;
SetAccum<STRING> @neighbor_cities;
MaxAccum<INT> @max_connection_strength;
// 1次近傍の特徴量
Level1 = SELECT t FROM target_person:s -(FRIENDS_WITH:e)- Person:t
ACCUM
s.@neighbor_count += 1,
s.@avg_neighbor_age += t.age,
s.@neighbor_cities += t.city;
// 2次近傍の特徴量
Level2 = SELECT t2 FROM Level1:l1 -(FRIENDS_WITH:e2)- Person:t2
WHERE t2 != target_person
ACCUM target_person.@max_connection_strength += 1;
// 特徴量ベクトル出力
PRINT target_person.name,
target_person.@neighbor_count as direct_friends,
target_person.@avg_neighbor_age as avg_friend_age,
target_person.@neighbor_cities.size() as unique_cities,
target_person.@max_connection_strength as network_reach;
}
// 不正検知分析
CREATE QUERY fraudDetection(VERTEX<Person> account) {
SumAccum<FLOAT> @risk_score;
MaxAccum<INT> @max_transaction_amount;
SetAccum<STRING> @transaction_patterns;
// 取引パターン分析
Transactions = SELECT t FROM account:s -(TRANSACTION:e)- Account:t
WHERE e.timestamp > now() - 86400 // 24時間以内
ACCUM
s.@risk_score += (CASE WHEN e.amount > 100000 THEN 10 ELSE 1 END),
s.@max_transaction_amount += e.amount,
s.@transaction_patterns += (e.type + "_" + to_string(e.amount));
// リスクスコア計算
IF account.@risk_score > 50 OR account.@max_transaction_amount > 500000 THEN
PRINT account.name, account.@risk_score, "HIGH_RISK";
ELSE
PRINT account.name, account.@risk_score, "LOW_RISK";
END;
}
// リアルタイムアラート
CREATE QUERY realTimeAlert(VERTEX<Person> user, STRING alert_type) {
TYPEDEF TUPLE<STRING user_name, FLOAT score, STRING alert_level> AlertRecord;
ListAccum<AlertRecord> @@alerts;
// アラート条件チェック
Current = {user};
Result = SELECT s FROM Current:s
POST-ACCUM
IF alert_type == "FRAUD" AND s.risk_score > 75 THEN
@@alerts += AlertRecord(s.name, s.risk_score, "CRITICAL")
ELSE IF alert_type == "ACTIVITY" AND s.activity_count > 100 THEN
@@alerts += AlertRecord(s.name, s.activity_count, "WARNING")
END;
// アラート送信
PRINT @@alerts;
}
最適化・パフォーマンス
// インデックス作成
CREATE INDEX ON VERTEX Person(age)
CREATE INDEX ON VERTEX Company(industry)
CREATE INDEX ON EDGE WORKS_FOR(since)
// パーティショニング設定
ALTER GRAPH SocialNetwork SET distributed_storage_config = '{
"replication_factor": 3,
"partition_count": 8,
"partition_key": "name"
}'
// クエリ最適化
CREATE QUERY optimizedQuery() RETURNS (STRING) {
// 並列実行の最適化
SetAccum<VERTEX<Person>> @@processed_vertices;
// バッチ処理で効率化
Start = {Person.*};
FOREACH batch_size IN RANGE[1, 1000] DO
CurrentBatch = SELECT s FROM Start:s
WHERE s.id % 1000 == batch_size - 1
ACCUM @@processed_vertices += s;
END;
RETURN "Processed " + to_string(@@processed_vertices.size()) + " vertices";
}
// メモリ使用量最適化
CREATE QUERY memoryOptimizedQuery() {
// ストリーミング処理
Start = {Person.*};
// 段階的処理でメモリ効率化
FOREACH vertex_set IN RANGE[1, 10] DO
Subset = SELECT s FROM Start:s
WHERE s.id % 10 == vertex_set - 1;
// 小さなバッチで処理
ProcessedSubset = SELECT s FROM Subset:s -(FRIENDS_WITH)- Person:t
ACCUM s.@local_count += 1;
END;
}
// 統計情報取得
RUN QUERY getGraphStatistics()
実用例(エンタープライズユースケース)
// 顧客360度分析
CREATE QUERY customer360Analysis(VERTEX<Customer> customer_id) {
// 顧客の全タッチポイント分析
// 購買履歴
Purchases = SELECT p FROM customer_id:c -(PURCHASED:e)- Product:p
ACCUM c.@total_spent += e.amount;
// サポート履歴
SupportTickets = SELECT t FROM customer_id:c -(CREATED:e)- Ticket:t;
// デジタルタッチポイント
DigitalInteractions = SELECT i FROM customer_id:c -(INTERACTED:e)- DigitalChannel:i;
// セグメント分析
CASE
WHEN customer_id.@total_spent > 100000 THEN
UPDATE customer_id SET segment = "Premium"
WHEN customer_id.@total_spent > 50000 THEN
UPDATE customer_id SET segment = "Gold"
ELSE
UPDATE customer_id SET segment = "Standard"
END;
PRINT customer_id, Purchases, SupportTickets, DigitalInteractions;
}
// サプライチェーン分析
CREATE QUERY supplyChainAnalysis() {
// リスク伝播分析
SumAccum<FLOAT> @risk_propagation;
HighRiskSuppliers = {Supplier.* WHERE risk_level > 0.7};
// リスクの下流伝播
AffectedProducts = SELECT p FROM HighRiskSuppliers:s -(SUPPLIES:e)- Product:p
ACCUM p.@risk_propagation += s.risk_level * e.dependency_weight;
// 代替経路検索
CREATE QUERY findAlternativeSuppliers(VERTEX<Product> product) {
Alternatives = SELECT alt FROM product:p <-(SUPPLIES:e)- Supplier:alt
WHERE alt.risk_level < 0.3
ORDER BY e.cost_efficiency DESC;
RETURN Alternatives;
}
}
// IoTデータストリーミング分析
CREATE QUERY iotStreamAnalysis() {
// リアルタイムセンサーデータ処理
SensorData = {IoTDevice.* WHERE last_update > now() - 300}; // 5分以内
// 異常検知
Anomalies = SELECT d FROM SensorData:d
WHERE d.temperature > d.threshold_max OR
d.temperature < d.threshold_min
ACCUM d.@alert_count += 1;
// 予測メンテナンス
MaintenanceNeeded = SELECT d FROM SensorData:d
WHERE d.vibration_level > 0.8 AND
d.operating_hours > 1000;
PRINT Anomalies, MaintenanceNeeded;
}
Pythonクライアント使用例
import pyTigerGraph as tg
# 接続設定
conn = tg.TigerGraphConnection(
host="https://your-instance.i.tgcloud.io",
graphname="SocialNetwork",
username="tigergraph",
password="your_password",
apiToken="your_api_token"
)
# 頂点挿入
def create_person(name, age, city):
result = conn.upsertVertex("Person", name, {
"age": age,
"city": city
})
return result
# エッジ挿入
def create_friendship(person1, person2, since):
result = conn.upsertEdge("Person", person1, "FRIENDS_WITH", "Person", person2, {
"since": since
})
return result
# クエリ実行
def run_custom_query(query_name, params={}):
result = conn.runInstalledQuery(query_name, params)
return result
# バッチデータロード
def batch_load_data(data_file):
job = conn.gsql(f'''
CREATE LOADING JOB load_persons FOR GRAPH SocialNetwork {{
DEFINE FILENAME f1 = "{data_file}";
LOAD f1 TO VERTEX Person VALUES ($0, $1, $2)
USING header="true", separator=",";
}}
''')
# ジョブ実行
conn.gsql("RUN LOADING JOB load_persons")
# リアルタイム分析
def real_time_analysis():
# ストリーミングデータ処理
while True:
# 新規データ取得
new_data = get_streaming_data()
# グラフ更新
for record in new_data:
create_person(record['name'], record['age'], record['city'])
# リアルタイム分析実行
result = run_custom_query("realTimeAnalysis", {
"threshold": 100,
"time_window": 3600
})
# アラート処理
if result['alert_level'] == 'HIGH':
send_alert(result)
time.sleep(1) # 1秒間隔
# 使用例
if __name__ == "__main__":
# データ挿入
create_person("田中太郎", 30, "東京")
create_person("佐藤花子", 28, "大阪")
create_friendship("田中太郎", "佐藤花子", "2024-01-15")
# 分析実行
centrality_result = run_custom_query("calculateCentrality")
print("中心性分析結果:", centrality_result)
# リアルタイム分析開始
real_time_analysis()
パフォーマンス設定
# TigerGraph 設定ファイル (tigergraph.cfg)
[System]
MemoryLimit=32GB
ThreadCount=16
StorageRoot=/data/tigergraph
[GSQL]
QueryTimeout=300
MaxResultSize=100MB
EnableParallelLoading=true
[GPE]
WorkerThreads=8
BatchSize=10000
EnableRealTimeUpdate=true
[REST]
MaxConcurrentRequests=1000
RequestTimeout=60
# システム最適化
echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf
echo 'vm.max_map_count=262144' >> /etc/sysctl.conf
sysctl -p
# JVM設定(GSQL用)
export JAVA_OPTIONS="-Xms8g -Xmx16g -XX:+UseG1GC"