データベース
Apache Ignite
概要
Apache Igniteは、分散インメモリコンピューティングプラットフォームです。メモリ中心アーキテクチャによる高速データアクセスと永続化機能を両立し、SQLサポートによる柔軟なクエリ実行を実現します。大規模データ処理、金融、通信業界での高性能要求に対応しています。
詳細
Apache Igniteは、2007年にGridGain Systemsで開発が始まり、2014年にApacheインキュベータープロジェクトとして受け入れられ、2015年にトップレベルプロジェクトに昇格しました。メモリ中心の分散コンピューティングプラットフォームとして、データベース、キャッシュ、コンピュートグリッドの機能を統合しています。
Apache Igniteの主な特徴:
- メモリ中心アーキテクチャ: RAMを主記憶域とした高速データ処理
- 永続化サポート: オプションでディスクへのデータ永続化
- SQLサポート: ANSI SQL、JDBC/ODBCドライバー対応
- ACIDトランザクション: 完全なACID準拠のトランザクション処理
- コンピュートグリッド: 分散コンピューティングとMapReduce
- ストリーミング: リアルタイムデータストリーム処理
- マルチ言語サポート: Java、.NET、C++、Python対応
- 横斷的スケーリング: 動的クラスター拡張と縮小
- 高可用性: 自動フェイルオーバーとデータレプリケーション
- コンプライアンス: ISO/IEC 9075 SQL標準準拠
メリット・デメリット
メリット
- 高性能: インメモリ処理による超高速アクセス
- 柔軟性: メモリオンリーと永続化モードの選択可能
- SQLサポート: 標準SQLによる直感的なデータ操作
- ACID準拠: データ整合性と一貫性の保証
- 統合プラットフォーム: キャッシュ、DB、コンピュート機能統合
- スケーラビリティ: 水平スケーリングによる高いスループット
- オープンソース: Apacheライセンスで無料利用可能
- エンタープライズ対応: 商用サポートの提供
デメリット
- メモリ使用量: 大量のRAMリソースが必要
- 複雑性: 設定とチューニングの複雑さ
- コミュニティ主導: GridGainサポート終了後の不確実性
- 学習コスト: 分散システム理解の必要性
- パフォーマンスチューニング: 最適な性能のための詳細設定が必要
主要リンク
書き方の例
インストール・セットアップ
# バイナリダウンロードとインストール
wget https://archive.apache.org/dist/ignite/2.16.0/apache-ignite-2.16.0-bin.zip
unzip apache-ignite-2.16.0-bin.zip
cd apache-ignite-2.16.0-bin
bin/ignite.sh
# Dockerでの実行
docker run --name ignite --rm -p 10800:10800 -p 47100:47100 -p 47500:47500 \
apacheignite/ignite:2.16.0
# Docker Composeでのクラスター構成
version: '3.8'
services:
ignite-1:
image: apacheignite/ignite:2.16.0
environment:
- OPTION_LIBS=ignite-rest-http
- CONFIG_URI=https://raw.githubusercontent.com/apache/ignite/master/config/default-config.xml
ports:
- "10800:10800"
- "47100:47100"
ignite-2:
image: apacheignite/ignite:2.16.0
environment:
- OPTION_LIBS=ignite-rest-http
ports:
- "10801:10800"
- "47101:47100"
# Maven依存関係
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>2.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
<version>2.16.0</version>
</dependency>
# Gradle依存関係
implementation 'org.apache.ignite:ignite-core:2.16.0'
implementation 'org.apache.ignite:ignite-indexing:2.16.0'
基本操作(キャッシュ・コンピューティング)
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
public class IgniteBasicExample {
public static void main(String[] args) {
// Ignite設定
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setClientMode(false); // サーバーモード
// Igniteインスタンス起動
try (Ignite ignite = Ignition.start(cfg)) {
// キャッシュ作成
IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCache");
// データ追加
cache.put(1, "田中太郎");
cache.put(2, "佐藤花子");
cache.put(3, "山田次郎");
// データ取得
String value = cache.get(1);
System.out.println("Value for key 1: " + value);
// キャッシュサイズ確認
System.out.println("Cache size: " + cache.size());
// バルク操作
Map<Integer, String> batch = new HashMap<>();
batch.put(10, "鈴木一郎");
batch.put(11, "伊藤美香");
cache.putAll(batch);
// 全データ表示
for (Cache.Entry<Integer, String> entry : cache) {
System.out.println("Key: " + entry.getKey() + ", Value: " + entry.getValue());
}
}
}
}
SQL機能とデータベース操作
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.QueryEntity;
import java.util.Collections;
public class IgniteSQLExample {
// Personクラス定義
public static class Person {
private String name;
private int age;
private String department;
public Person(String name, int age, String department) {
this.name = name;
this.age = age;
this.department = department;
}
// getter/setterメソッド省略
}
public static void main(String[] args) {
IgniteConfiguration cfg = new IgniteConfiguration();
// キャッシュ設定とSQLスキーマ定義
CacheConfiguration<Integer, Person> cacheCfg = new CacheConfiguration<>("person");
QueryEntity qryEntity = new QueryEntity();
qryEntity.setKeyType(Integer.class.getName());
qryEntity.setValueType(Person.class.getName());
qryEntity.addQueryField("name", String.class.getName(), null);
qryEntity.addQueryField("age", Integer.class.getName(), null);
qryEntity.addQueryField("department", String.class.getName(), null);
qryEntity.setIndexes(Collections.singleton(new QueryIndex("age")));
cacheCfg.setQueryEntities(Collections.singletonList(qryEntity));
try (Ignite ignite = Ignition.start(cfg)) {
IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(cacheCfg);
// テストデータ投入
cache.put(1, new Person("田中太郎", 30, "技術部"));
cache.put(2, new Person("佐藤花子", 25, "営業部"));
cache.put(3, new Person("山田次郎", 35, "技術部"));
cache.put(4, new Person("鈴木一郎", 28, "人事部"));
// SQLクエリ実行
SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "age >= ? AND department = ?");
QueryCursor<Cache.Entry<Integer, Person>> cursor = cache.query(qry.setArgs(25, "技術部"));
System.out.println("技術部の25歳以上の社員:");
for (Cache.Entry<Integer, Person> entry : cursor) {
Person person = entry.getValue();
System.out.println(person.getName() + " (" + person.getAge() + "歳)");
}
// SQL Fields Query(集計クエリ)
SqlFieldsQuery fieldQry = new SqlFieldsQuery(
"SELECT department, COUNT(*), AVG(age) FROM Person GROUP BY department");
QueryCursor<List<?>> fieldCursor = cache.query(fieldQry);
System.out.println("\n部署別統計:");
for (List<?> row : fieldCursor) {
System.out.println(部署: " + row.get(0) + ", 人数: " + row.get(1) + ", 平均年齢: " + row.get(2));
}
// DDLクエリ(テーブル作成)
cache.query(new SqlFieldsQuery(
"CREATE TABLE employee (id INT PRIMARY KEY, name VARCHAR, salary DECIMAL) " +
"WITH \"template=replicated\"")).getAll();
// DMLクエリ(データ挿入)
cache.query(new SqlFieldsQuery(
"INSERT INTO employee (id, name, salary) VALUES (?, ?, ?)").setArgs(1, "田中太郎", 50000)).getAll();
}
}
}
分散コンピューティング
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
public class IgniteComputeExample {
// 計算タスククラス
public static class WordCountTask extends ComputeTaskSplitAdapter<String, Integer> {
@Override
public Collection<? extends ComputeJob> split(int gridSize, String text) {
List<ComputeJob> jobs = new ArrayList<>();
String[] words = text.split(" ");
// 各ノードで処理する単語を分割
int wordsPerJob = Math.max(1, words.length / gridSize);
for (int i = 0; i < words.length; i += wordsPerJob) {
int endIdx = Math.min(i + wordsPerJob, words.length);
String[] jobWords = Arrays.copyOfRange(words, i, endIdx);
jobs.add(new WordCountJob(jobWords));
}
return jobs;
}
@Override
public Integer reduce(List<ComputeJobResult> results) {
int totalCount = 0;
for (ComputeJobResult result : results) {
totalCount += result.<Integer>getData();
}
return totalCount;
}
}
// 各ノードで実行されるジョブ
public static class WordCountJob extends ComputeJobAdapter {
private String[] words;
public WordCountJob(String[] words) {
this.words = words;
}
@Override
public Object execute() {
System.out.println("ノードで処理中: " + Arrays.toString(words));
return words.length;
}
}
public static void main(String[] args) {
try (Ignite ignite = Ignition.start()) {
// コンピュートタスク実行
String text = "これは Apache Ignite の分散コンピューティングのサンプルです";
Integer result = ignite.compute().execute(WordCountTask.class, text);
System.out.println("単語数: " + result);
// シンプルな関数型コンピュート
Collection<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 各ノードで並列処理
Collection<Integer> squares = ignite.compute().apply(
(Integer n) -> {
System.out.println("ノードで計算: " + n + "^2");
return n * n;
},
numbers
);
System.out.println("二乗結果: " + squares);
// MapReduceスタイルの集計処理
Integer sum = ignite.compute().apply(
Arrays.asList(1, 2, 3, 4, 5),
(Collection<Integer> nums) -> nums.stream().mapToInt(Integer::intValue).sum()
).stream().mapToInt(Integer::intValue).sum();
System.out.println("合計: " + sum);
}
}
}
永続化とデータ管理
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.WALMode;
public class IgnitePersistenceExample {
public static void main(String[] args) {
IgniteConfiguration cfg = new IgniteConfiguration();
// 永続化設定
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
// データリージョン設定
DataRegionConfiguration dataRegionCfg = new DataRegionConfiguration();
dataRegionCfg.setName("default_region");
dataRegionCfg.setPersistenceEnabled(true); // 永続化有効
dataRegionCfg.setInitialSize(100L * 1024 * 1024); // 100MB
dataRegionCfg.setMaxSize(200L * 1024 * 1024); // 200MB
storageCfg.setDefaultDataRegionConfiguration(dataRegionCfg);
storageCfg.setWalMode(WALMode.LOG_ONLY); // WALモード設定
storageCfg.setStoragePath("/opt/ignite/data"); // データファイルパス
storageCfg.setWalPath("/opt/ignite/wal"); // WALファイルパス
cfg.setDataStorageConfiguration(storageCfg);
try (Ignite ignite = Ignition.start(cfg)) {
// クラスターをアクティブ化(永続化有効時必要)
ignite.cluster().active(true);
// 永続化キャッシュ作成
CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<>("persistentCache");
cacheCfg.setDataRegionName("default_region");
IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cacheCfg);
// データ書き込み
for (int i = 1; i <= 1000; i++) {
cache.put(i, "Persistent Value " + i);
}
System.out.println("キャッシュサイズ: " + cache.size());
// スナップショット作成
ignite.snapshot().createSnapshot("backup_" + System.currentTimeMillis()).get();
// キャッシュメトリクス表示
CacheMetrics metrics = cache.metrics();
System.out.println("キャッシュヒット率: " + metrics.getCacheHitPercentage());
System.out.println("平均書き込み時間: " + metrics.getAveragePutTime());
System.out.println("平均読み取り時間: " + metrics.getAverageGetTime());
}
}
}
設定と最適化
<!-- default-config.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- クラスター名 -->
<property name="igniteInstanceName" value="production-cluster"/>
<!-- ネットワーク設定 -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>192.168.1.100:47500..47509</value>
<value>192.168.1.101:47500..47509</value>
<value>192.168.1.102:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<!-- データストレージ設定 -->
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="persistenceEnabled" value="true"/>
<property name="initialSize" value="#{100L * 1024 * 1024}"/>
<property name="maxSize" value="#{2L * 1024 * 1024 * 1024}"/>
<property name="pageEvictionMode" value="RANDOM_LRU"/>
</bean>
</property>
<property name="walMode" value="LOG_ONLY"/>
<property name="checkpointFrequency" value="180000"/>
<property name="walHistorySize" value="20"/>
</bean>
</property>
<!-- キャッシュ設定 -->
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="userCache"/>
<property name="cacheMode" value="PARTITIONED"/>
<property name="backups" value="1"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
<property name="expiryPolicyFactory">
<bean class="javax.cache.expiry.CreatedExpiryPolicy" factory-method="factoryOf">
<constructor-arg>
<bean class="javax.cache.expiry.Duration">
<constructor-arg value="HOURS"/>
<constructor-arg value="1"/>
</bean>
</constructor-arg>
</bean>
</property>
</bean>
</list>
</property>
<!-- JVMメトリクス有効化 -->
<property name="metricsLogFrequency" value="60000"/>
<property name="queryThreadPoolSize" value="8"/>
<property name="systemThreadPoolSize" value="16"/>
</bean>
</beans>
実用例・本番運用
// Spring Boot統合例
@Configuration
public class IgniteConfig {
@Bean
public Ignite igniteInstance() {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setIgniteInstanceName("spring-ignite");
// データストレージ設定
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
DataRegionConfiguration regionCfg = new DataRegionConfiguration();
regionCfg.setName("spring_region");
regionCfg.setPersistenceEnabled(true);
regionCfg.setInitialSize(100 * 1024 * 1024L);
regionCfg.setMaxSize(500 * 1024 * 1024L);
storageCfg.setDefaultDataRegionConfiguration(regionCfg);
cfg.setDataStorageConfiguration(storageCfg);
return Ignition.start(cfg);
}
}
@Service
public class DataService {
@Autowired
private Ignite ignite;
@PostConstruct
public void initializeCache() {
// クラスターアクティブ化
ignite.cluster().active(true);
// キャッシュ初期化
CacheConfiguration<String, Object> cacheCfg = new CacheConfiguration<>("businessCache");
cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
cacheCfg.setBackups(1);
ignite.getOrCreateCache(cacheCfg);
}
public void performTransaction() {
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
IgniteCache<String, Object> cache = ignite.cache("businessCache");
// トランザクション内での操作
String currentValue = (String) cache.get("counter");
int newValue = currentValue != null ? Integer.parseInt(currentValue) + 1 : 1;
cache.put("counter", String.valueOf(newValue));
// ビジネスロジック処理
processBusinessLogic(newValue);
tx.commit();
} catch (Exception e) {
log.error("トランザクションエラー: ", e);
throw new RuntimeException("トランザクションに失敗しました", e);
}
}
// ストリーミングデータ処理
public void setupDataStreaming() {
try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer("streamCache")) {
streamer.perNodeBufferSize(1024);
streamer.perNodeParallelOperations(8);
// 大量データの高速投入
for (int i = 0; i < 1000000; i++) {
streamer.addData(i, "Stream data " + i);
if (i % 10000 == 0) {
System.out.println("処理数: " + i);
}
}
}
}
// 監視とメトリクス
@Scheduled(fixedRate = 60000)
public void logMetrics() {
ClusterMetrics metrics = ignite.cluster().metrics();
System.out.println("クラスターサイズ: " + ignite.cluster().nodes().size());
System.out.println("CPU使用率: " + String.format("%.2f%%", metrics.getCurrentCpuLoad() * 100));
System.out.println("ヒープメモリ使用率: " +
String.format("%.2f%%", (double)metrics.getHeapMemoryUsed() / metrics.getHeapMemoryMaximum() * 100));
// キャッシュメトリクス
IgniteCache<String, Object> cache = ignite.cache("businessCache");
if (cache != null) {
CacheMetrics cacheMetrics = cache.metrics();
System.out.println("キャッシュヒット率: " +
String.format("%.2f%%", cacheMetrics.getCacheHitPercentage()));
}
}
}