Database

Hazelcast

Overview

Hazelcast is a distributed in-memory computing platform that integrates distributed caching, distributed execution, and stream processing. It specializes in high-performance data processing for Java enterprise applications and plays a crucial role in building large-scale distributed systems in enterprise environments.

Details

Hazelcast was founded in 2008 by Fuad Malikov, Talip Ozturk, and Engin Inan as an open-source project. As a distributed in-memory computing platform, it integrates data grid, distributed cache, and distributed execution engine functionalities.

Key features of Hazelcast:

  • Distributed Data Structures: Map, Queue, List, Set, Lock, Semaphore, and other distributed data structures
  • In-Memory Data Grid (IMDG): Fast data access and processing
  • Distributed Computing: Computation distribution across the entire cluster
  • Stream Processing: Real-time data stream processing (Jet functionality)
  • SQL Support: ANSI SQL query support
  • Transactions: ACID-compliant transaction processing
  • High Availability: Automatic failover and data replication
  • Horizontal Scaling: Linear scaling by adding nodes
  • Multi-language Clients: Support for Java, C++, C#, Python, Node.js, Go
  • Enterprise Features: Security, Management Center, WAN replication

Pros and Cons

Pros

  • High Performance: Low latency through in-memory processing
  • Scalability: High throughput through horizontal scaling
  • Development Efficiency: Rich Java APIs and distributed data structures
  • Enterprise Ready: Robust operational management features
  • Integrated Functionality: Integration of cache, compute, and stream processing
  • High Availability: Automatic failure recovery and data protection
  • Standards Compliance: JCache and SQL standards support
  • Commercial Support: Enterprise support available

Cons

  • Memory Usage: Requires large memory resources
  • Complexity: Complex distributed system management and debugging
  • Cost: Enterprise edition licensing fees
  • Java-Centric: Strong dependency on Java environment
  • Learning Curve: Requires specialized knowledge in distributed system design

Key Links

Code Examples

Installation & Setup

# Java execution (binary download)
wget https://repository.hazelcast.com/download/hazelcast/hazelcast-5.3.7.zip
unzip hazelcast-5.3.7.zip
cd hazelcast-5.3.7
bin/hz-start

# Docker execution
docker run --name hazelcast --rm -p 5701:5701 hazelcast/hazelcast:5.3.7

# Docker Compose multi-node setup
version: '3.8'
services:
  hazelcast-1:
    image: hazelcast/hazelcast:5.3.7
    environment:
      - HZ_CLUSTERNAME=my-cluster
    ports:
      - "5701:5701"
  hazelcast-2:
    image: hazelcast/hazelcast:5.3.7
    environment:
      - HZ_CLUSTERNAME=my-cluster
    ports:
      - "5702:5701"

# Maven dependency (add to your project's pom.xml)
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>5.3.7</version>
</dependency>

# Spring Boot integration (add to your project's pom.xml)
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-spring</artifactId>
    <version>5.3.7</version>
</dependency>

Basic Operations (Distributed Data Structures)

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.collection.IQueue;
import com.hazelcast.collection.IList;

/**
 * Demonstrates Hazelcast's core distributed data structures — IMap, IQueue
 * and IList — using an embedded cluster member started in-process.
 */
public class HazelcastBasicExample {
    public static void main(String[] args) {
        // Start an embedded member that forms (or joins) "my-cluster".
        Config clusterConfig = new Config();
        clusterConfig.setClusterName("my-cluster");
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(clusterConfig);
        
        // IMap: key/value entries partitioned across cluster members.
        IMap<String, String> distributedMap = instance.getMap("my-distributed-map");
        distributedMap.put("key1", "value1");
        distributedMap.put("key2", "value2");
        System.out.println("Value: " + distributedMap.get("key1"));
        
        // IQueue: a cluster-wide FIFO queue.
        IQueue<String> distributedQueue = instance.getQueue("my-distributed-queue");
        distributedQueue.offer("item1");
        distributedQueue.offer("item2");
        System.out.println("Queue size: " + distributedQueue.size());
        String polled = distributedQueue.poll();
        System.out.println("Polled item: " + polled);
        
        // IList: an ordered, cluster-wide list.
        IList<String> distributedList = instance.getList("my-distributed-list");
        distributedList.add("element1");
        distributedList.add("element2");
        distributedList.forEach(element -> System.out.println("List element: " + element));
        
        // Release local resources and leave the cluster.
        instance.shutdown();
    }
}

Client-Server Connection

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;

public class HazelcastClientExample {
    public static void main(String[] args) {
        // Client configuration
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setClusterName("my-cluster");
        clientConfig.getNetworkConfig().addAddress("127.0.0.1:5701");
        
        // Client connection
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        
        // Distributed Map access
        IMap<Integer, String> customerMap = client.getMap("customers");
        customerMap.put(1, "John Doe");
        customerMap.put(2, "Jane Smith");
        customerMap.put(3, "Bob Johnson");
        
        // Data retrieval
        for (Map.Entry<Integer, String> entry : customerMap.entrySet()) {
            System.out.println("Customer " + entry.getKey() + ": " + entry.getValue());
        }
        
        // Conditional search (using predicates)
        Collection<String> customers = customerMap.values(
            Predicates.sql("this LIKE '%John%'")
        );
        System.out.println("Customers with John: " + customers);
        
        client.shutdown();
    }
}

SQL Query Functionality

import com.hazelcast.sql.SqlService;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;

public class HazelcastSQLExample {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        
        // Create mapping
        SqlService sqlService = hz.getSql();
        sqlService.execute(
            "CREATE MAPPING employees (" +
            "  id INT," +
            "  name VARCHAR," +
            "  salary DECIMAL" +
            ") TYPE IMap OPTIONS (" +
            "  'keyFormat'='int'," +
            "  'valueFormat'='json-flat'" +
            ")"
        );
        
        // Data insertion
        sqlService.execute("INSERT INTO employees VALUES (1, 'John Doe', 50000)");
        sqlService.execute("INSERT INTO employees VALUES (2, 'Jane Smith', 60000)");
        sqlService.execute("INSERT INTO employees VALUES (3, 'Bob Johnson', 55000)");
        
        // SELECT query
        try (SqlResult result = sqlService.execute(
                "SELECT name, salary FROM employees WHERE salary > 52000")) {
            for (SqlRow row : result) {
                String name = row.getObject(0);
                BigDecimal salary = row.getObject(1);
                System.out.println(name + ": " + salary);
            }
        }
        
        // Aggregation query
        try (SqlResult result = sqlService.execute(
                "SELECT COUNT(*), AVG(salary) FROM employees")) {
            for (SqlRow row : result) {
                long count = row.getObject(0);
                BigDecimal avgSalary = row.getObject(1);
                System.out.println("Count: " + count + ", Average: " + avgSalary);
            }
        }
        
        hz.shutdown();
    }
}

Distributed Computing

import com.hazelcast.core.IExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.io.Serializable;

public class HazelcastComputeExample {
    
    // Computation task class
    public static class SumTask implements Callable<Integer>, Serializable {
        private final int start;
        private final int end;
        
        public SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
        
        @Override
        public Integer call() throws Exception {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println("Computed sum from " + start + " to " + end + ": " + sum);
            return sum;
        }
    }
    
    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        
        // Distributed execution service
        IExecutorService executorService = hz.getExecutorService("my-executor");
        
        // Execute multiple tasks in distributed manner
        Future<Integer> future1 = executorService.submit(new SumTask(1, 1000));
        Future<Integer> future2 = executorService.submit(new SumTask(1001, 2000));
        Future<Integer> future3 = executorService.submit(new SumTask(2001, 3000));
        
        // Get results
        int result1 = future1.get();
        int result2 = future2.get();
        int result3 = future3.get();
        
        int totalSum = result1 + result2 + result3;
        System.out.println("Total sum: " + totalSum);
        
        // MapReduce task
        IMap<String, Integer> numbers = hz.getMap("numbers");
        for (int i = 1; i <= 100; i++) {
            numbers.put("key" + i, i);
        }
        
        // MapReduce aggregation
        int sum = numbers.aggregate(Aggregators.integerSum());
        System.out.println("Sum using MapReduce: " + sum);
        
        hz.shutdown();
    }
}

Configuration and Optimization

<?xml version="1.0" encoding="UTF-8"?>
<!-- hazelcast.xml — note: the XML declaration must be the very first bytes
     of the file, so this filename comment belongs after it, not before. -->
<hazelcast xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.hazelcast.com/schema/config
           https://www.hazelcast.com/schema/config/hazelcast-config-5.3.xsd">
    
    <cluster-name>production-cluster</cluster-name>
    
    <!-- Network configuration: static TCP/IP member list, multicast disabled -->
    <network>
        <port auto-increment="true" port-count="100">5701</port>
        <join>
            <multicast enabled="false"/>
            <tcp-ip enabled="true">
                <member>192.168.1.100</member>
                <member>192.168.1.101</member>
                <member>192.168.1.102</member>
            </tcp-ip>
        </join>
        <interfaces enabled="true">
            <interface>192.168.1.*</interface>
        </interfaces>
    </network>
    
    <!-- Map configuration. Hazelcast 5.x expresses size limits and eviction
         through the <eviction> element; the 3.x-style <eviction-policy> and
         <max-size> child elements are rejected by the 5.x schema above. -->
    <map name="users">
        <backup-count>1</backup-count>
        <async-backup-count>1</async-backup-count>
        <time-to-live-seconds>3600</time-to-live-seconds>
        <max-idle-seconds>1800</max-idle-seconds>
        <eviction eviction-policy="LRU" max-size-policy="PER_NODE" size="10000"/>
        <near-cache>
            <time-to-live-seconds>600</time-to-live-seconds>
            <eviction eviction-policy="LFU" size="1000"/>
        </near-cache>
    </map>
    
    <!-- Persistence configuration (NOTE: an Enterprise-edition feature) -->
    <persistence enabled="true">
        <base-dir>/opt/hazelcast/persistence</base-dir>
        <backup-dir>/opt/hazelcast/backup</backup-dir>
        <parallelism>4</parallelism>
    </persistence>
    
    <!-- Jet streaming engine -->
    <jet enabled="true" resource-upload-enabled="true">
        <instance>
            <cooperative-thread-count>8</cooperative-thread-count>
            <flow-control-period>100</flow-control-period>
            <backup-count>1</backup-count>
        </instance>
    </jet>
    
</hazelcast>

Production Use Cases & Operations

// Spring Boot integration example
@Configuration
public class HazelcastConfig {
    
    /**
     * Cluster configuration bean. Spring Boot's Hazelcast auto-configuration
     * detects a {@code Config} bean and starts an embedded member from it —
     * no extra enabling annotation exists or is needed (the original
     * {@code @EnableHazelcast} is not a real Spring annotation).
     */
    @Bean
    public Config hazelcastConfig() {
        Config config = new Config();
        config.setClusterName("spring-cluster");
        
        // Cache configuration for the "user-cache" map.
        MapConfig mapConfig = new MapConfig();
        mapConfig.setName("user-cache");
        mapConfig.setTimeToLiveSeconds(300);
        // Hazelcast 5.x: size limits and eviction are configured through
        // EvictionConfig; MaxSizeConfig was a 3.x API removed in 4.0+.
        mapConfig.setEvictionConfig(new EvictionConfig()
            .setEvictionPolicy(EvictionPolicy.LRU)
            .setMaxSizePolicy(MaxSizePolicy.PER_NODE)
            .setSize(1000));
        config.addMapConfig(mapConfig);
        
        return config;
    }
}

@Service
public class UserService {
    
    @Autowired
    private HazelcastInstance hazelcastInstance;
    
    // Was referenced by getUserById/updateUserWithLock but never declared
    // in the original example — the snippet could not compile without it.
    @Autowired
    private UserRepository userRepository;
    
    /**
     * Cache-aside lookup: returns the cached user, loading it from the
     * repository (and caching it) on a miss.
     */
    public User getUserById(Long id) {
        IMap<Long, User> userCache = hazelcastInstance.getMap("user-cache");
        
        return userCache.computeIfAbsent(id, key -> {
            // Fetch user from database on cache miss
            return userRepository.findById(key);
        });
    }
    
    /** Evicts a user from the distributed cache. */
    public void invalidateUser(Long id) {
        IMap<Long, User> userCache = hazelcastInstance.getMap("user-cache");
        userCache.remove(id);
    }
    
    /**
     * Updates a user under a cluster-wide lock so concurrent writers on
     * different nodes serialize on the same user id.
     * NOTE(review): FencedLock is a CP-subsystem primitive — with the default
     * configuration (no CP members) it runs in "unsafe" mode without Raft
     * guarantees; confirm CP subsystem sizing for production.
     */
    public void updateUserWithLock(Long userId, User updatedUser) {
        FencedLock lock = hazelcastInstance.getCPSubsystem().getLock("user-lock-" + userId);
        
        lock.lock();
        try {
            // Critical section: read-modify-write of a single user.
            User currentUser = getUserById(userId);
            // Update user process
            userRepository.save(updatedUser);
            invalidateUser(userId);
        } finally {
            // Always release, even if the repository call throws.
            lock.unlock();
        }
    }
}

// Monitoring and metrics
/**
 * Periodically logs cluster membership, JVM heap usage, and local
 * statistics for the "user-cache" distributed map.
 */
@Component
public class HazelcastMonitoring {
    
    @Autowired
    private HazelcastInstance hazelcastInstance;
    
    @Scheduled(fixedRate = 60000) // Every minute
    public void logClusterStatistics() {
        // Membership as seen by the local member.
        int memberCount = hazelcastInstance.getCluster().getMembers().size();
        System.out.println("Cluster size: " + memberCount);
        
        // JVM heap usage of this process, reported in megabytes.
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long usedMb = heap.getUsed() / 1024 / 1024; // MB
        long maxMb = heap.getMax() / 1024 / 1024;   // MB
        System.out.println("Memory usage: " + usedMb + "MB / " + maxMb + "MB");
        
        // Statistics for the portion of the map owned by this member.
        IMap<String, Object> userCache = hazelcastInstance.getMap("user-cache");
        LocalMapStats stats = userCache.getLocalMapStats();
        System.out.println("Cache hits: " + stats.getHits());
        System.out.println("Cache misses: " + stats.getMisses());
        System.out.println("Cache size: " + userCache.size());
    }
}