Amazon SQS
Fully managed message queue service. It is serverless, scales automatically, and is highly available. Supports Standard and FIFO queues.
Overview
Amazon SQS (Simple Queue Service) is a fully managed message queuing service provided by AWS. It lets software components send, store, and receive messages at any scale without losing messages and without requiring the receiving component to be available at the same time. This makes it a common choice for decoupling and scaling microservices, distributed systems, and serverless applications, particularly in cloud-first organizations.
Details
As of 2025, Amazon SQS is well established as messaging infrastructure for serverless applications. It has been in operation for more than 20 years, is backed by a 99.9% availability SLA, scales to virtually unlimited throughput on Standard queues, and is widely adopted by large enterprises. It offers two queue types, Standard and FIFO, so the delivery model can be matched to the use case: Standard queues provide at-least-once delivery with best-effort ordering, while FIFO queues preserve order within a message group and deduplicate messages. Native integration with Lambda, SNS, EventBridge, and other AWS services makes it straightforward to build event-driven architectures. Because it is fully managed, it reduces infrastructure management costs and lets development teams focus on business logic.
Key Features
- Fully Managed: No server management, patching, or high availability configuration required
- Auto Scaling: Automatic capacity adjustment according to traffic increases and decreases
- High Availability: Messages are stored redundantly across multiple Availability Zones, backed by a 99.9% availability SLA
- Security: Encryption with KMS, private connectivity through VPC endpoints, and fine-grained access control with IAM
- Cost Efficiency: Pay-as-you-use pricing with no upfront costs
- AWS Native Integration: Complete integration with Lambda, SNS, EventBridge, etc.
Pros and Cons
Pros
- Deep integration with other AWS services and no operational management in AWS environments
- Virtually unlimited throughput with Standard queues and a 99.9% availability SLA
- Cost efficiency through pay-as-you-use pricing and no initial investment required
- Enterprise-grade security through KMS encryption and VPC Endpoint
- Complete integration with AWS Lambda, SNS, EventBridge, etc.
- High reliability through dead letter queue and redrive functionality
Cons
- Strong lock-in to AWS environment and vendor dependency
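- Standard queues provide only best-effort ordering and at-least-once delivery, so messages can arrive out of order or more than once
- FIFO queues are limited by default to 300 API calls per second per action, or 3,000 messages per second with batches of 10 (high-throughput mode raises these limits)
- No push-type delivery; consumers must poll the queue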
- Not available in on-premises environments
- Constraints in integration with other cloud providers
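Because Standard queues can deliver a message more than once, consumers are usually written to be idempotent. The following is a minimal sketch of that idea, assuming each message is a dict with the `MessageId` and `Body` keys returned by `receive_message`. The class name `IdempotentProcessor` and the in-memory set are illustrative assumptions; a real consumer would need a durable store shared by all workers.

```python
class IdempotentProcessor:
    """Skips messages whose MessageId has already been processed."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # illustrative only; not durable and not shared

    def process(self, message):
        """Return True if the handler ran, False if the message was a duplicate."""
        message_id = message["MessageId"]
        if message_id in self._seen:
            return False
        self._handler(message["Body"])
        # Record the ID only after the handler succeeds, so a failed
        # attempt is processed again when the message is redelivered.
        self._seen.add(message_id)
        return True


processed = []
processor = IdempotentProcessor(processed.append)
deliveries = [
    {"MessageId": "m-1", "Body": "order created"},
    {"MessageId": "m-2", "Body": "order paid"},
    {"MessageId": "m-1", "Body": "order created"},  # duplicate delivery
]
results = [processor.process(m) for m in deliveries]
print(results)    # [True, True, False]
print(processed)  # ['order created', 'order paid']
```

Keying on `MessageId` only catches redeliveries of the same message. If a producer may send the same logical event twice, a business key such as an order ID is the safer choice.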
Code Examples
Setup and Basic Configuration
# Create queue with AWS CLI (Standard Queue)
aws sqs create-queue --queue-name my-standard-queue

# Create FIFO Queue
aws sqs create-queue \
  --queue-name my-fifo-queue.fifo \
  --attributes FifoQueue=true,ContentBasedDeduplication=true

# List queues
aws sqs list-queues

# Get queue URL
aws sqs get-queue-url --queue-name my-standard-queue

# Check queue attributes
aws sqs get-queue-attributes \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue \
  --attribute-names All
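Later examples need the queue name (for CloudWatch dimensions) and the queue ARN (for Lambda triggers). As a rough sketch, both can be derived from a queue URL of the form shown above. The helper name `parse_queue_url` is hypothetical, and the code assumes the standard public endpoint layout and the `aws` partition; requesting the `QueueArn` attribute with `get-queue-attributes` remains the authoritative source.

```python
from urllib.parse import urlparse


def parse_queue_url(queue_url):
    """Split a queue URL of the form shown above into its parts."""
    parsed = urlparse(queue_url)
    host_parts = parsed.netloc.split(".")           # ['sqs', 'us-east-1', 'amazonaws', 'com']
    path_parts = parsed.path.strip("/").split("/")  # ['123456789012', 'my-standard-queue']
    if len(host_parts) < 4 or host_parts[0] != "sqs" or len(path_parts) != 2:
        raise ValueError(f"Unexpected queue URL format: {queue_url}")
    region = host_parts[1]
    account_id, queue_name = path_parts
    return {
        "region": region,
        "account_id": account_id,
        "queue_name": queue_name,
        # Assumes the standard "aws" partition
        "queue_arn": f"arn:aws:sqs:{region}:{account_id}:{queue_name}",
    }


info = parse_queue_url(
    "https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue"
)
print(info["queue_arn"])  # arn:aws:sqs:us-east-1:123456789012:my-standard-queue
```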
Basic Message Sending and Receiving Operations
import boto3
import json
import time  # used by message_processor() when backing off after an error
from datetime import datetime

# Initialize SQS client
sqs = boto3.client('sqs', region_name='us-east-1')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue'

# Basic message sending
def send_message(message_body, delay_seconds=0):
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=message_body,
        DelaySeconds=delay_seconds,
        MessageAttributes={
            'timestamp': {
                'StringValue': datetime.now().isoformat(),
                'DataType': 'String'
            },
            'source': {
                'StringValue': 'python-app',
                'DataType': 'String'
            }
        }
    )
    print(f"Message sent successfully: MessageId={response['MessageId']}")
    return response

# Send JSON data
order_data = {
    'orderId': 'ORD-12345',
    'customerId': 'CUST-98765',
    'items': [
        {'productId': 'PROD-001', 'quantity': 2, 'price': 1500},
        {'productId': 'PROD-002', 'quantity': 1, 'price': 2500}
    ],
    'totalAmount': 5500,
    'timestamp': datetime.now().isoformat()
}
send_message(json.dumps(order_data))
# Batch message sending (up to 10 messages)
def send_batch_messages(messages):
    entries = []
    for i, message in enumerate(messages):
        entries.append({
            'Id': str(i),
            'MessageBody': json.dumps(message),
            'MessageAttributes': {
                'batchIndex': {
                    'StringValue': str(i),
                    'DataType': 'Number'
                }
            }
        })
    response = sqs.send_message_batch(
        QueueUrl=queue_url,
        Entries=entries
    )
    # 'Successful' may be absent when every entry fails, so use .get()
    print(f"Batch sending completed: {len(response.get('Successful', []))} successful")
    if response.get('Failed'):
        print(f"Send failed: {len(response['Failed'])} messages")
    return response

# Send multiple messages at once
batch_messages = [
    {'type': 'user_signup', 'userId': f'user-{i}', 'email': f'user{i}@example.com'}
    for i in range(5)
]
send_batch_messages(batch_messages)
# Message receiving
def receive_messages(max_messages=10, wait_time=20):
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,
        WaitTimeSeconds=wait_time,  # Long polling
        MessageAttributeNames=['All'],
        AttributeNames=['All']
    )
    messages = response.get('Messages', [])
    print(f"Received {len(messages)} messages")
    for message in messages:
        # Process message
        print(f"MessageId: {message['MessageId']}")
        print(f"Body: {message['Body']}")
        print(f"Attributes: {message.get('MessageAttributes', {})}")
        # Delete message (after processing completed)
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )
        print(f"Message deleted successfully: {message['MessageId']}")
    return messages
# Message receiving and batch deletion
def receive_and_batch_delete():
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20
    )
    messages = response.get('Messages', [])
    if not messages:
        print("No messages to receive")
        return
    # Create batch delete entries
    delete_entries = []
    for i, message in enumerate(messages):
        # Message processing logic
        try:
            body = json.loads(message['Body'])
            print(f"Processing: {body}")
            # Add to delete entries only on successful processing
            delete_entries.append({
                'Id': str(i),
                'ReceiptHandle': message['ReceiptHandle']
            })
        except json.JSONDecodeError:
            print(f"JSON parse error: {message['MessageId']}")
        except Exception as e:
            print(f"Processing error: {e}")
    # Execute batch deletion
    if delete_entries:
        delete_response = sqs.delete_message_batch(
            QueueUrl=queue_url,
            Entries=delete_entries
        )
        # 'Successful' may be absent when every entry fails, so use .get()
        print(f"Batch deletion completed: {len(delete_response.get('Successful', []))} messages")
# Continuous message processing loop
def message_processor():
    while True:
        try:
            messages = receive_messages(max_messages=10, wait_time=20)
            if not messages:
                print("No messages, waiting...")
                continue
        except KeyboardInterrupt:
            print("Stopping processing")
            break
        except Exception as e:
            print(f"Error occurred: {e}")
            time.sleep(5)  # Wait a bit on error

# Usage example
if __name__ == "__main__":
    # receive_messages()
    # message_processor()
    pass
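`send_message_batch` accepts at most 10 entries per call and can report failures for individual entries, so larger workloads are typically split into chunks and the `Failed` list inspected. The sketch below illustrates one way to do this. The helper names `chunked` and `send_all` are hypothetical, and the client is passed in as a parameter (for example the `sqs` client created above) so the logic can be exercised without AWS access.

```python
import json


def chunked(items, size=10):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_all(sqs_client, queue_url, messages, batch_size=10):
    """Send messages in batches and return the entries SQS reported as failed."""
    failed_entries = []
    for batch_number, batch in enumerate(chunked(messages, batch_size)):
        # Entry IDs only need to be unique within a single batch request
        entries = [
            {"Id": f"{batch_number}-{i}", "MessageBody": json.dumps(body)}
            for i, body in enumerate(batch)
        ]
        response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=entries)
        failed_ids = {item["Id"] for item in response.get("Failed", [])}
        failed_entries.extend(e for e in entries if e["Id"] in failed_ids)
    return failed_entries
```

The returned entries can be passed to another `send_message_batch` call as a retry, ideally after a short delay.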
Advanced Settings and Customization
import boto3
import json
import time
from botocore.exceptions import ClientError

# Advanced SQS configuration
class AdvancedSQSHandler:
    def __init__(self, region_name='us-east-1'):
        self.sqs = boto3.client('sqs', region_name=region_name)
        self.resource = boto3.resource('sqs', region_name=region_name)

    def create_queue_with_dlq(self, queue_name, dlq_name=None):
        """Create dead letter queue and main queue"""
        if dlq_name is None:
            dlq_name = f"{queue_name}-dlq"
        # Create dead letter queue
        dlq_response = self.sqs.create_queue(
            QueueName=dlq_name,
            Attributes={
                'MessageRetentionPeriod': '1209600',  # 14 days
                'VisibilityTimeout': '300'  # The attribute is named VisibilityTimeout
            }
        )
        dlq_url = dlq_response['QueueUrl']
        # Get dead letter queue ARN
        dlq_attributes = self.sqs.get_queue_attributes(
            QueueUrl=dlq_url,
            AttributeNames=['QueueArn']
        )
        dlq_arn = dlq_attributes['Attributes']['QueueArn']
        # Configure redrive policy
        redrive_policy = {
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': 3  # Move to DLQ after 3 failed receives
        }
        # Create main queue
        main_response = self.sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'VisibilityTimeout': '300',  # 5 minutes
                'MessageRetentionPeriod': '1209600',  # 14 days
                'DelaySeconds': '0',
                'ReceiveMessageWaitTimeSeconds': '20',  # Long polling
                'RedrivePolicy': json.dumps(redrive_policy),
                'KmsMasterKeyId': 'alias/aws/sqs',  # AWS managed KMS key for SQS
                'KmsDataKeyReusePeriodSeconds': '300'
            }
        )
        print("Queue creation completed:")
        print(f"  Main queue: {main_response['QueueUrl']}")
        print(f"  Dead letter queue: {dlq_url}")
        return main_response['QueueUrl'], dlq_url
    def create_fifo_queue(self, queue_name):
        """Create FIFO queue"""
        if not queue_name.endswith('.fifo'):
            queue_name += '.fifo'
        response = self.sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'FifoQueue': 'true',
                'ContentBasedDeduplication': 'true',
                'MessageRetentionPeriod': '1209600',
                'VisibilityTimeout': '300',  # The attribute is named VisibilityTimeout
                'ReceiveMessageWaitTimeSeconds': '20',
                'KmsMasterKeyId': 'alias/aws/sqs'
            }
        )
        print(f"FIFO queue creation completed: {response['QueueUrl']}")
        return response['QueueUrl']

    def send_fifo_message(self, queue_url, message_body, group_id, deduplication_id=None):
        """Send message to FIFO queue"""
        params = {
            'QueueUrl': queue_url,
            'MessageBody': message_body,
            'MessageGroupId': group_id
        }
        if deduplication_id:
            params['MessageDeduplicationId'] = deduplication_id
        response = self.sqs.send_message(**params)
        print(f"FIFO message sent successfully: {response['MessageId']}")
        return response
    def set_queue_policy(self, queue_url, policy):
        """Set queue policy"""
        self.sqs.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={
                'Policy': json.dumps(policy)
            }
        )
        print("Queue policy set successfully")

    def purge_queue(self, queue_url):
        """Purge queue (delete all messages)"""
        try:
            self.sqs.purge_queue(QueueUrl=queue_url)
            print("Queue purge completed")
        except ClientError as e:
            # The code may carry an 'AWS.SimpleQueueService.' prefix, so match the suffix
            if e.response['Error']['Code'].endswith('PurgeQueueInProgress'):
                print("Purge already in progress")
            else:
                raise

    def get_queue_metrics(self, queue_url):
        """Get queue metrics"""
        attributes = self.sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=[
                'ApproximateNumberOfMessages',
                'ApproximateNumberOfMessagesNotVisible',
                'ApproximateNumberOfMessagesDelayed',
                'CreatedTimestamp',
                'LastModifiedTimestamp'
            ]
        )
        metrics = attributes['Attributes']
        print("Queue metrics:")
        print(f"  Waiting messages: {metrics.get('ApproximateNumberOfMessages', 0)}")
        print(f"  Processing messages: {metrics.get('ApproximateNumberOfMessagesNotVisible', 0)}")
        print(f"  Delayed messages: {metrics.get('ApproximateNumberOfMessagesDelayed', 0)}")
        return metrics
# Usage example
handler = AdvancedSQSHandler()

# Create queue with DLQ
main_url, dlq_url = handler.create_queue_with_dlq('order-processing')

# Create FIFO queue
fifo_url = handler.create_fifo_queue('transaction-log')

# Send FIFO message
handler.send_fifo_message(
    fifo_url,
    json.dumps({'transaction_id': 'TXN-001', 'amount': 10000}),
    group_id='financial-transactions',
    deduplication_id='TXN-001-20250101'
)

# Check metrics
handler.get_queue_metrics(main_url)
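The FIFO queue above enables `ContentBasedDeduplication`, which derives the deduplication ID from a SHA-256 hash of the message body when no explicit `MessageDeduplicationId` is supplied, and drops repeats seen within a 5-minute interval. The following in-memory model is a rough sketch of that behavior for illustration only. The class name `DeduplicationWindow` is hypothetical, timestamps are passed in explicitly as seconds, and deduplication is modeled at queue scope.

```python
import hashlib


class DeduplicationWindow:
    """In-memory model of the FIFO deduplication interval (5 minutes)."""

    def __init__(self, interval_seconds=300):
        self.interval_seconds = interval_seconds
        self._accepted_at = {}  # deduplication ID -> time the message was accepted

    def accept(self, body, now, deduplication_id=None):
        """Return True if the message would be enqueued, False if treated as a duplicate."""
        # Without an explicit ID, fall back to a hash of the body (not the attributes)
        dedup_id = deduplication_id or hashlib.sha256(body.encode("utf-8")).hexdigest()
        accepted = self._accepted_at.get(dedup_id)
        if accepted is not None and now - accepted < self.interval_seconds:
            return False
        self._accepted_at[dedup_id] = now
        return True


window = DeduplicationWindow()
body = '{"transaction_id": "TXN-001", "amount": 10000}'
print(window.accept(body, now=0))    # True: first time this body is seen
print(window.accept(body, now=120))  # False: same body within 5 minutes
print(window.accept(body, now=400))  # True: the interval has elapsed
```

When two distinct messages can legitimately have identical bodies, an explicit deduplication ID, as passed to `send_fifo_message` above, avoids dropping the second one.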
Monitoring and Scaling Setup
import boto3
import json
from datetime import datetime, timedelta

# CloudWatch integration and alert setup
class SQSMonitoring:
    def __init__(self, region_name='us-east-1'):
        self.cloudwatch = boto3.client('cloudwatch', region_name=region_name)
        self.sqs = boto3.client('sqs', region_name=region_name)
        self.region = region_name

    def create_queue_alarms(self, queue_name, sns_topic_arn):
        """Create CloudWatch alarms for SQS queue"""
        # Message count alarm
        self.cloudwatch.put_metric_alarm(
            AlarmName=f'{queue_name}-MessageCount-High',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=2,
            MetricName='ApproximateNumberOfMessagesVisible',  # CloudWatch metric name
            Namespace='AWS/SQS',
            Period=300,  # 5 minutes
            Statistic='Average',
            Threshold=1000.0,
            ActionsEnabled=True,
            AlarmActions=[sns_topic_arn],
            AlarmDescription=f'High message count in {queue_name}',
            Dimensions=[
                {
                    'Name': 'QueueName',
                    'Value': queue_name
                }
            ]
        )
        # Message age alarm
        self.cloudwatch.put_metric_alarm(
            AlarmName=f'{queue_name}-MessageAge-High',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=1,
            MetricName='ApproximateAgeOfOldestMessage',
            Namespace='AWS/SQS',
            Period=300,
            Statistic='Maximum',
            Threshold=1800.0,  # 30 minutes
            ActionsEnabled=True,
            AlarmActions=[sns_topic_arn],
            AlarmDescription=f'Old messages in {queue_name}',
            Dimensions=[
                {
                    'Name': 'QueueName',
                    'Value': queue_name
                }
            ]
        )
        print(f"CloudWatch alarms created: {queue_name}")
    def get_queue_metrics(self, queue_name, hours=24):
        """Get queue metrics"""
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=hours)
        # Get message count
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/SQS',
            MetricName='ApproximateNumberOfMessagesVisible',  # CloudWatch metric name
            Dimensions=[
                {
                    'Name': 'QueueName',
                    'Value': queue_name
                }
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=3600,  # Every hour
            Statistics=['Average', 'Maximum']
        )
        print(f"Message count for the past {hours} hours:")
        for datapoint in sorted(response['Datapoints'], key=lambda x: x['Timestamp']):
            print(f"  {datapoint['Timestamp']}: avg {datapoint['Average']:.0f}, max {datapoint['Maximum']:.0f}")
        return response['Datapoints']

    def create_custom_metric(self, queue_url, metric_name, value, unit='Count'):
        """Send custom metrics"""
        queue_name = queue_url.split('/')[-1]
        self.cloudwatch.put_metric_data(
            Namespace='Custom/SQS',
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Dimensions': [
                        {
                            'Name': 'QueueName',
                            'Value': queue_name
                        }
                    ],
                    'Value': value,
                    'Unit': unit,
                    'Timestamp': datetime.utcnow()
                }
            ]
        )
# Lambda integration for message processing
class LambdaSQSIntegration:
    def __init__(self):
        self.lambda_client = boto3.client('lambda')
        self.sqs = boto3.client('sqs')

    def create_lambda_trigger(self, function_name, queue_arn):
        """Set up SQS trigger for Lambda"""
        try:
            response = self.lambda_client.create_event_source_mapping(
                EventSourceArn=queue_arn,
                FunctionName=function_name,
                BatchSize=10,  # Number of messages to process at once
                MaximumBatchingWindowInSeconds=5,  # Maximum batching wait time
                # ParallelizationFactor applies only to Kinesis and DynamoDB stream
                # sources; for SQS, concurrency is capped with ScalingConfig instead
                ScalingConfig={'MaximumConcurrency': 2}  # Max concurrent invocations (minimum 2)
            )
            print(f"Lambda trigger created: {response['UUID']}")
            return response
        except Exception as e:
            print(f"Trigger creation error: {e}")

    def update_trigger_config(self, uuid, batch_size=None, max_batching_window=None):
        """Update existing trigger configuration"""
        update_params = {'UUID': uuid}
        if batch_size:
            update_params['BatchSize'] = batch_size
        if max_batching_window:
            update_params['MaximumBatchingWindowInSeconds'] = max_batching_window
        response = self.lambda_client.update_event_source_mapping(**update_params)
        print("Lambda trigger configuration updated")
        return response
# Auto Scaling integration
def setup_auto_scaling(queue_url, target_group_arn):
    """Set up Auto Scaling based on SQS metrics"""
    autoscaling = boto3.client('application-autoscaling')
    # Register scalable target
    autoscaling.register_scalable_target(
        ServiceNamespace='ecs',
        ResourceId='service/my-cluster/my-service',
        ScalableDimension='ecs:service:DesiredCount',
        MinCapacity=1,
        MaxCapacity=10
    )
    # Create scaling policy
    policy_response = autoscaling.put_scaling_policy(
        PolicyName='sqs-scale-out-policy',
        ServiceNamespace='ecs',
        ResourceId='service/my-cluster/my-service',
        ScalableDimension='ecs:service:DesiredCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': 100.0,  # Target: average of 100 visible messages in the queue
            'CustomMetricSpecification': {
                'MetricName': 'ApproximateNumberOfMessagesVisible',  # CloudWatch metric name
                'Namespace': 'AWS/SQS',
                'Dimensions': [
                    {
                        'Name': 'QueueName',
                        'Value': queue_url.split('/')[-1]
                    }
                ],
                'Statistic': 'Average'
            }
        }
    )
    print("Auto Scaling configuration completed")
    return policy_response
# Usage example
monitoring = SQSMonitoring()
lambda_integration = LambdaSQSIntegration()
# Set up CloudWatch alarms
# monitoring.create_queue_alarms('my-queue', 'arn:aws:sns:us-east-1:123456789012:alerts')
# Get metrics
# monitoring.get_queue_metrics('my-queue', hours=12)
# Set up Lambda trigger
# lambda_integration.create_lambda_trigger('my-processor-function', 'arn:aws:sqs:us-east-1:123456789012:my-queue')
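The scaling policy above tracks raw queue depth. A common alternative is to scale on backlog per worker: divide the number of visible messages by the number of running tasks and compare it with the backlog each task can absorb within the target latency. The arithmetic is sketched below; the function names and sample numbers are illustrative assumptions, with the task limits taken from the `MinCapacity` and `MaxCapacity` values used above.

```python
import math


def acceptable_backlog_per_task(target_latency_seconds, avg_processing_seconds):
    """Messages one worker can have queued while still meeting the latency target."""
    return target_latency_seconds / avg_processing_seconds


def desired_task_count(visible_messages, backlog_per_task, min_tasks=1, max_tasks=10):
    """Workers needed to keep the backlog per worker at or below the target."""
    needed = math.ceil(visible_messages / backlog_per_task)
    # Clamp to the registered scaling range
    return max(min_tasks, min(max_tasks, needed))


# Assumed workload: 0.5 s per message, messages should wait at most 50 s
target = acceptable_backlog_per_task(target_latency_seconds=50, avg_processing_seconds=0.5)
print(target)                            # 100.0
print(desired_task_count(250, target))   # 3
print(desired_task_count(1500, target))  # 10 (15 needed, capped at max_tasks)
```

Publishing the backlog-per-task value as a custom metric, for instance with `create_custom_metric` above, would let a target tracking policy act on it directly.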
Framework Integration and Practical Examples
# Terraform configuration example (HCL)
terraform_config = '''
# variables.tf
variable "queue_name" {
  description = "SQS queue name"
  type        = string
  default     = "app-queue"
}

variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}

# main.tf
resource "aws_sqs_queue" "main_queue" {
  name                       = var.queue_name
  delay_seconds              = 0
  max_message_size           = 262144
  message_retention_seconds  = 1209600
  receive_wait_time_seconds  = 20
  visibility_timeout_seconds = 300

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 3
  })

  kms_master_key_id = aws_kms_key.sqs_key.arn

  tags = {
    Name        = var.queue_name
    Environment = var.environment
  }
}

resource "aws_sqs_queue" "dlq" {
  name                      = "${var.queue_name}-dlq"
  message_retention_seconds = 1209600

  tags = {
    Name        = "${var.queue_name}-dlq"
    Environment = var.environment
  }
}

resource "aws_kms_key" "sqs_key" {
  description             = "KMS key for SQS encryption"
  deletion_window_in_days = 7
}

# IAM Role for Lambda
resource "aws_iam_role" "lambda_role" {
  name = "${var.queue_name}-lambda-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "lambda_sqs_policy" {
  name = "${var.queue_name}-lambda-sqs-policy"
  role = aws_iam_role.lambda_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = aws_sqs_queue.main_queue.arn
      },
      {
        # The queue is encrypted with a customer managed key, so the consumer must be able to decrypt
        Effect   = "Allow"
        Action   = ["kms:Decrypt"]
        Resource = aws_kms_key.sqs_key.arn
      }
    ]
  })
}

# outputs.tf
output "queue_url" {
  description = "URL of the created SQS queue"
  value       = aws_sqs_queue.main_queue.url
}

output "queue_arn" {
  description = "ARN of the created SQS queue"
  value       = aws_sqs_queue.main_queue.arn
}

output "dlq_url" {
  description = "URL of the dead letter queue"
  value       = aws_sqs_queue.dlq.url
}
'''
# Spring Boot integration example
spring_boot_config = '''
# application.yml
cloud:
  aws:
    sqs:
      region: us-east-1
      endpoint: https://sqs.us-east-1.amazonaws.com
    credentials:
      access-key: ${AWS_ACCESS_KEY_ID}
      secret-key: ${AWS_SECRET_ACCESS_KEY}
app:
  sqs:
    queue-name: app-queue
    dlq-name: app-queue-dlq
    max-concurrent-messages: 10
    visibility-timeout: 300
    wait-time-seconds: 20

# SQSConfiguration.java
@Configuration
@EnableConfigurationProperties
public class SQSConfiguration {

    @Value("${cloud.aws.sqs.region}")
    private String region;

    @Bean
    public AmazonSQS amazonSQS() {
        return AmazonSQSClientBuilder.standard()
            .withRegion(region)
            .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
            .build();
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS amazonSQS) {
        return new QueueMessagingTemplate(amazonSQS);
    }
}

# MessageService.java
@Service
public class MessageService {

    // SLF4J logger used by the methods below
    private static final Logger log = LoggerFactory.getLogger(MessageService.class);

    @Autowired
    private QueueMessagingTemplate messagingTemplate;

    @Value("${app.sqs.queue-name}")
    private String queueName;

    public void sendMessage(Object message) {
        messagingTemplate.convertAndSend(queueName, message);
    }

    @SqsListener("${app.sqs.queue-name}")
    public void receiveMessage(String message) {
        try {
            // Message processing logic
            processMessage(message);
        } catch (Exception e) {
            // Error handling
            log.error("Message processing failed: {}", e.getMessage(), e);
            // Re-throw so the message is not deleted; it is redelivered and
            // moves to the DLQ once maxReceiveCount is exceeded
            throw e;
        }
    }

    private void processMessage(String message) {
        // Business logic implementation
        log.info("Processing message: {}", message);
    }
}
'''
# Node.js Express integration example
nodejs_example = '''
// package.json
{
  "dependencies": {
    "aws-sdk": "^2.1400.0",
    "express": "^4.18.0",
    "uuid": "^9.0.0"
  }
}

// sqsService.js
const AWS = require('aws-sdk');
const { v4: uuidv4 } = require('uuid');

class SQSService {
  constructor() {
    this.sqs = new AWS.SQS({
      region: process.env.AWS_REGION || 'us-east-1'
    });
    this.queueUrl = process.env.SQS_QUEUE_URL;
  }

  async sendMessage(messageBody, groupId = null) {
    const params = {
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(messageBody),
      MessageAttributes: {
        timestamp: {
          DataType: 'String',
          StringValue: new Date().toISOString()
        },
        messageId: {
          DataType: 'String',
          StringValue: uuidv4()
        }
      }
    };
    // FIFO queues require a message group ID and a deduplication ID
    if (groupId) {
      params.MessageGroupId = groupId;
      params.MessageDeduplicationId = uuidv4();
    }
    try {
      const result = await this.sqs.sendMessage(params).promise();
      console.log('Message sent:', result.MessageId);
      return result;
    } catch (error) {
      console.error('Failed to send message:', error);
      throw error;
    }
  }

  async receiveMessages(maxMessages = 10) {
    const params = {
      QueueUrl: this.queueUrl,
      MaxNumberOfMessages: maxMessages,
      WaitTimeSeconds: 20,
      MessageAttributeNames: ['All']
    };
    try {
      const result = await this.sqs.receiveMessage(params).promise();
      return result.Messages || [];
    } catch (error) {
      console.error('Failed to receive messages:', error);
      throw error;
    }
  }

  async deleteMessage(receiptHandle) {
    const params = {
      QueueUrl: this.queueUrl,
      ReceiptHandle: receiptHandle
    };
    try {
      await this.sqs.deleteMessage(params).promise();
      console.log('Message deleted successfully');
    } catch (error) {
      console.error('Failed to delete message:', error);
      throw error;
    }
  }

  async startPolling(messageHandler) {
    console.log('Starting SQS polling...');
    while (true) {
      try {
        const messages = await this.receiveMessages();
        for (const message of messages) {
          try {
            await messageHandler(JSON.parse(message.Body));
            await this.deleteMessage(message.ReceiptHandle);
          } catch (error) {
            console.error('Message processing failed:', error);
            // Not deleted, so SQS redelivers it until it is moved to the DLQ
          }
        }
      } catch (error) {
        console.error('Polling error:', error);
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }
  }
}

module.exports = SQSService;

// app.js
const express = require('express');
const SQSService = require('./sqsService');

const app = express();
const sqsService = new SQSService();
app.use(express.json());

// Message sending endpoint
app.post('/send', async (req, res) => {
  try {
    const result = await sqsService.sendMessage(req.body);
    res.json({ success: true, messageId: result.MessageId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Message processing logic
// (processOrder and sendNotification are application-specific and not shown here)
const processMessage = async (messageBody) => {
  console.log('Processing:', messageBody);
  // Business logic implementation
  if (messageBody.type === 'order') {
    // Order processing
    await processOrder(messageBody);
  } else if (messageBody.type === 'notification') {
    // Notification processing
    await sendNotification(messageBody);
  }
};

// Start polling
sqsService.startPolling(processMessage);

app.listen(3000, () => {
  console.log('Server running on port 3000');
});
'''
print("Configuration examples:")
print("1. Terraform configuration")
print("2. Spring Boot integration")
print("3. Node.js Express integration")
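The Terraform configuration above provisions an IAM role for a Lambda consumer but does not show the function itself. The following is a minimal sketch of what such a handler might look like in Python, assuming the event source mapping is configured with `FunctionResponseTypes=['ReportBatchItemFailures']` so that only failed messages are returned to the queue. The `process_record` function and its `orderId` check are hypothetical placeholders for real business logic.

```python
import json


def process_record(body):
    """Hypothetical business logic; raises on messages it cannot handle."""
    if "orderId" not in body:
        raise ValueError("missing orderId")


def lambda_handler(event, context):
    """Process an SQS batch and report only the failed messages back to Lambda."""
    failures = []
    for record in event.get("Records", []):
        try:
            process_record(json.loads(record["body"]))
        except Exception as exc:
            print(f"Failed to process {record['messageId']}: {exc}")
            # Listed messages become visible again; the rest of the batch is deleted
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

Without the partial batch response setting, a single failing message causes the whole batch to be retried, which is one more reason to keep consumers idempotent.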