Skip to content

Database Integration

The Elasticsearch integration requires a boolean column in the product table for tracking synchronization status. This column serves as the single source of truth for determining which products need to be indexed.

-- Add sync tracking column.
-- NOT NULL keeps the flag strictly two-valued: the sync queries filter on
-- "added_search = FALSE", which never matches NULL, so a NULL row would be
-- silently skipped. IF NOT EXISTS makes the script safe to re-run.
ALTER TABLE product ADD COLUMN IF NOT EXISTS added_search BOOLEAN NOT NULL DEFAULT FALSE;
-- Create index for performance
CREATE INDEX IF NOT EXISTS idx_product_added_search ON product(added_search);
-- Optional: partial composite index matching the batch query
-- (added_search filter, id ordering, title IS NOT NULL predicate)
CREATE INDEX IF NOT EXISTS idx_product_composite ON product(added_search, id) WHERE title IS NOT NULL;
| Table | Purpose | Key Columns |
|---|---|---|
| product | Main product data | id, title, brand, price, added_search |
| core_category | Hierarchical categories | id, name, parent_id |
Sync flow: PostgreSQL product table → [added_search = false] → Elasticsearch indexing → [added_search = true] → Progress tracking updated

The added_search boolean column serves as the single source of truth for sync progress:

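  • FALSE: Product needs indexing (the sync queries match added_search = FALSE only, so a NULL value would be skipped; keep the column non-NULL)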
  • TRUE: Product successfully indexed in Elasticsearch

File: scripts/elasticsearch-auto-sync.php

class ElasticsearchAutoSyncManager {
    /**
     * Open a PostgreSQL connection for the sync job.
     *
     * Each setting may be overridden with an environment variable so that
     * hosts and credentials do not have to live in source control:
     * ES_SYNC_DB_HOST, ES_SYNC_DB_NAME, ES_SYNC_DB_USER, ES_SYNC_DB_PASSWORD,
     * ES_SYNC_DB_PORT. When a variable is unset or empty, the previous
     * built-in value is used, so existing deployments keep working.
     *
     * @return mixed The connection handle returned by pg_connect().
     * @throws Exception When the connection cannot be established.
     */
    private function getDatabaseConnection() {
        // keyword => [environment variable, fallback value]
        $settings = [
            'host'     => ['ES_SYNC_DB_HOST', '162.55.174.116'],
            'dbname'   => ['ES_SYNC_DB_NAME', 'categories_db'],
            'user'     => ['ES_SYNC_DB_USER', 'website'],
            'password' => ['ES_SYNC_DB_PASSWORD', '***'],
            'port'     => ['ES_SYNC_DB_PORT', '5432'],
        ];

        $parts = [];
        foreach ($settings as $keyword => $source) {
            $value = getenv($source[0]);
            if ($value === false || $value === '') {
                $value = $source[1];
            }
            // libpq conninfo syntax: values are single-quoted, with embedded
            // single quotes and backslashes escaped by a backslash.
            $parts[] = $keyword . "='" . addcslashes($value, "'\\") . "'";
        }
        $conn_string = implode(' ', $parts);

        $connection = pg_connect($conn_string);
        if (!$connection) {
            throw new Exception("Failed to connect to PostgreSQL database");
        }
        return $connection;
    }
}
/**
 * Return a usable database connection, connecting on first use and
 * reconnecting once when a probe query fails.
 *
 * @return mixed The connection handle stored in $this->connection.
 */
private function ensureDatabaseConnection() {
    // Lazy initialisation: connect only when nothing is cached yet.
    if ($this->connection === null) {
        $this->connection = $this->getDatabaseConnection();
    }

    // Probe the cached handle with a trivial query; replace it if the probe fails.
    $is_alive = (bool) pg_query($this->connection, "SELECT 1");
    if (!$is_alive) {
        $this->connection = $this->getDatabaseConnection();
    }

    return $this->connection;
}
/**
 * Fetch the next batch of not-yet-indexed products, ordered by id, starting
 * strictly after $last_id.
 *
 * @param mixed $last_id    Only products with id greater than this are returned.
 * @param mixed $batch_size Maximum number of rows to return.
 * @return array Rows as associative arrays; an empty array when nothing is left.
 * @throws Exception When the query fails.
 */
private function getProductsBatch($last_id, $batch_size) {
    $connection = $this->ensureDatabaseConnection();
    $query = "
        SELECT p.id, p.title, p.brand, p.price, p.list_price,
               p.availability, p.description, p.specifications,
               p.image_urls, p.category_id, p.site_id, p.product_url,
               p.sku, p.ean, p.created_at, p.updated_at,
               c.name as category_name, c.path as category_path
        FROM product p
        LEFT JOIN core_category c ON p.category_id = c.id
        WHERE p.id > $1
          AND p.title IS NOT NULL
          AND p.added_search = FALSE
        ORDER BY p.id ASC
        LIMIT $2
    ";
    $result = pg_query_params($connection, $query, [$last_id, $batch_size]);
    if (!$result) {
        throw new Exception("Database query failed: " . pg_last_error($connection));
    }
    $rows = pg_fetch_all($result);
    pg_free_result($result);
    // PHP versions before 8.0 return false (not []) for a zero-row result;
    // normalise so callers can always iterate the return value.
    return $rows ?: [];
}
/**
 * Fetch up to $limit not-yet-indexed products, most recently updated first.
 *
 * @param mixed $limit Maximum number of rows to return.
 * @return array Rows as associative arrays; an empty array when nothing is pending.
 * @throws Exception When the query fails.
 */
private function getProductsNeedingSync($limit) {
    $connection = $this->ensureDatabaseConnection();
    // NOTE(review): PostgreSQL sorts NULLs first under DESC, so rows with a
    // NULL updated_at are returned before any dated row; confirm this is intended.
    $query = "
        SELECT p.id, p.title, p.brand, p.price, p.list_price,
               p.availability, p.description, p.specifications,
               p.image_urls, p.category_id, p.site_id, p.product_url,
               p.sku, p.ean, p.created_at, p.updated_at,
               c.name as category_name, c.path as category_path
        FROM product p
        LEFT JOIN core_category c ON p.category_id = c.id
        WHERE p.added_search = FALSE
          AND p.title IS NOT NULL
        ORDER BY p.updated_at DESC
        LIMIT $1
    ";
    $result = pg_query_params($connection, $query, [$limit]);
    if (!$result) {
        throw new Exception("Database query failed: " . pg_last_error($connection));
    }
    $rows = pg_fetch_all($result);
    pg_free_result($result);
    // PHP versions before 8.0 return false (not []) for a zero-row result;
    // normalise so callers can always iterate the return value.
    return $rows ?: [];
}
/**
 * Count the products that have a title.
 *
 * @return int Number of rows in product where title IS NOT NULL.
 * @throws Exception When the query fails.
 */
private function getTotalProductCount() {
    $db = $this->ensureDatabaseConnection();

    $count_result = pg_query($db, "SELECT COUNT(*) FROM product WHERE title IS NOT NULL");
    if (!$count_result) {
        throw new Exception("Database query failed: " . pg_last_error($db));
    }

    // The aggregate yields a single row with a single column.
    $first_row = pg_fetch_row($count_result);
    $total = $first_row[0];

    return (int) $total;
}
/**
 * Flag the given products as indexed (added_search = TRUE).
 *
 * @param array $products Product rows; each row's 'id' entry is used.
 * @return bool Always true on success (also for an empty input list).
 * @throws Exception When no ids can be extracted or the UPDATE fails.
 */
private function markProductsAsIndexed($products) {
    if (empty($products)) {
        return true;
    }
    $connection = $this->ensureDatabaseConnection();
    $product_ids = array_column($products, 'id');
    if (empty($product_ids)) {
        // Rows without an 'id' would otherwise produce an invalid "IN ()" clause.
        throw new Exception("Failed to update products: no product ids supplied");
    }
    // Build one distinct positional placeholder per id: $1,$2,...,$n.
    // (Previously every placeholder was "$1" because the counter variable was
    // never defined, so binding more than one id failed.)
    $placeholders = implode(',', array_map(
        static function ($position) {
            return '$' . $position;
        },
        range(1, count($product_ids))
    ));
    $query = "UPDATE product SET added_search = TRUE WHERE id IN ($placeholders)";
    $result = pg_query_params($connection, $query, $product_ids);
    if (!$result) {
        throw new Exception("Failed to update products: " . pg_last_error($connection));
    }
    $this->log("Marked " . count($products) . " products as indexed");
    return true;
}
/**
 * Clear the sync flag on every product so that everything is re-indexed.
 *
 * @return bool Always true on success.
 * @throws Exception When the UPDATE fails.
 */
private function resetSyncFlags() {
    $connection = $this->ensureDatabaseConnection();
    // Touch only rows whose flag actually changes (TRUE or NULL). The end
    // state is the same as an unconditional UPDATE, but rows that are already
    // FALSE are not rewritten.
    $query = "UPDATE product SET added_search = FALSE WHERE added_search IS DISTINCT FROM FALSE";
    $result = pg_query($connection, $query);
    if (!$result) {
        throw new Exception("Failed to reset sync flags: " . pg_last_error($connection));
    }
    $this->log("Reset sync flags for all products");
    return true;
}
/**
 * Remember the id of the last product in a batch as the resume point.
 *
 * Does nothing when the batch is empty.
 *
 * @param array $products Product rows of the batch just processed.
 */
private function updateLastProcessedId($products) {
    if (!empty($products)) {
        // end() works on the local copy of the array, so the caller's array is untouched.
        $tail = end($products);
        $this->state['last_processed_id'] = $tail['id'];
    }
}
/**
 * Convert a product database row into the document sent to Elasticsearch.
 *
 * @param array $product Product row including category_name and category_path.
 * @return array Document fields, in the same order as before.
 */
private function transformProductForElasticsearch($product) {
    $document = [];

    // Identity and cleaned free-text fields.
    $document['id'] = (int)$product['id'];
    foreach (['title', 'brand', 'description'] as $text_field) {
        $document[$text_field] = $this->cleanText($product[$text_field]);
    }

    // Pricing.
    foreach (['price', 'list_price'] as $money_field) {
        $document[$money_field] = $this->parseFloat($product[$money_field]);
    }
    $document['discount_percentage'] = $this->calculateDiscount($product);

    // Stock status and category, the latter copied through unchanged.
    $document['availability'] = $this->normalizeAvailability($product['availability']);
    $document['category'] = $product['category_name'];
    $document['category_path'] = $product['category_path'];

    // Identifiers, media and links.
    $document['sku'] = $product['sku'];
    $document['ean'] = $product['ean'];
    $document['image_url'] = $this->getPrimaryImage($product['image_urls']);
    $document['product_url'] = $product['product_url'];

    // Structured data and timestamps.
    $document['specifications'] = $this->parseSpecifications($product['specifications']);
    $document['created_at'] = $product['created_at'];
    $document['updated_at'] = $product['updated_at'];

    return $document;
}
/**
 * Strip HTML tags and collapse whitespace in a text field.
 *
 * @param mixed $text Raw text, or null/false/''/[] when the field is absent.
 * @return string Cleaned text; '' when the field is absent.
 */
private function cleanText($text) {
    // Only genuinely absent values count as empty. empty() would also discard
    // the literal string "0", which is a legitimate value.
    if ($text === null || $text === false || $text === '' || $text === []) {
        return '';
    }
    // Remove HTML tags
    $text = strip_tags((string) $text);
    // Normalize whitespace. The /u modifier treats the subject as UTF-8 so
    // multi-byte characters are matched as whole characters, not single bytes.
    $collapsed = preg_replace('/\s+/u', ' ', $text);
    if ($collapsed === null) {
        // The subject is not valid UTF-8: fall back to byte-wise matching
        // rather than losing the text.
        $collapsed = preg_replace('/\s+/', ' ', $text);
    }
    // Trim
    return trim((string) $collapsed);
}
/**
 * Convert a price-like value to a float.
 *
 * @param mixed $value Numeric value or a string such as "19.99" or "$1,299.99".
 * @return float Parsed number; 0.0 for empty input.
 */
private function parseFloat($value) {
    if (empty($value)) {
        return 0.0;
    }
    // Well-formed numeric input converts directly. This keeps a leading sign
    // and exponent notation, which the character filter below would destroy
    // ("-5.5" became 5.5 and "1e3" became 13).
    if (is_numeric($value)) {
        return (float) $value;
    }
    // Remove non-numeric characters except decimal point
    $value = preg_replace('/[^0-9.]/', '', $value);
    return floatval($value);
}
/**
 * Map a raw availability string onto a display label.
 *
 * Known terms get a fixed label; anything else is lower-cased, trimmed and
 * returned with each word capitalised. Empty input yields 'Unknown'.
 *
 * @param mixed $availability Raw availability text.
 * @return string Display label.
 */
private function normalizeAvailability($availability) {
    if (empty($availability)) {
        return 'Unknown';
    }

    $term = strtolower(trim($availability));

    // Fixed labels for the common availability terms.
    switch ($term) {
        case 'in stock':
            return 'In Stock';
        case 'out of stock':
            return 'Out of Stock';
        case 'pre-order':
            return 'Pre-order';
        case 'discontinued':
            return 'Discontinued';
        default:
            return ucwords($term);
    }
}
/**
 * Compute the discount of price relative to list_price, in percent.
 *
 * @param array $product Product row with 'price' and 'list_price' entries.
 * @return float Percentage rounded to two decimals; 0.0 when there is no
 *               positive list price or the price is not below it.
 */
private function calculateDiscount($product) {
    $current = $this->parseFloat($product['price']);
    $original = $this->parseFloat($product['list_price']);

    // A discount exists only for a positive list price that exceeds the price.
    $is_discounted = $original > 0 && $current < $original;
    if (!$is_discounted) {
        return 0.0;
    }

    $saved_fraction = ($original - $current) / $original;
    return round($saved_fraction * 100, 2);
}
/**
 * Pick the first image URL from a JSON-encoded collection of URLs.
 *
 * @param mixed $image_urls JSON text holding an array (or object) of URLs.
 * @return string The first entry when it is a string; '' otherwise.
 */
private function getPrimaryImage($image_urls) {
    if (empty($image_urls)) {
        return '';
    }
    $images = json_decode($image_urls, true);
    if (!is_array($images) || empty($images)) {
        return '';
    }
    // reset() returns the first element whatever its key, so JSON objects and
    // arrays that do not start at index 0 no longer hit an undefined offset.
    $first = reset($images);
    // Keep the string return contract when the entry is not a plain URL string.
    return is_string($first) ? $first : '';
}
/**
 * Decode a JSON specifications field into an array.
 *
 * @param mixed $specifications JSON text, or an empty value.
 * @return array Decoded data; [] for empty input or when the JSON does not
 *               decode to an array.
 */
private function parseSpecifications($specifications) {
    if (empty($specifications)) {
        return [];
    }

    $decoded = json_decode($specifications, true);
    if (!is_array($decoded)) {
        // Invalid JSON or a scalar JSON value.
        return [];
    }

    return $decoded;
}
-- Optimize product queries with composite index.
-- NOTE: this has the same columns and predicate as idx_product_composite from
-- the setup script; create only one of the two to avoid duplicate index upkeep.
-- IF NOT EXISTS makes each statement safe to re-run.
CREATE INDEX IF NOT EXISTS idx_product_sync_composite ON product(added_search, id)
WHERE title IS NOT NULL;
-- Optimize category joins
CREATE INDEX IF NOT EXISTS idx_category_parent ON core_category(parent_id);
-- Optimize updated_at queries for incremental sync
CREATE INDEX IF NOT EXISTS idx_product_updated ON product(updated_at DESC)
WHERE added_search = FALSE;
/**
 * Choose a batch size based on PHP's configured memory limit.
 *
 * @return int 100 below 256MB, 250 below 512MB, otherwise 500.
 */
private function optimizeBatchSize() {
    // Adjust batch size based on available memory
    $memory_limit = ini_get('memory_limit');
    $memory_bytes = $this->parseMemoryLimit($memory_limit);
    // A non-positive value means "no limit" (memory_limit = -1) or an
    // unreadable setting. Without this guard it compared as less than 256MB
    // and selected the smallest batch size.
    if ($memory_bytes <= 0) {
        return 500; // Default
    }
    if ($memory_bytes < 256 * 1024 * 1024) { // Less than 256MB
        return 100;
    } elseif ($memory_bytes < 512 * 1024 * 1024) { // Less than 512MB
        return 250;
    } else {
        return 500; // Default
    }
}
/**
 * Convert a php.ini shorthand size ("128M", "1G", "512k", "-1") to bytes.
 *
 * @param mixed $limit Value as returned by ini_get('memory_limit').
 * @return int Size in bytes; the plain integer when there is no unit suffix
 *             (so "-1" stays -1); 0 for empty input.
 */
private function parseMemoryLimit($limit) {
    $limit = trim((string) $limit);
    if ($limit === '') {
        // ini_get() returns false for an unknown option; reading the last
        // character of an empty string would raise a string-offset warning.
        return 0;
    }
    $unit = strtolower(substr($limit, -1));
    $bytes = intval($limit);
    switch ($unit) {
        case 'g':
            $bytes *= 1024;
            // intentional fall-through: G -> M -> K
        case 'm':
            $bytes *= 1024;
            // intentional fall-through: M -> K
        case 'k':
            $bytes *= 1024;
    }
    return $bytes;
}
/**
 * Log a database error, try to reconnect when it looks connection-related,
 * and record the error in the persisted state.
 *
 * @param string $error Error message, e.g. from pg_last_error().
 * @throws Exception Propagated from the reconnect attempt; the error is still
 *                   recorded in state before the exception leaves this method.
 */
private function handleDatabaseError($error) {
    $this->log("Database error: $error", 'ERROR');
    try {
        // Check if it's a connection error. The match is case-insensitive so
        // messages such as "Connection refused" are recognised too.
        if (stripos($error, 'connection') !== false) {
            $this->log("Attempting to reconnect to database", 'INFO');
            $this->connection = null; // Force reconnection
            $this->ensureDatabaseConnection();
        }
    } finally {
        // Log error to state. Running this in finally ensures a failed
        // reconnect cannot prevent the original error from being recorded.
        $this->state['errors'][] = [
            'timestamp' => date('c'),
            'type' => 'database',
            'message' => $error
        ];
        $this->saveState();
    }
}
/**
 * Check that a database connection can be opened and queried.
 *
 * @return array ['status' => 'success'|'error', 'message' => string]
 */
public function testDatabaseConnection() {
    try {
        $probe = pg_query($this->getDatabaseConnection(), "SELECT 1");

        // Guard clause: the connection opened but the probe query failed.
        if (!$probe) {
            return ['status' => 'error', 'message' => 'Database query failed'];
        }

        pg_free_result($probe);
        return ['status' => 'success', 'message' => 'Database connection successful'];
    } catch (Exception $failure) {
        // Connection failures surface as exceptions from getDatabaseConnection().
        return ['status' => 'error', 'message' => $failure->getMessage()];
    }
}
/**
 * Summarise sync progress over all products that have a title.
 *
 * @return array Keys: total_products (int), indexed_products (int),
 *               pending_products (int), progress_percentage (float).
 * @throws Exception When the query fails.
 */
public function getSyncStatistics() {
    $connection = $this->ensureDatabaseConnection();
    // NULLIF guards the division when no product matches (COUNT(*) = 0),
    // which previously raised a "division by zero" error in PostgreSQL;
    // COALESCE then reports 0 percent for that case.
    $query = "
        SELECT
            COUNT(*) as total_products,
            COUNT(CASE WHEN added_search = TRUE THEN 1 END) as indexed_products,
            COUNT(CASE WHEN added_search = FALSE THEN 1 END) as pending_products,
            COALESCE(ROUND(
                COUNT(CASE WHEN added_search = TRUE THEN 1 END)::numeric /
                NULLIF(COUNT(*), 0) * 100, 1
            ), 0) as progress_percentage
        FROM product
        WHERE title IS NOT NULL
    ";
    $result = pg_query($connection, $query);
    if (!$result) {
        // Same failure handling as the other query methods, instead of
        // passing false on to pg_fetch_assoc().
        throw new Exception("Database query failed: " . pg_last_error($connection));
    }
    $row = pg_fetch_assoc($result);
    pg_free_result($result);
    return [
        'total_products' => intval($row['total_products']),
        'indexed_products' => intval($row['indexed_products']),
        'pending_products' => intval($row['pending_products']),
        'progress_percentage' => floatval($row['progress_percentage'])
    ];
}
/**
 * Report product and indexed-product counts for the 20 categories with the
 * most titled products.
 *
 * @return array Rows with category_name, product_count and indexed_count;
 *               an empty array when there are none.
 * @throws Exception When the query fails.
 */
public function getCategoryStatistics() {
    $connection = $this->ensureDatabaseConnection();
    // Note: the WHERE filter on p.title discards categories without any
    // titled product, so the LEFT JOIN effectively behaves like an inner join.
    $query = "
        SELECT
            c.name as category_name,
            COUNT(p.id) as product_count,
            COUNT(CASE WHEN p.added_search = TRUE THEN 1 END) as indexed_count
        FROM core_category c
        LEFT JOIN product p ON c.id = p.category_id
        WHERE p.title IS NOT NULL
        GROUP BY c.id, c.name
        ORDER BY product_count DESC
        LIMIT 20
    ";
    $result = pg_query($connection, $query);
    if (!$result) {
        // Same failure handling as the other query methods, instead of
        // passing false on to pg_fetch_all().
        throw new Exception("Database query failed: " . pg_last_error($connection));
    }
    $rows = pg_fetch_all($result);
    pg_free_result($result);
    // PHP versions before 8.0 return false (not []) for a zero-row result.
    return $rows ?: [];
}