commit 87ec54bbd7d64d4a3d87b59e8b262909be3f5361 Author: Hamza-Ayed Date: Fri May 29 01:06:47 2026 +0300 feat: initial commit at project root diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..8187d57 --- /dev/null +++ b/.env.example @@ -0,0 +1,23 @@ +SERVER_ADDR=127.0.0.1:47880 +API_KEY=replace_with_min_64_char_random_string_here +LOG_LEVEL=info +MAX_MESSAGE_BYTES=4096 +HEARTBEAT_INTERVAL=20s +HEARTBEAT_TIMEOUT=30s +SESSION_DURATION=60s +RATE_LIMIT_PER_MIN=10 +ENVIRONMENT=production + +# Database configuration +DB_HOST=127.0.0.1 +DB_PORT=3306 +DB_DATABASE=callDB +DB_USERNAME=callDbUser +DB_PASSWORD=replace_with_db_password + +# Coturn TURN server settings +TURN_URL=turn:turn.intaleqapp.com:3478 +TURN_USERNAME=intaleq_call_user +TURN_CREDENTIAL=intaleq_call_password_secret + + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..50b3bda --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +# Binaries and build outputs +/voice-call-service +*.exe +*.exe~ +*.dll +*.so +*.dylib +*.test +*.out + +# Environment variables containing secrets +.env +.env.local +.env.production +.env.development + +# Sandbox isolation caches +.go/ +.gocache/ +.tmp/ +.home/ + +# IDE and OS settings +.DS_Store +.vscode/ +.idea/ + +# Node.js dependencies +node_modules/ +npm-debug.log diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..491b6f8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,30 @@ +# Stage 1: Build & install production dependencies +FROM node:20-alpine AS builder + +WORKDIR /app + +# Copy dependency configs +COPY package.json ./ + +# Install only production dependencies +RUN npm install --only=production + +# Stage 2: Final runtime container +FROM node:20-alpine + +WORKDIR /app + +# Copy built node_modules +COPY --from=builder /app/node_modules ./node_modules +COPY package.json ./ +COPY cmd/ ./cmd/ +COPY internal/ ./internal/ + +# Use the built-in non-root user 'node' (UID 1000) +USER node + +# Expose backend service port +EXPOSE 47880 + +# Execute server +CMD ["node", "cmd/server/main.js"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..bbf0bde --- /dev/null +++ b/README.md @@ -0,0 +1,244 @@ +# Intaleq Voice Call Signaling Backend (API Key & Session Pre-creation) + +Production-ready WebRTC signaling server in Node.js for the Intaleq ride-hailing application. +Coordinates secure peer-to-peer audio calls between driver and passenger during active rides only. +Sessions are pre-created by the main application backend over an authenticated HTTP endpoint, allowing mobile clients to connect using temporary session IDs without dealing with JWTs. + +## Prerequisites + +- **Node.js 20+** (for local development) +- **Docker** and **Docker Compose** (for production deployments) +- **Nginx** (acting as reverse proxy for WSS upgrade and SSL termination) +- **Let's Encrypt** (for SSL certificates) + +## Local Development + +1. **Clone and Navigate** + Ensure you are in the `voice-call-service` directory. + +2. **Setup Configuration** + Copy the example environment file and configure variables: + ```bash + cp .env.example .env + ``` + *Note: Ensure `API_KEY` is at least 32 characters long.* + +3. **Install Dependencies** + ```bash + npm install + ``` + +4. **Run Server** + Since Node.js 20.6.0+, you can load `.env` files natively using the `--env-file` flag: + ```bash + node --env-file=.env cmd/server/main.js + ``` + The signaling server will start and bind locally to `127.0.0.1:47880`. + +## Docker Deployment + +Deploying with Docker isolates dependencies and bounds resources to 128MB. + +1. **Deploy Command** + Run the deployment automation script: + ```bash + ./deploy.sh + ``` + This script will verify your configuration, build a secure multi-stage container running under user `node` (UID 1000), deploy it in host networking mode, and verify the service's health. + +2. **Manual Docker Compose Commands** + Alternatively, you can build and run it manually: + ```bash + docker-compose up -d --build + ``` + +## Nginx Configuration + +An Nginx configuration block should be configured on your CloudPanel/hosting server for `calls.intaleqapp.com`. It proxies connections to localhost and processes WebSocket handshakes: + +```nginx +server { + listen 80; + server_name calls.intaleqapp.com; + + location / { + proxy_pass http://127.0.0.1:47880; + + # WebSocket support + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + + # Headers + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Read timeout management + proxy_read_timeout 120s; + proxy_send_timeout 120s; + } +} +``` + +## Environment Variables + +| Variable | Description | Default | +| :--- | :--- | :--- | +| `SERVER_ADDR` | Local binding interface and port | `127.0.0.1:47880` | +| `API_KEY` | Secret key for authenticating HTTP session creation | None (min 32 chars) | +| `LOG_LEVEL` | Level of logging output (`debug`, `info`, `warn`, `error`) | `info` | +| `MAX_MESSAGE_BYTES` | Maximum payload size allowed for WebSocket frames | `4096` | +| `HEARTBEAT_INTERVAL` | Keepalive ping interval | `20s` | +| `HEARTBEAT_TIMEOUT` | Connection death timeout | `30s` | +| `SESSION_DURATION` | Hard limit on call length before auto-termination | `60s` | +| `RATE_LIMIT_PER_MIN` | Maximum connections allowed per IP per minute | `10` | +| `ENVIRONMENT` | Target environment designation | `production` | +| `DB_HOST` | MySQL database host address | `127.0.0.1` | +| `DB_PORT` | MySQL database connection port | `3306` | +| `DB_DATABASE` | MySQL database name | `callDB` | +| `DB_USERNAME` | MySQL database username | None | +| `DB_PASSWORD` | MySQL database password | None | + +## Database Logging + +The signaling server integrates with a MySQL database to log call connectivity metrics. +The system automatically creates a `call_logs` table on startup (defined in `database.sql`). + +### Call Logs Schema + +| Column | Type | Description | +| :--- | :--- | :--- | +| `id` | `INT` | Primary Key, Auto Increment | +| `session_id` | `VARCHAR(36)` | Unique Ephemeral Session ID | +| `ride_id` | `VARCHAR(255)` | Active Ride ID | +| `driver_id` | `VARCHAR(255)` | Registered Driver ID | +| `passenger_id` | `VARCHAR(255)` | Registered Passenger ID | +| `status` | `VARCHAR(50)` | Call state (`created`, `active`, `ended`) | +| `initiated_by` | `VARCHAR(255)` | User ID of the participant who started the connection (sent first offer) | +| `end_reason` | `VARCHAR(255)` | Call termination reason (e.g., `user_terminated`, `max_duration_reached`, `disconnect_driver`, `disconnect_passenger`) | +| `created_at` | `TIMESTAMP` | Time when session was pre-created | +| `connected_at` | `TIMESTAMP` | Time when both parties connected and call started | +| `ended_at` | `TIMESTAMP` | Time when call was terminated | + +## HTTP Session Pre-creation API + +The main application backend (PHP) registers a session before allowing call access. + +### Create Call Session +* **Route**: `POST /sessions` +* **Headers**: + * `X-API-Key: YOUR_API_KEY_HERE` + * `Content-Type: application/json` +* **Request Body**: + ```json + { + "ride_id": "ride_456", + "driver_id": "user_123", + "passenger_id": "user_789" + } + ``` +* **Response Body (200 OK)**: + ```json + { + "session_id": "3b2e7c4f-95a2-4a0b-99f6-fc935d0a4461", + "ride_id": "ride_456", + "expires_in": 60 + } + ``` + +## WebSocket Protocol Reference + +Clients communicate with the backend at `ws://calls.intaleqapp.com/ws`. + +### Client-to-Server Messages + +1. **Authenticate (MUST be sent first)** + ```json + { + "type": "authenticate", + "session_id": "3b2e7c4f-95a2-4a0b-99f6-fc935d0a4461", + "user_id": "user_123" + } + ``` +2. **Offer WebRTC Payload** + ```json + {"type": "offer", "sdp": "v=0\r\n..."} + ``` +3. **Answer WebRTC Payload** + ```json + {"type": "answer", "sdp": "v=0\r\n..."} + ``` +4. **ICE Candidate WebRTC Payload** + ```json + {"type": "ice_candidate", "candidate": {"candidate": "...", "sdpMid": "0", "sdpMLineIndex": 0}} + ``` +5. **Heartbeat** + ```json + {"type": "heartbeat"} + ``` +6. **End Call** + ```json + {"type": "end_call"} + ``` + +### Server-to-Client Messages + +1. **Authentication Success Alert** + ```json + {"type": "authenticated", "user_id": "user_123"} + ``` +2. **Session Joined (Active status)** + ```json + {"type": "session_joined", "session_id": "sess_abc", "ride_id": "ride_456"} + ``` +3. **Peer Joined Notification** + ```json + {"type": "participant_joined", "role": "passenger"} + ``` +4. **Relayed WebRTC Offer** + ```json + {"type": "offer", "sdp": "v=0\r\n..."} + ``` +5. **Relayed WebRTC Answer** + ```json + {"type": "answer", "sdp": "v=0\r\n..."} + ``` +6. **Relayed ICE Candidate** + ```json + {"type": "ice_candidate", "candidate": {"candidate": "...", "sdpMid": "0", "sdpMLineIndex": 0}} + ``` +7. **Peer Disconnected Alert** + ```json + {"type": "participant_left", "role": "driver"} + ``` +8. **Call Duration Timeout Alert** + ```json + {"type": "call_timeout", "reason": "max_duration_reached"} + ``` +9. **Call Terminated Alert** + ```json + {"type": "call_ended", "reason": "user_terminated"} + ``` +10. **Heartbeat Reply (Pong)** + ```json + {"type": "pong"} + ``` +11. **Unauthorized Connection Error** + ```json + {"type": "unauthorized", "reason": "session_not_found"} + ``` +12. **General Error Message** + ```json + {"type": "error", "code": "session_not_found", "message": "No active session for this ride"} + ``` + +## Security Notes + +1. **Access Control** + Only your trusted main backend can create call sessions, verified via the `X-API-Key` header. + Mobile clients only receive the ephemeral `session_id` and can only authenticate under their pre-registered `user_id` context. + +2. **Encryption** + SSL (WSS) is terminated at Nginx. Internal network loops (from Nginx to 47880) run local loopbacks. diff --git a/cmd/server/main.js b/cmd/server/main.js new file mode 100644 index 0000000..f2a3821 --- /dev/null +++ b/cmd/server/main.js @@ -0,0 +1,152 @@ +import http from 'http'; +import process from 'process'; +import { config } from '../../internal/config/config.js'; +import { logger } from '../../internal/logger/logger.js'; +import { RateLimiter } from '../../internal/middleware/ratelimit.js'; +import { Store } from '../../internal/session/store.js'; +import { Hub } from '../../internal/ws/hub.js'; +import { setupWebSocket } from '../../internal/ws/handler.js'; +import { startTimer } from '../../internal/timer/timer.js'; +import { initializeDatabase, logSessionCreated } from '../../internal/db/db.js'; + +// Bootstrapping log +logger.info('server_initiating', { + addr: config.serverAddr, + environment: config.environment +}); + +const store = new Store(); +const hub = new Hub(store); +const limiter = new RateLimiter(config.rateLimitPerMin, 60000); // 1 minute window + +// Configure standard HTTP server to manage REST actions +const server = http.createServer((req, res) => { + const url = new URL(req.url, `http://${req.headers.host || 'localhost'}`); + + // Endpoint: GET /health + if (url.pathname === '/health' && req.method === 'GET') { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + status: 'ok', + active_sessions: store.getActiveCount(), + connected_clients: hub.getConnectedClientsCount() + })); + return; + } + + // Endpoint: POST /sessions + // Pre-creates call session mapping. Authenticated using X-API-Key header. + if (url.pathname === '/sessions' && req.method === 'POST') { + const apiKeyHeader = req.headers['x-api-key']; + if (!apiKeyHeader || apiKeyHeader !== config.apiKey) { + logger.warn('unauthorized_session_creation_attempt', { + remote_ip: req.socket.remoteAddress + }); + res.writeHead(401, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'unauthorized' })); + return; + } + + let body = ''; + req.on('data', chunk => { + body += chunk; + }); + + req.on('end', async () => { + let params; + try { + params = JSON.parse(body); + } catch (err) { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'invalid_json' })); + return; + } + + const { ride_id, driver_id, passenger_id, driver_ip, passenger_ip } = params; + if (!ride_id || !driver_id || !passenger_id) { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'missing_parameters' })); + return; + } + + let sess; + try { + // Enforce hard 60s maximum call duration + sess = store.createSession(ride_id, driver_id, passenger_id, 60000, driver_ip, passenger_ip); + } catch (err) { + logger.warn('session_creation_failed', { ride_id, error: err.message }); + res.writeHead(409, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'session_exists' })); + return; + } + + // Start 60s timeout countdown + sess.timer = startTimer(60000, () => { + hub.forceEndSession(sess.sessionID, 'max_duration_reached'); + }); + + // Log session creation to database + await logSessionCreated(sess.sessionID, sess.rideID, sess.driverID, sess.passengerID, sess.driverIP, sess.passengerIP); + + logger.info('session_created', { + session_id: sess.sessionID, + ride_id: sess.rideID, + driver_id: sess.driverID, + passenger_id: sess.passengerID, + driver_ip: sess.driverIP, + passenger_ip: sess.passengerIP + }); + + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + session_id: sess.sessionID, + ride_id: sess.rideID, + expires_in: 60 + })); + }); + return; + } + + res.writeHead(404, { 'Content-Type': 'text/plain' }); + res.end('Not Found'); +}); + +// Bind WebSocket upgrades interceptor +setupWebSocket(server, hub, limiter); + +// Resolve address binding parts +const [host, portStr] = config.serverAddr.split(':'); +const port = parseInt(portStr, 10); + +// Initialize database first then start server +initializeDatabase().then(() => { + server.listen(port, host, () => { + logger.info('server_running', { addr: config.serverAddr }); + }); +}); + +// Graceful exit handling +function shutdown(signal) { + logger.info('server_stopping', { signal }); + + // Close HTTP server to stop accepting new traffic + server.close((err) => { + if (err) { + logger.error('server_close_error', { error: err.message }); + } + logger.info('server_stopped_cleanly'); + process.exit(0); + }); + + // Warn active sockets and terminate connections + hub.shutdownGracefully(); + + // Force close after 10s maximum timeout + setTimeout(() => { + logger.warn('server_shutdown_timeout_force_exit'); + process.exit(1); + }, 10000).unref(); +} + +process.on('SIGINT', () => shutdown('SIGINT')); +process.on('SIGTERM', () => shutdown('SIGTERM')); diff --git a/database.sql b/database.sql new file mode 100644 index 0000000..9ac8faf --- /dev/null +++ b/database.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS `call_logs` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `session_id` VARCHAR(36) NOT NULL UNIQUE, + `ride_id` VARCHAR(255) NOT NULL, + `driver_id` VARCHAR(255) NOT NULL, + `passenger_id` VARCHAR(255) NOT NULL, + `driver_ip` VARCHAR(45) NULL DEFAULT NULL, + `passenger_ip` VARCHAR(45) NULL DEFAULT NULL, + `driver_conn_ip` VARCHAR(45) NULL DEFAULT NULL, + `passenger_conn_ip` VARCHAR(45) NULL DEFAULT NULL, + `status` VARCHAR(50) NOT NULL DEFAULT 'created', -- 'created', 'active', 'ended' + `initiated_by` VARCHAR(255) NULL DEFAULT NULL, + `end_reason` VARCHAR(255) NULL DEFAULT NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `connected_at` TIMESTAMP NULL DEFAULT NULL, + `ended_at` TIMESTAMP NULL DEFAULT NULL, + INDEX `idx_session_id` (`session_id`), + INDEX `idx_ride_id` (`ride_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..2f8a4b2 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Simple Deployment Script to stage, commit, and push changes to Gitea + +echo "=========================================" +echo " Staging and pushing changes to Gitea..." +echo "=========================================" + +git add . + +# Prompt for a commit message +read -p "Enter commit message: " desc +if [ -z "$desc" ]; then + desc="deploy: update voice call service" +fi + +git commit -m "$desc" +git push origin main + +echo "[+] Done! Now run 'git pull origin main' on your server." diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..24e2bc0 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,15 @@ +version: "3.9" +services: + voice-call: + build: . + container_name: intaleq-voice-call + restart: unless-stopped + network_mode: host + env_file: .env + mem_limit: 128m + memswap_limit: 128m + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" diff --git a/docker/nginx.conf b/docker/nginx.conf new file mode 100644 index 0000000..7218713 --- /dev/null +++ b/docker/nginx.conf @@ -0,0 +1,27 @@ +server { + listen 80; + listen [::]:80; + server_name calls.intaleqapp.com; + + # Redirection to HTTPS should be managed by CloudPanel/Nginx outer server block. + # This block configures proxying WebSocket signaling connections to the Go backend. + + location / { + proxy_pass http://127.0.0.1:47880; + + # WebSockets support + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + + # Forwarded headers + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Connection timeout management + proxy_read_timeout 120s; + proxy_send_timeout 120s; + } +} diff --git a/internal/config/config.js b/internal/config/config.js new file mode 100644 index 0000000..bad89f1 --- /dev/null +++ b/internal/config/config.js @@ -0,0 +1,48 @@ +import process from 'process'; + +const apiKey = process.env.API_KEY || ''; +if (apiKey.length < 32) { + console.error(`FATAL: API_KEY must be at least 32 characters long, current length: ${apiKey.length}`); + process.exit(1); +} + +/** + * Parses duration strings like "20s", "1m" to milliseconds. + * @param {string|undefined} val + * @param {number} defaultMs + * @returns {number} + */ +const parseDuration = (val, defaultMs) => { + if (!val) return defaultMs; + const match = val.match(/^(\d+)(s|m|h|ms)$/); + if (!match) return defaultMs; + const num = parseInt(match[1], 10); + const unit = match[2]; + switch (unit) { + case 'ms': return num; + case 's': return num * 1000; + case 'm': return num * 60 * 1000; + case 'h': return num * 60 * 60 * 1000; + default: return defaultMs; + } +}; + +export const config = { + serverAddr: process.env.SERVER_ADDR || '127.0.0.1:47880', + apiKey: apiKey, + logLevel: process.env.LOG_LEVEL || 'info', + maxMessageBytes: parseInt(process.env.MAX_MESSAGE_BYTES || '4096', 10), + heartbeatInterval: parseDuration(process.env.HEARTBEAT_INTERVAL, 20000), // 20s + heartbeatTimeout: parseDuration(process.env.HEARTBEAT_TIMEOUT, 30000), // 30s + sessionDuration: parseDuration(process.env.SESSION_DURATION, 60000), // 60s + rateLimitPerMin: parseInt(process.env.RATE_LIMIT_PER_MIN || '10', 10), + environment: process.env.ENVIRONMENT || 'production', + dbHost: process.env.DB_HOST || '127.0.0.1', + dbPort: parseInt(process.env.DB_PORT || '3306', 10), + dbDatabase: process.env.DB_DATABASE || 'callDB', + dbUsername: process.env.DB_USERNAME || '', + dbPassword: process.env.DB_PASSWORD || '', + turnUrl: process.env.TURN_URL || '', + turnUsername: process.env.TURN_USERNAME || '', + turnCredential: process.env.TURN_CREDENTIAL || '', +}; diff --git a/internal/db/db.js b/internal/db/db.js new file mode 100644 index 0000000..ee0b6b3 --- /dev/null +++ b/internal/db/db.js @@ -0,0 +1,119 @@ +import mysql from 'mysql2/promise'; +import { config } from '../config/config.js'; +import { logger } from '../logger/logger.js'; + +let pool = null; + +/** + * Initializes the MySQL database pool and ensures schema exists. + */ +export async function initializeDatabase() { + if (!config.dbUsername) { + logger.warn('db_disabled', { reason: 'No DB_USERNAME configured, database logging is disabled' }); + return; + } + + try { + pool = mysql.createPool({ + host: config.dbHost, + port: config.dbPort, + database: config.dbDatabase, + user: config.dbUsername, + password: config.dbPassword, + waitForConnections: true, + connectionLimit: 10, + queueLimit: 0 + }); + + // Test connection and auto-create the table if not exists + const conn = await pool.getConnection(); + await conn.query(` + CREATE TABLE IF NOT EXISTS \`call_logs\` ( + \`id\` INT AUTO_INCREMENT PRIMARY KEY, + \`session_id\` VARCHAR(36) NOT NULL UNIQUE, + \`ride_id\` VARCHAR(255) NOT NULL, + \`driver_id\` VARCHAR(255) NOT NULL, + \`passenger_id\` VARCHAR(255) NOT NULL, + \`driver_ip\` VARCHAR(45) NULL DEFAULT NULL, + \`passenger_ip\` VARCHAR(45) NULL DEFAULT NULL, + \`driver_conn_ip\` VARCHAR(45) NULL DEFAULT NULL, + \`passenger_conn_ip\` VARCHAR(45) NULL DEFAULT NULL, + \`status\` VARCHAR(50) NOT NULL DEFAULT 'created', + \`initiated_by\` VARCHAR(255) NULL DEFAULT NULL, + \`end_reason\` VARCHAR(255) NULL DEFAULT NULL, + \`created_at\` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + \`connected_at\` TIMESTAMP NULL DEFAULT NULL, + \`ended_at\` TIMESTAMP NULL DEFAULT NULL, + INDEX \`idx_session_id\` (\`session_id\`), + INDEX \`idx_ride_id\` (\`ride_id\`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + `); + conn.release(); + logger.info('db_initialized', { host: config.dbHost, database: config.dbDatabase }); + } catch (err) { + logger.error('db_initialization_failed', { error: err.message }); + // Soft fallback: log the failure but do not crash the signaling server + pool = null; + } +} + +/** + * Logs session creation to the database with pre-registered client IPs. + */ +export async function logSessionCreated(sessionID, rideID, driverID, passengerID, driverIP = null, passengerIP = null) { + if (!pool) return; + try { + await pool.query( + `INSERT INTO call_logs (session_id, ride_id, driver_id, passenger_id, driver_ip, passenger_ip, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, 'created', NOW())`, + [sessionID, rideID, driverID, passengerID, driverIP, passengerIP] + ); + } catch (err) { + logger.error('db_log_session_created_failed', { session_id: sessionID, error: err.message }); + } +} + +/** + * Logs session status transition to active and records the connection IPs. + */ +export async function logSessionActive(sessionID, driverConnIP = null, passengerConnIP = null) { + if (!pool) return; + try { + await pool.query( + `UPDATE call_logs SET status = 'active', connected_at = NOW(), driver_conn_ip = ?, passenger_conn_ip = ? WHERE session_id = ?`, + [driverConnIP, passengerConnIP, sessionID] + ); + } catch (err) { + logger.error('db_log_session_active_failed', { session_id: sessionID, error: err.message }); + } +} + +/** + * Logs the initiator (caller) of the call when the first SDP offer is received. + */ +export async function logSessionInitiator(sessionID, initiatorID) { + if (!pool) return; + try { + await pool.query( + `UPDATE call_logs SET initiated_by = ? WHERE session_id = ? AND initiated_by IS NULL`, + [initiatorID, sessionID] + ); + } catch (err) { + logger.error('db_log_session_initiator_failed', { session_id: sessionID, initiator_id: initiatorID, error: err.message }); + } +} + +/** + * Logs call termination reason and end time. + */ +export async function logSessionEnded(sessionID, endReason) { + if (!pool) return; + try { + await pool.query( + `UPDATE call_logs SET status = 'ended', ended_at = NOW(), end_reason = ? WHERE session_id = ?`, + [endReason, sessionID] + ); + } catch (err) { + logger.error('db_log_session_ended_failed', { session_id: sessionID, error: err.message }); + } +} diff --git a/internal/logger/logger.js b/internal/logger/logger.js new file mode 100644 index 0000000..afb231e --- /dev/null +++ b/internal/logger/logger.js @@ -0,0 +1,52 @@ +import process from 'process'; + +const LOG_LEVELS = { + debug: 0, + info: 1, + warn: 2, + error: 3 +}; + +const currentLevelStr = (process.env.LOG_LEVEL || 'info').toLowerCase(); +const currentLevel = LOG_LEVELS[currentLevelStr] !== undefined ? LOG_LEVELS[currentLevelStr] : 1; + +const ANSI_COLORS = { + DEBUG: '\x1b[36mDEBUG\x1b[0m', // Cyan + INFO: '\x1b[32mINFO\x1b[0m', // Green + WARN: '\x1b[33mWARN\x1b[0m', // Yellow + ERROR: '\x1b[31mERROR\x1b[0m' // Red +}; + +/** + * Helper to print log structured JSON objects to stdout. + * @param {string} levelName + * @param {string} eventName + * @param {object} [attributes] + */ +function writeLog(levelName, eventName, attributes = {}) { + const levelNum = LOG_LEVELS[levelName]; + if (levelNum < currentLevel) { + return; + } + + const levelUpper = levelName.toUpperCase(); + const logEntry = { + timestamp: new Date().toISOString(), + level: `__COLOR_LEVEL_${levelUpper}__`, + event: eventName, + ...attributes + }; + + let jsonStr = JSON.stringify(logEntry); + const coloredLevel = ANSI_COLORS[levelUpper] || levelUpper; + jsonStr = jsonStr.replace(`"__COLOR_LEVEL_${levelUpper}__"`, `"${coloredLevel}"`); + + process.stdout.write(jsonStr + '\n'); +} + +export const logger = { + debug: (event, attrs) => writeLog('debug', event, attrs), + info: (event, attrs) => writeLog('info', event, attrs), + warn: (event, attrs) => writeLog('warn', event, attrs), + error: (event, attrs) => writeLog('error', event, attrs) +}; diff --git a/internal/middleware/ratelimit.js b/internal/middleware/ratelimit.js new file mode 100644 index 0000000..6276329 --- /dev/null +++ b/internal/middleware/ratelimit.js @@ -0,0 +1,59 @@ +/** + * RateLimiter limits connection attempts using a sliding window. + */ +export class RateLimiter { + /** + * @param {number} limit Maximum connections allowed + * @param {number} windowMs Sliding window length in milliseconds + */ + constructor(limit, windowMs) { + this.limit = limit; + this.windowMs = windowMs; + this.attempts = new Map(); + + // Start background cleanup worker every 5 minutes + this.cleanupInterval = setInterval(() => this.cleanup(), 5 * 60 * 1000); + if (this.cleanupInterval.unref) { + this.cleanupInterval.unref(); + } + } + + /** + * Checks if an attempt is allowed under rate limits for the given IP. + * @param {string} ip + * @returns {boolean} True if connection attempt is within rate limits + */ + allow(ip) { + const now = Date.now(); + const cutoff = now - this.windowMs; + + let timestamps = this.attempts.get(ip) || []; + + // Filter out historical connection attempts outside the current window + timestamps = timestamps.filter(t => t > cutoff); + + if (timestamps.length >= this.limit) { + this.attempts.set(ip, timestamps); + return false; + } + + timestamps.push(now); + this.attempts.set(ip, timestamps); + return true; + } + + /** + * Sweeps inactive keys to reclaim memory. + */ + cleanup() { + const cutoff = Date.now() - this.windowMs; + for (const [ip, timestamps] of this.attempts.entries()) { + const active = timestamps.filter(t => t > cutoff); + if (active.length === 0) { + this.attempts.delete(ip); + } else { + this.attempts.set(ip, active); + } + } + } +} diff --git a/internal/protocol/messages.js b/internal/protocol/messages.js new file mode 100644 index 0000000..dfba403 --- /dev/null +++ b/internal/protocol/messages.js @@ -0,0 +1,48 @@ +// Message type constants +export const TypeAuthenticate = 'authenticate'; +export const TypeCreateSession = 'create_session'; +export const TypeJoinSession = 'join_session'; +export const TypeOffer = 'offer'; +export const TypeAnswer = 'answer'; +export const TypeICECandidate = 'ice_candidate'; +export const TypeHeartbeat = 'heartbeat'; +export const TypeEndCall = 'end_call'; + +export const TypeAuthenticated = 'authenticated'; +export const TypeSessionCreated = 'session_created'; +export const TypeSessionJoined = 'session_joined'; +export const TypeParticipantJoined = 'participant_joined'; +export const TypeParticipantLeft = 'participant_left'; +export const TypeCallTimeout = 'call_timeout'; +export const TypeCallEnded = 'call_ended'; +export const TypePong = 'pong'; +export const TypeUnauthorized = 'unauthorized'; +export const TypeError = 'error'; + +// Error codes +export const ErrRideNotActive = 'ride_not_active'; +export const ErrRideAlreadyStarted = 'ride_already_started'; +export const ErrRideCancelled = 'ride_cancelled'; +export const ErrSessionExists = 'session_exists'; +export const ErrSessionNotFound = 'session_not_found'; +export const ErrSessionFull = 'session_full'; +export const ErrUnauthorizedUser = 'unauthorized_user'; +export const ErrTokenExpired = 'token_expired'; +export const ErrTokenInvalid = 'token_invalid'; +export const ErrPayloadTooLarge = 'payload_too_large'; +export const ErrRateLimited = 'rate_limited'; + +// Helper constructors for encoding server messages +export const newAuthenticated = (userID, iceServers) => JSON.stringify({ type: TypeAuthenticated, user_id: userID, ice_servers: iceServers }); +export const newSessionCreated = (sessionID, rideID, expiresIn) => JSON.stringify({ type: TypeSessionCreated, session_id: sessionID, ride_id: rideID, expires_in: expiresIn }); +export const newSessionJoined = (sessionID, rideID) => JSON.stringify({ type: TypeSessionJoined, session_id: sessionID, ride_id: rideID }); +export const newParticipantJoined = (role) => JSON.stringify({ type: TypeParticipantJoined, role }); +export const newRelayOffer = (sdp) => JSON.stringify({ type: TypeOffer, sdp }); +export const newRelayAnswer = (sdp) => JSON.stringify({ type: TypeAnswer, sdp }); +export const newRelayICE = (candidate) => JSON.stringify({ type: TypeICECandidate, candidate }); +export const newParticipantLeft = (role) => JSON.stringify({ type: TypeParticipantLeft, role }); +export const newCallTimeout = (reason) => JSON.stringify({ type: TypeCallTimeout, reason }); +export const newCallEnded = (reason) => JSON.stringify({ type: TypeCallEnded, reason }); +export const newPong = () => JSON.stringify({ type: TypePong }); +export const newUnauthorized = (reason) => JSON.stringify({ type: TypeUnauthorized, reason }); +export const newError = (code, message) => JSON.stringify({ type: TypeError, code, message }); diff --git a/internal/session/session.js b/internal/session/session.js new file mode 100644 index 0000000..b851f8d --- /dev/null +++ b/internal/session/session.js @@ -0,0 +1,37 @@ +import crypto from 'crypto'; + +/** + * Generates a cryptographically secure UUID version 4. + * @returns {string} + */ +export function generateUUIDv4() { + return crypto.randomUUID(); +} + +/** + * Session models an active call coordination session. + */ +export class Session { + /** + * @param {string} rideID + * @param {string} driverID + * @param {string} passengerID + * @param {number} durationMs + * @param {string} [driverIP] + * @param {string} [passengerIP] + */ + constructor(rideID, driverID, passengerID, durationMs, driverIP = null, passengerIP = null) { + this.sessionID = generateUUIDv4(); + this.rideID = rideID; + this.driverID = driverID; + this.passengerID = passengerID; + this.driverIP = driverIP; + this.passengerIP = passengerIP; + this.createdAt = new Date(); + this.expiresAt = new Date(this.createdAt.getTime() + durationMs); + this.status = 'waiting'; // "waiting" | "active" | "ended" + this.driverConn = null; + this.passengerConn = null; + this.timer = null; + } +} diff --git a/internal/session/store.js b/internal/session/store.js new file mode 100644 index 0000000..17a6baa --- /dev/null +++ b/internal/session/store.js @@ -0,0 +1,88 @@ +import { Session } from './session.js'; + +/** + * Store handles active in-memory sessions indexed by session ID. + * It also maintains a ride ID mapping to prevent duplicate active sessions per ride. + */ +export class Store { + constructor() { + this.sessions = new Map(); // key: sessionID, value: Session + this.rideToSession = new Map(); // key: rideID, value: Session + } + + /** + * Retrieves a session by its session ID. + * @param {string} sessionID + * @returns {Session|undefined} + */ + getSession(sessionID) { + return this.sessions.get(sessionID); + } + + /** + * Retrieves an active session by its ride ID. + * @param {string} rideID + * @returns {Session|undefined} + */ + getSessionByRide(rideID) { + return this.rideToSession.get(rideID); + } + + /** + * Registers a new call session, checking for active duplicates. + * @param {string} rideID + * @param {string} driverID + * @param {string} passengerID + * @param {number} durationMs + * @param {string} [driverIP] + * @param {string} [passengerIP] + * @returns {Session} + * @throws {Error} Throws if an active session already exists for the ride + */ + createSession(rideID, driverID, passengerID, durationMs, driverIP = null, passengerIP = null) { + const existing = this.rideToSession.get(rideID); + if (existing && existing.status !== 'ended') { + throw new Error('session_exists'); + } + + const sess = new Session(rideID, driverID, passengerID, durationMs, driverIP, passengerIP); + this.sessions.set(sess.sessionID, sess); + this.rideToSession.set(rideID, sess); + return sess; + } + + /** + * Removes a session from registry mappings. + * @param {string} sessionID + */ + destroySession(sessionID) { + const sess = this.sessions.get(sessionID); + if (sess) { + this.sessions.delete(sessionID); + this.rideToSession.delete(sess.rideID); + } + } + + /** + * Finds expired active sessions. + * @returns {Session[]} + */ + getExpiredSessions() { + const expired = []; + const now = Date.now(); + for (const sess of this.sessions.values()) { + if (sess.status !== 'ended' && now > sess.expiresAt.getTime()) { + expired.push(sess); + } + } + return expired; + } + + /** + * Returns active sessions count. + * @returns {number} + */ + getActiveCount() { + return this.sessions.size; + } +} diff --git a/internal/timer/timer.js b/internal/timer/timer.js new file mode 100644 index 0000000..1d02e6d --- /dev/null +++ b/internal/timer/timer.js @@ -0,0 +1,32 @@ +/** + * CallTimer wraps Node's setTimeout/clearTimeout to allow session cancellations. + */ +export class CallTimer { + /** + * @param {number} durationMs + * @param {function} onExpiry + */ + constructor(durationMs, onExpiry) { + this.timer = setTimeout(onExpiry, durationMs); + } + + /** + * Cancels the active timer. + */ + stop() { + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + } +} + +/** + * Starts a session timeout timer. + * @param {number} durationMs + * @param {function} onExpiry + * @returns {CallTimer} + */ +export function startTimer(durationMs, onExpiry) { + return new CallTimer(durationMs, onExpiry); +} diff --git a/internal/ws/client.js b/internal/ws/client.js new file mode 100644 index 0000000..c678620 --- /dev/null +++ b/internal/ws/client.js @@ -0,0 +1,151 @@ +import { logger } from '../logger/logger.js'; +import * as protocol from '../protocol/messages.js'; + +/** + * Client represents a single active WebSocket connection wrapper. + * It encapsulates the raw connection, handles ping/pong heartbeat, + * parses messages, and routes events back to the coordinator Hub. + */ +export class Client { + /** + * Initializes a new client instance and sets up connection event listeners and heartbeat. + * @param {object} conn The raw WS socket connection instance from the 'ws' library. + * @param {object} hub The central Hub coordinator to process and route messages. + */ + constructor(conn, hub) { + this.conn = conn; // The underlying raw WebSocket connection + this.hub = hub; // Reference to the main coordinator hub + this.userID = ''; // Authenticated user ID (driver_id or passenger_id) + this.rideID = ''; // Associated ride ID + this.role = ''; // User's role: 'driver' or 'passenger' + this.isAlive = true; // Connection status flag updated by heartbeat pong responses + this.pongTimeout = null; // Timer waiting for a pong reply after sending a ping + + /** + * Listener for incoming WebSocket message payloads. + * Enforces a hard read limit of 4096 bytes (4KB) to prevent Denial of Service (DoS) memory consumption. + */ + this.conn.on('message', (data) => { + if (data.length > 4096) { + logger.warn('payload_too_large', { remote_ip: this.remoteIP() }); + this.send(protocol.newError(protocol.ErrPayloadTooLarge, 'Payload too large')); + this.close(); + return; + } + // Pass the message to the hub for authentication and signaling routing + this.hub.handleMessage(this, data); + }); + + /** + * Listener for the 'pong' response frame. + * Indicates that the client is responsive. Clears the timeout timer. + */ + this.conn.on('pong', () => { + this.isAlive = true; + if (this.pongTimeout) { + clearTimeout(this.pongTimeout); + this.pongTimeout = null; + } + }); + + /** + * Listener for connection close event. + * Triggers cleanup of heartbeat timers and unregisters the client from the hub. + */ + this.conn.on('close', () => { + this.cleanup(); + this.hub.unregister(this); + }); + + /** + * Listener for socket connection errors. + * Logs the error details and safely terminates the connection. + */ + this.conn.on('error', (err) => { + logger.error('websocket_error', { + user_id: this.userID, + ride_id: this.rideID, + error: err.message + }); + this.close(); + }); + + /** + * Heartbeat loop: Pings the client every 20 seconds. + * If the client does not respond with a pong within 10 seconds, the connection is considered dead and closed. + * This prevents resource leaks from half-open TCP connections. + */ + this.pingInterval = setInterval(() => { + this.isAlive = false; + try { + this.conn.ping(); + } catch (err) { + this.close(); + return; + } + + // Schedule a timeout check for 10 seconds. Close connection if isAlive is still false. + this.pongTimeout = setTimeout(() => { + if (!this.isAlive) { + logger.warn('heartbeat_timeout', { + user_id: this.userID, + ride_id: this.rideID, + remote_ip: this.remoteIP() + }); + this.close(); + } + }, 10000); + }, 20000); + } + + /** + * Sends a serialized JSON message down the WebSocket connection wire. + * Checks the connection readyState to ensure it is in the OPEN state before writing. + * @param {string} msg Serialized JSON string message payload. + */ + send(msg) { + if (this.conn.readyState === 1) { // WebSocket.OPEN state + try { + this.conn.send(msg); + } catch (err) { + logger.error('websocket_send_failed', { error: err.message }); + } + } + } + + /** + * Resolves the remote IP address of the connected client. + * Falls back to 'unknown' if the connection socket is already closed or unavailable. + * @returns {string} The remote client's IP address. + */ + remoteIP() { + return this.conn._socket ? this.conn._socket.remoteAddress : 'unknown'; + } + + /** + * Safely terminates the WebSocket connection and releases local memory and timers. + */ + close() { + try { + this.conn.close(); + } catch (err) { + // Ignored: connection might already be closed or unreachable + } + this.cleanup(); + } + + /** + * Cleans up and clears all scheduled interval pings and timeout check timers. + * This is critical to prevent CPU timer memory leaks when client closes. + */ + cleanup() { + if (this.pingInterval) { + clearInterval(this.pingInterval); + this.pingInterval = null; + } + if (this.pongTimeout) { + clearTimeout(this.pongTimeout); + this.pongTimeout = null; + } + } +} diff --git a/internal/ws/handler.js b/internal/ws/handler.js new file mode 100644 index 0000000..cbe68da --- /dev/null +++ b/internal/ws/handler.js @@ -0,0 +1,59 @@ +import { WebSocketServer } from 'ws'; +import { Client } from './client.js'; +import { logger } from '../logger/logger.js'; + +/** + * Attaches the WebSocket server upgrade hook to the given HTTP server instance. + * It intercepts standard HTTP GET requests for WebSocket protocols and handles upgrades. + * @param {object} server The native http.Server instance. + * @param {object} hub The central Hub coordinator instance. + * @param {object} limiter The IP-based RateLimiter instance. + */ +export function setupWebSocket(server, hub, limiter) { + // Initialize ws server in noServer mode since we manually intercept upgrade requests. + const wss = new WebSocketServer({ noServer: true }); + + /** + * Listens for the HTTP 'upgrade' event to switch protocol from HTTP to WebSocket. + */ + server.on('upgrade', (request, socket, head) => { + const ip = getClientIP(request); + + // Enforce IP-based rate limiting to prevent spam connections. + if (!limiter.allow(ip)) { + logger.warn('rate_limit_exceeded', { remote_ip: ip }); + // Send 429 Too Many Requests response to client and close connection. + socket.write('HTTP/1.1 429 Too Many Requests\r\nConnection: close\r\n\r\nrate_limited'); + socket.destroy(); + return; + } + + // Hand over control to 'ws' library to complete the upgrade protocol handshake. + wss.handleUpgrade(request, socket, head, (wsConn) => { + // Set remote address on socket mock for later client IP queries. + // This ensures we can resolve client IP even after connection is upgraded. + wsConn._socket = wsConn._socket || {}; + wsConn._socket.remoteAddress = ip; + + // Instantiate a new Client wrapper. + // Event listeners are automatically attached in the Client constructor. + new Client(wsConn, hub); + }); + }); +} + +/** + * Resolves the client's original IP address, supporting reverse proxy setups + * (like Nginx) that forward the real client IP via X-Forwarded-For header. + * @param {object} request The http.IncomingMessage request object. + * @returns {string} The resolved IP address. + */ +function getClientIP(request) { + const xff = request.headers['x-forwarded-for']; + if (xff) { + // If proxied multiple times, the first element is the client's real IP. + const ips = xff.split(','); + return ips[0].trim(); + } + return request.socket.remoteAddress || 'unknown'; +} diff --git a/internal/ws/hub.js b/internal/ws/hub.js new file mode 100644 index 0000000..950d821 --- /dev/null +++ b/internal/ws/hub.js @@ -0,0 +1,454 @@ +import { logger } from '../logger/logger.js'; +import * as protocol from '../protocol/messages.js'; +import { logSessionActive, logSessionInitiator, logSessionEnded } from '../db/db.js'; +import { config } from '../config/config.js'; + +/** + * Hub is the central coordinator for all active call sessions and client sockets. + * It manages connection mappings, routing of WebRTC signaling payloads (Offer, Answer, ICE), + * enforcing call session timers, and logging call connectivity changes to the database. + */ +export class Hub { + /** + * Initializes the Hub instance. + * @param {object} store The session memory store that holds active call profiles. + */ + constructor(store) { + this.clients = new Map(); // Map storing authenticated socket wrappers: Key is userID, Value is Client instance + this.store = store; // Reference to the active sessions in-memory storage + + // Background job: Periodically scans the session store every 10 seconds to close expired sessions + this.sweepInterval = setInterval(() => this.sweepExpiredSessions(), 10000); + if (this.sweepInterval.unref) { + this.sweepInterval.unref(); // Prevent this interval from keeping the Node process alive on exit + } + } + + /** + * Unregisters a disconnected client from the active clients registry map. + * If the client had an active session, it triggers call termination. + * @param {Client} client The Client connection instance. + */ + unregister(client) { + if (client.userID) { + // Check if the registered instance matches the client being removed + if (this.clients.get(client.userID) === client) { + this.clients.delete(client.userID); + logger.info('client_disconnected', { + user_id: client.userID, + session_id: client.sessionID, + remote_ip: client.remoteIP() + }); + } + } + + // If client was part of an active session, end the session since one participant disconnected + if (client.sessionID) { + this.endSession(client.sessionID, client.role); + } + } + + /** + * Ends an active call session because one of the participants disconnected. + * Cleans up timeouts, notifies the counterparty, and deletes session records. + * @param {string} sessionID The unique identifier of the call session. + * @param {string} leftRole The role of the client who disconnected ('driver' or 'passenger'). + */ + endSession(sessionID, leftRole) { + const sess = this.store.getSession(sessionID); + if (!sess) return; + + if (sess.status === 'ended') return; + sess.status = 'ended'; + + // Log the disconnection event and reason to the database + logSessionEnded(sessionID, `disconnect_${leftRole}`); + + // Stop the 60 seconds hard call duration timer + if (sess.timer) { + sess.timer.stop(); + sess.timer = null; + } + + logger.info('session_ended_by_disconnect', { + session_id: sessionID, + left_role: leftRole + }); + + // Notify the remaining peer that their partner left, and close their connection + const peer = leftRole === 'driver' ? sess.passengerConn : sess.driverConn; + if (peer) { + peer.send(protocol.newParticipantLeft(leftRole)); + peer.close(); + } + + // Evict the session from the active store index + this.store.destroySession(sessionID); + } + + /** + * Forces a session to end due to reaching the maximum call duration (e.g. 60 seconds). + * @param {string} sessionID The unique identifier of the call session. + * @param {string} reason The termination reason code (usually 'max_duration_reached'). + */ + forceEndSession(sessionID, reason) { + const sess = this.store.getSession(sessionID); + if (!sess) return; + + if (sess.status === 'ended') return; + sess.status = 'ended'; + + // Log the timeout event to the database + logSessionEnded(sessionID, reason); + + if (sess.timer) { + sess.timer.stop(); + sess.timer = null; + } + + logger.info('call_timeout', { + session_id: sessionID, + reason: reason + }); + + // Construct and transmit the timeout signal to both participants + const timeoutMsg = protocol.newCallTimeout(reason); + + if (sess.driverConn) { + sess.driverConn.send(timeoutMsg); + sess.driverConn.close(); + } + if (sess.passengerConn) { + sess.passengerConn.send(timeoutMsg); + sess.passengerConn.close(); + } + + this.store.destroySession(sessionID); + } + + /** + * Ends an active call session because one of the participants hung up. + * @param {string} sessionID The unique identifier of the call session. + */ + userEndCall(sessionID) { + const sess = this.store.getSession(sessionID); + if (!sess) return; + + if (sess.status === 'ended') return; + sess.status = 'ended'; + + // Log the hangup event to the database + logSessionEnded(sessionID, 'user_terminated'); + + if (sess.timer) { + sess.timer.stop(); + sess.timer = null; + } + + logger.info('call_ended', { + session_id: sessionID, + reason: 'user_terminated' + }); + + // Notify both participants that the call was hung up + const endedMsg = protocol.newCallEnded('user_terminated'); + + if (sess.driverConn) { + sess.driverConn.send(endedMsg); + sess.driverConn.close(); + } + if (sess.passengerConn) { + sess.passengerConn.send(endedMsg); + sess.passengerConn.close(); + } + + this.store.destroySession(sessionID); + } + + /** + * Gracefully shuts down all connected clients (invoked during server shutdown). + */ + shutdownGracefully() { + logger.info('server_shutdown_alert_clients'); + for (const client of this.clients.values()) { + client.send(protocol.newCallEnded('server_shutdown')); + client.close(); + } + } + + /** + * Sweeps and shuts down any sessions that have exceeded their expiration timestamps. + */ + sweepExpiredSessions() { + const expired = this.store.getExpiredSessions(); + for (const sess of expired) { + this.forceEndSession(sess.sessionID, 'max_duration_reached'); + } + } + + /** + * Returns the count of currently connected and authenticated WebSocket client connections. + * @returns {number} Active client connections count. + */ + getConnectedClientsCount() { + return this.clients.size; + } + + /** + * Processes incoming raw WebSocket message payloads. + * Handles client authentication and parses signalling messages. + * @param {Client} client The Client socket wrapper that received the message. + * @param {Buffer|string} msgBytes The raw message payload bytes. + */ + handleMessage(client, msgBytes) { + let base; + try { + base = JSON.parse(msgBytes.toString()); + } catch (err) { + client.send(protocol.newError(protocol.ErrTokenInvalid, 'Invalid JSON format')); + return; + } + + // 1. Enforce Authentication (Must be the very first message sent by a client connection) + if (!client.userID) { + if (base.type !== protocol.TypeAuthenticate) { + client.send(protocol.newError(protocol.ErrTokenInvalid, 'Authentication required')); + client.close(); + return; + } + + const { session_id, user_id } = base; + if (!session_id || !user_id) { + client.send(protocol.newError(protocol.ErrTokenInvalid, 'Missing session_id or user_id')); + client.close(); + return; + } + + // Check if the session exists in the memory store + const sess = this.store.getSession(session_id); + if (!sess) { + client.send(protocol.newError(protocol.ErrSessionNotFound, 'No active session found')); + client.close(); + return; + } + + if (sess.status === 'ended') { + client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session already ended')); + client.close(); + return; + } + + // Authorize client: user_id must match the driverID or passengerID pre-created for this session + let role = ''; + if (user_id === sess.driverID) { + role = 'driver'; + if (sess.driverConn) { + client.send(protocol.newError(protocol.ErrSessionExists, 'Driver already connected')); + client.close(); + return; + } + } else if (user_id === sess.passengerID) { + role = 'passenger'; + if (sess.passengerConn) { + client.send(protocol.newError(protocol.ErrSessionExists, 'Passenger already connected')); + client.close(); + return; + } + } else { + client.send(protocol.newError(protocol.ErrUnauthorizedUser, 'User not authorized for this session')); + client.close(); + return; + } + + // Kick out any duplicate connections for this user ID + if (this.clients.has(user_id)) { + const existing = this.clients.get(user_id); + if (existing) { + existing.close(); + } + } + + // Bind session context to the Client object + client.userID = user_id; + client.sessionID = session_id; + client.role = role; + client.rideID = sess.rideID; + + // Register the client in the hub's active connections map + this.clients.set(client.userID, client); + + // Register connection in the session + if (role === 'driver') { + sess.driverConn = client; + } else { + sess.passengerConn = client; + } + + // Log a warning if the connection IP does not match the pre-registered IP + const connectionIP = client.remoteIP(); + const expectedIP = role === 'driver' ? sess.driverIP : sess.passengerIP; + if (expectedIP && expectedIP !== '127.0.0.1' && expectedIP !== 'localhost' && connectionIP !== expectedIP) { + logger.warn('client_ip_mismatch', { + user_id: user_id, + session_id: session_id, + role: role, + expected_ip: expectedIP, + connection_ip: connectionIP + }); + } + + logger.info('client_authenticated', { + user_id: client.userID, + session_id: client.sessionID, + role: client.role, + remote_ip: connectionIP + }); + + // Prepare ICE servers configuration dynamically + const iceServers = [ + { urls: 'stun:stun.l.google.com:19302' }, + { urls: 'stun:stun1.l.google.com:19302' } + ]; + if (config.turnUrl) { + iceServers.push({ + urls: `${config.turnUrl}?transport=udp`, + username: config.turnUsername, + credential: config.turnCredential + }); + iceServers.push({ + urls: `${config.turnUrl}?transport=tcp`, + username: config.turnUsername, + credential: config.turnCredential + }); + } + + // Confirm successful authentication to the client with the dynamic ICE servers configuration + client.send(protocol.newAuthenticated(client.userID, iceServers)); + + // Trigger WebRTC active session joined if both are connected + if (sess.driverConn && sess.passengerConn) { + sess.status = 'active'; + const driverConnIP = sess.driverConn.remoteIP(); + const passengerConnIP = sess.passengerConn.remoteIP(); + logSessionActive(session_id, driverConnIP, passengerConnIP); + + logger.info('session_joined_active', { + session_id: session_id, + ride_id: sess.rideID, + driver_conn_ip: driverConnIP, + passenger_conn_ip: passengerConnIP + }); + + // Notify both clients that they have successfully joined the active session + const joinedMsg = protocol.newSessionJoined(session_id, sess.rideID); + sess.driverConn.send(joinedMsg); + sess.passengerConn.send(joinedMsg); + + // Send participant joined alerts to trigger WebRTC peer-to-peer negotiation + sess.driverConn.send(protocol.newParticipantJoined('passenger')); + sess.passengerConn.send(protocol.newParticipantJoined('driver')); + } + return; + } + + // 2. Process authenticated WebRTC signaling payloads + switch (base.type) { + case protocol.TypeOffer: + this.handleOffer(client, base); + break; + case protocol.TypeAnswer: + this.handleAnswer(client, base); + break; + case protocol.TypeICECandidate: + this.handleICECandidate(client, base); + break; + case protocol.TypeHeartbeat: + client.send(protocol.newPong()); // Reply with pong frame + break; + case protocol.TypeEndCall: + this.handleEndCall(client, base); + break; + default: + client.send(protocol.newError('bad_request', 'Unknown message type')); + } + } + + /** + * Relays a WebRTC SDP Offer message to the peer connection. + * Also identifies and logs the caller (initiator) of the call in the database on the first offer. + * @param {Client} client The Client instance that sent the offer. + * @param {object} m The offer message object containing the SDP. + */ + handleOffer(client, m) { + const sess = this.store.getSession(client.sessionID); + if (!sess) { + client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found')); + return; + } + + // Log initiator of the call in the database (the user who sends the first WebRTC offer) + if (!sess.initiatedBy) { + sess.initiatedBy = client.userID; + logSessionInitiator(client.sessionID, client.userID); + } + + const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn; + if (!target) { + client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected')); + return; + } + + target.send(protocol.newRelayOffer(m.sdp)); + } + + /** + * Relays a WebRTC SDP Answer message back to the peer. + * @param {Client} client The Client instance that sent the answer. + * @param {object} m The answer message object containing the SDP. + */ + handleAnswer(client, m) { + const sess = this.store.getSession(client.sessionID); + if (!sess) { + client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found')); + return; + } + + const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn; + if (!target) { + client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected')); + return; + } + + target.send(protocol.newRelayAnswer(m.sdp)); + } + + /** + * Relays a WebRTC ICE candidate message to the peer. + * @param {Client} client The Client instance that sent the candidate. + * @param {object} m The ICE candidate message object. + */ + handleICECandidate(client, m) { + const sess = this.store.getSession(client.sessionID); + if (!sess) { + client.send(protocol.newError(protocol.ErrSessionNotFound, 'Session not found')); + return; + } + + const target = client.role === 'driver' ? sess.passengerConn : sess.driverConn; + if (!target) { + client.send(protocol.newError(protocol.ErrSessionNotFound, 'Peer not connected')); + return; + } + + target.send(protocol.newRelayICE(m.candidate)); + } + + /** + * Handles user-triggered end_call action. + * @param {Client} client The Client instance that requested to end the call. + * @param {object} m The end_call message object. + */ + handleEndCall(client, m) { + this.userEndCall(client.sessionID); + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..f8763d0 --- /dev/null +++ b/package.json @@ -0,0 +1,15 @@ +{ + "name": "intaleq-voice-call", + "version": "1.0.0", + "description": "Voice Call Signaling Backend in Node.js for Intaleq", + "main": "cmd/server/main.js", + "type": "module", + "scripts": { + "start": "node cmd/server/main.js", + "dev": "node cmd/server/main.js" + }, + "dependencies": { + "mysql2": "^3.9.2", + "ws": "^8.16.0" + } +} diff --git a/scripts/healthcheck.sh b/scripts/healthcheck.sh new file mode 100755 index 0000000..6f57292 --- /dev/null +++ b/scripts/healthcheck.sh @@ -0,0 +1,16 @@ +#!/bin/sh +# Exit immediately if any command fails +set -e + +# Query the local health check endpoint +HEALTH_URL="http://127.0.0.1:47880/health" + +response=$(curl -s -f "$HEALTH_URL") + +# Verify the response contains 'status' equal to 'ok' +if echo "$response" | grep -q '"status":"ok"'; then + exit 0 +else + echo "Health check failed: response: $response" + exit 1 +fi