bin2hex(random_bytes(16)),
            'type' => $type,
            'payload' => $payload,
            'priority' => $priority,
            'attempts' => 0,
            'created_at' => time()
        ];

        try {
            $redis = Redis::getInstance();
            $redis->lpush(self::REDIS_QUEUE, json_encode($job));
        } catch (\Throwable $e) {
            // Fallback to MySQL
            self::pushToDatabase($job);
        }
    }

    /**
     * Store a job in the `queue_jobs` table with status 'pending'.
     *
     * Only id, type, payload (JSON-encoded) and priority are written; any other
     * column values are left to the table definition.
     *
     * @param array $job Job array; the keys 'id', 'type', 'payload' and 'priority' are read.
     */
    private static function pushToDatabase(array $job): void
    {
        $db = Database::getInstance();
        $stmt = $db->prepare("INSERT INTO queue_jobs (id, type, payload, priority, status) VALUES (?, ?, ?, ?, 'pending')");
        $stmt->execute([
            $job['id'],
            $job['type'],
            json_encode($job['payload']),
            $job['priority'],
        ]);
    }

    /**
     * Take the next job off the queue.
     *
     * The Redis list is tried first (RPOP, pairing with the LPUSH used when
     * enqueueing). If anything in the Redis path throws, the job is claimed
     * from the database instead.
     *
     * @return array|null The decoded job, or null when nothing usable was popped.
     */
    public static function pop(): ?array
    {
        try {
            $redis = Redis::getInstance();
            $data = $redis->rpop(self::REDIS_QUEUE);

            // An empty list yields a non-string / empty result: nothing to decode.
            if (!is_string($data) || $data === '') {
                return null;
            }

            // Malformed or scalar JSON must not leak through the ?array return type.
            $job = json_decode($data, true);

            return is_array($job) ? $job : null;
        } catch (\Throwable $e) {
            // Fallback to MySQL
            return self::popFromDatabase();
        }
    }

    /**
     * Claim the next pending job from the `queue_jobs` table.
     *
     * Inside a transaction, the pending row with the highest priority (oldest
     * created_at first among equals) is selected with FOR UPDATE, then marked
     * 'processing' and stamped with locked_at before the transaction commits.
     *
     * @return array|null The claimed job with keys 'id', 'type', 'payload',
     *                    'priority' and 'attempts'; null when no pending row
     *                    exists or the claim failed (the failure is logged).
     */
    private static function popFromDatabase(): ?array
    {
        $db = Database::getInstance();
        $db->beginTransaction();

        try {
            $stmt = $db->prepare("SELECT * FROM queue_jobs WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE");
            $stmt->execute();
            $job = $stmt->fetch();

            if ($job) {
                $db->prepare("UPDATE queue_jobs SET status = 'processing', locked_at = NOW() WHERE id = ?")->execute([$job['id']]);
            }

            // Commit in both cases so the transaction opened above is always closed.
            $db->commit();
        } catch (\Throwable $e) {
            // Log before rolling back so the cause is recorded even if rollBack() fails;
            // without this a database error looks exactly like an empty queue.
            error_log('Queue: failed to claim job from database: ' . $e->getMessage());
            $db->rollBack();

            return null;
        }

        if (!$job) {
            return null;
        }

        // Mirror the shape of jobs created by push(): include priority and
        // expose attempts as an integer.
        return [
            'id' => $job['id'],
            'type' => $job['type'],
            'payload' => json_decode($job['payload'], true),
            'priority' => $job['priority'],
            'attempts' => (int) $job['attempts'],
        ];
    }
}