const baileys = require('@whiskeysockets/baileys'); const makeWASocket = baileys.default || baileys.makeWASocket || baileys; const { useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion, downloadMediaMessage, makeCacheableSignalKeyStore } = baileys; const pino = require('pino'); const NodeCache = require('node-cache'); const axios = require('axios'); const fs = require('fs'); const path = require('path'); class InMemoryStore { constructor() { this.chats = { dict: {}, all() { return Object.values(this.dict); }, get(id) { return this.dict[id]; }, set(id, chat) { this.dict[id] = chat; }, delete(id) { delete this.dict[id]; } }; this.messages = new Proxy({}, { get: (target, jid) => { if (typeof jid === 'symbol') return target[jid]; return this.getMessages(jid); } }); this.messagesData = {}; this.contacts = {}; } getMessages(jid) { if (!this.messagesData[jid]) { this.messagesData[jid] = { dict: {}, all() { return Object.values(this.dict).sort((a, b) => { const tA = Number(a.messageTimestamp || 0); const tB = Number(b.messageTimestamp || 0); return tA - tB; }); }, get(id) { return this.dict[id]; }, set(id, msg) { this.dict[id] = msg; }, delete(id) { delete this.dict[id]; } }; } return this.messagesData[jid]; } bind(ev) { ev.on('messaging-history.set', ({ chats, messages, contacts }) => { console.log(`[Store] Synced history: chats=${chats?.length || 0}, messages=${messages?.length || 0}, contacts=${contacts?.length || 0}`); if (chats) { for (const chat of chats) { this.chats.set(chat.id, { ...(this.chats.get(chat.id) || {}), ...chat }); } } if (contacts) { for (const contact of contacts) { this.contacts[contact.id] = { ...(this.contacts[contact.id] || {}), ...contact }; const chat = this.chats.get(contact.id); if (chat) { chat.name = contact.name || contact.verifiedName || contact.notify; } } } if (messages) { for (const msg of messages) { const jid = msg.key.remoteJid; if (jid) { this.getMessages(jid).set(msg.key.id, msg); } } } }); ev.on('chats.upsert', (newChats) => { for (const chat of newChats) { this.chats.set(chat.id, { ...(this.chats.get(chat.id) || {}), ...chat }); } }); ev.on('chats.update', (updates) => { for (const update of updates) { const current = this.chats.get(update.id); if (current) { this.chats.set(update.id, { ...current, ...update }); } } }); ev.on('chats.delete', (deletions) => { for (const id of deletions) { this.chats.delete(id); } }); ev.on('messages.upsert', ({ messages, type }) => { if (type === 'notify' || type === 'append') { for (const msg of messages) { const jid = msg.key.remoteJid; if (jid) { this.getMessages(jid).set(msg.key.id, msg); } } } }); ev.on('messages.update', (updates) => { for (const update of updates) { const jid = update.key.remoteJid; if (jid) { const storeMsgs = this.getMessages(jid); const current = storeMsgs.get(update.key.id); if (current) { storeMsgs.set(update.key.id, { ...current, ...update }); } } } }); ev.on('contacts.upsert', (newContacts) => { for (const contact of newContacts) { this.contacts[contact.id] = { ...(this.contacts[contact.id] || {}), ...contact }; } }); ev.on('contacts.update', (updates) => { for (const update of updates) { const id = update.id; if (id && this.contacts[id]) { this.contacts[id] = { ...this.contacts[id], ...update }; } } }); } readFromFile(filePath) { if (fs.existsSync(filePath)) { try { const data = JSON.parse(fs.readFileSync(filePath, 'utf8')); this.chats.dict = data.chats || {}; this.contacts = data.contacts || {}; if (data.messages) { for (const jid in data.messages) { const msgs = data.messages[jid]; const storeMsgs = this.getMessages(jid); storeMsgs.dict = msgs || {}; } } } catch (e) { console.error(`[Store] Failed to read store file:`, e.message); } } } writeToFile(filePath) { try { const data = { chats: this.chats.dict, contacts: this.contacts, messages: {} }; for (const jid in this.messagesData) { data.messages[jid] = this.messagesData[jid].dict; } fs.writeFileSync(filePath, JSON.stringify(data, null, 2), 'utf8'); } catch (e) { console.error(`[Store] Failed to write store file:`, e.message); } } } function makeInMemoryStore() { return new InMemoryStore(); } const sessions = new Map(); // Store active sockets in memory const retryCounters = new Map(); // Track reconnection attempts per session const recentMessages = new Map(); // Cache of recent messages in memory to serve getMessage callback const phoneToLid = new Map(); // Map phone numbers to LID JIDs for correct E2EE routing const sessionStores = new Map(); // Store active stores in memory const storeIntervals = new Map(); // Store save intervals in memory // Global retry counter cache — persists across socket reconnects // This is CRITICAL for E2EE: tracks message retry attempts so Baileys can // re-encrypt messages when a recipient's device requests a retry const msgRetryCounterCache = new NodeCache({ stdTTL: 600, checkperiod: 120 }); const MAX_RETRIES = 5; // Maximum reconnection attempts before giving up // Local folder for saving auth keys const SESSIONS_DIR = path.join(__dirname, 'sessions'); if (!fs.existsSync(SESSIONS_DIR)) { fs.mkdirSync(SESSIONS_DIR, { recursive: true }); } async function sendWebhook(webhook_url, payload) { try { console.log(`[Webhook] Sending to ${webhook_url} | state=${payload.state}`); const response = await axios.post(webhook_url, payload, { headers: { 'Content-Type': 'application/json', 'X-Webhook-Secret': process.env.WEBHOOK_SECRET || '' }, timeout: 10000 }); console.log(`[Webhook] ✅ Success | HTTP ${response.status}`); } catch (err) { if (err.response) { console.error(`[Webhook] ❌ HTTP ${err.response.status} | ${JSON.stringify(err.response.data)}`); } else { console.error(`[Webhook] ❌ Network Error: ${err.message}`); } } } async function startSession(session_key, webhook_url) { // Return existing socket if it's already active if (sessions.has(session_key)) { console.log(`[Session] ${session_key} already active, reusing`); return sessions.get(session_key); } console.log(`[Session] Starting ${session_key} → webhook: ${webhook_url}`); const sessionFolder = path.join(SESSIONS_DIR, session_key); const { state, saveCreds } = await useMultiFileAuthState(sessionFolder); // Initialize InMemoryStore for chat history const store = makeInMemoryStore({ logger: pino({ level: 'silent' }) }); const storeFile = path.join(SESSIONS_DIR, `${session_key}_store.json`); if (fs.existsSync(storeFile)) { try { store.readFromFile(storeFile); console.log(`[Store] Loaded store from file for ${session_key}`); } catch (e) { console.error(`[Store] Failed to load store file for ${session_key}:`, e.message); } } // Periodically save store to file every 10 seconds const storeInterval = setInterval(() => { try { store.writeToFile(storeFile); } catch (e) { console.error(`[Store] Failed to write store file for ${session_key}:`, e.message); } }, 10000); storeIntervals.set(session_key, storeInterval); // Fetch the latest WhatsApp Web version to avoid 405 rejection let version; try { const versionInfo = await fetchLatestBaileysVersion(); version = versionInfo.version; console.log(`[Baileys] Using WA version: ${version}`); } catch (e) { console.warn(`[Baileys] Could not fetch version, using default`); } const logger = pino({ level: 'error' }); const socketConfig = { auth: { creds: state.creds, // Wrap keys with makeCacheableSignalKeyStore for fast in-memory // Signal key access — this prevents E2EE key lookup failures // that cause "Waiting for this message" on recipient devices keys: makeCacheableSignalKeyStore(state.keys, logger), }, printQRInTerminal: false, logger: logger, browser: ['Nabeh Gateway', 'Chrome', '120.0.0'], syncFullHistory: false, // Disabled to prevent connection timeouts // Message retry counter cache — tracks how many times each message // retry has been attempted, preventing infinite retry loops msgRetryCounterCache, getMessage: async (key) => { if (recentMessages.has(key.id)) { return recentMessages.get(key.id); } return undefined; } }; if (version) socketConfig.version = version; const sock = makeWASocket(socketConfig); console.log(`[Session] Socket created for ${session_key}`); store.bind(sock.ev); sessionStores.set(session_key, store); sessions.set(session_key, sock); sock.ev.on('creds.update', saveCreds); // Listen for incoming messages sock.ev.on('messages.upsert', async (m) => { console.log(`[Upsert] Event received type=${m.type} messagesCount=${m.messages?.length}`); if (m.messages && m.messages.length > 0) { console.log(`[Upsert] First message keys:`, Object.keys(m.messages[0])); console.log(`[Upsert] First message key:`, JSON.stringify(m.messages[0].key)); console.log(`[Upsert] First message structure:`, JSON.stringify(m.messages[0].message)); // Cache all incoming messages to serve E2EE retries for (const msg of m.messages) { if (msg.key && msg.key.id && msg.message) { recentMessages.set(msg.key.id, msg.message); if (recentMessages.size > 2000) { const firstKey = recentMessages.keys().next().value; recentMessages.delete(firstKey); } } } } if (m.type !== 'notify') return; for (const msg of m.messages) { // Ignore messages sent by ourselves if (msg.key.fromMe) continue; const remoteJid = msg.key.remoteJid; if (!remoteJid) continue; // Only process individual chats (ignore groups and broadcasts) const isGroup = remoteJid.endsWith('@g.us'); const isBroadcast = remoteJid.endsWith('@broadcast'); if (isGroup || isBroadcast) continue; // Extract text body const body = msg.message?.conversation || msg.message?.extendedTextMessage?.text || msg.message?.imageMessage?.caption || msg.message?.videoMessage?.caption || ''; const isAudio = !!msg.message?.audioMessage; const isImage = !!msg.message?.imageMessage; // Only process messages that have text content OR are audio/image messages if (!body && !isAudio && !isImage) continue; let audioBase64 = null; let audioMimeType = null; let imageBase64 = null; let imageMimeType = null; if (isAudio) { try { console.log(`[Baileys] Downloading audio message for ${remoteJid}`); const buffer = await downloadMediaMessage( msg, 'buffer', {}, { logger: pino({ level: 'silent' }), rekey: true } ); audioBase64 = buffer.toString('base64'); audioMimeType = msg.message.audioMessage.mimetype || 'audio/ogg'; } catch (e) { console.error('[Baileys] Failed to download audio message:', e.message); continue; // Skip if audio download fails to prevent empty processing } } else if (isImage) { try { console.log(`[Baileys] Downloading image message for ${remoteJid}`); const buffer = await downloadMediaMessage( msg, 'buffer', {}, { logger: pino({ level: 'silent' }), rekey: true } ); imageBase64 = buffer.toString('base64'); imageMimeType = msg.message.imageMessage.mimetype || 'image/jpeg'; } catch (e) { console.error('[Baileys] Failed to download image message:', e.message); continue; // Skip if image download fails } } // Extract sender phone number (handle LID privacy scheme) let senderPhone = ''; if (msg.key.senderPn && msg.key.senderPn.endsWith('@s.whatsapp.net')) { senderPhone = msg.key.senderPn.split('@')[0]; // CRITICAL: Map this phone number to the LID JID for reply routing. // Signal E2EE sessions are bound to the LID, so replies MUST go // to the LID JID, not the phone JID, to prevent session conflicts. if (remoteJid.endsWith('@lid')) { phoneToLid.set(senderPhone, remoteJid); console.log(`[LID] Mapped ${senderPhone} → ${remoteJid}`); } } else if (remoteJid.endsWith('@s.whatsapp.net')) { senderPhone = remoteJid.split('@')[0]; } else if (remoteJid.endsWith('@lid')) { senderPhone = remoteJid.split('@')[0]; } if (!senderPhone) continue; const senderName = msg.pushName || ''; if (isAudio) { console.log(`[Message] Received audio voice note from ${senderPhone} (JID: ${remoteJid})`); } else if (isImage) { console.log(`[Message] Received image from ${senderPhone} (JID: ${remoteJid})`); } else { console.log(`[Message] Received from ${senderPhone} (JID: ${remoteJid}): ${body}`); } await sendWebhook(webhook_url, { session_key, state: 'message_received', message: { id: msg.key.id, phone: senderPhone, name: senderName, body: body, audio: audioBase64, mimeType: audioMimeType, duration: msg.message?.audioMessage?.seconds ? Number(msg.message.audioMessage.seconds) : null, image: imageBase64, imageMimeType: imageMimeType, timestamp: msg.messageTimestamp } }); } }); sock.ev.on('connection.update', async (update) => { const { connection, lastDisconnect, qr } = update; if (qr) { console.log(`[QR] Generated for ${session_key}`); // Reset retry counter on QR generation (session is alive) retryCounters.set(session_key, 0); await sendWebhook(webhook_url, { session_key, state: 'waiting_qr', qr_code: qr }); } if (connection === 'close') { const statusCode = lastDisconnect?.error?.output?.statusCode; const shouldReconnect = statusCode !== DisconnectReason.loggedOut; if (!shouldReconnect) { console.log(`[Connection] ${session_key} permanently closed (logged out). Cleaning up.`); sessions.delete(session_key); await cleanupSession(session_key); await sendWebhook(webhook_url, { session_key, state: 'disconnected' }); } else { const retries = (retryCounters.get(session_key) || 0) + 1; retryCounters.set(session_key, retries); sessions.delete(session_key); let delay; if (retries <= MAX_RETRIES) { delay = Math.min(retries * 3000, 15000); // 3s, 6s, 9s, 12s, 15s console.log(`[Connection] Reconnecting ${session_key} (quick retry ${retries}/${MAX_RETRIES}) in ${delay}ms...`); } else { delay = 60000; // 60s console.log(`[Connection] Reconnecting ${session_key} (long-term retry ${retries}) in 60s...`); } setTimeout(() => startSession(session_key, webhook_url), delay); } } else if (connection === 'open') { console.log(`[Connection] ✅ ${session_key} connected successfully!`); retryCounters.set(session_key, 0); // Reset on successful connection // Parse phone number from the JID (e.g. 9665XXXXXXX@s.whatsapp.net) const phone = sock.user.id.split(':')[0]; await sendWebhook(webhook_url, { session_key, state: 'connected', phone: phone }); } }); return sock; } /** * Cleanup session: remove from memory and delete auth files */ async function cleanupSession(session_key) { const interval = storeIntervals.get(session_key); if (interval) { clearInterval(interval); storeIntervals.delete(session_key); } sessionStores.delete(session_key); const sock = sessions.get(session_key); if (sock) { try { sock.end(); } catch (e) { } // Gracefully close socket without logout sessions.delete(session_key); } retryCounters.delete(session_key); // Wipe the auth directory so a fresh session can be created next time const sessionFolder = path.join(SESSIONS_DIR, session_key); if (fs.existsSync(sessionFolder)) { fs.rmSync(sessionFolder, { recursive: true, force: true }); console.log(`[Cleanup] Deleted session folder for ${session_key}`); } } /** * Disconnect session: logout from WhatsApp and cleanup */ async function disconnectSession(session_key) { const interval = storeIntervals.get(session_key); if (interval) { clearInterval(interval); storeIntervals.delete(session_key); } sessionStores.delete(session_key); const storeFile = path.join(SESSIONS_DIR, `${session_key}_store.json`); if (fs.existsSync(storeFile)) { try { fs.unlinkSync(storeFile); } catch (e) { } } const sock = sessions.get(session_key); if (sock) { try { sock.logout(); } catch (e) { } // best effort logout sessions.delete(session_key); } retryCounters.delete(session_key); // Completely wipe the auth directory const sessionFolder = path.join(SESSIONS_DIR, session_key); if (fs.existsSync(sessionFolder)) { fs.rmSync(sessionFolder, { recursive: true, force: true }); console.log(`[Disconnect] Deleted session folder for ${session_key}`); } } const { exec } = require('child_process'); const os = require('os'); function convertToOggOpus(base64Audio) { return new Promise((resolve, reject) => { const timeId = Date.now() + Math.random().toString(36).substring(7); const inputPath = path.join(os.tmpdir(), `input_${timeId}.tmp`); const outputPath = path.join(os.tmpdir(), `output_${timeId}.ogg`); fs.writeFile(inputPath, Buffer.from(base64Audio, 'base64'), (err) => { if (err) return reject(err); exec(`ffmpeg -i ${inputPath} -c:a libopus -y ${outputPath}`, (execErr) => { fs.unlink(inputPath, () => { }); if (execErr) return reject(execErr); fs.readFile(outputPath, (readErr, data) => { fs.unlink(outputPath, () => { }); if (readErr) return reject(readErr); resolve(data.toString('base64')); }); }); }); }); } /** * Send a message using an active session */ async function sendMessage(session_key, phone, message, mediaUrl = null, audioBase64 = null, mimetype = null, imageBase64 = null) { const sock = sessions.get(session_key); if (!sock) { throw new Error(`Session ${session_key} is not active or connected`); } // Use the LID JID if we have a mapping for this phone number. // This is CRITICAL: Signal E2EE sessions are bound to the LID JID, // so replying to the phone JID creates a separate session that // conflicts with the LID session, causing "Waiting for this message". let jid; if (phone.includes('@')) { jid = phone; } else if (phoneToLid.has(phone)) { jid = phoneToLid.get(phone); console.log(`[LID] Routing reply to ${phone} via LID: ${jid}`); } else { jid = `${phone}@s.whatsapp.net`; } let sentMsg; if (imageBase64) { const buffer = Buffer.from(imageBase64, 'base64'); sentMsg = await sock.sendMessage(jid, { image: buffer, caption: message || '' }); } else if (audioBase64) { let finalAudioBase64 = audioBase64; let finalMime = mimetype || 'audio/mp4'; // If it's MP3, convert to OGG Opus to ensure iPhone PTT compatibility if (finalMime.includes('mpeg') || finalMime.includes('mp3')) { try { console.log(`[Baileys] Converting MP3 to OGG Opus for native PTT...`); finalAudioBase64 = await convertToOggOpus(audioBase64); finalMime = 'audio/ogg; codecs=opus'; } catch (err) { console.error(`[Baileys] FFmpeg conversion failed:`, err.message); // Fallback to sending as normal audio if conversion fails finalMime = 'audio/mp4'; } } const buffer = Buffer.from(finalAudioBase64, 'base64'); const isMp3 = finalMime.includes('mpeg') || finalMime.includes('mp3'); sentMsg = await sock.sendMessage(jid, { audio: buffer, mimetype: finalMime, ptt: !isMp3 // PTT enabled for OGG/MP4, disabled for raw MP3 }); } else if (mediaUrl) { const ext = mediaUrl.split('.').pop().toLowerCase(); if (['jpg', 'jpeg', 'png', 'webp'].includes(ext)) { sentMsg = await sock.sendMessage(jid, { image: { url: mediaUrl }, caption: message }); } else if (['mp4', 'mkv', 'avi'].includes(ext)) { sentMsg = await sock.sendMessage(jid, { video: { url: mediaUrl }, caption: message }); } else { sentMsg = await sock.sendMessage(jid, { document: { url: mediaUrl }, caption: message, fileName: mediaUrl.split('/').pop() }); } } else { sentMsg = await sock.sendMessage(jid, { text: message }); } // Cache outbound messages for E2EE decryption retries if (sentMsg && sentMsg.key && sentMsg.key.id && sentMsg.message) { recentMessages.set(sentMsg.key.id, sentMsg.message); if (recentMessages.size > 2000) { const firstKey = recentMessages.keys().next().value; recentMessages.delete(firstKey); } } return sentMsg; } async function checkContact(session_key, phone) { const sock = sessions.get(session_key); if (!sock) { throw new Error(`Session ${session_key} is not active or connected`); } const jid = phone.includes('@') ? phone : `${phone}@s.whatsapp.net`; try { const result = await sock.onWhatsApp(jid); if (result && result.length > 0) { return result[0]; } return { exists: false, jid }; } catch (err) { console.error(`[Baileys] Error checking contact ${jid}:`, err.message); throw err; } } async function exportChatHistory(session_key) { let store = sessionStores.get(session_key); if (!store) { const storeFile = path.join(SESSIONS_DIR, `${session_key}_store.json`); if (fs.existsSync(storeFile)) { store = makeInMemoryStore(); store.readFromFile(storeFile); console.log(`[Store] Loaded store from file for ${session_key} (Export)`); } else { throw new Error(`No store found for session ${session_key}`); } } const chats = store.chats.all(); let outputText = `==================================================\n`; outputText += `سجل محادثات منصة نبيه - جلسة: ${session_key}\n`; outputText += `تاريخ التصدير: ${new Date().toLocaleString('ar-EG')}\n`; outputText += `==================================================\n\n`; for (const chat of chats) { const jid = chat.id; const phone = jid.split('@')[0]; const name = chat.name || 'عميل غير مسمى'; // Skip groups or broadcasts if (jid.endsWith('@g.us') || jid.endsWith('@broadcast')) { continue; } const messages = store.messages[jid]?.all() || []; if (messages.length === 0) continue; outputText += `--------------------------------------------------\n`; outputText += `المحادثة مع: ${name} (${phone})\n`; outputText += `--------------------------------------------------\n`; for (const msg of messages) { const fromMe = msg.key.fromMe; const sender = fromMe ? 'المنصة (نبيه)' : name; const body = msg.message?.conversation || msg.message?.extendedTextMessage?.text || msg.message?.imageMessage?.caption || msg.message?.videoMessage?.caption || ''; let dateStr = 'تاريخ غير معروف'; if (msg.messageTimestamp) { const timestamp = Number(msg.messageTimestamp) * 1000; dateStr = new Date(timestamp).toLocaleString('ar-EG'); } if (body) { outputText += `[${dateStr}] ${sender}: ${body}\n`; } else if (msg.message?.audioMessage) { outputText += `[${dateStr}] ${sender}: [رسالة صوتية]\n`; } else if (msg.message?.imageMessage) { outputText += `[${dateStr}] ${sender}: [صورة]\n`; } else { outputText += `[${dateStr}] ${sender}: [رسالة غير معروفة]\n`; } } outputText += `\n\n`; } const publicDir = path.join(__dirname, '..', 'backend', 'public'); if (!fs.existsSync(publicDir)) { fs.mkdirSync(publicDir, { recursive: true }); } const filePath = path.join(publicDir, 'whatsapp_chats_history.txt'); fs.writeFileSync(filePath, '\uFEFF' + outputText, 'utf8'); return filePath; } function getActiveSessions() { return Array.from(sessions.keys()); } module.exports = { startSession, disconnectSession, sendMessage, getActiveSessions, checkContact, exportChatHistory };