Google Cloud Functions
プラットフォーム
Google Cloud Functions (Cloud Run functions)
概要
Google Cloud Functions(現在はCloud Run functionsとして提供)は、Googleクラウドプラットフォームが提供する統合サーバーレス実行環境です。従来のCloud Functionsの使いやすいイベント駆動プログラミングモデルに、Cloud Runの細かい制御機能とスケーラビリティを組み合わせ、統一されたサーバーレスプラットフォームとして進化しました。2025年現在、第2世代(Gen 2)機能により最大16GBメモリ・4vCPU、1000並行リクエスト処理、1時間実行時間、GPU対応、そしてNode.js 22、Go 1.23、.NET 8などの最新ランタイムサポートにより、従来のサーバーレス制約を大幅に突破し、AI・機械学習から長時間データ処理まで対応する包括的なサーバーレスプラットフォームとして確立されています。
詳細
Google Cloud Functions 2025年版(Cloud Run functions)は、Cloud FunctionsとCloud Runのインフラストラクチャ統合により、単なる軽量なFaaS実行環境を超えた包括的なサーバーレスコンピューティングプラットフォームを提供しています。特に注目すべきは、第2世代機能による大幅な性能向上で、インスタンス並行性により1つのインスタンスで最大1000の同時リクエスト処理、16GBメモリ・4vCPUによる4倍のスケールアップ、1時間の長時間実行対応、そしてNVIDIA GPU対応によるAI・機械学習ワークロードの実行が可能です。高速ロールバック・段階的デプロイメント・ウォームアップインスタンス機能により本格的なプロダクション運用を支援し、Eventarc統合・Cloud Workflows連携・豊富なGoogleサービス統合により、モダンアプリケーション開発の包括的なソリューションを提供します。
主な特徴
- 第2世代統合: Cloud Runインフラとの統合による高性能実行環境
- 大規模スケーリング: 最大16GB・4vCPU、1000並行リクエスト対応
- GPU対応: NVIDIA GPU によるAI・機械学習ワークロード実行
- 豊富なトリガー: HTTP、Pub/Sub、Cloud Storage、Firestore、Eventarc等
- Google統合: BigQuery、Cloud Storage、Firestore等との深い統合
2025年の最新機能
- 最新ランタイム: Node.js 22、Go 1.23、.NET 8 GA対応
- GPU統合: NVIDIA GPU によるHugging Face モデル実行対応
- 拡張実行時間: HTTP関数の1時間実行サポート
- 高度制御: 段階的デプロイ、高速ロールバック、ウォームアップ機能
- 開発者体験: Eventarcサブタスク、デプロイメントトラッカー、パフォーマンス推奨機能
メリット・デメリット
メリット
- Cloud RunとFunctionsの統合による最高水準のサーバーレス体験
- GPU対応によるAI・機械学習ワークロードの実行能力
- Google クラウドエコシステムとの完全統合
- 大幅に向上した性能とスケーラビリティ(Gen2)
- 豊富なイベントソースとトリガーオプション
- 透明性の高い従量課金とコスト効率性
- 強力な開発ツールとデバッグ機能
デメリット
- Google Cloudエコシステムへの依存とベンダーロックイン
- 他のクラウドプラットフォームと比較して学習リソースが限定的
- 複雑な構成オプションによる初期設定の難しさ
- 長時間実行機能は従来のサーバーレス概念との乖離
- 企業での採用事例が他の主要プラットフォームより少ない
- 一部の高度な機能がまだプレビュー段階
参考ページ
- Cloud Run functions公式サイト
- Cloud Run functions ドキュメント
- Cloud Functions for Firebase
- Google Cloud CLI ドキュメント
書き方の例
セットアップと基本設定
# Google Cloud CLIのインストール
curl https://sdk.cloud.google.com | bash
exec -l $SHELL
# Google Cloud CLIの初期化
gcloud init
# 認証設定
gcloud auth login
gcloud auth application-default login
# プロジェクトの設定
gcloud config set project YOUR_PROJECT_ID
# Functions APIの有効化
gcloud services enable cloudfunctions.googleapis.com
gcloud services enable cloudbuild.googleapis.com
gcloud services enable artifactregistry.googleapis.com
# Functions Framework のインストール(Python)
pip install functions-framework
# Functions Framework のインストール(Node.js)
npm install @google-cloud/functions-framework
# 関数のローカル実行(Python)
functions-framework --target=hello_world --debug
# 関数のローカル実行(Node.js)
npx @google-cloud/functions-framework --target=helloWorld
# 関数のデプロイ(第2世代)
gcloud functions deploy my-function \
--gen2 \
--runtime=python312 \
--region=asia-northeast1 \
--source=. \
--entry-point=hello_world \
--trigger=http \
--memory=1Gi \
--cpu=1 \
--max-instances=100 \
--min-instances=1 \
--concurrency=80
# main.py - Python HTTP関数(第2世代)
import functions_framework
from flask import Request
import json
import logging
import os
from datetime import datetime
from google.cloud import logging as cloud_logging
# Cloud Loggingの設定
cloud_logging.Client().setup_logging()
@functions_framework.http
def hello_world(request: Request):
"""HTTP関数のエントリーポイント"""
# リクエスト情報の取得
request_json = request.get_json(silent=True)
request_args = request.args
# CORSヘッダーの設定
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization',
}
# プリフライトリクエストの処理
if request.method == 'OPTIONS':
return ('', 204, headers)
try:
# リクエストデータの処理
name = None
if request_json and 'name' in request_json:
name = request_json['name']
elif request_args and 'name' in request_args:
name = request_args['name']
else:
name = 'World'
# レスポンス作成
response_data = {
'message': f'Hello, {name}! from Google Cloud Functions',
'timestamp': datetime.utcnow().isoformat(),
'method': request.method,
'url': request.url,
'headers': dict(request.headers),
'environment': os.environ.get('FUNCTION_NAME', 'local'),
'memory': os.environ.get('FUNCTION_MEMORY_MB', '128'),
'region': os.environ.get('FUNCTION_REGION', 'unknown')
}
# ログ出力
logging.info(f'Request processed successfully for: {name}')
return (json.dumps(response_data, ensure_ascii=False), 200, headers)
except Exception as e:
logging.error(f'Error processing request: {str(e)}')
error_response = {
'error': 'Internal server error',
'timestamp': datetime.utcnow().isoformat()
}
return (json.dumps(error_response), 500, headers)
HTTP APIとルーティング
// index.js - Node.js HTTP関数(第2世代)
const functions = require('@google-cloud/functions-framework');
const { CloudLogging } = require('@google-cloud/logging');
// Cloud Logging設定
const logging = new CloudLogging();
const log = logging.log('cloud-functions');
// ユーザー管理API
functions.http('userApi', async (req, res) => {
console.log('User API function processed a request.');
// CORS設定
res.set('Access-Control-Allow-Origin', '*');
res.set('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
res.set('Access-Control-Allow-Headers', 'Content-Type, Authorization');
// プリフライトリクエスト処理
if (req.method === 'OPTIONS') {
return res.status(204).send('');
}
try {
const method = req.method;
const path = req.path;
const query = req.query;
// ルーティング処理
if (method === 'GET' && path === '/users') {
return handleGetUsers(req, res);
} else if (method === 'POST' && path === '/users') {
return handleCreateUser(req, res);
} else if (method === 'GET' && path.startsWith('/users/')) {
const userId = path.split('/')[2];
return handleGetUser(req, res, userId);
} else {
return res.status(404).json({ error: 'Not found' });
}
} catch (error) {
console.error('Error in userApi:', error);
// Cloud Loggingへエラー記録
const metadata = {
resource: { type: 'cloud_function' },
severity: 'ERROR'
};
const entry = log.entry(metadata, {
message: error.message,
stack: error.stack,
function: 'userApi'
});
await log.write(entry);
return res.status(500).json({ error: 'Internal server error' });
}
});
async function handleGetUsers(req, res) {
const users = [
{ id: 1, name: 'Alice', email: '[email protected]' },
{ id: 2, name: 'Bob', email: '[email protected]' },
{ id: 3, name: 'Charlie', email: '[email protected]' }
];
const response = {
users: users,
total: users.length,
timestamp: new Date().toISOString(),
region: process.env.FUNCTION_REGION || 'unknown'
};
return res.status(200).json(response);
}
async function handleCreateUser(req, res) {
const { name, email } = req.body;
if (!name || !email) {
return res.status(400).json({ error: 'Name and email are required' });
}
const newUser = {
id: Math.floor(Math.random() * 10000),
name: name,
email: email,
createdAt: new Date().toISOString()
};
console.log('Created new user:', newUser);
return res.status(201).json({
message: 'User created successfully',
user: newUser
});
}
async function handleGetUser(req, res, userId) {
const user = {
id: parseInt(userId),
name: `User ${userId}`,
email: `user${userId}@example.com`,
createdAt: new Date().toISOString()
};
return res.status(200).json(user);
}
// ヘルスチェック関数
functions.http('healthCheck', (req, res) => {
const healthInfo = {
status: 'healthy',
timestamp: new Date().toISOString(),
version: process.env.K_REVISION || '1.0.0',
service: process.env.K_SERVICE || 'cloud-function',
memory: process.env.FUNCTION_MEMORY_MB || '256MB',
timeout: process.env.FUNCTION_TIMEOUT_SEC || '60s'
};
res.status(200).json(healthInfo);
});
データベース統合とデータ処理
# main.py - Firestore & BigQuery統合
import functions_framework
from google.cloud import firestore
from google.cloud import bigquery
from google.cloud import storage
import json
import logging
from datetime import datetime
# クライアント初期化
db = firestore.Client()
bq_client = bigquery.Client()
storage_client = storage.Client()
@functions_framework.http
def data_processing_api(request):
"""データ処理API"""
if request.method == 'OPTIONS':
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization',
}
return ('', 204, headers)
try:
if request.method == 'POST':
return create_data_entry(request)
elif request.method == 'GET':
return get_data_entries(request)
else:
return json.dumps({'error': 'Method not allowed'}), 405
except Exception as e:
logging.error(f'Error in data_processing_api: {str(e)}')
return json.dumps({'error': 'Internal server error'}), 500
def create_data_entry(request):
"""データエントリの作成"""
request_json = request.get_json()
if not request_json:
return json.dumps({'error': 'Invalid JSON'}), 400
# Firestoreに保存
doc_ref = db.collection('entries').document()
data = {
'title': request_json.get('title'),
'content': request_json.get('content'),
'category': request_json.get('category'),
'created_at': datetime.utcnow(),
'updated_at': datetime.utcnow()
}
doc_ref.set(data)
# BigQueryにも分析用データとして保存
table_id = "your-project.analytics.entries"
bq_data = {
'entry_id': doc_ref.id,
'title': data['title'],
'category': data['category'],
'created_at': data['created_at'].isoformat(),
'char_count': len(data['content']) if data['content'] else 0
}
table = bq_client.get_table(table_id)
errors = bq_client.insert_rows_json(table, [bq_data])
if errors:
logging.warning(f'BigQuery insert errors: {errors}')
return json.dumps({
'message': 'Entry created successfully',
'id': doc_ref.id,
'data': data
}, default=str), 201
def get_data_entries(request):
"""データエントリの取得"""
category = request.args.get('category')
limit = int(request.args.get('limit', 10))
# Firestoreから取得
query = db.collection('entries')
if category:
query = query.where('category', '==', category)
docs = query.order_by('created_at', direction=firestore.Query.DESCENDING).limit(limit).stream()
entries = []
for doc in docs:
entry = doc.to_dict()
entry['id'] = doc.id
entries.append(entry)
return json.dumps({
'entries': entries,
'count': len(entries),
'timestamp': datetime.utcnow().isoformat()
}, default=str), 200
@functions_framework.cloud_event
def process_storage_file(cloud_event):
"""Cloud Storage ファイル処理"""
# Cloud Storage イベントデータ
data = cloud_event.data
bucket_name = data['bucket']
file_name = data['name']
logging.info(f'Processing file: {file_name} from bucket: {bucket_name}')
try:
# ファイルダウンロード
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
# ファイル内容の処理(テキストファイルの場合)
if file_name.endswith('.txt'):
content = blob.download_as_text()
# 分析結果をFirestoreに保存
analysis_doc = {
'file_name': file_name,
'bucket_name': bucket_name,
'file_size': blob.size,
'word_count': len(content.split()) if content else 0,
'char_count': len(content) if content else 0,
'processed_at': datetime.utcnow()
}
db.collection('file_analysis').add(analysis_doc)
logging.info(f'File analysis completed for: {file_name}')
# 画像ファイルの場合(基本情報のみ)
elif file_name.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
image_doc = {
'file_name': file_name,
'bucket_name': bucket_name,
'file_size': blob.size,
'content_type': blob.content_type,
'processed_at': datetime.utcnow()
}
db.collection('image_files').add(image_doc)
logging.info(f'Image file processed: {file_name}')
except Exception as e:
logging.error(f'Error processing file {file_name}: {str(e)}')
@functions_framework.http
def bigquery_analytics(request):
"""BigQueryを使用した分析API"""
if request.method == 'OPTIONS':
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type',
}
return ('', 204, headers)
try:
# カテゴリ別エントリ数の分析
query = """
SELECT
category,
COUNT(*) as entry_count,
AVG(char_count) as avg_char_count,
DATE(created_at) as date
FROM `your-project.analytics.entries`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY category, DATE(created_at)
ORDER BY date DESC, entry_count DESC
"""
query_job = bq_client.query(query)
results = query_job.result()
analytics_data = []
for row in results:
analytics_data.append({
'category': row.category,
'entry_count': row.entry_count,
'avg_char_count': float(row.avg_char_count) if row.avg_char_count else 0,
'date': row.date.isoformat()
})
return json.dumps({
'analytics': analytics_data,
'generated_at': datetime.utcnow().isoformat()
}), 200
except Exception as e:
logging.error(f'Error in bigquery_analytics: {str(e)}')
return json.dumps({'error': 'Analytics processing failed'}), 500
認証とセキュリティ
# main.py - IAMと認証統合
import functions_framework
from google.auth.transport import requests
from google.oauth2 import id_token
from google.cloud import secretmanager
import jwt
import json
import logging
from datetime import datetime, timedelta
import os
# Secret Manager クライアント
secret_client = secretmanager.SecretManagerServiceClient()
def get_secret(secret_name):
"""Secret Managerからシークレットを取得"""
project_id = os.environ.get('GCP_PROJECT')
name = f"projects/{project_id}/secrets/{secret_name}/versions/latest"
response = secret_client.access_secret_version(request={"name": name})
return response.payload.data.decode("UTF-8")
@functions_framework.http
def protected_api(request):
"""認証が必要なAPI"""
# CORS設定
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization',
}
if request.method == 'OPTIONS':
return ('', 204, headers)
try:
# トークン検証
user_info = verify_token(request)
if not user_info:
return json.dumps({'error': 'Unauthorized'}), 401, headers
# API処理
if request.method == 'GET':
return handle_protected_get(user_info)
elif request.method == 'POST':
return handle_protected_post(request, user_info)
else:
return json.dumps({'error': 'Method not allowed'}), 405, headers
except Exception as e:
logging.error(f'Error in protected_api: {str(e)}')
return json.dumps({'error': 'Internal server error'}), 500, headers
def verify_token(request):
"""JWT トークンの検証"""
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return None
token = auth_header.split('Bearer ')[1]
try:
# Google ID トークンの検証
idinfo = id_token.verify_oauth2_token(
token, requests.Request(), os.environ.get('GOOGLE_CLIENT_ID')
)
# トークンの発行者確認
if idinfo['iss'] not in ['accounts.google.com', 'https://accounts.google.com']:
raise ValueError('Invalid issuer')
return idinfo
except ValueError as e:
logging.warning(f'Token verification failed: {str(e)}')
# カスタムJWTトークンの検証(フォールバック)
try:
jwt_secret = get_secret('jwt-secret')
decoded = jwt.decode(token, jwt_secret, algorithms=['HS256'])
return decoded
except Exception as jwt_error:
logging.error(f'Custom JWT verification failed: {str(jwt_error)}')
return None
def handle_protected_get(user_info):
"""認証済みGETリクエストの処理"""
user_data = {
'user_id': user_info.get('sub'),
'email': user_info.get('email'),
'name': user_info.get('name'),
'access_time': datetime.utcnow().isoformat(),
'permissions': ['read', 'write'] # 実際の実装では適切な権限チェック
}
return json.dumps({
'message': 'Access granted',
'user': user_data,
'data': 'Protected data here'
}), 200
def handle_protected_post(request, user_info):
"""認証済みPOSTリクエストの処理"""
request_json = request.get_json()
if not request_json:
return json.dumps({'error': 'Invalid JSON'}), 400
# ユーザー権限の確認
if not has_write_permission(user_info):
return json.dumps({'error': 'Insufficient permissions'}), 403
# データ処理
result = {
'processed_by': user_info.get('email'),
'data': request_json,
'timestamp': datetime.utcnow().isoformat()
}
return json.dumps({
'message': 'Data processed successfully',
'result': result
}), 201
def has_write_permission(user_info):
"""書き込み権限の確認"""
# 実際の実装では、IAMやカスタム権限システムとの統合
allowed_domains = ['example.com', 'mycompany.com']
user_email = user_info.get('email', '')
return any(user_email.endswith(f'@{domain}') for domain in allowed_domains)
@functions_framework.http
def login_api(request):
"""ログインAPI(カスタムJWT発行)"""
if request.method == 'OPTIONS':
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type',
}
return ('', 204, headers)
try:
request_json = request.get_json()
email = request_json.get('email')
password = request_json.get('password')
# 認証処理(実際の実装では適切な認証システムを使用)
if authenticate_user(email, password):
# JWTトークン生成
jwt_secret = get_secret('jwt-secret')
payload = {
'sub': f'user_{hash(email) % 10000}',
'email': email,
'name': email.split('@')[0],
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(hours=24)
}
token = jwt.encode(payload, jwt_secret, algorithm='HS256')
return json.dumps({
'token': token,
'user': {
'email': email,
'name': payload['name']
},
'expires_in': 86400 # 24時間
}), 200
else:
return json.dumps({'error': 'Invalid credentials'}), 401
except Exception as e:
logging.error(f'Error in login_api: {str(e)}')
return json.dumps({'error': 'Login failed'}), 500
def authenticate_user(email, password):
"""ユーザー認証(サンプル実装)"""
# 実際の実装では、データベースやディレクトリサービスとの連携
return email == '[email protected]' and password == 'password123'
イベント駆動アーキテクチャ
// index.js - Pub/Sub & Eventarc統合
const { PubSub } = require('@google-cloud/pubsub');
const { CloudSchedulerClient } = require('@google-cloud/scheduler');
const { CloudTasksClient } = require('@google-cloud/tasks');
const functions = require('@google-cloud/functions-framework');
const pubsub = new PubSub();
const scheduler = new CloudSchedulerClient();
const tasks = new CloudTasksClient();
// Pub/Sub トリガー関数
functions.cloudEvent('processPubSubMessage', async (cloudEvent) => {
console.log('Processing Pub/Sub message:', cloudEvent);
try {
// メッセージデータの取得
const messageData = cloudEvent.data;
const message = Buffer.from(messageData.message.data, 'base64').toString();
const attributes = messageData.message.attributes || {};
console.log('Message content:', message);
console.log('Message attributes:', attributes);
// メッセージタイプに応じた処理
const messageType = attributes.type || 'default';
switch (messageType) {
case 'user_registration':
await handleUserRegistration(JSON.parse(message));
break;
case 'order_processing':
await handleOrderProcessing(JSON.parse(message));
break;
case 'notification':
await handleNotification(JSON.parse(message));
break;
default:
console.log('Unknown message type:', messageType);
}
console.log('Message processed successfully');
} catch (error) {
console.error('Error processing Pub/Sub message:', error);
// Dead Letter Queueへの送信(エラー処理)
const dlqTopic = 'error-dlq';
const errorMessage = {
originalMessage: cloudEvent.data,
error: error.message,
timestamp: new Date().toISOString()
};
await pubsub.topic(dlqTopic).publishMessage({
data: Buffer.from(JSON.stringify(errorMessage))
});
throw error; // 再試行のためにエラーを再スロー
}
});
async function handleUserRegistration(userData) {
console.log('Processing user registration:', userData);
// ユーザー処理ロジック
const tasks = [
sendWelcomeEmail(userData.email),
createUserProfile(userData),
setupDefaultPreferences(userData.userId)
];
await Promise.all(tasks);
// 後続処理のためのメッセージ発行
await pubsub.topic('user-onboarding').publishMessage({
data: Buffer.from(JSON.stringify({
userId: userData.userId,
step: 'registration_complete',
timestamp: new Date().toISOString()
})),
attributes: {
type: 'onboarding_step'
}
});
}
async function handleOrderProcessing(orderData) {
console.log('Processing order:', orderData);
// 在庫確認
const inventoryCheck = await checkInventory(orderData.items);
if (!inventoryCheck.available) {
throw new Error('Insufficient inventory');
}
// 支払い処理
const paymentResult = await processPayment(orderData.payment);
if (!paymentResult.success) {
throw new Error('Payment failed');
}
// 配送準備
await prepareShipment(orderData);
// 完了通知
await pubsub.topic('order-updates').publishMessage({
data: Buffer.from(JSON.stringify({
orderId: orderData.orderId,
status: 'processed',
timestamp: new Date().toISOString()
})),
attributes: {
type: 'order_status_update'
}
});
}
// Cloud Storage トリガー関数
functions.cloudEvent('processStorageEvent', async (cloudEvent) => {
console.log('Processing Storage event:', cloudEvent);
const data = cloudEvent.data;
const bucketName = data.bucket;
const fileName = data.name;
const eventType = cloudEvent.type;
console.log(`File ${fileName} in bucket ${bucketName}: ${eventType}`);
try {
if (eventType === 'google.cloud.storage.object.v1.finalized') {
// ファイルアップロード処理
await handleFileUpload(bucketName, fileName);
} else if (eventType === 'google.cloud.storage.object.v1.deleted') {
// ファイル削除処理
await handleFileDelete(bucketName, fileName);
}
} catch (error) {
console.error('Error processing storage event:', error);
throw error;
}
});
async function handleFileUpload(bucketName, fileName) {
console.log(`Processing uploaded file: ${fileName}`);
// ファイルタイプに基づく処理
if (fileName.endsWith('.csv')) {
// CSVファイルの処理タスクをキューに追加
await scheduleCSVProcessing(bucketName, fileName);
} else if (fileName.match(/\.(jpg|jpeg|png)$/i)) {
// 画像処理タスクをキューに追加
await scheduleImageProcessing(bucketName, fileName);
}
// ファイル処理通知
await pubsub.topic('file-processing').publishMessage({
data: Buffer.from(JSON.stringify({
bucket: bucketName,
file: fileName,
action: 'uploaded',
timestamp: new Date().toISOString()
})),
attributes: {
type: 'file_event'
}
});
}
// Cloud Tasks を使用した遅延処理
async function scheduleCSVProcessing(bucketName, fileName) {
const projectId = process.env.GCP_PROJECT;
const queueName = 'csv-processing-queue';
const location = 'asia-northeast1';
const url = `https://asia-northeast1-${projectId}.cloudfunctions.net/processCSVFile`;
const parent = tasks.queuePath(projectId, location, queueName);
const task = {
httpRequest: {
httpMethod: 'POST',
url: url,
headers: {
'Content-Type': 'application/json'
},
body: Buffer.from(JSON.stringify({
bucket: bucketName,
file: fileName
}))
},
scheduleTime: {
seconds: Math.floor(Date.now() / 1000) + 60 // 1分後に実行
}
};
const [response] = await tasks.createTask({ parent, task });
console.log(`Created task: ${response.name}`);
}
// ワークフロー管理関数
functions.http('workflowManager', async (req, res) => {
console.log('Workflow manager called');
try {
const { action, data } = req.body;
switch (action) {
case 'start_workflow':
await startWorkflow(data);
res.status(200).json({ message: 'Workflow started', workflowId: data.workflowId });
break;
case 'pause_workflow':
await pauseWorkflow(data.workflowId);
res.status(200).json({ message: 'Workflow paused' });
break;
case 'resume_workflow':
await resumeWorkflow(data.workflowId);
res.status(200).json({ message: 'Workflow resumed' });
break;
default:
res.status(400).json({ error: 'Unknown action' });
}
} catch (error) {
console.error('Workflow manager error:', error);
res.status(500).json({ error: 'Workflow operation failed' });
}
});
async function startWorkflow(workflowData) {
// ワークフローの初期化
console.log('Starting workflow:', workflowData.workflowId);
// 各ステップをPub/Subメッセージとして発行
const steps = workflowData.steps || [];
for (let i = 0; i < steps.length; i++) {
const step = steps[i];
await pubsub.topic('workflow-steps').publishMessage({
data: Buffer.from(JSON.stringify({
workflowId: workflowData.workflowId,
stepIndex: i,
stepData: step,
totalSteps: steps.length
})),
attributes: {
type: 'workflow_step',
workflowId: workflowData.workflowId
}
});
}
}
// ヘルパー関数(サンプル実装)
async function sendWelcomeEmail(email) {
console.log(`Sending welcome email to: ${email}`);
// 実際の実装では適切なメール送信サービスを使用
}
async function createUserProfile(userData) {
console.log('Creating user profile:', userData.userId);
// 実際の実装ではデータベースへの保存
}
async function setupDefaultPreferences(userId) {
console.log('Setting up preferences for user:', userId);
// 実際の実装では設定の初期化
}
async function checkInventory(items) {
// 在庫確認の実装
return { available: true };
}
async function processPayment(paymentData) {
// 支払い処理の実装
return { success: true };
}
async function prepareShipment(orderData) {
console.log('Preparing shipment for order:', orderData.orderId);
// 配送準備の実装
}
監視とパフォーマンス最適化
# main.py - 監視とパフォーマンス最適化
import functions_framework
from google.cloud import monitoring_v3
from google.cloud import logging
from google.cloud import error_reporting
from google.cloud import trace_v1
import json
import time
import os
from datetime import datetime, timedelta
import psutil
# クライアント初期化
monitoring_client = monitoring_v3.MetricServiceClient()
logging_client = logging.Client()
error_client = error_reporting.Client()
@functions_framework.http
def performance_monitor(request):
"""パフォーマンス監視API"""
if request.method == 'OPTIONS':
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type',
}
return ('', 204, headers)
try:
start_time = time.time()
# システムメトリクスの収集
metrics = collect_system_metrics()
# カスタムメトリクスの送信
send_custom_metrics(metrics)
# 実行時間の記録
execution_time = time.time() - start_time
response_data = {
'status': 'healthy',
'timestamp': datetime.utcnow().isoformat(),
'metrics': metrics,
'execution_time_ms': execution_time * 1000,
'function_info': {
'name': os.environ.get('K_SERVICE', 'unknown'),
'revision': os.environ.get('K_REVISION', 'unknown'),
'memory': os.environ.get('FUNCTION_MEMORY_MB', 'unknown'),
'timeout': os.environ.get('FUNCTION_TIMEOUT_SEC', 'unknown')
}
}
return json.dumps(response_data), 200
except Exception as e:
# エラーレポートの送信
error_client.report_exception()
return json.dumps({
'status': 'error',
'error': str(e),
'timestamp': datetime.utcnow().isoformat()
}), 500
def collect_system_metrics():
"""システムメトリクスの収集"""
try:
# メモリ使用率
memory_info = psutil.virtual_memory()
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
# ディスク使用率
disk_info = psutil.disk_usage('/')
metrics = {
'memory': {
'total': memory_info.total,
'available': memory_info.available,
'percent': memory_info.percent,
'used': memory_info.used
},
'cpu': {
'percent': cpu_percent,
'count': psutil.cpu_count()
},
'disk': {
'total': disk_info.total,
'used': disk_info.used,
'free': disk_info.free,
'percent': (disk_info.used / disk_info.total) * 100
}
}
return metrics
except Exception as e:
print(f'Error collecting system metrics: {str(e)}')
return {}
def send_custom_metrics(metrics):
"""Cloud Monitoringへカスタムメトリクスを送信"""
project_id = os.environ.get('GCP_PROJECT')
if not project_id or not metrics:
return
try:
project_name = f"projects/{project_id}"
# メトリクス時系列の作成
series = monitoring_v3.TimeSeries()
series.metric.type = "custom.googleapis.com/function/memory_usage"
series.resource.type = "cloud_function"
series.resource.labels["function_name"] = os.environ.get('K_SERVICE', 'unknown')
series.resource.labels["region"] = os.environ.get('FUNCTION_REGION', 'unknown')
# データポイントの追加
now = time.time()
seconds = int(now)
nanos = int((now - seconds) * 10 ** 9)
interval = monitoring_v3.TimeInterval({
"end_time": {"seconds": seconds, "nanos": nanos}
})
point = monitoring_v3.Point({
"interval": interval,
"value": {"double_value": metrics.get('memory', {}).get('percent', 0)}
})
series.points = [point]
# メトリクスの送信
monitoring_client.create_time_series(
name=project_name,
time_series=[series]
)
except Exception as e:
print(f'Error sending custom metrics: {str(e)}')
@functions_framework.http
def error_tracking(request):
"""エラートラッキングシステム"""
try:
# 意図的にエラーを発生させるテスト
if request.args.get('test_error') == 'true':
raise ValueError('This is a test error for demonstration')
# 正常なレスポンス
return json.dumps({
'message': 'Error tracking system is working',
'timestamp': datetime.utcnow().isoformat()
}), 200
except Exception as e:
# エラーレポートの送信
error_client.report_exception()
# 構造化ログの記録
log_entry = {
'severity': 'ERROR',
'message': str(e),
'function_name': os.environ.get('K_SERVICE', 'unknown'),
'timestamp': datetime.utcnow().isoformat(),
'trace': request.headers.get('X-Cloud-Trace-Context', ''),
'user_agent': request.headers.get('User-Agent', ''),
'remote_ip': request.headers.get('X-Forwarded-For', request.remote_addr)
}
logger = logging_client.logger('cloud-functions-errors')
logger.log_struct(log_entry)
return json.dumps({
'error': 'An error occurred',
'timestamp': datetime.utcnow().isoformat()
}), 500
@functions_framework.http
def performance_analytics(request):
"""パフォーマンス分析API"""
try:
# パフォーマンス分析の実行
analysis = analyze_function_performance()
return json.dumps({
'analysis': analysis,
'recommendations': generate_recommendations(analysis),
'timestamp': datetime.utcnow().isoformat()
}), 200
except Exception as e:
return json.dumps({
'error': 'Performance analysis failed',
'details': str(e)
}), 500
def analyze_function_performance():
"""関数パフォーマンスの分析"""
# 実際の実装では、Cloud Monitoringからデータを取得
return {
'avg_response_time': 150, # ms
'cold_start_frequency': 15, # %
'memory_utilization': 65, # %
'cpu_utilization': 45, # %
'error_rate': 2.3, # %
'invocation_count_24h': 1250
}
def generate_recommendations(analysis):
"""パフォーマンス改善の推奨事項"""
recommendations = []
if analysis['cold_start_frequency'] > 10:
recommendations.append({
'type': 'cold_start',
'message': 'Cold startが頻繁に発生しています。最小インスタンス数の設定を検討してください。',
'action': 'Set minimum instances to 1-2'
})
if analysis['memory_utilization'] > 80:
recommendations.append({
'type': 'memory',
'message': 'メモリ使用率が高いです。メモリ割り当ての増加を検討してください。',
'action': 'Increase memory allocation'
})
if analysis['avg_response_time'] > 1000:
recommendations.append({
'type': 'performance',
'message': 'レスポンス時間が長いです。コードの最適化を検討してください。',
'action': 'Optimize code performance'
})
return recommendations
@functions_framework.http
def health_check(request):
"""ヘルスチェック関数"""
health_status = {
'status': 'healthy',
'timestamp': datetime.utcnow().isoformat(),
'version': os.environ.get('K_REVISION', '1.0.0'),
'uptime': get_uptime(),
'dependencies': check_dependencies(),
'environment': {
'function_name': os.environ.get('K_SERVICE'),
'region': os.environ.get('FUNCTION_REGION'),
'memory': os.environ.get('FUNCTION_MEMORY_MB'),
'timeout': os.environ.get('FUNCTION_TIMEOUT_SEC')
}
}
# 依存関係の健全性チェック
all_healthy = all(dep['healthy'] for dep in health_status['dependencies'])
if not all_healthy:
health_status['status'] = 'degraded'
return json.dumps(health_status), 503
return json.dumps(health_status), 200
def get_uptime():
"""アップタイムの取得(サンプル)"""
# Cloud Functionsはステートレスなので、実際のアップタイムは測定困難
return "N/A (Stateless)"
def check_dependencies():
"""依存サービスの健全性チェック"""
dependencies = []
# Firestore接続チェック
try:
from google.cloud import firestore
db = firestore.Client()
# 簡単なクエリでテスト
list(db.collection('health_check').limit(1).stream())
dependencies.append({'name': 'Firestore', 'healthy': True})
except Exception:
dependencies.append({'name': 'Firestore', 'healthy': False})
# Cloud Storage接続チェック
try:
from google.cloud import storage
client = storage.Client()
# バケット一覧の取得でテスト
list(client.list_buckets(max_results=1))
dependencies.append({'name': 'Cloud Storage', 'healthy': True})
except Exception:
dependencies.append({'name': 'Cloud Storage', 'healthy': False})
return dependencies