Files
2026-05-29 01:27:20 +03:00

455 lines
15 KiB
JavaScript

import { logger } from '../logger/logger.js';
import * as protocol from '../protocol/messages.js';
import { logSessionActive, logSessionInitiator, logSessionEnded } from '../db/db.js';
import { config } from '../config/config.js';
/**
* Hub is the central coordinator for all active call sessions and client sockets.
* It manages connection mappings, routing of WebRTC signaling payloads (Offer, Answer, ICE),
* enforcing call session timers, and logging call connectivity changes to the database.
*/
export class Hub {
/**
* Initializes the Hub instance.
* @param {object} store The session memory store that holds active call profiles.
*/
constructor(store) {
this.clients = new Map(); // Map storing authenticated socket wrappers: Key is userID, Value is Client instance
this.store = store; // Reference to the active sessions in-memory storage
// Background job: Periodically scans the session store every 10 seconds to close expired sessions
this.sweepInterval = setInterval(() => this.sweepExpiredSessions(), 10000);
if (this.sweepInterval.unref) {
this.sweepInterval.unref(); // Prevent this interval from keeping the Node process alive on exit
}
}
/**
* Unregisters a disconnected client from the active clients registry map.
* If the client had an active session, it triggers call termination.
* @param {Client} client The Client connection instance.
*/
unregister(client) {
if (client.userID) {
// Check if the registered instance matches the client being removed
if (this.clients.get(client.userID) === client) {
this.clients.delete(client.userID);
logger.info('client_disconnected', {
user_id: client.userID,
session_id: client.sessionID,
remote_ip: client.remoteIP()
});
}
}
// If client was part of an active session, end the session since one participant disconnected
if (client.sessionID) {
this.endSession(client.sessionID, client.role);
}
}
/**
* Ends an active call session because one of the participants disconnected.
* Cleans up timeouts, notifies the counterparty, and deletes session records.
* @param {string} sessionID The unique identifier of the call session.
* @param {string} leftRole The role of the client who disconnected ('driver' or 'passenger').
*/
endSession(sessionID, leftRole) {
const sess = this.store.getSession(sessionID);
if (!sess) return;
if (sess.status === 'ended') return;
sess.status = 'ended';
// Log the disconnection event and reason to the database
logSessionEnded(sessionID, `disconnect_${leftRole}`);
// Stop the 60 seconds hard call duration timer
if (sess.timer) {
sess.timer.stop();
sess.timer = null;
}
logger.info('session_ended_by_disconnect', {
session_id: sessionID,
left_role: leftRole
});
// Notify the remaining peer that their partner left, and close their connection
const peer = leftRole === 'driver' ? sess.passengerConn : sess.driverConn;
if (peer) {
peer.send(protocol.newParticipantLeft(leftRole));
peer.close();
}
// Evict the session from the active store index
this.store.destroySession(sessionID);
}
/**
* Forces a session to end due to reaching the maximum call duration (e.g. 60 seconds).
* @param {string} sessionID The unique identifier of the call session.
* @param {string} reason The termination reason code (usually 'max_duration_reached').
*/
forceEndSession(sessionID, reason) {
const sess = this.store.getSession(sessionID);
if (!sess) return;
if (sess.status === 'ended') return;
sess.status = 'ended';
// Log the timeout event to the database
logSessionEnded(sessionID, reason);
if (sess.timer) {
sess.timer.stop();
sess.timer = null;
}
logger.info('call_timeout', {
session_id: sessionID,
reason: reason
});
// Construct and transmit the timeout signal to both participants
const timeoutMsg = protocol.newCallTimeout(reason);
if (sess.driverConn) {
sess.driverConn.send(timeoutMsg);
sess.driverConn.close();
}
if (sess.passengerConn) {
sess.passengerConn.send(timeoutMsg);
sess.passengerConn.close();
}
this.store.destroySession(sessionID);
}
/**
* Ends an active call session because one of the participants hung up.
* @param {string} sessionID The unique identifier of the call session.
*/
userEndCall(sessionID) {
const sess = this.store.getSession(sessionID);
if (!sess) return;
if (sess.status === 'ended') return;
sess.status = 'ended';
// Log the hangup event to the database
logSessionEnded(sessionID, 'user_terminated');
if (sess.timer) {
sess.timer.stop();
sess.timer = null;
}
logger.info('call_ended', {
session_id: sessionID,
reason: 'user_terminated'
});
// Notify both participants that the call was hung up
const endedMsg = protocol.newCallEnded('user_terminated');
if (sess.driverConn) {
sess.driverConn.send(endedMsg);
sess.driverConn.close();
}
if (sess.passengerConn) {
sess.passengerConn.send(endedMsg);
sess.passengerConn.close();
}
this.store.destroySession(sessionID);
}
/**
* Gracefully shuts down all connected clients (invoked during server shutdown).
*/
shutdownGracefully() {
logger.info('server_shutdown_alert_clients');
for (const client of this.clients.values()) {
client.send(protocol.newCallEnded('server_shutdown'));
client.close();
}
}
/**
* Sweeps and shuts down any sessions that have exceeded their expiration timestamps.
*/
sweepExpiredSessions() {
const expired = this.store.getExpiredSessions();
for (const sess of expired) {
this.forceEndSession(sess.sessionID, 'max_duration_reached');
}
}
/**
* Returns the count of currently connected and authenticated WebSocket client connections.
* @returns {number} Active client connections count.
*/
getConnectedClientsCount() {
return this.clients.size;
}
/**
* Processes incoming raw WebSocket message payloads.
* Handles client authentication and parses signalling messages.
* @param {Client} client The Client socket wrapper that received the message.
* @param {Buffer|string} msgBytes The raw message payload bytes.
*/
handleMessage(client, msgBytes) {
let base;
try {
base = JSON.parse(msgBytes.toString());
} catch (err) {
client.send(protocol.newError(protocol.ErrTokenInvalid, 'Invalid JSON format'));
return;
}
// 1. Enforce Authentication (Must be the very first message sent by a client connection)
if (!client.userID) {
if (base.type !== protocol.TypeAuthenticate) {
client.send(protocol.newError(protocol.ErrTokenInvalid, 'Authentication required'));
setTimeout(() => client.close(), 200);
return;
}
const { session_id, user_id } = base;
if (!session_id || !user_id) {
client.send(protocol.newError(protocol.ErrTokenInvalid, 'Missing session_id or user_id'));
setTimeout(() => client.close(), 200);
return;
}
// Check if the session exists in the memory store
const sess = this.store.getSession(session_id);
if (!sess) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'No active session found'));
setTimeout(() => client.close(), 200);
return;
}
if (sess.status === 'ended') {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session already ended'));
setTimeout(() => client.close(), 200);
return;
}
// Authorize client: user_id must match the driverID or passengerID pre-created for this session
let role = '';
if (user_id === sess.driverID) {
role = 'driver';
if (sess.driverConn) {
client.send(protocol.newError(protocol.ErrSessionExists, 'Driver already connected'));
setTimeout(() => client.close(), 200);
return;
}
} else if (user_id === sess.passengerID) {
role = 'passenger';
if (sess.passengerConn) {
client.send(protocol.newError(protocol.ErrSessionExists, 'Passenger already connected'));
setTimeout(() => client.close(), 200);
return;
}
} else {
client.send(protocol.newError(protocol.ErrUnauthorizedUser, `User not authorized for this session. Got: ${user_id}, expected driver: ${sess.driverID} or passenger: ${sess.passengerID}`));
setTimeout(() => client.close(), 200);
return;
}
// Kick out any duplicate connections for this user ID
if (this.clients.has(user_id)) {
const existing = this.clients.get(user_id);
if (existing) {
existing.close();
}
}
// Bind session context to the Client object
client.userID = user_id;
client.sessionID = session_id;
client.role = role;
client.rideID = sess.rideID;
// Register the client in the hub's active connections map
this.clients.set(client.userID, client);
// Register connection in the session
if (role === 'driver') {
sess.driverConn = client;
} else {
sess.passengerConn = client;
}
// Log a warning if the connection IP does not match the pre-registered IP
const connectionIP = client.remoteIP();
const expectedIP = role === 'driver' ? sess.driverIP : sess.passengerIP;
if (expectedIP && expectedIP !== '127.0.0.1' && expectedIP !== 'localhost' && connectionIP !== expectedIP) {
logger.warn('client_ip_mismatch', {
user_id: user_id,
session_id: session_id,
role: role,
expected_ip: expectedIP,
connection_ip: connectionIP
});
}
logger.info('client_authenticated', {
user_id: client.userID,
session_id: client.sessionID,
role: client.role,
remote_ip: connectionIP
});
// Prepare ICE servers configuration dynamically
const iceServers = [
{ urls: 'stun:stun.l.google.com:19302' },
{ urls: 'stun:stun1.l.google.com:19302' }
];
if (config.turnUrl) {
iceServers.push({
urls: `${config.turnUrl}?transport=udp`,
username: config.turnUsername,
credential: config.turnCredential
});
iceServers.push({
urls: `${config.turnUrl}?transport=tcp`,
username: config.turnUsername,
credential: config.turnCredential
});
}
// Confirm successful authentication to the client with the dynamic ICE servers configuration
client.send(protocol.newAuthenticated(client.userID, iceServers));
// Trigger WebRTC active session joined if both are connected
if (sess.driverConn && sess.passengerConn) {
sess.status = 'active';
const driverConnIP = sess.driverConn.remoteIP();
const passengerConnIP = sess.passengerConn.remoteIP();
logSessionActive(session_id, driverConnIP, passengerConnIP);
logger.info('session_joined_active', {
session_id: session_id,
ride_id: sess.rideID,
driver_conn_ip: driverConnIP,
passenger_conn_ip: passengerConnIP
});
// Notify both clients that they have successfully joined the active session
const joinedMsg = protocol.newSessionJoined(session_id, sess.rideID);
sess.driverConn.send(joinedMsg);
sess.passengerConn.send(joinedMsg);
// Send participant joined alerts to trigger WebRTC peer-to-peer negotiation
sess.driverConn.send(protocol.newParticipantJoined('passenger'));
sess.passengerConn.send(protocol.newParticipantJoined('driver'));
}
return;
}
// 2. Process authenticated WebRTC signaling payloads
switch (base.type) {
case protocol.TypeOffer:
this.handleOffer(client, base);
break;
case protocol.TypeAnswer:
this.handleAnswer(client, base);
break;
case protocol.TypeICECandidate:
this.handleICECandidate(client, base);
break;
case protocol.TypeHeartbeat:
client.send(protocol.newPong()); // Reply with pong frame
break;
case protocol.TypeEndCall:
this.handleEndCall(client, base);
break;
default:
client.send(protocol.newError('bad_request', 'Unknown message type'));
}
}
/**
* Relays a WebRTC SDP Offer message to the peer connection.
* Also identifies and logs the caller (initiator) of the call in the database on the first offer.
* @param {Client} client The Client instance that sent the offer.
* @param {object} m The offer message object containing the SDP.
*/
handleOffer(client, m) {
const sess = this.store.getSession(client.sessionID);
if (!sess) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found'));
return;
}
// Log initiator of the call in the database (the user who sends the first WebRTC offer)
if (!sess.initiatedBy) {
sess.initiatedBy = client.userID;
logSessionInitiator(client.sessionID, client.userID);
}
const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn;
if (!target) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected'));
return;
}
target.send(protocol.newRelayOffer(m.sdp));
}
/**
* Relays a WebRTC SDP Answer message back to the peer.
* @param {Client} client The Client instance that sent the answer.
* @param {object} m The answer message object containing the SDP.
*/
handleAnswer(client, m) {
const sess = this.store.getSession(client.sessionID);
if (!sess) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found'));
return;
}
const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn;
if (!target) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected'));
return;
}
target.send(protocol.newRelayAnswer(m.sdp));
}
/**
* Relays a WebRTC ICE candidate message to the peer.
* @param {Client} client The Client instance that sent the candidate.
* @param {object} m The ICE candidate message object.
*/
handleICECandidate(client, m) {
const sess = this.store.getSession(client.sessionID);
if (!sess) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found'));
return;
}
const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn;
if (!target) {
client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected'));
return;
}
target.send(protocol.newRelayICE(m.candidate));
}
/**
* Handles user-triggered end_call action.
* @param {Client} client The Client instance that requested to end the call.
* @param {object} m The end_call message object.
*/
handleEndCall(client, m) {
this.userEndCall(client.sessionID);
}
}