Monitoring & Management
Monitoring & Management Overview
Section titled “Monitoring & Management Overview”
The Elasticsearch integration includes a comprehensive monitoring and management system with a real-time console, progress tracking, performance metrics, and automated alerting.
Real-Time Console
Section titled “Real-Time Console”Console Features
Section titled “Console Features”The monitoring console provides comprehensive oversight of the Elasticsearch synchronization process:
1. Status Overview
Section titled “1. Status Overview”- Current sync phase (Initialize, Bulk, Incremental)
- Progress percentage with visual indicators
- Indexed product count vs total products
- Elasticsearch cluster health status
2. Performance Metrics
Section titled “2. Performance Metrics”- Processing speed (products/hour)
- ETA calculations (adaptive and scheduled)
- Batch processing statistics
- Error rates and patterns
3. Log Management
Section titled “3. Log Management”- Real-time log viewing with auto-refresh
- Multi-log support (auto-sync, regular sync)
- Log download functionality
- Error highlighting and filtering
4. Manual Controls
Section titled “4. Manual Controls”- Emergency reset functionality
- Manual sync triggering
- Connection testing
- State file inspection
Console Implementation
Section titled “Console Implementation”Console Manager Class
Section titled “Console Manager Class”File: inc/elasticsearch-console-manager.php
class ElasticsearchConsoleManager { // Configuration private $state_file = 'elasticsearch-sync-state.json'; private $log_files = [ 'auto_sync' => 'elasticsearch-auto-sync.log', 'regular_sync' => 'elasticsearch-sync.log' ];
// Key methods: public function getStats() // Real-time statistics public function getPerformanceEstimates() // ETA calculations public function getLogContent() // Log file access public function testConnection() // Health checks public function triggerManualSync() // Background sync trigger public function getSyncState() // State file access public function resetSync() // Emergency reset}Real-time Statistics
Section titled “Real-time Statistics”public function getStats() { $state = $this->getSyncState();
if (!$state) { return [ 'error' => 'No sync state found', 'phase' => 'unknown', 'progress_percentage' => 0 ]; }
$elasticsearch = new ProductElasticsearch(); $es_count = $elasticsearch->getDocumentCount();
return [ 'phase' => $state['phase'], 'total_products' => $state['total_products'], 'indexed_products' => $state['processed_products'], 'elasticsearch_count' => $es_count, 'progress_percentage' => $this->calculateProgress($state), 'started_at' => $state['started_at'], 'last_run' => $state['last_run'], 'errors' => count($state['errors']), 'failed_batches' => count($state['failed_batches']), 'consecutive_failures' => $state['consecutive_failures'] ?? 0 ];}
private function calculateProgress($state) { if ($state['total_products'] <= 0) { return 0; }
return round(($state['processed_products'] / $state['total_products']) * 100, 1);}ETA Calculation System
Section titled “ETA Calculation System”Dual ETA Methods
Section titled “Dual ETA Methods”
The system estimates the time remaining in a bulk sync using two methods:
1. Scheduled Method
Section titled “1. Scheduled Method”
ETA (hours) = (Remaining Batches × Cron Interval in minutes) ÷ 60
2. Adaptive Method (Preferred)
Section titled “2. Adaptive Method (Preferred)”
Actual Rate = Indexed Products ÷ Elapsed Hours
ETA = Remaining Products ÷ Actual Rate
ETA Implementation
Section titled “ETA Implementation”public function getPerformanceEstimates() { $state = $this->getSyncState();
if (!$state || $state['phase'] === 'incremental_syncing') { return ['error' => 'No bulk sync in progress']; }
$remaining_products = $state['total_products'] - $state['processed_products']; $remaining_batches = ceil($remaining_products / 500); // Default batch size
// Method 1: Scheduled (based on cron frequency) $scheduled_eta = ($remaining_batches * $this->cron_interval) / 60; // hours
// Method 2: Adaptive (based on actual performance) $adaptive_eta = $this->calculateAdaptiveETA($state);
return [ 'remaining_products' => $remaining_products, 'remaining_batches' => $remaining_batches, 'scheduled_eta_hours' => round($scheduled_eta, 1), 'adaptive_eta_hours' => round($adaptive_eta, 1), 'calculation_method' => $adaptive_eta > 0 ? 'adaptive' : 'scheduled', 'products_per_hour' => $this->calculateProductsPerHour($state), 'confidence_level' => $this->calculateConfidenceLevel($state) ];}
/**
 * Estimates the hours remaining in the sync from the observed indexing rate.
 *
 * Rate = processed_products / hours elapsed since started_at;
 * ETA  = remaining products / rate.
 *
 * @param array $state Sync state; reads 'started_at', 'processed_products' and 'total_products'.
 * @return float|int Remaining hours, or 0 when no estimate can be made: a required field is
 *                   missing, the start time cannot be parsed, less than 0.1 hours have
 *                   elapsed, or nothing has been processed yet.
 */
private function calculateAdaptiveETA($state) {
    if (!isset($state['started_at'], $state['processed_products'], $state['total_products'])) {
        return 0;
    }

    $start_time = strtotime($state['started_at']);
    if ($start_time === false) {
        // An unparseable timestamp would otherwise be treated as the Unix epoch.
        return 0;
    }

    $elapsed_hours = (time() - $start_time) / 3600;
    if ($elapsed_hours < 0.1) { // Less than 6 minutes
        return 0;
    }

    $products_per_hour = $state['processed_products'] / $elapsed_hours;
    if ($products_per_hour <= 0) {
        // Nothing indexed yet: dividing by a zero rate throws DivisionByZeroError on PHP 8.
        return 0;
    }

    // Clamp so a processed count above the total never yields a negative ETA.
    $remaining_products = max(0, $state['total_products'] - $state['processed_products']);

    return $remaining_products / $products_per_hour;
}
/**
 * Average indexing throughput since the sync started.
 *
 * @param array $state Sync state; reads 'started_at' and 'processed_products'.
 * @return float|int Products per hour rounded to a whole number, or 0 when either field is
 *                   absent or fewer than 0.1 hours have elapsed.
 */
private function calculateProductsPerHour($state) {
    if (isset($state['started_at'], $state['processed_products'])) {
        $began = strtotime($state['started_at']);
        $hours_running = (time() - $began) / 3600;

        if (!($hours_running < 0.1)) {
            return round($state['processed_products'] / $hours_running);
        }
    }

    return 0;
}
private function calculateConfidenceLevel($state) { if (!isset($state['started_at'])) { return 'low'; }
$start_time = strtotime($state['started_at']); $elapsed_hours = (time() - $start_time) / 3600;
if ($elapsed_hours < 1) { return 'low'; } elseif ($elapsed_hours < 6) { return 'medium'; } else { return 'high'; }}State Management
Section titled “State Management”State File Structure
Section titled “State File Structure”
Sync state is persisted in a JSON file so that an interrupted sync can resume where it left off:
{ "phase": "bulk_syncing", "index_created": true, "total_products": 60560, "processed_products": 15000, "last_processed_id": 75981, "started_at": "2025-09-28T15:14:03+00:00", "last_run": "2025-09-28T16:22:53+00:00", "errors": [ { "timestamp": "2025-09-28T16:20:15+00:00", "type": "elasticsearch", "message": "Connection timeout" } ], "consecutive_failures": 0, "failed_batches": { "batch_123": { "attempts": 2, "first_failure": 1695924015, "last_failure": 1695924075, "products": [12345, 12346, 12347] } }}State Operations
Section titled “State Operations”private function loadState() { if (!file_exists($this->state_file)) { return null; }
$content = file_get_contents($this->state_file); $state = json_decode($content, true);
if (json_last_error() !== JSON_ERROR_NONE) { $this->log("Invalid state file, resetting", 'WARNING'); return null; }
return $state;}
/**
 * Persists the in-memory sync state to the JSON state file.
 *
 * Stamps 'last_run' (ISO 8601 via date('c')) and 'last_updated' (Unix time) before
 * writing. Failures are logged at ERROR level rather than thrown.
 */
private function saveState() {
    $this->state['last_run'] = date('c');
    $this->state['last_updated'] = time();

    $content = json_encode($this->state, JSON_PRETTY_PRINT);

    if ($content === false) {
        // json_encode() returns false on failure. Passing that to file_put_contents()
        // would write an empty string, truncating the existing state file and losing the
        // resume point while still reporting success. Keep the previous file instead.
        $this->log("Failed to encode sync state: " . json_last_error_msg(), 'ERROR');
        return;
    }

    if (file_put_contents($this->state_file, $content) === false) {
        $this->log("Failed to save state file", 'ERROR');
    }
}
public function resetSync() { // Reset state file if (file_exists($this->state_file)) { unlink($this->state_file); }
// Reset database flags $this->resetDatabaseFlags();
// Delete Elasticsearch index $elasticsearch = new ProductElasticsearch(); $elasticsearch->deleteIndex();
return [ 'status' => 'success', 'message' => 'Sync process reset successfully' ];}Log Management
Section titled “Log Management”Log File Access
Section titled “Log File Access”public function getLogContent($log_type = 'auto_sync', $lines = 100) { $log_file = $this->log_files[$log_type] ?? $this->log_files['auto_sync']; $log_path = dirname(__FILE__) . "/../scripts/$log_file";
if (!file_exists($log_path)) { return ['error' => 'Log file not found']; }
$content = $this->tailFile($log_path, $lines);
return [ 'content' => $content, 'file' => $log_file, 'size' => filesize($log_path), 'modified' => date('Y-m-d H:i:s', filemtime($log_path)), 'lines' => $lines ];}
private function tailFile($file, $lines) { $handle = fopen($file, 'r'); $line_counter = 0; $content = '';
// Count total lines while (fgets($handle) !== false) { $line_counter++; }
// Reset to beginning rewind($handle);
// Skip to the last N lines $skip_lines = max(0, $line_counter - $lines); for ($i = 0; $i < $skip_lines; $i++) { fgets($handle); }
// Read remaining lines while (($line = fgets($handle)) !== false) { $content .= $line; }
fclose($handle); return $content;}Log Analysis
Section titled “Log Analysis”
/**
 * Summarises the last 1000 lines of a sync log: counts per severity, the ERROR lines
 * themselves, and any "Processed N products in S seconds" throughput entries.
 *
 * @param string $log_type Key of the log to analyse (passed through to getLogContent()).
 * @param int    $hours    Currently unused; kept for interface compatibility.
 * @return array The analysis, or getLogContent()'s error array when the log cannot be read.
 */
public function analyzeLogs($log_type = 'auto_sync', $hours = 24) {
    $log_content = $this->getLogContent($log_type, 1000);

    if (isset($log_content['error'])) {
        return $log_content;
    }

    // Strip the final newline first: explode() would otherwise yield a trailing empty
    // element and inflate total_lines by one (and report 1 line for an empty log).
    $text = rtrim($log_content['content'], "\n");
    $lines = $text === '' ? [] : explode("\n", $text);

    $analysis = [
        'total_lines' => count($lines),
        'error_count' => 0,
        'warning_count' => 0,
        'info_count' => 0,
        'recent_errors' => [],
        'performance_metrics' => []
    ];

    foreach ($lines as $line) {
        // A line is counted once, under the most severe level it mentions.
        if (strpos($line, 'ERROR') !== false) {
            $analysis['error_count']++;
            $analysis['recent_errors'][] = $line;
        } elseif (strpos($line, 'WARNING') !== false) {
            $analysis['warning_count']++;
        } elseif (strpos($line, 'INFO') !== false) {
            $analysis['info_count']++;
        }

        // Extract performance metrics
        if (preg_match('/Processed (\d+) products in ([\d.]+) seconds/', $line, $matches)) {
            $products = intval($matches[1]);
            $seconds = floatval($matches[2]);

            $analysis['performance_metrics'][] = [
                'products' => $products,
                'seconds' => $seconds,
                // "in 0 seconds" (or a bare ".") parses to 0.0; dividing by it throws
                // DivisionByZeroError on PHP 8, so report the rate as unknown instead.
                'products_per_second' => $seconds > 0 ? $products / $seconds : null
            ];
        }
    }

    return $analysis;
}
Health Monitoring
Section titled “Health Monitoring”Connection Health Check
Section titled “Connection Health Check”public function testConnection() { try { $elasticsearch = new ProductElasticsearch(); $is_available = $elasticsearch->isAvailable();
if ($is_available) { $document_count = $elasticsearch->getDocumentCount(); $index_stats = $elasticsearch->getIndexStats();
return [ 'status' => 'success', 'message' => 'Elasticsearch connection successful', 'document_count' => $document_count, 'index_size' => $index_stats['size'] ?? 'unknown', 'health' => $index_stats['health'] ?? 'unknown' ]; } else { return [ 'status' => 'error', 'message' => 'Elasticsearch connection failed' ]; } } catch (Exception $e) { return [ 'status' => 'error', 'message' => 'Connection test failed: ' . $e->getMessage() ]; }}Database Health Check
Section titled “Database Health Check”public function testDatabaseConnection() { try { $connection = $this->getDatabaseConnection(); $result = pg_query($connection, "SELECT 1");
if ($result) { pg_free_result($result);
// Get sync statistics $stats = $this->getSyncStatistics();
return [ 'status' => 'success', 'message' => 'Database connection successful', 'sync_stats' => $stats ]; } else { return [ 'status' => 'error', 'message' => 'Database query failed' ]; } } catch (Exception $e) { return [ 'status' => 'error', 'message' => $e->getMessage() ]; }}Performance Monitoring
Section titled “Performance Monitoring”Real-time Metrics
Section titled “Real-time Metrics”
/**
 * Builds a snapshot of sync progress, throughput and error counters from the sync state.
 *
 * Throughput keys ('elapsed_hours', 'products_per_hour', 'estimated_completion') are only
 * present once at least one product has been processed and a positive amount of time has
 * elapsed since 'started_at'.
 *
 * @return array Metrics keyed by name, or ['error' => ...] when no sync state is available.
 */
public function getPerformanceMetrics() {
    $state = $this->getSyncState();

    if (!$state) {
        return ['error' => 'No sync state available'];
    }

    $metrics = [
        'current_phase' => $state['phase'],
        'total_products' => $state['total_products'],
        'processed_products' => $state['processed_products'],
        'progress_percentage' => $this->calculateProgress($state),
        'started_at' => $state['started_at'],
        'last_run' => $state['last_run']
    ];

    // Calculate performance metrics
    if (isset($state['started_at']) && $state['processed_products'] > 0) {
        $start_time = strtotime($state['started_at']);
        $elapsed_hours = $start_time === false ? 0 : (time() - $start_time) / 3600;

        // A sync that started this very second, or a start time that cannot be parsed or
        // lies in the future, gives no usable elapsed time. Dividing by 0 throws
        // DivisionByZeroError on PHP 8, so throughput is reported only once time has passed.
        if ($elapsed_hours > 0) {
            $metrics['elapsed_hours'] = round($elapsed_hours, 2);
            $metrics['products_per_hour'] = round($state['processed_products'] / $elapsed_hours);
            $metrics['estimated_completion'] = $this->calculateAdaptiveETA($state);
        }
    }

    // Error metrics. count() throws TypeError on null in PHP 8, so a state file that
    // lacks these lists is treated as having none.
    $metrics['error_count'] = count($state['errors'] ?? []);
    $metrics['failed_batches'] = count($state['failed_batches'] ?? []);
    $metrics['consecutive_failures'] = $state['consecutive_failures'] ?? 0;

    return $metrics;
}
Alert System
Section titled “Alert System”public function checkAlerts() { $alerts = []; $state = $this->getSyncState();
if (!$state) { return $alerts; }
// Check for stuck sync if ($state['phase'] === 'bulk_syncing') { $last_run = strtotime($state['last_run']); $hours_since_last_run = (time() - $last_run) / 3600;
if ($hours_since_last_run > 2) { $alerts[] = [ 'type' => 'warning', 'message' => "Sync hasn't run in {$hours_since_last_run} hours", 'timestamp' => date('c') ]; } }
// Check for high error rate $error_count = count($state['errors']); if ($error_count > 10) { $alerts[] = [ 'type' => 'error', 'message' => "High error count: {$error_count} errors", 'timestamp' => date('c') ]; }
// Check for consecutive failures if (($state['consecutive_failures'] ?? 0) > 3) { $alerts[] = [ 'type' => 'critical', 'message' => "Multiple consecutive failures: {$state['consecutive_failures']}", 'timestamp' => date('c') ]; }
return $alerts;}Console Interface
Section titled “Console Interface”AJAX Endpoints
Section titled “AJAX Endpoints”// Get console statisticsadd_action('wp_ajax_get_elasticsearch_stats', function() { $console_manager = new ElasticsearchConsoleManager(); $stats = $console_manager->getStats(); $performance = $console_manager->getPerformanceEstimates(); $alerts = $console_manager->checkAlerts();
wp_send_json_success([ 'stats' => $stats, 'performance' => $performance, 'alerts' => $alerts ]);});
// Get log content
// NOTE(review): this handler neither verifies a nonce nor checks a capability before
// returning log data — confirm access control is enforced elsewhere.
add_action('wp_ajax_get_elasticsearch_log', function() {
    $console_manager = new ElasticsearchConsoleManager();

    // Fall back to getLogContent()'s own defaults when a field is absent; reading a
    // missing $_POST key raises an "Undefined array key" warning on PHP 8.
    $log_type = isset($_POST['log_type'])
        ? sanitize_text_field($_POST['log_type'])
        : 'auto_sync';

    $lines = isset($_POST['lines']) ? intval($_POST['lines']) : 100;
    if ($lines <= 0) {
        // A non-positive count would make the tail reader skip the entire file.
        $lines = 100;
    }

    $log_content = $console_manager->getLogContent($log_type, $lines);

    wp_send_json_success($log_content);
});
// Test connectionsadd_action('wp_ajax_test_elasticsearch_connection', function() { $console_manager = new ElasticsearchConsoleManager(); $result = $console_manager->testConnection();
wp_send_json_success($result);});Frontend Console
Section titled “Frontend Console”class ElasticsearchConsole { constructor() { this.refreshInterval = 30000; // 30 seconds this.init(); }
init() { this.loadStats(); this.bindEvents(); this.startAutoRefresh(); }
bindEvents() { document.getElementById('refresh-stats').addEventListener('click', () => { this.loadStats(); });
document.getElementById('test-connection').addEventListener('click', () => { this.testConnection(); });
document.getElementById('reset-sync').addEventListener('click', () => { this.resetSync(); }); }
loadStats() { jQuery.post(ajaxurl, { action: 'get_elasticsearch_stats', nonce: ajax_nonce }, (response) => { if (response.success) { this.updateDisplay(response.data); } }); }
updateDisplay(data) { // Update statistics document.getElementById('sync-status').textContent = data.stats.phase; document.getElementById('sync-progress').textContent = data.stats.progress_percentage + '%'; document.getElementById('indexed-count').textContent = data.stats.indexed_products; document.getElementById('sync-eta').textContent = data.performance.adaptive_eta_hours + ' hours';
// Update progress bar this.updateProgressBar(data.stats.progress_percentage);
// Display alerts this.displayAlerts(data.alerts); }
updateProgressBar(percentage) { const progressBar = document.getElementById('progress-bar'); progressBar.style.width = percentage + '%'; progressBar.setAttribute('aria-valuenow', percentage); }
displayAlerts(alerts) { const alertsContainer = document.getElementById('alerts-container');
if (alerts.length === 0) { alertsContainer.innerHTML = '<div class="alert alert-success">All systems operational</div>'; return; }
alertsContainer.innerHTML = alerts.map(alert => ` <div class="alert alert-${alert.type}"> <strong>${alert.type.toUpperCase()}:</strong> ${alert.message} <small class="text-muted">${new Date(alert.timestamp).toLocaleString()}</small> </div> `).join(''); }
startAutoRefresh() { setInterval(() => { this.loadStats(); }, this.refreshInterval); }}
// Initialize consoledocument.addEventListener('DOMContentLoaded', () => { new ElasticsearchConsole();});Next Steps
Section titled “Next Steps”- API Endpoints - API reference
- Configuration - System configuration
- Deployment - Installation and setup
- Troubleshooting - Common issues and solutions