Data Processing Pipeline
Pipeline Overview
The DealAI.lt data processing pipeline moves product data from external e-commerce sites through scraping, storage, indexing, and presentation. This document describes each stage and how the stages connect.
Pipeline Architecture
```
┌──────────────────┐
│  External Sites  │
│   (E-commerce)   │
└────────┬─────────┘
         │
         ↓
┌──────────────────┐
│   Web Scraping   │
│    (Scrapyd)     │
└────────┬─────────┘
         │
         ↓
┌──────────────────┐
│    PostgreSQL    │
│    (Raw Data)    │
└────────┬─────────┘
         │
         ↓
┌──────────────────┐
│  Data Transform  │
│   & Validation   │
└────────┬─────────┘
         │
         ↓
┌──────────────────┐
│  Elasticsearch   │
│  (Search Index)  │
└────────┬─────────┘
         │
         ↓
┌──────────────────┐
│    Search UI     │
│   (WordPress)    │
└──────────────────┘
```

Pipeline Stages
Stage 1: Data Collection
Component: Scrapyd Spiders
Process:
- Job scheduled via cron or API
- Spider extracts product data
- Data normalized and validated
- Raw data inserted into PostgreSQL
Output: Product records in the product table
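As a rough illustration of the "scheduled via API" step, the sketch below builds and sends a request to Scrapyd's `schedule.json` endpoint. The project name, spider name, spider argument, and base URL are hypothetical placeholders, and the function names are not part of the DealAI.lt codebase.

```php
<?php
// Sketch only: project, spider, and argument names are hypothetical.

/** Build the URL and form body for Scrapyd's schedule.json endpoint. */
function build_schedule_request(string $base_url, string $project, string $spider, array $spider_args = []): array
{
    // Scrapyd reads form-encoded fields; fields other than its own
    // parameters are passed to the spider as arguments.
    $fields = array_merge($spider_args, ['project' => $project, 'spider' => $spider]);

    return [
        'url'  => rtrim($base_url, '/') . '/schedule.json',
        'body' => http_build_query($fields),
    ];
}

/** POST the request; return the Scrapyd job ID, or null if scheduling failed. */
function schedule_spider(string $base_url, string $project, string $spider, array $spider_args = []): ?string
{
    $request = build_schedule_request($base_url, $project, $spider, $spider_args);

    $curl = curl_init($request['url']);
    curl_setopt_array($curl, [
        CURLOPT_POST           => true,
        CURLOPT_POSTFIELDS     => $request['body'],
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_TIMEOUT        => 10,
    ]);
    $response = curl_exec($curl);

    if ($response === false) {
        return null;
    }

    // A successful response carries "status": "ok" and a "jobid".
    $decoded = json_decode($response, true);
    if (is_array($decoded) && ($decoded['status'] ?? '') === 'ok') {
        return $decoded['jobid'] ?? null;
    }

    return null;
}
```

A cron entry could call `schedule_spider('http://localhost:6800', 'dealai', 'products')` and log the returned job ID; keeping the request builder separate makes the payload easy to inspect without a running Scrapyd instance.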
Stage 2: Storage
Component: PostgreSQL Database
Process:
- Receive scraped data
- Insert or update product records
- Track changes in product_crawl_history
- Set added_search = FALSE for new/updated products
- Fire database triggers
Data Quality:
- Validate required fields
- Sanitize inputs
- Deduplicate by external_id
- Maintain referential integrity
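The insert-or-update and deduplication steps can be combined into a single PostgreSQL upsert. The sketch below is one possible shape, assuming a unique constraint on `external_id` and a hypothetical column list (`title`, `price`, `product_url`); the real `product` table may differ.

```php
<?php
// Sketch only: the column list and the unique constraint on external_id are assumptions.

/** Build a parameterized upsert that also flags the row for re-indexing. */
function build_product_upsert(array $product): array
{
    $columns = ['external_id', 'title', 'price', 'product_url'];

    $params = [];
    $placeholders = [];
    $updates = [];
    foreach ($columns as $i => $column) {
        $params[] = $product[$column] ?? null;
        $placeholders[] = '$' . ($i + 1);          // $1, $2, ... for pg_query_params()
        if ($column !== 'external_id') {
            $updates[] = "$column = EXCLUDED.$column";
        }
    }
    // New and updated rows both need to be picked up by the sync stage.
    $updates[] = 'added_search = FALSE';

    $sql = 'INSERT INTO product (' . implode(', ', $columns) . ', added_search) '
         . 'VALUES (' . implode(', ', $placeholders) . ', FALSE) '
         . 'ON CONFLICT (external_id) DO UPDATE SET ' . implode(', ', $updates);

    return [$sql, $params];
}
```

The result can be passed straight to `pg_query_params($connection, $sql, $params)`, which keeps scraped values out of the SQL string and covers the "sanitize inputs" step for this statement.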
Stage 3: Synchronization
Component: Elasticsearch Auto-Sync
Process: Synchronization runs in three phases.
Phase 1: Initialize
```php
// $connection is an open PostgreSQL connection (for example, from pg_connect()).
function initialize_sync($connection) {
    // Reset sync flags so every product is re-indexed
    $query = "UPDATE product SET added_search = FALSE";
    pg_query($connection, $query);

    // Create state file
    $state = [
        'phase' => 'bulk',
        'last_processed_id' => 0,
        'processed_count' => 0,
        'failed_batches' => [],
        'started_at' => time()
    ];

    file_put_contents('elasticsearch-sync-state.json', json_encode($state));
}
```

Phase 2: Bulk Processing
```php
function bulk_sync_phase() {
    $state = load_sync_state();
    $batch_size = 500;

    while (true) {
        // Get next batch (assumed to return rows with id > last_processed_id)
        $products = get_products_batch(
            $state['last_processed_id'],
            $batch_size
        );

        if (empty($products)) {
            // Nothing left to index: switch to incremental mode
            $state['phase'] = 'incremental';
            save_sync_state($state);
            break;
        }

        // Index batch
        $success = bulk_index_products($products);

        if ($success) {
            // Mark as indexed
            mark_products_indexed($products);
            $state['processed_count'] += count($products);
        } else {
            // Track failed batch for later recovery
            $state['failed_batches'][] = [
                'start_id' => $state['last_processed_id'],
                'count' => count($products),
                'timestamp' => time()
            ];
        }

        // Advance past this batch in both cases; otherwise a failing batch
        // would be fetched and recorded again on every pass
        $state['last_processed_id'] = end($products)['id'];
        save_sync_state($state);

        // Rate limiting
        usleep(100000); // 100ms
    }
}
```

Phase 3: Incremental Updates
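```php
function incremental_sync() {
    while (true) {
        // Get products needing sync (added_search = FALSE)
        $products = get_products_for_sync(100);

        // Mark products as indexed only if the bulk request succeeded,
        // so failed products are picked up again on the next pass
        if (!empty($products) && bulk_index_products($products)) {
            mark_products_indexed($products);
        }

        // Sleep until next check
        sleep(60);
    }
}
```

Stage 4: Search Indexing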
Component: Elasticsearch Bulk API
Process:
- Transform product data for search
- Batch documents (500 at a time)
- Send to Elasticsearch Bulk API
- Process responses
- Handle errors and retries
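The batching and response-handling steps above could look roughly like the following. The Bulk API takes newline-delimited JSON (one action line, then one document line, with a trailing newline) and reports per-item results under `items`. The index name `products` and both function names are hypothetical.

```php
<?php
// Sketch only: the index name and function names are hypothetical.

/** Build a newline-delimited JSON body for the Elasticsearch Bulk API. */
function build_bulk_body(string $index, array $documents): string
{
    $lines = [];
    foreach ($documents as $document) {
        // Action line: index the document under its product ID.
        $lines[] = json_encode(['index' => ['_index' => $index, '_id' => (string) $document['id']]]);
        // Source line: the document itself.
        $lines[] = json_encode($document, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
    }

    // The Bulk API requires the body to end with a newline.
    return $lines ? implode("\n", $lines) . "\n" : '';
}

/** Return the IDs of items that failed, given a decoded Bulk API response. */
function failed_ids_from_bulk_response(array $response): array
{
    if (empty($response['errors'])) {
        return [];
    }

    $failed = [];
    foreach ($response['items'] ?? [] as $item) {
        // Each item has a single key naming the action ("index", "create", ...).
        $result = reset($item);
        if (isset($result['error'])) {
            $failed[] = $result['_id'];
        }
    }

    return $failed;
}
```

Checking individual items matters because a bulk request can return HTTP 200 while some documents are rejected; only the failed IDs then need to be retried.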
Transformation:
```php
function transform_for_search($product) {
    return [
        'id' => (int)$product['id'],
        'title' => clean_text($product['title']),
        'brand' => $product['brand'],
        'description' => strip_tags($product['description']),
        'price' => (float)$product['price'],
        'list_price' => (float)$product['list_price'],
        'discount_percentage' => calculate_discount($product),
        'availability' => normalize_availability($product['availability']),
        'category' => $product['category_name'],
        'category_path' => build_category_path($product['category_id']),
        'sku' => $product['sku'],
        'image_url' => get_primary_image($product['image_urls']),
        'updated_at' => $product['updated_at']
    ];
}
```

Stage 5: Presentation
Component: WordPress Search Interface
Process:
- User submits search query
- WordPress processes request
- Query sent to Elasticsearch
- Results rendered with highlighting
- Filters applied dynamically
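To make the query, highlighting, and filter steps concrete, here is a minimal sketch of a request body WordPress might send to Elasticsearch. The field names come from the Stage 4 transformation; the boosts, the filter keys (`brand`, `min_price`, `max_price`), and the assumption that `brand` is mapped as a keyword field are illustrative only.

```php
<?php
// Sketch only: boosts, filter keys, and field mappings are assumptions.

/** Build an Elasticsearch search body with full-text match, filters, and highlighting. */
function build_search_query(string $text, array $filters = [], int $size = 20): array
{
    $filter_clauses = [];

    if (isset($filters['brand'])) {
        // Exact match; assumes "brand" is a keyword field.
        $filter_clauses[] = ['term' => ['brand' => $filters['brand']]];
    }

    if (isset($filters['min_price']) || isset($filters['max_price'])) {
        $range = [];
        if (isset($filters['min_price'])) {
            $range['gte'] = (float) $filters['min_price'];
        }
        if (isset($filters['max_price'])) {
            $range['lte'] = (float) $filters['max_price'];
        }
        $filter_clauses[] = ['range' => ['price' => $range]];
    }

    return [
        'size' => $size,
        'query' => [
            'bool' => [
                // Scored full-text part of the query.
                'must' => [[
                    'multi_match' => [
                        'query' => $text,
                        'fields' => ['title^3', 'brand^2', 'description'],
                    ],
                ]],
                // Filters narrow results without affecting relevance scores.
                'filter' => $filter_clauses,
            ],
        ],
        // stdClass encodes as {} in JSON; an empty PHP array would encode as [].
        'highlight' => [
            'fields' => ['title' => new stdClass(), 'description' => new stdClass()],
        ],
    ];
}
```

Passing `json_encode(build_search_query('telefonas', ['brand' => 'Samsung', 'max_price' => 500]))` as the request body would return matching products with highlighted fragments for `title` and `description`.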
Data Quality Assurance
Validation Rules
Required Fields:

```php
// ValidationException is assumed to be an application-defined exception class.
function validate_product_data($product) {
    $required = ['title', 'price', 'product_url'];

    foreach ($required as $field) {
        // empty() also rejects "0" and 0, so a zero price counts as missing
        if (empty($product[$field])) {
            throw new ValidationException("Missing required field: $field");
        }
    }

    return true;
}
```

Data Normalization:
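```php
function normalize_product_data($product) {
    return [
        'title' => trim($product['title']),
        // Strips currency symbols and spaces. Assumes a dot decimal separator;
        // comma-decimal prices such as "19,99" must be converted beforehand.
        'price' => floatval(preg_replace('/[^0-9.]/', '', $product['price'])),
        'availability' => normalize_availability($product['availability']),
        // Multibyte-safe title case, so brand names with diacritics are handled
        // (requires the mbstring extension)
        'brand' => mb_convert_case($product['brand'], MB_CASE_TITLE, 'UTF-8')
    ];
}
```

Error Handling
Retry Logic: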
```php
function process_with_retry($callback, $max_attempts = 3) {
    $attempt = 0;
    $last_error = null;

    while ($attempt < $max_attempts) {
        try {
            return $callback();
        } catch (Exception $e) {
            $last_error = $e;
            $attempt++;

            // Exponential backoff (2s, 4s, ...), skipped after the final attempt
            if ($attempt < $max_attempts) {
                sleep(pow(2, $attempt));
            }
        }
    }

    // Chain the original exception so its stack trace is preserved
    throw new Exception(
        "Failed after $max_attempts attempts: " . $last_error->getMessage(),
        0,
        $last_error
    );
}
```

Performance Optimization
Batch Processing
Benefits:
- Reduced database connections
- Lower API overhead
- Better throughput
Implementation:
```php
// $processor receives one batch and must return an array of results.
function process_in_batches($items, $batch_size, $processor) {
    $batches = array_chunk($items, $batch_size);
    $results = [];

    foreach ($batches as $batch) {
        $results = array_merge($results, $processor($batch));
    }

    return $results;
}
```

Connection Pooling
```php
class DatabasePool {
    private static $connections = [];
    // Caps the number of idle connections kept for reuse,
    // not the total number of open connections
    private static $max_connections = 10;

    public static function getConnection() {
        // Reuse existing connection
        if (!empty(self::$connections)) {
            return array_pop(self::$connections);
        }

        // create_new_connection() is assumed to wrap pg_connect()
        return create_new_connection();
    }

    public static function releaseConnection($connection) {
        if (count(self::$connections) < self::$max_connections) {
            self::$connections[] = $connection;
        } else {
            // Pool is full: close instead of keeping the connection idle
            pg_close($connection);
        }
    }
}
```

Memory Management
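```php
function process_large_dataset() {
    $batch_size = 1000;
    $last_id = 0;

    while (true) {
        // Page by last seen ID, matching how the bulk sync phase calls
        // get_products_batch(); adding the batch size to an ID would skip
        // or repeat rows whenever IDs are not contiguous
        $batch = get_products_batch($last_id, $batch_size);

        if (empty($batch)) {
            break;
        }

        process_batch($batch);

        // Remember the position before releasing the batch
        $last_id = end($batch)['id'];

        // Free memory
        unset($batch);
        gc_collect_cycles();
    }
}
```

Monitoring & Observability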
Pipeline Metrics

```php
class PipelineMetrics {
    // $duration is expected in milliseconds
    public static function recordProcessing($stage, $duration, $count) {
        $metrics = [
            'stage' => $stage,
            'duration_ms' => $duration,
            'items_processed' => $count,
            // Guard against division by zero for very fast stages
            'items_per_second' => $duration > 0 ? $count / ($duration / 1000) : 0,
            'timestamp' => time()
        ];

        log_metrics($metrics);
    }
}
```

Health Checks
```php
function check_pipeline_health() {
    $health = [
        'scraping' => check_scrapyd_health(),
        'database' => check_database_health(),
        'elasticsearch' => check_elasticsearch_health(),
        'sync' => check_sync_health()
    ];

    return $health;
}
```

State Management
Sync State Persistence

```php
function save_sync_state($state) {
    $state['last_updated'] = time();
    // LOCK_EX prevents concurrent writers from interleaving their output
    file_put_contents(
        'elasticsearch-sync-state.json',
        json_encode($state, JSON_PRETTY_PRINT),
        LOCK_EX
    );
}

// initialize_state() is assumed to return the default state array.
function load_sync_state() {
    if (!file_exists('elasticsearch-sync-state.json')) {
        return initialize_state();
    }

    $state = json_decode(
        file_get_contents('elasticsearch-sync-state.json'),
        true
    );

    // Fall back to a fresh state if the file is empty or corrupt
    return is_array($state) ? $state : initialize_state();
}
```

Error Recovery
Failed Batch Recovery

```php
function recover_failed_batches() {
    $state = load_sync_state();

    foreach ($state['failed_batches'] as $key => $batch) {
        // get_products_range() is assumed to return up to 'count' products
        // following 'start_id'
        $products = get_products_range(
            $batch['start_id'],
            $batch['count']
        );

        $success = bulk_index_products($products);

        if ($success) {
            unset($state['failed_batches'][$key]);
        }
    }

    // Re-index the array so it is saved as a JSON list, not an object
    $state['failed_batches'] = array_values($state['failed_batches']);
    save_sync_state($state);
}
```

Next Steps
- Automation & Cron Jobs - Scheduling system
- Elasticsearch Console - Monitor sync
- Performance Optimization - Tuning guide