Update: 2026-05-08 15:02:13

This commit is contained in:
Hamza-Ayed
2026-05-08 15:02:13 +03:00
parent 80949e584c
commit 7ea42f0f3b
4 changed files with 430 additions and 156 deletions

View File

@@ -1,56 +1,91 @@
<?php
/**
* Background Worker for AI Invoice Extraction
* Processes images in the invoice_processing_queue.
* Cron Worker for AI Invoice Extraction
*
* Designed to run via cron every minute: * * * * *
* Processes ALL pending items in the queue, then EXITS.
* NO infinite loop. NO lock file issues.
*/
declare(strict_types=1);
require_once __DIR__ . '/../bootstrap/init.php';
use App\Core\Database;
use App\Services\InvoiceProcessor;
// Prevent multiple instances (Lock file)
// Simple lock: prevent overlapping runs
$lockFile = STORAGE_PATH . '/logs/process_batches.lock';
$fp = fopen($lockFile, 'c+');
if (!flock($fp, LOCK_EX | LOCK_NB)) {
exit("Worker already running.\n");
// Check if lock file exists and is stale (older than 5 minutes = dead process)
if (file_exists($lockFile)) {
$lockAge = time() - filemtime($lockFile);
if ($lockAge > 300) {
// Stale lock from a crashed process - remove it
@unlink($lockFile);
workerLog("Removed stale lock file (age: {$lockAge}s)");
} else {
workerLog("Worker already running (lock age: {$lockAge}s). Exiting.");
exit(0);
}
}
echo "Starting Musadaq AI Worker [" . date('Y-m-d H:i:s') . "]\n";
// Create lock
file_put_contents($lockFile, getmypid() . "\n" . date('c'));
function workerLog(string $msg): void {
$line = "[" . date('Y-m-d H:i:s') . "] " . $msg . "\n";
echo $line;
// Also write to dedicated log file
@file_put_contents(STORAGE_PATH . '/logs/worker.log', $line, FILE_APPEND);
}
workerLog("=== Musadaq AI Worker Started ===");
try {
$db = Database::getInstance();
$processed = 0;
$failed = 0;
while (true) {
$stmt = $db->prepare("
SELECT id FROM invoice_processing_queue
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT 1
");
$stmt->execute();
$queueId = $stmt->fetchColumn();
// Get ALL pending items (no infinite loop!)
$stmt = $db->prepare("
SELECT id FROM invoice_processing_queue
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT 20
");
$stmt->execute();
$items = $stmt->fetchAll(\PDO::FETCH_COLUMN);
if (!$queueId) {
echo "Queue empty. Waiting...\n";
sleep(5);
continue;
if (empty($items)) {
workerLog("No pending items. Exiting.");
} else {
workerLog("Found " . count($items) . " pending item(s).");
foreach ($items as $queueId) {
workerLog("Processing Queue ID: $queueId ...");
try {
$success = InvoiceProcessor::processQueueItem((int)$queueId);
if ($success) {
$processed++;
workerLog(" ✓ Queue ID $queueId processed successfully.");
} else {
$failed++;
workerLog(" ✗ Queue ID $queueId failed (returned false).");
}
} catch (\Throwable $e) {
$failed++;
workerLog(" ✗ Queue ID $queueId EXCEPTION: " . $e->getMessage());
}
}
echo "Processing Queue ID: $queueId\n";
$success = InvoiceProcessor::processQueueItem((int)$queueId);
if ($success) {
echo "Success for Queue ID $queueId\n";
} else {
echo "Failed for Queue ID $queueId\n";
}
workerLog("=== Worker Done: $processed success, $failed failed ===");
}
} catch (Exception $e) {
echo "Fatal Worker Error: " . $e->getMessage() . "\n";
} catch (\Throwable $e) {
workerLog("FATAL ERROR: " . $e->getMessage() . "\n" . $e->getTraceAsString());
} finally {
flock($fp, LOCK_UN);
fclose($fp);
// ALWAYS remove lock file
@unlink($lockFile);
}