Amazon SQS
フルマネージドメッセージキューサービス。サーバーレス、自動スケーリング、高い可用性を提供。標準キューとFIFOキューをサポート。
サーバー
Amazon SQS
概要
Amazon SQS(Simple Queue Service)は、AWSが提供するフルマネージドメッセージキューサービスです。ソフトウェアコンポーネント間でメッセージの送信、保存、受信を任意の規模で実行でき、メッセージの損失やその他のサービスの可用性を必要とせずに動作します。マイクロサービス、分散システム、サーバーレスアプリケーションの疎結合とスケーリングを簡単に実現できる、クラウドファーストの組織で急速に採用が拡大している次世代メッセージングサービスです。
詳細
Amazon SQS 2025年版は、サーバーレス時代におけるメッセージング基盤の決定版として確固たる地位を確立しています。20年以上のクラウドサービス運用実績により、99.9%の高可用性と無制限のスケーラビリティを誇り、Fortune 500企業の70%以上で採用されています。Standard QueueとFIFO Queueの2つのタイプを提供し、用途に応じた最適なメッセージ配信を実現。AWSの他サービスとのネイティブ統合により、Lambda、SNS、EventBridge等との連携でイベント駆動アーキテクチャの構築が容易です。運用管理が不要なフルマネージドサービスとして、インフラ管理コストを大幅に削減し、開発チームはビジネスロジックに集中できます。
主な特徴
- フルマネージド: サーバー管理、パッチ適用、高可用性設定が不要
- 自動スケーリング: トラフィック増減に応じた自動的な容量調整
- 高可用性: 複数のAZにわたる冗長化で99.9%の可用性
- セキュリティ: KMS暗号化、VPC Endpoint、IAMによる細かなアクセス制御
- コスト効率: 従量課金制で初期費用なし、使った分だけの支払い
- AWSネイティブ統合: Lambda、SNS、EventBridge等との完全統合
メリット・デメリット
メリット
- AWS環境での圧倒的な統合性と運用管理の完全自動化
- 無制限のスケーラビリティと99.9%の高可用性保証
- 従量課金制によるコスト効率性と初期投資不要
- KMS暗号化とVPC Endpointによる企業級セキュリティ
- AWS Lambda、SNS、EventBridge等との完全統合
- デッドレターキューとリドライブ機能による高い信頼性
デメリット
- AWS環境への強いロックインとベンダー依存
- Standard Queueでのメッセージ順序保証の制限
- FIFO Queueでの3000TPS(バッチなし)のスループット制限
- プッシュ型配信の未サポート(ポーリングベース)
- オンプレミス環境では利用不可
- 他クラウドプロバイダとの統合における制約
参考ページ
書き方の例
セットアップと基本設定
# AWS CLIでのキュー作成(Standard Queue)
aws sqs create-queue --queue-name my-standard-queue
# FIFO Queueの作成
aws sqs create-queue \
--queue-name my-fifo-queue.fifo \
--attributes FifoQueue=true,ContentBasedDeduplication=true
# キューの一覧表示
aws sqs list-queues
# キューのURL取得
aws sqs get-queue-url --queue-name my-standard-queue
# キューの属性確認
aws sqs get-queue-attributes \
--queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-standard-queue \
--attribute-names All
メッセージ送信・受信の基本操作
import boto3
import json
from datetime import datetime
# SQSクライアントの初期化
sqs = boto3.client('sqs', region_name='ap-northeast-1')
queue_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-standard-queue'
# 基本的なメッセージ送信
def send_message(message_body, delay_seconds=0):
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=message_body,
DelaySeconds=delay_seconds,
MessageAttributes={
'timestamp': {
'StringValue': datetime.now().isoformat(),
'DataType': 'String'
},
'source': {
'StringValue': 'python-app',
'DataType': 'String'
}
}
)
print(f"メッセージ送信完了: MessageId={response['MessageId']}")
return response
# JSONデータの送信
order_data = {
'orderId': 'ORD-12345',
'customerId': 'CUST-98765',
'items': [
{'productId': 'PROD-001', 'quantity': 2, 'price': 1500},
{'productId': 'PROD-002', 'quantity': 1, 'price': 2500}
],
'totalAmount': 5500,
'timestamp': datetime.now().isoformat()
}
send_message(json.dumps(order_data, ensure_ascii=False))
# バッチメッセージ送信(最大10件まで)
def send_batch_messages(messages):
entries = []
for i, message in enumerate(messages):
entries.append({
'Id': str(i),
'MessageBody': json.dumps(message, ensure_ascii=False),
'MessageAttributes': {
'batchIndex': {
'StringValue': str(i),
'DataType': 'Number'
}
}
})
response = sqs.send_message_batch(
QueueUrl=queue_url,
Entries=entries
)
print(f"バッチ送信完了: {len(response['Successful'])}件成功")
if response.get('Failed'):
print(f"送信失敗: {len(response['Failed'])}件")
return response
# 複数メッセージの一括送信
batch_messages = [
{'type': 'user_signup', 'userId': f'user-{i}', 'email': f'user{i}@example.com'}
for i in range(5)
]
send_batch_messages(batch_messages)
# メッセージ受信
def receive_messages(max_messages=10, wait_time=20):
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=max_messages,
WaitTimeSeconds=wait_time, # ロングポーリング
MessageAttributeNames=['All'],
AttributeNames=['All']
)
messages = response.get('Messages', [])
print(f"{len(messages)}件のメッセージを受信")
for message in messages:
# メッセージ処理
print(f"MessageId: {message['MessageId']}")
print(f"Body: {message['Body']}")
print(f"Attributes: {message.get('MessageAttributes', {})}")
# メッセージ削除(処理完了後)
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
print(f"メッセージ削除完了: {message['MessageId']}")
return messages
# メッセージ受信とバッチ削除
def receive_and_batch_delete():
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20
)
messages = response.get('Messages', [])
if not messages:
print("受信するメッセージがありません")
return
# バッチ削除のエントリ作成
delete_entries = []
for i, message in enumerate(messages):
# メッセージ処理ロジック
try:
body = json.loads(message['Body'])
print(f"処理中: {body}")
# 処理成功時のみ削除エントリに追加
delete_entries.append({
'Id': str(i),
'ReceiptHandle': message['ReceiptHandle']
})
except json.JSONDecodeError:
print(f"JSONパースエラー: {message['MessageId']}")
except Exception as e:
print(f"処理エラー: {e}")
# バッチ削除実行
if delete_entries:
delete_response = sqs.delete_message_batch(
QueueUrl=queue_url,
Entries=delete_entries
)
print(f"バッチ削除完了: {len(delete_response['Successful'])}件")
# 継続的なメッセージ処理ループ
def message_processor():
while True:
try:
messages = receive_messages(max_messages=10, wait_time=20)
if not messages:
print("メッセージなし、待機中...")
continue
except KeyboardInterrupt:
print("処理を停止します")
break
except Exception as e:
print(f"エラー発生: {e}")
time.sleep(5) # エラー時は少し待機
# 実行例
if __name__ == "__main__":
# receive_messages()
# message_processor()
pass
高度な設定とカスタマイズ
import boto3
import json
import time
from botocore.exceptions import ClientError
# 高度なSQS設定
class AdvancedSQSHandler:
def __init__(self, region_name='ap-northeast-1'):
self.sqs = boto3.client('sqs', region_name=region_name)
self.resource = boto3.resource('sqs', region_name=region_name)
def create_queue_with_dlq(self, queue_name, dlq_name=None):
"""デッドレターキューとメインキューの作成"""
if dlq_name is None:
dlq_name = f"{queue_name}-dlq"
# デッドレターキューの作成
dlq_response = self.sqs.create_queue(
QueueName=dlq_name,
Attributes={
'MessageRetentionPeriod': '1209600', # 14日
'VisibilityTimeoutSeconds': '300'
}
)
dlq_url = dlq_response['QueueUrl']
# デッドレターキューのARN取得
dlq_attributes = self.sqs.get_queue_attributes(
QueueUrl=dlq_url,
AttributeNames=['QueueArn']
)
dlq_arn = dlq_attributes['Attributes']['QueueArn']
# リドライブポリシー設定
redrive_policy = {
'deadLetterTargetArn': dlq_arn,
'maxReceiveCount': 3 # 3回失敗でDLQに移動
}
# メインキューの作成
main_response = self.sqs.create_queue(
QueueName=queue_name,
Attributes={
'VisibilityTimeoutSeconds': '300', # 5分
'MessageRetentionPeriod': '1209600', # 14日
'DelaySeconds': '0',
'ReceiveMessageWaitTimeSeconds': '20', # ロングポーリング
'RedrivePolicy': json.dumps(redrive_policy),
'KmsMasterKeyId': 'alias/aws/sqs', # SQS管理暗号化
'KmsDataKeyReusePeriodSeconds': '300'
}
)
print(f"キュー作成完了:")
print(f" メインキュー: {main_response['QueueUrl']}")
print(f" デッドレターキュー: {dlq_url}")
return main_response['QueueUrl'], dlq_url
def create_fifo_queue(self, queue_name):
"""FIFO キューの作成"""
if not queue_name.endswith('.fifo'):
queue_name += '.fifo'
response = self.sqs.create_queue(
QueueName=queue_name,
Attributes={
'FifoQueue': 'true',
'ContentBasedDeduplication': 'true',
'MessageRetentionPeriod': '1209600',
'VisibilityTimeoutSeconds': '300',
'ReceiveMessageWaitTimeSeconds': '20',
'KmsMasterKeyId': 'alias/aws/sqs'
}
)
print(f"FIFOキュー作成完了: {response['QueueUrl']}")
return response['QueueUrl']
def send_fifo_message(self, queue_url, message_body, group_id, deduplication_id=None):
"""FIFOキューへのメッセージ送信"""
params = {
'QueueUrl': queue_url,
'MessageBody': message_body,
'MessageGroupId': group_id
}
if deduplication_id:
params['MessageDeduplicationId'] = deduplication_id
response = self.sqs.send_message(**params)
print(f"FIFOメッセージ送信完了: {response['MessageId']}")
return response
def set_queue_policy(self, queue_url, policy):
"""キューポリシーの設定"""
self.sqs.set_queue_attributes(
QueueUrl=queue_url,
Attributes={
'Policy': json.dumps(policy)
}
)
print("キューポリシー設定完了")
def purge_queue(self, queue_url):
"""キューの完全削除(全メッセージ削除)"""
try:
self.sqs.purge_queue(QueueUrl=queue_url)
print("キューのパージ完了")
except ClientError as e:
if e.response['Error']['Code'] == 'PurgeQueueInProgress':
print("既にパージが進行中です")
else:
raise
def get_queue_metrics(self, queue_url):
"""キューのメトリクス取得"""
attributes = self.sqs.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=[
'ApproximateNumberOfMessages',
'ApproximateNumberOfMessagesNotVisible',
'ApproximateNumberOfMessagesDelayed',
'CreatedTimestamp',
'LastModifiedTimestamp'
]
)
metrics = attributes['Attributes']
print("キューメトリクス:")
print(f" 待機中メッセージ: {metrics.get('ApproximateNumberOfMessages', 0)}")
print(f" 処理中メッセージ: {metrics.get('ApproximateNumberOfMessagesNotVisible', 0)}")
print(f" 遅延中メッセージ: {metrics.get('ApproximateNumberOfMessagesDelayed', 0)}")
return metrics
# 使用例
handler = AdvancedSQSHandler()
# DLQ付きキューの作成
main_url, dlq_url = handler.create_queue_with_dlq('order-processing')
# FIFOキューの作成
fifo_url = handler.create_fifo_queue('transaction-log')
# FIFOメッセージ送信
handler.send_fifo_message(
fifo_url,
json.dumps({'transaction_id': 'TXN-001', 'amount': 10000}),
group_id='financial-transactions',
deduplication_id='TXN-001-20250101'
)
# メトリクス確認
handler.get_queue_metrics(main_url)
クラスタリングと監視設定
import boto3
import json
from datetime import datetime, timedelta
# CloudWatch統合とアラート設定
class SQSMonitoring:
def __init__(self, region_name='ap-northeast-1'):
self.cloudwatch = boto3.client('cloudwatch', region_name=region_name)
self.sqs = boto3.client('sqs', region_name=region_name)
self.region = region_name
def create_queue_alarms(self, queue_name, sns_topic_arn):
"""SQSキューのCloudWatchアラーム作成"""
# メッセージ数アラーム
self.cloudwatch.put_metric_alarm(
AlarmName=f'{queue_name}-MessageCount-High',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=2,
MetricName='ApproximateNumberOfVisibleMessages',
Namespace='AWS/SQS',
Period=300, # 5分
Statistic='Average',
Threshold=1000.0,
ActionsEnabled=True,
AlarmActions=[sns_topic_arn],
AlarmDescription=f'High message count in {queue_name}',
Dimensions=[
{
'Name': 'QueueName',
'Value': queue_name
}
]
)
# メッセージ年齢アラーム
self.cloudwatch.put_metric_alarm(
AlarmName=f'{queue_name}-MessageAge-High',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=1,
MetricName='ApproximateAgeOfOldestMessage',
Namespace='AWS/SQS',
Period=300,
Statistic='Maximum',
Threshold=1800.0, # 30分
ActionsEnabled=True,
AlarmActions=[sns_topic_arn],
AlarmDescription=f'Old messages in {queue_name}',
Dimensions=[
{
'Name': 'QueueName',
'Value': queue_name
}
]
)
print(f"CloudWatchアラーム作成完了: {queue_name}")
def get_queue_metrics(self, queue_name, hours=24):
"""キューメトリクスの取得"""
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=hours)
# メッセージ数の取得
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/SQS',
MetricName='ApproximateNumberOfVisibleMessages',
Dimensions=[
{
'Name': 'QueueName',
'Value': queue_name
}
],
StartTime=start_time,
EndTime=end_time,
Period=3600, # 1時間ごと
Statistics=['Average', 'Maximum']
)
print(f"過去{hours}時間のメッセージ数:")
for datapoint in sorted(response['Datapoints'], key=lambda x: x['Timestamp']):
print(f" {datapoint['Timestamp']}: 平均{datapoint['Average']:.0f}, 最大{datapoint['Maximum']:.0f}")
return response['Datapoints']
def create_custom_metric(self, queue_url, metric_name, value, unit='Count'):
"""カスタムメトリクスの送信"""
queue_name = queue_url.split('/')[-1]
self.cloudwatch.put_metric_data(
Namespace='Custom/SQS',
MetricData=[
{
'MetricName': metric_name,
'Dimensions': [
{
'Name': 'QueueName',
'Value': queue_name
}
],
'Value': value,
'Unit': unit,
'Timestamp': datetime.utcnow()
}
]
)
# Lambda統合でのメッセージ処理
class LambdaSQSIntegration:
def __init__(self):
self.lambda_client = boto3.client('lambda')
self.sqs = boto3.client('sqs')
def create_lambda_trigger(self, function_name, queue_arn):
"""LambdaにSQSトリガーを設定"""
try:
response = self.lambda_client.create_event_source_mapping(
EventSourceArn=queue_arn,
FunctionName=function_name,
BatchSize=10, # 一度に処理するメッセージ数
MaximumBatchingWindowInSeconds=5, # バッチ化の最大待機時間
ParallelizationFactor=2 # 並列実行数
)
print(f"Lambdaトリガー作成完了: {response['UUID']}")
return response
except Exception as e:
print(f"トリガー作成エラー: {e}")
def update_trigger_config(self, uuid, batch_size=None, max_batching_window=None):
"""既存トリガーの設定更新"""
update_params = {'UUID': uuid}
if batch_size:
update_params['BatchSize'] = batch_size
if max_batching_window:
update_params['MaximumBatchingWindowInSeconds'] = max_batching_window
response = self.lambda_client.update_event_source_mapping(**update_params)
print("Lambdaトリガー設定更新完了")
return response
# Auto Scaling統合
def setup_auto_scaling(queue_url, target_group_arn):
"""SQSメトリクスベースのAuto Scaling設定"""
autoscaling = boto3.client('application-autoscaling')
# スケーラブルターゲットの登録
autoscaling.register_scalable_target(
ServiceNamespace='ecs',
ResourceId=f'service/my-cluster/my-service',
ScalableDimension='ecs:service:DesiredCount',
MinCapacity=1,
MaxCapacity=10
)
# スケーリングポリシーの作成
policy_response = autoscaling.put_scaling_policy(
PolicyName='sqs-scale-out-policy',
ServiceNamespace='ecs',
ResourceId='service/my-cluster/my-service',
ScalableDimension='ecs:service:DesiredCount',
PolicyType='TargetTrackingScaling',
TargetTrackingScalingPolicyConfiguration={
'TargetValue': 100.0, # 目標値:キューあたり100メッセージ
'CustomMetricSpecification': {
'MetricName': 'ApproximateNumberOfVisibleMessages',
'Namespace': 'AWS/SQS',
'Dimensions': [
{
'Name': 'QueueName',
'Value': queue_url.split('/')[-1]
}
],
'Statistic': 'Average'
}
}
)
print("Auto Scaling設定完了")
return policy_response
# 使用例
monitoring = SQSMonitoring()
lambda_integration = LambdaSQSIntegration()
# CloudWatchアラーム設定
# monitoring.create_queue_alarms('my-queue', 'arn:aws:sns:ap-northeast-1:123456789012:alerts')
# メトリクス取得
# monitoring.get_queue_metrics('my-queue', hours=12)
# Lambdaトリガー設定
# lambda_integration.create_lambda_trigger('my-processor-function', 'arn:aws:sqs:ap-northeast-1:123456789012:my-queue')
フレームワーク統合と実用例
# Terraform設定例(HCL)
terraform_config = '''
# variables.tf
variable "queue_name" {
description = "SQS queue name"
type = string
default = "app-queue"
}
variable "environment" {
description = "Environment name"
type = string
default = "production"
}
# main.tf
resource "aws_sqs_queue" "main_queue" {
name = var.queue_name
delay_seconds = 0
max_message_size = 262144
message_retention_seconds = 1209600
receive_wait_time_seconds = 20
visibility_timeout_seconds = 300
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.dlq.arn
maxReceiveCount = 3
})
kms_master_key_id = aws_kms_key.sqs_key.arn
tags = {
Name = var.queue_name
Environment = var.environment
}
}
resource "aws_sqs_queue" "dlq" {
name = "${var.queue_name}-dlq"
message_retention_seconds = 1209600
tags = {
Name = "${var.queue_name}-dlq"
Environment = var.environment
}
}
resource "aws_kms_key" "sqs_key" {
description = "KMS key for SQS encryption"
deletion_window_in_days = 7
}
# IAM Role for Lambda
resource "aws_iam_role" "lambda_role" {
name = "${var.queue_name}-lambda-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy" "lambda_sqs_policy" {
name = "${var.queue_name}-lambda-sqs-policy"
role = aws_iam_role.lambda_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
]
Resource = aws_sqs_queue.main_queue.arn
}
]
})
}
# outputs.tf
output "queue_url" {
description = "URL of the created SQS queue"
value = aws_sqs_queue.main_queue.url
}
output "queue_arn" {
description = "ARN of the created SQS queue"
value = aws_sqs_queue.main_queue.arn
}
output "dlq_url" {
description = "URL of the dead letter queue"
value = aws_sqs_queue.dlq.url
}
'''
# Spring Boot統合例
spring_boot_config = '''
# application.yml
cloud:
aws:
sqs:
region: ap-northeast-1
endpoint: https://sqs.ap-northeast-1.amazonaws.com
credentials:
access-key: ${AWS_ACCESS_KEY_ID}
secret-key: ${AWS_SECRET_ACCESS_KEY}
app:
sqs:
queue-name: app-queue
dlq-name: app-queue-dlq
max-concurrent-messages: 10
visibility-timeout: 300
wait-time-seconds: 20
# SQSConfiguration.java
@Configuration
@EnableConfigurationProperties
public class SQSConfiguration {
@Value("${cloud.aws.sqs.region}")
private String region;
@Bean
public AmazonSQS amazonSQS() {
return AmazonSQSClientBuilder.standard()
.withRegion(region)
.withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
.build();
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS amazonSQS) {
return new QueueMessagingTemplate(amazonSQS);
}
}
# MessageService.java
@Service
public class MessageService {
@Autowired
private QueueMessagingTemplate messagingTemplate;
@Value("${app.sqs.queue-name}")
private String queueName;
public void sendMessage(Object message) {
messagingTemplate.convertAndSend(queueName, message);
}
@SqsListener("${app.sqs.queue-name}")
public void receiveMessage(String message) {
try {
// メッセージ処理ロジック
processMessage(message);
} catch (Exception e) {
// エラーハンドリング
log.error("Message processing failed: {}", e.getMessage(), e);
throw e; // DLQに送信するために再スロー
}
}
private void processMessage(String message) {
// ビジネスロジック実装
log.info("Processing message: {}", message);
}
}
'''
# Node.js Express統合例
nodejs_example = '''
// package.json
{
"dependencies": {
"aws-sdk": "^2.1400.0",
"express": "^4.18.0",
"uuid": "^9.0.0"
}
}
// sqsService.js
const AWS = require('aws-sdk');
const { v4: uuidv4 } = require('uuid');
class SQSService {
constructor() {
this.sqs = new AWS.SQS({
region: process.env.AWS_REGION || 'ap-northeast-1'
});
this.queueUrl = process.env.SQS_QUEUE_URL;
}
async sendMessage(messageBody, groupId = null) {
const params = {
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(messageBody),
MessageAttributes: {
timestamp: {
DataType: 'String',
StringValue: new Date().toISOString()
},
messageId: {
DataType: 'String',
StringValue: uuidv4()
}
}
};
if (groupId) {
params.MessageGroupId = groupId;
params.MessageDeduplicationId = uuidv4();
}
try {
const result = await this.sqs.sendMessage(params).promise();
console.log('Message sent:', result.MessageId);
return result;
} catch (error) {
console.error('Failed to send message:', error);
throw error;
}
}
async receiveMessages(maxMessages = 10) {
const params = {
QueueUrl: this.queueUrl,
MaxNumberOfMessages: maxMessages,
WaitTimeSeconds: 20,
MessageAttributeNames: ['All']
};
try {
const result = await this.sqs.receiveMessage(params).promise();
return result.Messages || [];
} catch (error) {
console.error('Failed to receive messages:', error);
throw error;
}
}
async deleteMessage(receiptHandle) {
const params = {
QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle
};
try {
await this.sqs.deleteMessage(params).promise();
console.log('Message deleted successfully');
} catch (error) {
console.error('Failed to delete message:', error);
throw error;
}
}
async startPolling(messageHandler) {
console.log('Starting SQS polling...');
while (true) {
try {
const messages = await this.receiveMessages();
for (const message of messages) {
try {
await messageHandler(JSON.parse(message.Body));
await this.deleteMessage(message.ReceiptHandle);
} catch (error) {
console.error('Message processing failed:', error);
// DLQに移動されるまで再試行
}
}
} catch (error) {
console.error('Polling error:', error);
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
}
module.exports = SQSService;
// app.js
const express = require('express');
const SQSService = require('./sqsService');
const app = express();
const sqsService = new SQSService();
app.use(express.json());
// メッセージ送信エンドポイント
app.post('/send', async (req, res) => {
try {
const result = await sqsService.sendMessage(req.body);
res.json({ success: true, messageId: result.MessageId });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// メッセージ処理ロジック
const processMessage = async (messageBody) => {
console.log('Processing:', messageBody);
// ビジネスロジック実装
if (messageBody.type === 'order') {
// 注文処理
await processOrder(messageBody);
} else if (messageBody.type === 'notification') {
// 通知処理
await sendNotification(messageBody);
}
};
// ポーリング開始
sqsService.startPolling(processMessage);
app.listen(3000, () => {
console.log('Server running on port 3000');
});
'''
print("設定例:")
print("1. Terraform設定")
print("2. Spring Boot統合")
print("3. Node.js Express統合")