Upstash

データベースRedisKafkaサーバーレスエッジ従量課金グローバル分散リアルタイム

データベースプラットフォーム

Upstash

概要

Upstashは、RedisとKafkaをサーバーレスで提供するデータプラットフォームです。従量課金制(pay-per-request)により、使用した分だけ支払うコスト効率的なモデルを採用し、8以上のリージョンにデータをレプリケートしてグローバルな低レイテンシアクセスを実現します。Vercel Edge、Cloudflare Workers、Fastly EdgeなどのエッジコンピューティングプラットフォームとSeamlessに統合され、モダンなサーバーレスアプリケーション開発に最適化されています。

詳細

サーバーレスRedis

Upstash Redisは、標準的なRedisプロトコルに加えてHTTP/REST APIを提供し、TCPコネクションが制限される環境(Cloudflare Workersなど)でも利用可能です。データは自動的に複数リージョンにレプリケートされ、最寄りのリージョンから低レイテンシでアクセスできます。

サーバーレスKafka

イベントストリーミングプラットフォームKafkaをサーバーレス化し、HTTP APIによりサーバーレス関数やエッジ関数からメッセージの送受信が可能です。メッセージ数に基づく課金により、トラフィックの変動に柔軟に対応します。

エッジ最適化

Vercel、Cloudflare、Netlifyなどの主要エッジプラットフォームで検証・最適化されており、グローバルに分散されたアプリケーションで最高のパフォーマンスを発揮します。専用のSDKにより、エッジ環境での開発体験を向上させています。

2024年の新サービス

  • Vector: 機械学習とAIアプリケーション向けのサーバーレスベクターデータベース
  • QStash: HTTPベースのメッセージングサービス。CRONスケジューリングと自動リトライ機能を提供

メリット・デメリット

メリット

  • 真の従量課金: アイドル時の料金ゼロ、使用分のみ課金
  • 価格上限保証: 月額料金の上限設定により予期しない高額請求を防止
  • グローバル低レイテンシ: 8以上のリージョンへの自動レプリケーション
  • エッジ対応: Cloudflare Workers、Vercel Edge Functions完全サポート
  • 標準プロトコル: Redis/Kafkaプロトコル互換で既存コードの移行が容易
  • インスタント起動: コールドスタートなしで即座にリクエスト処理
  • 開発者体験: シンプルなAPIと豊富なSDKで迅速な開発

デメリット

  • 機能制限: フルRedis/Kafkaと比較して一部の高度な機能が利用不可
  • レイテンシ変動: リージョン間のレプリケーションによる若干の遅延
  • データサイズ制限: 大規模データセットには不向きな場合がある
  • カスタマイズ制限: マネージドサービスのため設定の自由度が低い
  • ベンダー依存: Upstashプラットフォームへの依存

参考ページ

実装の例

セットアップと初期設定

# Upstash CLIをインストール
npm install -g @upstash/cli

# ログイン
upstash auth login

# Redisデータベースを作成
upstash redis create my-redis-db --region us-east-1

# Kafkaクラスターを作成  
upstash kafka create my-kafka-cluster --region us-east-1

Redis SDKの使用(Node.js)

// @upstash/redisをインストール
// npm install @upstash/redis

import { Redis } from '@upstash/redis';

// Redis接続を初期化
const redis = new Redis({
  url: process.env.UPSTASH_REDIS_REST_URL,
  token: process.env.UPSTASH_REDIS_REST_TOKEN,
});

// 基本的なキー・バリュー操作
async function basicOperations() {
  // 値を設定
  await redis.set('user:1', JSON.stringify({
    id: 1,
    name: 'Alice',
    email: '[email protected]'
  }));

  // 値を取得
  const user = await redis.get('user:1');
  console.log('User:', JSON.parse(user));

  // TTL付きで設定(60秒)
  await redis.setex('session:abc123', 60, 'user-data');

  // カウンターのインクリメント
  const views = await redis.incr('page:home:views');
  console.log('Page views:', views);

  // リスト操作
  await redis.lpush('notifications', 'New message');
  const notification = await redis.rpop('notifications');
}

basicOperations();

Cloudflare Workers での使用

// wrangler.toml
// [vars]
// UPSTASH_REDIS_REST_URL = "https://xxx.upstash.io"
// UPSTASH_REDIS_REST_TOKEN = "your-token"

import { Redis } from '@upstash/redis/cloudflare';

export default {
  async fetch(request, env) {
    const redis = Redis.fromEnv(env);
    
    // レート制限の実装
    const ip = request.headers.get('CF-Connecting-IP');
    const key = `rate:${ip}`;
    
    const current = await redis.incr(key);
    if (current === 1) {
      await redis.expire(key, 60); // 60秒でリセット
    }
    
    if (current > 100) {
      return new Response('Rate limit exceeded', { status: 429 });
    }
    
    // キャッシュの実装
    const cacheKey = `cache:${request.url}`;
    const cached = await redis.get(cacheKey);
    
    if (cached) {
      return new Response(cached, {
        headers: { 'X-Cache': 'HIT' }
      });
    }
    
    // 実際の処理...
    const response = 'Hello from Edge!';
    await redis.setex(cacheKey, 300, response); // 5分間キャッシュ
    
    return new Response(response, {
      headers: { 'X-Cache': 'MISS' }
    });
  }
};

Kafka の使用例

import { Kafka } from '@upstash/kafka';

const kafka = new Kafka({
  url: process.env.UPSTASH_KAFKA_REST_URL,
  username: process.env.UPSTASH_KAFKA_REST_USERNAME,
  password: process.env.UPSTASH_KAFKA_REST_PASSWORD,
});

// プロデューサー
async function publishEvent() {
  const producer = kafka.producer();
  
  const message = {
    eventType: 'user.created',
    userId: '12345',
    timestamp: new Date().toISOString(),
    data: {
      name: 'Alice',
      email: '[email protected]'
    }
  };
  
  const response = await producer.produce('user-events', message);
  console.log('Message published:', response);
}

// コンシューマー
async function consumeEvents() {
  const consumer = kafka.consumer();
  
  const messages = await consumer.consume({
    consumerGroupId: 'my-group',
    instanceId: 'my-instance',
    topics: ['user-events'],
    autoOffsetReset: 'earliest',
  });
  
  messages.forEach(message => {
    console.log('Received:', message.value);
    // メッセージ処理ロジック
  });
}

Vector(ベクターデータベース)の使用

import { Index } from '@upstash/vector';

const index = new Index({
  url: process.env.UPSTASH_VECTOR_REST_URL,
  token: process.env.UPSTASH_VECTOR_REST_TOKEN,
});

// ベクターの追加
async function addVectors() {
  await index.upsert([
    {
      id: 'doc1',
      vector: [0.1, 0.2, 0.3, 0.4],
      metadata: { title: 'Introduction to AI' }
    },
    {
      id: 'doc2', 
      vector: [0.2, 0.3, 0.4, 0.5],
      metadata: { title: 'Machine Learning Basics' }
    }
  ]);
}

// 類似検索
async function searchSimilar() {
  const queryVector = [0.15, 0.25, 0.35, 0.45];
  const results = await index.query({
    vector: queryVector,
    topK: 5,
    includeMetadata: true
  });
  
  results.forEach(result => {
    console.log(`Score: ${result.score}, Doc: ${result.metadata.title}`);
  });
}

QStash(メッセージキュー)の使用

import { QStash } from '@upstash/qstash';

const qstash = new QStash({
  token: process.env.QSTASH_TOKEN,
});

// メッセージの送信
async function sendMessage() {
  await qstash.publishJSON({
    url: 'https://my-api.com/webhook',
    body: {
      action: 'send-email',
      to: '[email protected]',
      subject: 'Welcome!'
    },
    retries: 3,
    delay: '10s', // 10秒後に配信
  });
}

// CRONジョブの設定
async function setupCron() {
  await qstash.schedules.create({
    destination: 'https://my-api.com/daily-report',
    cron: '0 9 * * *', // 毎日9時
    body: { type: 'daily-report' }
  });
}