MongoDB

ドキュメント指向のNoSQLデータベース。JSONライクなドキュメント形式でデータを格納。スケーラビリティと開発者フレンドリーな設計が特徴。

データベースサーバーNoSQLデータベースドキュメント指向分散システムスケーラブルビッグデータリアルタイム分析クラウドネイティブ

データベースサーバー

MongoDB

概要

MongoDBは、2009年に10gen(現MongoDB Inc.)によって開発された、ドキュメント指向NoSQLデータベースです。JSONライクなBSONフォーマットでデータを格納し、柔軟なスキーマ設計、自動シャーディング、レプリカセットによる高可用性を実現する、現代のアプリケーション開発に最適化されたデータベースシステムです。MongoDB 8.0(2024年リリース)では、25%の性能向上、Queryable Encryptionの強化、バックグラウンド圧縮機能、時系列データの処理性能が60%向上するなど、大幅な進化を遂げています。MongoDB Atlasクラウドサービスとの組み合わせにより、グローバル規模での自動スケーリング、セキュリティ強化、マルチクラウド対応を実現し、スタートアップから大企業まで幅広く採用されています。開発者フレンドリーなAPIとリッチなエコシステムにより、迅速なアプリケーション開発と運用を支援します。

詳細

MongoDB 8.0は、2025年版として従来のNoSQLデータベースの限界を突破した包括的なデータプラットフォームへと進化しています。最新版では、リアルタイム分析、機械学習統合、暗号化クエリ(Queryable Encryption)、時系列データ最適化、ベクトル検索によるAI対応が大幅に強化されています。MongoDB Atlasでは、自動スケーリングが50%高速化、リアルタイムリソース対応が5倍高速化、Atlas Searchによる全文検索とベクトル検索の統合により、現代のデータドリブンアプリケーションに必要な全ての機能を提供します。また、Relational Migratorによる既存RDBからの移行支援、Atlas Data Federationによるデータレイク統合、Charts・Compass・Ops Managerなどの包括的な運用ツールにより、エンタープライズレベルの要求に応えます。ACID準拠のマルチドキュメントトランザクション、分散アーキテクチャによる無制限スケーラビリティ、Change Streamsによるリアルタイムデータ変更監視により、ミッションクリティカルなシステムの基盤として機能します。

主な特徴

  • ドキュメント指向: JSONライクなBSONによる直感的で柔軟なデータモデリング
  • 自動シャーディング: データ分散とクエリ負荷の自動バランシング
  • 高可用性: レプリカセットによる自動フェイルオーバーとデータ冗長化
  • ACID準拠: マルチドキュメントトランザクションによる強一貫性
  • Atlas Cloud: マネージドサービスによる運用負荷ゼロとグローバル展開
  • リアルタイム分析: 集約パイプラインとTime Seriesによる高速データ処理

メリット・デメリット

メリット

  • リレーショナルDBより30-50%高速な読み書き性能と水平スケーラビリティ
  • 柔軟なスキーマ設計によるアジャイル開発とマイクロサービス対応
  • Atlas Cloudでの自動スケーリング・バックアップ・セキュリティ運用
  • 豊富なプログラミング言語ドライバとフレームワーク統合
  • Change Streamsによるリアルタイムデータ変更の検知・配信
  • 地理空間データ、全文検索、時系列データの組み込みサポート

デメリット

  • 複雑なJOIN操作やトランザクション処理ではRDBが優位
  • メモリ使用量がRDBMSより多く、ストレージコストの増大
  • 分散システムの複雑性による運用・デバッグの学習コスト
  • スキーマレス設計による潜在的なデータ整合性リスク
  • MongoDB AtlasのライセンスコストとCloud Provider依存
  • ACID保証のパフォーマンスオーバーヘッドと設計トレードオフ

参考ページ

書き方の例

インストールと基本セットアップ

# Ubuntu/Debian系でのMongoDB Community Edition インストール
# MongoDB公式GPGキーのインポート
wget -qO - https://www.mongodb.org/static/pgp/server-8.0.asc | sudo apt-key add -

# MongoDB リポジトリの追加
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu $(lsb_release -cs)/mongodb-org/8.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-8.0.list

# パッケージデータベースの更新とMongoDB インストール
sudo apt-get update
sudo apt-get install -y mongodb-org

# MongoDB サービスの開始と自動起動設定
sudo systemctl enable mongod
sudo systemctl start mongod
sudo systemctl status mongod

# Docker Composeを使用したMongoDB環境構築
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  mongodb:
    image: mongo:8.0
    container_name: mongodb
    restart: unless-stopped
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password123
      MONGO_INITDB_DATABASE: myapp
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data:/data/db
      - mongodb_config:/data/configdb
      - ./mongod.conf:/etc/mongod.conf
    command: --config /etc/mongod.conf

  mongo-express:
    image: mongo-express:latest
    container_name: mongo-express
    restart: unless-stopped
    ports:
      - "8081:8081"
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: admin
      ME_CONFIG_MONGODB_ADMINPASSWORD: password123
      ME_CONFIG_MONGODB_URL: mongodb://admin:password123@mongodb:27017/
      ME_CONFIG_BASICAUTH: false
    depends_on:
      - mongodb

volumes:
  mongodb_data:
    driver: local
  mongodb_config:
    driver: local
EOF

# MongoDB設定ファイル作成
cat > mongod.conf << 'EOF'
# MongoDB 8.0 最適化設定
storage:
  dbPath: /data/db
  journal:
    enabled: true
  wiredTiger:
    engineConfig:
      cacheSizeGB: 2
      journalCompressor: snappy
    collectionConfig:
      blockCompressor: snappy

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log
  logRotate: rename

net:
  port: 27017
  bindIp: 0.0.0.0

processManagement:
  timeZoneInfo: /usr/share/zoneinfo

security:
  authorization: enabled

operationProfiling:
  slowOpThresholdMs: 100
  mode: slowOp
EOF

# サービス起動
docker-compose up -d

# 接続確認
mongosh --host localhost --port 27017 --username admin --password password123

# MongoDB Atlasクラスターへの接続
mongosh "mongodb+srv://cluster0.xxxxx.mongodb.net/myFirstDatabase" --apiVersion 1 --username <username>

基本的なデータベース操作とCRUD

// MongoDB Shell (mongosh) での基本操作

// データベース作成・切り替え
use ecommerce_db

// コレクション作成とドキュメント挿入
db.products.insertMany([
  {
    name: "MacBook Pro 16-inch",
    category: "laptop",
    price: 2499.99,
    specifications: {
      cpu: "Apple M3 Max",
      memory: "32GB",
      storage: "1TB SSD",
      screen: "16.2-inch Liquid Retina XDR"
    },
    tags: ["apple", "professional", "high-performance"],
    inventory: {
      stock: 15,
      warehouse: "Tokyo",
      supplier: "Apple Inc."
    },
    reviews: [
      {
        user: "tech_enthusiast",
        rating: 5,
        comment: "Outstanding performance for development work",
        date: new Date("2024-01-15")
      }
    ],
    created_at: new Date(),
    updated_at: new Date()
  },
  {
    name: "Dell XPS 13",
    category: "laptop",
    price: 1299.99,
    specifications: {
      cpu: "Intel Core i7-1360P",
      memory: "16GB",
      storage: "512GB SSD",
      screen: "13.4-inch FHD+"
    },
    tags: ["dell", "portable", "business"],
    inventory: {
      stock: 8,
      warehouse: "Osaka",
      supplier: "Dell Technologies"
    },
    reviews: [],
    created_at: new Date(),
    updated_at: new Date()
  }
])

// 基本的なクエリ操作
// 全件取得
db.products.find().pretty()

// 条件検索
db.products.find({ category: "laptop" })

// 価格範囲での検索
db.products.find({ 
  price: { $gte: 1000, $lte: 2000 } 
})

// 複合条件検索
db.products.find({ 
  $and: [
    { category: "laptop" },
    { "inventory.stock": { $gte: 10 } }
  ]
})

// テキスト検索のためのインデックス作成
db.products.createIndex({ 
  name: "text", 
  "specifications.cpu": "text", 
  tags: "text" 
})

// テキスト検索実行
db.products.find({ 
  $text: { $search: "Apple MacBook" } 
})

// 配列要素の検索
db.products.find({ 
  tags: { $in: ["apple", "professional"] } 
})

// ネストされたオブジェクトの検索
db.products.find({ 
  "specifications.memory": "32GB" 
})

// ドキュメント更新
db.products.updateOne(
  { name: "MacBook Pro 16-inch" },
  { 
    $set: { price: 2399.99, updated_at: new Date() },
    $push: { 
      reviews: {
        user: "developer_pro",
        rating: 5,
        comment: "Perfect for iOS development",
        date: new Date()
      }
    }
  }
)

// 配列要素の更新
db.products.updateOne(
  { 
    name: "MacBook Pro 16-inch",
    "reviews.user": "tech_enthusiast"
  },
  { 
    $set: { "reviews.$.rating": 4 } 
  }
)

// upsert操作(存在しない場合は作成)
db.products.updateOne(
  { name: "iPad Pro 12.9-inch" },
  { 
    $set: {
      category: "tablet",
      price: 1099.99,
      created_at: new Date()
    }
  },
  { upsert: true }
)

// ドキュメント削除
db.products.deleteOne({ name: "iPad Pro 12.9-inch" })

// 条件に合う複数ドキュメントの削除
db.products.deleteMany({ "inventory.stock": { $lt: 5 } })

Python統合とアプリケーション開発

import pymongo
from pymongo import MongoClient
from datetime import datetime, timezone
import certifi
from bson.objectid import ObjectId
import json

class MongoDBManager:
    def __init__(self, connection_string=None, database_name="ecommerce_db"):
        """MongoDB接続管理クラス"""
        if connection_string:
            # MongoDB Atlas接続
            self.client = MongoClient(
                connection_string,
                tlsCAFile=certifi.where()
            )
        else:
            # ローカルMongoDB接続
            self.client = MongoClient(
                "mongodb://admin:password123@localhost:27017/",
                authSource='admin'
            )
        
        self.db = self.client[database_name]
        self.products = self.db.products
        self.users = self.db.users
        self.orders = self.db.orders
    
    def test_connection(self):
        """接続テスト"""
        try:
            self.client.admin.command('ping')
            print("MongoDB connection successful!")
            return True
        except Exception as e:
            print(f"MongoDB connection failed: {e}")
            return False
    
    def create_indexes(self):
        """インデックス作成"""
        # 商品検索用インデックス
        self.products.create_index([
            ("name", pymongo.TEXT),
            ("tags", pymongo.TEXT),
            ("specifications.cpu", pymongo.TEXT)
        ])
        
        # カテゴリ・価格での効率的検索
        self.products.create_index([
            ("category", 1),
            ("price", 1)
        ])
        
        # 在庫管理用インデックス
        self.products.create_index("inventory.stock")
        
        # ユーザー管理用インデックス
        self.users.create_index("email", unique=True)
        
        # 注文履歴用インデックス
        self.orders.create_index([
            ("user_id", 1),
            ("order_date", -1)
        ])
        
        print("Indexes created successfully")
    
    def insert_product(self, product_data):
        """商品データ挿入"""
        try:
            product_data['created_at'] = datetime.now(timezone.utc)
            product_data['updated_at'] = datetime.now(timezone.utc)
            result = self.products.insert_one(product_data)
            return result.inserted_id
        except Exception as e:
            print(f"Product insertion error: {e}")
            return None
    
    def search_products(self, query=None, category=None, price_range=None, limit=10):
        """商品検索"""
        filter_conditions = {}
        
        # テキスト検索
        if query:
            filter_conditions['$text'] = {'$search': query}
        
        # カテゴリフィルタ
        if category:
            filter_conditions['category'] = category
        
        # 価格範囲フィルタ
        if price_range:
            filter_conditions['price'] = {
                '$gte': price_range.get('min', 0),
                '$lte': price_range.get('max', float('inf'))
            }
        
        # 検索実行
        cursor = self.products.find(filter_conditions).limit(limit)
        
        if query:
            # テキスト検索の場合、関連度でソート
            cursor = cursor.sort([('score', {'$meta': 'textScore'})])
        else:
            # 通常検索の場合、価格でソート
            cursor = cursor.sort("price", 1)
        
        return list(cursor)
    
    def get_product_analytics(self):
        """商品分析(集約パイプライン使用)"""
        pipeline = [
            # カテゴリ別の統計
            {
                '$group': {
                    '_id': '$category',
                    'product_count': {'$sum': 1},
                    'avg_price': {'$avg': '$price'},
                    'max_price': {'$max': '$price'},
                    'min_price': {'$min': '$price'},
                    'total_stock': {'$sum': '$inventory.stock'}
                }
            },
            # 価格で降順ソート
            {
                '$sort': {'avg_price': -1}
            },
            # 結果の整形
            {
                '$project': {
                    'category': '$_id',
                    'product_count': 1,
                    'avg_price': {'$round': ['$avg_price', 2]},
                    'max_price': 1,
                    'min_price': 1,
                    'total_stock': 1,
                    '_id': 0
                }
            }
        ]
        
        return list(self.products.aggregate(pipeline))
    
    def update_inventory(self, product_id, quantity_change):
        """在庫更新"""
        try:
            result = self.products.update_one(
                {'_id': ObjectId(product_id)},
                {
                    '$inc': {'inventory.stock': quantity_change},
                    '$set': {'updated_at': datetime.now(timezone.utc)}
                }
            )
            return result.modified_count > 0
        except Exception as e:
            print(f"Inventory update error: {e}")
            return False
    
    def create_order(self, user_id, items):
        """注文作成(トランザクション使用)"""
        with self.client.start_session() as session:
            with session.start_transaction():
                try:
                    # 在庫確認と減算
                    total_amount = 0
                    order_items = []
                    
                    for item in items:
                        product = self.products.find_one(
                            {'_id': ObjectId(item['product_id'])},
                            session=session
                        )
                        
                        if not product:
                            raise Exception(f"Product {item['product_id']} not found")
                        
                        if product['inventory']['stock'] < item['quantity']:
                            raise Exception(f"Insufficient stock for {product['name']}")
                        
                        # 在庫減算
                        self.products.update_one(
                            {'_id': ObjectId(item['product_id'])},
                            {'$inc': {'inventory.stock': -item['quantity']}},
                            session=session
                        )
                        
                        item_total = product['price'] * item['quantity']
                        total_amount += item_total
                        
                        order_items.append({
                            'product_id': item['product_id'],
                            'product_name': product['name'],
                            'price': product['price'],
                            'quantity': item['quantity'],
                            'subtotal': item_total
                        })
                    
                    # 注文作成
                    order = {
                        'user_id': user_id,
                        'items': order_items,
                        'total_amount': total_amount,
                        'status': 'pending',
                        'order_date': datetime.now(timezone.utc),
                        'created_at': datetime.now(timezone.utc)
                    }
                    
                    result = self.orders.insert_one(order, session=session)
                    
                    # トランザクションコミット
                    session.commit_transaction()
                    return result.inserted_id
                    
                except Exception as e:
                    # トランザクションロールバック
                    session.abort_transaction()
                    print(f"Order creation failed: {e}")
                    return None

# 使用例とパフォーマンステスト
def performance_test():
    """パフォーマンステスト"""
    import time
    
    # Atlas接続の場合
    # connection_string = "mongodb+srv://username:[email protected]/"
    # db_manager = MongoDBManager(connection_string)
    
    # ローカル接続の場合
    db_manager = MongoDBManager()
    
    if not db_manager.test_connection():
        return
    
    # インデックス作成
    db_manager.create_indexes()
    
    # 大量データ挿入テスト
    print("大量データ挿入テスト開始...")
    start_time = time.time()
    
    products_data = []
    categories = ["laptop", "smartphone", "tablet", "desktop", "accessory"]
    
    for i in range(1000):
        product = {
            "name": f"Product {i}",
            "category": categories[i % len(categories)],
            "price": round(100 + (i * 10.5), 2),
            "specifications": {
                "cpu": f"Processor {i % 20}",
                "memory": f"{8 + (i % 4) * 8}GB",
                "storage": f"{256 + (i % 4) * 256}GB"
            },
            "tags": [f"tag{i % 10}", f"feature{i % 15}"],
            "inventory": {
                "stock": i % 50,
                "warehouse": f"Warehouse {i % 5}"
            },
            "reviews": []
        }
        products_data.append(product)
    
    # バッチ挿入
    result = db_manager.products.insert_many(products_data)
    insertion_time = time.time() - start_time
    print(f"1000件挿入時間: {insertion_time:.2f}秒")
    
    # 検索パフォーマンステスト
    start_time = time.time()
    results = db_manager.search_products(
        query="Processor",
        category="laptop",
        price_range={"min": 500, "max": 1500}
    )
    search_time = time.time() - start_time
    print(f"複合検索時間: {search_time:.4f}秒, 結果数: {len(results)}")
    
    # 集約パフォーマンステスト
    start_time = time.time()
    analytics = db_manager.get_product_analytics()
    aggregation_time = time.time() - start_time
    print(f"集約処理時間: {aggregation_time:.4f}秒")
    print("カテゴリ別分析結果:")
    for category in analytics:
        print(f"  {category['category']}: {category['product_count']}商品, 平均価格: ${category['avg_price']}")

if __name__ == "__main__":
    # MongoDB管理オブジェクト作成
    db_manager = MongoDBManager()
    
    try:
        # 接続テスト
        if not db_manager.test_connection():
            exit(1)
        
        # サンプル商品データ挿入
        sample_product = {
            "name": "Apple iPhone 15 Pro",
            "category": "smartphone",
            "price": 999.99,
            "specifications": {
                "cpu": "A17 Pro chip",
                "memory": "8GB",
                "storage": "128GB",
                "screen": "6.1-inch Super Retina XDR"
            },
            "tags": ["apple", "5g", "professional"],
            "inventory": {
                "stock": 25,
                "warehouse": "Tokyo",
                "supplier": "Apple Inc."
            },
            "reviews": []
        }
        
        product_id = db_manager.insert_product(sample_product)
        print(f"商品挿入完了: {product_id}")
        
        # 商品検索テスト
        search_results = db_manager.search_products(query="iPhone", category="smartphone")
        print(f"検索結果: {len(search_results)}件")
        
        # 分析レポート
        analytics = db_manager.get_product_analytics()
        print(f"カテゴリ分析: {len(analytics)}カテゴリ")
        
        # パフォーマンステスト実行
        performance_test()
        
    except Exception as e:
        print(f"エラーが発生しました: {e}")
    
    finally:
        db_manager.client.close()

レプリカセットとシャーディング設定

# レプリカセット構成(3ノード)の設定
# ノード1の設定
sudo tee /etc/mongod1.conf << 'EOF'
storage:
  dbPath: /data/db1
  journal:
    enabled: true

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod1.log

net:
  port: 27017
  bindIp: 0.0.0.0

replication:
  replSetName: "rs0"

security:
  authorization: enabled
  keyFile: /opt/mongodb/keyfile
EOF

# ノード2の設定
sudo tee /etc/mongod2.conf << 'EOF'
storage:
  dbPath: /data/db2
  journal:
    enabled: true

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod2.log

net:
  port: 27018
  bindIp: 0.0.0.0

replication:
  replSetName: "rs0"

security:
  authorization: enabled
  keyFile: /opt/mongodb/keyfile
EOF

# ノード3の設定
sudo tee /etc/mongod3.conf << 'EOF'
storage:
  dbPath: /data/db3
  journal:
    enabled: true

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod3.log

net:
  port: 27019
  bindIp: 0.0.0.0

replication:
  replSetName: "rs0"

security:
  authorization: enabled
  keyFile: /opt/mongodb/keyfile
EOF

# キーファイル作成(レプリカセット認証用)
sudo mkdir -p /opt/mongodb
openssl rand -base64 756 | sudo tee /opt/mongodb/keyfile
sudo chmod 400 /opt/mongodb/keyfile
sudo chown mongodb:mongodb /opt/mongodb/keyfile

# データディレクトリ作成
sudo mkdir -p /data/db{1,2,3}
sudo chown -R mongodb:mongodb /data/db{1,2,3}

# 各ノードでMongoDB起動
sudo mongod --config /etc/mongod1.conf --fork
sudo mongod --config /etc/mongod2.conf --fork
sudo mongod --config /etc/mongod3.conf --fork

# レプリカセット初期化
mongosh --port 27017 << 'EOF'
rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017", priority: 2 },
    { _id: 1, host: "localhost:27018", priority: 1 },
    { _id: 2, host: "localhost:27019", priority: 1 }
  ]
})

// レプリカセット状態確認
rs.status()

// 管理者ユーザー作成
use admin
db.createUser({
  user: "admin",
  pwd: "securePassword123",
  roles: ["userAdminAnyDatabase", "clusterAdmin"]
})
EOF

# シャーディング環境の構築
# Config Server設定
sudo tee /etc/mongod-config.conf << 'EOF'
storage:
  dbPath: /data/configdb

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod-config.log

net:
  port: 27019
  bindIp: 0.0.0.0

sharding:
  clusterRole: configsvr

replication:
  replSetName: "configReplSet"

security:
  authorization: enabled
  keyFile: /opt/mongodb/keyfile
EOF

# Shard Server設定
sudo tee /etc/mongod-shard1.conf << 'EOF'
storage:
  dbPath: /data/shard1

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod-shard1.log

net:
  port: 27018
  bindIp: 0.0.0.0

sharding:
  clusterRole: shardsvr

replication:
  replSetName: "shard1ReplSet"

security:
  authorization: enabled
  keyFile: /opt/mongodb/keyfile
EOF

# Mongos Router設定
sudo tee /etc/mongos.conf << 'EOF'
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongos.log

net:
  port: 27017
  bindIp: 0.0.0.0

sharding:
  configDB: "configReplSet/localhost:27019"

security:
  keyFile: /opt/mongodb/keyfile
EOF

# シャーディング有効化
mongosh --port 27017 << 'EOF'
// シャード追加
sh.addShard("shard1ReplSet/localhost:27018")

// データベースのシャーディング有効化
sh.enableSharding("ecommerce_db")

// コレクションのシャーディング
sh.shardCollection("ecommerce_db.products", { "category": 1, "_id": 1 })

// シャーディング状態確認
sh.status()
EOF

監視とパフォーマンス最適化

# MongoDB監視設定とパフォーマンスチューニング

# プロファイリング有効化
mongosh << 'EOF'
// 全体的なプロファイリング設定
use admin
db.runCommand({
  setParameter: 1,
  slowOpThresholdMs: 100
})

// 特定データベースでのプロファイリング
use ecommerce_db
db.setProfilingLevel(2)  // 全操作をプロファイル

// スロークエリ確認
db.system.profile.find().limit(5).sort({ ts: -1 }).pretty()

// 統計情報取得
db.runCommand({ serverStatus: 1 })
db.runCommand({ dbStats: 1 })
db.products.stats()
EOF

# MongoDB Ops Manager風監視スクリプト
cat > mongodb_monitor.py << 'EOF'
#!/usr/bin/env python3
import pymongo
import time
import json
import os
from datetime import datetime

class MongoDBMonitor:
    def __init__(self, connection_string="mongodb://localhost:27017/"):
        self.client = pymongo.MongoClient(connection_string)
        self.admin_db = self.client.admin
    
    def get_server_status(self):
        """サーバー状態取得"""
        try:
            status = self.admin_db.command("serverStatus")
            return {
                "uptime": status["uptime"],
                "connections": status["connections"],
                "opcounters": status["opcounters"],
                "memory": status["mem"],
                "network": status["network"],
                "wiredTiger": status.get("wiredTiger", {}),
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            return {"error": str(e)}
    
    def get_database_stats(self):
        """データベース統計"""
        stats = []
        for db_name in self.client.list_database_names():
            if db_name not in ["admin", "config", "local"]:
                try:
                    db_stats = self.client[db_name].command("dbStats")
                    stats.append({
                        "database": db_name,
                        "collections": db_stats["collections"],
                        "dataSize": db_stats["dataSize"],
                        "storageSize": db_stats["storageSize"],
                        "indexes": db_stats["indexes"],
                        "indexSize": db_stats["indexSize"]
                    })
                except Exception as e:
                    stats.append({
                        "database": db_name,
                        "error": str(e)
                    })
        return stats
    
    def get_replication_status(self):
        """レプリケーション状態"""
        try:
            rs_status = self.admin_db.command("replSetGetStatus")
            return {
                "set": rs_status["set"],
                "members": [{
                    "name": member["name"],
                    "state": member["stateStr"],
                    "health": member["health"],
                    "uptime": member.get("uptime", 0),
                    "lag": member.get("optimeDate", datetime.now()) if "optimeDate" in member else None
                } for member in rs_status["members"]]
            }
        except Exception:
            return {"replication": "Not configured or not primary"}
    
    def analyze_slow_queries(self, db_name="ecommerce_db"):
        """スロークエリ分析"""
        try:
            db = self.client[db_name]
            slow_queries = list(db.system.profile.find().sort("ts", -1).limit(10))
            
            analysis = []
            for query in slow_queries:
                analysis.append({
                    "timestamp": query.get("ts"),
                    "operation": query.get("op"),
                    "namespace": query.get("ns"),
                    "duration_ms": query.get("millis"),
                    "command": query.get("command", {}),
                    "execution_stats": query.get("execStats", {})
                })
            
            return analysis
        except Exception as e:
            return {"error": str(e)}
    
    def generate_health_report(self):
        """健康診断レポート生成"""
        report = {
            "timestamp": datetime.now().isoformat(),
            "server_status": self.get_server_status(),
            "database_stats": self.get_database_stats(),
            "replication_status": self.get_replication_status(),
            "slow_queries": self.analyze_slow_queries(),
            "recommendations": []
        }
        
        # 推奨事項生成
        server_status = report["server_status"]
        if "connections" in server_status:
            current_connections = server_status["connections"]["current"]
            if current_connections > 80:  # 80%以上の接続使用率
                report["recommendations"].append(
                    "High connection usage detected. Consider connection pooling."
                )
        
        if "memory" in server_status:
            resident_mb = server_status["memory"]["resident"]
            if resident_mb > 4000:  # 4GB以上のメモリ使用
                report["recommendations"].append(
                    "High memory usage. Consider increasing available memory or optimizing queries."
                )
        
        return report

def continuous_monitoring():
    """継続的監視"""
    monitor = MongoDBMonitor()
    
    while True:
        try:
            # 健康診断実行
            health_report = monitor.generate_health_report()
            
            # 結果をファイルに保存
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            with open(f"/var/log/mongodb/health_report_{timestamp}.json", "w") as f:
                json.dump(health_report, f, indent=2, default=str)
            
            # アラート判定
            if health_report["recommendations"]:
                print(f"[ALERT] {datetime.now()}: Found {len(health_report['recommendations'])} recommendations")
                for rec in health_report["recommendations"]:
                    print(f"  - {rec}")
            
            # コンソール出力
            server_status = health_report["server_status"]
            if "connections" in server_status:
                print(f"[INFO] Connections: {server_status['connections']['current']}")
            if "opcounters" in server_status:
                ops = server_status["opcounters"]
                print(f"[INFO] Operations - Insert: {ops['insert']}, Query: {ops['query']}, Update: {ops['update']}")
            
            time.sleep(60)  # 1分間隔
            
        except KeyboardInterrupt:
            print("Monitoring stopped by user")
            break
        except Exception as e:
            print(f"Monitoring error: {e}")
            time.sleep(30)

if __name__ == "__main__":
    continuous_monitoring()
EOF

chmod +x mongodb_monitor.py

# パフォーマンス最適化スクリプト
cat > optimize_mongodb.py << 'EOF'
#!/usr/bin/env python3
import pymongo
from pymongo import MongoClient

def optimize_mongodb():
    """MongoDB最適化実行"""
    client = MongoClient("mongodb://localhost:27017/")
    
    # インデックス分析と最適化
    for db_name in client.list_database_names():
        if db_name not in ["admin", "config", "local"]:
            db = client[db_name]
            
            for collection_name in db.list_collection_names():
                collection = db[collection_name]
                
                print(f"\n=== {db_name}.{collection_name} ===")
                
                # 既存インデックス確認
                indexes = list(collection.list_indexes())
                print(f"Existing indexes: {len(indexes)}")
                for idx in indexes:
                    print(f"  - {idx['name']}: {idx['key']}")
                
                # インデックス使用統計
                try:
                    index_stats = list(collection.aggregate([{"$indexStats": {}}]))
                    for stat in index_stats:
                        usage = stat["accesses"]["ops"]
                        print(f"  Index {stat['name']} usage: {usage} operations")
                        
                        # 未使用インデックスの警告
                        if usage == 0 and stat["name"] != "_id_":
                            print(f"  WARNING: Index {stat['name']} is unused")
                except:
                    pass
                
                # コレクション統計
                try:
                    stats = collection.stats()
                    print(f"  Documents: {stats['count']}")
                    print(f"  Storage size: {stats['storageSize'] / 1024 / 1024:.2f} MB")
                    print(f"  Index size: {stats['totalIndexSize'] / 1024 / 1024:.2f} MB")
                except:
                    pass

if __name__ == "__main__":
    optimize_mongodb()
EOF

chmod +x optimize_mongodb.py

# 自動バックアップスクリプト
cat > mongodb_backup.sh << 'EOF'
#!/bin/bash

# 設定
BACKUP_DIR="/backup/mongodb"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=7
MONGO_URI="mongodb://localhost:27017"

# バックアップディレクトリ作成
mkdir -p $BACKUP_DIR

# データベースバックアップ
echo "MongoDB backup started: $(date)"

mongodump --uri="$MONGO_URI" --out="$BACKUP_DIR/backup_$DATE"

# 圧縮
tar -czf "$BACKUP_DIR/backup_$DATE.tar.gz" -C "$BACKUP_DIR" "backup_$DATE"
rm -rf "$BACKUP_DIR/backup_$DATE"

# 古いバックアップ削除
find $BACKUP_DIR -name "backup_*.tar.gz" -mtime +$RETENTION_DAYS -delete

echo "MongoDB backup completed: $(date)"
EOF

chmod +x mongodb_backup.sh

# cron設定(毎日午前2時にバックアップ)
echo "0 2 * * * /path/to/mongodb_backup.sh" | crontab -

echo "MongoDB監視・最適化環境のセットアップが完了しました"

Atlas統合とクラウド運用

# MongoDB Atlas CLI セットアップと運用管理

# Atlas CLI インストール(macOS)
brew install mongodb-atlas-cli

# Atlas CLI インストール(Linux)
curl -fLo atlas-linux-x86_64.tar.gz https://fastdl.mongodb.org/mongocli/mongodb-atlas-cli_1.14.0_linux_x86_64.tar.gz
tar -xzf atlas-linux-x86_64.tar.gz
sudo install atlas-linux-x86_64/atlas /usr/local/bin/

# Atlas認証設定
atlas auth login

# 新しいAtlasプロジェクト作成
atlas projects create "MyNewProject"

# クラスター作成(M10、AWS東京リージョン)
atlas clusters create myCluster \
  --provider AWS \
  --region AP_NORTHEAST_1 \
  --tier M10 \
  --diskSizeGB 10 \
  --mdbVersion 8.0

# データベースユーザー作成
atlas dbusers create \
  --username appUser \
  --password securePassword123 \
  --role readWrite \
  --projectId $(atlas projects list --output json | jq -r '.[0].id')

# IPアクセスリスト設定
atlas accessLists create \
  --type ipAddress \
  --value "0.0.0.0/0" \
  --comment "Allow all IPs (for development only)"

# 接続文字列取得
atlas clusters connectionStrings describe myCluster

# Atlas監視・アラート設定
cat > atlas_monitoring.py << 'EOF'
#!/usr/bin/env python3
import requests
import json
import base64
from datetime import datetime, timedelta

class AtlasMonitoring:
    def __init__(self, public_key, private_key, group_id):
        self.public_key = public_key
        self.private_key = private_key
        self.group_id = group_id
        self.base_url = "https://cloud.mongodb.com/api/atlas/v1.0"
        
        # 認証ヘッダー準備
        credentials = f"{public_key}:{private_key}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()
        self.headers = {
            "Authorization": f"Basic {encoded_credentials}",
            "Content-Type": "application/json"
        }
    
    def get_cluster_metrics(self, cluster_name):
        """クラスターメトリクス取得"""
        # 過去1時間のメトリクス取得
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=1)
        
        params = {
            "granularity": "PT1M",  # 1分間隔
            "period": f"{start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}:{end_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}"
        }
        
        # CPU使用率
        cpu_url = f"{self.base_url}/groups/{self.group_id}/processes/{cluster_name}/measurements/PROCESS_CPU_USER"
        cpu_response = requests.get(cpu_url, headers=self.headers, params=params)
        
        # メモリ使用率
        memory_url = f"{self.base_url}/groups/{self.group_id}/processes/{cluster_name}/measurements/MEMORY_RESIDENT"
        memory_response = requests.get(memory_url, headers=self.headers, params=params)
        
        # 接続数
        connections_url = f"{self.base_url}/groups/{self.group_id}/processes/{cluster_name}/measurements/CONNECTIONS"
        connections_response = requests.get(connections_url, headers=self.headers, params=params)
        
        return {
            "cpu": cpu_response.json() if cpu_response.status_code == 200 else None,
            "memory": memory_response.json() if memory_response.status_code == 200 else None,
            "connections": connections_response.json() if connections_response.status_code == 200 else None
        }
    
    def create_alert(self, cluster_name, metric_name, threshold):
        """アラート作成"""
        alert_config = {
            "typeName": "HOST_METRIC",
            "metricThreshold": {
                "metricName": metric_name,
                "operator": "GREATER_THAN",
                "threshold": threshold,
                "units": "RAW",
                "mode": "AVERAGE"
            },
            "eventTypeName": f"{metric_name}_HIGH",
            "enabled": True,
            "notifications": [
                {
                    "typeName": "EMAIL",
                    "emailAddress": "[email protected]",
                    "intervalMin": 60,
                    "delayMin": 0
                }
            ]
        }
        
        url = f"{self.base_url}/groups/{self.group_id}/alertConfigs"
        response = requests.post(url, headers=self.headers, data=json.dumps(alert_config))
        
        return response.json() if response.status_code == 201 else None
    
    def get_performance_advisor_suggestions(self, cluster_name):
        """Performance Advisor提案取得"""
        url = f"{self.base_url}/groups/{self.group_id}/processes/{cluster_name}/performanceAdvisor/suggestedIndexes"
        response = requests.get(url, headers=self.headers)
        
        return response.json() if response.status_code == 200 else None

# Atlas Data API使用例
def atlas_data_api_example():
    """Atlas Data API使用例"""
    import requests
    
    # Data API設定
    data_api_url = "https://data.mongodb-api.com/app/data-xxxxx/endpoint/data/v1"
    api_key = "your-api-key"
    
    headers = {
        "Content-Type": "application/json",
        "api-key": api_key
    }
    
    # ドキュメント検索
    search_payload = {
        "collection": "products",
        "database": "ecommerce_db",
        "filter": {"category": "laptop"},
        "limit": 10
    }
    
    response = requests.post(
        f"{data_api_url}/action/find",
        headers=headers,
        data=json.dumps(search_payload)
    )
    
    if response.status_code == 200:
        results = response.json()
        print(f"Found {len(results['documents'])} products")
        return results['documents']
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Atlas Search インデックス作成例
def create_atlas_search_index():
    """Atlas Search インデックス作成"""
    search_index_definition = {
        "name": "products_search",
        "definition": {
            "mappings": {
                "dynamic": False,
                "fields": {
                    "name": {
                        "type": "string",
                        "analyzer": "lucene.standard"
                    },
                    "description": {
                        "type": "string",
                        "analyzer": "lucene.english"
                    },
                    "tags": {
                        "type": "string",
                        "analyzer": "lucene.keyword"
                    },
                    "price": {
                        "type": "number"
                    },
                    "specifications": {
                        "type": "document",
                        "dynamic": True
                    }
                }
            }
        }
    }
    
    print("Atlas Search index definition:")
    print(json.dumps(search_index_definition, indent=2))

if __name__ == "__main__":
    # Atlas API例
    # atlas_monitor = AtlasMonitoring("public_key", "private_key", "group_id")
    # metrics = atlas_monitor.get_cluster_metrics("cluster_name")
    # print(json.dumps(metrics, indent=2))
    
    # Data API例
    # products = atlas_data_api_example()
    
    # Search Index例
    create_atlas_search_index()
EOF

chmod +x atlas_monitoring.py

# Terraform でのAtlas Infrastructure as Code
cat > atlas_terraform.tf << 'EOF'
terraform {
  required_providers {
    mongodbatlas = {
      source = "mongodb/mongodbatlas"
      version = "~> 1.0"
    }
  }
}

provider "mongodbatlas" {
  public_key  = var.atlas_public_key
  private_key = var.atlas_private_key
}

# プロジェクト作成
resource "mongodbatlas_project" "ecommerce" {
  name   = "ecommerce-project"
  org_id = var.atlas_org_id
}

# クラスター作成
resource "mongodbatlas_cluster" "ecommerce_cluster" {
  project_id   = mongodbatlas_project.ecommerce.id
  name         = "ecommerce-cluster"
  
  cluster_type = "REPLICASET"
  
  provider_name               = "AWS"
  backing_provider_name       = "AWS"
  provider_region_name        = "AP_NORTHEAST_1"
  provider_instance_size_name = "M10"
  
  mongo_db_major_version = "8.0"
  
  auto_scaling_disk_gb_enabled = true
  
  advanced_configuration {
    javascript_enabled           = true
    minimum_enabled_tls_protocol = "TLS1_2"
    no_table_scan               = false
    oplog_size_mb               = 2048
    sample_size_bi_connector    = 110
    sample_refresh_interval_bi_connector = 310
  }
}

# データベースユーザー作成
resource "mongodbatlas_database_user" "app_user" {
  username           = "appUser"
  password           = var.atlas_user_password
  project_id         = mongodbatlas_project.ecommerce.id
  auth_database_name = "admin"

  roles {
    role_name     = "readWrite"
    database_name = "ecommerce_db"
  }
}

# IPアクセスリスト
resource "mongodbatlas_project_ip_access_list" "development" {
  project_id = mongodbatlas_project.ecommerce.id
  cidr_block = "0.0.0.0/0"
  comment    = "Development environment access"
}

# アラート設定
resource "mongodbatlas_alert_configuration" "high_cpu" {
  project_id = mongodbatlas_project.ecommerce.id
  event_type = "OUTSIDE_METRIC_THRESHOLD"
  enabled    = true

  metric_threshold_config {
    metric_name = "PROCESS_CPU_USER"
    operator    = "GREATER_THAN"
    threshold   = 80.0
    units       = "RAW"
    mode        = "AVERAGE"
  }

  notification {
    type_name     = "EMAIL"
    email_address = "[email protected]"
    interval_min  = 30
    delay_min     = 0
  }
}

# 出力
output "cluster_connection_string" {
  value = mongodbatlas_cluster.ecommerce_cluster.connection_strings[0].standard_srv
  sensitive = true
}
EOF

echo "MongoDB Atlas統合環境のセットアップが完了しました"