Amazon S3

スケーラブルなオブジェクトストレージサービス。99.999999999%(11 9's)の耐久性、無制限容量、豊富なセキュリティ・管理機能を提供。

ファイルサーバーオブジェクトストレージクラウドストレージAWSスケーラブル高可用性

サーバー

AWS S3

概要

Amazon S3(Simple Storage Service)は、業界をリードするスケーラビリティ、データ可用性、セキュリティ、パフォーマンスを誇るクラウドオブジェクトストレージサービスです。数GB〜数エクサバイトまでの無制限のデータ容量に対応し、99.999999999%(イレブンナイン)の堅牢性を提供します。データレイク、モバイルアプリケーション、バックアップ・復元、アーカイブ、IoTデバイス、機械学習、AI、分析など幅広い用途で利用されています。従量課金制でコスト効率が高く、グローバルなインフラストラクチャとAWSエコシステムの深い統合により、現代のクラウドアーキテクチャの中核を担います。

詳細

AWS S3 2025年版は、S3 Express One Zoneストレージクラスの追加により単一桁ミリ秒のアクセス性能を実現し、高性能ワークロード向けの選択肢を大幅に拡充しました。従来のS3 Standard、S3 Intelligent-Tiering、S3 Glacier、S3 Glacier Deep Archiveに加え、新たなストレージクラスで多様な用途に最適化されたコスト構造を提供します。S3 Batch Operationsによる数十億オブジェクトの一括管理、S3 Replicationによるクロスリージョン複製、S3 Storage Lensによる包括的な分析とインサイト、VPC Endpointとの統合によるプライベートネットワーク接続などの機能を標準提供。暗号化、アクセス制御、監査機能、ライフサイクル管理などのエンタープライズ機能も充実しており、世界最大規模のクラウドストレージプラットフォームとしての地位を確立しています。

主な特徴

  • 無制限スケーラビリティ: データ容量とオブジェクト数が無制限(単一オブジェクト最大5TB)
  • 高い堅牢性: 99.999999999%(イレブンナイン)のデータ堅牢性を保証
  • 多様なストレージクラス: 用途とコストに応じた7つのストレージクラス
  • グローバルインフラ: 世界中のAWSリージョンで一貫したサービス提供
  • 豊富なAPI: REST API、SDKによる柔軟なプログラマティックアクセス
  • 深いAWS統合: Lambda、CloudFront、CloudTrailなどとのシームレス連携

メリット・デメリット

メリット

  • 無制限の拡張性により事前容量計画が不要で運用コストを削減
  • イレブンナインの堅牢性と複数AZ分散により業界最高レベルの信頼性
  • 従量課金制で初期費用不要、使用した分だけの合理的な料金体系
  • AWSエコシステムとの深い統合により複雑なクラウドアーキテクチャを簡潔に構築
  • 世界規模のCDN統合で高速コンテンツ配信を実現
  • 包括的なセキュリティ機能(暗号化、IAM、VPC統合)を標準提供

デメリット

  • AWS依存によるベンダーロックインのリスクと他クラウドへの移行コスト
  • リクエスト数が多い場合のAPIコール課金が高額になる可能性
  • データ転送料金(特にアウトバウンド)が高額で予期しないコスト発生リスク
  • ストレージクラス選択とライフサイクル設計の複雑さによる運用コスト
  • 細かい料金体系により正確なコスト予測が困難
  • レイテンシーが要求される用途では従来型ストレージより劣る場合がある

参考ページ

書き方の例

AWS CLI による基本操作

# AWS CLI インストール
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

# 認証情報設定
aws configure
# AWS Access Key ID: YOUR_ACCESS_KEY
# AWS Secret Access Key: YOUR_SECRET_KEY
# Default region name: ap-northeast-1
# Default output format: json

# または環境変数で設定
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_DEFAULT_REGION=ap-northeast-1

# バケット操作
aws s3 mb s3://my-unique-bucket-name-2025  # バケット作成
aws s3 ls                                   # バケット一覧
aws s3 ls s3://my-unique-bucket-name-2025  # バケット内容一覧

# ファイル操作
aws s3 cp localfile.txt s3://my-unique-bucket-name-2025/  # アップロード
aws s3 cp s3://my-unique-bucket-name-2025/file.txt ./     # ダウンロード
aws s3 sync ./local-folder s3://my-unique-bucket-name-2025/remote-folder  # 同期

# 高度な操作
aws s3 cp file.txt s3://bucket/path/ --storage-class GLACIER  # ストレージクラス指定
aws s3 cp folder/ s3://bucket/ --recursive --exclude "*.tmp"  # 再帰的アップロード(除外パターン)

# バケット削除(空の場合のみ)
aws s3 rb s3://my-unique-bucket-name-2025
# 強制削除(中身も含む)
aws s3 rb s3://my-unique-bucket-name-2025 --force

Python Boto3 での基本操作

import boto3
from botocore.exceptions import ClientError
import json
from datetime import datetime

# S3クライアント作成
s3_client = boto3.client('s3', region_name='ap-northeast-1')
s3_resource = boto3.resource('s3', region_name='ap-northeast-1')

# バケット操作
def create_bucket(bucket_name, region='ap-northeast-1'):
    """S3バケットを作成"""
    try:
        s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region}
        )
        print(f"バケット '{bucket_name}' を作成しました")
    except ClientError as e:
        print(f"エラー: {e}")

def list_buckets():
    """バケット一覧を取得"""
    response = s3_client.list_buckets()
    print("バケット一覧:")
    for bucket in response['Buckets']:
        print(f"  - {bucket['Name']} (作成日: {bucket['CreationDate']})")

# オブジェクト操作
def upload_file(file_path, bucket_name, object_key=None):
    """ファイルをS3にアップロード"""
    if object_key is None:
        object_key = file_path
    
    try:
        s3_client.upload_file(file_path, bucket_name, object_key)
        print(f"'{file_path}' を '{bucket_name}/{object_key}' にアップロードしました")
    except ClientError as e:
        print(f"アップロードエラー: {e}")

def download_file(bucket_name, object_key, file_path):
    """S3からファイルをダウンロード"""
    try:
        s3_client.download_file(bucket_name, object_key, file_path)
        print(f"'{bucket_name}/{object_key}' を '{file_path}' にダウンロードしました")
    except ClientError as e:
        print(f"ダウンロードエラー: {e}")

def upload_with_metadata(file_path, bucket_name, object_key, metadata=None):
    """メタデータ付きでアップロード"""
    extra_args = {}
    if metadata:
        extra_args['Metadata'] = metadata
    
    try:
        s3_client.upload_file(
            file_path, bucket_name, object_key,
            ExtraArgs=extra_args
        )
        print(f"メタデータ付きアップロード完了: {object_key}")
    except ClientError as e:
        print(f"エラー: {e}")

# 使用例
bucket_name = "my-test-bucket-2025"
create_bucket(bucket_name)
upload_file("test.txt", bucket_name, "documents/test.txt")
upload_with_metadata(
    "image.jpg", bucket_name, "images/image.jpg",
    {"Content-Type": "image/jpeg", "Author": "TestUser"}
)
download_file(bucket_name, "documents/test.txt", "downloaded_test.txt")

ストレージクラスとライフサイクル管理

import boto3
import json

s3_client = boto3.client('s3')

def create_lifecycle_policy(bucket_name):
    """ライフサイクルポリシーを設定"""
    lifecycle_config = {
        'Rules': [
            {
                'ID': 'TransitionRule',
                'Status': 'Enabled',
                'Filter': {'Prefix': 'documents/'},
                'Transitions': [
                    {
                        'Days': 30,
                        'StorageClass': 'STANDARD_IA'  # 30日後にIA
                    },
                    {
                        'Days': 90,
                        'StorageClass': 'GLACIER'      # 90日後にGlacier
                    },
                    {
                        'Days': 365,
                        'StorageClass': 'DEEP_ARCHIVE' # 1年後にDeep Archive
                    }
                ]
            },
            {
                'ID': 'DeleteRule',
                'Status': 'Enabled',
                'Filter': {'Prefix': 'temp/'},
                'Expiration': {'Days': 7}  # 7日後に削除
            },
            {
                'ID': 'IncompleteMultipartUploads',
                'Status': 'Enabled',
                'Filter': {},
                'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 7}
            }
        ]
    }
    
    try:
        s3_client.put_bucket_lifecycle_configuration(
            Bucket=bucket_name,
            LifecycleConfiguration=lifecycle_config
        )
        print(f"ライフサイクルポリシーを設定しました: {bucket_name}")
    except ClientError as e:
        print(f"エラー: {e}")

def upload_with_storage_class(file_path, bucket_name, object_key, storage_class='STANDARD'):
    """特定のストレージクラスでアップロード"""
    try:
        s3_client.upload_file(
            file_path, bucket_name, object_key,
            ExtraArgs={'StorageClass': storage_class}
        )
        print(f"'{object_key}' を {storage_class} クラスでアップロードしました")
    except ClientError as e:
        print(f"エラー: {e}")

# 使用例
create_lifecycle_policy("my-bucket")
upload_with_storage_class("archive.zip", "my-bucket", "archives/backup.zip", "GLACIER")

セキュリティ設定とアクセス制御

import boto3
import json

s3_client = boto3.client('s3')

def create_bucket_policy(bucket_name, policy_statements):
    """バケットポリシーを設定"""
    bucket_policy = {
        "Version": "2012-10-17",
        "Statement": policy_statements
    }
    
    try:
        s3_client.put_bucket_policy(
            Bucket=bucket_name,
            Policy=json.dumps(bucket_policy)
        )
        print(f"バケットポリシーを設定しました: {bucket_name}")
    except ClientError as e:
        print(f"エラー: {e}")

def enable_bucket_encryption(bucket_name, kms_key_id=None):
    """バケット暗号化を有効化"""
    if kms_key_id:
        # KMS管理キーを使用
        encryption_config = {
            'Rules': [{
                'ApplyServerSideEncryptionByDefault': {
                    'SSEAlgorithm': 'aws:kms',
                    'KMSMasterKeyID': kms_key_id
                }
            }]
        }
    else:
        # S3管理キーを使用
        encryption_config = {
            'Rules': [{
                'ApplyServerSideEncryptionByDefault': {
                    'SSEAlgorithm': 'AES256'
                }
            }]
        }
    
    try:
        s3_client.put_bucket_encryption(
            Bucket=bucket_name,
            ServerSideEncryptionConfiguration=encryption_config
        )
        print(f"バケット暗号化を有効化しました: {bucket_name}")
    except ClientError as e:
        print(f"エラー: {e}")

def enable_versioning(bucket_name):
    """バージョニングを有効化"""
    try:
        s3_client.put_bucket_versioning(
            Bucket=bucket_name,
            VersioningConfiguration={'Status': 'Enabled'}
        )
        print(f"バージョニングを有効化しました: {bucket_name}")
    except ClientError as e:
        print(f"エラー: {e}")

# パブリック読み取り専用ポリシー例
public_read_policy = [{
    "Effect": "Allow",
    "Principal": "*",
    "Action": "s3:GetObject",
    "Resource": f"arn:aws:s3:::my-bucket/public/*"
}]

# 使用例
enable_bucket_encryption("my-bucket")
enable_versioning("my-bucket")
create_bucket_policy("my-bucket", public_read_policy)

署名付きURL(プリサインドURL)の生成

import boto3
from botocore.exceptions import ClientError
from datetime import timedelta

s3_client = boto3.client('s3')

def generate_presigned_url(bucket_name, object_key, expiration=3600, http_method='GET'):
    """署名付きURLを生成"""
    try:
        if http_method == 'GET':
            response = s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': object_key},
                ExpiresIn=expiration
            )
        elif http_method == 'PUT':
            response = s3_client.generate_presigned_url(
                'put_object',
                Params={'Bucket': bucket_name, 'Key': object_key},
                ExpiresIn=expiration
            )
        
        print(f"署名付きURL (有効期限: {expiration}秒):")
        print(response)
        return response
        
    except ClientError as e:
        print(f"エラー: {e}")
        return None

def generate_presigned_post(bucket_name, object_key, expiration=3600, max_size=1048576):
    """POST用署名付きURLを生成(フォームアップロード用)"""
    try:
        response = s3_client.generate_presigned_post(
            Bucket=bucket_name,
            Key=object_key,
            Fields={"acl": "private"},
            Conditions=[
                {"acl": "private"},
                ["content-length-range", 1, max_size]  # 1B〜1MB
            ],
            ExpiresIn=expiration
        )
        
        print("POST用署名付きURL:")
        print(f"URL: {response['url']}")
        print(f"Fields: {response['fields']}")
        return response
        
    except ClientError as e:
        print(f"エラー: {e}")
        return None

# 使用例
download_url = generate_presigned_url("my-bucket", "documents/file.pdf", 3600, 'GET')
upload_url = generate_presigned_url("my-bucket", "uploads/new-file.txt", 1800, 'PUT')
post_data = generate_presigned_post("my-bucket", "uploads/form-upload.jpg", 3600)

大容量ファイルのマルチパートアップロード

import boto3
import os
from botocore.exceptions import ClientError

s3_client = boto3.client('s3')

def multipart_upload(file_path, bucket_name, object_key, part_size=5*1024*1024):
    """大容量ファイルのマルチパートアップロード"""
    file_size = os.path.getsize(file_path)
    
    if file_size <= part_size:
        # 小さいファイルは通常のアップロード
        s3_client.upload_file(file_path, bucket_name, object_key)
        print(f"通常アップロード完了: {object_key}")
        return
    
    # マルチパートアップロード開始
    try:
        response = s3_client.create_multipart_upload(
            Bucket=bucket_name,
            Key=object_key
        )
        upload_id = response['UploadId']
        print(f"マルチパートアップロード開始: {upload_id}")
        
        parts = []
        part_number = 1
        
        with open(file_path, 'rb') as file:
            while True:
                data = file.read(part_size)
                if not data:
                    break
                
                # パートアップロード
                part_response = s3_client.upload_part(
                    Bucket=bucket_name,
                    Key=object_key,
                    PartNumber=part_number,
                    UploadId=upload_id,
                    Body=data
                )
                
                parts.append({
                    'ETag': part_response['ETag'],
                    'PartNumber': part_number
                })
                
                print(f"パート {part_number} アップロード完了")
                part_number += 1
        
        # マルチパートアップロード完了
        s3_client.complete_multipart_upload(
            Bucket=bucket_name,
            Key=object_key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )
        
        print(f"マルチパートアップロード完了: {object_key}")
        
    except ClientError as e:
        # エラー時はアップロードを中止
        s3_client.abort_multipart_upload(
            Bucket=bucket_name,
            Key=object_key,
            UploadId=upload_id
        )
        print(f"マルチパートアップロードエラー: {e}")

# 使用例
multipart_upload("large_file.zip", "my-bucket", "archives/large_file.zip")

モニタリングとログ管理

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')
s3_client = boto3.client('s3')

def get_s3_metrics(bucket_name, days=7):
    """S3メトリクスを取得"""
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days)
    
    # バケットサイズメトリクス
    try:
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/S3',
            MetricName='BucketSizeBytes',
            Dimensions=[
                {'Name': 'BucketName', 'Value': bucket_name},
                {'Name': 'StorageType', 'Value': 'StandardStorage'}
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=86400,  # 1日
            Statistics=['Average']
        )
        
        print(f"バケットサイズ ({bucket_name}):")
        for point in response['Datapoints']:
            size_gb = point['Average'] / (1024**3)
            print(f"  {point['Timestamp'].strftime('%Y-%m-%d')}: {size_gb:.2f} GB")
            
    except ClientError as e:
        print(f"メトリクス取得エラー: {e}")

def setup_bucket_logging(bucket_name, target_bucket, prefix="access-logs/"):
    """アクセスログ設定"""
    logging_config = {
        'LoggingEnabled': {
            'TargetBucket': target_bucket,
            'TargetPrefix': prefix
        }
    }
    
    try:
        s3_client.put_bucket_logging(
            Bucket=bucket_name,
            BucketLoggingStatus=logging_config
        )
        print(f"アクセスログ設定完了: {bucket_name} -> {target_bucket}/{prefix}")
    except ClientError as e:
        print(f"ログ設定エラー: {e}")

# 使用例
get_s3_metrics("my-bucket")
setup_bucket_logging("my-bucket", "my-log-bucket", "logs/my-bucket/")