feat: complete migration to 6-slot multi-tenant registry with MySQL message archiving
This commit is contained in:
154
whatsapp_bridge/database.js
Normal file
154
whatsapp_bridge/database.js
Normal file
@@ -0,0 +1,154 @@
|
||||
const mysql = require('mysql2/promise');
|
||||
|
||||
let pool = null;
|
||||
|
||||
function getPool() {
|
||||
if (!pool) {
|
||||
const config = {
|
||||
host: process.env.DB_HOST || '127.0.0.1',
|
||||
port: parseInt(process.env.DB_PORT || '3306'),
|
||||
user: process.env.DB_USER,
|
||||
password: process.env.DB_PASSWORD,
|
||||
database: process.env.DB_NAME,
|
||||
waitForConnections: true,
|
||||
connectionLimit: 10,
|
||||
queueLimit: 0
|
||||
};
|
||||
|
||||
console.log(`[DB] Initializing MySQL Connection Pool to ${config.host}:${config.port}/${config.database}`);
|
||||
pool = mysql.createPool(config);
|
||||
}
|
||||
return pool;
|
||||
}
|
||||
|
||||
// ─── Automatic Database Schema Migration ──────────────────────────────────
|
||||
async function initDatabase() {
|
||||
const connectionPool = getPool();
|
||||
|
||||
try {
|
||||
// 1. Create Slots Table
|
||||
await connectionPool.query(`
|
||||
CREATE TABLE IF NOT EXISTS slots (
|
||||
id INT PRIMARY KEY,
|
||||
phone_number VARCHAR(30) NULL,
|
||||
status VARCHAR(50) DEFAULT 'disconnected',
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
`);
|
||||
|
||||
// Seed exactly 6 slots if they don't exist yet
|
||||
for (let slotId = 1; slotId <= 6; slotId++) {
|
||||
await connectionPool.query(`
|
||||
INSERT INTO slots (id, status)
|
||||
VALUES (?, 'disconnected')
|
||||
ON DUPLICATE KEY UPDATE id=id;
|
||||
`, [slotId]);
|
||||
}
|
||||
|
||||
// 2. Create Messages Table
|
||||
await connectionPool.query(`
|
||||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id VARCHAR(150) PRIMARY KEY,
|
||||
slot_id INT NOT NULL,
|
||||
chat_id VARCHAR(100) NOT NULL,
|
||||
sender_name VARCHAR(150) NULL,
|
||||
body TEXT NULL,
|
||||
from_me BOOLEAN DEFAULT FALSE,
|
||||
timestamp INT NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (slot_id) REFERENCES slots(id) ON DELETE CASCADE,
|
||||
INDEX idx_slot_chat (slot_id, chat_id),
|
||||
INDEX idx_timestamp (timestamp)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
`);
|
||||
|
||||
console.log('[DB] MySQL Tables initialized successfully.');
|
||||
} catch (err) {
|
||||
console.error('[DB ERROR] Migration failed:', err.message);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Helper Queries ────────────────────────────────────────────────────────
|
||||
async function updateSlotStatus(slotId, status, phoneNumber = null) {
|
||||
try {
|
||||
const connectionPool = getPool();
|
||||
await connectionPool.query(`
|
||||
UPDATE slots
|
||||
SET status = ?, phone_number = COALESCE(?, phone_number)
|
||||
WHERE id = ?;
|
||||
`, [status, phoneNumber, slotId]);
|
||||
console.log(`[DB] Slot ${slotId} status updated to: ${status}`);
|
||||
} catch (err) {
|
||||
console.error(`[DB ERROR] Failed to update slot ${slotId} status:`, err.message);
|
||||
}
|
||||
}
|
||||
|
||||
async function archiveMessage(slotId, msg) {
|
||||
try {
|
||||
const connectionPool = getPool();
|
||||
|
||||
// We only archive text-based body messages (or custom media representations)
|
||||
let bodyText = msg.body || '';
|
||||
if (!bodyText && msg.hasMedia) {
|
||||
bodyText = '📷 Media/Attachment';
|
||||
}
|
||||
|
||||
await connectionPool.query(`
|
||||
INSERT INTO messages (id, slot_id, chat_id, sender_name, body, from_me, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE body = VALUES(body);
|
||||
`, [
|
||||
msg.id._serialized || msg.id.id,
|
||||
slotId,
|
||||
msg.to || msg.from,
|
||||
msg.senderName || null,
|
||||
bodyText,
|
||||
msg.fromMe ? 1 : 0,
|
||||
msg.timestamp
|
||||
]);
|
||||
console.log(`[DB] Message ${msg.id.id} archived successfully in Slot ${slotId}`);
|
||||
} catch (err) {
|
||||
console.error('[DB ERROR] Failed to archive message:', err.message);
|
||||
}
|
||||
}
|
||||
|
||||
async function getChatHistory(slotId, chatId, limit = 50, offset = 0) {
|
||||
try {
|
||||
const connectionPool = getPool();
|
||||
const [rows] = await connectionPool.query(`
|
||||
SELECT * FROM messages
|
||||
WHERE slot_id = ? AND chat_id = ?
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT ? OFFSET ?;
|
||||
`, [slotId, chatId, parseInt(limit), parseInt(offset)]);
|
||||
return rows.reverse(); // Return in chronological order
|
||||
} catch (err) {
|
||||
console.error('[DB ERROR] Failed to get chat history:', err.message);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async function searchMessages(slotId, query, limit = 50) {
|
||||
try {
|
||||
const connectionPool = getPool();
|
||||
const [rows] = await connectionPool.query(`
|
||||
SELECT * FROM messages
|
||||
WHERE slot_id = ? AND body LIKE ?
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT ?;
|
||||
`, [slotId, `%${query}%`, parseInt(limit)]);
|
||||
return rows;
|
||||
} catch (err) {
|
||||
console.error('[DB ERROR] Failed to search messages:', err.message);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
initDatabase,
|
||||
updateSlotStatus,
|
||||
archiveMessage,
|
||||
getChatHistory,
|
||||
searchMessages
|
||||
};
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user