From a1a58727fd9595026f5fa921bc1d89d2371ef042 Mon Sep 17 00:00:00 2001 From: Lars Date: Thu, 11 Dec 2025 14:46:56 +0100 Subject: [PATCH] discovery opt - deduplicate, last 300 Zeichen --- app/services/discovery.py | 159 +++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 86 deletions(-) diff --git a/app/services/discovery.py b/app/services/discovery.py index 74cd57e..40f731c 100644 --- a/app/services/discovery.py +++ b/app/services/discovery.py @@ -1,143 +1,152 @@ """ app/services/discovery.py Service für Link-Vorschläge und Knowledge-Discovery (WP-11). -Implementiert Sliding Window für lange Texte und Late Binding für Edge-Typen. +Optimiert: Deduplizierung pro Notiz & Footer-Fokus für kurze Texte. """ import logging import asyncio -import os # <--- Added missing import -from typing import List, Dict, Any, Optional +import os +from typing import List, Dict, Any, Optional, Set import yaml from app.core.qdrant import QdrantConfig, get_client from app.models.dto import QueryRequest -# Hinweis: hybrid_retrieve ist aktuell synchron. In einer reinen Async-Welt -# würde man dies refactorn, aber hier wrappen wir es. from app.core.retriever import hybrid_retrieve logger = logging.getLogger(__name__) class DiscoveryService: def __init__(self, collection_prefix: str = None): - # 1. Config laden self.cfg = QdrantConfig.from_env() - # Prefix Priorität: Argument > Env > Default self.prefix = collection_prefix or self.cfg.prefix or "mindnet" self.client = get_client(self.cfg) - - # 2. Registry für Late Binding laden (Edge Defaults) self.registry = self._load_type_registry() async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]: - """ - Analysiert einen Draft-Text und schlägt Verlinkungen vor. - Nutzt Sliding Window für Semantik und Full-Text Scan für Entity Recognition. - """ suggestions = [] - - # Default Edge Typ aus Config (z.B. 
'depends_on' für Projekte) default_edge_type = self._get_default_edge_type(current_type) + # Tracking-Sets für Deduplizierung (Wir merken uns NOTE-IDs, nicht Chunk-IDs) + seen_target_note_ids = set() + # --------------------------------------------------------- - # 1. Exact Match: Finde Titel/Aliases im Text + # 1. Exact Match: Titel/Aliases # --------------------------------------------------------- known_entities = self._fetch_all_titles_and_aliases() found_entities = self._find_entities_in_text(text, known_entities) - existing_target_ids = set() - for entity in found_entities: - existing_target_ids.add(entity["id"]) - target_title = entity["title"] - suggested_md = f"[[rel:{default_edge_type} {target_title}]]" - + # Duplikate vermeiden + if entity["id"] in seen_target_note_ids: + continue + seen_target_note_ids.add(entity["id"]) + suggestions.append({ "type": "exact_match", "text_found": entity["match"], - "target_title": target_title, + "target_title": entity["title"], "target_id": entity["id"], "suggested_edge_type": default_edge_type, - "suggested_markdown": suggested_md, + "suggested_markdown": f"[[rel:{default_edge_type} {entity['title']}]]", "confidence": 1.0, "reason": f"Exakter Treffer: '{entity['match']}'" }) # --------------------------------------------------------- - # 2. Semantic Match: Sliding Window Analyse + # 2. 
Semantic Match: Sliding Window & Footer Focus # --------------------------------------------------------- - # Zerlege Text in sinnvolle Abschnitte für das Embedding search_queries = self._generate_search_queries(text) - # Parallel alle Abschnitte suchen + # Async parallel abfragen tasks = [self._get_semantic_suggestions_async(q) for q in search_queries] results_list = await asyncio.gather(*tasks) - # Ergebnisse zusammenführen - seen_semantic_ids = set() - + # Ergebnisse verarbeiten for hits in results_list: for hit in hits: - # Duplikate filtern (schon als Exact Match oder schon als anderer Semantic Hit) - if hit.node_id in existing_target_ids or hit.node_id in seen_semantic_ids: + # WICHTIG: Note ID aus Payload holen (Chunk ID ist hit.node_id) + note_id = hit.payload.get("note_id") + + # Fallback, falls Payload leer (sollte nicht passieren) + if not note_id: + continue + + # 1. Check: Haben wir diese NOTIZ schon? (Egal welcher Chunk) + if note_id in seen_target_note_ids: continue - # Schwellwert: Mit 'nomic-embed-text' sind Scores oft schärfer. - # 0.50 ist ein guter Startwert für semantische Nähe. + # 2. 
Score Check (Threshold) if hit.total_score > 0.50: - seen_semantic_ids.add(hit.node_id) + seen_target_note_ids.add(note_id) # Blockiere weitere Chunks dieser Notiz - # Titel aus Payload holen (wurde in chunk_payload.py gefixt) - target_title = hit.payload.get("title") or hit.node_id + target_title = hit.payload.get("title") or "Unbekannt" suggested_md = f"[[rel:{default_edge_type} {target_title}]]" suggestions.append({ "type": "semantic_match", "text_found": (hit.source.get("text") or "")[:60] + "...", "target_title": target_title, - "target_id": hit.node_id, + "target_id": note_id, # Wir verlinken auf die Notiz, nicht den Chunk "suggested_edge_type": default_edge_type, "suggested_markdown": suggested_md, "confidence": round(hit.total_score, 2), "reason": f"Semantisch ähnlich ({hit.total_score:.2f})" }) - # Sortieren nach Confidence (Höchste zuerst) + # Sortieren nach Confidence suggestions.sort(key=lambda x: x["confidence"], reverse=True) return { "draft_length": len(text), "analyzed_windows": len(search_queries), "suggestions_count": len(suggestions), - "suggestions": suggestions[:10] # Top 10 reichen + "suggestions": suggestions[:10] } - # --- Interne Helfer --- + # --- Optimierte Sliding Windows --- def _generate_search_queries(self, text: str) -> List[str]: - """Erzeugt Sliding Windows über den Text.""" + """ + Erzeugt intelligente Fenster. + Besonderheit: Erzwingt 'Footer-Scan' auch bei kurzen Texten, + damit "Referenzen am Ende" nicht im Kontext untergehen. + """ + text_len = len(text) if not text: return [] - if len(text) < 600: return [text] queries = [] - # 1. Anfang (Kontext) - queries.append(text[:500]) - # 2. Mitte - mid = len(text) // 2 - queries.append(text[mid-250 : mid+250]) + # A) Der gesamte Text (oder Anfang) für den groben Kontext + # Bei sehr kurzen Texten ist das alles. + queries.append(text[:600]) - # 3. 
Ende (Fazit) - if len(text) > 800: - queries.append(text[-500:]) - + # B) Der "Footer-Scan" (Das Ende) + # Wenn der Text > 150 Zeichen ist, nehmen wir die letzten 250 Zeichen separat. + # Grund: Oft steht dort "Gehört zu Projekt X". + # Wenn wir das isolieren, ist der Vektor "Projekt X" sehr rein. + if text_len > 150: + footer = text[-250:] + # Nur hinzufügen, wenn es sich signifikant vom Start unterscheidet + if footer not in queries: + queries.append(footer) + + # C) Sliding Window für lange Texte (> 800 Chars) + if text_len > 800: + window_size = 500 + step = 1500 + for i in range(window_size, text_len - window_size, step): + end_pos = min(i + window_size, text_len) + chunk = text[i:end_pos] + if len(chunk) > 100: + queries.append(chunk) + return queries + # --- Standard Helper (Unverändert) --- + async def _get_semantic_suggestions_async(self, text: str): - """Wrapper um den Retriever (sync).""" req = QueryRequest(query=text, top_k=5, explain=False) try: - # Hier blockieren wir kurz den Loop, da hybrid_retrieve sync ist. - # In High-Load Szenarien müsste das in einen ThreadPoolExecutor. 
res = hybrid_retrieve(req) return res.results except Exception as e: @@ -150,63 +159,41 @@ class DiscoveryService: if os.path.exists("types.yaml"): path = "types.yaml" else: return {} try: - with open(path, "r", encoding="utf-8") as f: - return yaml.safe_load(f) or {} - except Exception: - return {} + with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} + except Exception: return {} def _get_default_edge_type(self, note_type: str) -> str: types_cfg = self.registry.get("types", {}) type_def = types_cfg.get(note_type, {}) defaults = type_def.get("edge_defaults") - if defaults and isinstance(defaults, list) and len(defaults) > 0: - return defaults[0] - return "related_to" + return defaults[0] if isinstance(defaults, list) and defaults else "related_to" def _fetch_all_titles_and_aliases(self) -> List[Dict]: notes = [] next_page = None - col_name = f"{self.prefix}_notes" - + col = f"{self.prefix}_notes" try: while True: - res, next_page = self.client.scroll( - collection_name=col_name, - limit=1000, - offset=next_page, - with_payload=True, - with_vectors=False - ) + res, next_page = self.client.scroll(collection_name=col, limit=1000, offset=next_page, with_payload=True, with_vectors=False) for point in res: pl = point.payload or {} aliases = pl.get("aliases") or [] if isinstance(aliases, str): aliases = [aliases] - - notes.append({ - "id": pl.get("note_id"), - "title": pl.get("title"), - "aliases": aliases - }) - + notes.append({"id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases}) if next_page is None: break - except Exception as e: - logger.error(f"Error fetching titles: {e}") - return [] + except Exception as e: logger.error(f"Error fetching titles: {e}") return notes def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]: found = [] text_lower = text.lower() for entity in entities: - # Title title = entity.get("title") if title and title.lower() in text_lower: found.append({"match": title, "title": title, "id": entity["id"]}) continue - # Aliases - aliases = 
entity.get("aliases", []) - for alias in aliases: - if alias and str(alias).lower() in text_lower: + for alias in entity.get("aliases", []): + if alias and str(alias).lower() in text_lower: found.append({"match": alias, "title": title, "id": entity["id"]}) break return found \ No newline at end of file