This commit is contained in:
Lars 2025-12-11 11:11:01 +01:00
parent 5aae33f578
commit fd47a954bd
2 changed files with 56 additions and 147 deletions

View File

@ -1,20 +1,19 @@
"""
app/routers/ingest.py
API-Endpunkte für WP-11 (Discovery & Persistence).
Fixed Async/Await Issues.
Fixed Async/Await Integration with Discovery Service.
"""
import os
import time
import logging
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
from typing import Optional, Dict, Any
from app.core.ingestion import IngestionService
from app.core.retriever import Retriever
from app.models.dto import QueryRequest
# WICHTIG: Wir nutzen wieder den spezialisierten DiscoveryService
from app.services.discovery import DiscoveryService
# Logger Konfiguration
logger = logging.getLogger(__name__)
router = APIRouter()
@ -36,51 +35,20 @@ class SaveResponse(BaseModel):
note_id: str
stats: Dict[str, Any]
# --- Services ---
discovery_service = DiscoveryService()
# --- Endpoints ---
@router.post("/analyze")
async def analyze_draft(req: AnalyzeRequest):
"""
WP-11 Intelligence: Liefert Link-Vorschläge via Retriever.
WP-11 Intelligence: Liefert Link-Vorschläge (Exact + Semantic).
"""
try:
# Wir nutzen den Retriever direkt, statt eines extra DiscoveryServices
retriever = Retriever()
suggestions = []
# 1. Suche nach ähnlichen Inhalten (Semantic)
query_text = req.text[:400]
if not query_text.strip():
return {"suggestions": []}
# Check ob Retriever async ist (in v2.3 oft ja)
if hasattr(retriever.search, '__await__'):
hits_result = await retriever.search(QueryRequest(query=query_text, top_k=5, mode="hybrid"))
else:
# Fallback sync
hits_result = await retriever.search(QueryRequest(query=query_text, top_k=5, mode="hybrid"))
seen_titles = set()
for hit in hits_result.results:
title = hit.payload.get("note_id") or hit.node_id
if not title or title in seen_titles: continue
seen_titles.add(title)
# Simple Edge Logic
edge_kind = "related_to"
if req.type == "project": edge_kind = "depends_on"
if req.type == "decision": edge_kind = "references"
if hit.total_score > 0.65:
suggestions.append({
"target_title": title,
"target_id": hit.node_id,
"suggested_markdown": f"[[rel:{edge_kind} {title}]]",
"reason": f"Semantisch ähnlich ({hit.total_score:.2f})",
"type": "semantic"
})
return {"suggestions": suggestions}
# Wir delegieren an den Service, der Exact Matching, Config und Semantik beherrscht
result = await discovery_service.analyze_draft(req.text, req.type)
return result
except Exception as e:
logger.error(f"Analyze failed: {e}", exc_info=True)
@ -97,21 +65,22 @@ async def save_note(req: SaveRequest):
abs_vault_root = os.path.abspath(vault_root)
if not os.path.exists(abs_vault_root):
# Versuche ihn zu erstellen, falls er fehlt
os.makedirs(abs_vault_root, exist_ok=True)
try:
os.makedirs(abs_vault_root, exist_ok=True)
except Exception:
raise HTTPException(status_code=500, detail=f"Vault root missing and cannot create: {abs_vault_root}")
# 2. Filename
final_filename = req.filename
if not final_filename:
final_filename = f"draft_{int(time.time())}.md"
# 3. Ingestion Service
# 3. Ingestion Service (Async)
ingest_service = IngestionService()
logger.info(f"Saving {final_filename} to {req.folder}")
# --- DER FIX: AWAIT HINZUFÜGEN ---
# Da ingestion.py auf async umgestellt wurde, MÜSSEN wir hier warten.
# Async Call zum Ingestion Service
result = await ingest_service.create_from_text(
markdown_content=req.markdown_content,
filename=final_filename,
@ -119,7 +88,6 @@ async def save_note(req: SaveRequest):
folder=req.folder
)
# Fehlerprüfung auf dem Dictionary
if result.get("status") == "error":
raise HTTPException(status_code=500, detail=result.get("error"))

View File

@ -1,14 +1,14 @@
"""
app/services/discovery.py
Service für Link-Vorschläge und Knowledge-Discovery (WP-11).
Analysiert Drafts auf Keywords und semantische Ähnlichkeiten.
Implementiert 'Late Binding' für Edge-Typen via types.yaml.
Adaptiert für Async-Architecture (v2.4).
"""
import logging
import os
from typing import List, Dict, Any, Optional
from typing import List, Dict, Any
import yaml
# Wir nutzen hier weiterhin die Low-Level Funktionen, da diese stabil sind
from app.core.qdrant import QdrantConfig, get_client
from app.models.dto import QueryRequest
from app.core.retriever import hybrid_retrieve
@ -17,27 +17,21 @@ logger = logging.getLogger(__name__)
class DiscoveryService:
def __init__(self, collection_prefix: str = None):
# 1. Config laden
self.cfg = QdrantConfig.from_env()
# Prefix Priorität: Argument > Env > Default
self.prefix = collection_prefix or self.cfg.prefix or "mindnet"
self.client = get_client(self.cfg)
# 2. Registry für Late Binding laden (Edge Defaults)
self.registry = self._load_type_registry()
async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]:
"""
Analysiert einen Draft-Text und schlägt Verlinkungen vor.
Nutzt 'types.yaml' um den passenden Edge-Typ vorzuschlagen.
Kombiniert Exact Match (Titel/Alias) und Semantic Match.
"""
suggestions = []
# Welcher Edge-Typ ist für diesen Draft-Typ (z.B. 'project') der Standard?
# Late Binding: Wir schauen in die Config, statt es zu hardcoden.
default_edge_type = self._get_default_edge_type(current_type)
# 1. Exact Match: Finde Begriffe im Text, die als Notiz-Titel existieren
# (Dies läuft synchron, ist aber sehr schnell durch Qdrant Scroll)
known_entities = self._fetch_all_titles_and_aliases()
found_entities = self._find_entities_in_text(text, known_entities)
@ -45,10 +39,7 @@ class DiscoveryService:
for entity in found_entities:
existing_target_ids.add(entity["id"])
# Vorschlag generieren
target_title = entity["title"]
# Markdown-Vorschlag: [[rel:depends_on Ziel]]
suggested_md = f"[[rel:{default_edge_type} {target_title}]]"
suggestions.append({
@ -59,21 +50,21 @@ class DiscoveryService:
"suggested_edge_type": default_edge_type,
"suggested_markdown": suggested_md,
"confidence": 1.0,
"reason": f"Existierender Titel (Default für '{current_type}': {default_edge_type})"
"reason": f"Exakter Treffer (Default für '{current_type}': {default_edge_type})"
})
# 2. Semantic Match: Finde inhaltlich ähnliche Notizen
semantic_hits = self._get_semantic_suggestions(text)
# Wir filtern Ergebnisse heraus, die wir schon per Exact Match gefunden haben.
semantic_hits = await self._get_semantic_suggestions_async(text)
for hit in semantic_hits:
if hit.node_id in existing_target_ids:
continue
if hit.total_score > 0.65:
# Bei semantischen Treffern ist 'related_to' oft sicherer als 'depends_on',
# es sei denn, die Config erzwingt etwas anderes.
# Wir bleiben hier beim Config-Default, um konsistent zu sein.
target_title = hit.payload.get("title", "Unbekannt")
# FIX: Titel aus Payload lesen, nicht ID!
target_title = hit.payload.get("title") or hit.node_id
suggested_md = f"[[rel:{default_edge_type} {target_title}]]"
suggestions.append({
@ -84,124 +75,74 @@ class DiscoveryService:
"suggested_edge_type": default_edge_type,
"suggested_markdown": suggested_md,
"confidence": round(hit.total_score, 2),
"reason": f"Semantische Ähnlichkeit (Score: {round(hit.total_score, 2)})"
"reason": f"Semantische Ähnlichkeit ({hit.total_score:.2f})"
})
return {
"draft_length": len(text),
"draft_type": current_type,
"default_strategy": default_edge_type,
"suggestions_count": len(suggestions),
"suggestions": suggestions
}
# --- Configuration & Late Binding Helpers ---
# --- Helpers ---
async def _get_semantic_suggestions_async(self, text: str):
"""Async Wrapper um den Hybrid Retriever."""
req = QueryRequest(query=text, top_k=5, explain=False)
try:
# Da hybrid_retrieve (noch) sync ist, rufen wir es direkt auf.
# In einer voll-async Umgebung würde man dies in einen Thread-Pool auslagern,
# aber da Qdrant-Client sync ist, ist das hier okay.
res = hybrid_retrieve(req)
return res.results
except Exception as e:
logger.error(f"Semantic suggestion failed: {e}")
return []
def _load_type_registry(self) -> dict:
"""Lädt die types.yaml für Konfigurations-Zugriffe."""
path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
if not os.path.exists(path):
# Fallback relative Pfade
if os.path.exists("types.yaml"):
path = "types.yaml"
elif os.path.exists("../config/types.yaml"):
path = "../config/types.yaml"
else:
return {}
if os.path.exists("types.yaml"): path = "types.yaml"
else: return {}
try:
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.warning(f"Failed to load types registry: {e}")
return {}
with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {}
except Exception: return {}
def _get_default_edge_type(self, note_type: str) -> str:
"""
Ermittelt den bevorzugten Kanten-Typ für einen gegebenen Notiz-Typ.
Logik: types.yaml -> types -> {note_type} -> edge_defaults[0]
Fallback: 'related_to'
"""
# 1. Config für den Typ laden
types_cfg = self.registry.get("types", {})
type_def = types_cfg.get(note_type, {})
# 2. Defaults prüfen
defaults = type_def.get("edge_defaults")
if defaults and isinstance(defaults, list) and len(defaults) > 0:
# Wir nehmen den ersten Default als "Haupt-Beziehung"
return defaults[0]
# 3. Fallback, falls nichts konfiguriert ist
return "related_to"
# --- Core Logic ---
def _fetch_all_titles_and_aliases(self) -> List[Dict]:
notes = []
next_page = None
col_name = f"{self.prefix}_notes"
try:
while True:
res, next_page = self.client.scroll(
collection_name=col_name,
limit=1000,
offset=next_page,
with_payload=True,
with_vectors=False
)
res, next_page = self.client.scroll(collection_name=col_name, limit=1000, offset=next_page, with_payload=True, with_vectors=False)
for point in res:
pl = point.payload or {}
# Aliases robust lesen (kann Liste oder String sein)
aliases = pl.get("aliases") or []
if isinstance(aliases, str):
aliases = [aliases]
notes.append({
"id": pl.get("note_id"),
"title": pl.get("title"),
"aliases": aliases
})
if next_page is None:
break
except Exception as e:
logger.error(f"Error fetching titles: {e}")
return []
if isinstance(aliases, str): aliases = [aliases]
notes.append({"id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases})
if next_page is None: break
except Exception: return []
return notes
def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]:
found = []
text_lower = text.lower()
for entity in entities:
# 1. Title Match
title = entity.get("title")
if title and title.lower() in text_lower:
found.append({
"match": title,
"title": title,
"id": entity["id"]
})
found.append({"match": title, "title": title, "id": entity["id"]})
continue
# 2. Alias Match
aliases = entity.get("aliases", [])
for alias in aliases:
if alias and str(alias).lower() in text_lower:
found.append({
"match": alias,
"title": title,
"id": entity["id"]
})
found.append({"match": alias, "title": title, "id": entity["id"]})
break
return found
def _get_semantic_suggestions(self, text: str):
req = QueryRequest(query=text, top_k=5, explain=False)
try:
res = hybrid_retrieve(req)
return res.results
except Exception as e:
logger.error(f"Semantic suggestion error: {e}")
return []