Skip to content

Web Scraping (Scrapyd)

DealAI.lt uses Scrapyd for distributed web scraping, automatically collecting product data from multiple Lithuanian e-commerce websites. The system schedules, monitors, and manages scraping jobs across remote servers.

Scrapyd Server:

  • Host: 78.56.0.236
  • Port: 6800
  • Framework: Python Scrapy
  • Queue: Database-backed job queue

File: functions.php

// Scrapyd endpoint configuration. SCRAPYD_URL is the base every API helper
// prepends to its endpoint path; it is assembled from the host and port above it.
define('SCRAPYD_HOST', '78.56.0.236');
define('SCRAPYD_PORT', '6800');
define('SCRAPYD_URL', sprintf('http://%s:%s', SCRAPYD_HOST, SCRAPYD_PORT));
/**
 * Send a request to the Scrapyd JSON API and return the decoded response.
 *
 * Scrapyd serves its read-only endpoints (daemonstatus, listjobs, listspiders,
 * ...) over GET and its mutating endpoints (schedule, cancel, ...) over POST.
 * When $method is omitted it is chosen from the endpoint, so parameters for
 * read-only endpoints travel in the query string instead of a POST body.
 *
 * @param string      $endpoint Endpoint path appended to SCRAPYD_URL, e.g. '/listjobs.json'.
 * @param array       $params   Request parameters (query string for GET, form body for POST).
 * @param string|null $method   'GET' or 'POST'; null selects the method from the endpoint.
 * @return array|false Decoded JSON response, or false on transport error,
 *                     non-200 status, or a body that is not a JSON object/array.
 */
function scrapyd_api_request($endpoint, $params = [], $method = null) {
    // Endpoints that Scrapyd's API documents as POST; everything else is GET.
    $post_endpoints = [
        '/schedule.json',
        '/cancel.json',
        '/addversion.json',
        '/delversion.json',
        '/delproject.json',
    ];

    if ($method === null) {
        $method = in_array($endpoint, $post_endpoints, true) ? 'POST' : 'GET';
    }
    $method = strtoupper($method);

    $encoded_params = http_build_query($params);
    $url = SCRAPYD_URL . $endpoint;
    if ($method === 'GET' && $encoded_params !== '') {
        // Respect a query string that may already be part of $endpoint.
        $url .= (strpos($url, '?') === false ? '?' : '&') . $encoded_params;
    }

    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_TIMEOUT, 30);
    if ($method === 'POST') {
        curl_setopt($ch, CURLOPT_POST, true);
        curl_setopt($ch, CURLOPT_POSTFIELDS, $encoded_params);
    }

    $response = curl_exec($ch);
    // Capture diagnostics before the handle is closed.
    $curl_error = curl_error($ch);
    $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);

    if ($response === false) {
        error_log("Scrapyd API error: $curl_error");
        return false;
    }
    if ($http_code !== 200) {
        error_log("Scrapyd API error: HTTP $http_code");
        return false;
    }

    $decoded = json_decode($response, true);
    if (!is_array($decoded)) {
        // Callers index into the result, so a non-JSON body is treated as a failure.
        error_log("Scrapyd API error: invalid JSON response");
        return false;
    }
    return $decoded;
}

Daemon Status:

/**
 * Query Scrapyd's daemon status endpoint.
 *
 * @return mixed The result of scrapyd_api_request() for '/daemonstatus.json'.
 */
function scrapyd_daemon_status() {
    $daemon_status = scrapyd_api_request('/daemonstatus.json');

    return $daemon_status;
}

List Jobs:

/**
 * List the jobs Scrapyd knows about for a project.
 *
 * @param string $project Scrapyd project name.
 * @return mixed The result of scrapyd_api_request() for '/listjobs.json'.
 */
function scrapyd_list_jobs($project = 'dealai_scrapers') {
    $request_params = ['project' => $project];

    return scrapyd_api_request('/listjobs.json', $request_params);
}

Schedule Spider:

/**
 * Schedule a spider run in the 'dealai_scrapers' project.
 *
 * @param string $spider_name Name of the spider to run.
 * @param array  $settings    Extra key/value pairs sent with the request. They are
 *                            layered over the base parameters, so a 'project' or
 *                            'spider' key here replaces the base value.
 * @return mixed The result of scrapyd_api_request() for '/schedule.json'.
 */
function scrapyd_schedule_spider($spider_name, $settings = []) {
    $base_params = [
        'project' => 'dealai_scrapers',
        'spider' => $spider_name
    ];

    // Overlay the caller's entries on the base parameters, keeping keys intact.
    $request_params = array_replace($base_params, $settings);

    return scrapyd_api_request('/schedule.json', $request_params);
}

Cancel Job:

/**
 * Ask Scrapyd to cancel a job.
 *
 * @param string $job_id  Id of the job to cancel.
 * @param string $project Scrapyd project the job belongs to.
 * @return mixed The result of scrapyd_api_request() for '/cancel.json'.
 */
function scrapyd_cancel_job($job_id, $project = 'dealai_scrapers') {
    $request_params = [];
    $request_params['project'] = $project;
    $request_params['job'] = $job_id;

    return scrapyd_api_request('/cancel.json', $request_params);
}

List Spiders:

/**
 * List the spiders available in a Scrapyd project.
 *
 * @param string $project Scrapyd project name.
 * @return mixed The result of scrapyd_api_request() for '/listspiders.json'.
 */
function scrapyd_list_spiders($project = 'dealai_scrapers') {
    $request_params = ['project' => $project];

    return scrapyd_api_request('/listspiders.json', $request_params);
}

File: /scripts/product-crawler-manager.php

Purpose: Manages batch scanning of products

Process:

  1. Select 30 oldest products from database
  2. Queue them for scanning
  3. Schedule Scrapyd job
  4. Track job status
  5. Update database when complete

Implementation:

/**
 * Queue one batch of products for rescanning and schedule the spider for it.
 *
 * Steps: fetch products via get_products_for_rescan(30), store them as a queue
 * entry, schedule 'product_spider' with the queue id and a comma-separated list
 * of product ids, then record the returned Scrapyd job id on the queue entry.
 * Progress is echoed; failures are written to the error log.
 *
 * @return void
 */
function manage_product_crawl_batch() {
    // 30 is passed straight to get_products_for_rescan() — presumably the batch size.
    $products = get_products_for_rescan(30);
    if (empty($products)) {
        echo "No products need scanning\n";
        return;
    }

    // Add to queue; without a queue id there is nothing to attach the job to.
    $queue_id = add_products_to_queue($products);
    if (empty($queue_id)) {
        error_log("Failed to create product crawl queue entry");
        return;
    }

    // Schedule spider
    $response = scrapyd_schedule_spider('product_spider', [
        'queue_id' => $queue_id,
        'products' => implode(',', array_column($products, 'id'))
    ]);

    // Only treat the call as successful when the response is well-formed:
    // status 'ok' AND a job id to store.
    $scheduled = is_array($response)
        && ($response['status'] ?? null) === 'ok'
        && !empty($response['jobid']);

    if (!$scheduled) {
        error_log("Failed to schedule spider");
        return;
    }

    $job_id = $response['jobid'];
    update_queue_job_id($queue_id, $job_id);
    echo "Scheduled job: $job_id\n";
}

File: /scripts/crawler-manager.php

Purpose: Crawls entire categories for new products

Implementation:

/**
 * Create a crawl record for a category and schedule 'category_spider' for it.
 *
 * @param mixed $category_id Id passed to get_category_by_id() and stored on the crawl record.
 * @return mixed The Scrapyd job id on success; false when the category is not
 *               found, the crawl record cannot be created, or scheduling fails.
 */
function schedule_category_crawl($category_id) {
    $connection = get_db_connection();

    // Get category details
    $category = get_category_by_id($category_id);
    if (!$category) {
        return false;
    }

    // Create crawl record
    $query = "
        INSERT INTO category_product_crawl
        (category_id, category_url, started)
        VALUES ($1, $2, false)
        RETURNING id
    ";
    $result = pg_query_params($connection, $query, [
        $category_id,
        $category['category_url']
    ]);
    if ($result === false) {
        // pg_fetch_assoc() must not be called with a failed result.
        error_log("Failed to create category crawl record: " . pg_last_error($connection));
        return false;
    }

    $crawl = pg_fetch_assoc($result);
    if (!$crawl || !isset($crawl['id'])) {
        error_log("Category crawl insert returned no id");
        return false;
    }

    // Schedule spider
    $response = scrapyd_schedule_spider('category_spider', [
        'category_url' => $category['category_url'],
        'crawl_id' => $crawl['id']
    ]);

    // Require both an 'ok' status and a job id before marking the crawl started.
    $scheduled = is_array($response)
        && ($response['status'] ?? null) === 'ok'
        && !empty($response['jobid']);
    if ($scheduled) {
        mark_crawl_started($crawl['id'], $response['jobid']);
        return $response['jobid'];
    }

    return false;
}
/**
 * Look up a job in the Scrapyd job listing and report which state it is in.
 *
 * @param string $job_id Job id to search for (compared strictly against each job's 'id').
 * @return array|null ['state', 'spider', 'start_time', 'end_time'] for the first
 *                    match, or null when the listing is unavailable or the job is absent.
 */
function get_job_status($job_id) {
    $listing = scrapyd_list_jobs();
    if (!$listing) {
        return null;
    }

    // States are searched in this fixed order.
    $states = ['pending', 'running', 'finished'];
    foreach ($states as $state) {
        if (!isset($listing[$state])) {
            continue;
        }

        foreach ($listing[$state] as $entry) {
            if ($entry['id'] !== $job_id) {
                continue;
            }

            // Timestamps are optional in an entry, so default them to null.
            $started = $entry['start_time'] ?? null;
            $ended = $entry['end_time'] ?? null;

            return [
                'state' => $state,
                'spider' => $entry['spider'],
                'start_time' => $started,
                'end_time' => $ended
            ];
        }
    }

    return null;
}
/**
 * Count Scrapyd jobs per state.
 *
 * @return array Empty array when the job listing is unavailable; otherwise the
 *               keys 'pending', 'running', 'finished' and their sum as 'total'.
 */
function get_scrapyd_statistics() {
    $listing = scrapyd_list_jobs();
    if (!$listing) {
        return [];
    }

    // A state missing from the listing counts as zero jobs.
    $pending_count = count($listing['pending'] ?? []);
    $running_count = count($listing['running'] ?? []);
    $finished_count = count($listing['finished'] ?? []);

    return [
        'pending' => $pending_count,
        'running' => $running_count,
        'finished' => $finished_count,
        'total' => $pending_count + $running_count + $finished_count
    ];
}
-- Queue of product-scan batches. add_products_to_queue() inserts one row per
-- batch; update_queue_status() later updates its status, timestamps and counters.
CREATE TABLE product_crawl_queue (
id SERIAL PRIMARY KEY,
-- JSON-encoded batch of products, written by add_products_to_queue().
products JSONB NOT NULL,
-- Presumably the Scrapyd job id recorded once the spider is scheduled; confirm against update_queue_job_id().
job_id VARCHAR(255),
-- update_queue_status() treats 'running' and 'completed' specially (sets the timestamps below).
status VARCHAR(50) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Set by update_queue_status() when status becomes 'running'.
started_at TIMESTAMP,
-- Set by update_queue_status() when status becomes 'completed'.
completed_at TIMESTAMP,
-- Counters supplied through update_queue_status()'s $stats ('processed' / 'updated').
products_processed INTEGER DEFAULT 0,
products_updated INTEGER DEFAULT 0,
-- NOTE(review): no code in view writes this column; presumably per-batch error details.
errors JSONB
);

Add to Queue:

/**
 * Insert a batch of products into product_crawl_queue with status 'pending'.
 *
 * @param array $products Products to store; JSON-encoded into the 'products' column.
 * @return mixed The new queue row id, or null when encoding or the insert fails.
 */
function add_products_to_queue($products) {
    // Encode first so an unencodable payload never reaches the database.
    $payload = json_encode($products);
    if ($payload === false) {
        error_log("Failed to encode products for crawl queue: " . json_last_error_msg());
        return null;
    }

    $connection = get_db_connection();
    $query = "
        INSERT INTO product_crawl_queue (products, status)
        VALUES ($1, 'pending')
        RETURNING id
    ";
    $result = pg_query_params($connection, $query, [$payload]);
    if ($result === false) {
        // pg_fetch_assoc() must not be called with a failed result.
        error_log("Failed to insert crawl queue entry: " . pg_last_error($connection));
        return null;
    }

    $row = pg_fetch_assoc($result);
    return $row ? $row['id'] : null;
}

Update Queue Status:

/**
 * Update a queue entry's status and, optionally, its progress counters.
 *
 * Status 'running' also stamps started_at and 'completed' stamps completed_at.
 * Counters are written whenever a numeric value is supplied — including 0,
 * which a plain empty() check would silently skip.
 *
 * @param mixed  $queue_id Id of the product_crawl_queue row.
 * @param string $status   New status value.
 * @param array  $stats    Optional counters: 'processed' and/or 'updated'.
 * @return bool True when the UPDATE was executed, false when the query failed.
 */
function update_queue_status($queue_id, $status, $stats = []) {
    $connection = get_db_connection();

    $updates = ['status = $1'];
    $params = [$status];
    $param_num = 2;

    if ($status === 'running') {
        $updates[] = 'started_at = CURRENT_TIMESTAMP';
    } elseif ($status === 'completed') {
        $updates[] = 'completed_at = CURRENT_TIMESTAMP';
    }

    // Map each supported $stats key to the column it updates.
    $counter_columns = [
        'processed' => 'products_processed',
        'updated' => 'products_updated',
    ];
    foreach ($counter_columns as $stat_key => $column) {
        if (isset($stats[$stat_key]) && is_numeric($stats[$stat_key])) {
            $updates[] = $column . ' = $' . $param_num++;
            $params[] = (int) $stats[$stat_key];
        }
    }

    // The queue id always takes the last placeholder.
    $params[] = $queue_id;
    $query = "
        UPDATE product_crawl_queue
        SET " . implode(', ', $updates) . "
        WHERE id = $" . $param_num;

    $result = pg_query_params($connection, $query, $params);
    if ($result === false) {
        error_log("Failed to update crawl queue status: " . pg_last_error($connection));
        return false;
    }
    return true;
}
import scrapy
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
class ProductSpider(CrawlSpider):
    """Re-scan a batch of known products and push the fresh data to the database.

    Spider arguments:
        queue_id: Identifier of the queue entry this run belongs to.
        products: Comma-separated product ids to scan.

    The database helpers used here (``get_products_from_db``,
    ``update_product_in_db``, ``log_error_to_db``) are not defined in this
    snippet and are expected to be provided elsewhere.
    """

    name = 'product_spider'

    def __init__(self, queue_id=None, products=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue_id = queue_id
        # Drop blank entries so "1,,2" or a trailing comma never yields an empty id.
        self.product_ids = (
            [pid.strip() for pid in products.split(',') if pid.strip()]
            if products
            else []
        )

    def start_requests(self):
        """Yield one request per product, carrying its id in ``meta``."""
        # Get product URLs from database
        products = self.get_products_from_db(self.product_ids)
        for product in products:
            yield scrapy.Request(
                url=product['product_url'],
                callback=self.parse_product,
                # Without an errback, failed downloads never reach handle_error.
                errback=self.handle_error,
                meta={'product_id': product['id']},
            )

    def parse_product(self, response):
        """Extract product fields from the page, persist them, and yield the item."""
        product_id = response.meta['product_id']
        # Extract data
        data = {
            'product_id': product_id,
            'title': response.css('h1.product-title::text').get(),
            'price': response.css('.price-current::text').get(),
            'availability': response.css('.availability::text').get(),
            'description': response.css('.product-description::text').get(),
        }
        # Update database
        self.update_product_in_db(data)
        yield data

    def handle_error(self, failure):
        """Log a failed request and record it in the database."""
        # Imported locally: the snippet's import block does not bring in datetime.
        from datetime import datetime

        self.logger.error("Request failed: %s", failure.value)
        # Log to database
        self.log_error_to_db({
            'url': failure.request.url,
            'error': str(failure.value),
            'timestamp': datetime.now(),
        })
File: settings.py
# Scrapy settings: concurrency, throttling and retry policy.
# Caps on simultaneous requests, overall and per target domain.
CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8
# Delay between consecutive requests to the same site.
DOWNLOAD_DELAY = 0.5
# AutoThrottle adjusts the delay at runtime between the start and max values below.
# NOTE(review): per Scrapy's AutoThrottle docs, DOWNLOAD_DELAY acts as the lower
# bound while this is enabled — confirm 0.5 is the intended floor.
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Retry policy: up to RETRY_TIMES retries for responses with the listed HTTP status codes.
RETRY_ENABLED = True
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]
/**
 * Download the log file of a Scrapyd job.
 *
 * NOTE(review): Scrapyd's documented log layout is
 * /logs/{project}/{spider}/{job}.log. Pass $spider to use that layout; when it
 * is omitted the original /logs/{project}/{job}.log URL is requested — confirm
 * that path is actually served by this deployment.
 *
 * @param string      $job_id  Scrapyd job id.
 * @param string      $project Scrapyd project name.
 * @param string|null $spider  Spider name; optional, see note above.
 * @return string|false Log contents, or false on transport error or non-200 response.
 */
function get_spider_logs($job_id, $project = 'dealai_scrapers', $spider = null) {
    $segments = [$project];
    if ($spider !== null && $spider !== '') {
        $segments[] = $spider;
    }
    $segments[] = $job_id . '.log';

    // Encode every path segment so an unexpected character cannot alter the URL.
    $path = implode('/', array_map('rawurlencode', $segments));
    $url = SCRAPYD_URL . '/logs/' . $path;

    $ch = curl_init($url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    // Same timeout as scrapyd_api_request(), so a stalled server cannot hang the caller.
    curl_setopt($ch, CURLOPT_TIMEOUT, 30);
    $log_content = curl_exec($ch);
    $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);

    if ($log_content === false || $http_code !== 200) {
        // An error page is not a log; report the failure instead of returning it.
        error_log("Scrapyd log fetch failed: HTTP $http_code");
        return false;
    }
    return $log_content;
}
/**
 * Collect the text following each "ERROR:" marker in a job's log.
 *
 * @param string $job_id Scrapyd job id whose log is scanned.
 * @return array The captured remainder of every matching line (everything after
 *               "ERROR:" up to the end of the line); empty when nothing matches.
 */
function track_spider_errors($job_id) {
    $log_text = get_spider_logs($job_id);

    // Multiline mode lets "$" anchor at the end of each log line.
    preg_match_all('/ERROR:(.*?)$/m', $log_text, $found);

    if (isset($found[1])) {
        return $found[1];
    }
    return [];
}
# Scrapy settings: crawler identification and response caching.
# Respect each site's robots.txt rules.
ROBOTSTXT_OBEY = True
# User-Agent header sent with requests; the URL is where site owners can read about the bot.
USER_AGENT = 'DealAI Bot/1.0 (+http://dealai.lt/bot)'
# HTTP cache with an expiration of 3600 (seconds, per Scrapy's HTTPCACHE_EXPIRATION_SECS setting).
# NOTE(review): cached responses may hide price changes within that window — confirm this is intended.
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 3600