Database
Hazelcast
Overview
Hazelcast is a distributed in-memory computing platform that integrates distributed caching, distributed execution, and stream processing. It specializes in high-performance data processing for Java enterprise applications and plays a crucial role in building large-scale distributed systems in enterprise environments.
Details
Hazelcast was founded in 2008 by Fuad Malikov, Talip Ozturk, and Engin Inan as an open-source project. As a distributed in-memory computing platform, it integrates data grid, distributed cache, and distributed execution engine functionalities.
Key features of Hazelcast:
- Distributed Data Structures: Map, Queue, List, Set, Lock, Semaphore, and other distributed data structures
- In-Memory Data Grid (IMDG): Fast data access and processing
- Distributed Computing: Computation distribution across the entire cluster
- Stream Processing: Real-time data stream processing (Jet functionality)
- SQL Support: ANSI SQL query support
- Transactions: ACID-compliant transaction processing
- High Availability: Automatic failover and data replication
- Horizontal Scaling: Linear scaling by adding nodes
- Multi-language Clients: Support for Java, C++, C#, Python, Node.js, Go
- Enterprise Features: Security, Management Center, WAN replication
Pros and Cons
Pros
- High Performance: Low latency through in-memory processing
- Scalability: High throughput through horizontal scaling
- Development Efficiency: Rich Java APIs and distributed data structures
- Enterprise Ready: Robust operational management features
- Integrated Functionality: Integration of cache, compute, and stream processing
- High Availability: Automatic failure recovery and data protection
- Standards Compliance: JCache and SQL standards support
- Commercial Support: Enterprise support available
Cons
- Memory Usage: Requires large memory resources
- Complexity: Complex distributed system management and debugging
- Cost: Enterprise edition licensing fees
- Java-Centric: Strong dependency on Java environment
- Learning Curve: Requires specialized knowledge in distributed system design
Key Links
- Official site: https://hazelcast.com/
- Documentation: https://docs.hazelcast.com/
- Source code: https://github.com/hazelcast/hazelcast
Code Examples
Installation & Setup
# Java execution (binary download)
wget https://repository.hazelcast.com/download/hazelcast/hazelcast-5.3.7.zip
unzip hazelcast-5.3.7.zip
cd hazelcast-5.3.7
bin/hz-start
# Docker execution
docker run --name hazelcast --rm -p 5701:5701 hazelcast/hazelcast:5.3.7
# Docker Compose multi-node setup
version: '3.8'
services:
  hazelcast-1:
    image: hazelcast/hazelcast:5.3.7
    environment:
      - HZ_CLUSTERNAME=my-cluster
    ports:
      - "5701:5701"
  hazelcast-2:
    image: hazelcast/hazelcast:5.3.7
    environment:
      - HZ_CLUSTERNAME=my-cluster
    ports:
      - "5702:5701"
# Maven dependencies
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>5.3.7</version>
</dependency>
# Spring Boot integration
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-spring</artifactId>
<version>5.3.7</version>
</dependency>
Basic Operations (Distributed Data Structures)
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.collection.IQueue;
import com.hazelcast.collection.IList;
public class HazelcastBasicExample {
    public static void main(String[] args) {
        // Configure and start an embedded Hazelcast member in this JVM.
        Config cfg = new Config();
        cfg.setClusterName("my-cluster");
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(cfg);

        // Distributed Map: store two entries, then read one back.
        IMap<String, String> distributedMap = instance.getMap("my-distributed-map");
        distributedMap.put("key1", "value1");
        distributedMap.put("key2", "value2");
        System.out.println("Value: " + distributedMap.get("key1"));

        // Distributed Queue: offer two items, then poll the head.
        IQueue<String> distributedQueue = instance.getQueue("my-distributed-queue");
        distributedQueue.offer("item1");
        distributedQueue.offer("item2");
        System.out.println("Queue size: " + distributedQueue.size());
        String head = distributedQueue.poll();
        System.out.println("Polled item: " + head);

        // Distributed List: append elements, then iterate in insertion order.
        IList<String> distributedList = instance.getList("my-distributed-list");
        distributedList.add("element1");
        distributedList.add("element2");
        for (String value : distributedList) {
            System.out.println("List element: " + value);
        }

        // Leave the cluster and release this member's resources.
        instance.shutdown();
    }
}
Client-Server Connection
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
public class HazelcastClientExample {
public static void main(String[] args) {
// Client configuration
ClientConfig clientConfig = new ClientConfig();
clientConfig.setClusterName("my-cluster");
clientConfig.getNetworkConfig().addAddress("127.0.0.1:5701");
// Client connection
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
// Distributed Map access
IMap<Integer, String> customerMap = client.getMap("customers");
customerMap.put(1, "John Doe");
customerMap.put(2, "Jane Smith");
customerMap.put(3, "Bob Johnson");
// Data retrieval
for (Map.Entry<Integer, String> entry : customerMap.entrySet()) {
System.out.println("Customer " + entry.getKey() + ": " + entry.getValue());
}
// Conditional search (using predicates)
Collection<String> customers = customerMap.values(
Predicates.sql("this LIKE '%John%'")
);
System.out.println("Customers with John: " + customers);
client.shutdown();
}
}
SQL Query Functionality
import com.hazelcast.sql.SqlService;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
public class HazelcastSQLExample {
public static void main(String[] args) {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
// Create mapping
SqlService sqlService = hz.getSql();
sqlService.execute(
"CREATE MAPPING employees (" +
" id INT," +
" name VARCHAR," +
" salary DECIMAL" +
") TYPE IMap OPTIONS (" +
" 'keyFormat'='int'," +
" 'valueFormat'='json-flat'" +
")"
);
// Data insertion
sqlService.execute("INSERT INTO employees VALUES (1, 'John Doe', 50000)");
sqlService.execute("INSERT INTO employees VALUES (2, 'Jane Smith', 60000)");
sqlService.execute("INSERT INTO employees VALUES (3, 'Bob Johnson', 55000)");
// SELECT query
try (SqlResult result = sqlService.execute(
"SELECT name, salary FROM employees WHERE salary > 52000")) {
for (SqlRow row : result) {
String name = row.getObject(0);
BigDecimal salary = row.getObject(1);
System.out.println(name + ": " + salary);
}
}
// Aggregation query
try (SqlResult result = sqlService.execute(
"SELECT COUNT(*), AVG(salary) FROM employees")) {
for (SqlRow row : result) {
long count = row.getObject(0);
BigDecimal avgSalary = row.getObject(1);
System.out.println("Count: " + count + ", Average: " + avgSalary);
}
}
hz.shutdown();
}
}
Distributed Computing
import com.hazelcast.core.IExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.io.Serializable;
public class HazelcastComputeExample {
// Computation task class
public static class SumTask implements Callable<Integer>, Serializable {
private final int start;
private final int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println("Computed sum from " + start + " to " + end + ": " + sum);
return sum;
}
}
public static void main(String[] args) throws Exception {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
// Distributed execution service
IExecutorService executorService = hz.getExecutorService("my-executor");
// Execute multiple tasks in distributed manner
Future<Integer> future1 = executorService.submit(new SumTask(1, 1000));
Future<Integer> future2 = executorService.submit(new SumTask(1001, 2000));
Future<Integer> future3 = executorService.submit(new SumTask(2001, 3000));
// Get results
int result1 = future1.get();
int result2 = future2.get();
int result3 = future3.get();
int totalSum = result1 + result2 + result3;
System.out.println("Total sum: " + totalSum);
// MapReduce task
IMap<String, Integer> numbers = hz.getMap("numbers");
for (int i = 1; i <= 100; i++) {
numbers.put("key" + i, i);
}
// MapReduce aggregation
int sum = numbers.aggregate(Aggregators.integerSum());
System.out.println("Sum using MapReduce: " + sum);
hz.shutdown();
}
}
Configuration and Optimization
<?xml version="1.0" encoding="UTF-8"?>
<!-- hazelcast.xml (a comment may not precede the XML declaration, so the
     file label was moved below it) -->
<hazelcast xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.hazelcast.com/schema/config
           https://www.hazelcast.com/schema/config/hazelcast-config-5.3.xsd">
    <cluster-name>production-cluster</cluster-name>

    <!-- Network configuration: static TCP/IP member discovery -->
    <network>
        <port auto-increment="true" port-count="100">5701</port>
        <join>
            <multicast enabled="false"/>
            <tcp-ip enabled="true">
                <member>192.168.1.100</member>
                <member>192.168.1.101</member>
                <member>192.168.1.102</member>
            </tcp-ip>
        </join>
        <interfaces enabled="true">
            <interface>192.168.1.*</interface>
        </interfaces>
    </network>

    <!-- Map configuration -->
    <map name="users">
        <backup-count>1</backup-count>
        <async-backup-count>1</async-backup-count>
        <time-to-live-seconds>3600</time-to-live-seconds>
        <max-idle-seconds>1800</max-idle-seconds>
        <!-- 5.x schema: policy and size are attributes of <eviction>
             (the 3.x <eviction-policy>/<max-size> elements fail validation) -->
        <eviction eviction-policy="LRU" max-size-policy="PER_NODE" size="10000"/>
        <near-cache>
            <time-to-live-seconds>600</time-to-live-seconds>
            <eviction eviction-policy="LFU" size="1000"/>
        </near-cache>
    </map>

    <!-- Persistence configuration (NOTE: Enterprise-edition feature) -->
    <persistence enabled="true">
        <base-dir>/opt/hazelcast/persistence</base-dir>
        <backup-dir>/opt/hazelcast/backup</backup-dir>
        <parallelism>4</parallelism>
    </persistence>

    <!-- Jet streaming engine -->
    <jet enabled="true" resource-upload-enabled="true">
        <instance>
            <cooperative-thread-count>8</cooperative-thread-count>
            <flow-control-period>100</flow-control-period>
            <backup-count>1</backup-count>
        </instance>
    </jet>
</hazelcast>
Production Use Cases & Operations
// Spring Boot integration example
@Configuration
public class HazelcastConfig {

    /**
     * Member configuration picked up by Spring Boot's Hazelcast
     * auto-configuration (a Config bean yields a HazelcastInstance bean).
     * NOTE(review): the original @EnableHazelcast annotation does not exist
     * in hazelcast-spring and was removed.
     */
    @Bean
    public Config hazelcastConfig() {
        Config config = new Config();
        config.setClusterName("spring-cluster");

        // Cache configuration for the "user-cache" map.
        MapConfig mapConfig = new MapConfig();
        mapConfig.setName("user-cache");
        mapConfig.setTimeToLiveSeconds(300);
        // 5.x API: eviction is configured via EvictionConfig
        // (MaxSizeConfig / setMaxSizeConfig were removed with the 3.x API).
        mapConfig.setEvictionConfig(new EvictionConfig()
                .setEvictionPolicy(EvictionPolicy.LRU)
                .setMaxSizePolicy(MaxSizePolicy.PER_NODE)
                .setSize(1000));
        config.addMapConfig(mapConfig);
        return config;
    }
}
@Service
public class UserService {

    @Autowired
    private HazelcastInstance hazelcastInstance;

    // added: referenced throughout but never declared in the original snippet
    @Autowired
    private UserRepository userRepository;

    /** Returns the user from the distributed cache, loading it on a miss. */
    public User getUserById(Long id) {
        IMap<Long, User> userCache = hazelcastInstance.getMap("user-cache");
        // NOTE(review): computeIfAbsent on an IMap ships the mapping function
        // to the owning member, so the lambda must be serializable — confirm.
        return userCache.computeIfAbsent(id, key -> {
            // Cache miss: fetch user from the database.
            return userRepository.findById(key);
        });
    }

    /** Drops the cached entry so the next read reloads from the database. */
    public void invalidateUser(Long id) {
        IMap<Long, User> userCache = hazelcastInstance.getMap("user-cache");
        userCache.remove(id);
    }

    /**
     * Distributed lock example: serializes updates to one user across the
     * whole cluster via a CP-subsystem FencedLock.
     */
    public void updateUserWithLock(Long userId, User updatedUser) {
        FencedLock lock = hazelcastInstance.getCPSubsystem().getLock("user-lock-" + userId);
        lock.lock();
        try {
            // Critical section: persist the update, then drop the stale cache
            // entry. (The original also fetched the current user here but
            // never used it; that dead read was removed.)
            userRepository.save(updatedUser);
            invalidateUser(userId);
        } finally {
            lock.unlock();
        }
    }
}
// Monitoring and metrics
@Component
public class HazelcastMonitoring {
@Autowired
private HazelcastInstance hazelcastInstance;
@Scheduled(fixedRate = 60000) // Every minute
public void logClusterStatistics() {
Cluster cluster = hazelcastInstance.getCluster();
System.out.println("Cluster size: " + cluster.getMembers().size());
// Memory usage
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
long usedMemory = heapUsage.getUsed() / 1024 / 1024; // MB
long maxMemory = heapUsage.getMax() / 1024 / 1024; // MB
System.out.println("Memory usage: " + usedMemory + "MB / " + maxMemory + "MB");
// Map statistics
IMap<String, Object> map = hazelcastInstance.getMap("user-cache");
LocalMapStats mapStats = map.getLocalMapStats();
System.out.println("Cache hits: " + mapStats.getHits());
System.out.println("Cache misses: " + mapStats.getMisses());
System.out.println("Cache size: " + map.size());
}
}