Skip to content

Monitoring & Management

The Elasticsearch integration includes a comprehensive monitoring and management system with real-time console, progress tracking, performance metrics, and automated alerting.

The monitoring console provides comprehensive oversight of the Elasticsearch synchronization process:

  • Current sync phase (Initialize, Bulk, Incremental)
  • Progress percentage with visual indicators
  • Indexed product count vs total products
  • Elasticsearch cluster health status
  • Processing speed (products/hour)
  • ETA calculations (adaptive and scheduled)
  • Batch processing statistics
  • Error rates and patterns
  • Real-time log viewing with auto-refresh
  • Multi-log support (auto-sync, regular sync)
  • Log download functionality
  • Error highlighting and filtering
  • Emergency reset functionality
  • Manual sync triggering
  • Connection testing
  • State file inspection

File: inc/elasticsearch-console-manager.php

class ElasticsearchConsoleManager {
// Configuration
private $state_file = 'elasticsearch-sync-state.json';
private $log_files = [
'auto_sync' => 'elasticsearch-auto-sync.log',
'regular_sync' => 'elasticsearch-sync.log'
];
// Key methods:
public function getStats() // Real-time statistics
public function getPerformanceEstimates() // ETA calculations
public function getLogContent() // Log file access
public function testConnection() // Health checks
public function triggerManualSync() // Background sync trigger
public function getSyncState() // State file access
public function resetSync() // Emergency reset
}
public function getStats() {
$state = $this->getSyncState();
if (!$state) {
return [
'error' => 'No sync state found',
'phase' => 'unknown',
'progress_percentage' => 0
];
}
$elasticsearch = new ProductElasticsearch();
$es_count = $elasticsearch->getDocumentCount();
return [
'phase' => $state['phase'],
'total_products' => $state['total_products'],
'indexed_products' => $state['processed_products'],
'elasticsearch_count' => $es_count,
'progress_percentage' => $this->calculateProgress($state),
'started_at' => $state['started_at'],
'last_run' => $state['last_run'],
'errors' => count($state['errors']),
'failed_batches' => count($state['failed_batches']),
'consecutive_failures' => $state['consecutive_failures'] ?? 0
];
}
private function calculateProgress($state) {
if ($state['total_products'] <= 0) {
return 0;
}
return round(($state['processed_products'] / $state['total_products']) * 100, 1);
}

The system provides intelligent ETA calculations using two methods:

ETA = (Remaining Batches × Cron Interval) ÷ 60
Actual Rate = Indexed Products ÷ Elapsed Hours
ETA = Remaining Products ÷ Actual Rate
public function getPerformanceEstimates() {
$state = $this->getSyncState();
if (!$state || $state['phase'] === 'incremental_syncing') {
return ['error' => 'No bulk sync in progress'];
}
$remaining_products = $state['total_products'] - $state['processed_products'];
$remaining_batches = ceil($remaining_products / 500); // Default batch size
// Method 1: Scheduled (based on cron frequency)
$scheduled_eta = ($remaining_batches * $this->cron_interval) / 60; // hours
// Method 2: Adaptive (based on actual performance)
$adaptive_eta = $this->calculateAdaptiveETA($state);
return [
'remaining_products' => $remaining_products,
'remaining_batches' => $remaining_batches,
'scheduled_eta_hours' => round($scheduled_eta, 1),
'adaptive_eta_hours' => round($adaptive_eta, 1),
'calculation_method' => $adaptive_eta > 0 ? 'adaptive' : 'scheduled',
'products_per_hour' => $this->calculateProductsPerHour($state),
'confidence_level' => $this->calculateConfidenceLevel($state)
];
}
private function calculateAdaptiveETA($state) {
if (!isset($state['started_at']) || !isset($state['processed_products'])) {
return 0;
}
$start_time = strtotime($state['started_at']);
$elapsed_hours = (time() - $start_time) / 3600;
if ($elapsed_hours < 0.1) { // Less than 6 minutes
return 0;
}
$products_per_hour = $state['processed_products'] / $elapsed_hours;
$remaining_products = $state['total_products'] - $state['processed_products'];
return $remaining_products / $products_per_hour;
}
private function calculateProductsPerHour($state) {
if (!isset($state['started_at']) || !isset($state['processed_products'])) {
return 0;
}
$start_time = strtotime($state['started_at']);
$elapsed_hours = (time() - $start_time) / 3600;
if ($elapsed_hours < 0.1) {
return 0;
}
return round($state['processed_products'] / $elapsed_hours);
}
private function calculateConfidenceLevel($state) {
if (!isset($state['started_at'])) {
return 'low';
}
$start_time = strtotime($state['started_at']);
$elapsed_hours = (time() - $start_time) / 3600;
if ($elapsed_hours < 1) {
return 'low';
} elseif ($elapsed_hours < 6) {
return 'medium';
} else {
return 'high';
}
}

Sync state is tracked in JSON files for persistence and resumability:

{
"phase": "bulk_syncing",
"index_created": true,
"total_products": 60560,
"processed_products": 15000,
"last_processed_id": 75981,
"started_at": "2025-09-28T15:14:03+00:00",
"last_run": "2025-09-28T16:22:53+00:00",
"errors": [
{
"timestamp": "2025-09-28T16:20:15+00:00",
"type": "elasticsearch",
"message": "Connection timeout"
}
],
"consecutive_failures": 0,
"failed_batches": {
"batch_123": {
"attempts": 2,
"first_failure": 1695924015,
"last_failure": 1695924075,
"products": [12345, 12346, 12347]
}
}
}
private function loadState() {
if (!file_exists($this->state_file)) {
return null;
}
$content = file_get_contents($this->state_file);
$state = json_decode($content, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$this->log("Invalid state file, resetting", 'WARNING');
return null;
}
return $state;
}
private function saveState() {
$this->state['last_run'] = date('c');
$this->state['last_updated'] = time();
$content = json_encode($this->state, JSON_PRETTY_PRINT);
if (file_put_contents($this->state_file, $content) === false) {
$this->log("Failed to save state file", 'ERROR');
}
}
public function resetSync() {
// Reset state file
if (file_exists($this->state_file)) {
unlink($this->state_file);
}
// Reset database flags
$this->resetDatabaseFlags();
// Delete Elasticsearch index
$elasticsearch = new ProductElasticsearch();
$elasticsearch->deleteIndex();
return [
'status' => 'success',
'message' => 'Sync process reset successfully'
];
}
public function getLogContent($log_type = 'auto_sync', $lines = 100) {
$log_file = $this->log_files[$log_type] ?? $this->log_files['auto_sync'];
$log_path = dirname(__FILE__) . "/../scripts/$log_file";
if (!file_exists($log_path)) {
return ['error' => 'Log file not found'];
}
$content = $this->tailFile($log_path, $lines);
return [
'content' => $content,
'file' => $log_file,
'size' => filesize($log_path),
'modified' => date('Y-m-d H:i:s', filemtime($log_path)),
'lines' => $lines
];
}
private function tailFile($file, $lines) {
$handle = fopen($file, 'r');
$line_counter = 0;
$content = '';
// Count total lines
while (fgets($handle) !== false) {
$line_counter++;
}
// Reset to beginning
rewind($handle);
// Skip to the last N lines
$skip_lines = max(0, $line_counter - $lines);
for ($i = 0; $i < $skip_lines; $i++) {
fgets($handle);
}
// Read remaining lines
while (($line = fgets($handle)) !== false) {
$content .= $line;
}
fclose($handle);
return $content;
}
public function analyzeLogs($log_type = 'auto_sync', $hours = 24) {
$log_content = $this->getLogContent($log_type, 1000);
if (isset($log_content['error'])) {
return $log_content;
}
$lines = explode("\n", $log_content['content']);
$analysis = [
'total_lines' => count($lines),
'error_count' => 0,
'warning_count' => 0,
'info_count' => 0,
'recent_errors' => [],
'performance_metrics' => []
];
foreach ($lines as $line) {
if (strpos($line, 'ERROR') !== false) {
$analysis['error_count']++;
$analysis['recent_errors'][] = $line;
} elseif (strpos($line, 'WARNING') !== false) {
$analysis['warning_count']++;
} elseif (strpos($line, 'INFO') !== false) {
$analysis['info_count']++;
}
// Extract performance metrics
if (preg_match('/Processed (\d+) products in ([\d.]+) seconds/', $line, $matches)) {
$analysis['performance_metrics'][] = [
'products' => intval($matches[1]),
'seconds' => floatval($matches[2]),
'products_per_second' => intval($matches[1]) / floatval($matches[2])
];
}
}
return $analysis;
}
public function testConnection() {
try {
$elasticsearch = new ProductElasticsearch();
$is_available = $elasticsearch->isAvailable();
if ($is_available) {
$document_count = $elasticsearch->getDocumentCount();
$index_stats = $elasticsearch->getIndexStats();
return [
'status' => 'success',
'message' => 'Elasticsearch connection successful',
'document_count' => $document_count,
'index_size' => $index_stats['size'] ?? 'unknown',
'health' => $index_stats['health'] ?? 'unknown'
];
} else {
return [
'status' => 'error',
'message' => 'Elasticsearch connection failed'
];
}
} catch (Exception $e) {
return [
'status' => 'error',
'message' => 'Connection test failed: ' . $e->getMessage()
];
}
}
public function testDatabaseConnection() {
try {
$connection = $this->getDatabaseConnection();
$result = pg_query($connection, "SELECT 1");
if ($result) {
pg_free_result($result);
// Get sync statistics
$stats = $this->getSyncStatistics();
return [
'status' => 'success',
'message' => 'Database connection successful',
'sync_stats' => $stats
];
} else {
return [
'status' => 'error',
'message' => 'Database query failed'
];
}
} catch (Exception $e) {
return [
'status' => 'error',
'message' => $e->getMessage()
];
}
}
public function getPerformanceMetrics() {
$state = $this->getSyncState();
if (!$state) {
return ['error' => 'No sync state available'];
}
$metrics = [
'current_phase' => $state['phase'],
'total_products' => $state['total_products'],
'processed_products' => $state['processed_products'],
'progress_percentage' => $this->calculateProgress($state),
'started_at' => $state['started_at'],
'last_run' => $state['last_run']
];
// Calculate performance metrics
if (isset($state['started_at']) && $state['processed_products'] > 0) {
$start_time = strtotime($state['started_at']);
$elapsed_hours = (time() - $start_time) / 3600;
$metrics['elapsed_hours'] = round($elapsed_hours, 2);
$metrics['products_per_hour'] = round($state['processed_products'] / $elapsed_hours);
$metrics['estimated_completion'] = $this->calculateAdaptiveETA($state);
}
// Error metrics
$metrics['error_count'] = count($state['errors']);
$metrics['failed_batches'] = count($state['failed_batches']);
$metrics['consecutive_failures'] = $state['consecutive_failures'] ?? 0;
return $metrics;
}
public function checkAlerts() {
$alerts = [];
$state = $this->getSyncState();
if (!$state) {
return $alerts;
}
// Check for stuck sync
if ($state['phase'] === 'bulk_syncing') {
$last_run = strtotime($state['last_run']);
$hours_since_last_run = (time() - $last_run) / 3600;
if ($hours_since_last_run > 2) {
$alerts[] = [
'type' => 'warning',
'message' => "Sync hasn't run in {$hours_since_last_run} hours",
'timestamp' => date('c')
];
}
}
// Check for high error rate
$error_count = count($state['errors']);
if ($error_count > 10) {
$alerts[] = [
'type' => 'error',
'message' => "High error count: {$error_count} errors",
'timestamp' => date('c')
];
}
// Check for consecutive failures
if (($state['consecutive_failures'] ?? 0) > 3) {
$alerts[] = [
'type' => 'critical',
'message' => "Multiple consecutive failures: {$state['consecutive_failures']}",
'timestamp' => date('c')
];
}
return $alerts;
}
// Get console statistics
add_action('wp_ajax_get_elasticsearch_stats', function() {
$console_manager = new ElasticsearchConsoleManager();
$stats = $console_manager->getStats();
$performance = $console_manager->getPerformanceEstimates();
$alerts = $console_manager->checkAlerts();
wp_send_json_success([
'stats' => $stats,
'performance' => $performance,
'alerts' => $alerts
]);
});
// Get log content
add_action('wp_ajax_get_elasticsearch_log', function() {
$console_manager = new ElasticsearchConsoleManager();
$log_type = sanitize_text_field($_POST['log_type']);
$lines = intval($_POST['lines']);
$log_content = $console_manager->getLogContent($log_type, $lines);
wp_send_json_success($log_content);
});
// Test connections
add_action('wp_ajax_test_elasticsearch_connection', function() {
$console_manager = new ElasticsearchConsoleManager();
$result = $console_manager->testConnection();
wp_send_json_success($result);
});
class ElasticsearchConsole {
constructor() {
this.refreshInterval = 30000; // 30 seconds
this.init();
}
init() {
this.loadStats();
this.bindEvents();
this.startAutoRefresh();
}
bindEvents() {
document.getElementById('refresh-stats').addEventListener('click', () => {
this.loadStats();
});
document.getElementById('test-connection').addEventListener('click', () => {
this.testConnection();
});
document.getElementById('reset-sync').addEventListener('click', () => {
this.resetSync();
});
}
loadStats() {
jQuery.post(ajaxurl, {
action: 'get_elasticsearch_stats',
nonce: ajax_nonce
}, (response) => {
if (response.success) {
this.updateDisplay(response.data);
}
});
}
updateDisplay(data) {
// Update statistics
document.getElementById('sync-status').textContent = data.stats.phase;
document.getElementById('sync-progress').textContent = data.stats.progress_percentage + '%';
document.getElementById('indexed-count').textContent = data.stats.indexed_products;
document.getElementById('sync-eta').textContent = data.performance.adaptive_eta_hours + ' hours';
// Update progress bar
this.updateProgressBar(data.stats.progress_percentage);
// Display alerts
this.displayAlerts(data.alerts);
}
updateProgressBar(percentage) {
const progressBar = document.getElementById('progress-bar');
progressBar.style.width = percentage + '%';
progressBar.setAttribute('aria-valuenow', percentage);
}
displayAlerts(alerts) {
const alertsContainer = document.getElementById('alerts-container');
if (alerts.length === 0) {
alertsContainer.innerHTML = '<div class="alert alert-success">All systems operational</div>';
return;
}
alertsContainer.innerHTML = alerts.map(alert => `
<div class="alert alert-${alert.type}">
<strong>${alert.type.toUpperCase()}:</strong> ${alert.message}
<small class="text-muted">${new Date(alert.timestamp).toLocaleString()}</small>
</div>
`).join('');
}
startAutoRefresh() {
setInterval(() => {
this.loadStats();
}, this.refreshInterval);
}
}
// Initialize console
document.addEventListener('DOMContentLoaded', () => {
new ElasticsearchConsole();
});