discovery opt - deduplicate, last 300 characters

Lars 2025-12-11 14:46:56 +01:00
parent b1cf89982b
commit a1a58727fd


@@ -1,143 +1,152 @@
 """
 app/services/discovery.py
 Service for link suggestions and knowledge discovery (WP-11).
-Implements a sliding window for long texts and late binding for edge types.
+Optimized: deduplication per note & footer focus for short texts.
 """
 import logging
 import asyncio
-import os  # <--- Added missing import
-from typing import List, Dict, Any, Optional
+import os
+from typing import List, Dict, Any, Optional, Set
 import yaml
 from app.core.qdrant import QdrantConfig, get_client
 from app.models.dto import QueryRequest
-# Note: hybrid_retrieve is currently synchronous. In a purely async
-# world this would be refactored, but here we wrap it.
 from app.core.retriever import hybrid_retrieve

 logger = logging.getLogger(__name__)

 class DiscoveryService:
     def __init__(self, collection_prefix: str = None):
-        # 1. Load the config
         self.cfg = QdrantConfig.from_env()
-        # Prefix priority: argument > env > default
         self.prefix = collection_prefix or self.cfg.prefix or "mindnet"
         self.client = get_client(self.cfg)
-        # 2. Load the registry for late binding (edge defaults)
         self.registry = self._load_type_registry()

     async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]:
-        """
-        Analyzes a draft text and suggests links.
-        Uses a sliding window for semantics and a full-text scan for entity recognition.
-        """
         suggestions = []
-        # Default edge type from the config (e.g. 'depends_on' for projects)
         default_edge_type = self._get_default_edge_type(current_type)
+        # Tracking set for deduplication (we remember NOTE ids, not chunk ids)
+        seen_target_note_ids = set()

         # ---------------------------------------------------------
-        # 1. Exact match: find titles/aliases in the text
+        # 1. Exact match: titles/aliases
         # ---------------------------------------------------------
         known_entities = self._fetch_all_titles_and_aliases()
         found_entities = self._find_entities_in_text(text, known_entities)
-        existing_target_ids = set()
         for entity in found_entities:
-            existing_target_ids.add(entity["id"])
-            target_title = entity["title"]
-            suggested_md = f"[[rel:{default_edge_type} {target_title}]]"
+            # Avoid duplicates
+            if entity["id"] in seen_target_note_ids:
+                continue
+            seen_target_note_ids.add(entity["id"])
             suggestions.append({
                 "type": "exact_match",
                 "text_found": entity["match"],
-                "target_title": target_title,
+                "target_title": entity["title"],
                 "target_id": entity["id"],
                 "suggested_edge_type": default_edge_type,
-                "suggested_markdown": suggested_md,
+                "suggested_markdown": f"[[rel:{default_edge_type} {entity['title']}]]",
                 "confidence": 1.0,
                 "reason": f"Exact match: '{entity['match']}'"
             })

         # ---------------------------------------------------------
-        # 2. Semantic match: sliding window analysis
+        # 2. Semantic match: sliding window & footer focus
         # ---------------------------------------------------------
-        # Split the text into meaningful sections for the embedding
         search_queries = self._generate_search_queries(text)
-        # Search all sections in parallel
+        # Query asynchronously in parallel
         tasks = [self._get_semantic_suggestions_async(q) for q in search_queries]
         results_list = await asyncio.gather(*tasks)

-        # Merge the results
-        seen_semantic_ids = set()
+        # Process the results
         for hits in results_list:
             for hit in hits:
-                # Filter duplicates (already an exact match or another semantic hit)
-                if hit.node_id in existing_target_ids or hit.node_id in seen_semantic_ids:
+                # IMPORTANT: take the note id from the payload (the chunk id is hit.node_id)
+                note_id = hit.payload.get("note_id")
+                # Fallback in case the payload is empty (should not happen)
+                if not note_id:
                     continue
-                # Threshold: with 'nomic-embed-text' the scores are often sharper.
-                # 0.50 is a good starting point for semantic proximity.
+                # 1. Check: have we already seen this NOTE? (Regardless of the chunk)
+                if note_id in seen_target_note_ids:
+                    continue
+                # 2. Score check (threshold)
                 if hit.total_score > 0.50:
-                    seen_semantic_ids.add(hit.node_id)
-                    # Take the title from the payload (fixed in chunk_payload.py)
-                    target_title = hit.payload.get("title") or hit.node_id
+                    seen_target_note_ids.add(note_id)  # Block further chunks of this note
+                    target_title = hit.payload.get("title") or "Unknown"
                     suggested_md = f"[[rel:{default_edge_type} {target_title}]]"
                     suggestions.append({
                         "type": "semantic_match",
                         "text_found": (hit.source.get("text") or "")[:60] + "...",
                         "target_title": target_title,
-                        "target_id": hit.node_id,
+                        "target_id": note_id,  # We link to the note, not the chunk
                         "suggested_edge_type": default_edge_type,
                         "suggested_markdown": suggested_md,
                         "confidence": round(hit.total_score, 2),
                         "reason": f"Semantically similar ({hit.total_score:.2f})"
                     })

-        # Sort by confidence (highest first)
+        # Sort by confidence
        suggestions.sort(key=lambda x: x["confidence"], reverse=True)
         return {
             "draft_length": len(text),
             "analyzed_windows": len(search_queries),
             "suggestions_count": len(suggestions),
-            "suggestions": suggestions[:10]  # The top 10 are enough
+            "suggestions": suggestions[:10]
         }

-    # --- Internal helpers ---
+    # --- Optimized sliding windows ---
     def _generate_search_queries(self, text: str) -> List[str]:
-        """Generates sliding windows over the text."""
+        """
+        Generates smart windows.
+        Special case: forces a 'footer scan' even for short texts,
+        so that references at the end do not drown in the context.
+        """
+        text_len = len(text)
         if not text: return []
-        if len(text) < 600: return [text]
         queries = []
-        # 1. Beginning (context)
-        queries.append(text[:500])
-        # 2. Middle
-        mid = len(text) // 2
-        queries.append(text[mid-250 : mid+250])
-        # 3. End (conclusion)
-        if len(text) > 800:
-            queries.append(text[-500:])
+        # A) The whole text (or its beginning) for coarse context.
+        # For very short texts this is everything.
+        queries.append(text[:600])
+        # B) The "footer scan" (the end).
+        # If the text is > 150 characters, take the last 250 characters separately.
+        # Reason: that is often where "Belongs to project X" lives.
+        # Isolating it keeps the "project X" vector very clean.
+        if text_len > 150:
+            footer = text[-250:]
+            # Only add if it is not an exact duplicate of an existing query
+            if footer not in queries:
+                queries.append(footer)
+        # C) Sliding window for long texts (> 800 chars)
+        if text_len > 800:
+            window_size = 500
+            step = 1500
+            for i in range(window_size, text_len - window_size, step):
+                end_pos = min(i + window_size, text_len)
+                chunk = text[i:end_pos]
+                if len(chunk) > 100:
+                    queries.append(chunk)
         return queries

+    # --- Standard helpers (unchanged) ---
     async def _get_semantic_suggestions_async(self, text: str):
-        """Wrapper around the (sync) retriever."""
         req = QueryRequest(query=text, top_k=5, explain=False)
         try:
-            # Here we briefly block the loop, since hybrid_retrieve is sync.
-            # In high-load scenarios this would need a ThreadPoolExecutor.
             res = hybrid_retrieve(req)
             return res.results
         except Exception as e:
@@ -150,63 +159,41 @@ class DiscoveryService:
         if os.path.exists("types.yaml"): path = "types.yaml"
         else: return {}
         try:
-            with open(path, "r", encoding="utf-8") as f:
-                return yaml.safe_load(f) or {}
-        except Exception:
-            return {}
+            with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {}
+        except Exception: return {}

     def _get_default_edge_type(self, note_type: str) -> str:
         types_cfg = self.registry.get("types", {})
         type_def = types_cfg.get(note_type, {})
         defaults = type_def.get("edge_defaults")
-        if defaults and isinstance(defaults, list) and len(defaults) > 0:
-            return defaults[0]
-        return "related_to"
+        return defaults[0] if defaults else "related_to"

     def _fetch_all_titles_and_aliases(self) -> List[Dict]:
         notes = []
         next_page = None
-        col_name = f"{self.prefix}_notes"
+        col = f"{self.prefix}_notes"
         try:
             while True:
-                res, next_page = self.client.scroll(
-                    collection_name=col_name,
-                    limit=1000,
-                    offset=next_page,
-                    with_payload=True,
-                    with_vectors=False
-                )
+                res, next_page = self.client.scroll(collection_name=col, limit=1000, offset=next_page, with_payload=True, with_vectors=False)
                 for point in res:
                     pl = point.payload or {}
                     aliases = pl.get("aliases") or []
                     if isinstance(aliases, str): aliases = [aliases]
-                    notes.append({
-                        "id": pl.get("note_id"),
-                        "title": pl.get("title"),
-                        "aliases": aliases
-                    })
+                    notes.append({"id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases})
                 if next_page is None: break
-        except Exception as e:
-            logger.error(f"Error fetching titles: {e}")
-            return []
+        except Exception: pass
         return notes

     def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]:
         found = []
         text_lower = text.lower()
         for entity in entities:
-            # Title
             title = entity.get("title")
             if title and title.lower() in text_lower:
                 found.append({"match": title, "title": title, "id": entity["id"]})
                 continue
-            # Aliases
-            aliases = entity.get("aliases", [])
-            for alias in aliases:
-                if alias and str(alias).lower() in text_lower:
-                    found.append({"match": alias, "title": title, "id": entity["id"]})
-                    break
+            for alias in entity.get("aliases", []):
+                if str(alias).lower() in text_lower:
+                    found.append({"match": alias, "title": title, "id": entity["id"]})
+                    break
         return found
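
For orientation, a minimal usage sketch of the per-note deduplication this commit introduces. It is a sketch, not part of the repository: it assumes the module path shown in the diff (app.services.discovery), a reachable Qdrant instance configured via the environment, and the draft text, the note "Atlas", and the "project" note type are invented for illustration.

import asyncio
from app.services.discovery import DiscoveryService

async def main():
    svc = DiscoveryService()  # Qdrant config comes from the environment
    draft = (
        "Kickoff notes for the data migration...\n"
        "Belongs to project Atlas."  # footer reference the footer scan isolates
    )
    result = await svc.analyze_draft(draft, current_type="project")
    # Each target note appears at most once, even if several of its
    # chunks scored above the 0.50 threshold.
    for s in result["suggestions"]:
        print(s["confidence"], s["suggested_markdown"], "-", s["reason"])

asyncio.run(main())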
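The window geometry of _generate_search_queries can be checked without a Qdrant connection, because the method never reads self. Passing a dummy object for self, as below, is a test-only trick rather than part of the service's API; the expected lengths follow directly from the code in the diff.

from app.services.discovery import DiscoveryService

gen = DiscoveryService._generate_search_queries  # self is unused here

short = "a" * 180    # > 150 but < 600 chars: footer equals the context window, so it is skipped
medium = "b" * 700   # context window (600) plus a distinct 250-char footer
long = "c" * 3000    # additionally two 500-char sliding windows at offsets 500 and 2000

for text in (short, medium, long):
    print(len(text), "->", [len(q) for q in gen(object(), text)])
# 180  -> [180]
# 700  -> [600, 250]
# 3000 -> [600, 250, 500, 500]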
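_get_default_edge_type resolves its late binding against the types.yaml registry loaded in __init__. The diff does not show that file's schema; the sketch below assumes a plausible shape (the 'depends_on'-for-projects pairing comes from a comment in the pre-change code, the 'person'/'knows' entry is invented) and mirrors the lookup in plain Python.

import yaml

registry = yaml.safe_load("""
types:
  project:
    edge_defaults: [depends_on, related_to]
  person:
    edge_defaults: [knows]
""")

def default_edge_type(registry: dict, note_type: str) -> str:
    # Mirrors DiscoveryService._get_default_edge_type after this commit
    defaults = registry.get("types", {}).get(note_type, {}).get("edge_defaults")
    return defaults[0] if defaults else "related_to"

print(default_edge_type(registry, "project"))  # depends_on
print(default_edge_type(registry, "meeting"))  # related_to (fallback)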