Fix E2EE decryption issue and duplicate webhook insertion
This commit is contained in:
@@ -202,19 +202,40 @@ class WhatsAppController extends BaseController
|
|||||||
|
|
||||||
$msgData = $body['message'];
|
$msgData = $body['message'];
|
||||||
|
|
||||||
|
// 0. Check if this message has already been processed (deduplication)
|
||||||
|
if (!empty($msgData['id'])) {
|
||||||
|
$alreadyLogged = \App\Core\Database::selectOne(
|
||||||
|
"SELECT id FROM messages_log WHERE whatsapp_message_id = ? LIMIT 1",
|
||||||
|
[$msgData['id']]
|
||||||
|
);
|
||||||
|
if ($alreadyLogged) {
|
||||||
|
// Message already processed, return 200 immediately to prevent duplicate replies & DB errors
|
||||||
|
$response->status(200)->json([
|
||||||
|
'status' => 'success',
|
||||||
|
'message' => 'Message already processed (duplicate detected)'
|
||||||
|
]);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 1. Find or create the contact in the CRM
|
// 1. Find or create the contact in the CRM
|
||||||
$contact = \App\Models\Contact::findByPhone($session['company_id'], $msgData['phone']);
|
$contact = \App\Models\Contact::findByPhone($session['company_id'], $msgData['phone']);
|
||||||
if (!$contact) {
|
if (!$contact) {
|
||||||
|
|
||||||
|
|
||||||
// Determine a fallback name
|
// Determine a fallback name
|
||||||
$contactName = !empty($msgData['name']) ? $msgData['name'] : 'WA-' . substr($msgData['phone'], -4);
|
$contactName = !empty($msgData['name']) ? $msgData['name'] : 'WA-' . substr($msgData['phone'], -4);
|
||||||
|
try {
|
||||||
\App\Models\Contact::createSecure([
|
\App\Models\Contact::createSecure([
|
||||||
'company_id' => $session['company_id'],
|
'company_id' => $session['company_id'],
|
||||||
'name' => $contactName,
|
'name' => $contactName,
|
||||||
'phone' => $msgData['phone'],
|
'phone' => $msgData['phone'],
|
||||||
'notes' => 'Auto-created via incoming WhatsApp message'
|
'notes' => 'Auto-created via incoming WhatsApp message'
|
||||||
]);
|
]);
|
||||||
|
} catch (\PDOException $e) {
|
||||||
|
// Ignore duplicate contact error if another thread created it concurrently
|
||||||
|
if ($e->getCode() !== '23000' && strpos($e->getMessage(), '1062') === false) {
|
||||||
|
throw $e;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Log the incoming message in history log
|
// 2. Log the incoming message in history log
|
||||||
|
|||||||
@@ -30,6 +30,25 @@ class MessageLog extends BaseModel
|
|||||||
$data['media_url'] = Security::encrypt($data['media_url']);
|
$data['media_url'] = Security::encrypt($data['media_url']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
return self::create($data);
|
return self::create($data);
|
||||||
|
} catch (\PDOException $e) {
|
||||||
|
// Handle duplicate entry gracefully
|
||||||
|
if ($e->getCode() === '23000' || strpos($e->getMessage(), '1062') !== false) {
|
||||||
|
error_log("[MessageLog] Duplicate whatsapp_message_id: " . ($data['whatsapp_message_id'] ?? 'unknown'));
|
||||||
|
// Retrieve and return existing log record
|
||||||
|
if (!empty($data['whatsapp_message_id'])) {
|
||||||
|
$existing = \App\Core\Database::select(
|
||||||
|
"SELECT * FROM " . static::$table . " WHERE whatsapp_message_id = ? LIMIT 1",
|
||||||
|
[$data['whatsapp_message_id']]
|
||||||
|
);
|
||||||
|
if (!empty($existing)) {
|
||||||
|
return $existing[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
throw $e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ const path = require('path');
|
|||||||
|
|
||||||
const sessions = new Map(); // Store active sockets in memory
|
const sessions = new Map(); // Store active sockets in memory
|
||||||
const retryCounters = new Map(); // Track reconnection attempts per session
|
const retryCounters = new Map(); // Track reconnection attempts per session
|
||||||
|
const recentMessages = new Map(); // Cache of recent messages in memory to serve getMessage callback
|
||||||
|
|
||||||
const MAX_RETRIES = 5; // Maximum reconnection attempts before giving up
|
const MAX_RETRIES = 5; // Maximum reconnection attempts before giving up
|
||||||
|
|
||||||
@@ -63,7 +64,15 @@ async function startSession(session_key, webhook_url) {
|
|||||||
auth: state,
|
auth: state,
|
||||||
printQRInTerminal: false,
|
printQRInTerminal: false,
|
||||||
logger: pino({ level: 'silent' }),
|
logger: pino({ level: 'silent' }),
|
||||||
browser: ['Nabeh Gateway', 'Chrome', '120.0.0']
|
browser: ['Nabeh Gateway', 'Chrome', '120.0.0'],
|
||||||
|
getMessage: async (key) => {
|
||||||
|
if (recentMessages.has(key.id)) {
|
||||||
|
return recentMessages.get(key.id);
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
conversation: ' ' // Fallback message content to trigger E2EE decryption retry on companion devices
|
||||||
|
};
|
||||||
|
}
|
||||||
};
|
};
|
||||||
if (version) socketConfig.version = version;
|
if (version) socketConfig.version = version;
|
||||||
|
|
||||||
@@ -81,6 +90,17 @@ async function startSession(session_key, webhook_url) {
|
|||||||
console.log(`[Upsert] First message keys:`, Object.keys(m.messages[0]));
|
console.log(`[Upsert] First message keys:`, Object.keys(m.messages[0]));
|
||||||
console.log(`[Upsert] First message key:`, JSON.stringify(m.messages[0].key));
|
console.log(`[Upsert] First message key:`, JSON.stringify(m.messages[0].key));
|
||||||
console.log(`[Upsert] First message structure:`, JSON.stringify(m.messages[0].message));
|
console.log(`[Upsert] First message structure:`, JSON.stringify(m.messages[0].message));
|
||||||
|
|
||||||
|
// Cache all incoming messages to serve E2EE retries
|
||||||
|
for (const msg of m.messages) {
|
||||||
|
if (msg.key && msg.key.id && msg.message) {
|
||||||
|
recentMessages.set(msg.key.id, msg.message);
|
||||||
|
if (recentMessages.size > 2000) {
|
||||||
|
const firstKey = recentMessages.keys().next().value;
|
||||||
|
recentMessages.delete(firstKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m.type !== 'notify') return;
|
if (m.type !== 'notify') return;
|
||||||
@@ -296,28 +316,38 @@ async function sendMessage(session_key, phone, message, mediaUrl = null, audioBa
|
|||||||
}
|
}
|
||||||
|
|
||||||
let jid = phone.includes('@') ? phone : `${phone}@s.whatsapp.net`;
|
let jid = phone.includes('@') ? phone : `${phone}@s.whatsapp.net`;
|
||||||
|
let sentMsg;
|
||||||
|
|
||||||
if (audioBase64) {
|
if (audioBase64) {
|
||||||
const buffer = Buffer.from(audioBase64, 'base64');
|
const buffer = Buffer.from(audioBase64, 'base64');
|
||||||
return await sock.sendMessage(jid, {
|
sentMsg = await sock.sendMessage(jid, {
|
||||||
audio: buffer,
|
audio: buffer,
|
||||||
mimetype: 'audio/mp4',
|
mimetype: 'audio/mp4',
|
||||||
ptt: true
|
ptt: true
|
||||||
});
|
});
|
||||||
}
|
} else if (mediaUrl) {
|
||||||
|
|
||||||
if (mediaUrl) {
|
|
||||||
const ext = mediaUrl.split('.').pop().toLowerCase();
|
const ext = mediaUrl.split('.').pop().toLowerCase();
|
||||||
if (['jpg', 'jpeg', 'png', 'webp'].includes(ext)) {
|
if (['jpg', 'jpeg', 'png', 'webp'].includes(ext)) {
|
||||||
return await sock.sendMessage(jid, { image: { url: mediaUrl }, caption: message });
|
sentMsg = await sock.sendMessage(jid, { image: { url: mediaUrl }, caption: message });
|
||||||
} else if (['mp4', 'mkv', 'avi'].includes(ext)) {
|
} else if (['mp4', 'mkv', 'avi'].includes(ext)) {
|
||||||
return await sock.sendMessage(jid, { video: { url: mediaUrl }, caption: message });
|
sentMsg = await sock.sendMessage(jid, { video: { url: mediaUrl }, caption: message });
|
||||||
} else {
|
} else {
|
||||||
return await sock.sendMessage(jid, { document: { url: mediaUrl }, caption: message, fileName: mediaUrl.split('/').pop() });
|
sentMsg = await sock.sendMessage(jid, { document: { url: mediaUrl }, caption: message, fileName: mediaUrl.split('/').pop() });
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sentMsg = await sock.sendMessage(jid, { text: message });
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cache outbound messages for E2EE decryption retries
|
||||||
|
if (sentMsg && sentMsg.key && sentMsg.key.id && sentMsg.message) {
|
||||||
|
recentMessages.set(sentMsg.key.id, sentMsg.message);
|
||||||
|
if (recentMessages.size > 2000) {
|
||||||
|
const firstKey = recentMessages.keys().next().value;
|
||||||
|
recentMessages.delete(firstKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return await sock.sendMessage(jid, { text: message });
|
return sentMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
function getActiveSessions() {
|
function getActiveSessions() {
|
||||||
|
|||||||
Reference in New Issue
Block a user