データベース

Amazon Neptune

概要

Amazon Neptuneは、AWSが提供するフルマネージドグラフデータベースサービスです。PropertyGraph(Apache TinkerPop Gremlin)とRDF(SPARQL 1.1)の両方のグラフモデルをサポートし、高可用性、自動スケーリング、運用負荷軽減を実現します。AWSの他のサービスとシームレスに統合でき、数十億の関係性を持つ大規模グラフデータを数ミリ秒のレイテンシで処理できます。

詳細

Amazon Neptuneは2018年にAWSから正式リリースされました。従来のグラフデータベースと異なり、完全なマネージドサービスとして提供され、インフラストラクチャの管理やメンテナンスが不要です。最大64TBまでの自動スケーリングストレージ、3つのアベイラビリティゾーンにわたる6つのデータレプリカ、自動バックアップとポイントインタイムリカバリを提供します。

Amazon Neptuneの主な特徴:

  • デュアルグラフエンジン(Property Graph + RDF)
  • Apache TinkerPop Gremlin クエリ言語サポート
  • SPARQL 1.1 準拠のRDFクエリ
  • 最大100,000 QPS のクエリ処理能力
  • VPC内での実行とKMSによる暗号化
  • AWS IAMによるきめ細かなアクセス制御
  • Amazon Athenaとの統合によるSQL分析
  • 自動フェイルオーバーとリードレプリカ
  • Amazon SageMakerとの機械学習統合
  • AWS Lambda、EC2、EKSからの接続サポート

メリット・デメリット

メリット

  • フルマネージド: インフラ管理、パッチ適用、バックアップが自動化
  • 高可用性: 3-AZ構成で99.99%の可用性SLAを提供
  • 自動スケーリング: ストレージとコンピュートの独立スケーリング
  • デュアルエンジン: GremlinとSPARQLの両方をネイティブサポート
  • AWSエコシステム: 他のAWSサービスとのシームレス連携
  • セキュリティ: VPC、IAM、KMS暗号化の統合セキュリティ
  • パフォーマンス: 専用グラフエンジンによる高速処理
  • 運用負荷軽減: モニタリング、アラート、メトリクスが統合

デメリット

  • ベンダーロックイン: AWSプラットフォーム依存
  • コスト: 小規模使用でも固定的なインスタンス料金
  • 学習コスト: GremlinとSPARQLの習得が必要
  • カスタマイゼーション制限: マネージドサービスによる設定制約
  • リージョン制限: 利用可能リージョンが限定的
  • 複雑な料金体系: インスタンス、ストレージ、I/O料金の組み合わせ

主要リンク

書き方の例

セットアップ・接続設定

# AWS CLI設定
aws configure
# アクセスキー、シークレットキー、リージョンを設定

# Python Gremlinクライアントインストール
pip install gremlinpython boto3

# Python SPARQLクライアントインストール  
pip install rdflib requests

# Java依存関係(Maven)
<dependency>
    <groupId>org.apache.tinkerpop</groupId>
    <artifactId>gremlin-driver</artifactId>
    <version>3.6.2</version>
</dependency>

<dependency>
    <groupId>org.eclipse.rdf4j</groupId>
    <artifactId>rdf4j-repository-sparql</artifactId>
    <version>3.7.4</version>
</dependency>

# ネットワーク設定確認
# Neptune VPCセキュリティグループでポート8182(Gremlin)、8183(SPARQL)を開放

基本操作(Gremlin)

# Gremlin接続とCRUD操作
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal
import boto3

# Neptune接続
database_url = "wss://your-neptune-endpoint.region.neptune.amazonaws.com:8182/gremlin"
remoteConn = DriverRemoteConnection(database_url, "g")
g = traversal().withRemote(remoteConn)

# 頂点作成(Create)
person = g.addV("Person").property("name", "田中太郎").property("age", 30).next()
company = g.addV("Company").property("name", "テック株式会社").property("industry", "IT").next()

# エッジ作成
g.V(person).addE("WORKS_FOR").to(g.V(company)).property("since", 2020).iterate()

# 頂点検索(Read)
persons = g.V().hasLabel("Person").toList()
print(f"Persons found: {len(persons)}")

# 特定条件での検索
result = g.V().hasLabel("Person").has("name", "田中太郎").valueMap().toList()
print(result)

# 関係性クエリ
paths = g.V().hasLabel("Person").has("name", "田中太郎")\
    .outE("WORKS_FOR").inV().hasLabel("Company")\
    .path().by("name").toList()

# 頂点更新(Update)
g.V().hasLabel("Person").has("name", "田中太郎")\
    .property("age", 31).property("location", "東京").iterate()

# 頂点削除(Delete)
g.V().hasLabel("Person").has("name", "田中太郎").drop().iterate()

# 接続クローズ
remoteConn.close()

基本操作(SPARQL)

# SPARQL接続とCRUD操作
import requests
import json

sparql_endpoint = "https://your-neptune-endpoint.region.neptune.amazonaws.com:8183/sparql"

# RDFトリプル挿入(Create)
insert_query = """
INSERT DATA {
    <http://example.org/person/tanaka> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/Person> .
    <http://example.org/person/tanaka> <http://example.org/name> "田中太郎" .
    <http://example.org/person/tanaka> <http://example.org/age> "30"^^<http://www.w3.org/2001/XMLSchema#int> .
    <http://example.org/company/tech> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/Company> .
    <http://example.org/company/tech> <http://example.org/name> "テック株式会社" .
    <http://example.org/person/tanaka> <http://example.org/worksFor> <http://example.org/company/tech> .
}
"""

response = requests.post(sparql_endpoint, 
                        data={'update': insert_query},
                        headers={'Content-Type': 'application/x-www-form-urlencoded'})

# データ検索(Read)
select_query = """
SELECT ?person ?name ?age WHERE {
    ?person <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/Person> .
    ?person <http://example.org/name> ?name .
    ?person <http://example.org/age> ?age .
}
"""

response = requests.post(sparql_endpoint,
                        data={'query': select_query},
                        headers={'Content-Type': 'application/x-www-form-urlencoded'})
results = response.json()

# 関係性クエリ
relationship_query = """
SELECT ?person ?company WHERE {
    ?person <http://example.org/worksFor> ?company .
    ?person <http://example.org/name> "田中太郎" .
}
"""

# データ更新(Update)
update_query = """
DELETE { <http://example.org/person/tanaka> <http://example.org/age> ?oldAge }
INSERT { <http://example.org/person/tanaka> <http://example.org/age> "31"^^<http://www.w3.org/2001/XMLSchema#int> }
WHERE { <http://example.org/person/tanaka> <http://example.org/age> ?oldAge }
"""

# データ削除(Delete)
delete_query = """
DELETE WHERE {
    <http://example.org/person/tanaka> ?p ?o .
}
"""

データモデリング・スキーマ設計

# Gremlinを使った複雑なデータモデル
# eコマースシステムのモデリング

# ユーザー、商品、カテゴリ作成
def create_ecommerce_model():
    # ユーザー作成
    user1 = g.addV("User").property("id", "user001").property("name", "佐藤花子")\
        .property("email", "[email protected]").property("joinDate", "2024-01-15").next()
    
    # 商品とカテゴリ作成
    laptop = g.addV("Product").property("id", "prod001").property("name", "ノートPC")\
        .property("price", 120000).property("brand", "TechBrand").next()
        
    mouse = g.addV("Product").property("id", "prod002").property("name", "ワイヤレスマウス")\
        .property("price", 3000).property("brand", "TechBrand").next()
        
    category = g.addV("Category").property("name", "コンピュータ").property("level", 1).next()
    
    # 関係性作成
    g.V(user1).addE("PURCHASED").to(g.V(laptop))\
        .property("date", "2024-02-01").property("quantity", 1).iterate()
        
    g.V(user1).addE("VIEWED").to(g.V(mouse))\
        .property("timestamp", "2024-02-15T10:30:00").iterate()
        
    g.V(laptop).addE("BELONGS_TO").to(g.V(category)).iterate()
    g.V(mouse).addE("BELONGS_TO").to(g.V(category)).iterate()

# SPARQLでのオントロジー設計
ontology_insert = """
PREFIX ex: <http://example.org/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

INSERT DATA {
    ex:User rdf:type rdfs:Class .
    ex:Product rdf:type rdfs:Class .
    ex:Category rdf:type rdfs:Class .
    
    ex:hasName rdf:type rdf:Property .
    ex:hasPrice rdf:type rdf:Property .
    ex:belongsToCategory rdf:type rdf:Property .
    ex:purchasedBy rdf:type rdf:Property .
    
    ex:user001 rdf:type ex:User .
    ex:user001 ex:hasName "佐藤花子" .
    ex:prod001 rdf:type ex:Product .
    ex:prod001 ex:hasName "ノートPC" .
    ex:prod001 ex:hasPrice "120000"^^<http://www.w3.org/2001/XMLSchema#int> .
}
"""

パフォーマンス最適化・インデックス

# Gremlinでのインデックス最適化戦略
# Neptune は自動的にインデックスを管理しますが、クエリパターンの最適化が重要

# 効率的なクエリパターン
def optimized_queries():
    # インデックスを活用する効率的な検索
    # has() を使って最初にフィルタリング
    result = g.V().hasLabel("User").has("email", "[email protected]")\
        .outE("PURCHASED").inV().hasLabel("Product").valueMap().toList()
    
    # 複数プロパティでの絞り込み
    expensive_products = g.V().hasLabel("Product")\
        .has("price", P.gte(100000))\
        .has("category", "electronics").toList()
    
    # エッジ方向を明示的に指定
    user_purchases = g.V().hasLabel("User").has("id", "user001")\
        .out("PURCHASED").hasLabel("Product").toList()

# クエリプロファイリング
def profile_query():
    # profile() でクエリパフォーマンス分析
    profile_result = g.V().hasLabel("User")\
        .out("PURCHASED").hasLabel("Product")\
        .profile().toList()
    print(profile_result)

# SPARQLでのクエリ最適化
optimized_sparql = """
PREFIX ex: <http://example.org/>

SELECT ?user ?product ?price WHERE {
    ?user a ex:User .
    ?user ex:purchased ?product .
    ?product ex:hasPrice ?price .
    FILTER(?price > 50000)
}
ORDER BY DESC(?price)
LIMIT 10
"""

# バッチ処理最適化
def batch_operations():
    # 複数操作をまとめて実行
    batch_traversal = g.addV("User").property("name", "ユーザー1")\
        .as_("user1")\
        .addV("Product").property("name", "商品1")\
        .as_("product1")\
        .addE("PURCHASED").from_("user1").to("product1")
    
    batch_traversal.iterate()

高度な分析クエリ

# Gremlinでの高度なグラフ分析
def advanced_graph_analytics():
    # 最短パス検索
    shortest_path = g.V().hasLabel("User").has("name", "佐藤花子")\
        .repeat(outE().otherV().simplePath())\
        .until(hasLabel("Product").has("name", "ノートPC"))\
        .path().limit(1).toList()
    
    # 推薦アルゴリズム(協調フィルタリング)
    recommendations = g.V().hasLabel("User").has("name", "佐藤花子")\
        .out("PURCHASED").aggregate("purchased")\
        .in_("PURCHASED").where(P.neq("佐藤花子"))\
        .out("PURCHASED").where(P.without("purchased"))\
        .groupCount().order().by(Column.values, Order.desc)\
        .limit(5).toList()
    
    # ネットワーク中心性分析
    centrality = g.V().hasLabel("Product")\
        .project("product", "connections")\
        .by("name")\
        .by(bothE().count())\
        .order().by(Column.values, Order.desc)\
        .limit(10).toList()
    
    # コミュニティ検出(トライアングル検索)
    triangles = g.V().hasLabel("User").as_("a")\
        .out("FRIENDS_WITH").as_("b")\
        .out("FRIENDS_WITH").as_("c")\
        .where("c", P.eq("a"))\
        .select("a", "b", "c").by("name").toList()

# SPARQLでの高度な分析
advanced_sparql_queries = """
# ページランク風の重要度計算
PREFIX ex: <http://example.org/>

SELECT ?product (COUNT(?purchase) AS ?popularity) WHERE {
    ?user ex:purchased ?product .
    ?product ex:hasName ?productName .
}
GROUP BY ?product
ORDER BY DESC(?popularity)
LIMIT 10

# 時系列分析
SELECT ?month (COUNT(?purchase) AS ?sales) WHERE {
    ?user ex:purchased ?product .
    ?purchase ex:date ?date .
    BIND(SUBSTR(?date, 1, 7) AS ?month)
}
GROUP BY ?month
ORDER BY ?month

# グラフパターンマッチング
SELECT ?user1 ?user2 ?commonProduct WHERE {
    ?user1 ex:purchased ?commonProduct .
    ?user2 ex:purchased ?commonProduct .
    FILTER(?user1 != ?user2)
}
"""

AWS統合・運用監視

# AWS Neptuneのモニタリングと管理
import boto3

# CloudWatch メトリクス取得
def get_neptune_metrics():
    cloudwatch = boto3.client('cloudwatch')
    
    # クエリ実行時間メトリクス
    response = cloudwatch.get_metric_statistics(
        Namespace='AWS/Neptune',
        MetricName='GremlinRequestsPerSec',
        Dimensions=[
            {
                'Name': 'DBClusterIdentifier',
                'Value': 'your-neptune-cluster'
            }
        ],
        StartTime='2024-01-01T00:00:00Z',
        EndTime='2024-01-02T00:00:00Z',
        Period=300,
        Statistics=['Average', 'Maximum']
    )
    
    return response['Datapoints']

# IAMロールベースアクセス制御
def create_neptune_connection_with_iam():
    from botocore.auth import SigV4Auth
    from botocore.awsrequest import AWSRequest
    
    # IAM認証を使用したNeptune接続
    session = boto3.Session()
    credentials = session.get_credentials()
    
    #署名付きGremlin接続(WebSocket)
    database_url = "wss://your-neptune-endpoint.region.neptune.amazonaws.com:8182/gremlin"
    
    # カスタム認証ハンドラー実装
    def create_signed_connection():
        # SigV4署名でWebSocket接続を作成
        pass

# Lambda関数でのNeptune操作
def lambda_handler(event, context):
    try:
        # Gremlin接続
        database_url = "wss://your-neptune-endpoint.region.neptune.amazonaws.com:8182/gremlin"
        remoteConn = DriverRemoteConnection(database_url, "g")
        g = traversal().withRemote(remoteConn)
        
        # ビジネスロジック実行
        result = g.V().hasLabel("User").has("id", event['userId'])\
            .out("PURCHASED").hasLabel("Product").valueMap().toList()
        
        remoteConn.close()
        
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

# バックアップと復旧
def neptune_backup_management():
    neptune = boto3.client('neptune')
    
    # 手動スナップショット作成
    response = neptune.create_db_cluster_snapshot(
        DBSnapshotIdentifier='manual-snapshot-2024-01',
        DBClusterIdentifier='your-neptune-cluster'
    )
    
    # ポイントインタイムリカバリ
    response = neptune.restore_db_cluster_to_point_in_time(
        DBClusterIdentifier='restored-cluster',
        SourceDBClusterIdentifier='your-neptune-cluster',
        RestoreToTime='2024-01-01T12:00:00Z'
    )

実用例・ユースケース実装

# 不正検知システムの実装
def fraud_detection_system():
    # 疑わしい取引パターンの検出
    suspicious_transactions = g.V().hasLabel("Account")\
        .outE("TRANSFER")\
        .has("amount", P.gt(100000))\
        .has("timestamp", P.gt("2024-01-01"))\
        .inV().hasLabel("Account")\
        .groupCount().by("accountId")\
        .unfold()\
        .where(Column.values, P.gt(10))\
        .toList()
    
    # リング状送金の検出
    money_laundering = g.V().hasLabel("Account").as_("start")\
        .repeat(outE("TRANSFER").inV().simplePath())\
        .until(where(P.eq("start")))\
        .path().toList()
    
    return suspicious_transactions, money_laundering

# ソーシャルネットワーク分析
def social_network_analysis():
    # 影響力の高いユーザー検出
    influencers = g.V().hasLabel("User")\
        .project("user", "followers", "influence")\
        .by("name")\
        .by(inE("FOLLOWS").count())\
        .by(inE("FOLLOWS").outV().outE("LIKES").count().mean())\
        .order().by(Column.values.get(2), Order.desc)\
        .limit(10).toList()
    
    # コミュニティ検出(モジュラリティ最大化)
    communities = g.V().hasLabel("User")\
        .aggregate("allUsers")\
        .repeat(outE("FRIENDS_WITH").otherV().simplePath())\
        .until(__.where(P.within("allUsers")).count().is_(P.gt(5)))\
        .path().toList()

# 知識グラフシステム
knowledge_graph_sparql = """
PREFIX ex: <http://example.org/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

# エンティティ間の関係推論
SELECT ?entity1 ?relationship ?entity2 WHERE {
    ?entity1 ?relationship ?entity2 .
    ?entity1 rdf:type ex:Person .
    ?entity2 rdf:type ex:Organization .
    FILTER(?relationship != rdf:type)
}

# 階層構造の検索
SELECT ?parent ?child ?depth WHERE {
    ex:root_entity (ex:hasChild+) ?child .
    ex:root_entity (ex:hasChild*) ?parent .
    ?parent ex:hasChild ?child .
}

# 語彙的類似性検索
SELECT ?concept ?similarity WHERE {
    ?concept rdfs:label ?label .
    FILTER(CONTAINS(LCASE(?label), "技術"))
    BIND(1.0 AS ?similarity)
}
ORDER BY DESC(?similarity)
"""

# レコメンデーションエンジン
def recommendation_engine():
    # 協調フィルタリング
    collaborative_filtering = g.V().hasLabel("User").has("id", "target_user")\
        .out("PURCHASED").aggregate("purchased")\
        .in_("PURCHASED").where(P.neq("target_user"))\
        .as_("similar_user")\
        .out("PURCHASED").where(P.without("purchased"))\
        .groupCount().by("productId")\
        .order().by(Column.values, Order.desc)\
        .limit(10).toList()
    
    # コンテンツベースフィルタリング
    content_based = g.V().hasLabel("User").has("id", "target_user")\
        .out("PURCHASED").hasLabel("Product")\
        .out("BELONGS_TO").hasLabel("Category")\
        .in_("BELONGS_TO").hasLabel("Product")\
        .where(P.without("purchased"))\
        .groupCount().by("productId")\
        .order().by(Column.values, Order.desc)\
        .limit(5).toList()
    
    return collaborative_filtering, content_based