Skip to content

Data Processing Pipeline

The DealAI.lt data processing pipeline orchestrates the flow of product data from external sources through scraping, storage, indexing, and presentation. This document describes each stage and how they interconnect.

┌──────────────────┐
│  External Sites  │
│   (E-commerce)   │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│   Web Scraping   │
│    (Scrapyd)     │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│    PostgreSQL    │
│    (Raw Data)    │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  Data Transform  │
│   & Validation   │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  Elasticsearch   │
│  (Search Index)  │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│    Search UI     │
│   (WordPress)    │
└──────────────────┘

Component: Scrapyd Spiders

Process:

  1. Job scheduled via cron or API
  2. Spider extracts product data
  3. Data normalized and validated
  4. Raw data inserted into PostgreSQL

Output: Product records in product table

Component: PostgreSQL Database

Process:

  1. Receive scraped data
  2. Insert or update product records
  3. Track changes in product_crawl_history
  4. Set added_search = FALSE for new/updated products
  5. Fire the database triggers defined on these tables

Data Quality:

  • Validate required fields
  • Sanitize inputs
  • Deduplicate by external_id
  • Maintain referential integrity

Component: Elasticsearch Auto-Sync

Process: Three-phase synchronization:

Phase 1: Initialize

/**
 * Phase 1: reset the sync flag on every product and write a fresh sync state.
 *
 * Every product gets added_search = FALSE, and the state is reset to the start
 * of the bulk phase (last_processed_id = 0, no processed items, no failed batches).
 *
 * @param mixed $connection Optional PostgreSQL connection. When omitted, one is
 *                          borrowed from DatabasePool and released afterwards.
 * @throws RuntimeException When the reset query fails.
 */
function initialize_sync($connection = null)
{
    // The original body used $connection without it being a parameter or a
    // global, so pg_query() received null. Borrow from the pool when none is given.
    $borrowed = $connection === null;
    if ($borrowed) {
        $connection = DatabasePool::getConnection();
    }

    try {
        $result = pg_query($connection, "UPDATE product SET added_search = FALSE");
        if ($result === false) {
            throw new RuntimeException('Failed to reset sync flags: ' . pg_last_error($connection));
        }
    } finally {
        // Always hand a borrowed connection back, even when the query failed.
        if ($borrowed) {
            DatabasePool::releaseConnection($connection);
        }
    }

    // Persist through save_sync_state() so the file format (pretty-printed,
    // last_updated timestamp) matches every later state write.
    save_sync_state([
        'phase' => 'bulk',
        'last_processed_id' => 0,
        'processed_count' => 0,
        'failed_batches' => [],
        'started_at' => time(),
    ]);
}

Phase 2: Bulk Processing

/**
 * Phase 2: index products in batches ordered by id, resuming from the saved state.
 *
 * Each batch is fetched with get_products_batch($state['last_processed_id'], $batch_size).
 * The position advances to the last id of the batch whether or not indexing succeeded:
 * - A successful batch is marked indexed and counted.
 * - A failed batch is recorded in failed_batches for recover_failed_batches() and skipped.
 * State is saved after every batch. When no products remain, the phase switches
 * to 'incremental'.
 *
 * @param int $batch_size Number of products fetched and indexed per batch.
 */
function bulk_sync_phase($batch_size = 500)
{
    $state = load_sync_state();

    while (true) {
        $products = get_products_batch($state['last_processed_id'], $batch_size);

        if (empty($products)) {
            $state['phase'] = 'incremental';
            save_sync_state($state);
            break;
        }

        $batch_start_id = $state['last_processed_id'];
        $batch_end_id = end($products)['id'];

        if (bulk_index_products($products)) {
            mark_products_indexed($products);
            $state['processed_count'] += count($products);
        } else {
            // Record the failure for recover_failed_batches(). The products are
            // not marked indexed.
            $state['failed_batches'][] = [
                'start_id' => $batch_start_id,
                'end_id' => $batch_end_id,
                'count' => count($products),
                'timestamp' => time(),
            ];
        }

        // Advance past this batch even on failure. Otherwise the same batch is
        // refetched forever and the loop never terminates.
        $state['last_processed_id'] = $batch_end_id;
        save_sync_state($state);

        // Rate limiting between bulk requests.
        usleep(100000); // 100ms
    }
}

Phase 3: Incremental Updates

/**
 * Phase 3: poll forever for products needing sync and index them.
 *
 * @param int $batch_size       Maximum number of products fetched per poll.
 * @param int $interval_seconds Seconds to sleep between polls.
 */
function incremental_sync($batch_size = 100, $interval_seconds = 60)
{
    while (true) {
        $products = get_products_for_sync($batch_size);

        if (!empty($products)) {
            // Mark products indexed only when the bulk request succeeded. Marking
            // them after a failure would presumably drop them from future
            // get_products_for_sync() results, so they would never be indexed.
            if (bulk_index_products($products)) {
                mark_products_indexed($products);
            }
        }

        // Sleep until next check.
        sleep($interval_seconds);
    }
}

Component: Elasticsearch Bulk API

Process:

  1. Transform product data for search
  2. Batch documents (500 at a time)
  3. Send to Elasticsearch Bulk API
  4. Process responses
  5. Handle errors and retries

Transformation:

/**
 * Build the Elasticsearch document for one product row.
 *
 * @param array $product Product row; reads id, title, brand, description, price,
 *                       list_price, availability, category_name, category_id, sku,
 *                       image_urls and updated_at.
 * @return array Document fields keyed by their search-index field names.
 */
function transform_for_search($product)
{
    return [
        'id' => (int) $product['id'],
        'title' => clean_text($product['title']),
        'brand' => $product['brand'],
        // description may be NULL or absent (presumably a nullable column; confirm).
        // Passing null to strip_tags() is deprecated since PHP 8.1, so coerce to
        // a string first. The indexed value is '' either way.
        'description' => strip_tags((string) ($product['description'] ?? '')),
        'price' => (float) $product['price'],
        'list_price' => (float) $product['list_price'],
        'discount_percentage' => calculate_discount($product),
        'availability' => normalize_availability($product['availability']),
        'category' => $product['category_name'],
        'category_path' => build_category_path($product['category_id']),
        'sku' => $product['sku'],
        'image_url' => get_primary_image($product['image_urls']),
        'updated_at' => $product['updated_at'],
    ];
}

Component: WordPress Search Interface

Process:

  1. User submits search query
  2. WordPress processes request
  3. Query sent to Elasticsearch
  4. Results rendered with highlighting
  5. Filters applied dynamically

Required Fields:

/**
 * Ensure a scraped product has every required field.
 *
 * A field counts as missing when empty() is true for it (absent, null, '', '0',
 * 0, false, []) or when it is a string that is blank after trimming.
 *
 * @param array $product Scraped product data.
 * @return bool Always true when validation passes.
 * @throws ValidationException Naming the first missing field.
 */
function validate_product_data($product)
{
    foreach (['title', 'price', 'product_url'] as $field) {
        $value = $product[$field] ?? null;
        // empty() alone accepts whitespace-only strings such as "   ", which carry
        // no data, so reject those as well.
        if (empty($value) || (is_string($value) && trim($value) === '')) {
            throw new ValidationException("Missing required field: $field");
        }
    }
    return true;
}

Data Normalization:

/**
 * Normalize raw scraped product fields into consistent types and formats.
 *
 * @param array $product Raw product data; reads title, price, availability and brand.
 * @return array Keys: title (trimmed string), price (float), availability
 *               (normalize_availability() result), brand (UTF-8 title-cased string).
 */
function normalize_product_data($product)
{
    return [
        'title' => trim((string) ($product['title'] ?? '')),
        'price' => parse_scraped_price($product['price'] ?? ''),
        'availability' => normalize_availability($product['availability']),
        'brand' => title_case_words((string) ($product['brand'] ?? '')),
    ];
}

/**
 * Parse a scraped price such as "1 299,99 €", "€1,299.99" or "19.99" into a float.
 *
 * Everything except digits, "," and "." is discarded (currency signs, spaces,
 * NBSP). Separator rules:
 * - both "," and "." present: the one occurring last is the decimal separator;
 * - only one kind present, once: it is the decimal separator ("12,99" => 12.99);
 * - only one kind present, repeated: it is a thousands separator ("1.234.567" => 1234567).
 * A single separator is ambiguous ("1.299" => 1.299). An input with no digits yields 0.0.
 *
 * @param mixed $raw Scraped price value.
 * @return float
 */
function parse_scraped_price($raw)
{
    $digits = preg_replace('/[^0-9.,]/', '', (string) $raw);
    $commas = substr_count($digits, ',');
    $dots = substr_count($digits, '.');

    if ($commas > 0 && $dots > 0) {
        $decimal = strrpos($digits, ',') > strrpos($digits, '.') ? ',' : '.';
    } elseif ($commas === 1) {
        $decimal = ',';
    } elseif ($dots === 1) {
        $decimal = '.';
    } else {
        // No separator, or one kind repeated: thousands grouping only.
        $decimal = null;
    }

    if ($decimal === null) {
        $normalized = str_replace([',', '.'], '', $digits);
    } else {
        $thousands = $decimal === ',' ? '.' : ',';
        // Drop the thousands separator first, then turn the decimal one into ".".
        $normalized = str_replace([$thousands, $decimal], ['', '.'], $digits);
    }

    return (float) $normalized;
}

/**
 * Lower-case a string and upper-case the first letter of each word, UTF-8 aware.
 *
 * This matches ucwords(strtolower()) word boundaries (space, tab, CR, LF, FF, VT)
 * but handles multibyte letters, e.g. "ŽALIOJI ŠAKA" => "Žalioji Šaka".
 *
 * @param string $text
 * @return string
 */
function title_case_words($text)
{
    $lower = mb_strtolower($text, 'UTF-8');
    $titled = preg_replace_callback(
        '/(^|[ \t\r\n\f\x0B])(\p{Ll})/u',
        static function ($match) {
            return $match[1] . mb_strtoupper($match[2], 'UTF-8');
        },
        $lower
    );
    // preg_replace_callback() returns null on a PCRE error; fall back to the lower-cased text.
    return $titled ?? $lower;
}

Retry Logic:

/**
 * Call $callback, retrying with exponential backoff when it throws an Exception.
 *
 * After failed attempt k (1-based), and only if another attempt remains, the
 * function sleeps $base_delay_seconds * 2^k seconds. With the default of 1 the
 * waits are 2s, 4s, ...
 *
 * @param callable $callback           Operation to run; its return value is returned.
 * @param int      $max_attempts       Total number of attempts (>= 1).
 * @param int      $base_delay_seconds Backoff base in whole seconds (0 disables sleeping).
 * @return mixed Result of the first successful call.
 * @throws InvalidArgumentException When $max_attempts < 1.
 * @throws Exception When every attempt fails; the last error is attached as previous.
 */
function process_with_retry($callback, $max_attempts = 3, $base_delay_seconds = 1)
{
    if ($max_attempts < 1) {
        throw new InvalidArgumentException('max_attempts must be at least 1');
    }

    $last_error = null;
    for ($attempt = 1; $attempt <= $max_attempts; $attempt++) {
        try {
            return $callback();
        } catch (Exception $e) {
            $last_error = $e;
            // Back off only when there is another attempt to make; sleeping
            // after the final failure just delays the error.
            if ($attempt < $max_attempts) {
                sleep($base_delay_seconds * (2 ** $attempt));
            }
        }
    }

    throw new Exception(
        "Failed after $max_attempts attempts: " . $last_error->getMessage(),
        0,
        $last_error
    );
}

Benefits:

  • Reduced database connections
  • Lower API overhead
  • Better throughput

Implementation:

/**
 * Run $processor over $items in chunks of $batch_size and concatenate the results.
 *
 * @param array    $items      Items to process.
 * @param int      $batch_size Chunk size (must be >= 1; array_chunk() rejects smaller values).
 * @param callable $processor  Receives one chunk (a list) and returns an array of results.
 * @return array Results merged in chunk order, with the same semantics as array_merge().
 */
function process_in_batches($items, $batch_size, $processor)
{
    $partials = [];
    foreach (array_chunk($items, $batch_size) as $batch) {
        $partials[] = $processor($batch);
    }

    // Merge once at the end. Calling array_merge() inside the loop re-copies
    // the growing result on every batch, which is quadratic overall.
    return $partials === [] ? [] : array_merge(...$partials);
}
/**
 * Minimal pool of idle PostgreSQL connections.
 *
 * The pool is a static array, so it is shared by all callers in the same PHP
 * process. $max_connections caps the number of IDLE connections kept for reuse,
 * not the total number of open connections.
 */
class DatabasePool {
    /** @var array Idle connections available for reuse (taken LIFO). */
    private static $connections = [];
    /** @var int Maximum idle connections retained; surplus released ones are closed. */
    private static $max_connections = 10;

    /**
     * Return a pooled connection that libpq still reports as OK, or a new one.
     *
     * Pooled connections with a bad status are closed and discarded rather than
     * handed out. pg_connection_status() only reflects what libpq already knows,
     * so a silently dropped socket can still slip through.
     *
     * @return mixed PostgreSQL connection.
     */
    public static function getConnection() {
        while (!empty(self::$connections)) {
            $connection = array_pop(self::$connections);
            if (pg_connection_status($connection) === PGSQL_CONNECTION_OK) {
                return $connection;
            }
            pg_close($connection);
        }
        return create_new_connection();
    }

    /**
     * Return a connection to the pool, or close it.
     *
     * The connection is closed instead of pooled when the pool is full or the
     * connection is broken, so a failed connection is not handed out again.
     *
     * @param mixed $connection Connection previously obtained from getConnection().
     */
    public static function releaseConnection($connection) {
        $healthy = pg_connection_status($connection) === PGSQL_CONNECTION_OK;
        if ($healthy && count(self::$connections) < self::$max_connections) {
            self::$connections[] = $connection;
        } else {
            pg_close($connection);
        }
    }
}
/**
 * Walk the whole product set in id-ordered batches and pass each to process_batch().
 *
 * Pagination is keyset-based: each call passes the last id seen, matching how
 * bulk_sync_phase() calls get_products_batch() with last_processed_id. The
 * previous version passed a row offset instead, which re-processes or skips
 * rows whenever ids are not dense starting at 1.
 *
 * @param int $batch_size Products fetched per batch.
 */
function process_large_dataset($batch_size = 1000)
{
    $last_id = 0;
    while (true) {
        $batch = get_products_batch($last_id, $batch_size);
        if (empty($batch)) {
            break;
        }

        process_batch($batch);
        $last_id = end($batch)['id'];

        // Free memory before fetching the next batch.
        unset($batch);
        gc_collect_cycles();
    }
}
/**
 * Emits per-stage throughput metrics through log_metrics().
 */
class PipelineMetrics {
    /**
     * Log how long a pipeline stage took and how many items it handled.
     *
     * @param string    $stage    Stage name.
     * @param int|float $duration Elapsed time in milliseconds.
     * @param int       $count    Number of items processed.
     */
    public static function recordProcessing($stage, $duration, $count) {
        $metrics = [
            'stage' => $stage,
            'duration_ms' => $duration,
            'items_processed' => $count,
            // A stage faster than the timer resolution reports 0 ms. Dividing by
            // zero throws DivisionByZeroError on PHP 8, so report null (not
            // measurable) instead.
            'items_per_second' => $duration > 0 ? $count / ($duration / 1000) : null,
            'timestamp' => time(),
        ];
        log_metrics($metrics);
    }
}
/**
 * Collect a health report covering each pipeline stage.
 *
 * The probes run in pipeline order: scraping, database, elasticsearch, then sync.
 *
 * @return array Stage name => whatever that stage's check_*_health() probe returned.
 */
function check_pipeline_health()
{
    $report = [];
    $report['scraping'] = check_scrapyd_health();
    $report['database'] = check_database_health();
    $report['elasticsearch'] = check_elasticsearch_health();
    $report['sync'] = check_sync_health();

    return $report;
}
/**
 * Persist the Elasticsearch sync state as pretty-printed JSON, stamped with last_updated.
 *
 * The JSON is written to a temporary file beside the target and then renamed
 * over it. On POSIX filesystems the rename replaces the file in one step, so an
 * interrupted write cannot leave a truncated state file behind.
 *
 * @param array  $state Sync state to save.
 * @param string $path  State file path.
 * @throws RuntimeException When the state cannot be encoded or written.
 */
function save_sync_state($state, $path = 'elasticsearch-sync-state.json')
{
    $state['last_updated'] = time();

    $json = json_encode($state, JSON_PRETTY_PRINT);
    if ($json === false) {
        throw new RuntimeException('Failed to encode sync state: ' . json_last_error_msg());
    }

    // Same directory as the target, so rename() stays on one filesystem.
    $tmp = $path . '.tmp.' . getmypid();
    if (file_put_contents($tmp, $json) === false || !rename($tmp, $path)) {
        if (is_file($tmp)) {
            unlink($tmp);
        }
        throw new RuntimeException("Failed to write sync state to $path");
    }
}
/**
 * Load the Elasticsearch sync state from disk.
 *
 * When the file does not exist, the result of initialize_state() is returned.
 * NOTE(review): initialize_state() is not defined in this document. The only
 * similar function shown, initialize_sync(), returns nothing, so confirm which
 * function is intended here.
 *
 * @param string $path State file path.
 * @return array Decoded state.
 * @throws RuntimeException When the file cannot be read or does not hold a JSON
 *                          object. Failing loudly keeps callers from continuing
 *                          with null state and overwriting the saved progress.
 */
function load_sync_state($path = 'elasticsearch-sync-state.json')
{
    if (!file_exists($path)) {
        return initialize_state();
    }

    $contents = file_get_contents($path);
    if ($contents === false) {
        throw new RuntimeException("Failed to read sync state from $path");
    }

    $state = json_decode($contents, true);
    if (!is_array($state)) {
        throw new RuntimeException(
            "Sync state file $path does not contain a JSON object: " . json_last_error_msg()
        );
    }

    return $state;
}
/**
 * Retry every batch recorded in the sync state's failed_batches list.
 *
 * A batch is removed from the list when it re-indexes successfully, or when
 * get_products_range() returns no products for it. The state is saved after
 * each removal, so progress survives an interruption.
 *
 * @return int Number of batches still failed after this pass.
 */
function recover_failed_batches()
{
    $state = load_sync_state();
    $pending = $state['failed_batches'] ?? [];

    foreach ($pending as $key => $batch) {
        $products = get_products_range($batch['start_id'], $batch['count']);

        // An empty range has nothing left to index. Presumably bulk_index_products()
        // would report failure for an empty batch and pin it in the list forever,
        // so treat it as resolved.
        $resolved = empty($products) || bulk_index_products($products);

        if ($resolved) {
            unset($pending[$key]);
            // Re-index the list. A PHP list with holes is serialized by
            // json_encode() as a JSON object rather than an array.
            $state['failed_batches'] = array_values($pending);
            save_sync_state($state);
        }
    }

    return count($pending);
}