84 lines
2.4 KiB
PHP
84 lines
2.4 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace App\Services;
|
|
|
|
use App\Core\Redis;
|
|
use App\Core\Database;
|
|
|
|
final class QueueService
|
|
{
|
|
private const REDIS_QUEUE = 'musadaq_jobs';
|
|
|
|
public static function push(string $type, array $payload, int $priority = 0, int $delay = 0): void
|
|
{
|
|
$job = [
|
|
'id' => bin2hex(random_bytes(16)),
|
|
'type' => $type,
|
|
'payload' => $payload,
|
|
'priority' => $priority,
|
|
'attempts' => 0,
|
|
'created_at' => time()
|
|
];
|
|
|
|
try {
|
|
$redis = Redis::getInstance();
|
|
$redis->lpush(self::REDIS_QUEUE, json_encode($job));
|
|
} catch (\Throwable $e) {
|
|
// Fallback to MySQL
|
|
self::pushToDatabase($job);
|
|
}
|
|
}
|
|
|
|
private static function pushToDatabase(array $job): void
|
|
{
|
|
$db = Database::getInstance();
|
|
$stmt = $db->prepare("INSERT INTO queue_jobs (id, type, payload, priority, status) VALUES (?, ?, ?, ?, 'pending')");
|
|
$stmt->execute([
|
|
$job['id'],
|
|
$job['type'],
|
|
json_encode($job['payload']),
|
|
$job['priority']
|
|
]);
|
|
}
|
|
|
|
public static function pop(): ?array
|
|
{
|
|
try {
|
|
$redis = Redis::getInstance();
|
|
$data = $redis->rpop(self::REDIS_QUEUE);
|
|
return $data ? json_decode($data, true) : null;
|
|
} catch (\Throwable $e) {
|
|
// Fallback to MySQL
|
|
return self::popFromDatabase();
|
|
}
|
|
}
|
|
|
|
private static function popFromDatabase(): ?array
|
|
{
|
|
$db = Database::getInstance();
|
|
$db->beginTransaction();
|
|
try {
|
|
$stmt = $db->prepare("SELECT * FROM queue_jobs WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE");
|
|
$stmt->execute();
|
|
$job = $stmt->fetch();
|
|
|
|
if ($job) {
|
|
$db->prepare("UPDATE queue_jobs SET status = 'processing', locked_at = NOW() WHERE id = ?")->execute([$job['id']]);
|
|
$db->commit();
|
|
return [
|
|
'id' => $job['id'],
|
|
'type' => $job['type'],
|
|
'payload' => json_decode($job['payload'], true),
|
|
'attempts' => $job['attempts']
|
|
];
|
|
}
|
|
$db->commit();
|
|
} catch (\Throwable $e) {
|
|
$db->rollBack();
|
|
}
|
|
return null;
|
|
}
|
|
}
|