ping();
            return $redis;
        } catch (\Exception $e) {
            logMsg('⚠️ Redis ping failed, reconnecting...');
            $redis = null;
        }
    }

    try {
        $client = new RedisClient([
            'scheme'             => 'tcp',
            'host'               => '127.0.0.1',
            'port'               => 6379,
            'password'           => $redisPass,
            'read_write_timeout' => 0,
        ]);
        $client->connect();
        $redis = $client;
        return $redis;
    } catch (\Exception $e) {
        logMsg('❌ Redis Error: ' . $e->getMessage());
        return null;
    }
}

// ============================================================
// 📐 Haversine distance (meters)
// ============================================================
/**
 * Great-circle distance between two latitude/longitude points.
 *
 * @param float $lat1 Latitude of the first point, in degrees.
 * @param float $lng1 Longitude of the first point, in degrees.
 * @param float $lat2 Latitude of the second point, in degrees.
 * @param float $lng2 Longitude of the second point, in degrees.
 *
 * @return float Distance in meters on a sphere of radius 6,371,000 m.
 */
function haversineDistance(float $lat1, float $lng1, float $lat2, float $lng2): float
{
    $R    = 6371000; // sphere radius, in meters
    $dLat = deg2rad($lat2 - $lat1);
    $dLng = deg2rad($lng2 - $lng1);

    $a = sin($dLat / 2) ** 2
        + cos(deg2rad($lat1)) * cos(deg2rad($lat2)) * sin($dLng / 2) ** 2;

    // Floating-point rounding can push $a marginally outside [0, 1] (e.g. for
    // near-antipodal points), which would make sqrt(1 - $a) return NAN.
    $a = min(1.0, max(0.0, $a));

    return $R * 2 * atan2(sqrt($a), sqrt(1 - $a));
}

// ============================================================
// 📡 Forward driver location → passenger server (ASYNC)
// ============================================================
/**
 * Forwards a driver's position to the passenger socket server, throttled per driver.
 *
 * An update is dropped when the driver moved less than FORWARD_MIN_METERS since
 * the last forwarded position AND less than FORWARD_MAX_SECONDS have elapsed.
 * Otherwise the throttle entry is refreshed and the payload is POSTed to
 * http://127.0.0.1:3031 through AsyncHttp; failures are only logged.
 *
 * @param string $driverId    Driver whose throttle slot is used.
 * @param string $passengerId Target passenger; an empty value disables forwarding.
 * @param array  $payload     Data sent as-is (JSON-encoded). Coordinates are read from
 *                            `lat`/`lng`, falling back to `latitude`/`longitude`.
 * @param string $internalKey Value sent in the `x-internal-key` header.
 * @param array  $fwdThrottle Per-driver throttle map (`ts`, `lat`, `lng`), updated in place.
 */
function forwardLocationToPassengerSocket(
    string $driverId,
    string $passengerId,
    array $payload,
    string $internalKey,
    array &$fwdThrottle
): void {
    if (empty($passengerId)) {
        return;
    }

    // The update_location handler sends `latitude`/`longitude`; reading only
    // `lat`/`lng` made every distance 0 m, so the distance half of the throttle
    // never fired. Both spellings are accepted.
    $lat = (float)($payload['lat'] ?? $payload['latitude'] ?? 0.0);
    $lng = (float)($payload['lng'] ?? $payload['longitude'] ?? 0.0);

    $now  = time();
    $last = $fwdThrottle[$driverId] ?? null;

    if ($last !== null) {
        $timeDiff = $now - $last['ts'];
        $dist     = haversineDistance($last['lat'], $last['lng'], $lat, $lng);

        if ($dist < FORWARD_MIN_METERS && $timeDiff < FORWARD_MAX_SECONDS) {
            return;
        }
    }

    $fwdThrottle[$driverId] = [
        'ts'  => $now,
        'lat' => $lat,
        'lng' => $lng,
    ];

    $http = new AsyncHttp();
    $http->request(
        'http://127.0.0.1:3031',
        [
            'method'  => 'POST',
            'data'    => http_build_query([
                'action'       => 'update_driver_location',
                'passenger_id' => $passengerId,
                'payload'      => json_encode($payload),
            ]),
            'headers' => [
                'Content-Type'   => 'application/x-www-form-urlencoded',
                'x-internal-key' => $internalKey,
                'Connection'     => 'close',
            ],
            'timeout' => 3,
        ],
        null,
        fn(\Exception $e) => logMsg('⚠️ Forward failed: ' . $e->getMessage())
    );
}

// ============================================================
// 📲 FCM (ASYNC)
// ============================================================
/**
 * Sends a push notification request to the FCM relay endpoint via AsyncHttp.
 *
 * Nothing is sent when $token is empty. Failures are only logged.
 *
 * @param string $token    Device token, sent as `target`.
 * @param string $title    Notification title.
 * @param string $body     Notification body.
 * @param array  $rideData Ride data, sent JSON-encoded in the `passengerList` field.
 */
function sendFCM_Async(string $token, string $title, string $body, array $rideData): void
{
    if (empty($token)) {
        return;
    }

    $http = new AsyncHttp();
    $http->request(
        'https://api.intaleq.xyz/intaleq/ride/firebase/send_fcm.php',
        [
            'method'  => 'POST',
            'data'    => json_encode([
                'target'        => $token,
                'title'         => $title,
                'body'          => $body,
                'isTopic'       => false,
                'category'      => 'Order',
                'tone'          => 'start',
                'passengerList' => json_encode($rideData),
            ]),
            'headers' => ['Content-Type' => 'application/json; charset=UTF-8'],
            'timeout' => 5,
        ],
        null,
        fn(\Exception $e) => logMsg('⚠️ FCM failed: ' . $e->getMessage())
    );
}

// ============================================================
// 🧠 Memory State & Event Buffer
// ============================================================
$connectedDrivers      = []; // driverId => ['conn' => socket, 'platform' => ..., 'token' => ...]
$active_orders_drivers = []; // rideId   => list of driver ids the ride was dispatched to
$driverState           = []; // driverId => last values written (lat, lng, speed, heading, status, expire_ts)
$fwdThrottle           = []; // driverId => last position forwarded to the passenger server
$eventBuffer           = []; // 🚀 Level 2: per-driver operations waiting for the next Redis batch

// ============================================================
// 🚀 Socket.IO — port 2020
// ============================================================
$io = new SocketIO(2020);

// ============================================================
// A. Internal HTTP Server & Redis Batch Processor (Worker Start)
// ============================================================
$io->on('workerStart', function () use ($io, $INTERNAL_KEY) {

    // 🚀 1. Redis Pipeline Batch Processor (Level 2)
    // Runs every REDIS_BATCH_INTERVAL (the original note says half a second),
    // collects all buffered operations and sends them to Redis in one pipeline.
    Timer::add(REDIS_BATCH_INTERVAL, function () {
        global $eventBuffer;

        if (empty($eventBuffer)) {
            return;
        }

        $redis = getRedis();
        if (!$redis) {
            return; // keep the buffer; retried on the next tick
        }

        try {
            $pipe = $redis->pipeline();

            foreach ($eventBuffer as $driverId => $ops) {
                $profileKey = "driver:profile:$driverId";

                if (isset($ops['hmset'])) {
                    $pipe->hmset($profileKey, $ops['hmset']);
                }

                if (isset($ops['expire'])) {
                    $pipe->expire($profileKey, $ops['expire']);
                }

                // Remove the driver from the geo set matching the previous status
                // before (possibly) re-adding it below.
                if (isset($ops['status_change'])) {
                    $oldStatus = $ops['status_change']['old'];
                    $newStatus = $ops['status_change']['new'];

                    if ($oldStatus === 'on') {
                        $pipe->zrem('geo:drivers:busy', $driverId);
                    }
                    if ($oldStatus === 'off') {
                        $pipe->zrem('geo:drivers:available', $driverId);
                    }
                    if ($newStatus === 'close' || $newStatus === 'blocked') {
                        $pipe->zrem('geo:drivers:available', $driverId);
                        $pipe->zrem('geo:drivers:busy', $driverId);
                    }
                }

                // 'off' drivers go to the available set, 'on' drivers to the busy
                // set; any other status is not indexed.
                if (isset($ops['geoadd'])) {
                    $st  = $ops['geoadd']['status'];
                    $lng = $ops['geoadd']['lng'];
                    $lat = $ops['geoadd']['lat'];

                    if ($st === 'off') {
                        $pipe->geoadd('geo:drivers:available', $lng, $lat, $driverId);
                    } elseif ($st === 'on') {
                        $pipe->geoadd('geo:drivers:busy', $lng, $lat, $driverId);
                    }
                }
            }

            $pipe->execute();
            $eventBuffer = []; // empty the buffer only after a successful execution
        } catch (\Exception $e) {
            logMsg("⚠️ Redis Pipeline Error: " . $e->getMessage());
        }
    });

    // 🌐 2. Internal HTTP Server — port 2021
    // NOTE(review): this listens on 0.0.0.0 and is protected only by the shared
    // x-internal-key header — confirm the port is firewalled from the outside.
    $innerHttp = new Worker('http://0.0.0.0:2021');

    $innerHttp->onMessage = function ($connection, $request) use ($io, $INTERNAL_KEY) {
        global $active_orders_drivers, $connectedDrivers;

        // Constant-time comparison so the key cannot be probed through timing.
        $headers  = $request->header();
        $provided = (string)($headers['x-internal-key'] ?? '');
        if (!hash_equals((string)$INTERNAL_KEY, $provided)) {
            $connection->send('Unauthorized');
            return;
        }

        $post   = $request->post();
        $action = trim($post['action'] ?? '');

        // The Redis client calls below can throw. Catch here so a Redis hiccup
        // turns into an error reply instead of escaping this callback.
        // NOTE(review): an exception escaping onMessage is presumably fatal for
        // the worker process — confirm against the Workerman version in use.
        try {
            $redis = getRedis();

            // ── 1. Dispatch Order ────────────────────────────────
            if ($action === 'dispatch_order') {
                $rideId  = $post['ride_id'] ?? null;
                $drivers = json_decode($post['drivers_ids'] ?? '[]', true);
                if (!is_array($drivers)) {
                    $drivers = []; // malformed JSON or a scalar: nothing to dispatch
                }

                $payload = $post['payload'] ?? [];
                if (is_array($payload)) {
                    $payload = array_values($payload);
                }

                if ($rideId && !empty($drivers)) {
                    $active_orders_drivers[$rideId] = $drivers;
                    logMsg("🚀 Dispatch Ride #$rideId → " . count($drivers) . ' drivers.');
                }

                foreach ($drivers as $driverId) {
                    // Only int/string ids are valid array keys; skip anything else.
                    if (!is_int($driverId) && !is_string($driverId)) {
                        continue;
                    }
                    if (!isset($connectedDrivers[$driverId])) {
                        continue;
                    }

                    $io->to('driver_' . $driverId)->emit('new_ride_request', $payload);

                    $token = $connectedDrivers[$driverId]['token'] ?? '';
                    if (!empty($token)) {
                        // sendFCM_Async() requires an array; a scalar payload is wrapped.
                        sendFCM_Async($token, 'طلب جديد', 'لديك رحلة جديدة قريبة منك', (array)$payload);
                    }
                }

                $connection->send('Dispatched');

            // ── 2. Market New Ride ───────────────────────────────
            } elseif ($action === 'market_new_ride') {
                $payload = $post['payload'] ?? [];
                $rideId  = $payload['id'] ?? null;
                $lat     = (float)($payload['start_lat'] ?? 0);
                $lng     = (float)($payload['start_lng'] ?? 0);

                if (!$redis || !$rideId || $lat == 0 || $lng == 0) {
                    $connection->send('Error: Redis unavailable or invalid coords');
                    return;
                }

                $redis->geoadd('geo:rides:waiting', $lng, $lat, $rideId);

                $nearbyDrivers = $redis->georadius('geo:drivers:available', $lng, $lat, 50, 'km');
                $count         = 0;

                foreach ($nearbyDrivers as $driverId) {
                    if (isset($connectedDrivers[$driverId])) {
                        $io->to('driver_' . $driverId)->emit('market_new_ride', $payload);
                        $count++;
                    }
                }

                logMsg("📢 Market Ride #$rideId → $count drivers.");
                $connection->send("Broadcasted to $count drivers");

            // ── 3. Get Nearby Ride IDs ───────────────────────────
            } elseif ($action === 'get_nearby_ride_ids') {
                $lat    = (float)($post['lat'] ?? 0);
                $lng    = (float)($post['lng'] ?? 0);
                $radius = (float)($post['radius'] ?? 9);

                if (!$redis) {
                    $connection->send(json_encode([]));
                    return;
                }

                $results = $redis->georadius(
                    'geo:rides:waiting',
                    $lng,
                    $lat,
                    $radius,
                    'km',
                    ['WITHDIST' => true, 'SORT' => 'ASC', 'COUNT' => 40]
                );

                $connection->send(json_encode($results));

            // ── 4. Ride Taken ────────────────────────────────────
            } elseif ($action === 'ride_taken_event') {
                $rideId         = $post['ride_id'] ?? null;
                $winnerDriverId = $post['taken_by_driver_id'] ?? null;

                if (!$rideId) {
                    $connection->send('Error: Missing ride_id');
                    return;
                }

                if ($redis) {
                    $redis->zrem('geo:rides:waiting', $rideId);
                }

                // Broadcast to every connected socket.
                $io->emit('ride_taken', [
                    'ride_id'            => $rideId,
                    'taken_by_driver_id' => $winnerDriverId,
                ]);

                unset($active_orders_drivers[$rideId]);
                logMsg("✅ Ride #$rideId taken by #$winnerDriverId.");
                $connection->send('OK');

            // ── 5. Force Disconnect ──────────────────────────────
            } elseif ($action === 'force_disconnect') {
                $driverId = $post['driver_id'] ?? null;

                if ($driverId && isset($connectedDrivers[$driverId])) {
                    $connectedDrivers[$driverId]['conn']->disconnect();
                    unset($connectedDrivers[$driverId]);

                    if ($redis) {
                        $redis->zrem('geo:drivers:available', $driverId);
                        $redis->zrem('geo:drivers:busy', $driverId);
                    }

                    logMsg("🚫 Driver #$driverId force-disconnected.");
                    $connection->send('Disconnected');
                } else {
                    $connection->send('Driver not connected');
                }

            } else {
                $connection->send('Unknown action');
            }
        } catch (\Exception $e) {
            logMsg("⚠️ Internal HTTP error [$action]: " . $e->getMessage());
            $connection->send('Error: internal failure');
        }
    };

    $innerHttp->listen();
});

// ============================================================
// B. WebSocket events for drivers
// ============================================================
$io->on('connection', function ($socket) use ($INTERNAL_KEY) {
    global $connectedDrivers, $driverState, $fwdThrottle, $eventBuffer;

    $query    = $socket->handshake['query'] ?? [];
    $driverId = $query['driver_id'] ?? null;
    $platform = $query['platform'] ?? 'android';
    $token    = $query['token'] ?? '';

    // Handshake values come from the client: an array-valued driver_id would be
    // an illegal array key, and a non-string token would later violate
    // sendFCM_Async()'s string parameter.
    if (!is_scalar($driverId) || !$driverId) {
        $socket->disconnect();
        return;
    }
    $driverId = (string)$driverId;
    if (!is_string($platform)) {
        $platform = 'android';
    }
    if (!is_string($token)) {
        $token = '';
    }

    $socket->join('driver_' . $driverId);

    $connectedDrivers[$driverId] = [
        'conn'     => $socket,
        'platform' => $platform,
        'token'    => $token,
    ];

    // Sentinel values (-999, '') guarantee that the first update_location is
    // seen as a change for speed, heading and status.
    $initialState = [
        'lat'       => 0.0,
        'lng'       => 0.0,
        'speed'     => -999.0,
        'heading'   => -999.0,
        'status'    => '',
        'expire_ts' => 0,
    ];

    if (!isset($driverState[$driverId])) {
        $driverState[$driverId] = $initialState;
    }

    logMsg("✅ Driver Connected: #$driverId ($platform)");

    $socket->on('ping_alive', function () {
        // Socket.IO handles pong automatically
    });

    $socket->on('update_location', function ($data) use (
        $driverId,
        $INTERNAL_KEY,
        $initialState,
        &$driverState,
        &$fwdThrottle,
        &$eventBuffer
    ) {
        global $connectedDrivers;

        $data = (array) $data;

        $lat         = isset($data['lat']) ? (float)$data['lat'] : null;
        $lng         = isset($data['lng']) ? (float)$data['lng'] : null;
        $heading     = (float)($data['heading'] ?? 0);
        $speed       = (float)($data['speed'] ?? 0);
        $status      = (string)($data['status'] ?? 'off');
        $passengerId = (string)($data['passenger_id'] ?? '');
        $rideId      = $data['ride_id'] ?? null;

        if ($lat === null || $lng === null) {
            return;
        }

        // Drop coordinates Redis GEOADD would reject. A rejected command makes the
        // pipeline throw, the buffer is then kept for retry, and the same bad
        // entry would fail every following batch for all drivers.
        $maxLat = 85.05112878;
        $maxLng = 180.0;
        if (!is_finite($lat) || !is_finite($lng) || abs($lat) > $maxLat || abs($lng) > $maxLng) {
            return;
        }

        // The state can have been removed by a disconnect; re-seed it rather than
        // dereferencing a missing entry.
        $driverState[$driverId] ??= $initialState;
        $state = &$driverState[$driverId];
        $now   = time();

        // 1. Forward to the passenger (ASYNC + throttle)
        if (!empty($passengerId)) {
            forwardLocationToPassengerSocket(
                $driverId,
                $passengerId,
                [
                    'latitude'  => $lat,
                    'longitude' => $lng,
                    'heading'   => $heading,
                    'speed'     => $speed,
                    'ride_id'   => $rideId,
                    'driver_id' => $driverId,
                ],
                $INTERNAL_KEY,
                $fwdThrottle
            );
        }

        // 2. Work out what changed, to avoid putting pressure on Redis.
        // A stored position of exactly (0, 0) means "no position yet".
        $movedMeters = ($state['lat'] == 0.0 && $state['lng'] == 0.0)
            ? 999.0
            : haversineDistance($state['lat'], $state['lng'], $lat, $lng);

        $didMove        = $movedMeters >= MIN_MOVE_METERS;
        $speedMs        = $speed / 3.6;
        $speedChanged   = abs($speedMs - $state['speed']) >= HMSET_SPEED_DELTA;
        $headingChanged = abs($heading - $state['heading']) >= HMSET_HEADING_DELTA;
        $statusChanged  = ($status !== $state['status']);

        $needHmset         = $speedChanged || $headingChanged || $statusChanged;
        $needGeoadd        = $didMove;
        $needExpireRefresh = ($now - $state['expire_ts']) >= EXPIRE_REFRESH_SECONDS;

        if (!$needHmset && (!$needGeoadd && !$statusChanged) && !$needExpireRefresh) {
            return; // nothing important changed: skip entirely (0 Redis operations)
        }

        // 🚀 3. Buffer the event instead of sending it straight to Redis (Level 2).
        // Later updates for the same driver overwrite earlier ones in the buffer.
        if (!isset($eventBuffer[$driverId])) {
            $eventBuffer[$driverId] = [];
        }

        if ($needHmset) {
            $eventBuffer[$driverId]['hmset'] = [
                'id'         => $driverId,
                'heading'    => $heading,
                'speed'      => $speed,
                'status'     => $status,
                'updated_at' => $now,
            ];
            $state['speed']   = $speedMs;
            $state['heading'] = $heading;
        }

        if ($needExpireRefresh || $needHmset) {
            $eventBuffer[$driverId]['expire'] = 900;
            $state['expire_ts'] = $now;
        }

        if ($statusChanged) {
            $eventBuffer[$driverId]['status_change'] = [
                'old' => $state['status'],
                'new' => $status,
            ];
            $state['status'] = $status;

            // Auto disconnect if blocked
            if ($status === 'blocked') {
                if (isset($connectedDrivers[$driverId])) {
                    $connectedDrivers[$driverId]['conn']->disconnect();
                    unset($connectedDrivers[$driverId]);
                }
            }
        }

        if ($needGeoadd || $statusChanged) {
            $eventBuffer[$driverId]['geoadd'] = [
                'status' => $status,
                'lng'    => $lng,
                'lat'    => $lat,
            ];
            if ($needGeoadd) {
                $state['lat'] = $lat;
                $state['lng'] = $lng;
            }
        }
    });

    $socket->on('disconnect', function () use ($driverId, $socket) {
        global $connectedDrivers, $driverState, $fwdThrottle;

        // If the driver already reconnected on another socket, this late
        // disconnect belongs to the old one and must not wipe the live
        // registration or its state.
        $current = $connectedDrivers[$driverId]['conn'] ?? null;
        if ($current !== null && $current !== $socket) {
            logMsg("♻️ Stale socket closed for Driver #$driverId (newer connection kept)");
            return;
        }

        unset($connectedDrivers[$driverId]);
        unset($driverState[$driverId]);
        unset($fwdThrottle[$driverId]);
        logMsg("❌ Driver Disconnected: #$driverId");
    });
});

Worker::runAll();