import { logger } from '../logger/logger.js'; import * as protocol from '../protocol/messages.js'; import { logSessionActive, logSessionInitiator, logSessionEnded } from '../db/db.js'; import { config } from '../config/config.js'; /** * Hub is the central coordinator for all active call sessions and client sockets. * It manages connection mappings, routing of WebRTC signaling payloads (Offer, Answer, ICE), * enforcing call session timers, and logging call connectivity changes to the database. */ export class Hub { /** * Initializes the Hub instance. * @param {object} store The session memory store that holds active call profiles. */ constructor(store) { this.clients = new Map(); // Map storing authenticated socket wrappers: Key is userID, Value is Client instance this.store = store; // Reference to the active sessions in-memory storage // Background job: Periodically scans the session store every 10 seconds to close expired sessions this.sweepInterval = setInterval(() => this.sweepExpiredSessions(), 10000); if (this.sweepInterval.unref) { this.sweepInterval.unref(); // Prevent this interval from keeping the Node process alive on exit } } /** * Unregisters a disconnected client from the active clients registry map. * If the client had an active session, it triggers call termination. * @param {Client} client The Client connection instance. */ unregister(client) { if (client.userID) { // Check if the registered instance matches the client being removed if (this.clients.get(client.userID) === client) { this.clients.delete(client.userID); logger.info('client_disconnected', { user_id: client.userID, session_id: client.sessionID, remote_ip: client.remoteIP() }); } } // If client was part of an active session, end the session since one participant disconnected if (client.sessionID) { this.endSession(client.sessionID, client.role); } } /** * Ends an active call session because one of the participants disconnected. * Cleans up timeouts, notifies the counterparty, and deletes session records. * @param {string} sessionID The unique identifier of the call session. * @param {string} leftRole The role of the client who disconnected ('driver' or 'passenger'). */ endSession(sessionID, leftRole) { const sess = this.store.getSession(sessionID); if (!sess) return; if (sess.status === 'ended') return; sess.status = 'ended'; // Log the disconnection event and reason to the database logSessionEnded(sessionID, `disconnect_${leftRole}`); // Stop the 60 seconds hard call duration timer if (sess.timer) { sess.timer.stop(); sess.timer = null; } logger.info('session_ended_by_disconnect', { session_id: sessionID, left_role: leftRole }); // Notify the remaining peer that their partner left, and close their connection const peer = leftRole === 'driver' ? sess.passengerConn : sess.driverConn; if (peer) { peer.send(protocol.newParticipantLeft(leftRole)); peer.close(); } // Evict the session from the active store index this.store.destroySession(sessionID); } /** * Forces a session to end due to reaching the maximum call duration (e.g. 60 seconds). * @param {string} sessionID The unique identifier of the call session. * @param {string} reason The termination reason code (usually 'max_duration_reached'). */ forceEndSession(sessionID, reason) { const sess = this.store.getSession(sessionID); if (!sess) return; if (sess.status === 'ended') return; sess.status = 'ended'; // Log the timeout event to the database logSessionEnded(sessionID, reason); if (sess.timer) { sess.timer.stop(); sess.timer = null; } logger.info('call_timeout', { session_id: sessionID, reason: reason }); // Construct and transmit the timeout signal to both participants const timeoutMsg = protocol.newCallTimeout(reason); if (sess.driverConn) { sess.driverConn.send(timeoutMsg); sess.driverConn.close(); } if (sess.passengerConn) { sess.passengerConn.send(timeoutMsg); sess.passengerConn.close(); } this.store.destroySession(sessionID); } /** * Ends an active call session because one of the participants hung up. * @param {string} sessionID The unique identifier of the call session. */ userEndCall(sessionID) { const sess = this.store.getSession(sessionID); if (!sess) return; if (sess.status === 'ended') return; sess.status = 'ended'; // Log the hangup event to the database logSessionEnded(sessionID, 'user_terminated'); if (sess.timer) { sess.timer.stop(); sess.timer = null; } logger.info('call_ended', { session_id: sessionID, reason: 'user_terminated' }); // Notify both participants that the call was hung up const endedMsg = protocol.newCallEnded('user_terminated'); if (sess.driverConn) { sess.driverConn.send(endedMsg); sess.driverConn.close(); } if (sess.passengerConn) { sess.passengerConn.send(endedMsg); sess.passengerConn.close(); } this.store.destroySession(sessionID); } /** * Gracefully shuts down all connected clients (invoked during server shutdown). */ shutdownGracefully() { logger.info('server_shutdown_alert_clients'); for (const client of this.clients.values()) { client.send(protocol.newCallEnded('server_shutdown')); client.close(); } } /** * Sweeps and shuts down any sessions that have exceeded their expiration timestamps. */ sweepExpiredSessions() { const expired = this.store.getExpiredSessions(); for (const sess of expired) { this.forceEndSession(sess.sessionID, 'max_duration_reached'); } } /** * Returns the count of currently connected and authenticated WebSocket client connections. * @returns {number} Active client connections count. */ getConnectedClientsCount() { return this.clients.size; } /** * Processes incoming raw WebSocket message payloads. * Handles client authentication and parses signalling messages. * @param {Client} client The Client socket wrapper that received the message. * @param {Buffer|string} msgBytes The raw message payload bytes. */ handleMessage(client, msgBytes) { let base; try { base = JSON.parse(msgBytes.toString()); } catch (err) { client.send(protocol.newError(protocol.ErrTokenInvalid, 'Invalid JSON format')); return; } // 1. Enforce Authentication (Must be the very first message sent by a client connection) if (!client.userID) { if (base.type !== protocol.TypeAuthenticate) { client.send(protocol.newError(protocol.ErrTokenInvalid, 'Authentication required')); setTimeout(() => client.close(), 200); return; } const { session_id, user_id } = base; if (!session_id || !user_id) { client.send(protocol.newError(protocol.ErrTokenInvalid, 'Missing session_id or user_id')); setTimeout(() => client.close(), 200); return; } // Check if the session exists in the memory store const sess = this.store.getSession(session_id); if (!sess) { client.send(protocol.newError(protocol.ErrSessionNotFound, 'No active session found')); setTimeout(() => client.close(), 200); return; } if (sess.status === 'ended') { client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session already ended')); setTimeout(() => client.close(), 200); return; } // Authorize client: user_id must match the driverID or passengerID pre-created for this session let role = ''; if (user_id === sess.driverID) { role = 'driver'; if (sess.driverConn) { client.send(protocol.newError(protocol.ErrSessionExists, 'Driver already connected')); setTimeout(() => client.close(), 200); return; } } else if (user_id === sess.passengerID) { role = 'passenger'; if (sess.passengerConn) { client.send(protocol.newError(protocol.ErrSessionExists, 'Passenger already connected')); setTimeout(() => client.close(), 200); return; } } else { client.send(protocol.newError(protocol.ErrUnauthorizedUser, `User not authorized for this session. Got: ${user_id}, expected driver: ${sess.driverID} or passenger: ${sess.passengerID}`)); setTimeout(() => client.close(), 200); return; } // Kick out any duplicate connections for this user ID if (this.clients.has(user_id)) { const existing = this.clients.get(user_id); if (existing) { existing.close(); } } // Bind session context to the Client object client.userID = user_id; client.sessionID = session_id; client.role = role; client.rideID = sess.rideID; // Register the client in the hub's active connections map this.clients.set(client.userID, client); // Register connection in the session if (role === 'driver') { sess.driverConn = client; } else { sess.passengerConn = client; } // Log a warning if the connection IP does not match the pre-registered IP const connectionIP = client.remoteIP(); const expectedIP = role === 'driver' ? sess.driverIP : sess.passengerIP; if (expectedIP && expectedIP !== '127.0.0.1' && expectedIP !== 'localhost' && connectionIP !== expectedIP) { logger.warn('client_ip_mismatch', { user_id: user_id, session_id: session_id, role: role, expected_ip: expectedIP, connection_ip: connectionIP }); } logger.info('client_authenticated', { user_id: client.userID, session_id: client.sessionID, role: client.role, remote_ip: connectionIP }); // Prepare ICE servers configuration dynamically const iceServers = [ { urls: 'stun:stun.l.google.com:19302' }, { urls: 'stun:stun1.l.google.com:19302' } ]; if (config.turnUrl) { iceServers.push({ urls: `${config.turnUrl}?transport=udp`, username: config.turnUsername, credential: config.turnCredential }); iceServers.push({ urls: `${config.turnUrl}?transport=tcp`, username: config.turnUsername, credential: config.turnCredential }); } // Confirm successful authentication to the client with the dynamic ICE servers configuration client.send(protocol.newAuthenticated(client.userID, iceServers)); // Trigger WebRTC active session joined if both are connected if (sess.driverConn && sess.passengerConn) { sess.status = 'active'; const driverConnIP = sess.driverConn.remoteIP(); const passengerConnIP = sess.passengerConn.remoteIP(); logSessionActive(session_id, driverConnIP, passengerConnIP); logger.info('session_joined_active', { session_id: session_id, ride_id: sess.rideID, driver_conn_ip: driverConnIP, passenger_conn_ip: passengerConnIP }); // Notify both clients that they have successfully joined the active session const joinedMsg = protocol.newSessionJoined(session_id, sess.rideID); sess.driverConn.send(joinedMsg); sess.passengerConn.send(joinedMsg); // Send participant joined alerts to trigger WebRTC peer-to-peer negotiation sess.driverConn.send(protocol.newParticipantJoined('passenger')); sess.passengerConn.send(protocol.newParticipantJoined('driver')); } return; } // 2. Process authenticated WebRTC signaling payloads switch (base.type) { case protocol.TypeOffer: this.handleOffer(client, base); break; case protocol.TypeAnswer: this.handleAnswer(client, base); break; case protocol.TypeICECandidate: this.handleICECandidate(client, base); break; case protocol.TypeHeartbeat: client.send(protocol.newPong()); // Reply with pong frame break; case protocol.TypeEndCall: this.handleEndCall(client, base); break; default: client.send(protocol.newError('bad_request', 'Unknown message type')); } } /** * Relays a WebRTC SDP Offer message to the peer connection. * Also identifies and logs the caller (initiator) of the call in the database on the first offer. * @param {Client} client The Client instance that sent the offer. * @param {object} m The offer message object containing the SDP. */ handleOffer(client, m) { const sess = this.store.getSession(client.sessionID); if (!sess) { client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found')); return; } // Log initiator of the call in the database (the user who sends the first WebRTC offer) if (!sess.initiatedBy) { sess.initiatedBy = client.userID; logSessionInitiator(client.sessionID, client.userID); } const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn; if (!target) { client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected')); return; } target.send(protocol.newRelayOffer(m.sdp)); } /** * Relays a WebRTC SDP Answer message back to the peer. * @param {Client} client The Client instance that sent the answer. * @param {object} m The answer message object containing the SDP. */ handleAnswer(client, m) { const sess = this.store.getSession(client.sessionID); if (!sess) { client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found')); return; } const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn; if (!target) { client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected')); return; } target.send(protocol.newRelayAnswer(m.sdp)); } /** * Relays a WebRTC ICE candidate message to the peer. * @param {Client} client The Client instance that sent the candidate. * @param {object} m The ICE candidate message object. */ handleICECandidate(client, m) { const sess = this.store.getSession(client.sessionID); if (!sess) { client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found')); return; } const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn; if (!target) { client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected')); return; } target.send(protocol.newRelayICE(m.candidate)); } /** * Handles user-triggered end_call action. * @param {Client} client The Client instance that requested to end the call. * @param {object} m The end_call message object. */ handleEndCall(client, m) { this.userEndCall(client.sessionID); } }