From e7cec6acc651ac9e8c5ca973c9edfd3cc16f6bc4 Mon Sep 17 00:00:00 2001 From: Lars Date: Thu, 11 Dec 2025 10:23:23 +0100 Subject: [PATCH] new ingestion, asyn writing, robust --- app/core/ingestion.py | 91 ++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/app/core/ingestion.py b/app/core/ingestion.py index 80f9398..6790260 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -3,13 +3,14 @@ app/core/ingestion.py Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges). Dient als Shared Logic für: -1. CLI-Imports (scripts/import_markdown.py) +1. CLI-Imports (scripts/import_markdown.py) - muss ggf. angepasst werden auf Async! 2. API-Uploads (WP-11) """ import os -import json -from typing import Dict, List, Optional, Tuple, Any, Set +import logging +from typing import Dict, List, Optional, Tuple, Any +# Core Module Imports from app.core.parser import ( read_markdown, normalize_frontmatter, @@ -18,10 +19,12 @@ from app.core.parser import ( from app.core.note_payload import make_note_payload from app.core.chunker import assemble_chunks from app.core.chunk_payload import make_chunk_payloads -# Fallback Imports wie im Original-Skript + +# Fallback für Edges Import try: from app.core.derive_edges import build_edges_for_note except ImportError: + # Fallback falls Dateiname anders ist from app.core.edges import build_edges_for_note # type: ignore from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes @@ -32,18 +35,16 @@ from app.core.qdrant_points import ( upsert_batch, ) -# Optionales Embedding -try: - from app.core.embed import embed_texts -except ImportError: - embed_texts = None +# WICHTIG: Wir nutzen den API-Client für Embeddings +from app.services.embeddings_client import EmbeddingsClient -# --- Helper für Type-Registry (ausgelagert aus Script) --- +logger = logging.getLogger(__name__) + +# --- Helper für Type-Registry --- def load_type_registry(custom_path: Optional[str] = None) -> dict: import yaml path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml") if not os.path.exists(path): - # Fallback auf Root-Ebene (für Tests/CLI) if os.path.exists("types.yaml"): path = "types.yaml" else: @@ -58,14 +59,12 @@ def resolve_note_type(requested: Optional[str], reg: dict) -> str: types = reg.get("types", {}) if requested and requested in types: return requested - return "concept" # Default Fallback + return "concept" def effective_chunk_profile(note_type: str, reg: dict) -> str: - # 1. Specific Type t_cfg = reg.get("types", {}).get(note_type, {}) if t_cfg and t_cfg.get("chunk_profile"): return t_cfg.get("chunk_profile") - # 2. Defaults return reg.get("defaults", {}).get("chunk_profile", "default") def effective_retriever_weight(note_type: str, reg: dict) -> float: @@ -79,18 +78,21 @@ class IngestionService: def __init__(self, collection_prefix: str = "mindnet"): self.prefix = collection_prefix self.cfg = QdrantConfig.from_env() - self.cfg.prefix = collection_prefix # Override env if needed + self.cfg.prefix = collection_prefix self.client = get_client(self.cfg) self.dim = self.cfg.dim # Registry laden self.registry = load_type_registry() + # Embedding Service initialisieren + self.embedder = EmbeddingsClient() + # Init DB Checks ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) - def process_file( + async def process_file( self, file_path: str, vault_root: str, @@ -103,8 +105,7 @@ class IngestionService: hash_normalize: str = "canonical" ) -> Dict[str, Any]: """ - Verarbeitet eine einzelne Datei. - Return: Summary Dict (Erfolg, Änderungen, Stats). + Verarbeitet eine einzelne Datei (ASYNC). """ result = { "path": file_path, @@ -129,7 +130,6 @@ class IngestionService: fm["type"] = note_type fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry) - # Weight Resolution (Frontmatter override > Registry) weight = fm.get("retriever_weight") if weight is None: weight = effective_retriever_weight(note_type, self.registry) @@ -145,7 +145,6 @@ class IngestionService: hash_source=hash_source, file_path=file_path ) - # Ensure fulltext & weight if not note_pl.get("fulltext"): note_pl["fulltext"] = getattr(parsed, "body", "") or "" note_pl["retriever_weight"] = fm["retriever_weight"] @@ -154,21 +153,17 @@ class IngestionService: except Exception as e: return {**result, "error": f"Payload build failed: {str(e)}"} - # 4. Change Detection (Hash Check) - # Wir holen den alten Payload aus Qdrant, wenn wir nicht forcen + # 4. Change Detection old_payload = None if not force_replace: old_payload = self._fetch_note_payload(note_id) has_old = old_payload is not None key_current = f"{hash_mode}:{hash_source}:{hash_normalize}" - old_hash = (old_payload or {}).get("hashes", {}).get(key_current) new_hash = note_pl.get("hashes", {}).get(key_current) hash_changed = (old_hash != new_hash) - - # Artefakte prüfen (Chunks/Edges) chunks_missing, edges_missing = self._artifacts_missing(note_id) should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing @@ -185,14 +180,29 @@ class IngestionService: chunks = assemble_chunks(fm["id"], body_text, fm["type"]) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) - # Embeddings + # --- EMBEDDING FIX --- vecs = [] - if embed_texts and chunk_pls: + if chunk_pls: texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] - vecs = embed_texts(texts) - else: - vecs = [[0.0] * self.dim for _ in chunk_pls] - + try: + # Async Aufruf des Embedders + if hasattr(self.embedder, 'embed_documents'): + vecs = await self.embedder.embed_documents(texts) + else: + # Fallback Loop falls Client kein Batch unterstützt + for t in texts: + v = await self.embedder.embed_query(t) + vecs.append(v) + + # Validierung der Dimensionen + if vecs and len(vecs) > 0: + dim_got = len(vecs[0]) + if dim_got != self.dim: + raise ValueError(f"Vector dimension mismatch. Expected {self.dim}, got {dim_got}") + except Exception as e: + logger.error(f"Embedding generation failed: {e}") + raise RuntimeError(f"Embedding failed: {e}") + # Edges note_refs = note_pl.get("references") or [] edges = build_edges_for_note( @@ -208,16 +218,13 @@ class IngestionService: if purge_before and has_old: self._purge_artifacts(note_id) - # Upsert Note n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) upsert_batch(self.client, n_name, n_pts) - # Upsert Chunks if chunk_pls: c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs) upsert_batch(self.client, c_name, c_pts) - # Upsert Edges if edges: e_name, e_pts = points_for_edges(self.prefix, edges) upsert_batch(self.client, e_name, e_pts) @@ -245,12 +252,8 @@ class IngestionService: c_col = f"{self.prefix}_chunks" e_col = f"{self.prefix}_edges" f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - - # Check Chunks c_pts, _ = self.client.scroll(collection_name=c_col, scroll_filter=f, limit=1) - # Check Edges e_pts, _ = self.client.scroll(collection_name=e_col, scroll_filter=f, limit=1) - return (not bool(c_pts)), (not bool(e_pts)) def _purge_artifacts(self, note_id: str): @@ -263,7 +266,7 @@ class IngestionService: except Exception: pass - def create_from_text( + async def create_from_text( self, markdown_content: str, filename: str, @@ -272,20 +275,18 @@ class IngestionService: ) -> Dict[str, Any]: """ WP-11 Persistence: Schreibt Text sicher und indiziert ihn. - Erstellt Verzeichnisse automatisch. """ - # 1. Zielordner vorbereiten + # 1. Zielordner target_dir = os.path.join(vault_root, folder) try: os.makedirs(target_dir, exist_ok=True) except Exception as e: return {"status": "error", "error": f"Could not create folder {target_dir}: {e}"} - # 2. Dateiname bereinigen + # 2. Dateiname safe_filename = os.path.basename(filename) if not safe_filename.endswith(".md"): safe_filename += ".md" - file_path = os.path.join(target_dir, safe_filename) # 3. Schreiben @@ -295,8 +296,8 @@ class IngestionService: except Exception as e: return {"status": "error", "error": f"Disk write failed at {file_path}: {str(e)}"} - # 4. Indizieren (Single File Upsert) - return self.process_file( + # 4. Indizieren (Async Aufruf!) + return await self.process_file( file_path=file_path, vault_root=vault_root, apply=True,