データベース
Amazon Neptune
概要
Amazon Neptuneは、AWSが提供するフルマネージドグラフデータベースサービスです。PropertyGraph(Apache TinkerPop Gremlin)とRDF(SPARQL 1.1)の両方のグラフモデルをサポートし、高可用性、自動スケーリング、運用負荷軽減を実現します。AWSの他のサービスとシームレスに統合でき、数十億の関係性を持つ大規模グラフデータを数ミリ秒のレイテンシで処理できます。
詳細
Amazon Neptuneは2018年にAWSから正式リリースされました。従来のグラフデータベースと異なり、完全なマネージドサービスとして提供され、インフラストラクチャの管理やメンテナンスが不要です。最大64TBまでの自動スケーリングストレージ、3つのアベイラビリティゾーンにわたる6つのデータレプリカ、自動バックアップとポイントインタイムリカバリを提供します。
Amazon Neptuneの主な特徴:
- デュアルグラフエンジン(Property Graph + RDF)
- Apache TinkerPop Gremlin クエリ言語サポート
- SPARQL 1.1 準拠のRDFクエリ
- 最大100,000 QPS のクエリ処理能力
- VPC内での実行とKMSによる暗号化
- AWS IAMによるきめ細かなアクセス制御
- Amazon Athenaとの統合によるSQL分析
- 自動フェイルオーバーとリードレプリカ
- Amazon SageMakerとの機械学習統合
- AWS Lambda、EC2、EKSからの接続サポート
メリット・デメリット
メリット
- フルマネージド: インフラ管理、パッチ適用、バックアップが自動化
- 高可用性: 3-AZ構成で99.99%の可用性SLAを提供
- 自動スケーリング: ストレージとコンピュートの独立スケーリング
- デュアルエンジン: GremlinとSPARQLの両方をネイティブサポート
- AWSエコシステム: 他のAWSサービスとのシームレス連携
- セキュリティ: VPC、IAM、KMS暗号化の統合セキュリティ
- パフォーマンス: 専用グラフエンジンによる高速処理
- 運用負荷軽減: モニタリング、アラート、メトリクスが統合
デメリット
- ベンダーロックイン: AWSプラットフォーム依存
- コスト: 小規模使用でも固定的なインスタンス料金
- 学習コスト: GremlinとSPARQLの習得が必要
- カスタマイゼーション制限: マネージドサービスによる設定制約
- リージョン制限: 利用可能リージョンが限定的
- 複雑な料金体系: インスタンス、ストレージ、I/O料金の組み合わせ
主要リンク
書き方の例
セットアップ・接続設定
# AWS CLI設定
aws configure
# アクセスキー、シークレットキー、リージョンを設定
# Python Gremlinクライアントインストール
pip install gremlinpython boto3
# Python SPARQLクライアントインストール
pip install rdflib requests
# Java依存関係(Maven)
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-driver</artifactId>
<version>3.6.2</version>
</dependency>
<dependency>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-repository-sparql</artifactId>
<version>3.7.4</version>
</dependency>
# ネットワーク設定確認
# Neptune VPCセキュリティグループでポート8182(Gremlin)、8183(SPARQL)を開放
基本操作(Gremlin)
# Gremlin接続とCRUD操作
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal
import boto3
# Neptune接続
database_url = "wss://your-neptune-endpoint.region.neptune.amazonaws.com:8182/gremlin"
remoteConn = DriverRemoteConnection(database_url, "g")
g = traversal().withRemote(remoteConn)
# 頂点作成(Create)
person = g.addV("Person").property("name", "田中太郎").property("age", 30).next()
company = g.addV("Company").property("name", "テック株式会社").property("industry", "IT").next()
# エッジ作成
g.V(person).addE("WORKS_FOR").to(g.V(company)).property("since", 2020).iterate()
# 頂点検索(Read)
persons = g.V().hasLabel("Person").toList()
print(f"Persons found: {len(persons)}")
# 特定条件での検索
result = g.V().hasLabel("Person").has("name", "田中太郎").valueMap().toList()
print(result)
# 関係性クエリ
paths = g.V().hasLabel("Person").has("name", "田中太郎")\
.outE("WORKS_FOR").inV().hasLabel("Company")\
.path().by("name").toList()
# 頂点更新(Update)
g.V().hasLabel("Person").has("name", "田中太郎")\
.property("age", 31).property("location", "東京").iterate()
# 頂点削除(Delete)
g.V().hasLabel("Person").has("name", "田中太郎").drop().iterate()
# 接続クローズ
remoteConn.close()
基本操作(SPARQL)
# SPARQL接続とCRUD操作
import requests
import json
sparql_endpoint = "https://your-neptune-endpoint.region.neptune.amazonaws.com:8183/sparql"
# RDFトリプル挿入(Create)
insert_query = """
INSERT DATA {
<http://example.org/person/tanaka> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/Person> .
<http://example.org/person/tanaka> <http://example.org/name> "田中太郎" .
<http://example.org/person/tanaka> <http://example.org/age> "30"^^<http://www.w3.org/2001/XMLSchema#int> .
<http://example.org/company/tech> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/Company> .
<http://example.org/company/tech> <http://example.org/name> "テック株式会社" .
<http://example.org/person/tanaka> <http://example.org/worksFor> <http://example.org/company/tech> .
}
"""
response = requests.post(sparql_endpoint,
data={'update': insert_query},
headers={'Content-Type': 'application/x-www-form-urlencoded'})
# データ検索(Read)
select_query = """
SELECT ?person ?name ?age WHERE {
?person <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/Person> .
?person <http://example.org/name> ?name .
?person <http://example.org/age> ?age .
}
"""
response = requests.post(sparql_endpoint,
data={'query': select_query},
headers={'Content-Type': 'application/x-www-form-urlencoded'})
results = response.json()
# 関係性クエリ
relationship_query = """
SELECT ?person ?company WHERE {
?person <http://example.org/worksFor> ?company .
?person <http://example.org/name> "田中太郎" .
}
"""
# データ更新(Update)
update_query = """
DELETE { <http://example.org/person/tanaka> <http://example.org/age> ?oldAge }
INSERT { <http://example.org/person/tanaka> <http://example.org/age> "31"^^<http://www.w3.org/2001/XMLSchema#int> }
WHERE { <http://example.org/person/tanaka> <http://example.org/age> ?oldAge }
"""
# データ削除(Delete)
delete_query = """
DELETE WHERE {
<http://example.org/person/tanaka> ?p ?o .
}
"""
データモデリング・スキーマ設計
# Gremlinを使った複雑なデータモデル
# eコマースシステムのモデリング
# ユーザー、商品、カテゴリ作成
def create_ecommerce_model():
# ユーザー作成
user1 = g.addV("User").property("id", "user001").property("name", "佐藤花子")\
.property("email", "[email protected]").property("joinDate", "2024-01-15").next()
# 商品とカテゴリ作成
laptop = g.addV("Product").property("id", "prod001").property("name", "ノートPC")\
.property("price", 120000).property("brand", "TechBrand").next()
mouse = g.addV("Product").property("id", "prod002").property("name", "ワイヤレスマウス")\
.property("price", 3000).property("brand", "TechBrand").next()
category = g.addV("Category").property("name", "コンピュータ").property("level", 1).next()
# 関係性作成
g.V(user1).addE("PURCHASED").to(g.V(laptop))\
.property("date", "2024-02-01").property("quantity", 1).iterate()
g.V(user1).addE("VIEWED").to(g.V(mouse))\
.property("timestamp", "2024-02-15T10:30:00").iterate()
g.V(laptop).addE("BELONGS_TO").to(g.V(category)).iterate()
g.V(mouse).addE("BELONGS_TO").to(g.V(category)).iterate()
# SPARQLでのオントロジー設計
ontology_insert = """
PREFIX ex: <http://example.org/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
INSERT DATA {
ex:User rdf:type rdfs:Class .
ex:Product rdf:type rdfs:Class .
ex:Category rdf:type rdfs:Class .
ex:hasName rdf:type rdf:Property .
ex:hasPrice rdf:type rdf:Property .
ex:belongsToCategory rdf:type rdf:Property .
ex:purchasedBy rdf:type rdf:Property .
ex:user001 rdf:type ex:User .
ex:user001 ex:hasName "佐藤花子" .
ex:prod001 rdf:type ex:Product .
ex:prod001 ex:hasName "ノートPC" .
ex:prod001 ex:hasPrice "120000"^^<http://www.w3.org/2001/XMLSchema#int> .
}
"""
パフォーマンス最適化・インデックス
# Gremlinでのインデックス最適化戦略
# Neptune は自動的にインデックスを管理しますが、クエリパターンの最適化が重要
# 効率的なクエリパターン
def optimized_queries():
# インデックスを活用する効率的な検索
# has() を使って最初にフィルタリング
result = g.V().hasLabel("User").has("email", "[email protected]")\
.outE("PURCHASED").inV().hasLabel("Product").valueMap().toList()
# 複数プロパティでの絞り込み
expensive_products = g.V().hasLabel("Product")\
.has("price", P.gte(100000))\
.has("category", "electronics").toList()
# エッジ方向を明示的に指定
user_purchases = g.V().hasLabel("User").has("id", "user001")\
.out("PURCHASED").hasLabel("Product").toList()
# クエリプロファイリング
def profile_query():
# profile() でクエリパフォーマンス分析
profile_result = g.V().hasLabel("User")\
.out("PURCHASED").hasLabel("Product")\
.profile().toList()
print(profile_result)
# SPARQLでのクエリ最適化
optimized_sparql = """
PREFIX ex: <http://example.org/>
SELECT ?user ?product ?price WHERE {
?user a ex:User .
?user ex:purchased ?product .
?product ex:hasPrice ?price .
FILTER(?price > 50000)
}
ORDER BY DESC(?price)
LIMIT 10
"""
# バッチ処理最適化
def batch_operations():
# 複数操作をまとめて実行
batch_traversal = g.addV("User").property("name", "ユーザー1")\
.as_("user1")\
.addV("Product").property("name", "商品1")\
.as_("product1")\
.addE("PURCHASED").from_("user1").to("product1")
batch_traversal.iterate()
高度な分析クエリ
# Gremlinでの高度なグラフ分析
def advanced_graph_analytics():
# 最短パス検索
shortest_path = g.V().hasLabel("User").has("name", "佐藤花子")\
.repeat(outE().otherV().simplePath())\
.until(hasLabel("Product").has("name", "ノートPC"))\
.path().limit(1).toList()
# 推薦アルゴリズム(協調フィルタリング)
recommendations = g.V().hasLabel("User").has("name", "佐藤花子")\
.out("PURCHASED").aggregate("purchased")\
.in_("PURCHASED").where(P.neq("佐藤花子"))\
.out("PURCHASED").where(P.without("purchased"))\
.groupCount().order().by(Column.values, Order.desc)\
.limit(5).toList()
# ネットワーク中心性分析
centrality = g.V().hasLabel("Product")\
.project("product", "connections")\
.by("name")\
.by(bothE().count())\
.order().by(Column.values, Order.desc)\
.limit(10).toList()
# コミュニティ検出(トライアングル検索)
triangles = g.V().hasLabel("User").as_("a")\
.out("FRIENDS_WITH").as_("b")\
.out("FRIENDS_WITH").as_("c")\
.where("c", P.eq("a"))\
.select("a", "b", "c").by("name").toList()
# SPARQLでの高度な分析
advanced_sparql_queries = """
# ページランク風の重要度計算
PREFIX ex: <http://example.org/>
SELECT ?product (COUNT(?purchase) AS ?popularity) WHERE {
?user ex:purchased ?product .
?product ex:hasName ?productName .
}
GROUP BY ?product
ORDER BY DESC(?popularity)
LIMIT 10
# 時系列分析
SELECT ?month (COUNT(?purchase) AS ?sales) WHERE {
?user ex:purchased ?product .
?purchase ex:date ?date .
BIND(SUBSTR(?date, 1, 7) AS ?month)
}
GROUP BY ?month
ORDER BY ?month
# グラフパターンマッチング
SELECT ?user1 ?user2 ?commonProduct WHERE {
?user1 ex:purchased ?commonProduct .
?user2 ex:purchased ?commonProduct .
FILTER(?user1 != ?user2)
}
"""
AWS統合・運用監視
# AWS Neptuneのモニタリングと管理
import boto3
# CloudWatch メトリクス取得
def get_neptune_metrics():
cloudwatch = boto3.client('cloudwatch')
# クエリ実行時間メトリクス
response = cloudwatch.get_metric_statistics(
Namespace='AWS/Neptune',
MetricName='GremlinRequestsPerSec',
Dimensions=[
{
'Name': 'DBClusterIdentifier',
'Value': 'your-neptune-cluster'
}
],
StartTime='2024-01-01T00:00:00Z',
EndTime='2024-01-02T00:00:00Z',
Period=300,
Statistics=['Average', 'Maximum']
)
return response['Datapoints']
# IAMロールベースアクセス制御
def create_neptune_connection_with_iam():
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
# IAM認証を使用したNeptune接続
session = boto3.Session()
credentials = session.get_credentials()
#署名付きGremlin接続(WebSocket)
database_url = "wss://your-neptune-endpoint.region.neptune.amazonaws.com:8182/gremlin"
# カスタム認証ハンドラー実装
def create_signed_connection():
# SigV4署名でWebSocket接続を作成
pass
# Lambda関数でのNeptune操作
def lambda_handler(event, context):
try:
# Gremlin接続
database_url = "wss://your-neptune-endpoint.region.neptune.amazonaws.com:8182/gremlin"
remoteConn = DriverRemoteConnection(database_url, "g")
g = traversal().withRemote(remoteConn)
# ビジネスロジック実行
result = g.V().hasLabel("User").has("id", event['userId'])\
.out("PURCHASED").hasLabel("Product").valueMap().toList()
remoteConn.close()
return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
# バックアップと復旧
def neptune_backup_management():
neptune = boto3.client('neptune')
# 手動スナップショット作成
response = neptune.create_db_cluster_snapshot(
DBSnapshotIdentifier='manual-snapshot-2024-01',
DBClusterIdentifier='your-neptune-cluster'
)
# ポイントインタイムリカバリ
response = neptune.restore_db_cluster_to_point_in_time(
DBClusterIdentifier='restored-cluster',
SourceDBClusterIdentifier='your-neptune-cluster',
RestoreToTime='2024-01-01T12:00:00Z'
)
実用例・ユースケース実装
# 不正検知システムの実装
def fraud_detection_system():
# 疑わしい取引パターンの検出
suspicious_transactions = g.V().hasLabel("Account")\
.outE("TRANSFER")\
.has("amount", P.gt(100000))\
.has("timestamp", P.gt("2024-01-01"))\
.inV().hasLabel("Account")\
.groupCount().by("accountId")\
.unfold()\
.where(Column.values, P.gt(10))\
.toList()
# リング状送金の検出
money_laundering = g.V().hasLabel("Account").as_("start")\
.repeat(outE("TRANSFER").inV().simplePath())\
.until(where(P.eq("start")))\
.path().toList()
return suspicious_transactions, money_laundering
# ソーシャルネットワーク分析
def social_network_analysis():
# 影響力の高いユーザー検出
influencers = g.V().hasLabel("User")\
.project("user", "followers", "influence")\
.by("name")\
.by(inE("FOLLOWS").count())\
.by(inE("FOLLOWS").outV().outE("LIKES").count().mean())\
.order().by(Column.values.get(2), Order.desc)\
.limit(10).toList()
# コミュニティ検出(モジュラリティ最大化)
communities = g.V().hasLabel("User")\
.aggregate("allUsers")\
.repeat(outE("FRIENDS_WITH").otherV().simplePath())\
.until(__.where(P.within("allUsers")).count().is_(P.gt(5)))\
.path().toList()
# 知識グラフシステム
knowledge_graph_sparql = """
PREFIX ex: <http://example.org/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
# エンティティ間の関係推論
SELECT ?entity1 ?relationship ?entity2 WHERE {
?entity1 ?relationship ?entity2 .
?entity1 rdf:type ex:Person .
?entity2 rdf:type ex:Organization .
FILTER(?relationship != rdf:type)
}
# 階層構造の検索
SELECT ?parent ?child ?depth WHERE {
ex:root_entity (ex:hasChild+) ?child .
ex:root_entity (ex:hasChild*) ?parent .
?parent ex:hasChild ?child .
}
# 語彙的類似性検索
SELECT ?concept ?similarity WHERE {
?concept rdfs:label ?label .
FILTER(CONTAINS(LCASE(?label), "技術"))
BIND(1.0 AS ?similarity)
}
ORDER BY DESC(?similarity)
"""
# レコメンデーションエンジン
def recommendation_engine():
# 協調フィルタリング
collaborative_filtering = g.V().hasLabel("User").has("id", "target_user")\
.out("PURCHASED").aggregate("purchased")\
.in_("PURCHASED").where(P.neq("target_user"))\
.as_("similar_user")\
.out("PURCHASED").where(P.without("purchased"))\
.groupCount().by("productId")\
.order().by(Column.values, Order.desc)\
.limit(10).toList()
# コンテンツベースフィルタリング
content_based = g.V().hasLabel("User").has("id", "target_user")\
.out("PURCHASED").hasLabel("Product")\
.out("BELONGS_TO").hasLabel("Category")\
.in_("BELONGS_TO").hasLabel("Product")\
.where(P.without("purchased"))\
.groupCount().by("productId")\
.order().by(Column.values, Order.desc)\
.limit(5).toList()
return collaborative_filtering, content_based