Google Cloud Functions
Platform
Google Cloud Functions (Cloud Run functions)
Overview
Google Cloud Functions (now offered as Cloud Run functions) is Google Cloud Platform's unified serverless execution environment. It retains the familiar event-driven programming model of traditional Cloud Functions while adding the fine-grained control and scalability of Cloud Run, merging the two into a single serverless platform. As of 2025, second-generation (Gen 2) functions support up to 16GB of memory and 4 vCPUs, up to 1,000 concurrent requests per instance, execution times of up to 1 hour, GPU support, and current runtimes including Node.js 22, Go 1.23, and .NET 8. Together these capabilities move well past traditional serverless constraints, making the platform suitable for everything from AI/machine learning to long-running data processing.
Details
The 2025 edition of Google Cloud Functions (Cloud Run functions) merges the Cloud Functions and Cloud Run infrastructures into a comprehensive serverless computing platform that goes well beyond a lightweight FaaS runtime. The second-generation improvements are particularly noteworthy: instance concurrency allows up to 1,000 concurrent requests per instance, resource limits rise to 16GB of memory and 4 vCPUs, HTTP functions can run for up to 1 hour, and NVIDIA GPU support enables AI/machine learning workloads. Fast rollback, gradual deployment, and warm-up (minimum) instances support full production operations, while Eventarc integration, Cloud Workflows connectivity, and deep integration with other Google services round out a complete solution for modern application development.
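Because a single Gen 2 instance can serve many requests at once, module-level state is shared between concurrent requests. The following is a minimal Python sketch (names such as get_client and _request_count are illustrative, not from any official API) of lock-protected shared state and lazy client initialization, a pattern worth considering whenever --concurrency is greater than 1:
# main.py - concurrency-aware globals (illustrative sketch)
import threading
import functions_framework

# Created once per instance and shared by every concurrent request
_client = None
_client_lock = threading.Lock()
_request_count = 0
_count_lock = threading.Lock()

def get_client():
    """Lazily initialize a shared client exactly once per instance."""
    global _client
    if _client is None:
        with _client_lock:
            if _client is None:  # double-checked so only one thread initializes
                from google.cloud import storage
                _client = storage.Client()
    return _client

@functions_framework.http
def concurrent_handler(request):
    """Handler intended to be safe under --concurrency > 1."""
    global _request_count
    with _count_lock:  # guard mutable state shared across request threads
        _request_count += 1
        served = _request_count
    get_client()  # shared client, created on first use
    return {'requests_served_by_instance': served}, 200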
Key Features
- Gen 2 Integration: High-performance execution environment through Cloud Run infrastructure integration
- Large-scale Scaling: Up to 16GB/4vCPU, 1000 concurrent request support
- GPU Support: AI/machine learning workload execution with NVIDIA GPU
- Rich Triggers: HTTP, Pub/Sub, Cloud Storage, Firestore, Eventarc, and more (see the Pub/Sub sketch after this list)
- Google Integration: Deep integration with BigQuery, Cloud Storage, Firestore, and more
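As a concrete illustration of the trigger variety listed above, a Pub/Sub-triggered function in Python receives a CloudEvent whose payload arrives base64-encoded. A minimal sketch follows; the topic name my-topic is an assumption for the example:
# main.py - Pub/Sub-triggered CloudEvent function (illustrative sketch)
import base64
import functions_framework

@functions_framework.cloud_event
def on_message(cloud_event):
    """Triggered by a message published to a Pub/Sub topic.

    Deployed with, for example:
      gcloud functions deploy on-message --gen2 --runtime=python312 \
        --region=us-central1 --entry-point=on_message --trigger-topic=my-topic
    """
    # Pub/Sub message data arrives base64-encoded inside the CloudEvent
    payload = base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")
    attributes = cloud_event.data["message"].get("attributes", {})
    print(f"Received: {payload} (attributes: {attributes})")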
Latest 2025 Features
- Latest Runtimes: Node.js 22, Go 1.23, .NET 8 GA support
- GPU Integration: Hugging Face model execution support with NVIDIA GPU
- Extended Execution Time: up to 1 hour of execution for HTTP functions (see the long-running handler sketch after this list)
- Advanced Controls: Gradual deployment, fast rollback, warm-up capabilities
- Developer Experience: Eventarc subtasks, deployment tracker, performance recommendation features
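A function using the extended limit should still budget its own execution time so it can finish cleanly rather than be terminated at the platform deadline. Below is a hedged sketch of that pattern, assuming deployment with --timeout=3600 and a hypothetical process_item helper:
# main.py - long-running batch handler (illustrative sketch)
import time
import functions_framework

DEADLINE_SECONDS = 3600  # should match the deployed --timeout value
SAFETY_MARGIN = 60       # stop early enough to return a clean response

@functions_framework.http
def long_batch(request):
    start = time.monotonic()
    body = request.get_json(silent=True) or {}
    items = body.get('items', [])
    processed = 0
    for item in items:
        if time.monotonic() - start > DEADLINE_SECONDS - SAFETY_MARGIN:
            # Out of time budget: report partial progress instead of being killed
            return {'status': 'partial', 'processed': processed}, 202
        process_item(item)
        processed += 1
    return {'status': 'complete', 'processed': processed}, 200

def process_item(item):
    """Hypothetical per-item work; replace with real processing."""
    time.sleep(0.1)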
Advantages and Disadvantages
Advantages
- Premier serverless experience through Cloud Run and Functions integration
- AI/machine learning workload execution capability with GPU support
- Complete integration with Google Cloud ecosystem
- Significantly improved performance and scalability (Gen2)
- Rich event sources and trigger options
- Transparent pay-as-you-go pricing and cost efficiency
- Powerful development tools and debugging capabilities
Disadvantages
- Google Cloud ecosystem dependency and vendor lock-in
- Limited learning resources compared to other major cloud platforms
- Initial setup complexity due to complex configuration options
- Long-running execution features deviate from traditional serverless concepts
- Fewer enterprise adoption cases compared to other major platforms
- Some advanced features still in preview stage
Reference Pages
- Cloud Run functions Official Site: https://cloud.google.com/functions
- Cloud Run functions Documentation: https://cloud.google.com/functions/docs
- Cloud Functions for Firebase: https://firebase.google.com/docs/functions
- Google Cloud CLI Documentation: https://cloud.google.com/sdk/docs
Code Examples
Setup and Basic Configuration
# Install Google Cloud CLI
curl https://sdk.cloud.google.com | bash
exec -l $SHELL
# Initialize Google Cloud CLI
gcloud init
# Authentication setup
gcloud auth login
gcloud auth application-default login
# Set project
gcloud config set project YOUR_PROJECT_ID
# Enable Functions API
gcloud services enable cloudfunctions.googleapis.com
gcloud services enable cloudbuild.googleapis.com
gcloud services enable artifactregistry.googleapis.com
# Install Functions Framework (Python)
pip install functions-framework
# Install Functions Framework (Node.js)
npm install @google-cloud/functions-framework
# Run function locally (Python)
functions-framework --target=hello_world --debug
# Run function locally (Node.js)
npx @google-cloud/functions-framework --target=helloWorld
# Deploy function (Gen 2)
gcloud functions deploy my-function \
--gen2 \
--runtime=python312 \
--region=us-central1 \
--source=. \
--entry-point=hello_world \
  --trigger-http \
--memory=1Gi \
--cpu=1 \
--max-instances=100 \
--min-instances=1 \
--concurrency=80
# main.py - Python HTTP Function (Gen 2)
import functions_framework
from flask import Request
import json
import logging
import os
from datetime import datetime
from google.cloud import logging as cloud_logging
# Cloud Logging setup
cloud_logging.Client().setup_logging()
@functions_framework.http
def hello_world(request: Request):
"""HTTP function entry point"""
# Get request information
request_json = request.get_json(silent=True)
request_args = request.args
    # CORS and response headers
    headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
        'Access-Control-Allow-Headers': 'Content-Type, Authorization',
        'Content-Type': 'application/json'
    }
# Handle preflight requests
if request.method == 'OPTIONS':
return ('', 204, headers)
try:
# Process request data
name = None
if request_json and 'name' in request_json:
name = request_json['name']
elif request_args and 'name' in request_args:
name = request_args['name']
else:
name = 'World'
# Create response
response_data = {
'message': f'Hello, {name}! from Google Cloud Functions',
'timestamp': datetime.utcnow().isoformat(),
'method': request.method,
'url': request.url,
'headers': dict(request.headers),
'environment': os.environ.get('FUNCTION_NAME', 'local'),
'memory': os.environ.get('FUNCTION_MEMORY_MB', '128'),
'region': os.environ.get('FUNCTION_REGION', 'unknown')
}
# Log output
logging.info(f'Request processed successfully for: {name}')
return (json.dumps(response_data, ensure_ascii=False), 200, headers)
except Exception as e:
logging.error(f'Error processing request: {str(e)}')
error_response = {
'error': 'Internal server error',
'timestamp': datetime.utcnow().isoformat()
}
return (json.dumps(error_response), 500, headers)
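With the local dev server started as shown earlier (functions-framework --target=hello_world, which listens on port 8080 by default), the handler above can be smoke-tested with a short script. This sketch uses the requests library and assumes the default port:
# test_local.py - smoke test against the local Functions Framework server
import requests

BASE_URL = 'http://localhost:8080'  # functions-framework default port

# GET with a query parameter
resp = requests.get(BASE_URL, params={'name': 'Cloud'})
print(resp.status_code, resp.json()['message'])

# POST with a JSON body
resp = requests.post(BASE_URL, json={'name': 'Functions'})
print(resp.status_code, resp.json()['message'])

# CORS preflight should return 204 with an empty body
resp = requests.options(BASE_URL)
print(resp.status_code)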
HTTP APIs and Routing
// index.js - Node.js HTTP Function (Gen 2)
const functions = require('@google-cloud/functions-framework');
const { Logging } = require('@google-cloud/logging');
// Cloud Logging setup (the package exports Logging, not CloudLogging)
const logging = new Logging();
const log = logging.log('cloud-functions');
// User management API
functions.http('userApi', async (req, res) => {
console.log('User API function processed a request.');
// CORS setup
res.set('Access-Control-Allow-Origin', '*');
res.set('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
res.set('Access-Control-Allow-Headers', 'Content-Type, Authorization');
// Handle preflight requests
if (req.method === 'OPTIONS') {
return res.status(204).send('');
}
try {
const method = req.method;
const path = req.path;
const query = req.query;
// Routing logic
if (method === 'GET' && path === '/users') {
return handleGetUsers(req, res);
} else if (method === 'POST' && path === '/users') {
return handleCreateUser(req, res);
} else if (method === 'GET' && path.startsWith('/users/')) {
const userId = path.split('/')[2];
return handleGetUser(req, res, userId);
} else {
return res.status(404).json({ error: 'Not found' });
}
} catch (error) {
console.error('Error in userApi:', error);
// Log error to Cloud Logging
const metadata = {
resource: { type: 'cloud_function' },
severity: 'ERROR'
};
const entry = log.entry(metadata, {
message: error.message,
stack: error.stack,
function: 'userApi'
});
await log.write(entry);
return res.status(500).json({ error: 'Internal server error' });
}
});
async function handleGetUsers(req, res) {
const users = [
{ id: 1, name: 'Alice', email: '[email protected]' },
{ id: 2, name: 'Bob', email: '[email protected]' },
{ id: 3, name: 'Charlie', email: '[email protected]' }
];
const response = {
users: users,
total: users.length,
timestamp: new Date().toISOString(),
region: process.env.FUNCTION_REGION || 'unknown'
};
return res.status(200).json(response);
}
async function handleCreateUser(req, res) {
const { name, email } = req.body;
if (!name || !email) {
return res.status(400).json({ error: 'Name and email are required' });
}
const newUser = {
id: Math.floor(Math.random() * 10000),
name: name,
email: email,
createdAt: new Date().toISOString()
};
console.log('Created new user:', newUser);
return res.status(201).json({
message: 'User created successfully',
user: newUser
});
}
async function handleGetUser(req, res, userId) {
const user = {
id: parseInt(userId),
name: `User ${userId}`,
email: `user${userId}@example.com`,
createdAt: new Date().toISOString()
};
return res.status(200).json(user);
}
// Health check function
functions.http('healthCheck', (req, res) => {
const healthInfo = {
status: 'healthy',
timestamp: new Date().toISOString(),
version: process.env.K_REVISION || '1.0.0',
service: process.env.K_SERVICE || 'cloud-function',
memory: process.env.FUNCTION_MEMORY_MB || '256MB',
timeout: process.env.FUNCTION_TIMEOUT_SEC || '60s'
};
res.status(200).json(healthInfo);
});
Database Integration and Data Processing
# main.py - Firestore & BigQuery Integration
import functions_framework
from google.cloud import firestore
from google.cloud import bigquery
from google.cloud import storage
import json
import logging
from datetime import datetime
# Initialize clients
db = firestore.Client()
bq_client = bigquery.Client()
storage_client = storage.Client()
@functions_framework.http
def data_processing_api(request):
"""Data processing API"""
if request.method == 'OPTIONS':
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization',
}
return ('', 204, headers)
try:
if request.method == 'POST':
return create_data_entry(request)
elif request.method == 'GET':
return get_data_entries(request)
else:
return json.dumps({'error': 'Method not allowed'}), 405
except Exception as e:
logging.error(f'Error in data_processing_api: {str(e)}')
return json.dumps({'error': 'Internal server error'}), 500
def create_data_entry(request):
"""Create data entry"""
request_json = request.get_json()
if not request_json:
return json.dumps({'error': 'Invalid JSON'}), 400
# Save to Firestore
doc_ref = db.collection('entries').document()
data = {
'title': request_json.get('title'),
'content': request_json.get('content'),
'category': request_json.get('category'),
'created_at': datetime.utcnow(),
'updated_at': datetime.utcnow()
}
doc_ref.set(data)
# Also save to BigQuery for analytics
table_id = "your-project.analytics.entries"
bq_data = {
'entry_id': doc_ref.id,
'title': data['title'],
'category': data['category'],
'created_at': data['created_at'].isoformat(),
'char_count': len(data['content']) if data['content'] else 0
}
table = bq_client.get_table(table_id)
errors = bq_client.insert_rows_json(table, [bq_data])
if errors:
logging.warning(f'BigQuery insert errors: {errors}')
return json.dumps({
'message': 'Entry created successfully',
'id': doc_ref.id,
'data': data
}, default=str), 201
def get_data_entries(request):
"""Get data entries"""
category = request.args.get('category')
limit = int(request.args.get('limit', 10))
# Get from Firestore
query = db.collection('entries')
if category:
query = query.where('category', '==', category)
docs = query.order_by('created_at', direction=firestore.Query.DESCENDING).limit(limit).stream()
entries = []
for doc in docs:
entry = doc.to_dict()
entry['id'] = doc.id
entries.append(entry)
return json.dumps({
'entries': entries,
'count': len(entries),
'timestamp': datetime.utcnow().isoformat()
}, default=str), 200
@functions_framework.cloud_event
def process_storage_file(cloud_event):
"""Cloud Storage file processing"""
# Cloud Storage event data
data = cloud_event.data
bucket_name = data['bucket']
file_name = data['name']
logging.info(f'Processing file: {file_name} from bucket: {bucket_name}')
try:
# Download file
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
# Process file content (for text files)
if file_name.endswith('.txt'):
content = blob.download_as_text()
# Save analysis results to Firestore
analysis_doc = {
'file_name': file_name,
'bucket_name': bucket_name,
'file_size': blob.size,
'word_count': len(content.split()) if content else 0,
'char_count': len(content) if content else 0,
'processed_at': datetime.utcnow()
}
db.collection('file_analysis').add(analysis_doc)
logging.info(f'File analysis completed for: {file_name}')
# For image files (basic info only)
elif file_name.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
image_doc = {
'file_name': file_name,
'bucket_name': bucket_name,
'file_size': blob.size,
'content_type': blob.content_type,
'processed_at': datetime.utcnow()
}
db.collection('image_files').add(image_doc)
logging.info(f'Image file processed: {file_name}')
except Exception as e:
logging.error(f'Error processing file {file_name}: {str(e)}')
@functions_framework.http
def bigquery_analytics(request):
"""BigQuery analytics API"""
if request.method == 'OPTIONS':
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type',
}
return ('', 204, headers)
try:
# Category-wise entry count analysis
query = """
SELECT
category,
COUNT(*) as entry_count,
AVG(char_count) as avg_char_count,
DATE(created_at) as date
FROM `your-project.analytics.entries`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY category, DATE(created_at)
ORDER BY date DESC, entry_count DESC
"""
query_job = bq_client.query(query)
results = query_job.result()
analytics_data = []
for row in results:
analytics_data.append({
'category': row.category,
'entry_count': row.entry_count,
'avg_char_count': float(row.avg_char_count) if row.avg_char_count else 0,
'date': row.date.isoformat()
})
return json.dumps({
'analytics': analytics_data,
'generated_at': datetime.utcnow().isoformat()
}), 200
except Exception as e:
logging.error(f'Error in bigquery_analytics: {str(e)}')
return json.dumps({'error': 'Analytics processing failed'}), 500
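The examples above assume the BigQuery dataset and table behind the your-project.analytics.entries placeholder already exist. A one-off bootstrap sketch that creates them with a schema matching the fields written by create_data_entry:
# setup_bigquery.py - create the dataset and table assumed above (illustrative)
from google.cloud import bigquery

client = bigquery.Client()

dataset_id = f"{client.project}.analytics"
table_id = f"{dataset_id}.entries"

# Create the dataset and table if they do not already exist
client.create_dataset(bigquery.Dataset(dataset_id), exists_ok=True)

# Schema mirrors the fields inserted by create_data_entry
schema = [
    bigquery.SchemaField("entry_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("title", "STRING"),
    bigquery.SchemaField("category", "STRING"),
    bigquery.SchemaField("created_at", "TIMESTAMP"),
    bigquery.SchemaField("char_count", "INTEGER"),
]
client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)
print(f"Ready: {table_id}")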
Authentication and Security
# main.py - IAM and Authentication Integration
import functions_framework
from google.auth.transport import requests
from google.oauth2 import id_token
from google.cloud import secretmanager
import jwt
import json
import logging
from datetime import datetime, timedelta
import os
# Secret Manager client
secret_client = secretmanager.SecretManagerServiceClient()
def get_secret(secret_name):
"""Get secret from Secret Manager"""
project_id = os.environ.get('GCP_PROJECT')
name = f"projects/{project_id}/secrets/{secret_name}/versions/latest"
response = secret_client.access_secret_version(request={"name": name})
return response.payload.data.decode("UTF-8")
@functions_framework.http
def protected_api(request):
"""API requiring authentication"""
# CORS setup
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization',
}
if request.method == 'OPTIONS':
return ('', 204, headers)
try:
# Token verification
user_info = verify_token(request)
if not user_info:
return json.dumps({'error': 'Unauthorized'}), 401, headers
# API processing
if request.method == 'GET':
return handle_protected_get(user_info)
elif request.method == 'POST':
return handle_protected_post(request, user_info)
else:
return json.dumps({'error': 'Method not allowed'}), 405, headers
except Exception as e:
logging.error(f'Error in protected_api: {str(e)}')
return json.dumps({'error': 'Internal server error'}), 500, headers
def verify_token(request):
"""JWT token verification"""
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return None
token = auth_header.split('Bearer ')[1]
try:
# Google ID token verification
idinfo = id_token.verify_oauth2_token(
token, requests.Request(), os.environ.get('GOOGLE_CLIENT_ID')
)
# Check token issuer
if idinfo['iss'] not in ['accounts.google.com', 'https://accounts.google.com']:
raise ValueError('Invalid issuer')
return idinfo
except ValueError as e:
logging.warning(f'Token verification failed: {str(e)}')
# Custom JWT token verification (fallback)
try:
jwt_secret = get_secret('jwt-secret')
decoded = jwt.decode(token, jwt_secret, algorithms=['HS256'])
return decoded
except Exception as jwt_error:
logging.error(f'Custom JWT verification failed: {str(jwt_error)}')
return None
def handle_protected_get(user_info):
"""Handle authenticated GET request"""
user_data = {
'user_id': user_info.get('sub'),
'email': user_info.get('email'),
'name': user_info.get('name'),
'access_time': datetime.utcnow().isoformat(),
'permissions': ['read', 'write'] # In actual implementation, check proper permissions
}
return json.dumps({
'message': 'Access granted',
'user': user_data,
'data': 'Protected data here'
}), 200
def handle_protected_post(request, user_info):
"""Handle authenticated POST request"""
request_json = request.get_json()
if not request_json:
return json.dumps({'error': 'Invalid JSON'}), 400
# Check user permissions
if not has_write_permission(user_info):
return json.dumps({'error': 'Insufficient permissions'}), 403
# Process data
result = {
'processed_by': user_info.get('email'),
'data': request_json,
'timestamp': datetime.utcnow().isoformat()
}
return json.dumps({
'message': 'Data processed successfully',
'result': result
}), 201
def has_write_permission(user_info):
"""Check write permissions"""
# In actual implementation, integrate with IAM or custom permission system
allowed_domains = ['example.com', 'mycompany.com']
user_email = user_info.get('email', '')
return any(user_email.endswith(f'@{domain}') for domain in allowed_domains)
@functions_framework.http
def login_api(request):
"""Login API (custom JWT issuance)"""
if request.method == 'OPTIONS':
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type',
}
return ('', 204, headers)
try:
request_json = request.get_json()
email = request_json.get('email')
password = request_json.get('password')
# Authentication process (use proper authentication system in actual implementation)
if authenticate_user(email, password):
# Generate JWT token
jwt_secret = get_secret('jwt-secret')
payload = {
'sub': f'user_{hash(email) % 10000}',
'email': email,
'name': email.split('@')[0],
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(hours=24)
}
token = jwt.encode(payload, jwt_secret, algorithm='HS256')
return json.dumps({
'token': token,
'user': {
'email': email,
'name': payload['name']
},
'expires_in': 86400 # 24 hours
}), 200
else:
return json.dumps({'error': 'Invalid credentials'}), 401
except Exception as e:
logging.error(f'Error in login_api: {str(e)}')
return json.dumps({'error': 'Login failed'}), 500
def authenticate_user(email, password):
"""User authentication (sample implementation)"""
# In actual implementation, integrate with database or directory service
return email == '[email protected]' and password == 'password123'
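End to end, a client first obtains a token from login_api and then presents it to protected_api as a Bearer credential. A sketch with the requests library; the URLs are placeholders for the deployed endpoints, and the credentials are the sample values from authenticate_user above:
# client_auth.py - exercising the login and protected APIs (illustrative)
import requests

LOGIN_URL = 'https://REGION-PROJECT.cloudfunctions.net/login-api'          # placeholder
PROTECTED_URL = 'https://REGION-PROJECT.cloudfunctions.net/protected-api'  # placeholder

# 1. Exchange credentials for a custom JWT
login = requests.post(LOGIN_URL, json={
    'email': '[email protected]',
    'password': 'password123',  # sample credentials from the demo authenticate_user
})
login.raise_for_status()
token = login.json()['token']

# 2. Call the protected API with the Bearer token
resp = requests.get(PROTECTED_URL, headers={'Authorization': f'Bearer {token}'})
print(resp.status_code, resp.json())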
Event-Driven Architecture
// index.js - Pub/Sub & Eventarc Integration
const { PubSub } = require('@google-cloud/pubsub');
const { CloudSchedulerClient } = require('@google-cloud/scheduler');
const { CloudTasksClient } = require('@google-cloud/tasks');
const functions = require('@google-cloud/functions-framework');
const pubsub = new PubSub();
const scheduler = new CloudSchedulerClient();
const tasks = new CloudTasksClient();
// Pub/Sub trigger function
functions.cloudEvent('processPubSubMessage', async (cloudEvent) => {
console.log('Processing Pub/Sub message:', cloudEvent);
try {
// Get message data
const messageData = cloudEvent.data;
const message = Buffer.from(messageData.message.data, 'base64').toString();
const attributes = messageData.message.attributes || {};
console.log('Message content:', message);
console.log('Message attributes:', attributes);
// Process based on message type
const messageType = attributes.type || 'default';
switch (messageType) {
case 'user_registration':
await handleUserRegistration(JSON.parse(message));
break;
case 'order_processing':
await handleOrderProcessing(JSON.parse(message));
break;
case 'notification':
await handleNotification(JSON.parse(message));
break;
default:
console.log('Unknown message type:', messageType);
}
console.log('Message processed successfully');
} catch (error) {
console.error('Error processing Pub/Sub message:', error);
// Send to Dead Letter Queue (error handling)
const dlqTopic = 'error-dlq';
const errorMessage = {
originalMessage: cloudEvent.data,
error: error.message,
timestamp: new Date().toISOString()
};
await pubsub.topic(dlqTopic).publishMessage({
data: Buffer.from(JSON.stringify(errorMessage))
});
throw error; // Re-throw for retry
}
});
async function handleUserRegistration(userData) {
console.log('Processing user registration:', userData);
// User processing logic
  const onboardingTasks = [  // renamed to avoid shadowing the Cloud Tasks client
    sendWelcomeEmail(userData.email),
    createUserProfile(userData),
    setupDefaultPreferences(userData.userId)
  ];
  await Promise.all(onboardingTasks);
// Publish message for subsequent processing
await pubsub.topic('user-onboarding').publishMessage({
data: Buffer.from(JSON.stringify({
userId: userData.userId,
step: 'registration_complete',
timestamp: new Date().toISOString()
})),
attributes: {
type: 'onboarding_step'
}
});
}
async function handleOrderProcessing(orderData) {
console.log('Processing order:', orderData);
// Inventory check
const inventoryCheck = await checkInventory(orderData.items);
if (!inventoryCheck.available) {
throw new Error('Insufficient inventory');
}
// Payment processing
const paymentResult = await processPayment(orderData.payment);
if (!paymentResult.success) {
throw new Error('Payment failed');
}
// Shipping preparation
await prepareShipment(orderData);
// Completion notification
await pubsub.topic('order-updates').publishMessage({
data: Buffer.from(JSON.stringify({
orderId: orderData.orderId,
status: 'processed',
timestamp: new Date().toISOString()
})),
attributes: {
type: 'order_status_update'
}
});
}
// Cloud Storage trigger function
functions.cloudEvent('processStorageEvent', async (cloudEvent) => {
console.log('Processing Storage event:', cloudEvent);
const data = cloudEvent.data;
const bucketName = data.bucket;
const fileName = data.name;
const eventType = cloudEvent.type;
console.log(`File ${fileName} in bucket ${bucketName}: ${eventType}`);
try {
if (eventType === 'google.cloud.storage.object.v1.finalized') {
// File upload processing
await handleFileUpload(bucketName, fileName);
} else if (eventType === 'google.cloud.storage.object.v1.deleted') {
// File deletion processing
await handleFileDelete(bucketName, fileName);
}
} catch (error) {
console.error('Error processing storage event:', error);
throw error;
}
});
async function handleFileUpload(bucketName, fileName) {
console.log(`Processing uploaded file: ${fileName}`);
// Processing based on file type
if (fileName.endsWith('.csv')) {
// Add CSV processing task to queue
await scheduleCSVProcessing(bucketName, fileName);
} else if (fileName.match(/\.(jpg|jpeg|png)$/i)) {
// Add image processing task to queue
await scheduleImageProcessing(bucketName, fileName);
}
// File processing notification
await pubsub.topic('file-processing').publishMessage({
data: Buffer.from(JSON.stringify({
bucket: bucketName,
file: fileName,
action: 'uploaded',
timestamp: new Date().toISOString()
})),
attributes: {
type: 'file_event'
}
});
}
// Delayed processing using Cloud Tasks
async function scheduleCSVProcessing(bucketName, fileName) {
  // GCP_PROJECT is not auto-populated on newer runtimes; set it at deploy time
  const projectId = process.env.GCP_PROJECT;
const queueName = 'csv-processing-queue';
const location = 'us-central1';
const url = `https://us-central1-${projectId}.cloudfunctions.net/processCSVFile`;
const parent = tasks.queuePath(projectId, location, queueName);
const task = {
httpRequest: {
httpMethod: 'POST',
url: url,
headers: {
'Content-Type': 'application/json'
},
body: Buffer.from(JSON.stringify({
bucket: bucketName,
file: fileName
}))
},
scheduleTime: {
seconds: Math.floor(Date.now() / 1000) + 60 // Execute after 1 minute
}
};
const [response] = await tasks.createTask({ parent, task });
console.log(`Created task: ${response.name}`);
}
// Workflow management function
functions.http('workflowManager', async (req, res) => {
console.log('Workflow manager called');
try {
const { action, data } = req.body;
switch (action) {
case 'start_workflow':
await startWorkflow(data);
res.status(200).json({ message: 'Workflow started', workflowId: data.workflowId });
break;
case 'pause_workflow':
await pauseWorkflow(data.workflowId);
res.status(200).json({ message: 'Workflow paused' });
break;
case 'resume_workflow':
await resumeWorkflow(data.workflowId);
res.status(200).json({ message: 'Workflow resumed' });
break;
default:
res.status(400).json({ error: 'Unknown action' });
}
} catch (error) {
console.error('Workflow manager error:', error);
res.status(500).json({ error: 'Workflow operation failed' });
}
});
async function startWorkflow(workflowData) {
// Workflow initialization
console.log('Starting workflow:', workflowData.workflowId);
// Publish each step as Pub/Sub message
const steps = workflowData.steps || [];
for (let i = 0; i < steps.length; i++) {
const step = steps[i];
await pubsub.topic('workflow-steps').publishMessage({
data: Buffer.from(JSON.stringify({
workflowId: workflowData.workflowId,
stepIndex: i,
stepData: step,
totalSteps: steps.length
})),
attributes: {
type: 'workflow_step',
workflowId: workflowData.workflowId
}
});
}
}
// Helper functions (sample implementations)
async function sendWelcomeEmail(email) {
console.log(`Sending welcome email to: ${email}`);
// Use appropriate email service in actual implementation
}
async function createUserProfile(userData) {
console.log('Creating user profile:', userData.userId);
// Save to database in actual implementation
}
async function setupDefaultPreferences(userId) {
console.log('Setting up preferences for user:', userId);
// Initialize settings in actual implementation
}
async function checkInventory(items) {
// Inventory check implementation
return { available: true };
}
async function processPayment(paymentData) {
// Payment processing implementation
return { success: true };
}
async function prepareShipment(orderData) {
console.log('Preparing shipment for order:', orderData.orderId);
// Shipping preparation implementation
}
Monitoring and Performance Optimization
# main.py - Monitoring and Performance Optimization
import functions_framework
from google.cloud import monitoring_v3
from google.cloud import logging
from google.cloud import error_reporting
import json
import time
import os
from datetime import datetime, timedelta
import psutil
# Initialize clients
monitoring_client = monitoring_v3.MetricServiceClient()
logging_client = logging.Client()
error_client = error_reporting.Client()
@functions_framework.http
def performance_monitor(request):
"""Performance monitoring API"""
if request.method == 'OPTIONS':
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type',
}
return ('', 204, headers)
try:
start_time = time.time()
# Collect system metrics
metrics = collect_system_metrics()
# Send custom metrics
send_custom_metrics(metrics)
# Record execution time
execution_time = time.time() - start_time
response_data = {
'status': 'healthy',
'timestamp': datetime.utcnow().isoformat(),
'metrics': metrics,
'execution_time_ms': execution_time * 1000,
'function_info': {
'name': os.environ.get('K_SERVICE', 'unknown'),
'revision': os.environ.get('K_REVISION', 'unknown'),
'memory': os.environ.get('FUNCTION_MEMORY_MB', 'unknown'),
'timeout': os.environ.get('FUNCTION_TIMEOUT_SEC', 'unknown')
}
}
return json.dumps(response_data), 200
except Exception as e:
# Send error report
error_client.report_exception()
return json.dumps({
'status': 'error',
'error': str(e),
'timestamp': datetime.utcnow().isoformat()
}), 500
def collect_system_metrics():
"""Collect system metrics"""
try:
# Memory usage
memory_info = psutil.virtual_memory()
# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
# Disk usage
disk_info = psutil.disk_usage('/')
metrics = {
'memory': {
'total': memory_info.total,
'available': memory_info.available,
'percent': memory_info.percent,
'used': memory_info.used
},
'cpu': {
'percent': cpu_percent,
'count': psutil.cpu_count()
},
'disk': {
'total': disk_info.total,
'used': disk_info.used,
'free': disk_info.free,
'percent': (disk_info.used / disk_info.total) * 100
}
}
return metrics
except Exception as e:
print(f'Error collecting system metrics: {str(e)}')
return {}
def send_custom_metrics(metrics):
"""Send custom metrics to Cloud Monitoring"""
project_id = os.environ.get('GCP_PROJECT')
if not project_id or not metrics:
return
try:
project_name = f"projects/{project_id}"
# Create metric time series
series = monitoring_v3.TimeSeries()
series.metric.type = "custom.googleapis.com/function/memory_usage"
series.resource.type = "cloud_function"
series.resource.labels["function_name"] = os.environ.get('K_SERVICE', 'unknown')
series.resource.labels["region"] = os.environ.get('FUNCTION_REGION', 'unknown')
# Add data point
now = time.time()
seconds = int(now)
nanos = int((now - seconds) * 10 ** 9)
interval = monitoring_v3.TimeInterval({
"end_time": {"seconds": seconds, "nanos": nanos}
})
point = monitoring_v3.Point({
"interval": interval,
"value": {"double_value": metrics.get('memory', {}).get('percent', 0)}
})
series.points = [point]
# Send metrics
monitoring_client.create_time_series(
name=project_name,
time_series=[series]
)
except Exception as e:
print(f'Error sending custom metrics: {str(e)}')
@functions_framework.http
def error_tracking(request):
"""Error tracking system"""
try:
# Intentionally trigger error for testing
if request.args.get('test_error') == 'true':
raise ValueError('This is a test error for demonstration')
# Normal response
return json.dumps({
'message': 'Error tracking system is working',
'timestamp': datetime.utcnow().isoformat()
}), 200
except Exception as e:
# Send error report
error_client.report_exception()
# Record structured log
log_entry = {
'severity': 'ERROR',
'message': str(e),
'function_name': os.environ.get('K_SERVICE', 'unknown'),
'timestamp': datetime.utcnow().isoformat(),
'trace': request.headers.get('X-Cloud-Trace-Context', ''),
'user_agent': request.headers.get('User-Agent', ''),
'remote_ip': request.headers.get('X-Forwarded-For', request.remote_addr)
}
logger = logging_client.logger('cloud-functions-errors')
logger.log_struct(log_entry)
return json.dumps({
'error': 'An error occurred',
'timestamp': datetime.utcnow().isoformat()
}), 500
@functions_framework.http
def performance_analytics(request):
"""Performance analytics API"""
try:
# Execute performance analysis
analysis = analyze_function_performance()
return json.dumps({
'analysis': analysis,
'recommendations': generate_recommendations(analysis),
'timestamp': datetime.utcnow().isoformat()
}), 200
except Exception as e:
return json.dumps({
'error': 'Performance analysis failed',
'details': str(e)
}), 500
def analyze_function_performance():
"""Analyze function performance"""
# In actual implementation, get data from Cloud Monitoring
return {
'avg_response_time': 150, # ms
'cold_start_frequency': 15, # %
'memory_utilization': 65, # %
'cpu_utilization': 45, # %
'error_rate': 2.3, # %
'invocation_count_24h': 1250
}
def generate_recommendations(analysis):
"""Generate performance improvement recommendations"""
recommendations = []
if analysis['cold_start_frequency'] > 10:
recommendations.append({
'type': 'cold_start',
'message': 'Cold starts are occurring frequently. Consider setting minimum instances.',
'action': 'Set minimum instances to 1-2'
})
if analysis['memory_utilization'] > 80:
recommendations.append({
'type': 'memory',
'message': 'Memory utilization is high. Consider increasing memory allocation.',
'action': 'Increase memory allocation'
})
if analysis['avg_response_time'] > 1000:
recommendations.append({
'type': 'performance',
'message': 'Response time is long. Consider code optimization.',
'action': 'Optimize code performance'
})
return recommendations
@functions_framework.http
def health_check(request):
"""Health check function"""
health_status = {
'status': 'healthy',
'timestamp': datetime.utcnow().isoformat(),
'version': os.environ.get('K_REVISION', '1.0.0'),
'uptime': get_uptime(),
'dependencies': check_dependencies(),
'environment': {
'function_name': os.environ.get('K_SERVICE'),
'region': os.environ.get('FUNCTION_REGION'),
'memory': os.environ.get('FUNCTION_MEMORY_MB'),
'timeout': os.environ.get('FUNCTION_TIMEOUT_SEC')
}
}
# Check dependency health
all_healthy = all(dep['healthy'] for dep in health_status['dependencies'])
if not all_healthy:
health_status['status'] = 'degraded'
return json.dumps(health_status), 503
return json.dumps(health_status), 200
def get_uptime():
"""Get uptime (sample)"""
# Cloud Functions is stateless, so actual uptime is difficult to measure
return "N/A (Stateless)"
def check_dependencies():
"""Check health of dependent services"""
dependencies = []
# Firestore connection check
try:
from google.cloud import firestore
db = firestore.Client()
# Test with simple query
list(db.collection('health_check').limit(1).stream())
dependencies.append({'name': 'Firestore', 'healthy': True})
except Exception:
dependencies.append({'name': 'Firestore', 'healthy': False})
# Cloud Storage connection check
try:
from google.cloud import storage
client = storage.Client()
# Test with bucket listing
list(client.list_buckets(max_results=1))
dependencies.append({'name': 'Cloud Storage', 'healthy': True})
except Exception:
dependencies.append({'name': 'Cloud Storage', 'healthy': False})
return dependencies
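Deploying this monitoring code requires its dependencies to be declared alongside main.py. A requirements.txt along these lines should cover the imports used in this file; package names are the standard PyPI ones, and versions can be pinned as appropriate:
# requirements.txt - dependencies for the monitoring examples (illustrative)
functions-framework
google-cloud-monitoring
google-cloud-logging
google-cloud-error-reporting
google-cloud-firestore
google-cloud-storage
psutil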