""" FILE: app/services/discovery.py DESCRIPTION: Service für WP-11 (Discovery API). Analysiert Entwürfe, findet Entitäten und schlägt typisierte Verbindungen basierend auf der Topologie vor. WP-24c: Vollständige Umstellung auf EdgeRegistry für dynamische Vorschläge. WP-15b: Unterstützung für hybride Suche und Alias-Erkennung. VERSION: 1.1.0 (WP-24c: Full Registry Integration & Audit Fix) STATUS: Active COMPATIBILITY: 100% (Identische API-Signatur wie v0.6.0) """ import logging import asyncio import os from typing import List, Dict, Any, Optional, Set import yaml from app.core.database.qdrant import QdrantConfig, get_client from app.models.dto import QueryRequest from app.core.retrieval.retriever import hybrid_retrieve # WP-24c: Zentrale Topologie-Quelle from app.services.edge_registry import registry as edge_registry logger = logging.getLogger(__name__) class DiscoveryService: def __init__(self, collection_prefix: str = None): """Initialisiert den Discovery Service mit Qdrant-Anbindung.""" self.cfg = QdrantConfig.from_env() self.prefix = collection_prefix or self.cfg.prefix or "mindnet" self.client = get_client(self.cfg) # Die Registry wird für Typ-Metadaten geladen (Schema-Validierung) self.registry = self._load_type_registry() async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]: """ Analysiert einen Textentwurf auf potenzielle Verbindungen. 1. Findet exakte Treffer (Titel/Aliasse). 2. Führt semantische Suchen für verschiedene Textabschnitte aus. 3. Schlägt topologisch korrekte Kanten-Typen vor. """ if not text or len(text.strip()) < 3: return {"suggestions": [], "status": "empty_input"} suggestions = [] seen_target_ids = set() # --- PHASE 1: EXACT MATCHES (TITEL & ALIASSE) --- # Lädt alle bekannten Titel/Aliasse für einen schnellen Scan known_entities = self._fetch_all_titles_and_aliases() exact_matches = self._find_entities_in_text(text, known_entities) for entity in exact_matches: target_id = entity["id"] if target_id in seen_target_ids: continue seen_target_ids.add(target_id) target_type = entity.get("type", "concept") # WP-24c: Dynamische Kanten-Ermittlung statt Hardcoded Matrix suggested_kind = self._resolve_edge_type(current_type, target_type) suggestions.append({ "type": "exact_match", "text_found": entity["match"], "target_title": entity["title"], "target_id": target_id, "suggested_edge_type": suggested_kind, "suggested_markdown": f"[[rel:{suggest_kind} {entity['title']}]]", "confidence": 1.0, "reason": f"Direkte Erwähnung von '{entity['match']}' ({target_type})" }) # --- PHASE 2: SEMANTIC MATCHES (VECTOR SEARCH) --- # Erzeugt Suchanfragen für verschiedene Fenster des Textes search_queries = self._generate_search_queries(text) # Parallele Ausführung der Suchanfragen (Cloud-Performance) tasks = [self._get_semantic_suggestions_async(q) for q in search_queries] results_list = await asyncio.gather(*tasks) for hits in results_list: for hit in hits: payload = hit.payload or {} target_id = payload.get("note_id") if not target_id or target_id in seen_target_ids: continue # Relevanz-Threshold (Modell-spezifisch für nomic) if hit.total_score > 0.55: seen_target_ids.add(target_id) target_type = payload.get("type", "concept") target_title = payload.get("title") or "Unbenannt" # WP-24c: Nutzung der Topologie-Engine suggested_kind = self._resolve_edge_type(current_type, target_type) suggestions.append({ "type": "semantic_match", "text_found": (hit.source.get("text") or "")[:80] + "...", "target_title": target_title, "target_id": target_id, "suggested_edge_type": suggested_kind, "suggested_markdown": f"[[rel:{suggested_kind} {target_title}]]", "confidence": round(hit.total_score, 2), "reason": f"Semantischer Bezug zu {target_type} ({int(hit.total_score*100)}%)" }) # Sortierung nach Konfidenz suggestions.sort(key=lambda x: x["confidence"], reverse=True) return { "draft_length": len(text), "analyzed_windows": len(search_queries), "suggestions_count": len(suggestions), "suggestions": suggestions[:12] # Top 12 Vorschläge } # --- LOGIK-ZENTRALE (WP-24c) --- def _resolve_edge_type(self, source_type: str, target_type: str) -> str: """ Ermittelt den optimalen Kanten-Typ zwischen zwei Notiz-Typen. Nutzt EdgeRegistry (graph_schema.md) statt lokaler Matrix. """ # 1. Spezifische Prüfung: Gibt es eine Regel für Source -> Target? info = edge_registry.get_topology_info(source_type, target_type) typical = info.get("typical", []) if typical: return typical[0] # Erster Vorschlag aus dem Schema # 2. Fallback: Was ist für den Quell-Typ generell typisch? (Source -> any) info_fallback = edge_registry.get_topology_info(source_type, "any") typical_fallback = info_fallback.get("typical", []) if typical_fallback: return typical_fallback[0] # 3. Globaler Fallback (Sicherheitsnetz) return "related_to" # --- HELPERS (VOLLSTÄNDIG ERHALTEN) --- def _generate_search_queries(self, text: str) -> List[str]: """Erzeugt überlappende Fenster für die Vektorsuche (Sliding Window).""" text_len = len(text) queries = [] # Fokus A: Dokument-Anfang (Kontext) queries.append(text[:600]) # Fokus B: Dokument-Ende (Aktueller Schreibfokus) if text_len > 250: footer = text[-350:] if footer not in queries: queries.append(footer) # Fokus C: Zwischenabschnitte bei langen Texten if text_len > 1200: window_size = 500 step = 1200 for i in range(600, text_len - 400, step): chunk = text[i:i+window_size] if len(chunk) > 100: queries.append(chunk) return queries async def _get_semantic_suggestions_async(self, text: str): """Führt eine asynchrone Vektorsuche über den Retriever aus.""" req = QueryRequest(query=text, top_k=6, explain=False) try: # Nutzt hybrid_retrieve (WP-15b Standard) res = hybrid_retrieve(req) return res.results except Exception as e: logger.error(f"Discovery retrieval error: {e}") return [] def _load_type_registry(self) -> dict: """Lädt die types.yaml für Typ-Definitionen.""" path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml") if not os.path.exists(path): return {} try: with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} except Exception: return {} def _fetch_all_titles_and_aliases(self) -> List[Dict]: """Holt alle Note-IDs, Titel und Aliasse für den Exakt-Match Abgleich.""" entities = [] next_page = None col = f"{self.prefix}_notes" try: while True: res, next_page = self.client.scroll( collection_name=col, limit=1000, offset=next_page, with_payload=True, with_vectors=False ) for point in res: pl = point.payload or {} aliases = pl.get("aliases") or [] if isinstance(aliases, str): aliases = [aliases] entities.append({ "id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases, "type": pl.get("type", "concept") }) if next_page is None: break except Exception as e: logger.warning(f"Error fetching entities for discovery: {e}") return entities def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]: """Sucht im Text nach Erwähnungen bekannter Entitäten.""" found = [] text_lower = text.lower() for entity in entities: title = entity.get("title") # Titel-Check if title and title.lower() in text_lower: found.append({ "match": title, "title": title, "id": entity["id"], "type": entity["type"] }) continue # Alias-Check for alias in entity.get("aliases", []): if str(alias).lower() in text_lower: found.append({ "match": str(alias), "title": title, "id": entity["id"], "type": entity["type"] }) break return found