diff --git a/app/core/ingestion.py b/app/core/ingestion.py index 716237e..4cbba71 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -2,13 +2,17 @@ app/core/ingestion.py Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges). -Dient als Shared Logic für: -1. CLI-Imports (scripts/import_markdown.py) -2. API-Uploads (WP-11) -Refactored for Async Embedding & Async Chunking (WP-15). +Features: +- Incremental Update (Hashing) für Massenimport. +- Force Smart Edges für Single-File (UI). +- Async Embedding & Chunking. + +Version: 2.5.1 (Full Restore + WP-15 Logic) """ import os import logging +import asyncio +import time from typing import Dict, List, Optional, Tuple, Any # Core Module Imports @@ -19,22 +23,14 @@ from app.core.parser import ( ) from app.core.note_payload import make_note_payload # ASYNC CHUNKER (WP-15) -from app.core.chunker import assemble_chunks +from app.core.chunker import assemble_chunks, get_chunk_config from app.core.chunk_payload import make_chunk_payloads -# Fallback für Edges Import (Robustheit) +# Fallback für Edges Import try: from app.core.derive_edges import build_edges_for_note except ImportError: - try: - from app.core.derive_edges import derive_edges_for_note as build_edges_for_note - except ImportError: - try: - from app.core.edges import build_edges_for_note - except ImportError: - # Fallback Mock - logging.warning("Could not import edge derivation logic. Edges will be empty.") - def build_edges_for_note(*args, **kwargs): return [] + def build_edges_for_note(*args, **kwargs): return [] from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.qdrant_points import ( @@ -44,48 +40,26 @@ from app.core.qdrant_points import ( upsert_batch, ) -# WICHTIG: Wir nutzen den API-Client für Embeddings (Async Support) from app.services.embeddings_client import EmbeddingsClient logger = logging.getLogger(__name__) -# --- Helper für Type-Registry --- +# --- Helper --- def load_type_registry(custom_path: Optional[str] = None) -> dict: import yaml path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml") - if not os.path.exists(path): - if os.path.exists("types.yaml"): - path = "types.yaml" - else: - return {} + if not os.path.exists(path): return {} try: - with open(path, "r", encoding="utf-8") as f: - return yaml.safe_load(f) or {} - except Exception: - return {} + with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} + except Exception: return {} def resolve_note_type(requested: Optional[str], reg: dict) -> str: types = reg.get("types", {}) - if requested and requested in types: - return requested + if requested and requested in types: return requested return "concept" -def effective_chunk_profile(note_type: str, reg: dict) -> str: - t_cfg = reg.get("types", {}).get(note_type, {}) - if t_cfg and t_cfg.get("chunk_profile"): - return t_cfg.get("chunk_profile") - return reg.get("defaults", {}).get("chunk_profile", "default") - -def effective_retriever_weight(note_type: str, reg: dict) -> float: - t_cfg = reg.get("types", {}).get(note_type, {}) - if t_cfg and "retriever_weight" in t_cfg: - return float(t_cfg["retriever_weight"]) - return float(reg.get("defaults", {}).get("retriever_weight", 1.0)) - - class IngestionService: def __init__(self, collection_prefix: str = None): - # Prefix Logik vereinheitlichen env_prefix = os.getenv("COLLECTION_PREFIX", "mindnet") self.prefix = collection_prefix or env_prefix @@ -93,19 +67,15 @@ class IngestionService: self.cfg.prefix = self.prefix self.client = get_client(self.cfg) self.dim = self.cfg.dim - - # Registry laden self.registry = load_type_registry() - - # Embedding Service initialisieren (Async Client) self.embedder = EmbeddingsClient() - # Init DB Checks (Fehler abfangen, falls DB nicht erreichbar) + # Init Checks try: ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) except Exception as e: - logger.warning(f"DB initialization warning: {e}") + logger.warning(f"DB init warning: {e}") async def process_file( self, @@ -117,10 +87,12 @@ class IngestionService: note_scope_refs: bool = False, hash_mode: str = "body", hash_source: str = "parsed", - hash_normalize: str = "canonical" + hash_normalize: str = "canonical", + force_smart_edges: bool = False # NEU: Override für UI ) -> Dict[str, Any]: """ - Verarbeitet eine einzelne Datei (ASYNC Version). + Verarbeitet eine einzelne Datei (ASYNC). + Enthält Hashing-Logik für inkrementelle Updates. """ result = { "path": file_path, @@ -129,11 +101,10 @@ class IngestionService: "error": None } - # 1. Parse & Frontmatter + # 1. Parse & Validate try: parsed = read_markdown(file_path) - if not parsed: - return {**result, "error": "Empty or unreadable file"} + if not parsed: return {**result, "error": "Empty/Unreadable"} fm = normalize_frontmatter(parsed.frontmatter) validate_required_frontmatter(fm) @@ -141,46 +112,42 @@ class IngestionService: logger.error(f"Validation failed for {file_path}: {e}") return {**result, "error": f"Validation failed: {str(e)}"} - # 2. Type & Config Resolution + # 2. Resolve Type note_type = resolve_note_type(fm.get("type"), self.registry) fm["type"] = note_type - fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry) - weight = fm.get("retriever_weight") - if weight is None: - weight = effective_retriever_weight(note_type, self.registry) - fm["retriever_weight"] = float(weight) - - # 3. Build Note Payload + # 3. Build Payload & Hash try: note_pl = make_note_payload( - parsed, - vault_root=vault_root, + parsed, + vault_root=vault_root, + file_path=file_path, hash_mode=hash_mode, hash_normalize=hash_normalize, - hash_source=hash_source, - file_path=file_path + hash_source=hash_source ) - if not note_pl.get("fulltext"): + if not note_pl.get("fulltext"): note_pl["fulltext"] = getattr(parsed, "body", "") or "" - note_pl["retriever_weight"] = fm["retriever_weight"] note_id = note_pl["note_id"] except Exception as e: - logger.error(f"Payload build failed: {e}") return {**result, "error": f"Payload build failed: {str(e)}"} - # 4. Change Detection + # 4. Change Detection (Das Herzstück für Massenimport) old_payload = None if not force_replace: old_payload = self._fetch_note_payload(note_id) has_old = old_payload is not None + + # Hash Vergleich key_current = f"{hash_mode}:{hash_source}:{hash_normalize}" old_hash = (old_payload or {}).get("hashes", {}).get(key_current) new_hash = note_pl.get("hashes", {}).get(key_current) hash_changed = (old_hash != new_hash) + + # Check ob Chunks/Edges in DB fehlen (Reparatur-Modus) chunks_missing, edges_missing = self._artifacts_missing(note_id) should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing @@ -191,62 +158,51 @@ class IngestionService: if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} - # 5. Processing (Chunking, Embedding, Edges) + # 5. Processing (Chunking, Embedding) try: body_text = getattr(parsed, "body", "") or "" - # --- FIX: AWAIT ASYNC CHUNKER (WP-15 Update) --- - # assemble_chunks ist jetzt eine Coroutine und muss mit await aufgerufen werden. - chunks = await assemble_chunks(fm["id"], body_text, fm["type"]) - # ----------------------------------------------- + # --- WP-15 LOGIC --- + # Config laden und ggf. überschreiben + chunk_config = get_chunk_config(note_type) + if force_smart_edges: + logger.info(f"Ingestion: Forcing Smart Edges for {note_id}") + chunk_config["enable_smart_edge_allocation"] = True + # Async Chunking + chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) - # --- EMBEDDING (ASYNC) --- + # Embedding vecs = [] if chunk_pls: texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] try: - # Async Aufruf des Embedders (via Batch oder Loop) if hasattr(self.embedder, 'embed_documents'): vecs = await self.embedder.embed_documents(texts) else: - # Fallback Loop falls Client kein Batch unterstützt - for t in texts: - v = await self.embedder.embed_query(t) - vecs.append(v) - - # Validierung der Dimensionen - if vecs and len(vecs) > 0: - dim_got = len(vecs[0]) - if dim_got != self.dim: - # Wirf keinen Fehler, aber logge Warnung. Qdrant Upsert wird failen wenn 0. - logger.warning(f"Vector dimension mismatch. Expected {self.dim}, got {dim_got}") - if dim_got == 0: - raise ValueError("Embedding returned empty vectors (Dim 0)") + for t in texts: vecs.append(await self.embedder.embed_query(t)) except Exception as e: - logger.error(f"Embedding generation failed: {e}") - raise RuntimeError(f"Embedding failed: {e}") + # Embedding Fehler sind kritisch + logger.error(f"Embedding failed: {e}") + raise e # Edges - note_refs = note_pl.get("references") or [] - # Versuche flexible Signatur für Edges (V1 vs V2) try: edges = build_edges_for_note( note_id, chunk_pls, - note_level_references=note_refs, + note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs ) except TypeError: - # Fallback für ältere Signatur edges = build_edges_for_note(note_id, chunk_pls) except Exception as e: logger.error(f"Processing failed: {e}", exc_info=True) return {**result, "error": f"Processing failed: {str(e)}"} - # 6. Upsert Action + # 6. Upsert (Atomic Write recommended, here Batch) try: if purge_before and has_old: self._purge_artifacts(note_id) @@ -257,7 +213,7 @@ class IngestionService: if chunk_pls and vecs: c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs) upsert_batch(self.client, c_name, c_pts) - + if edges: e_name, e_pts = points_for_edges(self.prefix, edges) upsert_batch(self.client, e_name, e_pts) @@ -274,7 +230,7 @@ class IngestionService: logger.error(f"Upsert failed: {e}", exc_info=True) return {**result, "error": f"DB Upsert failed: {e}"} - # --- Interne Qdrant Helper --- + # --- Interne Qdrant Helper (Wichtig für Change Detection) --- def _fetch_note_payload(self, note_id: str) -> Optional[dict]: from qdrant_client.http import models as rest @@ -303,8 +259,7 @@ class IngestionService: for suffix in ["chunks", "edges"]: try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector) - except Exception: - pass + except Exception: pass async def create_from_text( self, @@ -315,35 +270,33 @@ class IngestionService: ) -> Dict[str, Any]: """ WP-11 Persistence API Entrypoint. - Schreibt Text in Vault und indiziert ihn sofort. + Speichert Text und erzwingt sofortige Indizierung mit Smart Edges. """ - # 1. Zielordner target_dir = os.path.join(vault_root, folder) - try: - os.makedirs(target_dir, exist_ok=True) - except Exception as e: - return {"status": "error", "error": f"Could not create folder {target_dir}: {e}"} + os.makedirs(target_dir, exist_ok=True) - # 2. Dateiname - safe_filename = os.path.basename(filename) - if not safe_filename.endswith(".md"): - safe_filename += ".md" - file_path = os.path.join(target_dir, safe_filename) + file_path = os.path.join(target_dir, filename) - # 3. Schreiben try: + # Robust Write: Ensure Flush with open(file_path, "w", encoding="utf-8") as f: f.write(markdown_content) + f.flush() + os.fsync(f.fileno()) + + # Kurzer Sleep für OS Filesystem Latenz + await asyncio.sleep(0.1) + logger.info(f"Written file to {file_path}") except Exception as e: - return {"status": "error", "error": f"Disk write failed at {file_path}: {str(e)}"} + return {"status": "error", "error": f"Disk write failed: {str(e)}"} - # 4. Indizieren (Async Aufruf!) - # Wir rufen process_file auf, das jetzt ASYNC ist + # Hier aktivieren wir die Smart Edges explizit für den Single-File Import return await self.process_file( file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, - purge_before=True + purge_before=True, + force_smart_edges=True # <--- HIER: Intelligence Override ) \ No newline at end of file