""" FILE: app/services/discovery.py DESCRIPTION: Service für WP-11. Analysiert Texte, findet Entitäten und schlägt typisierte Verbindungen vor ("Matrix-Logic"). VERSION: 0.6.0 STATUS: Active DEPENDENCIES: app.core.qdrant, app.models.dto, app.core.retriever EXTERNAL_CONFIG: config/types.yaml LAST_ANALYSIS: 2025-12-15 """ import logging import asyncio import os from typing import List, Dict, Any, Optional, Set import yaml from app.core.qdrant import QdrantConfig, get_client from app.models.dto import QueryRequest from app.core.retriever import hybrid_retrieve logger = logging.getLogger(__name__) class DiscoveryService: def __init__(self, collection_prefix: str = None): self.cfg = QdrantConfig.from_env() self.prefix = collection_prefix or self.cfg.prefix or "mindnet" self.client = get_client(self.cfg) self.registry = self._load_type_registry() async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]: """ Analysiert den Text und liefert Vorschläge mit kontext-sensitiven Kanten-Typen. """ suggestions = [] # Fallback, falls keine spezielle Regel greift default_edge_type = self._get_default_edge_type(current_type) # Tracking-Sets für Deduplizierung (Wir merken uns NOTE-IDs) seen_target_note_ids = set() # --------------------------------------------------------- # 1. Exact Match: Titel/Aliases # --------------------------------------------------------- # Holt Titel, Aliases UND Typen aus dem Index known_entities = self._fetch_all_titles_and_aliases() found_entities = self._find_entities_in_text(text, known_entities) for entity in found_entities: if entity["id"] in seen_target_note_ids: continue seen_target_note_ids.add(entity["id"]) # INTELLIGENTE KANTEN-LOGIK (MATRIX) target_type = entity.get("type", "concept") smart_edge = self._resolve_edge_type(current_type, target_type) suggestions.append({ "type": "exact_match", "text_found": entity["match"], "target_title": entity["title"], "target_id": entity["id"], "suggested_edge_type": smart_edge, "suggested_markdown": f"[[rel:{smart_edge} {entity['title']}]]", "confidence": 1.0, "reason": f"Exakter Treffer: '{entity['match']}' ({target_type})" }) # --------------------------------------------------------- # 2. Semantic Match: Sliding Window & Footer Focus # --------------------------------------------------------- search_queries = self._generate_search_queries(text) # Async parallel abfragen tasks = [self._get_semantic_suggestions_async(q) for q in search_queries] results_list = await asyncio.gather(*tasks) # Ergebnisse verarbeiten for hits in results_list: for hit in hits: note_id = hit.payload.get("note_id") if not note_id: continue # Deduplizierung (Notiz-Ebene) if note_id in seen_target_note_ids: continue # Score Check (Threshold 0.50 für nomic-embed-text) if hit.total_score > 0.50: seen_target_note_ids.add(note_id) target_title = hit.payload.get("title") or "Unbekannt" # INTELLIGENTE KANTEN-LOGIK (MATRIX) # Den Typ der gefundenen Notiz aus dem Payload lesen target_type = hit.payload.get("type", "concept") smart_edge = self._resolve_edge_type(current_type, target_type) suggestions.append({ "type": "semantic_match", "text_found": (hit.source.get("text") or "")[:60] + "...", "target_title": target_title, "target_id": note_id, "suggested_edge_type": smart_edge, "suggested_markdown": f"[[rel:{smart_edge} {target_title}]]", "confidence": round(hit.total_score, 2), "reason": f"Semantisch ähnlich zu {target_type} ({hit.total_score:.2f})" }) # Sortieren nach Confidence suggestions.sort(key=lambda x: x["confidence"], reverse=True) return { "draft_length": len(text), "analyzed_windows": len(search_queries), "suggestions_count": len(suggestions), "suggestions": suggestions[:10] } # --------------------------------------------------------- # Core Logic: Die Matrix # --------------------------------------------------------- def _resolve_edge_type(self, source_type: str, target_type: str) -> str: """ Entscheidungsmatrix für komplexe Verbindungen. Definiert, wie Typ A auf Typ B verlinken sollte. """ st = source_type.lower() tt = target_type.lower() # Regeln für 'experience' (Erfahrungen) if st == "experience": if tt == "value": return "based_on" if tt == "principle": return "derived_from" if tt == "trip": return "part_of" if tt == "lesson": return "learned" if tt == "project": return "related_to" # oder belongs_to # Regeln für 'project' if st == "project": if tt == "decision": return "depends_on" if tt == "concept": return "uses" if tt == "person": return "managed_by" # Regeln für 'decision' (ADR) if st == "decision": if tt == "principle": return "compliant_with" if tt == "requirement": return "addresses" # Fallback: Standard aus der types.yaml für den Source-Typ return self._get_default_edge_type(st) # --------------------------------------------------------- # Sliding Windows # --------------------------------------------------------- def _generate_search_queries(self, text: str) -> List[str]: """ Erzeugt intelligente Fenster + Footer Scan. """ text_len = len(text) if not text: return [] queries = [] # 1. Start / Gesamtkontext queries.append(text[:600]) # 2. Footer-Scan (Wichtig für "Projekt"-Referenzen am Ende) if text_len > 150: footer = text[-250:] if footer not in queries: queries.append(footer) # 3. Sliding Window für lange Texte if text_len > 800: window_size = 500 step = 1500 for i in range(window_size, text_len - window_size, step): end_pos = min(i + window_size, text_len) chunk = text[i:end_pos] if len(chunk) > 100: queries.append(chunk) return queries # --------------------------------------------------------- # Standard Helpers # --------------------------------------------------------- async def _get_semantic_suggestions_async(self, text: str): req = QueryRequest(query=text, top_k=5, explain=False) try: res = hybrid_retrieve(req) return res.results except Exception as e: logger.error(f"Semantic suggestion error: {e}") return [] def _load_type_registry(self) -> dict: path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml") if not os.path.exists(path): if os.path.exists("types.yaml"): path = "types.yaml" else: return {} try: with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} except Exception: return {} def _get_default_edge_type(self, note_type: str) -> str: types_cfg = self.registry.get("types", {}) type_def = types_cfg.get(note_type, {}) defaults = type_def.get("edge_defaults") return defaults[0] if defaults else "related_to" def _fetch_all_titles_and_aliases(self) -> List[Dict]: notes = [] next_page = None col = f"{self.prefix}_notes" try: while True: res, next_page = self.client.scroll( collection_name=col, limit=1000, offset=next_page, with_payload=True, with_vectors=False ) for point in res: pl = point.payload or {} aliases = pl.get("aliases") or [] if isinstance(aliases, str): aliases = [aliases] notes.append({ "id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases, "type": pl.get("type", "concept") # WICHTIG: Typ laden für Matrix }) if next_page is None: break except Exception: pass return notes def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]: found = [] text_lower = text.lower() for entity in entities: # Title Check title = entity.get("title") if title and title.lower() in text_lower: found.append({"match": title, "title": title, "id": entity["id"], "type": entity["type"]}) continue # Alias Check for alias in entity.get("aliases", []): if str(alias).lower() in text_lower: found.append({"match": alias, "title": title, "id": entity["id"], "type": entity["type"]}) break return found