Files
musadaq-saas/app/Services/QueueService.php

84 lines
2.4 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Services;
use App\Core\Redis;
use App\Core\Database;
final class QueueService
{
private const REDIS_QUEUE = 'musadaq_jobs';
public static function push(string $type, array $payload, int $priority = 0, int $delay = 0): void
{
$job = [
'id' => bin2hex(random_bytes(16)),
'type' => $type,
'payload' => $payload,
'priority' => $priority,
'attempts' => 0,
'created_at' => time()
];
try {
$redis = Redis::getInstance();
$redis->lpush(self::REDIS_QUEUE, json_encode($job));
} catch (\Throwable $e) {
// Fallback to MySQL
self::pushToDatabase($job);
}
}
private static function pushToDatabase(array $job): void
{
$db = Database::getInstance();
$stmt = $db->prepare("INSERT INTO queue_jobs (id, type, payload, priority, status) VALUES (?, ?, ?, ?, 'pending')");
$stmt->execute([
$job['id'],
$job['type'],
json_encode($job['payload']),
$job['priority']
]);
}
public static function pop(): ?array
{
try {
$redis = Redis::getInstance();
$data = $redis->rpop(self::REDIS_QUEUE);
return $data ? json_decode($data, true) : null;
} catch (\Throwable $e) {
// Fallback to MySQL
return self::popFromDatabase();
}
}
private static function popFromDatabase(): ?array
{
$db = Database::getInstance();
$db->beginTransaction();
try {
$stmt = $db->prepare("SELECT * FROM queue_jobs WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE");
$stmt->execute();
$job = $stmt->fetch();
if ($job) {
$db->prepare("UPDATE queue_jobs SET status = 'processing', locked_at = NOW() WHERE id = ?")->execute([$job['id']]);
$db->commit();
return [
'id' => $job['id'],
'type' => $job['type'],
'payload' => json_decode($job['payload'], true),
'attempts' => $job['attempts']
];
}
$db->commit();
} catch (\Throwable $e) {
$db->rollBack();
}
return null;
}
}