Deploy on 2026-06-05 17:13:07
This commit is contained in:
@@ -8,6 +8,7 @@ class AiAnalyzer
|
|||||||
{
|
{
|
||||||
private ?string $apiKey;
|
private ?string $apiKey;
|
||||||
private string $model;
|
private string $model;
|
||||||
|
private static ?float $lastCallTime = null;
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
@@ -108,6 +109,16 @@ PROMPT;
|
|||||||
*/
|
*/
|
||||||
private function callGemini(string $prompt): string
|
private function callGemini(string $prompt): string
|
||||||
{
|
{
|
||||||
|
if (self::$lastCallTime !== null) {
|
||||||
|
$elapsed = microtime(true) - self::$lastCallTime;
|
||||||
|
$minInterval = 4.5; // Space out requests to under 15 RPM
|
||||||
|
if ($elapsed < $minInterval) {
|
||||||
|
$sleepTime = $minInterval - $elapsed;
|
||||||
|
usleep((int)($sleepTime * 1000000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self::$lastCallTime = microtime(true);
|
||||||
|
|
||||||
$url = "https://generativelanguage.googleapis.com/v1beta/models/{$this->model}:generateContent?key={$this->apiKey}";
|
$url = "https://generativelanguage.googleapis.com/v1beta/models/{$this->model}:generateContent?key={$this->apiKey}";
|
||||||
|
|
||||||
$payload = json_encode([
|
$payload = json_encode([
|
||||||
|
|||||||
@@ -16,6 +16,10 @@ class Collector
|
|||||||
private ActivityLogger $logger;
|
private ActivityLogger $logger;
|
||||||
private TelegramNotifier $notifier;
|
private TelegramNotifier $notifier;
|
||||||
|
|
||||||
|
private $lockHandle = null;
|
||||||
|
private int $maxNewPerFeed = 5;
|
||||||
|
private int $maxNewTotal = 100;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
Connection $connection,
|
Connection $connection,
|
||||||
RssParser $rssParser,
|
RssParser $rssParser,
|
||||||
@@ -35,6 +39,10 @@ class Collector
|
|||||||
*/
|
*/
|
||||||
public function collectAll(): array
|
public function collectAll(): array
|
||||||
{
|
{
|
||||||
|
if (!$this->acquireLock()) {
|
||||||
|
throw new \Exception("Another collector run is currently in progress.");
|
||||||
|
}
|
||||||
|
|
||||||
$results = [
|
$results = [
|
||||||
'total_sources' => 0,
|
'total_sources' => 0,
|
||||||
'processed' => 0,
|
'processed' => 0,
|
||||||
@@ -44,46 +52,66 @@ class Collector
|
|||||||
'details' => [],
|
'details' => [],
|
||||||
];
|
];
|
||||||
|
|
||||||
$sources = $this->getActiveSources();
|
try {
|
||||||
|
$this->maxNewPerFeed = (int)($this->getSetting('crawler_max_new_per_feed') ?: 5);
|
||||||
|
$this->maxNewTotal = (int)($this->getSetting('crawler_max_new_total') ?: 100);
|
||||||
|
|
||||||
foreach ($sources as $source) {
|
$sources = $this->getActiveSources();
|
||||||
$results['total_sources']++;
|
$totalNewProcessed = 0;
|
||||||
try {
|
|
||||||
$result = $this->collectSource($source);
|
foreach ($sources as $source) {
|
||||||
$results['processed']++;
|
$results['total_sources']++;
|
||||||
$results['new_opportunities'] += $result['opportunities'];
|
|
||||||
$results['new_organizations'] += $result['organizations'];
|
if ($totalNewProcessed >= $this->maxNewTotal) {
|
||||||
$results['details'][] = [
|
$results['details'][] = [
|
||||||
'source' => $source['name'],
|
'source' => $source['name'],
|
||||||
'type' => $source['type'],
|
'type' => $source['type'],
|
||||||
'status' => 'success',
|
'status' => 'skipped',
|
||||||
'entries_found' => $result['entries_found'],
|
'reason' => 'Global limit reached',
|
||||||
'new_opportunities' => $result['opportunities'],
|
];
|
||||||
'new_organizations' => $result['organizations'],
|
continue;
|
||||||
];
|
}
|
||||||
} catch (Throwable $e) {
|
|
||||||
$results['errors']++;
|
try {
|
||||||
$results['details'][] = [
|
$result = $this->collectSource($source, $totalNewProcessed);
|
||||||
'source' => $source['name'],
|
$results['processed']++;
|
||||||
'type' => $source['type'],
|
$results['new_opportunities'] += $result['opportunities'];
|
||||||
'status' => 'error',
|
$results['new_organizations'] += $result['organizations'];
|
||||||
'error' => $e->getMessage(),
|
$results['details'][] = [
|
||||||
];
|
'source' => $source['name'],
|
||||||
|
'type' => $source['type'],
|
||||||
|
'status' => 'success',
|
||||||
|
'entries_found' => $result['entries_found'],
|
||||||
|
'new_opportunities' => $result['opportunities'],
|
||||||
|
'new_organizations' => $result['organizations'],
|
||||||
|
];
|
||||||
|
} catch (Throwable $e) {
|
||||||
|
$results['errors']++;
|
||||||
|
$results['details'][] = [
|
||||||
|
'source' => $source['name'],
|
||||||
|
'type' => $source['type'],
|
||||||
|
'status' => 'error',
|
||||||
|
'error' => $e->getMessage(),
|
||||||
|
];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
$this->logger->log(null, 'collector_run', 'Collector completed: ' . json_encode([
|
$this->logger->log(null, 'collector_run', 'Collector completed: ' . json_encode([
|
||||||
'total_sources' => $results['total_sources'],
|
'total_sources' => $results['total_sources'],
|
||||||
'processed' => $results['processed'],
|
'processed' => $results['processed'],
|
||||||
'errors' => $results['errors'],
|
'errors' => $results['errors'],
|
||||||
'new_opportunities' => $results['new_opportunities'],
|
'new_opportunities' => $results['new_opportunities'],
|
||||||
'new_organizations' => $results['new_organizations'],
|
'new_organizations' => $results['new_organizations'],
|
||||||
]));
|
]));
|
||||||
|
|
||||||
// Send Telegram notification if enabled
|
// Send Telegram notification if enabled
|
||||||
if ($this->getSetting('telegram_enabled') === '1') {
|
if ($this->getSetting('telegram_enabled') === '1') {
|
||||||
$this->notifier->loadSettings();
|
$this->notifier->loadSettings();
|
||||||
$this->notifier->notifyCollectorResults($results);
|
$this->notifier->notifyCollectorResults($results);
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
$this->releaseLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
return $results;
|
return $results;
|
||||||
@@ -92,7 +120,7 @@ class Collector
|
|||||||
/**
|
/**
|
||||||
* Collect from a single source.
|
* Collect from a single source.
|
||||||
*/
|
*/
|
||||||
public function collectSource(array $source): array
|
public function collectSource(array $source, int &$totalNewProcessed): array
|
||||||
{
|
{
|
||||||
$result = [
|
$result = [
|
||||||
'entries_found' => 0,
|
'entries_found' => 0,
|
||||||
@@ -104,14 +132,60 @@ class Collector
|
|||||||
$entries = $this->rssParser->fetchEntries($source['url']);
|
$entries = $this->rssParser->fetchEntries($source['url']);
|
||||||
$result['entries_found'] = count($entries);
|
$result['entries_found'] = count($entries);
|
||||||
|
|
||||||
|
$newEntriesInSource = 0;
|
||||||
foreach ($entries as $entry) {
|
foreach ($entries as $entry) {
|
||||||
|
if ($totalNewProcessed >= $this->maxNewTotal) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->rssParser->entryExists($entry['url'])) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($newEntriesInSource >= $this->maxNewPerFeed) {
|
||||||
|
break; // stop processing entries since they are sorted newest-first
|
||||||
|
}
|
||||||
|
|
||||||
$this->processEntry($entry, $source, $result);
|
$this->processEntry($entry, $source, $result);
|
||||||
|
$newEntriesInSource++;
|
||||||
|
$totalNewProcessed++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return $result;
|
return $result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquire run lock.
|
||||||
|
*/
|
||||||
|
private function acquireLock(): bool
|
||||||
|
{
|
||||||
|
$lockFile = __DIR__ . '/../../../collector.lock';
|
||||||
|
$this->lockHandle = @fopen($lockFile, 'c');
|
||||||
|
if (!$this->lockHandle) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!flock($this->lockHandle, LOCK_EX | LOCK_NB)) {
|
||||||
|
fclose($this->lockHandle);
|
||||||
|
$this->lockHandle = null;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release run lock.
|
||||||
|
*/
|
||||||
|
private function releaseLock(): void
|
||||||
|
{
|
||||||
|
if ($this->lockHandle) {
|
||||||
|
flock($this->lockHandle, LOCK_UN);
|
||||||
|
fclose($this->lockHandle);
|
||||||
|
@unlink(__DIR__ . '/../../../collector.lock');
|
||||||
|
$this->lockHandle = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process a single entry: analyze, save opportunity, save organization.
|
* Process a single entry: analyze, save opportunity, save organization.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -89,6 +89,8 @@ class DatabaseSeeder
|
|||||||
['key' => 'system_email', 'value' => 'info@scoutiq.intaleqapp.com', 'description' => 'Primary sender email address.'],
|
['key' => 'system_email', 'value' => 'info@scoutiq.intaleqapp.com', 'description' => 'Primary sender email address.'],
|
||||||
['key' => 'crawler_enabled', 'value' => 'true', 'description' => 'Global toggle for data collection crawlers.'],
|
['key' => 'crawler_enabled', 'value' => 'true', 'description' => 'Global toggle for data collection crawlers.'],
|
||||||
['key' => 'crawler_interval_hours', 'value' => '24', 'description' => 'Delay between crawler runs.'],
|
['key' => 'crawler_interval_hours', 'value' => '24', 'description' => 'Delay between crawler runs.'],
|
||||||
|
['key' => 'crawler_max_new_per_feed', 'value' => '5', 'description' => 'Maximum new opportunities to process per feed in a single run.'],
|
||||||
|
['key' => 'crawler_max_new_total', 'value' => '100', 'description' => 'Maximum total new opportunities to process in a single collector run.'],
|
||||||
];
|
];
|
||||||
|
|
||||||
foreach ($settings as $setting) {
|
foreach ($settings as $setting) {
|
||||||
|
|||||||
Reference in New Issue
Block a user