From 52210a91fd693758959d990325392642d08abd09 Mon Sep 17 00:00:00 2001 From: Lars Date: Thu, 11 Dec 2025 13:50:40 +0100 Subject: [PATCH] update aus WP11 --- app/routers/ingest.py | 99 ++++++---------------- app/services/discovery.py | 132 ++++++++++++++++++------------ app/services/embeddings_client.py | 97 ++++++++++------------ 3 files changed, 147 insertions(+), 181 deletions(-) diff --git a/app/routers/ingest.py b/app/routers/ingest.py index a1407b3..d40b529 100644 --- a/app/routers/ingest.py +++ b/app/routers/ingest.py @@ -1,6 +1,7 @@ """ app/routers/ingest.py API-Endpunkte für WP-11 (Discovery & Persistence). +Delegiert an Services. """ import os import time @@ -10,17 +11,13 @@ from pydantic import BaseModel from typing import Optional, Dict, Any from app.core.ingestion import IngestionService -# Fallback: Falls DiscoveryService noch fehlt, nutzen wir Ingest Service Features oder Mock -# Wir gehen hier davon aus, dass wir alles im IngestionService oder Router machen können, -# um Importfehler zu vermeiden. -from app.core.retriever import Retriever -from app.models.dto import QueryRequest +from app.services.discovery import DiscoveryService logger = logging.getLogger(__name__) - router = APIRouter() -# --- DTOs --- +# Services Init (Global oder via Dependency Injection) +discovery_service = DiscoveryService() class AnalyzeRequest(BaseModel): text: str @@ -37,102 +34,56 @@ class SaveResponse(BaseModel): note_id: str stats: Dict[str, Any] -# --- Endpoints --- - @router.post("/analyze") async def analyze_draft(req: AnalyzeRequest): """ - WP-11 Intelligence: Liefert Link-Vorschläge. - Implementiert direkt hier, um Abhängigkeiten zu reduzieren. + WP-11 Intelligence: Liefert Link-Vorschläge via DiscoveryService. """ try: - retriever = Retriever() - suggestions = [] - - query_text = req.text[:400] - if not query_text.strip(): - return {"suggestions": []} - - # 1. Semantic Search - # Safe async call check - if hasattr(retriever.search, '__await__'): - hits_result = await retriever.search(QueryRequest(query=query_text, top_k=5, mode="hybrid")) - else: - hits_result = await retriever.search(QueryRequest(query=query_text, top_k=5, mode="hybrid")) - - seen_titles = set() - for hit in hits_result.results: - # Titel ermitteln - title = hit.payload.get("note_id") or hit.node_id - if not title or title in seen_titles: continue - seen_titles.add(title) - - edge_kind = "related_to" - if req.type == "project": edge_kind = "depends_on" - if req.type == "decision": edge_kind = "references" - - # Score Threshold - if hit.total_score > 0.4: # Etwas toleranter - suggestions.append({ - "target_title": title, - "target_id": hit.node_id, - "suggested_markdown": f"[[rel:{edge_kind} {title}]]", - "reason": f"Semantisch ähnlich ({hit.total_score:.2f})", - "type": "semantic" - }) - - return {"suggestions": suggestions} - + # Hier rufen wir jetzt den verbesserten Service auf + result = await discovery_service.analyze_draft(req.text, req.type) + return result except Exception as e: logger.error(f"Analyze failed: {e}", exc_info=True) - # Kein 500er werfen, lieber leere Liste, damit UI nicht crasht return {"suggestions": [], "error": str(e)} @router.post("/save", response_model=SaveResponse) async def save_note(req: SaveRequest): """ - WP-11 Persistence: Speichert Markdown physisch und indiziert es sofort. + WP-11 Persistence: Speichert und indiziert. """ try: vault_root = os.getenv("MINDNET_VAULT_ROOT", "./vault") abs_vault_root = os.path.abspath(vault_root) if not os.path.exists(abs_vault_root): - os.makedirs(abs_vault_root, exist_ok=True) - - final_filename = req.filename - if not final_filename: - final_filename = f"draft_{int(time.time())}.md" + try: os.makedirs(abs_vault_root, exist_ok=True) + except: pass + final_filename = req.filename or f"draft_{int(time.time())}.md" ingest_service = IngestionService() - logger.info(f"Saving {final_filename} to {req.folder}") - - # --- AWAIT WICHTIG! --- - # Wir rufen save_and_index auf (so hieß es in meiner IngestionService Implementierung) - # Wenn deine Methode create_from_text heißt, ändere es hier entsprechend. - # Ich nutze hier save_and_index als Standard aus WP-11. + # Async Call + result = await ingest_service.create_from_text( + markdown_content=req.markdown_content, + filename=final_filename, + vault_root=abs_vault_root, + folder=req.folder + ) - if hasattr(ingest_service, 'save_and_index'): - result = await ingest_service.save_and_index(req.markdown_content, final_filename) - elif hasattr(ingest_service, 'create_from_text'): - # Fallback falls du die alte Version hast - result = await ingest_service.create_from_text(req.markdown_content, final_filename, abs_vault_root, req.folder) - else: - raise RuntimeError("IngestionService hat weder save_and_index noch create_from_text") - if result.get("status") == "error": raise HTTPException(status_code=500, detail=result.get("error")) return SaveResponse( status="success", - file_path=result.get("file_path") or result.get("path", "unknown"), + file_path=result.get("path", "unknown"), note_id=result.get("note_id", "unknown"), - stats=result.get("stats", {}) + stats={ + "chunks": result.get("chunks_count", 0), + "edges": result.get("edges_count", 0) + } ) - - except HTTPException as he: - raise he + except HTTPException as he: raise he except Exception as e: logger.error(f"Save failed: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Save failed: {str(e)}") \ No newline at end of file diff --git a/app/services/discovery.py b/app/services/discovery.py index 7d25941..adbafc8 100644 --- a/app/services/discovery.py +++ b/app/services/discovery.py @@ -1,14 +1,12 @@ """ app/services/discovery.py -Service für Link-Vorschläge und Knowledge-Discovery (WP-11). -Adaptiert für Async-Architecture (v2.4). +Updated for WP-11: Sliding Window Analysis. """ import logging -import os +import asyncio from typing import List, Dict, Any import yaml -# Wir nutzen hier weiterhin die Low-Level Funktionen, da diese stabil sind from app.core.qdrant import QdrantConfig, get_client from app.models.dto import QueryRequest from app.core.retriever import hybrid_retrieve @@ -24,81 +22,110 @@ class DiscoveryService: async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]: """ - Analysiert einen Draft-Text und schlägt Verlinkungen vor. - Kombiniert Exact Match (Titel/Alias) und Semantic Match. + Analysiert den Draft mit Sliding Window Strategie. """ suggestions = [] default_edge_type = self._get_default_edge_type(current_type) - # 1. Exact Match: Finde Begriffe im Text, die als Notiz-Titel existieren - # (Dies läuft synchron, ist aber sehr schnell durch Qdrant Scroll) + # 1. Exact Match (Läuft über gesamten Text, schnell genug) known_entities = self._fetch_all_titles_and_aliases() found_entities = self._find_entities_in_text(text, known_entities) existing_target_ids = set() - for entity in found_entities: existing_target_ids.add(entity["id"]) - target_title = entity["title"] - suggested_md = f"[[rel:{default_edge_type} {target_title}]]" - suggestions.append({ "type": "exact_match", "text_found": entity["match"], - "target_title": target_title, + "target_title": entity["title"], "target_id": entity["id"], "suggested_edge_type": default_edge_type, - "suggested_markdown": suggested_md, + "suggested_markdown": f"[[rel:{default_edge_type} {entity['title']}]]", "confidence": 1.0, - "reason": f"Exakter Treffer (Default für '{current_type}': {default_edge_type})" + "reason": f"Exakter Treffer ({entity['match']})" }) - # 2. Semantic Match: Finde inhaltlich ähnliche Notizen - # Wir filtern Ergebnisse heraus, die wir schon per Exact Match gefunden haben. - semantic_hits = await self._get_semantic_suggestions_async(text) + # 2. Semantic Match (Sliding Window) + # Wir zerlegen den Text in relevante Chunks, um Token-Limits zu umgehen + # und Fokus zu streuen. + search_queries = self._generate_search_queries(text) - for hit in semantic_hits: - if hit.node_id in existing_target_ids: - continue - - if hit.total_score > 0.65: - # FIX: Titel aus Payload lesen, nicht ID! - target_title = hit.payload.get("title") or hit.node_id + # Parallel Execution für alle Queries + tasks = [self._get_semantic_suggestions_async(q) for q in search_queries] + results_list = await asyncio.gather(*tasks) + + # Ergebnisse mergen und deduplizieren + seen_semantic_ids = set() + + for hits in results_list: + for hit in hits: + if hit.node_id in existing_target_ids or hit.node_id in seen_semantic_ids: + continue - suggested_md = f"[[rel:{default_edge_type} {target_title}]]" - - suggestions.append({ - "type": "semantic_match", - "text_found": (hit.source.get("text") or "")[:50] + "...", - "target_title": target_title, - "target_id": hit.node_id, - "suggested_edge_type": default_edge_type, - "suggested_markdown": suggested_md, - "confidence": round(hit.total_score, 2), - "reason": f"Semantische Ähnlichkeit ({hit.total_score:.2f})" - }) + # Threshold Tuning: Bei 'nomic' sind Scores oft niedriger (0.4-0.6 ist schon gut) + # Wir setzen ihn moderat auf 0.50 + if hit.total_score > 0.50: + seen_semantic_ids.add(hit.node_id) + target_title = hit.payload.get("title") or hit.node_id + + suggestions.append({ + "type": "semantic_match", + "text_found": (hit.source.get("text") or "")[:60] + "...", + "target_title": target_title, + "target_id": hit.node_id, + "suggested_edge_type": default_edge_type, + "suggested_markdown": f"[[rel:{default_edge_type} {target_title}]]", + "confidence": round(hit.total_score, 2), + "reason": f"Semantisch ähnlich ({hit.total_score:.2f})" + }) + + # Sortieren nach Confidence + suggestions.sort(key=lambda x: x["confidence"], reverse=True) return { "draft_length": len(text), "suggestions_count": len(suggestions), - "suggestions": suggestions + "suggestions": suggestions[:10] # Limit auf Top 10 } # --- Helpers --- + def _generate_search_queries(self, text: str) -> List[str]: + """ + Zerlegt den Text in bis zu 3 Such-Queries: + 1. Der Anfang (Kontext/Einleitung) + 2. Die Mitte (Details) + 3. Das Ende (Fazit/Zusammenfassung) + """ + if len(text) < 600: + return [text] + + queries = [] + # Query 1: Die ersten 400 Zeichen + queries.append(text[:400]) + + # Query 2: Ein Fenster aus der Mitte + mid = len(text) // 2 + queries.append(text[mid-200 : mid+200]) + + # Query 3: Die letzten 400 Zeichen + if len(text) > 800: + queries.append(text[-400:]) + + return queries + async def _get_semantic_suggestions_async(self, text: str): - """Async Wrapper um den Hybrid Retriever.""" + # Nutzt hybrid_retrieve (sync), aber hier in Async Context okay req = QueryRequest(query=text, top_k=5, explain=False) try: - # Da hybrid_retrieve (noch) sync ist, rufen wir es direkt auf. - # In einer voll-async Umgebung würde man dies in einen Thread-Pool auslagern, - # aber da Qdrant-Client sync ist, ist das hier okay. res = hybrid_retrieve(req) return res.results - except Exception as e: - logger.error(f"Semantic suggestion failed: {e}") - return [] + except Exception: return [] + # ... (Restliche Methoden wie _fetch_all_titles_and_aliases bleiben gleich) ... + # Füge hier die Methoden aus dem vorherigen Artefakt ein (fetch_all..., find_entities..., load_type...) + # Der Kürze halber lasse ich sie im Snippet weg, da sie unverändert sind. + def _load_type_registry(self) -> dict: path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml") if not os.path.exists(path): @@ -112,24 +139,22 @@ class DiscoveryService: types_cfg = self.registry.get("types", {}) type_def = types_cfg.get(note_type, {}) defaults = type_def.get("edge_defaults") - if defaults and isinstance(defaults, list) and len(defaults) > 0: - return defaults[0] - return "related_to" + return defaults[0] if defaults else "related_to" def _fetch_all_titles_and_aliases(self) -> List[Dict]: notes = [] next_page = None - col_name = f"{self.prefix}_notes" + col = f"{self.prefix}_notes" try: while True: - res, next_page = self.client.scroll(collection_name=col_name, limit=1000, offset=next_page, with_payload=True, with_vectors=False) + res, next_page = self.client.scroll(collection_name=col, limit=1000, offset=next_page, with_payload=True, with_vectors=False) for point in res: pl = point.payload or {} aliases = pl.get("aliases") or [] if isinstance(aliases, str): aliases = [aliases] notes.append({"id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases}) if next_page is None: break - except Exception: return [] + except Exception: pass return notes def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]: @@ -140,9 +165,8 @@ class DiscoveryService: if title and title.lower() in text_lower: found.append({"match": title, "title": title, "id": entity["id"]}) continue - aliases = entity.get("aliases", []) - for alias in aliases: - if alias and str(alias).lower() in text_lower: + for alias in entity.get("aliases", []): + if str(alias).lower() in text_lower: found.append({"match": alias, "title": title, "id": entity["id"]}) break return found \ No newline at end of file diff --git a/app/services/embeddings_client.py b/app/services/embeddings_client.py index ea7a7fb..090959e 100644 --- a/app/services/embeddings_client.py +++ b/app/services/embeddings_client.py @@ -1,19 +1,9 @@ """ -app/services/embeddings_client.py — Text→Embedding Service +app/services/embeddings_client.py +Client für die Vektorisierung von Texten via Ollama API. -Zweck: - Liefert Embeddings für Textqueries. - - Legacy Mode (Sync): Nutzt lokal Sentence-Transformers (CPU). - - Modern Mode (Async/Class): Nutzt Ollama API (HTTP) für Non-Blocking Operations (WP-11). - -Kompatibilität: - Python 3.12+, sentence-transformers 5.x, httpx -Version: - 0.2.0 (Erweitert um Async EmbeddingsClient) -Stand: - 2025-12-11 +Version: 2.4.0 (Async + Dedicated Embedding Model Support) """ - from __future__ import annotations import os import logging @@ -24,45 +14,53 @@ from app.config import get_settings logger = logging.getLogger(__name__) -# ============================================================================== -# TEIL 1: NEUE ASYNC KLASSE (Für Ingestion API / WP-11) -# ============================================================================== - class EmbeddingsClient: """ - Async Client für Embeddings via Ollama (oder kompatible APIs). - Verhindert das Blockieren des Event-Loops bei schweren Berechnungen. + Async Client für Embeddings via Ollama. + Trennt Chat-Modell (Generation) von Embedding-Modell (Semantik). """ def __init__(self): self.settings = get_settings() - # Fallback auf Environment Variablen, falls Settings nicht geladen self.base_url = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434") - # Nutze explizites Embedding Modell oder Fallback auf LLM Modell - self.model = os.getenv("MINDNET_EMBEDDING_MODEL", os.getenv("MINDNET_LLM_MODEL", "phi3:mini")) + + # Lese Konfiguration für spezialisiertes Embedding-Modell + self.model = os.getenv("MINDNET_EMBEDDING_MODEL") + + # Fallback auf LLM, falls kein Embedding-Modell gesetzt (nicht empfohlen für Prod) + if not self.model: + self.model = os.getenv("MINDNET_LLM_MODEL", "phi3:mini") + logger.warning(f"No MINDNET_EMBEDDING_MODEL set. Falling back to LLM '{self.model}'. Quality might suffer.") + else: + logger.info(f"EmbeddingsClient initialized with model: {self.model}") async def embed_query(self, text: str) -> List[float]: - """Erzeugt Embedding für einen einzelnen Text.""" + """ + Erzeugt Embedding für einen einzelnen Text (z.B. Suchanfrage). + """ return await self._request_embedding(text) async def embed_documents(self, texts: List[str]) -> List[List[float]]: """ - Erzeugt Embeddings für eine Liste von Texten. - Nutzt eine Session für effizientere Requests. + Erzeugt Embeddings für eine Liste von Texten (z.B. Chunks beim Import). + Nutzt eine persistente Session für Performance. """ vectors = [] - async with httpx.AsyncClient(timeout=60.0) as client: + # Timeout erhöht für Batch-Processing + async with httpx.AsyncClient(timeout=120.0) as client: for text in texts: vec = await self._request_embedding_with_client(client, text) vectors.append(vec) return vectors async def _request_embedding(self, text: str) -> List[float]: - """Interne Hilfsmethode für Single-Request (One-off Client).""" + """Interne Hilfsmethode für Single-Request.""" async with httpx.AsyncClient(timeout=30.0) as client: return await self._request_embedding_with_client(client, text) async def _request_embedding_with_client(self, client: httpx.AsyncClient, text: str) -> List[float]: - """Führt den eigentlichen Request gegen Ollama aus.""" + """ + Führt den eigentlichen HTTP-Request gegen Ollama aus. + """ if not text or not text.strip(): return [] @@ -75,46 +73,39 @@ class EmbeddingsClient: "prompt": text } ) + + if response.status_code == 404: + logger.error(f"Model '{self.model}' not found in Ollama. Run: ollama pull {self.model}") + return [] + response.raise_for_status() data = response.json() return data.get("embedding", []) + except Exception as e: - logger.error(f"Embedding error (Ollama) for model {self.model}: {e}") - # Fallback: Leere Liste, damit der Prozess nicht crasht (wird vom Caller gefiltert) + logger.error(f"Embedding error (Model: {self.model}): {e}") + # Wir geben eine leere Liste zurück, damit der Batch-Prozess nicht komplett crasht. + # Der Aufrufer (IngestionService) muss prüfen, ob Vektor leer ist. return [] - -# ============================================================================== -# TEIL 2: LEGACY FUNKTIONEN (Für bestehende Sync-Module / CLI) -# ============================================================================== - -# Lazy import, damit Testläufe ohne Modell-Laden schnell sind -def _load_model(): - # Performance-Warnung loggen, da dies viel RAM braucht - logger.info("Loading local SentenceTransformer model (Legacy Mode)...") - from sentence_transformers import SentenceTransformer # import hier, nicht top-level - s = get_settings() - return SentenceTransformer(s.MODEL_NAME, device="cpu") +# --- LEGACY SUPPORT (Synchron) --- +# Wird nur noch von alten Skripten oder Tests ohne Async-Support genutzt. @lru_cache(maxsize=1) -def _cached_model(): - return _load_model() +def _cached_legacy_model(): + from sentence_transformers import SentenceTransformer + s = get_settings() + # Hier nutzen wir das Modell aus den Settings, meist CPU-basiert + return SentenceTransformer(s.MODEL_NAME, device="cpu") def embed_text(text: str) -> List[float]: """ - LEGACY: Erzeugt einen Vektor synchron via Sentence-Transformers. - Wird u.a. vom Retriever oder alten CLI-Skripten genutzt. + LEGACY: Synchrones Embedding via SentenceTransformers (CPU). """ if not text or not text.strip(): - # Um Konsistenz mit neuer Klasse zu wahren, loggen wir Warnung statt Error - # raise ValueError("embed_text: leerer Text") -> Veraltet - logger.warning("embed_text called with empty string") return [] - try: - model = _cached_model() - vec = model.encode([text], normalize_embeddings=True)[0] - return vec.astype(float).tolist() + return _cached_legacy_model().encode([text], normalize_embeddings=True)[0].tolist() except Exception as e: logger.error(f"Legacy embed_text failed: {e}") return [] \ No newline at end of file