/**
 * ════════════════════════════════════════════════════════════
 * Musadaq — Invoice Processor (Queue Consumer)
 * ════════════════════════════════════════════════════════════
 * Main consumer for the invoice processing queue (Bull).
 * It orchestrates AI extraction, tax validation, and database storage.
 * ════════════════════════════════════════════════════════════
 */
import { Process, Processor, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, DataSource } from 'typeorm';
import { Invoice, InvoiceStatus } from './entities/invoice.entity';
import { InvoiceLine } from './entities/invoice-line.entity';
import { GeminiExtractorService } from './gemini-extractor.service';
import { TaxValidationService } from '../validation/tax-validation.service';
import { ConfigService } from '@nestjs/config';

/**
 * Bull consumer for the `invoice-processing` queue.
 *
 * For each `extract-data` job it asks the Gemini extractor for the invoices
 * contained in a file, persists the header and lines of each invoice, and then
 * runs the tax validation service over the stored record.
 */
@Processor('invoice-processing')
export class InvoiceProcessor {
  private readonly logger = new Logger(InvoiceProcessor.name);

  constructor(
    @InjectRepository(Invoice)
    private invoiceRepository: Repository<Invoice>,
    @InjectRepository(InvoiceLine)
    private lineRepository: Repository<InvoiceLine>,
    private geminiExtractor: GeminiExtractorService,
    private taxValidation: TaxValidationService,
    private configService: ConfigService,
    private dataSource: DataSource,
  ) {}

  /** Logs that a job has been picked up by a worker. */
  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(`Processing job ${job.id} of type ${job.name}...`);
  }

  /** Logs successful completion of a job; the job result is not used. */
  @OnQueueCompleted()
  onComplete(job: Job, _result: unknown) {
    this.logger.log(`Completed job ${job.id} for invoice ${job.data.invoiceId}`);
  }

  /** Logs a failed job together with the error's stack trace. */
  @OnQueueFailed()
  onError(job: Job, error: Error) {
    this.logger.error(`Job ${job.id} failed: ${error.message}`, error.stack);
  }

  /**
   * Step 1: extract invoice data from the uploaded file using the Gemini extractor.
   *
   * The first extracted invoice is written onto the existing record
   * (`invoiceId`); every further invoice found in the same file gets a new
   * record that shares the tenant, company and file path.
   *
   * @param job Bull job carrying the invoice id, file path, tenant id and company id.
   * @throws Rethrows any extraction/persistence error after marking the affected
   *         record(s) as `VALIDATION_FAILED`, so Bull registers the job as failed.
   */
  @Process('extract-data')
  async handleExtraction(
    job: Job<{ invoiceId: string; filePath: string; tenantId: string; companyId: string }>,
  ) {
    const { invoiceId, filePath, tenantId, companyId } = job.data;
    const storageRoot = this.configService.get<string>('STORAGE_PATH', './uploads');

    try {
      // 1. Update status to EXTRACTING
      await this.invoiceRepository.update(invoiceId, { status: InvoiceStatus.EXTRACTING });

      // 2. Extract data via Gemini (returns an array of invoices)
      const invoicesData = await this.geminiExtractor.extractInvoiceData(filePath, storageRoot);
      if (!invoicesData || invoicesData.length === 0) {
        throw new Error('No invoices found in file');
      }

      // 3. Process the first invoice (updates the current record)
      await this.saveExtractedData(invoiceId, invoicesData[0]);

      // 4. If multiple invoices were found, create new records for the others
      if (invoicesData.length > 1) {
        this.logger.log(
          `Found ${invoicesData.length} invoices in file ${filePath}. Creating additional records...`,
        );
        for (let i = 1; i < invoicesData.length; i++) {
          const newInvoice = this.invoiceRepository.create({
            tenant_id: tenantId,
            company_id: companyId,
            original_file_path: filePath,
            status: InvoiceStatus.EXTRACTING,
          });
          const savedNew = await this.invoiceRepository.save(newInvoice);
          try {
            await this.saveExtractedData(savedNew.id, invoicesData[i]);
          } catch (error) {
            // Do not leave the freshly created record stuck in EXTRACTING.
            await this.markFailed(savedNew.id);
            throw error;
          }
        }
      }

      this.logger.log(`Extraction successful for invoice(s) in ${filePath}`);
    } catch (error) {
      // NOTE(review): this also overwrites the primary invoice's status when only
      // an additional invoice failed, and a Bull retry would re-create the
      // additional records — kept as-is, confirm the intended semantics.
      await this.markFailed(invoiceId);
      throw error;
    }
  }

  /**
   * Best-effort status update to `VALIDATION_FAILED`.
   *
   * Never throws: a failure of the status update itself is logged so that it
   * cannot replace the original error the caller is about to rethrow.
   */
  private async markFailed(invoiceId: string): Promise<void> {
    try {
      await this.invoiceRepository.update(invoiceId, {
        status: InvoiceStatus.VALIDATION_FAILED,
      });
    } catch (updateError) {
      const reason = updateError instanceof Error ? updateError.message : String(updateError);
      this.logger.error(`Could not mark invoice ${invoiceId} as failed: ${reason}`);
    }
  }

  /**
   * Save the JSON data extracted by the AI into the SQL database.
   *
   * Header update, deletion of existing lines and insertion of the new lines
   * run in one transaction. Automatic validation is triggered only after the
   * transaction has been committed and the query runner released.
   *
   * @param invoiceId Id of the invoice record to update.
   * @param data Extracted invoice payload; its shape is defined by the
   *             extractor service (not visible here), hence `any`.
   * @throws Rethrows any persistence or validation error.
   */
  private async saveExtractedData(invoiceId: string, data: any) {
    const queryRunner = this.dataSource.createQueryRunner();
    await queryRunner.connect();

    try {
      await queryRunner.startTransaction();

      // 1. Update invoice header
      await queryRunner.manager.update(Invoice, invoiceId, {
        invoice_number: data.invoice_number,
        invoice_date: data.invoice_date,
        invoice_type: data.invoice_type || 'cash',
        ubl_type_code: data.ubl_type_code || '388',
        payment_method_code: data.payment_method_code || '013',
        invoice_category: data.invoice_category || 'simplified',
        supplier_name: data.supplier_name,
        supplier_tin: data.supplier_tin,
        buyer_name: data.buyer_name,
        buyer_tin: data.buyer_tin,
        buyer_national_id: data.buyer_national_id,
        subtotal: data.subtotal,
        discount_total: data.discount_total,
        tax_amount: data.tax_amount,
        grand_total: data.grand_total,
        currency_code: data.currency_code || 'JOD',
        status: InvoiceStatus.EXTRACTED,
      });

      // 2. Clear old lines if any (shouldn't happen on first extract)
      await queryRunner.manager.delete(InvoiceLine, { invoice_id: invoiceId });

      // 3. Create new lines
      if (data.lines && Array.isArray(data.lines)) {
        const lines = data.lines.map((l: any) =>
          queryRunner.manager.create(InvoiceLine, {
            invoice_id: invoiceId,
            line_number: l.line_number,
            description: l.description,
            quantity: l.quantity,
            unit_price: l.unit_price,
            discount: l.discount || 0,
            tax_rate: l.tax_rate,
            line_total: l.line_total,
          }),
        );
        await queryRunner.manager.save(InvoiceLine, lines);
      }

      await queryRunner.commitTransaction();
    } catch (error) {
      // Only roll back while a transaction is actually open; rolling back a
      // committed or never-started transaction would throw and hide `error`.
      if (queryRunner.isTransactionActive) {
        await queryRunner.rollbackTransaction();
      }
      throw error;
    } finally {
      await queryRunner.release();
    }

    // 4. Trigger auto-validation (internal step). Runs outside the transaction
    // scope so a validation failure can never trigger a rollback of data that
    // is already committed.
    await this.autoValidate(invoiceId);
  }

  /**
   * Step 2: run the automatic tax-rule validation on a stored invoice.
   *
   * Loads the invoice with its lines, passes it to the tax validation service
   * and stores the outcome: `VALIDATED` with an empty error list, or
   * `VALIDATION_FAILED` with the reported errors. Does nothing (apart from a
   * warning) when the invoice cannot be found.
   */
  private async autoValidate(invoiceId: string) {
    const invoice = await this.invoiceRepository.findOne({
      where: { id: invoiceId },
      relations: ['lines'],
    });

    if (!invoice) {
      this.logger.warn(`Invoice ${invoiceId} not found; skipping auto-validation`);
      return;
    }

    const result = this.taxValidation.validateInvoice(invoice);

    if (result.isValid) {
      await this.invoiceRepository.update(invoiceId, {
        status: InvoiceStatus.VALIDATED,
        validation_errors: [],
      });
    } else {
      await this.invoiceRepository.update(invoiceId, {
        status: InvoiceStatus.VALIDATION_FAILED,
        validation_errors: result.errors,
      });
    }
  }
}