データベース
Hazelcast
概要
Hazelcastは、分散インメモリコンピューティングプラットフォームです。分散キャッシュ、分散実行、ストリーム処理を統合し、Java企業アプリケーションでの高性能データ処理に特化しています。エンタープライズ環境での大規模な分散システム構築において重要な役割を果たします。
詳細
Hazelcast(ヘーゼルキャスト)は、2008年にFuad Malikov、Talip Ozturk、Engin Inanによって設立されたオープンソースプロジェクトです。分散インメモリコンピューティングプラットフォームとして、データグリッド、分散キャッシュ、分散実行エンジンの機能を統合しています。
Hazelcastの主な特徴:
- 分散データ構造: Map、Queue、List、Set、Lock、Semaphore等の分散データ構造
- インメモリデータグリッド(IMDG): 高速なデータアクセスと処理
- 分散コンピューティング: クラスタ全体での計算処理分散
- ストリーム処理: リアルタイムデータストリーム処理(Jet機能)
- SQL サポート: ANSI SQLクエリサポート
- トランザクション: ACID準拠のトランザクション処理
- 高可用性: 自動フェイルオーバーとデータレプリケーション
- 水平スケーリング: ノード追加による線形スケーリング
- マルチ言語クライアント: Java、C++、C#、Python、Node.js、Go対応
- エンタープライズ機能: セキュリティ、管理センター、WAN レプリケーション
メリット・デメリット
メリット
- 高性能: インメモリ処理による低レイテンシ
- スケーラビリティ: 水平スケーリングによる高いスループット
- 開発効率: 豊富なJava APIと分散データ構造
- エンタープライズ対応: 堅牢な運用管理機能
- 統合機能: キャッシュ、計算、ストリーム処理の統合
- 高可用性: 自動故障復旧とデータ保護
- 標準準拠: JCacheとSQL標準サポート
- 商用サポート: エンタープライズサポートの提供
デメリット
- メモリ使用量: 大量のメモリリソースが必要
- 複雑性: 分散システムの管理とデバッグの複雑さ
- コスト: エンタープライズ版のライセンス費用
- Java中心: Java環境への依存が強い
- 学習コスト: 分散システム設計の専門知識が必要
主要リンク
書き方の例
インストール・セットアップ
# Java による実行(バイナリダウンロード)
wget https://repository.hazelcast.com/download/hazelcast/hazelcast-5.3.7.zip
unzip hazelcast-5.3.7.zip
cd hazelcast-5.3.7
bin/hz-start
# Docker による実行
docker run --name hazelcast --rm -p 5701:5701 hazelcast/hazelcast:5.3.7
# Docker Compose での複数ノード
version: '3.8'
services:
hazelcast-1:
image: hazelcast/hazelcast:5.3.7
environment:
- HZ_CLUSTERNAME=my-cluster
ports:
- "5701:5701"
hazelcast-2:
image: hazelcast/hazelcast:5.3.7
environment:
- HZ_CLUSTERNAME=my-cluster
ports:
- "5702:5701"
# Maven依存関係
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>5.3.7</version>
</dependency>
# Spring Boot統合
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-spring</artifactId>
<version>5.3.7</version>
</dependency>
基本操作(分散データ構造)
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.collection.IQueue;
import com.hazelcast.collection.IList;
public class HazelcastBasicExample {
public static void main(String[] args) {
// Hazelcast インスタンス作成
Config config = new Config();
config.setClusterName("my-cluster");
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
// 分散Map操作
IMap<String, String> map = hz.getMap("my-distributed-map");
map.put("key1", "value1");
map.put("key2", "value2");
System.out.println("Value: " + map.get("key1"));
// 分散Queue操作
IQueue<String> queue = hz.getQueue("my-distributed-queue");
queue.offer("item1");
queue.offer("item2");
System.out.println("Queue size: " + queue.size());
String item = queue.poll();
System.out.println("Polled item: " + item);
// 分散List操作
IList<String> list = hz.getList("my-distributed-list");
list.add("element1");
list.add("element2");
for (String element : list) {
System.out.println("List element: " + element);
}
hz.shutdown();
}
}
クライアント・サーバー接続
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
public class HazelcastClientExample {
public static void main(String[] args) {
// クライアント設定
ClientConfig clientConfig = new ClientConfig();
clientConfig.setClusterName("my-cluster");
clientConfig.getNetworkConfig().addAddress("127.0.0.1:5701");
// クライアント接続
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
// 分散Mapアクセス
IMap<Integer, String> customerMap = client.getMap("customers");
customerMap.put(1, "田中太郎");
customerMap.put(2, "佐藤花子");
customerMap.put(3, "山田次郎");
// データ取得
for (Map.Entry<Integer, String> entry : customerMap.entrySet()) {
System.out.println("Customer " + entry.getKey() + ": " + entry.getValue());
}
// 条件検索(述語利用)
Collection<String> customers = customerMap.values(
Predicates.sql("this LIKE '%田%'")
);
System.out.println("Customers with 田: " + customers);
client.shutdown();
}
}
SQL クエリ機能
import com.hazelcast.sql.SqlService;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
public class HazelcastSQLExample {
public static void main(String[] args) {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
// マッピング作成
SqlService sqlService = hz.getSql();
sqlService.execute(
"CREATE MAPPING employees (" +
" id INT," +
" name VARCHAR," +
" salary DECIMAL" +
") TYPE IMap OPTIONS (" +
" 'keyFormat'='int'," +
" 'valueFormat'='json-flat'" +
")"
);
// データ挿入
sqlService.execute("INSERT INTO employees VALUES (1, '田中太郎', 50000)");
sqlService.execute("INSERT INTO employees VALUES (2, '佐藤花子', 60000)");
sqlService.execute("INSERT INTO employees VALUES (3, '山田次郎', 55000)");
// SELECT クエリ
try (SqlResult result = sqlService.execute(
"SELECT name, salary FROM employees WHERE salary > 52000")) {
for (SqlRow row : result) {
String name = row.getObject(0);
BigDecimal salary = row.getObject(1);
System.out.println(name + ": " + salary);
}
}
// 集計クエリ
try (SqlResult result = sqlService.execute(
"SELECT COUNT(*), AVG(salary) FROM employees")) {
for (SqlRow row : result) {
long count = row.getObject(0);
BigDecimal avgSalary = row.getObject(1);
System.out.println("Count: " + count + ", Average: " + avgSalary);
}
}
hz.shutdown();
}
}
分散コンピューティング
import com.hazelcast.core.IExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.io.Serializable;
public class HazelcastComputeExample {
// 計算タスククラス
public static class SumTask implements Callable<Integer>, Serializable {
private final int start;
private final int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println("Computed sum from " + start + " to " + end + ": " + sum);
return sum;
}
}
public static void main(String[] args) throws Exception {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
// 分散実行サービス
IExecutorService executorService = hz.getExecutorService("my-executor");
// 複数のタスクを分散実行
Future<Integer> future1 = executorService.submit(new SumTask(1, 1000));
Future<Integer> future2 = executorService.submit(new SumTask(1001, 2000));
Future<Integer> future3 = executorService.submit(new SumTask(2001, 3000));
// 結果取得
int result1 = future1.get();
int result2 = future2.get();
int result3 = future3.get();
int totalSum = result1 + result2 + result3;
System.out.println("Total sum: " + totalSum);
// MapReduceタスク
IMap<String, Integer> numbers = hz.getMap("numbers");
for (int i = 1; i <= 100; i++) {
numbers.put("key" + i, i);
}
// MapReduce集計
int sum = numbers.aggregate(Aggregators.integerSum());
System.out.println("Sum using MapReduce: " + sum);
hz.shutdown();
}
}
設定と最適化
<!-- hazelcast.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.hazelcast.com/schema/config
https://www.hazelcast.com/schema/config/hazelcast-config-5.3.xsd">
<cluster-name>production-cluster</cluster-name>
<!-- ネットワーク設定 -->
<network>
<port auto-increment="true" port-count="100">5701</port>
<join>
<multicast enabled="false"/>
<tcp-ip enabled="true">
<member>192.168.1.100</member>
<member>192.168.1.101</member>
<member>192.168.1.102</member>
</tcp-ip>
</join>
<interfaces enabled="true">
<interface>192.168.1.*</interface>
</interfaces>
</network>
<!-- マップ設定 -->
<map name="users">
<backup-count>1</backup-count>
<async-backup-count>1</async-backup-count>
<time-to-live-seconds>3600</time-to-live-seconds>
<max-idle-seconds>1800</max-idle-seconds>
<eviction-policy>LRU</eviction-policy>
<max-size policy="PER_NODE">10000</max-size>
<near-cache>
<max-size>1000</max-size>
<time-to-live-seconds>600</time-to-live-seconds>
<eviction-policy>LFU</eviction-policy>
</near-cache>
</map>
<!-- 永続化設定 -->
<persistence enabled="true">
<base-dir>/opt/hazelcast/persistence</base-dir>
<backup-dir>/opt/hazelcast/backup</backup-dir>
<parallelism>4</parallelism>
</persistence>
<!-- JetによるストリーミングSQL -->
<jet enabled="true" resource-upload-enabled="true">
<instance>
<cooperative-thread-count>8</cooperative-thread-count>
<flow-control-period>100</flow-control-period>
<backup-count>1</backup-count>
</instance>
</jet>
</hazelcast>
実用例・本番運用
// Spring Boot統合例
@Configuration
@EnableHazelcast
public class HazelcastConfig {
@Bean
public Config hazelcastConfig() {
Config config = new Config();
config.setClusterName("spring-cluster");
// キャッシュ設定
MapConfig mapConfig = new MapConfig();
mapConfig.setName("user-cache");
mapConfig.setTimeToLiveSeconds(300);
mapConfig.setMaxSizeConfig(new MaxSizeConfig(1000, MaxSizeConfig.MaxSizePolicy.PER_NODE));
config.addMapConfig(mapConfig);
return config;
}
}
@Service
public class UserService {
@Autowired
private HazelcastInstance hazelcastInstance;
public User getUserById(Long id) {
IMap<Long, User> userCache = hazelcastInstance.getMap("user-cache");
return userCache.computeIfAbsent(id, key -> {
// データベースからユーザー取得
return userRepository.findById(key);
});
}
public void invalidateUser(Long id) {
IMap<Long, User> userCache = hazelcastInstance.getMap("user-cache");
userCache.remove(id);
}
// 分散ロック例
public void updateUserWithLock(Long userId, User updatedUser) {
FencedLock lock = hazelcastInstance.getCPSubsystem().getLock("user-lock-" + userId);
lock.lock();
try {
// クリティカルセクション
User currentUser = getUserById(userId);
// ユーザー更新処理
userRepository.save(updatedUser);
invalidateUser(userId);
} finally {
lock.unlock();
}
}
}
// 監視とメトリクス
@Component
public class HazelcastMonitoring {
@Autowired
private HazelcastInstance hazelcastInstance;
@Scheduled(fixedRate = 60000) // 1分毎
public void logClusterStatistics() {
Cluster cluster = hazelcastInstance.getCluster();
System.out.println("Cluster size: " + cluster.getMembers().size());
// メモリ使用状況
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
long usedMemory = heapUsage.getUsed() / 1024 / 1024; // MB
long maxMemory = heapUsage.getMax() / 1024 / 1024; // MB
System.out.println("Memory usage: " + usedMemory + "MB / " + maxMemory + "MB");
// マップ統計
IMap<String, Object> map = hazelcastInstance.getMap("user-cache");
LocalMapStats mapStats = map.getLocalMapStats();
System.out.println("Cache hits: " + mapStats.getHits());
System.out.println("Cache misses: " + mapStats.getMisses());
System.out.println("Cache size: " + map.size());
}
}