Web Scraping (Scrapyd)
Overview
Section titled “Overview”
DealAI.lt uses Scrapyd for distributed web scraping, automatically collecting product data from multiple Lithuanian e-commerce websites. The system schedules, monitors, and manages scraping jobs across remote servers.
Architecture
Section titled “Architecture”
Scrapyd Server:
- Host: 78.56.0.236
- Port: 6800
- Framework: Python Scrapy
- Queue: Database-backed job queue
Scrapyd API Integration
Section titled “Scrapyd API Integration”
API Endpoints
Section titled “API Endpoints”
File: functions.php
// Scrapyd connection settings. SCRAPYD_URL is the base URL built from the
// host and port below.
define('SCRAPYD_HOST', '78.56.0.236');
define('SCRAPYD_PORT', '6800');
define('SCRAPYD_URL', 'http://' . SCRAPYD_HOST . ':' . SCRAPYD_PORT);
/**
 * Send a request to the Scrapyd JSON API and return the decoded response.
 *
 * Scrapyd accepts POST only on its mutating endpoints (schedule, cancel,
 * addversion, delversion, delproject). Read-only endpoints such as
 * /listjobs.json and /listspiders.json must be called with GET, so for those
 * the parameters are sent in the query string instead of the request body.
 *
 * @param string $endpoint Endpoint path beginning with "/", e.g. "/listjobs.json".
 * @param array  $params   Request parameters; may be empty.
 *
 * @return array|false Decoded JSON response, or false on a transport error,
 *                     a non-200 status, or a body that is not a JSON object/array.
 */
function scrapyd_api_request($endpoint, $params = [])
{
    $post_endpoints = [
        '/schedule.json',
        '/cancel.json',
        '/addversion.json',
        '/delversion.json',
        '/delproject.json',
    ];

    // Compare on the path only so an endpoint carrying its own query string still matches.
    $path = parse_url($endpoint, PHP_URL_PATH);
    $use_post = in_array($path, $post_endpoints, true);

    $url = SCRAPYD_URL . $endpoint;
    if (!$use_post && !empty($params)) {
        $separator = strpos($url, '?') === false ? '?' : '&';
        $url .= $separator . http_build_query($params);
    }

    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_TIMEOUT, 30);

    if ($use_post) {
        curl_setopt($ch, CURLOPT_POST, true);
        curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($params));
    }

    $response = curl_exec($ch);
    $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    // Read the error text before the handle is closed.
    $curl_error = $response === false ? curl_error($ch) : '';
    curl_close($ch);

    if ($response === false) {
        error_log("Scrapyd API error: $curl_error");
        return false;
    }

    if ($http_code !== 200) {
        error_log("Scrapyd API error: HTTP $http_code");
        return false;
    }

    $decoded = json_decode($response, true);
    if (!is_array($decoded)) {
        error_log('Scrapyd API error: response is not valid JSON');
        return false;
    }

    return $decoded;
}

// Core API Methods
Section titled “Core API Methods”
Daemon Status:
/**
 * Query the Scrapyd daemon status endpoint.
 *
 * @return mixed The result of scrapyd_api_request() for /daemonstatus.json.
 */
function scrapyd_daemon_status()
{
    $status = scrapyd_api_request('/daemonstatus.json');

    return $status;
}

// List Jobs:
/**
 * List the jobs Scrapyd knows about for a project.
 *
 * @param string $project Scrapyd project name.
 *
 * @return mixed The result of scrapyd_api_request() for /listjobs.json.
 */
function scrapyd_list_jobs($project = 'dealai_scrapers')
{
    $query = ['project' => $project];

    return scrapyd_api_request('/listjobs.json', $query);
}

// Schedule Spider:
/**
 * Schedule a spider run in the 'dealai_scrapers' project.
 *
 * Every entry of $settings is sent as an additional request parameter next to
 * 'project' and 'spider'; an entry using one of those two keys overrides it.
 *
 * @param string $spider_name Name of the spider to run.
 * @param array  $settings    Extra key/value pairs to send with the request.
 *
 * @return mixed The result of scrapyd_api_request() for /schedule.json.
 */
function scrapyd_schedule_spider($spider_name, $settings = [])
{
    $base = [
        'project' => 'dealai_scrapers',
        'spider' => $spider_name,
    ];

    // Later keys win and key order is kept, as with a key-by-key assignment.
    $request = array_replace($base, $settings);

    return scrapyd_api_request('/schedule.json', $request);
}

// Cancel Job:
/**
 * Ask Scrapyd to cancel a job.
 *
 * @param string $job_id  Identifier of the job to cancel.
 * @param string $project Scrapyd project the job belongs to.
 *
 * @return mixed The result of scrapyd_api_request() for /cancel.json.
 */
function scrapyd_cancel_job($job_id, $project = 'dealai_scrapers')
{
    $request = [
        'project' => $project,
        'job' => $job_id,
    ];

    return scrapyd_api_request('/cancel.json', $request);
}

// List Spiders:
/**
 * List the spiders available in a project.
 *
 * @param string $project Scrapyd project name.
 *
 * @return mixed The result of scrapyd_api_request() for /listspiders.json.
 */
function scrapyd_list_spiders($project = 'dealai_scrapers')
{
    $query = ['project' => $project];

    return scrapyd_api_request('/listspiders.json', $query);
}

// Spider Management
Section titled “Spider Management”
Product Crawler Manager
Section titled “Product Crawler Manager”
File: /scripts/product-crawler-manager.php
Purpose: Manages batch scanning of products
Process:
- Select 30 oldest products from database
- Queue them for scanning
- Schedule Scrapyd job
- Track job status
- Update database when complete
Implementation:
/**
 * Queue one batch of products for re-scanning and schedule the product spider.
 *
 * Prints a progress line to standard output and logs scheduling failures.
 *
 * @return void
 */
function manage_product_crawl_batch()
{
    // Batch size is 30; which products qualify is decided by get_products_for_rescan().
    $products = get_products_for_rescan(30);

    if (empty($products)) {
        echo "No products need scanning\n";
        return;
    }

    // Add to queue
    $queue_id = add_products_to_queue($products);
    if ($queue_id === false || $queue_id === null) {
        // Without a queue row the spider would have nothing to report back to.
        error_log("Failed to queue products for scanning");
        return;
    }

    // Schedule spider
    $response = scrapyd_schedule_spider('product_spider', [
        'queue_id' => $queue_id,
        'products' => implode(',', array_column($products, 'id')),
    ]);

    // Verify the shape of the answer before reading it: a decoded response may
    // lack 'status' or 'jobid', or not be an array at all.
    $scheduled = is_array($response)
        && ($response['status'] ?? null) === 'ok'
        && isset($response['jobid']);

    if (!$scheduled) {
        error_log("Failed to schedule spider");
        return;
    }

    $job_id = $response['jobid'];
    update_queue_job_id($queue_id, $job_id);
    echo "Scheduled job: $job_id\n";
}

// Category Crawler
Section titled “Category Crawler”
File: /scripts/crawler-manager.php
Purpose: Crawls entire categories for new products
Implementation:
/**
 * Record a category crawl and schedule the category spider for it.
 *
 * @param int|string $category_id Identifier passed to get_category_by_id().
 *
 * @return string|false The Scrapyd job id on success; false when the category
 *                      is unknown, the crawl record cannot be created, or
 *                      scheduling fails.
 */
function schedule_category_crawl($category_id)
{
    $connection = get_db_connection();

    // Get category details
    $category = get_category_by_id($category_id);

    if (!$category) {
        return false;
    }

    // Create crawl record
    $query = "
        INSERT INTO category_product_crawl
            (category_id, category_url, started)
        VALUES ($1, $2, false)
        RETURNING id
    ";

    $result = pg_query_params($connection, $query, [
        $category_id,
        $category['category_url'],
    ]);

    // pg_query_params() returns false on failure; passing that to
    // pg_fetch_assoc() is a TypeError on PHP 8.
    if ($result === false) {
        error_log('Failed to create crawl record: ' . pg_last_error($connection));
        return false;
    }

    $crawl = pg_fetch_assoc($result);
    if (!$crawl || !isset($crawl['id'])) {
        error_log('Failed to create crawl record: no id returned');
        return false;
    }

    // Schedule spider
    $response = scrapyd_schedule_spider('category_spider', [
        'category_url' => $category['category_url'],
        'crawl_id' => $crawl['id'],
    ]);

    $scheduled = is_array($response)
        && ($response['status'] ?? null) === 'ok'
        && isset($response['jobid']);

    if (!$scheduled) {
        return false;
    }

    mark_crawl_started($crawl['id'], $response['jobid']);

    return $response['jobid'];
}

// Job Monitoring
Section titled “Job Monitoring”
Job Status Tracking
// Section titled “Job Status Tracking”

/**
 * Look a job up in the Scrapyd job list and report which state it is in.
 *
 * @param string $job_id Job identifier to search for (compared strictly).
 *
 * @return array|null Array with 'state', 'spider', 'start_time' and 'end_time'
 *                    keys, or null when the job list is unavailable or the job
 *                    is not in it.
 */
function get_job_status($job_id)
{
    $job_list = scrapyd_list_jobs();

    if (!$job_list) {
        return null;
    }

    // States are searched in this fixed order; the first match wins.
    $states = ['pending', 'running', 'finished'];

    foreach ($states as $state) {
        if (!isset($job_list[$state])) {
            continue;
        }

        foreach ($job_list[$state] as $entry) {
            if ($entry['id'] !== $job_id) {
                continue;
            }

            return [
                'state' => $state,
                'spider' => $entry['spider'],
                'start_time' => $entry['start_time'] ?? null,
                'end_time' => $entry['end_time'] ?? null,
            ];
        }
    }

    return null;
}

// Real-time Statistics
// Section titled “Real-time Statistics”

/**
 * Count the jobs in each Scrapyd state.
 *
 * @return array Empty array when the job list is unavailable; otherwise the
 *               keys 'pending', 'running', 'finished' and 'total'.
 */
function get_scrapyd_statistics()
{
    $job_list = scrapyd_list_jobs();

    if (!$job_list) {
        return [];
    }

    $pending = count($job_list['pending'] ?? []);
    $running = count($job_list['running'] ?? []);
    $finished = count($job_list['finished'] ?? []);

    return [
        'pending' => $pending,
        'running' => $running,
        'finished' => $finished,
        'total' => $pending + $running + $finished,
    ];
}

// Queue Management
Section titled “Queue Management”
Database Queue Table
-- Section titled “Database Queue Table”
-- One row per batch of products queued for crawling.
CREATE TABLE product_crawl_queue (
    id SERIAL PRIMARY KEY,
    products JSONB NOT NULL,                        -- JSON payload describing the queued products
    job_id VARCHAR(255),                            -- nullable; no default
    status VARCHAR(50) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    products_processed INTEGER DEFAULT 0,
    products_updated INTEGER DEFAULT 0,
    errors JSONB
);
-- Queue Operations
Section titled “Queue Operations”
Add to Queue:
/**
 * Insert a batch of products into product_crawl_queue with status 'pending'.
 *
 * @param array $products Products to queue; stored JSON-encoded in the
 *                        'products' column.
 *
 * @return string|false Id of the new queue row, or false when the products
 *                      cannot be encoded or the insert fails.
 */
function add_products_to_queue($products)
{
    $connection = get_db_connection();

    // json_encode() returns false on failure (e.g. invalid UTF-8).
    $payload = json_encode($products);
    if ($payload === false) {
        error_log('Failed to encode products for queue: ' . json_last_error_msg());
        return false;
    }

    $query = "
        INSERT INTO product_crawl_queue (products, status)
        VALUES ($1, 'pending')
        RETURNING id
    ";

    $result = pg_query_params($connection, $query, [$payload]);

    // Passing a false result to pg_fetch_assoc() is a TypeError on PHP 8.
    if ($result === false) {
        error_log('Failed to queue products: ' . pg_last_error($connection));
        return false;
    }

    $row = pg_fetch_assoc($result);
    if (!$row || !isset($row['id'])) {
        error_log('Failed to queue products: no id returned');
        return false;
    }

    return $row['id'];
}

// Update Queue Status:
/**
 * Update the status of a queue row, and optionally its counters.
 *
 * Moving to 'running' stamps started_at and moving to 'completed' stamps
 * completed_at. A counter is written whenever its key is present in $stats,
 * including when the value is 0.
 *
 * @param int|string $queue_id Id of the product_crawl_queue row.
 * @param string     $status   New status value.
 * @param array      $stats    Optional 'processed' and/or 'updated' counts.
 *
 * @return void
 */
function update_queue_status($queue_id, $status, $stats = [])
{
    $connection = get_db_connection();

    $updates = ['status = $1'];
    $params = [$status];
    $param_num = 2;

    if ($status === 'running') {
        $updates[] = 'started_at = CURRENT_TIMESTAMP';
    } elseif ($status === 'completed') {
        $updates[] = 'completed_at = CURRENT_TIMESTAMP';
    }

    // isset() rather than !empty(): a count of 0 is a valid value to store.
    $counter_columns = [
        'processed' => 'products_processed',
        'updated' => 'products_updated',
    ];
    foreach ($counter_columns as $key => $column) {
        if (isset($stats[$key])) {
            $updates[] = $column . ' = $' . $param_num++;
            $params[] = $stats[$key];
        }
    }

    // The row id is always the last placeholder.
    $params[] = $queue_id;

    $query = 'UPDATE product_crawl_queue SET ' . implode(', ', $updates)
        . ' WHERE id = $' . $param_num;

    $result = pg_query_params($connection, $query, $params);
    if ($result === false) {
        error_log('Failed to update queue status: ' . pg_last_error($connection));
    }
}

// Spider Implementation (Python)
Section titled “Spider Implementation (Python)”
Basic Spider Structure
# Section titled “Basic Spider Structure”
import scrapy
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor


class ProductSpider(CrawlSpider):
    """Spider that re-scrapes a given list of products."""

    name = 'product_spider'

    def __init__(self, queue_id=None, products=None, *args, **kwargs):
        """Store the queue id and split the comma-separated product id list."""
        super().__init__(*args, **kwargs)
        self.queue_id = queue_id
        if products:
            self.product_ids = products.split(',')
        else:
            self.product_ids = []

    def start_requests(self):
        """Yield one request per product returned by get_products_from_db()."""
        # Get product URLs from database
        rows = self.get_products_from_db(self.product_ids)

        for row in rows:
            yield scrapy.Request(
                url=row['product_url'],
                callback=self.parse_product,
                meta={'product_id': row['id']},
            )

    def parse_product(self, response):
        """Extract the product fields, store them, and yield them as an item."""
        # Field name -> CSS selector; the first match of each selector is taken.
        selectors = {
            'title': 'h1.product-title::text',
            'price': '.price-current::text',
            'availability': '.availability::text',
            'description': '.product-description::text',
        }

        # Extract data
        data = {'product_id': response.meta['product_id']}
        for field, css in selectors.items():
            data[field] = response.css(css).get()

        # Update database
        self.update_product_in_db(data)

        yield data

# Error Handling
# Section titled “Error Handling”
def handle_error(self, failure):
    """Log a failed request and record it through log_error_to_db()."""
    # NOTE(review): relies on `datetime` being imported where this method lives.
    reason = failure.value
    self.logger.error(f"Request failed: {reason}")

    # Log to database
    record = {
        'url': failure.request.url,
        'error': str(reason),
        'timestamp': datetime.now(),
    }
    self.log_error_to_db(record)

# Performance Optimization
Section titled “Performance Optimization”
Concurrent Requests
# Section titled “Concurrent Requests”
# Concurrency limits and the delay between downloads.
CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8
DOWNLOAD_DELAY = 0.5

# Rate Limiting
# Section titled “Rate Limiting”
# AutoThrottle settings.
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0

# Retry Logic
# Section titled “Retry Logic”
# Retry settings, including the HTTP status codes that trigger a retry.
RETRY_ENABLED = True
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]

# Monitoring & Logging
Section titled “Monitoring & Logging”
Log Access
// Section titled “Log Access”

/**
 * Download the log file of a Scrapyd job.
 *
 * Scrapyd serves job logs at /logs/{project}/{spider}/{job}.log, so the spider
 * name is part of the path. When $spider is not supplied it is looked up in
 * the project's job list.
 *
 * @param string      $job_id  Job identifier.
 * @param string      $project Scrapyd project name.
 * @param string|null $spider  Spider name; resolved from the job list when null.
 *
 * @return string|false Log contents, or false when the spider cannot be
 *                      determined or the log cannot be fetched.
 */
function get_spider_logs($job_id, $project = 'dealai_scrapers', $spider = null)
{
    if ($spider === null) {
        $jobs = scrapyd_list_jobs($project);

        if (is_array($jobs)) {
            foreach (['pending', 'running', 'finished'] as $state) {
                foreach ($jobs[$state] ?? [] as $job) {
                    if (($job['id'] ?? null) === $job_id) {
                        $spider = $job['spider'] ?? null;
                        break 2;
                    }
                }
            }
        }
    }

    if ($spider === null) {
        error_log("Scrapyd log error: cannot determine spider for job $job_id");
        return false;
    }

    $url = SCRAPYD_URL . '/logs/'
        . rawurlencode((string) $project) . '/'
        . rawurlencode((string) $spider) . '/'
        . rawurlencode((string) $job_id) . '.log';

    $ch = curl_init($url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_TIMEOUT, 30);
    $log_content = curl_exec($ch);
    $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);

    // A non-200 body is an error page, not a log; do not hand it to callers.
    if ($log_content === false || $http_code !== 200) {
        error_log("Scrapyd log error: HTTP $http_code for $url");
        return false;
    }

    return $log_content;
}

// Error Tracking
// Section titled “Error Tracking”

/**
 * Collect the text following each "ERROR:" marker in a job's log.
 *
 * @param string $job_id Job identifier passed to get_spider_logs().
 *
 * @return array Captured text, one entry per matching log line (from the
 *               marker to the end of that line).
 */
function track_spider_errors($job_id)
{
    $log_text = get_spider_logs($job_id);

    // Parse errors
    $found = [];
    preg_match_all('/ERROR:(.*?)$/m', $log_text, $found);

    if (isset($found[1])) {
        return $found[1];
    }

    return [];
}

// Best Practices
Section titled “Best Practices”
Respect robots.txt
# Section titled “Respect robots.txt”
ROBOTSTXT_OBEY = True

# User Agent
# Section titled “User Agent”
# User agent string the crawler identifies itself with.
USER_AGENT = 'DealAI Bot/1.0 (+http://dealai.lt/bot)'

# Caching
# Section titled “Caching”
# HTTP cache settings; the expiration is expressed in seconds.
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 3600

# Next Steps
Section titled “Next Steps”
- Data Pipeline - Data processing flow
- Scrapyd Dashboard - Monitoring interface
- Job Monitoring - Job tracking