update aus WP11

This commit is contained in:
Lars 2025-12-11 13:50:40 +01:00
parent 9d792e11ce
commit 52210a91fd
3 changed files with 147 additions and 181 deletions

View File

@ -1,6 +1,7 @@
"""
app/routers/ingest.py
API-Endpunkte für WP-11 (Discovery & Persistence).
Delegiert an Services.
"""
import os
import time
@ -10,17 +11,13 @@ from pydantic import BaseModel
from typing import Optional, Dict, Any
from app.core.ingestion import IngestionService
# Fallback: Falls DiscoveryService noch fehlt, nutzen wir Ingest Service Features oder Mock
# Wir gehen hier davon aus, dass wir alles im IngestionService oder Router machen können,
# um Importfehler zu vermeiden.
from app.core.retriever import Retriever
from app.models.dto import QueryRequest
from app.services.discovery import DiscoveryService
logger = logging.getLogger(__name__)
router = APIRouter()
# --- DTOs ---
# Services Init (Global oder via Dependency Injection)
discovery_service = DiscoveryService()
class AnalyzeRequest(BaseModel):
text: str
@ -37,102 +34,56 @@ class SaveResponse(BaseModel):
note_id: str
stats: Dict[str, Any]
# --- Endpoints ---
@router.post("/analyze")
async def analyze_draft(req: AnalyzeRequest):
"""
WP-11 Intelligence: Liefert Link-Vorschläge.
Implementiert direkt hier, um Abhängigkeiten zu reduzieren.
WP-11 Intelligence: Liefert Link-Vorschläge via DiscoveryService.
"""
try:
retriever = Retriever()
suggestions = []
query_text = req.text[:400]
if not query_text.strip():
return {"suggestions": []}
# 1. Semantic Search
# Safe async call check
if hasattr(retriever.search, '__await__'):
hits_result = await retriever.search(QueryRequest(query=query_text, top_k=5, mode="hybrid"))
else:
hits_result = await retriever.search(QueryRequest(query=query_text, top_k=5, mode="hybrid"))
seen_titles = set()
for hit in hits_result.results:
# Titel ermitteln
title = hit.payload.get("note_id") or hit.node_id
if not title or title in seen_titles: continue
seen_titles.add(title)
edge_kind = "related_to"
if req.type == "project": edge_kind = "depends_on"
if req.type == "decision": edge_kind = "references"
# Score Threshold
if hit.total_score > 0.4: # Etwas toleranter
suggestions.append({
"target_title": title,
"target_id": hit.node_id,
"suggested_markdown": f"[[rel:{edge_kind} {title}]]",
"reason": f"Semantisch ähnlich ({hit.total_score:.2f})",
"type": "semantic"
})
return {"suggestions": suggestions}
# Hier rufen wir jetzt den verbesserten Service auf
result = await discovery_service.analyze_draft(req.text, req.type)
return result
except Exception as e:
logger.error(f"Analyze failed: {e}", exc_info=True)
# Kein 500er werfen, lieber leere Liste, damit UI nicht crasht
return {"suggestions": [], "error": str(e)}
@router.post("/save", response_model=SaveResponse)
async def save_note(req: SaveRequest):
"""
WP-11 Persistence: Speichert Markdown physisch und indiziert es sofort.
WP-11 Persistence: Speichert und indiziert.
"""
try:
vault_root = os.getenv("MINDNET_VAULT_ROOT", "./vault")
abs_vault_root = os.path.abspath(vault_root)
if not os.path.exists(abs_vault_root):
os.makedirs(abs_vault_root, exist_ok=True)
final_filename = req.filename
if not final_filename:
final_filename = f"draft_{int(time.time())}.md"
try: os.makedirs(abs_vault_root, exist_ok=True)
except: pass
final_filename = req.filename or f"draft_{int(time.time())}.md"
ingest_service = IngestionService()
logger.info(f"Saving {final_filename} to {req.folder}")
# --- AWAIT WICHTIG! ---
# Wir rufen save_and_index auf (so hieß es in meiner IngestionService Implementierung)
# Wenn deine Methode create_from_text heißt, ändere es hier entsprechend.
# Ich nutze hier save_and_index als Standard aus WP-11.
# Async Call
result = await ingest_service.create_from_text(
markdown_content=req.markdown_content,
filename=final_filename,
vault_root=abs_vault_root,
folder=req.folder
)
if hasattr(ingest_service, 'save_and_index'):
result = await ingest_service.save_and_index(req.markdown_content, final_filename)
elif hasattr(ingest_service, 'create_from_text'):
# Fallback falls du die alte Version hast
result = await ingest_service.create_from_text(req.markdown_content, final_filename, abs_vault_root, req.folder)
else:
raise RuntimeError("IngestionService hat weder save_and_index noch create_from_text")
if result.get("status") == "error":
raise HTTPException(status_code=500, detail=result.get("error"))
return SaveResponse(
status="success",
file_path=result.get("file_path") or result.get("path", "unknown"),
file_path=result.get("path", "unknown"),
note_id=result.get("note_id", "unknown"),
stats=result.get("stats", {})
stats={
"chunks": result.get("chunks_count", 0),
"edges": result.get("edges_count", 0)
}
)
except HTTPException as he:
raise he
except HTTPException as he: raise he
except Exception as e:
logger.error(f"Save failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Save failed: {str(e)}")

View File

@ -1,14 +1,12 @@
"""
app/services/discovery.py
Service für Link-Vorschläge und Knowledge-Discovery (WP-11).
Adaptiert für Async-Architecture (v2.4).
Updated for WP-11: Sliding Window Analysis.
"""
import logging
import os
import asyncio
from typing import List, Dict, Any
import yaml
# Wir nutzen hier weiterhin die Low-Level Funktionen, da diese stabil sind
from app.core.qdrant import QdrantConfig, get_client
from app.models.dto import QueryRequest
from app.core.retriever import hybrid_retrieve
@ -24,81 +22,110 @@ class DiscoveryService:
async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]:
"""
Analysiert einen Draft-Text und schlägt Verlinkungen vor.
Kombiniert Exact Match (Titel/Alias) und Semantic Match.
Analysiert den Draft mit Sliding Window Strategie.
"""
suggestions = []
default_edge_type = self._get_default_edge_type(current_type)
# 1. Exact Match: Finde Begriffe im Text, die als Notiz-Titel existieren
# (Dies läuft synchron, ist aber sehr schnell durch Qdrant Scroll)
# 1. Exact Match (Läuft über gesamten Text, schnell genug)
known_entities = self._fetch_all_titles_and_aliases()
found_entities = self._find_entities_in_text(text, known_entities)
existing_target_ids = set()
for entity in found_entities:
existing_target_ids.add(entity["id"])
target_title = entity["title"]
suggested_md = f"[[rel:{default_edge_type} {target_title}]]"
suggestions.append({
"type": "exact_match",
"text_found": entity["match"],
"target_title": target_title,
"target_title": entity["title"],
"target_id": entity["id"],
"suggested_edge_type": default_edge_type,
"suggested_markdown": suggested_md,
"suggested_markdown": f"[[rel:{default_edge_type} {entity['title']}]]",
"confidence": 1.0,
"reason": f"Exakter Treffer (Default für '{current_type}': {default_edge_type})"
"reason": f"Exakter Treffer ({entity['match']})"
})
# 2. Semantic Match: Finde inhaltlich ähnliche Notizen
# Wir filtern Ergebnisse heraus, die wir schon per Exact Match gefunden haben.
semantic_hits = await self._get_semantic_suggestions_async(text)
# 2. Semantic Match (Sliding Window)
# Wir zerlegen den Text in relevante Chunks, um Token-Limits zu umgehen
# und Fokus zu streuen.
search_queries = self._generate_search_queries(text)
for hit in semantic_hits:
if hit.node_id in existing_target_ids:
continue
if hit.total_score > 0.65:
# FIX: Titel aus Payload lesen, nicht ID!
target_title = hit.payload.get("title") or hit.node_id
# Parallel Execution für alle Queries
tasks = [self._get_semantic_suggestions_async(q) for q in search_queries]
results_list = await asyncio.gather(*tasks)
# Ergebnisse mergen und deduplizieren
seen_semantic_ids = set()
for hits in results_list:
for hit in hits:
if hit.node_id in existing_target_ids or hit.node_id in seen_semantic_ids:
continue
suggested_md = f"[[rel:{default_edge_type} {target_title}]]"
suggestions.append({
"type": "semantic_match",
"text_found": (hit.source.get("text") or "")[:50] + "...",
"target_title": target_title,
"target_id": hit.node_id,
"suggested_edge_type": default_edge_type,
"suggested_markdown": suggested_md,
"confidence": round(hit.total_score, 2),
"reason": f"Semantische Ähnlichkeit ({hit.total_score:.2f})"
})
# Threshold Tuning: Bei 'nomic' sind Scores oft niedriger (0.4-0.6 ist schon gut)
# Wir setzen ihn moderat auf 0.50
if hit.total_score > 0.50:
seen_semantic_ids.add(hit.node_id)
target_title = hit.payload.get("title") or hit.node_id
suggestions.append({
"type": "semantic_match",
"text_found": (hit.source.get("text") or "")[:60] + "...",
"target_title": target_title,
"target_id": hit.node_id,
"suggested_edge_type": default_edge_type,
"suggested_markdown": f"[[rel:{default_edge_type} {target_title}]]",
"confidence": round(hit.total_score, 2),
"reason": f"Semantisch ähnlich ({hit.total_score:.2f})"
})
# Sortieren nach Confidence
suggestions.sort(key=lambda x: x["confidence"], reverse=True)
return {
"draft_length": len(text),
"suggestions_count": len(suggestions),
"suggestions": suggestions
"suggestions": suggestions[:10] # Limit auf Top 10
}
# --- Helpers ---
def _generate_search_queries(self, text: str) -> List[str]:
"""
Zerlegt den Text in bis zu 3 Such-Queries:
1. Der Anfang (Kontext/Einleitung)
2. Die Mitte (Details)
3. Das Ende (Fazit/Zusammenfassung)
"""
if len(text) < 600:
return [text]
queries = []
# Query 1: Die ersten 400 Zeichen
queries.append(text[:400])
# Query 2: Ein Fenster aus der Mitte
mid = len(text) // 2
queries.append(text[mid-200 : mid+200])
# Query 3: Die letzten 400 Zeichen
if len(text) > 800:
queries.append(text[-400:])
return queries
async def _get_semantic_suggestions_async(self, text: str):
"""Async Wrapper um den Hybrid Retriever."""
# Nutzt hybrid_retrieve (sync), aber hier in Async Context okay
req = QueryRequest(query=text, top_k=5, explain=False)
try:
# Da hybrid_retrieve (noch) sync ist, rufen wir es direkt auf.
# In einer voll-async Umgebung würde man dies in einen Thread-Pool auslagern,
# aber da Qdrant-Client sync ist, ist das hier okay.
res = hybrid_retrieve(req)
return res.results
except Exception as e:
logger.error(f"Semantic suggestion failed: {e}")
return []
except Exception: return []
# ... (Restliche Methoden wie _fetch_all_titles_and_aliases bleiben gleich) ...
# Füge hier die Methoden aus dem vorherigen Artefakt ein (fetch_all..., find_entities..., load_type...)
# Der Kürze halber lasse ich sie im Snippet weg, da sie unverändert sind.
def _load_type_registry(self) -> dict:
path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
if not os.path.exists(path):
@ -112,24 +139,22 @@ class DiscoveryService:
types_cfg = self.registry.get("types", {})
type_def = types_cfg.get(note_type, {})
defaults = type_def.get("edge_defaults")
if defaults and isinstance(defaults, list) and len(defaults) > 0:
return defaults[0]
return "related_to"
return defaults[0] if defaults else "related_to"
def _fetch_all_titles_and_aliases(self) -> List[Dict]:
notes = []
next_page = None
col_name = f"{self.prefix}_notes"
col = f"{self.prefix}_notes"
try:
while True:
res, next_page = self.client.scroll(collection_name=col_name, limit=1000, offset=next_page, with_payload=True, with_vectors=False)
res, next_page = self.client.scroll(collection_name=col, limit=1000, offset=next_page, with_payload=True, with_vectors=False)
for point in res:
pl = point.payload or {}
aliases = pl.get("aliases") or []
if isinstance(aliases, str): aliases = [aliases]
notes.append({"id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases})
if next_page is None: break
except Exception: return []
except Exception: pass
return notes
def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]:
@ -140,9 +165,8 @@ class DiscoveryService:
if title and title.lower() in text_lower:
found.append({"match": title, "title": title, "id": entity["id"]})
continue
aliases = entity.get("aliases", [])
for alias in aliases:
if alias and str(alias).lower() in text_lower:
for alias in entity.get("aliases", []):
if str(alias).lower() in text_lower:
found.append({"match": alias, "title": title, "id": entity["id"]})
break
return found

View File

@ -1,19 +1,9 @@
"""
app/services/embeddings_client.py TextEmbedding Service
app/services/embeddings_client.py
Client für die Vektorisierung von Texten via Ollama API.
Zweck:
Liefert Embeddings für Textqueries.
- Legacy Mode (Sync): Nutzt lokal Sentence-Transformers (CPU).
- Modern Mode (Async/Class): Nutzt Ollama API (HTTP) für Non-Blocking Operations (WP-11).
Kompatibilität:
Python 3.12+, sentence-transformers 5.x, httpx
Version:
0.2.0 (Erweitert um Async EmbeddingsClient)
Stand:
2025-12-11
Version: 2.4.0 (Async + Dedicated Embedding Model Support)
"""
from __future__ import annotations
import os
import logging
@ -24,45 +14,53 @@ from app.config import get_settings
logger = logging.getLogger(__name__)
# ==============================================================================
# TEIL 1: NEUE ASYNC KLASSE (Für Ingestion API / WP-11)
# ==============================================================================
class EmbeddingsClient:
"""
Async Client für Embeddings via Ollama (oder kompatible APIs).
Verhindert das Blockieren des Event-Loops bei schweren Berechnungen.
Async Client für Embeddings via Ollama.
Trennt Chat-Modell (Generation) von Embedding-Modell (Semantik).
"""
def __init__(self):
self.settings = get_settings()
# Fallback auf Environment Variablen, falls Settings nicht geladen
self.base_url = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434")
# Nutze explizites Embedding Modell oder Fallback auf LLM Modell
self.model = os.getenv("MINDNET_EMBEDDING_MODEL", os.getenv("MINDNET_LLM_MODEL", "phi3:mini"))
# Lese Konfiguration für spezialisiertes Embedding-Modell
self.model = os.getenv("MINDNET_EMBEDDING_MODEL")
# Fallback auf LLM, falls kein Embedding-Modell gesetzt (nicht empfohlen für Prod)
if not self.model:
self.model = os.getenv("MINDNET_LLM_MODEL", "phi3:mini")
logger.warning(f"No MINDNET_EMBEDDING_MODEL set. Falling back to LLM '{self.model}'. Quality might suffer.")
else:
logger.info(f"EmbeddingsClient initialized with model: {self.model}")
async def embed_query(self, text: str) -> List[float]:
"""Erzeugt Embedding für einen einzelnen Text."""
"""
Erzeugt Embedding für einen einzelnen Text (z.B. Suchanfrage).
"""
return await self._request_embedding(text)
async def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""
Erzeugt Embeddings für eine Liste von Texten.
Nutzt eine Session für effizientere Requests.
Erzeugt Embeddings für eine Liste von Texten (z.B. Chunks beim Import).
Nutzt eine persistente Session für Performance.
"""
vectors = []
async with httpx.AsyncClient(timeout=60.0) as client:
# Timeout erhöht für Batch-Processing
async with httpx.AsyncClient(timeout=120.0) as client:
for text in texts:
vec = await self._request_embedding_with_client(client, text)
vectors.append(vec)
return vectors
async def _request_embedding(self, text: str) -> List[float]:
"""Interne Hilfsmethode für Single-Request (One-off Client)."""
"""Interne Hilfsmethode für Single-Request."""
async with httpx.AsyncClient(timeout=30.0) as client:
return await self._request_embedding_with_client(client, text)
async def _request_embedding_with_client(self, client: httpx.AsyncClient, text: str) -> List[float]:
"""Führt den eigentlichen Request gegen Ollama aus."""
"""
Führt den eigentlichen HTTP-Request gegen Ollama aus.
"""
if not text or not text.strip():
return []
@ -75,46 +73,39 @@ class EmbeddingsClient:
"prompt": text
}
)
if response.status_code == 404:
logger.error(f"Model '{self.model}' not found in Ollama. Run: ollama pull {self.model}")
return []
response.raise_for_status()
data = response.json()
return data.get("embedding", [])
except Exception as e:
logger.error(f"Embedding error (Ollama) for model {self.model}: {e}")
# Fallback: Leere Liste, damit der Prozess nicht crasht (wird vom Caller gefiltert)
logger.error(f"Embedding error (Model: {self.model}): {e}")
# Wir geben eine leere Liste zurück, damit der Batch-Prozess nicht komplett crasht.
# Der Aufrufer (IngestionService) muss prüfen, ob Vektor leer ist.
return []
# ==============================================================================
# TEIL 2: LEGACY FUNKTIONEN (Für bestehende Sync-Module / CLI)
# ==============================================================================
# Lazy import, damit Testläufe ohne Modell-Laden schnell sind
def _load_model():
# Performance-Warnung loggen, da dies viel RAM braucht
logger.info("Loading local SentenceTransformer model (Legacy Mode)...")
from sentence_transformers import SentenceTransformer # import hier, nicht top-level
s = get_settings()
return SentenceTransformer(s.MODEL_NAME, device="cpu")
# --- LEGACY SUPPORT (Synchron) ---
# Wird nur noch von alten Skripten oder Tests ohne Async-Support genutzt.
@lru_cache(maxsize=1)
def _cached_model():
return _load_model()
def _cached_legacy_model():
from sentence_transformers import SentenceTransformer
s = get_settings()
# Hier nutzen wir das Modell aus den Settings, meist CPU-basiert
return SentenceTransformer(s.MODEL_NAME, device="cpu")
def embed_text(text: str) -> List[float]:
"""
LEGACY: Erzeugt einen Vektor synchron via Sentence-Transformers.
Wird u.a. vom Retriever oder alten CLI-Skripten genutzt.
LEGACY: Synchrones Embedding via SentenceTransformers (CPU).
"""
if not text or not text.strip():
# Um Konsistenz mit neuer Klasse zu wahren, loggen wir Warnung statt Error
# raise ValueError("embed_text: leerer Text") -> Veraltet
logger.warning("embed_text called with empty string")
return []
try:
model = _cached_model()
vec = model.encode([text], normalize_embeddings=True)[0]
return vec.astype(float).tolist()
return _cached_legacy_model().encode([text], normalize_embeddings=True)[0].tolist()
except Exception as e:
logger.error(f"Legacy embed_text failed: {e}")
return []