prepare(
            " SELECT q.*, b.tenant_id, b.company_id, b.uploaded_by "
            . "FROM invoice_processing_queue q "
            . "JOIN invoice_batches b ON q.batch_id = b.id COLLATE utf8mb4_unicode_ci "
            . "WHERE q.status = 'pending' AND b.status = 'processing' "
            . "ORDER BY q.created_at ASC LIMIT 1 "
        );
        $stmt->execute();
        $item = $stmt->fetch();

        // One loop iteration handles at most one queue row: the oldest 'pending'
        // item whose batch is still 'processing'. With nothing queued, back off
        // for a few seconds and poll again.
        if (!$item) {
            echo "Queue empty. Waiting...\n";
            sleep(5);
            continue;
        }

        $queueId = $item['id'];
        $batchId = $item['batch_id'];
        $tenantId = $item['tenant_id'];
        $companyId = $item['company_id'];
        $userId = $item['uploaded_by'];
        $imagePath = $item['image_path'];

        echo "Processing Image: $imagePath (Queue ID: $queueId)\n";

        // Mark as processing
        $db->prepare("UPDATE invoice_processing_queue SET status = 'processing' WHERE id = ?")->execute([$queueId]);

        // 2. Perform AI Extraction
        if (!file_exists($imagePath)) {
            $db->prepare("UPDATE invoice_processing_queue SET status = 'failed', error_message = 'File not found' WHERE id = ?")->execute([$queueId]);
            continue;
        }

        $mimeType = mime_content_type($imagePath) ?: 'image/jpeg';
        $fileContent = file_get_contents($imagePath);

        // file_get_contents() returns false when the file exists but cannot be
        // read; without this guard an empty payload would be sent for extraction.
        if ($fileContent === false) {
            echo "Failed to read image file.\n";
            $db->prepare("UPDATE invoice_processing_queue SET status = 'failed', error_message = 'File not readable' WHERE id = ?")->execute([$queueId]);
            continue;
        }

        $base64Data = base64_encode($fileContent);
        $extracted = AI::extractInvoiceData($base64Data, $mimeType);

        // A falsy result is treated as an extraction failure.
        if (!$extracted) {
            echo "AI Extraction Failed.\n";
            $db->prepare("UPDATE invoice_processing_queue SET status = 'failed', error_message = 'AI failed to extract' WHERE id = ?")->execute([$queueId]);
            continue;
        }

        // 3. Save to Invoices Table
        // Only the database writes live inside the transactional try block, so
        // the rollback in the catch below always has an open transaction to
        // undo. $invoiceSaved records whether the commit went through.
        $invoiceSaved = false;
        $db->beginTransaction();
        try {
            // 32 random hex characters arranged in the 8-4-4-4-12 UUID layout.
            $invoiceId = vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex(random_bytes(16)), 4));

            $supplierTin = $extracted['supplier']['tin'] ?? '';
            $invoiceNum = $extracted['invoice_number'] ?? '';
            $invoiceDate = $extracted['invoice_date'] ?? '';

            // Dates that strtotime() cannot parse are stored as NULL.
            // NOTE(review): the raw string (not a normalised date) is what gets
            // bound below - confirm the column accepts every format that
            // strtotime() accepts.
            $validDate = (!empty($invoiceDate) && strtotime($invoiceDate)) ? $invoiceDate : null;

            $stmt = $db->prepare(
                " INSERT INTO invoices ( "
                . "id, tenant_id, company_id, uploaded_by, original_file_path, status, "
                . "invoice_number, invoice_date, invoice_type, invoice_category, "
                . "supplier_tin, supplier_name, supplier_address, "
                . "buyer_tin, buyer_name, buyer_national_id, "
                . "subtotal, tax_amount, discount_total, grand_total, currency_code, created_at "
                . ") VALUES ( "
                . "?, ?, ?, ?, ?, 'extracted', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW() "
                . ") "
            );

            // Supplier and buyer identity fields go through Encryption::encrypt()
            // before being bound; everything else is bound as extracted.
            $stmt->execute([
                $invoiceId,
                $tenantId,
                $companyId,
                $userId,
                $imagePath,
                $invoiceNum,
                $validDate,
                $extracted['invoice_type'] ?? 'cash',
                $extracted['invoice_category'] ?? 'simplified',
                Encryption::encrypt($supplierTin),
                Encryption::encrypt($extracted['supplier']['name'] ?? ''),
                Encryption::encrypt($extracted['supplier']['address'] ?? ''),
                Encryption::encrypt($extracted['buyer']['tin'] ?? ''),
                Encryption::encrypt($extracted['buyer']['name'] ?? ''),
                Encryption::encrypt($extracted['buyer']['national_id'] ?? ''),
                $extracted['subtotal'] ?? 0,
                $extracted['tax_amount'] ?? 0,
                $extracted['discount_total'] ?? 0,
                $extracted['grand_total'] ?? 0,
                $extracted['currency_code'] ?? 'JOD',
            ]);

            // Save Lines
            if (!empty($extracted['lines'])) {
                $lineStmt = $db->prepare(
                    "INSERT INTO invoice_lines (id, invoice_id, line_number, description, quantity, unit_price, tax_rate, line_total) "
                    . "VALUES (?,?,?,?,?,?,?,?)"
                );
                foreach ($extracted['lines'] as $idx => $line) {
                    $lineStmt->execute([
                        vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex(random_bytes(16)), 4)),
                        $invoiceId,
                        // Fall back to the 1-based position when no line number was extracted.
                        $line['line_number'] ?? ($idx + 1),
                        $line['description'] ?? '',
                        $line['quantity'] ?? 1,
                        $line['unit_price'] ?? 0,
                        $line['tax_rate'] ?? 0,
                        $line['line_total'] ?? $line['total_amount'] ?? 0,
                    ]);
                }
            }

            // Mark queue item done
            $db->prepare("UPDATE invoice_processing_queue SET status = 'done', invoice_id = ?, processed_at = NOW() WHERE id = ?")->execute([$invoiceId, $queueId]);

            // Update batch progress
            $db->prepare("UPDATE invoice_batches SET processed_images = processed_images + 1 WHERE id = ?")->execute([$batchId]);

            $db->commit();
            $invoiceSaved = true;
            echo "Success: Created Invoice $invoiceId\n";
        } catch (\Throwable $e) {
            // Throwable (not just Exception) so that engine errors such as a
            // TypeError raised while binding extracted data also roll back the
            // transaction and flag the item instead of leaving it 'processing'.
            $db->rollBack();
            echo "DB Error: " . $e->getMessage() . "\n";
            $db->prepare("UPDATE invoice_processing_queue SET status = 'failed', error_message = ? WHERE id = ?")->execute([$e->getMessage(), $queueId]);
        }

        // Post-commit side effects. They run outside the transactional try so a
        // failure here can neither trigger a rollback of an already-committed
        // transaction nor mark a successfully saved invoice as failed. Each one
        // is guarded separately so a notification problem does not skip the
        // quota update.
        if ($invoiceSaved) {
            // Send silent push update for progress
            try {
                $stmt = $db->prepare("SELECT total_images, processed_images, uploaded_by FROM invoice_batches WHERE id = ?");
                $stmt->execute([$batchId]);
                $currentBatch = $stmt->fetch();

                if ($currentBatch) {
                    $notifier = new NotificationService();
                    $notifier->sendDataNotification($currentBatch['uploaded_by'], [
                        'type' => 'batch_progress',
                        'batch_id' => $batchId,
                        'processed' => $currentBatch['processed_images'],
                        'total' => $currentBatch['total_images'],
                    ]);
                }
            } catch (\Throwable $e) {
                echo "Progress notification failed: " . $e->getMessage() . "\n";
            }

            // Increment Quota
            try {
                QuotaMiddleware::incrementInvoiceUsage($tenantId);
            } catch (\Throwable $e) {
                echo "Quota update failed: " . $e->getMessage() . "\n";
            }
        }

        // Check if batch is complete
        // NOTE(review): items that fail before the save step `continue` past
        // this check and never increment processed_images, so within this loop
        // a batch containing a failed image does not reach 'done' - confirm
        // whether that case is handled elsewhere.
        $stmt = $db->prepare("SELECT total_images, processed_images, uploaded_by FROM invoice_batches WHERE id = ?");
        $stmt->execute([$batchId]);
        $batch = $stmt->fetch();

        if ($batch && $batch['processed_images'] >= $batch['total_images']) {
            $db->prepare("UPDATE invoice_batches SET status = 'done', completed_at = NOW() WHERE id = ?")->execute([$batchId]);
            echo "Batch $batchId Complete!\n";

            // Send Notification to user
            $notifier = new NotificationService();
            $title = "اكتملت معالجة الدفعة";
            $body = "تمت معالجة جميع الفواتير بنجاح. يمكنك الآن مراجعتها وتدقيقها في لوحة التحكم قبل اعتمادها.";
            $notifier->sendNotification($batch['uploaded_by'], $title, $body, ['batch_id' => $batchId]);
        }
    }
} catch (Exception $e) {
    // Anything that escapes the loop ends the worker run.
    echo "Fatal Worker Error: " . $e->getMessage() . "\n";
} finally {
    // Always release and close the lock file handle, even on a fatal exit.
    flock($fp, LOCK_UN);
    fclose($fp);
}