mindnet/app/services/discovery.py
2025-12-11 11:11:01 +01:00

148 lines
6.0 KiB
Python

"""
app/services/discovery.py
Service für Link-Vorschläge und Knowledge-Discovery (WP-11).
Adaptiert für Async-Architecture (v2.4).
"""
import logging
import os
from typing import List, Dict, Any
import yaml
# Wir nutzen hier weiterhin die Low-Level Funktionen, da diese stabil sind
from app.core.qdrant import QdrantConfig, get_client
from app.models.dto import QueryRequest
from app.core.retriever import hybrid_retrieve
logger = logging.getLogger(__name__)
class DiscoveryService:
def __init__(self, collection_prefix: str = None):
self.cfg = QdrantConfig.from_env()
self.prefix = collection_prefix or self.cfg.prefix or "mindnet"
self.client = get_client(self.cfg)
self.registry = self._load_type_registry()
async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]:
"""
Analysiert einen Draft-Text und schlägt Verlinkungen vor.
Kombiniert Exact Match (Titel/Alias) und Semantic Match.
"""
suggestions = []
default_edge_type = self._get_default_edge_type(current_type)
# 1. Exact Match: Finde Begriffe im Text, die als Notiz-Titel existieren
# (Dies läuft synchron, ist aber sehr schnell durch Qdrant Scroll)
known_entities = self._fetch_all_titles_and_aliases()
found_entities = self._find_entities_in_text(text, known_entities)
existing_target_ids = set()
for entity in found_entities:
existing_target_ids.add(entity["id"])
target_title = entity["title"]
suggested_md = f"[[rel:{default_edge_type} {target_title}]]"
suggestions.append({
"type": "exact_match",
"text_found": entity["match"],
"target_title": target_title,
"target_id": entity["id"],
"suggested_edge_type": default_edge_type,
"suggested_markdown": suggested_md,
"confidence": 1.0,
"reason": f"Exakter Treffer (Default für '{current_type}': {default_edge_type})"
})
# 2. Semantic Match: Finde inhaltlich ähnliche Notizen
# Wir filtern Ergebnisse heraus, die wir schon per Exact Match gefunden haben.
semantic_hits = await self._get_semantic_suggestions_async(text)
for hit in semantic_hits:
if hit.node_id in existing_target_ids:
continue
if hit.total_score > 0.65:
# FIX: Titel aus Payload lesen, nicht ID!
target_title = hit.payload.get("title") or hit.node_id
suggested_md = f"[[rel:{default_edge_type} {target_title}]]"
suggestions.append({
"type": "semantic_match",
"text_found": (hit.source.get("text") or "")[:50] + "...",
"target_title": target_title,
"target_id": hit.node_id,
"suggested_edge_type": default_edge_type,
"suggested_markdown": suggested_md,
"confidence": round(hit.total_score, 2),
"reason": f"Semantische Ähnlichkeit ({hit.total_score:.2f})"
})
return {
"draft_length": len(text),
"suggestions_count": len(suggestions),
"suggestions": suggestions
}
# --- Helpers ---
async def _get_semantic_suggestions_async(self, text: str):
"""Async Wrapper um den Hybrid Retriever."""
req = QueryRequest(query=text, top_k=5, explain=False)
try:
# Da hybrid_retrieve (noch) sync ist, rufen wir es direkt auf.
# In einer voll-async Umgebung würde man dies in einen Thread-Pool auslagern,
# aber da Qdrant-Client sync ist, ist das hier okay.
res = hybrid_retrieve(req)
return res.results
except Exception as e:
logger.error(f"Semantic suggestion failed: {e}")
return []
def _load_type_registry(self) -> dict:
path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
if not os.path.exists(path):
if os.path.exists("types.yaml"): path = "types.yaml"
else: return {}
try:
with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {}
except Exception: return {}
def _get_default_edge_type(self, note_type: str) -> str:
types_cfg = self.registry.get("types", {})
type_def = types_cfg.get(note_type, {})
defaults = type_def.get("edge_defaults")
if defaults and isinstance(defaults, list) and len(defaults) > 0:
return defaults[0]
return "related_to"
def _fetch_all_titles_and_aliases(self) -> List[Dict]:
notes = []
next_page = None
col_name = f"{self.prefix}_notes"
try:
while True:
res, next_page = self.client.scroll(collection_name=col_name, limit=1000, offset=next_page, with_payload=True, with_vectors=False)
for point in res:
pl = point.payload or {}
aliases = pl.get("aliases") or []
if isinstance(aliases, str): aliases = [aliases]
notes.append({"id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases})
if next_page is None: break
except Exception: return []
return notes
def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]:
found = []
text_lower = text.lower()
for entity in entities:
title = entity.get("title")
if title and title.lower() in text_lower:
found.append({"match": title, "title": title, "id": entity["id"]})
continue
aliases = entity.get("aliases", [])
for alias in aliases:
if alias and str(alias).lower() in text_lower:
found.append({"match": alias, "title": title, "id": entity["id"]})
break
return found