AWS SageMaker

AI機械学習プラットフォームAWSMLOpsAutoMLクラウドエンタープライズ

AI/MLプラットフォーム

AWS SageMaker

概要

AWS SageMakerは、機械学習のライフサイクル全体をカバーするAmazonの包括的な機械学習プラットフォームです。データ準備、モデル構築、トレーニング、デプロイ、監視まで一元管理でき、エンタープライズレベルのMLOps自動化を実現します。SageMaker Studio、Pipelines、Model Registry、JumpStartなど豊富な機能群を提供し、データサイエンティストから機械学習エンジニアまで様々なスキルレベルのユーザーが効率的にAI/ML開発を行える統合環境です。AutoMLからカスタムモデル開発まで幅広いニーズに対応し、AWSエコシステムとの深い統合によりスケーラブルなML運用を支援しています。

詳細

AWS SageMaker 2025年版は、次世代統合プラットフォームとして大幅に進化し、SageMaker Unified Studio(プレビュー)によるデータ、分析、AI開発の統一環境を提供しています。従来のSageMaker AIに加え、MLOps自動化、リアルタイム推論、サーバーレス推論、バッチ変換など多様な推論オプションを統合。SageMaker Pipelines、Model Monitor、Clarify、Feature Storeなど企業級MLOps機能を完備し、AutoMLからFoundation Modelファインチューニング、カスタムアルゴリズム開発まで包括的にサポート。エンタープライズセキュリティ、コンプライアンス対応、コスト最適化機能により大規模組織でのAI/ML活用を加速しています。

主な特徴

  • 統合開発環境: SageMaker Studio、Jupyter Notebook、Visual Studio Code連携による柔軟な開発体験
  • MLOps自動化: Pipelines、Model Registry、CI/CD統合による本格的な機械学習運用
  • 多様な推論オプション: リアルタイム、サーバーレス、バッチ、非同期推論エンドポイント
  • AutoML機能: AutoPilot、Canvas による No-code/Low-code 機械学習
  • Foundation Model統合: JumpStart、Bedrock連携による生成AI活用
  • エンタープライズ機能: VPC、IAM、暗号化、監査証跡による企業級セキュリティ

メリット・デメリット

メリット

  • AWS全サービスとの統合による完全なクラウドネイティブ体験
  • 豊富なマネージドサービスによる運用負荷軽減と自動スケーリング
  • 従量課金制による初期投資不要でのコスト効率的な利用
  • 企業級セキュリティとコンプライアンス機能の標準装備
  • AutoMLからカスタム開発まで幅広いスキルレベルへの対応
  • MLOps自動化による機械学習モデルの継続的デプロイと監視

デメリット

  • AWS依存によるベンダーロックインリスクと移行コストの発生
  • 複雑な料金体系と予期しない高額課金の可能性
  • 豊富な機能ゆえの学習コストと設定の複雑さ
  • オンプレミス利用不可でインターネット接続必須
  • 他クラウドプラットフォームとの統合制約
  • コンピューティングインスタンス起動時間による開発効率低下

参考ページ

書き方の例

基本セットアップと環境設定

# SageMaker Python SDKのインストール
!pip install sagemaker boto3 pandas numpy

# 必要なライブラリのインポート
import sagemaker
import boto3
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
from sagemaker.session import Session

# AWS認証情報の設定(IAMロール使用)
role = get_execution_role()
sagemaker_session = Session()
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

print(f"SageMaker役割: {role}")
print(f"SageMakerセッション領域: {region}")
print(f"デフォルトS3バケット: {bucket}")

# SageMakerクライアントの初期化
sm_client = boto3.client('sagemaker')
s3_client = boto3.client('s3')

# 利用可能なインスタンスタイプの確認
def list_available_instances():
    """利用可能なインスタンスタイプを確認"""
    training_instances = [
        'ml.m5.large', 'ml.m5.xlarge', 'ml.m5.2xlarge',
        'ml.c5.xlarge', 'ml.c5.2xlarge', 'ml.c5.4xlarge',
        'ml.p3.2xlarge', 'ml.p3.8xlarge', 'ml.p3.16xlarge',
        'ml.g4dn.xlarge', 'ml.g4dn.2xlarge'
    ]
    
    inference_instances = [
        'ml.t3.medium', 'ml.t3.large', 'ml.t3.xlarge',
        'ml.m5.large', 'ml.m5.xlarge', 'ml.c5.large',
        'ml.c5.xlarge', 'ml.g4dn.xlarge'
    ]
    
    print("=== 推奨トレーニングインスタンス ===")
    for instance in training_instances:
        print(f"- {instance}")
    
    print("\n=== 推奨推論インスタンス ===")
    for instance in inference_instances:
        print(f"- {instance}")

list_available_instances()

# S3データアップロード関数
def upload_data_to_s3(local_path, s3_prefix):
    """ローカルデータをS3にアップロード"""
    s3_path = f"s3://{bucket}/{s3_prefix}"
    sagemaker_session.upload_data(
        path=local_path,
        bucket=bucket,
        key_prefix=s3_prefix
    )
    print(f"データを{s3_path}にアップロードしました")
    return s3_path

# 基本的な環境情報の表示
print(f"\n=== SageMaker環境情報 ===")
print(f"SageMaker SDK バージョン: {sagemaker.__version__}")
print(f"Python実行環境: {sagemaker_session.boto_session.region_name}")

データ処理と特徴量エンジニアリング

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
import json

# SKLearn Processorの設定
sklearn_processor = SKLearnProcessor(
    framework_version='1.0-1',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    base_job_name='data-preprocessing',
    sagemaker_session=sagemaker_session
)

# データ前処理スクリプトの作成
preprocessing_script = """
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
import argparse
import os

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-test-split-ratio', type=float, default=0.2)
    args = parser.parse_args()
    
    # データ読み込み
    input_data_path = '/opt/ml/processing/input/data.csv'
    df = pd.read_csv(input_data_path)
    
    print(f"データ形状: {df.shape}")
    print(f"欠損値: {df.isnull().sum().sum()}")
    
    # 基本的な前処理
    # 欠損値処理
    df = df.fillna(df.mean(numeric_only=True))
    
    # カテゴリカル変数のエンコーディング
    categorical_columns = df.select_dtypes(include=['object']).columns
    label_encoders = {}
    
    for col in categorical_columns:
        if col != 'target':  # ターゲット列以外
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col].astype(str))
            label_encoders[col] = le
    
    # 特徴量とターゲットの分離
    if 'target' in df.columns:
        X = df.drop('target', axis=1)
        y = df['target']
    else:
        X = df.iloc[:, :-1]
        y = df.iloc[:, -1]
    
    # 標準化
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    X_scaled = pd.DataFrame(X_scaled, columns=X.columns)
    
    # 訓練・検証・テスト分割
    X_train, X_temp, y_train, y_temp = train_test_split(
        X_scaled, y, test_size=args.train_test_split_ratio * 2, random_state=42
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42
    )
    
    # データ結合
    train_data = pd.concat([y_train, X_train], axis=1)
    val_data = pd.concat([y_val, X_val], axis=1)
    test_data = pd.concat([y_test, X_test], axis=1)
    
    # 出力ディレクトリの作成
    os.makedirs('/opt/ml/processing/train', exist_ok=True)
    os.makedirs('/opt/ml/processing/validation', exist_ok=True)
    os.makedirs('/opt/ml/processing/test', exist_ok=True)
    
    # データ保存
    train_data.to_csv('/opt/ml/processing/train/train.csv', index=False, header=False)
    val_data.to_csv('/opt/ml/processing/validation/validation.csv', index=False, header=False)
    test_data.to_csv('/opt/ml/processing/test/test.csv', index=False, header=False)
    
    # メタデータ保存
    metadata = {
        'feature_columns': list(X.columns),
        'target_column': 'target',
        'scaler_params': {
            'mean': scaler.mean_.tolist(),
            'scale': scaler.scale_.tolist()
        },
        'train_shape': train_data.shape,
        'val_shape': val_data.shape,
        'test_shape': test_data.shape
    }
    
    with open('/opt/ml/processing/train/metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)
    
    print("データ前処理完了")
    print(f"訓練データ: {train_data.shape}")
    print(f"検証データ: {val_data.shape}")
    print(f"テストデータ: {test_data.shape}")

if __name__ == '__main__':
    main()
"""

# スクリプトファイルの保存
with open('preprocessing.py', 'w') as f:
    f.write(preprocessing_script)

# 処理ジョブの実行
def run_processing_job(input_s3_path):
    """データ前処理ジョブの実行"""
    sklearn_processor.run(
        inputs=[
            ProcessingInput(
                source=input_s3_path,
                destination='/opt/ml/processing/input',
                input_name='data'
            )
        ],
        outputs=[
            ProcessingOutput(
                source='/opt/ml/processing/train',
                destination=f's3://{bucket}/processed-data/train',
                output_name='train_data'
            ),
            ProcessingOutput(
                source='/opt/ml/processing/validation',
                destination=f's3://{bucket}/processed-data/validation',
                output_name='validation_data'
            ),
            ProcessingOutput(
                source='/opt/ml/processing/test',
                destination=f's3://{bucket}/processed-data/test',
                output_name='test_data'
            )
        ],
        code='preprocessing.py',
        arguments=[
            '--train-test-split-ratio', '0.2'
        ]
    )
    
    print("データ前処理ジョブが完了しました")
    return {
        'train': f's3://{bucket}/processed-data/train',
        'validation': f's3://{bucket}/processed-data/validation',
        'test': f's3://{bucket}/processed-data/test'
    }

# Feature Store利用例
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

def create_feature_store(df, feature_group_name):
    """SageMaker Feature Storeの作成"""
    # 特徴量定義の作成
    feature_definitions = [
        FeatureDefinition(feature_name=col, feature_type=FeatureTypeEnum.FRACTIONAL)
        if df[col].dtype in ['float64', 'float32'] 
        else FeatureDefinition(feature_name=col, feature_type=FeatureTypeEnum.INTEGRAL)
        for col in df.columns
    ]
    
    # Feature Groupの作成
    feature_group = FeatureGroup(
        name=feature_group_name,
        sagemaker_session=sagemaker_session
    )
    
    feature_group.create(
        s3_uri=f's3://{bucket}/feature-store',
        record_identifier_name='id',
        event_time_feature_name='event_time',
        role_arn=role,
        feature_definitions=feature_definitions,
        enable_online_store=True
    )
    
    print(f"Feature Store '{feature_group_name}' を作成しました")
    return feature_group

print("データ処理コンポーネントの準備完了")

モデル訓練とハイパーパラメータチューニング

from sagemaker.estimator import Estimator
from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter
from sagemaker.pytorch import PyTorch
from sagemaker.sklearn import SKLearn
from sagemaker.xgboost import XGBoost

# XGBoostを使用した基本的な訓練
def train_xgboost_model(train_path, validation_path):
    """XGBoostモデルの訓練"""
    
    # XGBoost Estimatorの設定
    xgb_estimator = XGBoost(
        entry_point='xgboost_training.py',
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        framework_version='1.7-1',
        py_version='py3',
        hyperparameters={
            'objective': 'reg:squarederror',
            'num_round': 100,
            'max_depth': 6,
            'eta': 0.3,
            'subsample': 0.8,
            'colsample_bytree': 0.8
        },
        base_job_name='xgboost-training',
        sagemaker_session=sagemaker_session
    )
    
    # 訓練の実行
    xgb_estimator.fit({
        'train': train_path,
        'validation': validation_path
    })
    
    return xgb_estimator

# カスタムPyTorchモデル訓練
pytorch_training_script = """
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import numpy as np
import argparse
import os
import json

class NeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_size=64, output_size=1):
        super(NeuralNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=100)
    parser.add_argument('--batch-size', type=int, default=32)
    parser.add_argument('--learning-rate', type=float, default=0.001)
    parser.add_argument('--hidden-size', type=int, default=64)
    
    args = parser.parse_args()
    
    # デバイス設定
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # データ読み込み
    train_data = pd.read_csv('/opt/ml/input/data/train/train.csv', header=None)
    val_data = pd.read_csv('/opt/ml/input/data/validation/validation.csv', header=None)
    
    # 特徴量とターゲットの分離
    X_train = train_data.iloc[:, 1:].values.astype(np.float32)
    y_train = train_data.iloc[:, 0].values.astype(np.float32)
    X_val = val_data.iloc[:, 1:].values.astype(np.float32)
    y_val = val_data.iloc[:, 0].values.astype(np.float32)
    
    # データローダーの作成
    train_dataset = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
    val_dataset = TensorDataset(torch.tensor(X_val), torch.tensor(y_val))
    
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=args.batch_size)
    
    # モデル初期化
    input_size = X_train.shape[1]
    model = NeuralNetwork(input_size, args.hidden_size).to(device)
    
    # 損失関数と最適化器
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)
    
    # 訓練ループ
    train_losses = []
    val_losses = []
    
    for epoch in range(args.epochs):
        # 訓練
        model.train()
        train_loss = 0.0
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            
            optimizer.zero_grad()
            outputs = model(batch_X).squeeze()
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # 検証
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                outputs = model(batch_X).squeeze()
                loss = criterion(outputs, batch_y)
                val_loss += loss.item()
        
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        if epoch % 10 == 0:
            print(f'Epoch {epoch}/{args.epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
    
    # モデル保存
    model_path = '/opt/ml/model/model.pth'
    torch.save(model.state_dict(), model_path)
    
    # メタデータ保存
    metadata = {
        'input_size': input_size,
        'hidden_size': args.hidden_size,
        'final_train_loss': train_losses[-1],
        'final_val_loss': val_losses[-1]
    }
    
    with open('/opt/ml/model/metadata.json', 'w') as f:
        json.dump(metadata, f)
    
    print(f'Training completed. Final validation loss: {val_losses[-1]:.4f}')

if __name__ == '__main__':
    main()
"""

# PyTorch訓練スクリプトの保存
with open('pytorch_training.py', 'w') as f:
    f.write(pytorch_training_script)

def train_pytorch_model(train_path, validation_path):
    """PyTorchモデルの訓練"""
    
    pytorch_estimator = PyTorch(
        entry_point='pytorch_training.py',
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        framework_version='2.0.0',
        py_version='py310',
        hyperparameters={
            'epochs': 100,
            'batch-size': 32,
            'learning-rate': 0.001,
            'hidden-size': 128
        },
        base_job_name='pytorch-training',
        sagemaker_session=sagemaker_session
    )
    
    pytorch_estimator.fit({
        'train': train_path,
        'validation': validation_path
    })
    
    return pytorch_estimator

# ハイパーパラメータチューニング
def hyperparameter_tuning(train_path, validation_path):
    """ハイパーパラメータ自動チューニング"""
    
    # ベースEstimator
    base_estimator = XGBoost(
        entry_point='xgboost_training.py',
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        framework_version='1.7-1',
        py_version='py3',
        base_job_name='xgboost-hpo',
        sagemaker_session=sagemaker_session
    )
    
    # ハイパーパラメータ範囲の定義
    hyperparameter_ranges = {
        'max_depth': IntegerParameter(3, 10),
        'eta': ContinuousParameter(0.01, 0.3),
        'num_round': IntegerParameter(50, 200),
        'subsample': ContinuousParameter(0.5, 1.0),
        'colsample_bytree': ContinuousParameter(0.5, 1.0)
    }
    
    # 目的メトリクス
    objective_metric_name = 'validation:rmse'
    
    # チューナーの設定
    tuner = HyperparameterTuner(
        base_estimator,
        objective_metric_name,
        hyperparameter_ranges,
        max_jobs=20,
        max_parallel_jobs=3,
        objective_type='Minimize',
        base_tuning_job_name='xgboost-hpo'
    )
    
    # チューニング実行
    tuner.fit({
        'train': train_path,
        'validation': validation_path
    })
    
    # 最適なジョブの取得
    best_training_job = tuner.best_training_job()
    print(f"最適なジョブ: {best_training_job}")
    
    return tuner, best_training_job

print("モデル訓練コンポーネントの準備完了")

モデルデプロイと推論

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.model import Model
from sagemaker.serverless import ServerlessInferenceConfig
from sagemaker.async_inference import AsyncInferenceConfig

# リアルタイム推論エンドポイント
def deploy_realtime_endpoint(estimator):
    """リアルタイム推論エンドポイントのデプロイ"""
    
    predictor = estimator.deploy(
        initial_instance_count=1,
        instance_type='ml.t3.medium',
        serializer=CSVSerializer(),
        deserializer=JSONDeserializer(),
        endpoint_name='my-model-endpoint'
    )
    
    print(f"エンドポイントデプロイ完了: {predictor.endpoint_name}")
    return predictor

# サーバーレス推論エンドポイント
def deploy_serverless_endpoint(estimator):
    """サーバーレス推論エンドポイントのデプロイ"""
    
    serverless_config = ServerlessInferenceConfig(
        memory_size_in_mb=2048,
        max_concurrency=5
    )
    
    predictor = estimator.deploy(
        serverless_inference_config=serverless_config,
        serializer=CSVSerializer(),
        deserializer=JSONDeserializer(),
        endpoint_name='my-serverless-endpoint'
    )
    
    print(f"サーバーレスエンドポイントデプロイ完了: {predictor.endpoint_name}")
    return predictor

# 非同期推論エンドポイント
def deploy_async_endpoint(estimator):
    """非同期推論エンドポイントのデプロイ"""
    
    async_config = AsyncInferenceConfig(
        output_path=f's3://{bucket}/async-inference-output/',
        max_concurrent_invocations_per_instance=4,
        failure_path=f's3://{bucket}/async-inference-errors/'
    )
    
    predictor = estimator.deploy(
        initial_instance_count=1,
        instance_type='ml.m5.large',
        async_inference_config=async_config,
        endpoint_name='my-async-endpoint'
    )
    
    print(f"非同期エンドポイントデプロイ完了: {predictor.endpoint_name}")
    return predictor

# カスタム推論コードの例
inference_script = """
import torch
import torch.nn as nn
import json
import numpy as np

class NeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_size=64, output_size=1):
        super(NeuralNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

def model_fn(model_dir):
    \"\"\"モデル読み込み関数\"\"\"
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # メタデータ読み込み
    with open(f'{model_dir}/metadata.json', 'r') as f:
        metadata = json.load(f)
    
    # モデル初期化と重み読み込み
    model = NeuralNetwork(
        input_size=metadata['input_size'],
        hidden_size=metadata['hidden_size']
    )
    model.load_state_dict(torch.load(f'{model_dir}/model.pth', map_location=device))
    model.to(device)
    model.eval()
    
    return model

def input_fn(request_body, request_content_type):
    \"\"\"入力データの前処理\"\"\"
    if request_content_type == 'application/json':
        input_data = json.loads(request_body)
        return np.array(input_data['instances']).astype(np.float32)
    elif request_content_type == 'text/csv':
        return np.loadtxt(request_body.split('\\n'), delimiter=',').astype(np.float32)
    else:
        raise ValueError(f'Unsupported content type: {request_content_type}')

def predict_fn(input_data, model):
    \"\"\"推論実行関数\"\"\"
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    with torch.no_grad():
        if len(input_data.shape) == 1:
            input_data = input_data.reshape(1, -1)
        
        input_tensor = torch.tensor(input_data).to(device)
        predictions = model(input_tensor).cpu().numpy()
        
    return predictions.tolist()

def output_fn(prediction, content_type):
    \"\"\"出力データの後処理\"\"\"
    if content_type == 'application/json':
        return json.dumps({'predictions': prediction})
    elif content_type == 'text/csv':
        return '\\n'.join([str(p[0]) for p in prediction])
    else:
        raise ValueError(f'Unsupported content type: {content_type}')
"""

# 推論スクリプトの保存
with open('inference.py', 'w') as f:
    f.write(inference_script)

# マルチモデルエンドポイント
def deploy_multi_model_endpoint():
    """マルチモデルエンドポイントのデプロイ"""
    from sagemaker.multidatamodel import MultiDataModel
    
    # マルチモデルの設定
    multi_model = MultiDataModel(
        name='multi-model-endpoint',
        model_data_prefix=f's3://{bucket}/multi-model/',
        role=role,
        sagemaker_session=sagemaker_session
    )
    
    predictor = multi_model.deploy(
        initial_instance_count=1,
        instance_type='ml.m5.large',
        endpoint_name='multi-model-endpoint'
    )
    
    return predictor, multi_model

# 推論パフォーマンステスト
def test_endpoint_performance(predictor, test_data, num_requests=100):
    """エンドポイントのパフォーマンステスト"""
    import time
    
    response_times = []
    
    for i in range(num_requests):
        start_time = time.time()
        
        try:
            result = predictor.predict(test_data)
            end_time = time.time()
            response_times.append(end_time - start_time)
            
            if i % 10 == 0:
                print(f"リクエスト {i+1}/{num_requests} 完了")
                
        except Exception as e:
            print(f"リクエスト {i+1} でエラー: {e}")
    
    # 統計情報
    avg_response_time = np.mean(response_times)
    p95_response_time = np.percentile(response_times, 95)
    p99_response_time = np.percentile(response_times, 99)
    
    print(f"=== パフォーマンステスト結果 ===")
    print(f"総リクエスト数: {num_requests}")
    print(f"成功率: {len(response_times)/num_requests*100:.2f}%")
    print(f"平均応答時間: {avg_response_time:.3f}秒")
    print(f"95パーセンタイル: {p95_response_time:.3f}秒")
    print(f"99パーセンタイル: {p99_response_time:.3f}秒")
    
    return response_times

print("モデルデプロイコンポーネントの準備完了")

SageMaker PipelinesとMLOps

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TuningStep
from sagemaker.workflow.parameters import ParameterString, ParameterInteger, ParameterFloat
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model_metrics import MetricsSource, ModelMetrics

# パイプラインパラメータの定義
def define_pipeline_parameters():
    """パイプラインパラメータの定義"""
    parameters = {
        'input_data': ParameterString(
            name='InputData',
            default_value=f's3://{bucket}/input-data/'
        ),
        'instance_type': ParameterString(
            name='InstanceType',
            default_value='ml.m5.xlarge'
        ),
        'instance_count': ParameterInteger(
            name='InstanceCount',
            default_value=1
        ),
        'max_depth': ParameterInteger(
            name='MaxDepth',
            default_value=6
        ),
        'eta': ParameterFloat(
            name='Eta',
            default_value=0.3
        ),
        'model_approval_status': ParameterString(
            name='ModelApprovalStatus',
            default_value='PendingManualApproval'
        )
    }
    return parameters

# モデル評価ステップ
def create_evaluation_step(model_path, test_data_path, property_file):
    """モデル評価ステップの作成"""
    
    # 評価スクリプト
    evaluation_script = """
import json
import pandas as pd
import numpy as np
import joblib
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import argparse
import os

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-path', type=str, default='/opt/ml/processing/model')
    parser.add_argument('--test-data-path', type=str, default='/opt/ml/processing/test')
    args = parser.parse_args()
    
    # テストデータ読み込み
    test_data = pd.read_csv(f'{args.test_data_path}/test.csv', header=None)
    X_test = test_data.iloc[:, 1:].values
    y_test = test_data.iloc[:, 0].values
    
    # モデル読み込み(例:scikit-learn)
    try:
        model = joblib.load(f'{args.model_path}/model.joblib')
    except:
        # カスタムモデルの場合
        print("カスタムモデル評価のロジックを実装してください")
        model = None
    
    if model is not None:
        # 予測実行
        y_pred = model.predict(X_test)
        
        # メトリクス計算
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        
        # 評価結果の保存
        evaluation_result = {
            'regression_metrics': {
                'mse': {'value': float(mse), 'standard_deviation': 'NaN'},
                'rmse': {'value': float(rmse), 'standard_deviation': 'NaN'},
                'mae': {'value': float(mae), 'standard_deviation': 'NaN'},
                'r2': {'value': float(r2), 'standard_deviation': 'NaN'}
            }
        }
        
        # 出力ディレクトリ作成
        output_dir = '/opt/ml/processing/evaluation'
        os.makedirs(output_dir, exist_ok=True)
        
        # JSON形式で保存
        with open(f'{output_dir}/evaluation.json', 'w') as f:
            json.dump(evaluation_result, f, indent=2)
        
        print(f"評価完了 - RMSE: {rmse:.4f}, R2: {r2:.4f}")
    else:
        print("モデル評価をスキップしました")

if __name__ == '__main__':
    main()
"""
    
    # 評価スクリプトの保存
    with open('evaluation.py', 'w') as f:
        f.write(evaluation_script)
    
    # 評価プロセッサの設定
    from sagemaker.sklearn.processing import SKLearnProcessor
    
    eval_processor = SKLearnProcessor(
        framework_version='1.0-1',
        role=role,
        instance_type='ml.m5.xlarge',
        instance_count=1,
        base_job_name='model-evaluation',
        sagemaker_session=sagemaker_session
    )
    
    # 評価ステップの作成
    step_eval = ProcessingStep(
        name='ModelEvaluation',
        processor=eval_processor,
        inputs=[
            ProcessingInput(
                source=model_path,
                destination='/opt/ml/processing/model'
            ),
            ProcessingInput(
                source=test_data_path,
                destination='/opt/ml/processing/test'
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name='evaluation',
                source='/opt/ml/processing/evaluation'
            )
        ],
        code='evaluation.py',
        property_files=[property_file]
    )
    
    return step_eval

# 完全なMLパイプラインの構築
def create_ml_pipeline():
    """完全なMLパイプラインの構築"""
    
    # パラメータ定義
    params = define_pipeline_parameters()
    
    # 1. データ前処理ステップ
    step_process = ProcessingStep(
        name='DataPreprocessing',
        processor=sklearn_processor,
        inputs=[
            ProcessingInput(
                source=params['input_data'],
                destination='/opt/ml/processing/input'
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name='train',
                source='/opt/ml/processing/train'
            ),
            ProcessingOutput(
                output_name='validation',
                source='/opt/ml/processing/validation'
            ),
            ProcessingOutput(
                output_name='test',
                source='/opt/ml/processing/test'
            )
        ],
        code='preprocessing.py'
    )
    
    # 2. ハイパーパラメータチューニングステップ
    base_estimator = XGBoost(
        entry_point='xgboost_training.py',
        role=role,
        instance_count=params['instance_count'],
        instance_type=params['instance_type'],
        framework_version='1.7-1',
        py_version='py3',
        base_job_name='xgboost-training'
    )
    
    tuner = HyperparameterTuner(
        base_estimator,
        objective_metric_name='validation:rmse',
        hyperparameter_ranges={
            'max_depth': IntegerParameter(3, 10),
            'eta': ContinuousParameter(0.01, 0.3),
            'num_round': IntegerParameter(50, 200)
        },
        max_jobs=5,
        max_parallel_jobs=2,
        objective_type='Minimize'
    )
    
    step_tuning = TuningStep(
        name='HyperparameterTuning',
        tuner=tuner,
        inputs={
            'train': TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri,
                content_type='text/csv'
            ),
            'validation': TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri,
                content_type='text/csv'
            )
        }
    )
    
    # 3. モデル評価ステップ
    evaluation_report = PropertyFile(
        name='EvaluationReport',
        output_name='evaluation',
        path='evaluation.json'
    )
    
    step_eval = create_evaluation_step(
        model_path=step_tuning.get_top_model_s3_uri(top_k=0),
        test_data_path=step_process.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
        property_file=evaluation_report
    )
    
    # 4. 条件付きモデル登録ステップ
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=f"{step_eval.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']}/evaluation.json",
            content_type="application/json"
        )
    )
    
    step_register = RegisterModel(
        name='RegisterModel',
        estimator=base_estimator,
        model_data=step_tuning.get_top_model_s3_uri(top_k=0),
        content_types=['text/csv'],
        response_types=['text/csv'],
        inference_instances=['ml.t3.medium', 'ml.m5.large'],
        transform_instances=['ml.m5.large'],
        model_package_group_name='MLOps-Model-Package-Group',
        approval_status=params['model_approval_status'],
        model_metrics=model_metrics
    )
    
    # 5. 条件付きデプロイ
    condition_deploy = ConditionGreaterThanOrEqualTo(
        left=JsonGet(
            step_name=step_eval.name,
            property_file=evaluation_report,
            json_path="regression_metrics.r2.value"
        ),
        right=0.8  # R2スコアが0.8以上の場合にデプロイ
    )
    
    step_condition = ConditionStep(
        name='ConditionalDeploy',
        conditions=[condition_deploy],
        if_steps=[step_register],
        else_steps=[]
    )
    
    # パイプライン作成
    pipeline = Pipeline(
        name='MLOps-Pipeline',
        parameters=list(params.values()),
        steps=[step_process, step_tuning, step_eval, step_condition],
        sagemaker_session=sagemaker_session
    )
    
    return pipeline

# パイプラインの実行と監視
def execute_and_monitor_pipeline():
    """パイプラインの実行と監視"""
    
    # パイプライン作成
    pipeline = create_ml_pipeline()
    
    # パイプライン登録
    pipeline.upsert(role_arn=role)
    print(f"パイプライン '{pipeline.name}' を作成/更新しました")
    
    # パイプライン実行
    execution = pipeline.start()
    print(f"パイプライン実行を開始しました: {execution.arn}")
    
    # 実行状況の監視
    execution.describe()
    
    # 完了まで待機
    execution.wait()
    
    # 結果の確認
    steps = execution.list_steps()
    print("=== パイプライン実行結果 ===")
    for step in steps:
        print(f"ステップ: {step['StepName']}, ステータス: {step['StepStatus']}")
    
    return execution

print("SageMaker Pipelines MLOpsコンポーネントの準備完了")

高度な機能と統合

from sagemaker.clarify import SageMakerClarifyProcessor, DataConfig, BiasConfig, ModelConfig
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.jumpstart import utils as jumpstart_utils
from sagemaker.huggingface import HuggingFace

# SageMaker Clarifyによるバイアス検出
def setup_bias_detection(train_data_path, model_name):
    """バイアス検出の設定"""
    
    clarify_processor = SageMakerClarifyProcessor(
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        sagemaker_session=sagemaker_session
    )
    
    # データ設定
    bias_data_config = DataConfig(
        s3_data_input_path=train_data_path,
        s3_output_path=f's3://{bucket}/bias-analysis/',
        label='target',
        headers=['target', 'feature1', 'feature2', 'sensitive_feature'],
        dataset_type='text/csv'
    )
    
    # バイアス設定
    bias_config = BiasConfig(
        label_values_or_threshold=[0],
        facet_name='sensitive_feature',
        facet_values_or_threshold=[1]
    )
    
    # モデル設定
    model_config = ModelConfig(
        model_name=model_name,
        instance_type='ml.m5.xlarge',
        instance_count=1,
        accept_type='text/csv'
    )
    
    # バイアス分析実行
    clarify_processor.run_bias(
        data_config=bias_data_config,
        bias_config=bias_config,
        model_config=model_config
    )
    
    print("バイアス分析が完了しました")

# モデル監視の設定
def setup_model_monitoring(endpoint_name, ground_truth_s3_path):
    """モデル監視の設定"""
    
    # データ品質監視
    data_quality_monitor = DefaultModelMonitor(
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        volume_size_in_gb=20,
        max_runtime_in_seconds=3600,
        sagemaker_session=sagemaker_session
    )
    
    # ベースライン作成
    baseline_job = data_quality_monitor.suggest_baseline(
        baseline_dataset=f's3://{bucket}/baseline-data/baseline.csv',
        dataset_format='csv',
        output_s3_uri=f's3://{bucket}/baseline-output/',
        wait=True
    )
    
    # 監視スケジュール作成
    monitoring_schedule = data_quality_monitor.create_monitoring_schedule(
        monitor_schedule_name=f'{endpoint_name}-monitoring',
        endpoint_input=endpoint_name,
        output_s3_uri=f's3://{bucket}/monitoring-output/',
        statistics=baseline_job.baseline_statistics(),
        constraints=baseline_job.suggested_constraints(),
        schedule_cron_expression='cron(0 * * * ? *)',  # 毎時実行
        enable_cloudwatch_metrics=True
    )
    
    print(f"モデル監視スケジュール '{monitoring_schedule.monitoring_schedule_name}' を作成しました")
    
    return monitoring_schedule

# JumpStartモデルの利用
def deploy_jumpstart_model():
    """JumpStartモデルのデプロイ"""
    
    # 利用可能なモデルの検索
    models = jumpstart_utils.list_jumpstart_models()
    print(f"利用可能なJumpStartモデル数: {len(models)}")
    
    # 特定のモデル(例:BERT)の選択
    model_id = 'huggingface-text-classification-bert-base-uncased'
    
    # モデル情報の取得
    model_info = jumpstart_utils.get_jumpstart_content_bucket(model_id)
    print(f"モデル情報: {model_info}")
    
    # JumpStartモデルのデプロイ
    from sagemaker.jumpstart.model import JumpStartModel
    
    model = JumpStartModel(
        model_id=model_id,
        role=role,
        sagemaker_session=sagemaker_session
    )
    
    predictor = model.deploy(
        initial_instance_count=1,
        instance_type='ml.m5.large',
        endpoint_name='jumpstart-bert-endpoint'
    )
    
    print(f"JumpStartモデルデプロイ完了: {predictor.endpoint_name}")
    return predictor

# Hugging Face Transformersとの統合
def deploy_huggingface_model():
    """Hugging Faceモデルのデプロイ"""
    
    # Hugging Face Estimatorの設定
    huggingface_estimator = HuggingFace(
        entry_point='train.py',
        source_dir='./scripts',
        instance_type='ml.p3.2xlarge',
        instance_count=1,
        role=role,
        transformers_version='4.21',
        pytorch_version='1.12',
        py_version='py39',
        hyperparameters={
            'model_name_or_path': 'distilbert-base-uncased',
            'task_name': 'sst2',
            'do_train': True,
            'do_eval': True,
            'max_seq_length': 128,
            'per_device_train_batch_size': 32,
            'learning_rate': 2e-5,
            'num_train_epochs': 3,
            'output_dir': '/opt/ml/model'
        }
    )
    
    # 訓練実行
    # huggingface_estimator.fit({'train': 's3://path/to/train', 'test': 's3://path/to/test'})
    
    # デプロイ
    # predictor = huggingface_estimator.deploy(
    #     initial_instance_count=1,
    #     instance_type='ml.m5.large'
    # )
    
    print("Hugging Faceモデル設定完了")

# リソース管理とコスト最適化
def manage_resources_and_costs():
    """リソース管理とコスト最適化"""
    
    # 実行中のエンドポイント一覧
    endpoints = sm_client.list_endpoints(StatusEquals='InService')
    print("=== 実行中のエンドポイント ===")
    for endpoint in endpoints['Endpoints']:
        print(f"- {endpoint['EndpointName']}: {endpoint['EndpointStatus']}")
    
    # 古いモデルの削除
    models = sm_client.list_models()
    print(f"\n=== 登録モデル数: {len(models['Models'])} ===")
    
    # トレーニングジョブの監視
    training_jobs = sm_client.list_training_jobs(
        StatusEquals='InProgress',
        MaxResults=10
    )
    print(f"\n=== 実行中のトレーニングジョブ: {len(training_jobs['TrainingJobSummaries'])} ===")
    
    # プロセシングジョブの監視
    processing_jobs = sm_client.list_processing_jobs(
        StatusEquals='InProgress',
        MaxResults=10
    )
    print(f"=== 実行中のプロセシングジョブ: {len(processing_jobs['ProcessingJobSummaries'])} ===")
    
    # コスト監視のためのタグ付け
    def tag_resources(resource_arn, tags):
        sm_client.add_tags(
            ResourceArn=resource_arn,
            Tags=tags
        )
    
    # 自動停止の設定例
    print("\n=== リソース最適化の推奨事項 ===")
    print("1. 未使用エンドポイントの定期的な削除")
    print("2. 適切なインスタンスタイプの選択")
    print("3. スポットインスタンスの活用")
    print("4. データ圧縮による転送コスト削減")
    print("5. S3 Intelligent-Tiering の活用")

# エラーハンドリングとロバストネス
def implement_error_handling():
    """エラーハンドリングとロバストネスの実装"""
    
    def safe_sagemaker_operation(operation, *args, **kwargs):
        """SageMaker操作の安全な実行"""
        import time
        
        max_retries = 3
        base_delay = 2
        
        for attempt in range(max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                if attempt == max_retries - 1:
                    raise e
                
                delay = base_delay * (2 ** attempt)
                print(f"操作失敗(試行 {attempt + 1}/{max_retries}): {e}")
                print(f"{delay}秒待機してリトライします...")
                time.sleep(delay)
    
    # ヘルスチェック関数
    def health_check():
        """SageMaker環境のヘルスチェック"""
        try:
            # 基本的な操作確認
            models = sm_client.list_models(MaxResults=1)
            endpoints = sm_client.list_endpoints(MaxResults=1)
            
            return {
                'status': 'healthy',
                'models_accessible': True,
                'endpoints_accessible': True,
                'timestamp': time.time()
            }
        except Exception as e:
            return {
                'status': 'unhealthy',
                'error': str(e),
                'timestamp': time.time()
            }
    
    # 定期ヘルスチェック
    health_status = health_check()
    print(f"SageMaker環境ステータス: {health_status['status']}")
    
    return safe_sagemaker_operation, health_check

print("高度な機能と統合コンポーネントの準備完了")
safe_operation, health_checker = implement_error_handling()