Hazelcast

Java向けインメモリデータグリッド。分散キャッシュとリアルタイム処理を統合。数百ノードにスケールし、100万トピック対応。

キャッシュサーバーインメモリデータグリッド分散コンピューティングJavaリアルタイム処理

キャッシュサーバー

Hazelcast

概要

Hazelcastは「Java向けインメモリデータグリッド」として開発された、分散キャッシュとリアルタイム処理を統合した企業向けプラットフォームです。単なるキャッシュサーバーを超えて、分散データ管理、ストリーム処理、分散コンピューティングを包括的に提供。Java環境での分散アプリケーション開発において重要な地位を占め、数百ノードにスケールし、100万トピック対応の強力な分散データ処理能力を実現しています。

詳細

Hazelcast 2025年版はエンタープライズJava環境における分散データ管理の決定版として確固たる地位を維持しています。20年近い開発実績により成熟したAPIと業界標準の安定性を誇り、マイクロサービスアーキテクチャの普及により需要が急拡大。分散キャッシング、リアルタイムストリーム処理、分散コンピューティングを単一プラットフォームで統合し、従来の複数ツール構成を大幅にシンプル化します。Java 8から最新Java LTSまで幅広いバージョンサポートを提供し、Spring Boot、Kubernetes、Docker環境との優れた統合性を実現しています。

主な特徴

  • インメモリデータグリッド: 分散環境での高速データアクセスと処理
  • 分散コンピューティング: データローカリティを活用した効率的な計算処理
  • ストリーム処理(Jet): リアルタイムデータパイプラインとイベント処理
  • SQL対応: 分散データに対するSQL クエリとJOIN操作
  • 管理センター: 包括的な監視、管理、可視化ツール
  • 線形スケーラビリティ: 数百ノードまでの水平スケーリング

メリット・デメリット

メリット

  • Java生態系での圧倒的な実績と豊富な統合ライブラリ
  • 分散キャッシュ、計算、ストリーム処理の統合プラットフォーム
  • エンタープライズ向けの包括的な監視・管理機能
  • Spring Boot、Microservices、Kubernetes環境での優れた統合
  • データローカリティによる高パフォーマンスな分散処理
  • ACID トランザクションとSQL対応による高い信頼性

デメリット

  • Java環境への依存で他言語エコシステムとの統合制約
  • 複雑な機能セットによる学習コストと運用コストの高さ
  • 大規模クラスター運用時のGC(ガベージコレクション)チューニング必要
  • 単純なキャッシュ用途には過剰な機能とリソース消費
  • ライセンス体系が複雑で企業向け機能には有償版が必要
  • メモリ使用量が多く、大容量データには不向き

参考ページ

書き方の例

基本セットアップとHazelcastインスタンス作成

// Maven依存関係
<!--
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>5.3.6</version>
</dependency>
-->

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.config.Config;

// デフォルト設定でHazelcastインスタンス作成
HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();

// カスタム設定でインスタンス作成
Config config = new Config();
config.setClusterName("my-cluster");
config.getNetworkConfig().setPort(5701);
config.getNetworkConfig().setPortAutoIncrement(true);

HazelcastInstance customHazelcast = Hazelcast.newHazelcastInstance(config);

// 分散マップの取得と基本操作
Map<String, String> distributedMap = hazelcast.getMap("my-distributed-map");
distributedMap.put("key1", "value1");
String value = distributedMap.get("key1");
System.out.println("Retrieved value: " + value);

// 分散リストの作成と操作
List<String> distributedList = hazelcast.getList("my-list");
distributedList.add("item1");
distributedList.add("item2");
System.out.println("List size: " + distributedList.size());

// 分散ロック
Lock distributedLock = hazelcast.getLock("my-lock");
distributedLock.lock();
try {
    // クリティカルセクション
    System.out.println("Lock acquired, performing work...");
} finally {
    distributedLock.unlock();
}

// 適切なシャットダウン
hazelcast.shutdown();

分散キャッシュとNear Cache設定

import com.hazelcast.config.*;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;

// キャッシュ設定付きのHazelcast設定
Config config = new Config();

// 分散マップ設定
MapConfig mapConfig = new MapConfig("user-cache");
mapConfig.setTimeToLiveSeconds(300); // 5分でTTL
mapConfig.setMaxIdleSeconds(120);    // 2分でアイドルタイムアウト
mapConfig.setBackupCount(1);         // バックアップ数
mapConfig.setAsyncBackupCount(1);    // 非同期バックアップ数

// Near Cache設定(ローカルキャッシュ)
NearCacheConfig nearCacheConfig = new NearCacheConfig();
nearCacheConfig.setInMemoryFormat(InMemoryFormat.OBJECT);
nearCacheConfig.setInvalidateOnChange(true);
nearCacheConfig.setTimeToLiveSeconds(60);
nearCacheConfig.setCacheLocalEntries(false);

// エビクション設定
EvictionConfig evictionConfig = new EvictionConfig();
evictionConfig.setEvictionPolicy(EvictionPolicy.LFU);
evictionConfig.setMaxSizePolicy(MaxSizePolicy.ENTRY_COUNT);
evictionConfig.setSize(1000);
nearCacheConfig.setEvictionConfig(evictionConfig);

mapConfig.setNearCacheConfig(nearCacheConfig);
config.addMapConfig(mapConfig);

HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(config);

// 分散キャッシュ操作
IMap<String, User> userCache = hazelcast.getMap("user-cache");

// ユーザーデータのキャッシュ
User user = new User("12345", "田中太郎", "[email protected]");
userCache.put(user.getId(), user);

// 条件付きPUT(キーが存在しない場合のみ)
User existingUser = userCache.putIfAbsent("67890", 
    new User("67890", "佐藤花子", "[email protected]"));

// TTL指定でのPUT(個別にTTL設定)
userCache.put("temp-user", user, 30, TimeUnit.SECONDS);

// バッチ操作
Map<String, User> batchUsers = new HashMap<>();
batchUsers.put("user1", new User("user1", "山田一郎", "[email protected]"));
batchUsers.put("user2", new User("user2", "鈴木二郎", "[email protected]"));
userCache.putAll(batchUsers);

// 検索とフィルタリング
Collection<User> activeUsers = userCache.values(
    Predicates.sql("active = true")
);

// 統計情報取得
LocalMapStats stats = userCache.getLocalMapStats();
System.out.println("Hit ratio: " + stats.getHitRatio());
System.out.println("Entry count: " + stats.getOwnedEntryCount());

// User クラス(簡単な例)
class User implements Serializable {
    private String id;
    private String name;
    private String email;
    private boolean active = true;
    
    public User(String id, String name, String email) {
        this.id = id;
        this.name = name;
        this.email = email;
    }
    
    // getters and setters
    public String getId() { return id; }
    public String getName() { return name; }
    public String getEmail() { return email; }
    public boolean isActive() { return active; }
}

分散コンピューティングとExecutorService

import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.io.Serializable;

// 分散実行サービスの取得
IExecutorService executor = hazelcast.getExecutorService("my-executor");

// Callableタスクの定義
class DataProcessingTask implements Callable<String>, Serializable {
    private String data;
    
    public DataProcessingTask(String data) {
        this.data = data;
    }
    
    @Override
    public String call() throws Exception {
        // 重い処理のシミュレーション
        Thread.sleep(1000);
        return "Processed: " + data + " on " + 
               Thread.currentThread().getName();
    }
}

// 特定のメンバーでタスク実行
Set<Member> members = hazelcast.getCluster().getMembers();
Member targetMember = members.iterator().next();
Future<String> result = executor.submitToMember(
    new DataProcessingTask("important-data"), 
    targetMember
);

System.out.println("Result: " + result.get());

// キーの所有者でタスク実行(データローカリティ)
IMap<String, String> dataMap = hazelcast.getMap("data-map");
dataMap.put("customer-123", "customer data");

Future<String> localResult = executor.submitToKeyOwner(
    new KeyBasedTask("customer-123"), 
    "customer-123"
);

// 全メンバーでタスク並列実行
class ClusterStatsTask implements Callable<Integer>, Serializable {
    @Override
    public Integer call() throws Exception {
        // ローカルデータの統計を計算
        HazelcastInstance localInstance = Hazelcast.getHazelcastInstanceByName("my-instance");
        IMap<String, String> localMap = localInstance.getMap("data-map");
        return localMap.getLocalMapStats().getOwnedEntryCount();
    }
}

Map<Member, Future<Integer>> allResults = executor.submitToAllMembers(
    new ClusterStatsTask()
);

int totalEntries = 0;
for (Future<Integer> future : allResults.values()) {
    totalEntries += future.get();
}
System.out.println("Total entries across cluster: " + totalEntries);

// キーベースタスクの例
class KeyBasedTask implements Callable<String>, Serializable {
    private String key;
    
    public KeyBasedTask(String key) {
        this.key = key;
    }
    
    @Override
    public String call() throws Exception {
        HazelcastInstance instance = Hazelcast.getHazelcastInstanceByName("my-instance");
        IMap<String, String> map = instance.getMap("data-map");
        
        // キーに関連するローカルデータ処理
        String data = map.get(key);
        return "Processed " + key + ": " + data;
    }
}

SQL操作とリアルタイムクエリ

import com.hazelcast.sql.SqlService;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;

// SQL サービスの取得
SqlService sql = hazelcast.getSql();

// データマッピングの作成
String createMapping = """
    CREATE MAPPING IF NOT EXISTS employees (
        id BIGINT,
        name VARCHAR,
        department VARCHAR,
        salary DECIMAL,
        hire_date DATE
    )
    TYPE IMap
    OPTIONS (
        'keyFormat' = 'bigint',
        'valueFormat' = 'json'
    )
    """;

sql.execute(createMapping);

// サンプルデータの挿入
sql.execute("INSERT INTO employees VALUES (1, '田中太郎', 'Engineering', 75000, '2020-01-15')");
sql.execute("INSERT INTO employees VALUES (2, '佐藤花子', 'Marketing', 65000, '2019-03-20')");
sql.execute("INSERT INTO employees VALUES (3, '鈴木一郎', 'Engineering', 80000, '2021-07-10')");

// 基本的なSELECTクエリ
try (SqlResult result = sql.execute("SELECT * FROM employees WHERE department = 'Engineering'")) {
    for (SqlRow row : result) {
        System.out.printf("ID: %d, Name: %s, Salary: %s%n",
            row.getObject("id"),
            row.getObject("name"),
            row.getObject("salary"));
    }
}

// 集計クエリ
try (SqlResult result = sql.execute("""
    SELECT department, COUNT(*) as employee_count, AVG(salary) as avg_salary
    FROM employees 
    GROUP BY department
    """)) {
    
    for (SqlRow row : result) {
        System.out.printf("Department: %s, Count: %d, Avg Salary: %.2f%n",
            row.getObject("department"),
            row.getObject("employee_count"),
            row.getObject("avg_salary"));
    }
}

// ストリーミングクエリ(リアルタイム処理)
String streamingQuery = """
    CREATE JOB employee_salary_monitor AS
    SELECT department, COUNT(*) as count, AVG(salary) as avg_salary
    FROM TABLE(IMPOSE_ORDER(TABLE employees, DESCRIPTOR(hire_date), INTERVAL '1' MINUTE))
    GROUP BY department, TUMBLE(hire_date, INTERVAL '1' HOUR)
    """;

sql.execute(streamingQuery);

// 継続的クエリキャッシュ(Continuous Query Cache)
import com.hazelcast.map.QueryCache;
import com.hazelcast.query.Predicates;

IMap<String, Employee> employeeMap = hazelcast.getMap("employees");

// 高給取り社員のクエリキャッシュ
QueryCache<String, Employee> highSalaryCache = employeeMap.getQueryCache(
    "high-salary-employees",
    Predicates.sql("salary > 70000"),
    true
);

// クエリキャッシュにリスナー追加
highSalaryCache.addEntryListener(new EntryListener<String, Employee>() {
    @Override
    public void entryAdded(EntryEvent<String, Employee> event) {
        System.out.println("High salary employee added: " + event.getValue().getName());
    }
    
    @Override
    public void entryUpdated(EntryEvent<String, Employee> event) {
        System.out.println("High salary employee updated: " + event.getValue().getName());
    }
    
    @Override
    public void entryRemoved(EntryEvent<String, Employee> event) {
        System.out.println("High salary employee removed: " + event.getOldValue().getName());
    }
}, false);

Spring Boot統合とマイクロサービス連携

// Spring Boot設定 (application.yml)
/*
hazelcast:
  cluster-name: microservices-cluster
  network:
    port: 5701
    port-auto-increment: true
    join:
      multicast:
        enabled: false
      kubernetes:
        enabled: true
        service-name: hazelcast-service
  map:
    user-sessions:
      time-to-live-seconds: 1800
      backup-count: 1
*/

// Spring Boot Configuration クラス
@Configuration
@EnableCaching
public class HazelcastConfig {
    
    @Bean
    public Config hazelcastConfig() {
        Config config = new Config();
        config.setClusterName("microservices-cluster");
        
        // マップ設定
        MapConfig sessionConfig = new MapConfig("user-sessions");
        sessionConfig.setTimeToLiveSeconds(1800); // 30分
        sessionConfig.setBackupCount(1);
        config.addMapConfig(sessionConfig);
        
        // Kubernetes ディスカバリー設定
        config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
        config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
              .setProperty("service-name", "hazelcast-service");
        
        return config;
    }
    
    @Bean
    public HazelcastInstance hazelcastInstance(Config config) {
        return Hazelcast.newHazelcastInstance(config);
    }
    
    @Bean
    public CacheManager cacheManager(HazelcastInstance hazelcastInstance) {
        return new HazelcastCacheManager(hazelcastInstance);
    }
}

// キャッシュ機能付きサービス
@Service
public class UserService {
    
    @Autowired
    private UserRepository userRepository;
    
    @Cacheable(value = "user-cache", key = "#userId")
    public User getUserById(String userId) {
        System.out.println("Fetching user from database: " + userId);
        return userRepository.findById(userId)
                .orElseThrow(() -> new UserNotFoundException(userId));
    }
    
    @CacheEvict(value = "user-cache", key = "#user.id")
    public User updateUser(User user) {
        return userRepository.save(user);
    }
    
    @CacheEvict(value = "user-cache", allEntries = true)
    public void clearAllUserCache() {
        System.out.println("All user cache cleared");
    }
}

// 分散セッション管理
@RestController
public class SessionController {
    
    @Autowired
    private HazelcastInstance hazelcastInstance;
    
    @PostMapping("/session/create")
    public ResponseEntity<String> createSession(@RequestBody LoginRequest request) {
        String sessionId = UUID.randomUUID().toString();
        
        IMap<String, UserSession> sessions = hazelcastInstance.getMap("user-sessions");
        
        UserSession session = new UserSession();
        session.setUserId(request.getUserId());
        session.setCreatedAt(LocalDateTime.now());
        session.setLastAccessTime(LocalDateTime.now());
        
        sessions.put(sessionId, session, 30, TimeUnit.MINUTES);
        
        return ResponseEntity.ok(sessionId);
    }
    
    @GetMapping("/session/{sessionId}")
    public ResponseEntity<UserSession> getSession(@PathVariable String sessionId) {
        IMap<String, UserSession> sessions = hazelcastInstance.getMap("user-sessions");
        
        UserSession session = sessions.get(sessionId);
        if (session != null) {
            // アクセス時間更新
            session.setLastAccessTime(LocalDateTime.now());
            sessions.put(sessionId, session, 30, TimeUnit.MINUTES);
            return ResponseEntity.ok(session);
        } else {
            return ResponseEntity.notFound().build();
        }
    }
}

// 分散イベント処理
@Component
public class EventProcessor {
    
    @Autowired
    private HazelcastInstance hazelcastInstance;
    
    @PostConstruct
    public void setupEventProcessing() {
        // 分散トピックでのイベント配信
        ITopic<OrderEvent> orderTopic = hazelcastInstance.getTopic("order-events");
        
        orderTopic.addMessageListener(message -> {
            OrderEvent event = message.getMessageObject();
            System.out.println("Processing order event: " + event.getOrderId());
            
            // 注文処理ロジック
            processOrderEvent(event);
        });
    }
    
    @EventListener
    public void handleUserAction(UserActionEvent event) {
        ITopic<UserActionEvent> topic = hazelcastInstance.getTopic("user-actions");
        topic.publish(event);
    }
    
    private void processOrderEvent(OrderEvent event) {
        // 分散ロックを使用した重複処理防止
        Lock orderLock = hazelcastInstance.getLock("order-lock-" + event.getOrderId());
        
        if (orderLock.tryLock(5, TimeUnit.SECONDS)) {
            try {
                // 注文処理ロジック
                System.out.println("Processing order: " + event.getOrderId());
            } finally {
                orderLock.unlock();
            }
        }
    }
}

クラスター監視と管理センター統合

// 管理・監視用のクラスタリング設定
@Component
public class HazelcastMonitoringService {
    
    @Autowired
    private HazelcastInstance hazelcastInstance;
    
    public ClusterHealthInfo getClusterHealth() {
        Cluster cluster = hazelcastInstance.getCluster();
        
        ClusterHealthInfo health = new ClusterHealthInfo();
        health.setMemberCount(cluster.getMembers().size());
        health.setClusterState(cluster.getClusterState().toString());
        health.setClusterVersion(cluster.getClusterVersion().toString());
        
        // 各メンバーの統計
        for (Member member : cluster.getMembers()) {
            MemberHealthStats memberStats = new MemberHealthStats();
            memberStats.setAddress(member.getAddress().toString());
            memberStats.setUuid(member.getUuid().toString());
            
            health.addMemberStats(memberStats);
        }
        
        return health;
    }
    
    public Map<String, Object> getMapStatistics(String mapName) {
        IMap<?, ?> map = hazelcastInstance.getMap(mapName);
        LocalMapStats stats = map.getLocalMapStats();
        
        Map<String, Object> statistics = new HashMap<>();
        statistics.put("ownedEntryCount", stats.getOwnedEntryCount());
        statistics.put("backupEntryCount", stats.getBackupEntryCount());
        statistics.put("hits", stats.getHits());
        statistics.put("hitRatio", stats.getHitRatio());
        statistics.put("putOperationCount", stats.getPutOperationCount());
        statistics.put("getOperationCount", stats.getGetOperationCount());
        statistics.put("removeOperationCount", stats.getRemoveOperationCount());
        statistics.put("totalPutLatency", stats.getTotalPutLatency());
        statistics.put("totalGetLatency", stats.getTotalGetLatency());
        statistics.put("maxPutLatency", stats.getMaxPutLatency());
        statistics.put("maxGetLatency", stats.getMaxGetLatency());
        
        return statistics;
    }
    
    @Scheduled(fixedRate = 60000) // 1分ごと
    public void logClusterStatistics() {
        ClusterHealthInfo health = getClusterHealth();
        System.out.println("Cluster Health - Members: " + health.getMemberCount() 
                         + ", State: " + health.getClusterState());
        
        // 主要マップの統計情報
        String[] monitoredMaps = {"user-cache", "user-sessions", "order-cache"};
        for (String mapName : monitoredMaps) {
            Map<String, Object> stats = getMapStatistics(mapName);
            System.out.println("Map [" + mapName + "] - Entries: " 
                             + stats.get("ownedEntryCount") + ", Hit Ratio: " 
                             + stats.get("hitRatio"));
        }
    }
}