🚀 مُصادَق: الإطلاق الأولي للنظام المتكامل
This commit is contained in:
83
app/Services/QueueService.php
Normal file
83
app/Services/QueueService.php
Normal file
@@ -0,0 +1,83 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Services;
|
||||
|
||||
use App\Core\Redis;
|
||||
use App\Core\Database;
|
||||
|
||||
final class QueueService
|
||||
{
|
||||
private const REDIS_QUEUE = 'musadaq_jobs';
|
||||
|
||||
public static function push(string $type, array $payload, int $priority = 0, int $delay = 0): void
|
||||
{
|
||||
$job = [
|
||||
'id' => bin2hex(random_bytes(16)),
|
||||
'type' => $type,
|
||||
'payload' => $payload,
|
||||
'priority' => $priority,
|
||||
'attempts' => 0,
|
||||
'created_at' => time()
|
||||
];
|
||||
|
||||
try {
|
||||
$redis = Redis::getInstance();
|
||||
$redis->lpush(self::REDIS_QUEUE, json_encode($job));
|
||||
} catch (\Throwable $e) {
|
||||
// Fallback to MySQL
|
||||
self::pushToDatabase($job);
|
||||
}
|
||||
}
|
||||
|
||||
private static function pushToDatabase(array $job): void
|
||||
{
|
||||
$db = Database::getInstance();
|
||||
$stmt = $db->prepare("INSERT INTO queue_jobs (id, type, payload, priority, status) VALUES (?, ?, ?, ?, 'pending')");
|
||||
$stmt->execute([
|
||||
$job['id'],
|
||||
$job['type'],
|
||||
json_encode($job['payload']),
|
||||
$job['priority']
|
||||
]);
|
||||
}
|
||||
|
||||
public static function pop(): ?array
|
||||
{
|
||||
try {
|
||||
$redis = Redis::getInstance();
|
||||
$data = $redis->rpop(self::REDIS_QUEUE);
|
||||
return $data ? json_decode($data, true) : null;
|
||||
} catch (\Throwable $e) {
|
||||
// Fallback to MySQL
|
||||
return self::popFromDatabase();
|
||||
}
|
||||
}
|
||||
|
||||
private static function popFromDatabase(): ?array
|
||||
{
|
||||
$db = Database::getInstance();
|
||||
$db->beginTransaction();
|
||||
try {
|
||||
$stmt = $db->prepare("SELECT * FROM queue_jobs WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE");
|
||||
$stmt->execute();
|
||||
$job = $stmt->fetch();
|
||||
|
||||
if ($job) {
|
||||
$db->prepare("UPDATE queue_jobs SET status = 'processing', locked_at = NOW() WHERE id = ?")->execute([$job['id']]);
|
||||
$db->commit();
|
||||
return [
|
||||
'id' => $job['id'],
|
||||
'type' => $job['type'],
|
||||
'payload' => json_decode($job['payload'], true),
|
||||
'attempts' => $job['attempts']
|
||||
];
|
||||
}
|
||||
$db->commit();
|
||||
} catch (\Throwable $e) {
|
||||
$db->rollBack();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user