AWS SageMaker

Tags: AI, Machine Learning, Platform, AWS, MLOps, AutoML, Cloud, Enterprise

Category: AI/ML Platform

Overview

AWS SageMaker is Amazon's comprehensive machine learning platform covering the entire ML lifecycle. It unifies data preparation, model building, training, deployment, and monitoring, enabling enterprise-level MLOps automation. With a rich feature set including SageMaker Studio, Pipelines, Model Registry, and JumpStart, it provides an integrated environment where users of varying skill levels, from data scientists to machine learning engineers, can develop AI/ML solutions efficiently. Supporting everything from AutoML to custom model development, it caters to diverse needs and enables scalable ML operations through deep integration with the AWS ecosystem.

Details

AWS SageMaker evolved significantly in 2025 into a next-generation integrated platform, providing a unified environment for data, analytics, and AI development through SageMaker Unified Studio (preview). Alongside the traditional SageMaker AI capabilities, it integrates MLOps automation and a range of inference options, including real-time, serverless, and asynchronous inference as well as batch transform. Complete with enterprise-grade MLOps capabilities such as SageMaker Pipelines, Model Monitor, Clarify, and Feature Store, it comprehensively supports everything from AutoML to foundation model fine-tuning and custom algorithm development. Enterprise security, compliance, and cost-optimization capabilities accelerate AI/ML adoption in large organizations.

Key Features

  • Integrated Development Environment: Flexible development experience with SageMaker Studio, Jupyter Notebook, and Visual Studio Code integration
  • MLOps Automation: Full-scale machine learning operations through Pipelines, Model Registry, and CI/CD integration
  • Diverse Inference Options: Real-time, serverless, batch, and asynchronous inference endpoints
  • AutoML Capabilities: No-code/Low-code machine learning with AutoPilot and Canvas
  • Foundation Model Integration: Generative AI utilization through JumpStart and Bedrock integration
  • Enterprise Features: Enterprise-grade security with VPC, IAM, encryption, and audit trails

Advantages and Disadvantages

Advantages

  • Complete cloud-native experience through integration with the broader AWS service ecosystem
  • Reduced operational burden and automatic scaling through rich managed services
  • Pay-as-you-go pricing enables cost-efficient usage with no upfront investment
  • Enterprise-grade security and compliance features as standard
  • Support for a wide range of skill levels, from AutoML to custom development
  • Continuous deployment and monitoring of ML models through MLOps automation

Disadvantages

  • Vendor lock-in risk and migration costs due to AWS dependency
  • Complex pricing structure with potential for unexpectedly high charges
  • Steep learning curve and configuration complexity due to the breadth of features
  • No on-premises deployment option; an internet connection is required
  • Limited integration with other cloud platforms
  • Slower development iteration due to compute instance startup times

Code Examples

Basic Setup and Environment Configuration

# Install SageMaker Python SDK
!pip install sagemaker boto3 pandas numpy

# Import necessary libraries
import sagemaker
import boto3
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
from sagemaker.session import Session

# AWS credential configuration (using IAM role)
role = get_execution_role()
sagemaker_session = Session()
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

print(f"SageMaker role: {role}")
print(f"SageMaker session region: {region}")
print(f"Default S3 bucket: {bucket}")

# Initialize SageMaker clients
sm_client = boto3.client('sagemaker')
s3_client = boto3.client('s3')

# Commonly used instance types (reference list, not an API query)
def list_available_instances():
    """Print commonly used training and inference instance types"""
    training_instances = [
        'ml.m5.large', 'ml.m5.xlarge', 'ml.m5.2xlarge',
        'ml.c5.xlarge', 'ml.c5.2xlarge', 'ml.c5.4xlarge',
        'ml.p3.2xlarge', 'ml.p3.8xlarge', 'ml.p3.16xlarge',
        'ml.g4dn.xlarge', 'ml.g4dn.2xlarge'
    ]
    
    inference_instances = [
        'ml.t3.medium', 'ml.t3.large', 'ml.t3.xlarge',
        'ml.m5.large', 'ml.m5.xlarge', 'ml.c5.large',
        'ml.c5.xlarge', 'ml.g4dn.xlarge'
    ]
    
    print("=== Recommended Training Instances ===")
    for instance in training_instances:
        print(f"- {instance}")
    
    print("\n=== Recommended Inference Instances ===")
    for instance in inference_instances:
        print(f"- {instance}")

list_available_instances()

# S3 data upload function
def upload_data_to_s3(local_path, s3_prefix):
    """Upload local data to S3"""
    s3_path = f"s3://{bucket}/{s3_prefix}"
    sagemaker_session.upload_data(
        path=local_path,
        bucket=bucket,
        key_prefix=s3_prefix
    )
    print(f"Data uploaded to {s3_path}")
    return s3_path

# Display basic environment information
print(f"\n=== SageMaker Environment Information ===")
print(f"SageMaker SDK version: {sagemaker.__version__}")
print(f"Python execution environment: {sagemaker_session.boto_session.region_name}")

Data Processing and Feature Engineering

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
import json

# Configure SKLearn Processor
sklearn_processor = SKLearnProcessor(
    framework_version='1.0-1',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    base_job_name='data-preprocessing',
    sagemaker_session=sagemaker_session
)

# Create data preprocessing script
preprocessing_script = """
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
import argparse
import json
import os

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-test-split-ratio', type=float, default=0.2)
    args = parser.parse_args()
    
    # Load data
    input_data_path = '/opt/ml/processing/input/data.csv'
    df = pd.read_csv(input_data_path)
    
    print(f"Data shape: {df.shape}")
    print(f"Missing values: {df.isnull().sum().sum()}")
    
    # Basic preprocessing
    # Handle missing values
    df = df.fillna(df.mean(numeric_only=True))
    
    # Encode categorical variables
    categorical_columns = df.select_dtypes(include=['object']).columns
    label_encoders = {}
    
    for col in categorical_columns:
        if col != 'target':  # Exclude target column
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col].astype(str))
            label_encoders[col] = le
    
    # Separate features and target
    if 'target' in df.columns:
        X = df.drop('target', axis=1)
        y = df['target']
    else:
        X = df.iloc[:, :-1]
        y = df.iloc[:, -1]
    
    # Standardization
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    X_scaled = pd.DataFrame(X_scaled, columns=X.columns)
    
    # Train/validation/test split
    X_train, X_temp, y_train, y_temp = train_test_split(
        X_scaled, y, test_size=args.train_test_split_ratio * 2, random_state=42
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42
    )
    
    # Combine data
    train_data = pd.concat([y_train, X_train], axis=1)
    val_data = pd.concat([y_val, X_val], axis=1)
    test_data = pd.concat([y_test, X_test], axis=1)
    
    # Create output directories
    os.makedirs('/opt/ml/processing/train', exist_ok=True)
    os.makedirs('/opt/ml/processing/validation', exist_ok=True)
    os.makedirs('/opt/ml/processing/test', exist_ok=True)
    
    # Save data
    train_data.to_csv('/opt/ml/processing/train/train.csv', index=False, header=False)
    val_data.to_csv('/opt/ml/processing/validation/validation.csv', index=False, header=False)
    test_data.to_csv('/opt/ml/processing/test/test.csv', index=False, header=False)
    
    # Save metadata
    metadata = {
        'feature_columns': list(X.columns),
        'target_column': 'target',
        'scaler_params': {
            'mean': scaler.mean_.tolist(),
            'scale': scaler.scale_.tolist()
        },
        'train_shape': train_data.shape,
        'val_shape': val_data.shape,
        'test_shape': test_data.shape
    }
    
    with open('/opt/ml/processing/train/metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)
    
    print("Data preprocessing completed")
    print(f"Training data: {train_data.shape}")
    print(f"Validation data: {val_data.shape}")
    print(f"Test data: {test_data.shape}")

if __name__ == '__main__':
    main()
"""

# Save script file
with open('preprocessing.py', 'w') as f:
    f.write(preprocessing_script)

# Execute processing job
def run_processing_job(input_s3_path):
    """Execute data preprocessing job"""
    sklearn_processor.run(
        inputs=[
            ProcessingInput(
                source=input_s3_path,
                destination='/opt/ml/processing/input',
                input_name='data'
            )
        ],
        outputs=[
            ProcessingOutput(
                source='/opt/ml/processing/train',
                destination=f's3://{bucket}/processed-data/train',
                output_name='train_data'
            ),
            ProcessingOutput(
                source='/opt/ml/processing/validation',
                destination=f's3://{bucket}/processed-data/validation',
                output_name='validation_data'
            ),
            ProcessingOutput(
                source='/opt/ml/processing/test',
                destination=f's3://{bucket}/processed-data/test',
                output_name='test_data'
            )
        ],
        code='preprocessing.py',
        arguments=[
            '--train-test-split-ratio', '0.2'
        ]
    )
    
    print("Data preprocessing job completed")
    return {
        'train': f's3://{bucket}/processed-data/train',
        'validation': f's3://{bucket}/processed-data/validation',
        'test': f's3://{bucket}/processed-data/test'
    }

# Feature Store usage example
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

def create_feature_store(df, feature_group_name):
    """Create SageMaker Feature Store"""
    # Create feature definitions
    feature_definitions = [
        FeatureDefinition(feature_name=col, feature_type=FeatureTypeEnum.FRACTIONAL)
        if df[col].dtype in ['float64', 'float32'] 
        else FeatureDefinition(feature_name=col, feature_type=FeatureTypeEnum.INTEGRAL)
        for col in df.columns
    ]
    
    # Create Feature Group
    feature_group = FeatureGroup(
        name=feature_group_name,
        sagemaker_session=sagemaker_session
    )
    
    feature_group.create(
        s3_uri=f's3://{bucket}/feature-store',
        record_identifier_name='id',
        event_time_feature_name='event_time',
        role_arn=role,
        feature_definitions=feature_definitions,
        enable_online_store=True
    )
    
    print(f"Feature Store '{feature_group_name}' created")
    return feature_group
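
# A feature group can take a minute to provision before it accepts records.
# The following is a minimal ingestion sketch; it assumes `df` contains the
# 'id' and 'event_time' columns declared when the group was created.
def ingest_features(feature_group, df):
    """Ingest a DataFrame into an existing Feature Group"""
    import time
    
    # Wait until the feature group leaves the 'Creating' state
    while feature_group.describe()['FeatureGroupStatus'] == 'Creating':
        time.sleep(5)
    
    feature_group.ingest(data_frame=df, max_workers=3, wait=True)
    print(f"Ingested {len(df)} records into '{feature_group.name}'")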

print("Data processing components ready")

Model Training and Hyperparameter Tuning

from sagemaker.estimator import Estimator
from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter
from sagemaker.pytorch import PyTorch
from sagemaker.sklearn import SKLearn
from sagemaker.xgboost import XGBoost

# Basic training using XGBoost
def train_xgboost_model(train_path, validation_path):
    """Train XGBoost model"""
    
    # Configure XGBoost Estimator
    xgb_estimator = XGBoost(
        entry_point='xgboost_training.py',
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        framework_version='1.7-1',
        py_version='py3',
        hyperparameters={
            'objective': 'reg:squarederror',
            'num_round': 100,
            'max_depth': 6,
            'eta': 0.3,
            'subsample': 0.8,
            'colsample_bytree': 0.8
        },
        base_job_name='xgboost-training',
        sagemaker_session=sagemaker_session
    )
    
    # Execute training
    xgb_estimator.fit({
        'train': train_path,
        'validation': validation_path
    })
    
    return xgb_estimator
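
# The estimators in this section pass entry_point='xgboost_training.py',
# but that script is not shown anywhere above. The following is a minimal
# script-mode sketch of what it could look like; it assumes CSV input with
# the target in the first column, matching the preprocessing output, and
# uses the standard SM_* environment variables SageMaker sets in containers.
xgboost_training_script = """
import argparse
import os
import pandas as pd
import xgboost as xgb

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--max_depth', type=int, default=6)
    parser.add_argument('--eta', type=float, default=0.3)
    parser.add_argument('--num_round', type=int, default=100)
    parser.add_argument('--subsample', type=float, default=1.0)
    parser.add_argument('--colsample_bytree', type=float, default=1.0)
    parser.add_argument('--objective', type=str, default='reg:squarederror')
    args = parser.parse_args()

    # SageMaker mounts input channels and the model directory at these paths
    train_dir = os.environ.get('SM_CHANNEL_TRAIN', '/opt/ml/input/data/train')
    val_dir = os.environ.get('SM_CHANNEL_VALIDATION', '/opt/ml/input/data/validation')
    model_dir = os.environ.get('SM_MODEL_DIR', '/opt/ml/model')

    train = pd.read_csv(os.path.join(train_dir, 'train.csv'), header=None)
    val = pd.read_csv(os.path.join(val_dir, 'validation.csv'), header=None)

    # Target is the first column, features are the rest
    dtrain = xgb.DMatrix(train.iloc[:, 1:], label=train.iloc[:, 0])
    dval = xgb.DMatrix(val.iloc[:, 1:], label=val.iloc[:, 0])

    params = {
        'max_depth': args.max_depth,
        'eta': args.eta,
        'subsample': args.subsample,
        'colsample_bytree': args.colsample_bytree,
        'objective': args.objective
    }

    # evals prints lines containing 'validation-rmse:...' to the job logs,
    # which the tuner's metric regex below can parse
    booster = xgb.train(
        params, dtrain,
        num_boost_round=args.num_round,
        evals=[(dtrain, 'train'), (dval, 'validation')]
    )
    booster.save_model(os.path.join(model_dir, 'xgboost-model'))

if __name__ == '__main__':
    main()
"""

with open('xgboost_training.py', 'w') as f:
    f.write(xgboost_training_script)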

# Custom PyTorch model training
pytorch_training_script = """
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import numpy as np
import argparse
import os
import json

class NeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_size=64, output_size=1):
        super(NeuralNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=100)
    parser.add_argument('--batch-size', type=int, default=32)
    parser.add_argument('--learning-rate', type=float, default=0.001)
    parser.add_argument('--hidden-size', type=int, default=64)
    
    args = parser.parse_args()
    
    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Load data
    train_data = pd.read_csv('/opt/ml/input/data/train/train.csv', header=None)
    val_data = pd.read_csv('/opt/ml/input/data/validation/validation.csv', header=None)
    
    # Separate features and target
    X_train = train_data.iloc[:, 1:].values.astype(np.float32)
    y_train = train_data.iloc[:, 0].values.astype(np.float32)
    X_val = val_data.iloc[:, 1:].values.astype(np.float32)
    y_val = val_data.iloc[:, 0].values.astype(np.float32)
    
    # Create data loaders
    train_dataset = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
    val_dataset = TensorDataset(torch.tensor(X_val), torch.tensor(y_val))
    
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=args.batch_size)
    
    # Initialize model
    input_size = X_train.shape[1]
    model = NeuralNetwork(input_size, args.hidden_size).to(device)
    
    # Loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)
    
    # Training loop
    train_losses = []
    val_losses = []
    
    for epoch in range(args.epochs):
        # Training
        model.train()
        train_loss = 0.0
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            
            optimizer.zero_grad()
            outputs = model(batch_X).squeeze()
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                outputs = model(batch_X).squeeze()
                loss = criterion(outputs, batch_y)
                val_loss += loss.item()
        
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        if epoch % 10 == 0:
            print(f'Epoch {epoch}/{args.epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
    
    # Save model
    model_path = '/opt/ml/model/model.pth'
    torch.save(model.state_dict(), model_path)
    
    # Save metadata
    metadata = {
        'input_size': input_size,
        'hidden_size': args.hidden_size,
        'final_train_loss': train_losses[-1],
        'final_val_loss': val_losses[-1]
    }
    
    with open('/opt/ml/model/metadata.json', 'w') as f:
        json.dump(metadata, f)
    
    print(f'Training completed. Final validation loss: {val_losses[-1]:.4f}')

if __name__ == '__main__':
    main()
"""

# Save PyTorch training script
with open('pytorch_training.py', 'w') as f:
    f.write(pytorch_training_script)

def train_pytorch_model(train_path, validation_path):
    """Train PyTorch model"""
    
    pytorch_estimator = PyTorch(
        entry_point='pytorch_training.py',
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        framework_version='2.0.0',
        py_version='py310',
        hyperparameters={
            'epochs': 100,
            'batch-size': 32,
            'learning-rate': 0.001,
            'hidden-size': 128
        },
        base_job_name='pytorch-training',
        sagemaker_session=sagemaker_session
    )
    
    pytorch_estimator.fit({
        'train': train_path,
        'validation': validation_path
    })
    
    return pytorch_estimator

# Hyperparameter tuning
def hyperparameter_tuning(train_path, validation_path):
    """Automatic hyperparameter tuning"""
    
    # Base Estimator
    base_estimator = XGBoost(
        entry_point='xgboost_training.py',
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        framework_version='1.7-1',
        py_version='py3',
        base_job_name='xgboost-hpo',
        sagemaker_session=sagemaker_session
    )
    
    # Define hyperparameter ranges
    hyperparameter_ranges = {
        'max_depth': IntegerParameter(3, 10),
        'eta': ContinuousParameter(0.01, 0.3),
        'num_round': IntegerParameter(50, 200),
        'subsample': ContinuousParameter(0.5, 1.0),
        'colsample_bytree': ContinuousParameter(0.5, 1.0)
    }
    
    # Objective metric
    objective_metric_name = 'validation:rmse'
    
    # Configure tuner (script-mode estimators need metric_definitions so
    # SageMaker can parse the objective metric from the training job logs)
    tuner = HyperparameterTuner(
        base_estimator,
        objective_metric_name,
        hyperparameter_ranges,
        metric_definitions=[
            {'Name': 'validation:rmse', 'Regex': 'validation-rmse:([0-9.]+)'}
        ],
        max_jobs=20,
        max_parallel_jobs=3,
        objective_type='Minimize',
        base_tuning_job_name='xgboost-hpo'
    )
    
    # Execute tuning
    tuner.fit({
        'train': train_path,
        'validation': validation_path
    })
    
    # Get best job
    best_training_job = tuner.best_training_job()
    print(f"Best job: {best_training_job}")
    
    return tuner, best_training_job
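
# Once tuning finishes, the trial results can be pulled into a DataFrame
# for inspection (a sketch using the tuner's built-in analytics helper)
def inspect_tuning_results(tuner):
    """Summarize hyperparameter tuning trials"""
    results_df = tuner.analytics().dataframe()
    # Lower RMSE is better, so sort ascending by the objective value
    results_df = results_df.sort_values('FinalObjectiveValue')
    print(results_df[['TrainingJobName', 'FinalObjectiveValue']].head())
    return results_df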

print("Model training components ready")

Model Deployment and Inference

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.model import Model
from sagemaker.serverless import ServerlessInferenceConfig
from sagemaker.async_inference import AsyncInferenceConfig

# Real-time inference endpoint
def deploy_realtime_endpoint(estimator):
    """Deploy real-time inference endpoint"""
    
    predictor = estimator.deploy(
        initial_instance_count=1,
        instance_type='ml.t3.medium',
        serializer=CSVSerializer(),
        deserializer=JSONDeserializer(),
        endpoint_name='my-model-endpoint'
    )
    
    print(f"Endpoint deployment completed: {predictor.endpoint_name}")
    return predictor
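
# Real-time endpoints can scale automatically. This sketch uses the
# Application Auto Scaling APIs; it assumes the default variant name
# 'AllTraffic' that estimator.deploy() creates.
def enable_endpoint_autoscaling(endpoint_name, min_capacity=1, max_capacity=4):
    """Attach a target-tracking scaling policy to an endpoint variant"""
    autoscaling = boto3.client('application-autoscaling')
    resource_id = f'endpoint/{endpoint_name}/variant/AllTraffic'
    
    autoscaling.register_scalable_target(
        ServiceNamespace='sagemaker',
        ResourceId=resource_id,
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        MinCapacity=min_capacity,
        MaxCapacity=max_capacity
    )
    
    autoscaling.put_scaling_policy(
        PolicyName=f'{endpoint_name}-invocations-scaling',
        ServiceNamespace='sagemaker',
        ResourceId=resource_id,
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': 70.0,  # target invocations per instance per minute
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
            },
            'ScaleInCooldown': 300,
            'ScaleOutCooldown': 60
        }
    )
    print(f"Auto scaling enabled for {endpoint_name}")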

# Serverless inference endpoint
def deploy_serverless_endpoint(estimator):
    """Deploy serverless inference endpoint"""
    
    serverless_config = ServerlessInferenceConfig(
        memory_size_in_mb=2048,
        max_concurrency=5
    )
    
    predictor = estimator.deploy(
        serverless_inference_config=serverless_config,
        serializer=CSVSerializer(),
        deserializer=JSONDeserializer(),
        endpoint_name='my-serverless-endpoint'
    )
    
    print(f"Serverless endpoint deployment completed: {predictor.endpoint_name}")
    return predictor

# Asynchronous inference endpoint
def deploy_async_endpoint(estimator):
    """Deploy asynchronous inference endpoint"""
    
    async_config = AsyncInferenceConfig(
        output_path=f's3://{bucket}/async-inference-output/',
        max_concurrent_invocations_per_instance=4,
        failure_path=f's3://{bucket}/async-inference-errors/'
    )
    
    predictor = estimator.deploy(
        initial_instance_count=1,
        instance_type='ml.m5.large',
        async_inference_config=async_config,
        endpoint_name='my-async-endpoint'
    )
    
    print(f"Async endpoint deployment completed: {predictor.endpoint_name}")
    return predictor
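
# Async endpoints are invoked by pointing the runtime at an input object
# in S3 rather than sending the payload inline (a sketch; the input URI
# is assumed to reference an already-uploaded CSV file)
def invoke_async_endpoint(endpoint_name, input_s3_uri):
    """Invoke an asynchronous inference endpoint"""
    runtime = boto3.client('sagemaker-runtime')
    response = runtime.invoke_endpoint_async(
        EndpointName=endpoint_name,
        InputLocation=input_s3_uri,
        ContentType='text/csv'
    )
    # The prediction is written to the output_path configured above
    print(f"Result will appear at: {response['OutputLocation']}")
    return response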

# Custom inference code example
inference_script = """
import torch
import torch.nn as nn
import json
import numpy as np

class NeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_size=64, output_size=1):
        super(NeuralNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

def model_fn(model_dir):
    \"\"\"Model loading function\"\"\"
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Load metadata
    with open(f'{model_dir}/metadata.json', 'r') as f:
        metadata = json.load(f)
    
    # Initialize model and load weights
    model = NeuralNetwork(
        input_size=metadata['input_size'],
        hidden_size=metadata['hidden_size']
    )
    model.load_state_dict(torch.load(f'{model_dir}/model.pth', map_location=device))
    model.to(device)
    model.eval()
    
    return model

def input_fn(request_body, request_content_type):
    \"\"\"Input data preprocessing\"\"\"
    if request_content_type == 'application/json':
        input_data = json.loads(request_body)
        return np.array(input_data['instances']).astype(np.float32)
    elif request_content_type == 'text/csv':
        return np.loadtxt(request_body.split('\\n'), delimiter=',').astype(np.float32)
    else:
        raise ValueError(f'Unsupported content type: {request_content_type}')

def predict_fn(input_data, model):
    \"\"\"Inference execution function\"\"\"
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    with torch.no_grad():
        if len(input_data.shape) == 1:
            input_data = input_data.reshape(1, -1)
        
        input_tensor = torch.tensor(input_data).to(device)
        predictions = model(input_tensor).cpu().numpy()
        
    return predictions.tolist()

def output_fn(prediction, content_type):
    \"\"\"Output data postprocessing\"\"\"
    if content_type == 'application/json':
        return json.dumps({'predictions': prediction})
    elif content_type == 'text/csv':
        return '\\n'.join([str(p[0]) for p in prediction])
    else:
        raise ValueError(f'Unsupported content type: {content_type}')
"""

# Save inference script
with open('inference.py', 'w') as f:
    f.write(inference_script)

# Multi-model endpoint
def deploy_multi_model_endpoint():
    """Deploy multi-model endpoint"""
    from sagemaker.multidatamodel import MultiDataModel
    
    # Configure multi-model
    multi_model = MultiDataModel(
        name='multi-model-endpoint',
        model_data_prefix=f's3://{bucket}/multi-model/',
        role=role,
        sagemaker_session=sagemaker_session
    )
    
    predictor = multi_model.deploy(
        initial_instance_count=1,
        instance_type='ml.m5.large',
        endpoint_name='multi-model-endpoint'
    )
    
    return predictor, multi_model
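
# Models are added to a multi-model endpoint by placing artifacts under
# model_data_prefix, and each request selects one via TargetModel. A
# sketch follows; the artifact URI and the name 'model-a.tar.gz' are
# illustrative.
def add_and_invoke_model(multi_model, predictor, model_artifact_s3_uri, payload):
    """Register an additional model artifact and route a request to it"""
    multi_model.add_model(
        model_data_source=model_artifact_s3_uri,
        model_data_path='model-a.tar.gz'
    )
    # target_model selects which artifact serves this particular request
    return predictor.predict(payload, target_model='model-a.tar.gz')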

# Inference performance test
def test_endpoint_performance(predictor, test_data, num_requests=100):
    """Test endpoint performance"""
    import time
    
    response_times = []
    
    for i in range(num_requests):
        start_time = time.time()
        
        try:
            result = predictor.predict(test_data)
            end_time = time.time()
            response_times.append(end_time - start_time)
            
            if i % 10 == 0:
                print(f"Request {i+1}/{num_requests} completed")
                
        except Exception as e:
            print(f"Error in request {i+1}: {e}")
    
    # Statistics
    avg_response_time = np.mean(response_times)
    p95_response_time = np.percentile(response_times, 95)
    p99_response_time = np.percentile(response_times, 99)
    
    print(f"=== Performance Test Results ===")
    print(f"Total requests: {num_requests}")
    print(f"Success rate: {len(response_times)/num_requests*100:.2f}%")
    print(f"Average response time: {avg_response_time:.3f}s")
    print(f"95th percentile: {p95_response_time:.3f}s")
    print(f"99th percentile: {p99_response_time:.3f}s")
    
    return response_times

print("Model deployment components ready")

SageMaker Pipelines and MLOps

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TuningStep
from sagemaker.workflow.parameters import ParameterString, ParameterInteger, ParameterFloat
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.inputs import TrainingInput

# Define pipeline parameters
def define_pipeline_parameters():
    """Define pipeline parameters"""
    parameters = {
        'input_data': ParameterString(
            name='InputData',
            default_value=f's3://{bucket}/input-data/'
        ),
        'instance_type': ParameterString(
            name='InstanceType',
            default_value='ml.m5.xlarge'
        ),
        'instance_count': ParameterInteger(
            name='InstanceCount',
            default_value=1
        ),
        'max_depth': ParameterInteger(
            name='MaxDepth',
            default_value=6
        ),
        'eta': ParameterFloat(
            name='Eta',
            default_value=0.3
        ),
        'model_approval_status': ParameterString(
            name='ModelApprovalStatus',
            default_value='PendingManualApproval'
        )
    }
    return parameters

# Model evaluation step
def create_evaluation_step(model_path, test_data_path, property_file):
    """Create model evaluation step"""
    
    # Evaluation script
    evaluation_script = """
import json
import pandas as pd
import numpy as np
import joblib
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import argparse
import os

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-path', type=str, default='/opt/ml/processing/model')
    parser.add_argument('--test-data-path', type=str, default='/opt/ml/processing/test')
    args = parser.parse_args()
    
    # Load test data
    test_data = pd.read_csv(f'{args.test_data_path}/test.csv', header=None)
    X_test = test_data.iloc[:, 1:].values
    y_test = test_data.iloc[:, 0].values
    
    # Load model (example: scikit-learn)
    try:
        model = joblib.load(f'{args.model_path}/model.joblib')
    except Exception:
        # For custom model formats, implement loading logic here
        print("Please implement custom model evaluation logic")
        model = None
    
    if model is not None:
        # Execute prediction
        y_pred = model.predict(X_test)
        
        # Calculate metrics
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        
        # Save evaluation results
        evaluation_result = {
            'regression_metrics': {
                'mse': {'value': float(mse), 'standard_deviation': 'NaN'},
                'rmse': {'value': float(rmse), 'standard_deviation': 'NaN'},
                'mae': {'value': float(mae), 'standard_deviation': 'NaN'},
                'r2': {'value': float(r2), 'standard_deviation': 'NaN'}
            }
        }
        
        # Create output directory
        output_dir = '/opt/ml/processing/evaluation'
        os.makedirs(output_dir, exist_ok=True)
        
        # Save in JSON format
        with open(f'{output_dir}/evaluation.json', 'w') as f:
            json.dump(evaluation_result, f, indent=2)
        
        print(f"Evaluation completed - RMSE: {rmse:.4f}, R2: {r2:.4f}")
    else:
        print("Model evaluation skipped")

if __name__ == '__main__':
    main()
"""
    
    # Save evaluation script
    with open('evaluation.py', 'w') as f:
        f.write(evaluation_script)
    
    # Configure evaluation processor
    from sagemaker.sklearn.processing import SKLearnProcessor
    
    eval_processor = SKLearnProcessor(
        framework_version='1.0-1',
        role=role,
        instance_type='ml.m5.xlarge',
        instance_count=1,
        base_job_name='model-evaluation',
        sagemaker_session=sagemaker_session
    )
    
    # Create evaluation step
    step_eval = ProcessingStep(
        name='ModelEvaluation',
        processor=eval_processor,
        inputs=[
            ProcessingInput(
                source=model_path,
                destination='/opt/ml/processing/model'
            ),
            ProcessingInput(
                source=test_data_path,
                destination='/opt/ml/processing/test'
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name='evaluation',
                source='/opt/ml/processing/evaluation'
            )
        ],
        code='evaluation.py',
        property_files=[property_file]
    )
    
    return step_eval

# Build complete ML pipeline
def create_ml_pipeline():
    """Build complete ML pipeline"""
    
    # Define parameters
    params = define_pipeline_parameters()
    
    # 1. Data preprocessing step
    step_process = ProcessingStep(
        name='DataPreprocessing',
        processor=sklearn_processor,
        inputs=[
            ProcessingInput(
                source=params['input_data'],
                destination='/opt/ml/processing/input'
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name='train',
                source='/opt/ml/processing/train'
            ),
            ProcessingOutput(
                output_name='validation',
                source='/opt/ml/processing/validation'
            ),
            ProcessingOutput(
                output_name='test',
                source='/opt/ml/processing/test'
            )
        ],
        code='preprocessing.py'
    )
    
    # 2. Hyperparameter tuning step
    base_estimator = XGBoost(
        entry_point='xgboost_training.py',
        role=role,
        instance_count=params['instance_count'],
        instance_type=params['instance_type'],
        framework_version='1.7-1',
        py_version='py3',
        base_job_name='xgboost-training'
    )
    
    tuner = HyperparameterTuner(
        base_estimator,
        objective_metric_name='validation:rmse',
        hyperparameter_ranges={
            'max_depth': IntegerParameter(3, 10),
            'eta': ContinuousParameter(0.01, 0.3),
            'num_round': IntegerParameter(50, 200)
        },
        metric_definitions=[
            {'Name': 'validation:rmse', 'Regex': 'validation-rmse:([0-9.]+)'}
        ],
        max_jobs=5,
        max_parallel_jobs=2,
        objective_type='Minimize'
    )
    
    step_tuning = TuningStep(
        name='HyperparameterTuning',
        tuner=tuner,
        inputs={
            'train': TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri,
                content_type='text/csv'
            ),
            'validation': TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri,
                content_type='text/csv'
            )
        }
    )
    
    # 3. Model evaluation step
    evaluation_report = PropertyFile(
        name='EvaluationReport',
        output_name='evaluation',
        path='evaluation.json'
    )
    
    step_eval = create_evaluation_step(
        model_path=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
        test_data_path=step_process.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
        property_file=evaluation_report
    )
    
    # 4. Conditional model registration step
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=f"{step_eval.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']}/evaluation.json",
            content_type="application/json"
        )
    )
    
    step_register = RegisterModel(
        name='RegisterModel',
        estimator=base_estimator,
        model_data=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
        content_types=['text/csv'],
        response_types=['text/csv'],
        inference_instances=['ml.t3.medium', 'ml.m5.large'],
        transform_instances=['ml.m5.large'],
        model_package_group_name='MLOps-Model-Package-Group',
        approval_status=params['model_approval_status'],
        model_metrics=model_metrics
    )
    
    # 5. Conditional deployment
    condition_deploy = ConditionGreaterThanOrEqualTo(
        left=JsonGet(
            step_name=step_eval.name,
            property_file=evaluation_report,
            json_path="regression_metrics.r2.value"
        ),
        right=0.8  # Deploy if R2 score >= 0.8
    )
    
    step_condition = ConditionStep(
        name='ConditionalDeploy',
        conditions=[condition_deploy],
        if_steps=[step_register],
        else_steps=[]
    )
    
    # Create pipeline
    pipeline = Pipeline(
        name='MLOps-Pipeline',
        parameters=list(params.values()),
        steps=[step_process, step_tuning, step_eval, step_condition],
        sagemaker_session=sagemaker_session
    )
    
    return pipeline

# Execute and monitor pipeline
def execute_and_monitor_pipeline():
    """Execute and monitor pipeline"""
    
    # Create pipeline
    pipeline = create_ml_pipeline()
    
    # Register pipeline
    pipeline.upsert(role_arn=role)
    print(f"Pipeline '{pipeline.name}' created/updated")
    
    # Execute pipeline
    execution = pipeline.start()
    print(f"Pipeline execution started: {execution.arn}")
    
    # Monitor execution status
    execution.describe()
    
    # Wait for completion
    execution.wait()
    
    # Check results
    steps = execution.list_steps()
    print("=== Pipeline Execution Results ===")
    for step in steps:
        print(f"Step: {step['StepName']}, Status: {step['StepStatus']}")
    
    return execution
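
# Individual runs can override the pipeline parameters defined earlier
# (a sketch; names must match those in define_pipeline_parameters)
def start_pipeline_with_overrides(pipeline):
    """Start a pipeline execution with custom parameter values"""
    execution = pipeline.start(
        parameters={
            'MaxDepth': 8,
            'Eta': 0.1,
            'InstanceType': 'ml.m5.2xlarge'
        }
    )
    print(f"Parameterized execution started: {execution.arn}")
    return execution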

print("SageMaker Pipelines MLOps components ready")

Advanced Features and Integrations

from sagemaker.clarify import SageMakerClarifyProcessor, DataConfig, BiasConfig, ModelConfig
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.jumpstart.notebook_utils import list_jumpstart_models
from sagemaker.huggingface import HuggingFace

# Bias detection with SageMaker Clarify
def setup_bias_detection(train_data_path, model_name):
    """Setup bias detection"""
    
    clarify_processor = SageMakerClarifyProcessor(
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        sagemaker_session=sagemaker_session
    )
    
    # Data configuration
    bias_data_config = DataConfig(
        s3_data_input_path=train_data_path,
        s3_output_path=f's3://{bucket}/bias-analysis/',
        label='target',
        headers=['target', 'feature1', 'feature2', 'sensitive_feature'],
        dataset_type='text/csv'
    )
    
    # Bias configuration
    bias_config = BiasConfig(
        label_values_or_threshold=[0],
        facet_name='sensitive_feature',
        facet_values_or_threshold=[1]
    )
    
    # Model configuration
    model_config = ModelConfig(
        model_name=model_name,
        instance_type='ml.m5.xlarge',
        instance_count=1,
        accept_type='text/csv'
    )
    
    # Execute bias analysis
    clarify_processor.run_bias(
        data_config=bias_data_config,
        bias_config=bias_config,
        model_config=model_config
    )
    
    print("Bias analysis completed")

# Setup model monitoring
def setup_model_monitoring(endpoint_name, ground_truth_s3_path):
    """Setup model monitoring"""
    
    # Data quality monitoring
    data_quality_monitor = DefaultModelMonitor(
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        volume_size_in_gb=20,
        max_runtime_in_seconds=3600,
        sagemaker_session=sagemaker_session
    )
    
    # Create baseline
    baseline_job = data_quality_monitor.suggest_baseline(
        baseline_dataset=f's3://{bucket}/baseline-data/baseline.csv',
        dataset_format='csv',
        output_s3_uri=f's3://{bucket}/baseline-output/',
        wait=True
    )
    
    # Create monitoring schedule
    monitoring_schedule = data_quality_monitor.create_monitoring_schedule(
        monitor_schedule_name=f'{endpoint_name}-monitoring',
        endpoint_input=endpoint_name,
        output_s3_uri=f's3://{bucket}/monitoring-output/',
        statistics=baseline_job.baseline_statistics(),
        constraints=baseline_job.suggested_constraints(),
        schedule_cron_expression='cron(0 * * * ? *)',  # Run hourly
        enable_cloudwatch_metrics=True
    )
    
    print(f"Model monitoring schedule '{monitoring_schedule.monitoring_schedule_name}' created")
    
    return monitoring_schedule

# Deploy JumpStart model
def deploy_jumpstart_model():
    """Deploy JumpStart model"""
    
    # Search available models
    models = list_jumpstart_models()
    print(f"Available JumpStart models: {len(models)}")
    
    # Select a specific model (illustrative ID; check the list above for
    # the exact identifier available in your region and SDK version)
    model_id = 'huggingface-text-classification-bert-base-uncased'
    
    # Deploy JumpStart model
    from sagemaker.jumpstart.model import JumpStartModel
    
    model = JumpStartModel(
        model_id=model_id,
        role=role,
        sagemaker_session=sagemaker_session
    )
    
    predictor = model.deploy(
        initial_instance_count=1,
        instance_type='ml.m5.large',
        endpoint_name='jumpstart-bert-endpoint'
    )
    
    print(f"JumpStart model deployment completed: {predictor.endpoint_name}")
    return predictor

# Resource management and cost optimization
def manage_resources_and_costs():
    """Resource management and cost optimization"""
    
    # List running endpoints
    endpoints = sm_client.list_endpoints(StatusEquals='InService')
    print("=== Running Endpoints ===")
    for endpoint in endpoints['Endpoints']:
        print(f"- {endpoint['EndpointName']}: {endpoint['EndpointStatus']}")
    
    # Delete old models
    models = sm_client.list_models()
    print(f"\n=== Registered Models: {len(models['Models'])} ===")
    
    # Monitor training jobs
    training_jobs = sm_client.list_training_jobs(
        StatusEquals='InProgress',
        MaxResults=10
    )
    print(f"\n=== Running Training Jobs: {len(training_jobs['TrainingJobSummaries'])} ===")
    
    # Monitor processing jobs
    processing_jobs = sm_client.list_processing_jobs(
        StatusEquals='InProgress',
        MaxResults=10
    )
    print(f"=== Running Processing Jobs: {len(processing_jobs['ProcessingJobSummaries'])} ===")
    
    # Cost monitoring recommendations
    print("\n=== Resource Optimization Recommendations ===")
    print("1. Regular deletion of unused endpoints")
    print("2. Appropriate instance type selection")
    print("3. Utilize spot instances")
    print("4. Data compression for transfer cost reduction")
    print("5. Leverage S3 Intelligent-Tiering")

# Error handling and robustness
def implement_error_handling():
    """Implement error handling and robustness"""
    import time  # used by both inner functions below
    
    def safe_sagemaker_operation(operation, *args, **kwargs):
        """Safe execution of SageMaker operations with exponential backoff"""
        max_retries = 3
        base_delay = 2
        
        for attempt in range(max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                
                delay = base_delay * (2 ** attempt)
                print(f"Operation failed (attempt {attempt + 1}/{max_retries}): {e}")
                print(f"Waiting {delay} seconds for retry...")
                time.sleep(delay)
    
    # Health check function
    def health_check():
        """SageMaker environment health check"""
        try:
            # Basic operation verification
            models = sm_client.list_models(MaxResults=1)
            endpoints = sm_client.list_endpoints(MaxResults=1)
            
            return {
                'status': 'healthy',
                'models_accessible': True,
                'endpoints_accessible': True,
                'timestamp': time.time()
            }
        except Exception as e:
            return {
                'status': 'unhealthy',
                'error': str(e),
                'timestamp': time.time()
            }
    
    # Regular health check
    health_status = health_check()
    print(f"SageMaker environment status: {health_status['status']}")
    
    return safe_sagemaker_operation, health_check

print("Advanced features and integration components ready")
safe_operation, health_checker = implement_error_handling()