Update: 2026-06-10 18:11:50

This commit is contained in:
Hamza-Ayed
2026-06-10 18:11:50 +03:00
parent a0473a8b0f
commit 977adfe99d
27 changed files with 3946 additions and 206 deletions

View File

@@ -0,0 +1,547 @@
<?php
/**
* driver_socket.php
* ==================
* WebSocket Server للسائقين — بورت 2020
* Internal HTTP Server — بورت 2021
*
* 🚀 Level 2 Architecture (Production Ready):
* - Event Buffering (Batching)
* - Redis Pipelines (تقليل الـ I/O والـ Latency بشكل كبير)
* - Memory State Cache للسائقين
* - جميع طرق HTTP (Dispatch, Market, Force Disconnect...) موجودة بالكامل
*/
use Workerman\Worker;
use Workerman\Timer;
use Workerman\Http\Client as AsyncHttp;
use PHPSocketIO\SocketIO;
use Predis\Client as RedisClient;
require_once __DIR__ . '/vendor/autoload.php';
// ============================================================
// ⚙️ إعدادات عامة
// ============================================================
ini_set('memory_limit', '512M');
date_default_timezone_set('Asia/Amman');
// ── Tunables (إعدادات الأداء) ──────────────────────────────────
const MIN_MOVE_METERS = 10.0; // GEOADD فقط إذا تحرك أكثر من 10 متر
const HMSET_SPEED_DELTA = 1.0; // فرق السرعة المطلوب لتحديث Redis
const HMSET_HEADING_DELTA = 5.0; // فرق الاتجاه المطلوب لتحديث Redis
const EXPIRE_REFRESH_SECONDS = 300; // 5 دقائق لتجديد الـ TTL
const FORWARD_MIN_METERS = 15.0; // HTTP forward للراكب
const FORWARD_MAX_SECONDS = 3; // أقصى مدة للـ Forward
const REDIS_BATCH_INTERVAL = 0.5; // تنفيذ مجمّع (Batch) كل نصف ثانية (500ms)
// ─────────────────────────────────────────────────────────────
function logMsg(string $msg): void {
echo '[' . date('Y-m-d H:i:s') . '] ' . $msg . PHP_EOL;
}
function loadEnvironment(string $filePath): void {
if (!file_exists($filePath)) {
logMsg("⚠️ .env not found: $filePath");
return;
}
foreach (file($filePath, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES) as $line) {
if (str_starts_with(trim($line), '#') || !str_contains($line, '=')) continue;
[$name, $value] = explode('=', $line, 2);
putenv(trim($name) . '=' . trim($value, "\"'"));
}
logMsg('✅ Environment loaded.');
}
loadEnvironment('/home/location/env/.env');
// ============================================================
// 🔐 مفاتيح الأمان
// ============================================================
$INTERNAL_KEY = trim((string) @file_get_contents('/home/location/.internal_socket_key'));
$redisPass = trim((string) @file_get_contents('/home/location/.reds_pass_key'));
if (empty($INTERNAL_KEY)) logMsg('❌ CRITICAL: Internal key missing!');
if (empty($redisPass)) logMsg('❌ CRITICAL: Redis password missing!');
// ============================================================
// 🗄️ Redis Singleton
// ============================================================
$redis = null;
function getRedis(): ?RedisClient {
global $redis, $redisPass;
if ($redis !== null) {
try {
$redis->ping();
return $redis;
} catch (\Exception $e) {
logMsg('⚠️ Redis ping failed, reconnecting...');
$redis = null;
}
}
try {
$client = new RedisClient([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
'password' => $redisPass,
'read_write_timeout' => 0,
]);
$client->connect();
$redis = $client;
return $redis;
} catch (\Exception $e) {
logMsg('❌ Redis Error: ' . $e->getMessage());
return null;
}
}
// ============================================================
// 📐 Haversine Distance (متر)
// ============================================================
function haversineDistance(float $lat1, float $lng1, float $lat2, float $lng2): float {
$R = 6371000;
$dLat = deg2rad($lat2 - $lat1);
$dLng = deg2rad($lng2 - $lng1);
$a = sin($dLat / 2) ** 2
+ cos(deg2rad($lat1)) * cos(deg2rad($lat2)) * sin($dLng / 2) ** 2;
return $R * 2 * atan2(sqrt($a), sqrt(1 - $a));
}
// ============================================================
// 📡 Forward موقع السائق → سيرفر الراكب (ASYNC)
// ============================================================
function forwardLocationToPassengerSocket(
string $driverId,
string $passengerId,
array $payload,
string $internalKey,
array &$fwdThrottle
): void {
if (empty($passengerId)) return;
$now = time();
$last = $fwdThrottle[$driverId] ?? null;
if ($last !== null) {
$timeDiff = $now - $last['ts'];
$dist = haversineDistance(
$last['lat'], $last['lng'],
(float)$payload['lat'], (float)$payload['lng']
);
if ($dist < FORWARD_MIN_METERS && $timeDiff < FORWARD_MAX_SECONDS) return;
}
$fwdThrottle[$driverId] = [
'ts' => $now,
'lat' => (float)$payload['lat'],
'lng' => (float)$payload['lng'],
];
$http = new AsyncHttp();
$http->request(
'http://127.0.0.1:3031',
[
'method' => 'POST',
'data' => http_build_query([
'action' => 'update_driver_location',
'passenger_id' => $passengerId,
'payload' => json_encode($payload),
]),
'headers' => [
'Content-Type' => 'application/x-www-form-urlencoded',
'x-internal-key' => $internalKey,
'Connection' => 'close',
],
'timeout' => 3,
],
null,
fn(\Exception $e) => logMsg('⚠️ Forward failed: ' . $e->getMessage())
);
}
// ============================================================
// 📲 FCM (ASYNC)
// ============================================================
function sendFCM_Async(string $token, string $title, string $body, array $rideData): void {
if (empty($token)) return;
$http = new AsyncHttp();
$http->request(
'https://api.intaleq.xyz/intaleq/ride/firebase/send_fcm.php',
[
'method' => 'POST',
'data' => json_encode([
'target' => $token,
'title' => $title,
'body' => $body,
'isTopic' => false,
'category' => 'Order',
'tone' => 'start',
'passengerList' => json_encode($rideData),
]),
'headers' => ['Content-Type' => 'application/json; charset=UTF-8'],
'timeout' => 5,
],
null,
fn(\Exception $e) => logMsg('⚠️ FCM failed: ' . $e->getMessage())
);
}
// ============================================================
// 🧠 Memory State & Event Buffer
// ============================================================
$connectedDrivers = [];
$active_orders_drivers = [];
$driverState = [];
$fwdThrottle = [];
$eventBuffer = []; // 🚀 Level 2: مصفوفة تجميع الأحداث لـ Redis
// ============================================================
// 🚀 Socket.IO — بورت 2020
// ============================================================
$io = new SocketIO(2020);
// ============================================================
// A. Internal HTTP Server & Redis Batch Processor (Worker Start)
// ============================================================
$io->on('workerStart', function () use ($io, $INTERNAL_KEY) {
// 🚀 1. Redis Pipeline Batch Processor (Level 2)
// يعمل كل نصف ثانية، يجمع كل الأوامر ويرسلها لـ Redis دفعة واحدة
Timer::add(REDIS_BATCH_INTERVAL, function() {
global $eventBuffer;
if (empty($eventBuffer)) return;
$redis = getRedis();
if (!$redis) return;
try {
$pipe = $redis->pipeline();
$processedCount = 0;
foreach ($eventBuffer as $driverId => $ops) {
$profileKey = "driver:profile:$driverId";
$processedCount++;
if (isset($ops['hmset'])) {
$pipe->hmset($profileKey, $ops['hmset']);
}
if (isset($ops['expire'])) {
$pipe->expire($profileKey, $ops['expire']);
}
if (isset($ops['status_change'])) {
$oldStatus = $ops['status_change']['old'];
$newStatus = $ops['status_change']['new'];
if ($oldStatus === 'on') $pipe->zrem('geo:drivers:busy', $driverId);
if ($oldStatus === 'off') $pipe->zrem('geo:drivers:available', $driverId);
if ($newStatus === 'close' || $newStatus === 'blocked') {
$pipe->zrem('geo:drivers:available', $driverId);
$pipe->zrem('geo:drivers:busy', $driverId);
}
}
if (isset($ops['geoadd'])) {
$st = $ops['geoadd']['status'];
$lng = $ops['geoadd']['lng'];
$lat = $ops['geoadd']['lat'];
if ($st === 'off') {
$pipe->geoadd('geo:drivers:available', $lng, $lat, $driverId);
} elseif ($st === 'on') {
$pipe->geoadd('geo:drivers:busy', $lng, $lat, $driverId);
}
}
}
$pipe->execute();
$eventBuffer = []; // إفراغ المصفوفة بعد التنفيذ الناجح
// logMsg("⚡ Processed Redis Batch: $processedCount drivers updated in 1 network call.");
} catch (\Exception $e) {
logMsg("⚠️ Redis Pipeline Error: " . $e->getMessage());
}
});
// 🌐 2. Internal HTTP Server — بورت 2021
$innerHttp = new Worker('http://0.0.0.0:2021');
$innerHttp->onMessage = function ($connection, $request) use ($io, $INTERNAL_KEY) {
global $active_orders_drivers, $connectedDrivers;
$headers = $request->header();
if (($headers['x-internal-key'] ?? '') !== $INTERNAL_KEY) {
$connection->send('Unauthorized');
return;
}
$post = $request->post();
$action = trim($post['action'] ?? '');
$redis = getRedis();
// ── 1. Dispatch Order ────────────────────────────────
if ($action === 'dispatch_order') {
$rideId = $post['ride_id'] ?? null;
$drivers = json_decode($post['drivers_ids'] ?? '[]', true);
$payload = $post['payload'] ?? [];
if (is_array($payload)) $payload = array_values($payload);
if ($rideId && !empty($drivers)) {
$active_orders_drivers[$rideId] = $drivers;
logMsg("🚀 Dispatch Ride #$rideId" . count($drivers) . ' drivers.');
}
foreach ($drivers as $driverId) {
if (!isset($connectedDrivers[$driverId])) continue;
$io->to('driver_' . $driverId)->emit('new_ride_request', $payload);
$platform = $connectedDrivers[$driverId]['platform'] ?? 'android';
$token = $connectedDrivers[$driverId]['token'] ?? '';
if ($platform === 'ios' && !empty($token)) {
sendFCM_Async($token, 'طلب جديد', 'لديك رحلة جديدة قريبة منك', $payload);
}
}
$connection->send('Dispatched');
// ── 2. Market New Ride ────────────────────────────────
} elseif ($action === 'market_new_ride') {
$payload = $post['payload'] ?? [];
$rideId = $payload['id'] ?? null;
$lat = (float)($payload['start_lat'] ?? 0);
$lng = (float)($payload['start_lng'] ?? 0);
if (!$redis || !$rideId || $lat == 0 || $lng == 0) {
$connection->send('Error: Redis unavailable or invalid coords');
return;
}
$redis->geoadd('geo:rides:waiting', $lng, $lat, $rideId);
$nearbyDrivers = $redis->georadius('geo:drivers:available', $lng, $lat, 50, 'km');
$count = 0;
foreach ($nearbyDrivers as $driverId) {
if (isset($connectedDrivers[$driverId])) {
$io->to('driver_' . $driverId)->emit('market_new_ride', $payload);
$count++;
}
}
logMsg("📢 Market Ride #$rideId$count drivers.");
$connection->send("Broadcasted to $count drivers");
// ── 3. Get Nearby Ride IDs ────────────────────────────
} elseif ($action === 'get_nearby_ride_ids') {
$lat = (float)($post['lat'] ?? 0);
$lng = (float)($post['lng'] ?? 0);
$radius = (float)($post['radius'] ?? 9);
if (!$redis) { $connection->send(json_encode([])); return; }
$results = $redis->georadius(
'geo:rides:waiting', $lng, $lat, $radius, 'km',
['WITHDIST' => true, 'SORT' => 'ASC', 'COUNT' => 40]
);
$connection->send(json_encode($results));
// ── 4. Ride Taken ─────────────────────────────────────
} elseif ($action === 'ride_taken_event') {
$rideId = $post['ride_id'] ?? null;
$winnerDriverId = $post['taken_by_driver_id'] ?? null;
if (!$rideId) { $connection->send('Error: Missing ride_id'); return; }
if ($redis) $redis->zrem('geo:rides:waiting', $rideId);
$io->emit('ride_taken', [
'ride_id' => $rideId,
'taken_by_driver_id' => $winnerDriverId,
]);
unset($active_orders_drivers[$rideId]);
logMsg("✅ Ride #$rideId taken by #$winnerDriverId.");
$connection->send('OK');
// ── 5. Force Disconnect ───────────────────────────────
} elseif ($action === 'force_disconnect') {
$driverId = $post['driver_id'] ?? null;
if ($driverId && isset($connectedDrivers[$driverId])) {
$connectedDrivers[$driverId]['conn']->disconnect();
unset($connectedDrivers[$driverId]);
if ($redis) {
$redis->zrem('geo:drivers:available', $driverId);
$redis->zrem('geo:drivers:busy', $driverId);
}
logMsg("🚫 Driver #$driverId force-disconnected.");
$connection->send('Disconnected');
} else {
$connection->send('Driver not connected');
}
} else {
$connection->send('Unknown action');
}
};
$innerHttp->listen();
});
// ============================================================
// B. WebSocket Events للسائقين
// ============================================================
$io->on('connection', function ($socket) use ($INTERNAL_KEY) {
global $connectedDrivers, $driverState, $fwdThrottle, $eventBuffer;
$query = $socket->handshake['query'] ?? [];
$driverId = $query['driver_id'] ?? null;
$platform = $query['platform'] ?? 'android';
$token = $query['token'] ?? '';
if (!$driverId) {
$socket->disconnect();
return;
}
$socket->join('driver_' . $driverId);
$connectedDrivers[$driverId] = [
'conn' => $socket,
'platform' => $platform,
'token' => $token,
];
if (!isset($driverState[$driverId])) {
$driverState[$driverId] = [
'lat' => 0.0,
'lng' => 0.0,
'speed' => -999.0,
'heading' => -999.0,
'status' => '',
'expire_ts' => 0,
];
}
logMsg("✅ Driver Connected: #$driverId ($platform)");
$socket->on('ping_alive', function () {
// Socket.IO handles pong automatically
});
$socket->on('update_location', function ($data)
use ($driverId, $INTERNAL_KEY, &$driverState, &$fwdThrottle, &$eventBuffer)
{
global $connectedDrivers;
$data = (array) $data;
$lat = isset($data['lat']) ? (float)$data['lat'] : null;
$lng = isset($data['lng']) ? (float)$data['lng'] : null;
$heading = (float)($data['heading'] ?? 0);
$speed = (float)($data['speed'] ?? 0);
$status = (string)($data['status'] ?? 'off');
$distance = (float)($data['distance'] ?? 0);
$passengerId = (string)($data['passenger_id'] ?? '');
$rideId = $data['ride_id'] ?? null;
if ($lat === null || $lng === null) return;
$state = &$driverState[$driverId];
$now = time();
// 1. Forward للراكب (ASYNC + throttle)
if (!empty($passengerId)) {
forwardLocationToPassengerSocket(
$driverId, $passengerId,
[
'latitude' => $lat,
'longitude' => $lng,
'heading' => $heading,
'speed' => $speed,
'ride_id' => $rideId,
'driver_id' => $driverId,
],
$INTERNAL_KEY, $fwdThrottle
);
}
// 2. حساب ماذا تغيّر لتجنب ضغط Redis
$movedMeters = ($state['lat'] == 0.0 && $state['lng'] == 0.0)
? 999.0
: haversineDistance($state['lat'], $state['lng'], $lat, $lng);
$didMove = $movedMeters >= MIN_MOVE_METERS;
$speedMs = $speed / 3.6;
$speedChanged = abs($speedMs - $state['speed']) >= HMSET_SPEED_DELTA;
$headingChanged = abs($heading - $state['heading']) >= HMSET_HEADING_DELTA;
$statusChanged = ($status !== $state['status']);
$needHmset = $speedChanged || $headingChanged || $statusChanged;
$needGeoadd = $didMove;
$needExpireRefresh = ($now - $state['expire_ts']) >= EXPIRE_REFRESH_SECONDS;
if (!$needHmset && (!$needGeoadd && !$statusChanged) && !$needExpireRefresh) {
return; // لم يتغير شيء مهم، تجاهل تماماً (0 عمليات Redis)
}
// 🚀 3. Buffering Event بدل الإرسال المباشر لـ Redis (Level 2 Magic)
if (!isset($eventBuffer[$driverId])) {
$eventBuffer[$driverId] = [];
}
if ($needHmset) {
$eventBuffer[$driverId]['hmset'] = [
'id' => $driverId, 'heading' => $heading, 'speed' => $speed, 'status' => $status, 'updated_at' => $now
];
$state['speed'] = $speedMs;
$state['heading'] = $heading;
}
if ($needExpireRefresh || $needHmset) {
$eventBuffer[$driverId]['expire'] = 900;
$state['expire_ts'] = $now;
}
if ($statusChanged) {
$eventBuffer[$driverId]['status_change'] = [
'old' => $state['status'],
'new' => $status
];
$state['status'] = $status;
// Auto disconnect if blocked
if ($status === 'blocked') {
if (isset($connectedDrivers[$driverId])) {
$connectedDrivers[$driverId]['conn']->disconnect();
unset($connectedDrivers[$driverId]);
}
}
}
if ($needGeoadd || $statusChanged) {
$eventBuffer[$driverId]['geoadd'] = [
'status' => $status,
'lng' => $lng,
'lat' => $lat
];
if ($needGeoadd) {
$state['lat'] = $lat;
$state['lng'] = $lng;
}
}
});
$socket->on('disconnect', function () use ($driverId) {
global $connectedDrivers, $driverState, $fwdThrottle;
unset($connectedDrivers[$driverId]);
unset($driverState[$driverId]);
unset($fwdThrottle[$driverId]);
logMsg("❌ Driver Disconnected: #$driverId");
});
});
Worker::runAll();