diff --git a/backend/app/Controllers/WhatsAppController.php b/backend/app/Controllers/WhatsAppController.php index 404bcd2..6d7c9f2 100644 --- a/backend/app/Controllers/WhatsAppController.php +++ b/backend/app/Controllers/WhatsAppController.php @@ -202,19 +202,40 @@ class WhatsAppController extends BaseController $msgData = $body['message']; + // 0. Check if this message has already been processed (deduplication) + if (!empty($msgData['id'])) { + $alreadyLogged = \App\Core\Database::selectOne( + "SELECT id FROM messages_log WHERE whatsapp_message_id = ? LIMIT 1", + [$msgData['id']] + ); + if ($alreadyLogged) { + // Message already processed, return 200 immediately to prevent duplicate replies & DB errors + $response->status(200)->json([ + 'status' => 'success', + 'message' => 'Message already processed (duplicate detected)' + ]); + return; + } + } + // 1. Find or create the contact in the CRM $contact = \App\Models\Contact::findByPhone($session['company_id'], $msgData['phone']); if (!$contact) { - - // Determine a fallback name $contactName = !empty($msgData['name']) ? $msgData['name'] : 'WA-' . substr($msgData['phone'], -4); - \App\Models\Contact::createSecure([ - 'company_id' => $session['company_id'], - 'name' => $contactName, - 'phone' => $msgData['phone'], - 'notes' => 'Auto-created via incoming WhatsApp message' - ]); + try { + \App\Models\Contact::createSecure([ + 'company_id' => $session['company_id'], + 'name' => $contactName, + 'phone' => $msgData['phone'], + 'notes' => 'Auto-created via incoming WhatsApp message' + ]); + } catch (\PDOException $e) { + // Ignore duplicate contact error if another thread created it concurrently + if ($e->getCode() !== '23000' && strpos($e->getMessage(), '1062') === false) { + throw $e; + } + } } // 2. Log the incoming message in history log diff --git a/backend/app/Models/MessageLog.php b/backend/app/Models/MessageLog.php index a0103e1..ef826f4 100644 --- a/backend/app/Models/MessageLog.php +++ b/backend/app/Models/MessageLog.php @@ -30,6 +30,25 @@ class MessageLog extends BaseModel $data['media_url'] = Security::encrypt($data['media_url']); } - return self::create($data); + try { + return self::create($data); + } catch (\PDOException $e) { + // Handle duplicate entry gracefully + if ($e->getCode() === '23000' || strpos($e->getMessage(), '1062') !== false) { + error_log("[MessageLog] Duplicate whatsapp_message_id: " . ($data['whatsapp_message_id'] ?? 'unknown')); + // Retrieve and return existing log record + if (!empty($data['whatsapp_message_id'])) { + $existing = \App\Core\Database::select( + "SELECT * FROM " . static::$table . " WHERE whatsapp_message_id = ? LIMIT 1", + [$data['whatsapp_message_id']] + ); + if (!empty($existing)) { + return $existing[0]; + } + } + return null; + } + throw $e; + } } } diff --git a/whatsapp-gateway/baileys-client.js b/whatsapp-gateway/baileys-client.js index 333b806..a160545 100644 --- a/whatsapp-gateway/baileys-client.js +++ b/whatsapp-gateway/baileys-client.js @@ -8,6 +8,7 @@ const path = require('path'); const sessions = new Map(); // Store active sockets in memory const retryCounters = new Map(); // Track reconnection attempts per session +const recentMessages = new Map(); // Cache of recent messages in memory to serve getMessage callback const MAX_RETRIES = 5; // Maximum reconnection attempts before giving up @@ -63,7 +64,15 @@ async function startSession(session_key, webhook_url) { auth: state, printQRInTerminal: false, logger: pino({ level: 'silent' }), - browser: ['Nabeh Gateway', 'Chrome', '120.0.0'] + browser: ['Nabeh Gateway', 'Chrome', '120.0.0'], + getMessage: async (key) => { + if (recentMessages.has(key.id)) { + return recentMessages.get(key.id); + } + return { + conversation: ' ' // Fallback message content to trigger E2EE decryption retry on companion devices + }; + } }; if (version) socketConfig.version = version; @@ -81,6 +90,17 @@ async function startSession(session_key, webhook_url) { console.log(`[Upsert] First message keys:`, Object.keys(m.messages[0])); console.log(`[Upsert] First message key:`, JSON.stringify(m.messages[0].key)); console.log(`[Upsert] First message structure:`, JSON.stringify(m.messages[0].message)); + + // Cache all incoming messages to serve E2EE retries + for (const msg of m.messages) { + if (msg.key && msg.key.id && msg.message) { + recentMessages.set(msg.key.id, msg.message); + if (recentMessages.size > 2000) { + const firstKey = recentMessages.keys().next().value; + recentMessages.delete(firstKey); + } + } + } } if (m.type !== 'notify') return; @@ -296,28 +316,38 @@ async function sendMessage(session_key, phone, message, mediaUrl = null, audioBa } let jid = phone.includes('@') ? phone : `${phone}@s.whatsapp.net`; + let sentMsg; if (audioBase64) { const buffer = Buffer.from(audioBase64, 'base64'); - return await sock.sendMessage(jid, { + sentMsg = await sock.sendMessage(jid, { audio: buffer, mimetype: 'audio/mp4', ptt: true }); - } - - if (mediaUrl) { + } else if (mediaUrl) { const ext = mediaUrl.split('.').pop().toLowerCase(); if (['jpg', 'jpeg', 'png', 'webp'].includes(ext)) { - return await sock.sendMessage(jid, { image: { url: mediaUrl }, caption: message }); + sentMsg = await sock.sendMessage(jid, { image: { url: mediaUrl }, caption: message }); } else if (['mp4', 'mkv', 'avi'].includes(ext)) { - return await sock.sendMessage(jid, { video: { url: mediaUrl }, caption: message }); + sentMsg = await sock.sendMessage(jid, { video: { url: mediaUrl }, caption: message }); } else { - return await sock.sendMessage(jid, { document: { url: mediaUrl }, caption: message, fileName: mediaUrl.split('/').pop() }); + sentMsg = await sock.sendMessage(jid, { document: { url: mediaUrl }, caption: message, fileName: mediaUrl.split('/').pop() }); + } + } else { + sentMsg = await sock.sendMessage(jid, { text: message }); + } + + // Cache outbound messages for E2EE decryption retries + if (sentMsg && sentMsg.key && sentMsg.key.id && sentMsg.message) { + recentMessages.set(sentMsg.key.id, sentMsg.message); + if (recentMessages.size > 2000) { + const firstKey = recentMessages.keys().next().value; + recentMessages.delete(firstKey); } } - return await sock.sendMessage(jid, { text: message }); + return sentMsg; } function getActiveSessions() {