Amazon SQS

フルマネージドメッセージキューサービス。サーバーレス、自動スケーリング、高い可用性を提供。標準キューとFIFOキューをサポート。

メッセージブローカーフルマネージドキューイングスケーラブルサーバーレス

サーバー

Amazon SQS

概要

Amazon SQS(Simple Queue Service)は、AWSが提供するフルマネージドメッセージキューサービスです。ソフトウェアコンポーネント間でメッセージの送信、保存、受信を任意の規模で実行でき、メッセージの損失やその他のサービスの可用性を必要とせずに動作します。マイクロサービス、分散システム、サーバーレスアプリケーションの疎結合とスケーリングを簡単に実現できる、クラウドファーストの組織で急速に採用が拡大している次世代メッセージングサービスです。

詳細

Amazon SQS 2025年版は、サーバーレス時代におけるメッセージング基盤の決定版として確固たる地位を確立しています。20年以上のクラウドサービス運用実績により、99.9%の高可用性と無制限のスケーラビリティを誇り、Fortune 500企業の70%以上で採用されています。Standard QueueとFIFO Queueの2つのタイプを提供し、用途に応じた最適なメッセージ配信を実現。AWSの他サービスとのネイティブ統合により、Lambda、SNS、EventBridge等との連携でイベント駆動アーキテクチャの構築が容易です。運用管理が不要なフルマネージドサービスとして、インフラ管理コストを大幅に削減し、開発チームはビジネスロジックに集中できます。

主な特徴

  • フルマネージド: サーバー管理、パッチ適用、高可用性設定が不要
  • 自動スケーリング: トラフィック増減に応じた自動的な容量調整
  • 高可用性: 複数のAZにわたる冗長化で99.9%の可用性
  • セキュリティ: KMS暗号化、VPC Endpoint、IAMによる細かなアクセス制御
  • コスト効率: 従量課金制で初期費用なし、使った分だけの支払い
  • AWSネイティブ統合: Lambda、SNS、EventBridge等との完全統合

メリット・デメリット

メリット

  • AWS環境での圧倒的な統合性と運用管理の完全自動化
  • 無制限のスケーラビリティと99.9%の高可用性保証
  • 従量課金制によるコスト効率性と初期投資不要
  • KMS暗号化とVPC Endpointによる企業級セキュリティ
  • AWS Lambda、SNS、EventBridge等との完全統合
  • デッドレターキューとリドライブ機能による高い信頼性

デメリット

  • AWS環境への強いロックインとベンダー依存
  • Standard Queueでのメッセージ順序保証の制限
  • FIFO Queueでの3000TPS(バッチなし)のスループット制限
  • プッシュ型配信の未サポート(ポーリングベース)
  • オンプレミス環境では利用不可
  • 他クラウドプロバイダとの統合における制約

参考ページ

書き方の例

セットアップと基本設定

# AWS CLIでのキュー作成(Standard Queue)
aws sqs create-queue --queue-name my-standard-queue

# FIFO Queueの作成
aws sqs create-queue \
    --queue-name my-fifo-queue.fifo \
    --attributes FifoQueue=true,ContentBasedDeduplication=true

# キューの一覧表示
aws sqs list-queues

# キューのURL取得
aws sqs get-queue-url --queue-name my-standard-queue

# キューの属性確認
aws sqs get-queue-attributes \
    --queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-standard-queue \
    --attribute-names All

メッセージ送信・受信の基本操作

import boto3
import json
from datetime import datetime

# SQSクライアントの初期化
sqs = boto3.client('sqs', region_name='ap-northeast-1')
queue_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-standard-queue'

# 基本的なメッセージ送信
def send_message(message_body, delay_seconds=0):
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=message_body,
        DelaySeconds=delay_seconds,
        MessageAttributes={
            'timestamp': {
                'StringValue': datetime.now().isoformat(),
                'DataType': 'String'
            },
            'source': {
                'StringValue': 'python-app',
                'DataType': 'String'
            }
        }
    )
    print(f"メッセージ送信完了: MessageId={response['MessageId']}")
    return response

# JSONデータの送信
order_data = {
    'orderId': 'ORD-12345',
    'customerId': 'CUST-98765',
    'items': [
        {'productId': 'PROD-001', 'quantity': 2, 'price': 1500},
        {'productId': 'PROD-002', 'quantity': 1, 'price': 2500}
    ],
    'totalAmount': 5500,
    'timestamp': datetime.now().isoformat()
}

send_message(json.dumps(order_data, ensure_ascii=False))

# バッチメッセージ送信(最大10件まで)
def send_batch_messages(messages):
    entries = []
    for i, message in enumerate(messages):
        entries.append({
            'Id': str(i),
            'MessageBody': json.dumps(message, ensure_ascii=False),
            'MessageAttributes': {
                'batchIndex': {
                    'StringValue': str(i),
                    'DataType': 'Number'
                }
            }
        })
    
    response = sqs.send_message_batch(
        QueueUrl=queue_url,
        Entries=entries
    )
    
    print(f"バッチ送信完了: {len(response['Successful'])}件成功")
    if response.get('Failed'):
        print(f"送信失敗: {len(response['Failed'])}件")
    
    return response

# 複数メッセージの一括送信
batch_messages = [
    {'type': 'user_signup', 'userId': f'user-{i}', 'email': f'user{i}@example.com'}
    for i in range(5)
]
send_batch_messages(batch_messages)

# メッセージ受信
def receive_messages(max_messages=10, wait_time=20):
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,
        WaitTimeSeconds=wait_time,  # ロングポーリング
        MessageAttributeNames=['All'],
        AttributeNames=['All']
    )
    
    messages = response.get('Messages', [])
    print(f"{len(messages)}件のメッセージを受信")
    
    for message in messages:
        # メッセージ処理
        print(f"MessageId: {message['MessageId']}")
        print(f"Body: {message['Body']}")
        print(f"Attributes: {message.get('MessageAttributes', {})}")
        
        # メッセージ削除(処理完了後)
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )
        print(f"メッセージ削除完了: {message['MessageId']}")
    
    return messages

# メッセージ受信とバッチ削除
def receive_and_batch_delete():
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20
    )
    
    messages = response.get('Messages', [])
    if not messages:
        print("受信するメッセージがありません")
        return
    
    # バッチ削除のエントリ作成
    delete_entries = []
    for i, message in enumerate(messages):
        # メッセージ処理ロジック
        try:
            body = json.loads(message['Body'])
            print(f"処理中: {body}")
            
            # 処理成功時のみ削除エントリに追加
            delete_entries.append({
                'Id': str(i),
                'ReceiptHandle': message['ReceiptHandle']
            })
        except json.JSONDecodeError:
            print(f"JSONパースエラー: {message['MessageId']}")
        except Exception as e:
            print(f"処理エラー: {e}")
    
    # バッチ削除実行
    if delete_entries:
        delete_response = sqs.delete_message_batch(
            QueueUrl=queue_url,
            Entries=delete_entries
        )
        print(f"バッチ削除完了: {len(delete_response['Successful'])}件")

# 継続的なメッセージ処理ループ
def message_processor():
    while True:
        try:
            messages = receive_messages(max_messages=10, wait_time=20)
            if not messages:
                print("メッセージなし、待機中...")
                continue
                
        except KeyboardInterrupt:
            print("処理を停止します")
            break
        except Exception as e:
            print(f"エラー発生: {e}")
            time.sleep(5)  # エラー時は少し待機

# 実行例
if __name__ == "__main__":
    # receive_messages()
    # message_processor()
    pass

高度な設定とカスタマイズ

import boto3
import json
import time
from botocore.exceptions import ClientError

# 高度なSQS設定
class AdvancedSQSHandler:
    def __init__(self, region_name='ap-northeast-1'):
        self.sqs = boto3.client('sqs', region_name=region_name)
        self.resource = boto3.resource('sqs', region_name=region_name)
    
    def create_queue_with_dlq(self, queue_name, dlq_name=None):
        """デッドレターキューとメインキューの作成"""
        if dlq_name is None:
            dlq_name = f"{queue_name}-dlq"
        
        # デッドレターキューの作成
        dlq_response = self.sqs.create_queue(
            QueueName=dlq_name,
            Attributes={
                'MessageRetentionPeriod': '1209600',  # 14日
                'VisibilityTimeoutSeconds': '300'
            }
        )
        dlq_url = dlq_response['QueueUrl']
        
        # デッドレターキューのARN取得
        dlq_attributes = self.sqs.get_queue_attributes(
            QueueUrl=dlq_url,
            AttributeNames=['QueueArn']
        )
        dlq_arn = dlq_attributes['Attributes']['QueueArn']
        
        # リドライブポリシー設定
        redrive_policy = {
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': 3  # 3回失敗でDLQに移動
        }
        
        # メインキューの作成
        main_response = self.sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'VisibilityTimeoutSeconds': '300',  # 5分
                'MessageRetentionPeriod': '1209600',  # 14日
                'DelaySeconds': '0',
                'ReceiveMessageWaitTimeSeconds': '20',  # ロングポーリング
                'RedrivePolicy': json.dumps(redrive_policy),
                'KmsMasterKeyId': 'alias/aws/sqs',  # SQS管理暗号化
                'KmsDataKeyReusePeriodSeconds': '300'
            }
        )
        
        print(f"キュー作成完了:")
        print(f"  メインキュー: {main_response['QueueUrl']}")
        print(f"  デッドレターキュー: {dlq_url}")
        
        return main_response['QueueUrl'], dlq_url
    
    def create_fifo_queue(self, queue_name):
        """FIFO キューの作成"""
        if not queue_name.endswith('.fifo'):
            queue_name += '.fifo'
        
        response = self.sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'FifoQueue': 'true',
                'ContentBasedDeduplication': 'true',
                'MessageRetentionPeriod': '1209600',
                'VisibilityTimeoutSeconds': '300',
                'ReceiveMessageWaitTimeSeconds': '20',
                'KmsMasterKeyId': 'alias/aws/sqs'
            }
        )
        
        print(f"FIFOキュー作成完了: {response['QueueUrl']}")
        return response['QueueUrl']
    
    def send_fifo_message(self, queue_url, message_body, group_id, deduplication_id=None):
        """FIFOキューへのメッセージ送信"""
        params = {
            'QueueUrl': queue_url,
            'MessageBody': message_body,
            'MessageGroupId': group_id
        }
        
        if deduplication_id:
            params['MessageDeduplicationId'] = deduplication_id
        
        response = self.sqs.send_message(**params)
        print(f"FIFOメッセージ送信完了: {response['MessageId']}")
        return response
    
    def set_queue_policy(self, queue_url, policy):
        """キューポリシーの設定"""
        self.sqs.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={
                'Policy': json.dumps(policy)
            }
        )
        print("キューポリシー設定完了")
    
    def purge_queue(self, queue_url):
        """キューの完全削除(全メッセージ削除)"""
        try:
            self.sqs.purge_queue(QueueUrl=queue_url)
            print("キューのパージ完了")
        except ClientError as e:
            if e.response['Error']['Code'] == 'PurgeQueueInProgress':
                print("既にパージが進行中です")
            else:
                raise
    
    def get_queue_metrics(self, queue_url):
        """キューのメトリクス取得"""
        attributes = self.sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=[
                'ApproximateNumberOfMessages',
                'ApproximateNumberOfMessagesNotVisible',
                'ApproximateNumberOfMessagesDelayed',
                'CreatedTimestamp',
                'LastModifiedTimestamp'
            ]
        )
        
        metrics = attributes['Attributes']
        print("キューメトリクス:")
        print(f"  待機中メッセージ: {metrics.get('ApproximateNumberOfMessages', 0)}")
        print(f"  処理中メッセージ: {metrics.get('ApproximateNumberOfMessagesNotVisible', 0)}")
        print(f"  遅延中メッセージ: {metrics.get('ApproximateNumberOfMessagesDelayed', 0)}")
        
        return metrics

# 使用例
handler = AdvancedSQSHandler()

# DLQ付きキューの作成
main_url, dlq_url = handler.create_queue_with_dlq('order-processing')

# FIFOキューの作成
fifo_url = handler.create_fifo_queue('transaction-log')

# FIFOメッセージ送信
handler.send_fifo_message(
    fifo_url,
    json.dumps({'transaction_id': 'TXN-001', 'amount': 10000}),
    group_id='financial-transactions',
    deduplication_id='TXN-001-20250101'
)

# メトリクス確認
handler.get_queue_metrics(main_url)

クラスタリングと監視設定

import boto3
import json
from datetime import datetime, timedelta

# CloudWatch統合とアラート設定
class SQSMonitoring:
    def __init__(self, region_name='ap-northeast-1'):
        self.cloudwatch = boto3.client('cloudwatch', region_name=region_name)
        self.sqs = boto3.client('sqs', region_name=region_name)
        self.region = region_name
    
    def create_queue_alarms(self, queue_name, sns_topic_arn):
        """SQSキューのCloudWatchアラーム作成"""
        
        # メッセージ数アラーム
        self.cloudwatch.put_metric_alarm(
            AlarmName=f'{queue_name}-MessageCount-High',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=2,
            MetricName='ApproximateNumberOfVisibleMessages',
            Namespace='AWS/SQS',
            Period=300,  # 5分
            Statistic='Average',
            Threshold=1000.0,
            ActionsEnabled=True,
            AlarmActions=[sns_topic_arn],
            AlarmDescription=f'High message count in {queue_name}',
            Dimensions=[
                {
                    'Name': 'QueueName',
                    'Value': queue_name
                }
            ]
        )
        
        # メッセージ年齢アラーム
        self.cloudwatch.put_metric_alarm(
            AlarmName=f'{queue_name}-MessageAge-High',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=1,
            MetricName='ApproximateAgeOfOldestMessage',
            Namespace='AWS/SQS',
            Period=300,
            Statistic='Maximum',
            Threshold=1800.0,  # 30分
            ActionsEnabled=True,
            AlarmActions=[sns_topic_arn],
            AlarmDescription=f'Old messages in {queue_name}',
            Dimensions=[
                {
                    'Name': 'QueueName',
                    'Value': queue_name
                }
            ]
        )
        
        print(f"CloudWatchアラーム作成完了: {queue_name}")
    
    def get_queue_metrics(self, queue_name, hours=24):
        """キューメトリクスの取得"""
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=hours)
        
        # メッセージ数の取得
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/SQS',
            MetricName='ApproximateNumberOfVisibleMessages',
            Dimensions=[
                {
                    'Name': 'QueueName',
                    'Value': queue_name
                }
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=3600,  # 1時間ごと
            Statistics=['Average', 'Maximum']
        )
        
        print(f"過去{hours}時間のメッセージ数:")
        for datapoint in sorted(response['Datapoints'], key=lambda x: x['Timestamp']):
            print(f"  {datapoint['Timestamp']}: 平均{datapoint['Average']:.0f}, 最大{datapoint['Maximum']:.0f}")
        
        return response['Datapoints']
    
    def create_custom_metric(self, queue_url, metric_name, value, unit='Count'):
        """カスタムメトリクスの送信"""
        queue_name = queue_url.split('/')[-1]
        
        self.cloudwatch.put_metric_data(
            Namespace='Custom/SQS',
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Dimensions': [
                        {
                            'Name': 'QueueName',
                            'Value': queue_name
                        }
                    ],
                    'Value': value,
                    'Unit': unit,
                    'Timestamp': datetime.utcnow()
                }
            ]
        )

# Lambda統合でのメッセージ処理
class LambdaSQSIntegration:
    def __init__(self):
        self.lambda_client = boto3.client('lambda')
        self.sqs = boto3.client('sqs')
    
    def create_lambda_trigger(self, function_name, queue_arn):
        """LambdaにSQSトリガーを設定"""
        try:
            response = self.lambda_client.create_event_source_mapping(
                EventSourceArn=queue_arn,
                FunctionName=function_name,
                BatchSize=10,  # 一度に処理するメッセージ数
                MaximumBatchingWindowInSeconds=5,  # バッチ化の最大待機時間
                ParallelizationFactor=2  # 並列実行数
            )
            print(f"Lambdaトリガー作成完了: {response['UUID']}")
            return response
        except Exception as e:
            print(f"トリガー作成エラー: {e}")
    
    def update_trigger_config(self, uuid, batch_size=None, max_batching_window=None):
        """既存トリガーの設定更新"""
        update_params = {'UUID': uuid}
        
        if batch_size:
            update_params['BatchSize'] = batch_size
        if max_batching_window:
            update_params['MaximumBatchingWindowInSeconds'] = max_batching_window
        
        response = self.lambda_client.update_event_source_mapping(**update_params)
        print("Lambdaトリガー設定更新完了")
        return response

# Auto Scaling統合
def setup_auto_scaling(queue_url, target_group_arn):
    """SQSメトリクスベースのAuto Scaling設定"""
    autoscaling = boto3.client('application-autoscaling')
    
    # スケーラブルターゲットの登録
    autoscaling.register_scalable_target(
        ServiceNamespace='ecs',
        ResourceId=f'service/my-cluster/my-service',
        ScalableDimension='ecs:service:DesiredCount',
        MinCapacity=1,
        MaxCapacity=10
    )
    
    # スケーリングポリシーの作成
    policy_response = autoscaling.put_scaling_policy(
        PolicyName='sqs-scale-out-policy',
        ServiceNamespace='ecs',
        ResourceId='service/my-cluster/my-service',
        ScalableDimension='ecs:service:DesiredCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': 100.0,  # 目標値:キューあたり100メッセージ
            'CustomMetricSpecification': {
                'MetricName': 'ApproximateNumberOfVisibleMessages',
                'Namespace': 'AWS/SQS',
                'Dimensions': [
                    {
                        'Name': 'QueueName',
                        'Value': queue_url.split('/')[-1]
                    }
                ],
                'Statistic': 'Average'
            }
        }
    )
    
    print("Auto Scaling設定完了")
    return policy_response

# 使用例
monitoring = SQSMonitoring()
lambda_integration = LambdaSQSIntegration()

# CloudWatchアラーム設定
# monitoring.create_queue_alarms('my-queue', 'arn:aws:sns:ap-northeast-1:123456789012:alerts')

# メトリクス取得
# monitoring.get_queue_metrics('my-queue', hours=12)

# Lambdaトリガー設定
# lambda_integration.create_lambda_trigger('my-processor-function', 'arn:aws:sqs:ap-northeast-1:123456789012:my-queue')

フレームワーク統合と実用例

# Terraform設定例(HCL)
terraform_config = '''
# variables.tf
variable "queue_name" {
  description = "SQS queue name"
  type        = string
  default     = "app-queue"
}

variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}

# main.tf
resource "aws_sqs_queue" "main_queue" {
  name                       = var.queue_name
  delay_seconds              = 0
  max_message_size          = 262144
  message_retention_seconds = 1209600
  receive_wait_time_seconds = 20
  visibility_timeout_seconds = 300

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 3
  })

  kms_master_key_id = aws_kms_key.sqs_key.arn

  tags = {
    Name        = var.queue_name
    Environment = var.environment
  }
}

resource "aws_sqs_queue" "dlq" {
  name                       = "${var.queue_name}-dlq"
  message_retention_seconds = 1209600
  
  tags = {
    Name        = "${var.queue_name}-dlq"
    Environment = var.environment
  }
}

resource "aws_kms_key" "sqs_key" {
  description             = "KMS key for SQS encryption"
  deletion_window_in_days = 7
}

# IAM Role for Lambda
resource "aws_iam_role" "lambda_role" {
  name = "${var.queue_name}-lambda-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "lambda_sqs_policy" {
  name = "${var.queue_name}-lambda-sqs-policy"
  role = aws_iam_role.lambda_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = aws_sqs_queue.main_queue.arn
      }
    ]
  })
}

# outputs.tf
output "queue_url" {
  description = "URL of the created SQS queue"
  value       = aws_sqs_queue.main_queue.url
}

output "queue_arn" {
  description = "ARN of the created SQS queue"
  value       = aws_sqs_queue.main_queue.arn
}

output "dlq_url" {
  description = "URL of the dead letter queue"
  value       = aws_sqs_queue.dlq.url
}
'''

# Spring Boot統合例
spring_boot_config = '''
# application.yml
cloud:
  aws:
    sqs:
      region: ap-northeast-1
      endpoint: https://sqs.ap-northeast-1.amazonaws.com
    credentials:
      access-key: ${AWS_ACCESS_KEY_ID}
      secret-key: ${AWS_SECRET_ACCESS_KEY}

app:
  sqs:
    queue-name: app-queue
    dlq-name: app-queue-dlq
    max-concurrent-messages: 10
    visibility-timeout: 300
    wait-time-seconds: 20

# SQSConfiguration.java
@Configuration
@EnableConfigurationProperties
public class SQSConfiguration {
    
    @Value("${cloud.aws.sqs.region}")
    private String region;
    
    @Bean
    public AmazonSQS amazonSQS() {
        return AmazonSQSClientBuilder.standard()
                .withRegion(region)
                .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
                .build();
    }
    
    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS amazonSQS) {
        return new QueueMessagingTemplate(amazonSQS);
    }
}

# MessageService.java
@Service
public class MessageService {
    
    @Autowired
    private QueueMessagingTemplate messagingTemplate;
    
    @Value("${app.sqs.queue-name}")
    private String queueName;
    
    public void sendMessage(Object message) {
        messagingTemplate.convertAndSend(queueName, message);
    }
    
    @SqsListener("${app.sqs.queue-name}")
    public void receiveMessage(String message) {
        try {
            // メッセージ処理ロジック
            processMessage(message);
        } catch (Exception e) {
            // エラーハンドリング
            log.error("Message processing failed: {}", e.getMessage(), e);
            throw e; // DLQに送信するために再スロー
        }
    }
    
    private void processMessage(String message) {
        // ビジネスロジック実装
        log.info("Processing message: {}", message);
    }
}
'''

# Node.js Express統合例
nodejs_example = '''
// package.json
{
  "dependencies": {
    "aws-sdk": "^2.1400.0",
    "express": "^4.18.0",
    "uuid": "^9.0.0"
  }
}

// sqsService.js
const AWS = require('aws-sdk');
const { v4: uuidv4 } = require('uuid');

class SQSService {
  constructor() {
    this.sqs = new AWS.SQS({
      region: process.env.AWS_REGION || 'ap-northeast-1'
    });
    this.queueUrl = process.env.SQS_QUEUE_URL;
  }

  async sendMessage(messageBody, groupId = null) {
    const params = {
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(messageBody),
      MessageAttributes: {
        timestamp: {
          DataType: 'String',
          StringValue: new Date().toISOString()
        },
        messageId: {
          DataType: 'String',
          StringValue: uuidv4()
        }
      }
    };

    if (groupId) {
      params.MessageGroupId = groupId;
      params.MessageDeduplicationId = uuidv4();
    }

    try {
      const result = await this.sqs.sendMessage(params).promise();
      console.log('Message sent:', result.MessageId);
      return result;
    } catch (error) {
      console.error('Failed to send message:', error);
      throw error;
    }
  }

  async receiveMessages(maxMessages = 10) {
    const params = {
      QueueUrl: this.queueUrl,
      MaxNumberOfMessages: maxMessages,
      WaitTimeSeconds: 20,
      MessageAttributeNames: ['All']
    };

    try {
      const result = await this.sqs.receiveMessage(params).promise();
      return result.Messages || [];
    } catch (error) {
      console.error('Failed to receive messages:', error);
      throw error;
    }
  }

  async deleteMessage(receiptHandle) {
    const params = {
      QueueUrl: this.queueUrl,
      ReceiptHandle: receiptHandle
    };

    try {
      await this.sqs.deleteMessage(params).promise();
      console.log('Message deleted successfully');
    } catch (error) {
      console.error('Failed to delete message:', error);
      throw error;
    }
  }

  async startPolling(messageHandler) {
    console.log('Starting SQS polling...');
    
    while (true) {
      try {
        const messages = await this.receiveMessages();
        
        for (const message of messages) {
          try {
            await messageHandler(JSON.parse(message.Body));
            await this.deleteMessage(message.ReceiptHandle);
          } catch (error) {
            console.error('Message processing failed:', error);
            // DLQに移動されるまで再試行
          }
        }
      } catch (error) {
        console.error('Polling error:', error);
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }
  }
}

module.exports = SQSService;

// app.js
const express = require('express');
const SQSService = require('./sqsService');

const app = express();
const sqsService = new SQSService();

app.use(express.json());

// メッセージ送信エンドポイント
app.post('/send', async (req, res) => {
  try {
    const result = await sqsService.sendMessage(req.body);
    res.json({ success: true, messageId: result.MessageId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// メッセージ処理ロジック
const processMessage = async (messageBody) => {
  console.log('Processing:', messageBody);
  
  // ビジネスロジック実装
  if (messageBody.type === 'order') {
    // 注文処理
    await processOrder(messageBody);
  } else if (messageBody.type === 'notification') {
    // 通知処理
    await sendNotification(messageBody);
  }
};

// ポーリング開始
sqsService.startPolling(processMessage);

app.listen(3000, () => {
  console.log('Server running on port 3000');
});
'''

print("設定例:")
print("1. Terraform設定")
print("2. Spring Boot統合")
print("3. Node.js Express統合")