MongoDB
ドキュメント指向のNoSQLデータベース。JSONライクなドキュメント形式でデータを格納。スケーラビリティと開発者フレンドリーな設計が特徴。
データベースサーバー
MongoDB
概要
MongoDBは、2009年に10gen(現MongoDB Inc.)によって開発された、ドキュメント指向NoSQLデータベースです。JSONライクなBSONフォーマットでデータを格納し、柔軟なスキーマ設計、自動シャーディング、レプリカセットによる高可用性を実現する、現代のアプリケーション開発に最適化されたデータベースシステムです。MongoDB 8.0(2024年リリース)では、25%の性能向上、Queryable Encryptionの強化、バックグラウンド圧縮機能、時系列データの処理性能が60%向上するなど、大幅な進化を遂げています。MongoDB Atlasクラウドサービスとの組み合わせにより、グローバル規模での自動スケーリング、セキュリティ強化、マルチクラウド対応を実現し、スタートアップから大企業まで幅広く採用されています。開発者フレンドリーなAPIとリッチなエコシステムにより、迅速なアプリケーション開発と運用を支援します。
詳細
MongoDB 8.0は、2025年版として従来のNoSQLデータベースの限界を突破した包括的なデータプラットフォームへと進化しています。最新版では、リアルタイム分析、機械学習統合、暗号化クエリ(Queryable Encryption)、時系列データ最適化、ベクトル検索によるAI対応が大幅に強化されています。MongoDB Atlasでは、自動スケーリングが50%高速化、リアルタイムリソース対応が5倍高速化、Atlas Searchによる全文検索とベクトル検索の統合により、現代のデータドリブンアプリケーションに必要な全ての機能を提供します。また、Relational Migratorによる既存RDBからの移行支援、Atlas Data Federationによるデータレイク統合、Charts・Compass・Ops Managerなどの包括的な運用ツールにより、エンタープライズレベルの要求に応えます。ACID準拠のマルチドキュメントトランザクション、分散アーキテクチャによる無制限スケーラビリティ、Change Streamsによるリアルタイムデータ変更監視により、ミッションクリティカルなシステムの基盤として機能します。
主な特徴
- ドキュメント指向: JSONライクなBSONによる直感的で柔軟なデータモデリング
- 自動シャーディング: データ分散とクエリ負荷の自動バランシング
- 高可用性: レプリカセットによる自動フェイルオーバーとデータ冗長化
- ACID準拠: マルチドキュメントトランザクションによる強一貫性
- Atlas Cloud: マネージドサービスによる運用負荷ゼロとグローバル展開
- リアルタイム分析: 集約パイプラインとTime Seriesによる高速データ処理
メリット・デメリット
メリット
- リレーショナルDBより30-50%高速な読み書き性能と水平スケーラビリティ
- 柔軟なスキーマ設計によるアジャイル開発とマイクロサービス対応
- Atlas Cloudでの自動スケーリング・バックアップ・セキュリティ運用
- 豊富なプログラミング言語ドライバとフレームワーク統合
- Change Streamsによるリアルタイムデータ変更の検知・配信
- 地理空間データ、全文検索、時系列データの組み込みサポート
デメリット
- 複雑なJOIN操作やトランザクション処理ではRDBが優位
- メモリ使用量がRDBMSより多く、ストレージコストの増大
- 分散システムの複雑性による運用・デバッグの学習コスト
- スキーマレス設計による潜在的なデータ整合性リスク
- MongoDB AtlasのライセンスコストとCloud Provider依存
- ACID保証のパフォーマンスオーバーヘッドと設計トレードオフ
参考ページ
書き方の例
インストールと基本セットアップ
# Ubuntu/Debian系でのMongoDB Community Edition インストール
# MongoDB公式GPGキーのインポート
wget -qO - https://www.mongodb.org/static/pgp/server-8.0.asc | sudo apt-key add -
# MongoDB リポジトリの追加
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu $(lsb_release -cs)/mongodb-org/8.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-8.0.list
# パッケージデータベースの更新とMongoDB インストール
sudo apt-get update
sudo apt-get install -y mongodb-org
# MongoDB サービスの開始と自動起動設定
sudo systemctl enable mongod
sudo systemctl start mongod
sudo systemctl status mongod
# Docker Composeを使用したMongoDB環境構築
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
mongodb:
image: mongo:8.0
container_name: mongodb
restart: unless-stopped
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: password123
MONGO_INITDB_DATABASE: myapp
ports:
- "27017:27017"
volumes:
- mongodb_data:/data/db
- mongodb_config:/data/configdb
- ./mongod.conf:/etc/mongod.conf
command: --config /etc/mongod.conf
mongo-express:
image: mongo-express:latest
container_name: mongo-express
restart: unless-stopped
ports:
- "8081:8081"
environment:
ME_CONFIG_MONGODB_ADMINUSERNAME: admin
ME_CONFIG_MONGODB_ADMINPASSWORD: password123
ME_CONFIG_MONGODB_URL: mongodb://admin:password123@mongodb:27017/
ME_CONFIG_BASICAUTH: false
depends_on:
- mongodb
volumes:
mongodb_data:
driver: local
mongodb_config:
driver: local
EOF
# MongoDB設定ファイル作成
cat > mongod.conf << 'EOF'
# MongoDB 8.0 最適化設定
storage:
dbPath: /data/db
journal:
enabled: true
wiredTiger:
engineConfig:
cacheSizeGB: 2
journalCompressor: snappy
collectionConfig:
blockCompressor: snappy
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod.log
logRotate: rename
net:
port: 27017
bindIp: 0.0.0.0
processManagement:
timeZoneInfo: /usr/share/zoneinfo
security:
authorization: enabled
operationProfiling:
slowOpThresholdMs: 100
mode: slowOp
EOF
# サービス起動
docker-compose up -d
# 接続確認
mongosh --host localhost --port 27017 --username admin --password password123
# MongoDB Atlasクラスターへの接続
mongosh "mongodb+srv://cluster0.xxxxx.mongodb.net/myFirstDatabase" --apiVersion 1 --username <username>
基本的なデータベース操作とCRUD
// MongoDB Shell (mongosh) での基本操作
// データベース作成・切り替え
use ecommerce_db
// コレクション作成とドキュメント挿入
db.products.insertMany([
{
name: "MacBook Pro 16-inch",
category: "laptop",
price: 2499.99,
specifications: {
cpu: "Apple M3 Max",
memory: "32GB",
storage: "1TB SSD",
screen: "16.2-inch Liquid Retina XDR"
},
tags: ["apple", "professional", "high-performance"],
inventory: {
stock: 15,
warehouse: "Tokyo",
supplier: "Apple Inc."
},
reviews: [
{
user: "tech_enthusiast",
rating: 5,
comment: "Outstanding performance for development work",
date: new Date("2024-01-15")
}
],
created_at: new Date(),
updated_at: new Date()
},
{
name: "Dell XPS 13",
category: "laptop",
price: 1299.99,
specifications: {
cpu: "Intel Core i7-1360P",
memory: "16GB",
storage: "512GB SSD",
screen: "13.4-inch FHD+"
},
tags: ["dell", "portable", "business"],
inventory: {
stock: 8,
warehouse: "Osaka",
supplier: "Dell Technologies"
},
reviews: [],
created_at: new Date(),
updated_at: new Date()
}
])
// 基本的なクエリ操作
// 全件取得
db.products.find().pretty()
// 条件検索
db.products.find({ category: "laptop" })
// 価格範囲での検索
db.products.find({
price: { $gte: 1000, $lte: 2000 }
})
// 複合条件検索
db.products.find({
$and: [
{ category: "laptop" },
{ "inventory.stock": { $gte: 10 } }
]
})
// テキスト検索のためのインデックス作成
db.products.createIndex({
name: "text",
"specifications.cpu": "text",
tags: "text"
})
// テキスト検索実行
db.products.find({
$text: { $search: "Apple MacBook" }
})
// 配列要素の検索
db.products.find({
tags: { $in: ["apple", "professional"] }
})
// ネストされたオブジェクトの検索
db.products.find({
"specifications.memory": "32GB"
})
// ドキュメント更新
db.products.updateOne(
{ name: "MacBook Pro 16-inch" },
{
$set: { price: 2399.99, updated_at: new Date() },
$push: {
reviews: {
user: "developer_pro",
rating: 5,
comment: "Perfect for iOS development",
date: new Date()
}
}
}
)
// 配列要素の更新
db.products.updateOne(
{
name: "MacBook Pro 16-inch",
"reviews.user": "tech_enthusiast"
},
{
$set: { "reviews.$.rating": 4 }
}
)
// upsert操作(存在しない場合は作成)
db.products.updateOne(
{ name: "iPad Pro 12.9-inch" },
{
$set: {
category: "tablet",
price: 1099.99,
created_at: new Date()
}
},
{ upsert: true }
)
// ドキュメント削除
db.products.deleteOne({ name: "iPad Pro 12.9-inch" })
// 条件に合う複数ドキュメントの削除
db.products.deleteMany({ "inventory.stock": { $lt: 5 } })
Python統合とアプリケーション開発
import pymongo
from pymongo import MongoClient
from datetime import datetime, timezone
import certifi
from bson.objectid import ObjectId
import json
class MongoDBManager:
def __init__(self, connection_string=None, database_name="ecommerce_db"):
"""MongoDB接続管理クラス"""
if connection_string:
# MongoDB Atlas接続
self.client = MongoClient(
connection_string,
tlsCAFile=certifi.where()
)
else:
# ローカルMongoDB接続
self.client = MongoClient(
"mongodb://admin:password123@localhost:27017/",
authSource='admin'
)
self.db = self.client[database_name]
self.products = self.db.products
self.users = self.db.users
self.orders = self.db.orders
def test_connection(self):
"""接続テスト"""
try:
self.client.admin.command('ping')
print("MongoDB connection successful!")
return True
except Exception as e:
print(f"MongoDB connection failed: {e}")
return False
def create_indexes(self):
"""インデックス作成"""
# 商品検索用インデックス
self.products.create_index([
("name", pymongo.TEXT),
("tags", pymongo.TEXT),
("specifications.cpu", pymongo.TEXT)
])
# カテゴリ・価格での効率的検索
self.products.create_index([
("category", 1),
("price", 1)
])
# 在庫管理用インデックス
self.products.create_index("inventory.stock")
# ユーザー管理用インデックス
self.users.create_index("email", unique=True)
# 注文履歴用インデックス
self.orders.create_index([
("user_id", 1),
("order_date", -1)
])
print("Indexes created successfully")
def insert_product(self, product_data):
"""商品データ挿入"""
try:
product_data['created_at'] = datetime.now(timezone.utc)
product_data['updated_at'] = datetime.now(timezone.utc)
result = self.products.insert_one(product_data)
return result.inserted_id
except Exception as e:
print(f"Product insertion error: {e}")
return None
def search_products(self, query=None, category=None, price_range=None, limit=10):
"""商品検索"""
filter_conditions = {}
# テキスト検索
if query:
filter_conditions['$text'] = {'$search': query}
# カテゴリフィルタ
if category:
filter_conditions['category'] = category
# 価格範囲フィルタ
if price_range:
filter_conditions['price'] = {
'$gte': price_range.get('min', 0),
'$lte': price_range.get('max', float('inf'))
}
# 検索実行
cursor = self.products.find(filter_conditions).limit(limit)
if query:
# テキスト検索の場合、関連度でソート
cursor = cursor.sort([('score', {'$meta': 'textScore'})])
else:
# 通常検索の場合、価格でソート
cursor = cursor.sort("price", 1)
return list(cursor)
def get_product_analytics(self):
"""商品分析(集約パイプライン使用)"""
pipeline = [
# カテゴリ別の統計
{
'$group': {
'_id': '$category',
'product_count': {'$sum': 1},
'avg_price': {'$avg': '$price'},
'max_price': {'$max': '$price'},
'min_price': {'$min': '$price'},
'total_stock': {'$sum': '$inventory.stock'}
}
},
# 価格で降順ソート
{
'$sort': {'avg_price': -1}
},
# 結果の整形
{
'$project': {
'category': '$_id',
'product_count': 1,
'avg_price': {'$round': ['$avg_price', 2]},
'max_price': 1,
'min_price': 1,
'total_stock': 1,
'_id': 0
}
}
]
return list(self.products.aggregate(pipeline))
def update_inventory(self, product_id, quantity_change):
"""在庫更新"""
try:
result = self.products.update_one(
{'_id': ObjectId(product_id)},
{
'$inc': {'inventory.stock': quantity_change},
'$set': {'updated_at': datetime.now(timezone.utc)}
}
)
return result.modified_count > 0
except Exception as e:
print(f"Inventory update error: {e}")
return False
def create_order(self, user_id, items):
"""注文作成(トランザクション使用)"""
with self.client.start_session() as session:
with session.start_transaction():
try:
# 在庫確認と減算
total_amount = 0
order_items = []
for item in items:
product = self.products.find_one(
{'_id': ObjectId(item['product_id'])},
session=session
)
if not product:
raise Exception(f"Product {item['product_id']} not found")
if product['inventory']['stock'] < item['quantity']:
raise Exception(f"Insufficient stock for {product['name']}")
# 在庫減算
self.products.update_one(
{'_id': ObjectId(item['product_id'])},
{'$inc': {'inventory.stock': -item['quantity']}},
session=session
)
item_total = product['price'] * item['quantity']
total_amount += item_total
order_items.append({
'product_id': item['product_id'],
'product_name': product['name'],
'price': product['price'],
'quantity': item['quantity'],
'subtotal': item_total
})
# 注文作成
order = {
'user_id': user_id,
'items': order_items,
'total_amount': total_amount,
'status': 'pending',
'order_date': datetime.now(timezone.utc),
'created_at': datetime.now(timezone.utc)
}
result = self.orders.insert_one(order, session=session)
# トランザクションコミット
session.commit_transaction()
return result.inserted_id
except Exception as e:
# トランザクションロールバック
session.abort_transaction()
print(f"Order creation failed: {e}")
return None
# 使用例とパフォーマンステスト
def performance_test():
"""パフォーマンステスト"""
import time
# Atlas接続の場合
# connection_string = "mongodb+srv://username:[email protected]/"
# db_manager = MongoDBManager(connection_string)
# ローカル接続の場合
db_manager = MongoDBManager()
if not db_manager.test_connection():
return
# インデックス作成
db_manager.create_indexes()
# 大量データ挿入テスト
print("大量データ挿入テスト開始...")
start_time = time.time()
products_data = []
categories = ["laptop", "smartphone", "tablet", "desktop", "accessory"]
for i in range(1000):
product = {
"name": f"Product {i}",
"category": categories[i % len(categories)],
"price": round(100 + (i * 10.5), 2),
"specifications": {
"cpu": f"Processor {i % 20}",
"memory": f"{8 + (i % 4) * 8}GB",
"storage": f"{256 + (i % 4) * 256}GB"
},
"tags": [f"tag{i % 10}", f"feature{i % 15}"],
"inventory": {
"stock": i % 50,
"warehouse": f"Warehouse {i % 5}"
},
"reviews": []
}
products_data.append(product)
# バッチ挿入
result = db_manager.products.insert_many(products_data)
insertion_time = time.time() - start_time
print(f"1000件挿入時間: {insertion_time:.2f}秒")
# 検索パフォーマンステスト
start_time = time.time()
results = db_manager.search_products(
query="Processor",
category="laptop",
price_range={"min": 500, "max": 1500}
)
search_time = time.time() - start_time
print(f"複合検索時間: {search_time:.4f}秒, 結果数: {len(results)}")
# 集約パフォーマンステスト
start_time = time.time()
analytics = db_manager.get_product_analytics()
aggregation_time = time.time() - start_time
print(f"集約処理時間: {aggregation_time:.4f}秒")
print("カテゴリ別分析結果:")
for category in analytics:
print(f" {category['category']}: {category['product_count']}商品, 平均価格: ${category['avg_price']}")
if __name__ == "__main__":
# MongoDB管理オブジェクト作成
db_manager = MongoDBManager()
try:
# 接続テスト
if not db_manager.test_connection():
exit(1)
# サンプル商品データ挿入
sample_product = {
"name": "Apple iPhone 15 Pro",
"category": "smartphone",
"price": 999.99,
"specifications": {
"cpu": "A17 Pro chip",
"memory": "8GB",
"storage": "128GB",
"screen": "6.1-inch Super Retina XDR"
},
"tags": ["apple", "5g", "professional"],
"inventory": {
"stock": 25,
"warehouse": "Tokyo",
"supplier": "Apple Inc."
},
"reviews": []
}
product_id = db_manager.insert_product(sample_product)
print(f"商品挿入完了: {product_id}")
# 商品検索テスト
search_results = db_manager.search_products(query="iPhone", category="smartphone")
print(f"検索結果: {len(search_results)}件")
# 分析レポート
analytics = db_manager.get_product_analytics()
print(f"カテゴリ分析: {len(analytics)}カテゴリ")
# パフォーマンステスト実行
performance_test()
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
db_manager.client.close()
レプリカセットとシャーディング設定
# レプリカセット構成(3ノード)の設定
# ノード1の設定
sudo tee /etc/mongod1.conf << 'EOF'
storage:
dbPath: /data/db1
journal:
enabled: true
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod1.log
net:
port: 27017
bindIp: 0.0.0.0
replication:
replSetName: "rs0"
security:
authorization: enabled
keyFile: /opt/mongodb/keyfile
EOF
# ノード2の設定
sudo tee /etc/mongod2.conf << 'EOF'
storage:
dbPath: /data/db2
journal:
enabled: true
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod2.log
net:
port: 27018
bindIp: 0.0.0.0
replication:
replSetName: "rs0"
security:
authorization: enabled
keyFile: /opt/mongodb/keyfile
EOF
# ノード3の設定
sudo tee /etc/mongod3.conf << 'EOF'
storage:
dbPath: /data/db3
journal:
enabled: true
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod3.log
net:
port: 27019
bindIp: 0.0.0.0
replication:
replSetName: "rs0"
security:
authorization: enabled
keyFile: /opt/mongodb/keyfile
EOF
# キーファイル作成(レプリカセット認証用)
sudo mkdir -p /opt/mongodb
openssl rand -base64 756 | sudo tee /opt/mongodb/keyfile
sudo chmod 400 /opt/mongodb/keyfile
sudo chown mongodb:mongodb /opt/mongodb/keyfile
# データディレクトリ作成
sudo mkdir -p /data/db{1,2,3}
sudo chown -R mongodb:mongodb /data/db{1,2,3}
# 各ノードでMongoDB起動
sudo mongod --config /etc/mongod1.conf --fork
sudo mongod --config /etc/mongod2.conf --fork
sudo mongod --config /etc/mongod3.conf --fork
# レプリカセット初期化
mongosh --port 27017 << 'EOF'
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "localhost:27017", priority: 2 },
{ _id: 1, host: "localhost:27018", priority: 1 },
{ _id: 2, host: "localhost:27019", priority: 1 }
]
})
// レプリカセット状態確認
rs.status()
// 管理者ユーザー作成
use admin
db.createUser({
user: "admin",
pwd: "securePassword123",
roles: ["userAdminAnyDatabase", "clusterAdmin"]
})
EOF
# シャーディング環境の構築
# Config Server設定
sudo tee /etc/mongod-config.conf << 'EOF'
storage:
dbPath: /data/configdb
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod-config.log
net:
port: 27019
bindIp: 0.0.0.0
sharding:
clusterRole: configsvr
replication:
replSetName: "configReplSet"
security:
authorization: enabled
keyFile: /opt/mongodb/keyfile
EOF
# Shard Server設定
sudo tee /etc/mongod-shard1.conf << 'EOF'
storage:
dbPath: /data/shard1
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod-shard1.log
net:
port: 27018
bindIp: 0.0.0.0
sharding:
clusterRole: shardsvr
replication:
replSetName: "shard1ReplSet"
security:
authorization: enabled
keyFile: /opt/mongodb/keyfile
EOF
# Mongos Router設定
sudo tee /etc/mongos.conf << 'EOF'
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongos.log
net:
port: 27017
bindIp: 0.0.0.0
sharding:
configDB: "configReplSet/localhost:27019"
security:
keyFile: /opt/mongodb/keyfile
EOF
# シャーディング有効化
mongosh --port 27017 << 'EOF'
// シャード追加
sh.addShard("shard1ReplSet/localhost:27018")
// データベースのシャーディング有効化
sh.enableSharding("ecommerce_db")
// コレクションのシャーディング
sh.shardCollection("ecommerce_db.products", { "category": 1, "_id": 1 })
// シャーディング状態確認
sh.status()
EOF
監視とパフォーマンス最適化
# MongoDB監視設定とパフォーマンスチューニング
# プロファイリング有効化
mongosh << 'EOF'
// 全体的なプロファイリング設定
use admin
db.runCommand({
setParameter: 1,
slowOpThresholdMs: 100
})
// 特定データベースでのプロファイリング
use ecommerce_db
db.setProfilingLevel(2) // 全操作をプロファイル
// スロークエリ確認
db.system.profile.find().limit(5).sort({ ts: -1 }).pretty()
// 統計情報取得
db.runCommand({ serverStatus: 1 })
db.runCommand({ dbStats: 1 })
db.products.stats()
EOF
# MongoDB Ops Manager風監視スクリプト
cat > mongodb_monitor.py << 'EOF'
#!/usr/bin/env python3
import pymongo
import time
import json
import os
from datetime import datetime
class MongoDBMonitor:
def __init__(self, connection_string="mongodb://localhost:27017/"):
self.client = pymongo.MongoClient(connection_string)
self.admin_db = self.client.admin
def get_server_status(self):
"""サーバー状態取得"""
try:
status = self.admin_db.command("serverStatus")
return {
"uptime": status["uptime"],
"connections": status["connections"],
"opcounters": status["opcounters"],
"memory": status["mem"],
"network": status["network"],
"wiredTiger": status.get("wiredTiger", {}),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {"error": str(e)}
def get_database_stats(self):
"""データベース統計"""
stats = []
for db_name in self.client.list_database_names():
if db_name not in ["admin", "config", "local"]:
try:
db_stats = self.client[db_name].command("dbStats")
stats.append({
"database": db_name,
"collections": db_stats["collections"],
"dataSize": db_stats["dataSize"],
"storageSize": db_stats["storageSize"],
"indexes": db_stats["indexes"],
"indexSize": db_stats["indexSize"]
})
except Exception as e:
stats.append({
"database": db_name,
"error": str(e)
})
return stats
def get_replication_status(self):
"""レプリケーション状態"""
try:
rs_status = self.admin_db.command("replSetGetStatus")
return {
"set": rs_status["set"],
"members": [{
"name": member["name"],
"state": member["stateStr"],
"health": member["health"],
"uptime": member.get("uptime", 0),
"lag": member.get("optimeDate", datetime.now()) if "optimeDate" in member else None
} for member in rs_status["members"]]
}
except Exception:
return {"replication": "Not configured or not primary"}
def analyze_slow_queries(self, db_name="ecommerce_db"):
"""スロークエリ分析"""
try:
db = self.client[db_name]
slow_queries = list(db.system.profile.find().sort("ts", -1).limit(10))
analysis = []
for query in slow_queries:
analysis.append({
"timestamp": query.get("ts"),
"operation": query.get("op"),
"namespace": query.get("ns"),
"duration_ms": query.get("millis"),
"command": query.get("command", {}),
"execution_stats": query.get("execStats", {})
})
return analysis
except Exception as e:
return {"error": str(e)}
def generate_health_report(self):
"""健康診断レポート生成"""
report = {
"timestamp": datetime.now().isoformat(),
"server_status": self.get_server_status(),
"database_stats": self.get_database_stats(),
"replication_status": self.get_replication_status(),
"slow_queries": self.analyze_slow_queries(),
"recommendations": []
}
# 推奨事項生成
server_status = report["server_status"]
if "connections" in server_status:
current_connections = server_status["connections"]["current"]
if current_connections > 80: # 80%以上の接続使用率
report["recommendations"].append(
"High connection usage detected. Consider connection pooling."
)
if "memory" in server_status:
resident_mb = server_status["memory"]["resident"]
if resident_mb > 4000: # 4GB以上のメモリ使用
report["recommendations"].append(
"High memory usage. Consider increasing available memory or optimizing queries."
)
return report
def continuous_monitoring():
"""継続的監視"""
monitor = MongoDBMonitor()
while True:
try:
# 健康診断実行
health_report = monitor.generate_health_report()
# 結果をファイルに保存
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
with open(f"/var/log/mongodb/health_report_{timestamp}.json", "w") as f:
json.dump(health_report, f, indent=2, default=str)
# アラート判定
if health_report["recommendations"]:
print(f"[ALERT] {datetime.now()}: Found {len(health_report['recommendations'])} recommendations")
for rec in health_report["recommendations"]:
print(f" - {rec}")
# コンソール出力
server_status = health_report["server_status"]
if "connections" in server_status:
print(f"[INFO] Connections: {server_status['connections']['current']}")
if "opcounters" in server_status:
ops = server_status["opcounters"]
print(f"[INFO] Operations - Insert: {ops['insert']}, Query: {ops['query']}, Update: {ops['update']}")
time.sleep(60) # 1分間隔
except KeyboardInterrupt:
print("Monitoring stopped by user")
break
except Exception as e:
print(f"Monitoring error: {e}")
time.sleep(30)
if __name__ == "__main__":
continuous_monitoring()
EOF
chmod +x mongodb_monitor.py
# パフォーマンス最適化スクリプト
cat > optimize_mongodb.py << 'EOF'
#!/usr/bin/env python3
import pymongo
from pymongo import MongoClient
def optimize_mongodb():
"""MongoDB最適化実行"""
client = MongoClient("mongodb://localhost:27017/")
# インデックス分析と最適化
for db_name in client.list_database_names():
if db_name not in ["admin", "config", "local"]:
db = client[db_name]
for collection_name in db.list_collection_names():
collection = db[collection_name]
print(f"\n=== {db_name}.{collection_name} ===")
# 既存インデックス確認
indexes = list(collection.list_indexes())
print(f"Existing indexes: {len(indexes)}")
for idx in indexes:
print(f" - {idx['name']}: {idx['key']}")
# インデックス使用統計
try:
index_stats = list(collection.aggregate([{"$indexStats": {}}]))
for stat in index_stats:
usage = stat["accesses"]["ops"]
print(f" Index {stat['name']} usage: {usage} operations")
# 未使用インデックスの警告
if usage == 0 and stat["name"] != "_id_":
print(f" WARNING: Index {stat['name']} is unused")
except:
pass
# コレクション統計
try:
stats = collection.stats()
print(f" Documents: {stats['count']}")
print(f" Storage size: {stats['storageSize'] / 1024 / 1024:.2f} MB")
print(f" Index size: {stats['totalIndexSize'] / 1024 / 1024:.2f} MB")
except:
pass
if __name__ == "__main__":
optimize_mongodb()
EOF
chmod +x optimize_mongodb.py
# 自動バックアップスクリプト
cat > mongodb_backup.sh << 'EOF'
#!/bin/bash
# 設定
BACKUP_DIR="/backup/mongodb"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=7
MONGO_URI="mongodb://localhost:27017"
# バックアップディレクトリ作成
mkdir -p $BACKUP_DIR
# データベースバックアップ
echo "MongoDB backup started: $(date)"
mongodump --uri="$MONGO_URI" --out="$BACKUP_DIR/backup_$DATE"
# 圧縮
tar -czf "$BACKUP_DIR/backup_$DATE.tar.gz" -C "$BACKUP_DIR" "backup_$DATE"
rm -rf "$BACKUP_DIR/backup_$DATE"
# 古いバックアップ削除
find $BACKUP_DIR -name "backup_*.tar.gz" -mtime +$RETENTION_DAYS -delete
echo "MongoDB backup completed: $(date)"
EOF
chmod +x mongodb_backup.sh
# cron設定(毎日午前2時にバックアップ)
echo "0 2 * * * /path/to/mongodb_backup.sh" | crontab -
echo "MongoDB監視・最適化環境のセットアップが完了しました"
Atlas統合とクラウド運用
# MongoDB Atlas CLI セットアップと運用管理
# Atlas CLI インストール(macOS)
brew install mongodb-atlas-cli
# Atlas CLI インストール(Linux)
curl -fLo atlas-linux-x86_64.tar.gz https://fastdl.mongodb.org/mongocli/mongodb-atlas-cli_1.14.0_linux_x86_64.tar.gz
tar -xzf atlas-linux-x86_64.tar.gz
sudo install atlas-linux-x86_64/atlas /usr/local/bin/
# Atlas認証設定
atlas auth login
# 新しいAtlasプロジェクト作成
atlas projects create "MyNewProject"
# クラスター作成(M10、AWS東京リージョン)
atlas clusters create myCluster \
--provider AWS \
--region AP_NORTHEAST_1 \
--tier M10 \
--diskSizeGB 10 \
--mdbVersion 8.0
# データベースユーザー作成
atlas dbusers create \
--username appUser \
--password securePassword123 \
--role readWrite \
--projectId $(atlas projects list --output json | jq -r '.[0].id')
# IPアクセスリスト設定
atlas accessLists create \
--type ipAddress \
--value "0.0.0.0/0" \
--comment "Allow all IPs (for development only)"
# 接続文字列取得
atlas clusters connectionStrings describe myCluster
# Atlas監視・アラート設定
cat > atlas_monitoring.py << 'EOF'
#!/usr/bin/env python3
import requests
import json
import base64
from datetime import datetime, timedelta
class AtlasMonitoring:
def __init__(self, public_key, private_key, group_id):
self.public_key = public_key
self.private_key = private_key
self.group_id = group_id
self.base_url = "https://cloud.mongodb.com/api/atlas/v1.0"
# 認証ヘッダー準備
credentials = f"{public_key}:{private_key}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
self.headers = {
"Authorization": f"Basic {encoded_credentials}",
"Content-Type": "application/json"
}
def get_cluster_metrics(self, cluster_name):
"""クラスターメトリクス取得"""
# 過去1時間のメトリクス取得
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)
params = {
"granularity": "PT1M", # 1分間隔
"period": f"{start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}:{end_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}"
}
# CPU使用率
cpu_url = f"{self.base_url}/groups/{self.group_id}/processes/{cluster_name}/measurements/PROCESS_CPU_USER"
cpu_response = requests.get(cpu_url, headers=self.headers, params=params)
# メモリ使用率
memory_url = f"{self.base_url}/groups/{self.group_id}/processes/{cluster_name}/measurements/MEMORY_RESIDENT"
memory_response = requests.get(memory_url, headers=self.headers, params=params)
# 接続数
connections_url = f"{self.base_url}/groups/{self.group_id}/processes/{cluster_name}/measurements/CONNECTIONS"
connections_response = requests.get(connections_url, headers=self.headers, params=params)
return {
"cpu": cpu_response.json() if cpu_response.status_code == 200 else None,
"memory": memory_response.json() if memory_response.status_code == 200 else None,
"connections": connections_response.json() if connections_response.status_code == 200 else None
}
def create_alert(self, cluster_name, metric_name, threshold):
"""アラート作成"""
alert_config = {
"typeName": "HOST_METRIC",
"metricThreshold": {
"metricName": metric_name,
"operator": "GREATER_THAN",
"threshold": threshold,
"units": "RAW",
"mode": "AVERAGE"
},
"eventTypeName": f"{metric_name}_HIGH",
"enabled": True,
"notifications": [
{
"typeName": "EMAIL",
"emailAddress": "[email protected]",
"intervalMin": 60,
"delayMin": 0
}
]
}
url = f"{self.base_url}/groups/{self.group_id}/alertConfigs"
response = requests.post(url, headers=self.headers, data=json.dumps(alert_config))
return response.json() if response.status_code == 201 else None
def get_performance_advisor_suggestions(self, cluster_name):
"""Performance Advisor提案取得"""
url = f"{self.base_url}/groups/{self.group_id}/processes/{cluster_name}/performanceAdvisor/suggestedIndexes"
response = requests.get(url, headers=self.headers)
return response.json() if response.status_code == 200 else None
# Atlas Data API使用例
def atlas_data_api_example():
"""Atlas Data API使用例"""
import requests
# Data API設定
data_api_url = "https://data.mongodb-api.com/app/data-xxxxx/endpoint/data/v1"
api_key = "your-api-key"
headers = {
"Content-Type": "application/json",
"api-key": api_key
}
# ドキュメント検索
search_payload = {
"collection": "products",
"database": "ecommerce_db",
"filter": {"category": "laptop"},
"limit": 10
}
response = requests.post(
f"{data_api_url}/action/find",
headers=headers,
data=json.dumps(search_payload)
)
if response.status_code == 200:
results = response.json()
print(f"Found {len(results['documents'])} products")
return results['documents']
else:
print(f"Error: {response.status_code} - {response.text}")
return None
# Atlas Search インデックス作成例
def create_atlas_search_index():
"""Atlas Search インデックス作成"""
search_index_definition = {
"name": "products_search",
"definition": {
"mappings": {
"dynamic": False,
"fields": {
"name": {
"type": "string",
"analyzer": "lucene.standard"
},
"description": {
"type": "string",
"analyzer": "lucene.english"
},
"tags": {
"type": "string",
"analyzer": "lucene.keyword"
},
"price": {
"type": "number"
},
"specifications": {
"type": "document",
"dynamic": True
}
}
}
}
}
print("Atlas Search index definition:")
print(json.dumps(search_index_definition, indent=2))
if __name__ == "__main__":
# Atlas API例
# atlas_monitor = AtlasMonitoring("public_key", "private_key", "group_id")
# metrics = atlas_monitor.get_cluster_metrics("cluster_name")
# print(json.dumps(metrics, indent=2))
# Data API例
# products = atlas_data_api_example()
# Search Index例
create_atlas_search_index()
EOF
chmod +x atlas_monitoring.py
# Terraform でのAtlas Infrastructure as Code
cat > atlas_terraform.tf << 'EOF'
terraform {
required_providers {
mongodbatlas = {
source = "mongodb/mongodbatlas"
version = "~> 1.0"
}
}
}
provider "mongodbatlas" {
public_key = var.atlas_public_key
private_key = var.atlas_private_key
}
# プロジェクト作成
resource "mongodbatlas_project" "ecommerce" {
name = "ecommerce-project"
org_id = var.atlas_org_id
}
# クラスター作成
resource "mongodbatlas_cluster" "ecommerce_cluster" {
project_id = mongodbatlas_project.ecommerce.id
name = "ecommerce-cluster"
cluster_type = "REPLICASET"
provider_name = "AWS"
backing_provider_name = "AWS"
provider_region_name = "AP_NORTHEAST_1"
provider_instance_size_name = "M10"
mongo_db_major_version = "8.0"
auto_scaling_disk_gb_enabled = true
advanced_configuration {
javascript_enabled = true
minimum_enabled_tls_protocol = "TLS1_2"
no_table_scan = false
oplog_size_mb = 2048
sample_size_bi_connector = 110
sample_refresh_interval_bi_connector = 310
}
}
# データベースユーザー作成
resource "mongodbatlas_database_user" "app_user" {
username = "appUser"
password = var.atlas_user_password
project_id = mongodbatlas_project.ecommerce.id
auth_database_name = "admin"
roles {
role_name = "readWrite"
database_name = "ecommerce_db"
}
}
# IPアクセスリスト
resource "mongodbatlas_project_ip_access_list" "development" {
project_id = mongodbatlas_project.ecommerce.id
cidr_block = "0.0.0.0/0"
comment = "Development environment access"
}
# アラート設定
resource "mongodbatlas_alert_configuration" "high_cpu" {
project_id = mongodbatlas_project.ecommerce.id
event_type = "OUTSIDE_METRIC_THRESHOLD"
enabled = true
metric_threshold_config {
metric_name = "PROCESS_CPU_USER"
operator = "GREATER_THAN"
threshold = 80.0
units = "RAW"
mode = "AVERAGE"
}
notification {
type_name = "EMAIL"
email_address = "[email protected]"
interval_min = 30
delay_min = 0
}
}
# 出力
output "cluster_connection_string" {
value = mongodbatlas_cluster.ecommerce_cluster.connection_strings[0].standard_srv
sensitive = true
}
EOF
echo "MongoDB Atlas統合環境のセットアップが完了しました"