Files
musadaq-saas/app/cron/process_batches.php
2026-05-07 01:14:37 +03:00

156 lines
6.4 KiB
PHP

<?php
/**
* Background Worker for AI Invoice Extraction
* Processes images in the invoice_processing_queue.
*/
declare(strict_types=1);
require_once __DIR__ . '/../bootstrap/init.php';
use App\Core\Database;
use App\Core\AI;
use App\Core\Encryption;
use App\Middleware\QuotaMiddleware;
// Prevent multiple instances: take an exclusive, non-blocking lock on a lock file.
// The handle stays open (and the lock held) until the script releases it at the end.
$lockFile = STORAGE_PATH . '/logs/process_batches.lock';
$fp = fopen($lockFile, 'c+');
if ($fp === false) {
    // fopen() returns false when the file cannot be opened/created (e.g. missing or
    // unwritable directory). Passing false on to flock() would raise a TypeError
    // under strict_types, so bail out with a clear message instead.
    exit("Unable to open lock file: {$lockFile}\n");
}
if (!flock($fp, LOCK_EX | LOCK_NB)) {
    // LOCK_NB makes flock() return immediately when another process holds the lock.
    fclose($fp);
    exit("Worker already running.\n");
}
echo "Starting Musadaq AI Worker [" . date('Y-m-d H:i:s') . "]\n";
/*
 * Main worker loop.
 *
 * Repeatedly takes the oldest 'pending' queue row whose batch is 'processing',
 * sends the image to AI::extractInvoiceData(), stores the result in `invoices`
 * and `invoice_lines` inside one transaction, then updates queue/batch progress.
 * The loop never ends on its own; it sleeps 5 seconds whenever the queue is empty.
 * The lock handle in $fp is released in the `finally` clause.
 */
try {
    $db = Database::getInstance();

    // Builds a random 128-bit identifier rendered as 8-4-4-4-12 hex groups.
    $generateId = static function (): string {
        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex(random_bytes(16)), 4));
    };

    // Flags a queue row as failed and stores the reason in its error_message column.
    $markFailed = static function ($queueId, string $reason) use ($db): void {
        $db->prepare("UPDATE invoice_processing_queue SET status = 'failed', error_message = ? WHERE id = ?")
            ->execute([$reason, $queueId]);
    };

    while (true) {
        // 1. Get next pending item from queue (oldest first).
        $stmt = $db->prepare("
            SELECT q.*, b.tenant_id, b.company_id, b.uploaded_by
            FROM invoice_processing_queue q
            JOIN invoice_batches b ON q.batch_id = b.id
            WHERE q.status = 'pending' AND b.status = 'processing'
            ORDER BY q.created_at ASC
            LIMIT 1
        ");
        $stmt->execute();
        $item = $stmt->fetch();

        if (!$item) {
            echo "Queue empty. Waiting...\n";
            sleep(5);
            continue;
        }

        $queueId = $item['id'];
        $batchId = $item['batch_id'];
        $tenantId = $item['tenant_id'];
        $companyId = $item['company_id'];
        $userId = $item['uploaded_by'];
        $imagePath = $item['image_path'];

        echo "Processing Image: $imagePath (Queue ID: $queueId)\n";

        // Mark as processing so the row is no longer selected as 'pending'.
        $db->prepare("UPDATE invoice_processing_queue SET status = 'processing' WHERE id = ?")->execute([$queueId]);

        // 2. Perform AI Extraction
        if (!file_exists($imagePath)) {
            $markFailed($queueId, 'File not found');
            continue;
        }

        $mimeType = mime_content_type($imagePath) ?: 'image/jpeg';
        $fileContent = file_get_contents($imagePath);
        if ($fileContent === false) {
            // The file exists but could not be read (permissions, I/O error, ...).
            // base64_encode(false) would be a TypeError under strict_types.
            echo "Unable to read file.\n";
            $markFailed($queueId, 'File could not be read');
            continue;
        }

        $extracted = AI::extractInvoiceData(base64_encode($fileContent), $mimeType);
        if (!$extracted) {
            echo "AI Extraction Failed.\n";
            $markFailed($queueId, 'AI failed to extract');
            continue;
        }

        // 3. Save to Invoices Table (invoice, lines, queue status and batch counter
        //    are written atomically in one transaction).
        $committed = false;
        $db->beginTransaction();
        try {
            $invoiceId = $generateId();
            $supplierTin = $extracted['supplier']['tin'] ?? '';
            $invoiceNum = $extracted['invoice_number'] ?? '';
            $invoiceDate = $extracted['invoice_date'] ?? '';
            // Store NULL when the extracted date is empty or not parseable by strtotime().
            $validDate = (!empty($invoiceDate) && strtotime($invoiceDate)) ? $invoiceDate : null;

            $stmt = $db->prepare("
                INSERT INTO invoices (
                    id, tenant_id, company_id, uploaded_by, original_file_path, status,
                    invoice_number, invoice_date, invoice_type, invoice_category,
                    supplier_tin, supplier_name, supplier_address,
                    buyer_tin, buyer_name, buyer_national_id,
                    subtotal, tax_amount, discount_total, grand_total, currency_code,
                    created_at
                ) VALUES (
                    ?, ?, ?, ?, ?, 'extracted',
                    ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
                    ?, ?, ?, ?, ?,
                    NOW()
                )
            ");
            $stmt->execute([
                $invoiceId, $tenantId, $companyId, $userId, $imagePath,
                $invoiceNum, $validDate, $extracted['invoice_type'] ?? 'cash', $extracted['invoice_category'] ?? 'simplified',
                Encryption::encrypt($supplierTin), Encryption::encrypt($extracted['supplier']['name'] ?? ''), Encryption::encrypt($extracted['supplier']['address'] ?? ''),
                Encryption::encrypt($extracted['buyer']['tin'] ?? ''), Encryption::encrypt($extracted['buyer']['name'] ?? ''), Encryption::encrypt($extracted['buyer']['national_id'] ?? ''),
                $extracted['subtotal'] ?? 0, $extracted['tax_amount'] ?? 0, $extracted['discount_total'] ?? 0, $extracted['grand_total'] ?? 0, $extracted['currency_code'] ?? 'JOD',
            ]);

            // Save Lines
            if (!empty($extracted['lines'])) {
                $lineStmt = $db->prepare("INSERT INTO invoice_lines (id, invoice_id, line_number, description, quantity, unit_price, tax_rate, line_total) VALUES (?,?,?,?,?,?,?,?)");
                foreach ($extracted['lines'] as $idx => $line) {
                    $lineStmt->execute([
                        $generateId(),
                        // Fall back to the 1-based position when the AI gave no line number.
                        $invoiceId, $line['line_number'] ?? ($idx + 1), $line['description'] ?? '', $line['quantity'] ?? 1, $line['unit_price'] ?? 0, $line['tax_rate'] ?? 0, $line['line_total'] ?? 0,
                    ]);
                }
            }

            // Mark queue item done
            $db->prepare("UPDATE invoice_processing_queue SET status = 'done', invoice_id = ?, processed_at = NOW() WHERE id = ?")->execute([$invoiceId, $queueId]);

            // Update batch progress
            $db->prepare("UPDATE invoice_batches SET processed_images = processed_images + 1 WHERE id = ?")->execute([$batchId]);

            $db->commit();
            $committed = true;
            echo "Success: Created Invoice $invoiceId\n";
        } catch (\Throwable $e) {
            // Catch \Throwable, not just Exception: a TypeError raised under strict_types
            // (e.g. a non-string value from the AI reaching a typed parameter) is an
            // \Error and would otherwise leave the transaction open and the row stuck
            // in 'processing'.
            $db->rollBack();
            echo "DB Error: " . $e->getMessage() . "\n";
            $markFailed($queueId, $e->getMessage());
        }

        // Increment Quota. Done outside the transactional try: the invoice is already
        // committed, so a failure here must not trigger rollBack() (which throws when
        // no transaction is active) nor mark the saved invoice's queue row as failed.
        if ($committed) {
            try {
                QuotaMiddleware::incrementInvoiceUsage($tenantId);
            } catch (\Throwable $e) {
                echo "Quota Update Error: " . $e->getMessage() . "\n";
            }
        }

        // Check if batch is complete
        // NOTE(review): processed_images is only incremented for successfully saved
        // items, and the failure paths above `continue` past this check, so a batch
        // containing a failed item never reaches 'done' here — confirm this is intended.
        $stmt = $db->prepare("SELECT total_images, processed_images FROM invoice_batches WHERE id = ?");
        $stmt->execute([$batchId]);
        $batch = $stmt->fetch();
        if ($batch && $batch['processed_images'] >= $batch['total_images']) {
            $db->prepare("UPDATE invoice_batches SET status = 'done', completed_at = NOW() WHERE id = ?")->execute([$batchId]);
            echo "Batch $batchId Complete!\n";
        }
    }
} catch (Exception $e) {
    echo "Fatal Worker Error: " . $e->getMessage() . "\n";
} finally {
    // Always release the single-instance lock, even when the loop aborts.
    flock($fp, LOCK_UN);
    fclose($fp);
}