Files
nabeh/whatsapp-gateway/baileys-client.js
2026-05-23 22:54:25 +03:00

583 lines
23 KiB
JavaScript

const baileys = require('@whiskeysockets/baileys');
const makeWASocket = baileys.default || baileys.makeWASocket || baileys;
const { useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion, downloadMediaMessage, makeCacheableSignalKeyStore, makeInMemoryStore } = baileys;
const pino = require('pino');
const NodeCache = require('node-cache');
const axios = require('axios');
const fs = require('fs');
const path = require('path');
const sessions = new Map(); // Store active sockets in memory
const retryCounters = new Map(); // Track reconnection attempts per session
const recentMessages = new Map(); // Cache of recent messages in memory to serve getMessage callback
const phoneToLid = new Map(); // Map phone numbers to LID JIDs for correct E2EE routing
const sessionStores = new Map(); // Store active stores in memory
const storeIntervals = new Map(); // Store save intervals in memory
// Global retry counter cache — persists across socket reconnects
// This is CRITICAL for E2EE: tracks message retry attempts so Baileys can
// re-encrypt messages when a recipient's device requests a retry
const msgRetryCounterCache = new NodeCache({ stdTTL: 600, checkperiod: 120 });
const MAX_RETRIES = 5; // Maximum reconnection attempts before giving up
// Local folder for saving auth keys
const SESSIONS_DIR = path.join(__dirname, 'sessions');
if (!fs.existsSync(SESSIONS_DIR)) {
fs.mkdirSync(SESSIONS_DIR, { recursive: true });
}
async function sendWebhook(webhook_url, payload) {
try {
console.log(`[Webhook] Sending to ${webhook_url} | state=${payload.state}`);
const response = await axios.post(webhook_url, payload, {
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': process.env.WEBHOOK_SECRET || ''
},
timeout: 10000
});
console.log(`[Webhook] ✅ Success | HTTP ${response.status}`);
} catch (err) {
if (err.response) {
console.error(`[Webhook] ❌ HTTP ${err.response.status} | ${JSON.stringify(err.response.data)}`);
} else {
console.error(`[Webhook] ❌ Network Error: ${err.message}`);
}
}
}
async function startSession(session_key, webhook_url) {
// Return existing socket if it's already active
if (sessions.has(session_key)) {
console.log(`[Session] ${session_key} already active, reusing`);
return sessions.get(session_key);
}
console.log(`[Session] Starting ${session_key} → webhook: ${webhook_url}`);
const sessionFolder = path.join(SESSIONS_DIR, session_key);
const { state, saveCreds } = await useMultiFileAuthState(sessionFolder);
// Initialize InMemoryStore for chat history
const store = makeInMemoryStore({ logger: pino({ level: 'silent' }) });
const storeFile = path.join(SESSIONS_DIR, `${session_key}_store.json`);
if (fs.existsSync(storeFile)) {
try {
store.readFromFile(storeFile);
console.log(`[Store] Loaded store from file for ${session_key}`);
} catch (e) {
console.error(`[Store] Failed to load store file for ${session_key}:`, e.message);
}
}
// Periodically save store to file every 10 seconds
const storeInterval = setInterval(() => {
try {
store.writeToFile(storeFile);
} catch (e) {
console.error(`[Store] Failed to write store file for ${session_key}:`, e.message);
}
}, 10000);
storeIntervals.set(session_key, storeInterval);
// Fetch the latest WhatsApp Web version to avoid 405 rejection
let version;
try {
const versionInfo = await fetchLatestBaileysVersion();
version = versionInfo.version;
console.log(`[Baileys] Using WA version: ${version}`);
} catch (e) {
console.warn(`[Baileys] Could not fetch version, using default`);
}
const logger = pino({ level: 'silent' });
const socketConfig = {
auth: {
creds: state.creds,
// Wrap keys with makeCacheableSignalKeyStore for fast in-memory
// Signal key access — this prevents E2EE key lookup failures
// that cause "Waiting for this message" on recipient devices
keys: makeCacheableSignalKeyStore(state.keys, logger),
},
printQRInTerminal: false,
logger: logger,
browser: ['Nabeh Gateway', 'Chrome', '120.0.0'],
// Message retry counter cache — tracks how many times each message
// retry has been attempted, preventing infinite retry loops
msgRetryCounterCache,
getMessage: async (key) => {
if (recentMessages.has(key.id)) {
return recentMessages.get(key.id);
}
return undefined;
}
};
if (version) socketConfig.version = version;
const sock = makeWASocket(socketConfig);
console.log(`[Session] Socket created for ${session_key}`);
store.bind(sock.ev);
sessionStores.set(session_key, store);
sessions.set(session_key, sock);
sock.ev.on('creds.update', saveCreds);
// Listen for incoming messages
sock.ev.on('messages.upsert', async (m) => {
console.log(`[Upsert] Event received type=${m.type} messagesCount=${m.messages?.length}`);
if (m.messages && m.messages.length > 0) {
console.log(`[Upsert] First message keys:`, Object.keys(m.messages[0]));
console.log(`[Upsert] First message key:`, JSON.stringify(m.messages[0].key));
console.log(`[Upsert] First message structure:`, JSON.stringify(m.messages[0].message));
// Cache all incoming messages to serve E2EE retries
for (const msg of m.messages) {
if (msg.key && msg.key.id && msg.message) {
recentMessages.set(msg.key.id, msg.message);
if (recentMessages.size > 2000) {
const firstKey = recentMessages.keys().next().value;
recentMessages.delete(firstKey);
}
}
}
}
if (m.type !== 'notify') return;
for (const msg of m.messages) {
// Ignore messages sent by ourselves
if (msg.key.fromMe) continue;
const remoteJid = msg.key.remoteJid;
if (!remoteJid) continue;
// Only process individual chats (ignore groups and broadcasts)
const isGroup = remoteJid.endsWith('@g.us');
const isBroadcast = remoteJid.endsWith('@broadcast');
if (isGroup || isBroadcast) continue;
// Extract text body
const body = msg.message?.conversation ||
msg.message?.extendedTextMessage?.text ||
msg.message?.imageMessage?.caption ||
msg.message?.videoMessage?.caption || '';
const isAudio = !!msg.message?.audioMessage;
const isImage = !!msg.message?.imageMessage;
// Only process messages that have text content OR are audio/image messages
if (!body && !isAudio && !isImage) continue;
let audioBase64 = null;
let audioMimeType = null;
let imageBase64 = null;
let imageMimeType = null;
if (isAudio) {
try {
console.log(`[Baileys] Downloading audio message for ${remoteJid}`);
const buffer = await downloadMediaMessage(
msg,
'buffer',
{},
{
logger: pino({ level: 'silent' }),
rekey: true
}
);
audioBase64 = buffer.toString('base64');
audioMimeType = msg.message.audioMessage.mimetype || 'audio/ogg';
} catch (e) {
console.error('[Baileys] Failed to download audio message:', e.message);
continue; // Skip if audio download fails to prevent empty processing
}
} else if (isImage) {
try {
console.log(`[Baileys] Downloading image message for ${remoteJid}`);
const buffer = await downloadMediaMessage(
msg,
'buffer',
{},
{
logger: pino({ level: 'silent' }),
rekey: true
}
);
imageBase64 = buffer.toString('base64');
imageMimeType = msg.message.imageMessage.mimetype || 'image/jpeg';
} catch (e) {
console.error('[Baileys] Failed to download image message:', e.message);
continue; // Skip if image download fails
}
}
// Extract sender phone number (handle LID privacy scheme)
let senderPhone = '';
if (msg.key.senderPn && msg.key.senderPn.endsWith('@s.whatsapp.net')) {
senderPhone = msg.key.senderPn.split('@')[0];
// CRITICAL: Map this phone number to the LID JID for reply routing.
// Signal E2EE sessions are bound to the LID, so replies MUST go
// to the LID JID, not the phone JID, to prevent session conflicts.
if (remoteJid.endsWith('@lid')) {
phoneToLid.set(senderPhone, remoteJid);
console.log(`[LID] Mapped ${senderPhone}${remoteJid}`);
}
} else if (remoteJid.endsWith('@s.whatsapp.net')) {
senderPhone = remoteJid.split('@')[0];
} else if (remoteJid.endsWith('@lid')) {
senderPhone = remoteJid.split('@')[0];
}
if (!senderPhone) continue;
const senderName = msg.pushName || '';
if (isAudio) {
console.log(`[Message] Received audio voice note from ${senderPhone} (JID: ${remoteJid})`);
} else if (isImage) {
console.log(`[Message] Received image from ${senderPhone} (JID: ${remoteJid})`);
} else {
console.log(`[Message] Received from ${senderPhone} (JID: ${remoteJid}): ${body}`);
}
await sendWebhook(webhook_url, {
session_key,
state: 'message_received',
message: {
id: msg.key.id,
phone: senderPhone,
name: senderName,
body: body,
audio: audioBase64,
mimeType: audioMimeType,
duration: msg.message?.audioMessage?.seconds ? Number(msg.message.audioMessage.seconds) : null,
image: imageBase64,
imageMimeType: imageMimeType,
timestamp: msg.messageTimestamp
}
});
}
});
sock.ev.on('connection.update', async (update) => {
const { connection, lastDisconnect, qr } = update;
if (qr) {
console.log(`[QR] Generated for ${session_key}`);
// Reset retry counter on QR generation (session is alive)
retryCounters.set(session_key, 0);
await sendWebhook(webhook_url, {
session_key,
state: 'waiting_qr',
qr_code: qr
});
}
if (connection === 'close') {
const statusCode = lastDisconnect?.error?.output?.statusCode;
const shouldReconnect = statusCode !== DisconnectReason.loggedOut;
if (!shouldReconnect) {
console.log(`[Connection] ${session_key} permanently closed (logged out). Cleaning up.`);
sessions.delete(session_key);
await cleanupSession(session_key);
await sendWebhook(webhook_url, {
session_key,
state: 'disconnected'
});
} else {
const retries = (retryCounters.get(session_key) || 0) + 1;
retryCounters.set(session_key, retries);
sessions.delete(session_key);
let delay;
if (retries <= MAX_RETRIES) {
delay = Math.min(retries * 3000, 15000); // 3s, 6s, 9s, 12s, 15s
console.log(`[Connection] Reconnecting ${session_key} (quick retry ${retries}/${MAX_RETRIES}) in ${delay}ms...`);
} else {
delay = 60000; // 60s
console.log(`[Connection] Reconnecting ${session_key} (long-term retry ${retries}) in 60s...`);
}
setTimeout(() => startSession(session_key, webhook_url), delay);
}
} else if (connection === 'open') {
console.log(`[Connection] ✅ ${session_key} connected successfully!`);
retryCounters.set(session_key, 0); // Reset on successful connection
// Parse phone number from the JID (e.g. 9665XXXXXXX@s.whatsapp.net)
const phone = sock.user.id.split(':')[0];
await sendWebhook(webhook_url, {
session_key,
state: 'connected',
phone: phone
});
}
});
return sock;
}
/**
* Cleanup session: remove from memory and delete auth files
*/
async function cleanupSession(session_key) {
const interval = storeIntervals.get(session_key);
if (interval) {
clearInterval(interval);
storeIntervals.delete(session_key);
}
sessionStores.delete(session_key);
const sock = sessions.get(session_key);
if (sock) {
try { sock.end(); } catch (e) { } // Gracefully close socket without logout
sessions.delete(session_key);
}
retryCounters.delete(session_key);
// Wipe the auth directory so a fresh session can be created next time
const sessionFolder = path.join(SESSIONS_DIR, session_key);
if (fs.existsSync(sessionFolder)) {
fs.rmSync(sessionFolder, { recursive: true, force: true });
console.log(`[Cleanup] Deleted session folder for ${session_key}`);
}
}
/**
* Disconnect session: logout from WhatsApp and cleanup
*/
async function disconnectSession(session_key) {
const interval = storeIntervals.get(session_key);
if (interval) {
clearInterval(interval);
storeIntervals.delete(session_key);
}
sessionStores.delete(session_key);
const storeFile = path.join(SESSIONS_DIR, `${session_key}_store.json`);
if (fs.existsSync(storeFile)) {
try { fs.unlinkSync(storeFile); } catch (e) {}
}
const sock = sessions.get(session_key);
if (sock) {
try { sock.logout(); } catch (e) { } // best effort logout
sessions.delete(session_key);
}
retryCounters.delete(session_key);
// Completely wipe the auth directory
const sessionFolder = path.join(SESSIONS_DIR, session_key);
if (fs.existsSync(sessionFolder)) {
fs.rmSync(sessionFolder, { recursive: true, force: true });
console.log(`[Disconnect] Deleted session folder for ${session_key}`);
}
}
const { exec } = require('child_process');
const os = require('os');
function convertToOggOpus(base64Audio) {
return new Promise((resolve, reject) => {
const timeId = Date.now() + Math.random().toString(36).substring(7);
const inputPath = path.join(os.tmpdir(), `input_${timeId}.tmp`);
const outputPath = path.join(os.tmpdir(), `output_${timeId}.ogg`);
fs.writeFile(inputPath, Buffer.from(base64Audio, 'base64'), (err) => {
if (err) return reject(err);
exec(`ffmpeg -i ${inputPath} -c:a libopus -y ${outputPath}`, (execErr) => {
fs.unlink(inputPath, () => {});
if (execErr) return reject(execErr);
fs.readFile(outputPath, (readErr, data) => {
fs.unlink(outputPath, () => {});
if (readErr) return reject(readErr);
resolve(data.toString('base64'));
});
});
});
});
}
/**
* Send a message using an active session
*/
async function sendMessage(session_key, phone, message, mediaUrl = null, audioBase64 = null, mimetype = null, imageBase64 = null) {
const sock = sessions.get(session_key);
if (!sock) {
throw new Error(`Session ${session_key} is not active or connected`);
}
// Use the LID JID if we have a mapping for this phone number.
// This is CRITICAL: Signal E2EE sessions are bound to the LID JID,
// so replying to the phone JID creates a separate session that
// conflicts with the LID session, causing "Waiting for this message".
let jid;
if (phone.includes('@')) {
jid = phone;
} else if (phoneToLid.has(phone)) {
jid = phoneToLid.get(phone);
console.log(`[LID] Routing reply to ${phone} via LID: ${jid}`);
} else {
jid = `${phone}@s.whatsapp.net`;
}
let sentMsg;
if (imageBase64) {
const buffer = Buffer.from(imageBase64, 'base64');
sentMsg = await sock.sendMessage(jid, {
image: buffer,
caption: message || ''
});
} else if (audioBase64) {
let finalAudioBase64 = audioBase64;
let finalMime = mimetype || 'audio/mp4';
// If it's MP3, convert to OGG Opus to ensure iPhone PTT compatibility
if (finalMime.includes('mpeg') || finalMime.includes('mp3')) {
try {
console.log(`[Baileys] Converting MP3 to OGG Opus for native PTT...`);
finalAudioBase64 = await convertToOggOpus(audioBase64);
finalMime = 'audio/ogg; codecs=opus';
} catch (err) {
console.error(`[Baileys] FFmpeg conversion failed:`, err.message);
// Fallback to sending as normal audio if conversion fails
finalMime = 'audio/mp4';
}
}
const buffer = Buffer.from(finalAudioBase64, 'base64');
const isMp3 = finalMime.includes('mpeg') || finalMime.includes('mp3');
sentMsg = await sock.sendMessage(jid, {
audio: buffer,
mimetype: finalMime,
ptt: !isMp3 // PTT enabled for OGG/MP4, disabled for raw MP3
});
} else if (mediaUrl) {
const ext = mediaUrl.split('.').pop().toLowerCase();
if (['jpg', 'jpeg', 'png', 'webp'].includes(ext)) {
sentMsg = await sock.sendMessage(jid, { image: { url: mediaUrl }, caption: message });
} else if (['mp4', 'mkv', 'avi'].includes(ext)) {
sentMsg = await sock.sendMessage(jid, { video: { url: mediaUrl }, caption: message });
} else {
sentMsg = await sock.sendMessage(jid, { document: { url: mediaUrl }, caption: message, fileName: mediaUrl.split('/').pop() });
}
} else {
sentMsg = await sock.sendMessage(jid, { text: message });
}
// Cache outbound messages for E2EE decryption retries
if (sentMsg && sentMsg.key && sentMsg.key.id && sentMsg.message) {
recentMessages.set(sentMsg.key.id, sentMsg.message);
if (recentMessages.size > 2000) {
const firstKey = recentMessages.keys().next().value;
recentMessages.delete(firstKey);
}
}
return sentMsg;
}
async function checkContact(session_key, phone) {
const sock = sessions.get(session_key);
if (!sock) {
throw new Error(`Session ${session_key} is not active or connected`);
}
const jid = phone.includes('@') ? phone : `${phone}@s.whatsapp.net`;
try {
const result = await sock.onWhatsApp(jid);
if (result && result.length > 0) {
return result[0];
}
return { exists: false, jid };
} catch (err) {
console.error(`[Baileys] Error checking contact ${jid}:`, err.message);
throw err;
}
}
async function exportChatHistory(session_key) {
const store = sessionStores.get(session_key);
if (!store) {
throw new Error(`No store found for session ${session_key}`);
}
const chats = store.chats.all();
let outputText = `==================================================\n`;
outputText += `سجل محادثات منصة نبيه - جلسة: ${session_key}\n`;
outputText += `تاريخ التصدير: ${new Date().toLocaleString('ar-EG')}\n`;
outputText += `==================================================\n\n`;
for (const chat of chats) {
const jid = chat.id;
const phone = jid.split('@')[0];
const name = chat.name || 'عميل غير مسمى';
// Skip groups or broadcasts
if (jid.endsWith('@g.us') || jid.endsWith('@broadcast')) {
continue;
}
const messages = store.messages[jid]?.all() || [];
if (messages.length === 0) continue;
outputText += `--------------------------------------------------\n`;
outputText += `المحادثة مع: ${name} (${phone})\n`;
outputText += `--------------------------------------------------\n`;
for (const msg of messages) {
const fromMe = msg.key.fromMe;
const sender = fromMe ? 'المنصة (نبيه)' : name;
const body = msg.message?.conversation ||
msg.message?.extendedTextMessage?.text ||
msg.message?.imageMessage?.caption ||
msg.message?.videoMessage?.caption || '';
let dateStr = 'تاريخ غير معروف';
if (msg.messageTimestamp) {
const timestamp = Number(msg.messageTimestamp) * 1000;
dateStr = new Date(timestamp).toLocaleString('ar-EG');
}
if (body) {
outputText += `[${dateStr}] ${sender}: ${body}\n`;
} else if (msg.message?.audioMessage) {
outputText += `[${dateStr}] ${sender}: [رسالة صوتية]\n`;
} else if (msg.message?.imageMessage) {
outputText += `[${dateStr}] ${sender}: [صورة]\n`;
} else {
outputText += `[${dateStr}] ${sender}: [رسالة غير معروفة]\n`;
}
}
outputText += `\n\n`;
}
const publicDir = path.join(__dirname, '..', 'backend', 'public');
if (!fs.existsSync(publicDir)) {
fs.mkdirSync(publicDir, { recursive: true });
}
const filePath = path.join(publicDir, 'whatsapp_chats_history.txt');
fs.writeFileSync(filePath, outputText, 'utf8');
return filePath;
}
function getActiveSessions() {
return Array.from(sessions.keys());
}
module.exports = {
startSession,
disconnectSession,
sendMessage,
getActiveSessions,
checkContact,
exportChatHistory
};