95 lines
3.5 KiB
PHP
95 lines
3.5 KiB
PHP
<?php

/**
 * Musadaq queue worker.
 *
 * Pops jobs from QueueService one at a time, resolves the matching job handler
 * from the application container and runs it under a per-job time limit.
 * Failed jobs are pushed back with an exponential backoff delay until the
 * attempt limit is reached, after which they are flagged as failed in the
 * queue_jobs table.
 *
 * The loop ends when SIGTERM is received or after $maxJobs jobs have been
 * processed; restarting the process is presumably left to an external
 * supervisor (not visible from this script).
 */

declare(strict_types=1);

require_once __DIR__ . '/../vendor/autoload.php';

use App\Core\Application;
use App\Services\QueueService;

// Initialize App (loads .env, etc.)
$app = new Application(dirname(__DIR__));

echo "[*] Musadaq Queue Worker Started...\n";

$jobTimeout  = 120; // Seconds a single job may run (2 minutes)
$maxJobs     = 100; // Prevent memory leaks by exiting after N jobs
$maxAttempts = 3;   // A job is given up on once this many attempts have failed

// Job type (both the class-style and snake_case spellings) => handler class.
$jobHandlers = [
    'ExtractInvoiceJob'  => \queue\Jobs\ExtractInvoiceJob::class,
    'invoice_extraction' => \queue\Jobs\ExtractInvoiceJob::class,
    'SubmitJoFotaraJob'  => \queue\Jobs\SubmitJoFotaraJob::class,
    'submit_jofotara'    => \queue\Jobs\SubmitJoFotaraJob::class,
    'RiskAnalysisJob'    => \queue\Jobs\RiskAnalysisJob::class,
    'risk_analysis'      => \queue\Jobs\RiskAnalysisJob::class,
];

// Signal handling for graceful shutdown
$keepRunning = true;
pcntl_async_signals(true);

pcntl_signal(SIGTERM, function () use (&$keepRunning) {
    echo "[!] SIGTERM received, shutting down gracefully...\n";
    $keepRunning = false;
});

// Without a handler the default SIGALRM action terminates the whole process,
// so a timed-out job would kill the worker before any retry/failure
// bookkeeping could run. Throwing here surfaces the timeout inside the job's
// try/catch like any other job failure.
pcntl_signal(SIGALRM, static function () use ($jobTimeout): void {
    throw new \RuntimeException("Job timed out after {$jobTimeout} seconds");
});

/**
 * Best-effort status update on the queue_jobs row of a job.
 *
 * Database errors are reported on stdout and swallowed so that a bookkeeping
 * failure can neither crash the worker nor be mistaken for a job failure.
 *
 * @param string $sql   UPDATE statement with a single "?" placeholder for the job id
 * @param mixed  $jobId Identifier of the job row to update
 */
$updateJobStatus = static function (string $sql, $jobId): void {
    try {
        \App\Core\Database::getInstance()->prepare($sql)->execute([$jobId]);
    } catch (\Throwable $err) {
        echo "[!] Failed to update DB status: " . $err->getMessage() . "\n";
    }
};

$jobsProcessed = 0;

while ($keepRunning && $jobsProcessed < $maxJobs) {
    $job = QueueService::pop();

    if (!$job) {
        usleep(500000); // 0.5s idle wait before polling again
        continue;
    }

    $jobsProcessed++;
    $attempt = ($job['attempts'] ?? 0) + 1;
    echo "[+] Processing job: {$job['type']} ({$job['id']}) - Attempt: " . $attempt . "\n";

    $handlerClass = $jobHandlers[$job['type']] ?? null;

    if ($handlerClass === null) {
        // An unknown type can never succeed: do not report it as completed
        // and do not waste retries on it.
        echo "[!] Unknown job type: {$job['type']}\n";
        $updateJobStatus("UPDATE queue_jobs SET status = 'failed' WHERE id = ?", $job['id']);
        continue;
    }

    $failure = null;

    try {
        pcntl_alarm($jobTimeout);
        $app->getContainer()->get($handlerClass)->handle($job['payload']);
    } catch (\Throwable $e) {
        $failure = $e;
    } finally {
        pcntl_alarm(0); // Cancel alarm so it cannot fire during bookkeeping
    }

    if ($failure === null) {
        echo "[✓] Job completed: {$job['id']}\n";

        // If fallback DB is used, mark done. Kept outside the job's try block:
        // a failing status update must not re-queue work that already ran.
        $updateJobStatus(
            "UPDATE queue_jobs SET status = 'completed', completed_at = NOW() WHERE id = ?",
            $job['id']
        );
        continue;
    }

    echo "[✗] Job failed: {$job['id']} - {$failure->getMessage()}\n";

    if ($attempt < $maxAttempts) {
        // Exponential backoff: 2^attempts * 10 seconds (20s, 40s)
        $delay = pow(2, $attempt) * 10;
        echo "[!] Retrying job in {$delay} seconds...\n";

        // The delay is handed to QueueService::push(); whether it is honoured
        // (e.g. via a Redis sorted set or a cron) depends on that service.
        // NOTE(review): the attempt count is not visibly forwarded here (the
        // third argument is a literal 0) — confirm QueueService tracks
        // attempts itself, otherwise retried jobs never reach the limit.
        QueueService::push($job['type'], $job['payload'], 0, $delay);
    } else {
        echo "[!] Job permanently failed (DLQ): {$job['id']}\n";
        // Just set status to failed if error_payload doesn't exist
        $updateJobStatus("UPDATE queue_jobs SET status = 'failed' WHERE id = ?", $job['id']);
    }
}

echo "[*] Worker stopped.\n";