diff --git a/core/Services/FcmService.php b/core/Services/FcmService.php index 1ea7044..4c32e84 100644 --- a/core/Services/FcmService.php +++ b/core/Services/FcmService.php @@ -57,6 +57,10 @@ class FcmService $payload = [ 'message' => [ 'token' => $token, + 'notification' => [ + 'title' => $title, + 'body' => $body, + ], 'data' => $processedData, 'android' => ['priority' => 'HIGH'], 'apns' => [ diff --git a/driver_socket.php b/driver_socket.php new file mode 100644 index 0000000..9d9becc --- /dev/null +++ b/driver_socket.php @@ -0,0 +1,547 @@ +ping(); + return $redis; + } catch (\Exception $e) { + logMsg('⚠️ Redis ping failed, reconnecting...'); + $redis = null; + } + } + + try { + $client = new RedisClient([ + 'scheme' => 'tcp', + 'host' => '127.0.0.1', + 'port' => 6379, + 'password' => $redisPass, + 'read_write_timeout' => 0, + ]); + $client->connect(); + $redis = $client; + return $redis; + } catch (\Exception $e) { + logMsg('❌ Redis Error: ' . $e->getMessage()); + return null; + } +} + +// ============================================================ +// 📐 Haversine Distance (متر) +// ============================================================ +function haversineDistance(float $lat1, float $lng1, float $lat2, float $lng2): float { + $R = 6371000; + $dLat = deg2rad($lat2 - $lat1); + $dLng = deg2rad($lng2 - $lng1); + $a = sin($dLat / 2) ** 2 + + cos(deg2rad($lat1)) * cos(deg2rad($lat2)) * sin($dLng / 2) ** 2; + return $R * 2 * atan2(sqrt($a), sqrt(1 - $a)); +} + +// ============================================================ +// 📡 Forward موقع السائق → سيرفر الراكب (ASYNC) +// ============================================================ +function forwardLocationToPassengerSocket( + string $driverId, + string $passengerId, + array $payload, + string $internalKey, + array &$fwdThrottle +): void { + if (empty($passengerId)) return; + + $now = time(); + $last = $fwdThrottle[$driverId] ?? null; + + if ($last !== null) { + $timeDiff = $now - $last['ts']; + $dist = haversineDistance( + $last['lat'], $last['lng'], + (float)$payload['lat'], (float)$payload['lng'] + ); + if ($dist < FORWARD_MIN_METERS && $timeDiff < FORWARD_MAX_SECONDS) return; + } + + $fwdThrottle[$driverId] = [ + 'ts' => $now, + 'lat' => (float)$payload['lat'], + 'lng' => (float)$payload['lng'], + ]; + + $http = new AsyncHttp(); + $http->request( + 'http://rides.intaleq.xyz:3031', + [ + 'method' => 'POST', + 'data' => http_build_query([ + 'action' => 'update_driver_location', + 'passenger_id' => $passengerId, + 'payload' => json_encode($payload), + ]), + 'headers' => [ + 'Content-Type' => 'application/x-www-form-urlencoded', + 'x-internal-key' => $internalKey, + 'Connection' => 'close', + ], + 'timeout' => 3, + ], + null, + fn(\Exception $e) => logMsg('⚠️ Forward failed: ' . $e->getMessage()) + ); +} + +// ============================================================ +// 📲 FCM (ASYNC) +// ============================================================ +function sendFCM_Async(string $token, string $title, string $body, array $rideData): void { + if (empty($token)) return; + + $http = new AsyncHttp(); + $http->request( + 'https://api.intaleq.xyz/intaleq/ride/firebase/send_fcm.php', + [ + 'method' => 'POST', + 'data' => json_encode([ + 'target' => $token, + 'title' => $title, + 'body' => $body, + 'isTopic' => false, + 'category' => 'Order', + 'tone' => 'start', + 'passengerList' => json_encode($rideData), + ]), + 'headers' => ['Content-Type' => 'application/json; charset=UTF-8'], + 'timeout' => 5, + ], + null, + fn(\Exception $e) => logMsg('⚠️ FCM failed: ' . $e->getMessage()) + ); +} + +// ============================================================ +// 🧠 Memory State & Event Buffer +// ============================================================ +$connectedDrivers = []; +$active_orders_drivers = []; +$driverState = []; +$fwdThrottle = []; +$eventBuffer = []; // 🚀 Level 2: مصفوفة تجميع الأحداث لـ Redis + +// ============================================================ +// 🚀 Socket.IO — بورت 2020 +// ============================================================ +$io = new SocketIO(2020); + +// ============================================================ +// A. Internal HTTP Server & Redis Batch Processor (Worker Start) +// ============================================================ +$io->on('workerStart', function () use ($io, $INTERNAL_KEY) { + + // 🚀 1. Redis Pipeline Batch Processor (Level 2) + // يعمل كل نصف ثانية، يجمع كل الأوامر ويرسلها لـ Redis دفعة واحدة + Timer::add(REDIS_BATCH_INTERVAL, function() { + global $eventBuffer; + if (empty($eventBuffer)) return; + + $redis = getRedis(); + if (!$redis) return; + + try { + $pipe = $redis->pipeline(); + $processedCount = 0; + + foreach ($eventBuffer as $driverId => $ops) { + $profileKey = "driver:profile:$driverId"; + $processedCount++; + + if (isset($ops['hmset'])) { + $pipe->hmset($profileKey, $ops['hmset']); + } + if (isset($ops['expire'])) { + $pipe->expire($profileKey, $ops['expire']); + } + if (isset($ops['status_change'])) { + $oldStatus = $ops['status_change']['old']; + $newStatus = $ops['status_change']['new']; + + if ($oldStatus === 'on') $pipe->zrem('geo:drivers:busy', $driverId); + if ($oldStatus === 'off') $pipe->zrem('geo:drivers:available', $driverId); + + if ($newStatus === 'close' || $newStatus === 'blocked') { + $pipe->zrem('geo:drivers:available', $driverId); + $pipe->zrem('geo:drivers:busy', $driverId); + } + } + if (isset($ops['geoadd'])) { + $st = $ops['geoadd']['status']; + $lng = $ops['geoadd']['lng']; + $lat = $ops['geoadd']['lat']; + + if ($st === 'off') { + $pipe->geoadd('geo:drivers:available', $lng, $lat, $driverId); + } elseif ($st === 'on') { + $pipe->geoadd('geo:drivers:busy', $lng, $lat, $driverId); + } + } + } + + $pipe->execute(); + $eventBuffer = []; // إفراغ المصفوفة بعد التنفيذ الناجح + // logMsg("⚡ Processed Redis Batch: $processedCount drivers updated in 1 network call."); + + } catch (\Exception $e) { + logMsg("⚠️ Redis Pipeline Error: " . $e->getMessage()); + } + }); + + // 🌐 2. Internal HTTP Server — بورت 2021 + $innerHttp = new Worker('http://0.0.0.0:2021'); + + $innerHttp->onMessage = function ($connection, $request) use ($io, $INTERNAL_KEY) { + global $active_orders_drivers, $connectedDrivers; + + $headers = $request->header(); + if (($headers['x-internal-key'] ?? '') !== $INTERNAL_KEY) { + $connection->send('Unauthorized'); + return; + } + + $post = $request->post(); + $action = trim($post['action'] ?? ''); + $redis = getRedis(); + + // ── 1. Dispatch Order ──────────────────────────────── + if ($action === 'dispatch_order') { + $rideId = $post['ride_id'] ?? null; + $drivers = json_decode($post['drivers_ids'] ?? '[]', true); + $payload = $post['payload'] ?? []; + if (is_array($payload)) $payload = array_values($payload); + + if ($rideId && !empty($drivers)) { + $active_orders_drivers[$rideId] = $drivers; + logMsg("🚀 Dispatch Ride #$rideId → " . count($drivers) . ' drivers.'); + } + + foreach ($drivers as $driverId) { + if (!isset($connectedDrivers[$driverId])) continue; + $io->to('driver_' . $driverId)->emit('new_ride_request', $payload); + + $platform = $connectedDrivers[$driverId]['platform'] ?? 'android'; + $token = $connectedDrivers[$driverId]['token'] ?? ''; + if ($platform === 'ios' && !empty($token)) { + sendFCM_Async($token, 'طلب جديد', 'لديك رحلة جديدة قريبة منك', $payload); + } + } + $connection->send('Dispatched'); + + // ── 2. Market New Ride ──────────────────────────────── + } elseif ($action === 'market_new_ride') { + $payload = $post['payload'] ?? []; + $rideId = $payload['id'] ?? null; + $lat = (float)($payload['start_lat'] ?? 0); + $lng = (float)($payload['start_lng'] ?? 0); + + if (!$redis || !$rideId || $lat == 0 || $lng == 0) { + $connection->send('Error: Redis unavailable or invalid coords'); + return; + } + + $redis->geoadd('geo:rides:waiting', $lng, $lat, $rideId); + $nearbyDrivers = $redis->georadius('geo:drivers:available', $lng, $lat, 50, 'km'); + + $count = 0; + foreach ($nearbyDrivers as $driverId) { + if (isset($connectedDrivers[$driverId])) { + $io->to('driver_' . $driverId)->emit('market_new_ride', $payload); + $count++; + } + } + logMsg("📢 Market Ride #$rideId → $count drivers."); + $connection->send("Broadcasted to $count drivers"); + + // ── 3. Get Nearby Ride IDs ──────────────────────────── + } elseif ($action === 'get_nearby_ride_ids') { + $lat = (float)($post['lat'] ?? 0); + $lng = (float)($post['lng'] ?? 0); + $radius = (float)($post['radius'] ?? 9); + + if (!$redis) { $connection->send(json_encode([])); return; } + + $results = $redis->georadius( + 'geo:rides:waiting', $lng, $lat, $radius, 'km', + ['WITHDIST' => true, 'SORT' => 'ASC', 'COUNT' => 40] + ); + $connection->send(json_encode($results)); + + // ── 4. Ride Taken ───────────────────────────────────── + } elseif ($action === 'ride_taken_event') { + $rideId = $post['ride_id'] ?? null; + $winnerDriverId = $post['taken_by_driver_id'] ?? null; + + if (!$rideId) { $connection->send('Error: Missing ride_id'); return; } + + if ($redis) $redis->zrem('geo:rides:waiting', $rideId); + + $io->emit('ride_taken', [ + 'ride_id' => $rideId, + 'taken_by_driver_id' => $winnerDriverId, + ]); + + unset($active_orders_drivers[$rideId]); + logMsg("✅ Ride #$rideId taken by #$winnerDriverId."); + $connection->send('OK'); + + // ── 5. Force Disconnect ─────────────────────────────── + } elseif ($action === 'force_disconnect') { + $driverId = $post['driver_id'] ?? null; + + if ($driverId && isset($connectedDrivers[$driverId])) { + $connectedDrivers[$driverId]['conn']->disconnect(); + unset($connectedDrivers[$driverId]); + + if ($redis) { + $redis->zrem('geo:drivers:available', $driverId); + $redis->zrem('geo:drivers:busy', $driverId); + } + logMsg("🚫 Driver #$driverId force-disconnected."); + $connection->send('Disconnected'); + } else { + $connection->send('Driver not connected'); + } + + } else { + $connection->send('Unknown action'); + } + }; + + $innerHttp->listen(); +}); + +// ============================================================ +// B. WebSocket Events للسائقين +// ============================================================ +$io->on('connection', function ($socket) use ($INTERNAL_KEY) { + global $connectedDrivers, $driverState, $fwdThrottle, $eventBuffer; + + $query = $socket->handshake['query'] ?? []; + $driverId = $query['driver_id'] ?? null; + $platform = $query['platform'] ?? 'android'; + $token = $query['token'] ?? ''; + + if (!$driverId) { + $socket->disconnect(); + return; + } + + $socket->join('driver_' . $driverId); + $connectedDrivers[$driverId] = [ + 'conn' => $socket, + 'platform' => $platform, + 'token' => $token, + ]; + + if (!isset($driverState[$driverId])) { + $driverState[$driverId] = [ + 'lat' => 0.0, + 'lng' => 0.0, + 'speed' => -999.0, + 'heading' => -999.0, + 'status' => '', + 'expire_ts' => 0, + ]; + } + + logMsg("✅ Driver Connected: #$driverId ($platform)"); + + $socket->on('ping_alive', function () { + // Socket.IO handles pong automatically + }); + + $socket->on('update_location', function ($data) + use ($driverId, $INTERNAL_KEY, &$driverState, &$fwdThrottle, &$eventBuffer) + { + global $connectedDrivers; + + $data = (array) $data; + + $lat = isset($data['lat']) ? (float)$data['lat'] : null; + $lng = isset($data['lng']) ? (float)$data['lng'] : null; + $heading = (float)($data['heading'] ?? 0); + $speed = (float)($data['speed'] ?? 0); + $status = (string)($data['status'] ?? 'off'); + $distance = (float)($data['distance'] ?? 0); + $passengerId = (string)($data['passenger_id'] ?? ''); + $rideId = $data['ride_id'] ?? null; + + if ($lat === null || $lng === null) return; + + $state = &$driverState[$driverId]; + $now = time(); + + // 1. Forward للراكب (ASYNC + throttle) + if (!empty($passengerId)) { + forwardLocationToPassengerSocket( + $driverId, $passengerId, + [ + 'lat' => $lat, + 'lng' => $lng, + 'heading' => $heading, + 'speed' => $speed, + 'ride_id' => $rideId, + 'driver_id' => $driverId, + ], + $INTERNAL_KEY, $fwdThrottle + ); + } + + // 2. حساب ماذا تغيّر لتجنب ضغط Redis + $movedMeters = ($state['lat'] == 0.0 && $state['lng'] == 0.0) + ? 999.0 + : haversineDistance($state['lat'], $state['lng'], $lat, $lng); + + $didMove = $movedMeters >= MIN_MOVE_METERS; + $speedMs = $speed / 3.6; + $speedChanged = abs($speedMs - $state['speed']) >= HMSET_SPEED_DELTA; + $headingChanged = abs($heading - $state['heading']) >= HMSET_HEADING_DELTA; + $statusChanged = ($status !== $state['status']); + + $needHmset = $speedChanged || $headingChanged || $statusChanged; + $needGeoadd = $didMove; + $needExpireRefresh = ($now - $state['expire_ts']) >= EXPIRE_REFRESH_SECONDS; + + if (!$needHmset && (!$needGeoadd && !$statusChanged) && !$needExpireRefresh) { + return; // لم يتغير شيء مهم، تجاهل تماماً (0 عمليات Redis) + } + + // 🚀 3. Buffering Event بدل الإرسال المباشر لـ Redis (Level 2 Magic) + if (!isset($eventBuffer[$driverId])) { + $eventBuffer[$driverId] = []; + } + + if ($needHmset) { + $eventBuffer[$driverId]['hmset'] = [ + 'id' => $driverId, 'heading' => $heading, 'speed' => $speed, 'status' => $status, 'updated_at' => $now + ]; + $state['speed'] = $speedMs; + $state['heading'] = $heading; + } + + if ($needExpireRefresh || $needHmset) { + $eventBuffer[$driverId]['expire'] = 900; + $state['expire_ts'] = $now; + } + + if ($statusChanged) { + $eventBuffer[$driverId]['status_change'] = [ + 'old' => $state['status'], + 'new' => $status + ]; + $state['status'] = $status; + + // Auto disconnect if blocked + if ($status === 'blocked') { + if (isset($connectedDrivers[$driverId])) { + $connectedDrivers[$driverId]['conn']->disconnect(); + unset($connectedDrivers[$driverId]); + } + } + } + + if ($needGeoadd || $statusChanged) { + $eventBuffer[$driverId]['geoadd'] = [ + 'status' => $status, + 'lng' => $lng, + 'lat' => $lat + ]; + if ($needGeoadd) { + $state['lat'] = $lat; + $state['lng'] = $lng; + } + } + }); + + $socket->on('disconnect', function () use ($driverId) { + global $connectedDrivers, $driverState, $fwdThrottle; + + unset($connectedDrivers[$driverId]); + unset($driverState[$driverId]); + unset($fwdThrottle[$driverId]); + + logMsg("❌ Driver Disconnected: #$driverId"); + }); +}); + +Worker::runAll(); \ No newline at end of file diff --git a/functions.php b/functions.php index 779fc63..73eb5d9 100755 --- a/functions.php +++ b/functions.php @@ -234,7 +234,7 @@ function notifyPassengerOnRideServer($passenger_id, $payload) { curl_setopt($ch, CURLOPT_POST, 1); curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postData)); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); - curl_setopt($ch, CURLOPT_TIMEOUT_MS, 1000); + curl_setopt($ch, CURLOPT_TIMEOUT_MS, 3000); curl_setopt($ch, CURLOPT_HTTPHEADER, [ "x-internal-key: $INTERNAL_KEY" ]); diff --git a/passenger_socket.php b/passenger_socket.php new file mode 100644 index 0000000..97663d8 --- /dev/null +++ b/passenger_socket.php @@ -0,0 +1,116 @@ +on('workerStart', function () use ($io, $INTERNAL_KEY, $INTERNAL_PORT) { + + $innerHttp = new Worker("http://0.0.0.0:$INTERNAL_PORT"); + + $innerHttp->onMessage = function ($connection, $request) use ($io, $INTERNAL_KEY) { + + $headers = $request->header(); + if (($headers['x-internal-key'] ?? '') !== $INTERNAL_KEY) { + $connection->send('Unauthorized'); + return; + } + + $post = $request->post(); + $action = trim($post['action'] ?? ''); + + if ($action === 'update_ride_status') { + + $passengerId = $post['passenger_id'] ?? null; + $rawPayload = $post['payload'] ?? null; + + if (!$passengerId || !$rawPayload) { + $connection->send('Error: Missing passenger_id or payload'); + return; + } + + $payload = is_string($rawPayload) + ? (json_decode($rawPayload, true) ?? $rawPayload) + : $rawPayload; + + $io->to('passenger_' . $passengerId)->emit('ride_status_change', $payload); + + $connection->send('OK'); + echo '[' . date('H:i:s') . '] Status update sent to Passenger #' . $passengerId . PHP_EOL; + + } elseif ($action === 'update_driver_location') { + + $passengerId = $post['passenger_id'] ?? null; + $rawPayload = $post['payload'] ?? null; + + if (!$passengerId || !$rawPayload) { + $connection->send('Error: Missing passenger_id or payload'); + return; + } + + $payload = is_string($rawPayload) + ? (json_decode($rawPayload, true) ?? $rawPayload) + : $rawPayload; + + $io->to('passenger_' . $passengerId)->emit('driver_location_update', $payload); + + $connection->send('OK'); + + } else { + $connection->send('Unknown action: ' . $action); + } + }; + + $innerHttp->listen(); + echo '[' . date('H:i:s') . "] Internal HTTP started on port $INTERNAL_PORT" . PHP_EOL; +}); + +$io->on('connection', function ($socket) { + + $query = $socket->handshake['query'] ?? []; + $passengerId = $query['id'] ?? null; + + if (!$passengerId) { + $socket->disconnect(); + return; + } + + $socket->join('passenger_' . $passengerId); + echo '[' . date('H:i:s') . "] Passenger Connected: #$passengerId" . PHP_EOL; + + $socket->on('heartbeat', function ($data) { + // استقبال النبضة فقط — لا يحتاج رد + }); + + $socket->on('disconnect', function () use ($passengerId) { + echo '[' . date('H:i:s') . "] Passenger Disconnected: #$passengerId" . PHP_EOL; + }); +}); + +Worker::runAll(); \ No newline at end of file diff --git a/ride/location/getDriverCarsLocationToPassengerAfterApplied.php b/ride/location/getDriverCarsLocationToPassengerAfterApplied.php index 05e6b68..6e9572c 100644 --- a/ride/location/getDriverCarsLocationToPassengerAfterApplied.php +++ b/ride/location/getDriverCarsLocationToPassengerAfterApplied.php @@ -6,6 +6,9 @@ require_once __DIR__ . '/../../connect.php'; try { + $con_tracking = Database::get('tracking'); + // $con = Database::get('main'); // Add this just in case as well, to be safe. + $driver_id = filterRequest("driver_id"); if ($driver_id === false || empty($driver_id)) { diff --git a/ride/rides/acceptRide.php b/ride/rides/acceptRide.php index 86402f7..083d29d 100755 --- a/ride/rides/acceptRide.php +++ b/ride/rides/acceptRide.php @@ -158,7 +158,7 @@ try { "تم قبول رحلتك", "الكابتن " . ($driverInfo['driverName'] ?? '') . " في طريقه إليك", ['ride_id' => (string) $rideId, 'driver_info' => $driverInfo], - "ride_accepted", + "Accepted Ride", false ); }