Amazon S3

A scalable object storage service designed for 99.999999999% (11 9's) durability, with virtually unlimited capacity and rich security and management features.

Tags: File Server, Object Storage, Cloud Storage, AWS, Scalable, High Availability

Overview

Amazon S3 (Simple Storage Service) is a cloud object storage service offering industry-leading scalability, data availability, security, and performance. It stores virtually unlimited amounts of data, from gigabytes to exabytes, and is designed for 99.999999999% (eleven 9's) durability. S3 is widely used for data lakes, mobile applications, backup and restore, archival, IoT, machine learning, AI, and analytics. With cost-effective pay-as-you-go pricing, a global infrastructure, and deep integration with the AWS ecosystem, S3 serves as the backbone of many modern cloud architectures.

Details

As of 2025, S3 has significantly expanded its options for high-performance workloads with the S3 Express One Zone storage class, which delivers consistent single-digit millisecond access latency. Alongside the established S3 Standard, S3 Intelligent-Tiering, S3 Glacier, and S3 Glacier Deep Archive classes, the newer storage classes provide cost structures optimized for diverse use cases. Standard features include S3 Batch Operations for managing billions of objects at scale, S3 Replication for same-region and cross-region replication, S3 Storage Lens for organization-wide storage analytics and insights, and VPC endpoint integration for private network connectivity. Enterprise capabilities such as encryption, access control, auditing, and lifecycle management are also robust, cementing S3's position as one of the world's largest cloud storage platforms.
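
As a concrete illustration of the replication feature mentioned above, here is a minimal boto3 sketch for enabling cross-region replication. The bucket names, account ID, and IAM role ARN are placeholders, and both buckets are assumed to already exist with versioning enabled:

import boto3

s3 = boto3.client('s3')

# Hypothetical names: both buckets must already have versioning enabled,
# and the IAM role must allow S3 to replicate objects on your behalf.
s3.put_bucket_replication(
    Bucket='my-source-bucket',
    ReplicationConfiguration={
        'Role': 'arn:aws:iam::123456789012:role/s3-replication-role',
        'Rules': [{
            'ID': 'ReplicateEverything',
            'Priority': 1,
            'Status': 'Enabled',
            'Filter': {},  # empty filter = replicate every object
            'DeleteMarkerReplication': {'Status': 'Disabled'},
            'Destination': {'Bucket': 'arn:aws:s3:::my-destination-bucket'}
        }]
    }
)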

Key Features

  • Unlimited Scalability: Virtually unlimited data capacity and object count (maximum 5 TB per object)
  • High Durability: Designed for 99.999999999% (eleven 9's) data durability
  • Diverse Storage Classes: Multiple storage classes (Standard, Intelligent-Tiering, Express One Zone, the Glacier tiers, and more) matched to different access patterns and costs
  • Global Infrastructure: Consistent service delivery across AWS regions worldwide
  • Rich APIs: Flexible programmatic access via REST API and SDKs
  • Deep AWS Integration: Seamless integration with Lambda, CloudFront, CloudTrail, and more

Pros and Cons

Pros

  • Virtually unlimited scalability eliminates the need for capacity planning and reduces operational burden
  • Industry-leading reliability: data is redundantly stored across multiple Availability Zones and designed for eleven 9's durability
  • Pay-as-you-go pricing with no upfront costs or minimum commitments
  • Deep AWS ecosystem integration enables simple construction of complex cloud architectures
  • Global-scale CDN integration provides high-speed content delivery
  • Comprehensive security features (encryption, IAM, VPC integration) provided as standard

Cons

  • AWS dependency creates vendor lock-in risk and migration costs to other clouds
  • Request (API call) charges add up quickly at high request volumes
  • Data transfer fees (especially outbound to the internet) can be expensive and lead to unexpected costs
  • Storage class selection and lifecycle design add operational complexity
  • A detailed, multi-dimensional pricing structure makes accurate cost prediction difficult
  • Access latency is higher than local or block storage, which can matter for latency-sensitive workloads

Code Examples

Basic Operations with AWS CLI

# AWS CLI installation
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

# Configure credentials
aws configure
# AWS Access Key ID: YOUR_ACCESS_KEY
# AWS Secret Access Key: YOUR_SECRET_KEY
# Default region name: us-east-1
# Default output format: json

# Or set via environment variables
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_DEFAULT_REGION=us-east-1

# Bucket operations
aws s3 mb s3://my-unique-bucket-name-2025  # Create bucket
aws s3 ls                                   # List buckets
aws s3 ls s3://my-unique-bucket-name-2025  # List bucket contents

# File operations
aws s3 cp localfile.txt s3://my-unique-bucket-name-2025/  # Upload
aws s3 cp s3://my-unique-bucket-name-2025/file.txt ./     # Download
aws s3 sync ./local-folder s3://my-unique-bucket-name-2025/remote-folder  # Sync

# Advanced operations
aws s3 cp file.txt s3://bucket/path/ --storage-class GLACIER  # Specify storage class
aws s3 cp folder/ s3://bucket/ --recursive --exclude "*.tmp"  # Recursive upload with exclusions

# Delete bucket (empty only)
aws s3 rb s3://my-unique-bucket-name-2025
# Force delete (including contents)
aws s3 rb s3://my-unique-bucket-name-2025 --force

Basic Operations with Python Boto3

import boto3
from botocore.exceptions import ClientError

# Create an S3 client (boto3.resource('s3') is an alternative object-oriented
# interface, but the examples below use the low-level client)
s3_client = boto3.client('s3', region_name='us-east-1')

# Bucket operations
def create_bucket(bucket_name, region='us-east-1'):
    """Create S3 bucket"""
    try:
        if region == 'us-east-1':
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={'LocationConstraint': region}
            )
        print(f"Bucket '{bucket_name}' created successfully")
    except ClientError as e:
        print(f"Error: {e}")

def list_buckets():
    """List all buckets"""
    response = s3_client.list_buckets()
    print("Bucket list:")
    for bucket in response['Buckets']:
        print(f"  - {bucket['Name']} (Created: {bucket['CreationDate']})")

# Object operations
def upload_file(file_path, bucket_name, object_key=None):
    """Upload file to S3"""
    if object_key is None:
        object_key = file_path
    
    try:
        s3_client.upload_file(file_path, bucket_name, object_key)
        print(f"Uploaded '{file_path}' to '{bucket_name}/{object_key}'")
    except ClientError as e:
        print(f"Upload error: {e}")

def download_file(bucket_name, object_key, file_path):
    """Download file from S3"""
    try:
        s3_client.download_file(bucket_name, object_key, file_path)
        print(f"Downloaded '{bucket_name}/{object_key}' to '{file_path}'")
    except ClientError as e:
        print(f"Download error: {e}")

def upload_with_metadata(file_path, bucket_name, object_key, metadata=None):
    """Upload with metadata"""
    extra_args = {}
    if metadata:
        extra_args['Metadata'] = metadata
    
    try:
        s3_client.upload_file(
            file_path, bucket_name, object_key,
            ExtraArgs=extra_args
        )
        print(f"Upload with metadata completed: {object_key}")
    except ClientError as e:
        print(f"Error: {e}")

# Usage examples
bucket_name = "my-test-bucket-2025"
create_bucket(bucket_name)
upload_file("test.txt", bucket_name, "documents/test.txt")
upload_with_metadata(
    "image.jpg", bucket_name, "images/image.jpg",
    {"Content-Type": "image/jpeg", "Author": "TestUser"}
)
download_file(bucket_name, "documents/test.txt", "downloaded_test.txt")
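
Listing large buckets requires pagination, since a single list_objects_v2 call returns at most 1,000 keys. A short sketch using boto3's built-in paginator (bucket name and prefix are placeholders):

def list_objects(bucket_name, prefix=''):
    """List all objects under a prefix, handling pagination"""
    paginator = s3_client.get_paginator('list_objects_v2')
    total = 0
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get('Contents', []):
            print(f"  - {obj['Key']} ({obj['Size']} bytes, modified {obj['LastModified']})")
            total += 1
    print(f"Total objects: {total}")

list_objects(bucket_name, 'documents/')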

Storage Classes and Lifecycle Management

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client('s3')

def create_lifecycle_policy(bucket_name):
    """Configure lifecycle policy"""
    lifecycle_config = {
        'Rules': [
            {
                'ID': 'TransitionRule',
                'Status': 'Enabled',
                'Filter': {'Prefix': 'documents/'},
                'Transitions': [
                    {
                        'Days': 30,
                        'StorageClass': 'STANDARD_IA'  # To IA after 30 days
                    },
                    {
                        'Days': 90,
                        'StorageClass': 'GLACIER'      # To Glacier after 90 days
                    },
                    {
                        'Days': 365,
                        'StorageClass': 'DEEP_ARCHIVE' # To Deep Archive after 1 year
                    }
                ]
            },
            {
                'ID': 'DeleteRule',
                'Status': 'Enabled',
                'Filter': {'Prefix': 'temp/'},
                'Expiration': {'Days': 7}  # Delete after 7 days
            },
            {
                'ID': 'IncompleteMultipartUploads',
                'Status': 'Enabled',
                'Filter': {},
                'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 7}
            }
        ]
    }
    
    try:
        s3_client.put_bucket_lifecycle_configuration(
            Bucket=bucket_name,
            LifecycleConfiguration=lifecycle_config
        )
        print(f"Lifecycle policy configured for: {bucket_name}")
    except ClientError as e:
        print(f"Error: {e}")

def upload_with_storage_class(file_path, bucket_name, object_key, storage_class='STANDARD'):
    """Upload with specific storage class"""
    try:
        s3_client.upload_file(
            file_path, bucket_name, object_key,
            ExtraArgs={'StorageClass': storage_class}
        )
        print(f"Uploaded '{object_key}' with {storage_class} class")
    except ClientError as e:
        print(f"Error: {e}")

# Usage examples
create_lifecycle_policy("my-bucket")
upload_with_storage_class("archive.zip", "my-bucket", "archives/backup.zip", "GLACIER")
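
Objects stored in the Glacier or Deep Archive classes must be restored before they can be read. A minimal restore sketch (bucket and key names are placeholders; the restore completes asynchronously, on a timescale that depends on the retrieval tier):

def restore_archived_object(bucket_name, object_key, days=7, tier='Standard'):
    """Request a temporary restore of a Glacier/Deep Archive object"""
    try:
        s3_client.restore_object(
            Bucket=bucket_name,
            Key=object_key,
            RestoreRequest={
                'Days': days,  # how long the restored copy stays available
                'GlacierJobParameters': {'Tier': tier}  # 'Expedited', 'Standard', or 'Bulk'
            }
        )
        print(f"Restore requested for '{object_key}' ({tier} tier, {days} days)")
    except ClientError as e:
        print(f"Restore error: {e}")

restore_archived_object("my-bucket", "archives/backup.zip")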

Security Configuration and Access Control

import boto3
import json
from botocore.exceptions import ClientError

s3_client = boto3.client('s3')

def create_bucket_policy(bucket_name, policy_statements):
    """Configure bucket policy"""
    bucket_policy = {
        "Version": "2012-10-17",
        "Statement": policy_statements
    }
    
    try:
        s3_client.put_bucket_policy(
            Bucket=bucket_name,
            Policy=json.dumps(bucket_policy)
        )
        print(f"Bucket policy configured for: {bucket_name}")
    except ClientError as e:
        print(f"Error: {e}")

def enable_bucket_encryption(bucket_name, kms_key_id=None):
    """Enable bucket encryption"""
    if kms_key_id:
        # Use KMS managed key
        encryption_config = {
            'Rules': [{
                'ApplyServerSideEncryptionByDefault': {
                    'SSEAlgorithm': 'aws:kms',
                    'KMSMasterKeyID': kms_key_id
                }
            }]
        }
    else:
        # Use S3 managed key
        encryption_config = {
            'Rules': [{
                'ApplyServerSideEncryptionByDefault': {
                    'SSEAlgorithm': 'AES256'
                }
            }]
        }
    
    try:
        s3_client.put_bucket_encryption(
            Bucket=bucket_name,
            ServerSideEncryptionConfiguration=encryption_config
        )
        print(f"Bucket encryption enabled for: {bucket_name}")
    except ClientError as e:
        print(f"Error: {e}")

def enable_versioning(bucket_name):
    """Enable versioning"""
    try:
        s3_client.put_bucket_versioning(
            Bucket=bucket_name,
            VersioningConfiguration={'Status': 'Enabled'}
        )
        print(f"Versioning enabled for: {bucket_name}")
    except ClientError as e:
        print(f"Error: {e}")

# Public read-only policy example
public_read_policy = [{
    "Effect": "Allow",
    "Principal": "*",
    "Action": "s3:GetObject",
    "Resource": f"arn:aws:s3:::my-bucket/public/*"
}]

# Usage examples
enable_bucket_encryption("my-bucket")
enable_versioning("my-bucket")
create_bucket_policy("my-bucket", public_read_policy)
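
Unless a bucket is intentionally public (as in the policy example above), AWS recommends keeping S3 Block Public Access enabled. A minimal per-bucket sketch (bucket name is a placeholder):

def block_public_access(bucket_name):
    """Enable all four Block Public Access settings for a bucket"""
    try:
        s3_client.put_public_access_block(
            Bucket=bucket_name,
            PublicAccessBlockConfiguration={
                'BlockPublicAcls': True,
                'IgnorePublicAcls': True,
                'BlockPublicPolicy': True,
                'RestrictPublicBuckets': True
            }
        )
        print(f"Block Public Access enabled for: {bucket_name}")
    except ClientError as e:
        print(f"Error: {e}")

block_public_access("my-private-bucket")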

Presigned URL Generation

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client('s3')

def generate_presigned_url(bucket_name, object_key, expiration=3600, http_method='GET'):
    """Generate presigned URL"""
    try:
        if http_method == 'GET':
            response = s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': object_key},
                ExpiresIn=expiration
            )
        elif http_method == 'PUT':
            response = s3_client.generate_presigned_url(
                'put_object',
                Params={'Bucket': bucket_name, 'Key': object_key},
                ExpiresIn=expiration
            )
        else:
            raise ValueError(f"Unsupported HTTP method: {http_method}")
        
        print(f"Presigned URL (expires in {expiration} seconds):")
        print(response)
        return response
        
    except ClientError as e:
        print(f"Error: {e}")
        return None

def generate_presigned_post(bucket_name, object_key, expiration=3600, max_size=1048576):
    """Generate presigned POST URL (for form uploads)"""
    try:
        response = s3_client.generate_presigned_post(
            Bucket=bucket_name,
            Key=object_key,
            Fields={"acl": "private"},
            Conditions=[
                {"acl": "private"},
                ["content-length-range", 1, max_size]  # 1B to 1MB
            ],
            ExpiresIn=expiration
        )
        
        print("Presigned POST URL:")
        print(f"URL: {response['url']}")
        print(f"Fields: {response['fields']}")
        return response
        
    except ClientError as e:
        print(f"Error: {e}")
        return None

# Usage examples
download_url = generate_presigned_url("my-bucket", "documents/file.pdf", 3600, 'GET')
upload_url = generate_presigned_url("my-bucket", "uploads/new-file.txt", 1800, 'PUT')
post_data = generate_presigned_post("my-bucket", "uploads/form-upload.jpg", 3600)
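
Clients consume presigned URLs with plain HTTP, no AWS credentials required. A small sketch using the third-party requests library (assumed to be installed, and assuming the generation calls above succeeded):

import requests

# Download via the presigned GET URL
resp = requests.get(download_url)
if resp.ok:
    with open("file.pdf", "wb") as f:
        f.write(resp.content)

# Upload via the presigned PUT URL
with open("new-file.txt", "rb") as f:
    requests.put(upload_url, data=f)

# Browser-style form upload via the presigned POST data (fields must accompany the file)
with open("form-upload.jpg", "rb") as f:
    requests.post(post_data['url'], data=post_data['fields'], files={'file': f})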

Multipart Upload for Large Files

import boto3
import os
from botocore.exceptions import ClientError

s3_client = boto3.client('s3')

def multipart_upload(file_path, bucket_name, object_key, part_size=5*1024*1024):
    """Multipart upload for large files"""
    file_size = os.path.getsize(file_path)
    
    if file_size <= part_size:
        # Use regular upload for small files
        s3_client.upload_file(file_path, bucket_name, object_key)
        print(f"Regular upload completed: {object_key}")
        return
    
    # Start multipart upload
    upload_id = None
    try:
        response = s3_client.create_multipart_upload(
            Bucket=bucket_name,
            Key=object_key
        )
        upload_id = response['UploadId']
        print(f"Multipart upload started: {upload_id}")
        
        parts = []
        part_number = 1
        
        with open(file_path, 'rb') as file:
            while True:
                data = file.read(part_size)
                if not data:
                    break
                
                # Upload part
                part_response = s3_client.upload_part(
                    Bucket=bucket_name,
                    Key=object_key,
                    PartNumber=part_number,
                    UploadId=upload_id,
                    Body=data
                )
                
                parts.append({
                    'ETag': part_response['ETag'],
                    'PartNumber': part_number
                })
                
                print(f"Part {part_number} uploaded")
                part_number += 1
        
        # Complete multipart upload
        s3_client.complete_multipart_upload(
            Bucket=bucket_name,
            Key=object_key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )
        
        print(f"Multipart upload completed: {object_key}")
        
    except ClientError as e:
        # Abort the upload on error so incomplete parts don't keep accruing storage charges
        if upload_id:
            s3_client.abort_multipart_upload(
                Bucket=bucket_name,
                Key=object_key,
                UploadId=upload_id
            )
        print(f"Multipart upload error: {e}")

# Usage example
multipart_upload("large_file.zip", "my-bucket", "archives/large_file.zip")

Monitoring and Log Management

import boto3
from datetime import datetime, timedelta
from botocore.exceptions import ClientError

cloudwatch = boto3.client('cloudwatch')
s3_client = boto3.client('s3')

def get_s3_metrics(bucket_name, days=7):
    """Get S3 metrics"""
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days)
    
    # Bucket size metrics
    try:
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/S3',
            MetricName='BucketSizeBytes',
            Dimensions=[
                {'Name': 'BucketName', 'Value': bucket_name},
                {'Name': 'StorageType', 'Value': 'StandardStorage'}
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=86400,  # 1 day
            Statistics=['Average']
        )
        
        print(f"Bucket size ({bucket_name}):")
        for point in response['Datapoints']:
            size_gb = point['Average'] / (1024**3)
            print(f"  {point['Timestamp'].strftime('%Y-%m-%d')}: {size_gb:.2f} GB")
            
    except ClientError as e:
        print(f"Metrics retrieval error: {e}")

def setup_bucket_logging(bucket_name, target_bucket, prefix="access-logs/"):
    """Configure access logging"""
    logging_config = {
        'LoggingEnabled': {
            'TargetBucket': target_bucket,
            'TargetPrefix': prefix
        }
    }
    
    try:
        s3_client.put_bucket_logging(
            Bucket=bucket_name,
            BucketLoggingStatus=logging_config
        )
        print(f"Access logging configured: {bucket_name} -> {target_bucket}/{prefix}")
    except ClientError as e:
        print(f"Logging configuration error: {e}")

# Usage examples
get_s3_metrics("my-bucket")
setup_bucket_logging("my-bucket", "my-log-bucket", "logs/my-bucket/")