pdo = $connection->getPdo();
        $this->rssParser = $rssParser;
        $this->aiAnalyzer = $aiAnalyzer;
        $this->logger = $logger;
    }

    /**
     * Run the collector over every active source and summarise the outcome.
     *
     * Each source is handled independently: a Throwable raised while collecting
     * one source is recorded in the summary and the loop moves on to the next.
     * When the loop finishes, the aggregate counters (everything except the
     * per-source details) are written to the logger as a 'collector_run' entry.
     *
     * @return array Summary with the keys total_sources, processed, errors,
     *               new_opportunities, new_organizations and details; details
     *               holds one row per source with status 'success' or 'error'.
     */
    public function collectAll(): array
    {
        $summary = [
            'total_sources' => 0,
            'processed' => 0,
            'errors' => 0,
            'new_opportunities' => 0,
            'new_organizations' => 0,
            'details' => [],
        ];

        foreach ($this->getActiveSources() as $source) {
            $summary['total_sources'] += 1;

            try {
                $outcome = $this->collectSource($source);

                $summary['processed'] += 1;
                $summary['new_opportunities'] += $outcome['opportunities'];
                $summary['new_organizations'] += $outcome['organizations'];

                $detail = [
                    'source' => $source['name'],
                    'type' => $source['type'],
                    'status' => 'success',
                    'entries_found' => $outcome['entries_found'],
                    'new_opportunities' => $outcome['opportunities'],
                    'new_organizations' => $outcome['organizations'],
                ];
                $summary['details'][] = $detail;
            } catch (Throwable $failure) {
                $summary['errors'] += 1;
                $summary['details'][] = [
                    'source' => $source['name'],
                    'type' => $source['type'],
                    'status' => 'error',
                    'error' => $failure->getMessage(),
                ];
            }
        }

        // The log line carries only the counters; dropping 'details' keeps the
        // remaining keys in their declared order.
        $totals = array_diff_key($summary, ['details' => true]);
        $this->logger->log(null, 'collector_run', 'Collector completed: ' . json_encode($totals));

        return $summary;
    }

    /**
     * Collect every entry offered by one source.
     *
     * Only sources whose type is 'rss' are fetched; any other type yields the
     * zeroed counters unchanged.
     *
     * @param array $source Source row; 'type' and 'url' are read here and the
     *                      whole row is handed on to processEntry().
     *
     * @return array Counters with the keys entries_found, opportunities and
     *               organizations.
     */
    public function collectSource(array $source): array
    {
        $tally = [
            'entries_found' => 0,
            'opportunities' => 0,
            'organizations' => 0,
        ];

        if ($source['type'] !== 'rss') {
            return $tally;
        }

        $feedItems = $this->rssParser->fetchEntries($source['url']);
        $tally['entries_found'] = count($feedItems);

        foreach ($feedItems as $feedItem) {
            // processEntry() updates the counters in $tally by reference.
            $this->processEntry($feedItem, $source, $tally);
        }

        return $tally;
    }

    /**
     * Process a single entry: analyze, save opportunity, save organization.
*/
    private function processEntry(array $entry, array $source, array &$result): void
    {
        // $entry  - one parsed feed item; 'url', 'title' and 'description' are read here.
        // $source - the source row the entry came from; forwarded to createOpportunity().
        // $result - counters of the current source ('opportunities', 'organizations'),
        //           incremented in place only for rows that were actually stored.

        // Skip entries the parser reports as already stored.
        if ($this->rssParser->entryExists($entry['url'])) {
            return;
        }

        $analysis = $this->aiAnalyzer->analyze($entry['title'], $entry['description']);

        $orgId = null;
        if (!empty($analysis['organization_name'])) {
            // organizationExists() is declared outside this view. Its result is used as
            // "an id or something falsy", so it is normalised to ?int before being passed
            // to a ?int parameter (a string id or `false` would otherwise be a TypeError
            // under strict types, or become organization id 0 without them).
            $existing = $this->rssParser->organizationExists($analysis['organization_name']);
            $orgId = (is_numeric($existing) && (int) $existing > 0) ? (int) $existing : null;

            if ($orgId === null) {
                // Unknown organization: ask the analyzer for fuller details and store them.
                $orgData = $this->aiAnalyzer->extractOrganization(
                    $entry['title'] . ' ' . $entry['description']
                );

                if (is_array($orgData) && !empty($orgData['name'])) {
                    $orgId = $this->createOrganization($orgData);
                    if ($orgId !== null) {
                        $result['organizations']++;
                    }
                }
            }
        }

        // Count the opportunity only when the row was really inserted.
        if ($this->createOpportunity($entry, $analysis, $orgId, $source)) {
            $result['opportunities']++;
        }
    }

    /**
     * Insert an organization row with crm_status 'New'.
     *
     * @param array $data Organization fields; 'name' is required, 'description',
     *                    'type', 'country' and 'website' are optional.
     *
     * @return int|null The new row id, or null when the insert failed or the
     *                  driver reported no usable id. Failures are logged.
     */
    private function createOrganization(array $data): ?int
    {
        try {
            $stmt = $this->pdo->prepare(
                "INSERT INTO organizations (name, description, type, country, website_url, crm_status) "
                . "VALUES (?, ?, ?, ?, ?, 'New')"
            );
            $stored = $stmt->execute([
                $data['name'],
                $data['description'] ?? '',
                $data['type'] ?? 'partner',
                $data['country'] ?? null,
                $data['website'] ?? null,
            ]);

            // With a non-throwing PDO error mode, execute() signals failure by returning
            // false and lastInsertId() yields 0; neither may be reported as an id.
            $id = $stored === false ? 0 : (int) $this->pdo->lastInsertId();

            return $id > 0 ? $id : null;
        } catch (Throwable $e) {
            $this->logCollectorError(
                'Failed to create organization ' . json_encode($data['name'] ?? null) . ': ' . $e->getMessage()
            );

            return null;
        }
    }

    /**
     * Insert an opportunity row and link its tags.
     *
     * Best effort: no exception leaves this method. A failed opportunity insert
     * is logged and reported through the return value. A failure while linking
     * tags is logged but does not undo or fail the already stored opportunity.
     *
     * @param array    $entry    Feed item; 'title', 'description', 'url' and the
     *                           optional 'published_at' and 'categories' are read.
     * @param array    $analysis Analyzer output; 'score', 'summary',
     *                           'opportunity_type' / 'type' and 'tags' are read.
     * @param int|null $orgId    Organization to attach, if any.
     * @param array    $source   Source row; 'id' and 'name' are copied into raw_data.
     *
     * @return bool True when the opportunity row was inserted.
     */
    private function createOpportunity(array $entry, array $analysis, ?int $orgId, array $source): bool
    {
        // Clamp the score to 0..100 and force an integer; a missing or non-numeric
        // score falls back to 10.
        $rawScore = $analysis['score'] ?? 10;
        $score = is_numeric($rawScore)
            ? (int) round(min(100.0, max(0.0, (float) $rawScore)))
            : 10;

        try {
            $stmt = $this->pdo->prepare(
                "INSERT INTO opportunities (title, description, type, organization_id, url, status, score, raw_data) "
                . "VALUES (?, ?, ?, ?, ?, 'active', ?, ?)"
            );
            $stored = $stmt->execute([
                $entry['title'],
                $analysis['summary'] ?? $entry['description'],
                $analysis['opportunity_type'] ?? $analysis['type'] ?? 'other',
                $orgId,
                $entry['url'],
                $score,
                // Partial output keeps the payload a JSON string even when a value
                // (for example malformed UTF-8 from a feed) cannot be encoded.
                json_encode([
                    'source_id' => $source['id'] ?? null,
                    'source_name' => $source['name'] ?? '',
                    'published_at' => $entry['published_at'] ?? null,
                    'categories' => $entry['categories'] ?? [],
                    'analysis' => $analysis,
                ], JSON_PARTIAL_OUTPUT_ON_ERROR),
            ]);

            if ($stored === false) {
                $this->logCollectorError(
                    'Failed to create opportunity ' . json_encode($entry['url'] ?? null) . ': insert was rejected'
                );

                return false;
            }

            $opportunityId = (int) $this->pdo->lastInsertId();
        } catch (Throwable $e) {
            $this->logCollectorError(
                'Failed to create opportunity ' . json_encode($entry['url'] ?? null) . ': ' . $e->getMessage()
            );

            return false;
        }

        $tags = $analysis['tags'] ?? [];
        if ($opportunityId <= 0 || !is_array($tags) || $tags === []) {
            return true;
        }

        try {
            // Prepared once and reused for every tag.
            $link = $this->pdo->prepare(
                "INSERT IGNORE INTO opportunity_tags (opportunity_id, tag_id) VALUES (?, ?)"
            );

            foreach ($tags as $tagName) {
                if (!is_string($tagName)) {
                    continue;
                }

                $tagId = $this->getOrCreateTag($tagName);
                if ($tagId !== null) {
                    $link->execute([$opportunityId, $tagId]);
                }
            }
        } catch (Throwable $e) {
            $this->logCollectorError(
                'Failed to tag opportunity #' . $opportunityId . ': ' . $e->getMessage()
            );
        }

        return true;
    }

    /**
     * Find the tag whose slug matches the given name, creating it when absent.
     *
     * The slug is the lower-cased name with every run of characters outside
     * a-z0-9 replaced by a single dash and outer dashes trimmed.
     *
     * @param string $name Human readable tag name.
     *
     * @return int|null Tag id, or null when the name yields an empty slug or
     *                  the insert failed.
     */
    private function getOrCreateTag(string $name): ?int
    {
        $name = trim($name);

        // Lower-case BEFORE filtering: the pattern only keeps a-z0-9, so filtering
        // first would turn every uppercase letter into a dash.
        $slug = preg_replace('/[^a-z0-9]+/', '-', strtolower($name));
        $slug = trim((string) $slug, '-');

        if ($slug === '') {
            // Nothing sluggable (empty or entirely non-ASCII name); storing it would
            // fold all such names onto one empty-slug tag.
            return null;
        }

        $lookup = $this->pdo->prepare("SELECT id FROM tags WHERE slug = ?");
        $lookup->execute([$slug]);
        $existingId = $lookup->fetchColumn();

        if ($existingId !== false && (int) $existingId > 0) {
            return (int) $existingId;
        }

        try {
            $insert = $this->pdo->prepare("INSERT INTO tags (name, slug) VALUES (?, ?)");
            $insert->execute([$name, $slug]);
            $newId = (int) $this->pdo->lastInsertId();

            return $newId > 0 ? $newId : null;
        } catch (Throwable $e) {
            $this->logCollectorError('Failed to create tag ' . json_encode($slug) . ': ' . $e->getMessage());

            return null;
        }
    }

    /**
     * Get all sources whose status is 'active'.
     *
     * @return array One row per source; the source's categories are folded into
     *               a single 'categories' column by GROUP_CONCAT.
     */
    public function getActiveSources(): array
    {
        $stmt = $this->pdo->query(
            "SELECT s.*, GROUP_CONCAT(sc.category) as categories FROM sources s "
            . "LEFT JOIN source_categories sc ON sc.source_id = s.id "
            . "WHERE s.status = 'active' GROUP BY s.id"
        );

        // query() returns false instead of throwing when PDO is not in exception mode.
        if ($stmt === false) {
            return [];
        }

        return $stmt->fetchAll() ?: [];
    }

    /**
     * Record a non-fatal collector failure.
     *
     * Uses the same logger call shape as the 'collector_run' entry written by
     * collectAll(). NOTE(review): the action names the logger accepts are not
     * visible here - confirm 'collector_error' is allowed. If the logger itself
     * throws, the message goes to PHP's error log so it is never lost silently.
     */
    private function logCollectorError(string $message): void
    {
        try {
            $this->logger->log(null, 'collector_error', $message);
        } catch (Throwable $e) {
            error_log($message);
        }
    }
}