diff --git a/app/core/ingestion.py b/app/core/ingestion.py index 4cbba71..8035c5c 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -1,13 +1,8 @@ """ app/core/ingestion.py -Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges). -Features: -- Incremental Update (Hashing) für Massenimport. -- Force Smart Edges für Single-File (UI). -- Async Embedding & Chunking. - -Version: 2.5.1 (Full Restore + WP-15 Logic) +Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte. +Version: 2.5.2 (Full Feature: Change Detection + Robust IO + Clean Config) """ import os import logging @@ -22,11 +17,10 @@ from app.core.parser import ( validate_required_frontmatter, ) from app.core.note_payload import make_note_payload -# ASYNC CHUNKER (WP-15) from app.core.chunker import assemble_chunks, get_chunk_config from app.core.chunk_payload import make_chunk_payloads -# Fallback für Edges Import +# Fallback für Edges try: from app.core.derive_edges import build_edges_for_note except ImportError: @@ -58,6 +52,19 @@ def resolve_note_type(requested: Optional[str], reg: dict) -> str: if requested and requested in types: return requested return "concept" +def effective_chunk_profile(note_type: str, reg: dict) -> str: + t_cfg = reg.get("types", {}).get(note_type, {}) + if t_cfg and t_cfg.get("chunk_profile"): + return t_cfg.get("chunk_profile") + return reg.get("defaults", {}).get("chunk_profile", "default") + +def effective_retriever_weight(note_type: str, reg: dict) -> float: + t_cfg = reg.get("types", {}).get(note_type, {}) + if t_cfg and "retriever_weight" in t_cfg: + return float(t_cfg["retriever_weight"]) + return float(reg.get("defaults", {}).get("retriever_weight", 1.0)) + + class IngestionService: def __init__(self, collection_prefix: str = None): env_prefix = os.getenv("COLLECTION_PREFIX", "mindnet") @@ -70,7 +77,6 @@ class IngestionService: self.registry = load_type_registry() self.embedder = EmbeddingsClient() - # Init Checks try: ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) @@ -87,12 +93,11 @@ class IngestionService: note_scope_refs: bool = False, hash_mode: str = "body", hash_source: str = "parsed", - hash_normalize: str = "canonical", - force_smart_edges: bool = False # NEU: Override für UI + hash_normalize: str = "canonical" ) -> Dict[str, Any]: """ Verarbeitet eine einzelne Datei (ASYNC). - Enthält Hashing-Logik für inkrementelle Updates. + Inklusive Change Detection (Hash-Check) gegen Qdrant. """ result = { "path": file_path, @@ -101,10 +106,11 @@ class IngestionService: "error": None } - # 1. Parse & Validate + # 1. Parse & Frontmatter Validation try: parsed = read_markdown(file_path) - if not parsed: return {**result, "error": "Empty/Unreadable"} + if not parsed: + return {**result, "error": "Empty or unreadable file"} fm = normalize_frontmatter(parsed.frontmatter) validate_required_frontmatter(fm) @@ -112,42 +118,46 @@ class IngestionService: logger.error(f"Validation failed for {file_path}: {e}") return {**result, "error": f"Validation failed: {str(e)}"} - # 2. Resolve Type + # 2. Type & Config Resolution note_type = resolve_note_type(fm.get("type"), self.registry) fm["type"] = note_type + fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry) - # 3. Build Payload & Hash + weight = fm.get("retriever_weight") + if weight is None: + weight = effective_retriever_weight(note_type, self.registry) + fm["retriever_weight"] = float(weight) + + # 3. Build Note Payload try: note_pl = make_note_payload( - parsed, - vault_root=vault_root, - file_path=file_path, + parsed, + vault_root=vault_root, hash_mode=hash_mode, hash_normalize=hash_normalize, - hash_source=hash_source + hash_source=hash_source, + file_path=file_path ) - if not note_pl.get("fulltext"): + if not note_pl.get("fulltext"): note_pl["fulltext"] = getattr(parsed, "body", "") or "" + note_pl["retriever_weight"] = fm["retriever_weight"] note_id = note_pl["note_id"] except Exception as e: + logger.error(f"Payload build failed: {e}") return {**result, "error": f"Payload build failed: {str(e)}"} - # 4. Change Detection (Das Herzstück für Massenimport) + # 4. Change Detection (Das fehlende Stück!) old_payload = None if not force_replace: old_payload = self._fetch_note_payload(note_id) has_old = old_payload is not None - - # Hash Vergleich key_current = f"{hash_mode}:{hash_source}:{hash_normalize}" old_hash = (old_payload or {}).get("hashes", {}).get(key_current) new_hash = note_pl.get("hashes", {}).get(key_current) hash_changed = (old_hash != new_hash) - - # Check ob Chunks/Edges in DB fehlen (Reparatur-Modus) chunks_missing, edges_missing = self._artifacts_missing(note_id) should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing @@ -158,18 +168,14 @@ class IngestionService: if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} - # 5. Processing (Chunking, Embedding) + # 5. Processing (Chunking, Embedding, Edges) try: body_text = getattr(parsed, "body", "") or "" - # --- WP-15 LOGIC --- - # Config laden und ggf. überschreiben + # --- Config Loading (Clean) --- chunk_config = get_chunk_config(note_type) - if force_smart_edges: - logger.info(f"Ingestion: Forcing Smart Edges for {note_id}") - chunk_config["enable_smart_edge_allocation"] = True + # Hier greift die Logik aus types.yaml (smart=True/False) - # Async Chunking chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) @@ -181,11 +187,12 @@ class IngestionService: if hasattr(self.embedder, 'embed_documents'): vecs = await self.embedder.embed_documents(texts) else: - for t in texts: vecs.append(await self.embedder.embed_query(t)) + for t in texts: + v = await self.embedder.embed_query(t) + vecs.append(v) except Exception as e: - # Embedding Fehler sind kritisch logger.error(f"Embedding failed: {e}") - raise e + raise RuntimeError(f"Embedding failed: {e}") # Edges try: @@ -202,7 +209,7 @@ class IngestionService: logger.error(f"Processing failed: {e}", exc_info=True) return {**result, "error": f"Processing failed: {str(e)}"} - # 6. Upsert (Atomic Write recommended, here Batch) + # 6. Upsert Action try: if purge_before and has_old: self._purge_artifacts(note_id) @@ -230,7 +237,7 @@ class IngestionService: logger.error(f"Upsert failed: {e}", exc_info=True) return {**result, "error": f"DB Upsert failed: {e}"} - # --- Interne Qdrant Helper (Wichtig für Change Detection) --- + # --- Qdrant Helper (Restored) --- def _fetch_note_payload(self, note_id: str) -> Optional[dict]: from qdrant_client.http import models as rest @@ -270,7 +277,6 @@ class IngestionService: ) -> Dict[str, Any]: """ WP-11 Persistence API Entrypoint. - Speichert Text und erzwingt sofortige Indizierung mit Smart Edges. """ target_dir = os.path.join(vault_root, folder) os.makedirs(target_dir, exist_ok=True) @@ -278,25 +284,22 @@ class IngestionService: file_path = os.path.join(target_dir, filename) try: - # Robust Write: Ensure Flush + # Robust Write: Ensure Flush & Sync with open(file_path, "w", encoding="utf-8") as f: f.write(markdown_content) f.flush() os.fsync(f.fileno()) - # Kurzer Sleep für OS Filesystem Latenz await asyncio.sleep(0.1) logger.info(f"Written file to {file_path}") except Exception as e: return {"status": "error", "error": f"Disk write failed: {str(e)}"} - # Hier aktivieren wir die Smart Edges explizit für den Single-File Import return await self.process_file( file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, - purge_before=True, - force_smart_edges=True # <--- HIER: Intelligence Override + purge_before=True ) \ No newline at end of file diff --git a/config/types.yaml b/config/types.yaml index 9753c9f..a3385e0 100644 --- a/config/types.yaml +++ b/config/types.yaml @@ -30,7 +30,7 @@ chunking_profiles: # bei der Generierung nicht zu überlasten. Später wieder aktivieren. sliding_smart_edges: strategy: sliding_window - enable_smart_edge_allocation: false + enable_smart_edge_allocation: true target: 400 max: 600 overlap: [50, 80] @@ -39,7 +39,7 @@ chunking_profiles: # Für Profile, Werte, Prinzipien. Trennt hart an Überschriften (H2). structured_smart_edges: strategy: by_heading - enable_smart_edge_allocation: false + enable_smart_edge_allocation: true split_level: 2 max: 600 target: 400