feat: initial commit at project root

This commit is contained in:
Hamza-Ayed
2026-05-29 01:06:47 +03:00
commit 87ec54bbd7
22 changed files with 1737 additions and 0 deletions

48
internal/config/config.js Normal file
View File

@@ -0,0 +1,48 @@
import process from 'process';
const apiKey = process.env.API_KEY || '';
if (apiKey.length < 32) {
console.error(`FATAL: API_KEY must be at least 32 characters long, current length: ${apiKey.length}`);
process.exit(1);
}
/**
* Parses duration strings like "20s", "1m" to milliseconds.
* @param {string|undefined} val
* @param {number} defaultMs
* @returns {number}
*/
const parseDuration = (val, defaultMs) => {
if (!val) return defaultMs;
const match = val.match(/^(\d+)(s|m|h|ms)$/);
if (!match) return defaultMs;
const num = parseInt(match[1], 10);
const unit = match[2];
switch (unit) {
case 'ms': return num;
case 's': return num * 1000;
case 'm': return num * 60 * 1000;
case 'h': return num * 60 * 60 * 1000;
default: return defaultMs;
}
};
export const config = {
serverAddr: process.env.SERVER_ADDR || '127.0.0.1:47880',
apiKey: apiKey,
logLevel: process.env.LOG_LEVEL || 'info',
maxMessageBytes: parseInt(process.env.MAX_MESSAGE_BYTES || '4096', 10),
heartbeatInterval: parseDuration(process.env.HEARTBEAT_INTERVAL, 20000), // 20s
heartbeatTimeout: parseDuration(process.env.HEARTBEAT_TIMEOUT, 30000), // 30s
sessionDuration: parseDuration(process.env.SESSION_DURATION, 60000), // 60s
rateLimitPerMin: parseInt(process.env.RATE_LIMIT_PER_MIN || '10', 10),
environment: process.env.ENVIRONMENT || 'production',
dbHost: process.env.DB_HOST || '127.0.0.1',
dbPort: parseInt(process.env.DB_PORT || '3306', 10),
dbDatabase: process.env.DB_DATABASE || 'callDB',
dbUsername: process.env.DB_USERNAME || '',
dbPassword: process.env.DB_PASSWORD || '',
turnUrl: process.env.TURN_URL || '',
turnUsername: process.env.TURN_USERNAME || '',
turnCredential: process.env.TURN_CREDENTIAL || '',
};

119
internal/db/db.js Normal file
View File

@@ -0,0 +1,119 @@
import mysql from 'mysql2/promise';
import { config } from '../config/config.js';
import { logger } from '../logger/logger.js';
let pool = null;
/**
* Initializes the MySQL database pool and ensures schema exists.
*/
export async function initializeDatabase() {
if (!config.dbUsername) {
logger.warn('db_disabled', { reason: 'No DB_USERNAME configured, database logging is disabled' });
return;
}
try {
pool = mysql.createPool({
host: config.dbHost,
port: config.dbPort,
database: config.dbDatabase,
user: config.dbUsername,
password: config.dbPassword,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
// Test connection and auto-create the table if not exists
const conn = await pool.getConnection();
await conn.query(`
CREATE TABLE IF NOT EXISTS \`call_logs\` (
\`id\` INT AUTO_INCREMENT PRIMARY KEY,
\`session_id\` VARCHAR(36) NOT NULL UNIQUE,
\`ride_id\` VARCHAR(255) NOT NULL,
\`driver_id\` VARCHAR(255) NOT NULL,
\`passenger_id\` VARCHAR(255) NOT NULL,
\`driver_ip\` VARCHAR(45) NULL DEFAULT NULL,
\`passenger_ip\` VARCHAR(45) NULL DEFAULT NULL,
\`driver_conn_ip\` VARCHAR(45) NULL DEFAULT NULL,
\`passenger_conn_ip\` VARCHAR(45) NULL DEFAULT NULL,
\`status\` VARCHAR(50) NOT NULL DEFAULT 'created',
\`initiated_by\` VARCHAR(255) NULL DEFAULT NULL,
\`end_reason\` VARCHAR(255) NULL DEFAULT NULL,
\`created_at\` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
\`connected_at\` TIMESTAMP NULL DEFAULT NULL,
\`ended_at\` TIMESTAMP NULL DEFAULT NULL,
INDEX \`idx_session_id\` (\`session_id\`),
INDEX \`idx_ride_id\` (\`ride_id\`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
`);
conn.release();
logger.info('db_initialized', { host: config.dbHost, database: config.dbDatabase });
} catch (err) {
logger.error('db_initialization_failed', { error: err.message });
// Soft fallback: log the failure but do not crash the signaling server
pool = null;
}
}
/**
* Logs session creation to the database with pre-registered client IPs.
*/
export async function logSessionCreated(sessionID, rideID, driverID, passengerID, driverIP = null, passengerIP = null) {
if (!pool) return;
try {
await pool.query(
`INSERT INTO call_logs (session_id, ride_id, driver_id, passenger_id, driver_ip, passenger_ip, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, 'created', NOW())`,
[sessionID, rideID, driverID, passengerID, driverIP, passengerIP]
);
} catch (err) {
logger.error('db_log_session_created_failed', { session_id: sessionID, error: err.message });
}
}
/**
* Logs session status transition to active and records the connection IPs.
*/
export async function logSessionActive(sessionID, driverConnIP = null, passengerConnIP = null) {
if (!pool) return;
try {
await pool.query(
`UPDATE call_logs SET status = 'active', connected_at = NOW(), driver_conn_ip = ?, passenger_conn_ip = ? WHERE session_id = ?`,
[driverConnIP, passengerConnIP, sessionID]
);
} catch (err) {
logger.error('db_log_session_active_failed', { session_id: sessionID, error: err.message });
}
}
/**
* Logs the initiator (caller) of the call when the first SDP offer is received.
*/
export async function logSessionInitiator(sessionID, initiatorID) {
if (!pool) return;
try {
await pool.query(
`UPDATE call_logs SET initiated_by = ? WHERE session_id = ? AND initiated_by IS NULL`,
[initiatorID, sessionID]
);
} catch (err) {
logger.error('db_log_session_initiator_failed', { session_id: sessionID, initiator_id: initiatorID, error: err.message });
}
}
/**
* Logs call termination reason and end time.
*/
export async function logSessionEnded(sessionID, endReason) {
if (!pool) return;
try {
await pool.query(
`UPDATE call_logs SET status = 'ended', ended_at = NOW(), end_reason = ? WHERE session_id = ?`,
[endReason, sessionID]
);
} catch (err) {
logger.error('db_log_session_ended_failed', { session_id: sessionID, error: err.message });
}
}

52
internal/logger/logger.js Normal file
View File

@@ -0,0 +1,52 @@
import process from 'process';
const LOG_LEVELS = {
debug: 0,
info: 1,
warn: 2,
error: 3
};
const currentLevelStr = (process.env.LOG_LEVEL || 'info').toLowerCase();
const currentLevel = LOG_LEVELS[currentLevelStr] !== undefined ? LOG_LEVELS[currentLevelStr] : 1;
const ANSI_COLORS = {
DEBUG: '\x1b[36mDEBUG\x1b[0m', // Cyan
INFO: '\x1b[32mINFO\x1b[0m', // Green
WARN: '\x1b[33mWARN\x1b[0m', // Yellow
ERROR: '\x1b[31mERROR\x1b[0m' // Red
};
/**
* Helper to print log structured JSON objects to stdout.
* @param {string} levelName
* @param {string} eventName
* @param {object} [attributes]
*/
function writeLog(levelName, eventName, attributes = {}) {
const levelNum = LOG_LEVELS[levelName];
if (levelNum < currentLevel) {
return;
}
const levelUpper = levelName.toUpperCase();
const logEntry = {
timestamp: new Date().toISOString(),
level: `__COLOR_LEVEL_${levelUpper}__`,
event: eventName,
...attributes
};
let jsonStr = JSON.stringify(logEntry);
const coloredLevel = ANSI_COLORS[levelUpper] || levelUpper;
jsonStr = jsonStr.replace(`"__COLOR_LEVEL_${levelUpper}__"`, `"${coloredLevel}"`);
process.stdout.write(jsonStr + '\n');
}
export const logger = {
debug: (event, attrs) => writeLog('debug', event, attrs),
info: (event, attrs) => writeLog('info', event, attrs),
warn: (event, attrs) => writeLog('warn', event, attrs),
error: (event, attrs) => writeLog('error', event, attrs)
};

View File

@@ -0,0 +1,59 @@
/**
* RateLimiter limits connection attempts using a sliding window.
*/
export class RateLimiter {
/**
* @param {number} limit Maximum connections allowed
* @param {number} windowMs Sliding window length in milliseconds
*/
constructor(limit, windowMs) {
this.limit = limit;
this.windowMs = windowMs;
this.attempts = new Map();
// Start background cleanup worker every 5 minutes
this.cleanupInterval = setInterval(() => this.cleanup(), 5 * 60 * 1000);
if (this.cleanupInterval.unref) {
this.cleanupInterval.unref();
}
}
/**
* Checks if an attempt is allowed under rate limits for the given IP.
* @param {string} ip
* @returns {boolean} True if connection attempt is within rate limits
*/
allow(ip) {
const now = Date.now();
const cutoff = now - this.windowMs;
let timestamps = this.attempts.get(ip) || [];
// Filter out historical connection attempts outside the current window
timestamps = timestamps.filter(t => t > cutoff);
if (timestamps.length >= this.limit) {
this.attempts.set(ip, timestamps);
return false;
}
timestamps.push(now);
this.attempts.set(ip, timestamps);
return true;
}
/**
* Sweeps inactive keys to reclaim memory.
*/
cleanup() {
const cutoff = Date.now() - this.windowMs;
for (const [ip, timestamps] of this.attempts.entries()) {
const active = timestamps.filter(t => t > cutoff);
if (active.length === 0) {
this.attempts.delete(ip);
} else {
this.attempts.set(ip, active);
}
}
}
}

View File

@@ -0,0 +1,48 @@
// Message type constants
export const TypeAuthenticate = 'authenticate';
export const TypeCreateSession = 'create_session';
export const TypeJoinSession = 'join_session';
export const TypeOffer = 'offer';
export const TypeAnswer = 'answer';
export const TypeICECandidate = 'ice_candidate';
export const TypeHeartbeat = 'heartbeat';
export const TypeEndCall = 'end_call';
export const TypeAuthenticated = 'authenticated';
export const TypeSessionCreated = 'session_created';
export const TypeSessionJoined = 'session_joined';
export const TypeParticipantJoined = 'participant_joined';
export const TypeParticipantLeft = 'participant_left';
export const TypeCallTimeout = 'call_timeout';
export const TypeCallEnded = 'call_ended';
export const TypePong = 'pong';
export const TypeUnauthorized = 'unauthorized';
export const TypeError = 'error';
// Error codes
export const ErrRideNotActive = 'ride_not_active';
export const ErrRideAlreadyStarted = 'ride_already_started';
export const ErrRideCancelled = 'ride_cancelled';
export const ErrSessionExists = 'session_exists';
export const ErrSessionNotFound = 'session_not_found';
export const ErrSessionFull = 'session_full';
export const ErrUnauthorizedUser = 'unauthorized_user';
export const ErrTokenExpired = 'token_expired';
export const ErrTokenInvalid = 'token_invalid';
export const ErrPayloadTooLarge = 'payload_too_large';
export const ErrRateLimited = 'rate_limited';
// Helper constructors for encoding server messages
export const newAuthenticated = (userID, iceServers) => JSON.stringify({ type: TypeAuthenticated, user_id: userID, ice_servers: iceServers });
export const newSessionCreated = (sessionID, rideID, expiresIn) => JSON.stringify({ type: TypeSessionCreated, session_id: sessionID, ride_id: rideID, expires_in: expiresIn });
export const newSessionJoined = (sessionID, rideID) => JSON.stringify({ type: TypeSessionJoined, session_id: sessionID, ride_id: rideID });
export const newParticipantJoined = (role) => JSON.stringify({ type: TypeParticipantJoined, role });
export const newRelayOffer = (sdp) => JSON.stringify({ type: TypeOffer, sdp });
export const newRelayAnswer = (sdp) => JSON.stringify({ type: TypeAnswer, sdp });
export const newRelayICE = (candidate) => JSON.stringify({ type: TypeICECandidate, candidate });
export const newParticipantLeft = (role) => JSON.stringify({ type: TypeParticipantLeft, role });
export const newCallTimeout = (reason) => JSON.stringify({ type: TypeCallTimeout, reason });
export const newCallEnded = (reason) => JSON.stringify({ type: TypeCallEnded, reason });
export const newPong = () => JSON.stringify({ type: TypePong });
export const newUnauthorized = (reason) => JSON.stringify({ type: TypeUnauthorized, reason });
export const newError = (code, message) => JSON.stringify({ type: TypeError, code, message });

View File

@@ -0,0 +1,37 @@
import crypto from 'crypto';
/**
* Generates a cryptographically secure UUID version 4.
* @returns {string}
*/
export function generateUUIDv4() {
return crypto.randomUUID();
}
/**
* Session models an active call coordination session.
*/
export class Session {
/**
* @param {string} rideID
* @param {string} driverID
* @param {string} passengerID
* @param {number} durationMs
* @param {string} [driverIP]
* @param {string} [passengerIP]
*/
constructor(rideID, driverID, passengerID, durationMs, driverIP = null, passengerIP = null) {
this.sessionID = generateUUIDv4();
this.rideID = rideID;
this.driverID = driverID;
this.passengerID = passengerID;
this.driverIP = driverIP;
this.passengerIP = passengerIP;
this.createdAt = new Date();
this.expiresAt = new Date(this.createdAt.getTime() + durationMs);
this.status = 'waiting'; // "waiting" | "active" | "ended"
this.driverConn = null;
this.passengerConn = null;
this.timer = null;
}
}

88
internal/session/store.js Normal file
View File

@@ -0,0 +1,88 @@
import { Session } from './session.js';
/**
* Store handles active in-memory sessions indexed by session ID.
* It also maintains a ride ID mapping to prevent duplicate active sessions per ride.
*/
export class Store {
constructor() {
this.sessions = new Map(); // key: sessionID, value: Session
this.rideToSession = new Map(); // key: rideID, value: Session
}
/**
* Retrieves a session by its session ID.
* @param {string} sessionID
* @returns {Session|undefined}
*/
getSession(sessionID) {
return this.sessions.get(sessionID);
}
/**
* Retrieves an active session by its ride ID.
* @param {string} rideID
* @returns {Session|undefined}
*/
getSessionByRide(rideID) {
return this.rideToSession.get(rideID);
}
/**
* Registers a new call session, checking for active duplicates.
* @param {string} rideID
* @param {string} driverID
* @param {string} passengerID
* @param {number} durationMs
* @param {string} [driverIP]
* @param {string} [passengerIP]
* @returns {Session}
* @throws {Error} Throws if an active session already exists for the ride
*/
createSession(rideID, driverID, passengerID, durationMs, driverIP = null, passengerIP = null) {
const existing = this.rideToSession.get(rideID);
if (existing && existing.status !== 'ended') {
throw new Error('session_exists');
}
const sess = new Session(rideID, driverID, passengerID, durationMs, driverIP, passengerIP);
this.sessions.set(sess.sessionID, sess);
this.rideToSession.set(rideID, sess);
return sess;
}
/**
* Removes a session from registry mappings.
* @param {string} sessionID
*/
destroySession(sessionID) {
const sess = this.sessions.get(sessionID);
if (sess) {
this.sessions.delete(sessionID);
this.rideToSession.delete(sess.rideID);
}
}
/**
* Finds expired active sessions.
* @returns {Session[]}
*/
getExpiredSessions() {
const expired = [];
const now = Date.now();
for (const sess of this.sessions.values()) {
if (sess.status !== 'ended' && now > sess.expiresAt.getTime()) {
expired.push(sess);
}
}
return expired;
}
/**
* Returns active sessions count.
* @returns {number}
*/
getActiveCount() {
return this.sessions.size;
}
}

32
internal/timer/timer.js Normal file
View File

@@ -0,0 +1,32 @@
/**
* CallTimer wraps Node's setTimeout/clearTimeout to allow session cancellations.
*/
export class CallTimer {
/**
* @param {number} durationMs
* @param {function} onExpiry
*/
constructor(durationMs, onExpiry) {
this.timer = setTimeout(onExpiry, durationMs);
}
/**
* Cancels the active timer.
*/
stop() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
}
}
/**
* Starts a session timeout timer.
* @param {number} durationMs
* @param {function} onExpiry
* @returns {CallTimer}
*/
export function startTimer(durationMs, onExpiry) {
return new CallTimer(durationMs, onExpiry);
}

151
internal/ws/client.js Normal file
View File

@@ -0,0 +1,151 @@
import { logger } from '../logger/logger.js';
import * as protocol from '../protocol/messages.js';
/**
* Client represents a single active WebSocket connection wrapper.
* It encapsulates the raw connection, handles ping/pong heartbeat,
* parses messages, and routes events back to the coordinator Hub.
*/
export class Client {
/**
* Initializes a new client instance and sets up connection event listeners and heartbeat.
* @param {object} conn The raw WS socket connection instance from the 'ws' library.
* @param {object} hub The central Hub coordinator to process and route messages.
*/
constructor(conn, hub) {
this.conn = conn; // The underlying raw WebSocket connection
this.hub = hub; // Reference to the main coordinator hub
this.userID = ''; // Authenticated user ID (driver_id or passenger_id)
this.rideID = ''; // Associated ride ID
this.role = ''; // User's role: 'driver' or 'passenger'
this.isAlive = true; // Connection status flag updated by heartbeat pong responses
this.pongTimeout = null; // Timer waiting for a pong reply after sending a ping
/**
* Listener for incoming WebSocket message payloads.
* Enforces a hard read limit of 4096 bytes (4KB) to prevent Denial of Service (DoS) memory consumption.
*/
this.conn.on('message', (data) => {
if (data.length > 4096) {
logger.warn('payload_too_large', { remote_ip: this.remoteIP() });
this.send(protocol.newError(protocol.ErrPayloadTooLarge, 'Payload too large'));
this.close();
return;
}
// Pass the message to the hub for authentication and signaling routing
this.hub.handleMessage(this, data);
});
/**
* Listener for the 'pong' response frame.
* Indicates that the client is responsive. Clears the timeout timer.
*/
this.conn.on('pong', () => {
this.isAlive = true;
if (this.pongTimeout) {
clearTimeout(this.pongTimeout);
this.pongTimeout = null;
}
});
/**
* Listener for connection close event.
* Triggers cleanup of heartbeat timers and unregisters the client from the hub.
*/
this.conn.on('close', () => {
this.cleanup();
this.hub.unregister(this);
});
/**
* Listener for socket connection errors.
* Logs the error details and safely terminates the connection.
*/
this.conn.on('error', (err) => {
logger.error('websocket_error', {
user_id: this.userID,
ride_id: this.rideID,
error: err.message
});
this.close();
});
/**
* Heartbeat loop: Pings the client every 20 seconds.
* If the client does not respond with a pong within 10 seconds, the connection is considered dead and closed.
* This prevents resource leaks from half-open TCP connections.
*/
this.pingInterval = setInterval(() => {
this.isAlive = false;
try {
this.conn.ping();
} catch (err) {
this.close();
return;
}
// Schedule a timeout check for 10 seconds. Close connection if isAlive is still false.
this.pongTimeout = setTimeout(() => {
if (!this.isAlive) {
logger.warn('heartbeat_timeout', {
user_id: this.userID,
ride_id: this.rideID,
remote_ip: this.remoteIP()
});
this.close();
}
}, 10000);
}, 20000);
}
/**
* Sends a serialized JSON message down the WebSocket connection wire.
* Checks the connection readyState to ensure it is in the OPEN state before writing.
* @param {string} msg Serialized JSON string message payload.
*/
send(msg) {
if (this.conn.readyState === 1) { // WebSocket.OPEN state
try {
this.conn.send(msg);
} catch (err) {
logger.error('websocket_send_failed', { error: err.message });
}
}
}
/**
* Resolves the remote IP address of the connected client.
* Falls back to 'unknown' if the connection socket is already closed or unavailable.
* @returns {string} The remote client's IP address.
*/
remoteIP() {
return this.conn._socket ? this.conn._socket.remoteAddress : 'unknown';
}
/**
* Safely terminates the WebSocket connection and releases local memory and timers.
*/
close() {
try {
this.conn.close();
} catch (err) {
// Ignored: connection might already be closed or unreachable
}
this.cleanup();
}
/**
* Cleans up and clears all scheduled interval pings and timeout check timers.
* This is critical to prevent CPU timer memory leaks when client closes.
*/
cleanup() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
if (this.pongTimeout) {
clearTimeout(this.pongTimeout);
this.pongTimeout = null;
}
}
}

59
internal/ws/handler.js Normal file
View File

@@ -0,0 +1,59 @@
import { WebSocketServer } from 'ws';
import { Client } from './client.js';
import { logger } from '../logger/logger.js';
/**
* Attaches the WebSocket server upgrade hook to the given HTTP server instance.
* It intercepts standard HTTP GET requests for WebSocket protocols and handles upgrades.
* @param {object} server The native http.Server instance.
* @param {object} hub The central Hub coordinator instance.
* @param {object} limiter The IP-based RateLimiter instance.
*/
export function setupWebSocket(server, hub, limiter) {
// Initialize ws server in noServer mode since we manually intercept upgrade requests.
const wss = new WebSocketServer({ noServer: true });
/**
* Listens for the HTTP 'upgrade' event to switch protocol from HTTP to WebSocket.
*/
server.on('upgrade', (request, socket, head) => {
const ip = getClientIP(request);
// Enforce IP-based rate limiting to prevent spam connections.
if (!limiter.allow(ip)) {
logger.warn('rate_limit_exceeded', { remote_ip: ip });
// Send 429 Too Many Requests response to client and close connection.
socket.write('HTTP/1.1 429 Too Many Requests\r\nConnection: close\r\n\r\nrate_limited');
socket.destroy();
return;
}
// Hand over control to 'ws' library to complete the upgrade protocol handshake.
wss.handleUpgrade(request, socket, head, (wsConn) => {
// Set remote address on socket mock for later client IP queries.
// This ensures we can resolve client IP even after connection is upgraded.
wsConn._socket = wsConn._socket || {};
wsConn._socket.remoteAddress = ip;
// Instantiate a new Client wrapper.
// Event listeners are automatically attached in the Client constructor.
new Client(wsConn, hub);
});
});
}
/**
* Resolves the client's original IP address, supporting reverse proxy setups
* (like Nginx) that forward the real client IP via X-Forwarded-For header.
* @param {object} request The http.IncomingMessage request object.
* @returns {string} The resolved IP address.
*/
function getClientIP(request) {
const xff = request.headers['x-forwarded-for'];
if (xff) {
// If proxied multiple times, the first element is the client's real IP.
const ips = xff.split(',');
return ips[0].trim();
}
return request.socket.remoteAddress || 'unknown';
}

454
internal/ws/hub.js Normal file
View File

@@ -0,0 +1,454 @@
import { logger } from '../logger/logger.js';
import * as protocol from '../protocol/messages.js';
import { logSessionActive, logSessionInitiator, logSessionEnded } from '../db/db.js';
import { config } from '../config/config.js';
/**
* Hub is the central coordinator for all active call sessions and client sockets.
* It manages connection mappings, routing of WebRTC signaling payloads (Offer, Answer, ICE),
* enforcing call session timers, and logging call connectivity changes to the database.
*/
export class Hub {
/**
* Initializes the Hub instance.
* @param {object} store The session memory store that holds active call profiles.
*/
constructor(store) {
this.clients = new Map(); // Map storing authenticated socket wrappers: Key is userID, Value is Client instance
this.store = store; // Reference to the active sessions in-memory storage
// Background job: Periodically scans the session store every 10 seconds to close expired sessions
this.sweepInterval = setInterval(() => this.sweepExpiredSessions(), 10000);
if (this.sweepInterval.unref) {
this.sweepInterval.unref(); // Prevent this interval from keeping the Node process alive on exit
}
}
/**
* Unregisters a disconnected client from the active clients registry map.
* If the client had an active session, it triggers call termination.
* @param {Client} client The Client connection instance.
*/
unregister(client) {
if (client.userID) {
// Check if the registered instance matches the client being removed
if (this.clients.get(client.userID) === client) {
this.clients.delete(client.userID);
logger.info('client_disconnected', {
user_id: client.userID,
session_id: client.sessionID,
remote_ip: client.remoteIP()
});
}
}
// If client was part of an active session, end the session since one participant disconnected
if (client.sessionID) {
this.endSession(client.sessionID, client.role);
}
}
/**
* Ends an active call session because one of the participants disconnected.
* Cleans up timeouts, notifies the counterparty, and deletes session records.
* @param {string} sessionID The unique identifier of the call session.
* @param {string} leftRole The role of the client who disconnected ('driver' or 'passenger').
*/
endSession(sessionID, leftRole) {
const sess = this.store.getSession(sessionID);
if (!sess) return;
if (sess.status === 'ended') return;
sess.status = 'ended';
// Log the disconnection event and reason to the database
logSessionEnded(sessionID, `disconnect_${leftRole}`);
// Stop the 60 seconds hard call duration timer
if (sess.timer) {
sess.timer.stop();
sess.timer = null;
}
logger.info('session_ended_by_disconnect', {
session_id: sessionID,
left_role: leftRole
});
// Notify the remaining peer that their partner left, and close their connection
const peer = leftRole === 'driver' ? sess.passengerConn : sess.driverConn;
if (peer) {
peer.send(protocol.newParticipantLeft(leftRole));
peer.close();
}
// Evict the session from the active store index
this.store.destroySession(sessionID);
}
/**
* Forces a session to end due to reaching the maximum call duration (e.g. 60 seconds).
* @param {string} sessionID The unique identifier of the call session.
* @param {string} reason The termination reason code (usually 'max_duration_reached').
*/
forceEndSession(sessionID, reason) {
const sess = this.store.getSession(sessionID);
if (!sess) return;
if (sess.status === 'ended') return;
sess.status = 'ended';
// Log the timeout event to the database
logSessionEnded(sessionID, reason);
if (sess.timer) {
sess.timer.stop();
sess.timer = null;
}
logger.info('call_timeout', {
session_id: sessionID,
reason: reason
});
// Construct and transmit the timeout signal to both participants
const timeoutMsg = protocol.newCallTimeout(reason);
if (sess.driverConn) {
sess.driverConn.send(timeoutMsg);
sess.driverConn.close();
}
if (sess.passengerConn) {
sess.passengerConn.send(timeoutMsg);
sess.passengerConn.close();
}
this.store.destroySession(sessionID);
}
/**
* Ends an active call session because one of the participants hung up.
* @param {string} sessionID The unique identifier of the call session.
*/
userEndCall(sessionID) {
const sess = this.store.getSession(sessionID);
if (!sess) return;
if (sess.status === 'ended') return;
sess.status = 'ended';
// Log the hangup event to the database
logSessionEnded(sessionID, 'user_terminated');
if (sess.timer) {
sess.timer.stop();
sess.timer = null;
}
logger.info('call_ended', {
session_id: sessionID,
reason: 'user_terminated'
});
// Notify both participants that the call was hung up
const endedMsg = protocol.newCallEnded('user_terminated');
if (sess.driverConn) {
sess.driverConn.send(endedMsg);
sess.driverConn.close();
}
if (sess.passengerConn) {
sess.passengerConn.send(endedMsg);
sess.passengerConn.close();
}
this.store.destroySession(sessionID);
}
/**
* Gracefully shuts down all connected clients (invoked during server shutdown).
*/
shutdownGracefully() {
logger.info('server_shutdown_alert_clients');
for (const client of this.clients.values()) {
client.send(protocol.newCallEnded('server_shutdown'));
client.close();
}
}
/**
* Sweeps and shuts down any sessions that have exceeded their expiration timestamps.
*/
sweepExpiredSessions() {
const expired = this.store.getExpiredSessions();
for (const sess of expired) {
this.forceEndSession(sess.sessionID, 'max_duration_reached');
}
}
/**
* Returns the count of currently connected and authenticated WebSocket client connections.
* @returns {number} Active client connections count.
*/
getConnectedClientsCount() {
return this.clients.size;
}
/**
* Processes incoming raw WebSocket message payloads.
* Handles client authentication and parses signalling messages.
* @param {Client} client The Client socket wrapper that received the message.
* @param {Buffer|string} msgBytes The raw message payload bytes.
*/
handleMessage(client, msgBytes) {
let base;
try {
base = JSON.parse(msgBytes.toString());
} catch (err) {
client.send(protocol.newError(protocol.ErrTokenInvalid, 'Invalid JSON format'));
return;
}
// 1. Enforce Authentication (Must be the very first message sent by a client connection)
if (!client.userID) {
if (base.type !== protocol.TypeAuthenticate) {
client.send(protocol.newError(protocol.ErrTokenInvalid, 'Authentication required'));
client.close();
return;
}
const { session_id, user_id } = base;
if (!session_id || !user_id) {
client.send(protocol.newError(protocol.ErrTokenInvalid, 'Missing session_id or user_id'));
client.close();
return;
}
// Check if the session exists in the memory store
const sess = this.store.getSession(session_id);
if (!sess) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'No active session found'));
client.close();
return;
}
if (sess.status === 'ended') {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session already ended'));
client.close();
return;
}
// Authorize client: user_id must match the driverID or passengerID pre-created for this session
let role = '';
if (user_id === sess.driverID) {
role = 'driver';
if (sess.driverConn) {
client.send(protocol.newError(protocol.ErrSessionExists, 'Driver already connected'));
client.close();
return;
}
} else if (user_id === sess.passengerID) {
role = 'passenger';
if (sess.passengerConn) {
client.send(protocol.newError(protocol.ErrSessionExists, 'Passenger already connected'));
client.close();
return;
}
} else {
client.send(protocol.newError(protocol.ErrUnauthorizedUser, 'User not authorized for this session'));
client.close();
return;
}
// Kick out any duplicate connections for this user ID
if (this.clients.has(user_id)) {
const existing = this.clients.get(user_id);
if (existing) {
existing.close();
}
}
// Bind session context to the Client object
client.userID = user_id;
client.sessionID = session_id;
client.role = role;
client.rideID = sess.rideID;
// Register the client in the hub's active connections map
this.clients.set(client.userID, client);
// Register connection in the session
if (role === 'driver') {
sess.driverConn = client;
} else {
sess.passengerConn = client;
}
// Log a warning if the connection IP does not match the pre-registered IP
const connectionIP = client.remoteIP();
const expectedIP = role === 'driver' ? sess.driverIP : sess.passengerIP;
if (expectedIP && expectedIP !== '127.0.0.1' && expectedIP !== 'localhost' && connectionIP !== expectedIP) {
logger.warn('client_ip_mismatch', {
user_id: user_id,
session_id: session_id,
role: role,
expected_ip: expectedIP,
connection_ip: connectionIP
});
}
logger.info('client_authenticated', {
user_id: client.userID,
session_id: client.sessionID,
role: client.role,
remote_ip: connectionIP
});
// Prepare ICE servers configuration dynamically
const iceServers = [
{ urls: 'stun:stun.l.google.com:19302' },
{ urls: 'stun:stun1.l.google.com:19302' }
];
if (config.turnUrl) {
iceServers.push({
urls: `${config.turnUrl}?transport=udp`,
username: config.turnUsername,
credential: config.turnCredential
});
iceServers.push({
urls: `${config.turnUrl}?transport=tcp`,
username: config.turnUsername,
credential: config.turnCredential
});
}
// Confirm successful authentication to the client with the dynamic ICE servers configuration
client.send(protocol.newAuthenticated(client.userID, iceServers));
// Trigger WebRTC active session joined if both are connected
if (sess.driverConn && sess.passengerConn) {
sess.status = 'active';
const driverConnIP = sess.driverConn.remoteIP();
const passengerConnIP = sess.passengerConn.remoteIP();
logSessionActive(session_id, driverConnIP, passengerConnIP);
logger.info('session_joined_active', {
session_id: session_id,
ride_id: sess.rideID,
driver_conn_ip: driverConnIP,
passenger_conn_ip: passengerConnIP
});
// Notify both clients that they have successfully joined the active session
const joinedMsg = protocol.newSessionJoined(session_id, sess.rideID);
sess.driverConn.send(joinedMsg);
sess.passengerConn.send(joinedMsg);
// Send participant joined alerts to trigger WebRTC peer-to-peer negotiation
sess.driverConn.send(protocol.newParticipantJoined('passenger'));
sess.passengerConn.send(protocol.newParticipantJoined('driver'));
}
return;
}
// 2. Process authenticated WebRTC signaling payloads
switch (base.type) {
case protocol.TypeOffer:
this.handleOffer(client, base);
break;
case protocol.TypeAnswer:
this.handleAnswer(client, base);
break;
case protocol.TypeICECandidate:
this.handleICECandidate(client, base);
break;
case protocol.TypeHeartbeat:
client.send(protocol.newPong()); // Reply with pong frame
break;
case protocol.TypeEndCall:
this.handleEndCall(client, base);
break;
default:
client.send(protocol.newError('bad_request', 'Unknown message type'));
}
}
/**
* Relays a WebRTC SDP Offer message to the peer connection.
* Also identifies and logs the caller (initiator) of the call in the database on the first offer.
* @param {Client} client The Client instance that sent the offer.
* @param {object} m The offer message object containing the SDP.
*/
handleOffer(client, m) {
const sess = this.store.getSession(client.sessionID);
if (!sess) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found'));
return;
}
// Log initiator of the call in the database (the user who sends the first WebRTC offer)
if (!sess.initiatedBy) {
sess.initiatedBy = client.userID;
logSessionInitiator(client.sessionID, client.userID);
}
const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn;
if (!target) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected'));
return;
}
target.send(protocol.newRelayOffer(m.sdp));
}
/**
* Relays a WebRTC SDP Answer message back to the peer.
* @param {Client} client The Client instance that sent the answer.
* @param {object} m The answer message object containing the SDP.
*/
handleAnswer(client, m) {
const sess = this.store.getSession(client.sessionID);
if (!sess) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found'));
return;
}
const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn;
if (!target) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected'));
return;
}
target.send(protocol.newRelayAnswer(m.sdp));
}
/**
* Relays a WebRTC ICE candidate message to the peer.
* @param {Client} client The Client instance that sent the candidate.
* @param {object} m The ICE candidate message object.
*/
handleICECandidate(client, m) {
const sess = this.store.getSession(client.sessionID);
if (!sess) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found'));
return;
}
const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn;
if (!target) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected'));
return;
}
target.send(protocol.newRelayICE(m.candidate));
}
/**
* Handles user-triggered end_call action.
* @param {Client} client The Client instance that requested to end the call.
* @param {object} m The end_call message object.
*/
handleEndCall(client, m) {
this.userEndCall(client.sessionID);
}
}