Skip to content

Core Components

The Elasticsearch integration consists of three main classes, each handling specific responsibilities while maintaining clean separation of concerns.

wp-content/themes/products/
├── inc/
│ ├── elasticsearch.php # Core Elasticsearch client
│ └── elasticsearch-console-manager.php # WordPress console interface
├── scripts/
│ └── elasticsearch-auto-sync.php # Main sync engine (CLI/Cron)
├── template-parts/
│ └── category-selector.php # AJAX category selector
├── page-search.php # Public search interface
├── page-elasticsearch-console.php # Admin monitoring console
└── functions.php # WordPress integration hooks
ComponentPurposeContextKey Features
ProductElasticsearchCore ES operationsSharedIndex management, search queries, Lithuanian analysis
ElasticsearchConsoleManagerDisplay & controlWordPress onlyStats, logs, manual actions
ElasticsearchAutoSyncManagerData synchronizationCLI/Cron onlyAutomated sync, error handling, progress tracking
page-search.phpPublic search interfaceWordPressUser search, filtering, pagination
page-elasticsearch-console.phpAdmin dashboardWordPressReal-time monitoring, manual controls
functions.phpWordPress integrationWordPressAJAX handlers, hooks, admin menus

File: inc/elasticsearch.php
Context: Shared (WordPress and CLI)
Purpose: Core Elasticsearch client for all low-level operations

class ProductElasticsearch {
// Connection configuration
private $host = '91.99.113.45';
private $port = 9200;
private $index_name = 'products';
private $cluster_name = 'balerina';
// Connection management
private $connection;
private $last_error;
// Key methods:
public function createIndex() // Creates index with Lithuanian mapping
public function bulkIndexProducts() // Bulk product indexing
public function searchProducts() // Advanced search with filters
public function isAvailable() // Connection health check
public function getDocumentCount() // Index statistics
public function deleteIndex() // Index deletion
public function getIndexStats() // Detailed statistics
}
public function createIndex() {
$mapping = [
'settings' => [
'analysis' => [
'analyzer' => [
'lithuanian_analyzer' => [
'tokenizer' => 'standard',
'filter' => [
'lowercase',
'asciifolding',
'lithuanian_snowball',
'edge_ngram_filter'
]
]
],
'filter' => [
'lithuanian_snowball' => [
'type' => 'snowball',
'language' => 'Lithuanian'
]
]
]
]
];
return $this->makeRequest('PUT', "/{$this->index_name}", $mapping);
}
public function bulkIndexProducts($products) {
$body = [];
foreach ($products as $product) {
// Index action
$body[] = [
'index' => [
'_index' => $this->index_name,
'_id' => $product['id']
]
];
// Document data
$body[] = $this->transformProductForElasticsearch($product);
}
return $this->makeRequest('POST', '/_bulk', $body);
}
public function searchProducts($query, $filters = [], $page = 1, $per_page = 20, $sort = []) {
$search_body = [
'from' => ($page - 1) * $per_page,
'size' => $per_page,
'query' => $this->buildSearchQuery($query, $filters),
'highlight' => [
'fields' => [
'title' => new \stdClass(),
'description' => new \stdClass(),
'brand' => new \stdClass()
]
]
];
if (!empty($sort)) {
$search_body['sort'] = $sort;
}
return $this->makeRequest('POST', "/{$this->index_name}/_search", $search_body);
}
private function makeRequest($method, $endpoint, $data = null) {
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => "http://{$this->host}:{$this->port}{$endpoint}",
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => 30,
CURLOPT_CONNECTTIMEOUT => 10,
CURLOPT_CUSTOMREQUEST => $method,
CURLOPT_HTTPHEADER => ['Content-Type: application/json']
]);
if ($data !== null) {
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
}
$response = curl_exec($ch);
$http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
$error = curl_error($ch);
curl_close($ch);
if ($error) {
$this->last_error = "cURL error: $error";
return false;
}
if ($http_code >= 400) {
$this->last_error = "HTTP $http_code: $response";
return false;
}
return json_decode($response, true);
}

File: scripts/elasticsearch-auto-sync.php
Context: CLI/Cron only
Purpose: Main synchronization engine for automated data processing

class ElasticsearchAutoSyncManager {
// Configuration
private $batch_size = 500;
private $max_time = 240; // 4 minutes
private $memory_limit = '256M';
private $cron_interval = 15; // minutes
// State management
private $state_file = 'elasticsearch-sync-state.json';
private $log_file = 'elasticsearch-auto-sync.log';
// Key methods:
public function autoSync() // Main entry point
private function initializePhase() // Phase 1: Setup
private function bulkSyncPhase() // Phase 2: Bulk processing
private function incrementalSyncPhase() // Phase 3: Maintenance
private function loadState() // State persistence
private function saveState() // State persistence
}
private function initializePhase() {
$this->log("Starting initialization phase");
// Create Elasticsearch index
$elasticsearch = new ProductElasticsearch();
if (!$elasticsearch->createIndex()) {
$this->log("Failed to create Elasticsearch index", 'ERROR');
return false;
}
// Reset sync flags in database
$this->resetSyncFlags();
// Initialize state
$this->state = [
'phase' => 'bulk_syncing',
'index_created' => true,
'total_products' => $this->getTotalProductCount(),
'processed_products' => 0,
'last_processed_id' => 0,
'started_at' => date('c'),
'errors' => [],
'failed_batches' => []
];
$this->saveState();
$this->log("Initialization phase completed");
return true;
}
private function bulkSyncPhase() {
$this->log("Starting bulk sync phase");
$elasticsearch = new ProductElasticsearch();
$start_time = time();
while (time() - $start_time < $this->max_time) {
// Get next batch
$products = $this->getProductsBatch($this->state['last_processed_id'], $this->batch_size);
if (empty($products)) {
$this->log("No more products to process, moving to incremental phase");
$this->state['phase'] = 'incremental_syncing';
$this->saveState();
return true;
}
// Process batch
$success = $this->processBatch($products, $elasticsearch);
if ($success) {
$this->updateProgress(count($products));
} else {
$this->handleFailedBatch($products);
}
// Memory management
unset($products);
gc_collect_cycles();
}
$this->log("Bulk sync phase completed (time limit reached)");
return true;
}
private function incrementalSyncPhase() {
$this->log("Starting incremental sync phase");
$elasticsearch = new ProductElasticsearch();
// Get products that need syncing (added_search = false)
$products = $this->getProductsNeedingSync($this->batch_size);
if (empty($products)) {
$this->log("No products need incremental sync");
return true;
}
$this->log("Found " . count($products) . " products for incremental sync");
// Process in smaller batches for incremental sync
$batches = array_chunk($products, 100);
foreach ($batches as $batch) {
$success = $this->processBatch($batch, $elasticsearch);
if (!$success) {
$this->handleFailedBatch($batch);
}
}
$this->log("Incremental sync phase completed");
return true;
}
private function loadState() {
if (!file_exists($this->state_file)) {
return null;
}
$content = file_get_contents($this->state_file);
$state = json_decode($content, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$this->log("Invalid state file, resetting", 'WARNING');
return null;
}
return $state;
}
private function saveState() {
$this->state['last_run'] = date('c');
$content = json_encode($this->state, JSON_PRETTY_PRINT);
if (file_put_contents($this->state_file, $content) === false) {
$this->log("Failed to save state file", 'ERROR');
}
}
private function handleFailedBatch($products) {
$batch_id = md5(serialize($products));
if (!isset($this->state['failed_batches'][$batch_id])) {
$this->state['failed_batches'][$batch_id] = [
'attempts' => 0,
'first_failure' => time(),
'last_failure' => time(),
'products' => array_column($products, 'id')
];
}
$this->state['failed_batches'][$batch_id]['attempts']++;
$this->state['failed_batches'][$batch_id]['last_failure'] = time();
// Skip batch after 3 attempts
if ($this->state['failed_batches'][$batch_id]['attempts'] >= 3) {
$this->log("Skipping batch after 3 failures: " . implode(',', $products), 'WARNING');
unset($this->state['failed_batches'][$batch_id]);
}
$this->saveState();
}

File: inc/elasticsearch-console-manager.php
Context: WordPress only
Purpose: Display and monitoring interface (no auto-execution)

class ElasticsearchConsoleManager {
// Configuration
private $state_file = 'elasticsearch-sync-state.json';
private $log_files = [
'auto_sync' => 'elasticsearch-auto-sync.log',
'regular_sync' => 'elasticsearch-sync.log'
];
// Key methods:
public function getStats() // Real-time statistics
public function getPerformanceEstimates() // ETA calculations
public function getLogContent() // Log file access
public function testConnection() // Health checks
public function triggerManualSync() // Background sync trigger
public function getSyncState() // State file access
public function resetSync() // Emergency reset
}
public function getStats() {
$state = $this->getSyncState();
if (!$state) {
return [
'error' => 'No sync state found',
'phase' => 'unknown',
'progress_percentage' => 0
];
}
$elasticsearch = new ProductElasticsearch();
$es_count = $elasticsearch->getDocumentCount();
return [
'phase' => $state['phase'],
'total_products' => $state['total_products'],
'indexed_products' => $state['processed_products'],
'elasticsearch_count' => $es_count,
'progress_percentage' => $this->calculateProgress($state),
'started_at' => $state['started_at'],
'last_run' => $state['last_run'],
'errors' => count($state['errors']),
'failed_batches' => count($state['failed_batches'])
];
}
public function getPerformanceEstimates() {
$state = $this->getSyncState();
if (!$state || $state['phase'] === 'incremental_syncing') {
return ['error' => 'No bulk sync in progress'];
}
$remaining_products = $state['total_products'] - $state['processed_products'];
$remaining_batches = ceil($remaining_products / 500); // Default batch size
// Method 1: Scheduled (based on cron frequency)
$scheduled_eta = ($remaining_batches * $this->cron_interval) / 60; // hours
// Method 2: Adaptive (based on actual performance)
$adaptive_eta = $this->calculateAdaptiveETA($state);
return [
'remaining_products' => $remaining_products,
'remaining_batches' => $remaining_batches,
'scheduled_eta_hours' => round($scheduled_eta, 1),
'adaptive_eta_hours' => round($adaptive_eta, 1),
'calculation_method' => $adaptive_eta > 0 ? 'adaptive' : 'scheduled',
'products_per_hour' => $this->calculateProductsPerHour($state)
];
}
private function calculateAdaptiveETA($state) {
if (!isset($state['started_at']) || !isset($state['processed_products'])) {
return 0;
}
$start_time = strtotime($state['started_at']);
$elapsed_hours = (time() - $start_time) / 3600;
if ($elapsed_hours < 0.1) { // Less than 6 minutes
return 0;
}
$products_per_hour = $state['processed_products'] / $elapsed_hours;
$remaining_products = $state['total_products'] - $state['processed_products'];
return $remaining_products / $products_per_hour;
}
public function getLogContent($log_type = 'auto_sync', $lines = 100) {
$log_file = $this->log_files[$log_type] ?? $this->log_files['auto_sync'];
$log_path = dirname(__FILE__) . "/../scripts/$log_file";
if (!file_exists($log_path)) {
return ['error' => 'Log file not found'];
}
$content = $this->tailFile($log_path, $lines);
return [
'content' => $content,
'file' => $log_file,
'size' => filesize($log_path),
'modified' => date('Y-m-d H:i:s', filemtime($log_path))
];
}
private function tailFile($file, $lines) {
$handle = fopen($file, 'r');
$line_counter = 0;
$content = '';
// Count total lines
while (fgets($handle) !== false) {
$line_counter++;
}
// Reset to beginning
rewind($handle);
// Skip to the last N lines
$skip_lines = max(0, $line_counter - $lines);
for ($i = 0; $i < $skip_lines; $i++) {
fgets($handle);
}
// Read remaining lines
while (($line = fgets($handle)) !== false) {
$content .= $line;
}
fclose($handle);
return $content;
}
// AJAX handlers in functions.php
add_action('wp_ajax_get_elasticsearch_stats', function() {
$console_manager = new ElasticsearchConsoleManager();
$stats = $console_manager->getStats();
wp_send_json_success($stats);
});
add_action('wp_ajax_get_elasticsearch_log', function() {
$console_manager = new ElasticsearchConsoleManager();
$log_type = sanitize_text_field($_POST['log_type']);
$lines = intval($_POST['lines']);
$log_content = $console_manager->getLogContent($log_type, $lines);
wp_send_json_success($log_content);
});
// Command line interface
if (php_sapi_name() === 'cli') {
$sync_manager = new ElasticsearchAutoSyncManager();
if (isset($argv[1])) {
switch ($argv[1]) {
case 'sync':
$sync_manager->autoSync();
break;
case 'stats':
$sync_manager->showStats();
break;
case 'reset':
$sync_manager->resetSync();
break;
}
} else {
$sync_manager->autoSync();
}
}