データベース

Apache Ignite

概要

Apache Igniteは、分散インメモリコンピューティングプラットフォームです。メモリ中心アーキテクチャによる高速データアクセスと永続化機能を両立し、SQLサポートによる柔軟なクエリ実行を実現します。大規模データ処理、金融、通信業界での高性能要求に対応しています。

詳細

Apache Igniteは、2007年にGridGain Systemsで開発が始まり、2014年にApacheインキュベータープロジェクトとして受け入れられ、2015年にトップレベルプロジェクトに昇格しました。メモリ中心の分散コンピューティングプラットフォームとして、データベース、キャッシュ、コンピュートグリッドの機能を統合しています。

Apache Igniteの主な特徴:

  • メモリ中心アーキテクチャ: RAMを主記憶域とした高速データ処理
  • 永続化サポート: オプションでディスクへのデータ永続化
  • SQLサポート: ANSI SQL、JDBC/ODBCドライバー対応
  • ACIDトランザクション: 完全なACID準拠のトランザクション処理
  • コンピュートグリッド: 分散コンピューティングとMapReduce
  • ストリーミング: リアルタイムデータストリーム処理
  • マルチ言語サポート: Java、.NET、C++、Python対応
  • 横斷的スケーリング: 動的クラスター拡張と縮小
  • 高可用性: 自動フェイルオーバーとデータレプリケーション
  • コンプライアンス: ISO/IEC 9075 SQL標準準拠

メリット・デメリット

メリット

  • 高性能: インメモリ処理による超高速アクセス
  • 柔軟性: メモリオンリーと永続化モードの選択可能
  • SQLサポート: 標準SQLによる直感的なデータ操作
  • ACID準拠: データ整合性と一貫性の保証
  • 統合プラットフォーム: キャッシュ、DB、コンピュート機能統合
  • スケーラビリティ: 水平スケーリングによる高いスループット
  • オープンソース: Apacheライセンスで無料利用可能
  • エンタープライズ対応: 商用サポートの提供

デメリット

  • メモリ使用量: 大量のRAMリソースが必要
  • 複雑性: 設定とチューニングの複雑さ
  • コミュニティ主導: GridGainサポート終了後の不確実性
  • 学習コスト: 分散システム理解の必要性
  • パフォーマンスチューニング: 最適な性能のための詳細設定が必要

主要リンク

書き方の例

インストール・セットアップ

# バイナリダウンロードとインストール
wget https://archive.apache.org/dist/ignite/2.16.0/apache-ignite-2.16.0-bin.zip
unzip apache-ignite-2.16.0-bin.zip
cd apache-ignite-2.16.0-bin
bin/ignite.sh

# Dockerでの実行
docker run --name ignite --rm -p 10800:10800 -p 47100:47100 -p 47500:47500 \
  apacheignite/ignite:2.16.0

# Docker Composeでのクラスター構成
version: '3.8'
services:
  ignite-1:
    image: apacheignite/ignite:2.16.0
    environment:
      - OPTION_LIBS=ignite-rest-http
      - CONFIG_URI=https://raw.githubusercontent.com/apache/ignite/master/config/default-config.xml
    ports:
      - "10800:10800"
      - "47100:47100"
  ignite-2:
    image: apacheignite/ignite:2.16.0
    environment:
      - OPTION_LIBS=ignite-rest-http
    ports:
      - "10801:10800"
      - "47101:47100"

# Maven依存関係
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>2.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>2.16.0</version>
</dependency>

# Gradle依存関係
implementation 'org.apache.ignite:ignite-core:2.16.0'
implementation 'org.apache.ignite:ignite-indexing:2.16.0'

基本操作(キャッシュ・コンピューティング)

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;

public class IgniteBasicExample {
    public static void main(String[] args) {
        // Ignite設定
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(false); // サーバーモード
        
        // Igniteインスタンス起動
        try (Ignite ignite = Ignition.start(cfg)) {
            // キャッシュ作成
            IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCache");
            
            // データ追加
            cache.put(1, "田中太郎");
            cache.put(2, "佐藤花子");
            cache.put(3, "山田次郎");
            
            // データ取得
            String value = cache.get(1);
            System.out.println("Value for key 1: " + value);
            
            // キャッシュサイズ確認
            System.out.println("Cache size: " + cache.size());
            
            // バルク操作
            Map<Integer, String> batch = new HashMap<>();
            batch.put(10, "鈴木一郎");
            batch.put(11, "伊藤美香");
            cache.putAll(batch);
            
            // 全データ表示
            for (Cache.Entry<Integer, String> entry : cache) {
                System.out.println("Key: " + entry.getKey() + ", Value: " + entry.getValue());
            }
        }
    }
}

SQL機能とデータベース操作

import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.QueryEntity;
import java.util.Collections;

public class IgniteSQLExample {
    
    // Personクラス定義
    public static class Person {
        private String name;
        private int age;
        private String department;
        
        public Person(String name, int age, String department) {
            this.name = name;
            this.age = age;
            this.department = department;
        }
        
        // getter/setterメソッド省略
    }
    
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();
        
        // キャッシュ設定とSQLスキーマ定義
        CacheConfiguration<Integer, Person> cacheCfg = new CacheConfiguration<>("person");
        
        QueryEntity qryEntity = new QueryEntity();
        qryEntity.setKeyType(Integer.class.getName());
        qryEntity.setValueType(Person.class.getName());
        qryEntity.addQueryField("name", String.class.getName(), null);
        qryEntity.addQueryField("age", Integer.class.getName(), null);
        qryEntity.addQueryField("department", String.class.getName(), null);
        qryEntity.setIndexes(Collections.singleton(new QueryIndex("age")));
        
        cacheCfg.setQueryEntities(Collections.singletonList(qryEntity));
        
        try (Ignite ignite = Ignition.start(cfg)) {
            IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(cacheCfg);
            
            // テストデータ投入
            cache.put(1, new Person("田中太郎", 30, "技術部"));
            cache.put(2, new Person("佐藤花子", 25, "営業部"));
            cache.put(3, new Person("山田次郎", 35, "技術部"));
            cache.put(4, new Person("鈴木一郎", 28, "人事部"));
            
            // SQLクエリ実行
            SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "age >= ? AND department = ?");
            QueryCursor<Cache.Entry<Integer, Person>> cursor = cache.query(qry.setArgs(25, "技術部"));
            
            System.out.println("技術部の25歳以上の社員:");
            for (Cache.Entry<Integer, Person> entry : cursor) {
                Person person = entry.getValue();
                System.out.println(person.getName() + " (" + person.getAge() + "歳)");
            }
            
            // SQL Fields Query(集計クエリ)
            SqlFieldsQuery fieldQry = new SqlFieldsQuery(
                "SELECT department, COUNT(*), AVG(age) FROM Person GROUP BY department");
            
            QueryCursor<List<?>> fieldCursor = cache.query(fieldQry);
            System.out.println("\n部署別統計:");
            for (List<?> row : fieldCursor) {
                System.out.println(部署: " + row.get(0) + ", 人数: " + row.get(1) + ", 平均年齢: " + row.get(2));
            }
            
            // DDLクエリ(テーブル作成)
            cache.query(new SqlFieldsQuery(
                "CREATE TABLE employee (id INT PRIMARY KEY, name VARCHAR, salary DECIMAL) " +
                "WITH \"template=replicated\"")).getAll();
            
            // DMLクエリ(データ挿入)
            cache.query(new SqlFieldsQuery(
                "INSERT INTO employee (id, name, salary) VALUES (?, ?, ?)").setArgs(1, "田中太郎", 50000)).getAll();
        }
    }
}

分散コンピューティング

import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;

public class IgniteComputeExample {
    
    // 計算タスククラス
    public static class WordCountTask extends ComputeTaskSplitAdapter<String, Integer> {
        @Override
        public Collection<? extends ComputeJob> split(int gridSize, String text) {
            List<ComputeJob> jobs = new ArrayList<>();
            String[] words = text.split(" ");
            
            // 各ノードで処理する単語を分割
            int wordsPerJob = Math.max(1, words.length / gridSize);
            for (int i = 0; i < words.length; i += wordsPerJob) {
                int endIdx = Math.min(i + wordsPerJob, words.length);
                String[] jobWords = Arrays.copyOfRange(words, i, endIdx);
                jobs.add(new WordCountJob(jobWords));
            }
            
            return jobs;
        }
        
        @Override
        public Integer reduce(List<ComputeJobResult> results) {
            int totalCount = 0;
            for (ComputeJobResult result : results) {
                totalCount += result.<Integer>getData();
            }
            return totalCount;
        }
    }
    
    // 各ノードで実行されるジョブ
    public static class WordCountJob extends ComputeJobAdapter {
        private String[] words;
        
        public WordCountJob(String[] words) {
            this.words = words;
        }
        
        @Override
        public Object execute() {
            System.out.println("ノードで処理中: " + Arrays.toString(words));
            return words.length;
        }
    }
    
    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            // コンピュートタスク実行
            String text = "これは Apache Ignite の分散コンピューティングのサンプルです";
            
            Integer result = ignite.compute().execute(WordCountTask.class, text);
            System.out.println("単語数: " + result);
            
            // シンプルな関数型コンピュート
            Collection<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            
            // 各ノードで並列処理
            Collection<Integer> squares = ignite.compute().apply(
                (Integer n) -> {
                    System.out.println("ノードで計算: " + n + "^2");
                    return n * n;
                },
                numbers
            );
            
            System.out.println("二乗結果: " + squares);
            
            // MapReduceスタイルの集計処理
            Integer sum = ignite.compute().apply(
                Arrays.asList(1, 2, 3, 4, 5),
                (Collection<Integer> nums) -> nums.stream().mapToInt(Integer::intValue).sum()
            ).stream().mapToInt(Integer::intValue).sum();
            
            System.out.println("合計: " + sum);
        }
    }
}

永続化とデータ管理

import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.WALMode;

public class IgnitePersistenceExample {
    
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();
        
        // 永続化設定
        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
        
        // データリージョン設定
        DataRegionConfiguration dataRegionCfg = new DataRegionConfiguration();
        dataRegionCfg.setName("default_region");
        dataRegionCfg.setPersistenceEnabled(true); // 永続化有効
        dataRegionCfg.setInitialSize(100L * 1024 * 1024); // 100MB
        dataRegionCfg.setMaxSize(200L * 1024 * 1024);     // 200MB
        
        storageCfg.setDefaultDataRegionConfiguration(dataRegionCfg);
        storageCfg.setWalMode(WALMode.LOG_ONLY); // WALモード設定
        storageCfg.setStoragePath("/opt/ignite/data"); // データファイルパス
        storageCfg.setWalPath("/opt/ignite/wal");       // WALファイルパス
        
        cfg.setDataStorageConfiguration(storageCfg);
        
        try (Ignite ignite = Ignition.start(cfg)) {
            // クラスターをアクティブ化(永続化有効時必要)
            ignite.cluster().active(true);
            
            // 永続化キャッシュ作成
            CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<>("persistentCache");
            cacheCfg.setDataRegionName("default_region");
            
            IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cacheCfg);
            
            // データ書き込み
            for (int i = 1; i <= 1000; i++) {
                cache.put(i, "Persistent Value " + i);
            }
            
            System.out.println("キャッシュサイズ: " + cache.size());
            
            // スナップショット作成
            ignite.snapshot().createSnapshot("backup_" + System.currentTimeMillis()).get();
            
            // キャッシュメトリクス表示
            CacheMetrics metrics = cache.metrics();
            System.out.println("キャッシュヒット率: " + metrics.getCacheHitPercentage());
            System.out.println("平均書き込み時間: " + metrics.getAveragePutTime());
            System.out.println("平均読み取り時間: " + metrics.getAverageGetTime());
        }
    }
}

設定と最適化

<!-- default-config.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">
    
    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
        
        <!-- クラスター名 -->
        <property name="igniteInstanceName" value="production-cluster"/>
        
        <!-- ネットワーク設定 -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>192.168.1.100:47500..47509</value>
                                <value>192.168.1.101:47500..47509</value>
                                <value>192.168.1.102:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
        
        <!-- データストレージ設定 -->
        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="persistenceEnabled" value="true"/>
                        <property name="initialSize" value="#{100L * 1024 * 1024}"/>
                        <property name="maxSize" value="#{2L * 1024 * 1024 * 1024}"/>
                        <property name="pageEvictionMode" value="RANDOM_LRU"/>
                    </bean>
                </property>
                <property name="walMode" value="LOG_ONLY"/>
                <property name="checkpointFrequency" value="180000"/>
                <property name="walHistorySize" value="20"/>
            </bean>
        </property>
        
        <!-- キャッシュ設定 -->
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="userCache"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="backups" value="1"/>
                    <property name="atomicityMode" value="TRANSACTIONAL"/>
                    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
                    <property name="expiryPolicyFactory">
                        <bean class="javax.cache.expiry.CreatedExpiryPolicy" factory-method="factoryOf">
                            <constructor-arg>
                                <bean class="javax.cache.expiry.Duration">
                                    <constructor-arg value="HOURS"/>
                                    <constructor-arg value="1"/>
                                </bean>
                            </constructor-arg>
                        </bean>
                    </property>
                </bean>
            </list>
        </property>
        
        <!-- JVMメトリクス有効化 -->
        <property name="metricsLogFrequency" value="60000"/>
        <property name="queryThreadPoolSize" value="8"/>
        <property name="systemThreadPoolSize" value="16"/>
        
    </bean>
</beans>

実用例・本番運用

// Spring Boot統合例
@Configuration
public class IgniteConfig {
    
    @Bean
    public Ignite igniteInstance() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setIgniteInstanceName("spring-ignite");
        
        // データストレージ設定
        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
        DataRegionConfiguration regionCfg = new DataRegionConfiguration();
        regionCfg.setName("spring_region");
        regionCfg.setPersistenceEnabled(true);
        regionCfg.setInitialSize(100 * 1024 * 1024L);
        regionCfg.setMaxSize(500 * 1024 * 1024L);
        storageCfg.setDefaultDataRegionConfiguration(regionCfg);
        cfg.setDataStorageConfiguration(storageCfg);
        
        return Ignition.start(cfg);
    }
}

@Service
public class DataService {
    
    @Autowired
    private Ignite ignite;
    
    @PostConstruct
    public void initializeCache() {
        // クラスターアクティブ化
        ignite.cluster().active(true);
        
        // キャッシュ初期化
        CacheConfiguration<String, Object> cacheCfg = new CacheConfiguration<>("businessCache");
        cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        cacheCfg.setBackups(1);
        ignite.getOrCreateCache(cacheCfg);
    }
    
    public void performTransaction() {
        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
            IgniteCache<String, Object> cache = ignite.cache("businessCache");
            
            // トランザクション内での操作
            String currentValue = (String) cache.get("counter");
            int newValue = currentValue != null ? Integer.parseInt(currentValue) + 1 : 1;
            cache.put("counter", String.valueOf(newValue));
            
            // ビジネスロジック処理
            processBusinessLogic(newValue);
            
            tx.commit();
        } catch (Exception e) {
            log.error("トランザクションエラー: ", e);
            throw new RuntimeException("トランザクションに失敗しました", e);
        }
    }
    
    // ストリーミングデータ処理
    public void setupDataStreaming() {
        try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer("streamCache")) {
            streamer.perNodeBufferSize(1024);
            streamer.perNodeParallelOperations(8);
            
            // 大量データの高速投入
            for (int i = 0; i < 1000000; i++) {
                streamer.addData(i, "Stream data " + i);
                
                if (i % 10000 == 0) {
                    System.out.println("処理数: " + i);
                }
            }
        }
    }
    
    // 監視とメトリクス
    @Scheduled(fixedRate = 60000)
    public void logMetrics() {
        ClusterMetrics metrics = ignite.cluster().metrics();
        
        System.out.println("クラスターサイズ: " + ignite.cluster().nodes().size());
        System.out.println("CPU使用率: " + String.format("%.2f%%", metrics.getCurrentCpuLoad() * 100));
        System.out.println("ヒープメモリ使用率: " + 
            String.format("%.2f%%", (double)metrics.getHeapMemoryUsed() / metrics.getHeapMemoryMaximum() * 100));
        
        // キャッシュメトリクス
        IgniteCache<String, Object> cache = ignite.cache("businessCache");
        if (cache != null) {
            CacheMetrics cacheMetrics = cache.metrics();
            System.out.println("キャッシュヒット率: " + 
                String.format("%.2f%%", cacheMetrics.getCacheHitPercentage()));
        }
    }
}