pdo = $connection->getPdo();
        $this->rssParser = $rssParser;
        $this->aiAnalyzer = $aiAnalyzer;
        $this->logger = $logger;
        $this->notifier = $notifier;
    }

    /**
     * Collect from all active sources.
     *
     * Only one run may execute at a time: an exclusive, non-blocking file lock is
     * taken up front and released in `finally`. The per-feed and run-wide caps on
     * newly processed entries come from the `crawler_max_new_per_feed` (default 5)
     * and `crawler_max_new_total` (default 100) settings. A source that throws is
     * recorded in `details` with status "error" and does not stop the run.
     *
     * @return array Summary counters (`total_sources`, `processed`, `errors`,
     *               `new_opportunities`, `new_organizations`) plus a per-source
     *               `details` list.
     *
     * @throws \Exception        If another collector run currently holds the lock.
     * @throws \RuntimeException If the lock file cannot be opened at all.
     */
    public function collectAll(): array
    {
        if (!$this->acquireLock()) {
            throw new \Exception("Another collector run is currently in progress.");
        }

        $results = [
            'total_sources' => 0,
            'processed' => 0,
            'errors' => 0,
            'new_opportunities' => 0,
            'new_organizations' => 0,
            'details' => [],
        ];

        try {
            // `?:` rather than `??`: an empty or "0" setting falls back to the default.
            $this->maxNewPerFeed = (int)($this->getSetting('crawler_max_new_per_feed') ?: 5);
            $this->maxNewTotal = (int)($this->getSetting('crawler_max_new_total') ?: 100);

            $sources = $this->getActiveSources();
            $totalNewProcessed = 0;

            foreach ($sources as $source) {
                $results['total_sources']++;

                if ($totalNewProcessed >= $this->maxNewTotal) {
                    $results['details'][] = [
                        'source' => $source['name'],
                        'type' => $source['type'],
                        'status' => 'skipped',
                        'reason' => 'Global limit reached',
                    ];
                    continue;
                }

                try {
                    // collectSource() advances $totalNewProcessed by reference.
                    $result = $this->collectSource($source, $totalNewProcessed);

                    $results['processed']++;
                    $results['new_opportunities'] += $result['opportunities'];
                    $results['new_organizations'] += $result['organizations'];
                    $results['details'][] = [
                        'source' => $source['name'],
                        'type' => $source['type'],
                        'status' => 'success',
                        'entries_found' => $result['entries_found'],
                        'new_opportunities' => $result['opportunities'],
                        'new_organizations' => $result['organizations'],
                    ];
                } catch (Throwable $e) {
                    $results['errors']++;
                    $results['details'][] = [
                        'source' => $source['name'],
                        'type' => $source['type'],
                        'status' => 'error',
                        'error' => $e->getMessage(),
                    ];
                }
            }

            $this->logger->log(null, 'collector_run', 'Collector completed: ' . json_encode([
                'total_sources' => $results['total_sources'],
                'processed' => $results['processed'],
                'errors' => $results['errors'],
                'new_opportunities' => $results['new_opportunities'],
                'new_organizations' => $results['new_organizations'],
            ]));

            // Send Telegram notification if enabled. A delivery failure is logged
            // instead of rethrown so it cannot discard the results of a finished run.
            if ($this->getSetting('telegram_enabled') === '1') {
                try {
                    $this->notifier->loadSettings();
                    $this->notifier->notifyCollectorResults($results);
                } catch (Throwable $e) {
                    $this->logger->log(null, 'collector_error', 'Telegram notification failed: ' . $e->getMessage());
                }
            }
        } finally {
            $this->releaseLock();
        }

        return $results;
    }

    /**
     * Collect from a single source.
     *
     * Only sources of type "rss" are fetched; any other type returns zero counts.
     * Entries already stored (matched by URL) are skipped. Processing stops once
     * this source has produced `maxNewPerFeed` new entries or the run-wide count
     * reaches `maxNewTotal`.
     *
     * @param array $source            Source row; `type` and `url` are read here.
     * @param int   $totalNewProcessed Run-wide count of processed new entries, advanced in place.
     * @return array Counters `entries_found`, `opportunities`, `organizations`.
     */
    public function collectSource(array $source, int &$totalNewProcessed): array
    {
        $result = [
            'entries_found' => 0,
            'opportunities' => 0,
            'organizations' => 0,
        ];

        if ($source['type'] !== 'rss') {
            return $result;
        }

        $entries = $this->rssParser->fetchEntries($source['url']);
        $result['entries_found'] = count($entries);

        $newEntriesInSource = 0;
        foreach ($entries as $entry) {
            // Check both caps before touching the database: once either is hit no
            // further entry can be processed, so the entryExists() query is wasted.
            // Entries are expected newest-first, so stopping drops only older items.
            if ($totalNewProcessed >= $this->maxNewTotal || $newEntriesInSource >= $this->maxNewPerFeed) {
                break;
            }

            if ($this->rssParser->entryExists($entry['url'])) {
                continue;
            }

            $this->processEntry($entry, $source, $result);
            $newEntriesInSource++;
            $totalNewProcessed++;
        }

        return $result;
    }

    /**
     * Acquire the run lock (exclusive, non-blocking flock on a lock file).
     *
     * @return bool True if the lock is now held, false if another process holds it.
     * @throws \RuntimeException If the lock file cannot be opened (e.g. permissions),
     *                           which is not the same as "another run is active".
     */
    private function acquireLock(): bool
    {
        $lockFile = __DIR__ . '/../../../collector.lock';

        $handle = @fopen($lockFile, 'c');
        if ($handle === false) {
            $reason = error_get_last()['message'] ?? 'unknown error';
            throw new \RuntimeException("Unable to open collector lock file {$lockFile}: {$reason}");
        }

        if (!flock($handle, LOCK_EX | LOCK_NB)) {
            fclose($handle);
            return false;
        }

        $this->lockHandle = $handle;
        return true;
    }

    /**
     * Release the run lock, if held.
     *
     * The lock file is intentionally left on disk. Deleting it after unlocking
     * is racy: a process that opened the old file just before the delete can
     * still lock that inode, while a new process creates and locks a fresh file
     * at the same path, letting two runs overlap.
     */
    private function releaseLock(): void
    {
        if (!$this->lockHandle) {
            return;
        }

        flock($this->lockHandle, LOCK_UN);
        fclose($this->lockHandle);
        $this->lockHandle = null;
    }

    /**
     * Process a single entry: analyze, save opportunity, save organization.
*/
    private function processEntry(array $entry, array $source, array &$result): void
    {
        // Skip if already exists
        if ($this->rssParser->entryExists($entry['url'])) {
            return;
        }

        // AI Analysis
        $analysis = $this->aiAnalyzer->analyze($entry['title'], $entry['description']);

        // Resolve the organization, if the analysis names one.
        $orgId = null;
        if (!empty($analysis['organization_name'])) {
            $orgId = $this->findOrganizationId($analysis['organization_name']);

            // Unknown organization: ask the AI for richer details before creating it.
            if ($orgId === null) {
                $orgData = $this->aiAnalyzer->extractOrganization($entry['title'] . ' ' . $entry['description']);

                if (!empty($orgData['name'])) {
                    // The extracted name can differ from the analysed one and may
                    // already be stored; reuse it instead of inserting a duplicate.
                    $orgId = $this->findOrganizationId($orgData['name']);

                    if ($orgId === null) {
                        $orgId = $this->createOrganization($orgData);
                        if ($orgId !== null) {
                            $result['organizations']++;
                        }
                    }
                }
            }
        }

        // Count the opportunity only if its row was actually written.
        if ($this->createOpportunity($entry, $analysis, $orgId, $source)) {
            $result['opportunities']++;
        }
    }

    /**
     * Look up an existing organization id by name.
     *
     * NOTE(review): assumes RssParser::organizationExists() returns the matching
     * id (or a falsy value when absent), as the original caller used it — confirm.
     *
     * @return int|null The organization id, or null when none matches.
     */
    private function findOrganizationId(string $name): ?int
    {
        $found = $this->rssParser->organizationExists($name);

        return $found ? (int)$found : null;
    }

    /**
     * Create an organization record with CRM status 'New'.
     *
     * @param array $data Reads `name` (required) and optional `description`,
     *                    `type` (default 'partner'), `country`, `website`.
     * @return int|null The new id, or null if the insert failed (the failure is logged).
     */
    private function createOrganization(array $data): ?int
    {
        try {
            $stmt = $this->pdo->prepare(
                "INSERT INTO organizations (name, description, type, country, website_url, crm_status) VALUES (?, ?, ?, ?, ?, 'New')"
            );
            $stmt->execute([
                $data['name'],
                $data['description'] ?? '',
                $data['type'] ?? 'partner',
                $data['country'] ?? null,
                $data['website'] ?? null,
            ]);

            $id = (int)$this->pdo->lastInsertId();
            return $id > 0 ? $id : null;
        } catch (Throwable $e) {
            $this->logCollectorError('Failed to create organization: ' . $e->getMessage());
            return null;
        }
    }

    /**
     * Create an opportunity record, then attach its tags and, if configured,
     * send a Telegram alert for it.
     *
     * Tag and notification failures are logged but do not undo the insert.
     *
     * @return bool True if the opportunity row was inserted, false if the insert
     *              failed (the failure is logged).
     */
    private function createOpportunity(array $entry, array $analysis, ?int $orgId, array $source): bool
    {
        // Non-numeric AI output falls back to the default instead of being
        // clamped via PHP's string/number comparison rules.
        $rawScore = $analysis['score'] ?? 10;
        $score = is_numeric($rawScore) ? (int)round((float)$rawScore) : 10;
        $score = min(100, max(0, $score));

        $oppType = $analysis['opportunity_type'] ?? $analysis['type'] ?? 'other';

        try {
            $stmt = $this->pdo->prepare(
                "INSERT INTO opportunities (title, description, type, organization_id, url, status, score, raw_data) VALUES (?, ?, ?, ?, ?, 'active', ?, ?)"
            );
            $stmt->execute([
                $entry['title'],
                $analysis['summary'] ?? $entry['description'],
                $oppType,
                $orgId,
                $entry['url'],
                $score,
                // Substitute invalid UTF-8 (common in feed content) so encoding
                // cannot fail and store `false` as raw_data.
                json_encode([
                    'source_id' => $source['id'] ?? null,
                    'source_name' => $source['name'] ?? '',
                    'published_at' => $entry['published_at'] ?? null,
                    'categories' => $entry['categories'] ?? [],
                    'analysis' => $analysis,
                ], JSON_INVALID_UTF8_SUBSTITUTE),
            ]);

            $opportunityId = (int)$this->pdo->lastInsertId();
        } catch (Throwable $e) {
            $this->logCollectorError('Failed to save opportunity ' . $entry['url'] . ': ' . $e->getMessage());
            return false;
        }

        if (!empty($analysis['tags']) && is_array($analysis['tags'])) {
            $this->attachTags($opportunityId, $analysis['tags']);
        }

        $this->notifyIfImportant($entry, $analysis, $score, $orgId);

        return true;
    }

    /**
     * Link tags to an opportunity, creating missing tags on the fly.
     *
     * Each tag is attempted independently; failures are logged and skipped.
     *
     * @param int   $opportunityId Id of the opportunity row.
     * @param array $tags          Tag names from the AI analysis; entries that are
     *                             not strings or ints are ignored.
     */
    private function attachTags(int $opportunityId, array $tags): void
    {
        try {
            // Prepared once and reused for every tag.
            $linkStmt = $this->pdo->prepare(
                "INSERT IGNORE INTO opportunity_tags (opportunity_id, tag_id) VALUES (?, ?)"
            );
        } catch (Throwable $e) {
            $this->logCollectorError('Failed to prepare tag link statement: ' . $e->getMessage());
            return;
        }

        foreach ($tags as $tagName) {
            if (!is_string($tagName) && !is_int($tagName)) {
                continue;
            }

            try {
                $tagId = $this->getOrCreateTag((string)$tagName);
                if ($tagId !== null) {
                    $linkStmt->execute([$opportunityId, $tagId]);
                }
            } catch (Throwable $e) {
                $this->logCollectorError('Failed to attach tag "' . $tagName . '": ' . $e->getMessage());
            }
        }
    }

    /**
     * Send a Telegram alert for a freshly stored opportunity.
     *
     * The alert is sent only when `telegram_enabled` is '1', the score reaches
     * `telegram_min_score` (default 75), and the type is one of the high-priority
     * kinds below. Failures are logged, never thrown.
     */
    private function notifyIfImportant(array $entry, array $analysis, int $score, ?int $orgId): void
    {
        $importantTypes = ['vc_funding', 'accelerator', 'incubator', 'grant', 'competition'];

        try {
            if ($this->getSetting('telegram_enabled') !== '1') {
                return;
            }

            $minScore = (int)($this->getSetting('telegram_min_score') ?: 75);
            $oppType = $analysis['opportunity_type'] ?? $analysis['type'] ?? 'other';
            if ($score < $minScore || !in_array($oppType, $importantTypes, true)) {
                return;
            }

            $orgName = '';
            if ($orgId !== null) {
                $orgStmt = $this->pdo->prepare("SELECT name FROM organizations WHERE id = ?");
                $orgStmt->execute([$orgId]);
                $orgName = $orgStmt->fetchColumn() ?: '';
            }

            $this->notifier->loadSettings();
            $this->notifier->notifyNewOpportunity([
                'title' => $entry['title'],
                'type' => $oppType,
                'score' => $score,
                'url' => $entry['url'],
                'description' => $analysis['summary'] ?? $entry['description'],
                'org_name' => $orgName,
            ]);
        } catch (Throwable $e) {
            $this->logCollectorError('Telegram notification failed for ' . $entry['url'] . ': ' . $e->getMessage());
        }
    }

    /**
     * Get or create a tag, matched on a lower-case, dash-separated slug of its name.
     *
     * @return int|null The tag id, or null when the name produces an empty slug or
     *                  the tag can be neither found nor created.
     */
    private function getOrCreateTag(string $name): ?int
    {
        $name = trim($name);

        // Lower-case BEFORE filtering: the pattern only whitelists a-z, so
        // upper-case letters would otherwise be replaced by dashes ("AI" -> "").
        $slug = trim((string)preg_replace('/[^a-z0-9]+/', '-', strtolower($name)), '-');
        if ($slug === '') {
            return null;
        }

        $existingId = $this->findTagIdBySlug($slug);
        if ($existingId !== null) {
            return $existingId;
        }

        try {
            $stmt = $this->pdo->prepare("INSERT INTO tags (name, slug) VALUES (?, ?)");
            $stmt->execute([$name, $slug]);

            $newId = (int)$this->pdo->lastInsertId();
            return $newId > 0 ? $newId : null;
        } catch (Throwable $e) {
            // The insert may have lost a race with another writer for the same
            // slug; re-read before giving up.
            return $this->findTagIdBySlug($slug);
        }
    }

    /**
     * Return the id of the tag with the given slug, or null if none exists.
     */
    private function findTagIdBySlug(string $slug): ?int
    {
        $stmt = $this->pdo->prepare("SELECT id FROM tags WHERE slug = ?");
        $stmt->execute([$slug]);
        $id = $stmt->fetchColumn();

        return $id ? (int)$id : null;
    }

    /**
     * Record a non-fatal collector failure.
     *
     * NOTE(review): uses the logger's log(null, action, message) form seen in
     * collectAll(); the 'collector_error' action name is new — confirm the logger
     * accepts it. Falls back to PHP's error_log() if the logger itself throws.
     */
    private function logCollectorError(string $message): void
    {
        try {
            $this->logger->log(null, 'collector_error', $message);
        } catch (Throwable $e) {
            error_log('[collector] ' . $message);
        }
    }

    /**
     * Get all active sources.
*/
    public function getActiveSources(): array
    {
        // One row per active source. `categories` holds the GROUP_CONCAT of its
        // source_categories rows (NULL when the LEFT JOIN finds none).
        $activeSourcesSql = "SELECT s.*, GROUP_CONCAT(sc.category) as categories FROM sources s LEFT JOIN source_categories sc ON sc.source_id = s.id WHERE s.status = 'active' GROUP BY s.id";

        $rows = $this->pdo->query($activeSourcesSql)->fetchAll();

        return $rows ?: [];
    }

    /**
     * Read one value from the `settings` table.
     *
     * @param string $key Setting key, matched against the `key` column.
     * @return string|null The stored value; null when the key is absent or the
     *                     lookup throws.
     */
    private function getSetting(string $key): ?string
    {
        try {
            $lookup = $this->pdo->prepare("SELECT `value` FROM settings WHERE `key` = ?");
            $lookup->execute([$key]);
            $value = $lookup->fetchColumn();

            if ($value === false) {
                return null;
            }

            return $value;
        } catch (Throwable $e) {
            return null;
        }
    }
}