Elasticsearch

Search and analytics engine that returned to open source with AGPLv3 license in September 2024. Features PyTorch ML integration, enhanced vector search capabilities, and performance improvements through Lucene 9.

Monitoring ServerSearch EngineAnalytics EngineELK StackMachine LearningVector SearchReal-time Search

Server

Elasticsearch

Overview

Elasticsearch is a search and analytics engine that returned to open source with AGPLv3 license in September 2024. It features PyTorch ML integration, enhanced vector search capabilities, and performance improvements through Lucene 9. As the core component of the ELK stack, it continues growing with adaptation to the AI era through machine learning integration and enhanced vector search functionality.

Details

Elasticsearch gained attention with its return to open source in September 2024. It's adapting to the AI era with PyTorch machine learning integration and enhanced vector search functionality. As the core component of the ELK stack, it continues growth with real-time search, distributed architecture, and comprehensive analytics capabilities. The recent open source return has strengthened the developer community and accelerated innovation in search and analytics domains.

Key Technical Features

  • Distributed Search Engine: Horizontal scaling with automatic sharding and replication
  • Real-time Analytics: Near real-time search and aggregation capabilities
  • Machine Learning Integration: PyTorch ML integration for advanced analytics
  • Vector Search: Enhanced vector search for AI/ML applications
  • RESTful API: Comprehensive REST API for all operations
  • Schema-free: Dynamic mapping with flexible document structure

Use Cases

  • Log aggregation and analysis
  • Real-time search applications
  • Business intelligence and analytics
  • Security information and event management (SIEM)
  • Application performance monitoring
  • Content and document search

Pros and Cons

Pros

  • Open Source Return: AGPLv3 license ensuring community-driven development
  • High Performance: Optimized for speed with Lucene 9 improvements
  • Scalability: Horizontal scaling with automatic cluster management
  • Rich Query DSL: Powerful query language for complex searches
  • AI/ML Integration: Built-in machine learning capabilities
  • Ecosystem: Strong integration with Kibana, Logstash, and Beats

Cons

  • Resource Intensive: High memory and CPU requirements
  • Complex Configuration: Learning curve for optimal setup
  • Data Loss Risk: Potential data loss during cluster failures
  • License Complexity: Multiple license options can be confusing
  • Version Compatibility: Breaking changes between major versions
  • Operational Overhead: Requires expertise for production deployment

Reference Pages

Code Examples

Installation and Basic Setup

# Docker installation
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.16.0

# Run single-node cluster
docker run -d \
  --name elasticsearch \
  -p 9200:9200 \
  -p 9300:9300 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  -e "xpack.security.enrollment.enabled=false" \
  docker.elastic.co/elasticsearch/elasticsearch:8.16.0

# Verify installation
curl -X GET "localhost:9200/"

# Package installation (Debian/Ubuntu)
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt-get update && sudo apt-get install elasticsearch

# Start service
sudo systemctl enable elasticsearch
sudo systemctl start elasticsearch

Basic Configuration

# elasticsearch.yml
cluster.name: production-cluster
node.name: node-1
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch

# Network settings
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

# Discovery settings
discovery.type: zen
discovery.zen.ping.unicast.hosts: ["node1.example.com", "node2.example.com"]
discovery.zen.minimum_master_nodes: 2

# Cluster settings
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]

# Memory settings
bootstrap.memory_lock: true

# Security settings (if X-Pack enabled)
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.client_authentication: required
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12

# Monitoring
xpack.monitoring.collection.enabled: true

# Machine Learning
xpack.ml.enabled: true

# Index lifecycle management
xpack.ilm.enabled: true

# Watcher (alerting)
xpack.watcher.enabled: true

Index Management

# Create index with mapping
curl -X PUT "localhost:9200/logs" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "index.max_result_window": 50000
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      },
      "level": {
        "type": "keyword"
      },
      "message": {
        "type": "text",
        "analyzer": "standard"
      },
      "service": {
        "type": "keyword"
      },
      "host": {
        "type": "keyword"
      },
      "tags": {
        "type": "keyword"
      },
      "response_time": {
        "type": "double"
      },
      "status_code": {
        "type": "integer"
      }
    }
  }
}'

# Create index template
curl -X PUT "localhost:9200/_index_template/logs_template" -H 'Content-Type: application/json' -d'
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "index.lifecycle.name": "logs_policy",
      "index.lifecycle.rollover_alias": "logs"
    },
    "mappings": {
      "properties": {
        "@timestamp": {"type": "date"},
        "level": {"type": "keyword"},
        "message": {"type": "text"},
        "service": {"type": "keyword"}
      }
    }
  }
}'

# Index lifecycle policy
curl -X PUT "localhost:9200/_ilm/policy/logs_policy" -H 'Content-Type: application/json' -d'
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "5GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "1d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "allocate": {
            "number_of_replicas": 0
          }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "allocate": {
            "number_of_replicas": 0
          }
        }
      },
      "delete": {
        "min_age": "30d"
      }
    }
  }
}'

Search and Analytics

# Basic search
curl -X GET "localhost:9200/logs/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "message": "error"
    }
  },
  "size": 10,
  "sort": [
    {
      "timestamp": {
        "order": "desc"
      }
    }
  ]
}'

# Complex search with filters and aggregations
curl -X GET "localhost:9200/logs/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "timestamp": {
              "gte": "now-1h"
            }
          }
        }
      ],
      "filter": [
        {
          "term": {
            "level": "ERROR"
          }
        }
      ]
    }
  },
  "aggs": {
    "errors_by_service": {
      "terms": {
        "field": "service",
        "size": 10
      }
    },
    "errors_over_time": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "5m"
      }
    },
    "avg_response_time": {
      "avg": {
        "field": "response_time"
      }
    }
  },
  "size": 0
}'

# Vector search (for AI/ML applications)
curl -X GET "localhost:9200/documents/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "script_score": {
      "query": {"match_all": {}},
      "script": {
        "source": "cosineSimilarity(params.query_vector, '\''text_vector'\'') + 1.0",
        "params": {
          "query_vector": [0.1, 0.2, 0.3, 0.4, 0.5]
        }
      }
    }
  }
}'

# Multi-search
curl -X GET "localhost:9200/_msearch" -H 'Content-Type: application/json' -d'
{"index": "logs"}
{"query": {"term": {"level": "ERROR"}}, "size": 5}
{"index": "metrics"}
{"query": {"range": {"value": {"gte": 100}}}, "size": 5}
'

Monitoring and Alerting

# elasticsearch_monitor.py
from elasticsearch import Elasticsearch
import time
import logging
from datetime import datetime, timedelta

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ElasticsearchMonitor:
    def __init__(self, hosts=['localhost:9200']):
        self.es = Elasticsearch(hosts)
        
    def check_cluster_health(self):
        """Check cluster health status"""
        try:
            health = self.es.cluster.health()
            status = health['status']
            
            if status == 'red':
                logger.error(f"Cluster health is RED: {health}")
                return False
            elif status == 'yellow':
                logger.warning(f"Cluster health is YELLOW: {health}")
            else:
                logger.info(f"Cluster health is GREEN: {health['cluster_name']}")
            
            return True
            
        except Exception as e:
            logger.error(f"Failed to check cluster health: {e}")
            return False
    
    def check_index_size(self, index_pattern="*", max_size_gb=50):
        """Check index sizes"""
        try:
            stats = self.es.cat.indices(
                index=index_pattern, 
                format='json', 
                bytes='gb'
            )
            
            large_indices = []
            for index in stats:
                size_gb = float(index.get('store.size', '0').replace('gb', ''))
                if size_gb > max_size_gb:
                    large_indices.append({
                        'index': index['index'],
                        'size_gb': size_gb
                    })
            
            if large_indices:
                logger.warning(f"Large indices found: {large_indices}")
            
            return large_indices
            
        except Exception as e:
            logger.error(f"Failed to check index sizes: {e}")
            return []
    
    def monitor_query_performance(self, slow_threshold_ms=1000):
        """Monitor slow queries"""
        try:
            # Get slow query logs
            slow_queries = self.es.search(
                index='.monitoring-*',
                body={
                    "query": {
                        "bool": {
                            "must": [
                                {"range": {"took": {"gte": slow_threshold_ms}}},
                                {"range": {"@timestamp": {"gte": "now-5m"}}}
                            ]
                        }
                    },
                    "sort": [{"took": {"order": "desc"}}],
                    "size": 10
                }
            )
            
            if slow_queries['hits']['total']['value'] > 0:
                logger.warning(f"Found {slow_queries['hits']['total']['value']} slow queries")
                for hit in slow_queries['hits']['hits']:
                    source = hit['_source']
                    logger.warning(f"Slow query: {source.get('took')}ms - {source.get('query', {})}")
            
        except Exception as e:
            logger.error(f"Failed to monitor query performance: {e}")
    
    def check_disk_usage(self, threshold_percent=85):
        """Check disk usage on nodes"""
        try:
            nodes = self.es.cat.allocation(format='json')
            
            high_usage_nodes = []
            for node in nodes:
                disk_percent = int(node.get('disk.percent', '0'))
                if disk_percent > threshold_percent:
                    high_usage_nodes.append({
                        'node': node.get('node'),
                        'disk_percent': disk_percent,
                        'disk_used': node.get('disk.used'),
                        'disk_avail': node.get('disk.avail')
                    })
            
            if high_usage_nodes:
                logger.error(f"High disk usage nodes: {high_usage_nodes}")
            
            return high_usage_nodes
            
        except Exception as e:
            logger.error(f"Failed to check disk usage: {e}")
            return []
    
    def run_monitoring_cycle(self):
        """Run complete monitoring cycle"""
        logger.info("Starting Elasticsearch monitoring cycle")
        
        # Check cluster health
        self.check_cluster_health()
        
        # Check index sizes
        self.check_index_size()
        
        # Monitor query performance
        self.monitor_query_performance()
        
        # Check disk usage
        self.check_disk_usage()
        
        logger.info("Monitoring cycle completed")

if __name__ == "__main__":
    monitor = ElasticsearchMonitor()
    
    while True:
        try:
            monitor.run_monitoring_cycle()
            time.sleep(300)  # Run every 5 minutes
        except KeyboardInterrupt:
            logger.info("Monitoring stopped")
            break
        except Exception as e:
            logger.error(f"Monitoring error: {e}")
            time.sleep(60)

Security Configuration

# elasticsearch.yml (Security)
xpack.security.enabled: true
xpack.security.enrollment.enabled: true

# Transport layer security
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.client_authentication: required
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

# HTTP layer security
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: certs/elastic-certificates.p12

# LDAP authentication
xpack.security.authc.realms.ldap.ldap1:
  order: 0
  url: "ldap://ldap.example.com:389"
  bind_dn: "cn=ldapuser, ou=users, o=services, dc=example, dc=com"
  user_search:
    base_dn: "dc=example,dc=com"
    filter: "(cn={0})"
  group_search:
    base_dn: "dc=example,dc=com"
  unmapped_groups_as_roles: false

Troubleshooting

# Check cluster status
curl -X GET "localhost:9200/_cluster/health?pretty"

# Check node information
curl -X GET "localhost:9200/_nodes?pretty"

# Check shard allocation
curl -X GET "localhost:9200/_cat/shards?v"

# Check pending tasks
curl -X GET "localhost:9200/_cluster/pending_tasks?pretty"

# Force merge indices
curl -X POST "localhost:9200/logs/_forcemerge?max_num_segments=1"

# Clear cache
curl -X POST "localhost:9200/_cache/clear"

# Restart node (graceful)
curl -X POST "localhost:9200/_cluster/nodes/_local/_shutdown"

# Recovery status
curl -X GET "localhost:9200/_recovery?pretty"

# Thread pool stats
curl -X GET "localhost:9200/_nodes/stats/thread_pool?pretty"

# Hot threads
curl -X GET "localhost:9200/_nodes/hot_threads"