diff --git a/app/services/discovery.py b/app/services/discovery.py index adbafc8..74cd57e 100644 --- a/app/services/discovery.py +++ b/app/services/discovery.py @@ -1,72 +1,94 @@ """ app/services/discovery.py -Updated for WP-11: Sliding Window Analysis. +Service für Link-Vorschläge und Knowledge-Discovery (WP-11). +Implementiert Sliding Window für lange Texte und Late Binding für Edge-Typen. """ import logging import asyncio -from typing import List, Dict, Any +import os # <--- Added missing import +from typing import List, Dict, Any, Optional import yaml from app.core.qdrant import QdrantConfig, get_client from app.models.dto import QueryRequest +# Hinweis: hybrid_retrieve ist aktuell synchron. In einer reinen Async-Welt +# würde man dies refactorn, aber hier wrappen wir es. from app.core.retriever import hybrid_retrieve logger = logging.getLogger(__name__) class DiscoveryService: def __init__(self, collection_prefix: str = None): + # 1. Config laden self.cfg = QdrantConfig.from_env() + # Prefix Priorität: Argument > Env > Default self.prefix = collection_prefix or self.cfg.prefix or "mindnet" self.client = get_client(self.cfg) + + # 2. Registry für Late Binding laden (Edge Defaults) self.registry = self._load_type_registry() async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]: """ - Analysiert den Draft mit Sliding Window Strategie. + Analysiert einen Draft-Text und schlägt Verlinkungen vor. + Nutzt Sliding Window für Semantik und Full-Text Scan für Entity Recognition. """ suggestions = [] + + # Default Edge Typ aus Config (z.B. 'depends_on' für Projekte) default_edge_type = self._get_default_edge_type(current_type) - # 1. Exact Match (Läuft über gesamten Text, schnell genug) + # --------------------------------------------------------- + # 1. Exact Match: Finde Titel/Aliases im Text + # --------------------------------------------------------- known_entities = self._fetch_all_titles_and_aliases() found_entities = self._find_entities_in_text(text, known_entities) existing_target_ids = set() + for entity in found_entities: existing_target_ids.add(entity["id"]) + target_title = entity["title"] + suggested_md = f"[[rel:{default_edge_type} {target_title}]]" + suggestions.append({ "type": "exact_match", "text_found": entity["match"], - "target_title": entity["title"], + "target_title": target_title, "target_id": entity["id"], "suggested_edge_type": default_edge_type, - "suggested_markdown": f"[[rel:{default_edge_type} {entity['title']}]]", + "suggested_markdown": suggested_md, "confidence": 1.0, - "reason": f"Exakter Treffer ({entity['match']})" + "reason": f"Exakter Treffer: '{entity['match']}'" }) - # 2. Semantic Match (Sliding Window) - # Wir zerlegen den Text in relevante Chunks, um Token-Limits zu umgehen - # und Fokus zu streuen. + # --------------------------------------------------------- + # 2. Semantic Match: Sliding Window Analyse + # --------------------------------------------------------- + # Zerlege Text in sinnvolle Abschnitte für das Embedding search_queries = self._generate_search_queries(text) - # Parallel Execution für alle Queries + # Parallel alle Abschnitte suchen tasks = [self._get_semantic_suggestions_async(q) for q in search_queries] results_list = await asyncio.gather(*tasks) - # Ergebnisse mergen und deduplizieren + # Ergebnisse zusammenführen seen_semantic_ids = set() for hits in results_list: for hit in hits: + # Duplikate filtern (schon als Exact Match oder schon als anderer Semantic Hit) if hit.node_id in existing_target_ids or hit.node_id in seen_semantic_ids: continue - # Threshold Tuning: Bei 'nomic' sind Scores oft niedriger (0.4-0.6 ist schon gut) - # Wir setzen ihn moderat auf 0.50 + # Schwellwert: Mit 'nomic-embed-text' sind Scores oft schärfer. + # 0.50 ist ein guter Startwert für semantische Nähe. if hit.total_score > 0.50: seen_semantic_ids.add(hit.node_id) + + # Titel aus Payload holen (wurde in chunk_payload.py gefixt) target_title = hit.payload.get("title") or hit.node_id + suggested_md = f"[[rel:{default_edge_type} {target_title}]]" suggestions.append({ "type": "semantic_match", @@ -74,99 +96,117 @@ class DiscoveryService: "target_title": target_title, "target_id": hit.node_id, "suggested_edge_type": default_edge_type, - "suggested_markdown": f"[[rel:{default_edge_type} {target_title}]]", + "suggested_markdown": suggested_md, "confidence": round(hit.total_score, 2), "reason": f"Semantisch ähnlich ({hit.total_score:.2f})" }) - # Sortieren nach Confidence + # Sortieren nach Confidence (Höchste zuerst) suggestions.sort(key=lambda x: x["confidence"], reverse=True) return { "draft_length": len(text), + "analyzed_windows": len(search_queries), "suggestions_count": len(suggestions), - "suggestions": suggestions[:10] # Limit auf Top 10 + "suggestions": suggestions[:10] # Top 10 reichen } - # --- Helpers --- + # --- Interne Helfer --- def _generate_search_queries(self, text: str) -> List[str]: - """ - Zerlegt den Text in bis zu 3 Such-Queries: - 1. Der Anfang (Kontext/Einleitung) - 2. Die Mitte (Details) - 3. Das Ende (Fazit/Zusammenfassung) - """ - if len(text) < 600: - return [text] - + """Erzeugt Sliding Windows über den Text.""" + if not text: return [] + if len(text) < 600: return [text] + queries = [] - # Query 1: Die ersten 400 Zeichen - queries.append(text[:400]) + # 1. Anfang (Kontext) + queries.append(text[:500]) - # Query 2: Ein Fenster aus der Mitte + # 2. Mitte mid = len(text) // 2 - queries.append(text[mid-200 : mid+200]) + queries.append(text[mid-250 : mid+250]) - # Query 3: Die letzten 400 Zeichen + # 3. Ende (Fazit) if len(text) > 800: - queries.append(text[-400:]) + queries.append(text[-500:]) return queries async def _get_semantic_suggestions_async(self, text: str): - # Nutzt hybrid_retrieve (sync), aber hier in Async Context okay + """Wrapper um den Retriever (sync).""" req = QueryRequest(query=text, top_k=5, explain=False) try: + # Hier blockieren wir kurz den Loop, da hybrid_retrieve sync ist. + # In High-Load Szenarien müsste das in einen ThreadPoolExecutor. res = hybrid_retrieve(req) return res.results - except Exception: return [] + except Exception as e: + logger.error(f"Semantic suggestion error: {e}") + return [] - # ... (Restliche Methoden wie _fetch_all_titles_and_aliases bleiben gleich) ... - # Füge hier die Methoden aus dem vorherigen Artefakt ein (fetch_all..., find_entities..., load_type...) - # Der Kürze halber lasse ich sie im Snippet weg, da sie unverändert sind. - def _load_type_registry(self) -> dict: path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml") if not os.path.exists(path): if os.path.exists("types.yaml"): path = "types.yaml" else: return {} try: - with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} - except Exception: return {} + with open(path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) or {} + except Exception: + return {} def _get_default_edge_type(self, note_type: str) -> str: types_cfg = self.registry.get("types", {}) type_def = types_cfg.get(note_type, {}) defaults = type_def.get("edge_defaults") - return defaults[0] if defaults else "related_to" + if defaults and isinstance(defaults, list) and len(defaults) > 0: + return defaults[0] + return "related_to" def _fetch_all_titles_and_aliases(self) -> List[Dict]: notes = [] next_page = None - col = f"{self.prefix}_notes" + col_name = f"{self.prefix}_notes" + try: while True: - res, next_page = self.client.scroll(collection_name=col, limit=1000, offset=next_page, with_payload=True, with_vectors=False) + res, next_page = self.client.scroll( + collection_name=col_name, + limit=1000, + offset=next_page, + with_payload=True, + with_vectors=False + ) for point in res: pl = point.payload or {} aliases = pl.get("aliases") or [] if isinstance(aliases, str): aliases = [aliases] - notes.append({"id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases}) + + notes.append({ + "id": pl.get("note_id"), + "title": pl.get("title"), + "aliases": aliases + }) + if next_page is None: break - except Exception: pass + except Exception as e: + logger.error(f"Error fetching titles: {e}") + return [] return notes def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]: found = [] text_lower = text.lower() for entity in entities: + # Title title = entity.get("title") if title and title.lower() in text_lower: found.append({"match": title, "title": title, "id": entity["id"]}) continue - for alias in entity.get("aliases", []): - if str(alias).lower() in text_lower: + # Aliases + aliases = entity.get("aliases", []) + for alias in aliases: + if alias and str(alias).lower() in text_lower: found.append({"match": alias, "title": title, "id": entity["id"]}) break return found \ No newline at end of file