Database
Apache Ignite
Overview
Apache Ignite is a distributed in-memory computing platform that balances fast data access through memory-centric architecture with persistence capabilities. It enables flexible query execution through SQL support and addresses high-performance requirements in large-scale data processing, finance, and telecommunications industries.
Details
Apache Ignite began development at GridGain Systems in 2007, was accepted as an Apache Incubator project in 2014, and graduated to a top-level project in 2015. As a memory-centric distributed computing platform, it integrates database, cache, and compute grid functionalities.
Key features of Apache Ignite:
- Memory-Centric Architecture: High-speed data processing using RAM as primary storage
- Persistence Support: Optional data persistence to disk
- SQL Support: ANSI SQL, JDBC/ODBC driver support
- ACID Transactions: Complete ACID-compliant transaction processing
- Compute Grid: Distributed computing and MapReduce
- Streaming: Real-time data stream processing
- Multi-language Support: Java, .NET, C++, Python support
- Horizontal Scaling: Dynamic cluster expansion and contraction
- High Availability: Automatic failover and data replication
- Compliance: ISO/IEC 9075 SQL standard compliance
Pros and Cons
Pros
- High Performance: Ultra-fast access through in-memory processing
- Flexibility: Choice between memory-only and persistence modes
- SQL Support: Intuitive data operations using standard SQL
- ACID Compliance: Guarantees data integrity and consistency
- Integrated Platform: Unified cache, DB, and compute functionalities
- Scalability: High throughput through horizontal scaling
- Open Source: Free to use under Apache license
- Enterprise Ready: Commercial support available
Cons
- Memory Usage: Requires large RAM resources
- Complexity: Complex configuration and tuning
- Community-Driven: Uncertainty after GridGain support termination
- Learning Curve: Requires understanding of distributed systems
- Performance Tuning: Detailed configuration needed for optimal performance
Key Links
Code Examples
Installation & Setup
# Binary download and installation
wget https://archive.apache.org/dist/ignite/2.16.0/apache-ignite-2.16.0-bin.zip
unzip apache-ignite-2.16.0-bin.zip
cd apache-ignite-2.16.0-bin
bin/ignite.sh
# Docker execution
docker run --name ignite --rm -p 10800:10800 -p 47100:47100 -p 47500:47500 \
apacheignite/ignite:2.16.0
# Docker Compose cluster configuration
version: '3.8'
services:
ignite-1:
image: apacheignite/ignite:2.16.0
environment:
- OPTION_LIBS=ignite-rest-http
- CONFIG_URI=https://raw.githubusercontent.com/apache/ignite/master/config/default-config.xml
ports:
- "10800:10800"
- "47100:47100"
ignite-2:
image: apacheignite/ignite:2.16.0
environment:
- OPTION_LIBS=ignite-rest-http
ports:
- "10801:10800"
- "47101:47100"
# Maven dependencies
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>2.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
<version>2.16.0</version>
</dependency>
# Gradle dependencies
implementation 'org.apache.ignite:ignite-core:2.16.0'
implementation 'org.apache.ignite:ignite-indexing:2.16.0'
Basic Operations (Cache & Computing)
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
public class IgniteBasicExample {
public static void main(String[] args) {
// Ignite configuration
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setClientMode(false); // Server mode
// Start Ignite instance
try (Ignite ignite = Ignition.start(cfg)) {
// Create cache
IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCache");
// Add data
cache.put(1, "John Doe");
cache.put(2, "Jane Smith");
cache.put(3, "Bob Johnson");
// Retrieve data
String value = cache.get(1);
System.out.println("Value for key 1: " + value);
// Check cache size
System.out.println("Cache size: " + cache.size());
// Bulk operations
Map<Integer, String> batch = new HashMap<>();
batch.put(10, "Alice Cooper");
batch.put(11, "Charlie Brown");
cache.putAll(batch);
// Display all data
for (Cache.Entry<Integer, String> entry : cache) {
System.out.println("Key: " + entry.getKey() + ", Value: " + entry.getValue());
}
}
}
}
SQL Functions and Database Operations
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.QueryEntity;
import java.util.Collections;
public class IgniteSQLExample {
// Person class definition
public static class Person {
private String name;
private int age;
private String department;
public Person(String name, int age, String department) {
this.name = name;
this.age = age;
this.department = department;
}
// getter/setter methods omitted
}
public static void main(String[] args) {
IgniteConfiguration cfg = new IgniteConfiguration();
// Cache configuration and SQL schema definition
CacheConfiguration<Integer, Person> cacheCfg = new CacheConfiguration<>("person");
QueryEntity qryEntity = new QueryEntity();
qryEntity.setKeyType(Integer.class.getName());
qryEntity.setValueType(Person.class.getName());
qryEntity.addQueryField("name", String.class.getName(), null);
qryEntity.addQueryField("age", Integer.class.getName(), null);
qryEntity.addQueryField("department", String.class.getName(), null);
qryEntity.setIndexes(Collections.singleton(new QueryIndex("age")));
cacheCfg.setQueryEntities(Collections.singletonList(qryEntity));
try (Ignite ignite = Ignition.start(cfg)) {
IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(cacheCfg);
// Insert test data
cache.put(1, new Person("John Doe", 30, "Engineering"));
cache.put(2, new Person("Jane Smith", 25, "Sales"));
cache.put(3, new Person("Bob Johnson", 35, "Engineering"));
cache.put(4, new Person("Alice Cooper", 28, "HR"));
// Execute SQL query
SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "age >= ? AND department = ?");
QueryCursor<Cache.Entry<Integer, Person>> cursor = cache.query(qry.setArgs(25, "Engineering"));
System.out.println("Engineering employees aged 25 and above:");
for (Cache.Entry<Integer, Person> entry : cursor) {
Person person = entry.getValue();
System.out.println(person.getName() + " (" + person.getAge() + " years old)");
}
// SQL Fields Query (aggregation query)
SqlFieldsQuery fieldQry = new SqlFieldsQuery(
"SELECT department, COUNT(*), AVG(age) FROM Person GROUP BY department");
QueryCursor<List<?>> fieldCursor = cache.query(fieldQry);
System.out.println("\nDepartment statistics:");
for (List<?> row : fieldCursor) {
System.out.println("Department: " + row.get(0) + ", Count: " + row.get(1) + ", Average age: " + row.get(2));
}
// DDL query (table creation)
cache.query(new SqlFieldsQuery(
"CREATE TABLE employee (id INT PRIMARY KEY, name VARCHAR, salary DECIMAL) " +
"WITH \"template=replicated\"")).getAll();
// DML query (data insertion)
cache.query(new SqlFieldsQuery(
"INSERT INTO employee (id, name, salary) VALUES (?, ?, ?)").setArgs(1, "John Doe", 50000)).getAll();
}
}
}
Distributed Computing
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
public class IgniteComputeExample {
// Compute task class
public static class WordCountTask extends ComputeTaskSplitAdapter<String, Integer> {
@Override
public Collection<? extends ComputeJob> split(int gridSize, String text) {
List<ComputeJob> jobs = new ArrayList<>();
String[] words = text.split(" ");
// Split words to be processed on each node
int wordsPerJob = Math.max(1, words.length / gridSize);
for (int i = 0; i < words.length; i += wordsPerJob) {
int endIdx = Math.min(i + wordsPerJob, words.length);
String[] jobWords = Arrays.copyOfRange(words, i, endIdx);
jobs.add(new WordCountJob(jobWords));
}
return jobs;
}
@Override
public Integer reduce(List<ComputeJobResult> results) {
int totalCount = 0;
for (ComputeJobResult result : results) {
totalCount += result.<Integer>getData();
}
return totalCount;
}
}
// Job executed on each node
public static class WordCountJob extends ComputeJobAdapter {
private String[] words;
public WordCountJob(String[] words) {
this.words = words;
}
@Override
public Object execute() {
System.out.println("Processing on node: " + Arrays.toString(words));
return words.length;
}
}
public static void main(String[] args) {
try (Ignite ignite = Ignition.start()) {
// Execute compute task
String text = "This is an Apache Ignite distributed computing sample";
Integer result = ignite.compute().execute(WordCountTask.class, text);
System.out.println("Word count: " + result);
// Simple functional compute
Collection<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Parallel processing on each node
Collection<Integer> squares = ignite.compute().apply(
(Integer n) -> {
System.out.println("Computing on node: " + n + "^2");
return n * n;
},
numbers
);
System.out.println("Square results: " + squares);
// MapReduce style aggregation
Integer sum = ignite.compute().apply(
Arrays.asList(1, 2, 3, 4, 5),
(Collection<Integer> nums) -> nums.stream().mapToInt(Integer::intValue).sum()
).stream().mapToInt(Integer::intValue).sum();
System.out.println("Total sum: " + sum);
}
}
}
Persistence and Data Management
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.WALMode;
public class IgnitePersistenceExample {
public static void main(String[] args) {
IgniteConfiguration cfg = new IgniteConfiguration();
// Persistence configuration
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
// Data region configuration
DataRegionConfiguration dataRegionCfg = new DataRegionConfiguration();
dataRegionCfg.setName("default_region");
dataRegionCfg.setPersistenceEnabled(true); // Enable persistence
dataRegionCfg.setInitialSize(100L * 1024 * 1024); // 100MB
dataRegionCfg.setMaxSize(200L * 1024 * 1024); // 200MB
storageCfg.setDefaultDataRegionConfiguration(dataRegionCfg);
storageCfg.setWalMode(WALMode.LOG_ONLY); // WAL mode setting
storageCfg.setStoragePath("/opt/ignite/data"); // Data file path
storageCfg.setWalPath("/opt/ignite/wal"); // WAL file path
cfg.setDataStorageConfiguration(storageCfg);
try (Ignite ignite = Ignition.start(cfg)) {
// Activate cluster (required when persistence is enabled)
ignite.cluster().active(true);
// Create persistent cache
CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<>("persistentCache");
cacheCfg.setDataRegionName("default_region");
IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cacheCfg);
// Write data
for (int i = 1; i <= 1000; i++) {
cache.put(i, "Persistent Value " + i);
}
System.out.println("Cache size: " + cache.size());
// Create snapshot
ignite.snapshot().createSnapshot("backup_" + System.currentTimeMillis()).get();
// Display cache metrics
CacheMetrics metrics = cache.metrics();
System.out.println("Cache hit ratio: " + metrics.getCacheHitPercentage());
System.out.println("Average put time: " + metrics.getAveragePutTime());
System.out.println("Average get time: " + metrics.getAverageGetTime());
}
}
}
Configuration and Optimization
<!-- default-config.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Cluster name -->
<property name="igniteInstanceName" value="production-cluster"/>
<!-- Network configuration -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>192.168.1.100:47500..47509</value>
<value>192.168.1.101:47500..47509</value>
<value>192.168.1.102:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<!-- Data storage configuration -->
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="persistenceEnabled" value="true"/>
<property name="initialSize" value="#{100L * 1024 * 1024}"/>
<property name="maxSize" value="#{2L * 1024 * 1024 * 1024}"/>
<property name="pageEvictionMode" value="RANDOM_LRU"/>
</bean>
</property>
<property name="walMode" value="LOG_ONLY"/>
<property name="checkpointFrequency" value="180000"/>
<property name="walHistorySize" value="20"/>
</bean>
</property>
<!-- Cache configuration -->
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="userCache"/>
<property name="cacheMode" value="PARTITIONED"/>
<property name="backups" value="1"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
<property name="expiryPolicyFactory">
<bean class="javax.cache.expiry.CreatedExpiryPolicy" factory-method="factoryOf">
<constructor-arg>
<bean class="javax.cache.expiry.Duration">
<constructor-arg value="HOURS"/>
<constructor-arg value="1"/>
</bean>
</constructor-arg>
</bean>
</property>
</bean>
</list>
</property>
<!-- Enable JVM metrics -->
<property name="metricsLogFrequency" value="60000"/>
<property name="queryThreadPoolSize" value="8"/>
<property name="systemThreadPoolSize" value="16"/>
</bean>
</beans>
Production Use Cases & Operations
// Spring Boot integration example
@Configuration
public class IgniteConfig {
@Bean
public Ignite igniteInstance() {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setIgniteInstanceName("spring-ignite");
// Data storage configuration
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
DataRegionConfiguration regionCfg = new DataRegionConfiguration();
regionCfg.setName("spring_region");
regionCfg.setPersistenceEnabled(true);
regionCfg.setInitialSize(100 * 1024 * 1024L);
regionCfg.setMaxSize(500 * 1024 * 1024L);
storageCfg.setDefaultDataRegionConfiguration(regionCfg);
cfg.setDataStorageConfiguration(storageCfg);
return Ignition.start(cfg);
}
}
@Service
public class DataService {
@Autowired
private Ignite ignite;
@PostConstruct
public void initializeCache() {
// Activate cluster
ignite.cluster().active(true);
// Initialize cache
CacheConfiguration<String, Object> cacheCfg = new CacheConfiguration<>("businessCache");
cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
cacheCfg.setBackups(1);
ignite.getOrCreateCache(cacheCfg);
}
public void performTransaction() {
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
IgniteCache<String, Object> cache = ignite.cache("businessCache");
// Operations within transaction
String currentValue = (String) cache.get("counter");
int newValue = currentValue != null ? Integer.parseInt(currentValue) + 1 : 1;
cache.put("counter", String.valueOf(newValue));
// Business logic processing
processBusinessLogic(newValue);
tx.commit();
} catch (Exception e) {
log.error("Transaction error: ", e);
throw new RuntimeException("Transaction failed", e);
}
}
// Streaming data processing
public void setupDataStreaming() {
try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer("streamCache")) {
streamer.perNodeBufferSize(1024);
streamer.perNodeParallelOperations(8);
// High-speed insertion of large data
for (int i = 0; i < 1000000; i++) {
streamer.addData(i, "Stream data " + i);
if (i % 10000 == 0) {
System.out.println("Processed: " + i);
}
}
}
}
// Monitoring and metrics
@Scheduled(fixedRate = 60000)
public void logMetrics() {
ClusterMetrics metrics = ignite.cluster().metrics();
System.out.println("Cluster size: " + ignite.cluster().nodes().size());
System.out.println("CPU usage: " + String.format("%.2f%%", metrics.getCurrentCpuLoad() * 100));
System.out.println("Heap memory usage: " +
String.format("%.2f%%", (double)metrics.getHeapMemoryUsed() / metrics.getHeapMemoryMaximum() * 100));
// Cache metrics
IgniteCache<String, Object> cache = ignite.cache("businessCache");
if (cache != null) {
CacheMetrics cacheMetrics = cache.metrics();
System.out.println("Cache hit ratio: " +
String.format("%.2f%%", cacheMetrics.getCacheHitPercentage()));
}
}
}