Database Integration
Database Integration Overview
Section titled “Database Integration Overview”

The Elasticsearch integration requires a boolean column in the product table for tracking synchronization status. This column serves as the single source of truth for determining which products need to be indexed.
Schema Requirements
Section titled “Schema Requirements”Required Database Changes
Section titled “Required Database Changes”

-- Add sync tracking column.
-- NOT NULL keeps the flag two-valued: the sync queries filter on
-- `added_search = FALSE`, which would silently skip rows holding NULL.
ALTER TABLE product
    ADD COLUMN IF NOT EXISTS added_search BOOLEAN NOT NULL DEFAULT FALSE;

-- Create index for performance
CREATE INDEX IF NOT EXISTS idx_product_added_search
    ON product (added_search);

-- Optional: Add index on composite columns for better query performance
-- (partial: only rows that are eligible for indexing, i.e. with a title)
CREATE INDEX IF NOT EXISTS idx_product_composite
    ON product (added_search, id)
    WHERE title IS NOT NULL;

Database Tables
Section titled “Database Tables”

| Table | Purpose | Key Columns |
|---|---|---|
| product | Main product data | id, title, brand, price, added_search |
| core_category | Hierarchical categories | id, name, path, parent_id |
Data Flow Architecture
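Section titled “Data Flow Architecture”

PostgreSQL product table
    ↓
[added_search = false]
    ↓
Elasticsearch indexing
    ↓
[added_search = true]
    ↓
Progress tracking updated

The added_search boolean column serves as the single source of truth for sync progress:

- FALSE: Product needs indexing
- TRUE: Product successfully indexed in Elasticsearch

Note: the sync queries select rows with `added_search = FALSE`, so a row whose flag is NULL is not picked up for indexing. Keep the column populated (it defaults to FALSE).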
Database Connection
Section titled “Database Connection”Connection Configuration
Section titled “Connection Configuration”

File: scripts/elasticsearch-auto-sync.php
class ElasticsearchAutoSyncManager {
    /**
     * Open a connection to the PostgreSQL product database.
     *
     * Settings are taken from the standard libpq environment variables
     * (PGHOST, PGDATABASE, PGUSER, PGPASSWORD, PGPORT) so that credentials do
     * not have to live in the source file. Each setting falls back to the
     * value that was previously hard-coded here when its variable is unset.
     *
     * @return resource|\PgSql\Connection Open connection handle.
     * @throws Exception When pg_connect() fails.
     */
    private function getDatabaseConnection() {
        $settings = [
            'host'     => getenv('PGHOST') ?: '162.55.174.116',
            'dbname'   => getenv('PGDATABASE') ?: 'categories_db',
            'user'     => getenv('PGUSER') ?: 'website',
            'password' => getenv('PGPASSWORD') ?: '***',
            'port'     => getenv('PGPORT') ?: '5432',
        ];

        // Quote every value: libpq requires single quotes around values that
        // contain spaces, with embedded quotes/backslashes backslash-escaped.
        $pairs = [];
        foreach ($settings as $keyword => $value) {
            $pairs[] = $keyword . "='" . addcslashes($value, "'\\") . "'";
        }
        $conn_string = implode(' ', $pairs);

        $connection = pg_connect($conn_string);

        if (!$connection) {
            throw new Exception("Failed to connect to PostgreSQL database");
        }

        return $connection;
    }
}

Connection Management
Section titled “Connection Management”

/**
 * Return a usable database connection, opening or replacing it as needed.
 *
 * The handle is cached on $this->connection. An existing handle is checked
 * with pg_ping(), which also asks libpq to reset a broken link; this replaces
 * the former "SELECT 1" probe, whose result was never freed and which raised
 * a PHP warning whenever the link was down.
 *
 * @return resource|\PgSql\Connection
 * @throws Exception When a new connection cannot be established.
 */
private function ensureDatabaseConnection() {
    if ($this->connection === null) {
        $this->connection = $this->getDatabaseConnection();
        return $this->connection;
    }

    // Health check; fall back to a fresh connection when the reset fails too.
    if (!pg_ping($this->connection)) {
        $this->connection = $this->getDatabaseConnection();
    }

    return $this->connection;
}

Data Retrieval Functions
Section titled “Data Retrieval Functions”Get Products for Sync
Section titled “Get Products for Sync”

/**
 * Fetch the next batch of not-yet-indexed products, ordered by id.
 *
 * Selects products with id greater than $last_id, a non-null title and
 * added_search = FALSE, joined with their category name and path.
 *
 * @param int|string $last_id    Only products with a larger id are returned.
 * @param int|string $batch_size Maximum number of rows to return.
 * @return array List of associative rows; an empty array when nothing matches.
 * @throws Exception When the query fails.
 */
private function getProductsBatch($last_id, $batch_size) {
    $connection = $this->ensureDatabaseConnection();

    $query = '
        SELECT
            p.id, p.title, p.brand, p.price, p.list_price, p.availability,
            p.description, p.specifications, p.image_urls, p.category_id,
            p.site_id, p.product_url, p.sku, p.ean, p.created_at, p.updated_at,
            c.name as category_name, c.path as category_path
        FROM product p
        LEFT JOIN core_category c ON p.category_id = c.id
        WHERE p.id > $1
          AND p.title IS NOT NULL
          AND p.added_search = FALSE
        ORDER BY p.id ASC
        LIMIT $2
    ';

    $result = pg_query_params($connection, $query, [$last_id, $batch_size]);

    if (!$result) {
        throw new Exception("Database query failed: " . pg_last_error($connection));
    }

    $rows = pg_fetch_all($result);
    pg_free_result($result);

    // pg_fetch_all() yields false instead of [] for an empty result set on
    // PHP < 8; normalise so callers can always iterate over the return value.
    return $rows ?: [];
}

Get Products Needing Incremental Sync
Section titled “Get Products Needing Incremental Sync”

/**
 * Fetch not-yet-indexed products for an incremental sync, newest first.
 *
 * Selects products with added_search = FALSE and a non-null title, ordered by
 * updated_at descending, joined with their category name and path.
 *
 * @param int|string $limit Maximum number of rows to return.
 * @return array List of associative rows; an empty array when nothing matches.
 * @throws Exception When the query fails.
 */
private function getProductsNeedingSync($limit) {
    $connection = $this->ensureDatabaseConnection();

    $query = '
        SELECT
            p.id, p.title, p.brand, p.price, p.list_price, p.availability,
            p.description, p.specifications, p.image_urls, p.category_id,
            p.site_id, p.product_url, p.sku, p.ean, p.created_at, p.updated_at,
            c.name as category_name, c.path as category_path
        FROM product p
        LEFT JOIN core_category c ON p.category_id = c.id
        WHERE p.added_search = FALSE
          AND p.title IS NOT NULL
        ORDER BY p.updated_at DESC
        LIMIT $1
    ';

    $result = pg_query_params($connection, $query, [$limit]);

    if (!$result) {
        throw new Exception("Database query failed: " . pg_last_error($connection));
    }

    $rows = pg_fetch_all($result);
    pg_free_result($result);

    // pg_fetch_all() yields false instead of [] for an empty result set on
    // PHP < 8; normalise so callers can always iterate over the return value.
    return $rows ?: [];
}

Get Total Product Count
Section titled “Get Total Product Count”

/**
 * Count the products that have a title.
 *
 * @return int Number of rows in product where title IS NOT NULL.
 * @throws Exception When the count query fails.
 */
private function getTotalProductCount() {
    $db = $this->ensureDatabaseConnection();

    $countResult = pg_query($db, "SELECT COUNT(*) FROM product WHERE title IS NOT NULL");
    if (!$countResult) {
        throw new Exception("Database query failed: " . pg_last_error($db));
    }

    // COUNT(*) produces a single row with a single column.
    list($total) = pg_fetch_row($countResult);

    return (int) $total;
}

Data Update Functions
Section titled “Data Update Functions”Mark Products as Indexed
Section titled “Mark Products as Indexed”

/**
 * Set added_search = TRUE for every product in the given batch.
 *
 * @param array $products Rows that each carry an 'id' key.
 * @return bool Always true on success (also for an empty batch).
 * @throws Exception When no ids can be extracted or the UPDATE fails.
 */
private function markProductsAsIndexed($products) {
    if (empty($products)) {
        return true;
    }

    $product_ids = array_values(array_column($products, 'id'));
    if (empty($product_ids)) {
        // Would otherwise build the invalid statement "... WHERE id IN ()".
        throw new Exception("Failed to update products: batch contains no product ids");
    }

    $connection = $this->ensureDatabaseConnection();

    // One numbered placeholder per id: $1,$2,...,$n. The previous version
    // referenced an undefined $i and produced "$1" for every position, so the
    // parameter count never matched for batches larger than one row.
    $placeholders = [];
    foreach (array_keys($product_ids) as $position) {
        $placeholders[] = '$' . ($position + 1);
    }

    $query = 'UPDATE product SET added_search = TRUE WHERE id IN (' . implode(',', $placeholders) . ')';

    $result = pg_query_params($connection, $query, $product_ids);

    if (!$result) {
        throw new Exception("Failed to update products: " . pg_last_error($connection));
    }

    $this->log("Marked " . count($products) . " products as indexed");
    return true;
}

Reset Sync Flags
Section titled “Reset Sync Flags”

/**
 * Mark every product as not indexed so that a full re-sync picks it up.
 *
 * @return bool Always true on success.
 * @throws Exception When the UPDATE fails.
 */
private function resetSyncFlags() {
    $connection = $this->ensureDatabaseConnection();

    // Only touch rows whose flag is not already FALSE (TRUE or NULL). The end
    // state is the same as an unconditional UPDATE, without rewriting rows
    // that need no change.
    $query = "UPDATE product SET added_search = FALSE WHERE added_search IS DISTINCT FROM FALSE";
    $result = pg_query($connection, $query);

    if (!$result) {
        throw new Exception("Failed to reset sync flags: " . pg_last_error($connection));
    }

    $this->log("Reset sync flags for all products");
    return true;
}

Update Last Processed ID
Section titled “Update Last Processed ID”

/**
 * Remember the id of the final product of a batch in the sync state.
 *
 * Leaves the state untouched when the batch is empty.
 *
 * @param array $products Batch of product rows, each with an 'id' key.
 * @return void
 */
private function updateLastProcessedId($products) {
    if (!empty($products)) {
        // Take the last row without moving the array's internal pointer.
        $tail = current(array_slice($products, -1));
        $this->state['last_processed_id'] = $tail['id'];
    }
}

Data Transformation
Section titled “Data Transformation”Transform Product for Elasticsearch
Section titled “Transform Product for Elasticsearch”

/**
 * Convert a product row into the document array sent to Elasticsearch.
 *
 * Text fields go through cleanText(), prices through parseFloat(), and the
 * discount, availability, primary image and specifications through their
 * dedicated helpers; the remaining fields are copied unchanged.
 *
 * @param array $product Product row including category_name/category_path.
 * @return array Document keyed by field name.
 */
private function transformProductForElasticsearch($product) {
    $document = ['id' => (int)$product['id']];

    // Free-text fields: cleaned by cleanText().
    foreach (['title', 'brand', 'description'] as $textField) {
        $document[$textField] = $this->cleanText($product[$textField]);
    }

    // Pricing.
    $document['price'] = $this->parseFloat($product['price']);
    $document['list_price'] = $this->parseFloat($product['list_price']);
    $document['discount_percentage'] = $this->calculateDiscount($product);

    $document['availability'] = $this->normalizeAvailability($product['availability']);

    // Category data comes from the joined category columns.
    $document['category'] = $product['category_name'];
    $document['category_path'] = $product['category_path'];

    $document['sku'] = $product['sku'];
    $document['ean'] = $product['ean'];
    $document['image_url'] = $this->getPrimaryImage($product['image_urls']);
    $document['product_url'] = $product['product_url'];
    $document['specifications'] = $this->parseSpecifications($product['specifications']);
    $document['created_at'] = $product['created_at'];
    $document['updated_at'] = $product['updated_at'];

    return $document;
}

Data Cleaning Functions
Section titled “Data Cleaning Functions”

/**
 * Strip markup and collapse whitespace in a text field.
 *
 * @param mixed $text Raw text; null, false, '' and non-scalars yield ''.
 * @return string Tag-free text with runs of whitespace reduced to one space
 *                and no leading/trailing whitespace.
 */
private function cleanText($text) {
    // Explicit checks instead of empty(): the string '0' is real content.
    if (!is_scalar($text) || $text === false || $text === '') {
        return '';
    }

    // Remove HTML tags
    $text = strip_tags((string) $text);

    // Normalize whitespace. The /u modifier makes \s Unicode-aware (it also
    // covers the non-breaking space) and keeps it from matching single bytes
    // inside multi-byte UTF-8 characters.
    $collapsed = preg_replace('/\s+/u', ' ', $text);
    if ($collapsed === null) {
        // Input is not valid UTF-8: fall back to byte-wise matching.
        $collapsed = preg_replace('/\s+/', ' ', $text);
    }

    // Trim
    return trim((string) $collapsed);
}
/**
 * Parse a price-like value into a float.
 *
 * Currency symbols and other non-numeric characters are ignored. Both
 * "1,299.99" and "1.299,99" style numbers are understood: when a dot and a
 * comma are both present, whichever comes last is the decimal separator. A
 * lone comma followed by one or two digits ("12,50") is a decimal comma; any
 * other comma is treated as a thousands separator.
 *
 * @param mixed $value Raw value, typically a string from the database.
 * @return float Parsed number, or 0.0 for empty / non-scalar input.
 */
private function parseFloat($value) {
    if (empty($value) || !is_scalar($value)) {
        return 0.0;
    }

    // Keep digits and both possible separators.
    $number = preg_replace('/[^0-9.,]/', '', (string) $value);

    $lastDot = strrpos($number, '.');
    $lastComma = strrpos($number, ',');

    if ($lastComma !== false) {
        if ($lastDot !== false && $lastComma > $lastDot) {
            // "1.299,99": dots group thousands, the comma is the decimal mark.
            $number = str_replace(',', '.', str_replace('.', '', $number));
        } elseif ($lastDot === false && preg_match('/^\d*,\d{1,2}$/', $number)) {
            // "12,50": single decimal comma.
            $number = str_replace(',', '.', $number);
        } else {
            // "1,299.99" or "1,299": commas group thousands.
            $number = str_replace(',', '', $number);
        }
    }

    return floatval($number);
}
/**
 * Map a raw availability string onto a display label.
 *
 * Known terms are matched case-insensitively after trimming; anything else is
 * returned lower-cased with each word capitalised. Empty input gives 'Unknown'.
 *
 * @param mixed $availability Raw availability text.
 * @return string Display label.
 */
private function normalizeAvailability($availability) {
    if (empty($availability)) {
        return 'Unknown';
    }

    // Canonical labels for the common availability terms.
    $labels = [
        'in stock' => 'In Stock',
        'out of stock' => 'Out of Stock',
        'pre-order' => 'Pre-order',
        'discontinued' => 'Discontinued'
    ];

    $term = strtolower(trim($availability));

    if (array_key_exists($term, $labels)) {
        return $labels[$term];
    }

    return ucwords($term);
}
/**
 * Compute the discount of price relative to list_price, in percent.
 *
 * @param array $product Row with 'price' and 'list_price' entries.
 * @return float Percentage rounded to two decimals; 0.0 when there is no
 *               positive list price or the price is not below it.
 */
private function calculateDiscount($product) {
    $current = $this->parseFloat($product['price']);
    $reference = $this->parseFloat($product['list_price']);

    $noDiscount = $reference <= 0 || $current >= $reference;
    if ($noDiscount) {
        return 0.0;
    }

    $fraction = ($reference - $current) / $reference;

    return round($fraction * 100, 2);
}
/**
 * Return the first image URL from a JSON-encoded collection of URLs.
 *
 * Takes the first element of the decoded value whatever its key, so a JSON
 * object such as {"main": "..."} works as well as a JSON list. The result is
 * always a string.
 *
 * @param mixed $image_urls JSON text, or an already decoded array.
 * @return string First URL, or '' when the input is empty, is not valid JSON,
 *                decodes to something other than an array, or its first
 *                element is not a string.
 */
private function getPrimaryImage($image_urls) {
    if (empty($image_urls)) {
        return '';
    }

    $images = is_array($image_urls) ? $image_urls : json_decode($image_urls, true);

    if (!is_array($images) || empty($images)) {
        return '';
    }

    // reset() yields the first element even when index 0 does not exist.
    $first = reset($images);

    return is_string($first) ? $first : '';
}
/**
 * Decode a JSON specifications value into an array.
 *
 * @param mixed $specifications JSON text.
 * @return array Decoded data, or [] when the input is empty or does not
 *               decode to an array.
 */
private function parseSpecifications($specifications) {
    $decoded = empty($specifications) ? null : json_decode($specifications, true);

    if (!is_array($decoded)) {
        return [];
    }

    return $decoded;
}

Performance Optimization
Section titled “Performance Optimization”Query Optimization
Section titled “Query Optimization”

-- Optimize product queries with composite index.
-- NOTE: same definition as idx_product_composite from the schema section;
-- create only one of the two.
CREATE INDEX IF NOT EXISTS idx_product_sync_composite
    ON product (added_search, id)
    WHERE title IS NOT NULL;

-- Optimize category joins
CREATE INDEX IF NOT EXISTS idx_category_parent
    ON core_category (parent_id);

-- Optimize updated_at queries for incremental sync
CREATE INDEX IF NOT EXISTS idx_product_updated
    ON product (updated_at DESC)
    WHERE added_search = FALSE;

Batch Processing Optimization
Section titled “Batch Processing Optimization”

/**
 * Choose a batch size from PHP's configured memory_limit.
 *
 * @return int 100 below 256MB, 250 below 512MB, otherwise 500. An unlimited
 *             memory_limit (-1) also gives 500.
 */
private function optimizeBatchSize() {
    // Adjust batch size based on available memory
    $memory_limit = ini_get('memory_limit');
    $memory_bytes = $this->parseMemoryLimit($memory_limit);

    // memory_limit = -1 means "no limit"; without this check the negative
    // value compared as "less than 256MB" and selected the smallest batch.
    if ($memory_bytes < 0) {
        return 500;
    }

    if ($memory_bytes < 256 * 1024 * 1024) { // Less than 256MB
        return 100;
    } elseif ($memory_bytes < 512 * 1024 * 1024) { // Less than 512MB
        return 250;
    } else {
        return 500; // Default
    }
}
/**
 * Convert a php.ini shorthand size ("128M", "1G", "512k") into bytes.
 *
 * @param mixed $limit Value as returned by ini_get('memory_limit').
 * @return int Size in bytes. Values without a k/m/g suffix are returned as
 *             their integer value (so "-1" stays -1); empty input gives 0.
 */
private function parseMemoryLimit($limit) {
    $limit = trim((string) $limit);

    // ini_get() returns false for unknown options; indexing an empty string
    // would raise an "uninitialized string offset" error.
    if ($limit === '') {
        return 0;
    }

    $unit = strtolower(substr($limit, -1));
    $bytes = intval($limit);

    switch ($unit) {
        case 'g':
            $bytes *= 1024;
            // intentional fall-through: G = 1024 M
        case 'm':
            $bytes *= 1024;
            // intentional fall-through: M = 1024 K
        case 'k':
            $bytes *= 1024;
    }

    return $bytes;
}

Error Handling
Section titled “Error Handling”Database Error Recovery
Section titled “Database Error Recovery”

/**
 * Log a database error, record it in the sync state and try to reconnect.
 *
 * The error is written to the state and saved before the reconnect attempt,
 * so it is persisted even when reconnecting throws.
 *
 * @param string $error Error message.
 * @return void
 * @throws Exception When the message indicates a connection problem and the
 *                   reconnect attempt fails.
 */
private function handleDatabaseError($error) {
    $this->log("Database error: $error", 'ERROR');

    // Log error to state
    $this->state['errors'][] = [
        'timestamp' => date('c'),
        'type' => 'database',
        'message' => $error
    ];

    $this->saveState();

    // Check if it's a connection error (case-insensitive: messages may start
    // with "Connection ...")
    if (stripos($error, 'connection') !== false) {
        $this->log("Attempting to reconnect to database", 'INFO');
        $this->connection = null; // Force reconnection
        $this->ensureDatabaseConnection();
    }
}

Connection Health Check
Section titled “Connection Health Check”

/**
 * Check that the database can be reached and answers a trivial query.
 *
 * The connection obtained here is deliberately not closed.
 * NOTE(review): pg_connect() may hand back an already open connection for an
 * identical connection string, so closing it could affect the shared handle;
 * confirm before adding pg_close().
 *
 * @return array ['status' => 'success'|'error', 'message' => string]
 */
public function testDatabaseConnection() {
    $failure = function ($message) {
        return ['status' => 'error', 'message' => $message];
    };

    try {
        $probe = pg_query($this->getDatabaseConnection(), "SELECT 1");
    } catch (Exception $e) {
        return $failure($e->getMessage());
    }

    if (!$probe) {
        return $failure('Database query failed');
    }

    pg_free_result($probe);

    return ['status' => 'success', 'message' => 'Database connection successful'];
}

Monitoring and Statistics
Section titled “Monitoring and Statistics”Sync Progress Statistics
Section titled “Sync Progress Statistics”

/**
 * Summarise indexing progress over all products that have a title.
 *
 * @return array{total_products:int, indexed_products:int, pending_products:int, progress_percentage:float}
 * @throws Exception When the statistics query fails.
 */
public function getSyncStatistics() {
    $connection = $this->ensureDatabaseConnection();

    // NULLIF/COALESCE guard the percentage against a division by zero when
    // no product has a title yet; progress is reported as 0 in that case.
    $query = "
        SELECT
            COUNT(*) as total_products,
            COUNT(CASE WHEN added_search = TRUE THEN 1 END) as indexed_products,
            COUNT(CASE WHEN added_search = FALSE THEN 1 END) as pending_products,
            COALESCE(
                ROUND(
                    COUNT(CASE WHEN added_search = TRUE THEN 1 END)::numeric
                        / NULLIF(COUNT(*), 0) * 100,
                    1
                ),
                0
            ) as progress_percentage
        FROM product
        WHERE title IS NOT NULL
    ";

    $result = pg_query($connection, $query);

    if (!$result) {
        throw new Exception("Database query failed: " . pg_last_error($connection));
    }

    $row = pg_fetch_assoc($result);
    pg_free_result($result);

    return [
        'total_products' => intval($row['total_products']),
        'indexed_products' => intval($row['indexed_products']),
        'pending_products' => intval($row['pending_products']),
        'progress_percentage' => floatval($row['progress_percentage'])
    ];
}

Category Statistics
Section titled “Category Statistics”

/**
 * List the 20 categories with the most titled products.
 *
 * @return array Rows with category_name, product_count and indexed_count;
 *               an empty array when there are none.
 * @throws Exception When the statistics query fails.
 */
public function getCategoryStatistics() {
    $connection = $this->ensureDatabaseConnection();

    // INNER JOIN states what the query already did: the WHERE filter on
    // p.title discarded the NULL-extended rows of the former LEFT JOIN, so
    // categories without titled products never appeared in the result.
    $query = "
        SELECT
            c.name as category_name,
            COUNT(p.id) as product_count,
            COUNT(CASE WHEN p.added_search = TRUE THEN 1 END) as indexed_count
        FROM core_category c
        INNER JOIN product p ON c.id = p.category_id
        WHERE p.title IS NOT NULL
        GROUP BY c.id, c.name
        ORDER BY product_count DESC
        LIMIT 20
    ";

    $result = pg_query($connection, $query);

    if (!$result) {
        throw new Exception("Database query failed: " . pg_last_error($connection));
    }

    $rows = pg_fetch_all($result);
    pg_free_result($result);

    // pg_fetch_all() yields false instead of [] for an empty result on PHP < 8.
    return $rows ?: [];
}

Next Steps
Section titled “Next Steps”

- WordPress Integration - WordPress-specific features
- Search Functionality - Search implementation
- Monitoring & Management - Console and monitoring
- Configuration - System configuration