mindnet/app/services/discovery.py

242 lines
9.7 KiB
Python

"""
FILE: app/services/discovery.py
DESCRIPTION: Service für WP-11 (Discovery API). Analysiert Entwürfe, findet Entitäten
und schlägt typisierte Verbindungen basierend auf der Topologie vor.
WP-24c: Vollständige Umstellung auf EdgeRegistry für dynamische Vorschläge.
WP-15b: Unterstützung für hybride Suche und Alias-Erkennung.
VERSION: 1.1.0 (WP-24c: Full Registry Integration & Audit Fix)
STATUS: Active
COMPATIBILITY: 100% (Identische API-Signatur wie v0.6.0)
"""
import logging
import asyncio
import os
from typing import List, Dict, Any, Optional, Set
import yaml
from app.core.database.qdrant import QdrantConfig, get_client
from app.models.dto import QueryRequest
from app.core.retrieval.retriever import hybrid_retrieve
# WP-24c: Zentrale Topologie-Quelle
from app.services.edge_registry import registry as edge_registry
logger = logging.getLogger(__name__)
class DiscoveryService:
def __init__(self, collection_prefix: str = None):
"""Initialisiert den Discovery Service mit Qdrant-Anbindung."""
self.cfg = QdrantConfig.from_env()
self.prefix = collection_prefix or self.cfg.prefix or "mindnet"
self.client = get_client(self.cfg)
# Die Registry wird für Typ-Metadaten geladen (Schema-Validierung)
self.registry = self._load_type_registry()
async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]:
"""
Analysiert einen Textentwurf auf potenzielle Verbindungen.
1. Findet exakte Treffer (Titel/Aliasse).
2. Führt semantische Suchen für verschiedene Textabschnitte aus.
3. Schlägt topologisch korrekte Kanten-Typen vor.
"""
if not text or len(text.strip()) < 3:
return {"suggestions": [], "status": "empty_input"}
suggestions = []
seen_target_ids = set()
# --- PHASE 1: EXACT MATCHES (TITEL & ALIASSE) ---
# Lädt alle bekannten Titel/Aliasse für einen schnellen Scan
known_entities = self._fetch_all_titles_and_aliases()
exact_matches = self._find_entities_in_text(text, known_entities)
for entity in exact_matches:
target_id = entity["id"]
if target_id in seen_target_ids:
continue
seen_target_ids.add(target_id)
target_type = entity.get("type", "concept")
# WP-24c: Dynamische Kanten-Ermittlung statt Hardcoded Matrix
suggested_kind = self._resolve_edge_type(current_type, target_type)
suggestions.append({
"type": "exact_match",
"text_found": entity["match"],
"target_title": entity["title"],
"target_id": target_id,
"suggested_edge_type": suggested_kind,
"suggested_markdown": f"[[rel:{suggest_kind} {entity['title']}]]",
"confidence": 1.0,
"reason": f"Direkte Erwähnung von '{entity['match']}' ({target_type})"
})
# --- PHASE 2: SEMANTIC MATCHES (VECTOR SEARCH) ---
# Erzeugt Suchanfragen für verschiedene Fenster des Textes
search_queries = self._generate_search_queries(text)
# Parallele Ausführung der Suchanfragen (Cloud-Performance)
tasks = [self._get_semantic_suggestions_async(q) for q in search_queries]
results_list = await asyncio.gather(*tasks)
for hits in results_list:
for hit in hits:
payload = hit.payload or {}
target_id = payload.get("note_id")
if not target_id or target_id in seen_target_ids:
continue
# Relevanz-Threshold (Modell-spezifisch für nomic)
if hit.total_score > 0.55:
seen_target_ids.add(target_id)
target_type = payload.get("type", "concept")
target_title = payload.get("title") or "Unbenannt"
# WP-24c: Nutzung der Topologie-Engine
suggested_kind = self._resolve_edge_type(current_type, target_type)
suggestions.append({
"type": "semantic_match",
"text_found": (hit.source.get("text") or "")[:80] + "...",
"target_title": target_title,
"target_id": target_id,
"suggested_edge_type": suggested_kind,
"suggested_markdown": f"[[rel:{suggested_kind} {target_title}]]",
"confidence": round(hit.total_score, 2),
"reason": f"Semantischer Bezug zu {target_type} ({int(hit.total_score*100)}%)"
})
# Sortierung nach Konfidenz
suggestions.sort(key=lambda x: x["confidence"], reverse=True)
return {
"draft_length": len(text),
"analyzed_windows": len(search_queries),
"suggestions_count": len(suggestions),
"suggestions": suggestions[:12] # Top 12 Vorschläge
}
# --- LOGIK-ZENTRALE (WP-24c) ---
def _resolve_edge_type(self, source_type: str, target_type: str) -> str:
"""
Ermittelt den optimalen Kanten-Typ zwischen zwei Notiz-Typen.
Nutzt EdgeRegistry (graph_schema.md) statt lokaler Matrix.
"""
# 1. Spezifische Prüfung: Gibt es eine Regel für Source -> Target?
info = edge_registry.get_topology_info(source_type, target_type)
typical = info.get("typical", [])
if typical:
return typical[0] # Erster Vorschlag aus dem Schema
# 2. Fallback: Was ist für den Quell-Typ generell typisch? (Source -> any)
info_fallback = edge_registry.get_topology_info(source_type, "any")
typical_fallback = info_fallback.get("typical", [])
if typical_fallback:
return typical_fallback[0]
# 3. Globaler Fallback (Sicherheitsnetz)
return "related_to"
# --- HELPERS (VOLLSTÄNDIG ERHALTEN) ---
def _generate_search_queries(self, text: str) -> List[str]:
"""Erzeugt überlappende Fenster für die Vektorsuche (Sliding Window)."""
text_len = len(text)
queries = []
# Fokus A: Dokument-Anfang (Kontext)
queries.append(text[:600])
# Fokus B: Dokument-Ende (Aktueller Schreibfokus)
if text_len > 250:
footer = text[-350:]
if footer not in queries:
queries.append(footer)
# Fokus C: Zwischenabschnitte bei langen Texten
if text_len > 1200:
window_size = 500
step = 1200
for i in range(600, text_len - 400, step):
chunk = text[i:i+window_size]
if len(chunk) > 100:
queries.append(chunk)
return queries
async def _get_semantic_suggestions_async(self, text: str):
"""Führt eine asynchrone Vektorsuche über den Retriever aus."""
req = QueryRequest(query=text, top_k=6, explain=False)
try:
# Nutzt hybrid_retrieve (WP-15b Standard)
res = hybrid_retrieve(req)
return res.results
except Exception as e:
logger.error(f"Discovery retrieval error: {e}")
return []
def _load_type_registry(self) -> dict:
"""Lädt die types.yaml für Typ-Definitionen."""
path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
if not os.path.exists(path):
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
except Exception:
return {}
def _fetch_all_titles_and_aliases(self) -> List[Dict]:
"""Holt alle Note-IDs, Titel und Aliasse für den Exakt-Match Abgleich."""
entities = []
next_page = None
col = f"{self.prefix}_notes"
try:
while True:
res, next_page = self.client.scroll(
collection_name=col, limit=1000, offset=next_page,
with_payload=True, with_vectors=False
)
for point in res:
pl = point.payload or {}
aliases = pl.get("aliases") or []
if isinstance(aliases, str):
aliases = [aliases]
entities.append({
"id": pl.get("note_id"),
"title": pl.get("title"),
"aliases": aliases,
"type": pl.get("type", "concept")
})
if next_page is None:
break
except Exception as e:
logger.warning(f"Error fetching entities for discovery: {e}")
return entities
def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]:
"""Sucht im Text nach Erwähnungen bekannter Entitäten."""
found = []
text_lower = text.lower()
for entity in entities:
title = entity.get("title")
# Titel-Check
if title and title.lower() in text_lower:
found.append({
"match": title, "title": title,
"id": entity["id"], "type": entity["type"]
})
continue
# Alias-Check
for alias in entity.get("aliases", []):
if str(alias).lower() in text_lower:
found.append({
"match": str(alias), "title": title,
"id": entity["id"], "type": entity["type"]
})
break
return found