def effective_chunk_profile_name(fm: dict, note_type: str, reg: dict) -> str:
    """Resolve the name of the chunking profile for a note.

    Resolution priority:
        1. Frontmatter override (``chunking_profile``, legacy ``chunk_profile``)
        2. Per-type config in the registry
        3. Global registry default (falls back to ``"sliding_standard"``)
    """
    # Frontmatter wins, but only when it yields a non-empty string value.
    requested = fm.get("chunking_profile") or fm.get("chunk_profile")
    if isinstance(requested, str) and requested:
        return requested

    # Next, consult the per-type configuration (legacy key supported).
    type_cfg = reg.get("types", {}).get(note_type, {})
    if type_cfg:
        profile = type_cfg.get("chunking_profile") or type_cfg.get("chunk_profile")
        if profile:
            return profile

    # Finally fall back to the registry-wide default.
    return reg.get("defaults", {}).get("chunking_profile", "sliding_standard")
def effective_retriever_weight(fm: dict, note_type: str, reg: dict) -> float:
    """Resolve the retriever weight for a note.

    Resolution priority:
        1. Frontmatter override (``retriever_weight``), if it is numeric
        2. Per-type config in the registry
        3. Global registry default (falls back to ``1.0``)
    """
    # 1. Frontmatter override — a non-numeric value is ignored rather than fatal.
    override = fm.get("retriever_weight")
    if override is not None:
        try:
            return float(override)
        # FIX: narrowed from a bare `except:` — that also swallowed
        # SystemExit/KeyboardInterrupt and masked unrelated bugs.
        except (TypeError, ValueError):
            pass

    # 2. Per-type configuration.
    t_cfg = reg.get("types", {}).get(note_type, {})
    if t_cfg and "retriever_weight" in t_cfg:
        return float(t_cfg["retriever_weight"])

    # 3. Global default.
    return float(reg.get("defaults", {}).get("retriever_weight", 1.0))
- Inklusive Change Detection (Hash-Check) gegen Qdrant. - """ - result = { - "path": file_path, - "status": "skipped", - "changed": False, - "error": None - } + + result = {"path": file_path, "status": "skipped", "changed": False, "error": None} # 1. Parse & Frontmatter Validation try: parsed = read_markdown(file_path) - if not parsed: - return {**result, "error": "Empty or unreadable file"} - + if not parsed: return {**result, "error": "Empty or unreadable file"} fm = normalize_frontmatter(parsed.frontmatter) validate_required_frontmatter(fm) except Exception as e: logger.error(f"Validation failed for {file_path}: {e}") return {**result, "error": f"Validation failed: {str(e)}"} - # 2. Type & Config Resolution + # 2. Type & Config Resolution (FIXED) + # Wir ermitteln erst den Typ note_type = resolve_note_type(fm.get("type"), self.registry) fm["type"] = note_type - fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry) - weight = fm.get("retriever_weight") - if weight is None: - weight = effective_retriever_weight(note_type, self.registry) - fm["retriever_weight"] = float(weight) + # Dann ermitteln wir die effektiven Werte unter Berücksichtigung des Frontmatters! + # Hier lag der Fehler: Vorher wurde einfach überschrieben. + effective_profile = effective_chunk_profile_name(fm, note_type, self.registry) + effective_weight = effective_retriever_weight(fm, note_type, self.registry) + + # Wir schreiben die effektiven Werte zurück ins FM, damit note_payload sie sicher hat + fm["chunk_profile"] = effective_profile + fm["retriever_weight"] = effective_weight # 3. Build Note Payload - # Ruft make_note_payload auf, welches JETZT alle Hash-Varianten berechnet. 
try: note_pl = make_note_payload( parsed, @@ -146,42 +180,33 @@ class IngestionService: hash_source=hash_source, file_path=file_path ) - if not note_pl.get("fulltext"): - note_pl["fulltext"] = getattr(parsed, "body", "") or "" - note_pl["retriever_weight"] = fm["retriever_weight"] + # Text Body Fallback + if not note_pl.get("fulltext"): note_pl["fulltext"] = getattr(parsed, "body", "") or "" + + # Update Payload with explicit effective values (Sicherheit) + note_pl["retriever_weight"] = effective_weight + note_pl["chunk_profile"] = effective_profile note_id = note_pl["note_id"] except Exception as e: logger.error(f"Payload build failed: {e}") return {**result, "error": f"Payload build failed: {str(e)}"} - # 4. Change Detection (Updated Logic with ENV Strategy) + # 4. Change Detection old_payload = None if not force_replace: old_payload = self._fetch_note_payload(note_id) has_old = old_payload is not None - - # Wir bauen den Key basierend auf der ENV-Einstellung check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" old_hashes = (old_payload or {}).get("hashes") - - # Fallback Logik für alte Daten - if isinstance(old_hashes, dict): - old_hash = old_hashes.get(check_key) - elif isinstance(old_hashes, str): - # Sehr alte Legacy Daten hatten Hash direkt als String (meist Body) - # Wenn wir im Body-Modus sind, ist das okay, sonst Force Update - old_hash = old_hashes if self.active_hash_mode == "body" else None - else: - old_hash = None + if isinstance(old_hashes, dict): old_hash = old_hashes.get(check_key) + elif isinstance(old_hashes, str) and self.active_hash_mode == "body": old_hash = old_hashes + else: old_hash = None new_hash = note_pl.get("hashes", {}).get(check_key) - - # Vergleich hash_changed = (old_hash != new_hash) - chunks_missing, edges_missing = self._artifacts_missing(note_id) should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing @@ -192,17 +217,19 @@ class IngestionService: if not apply: return 
{**result, "status": "dry-run", "changed": True, "note_id": note_id} - # 5. Processing (Chunking, Embedding, Edges) + # 5. Processing try: body_text = getattr(parsed, "body", "") or "" - # --- Config Loading (Clean) --- - chunk_config = get_chunk_config(note_type) + # FIX: Wir laden jetzt die Config für das SPEZIFISCHE Profil + # (z.B. wenn User "sliding_short" wollte, laden wir dessen Params) + chunk_config = self._get_chunk_config_by_profile(effective_profile, note_type) chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config) + + # chunk_payloads werden mit den aktualisierten FM-Werten gebaut chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) - # Embedding vecs = [] if chunk_pls: texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] @@ -217,7 +244,6 @@ class IngestionService: logger.error(f"Embedding failed: {e}") raise RuntimeError(f"Embedding failed: {e}") - # Edges try: edges = build_edges_for_note( note_id, @@ -232,7 +258,7 @@ class IngestionService: logger.error(f"Processing failed: {e}", exc_info=True) return {**result, "error": f"Processing failed: {str(e)}"} - # 6. Upsert Action + # 6. Upsert try: if purge_before and has_old: self._purge_artifacts(note_id) @@ -259,9 +285,8 @@ class IngestionService: except Exception as e: logger.error(f"Upsert failed: {e}", exc_info=True) return {**result, "error": f"DB Upsert failed: {e}"} - - # --- Qdrant Helper --- - + + # ... (Restliche Methoden wie _fetch_note_payload bleiben unverändert) ... 
def _fetch_note_payload(self, note_id: str) -> Optional[dict]: from qdrant_client.http import models as rest col = f"{self.prefix}_notes" @@ -291,38 +316,17 @@ class IngestionService: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector) except Exception: pass - async def create_from_text( - self, - markdown_content: str, - filename: str, - vault_root: str, - folder: str = "00_Inbox" - ) -> Dict[str, Any]: - """ - WP-11 Persistence API Entrypoint. - """ - target_dir = os.path.join(vault_root, folder) - os.makedirs(target_dir, exist_ok=True) - - file_path = os.path.join(target_dir, filename) - - try: - # Robust Write: Ensure Flush & Sync - with open(file_path, "w", encoding="utf-8") as f: - f.write(markdown_content) - f.flush() - os.fsync(f.fileno()) - - await asyncio.sleep(0.1) - - logger.info(f"Written file to {file_path}") - except Exception as e: - return {"status": "error", "error": f"Disk write failed: {str(e)}"} - - return await self.process_file( - file_path=file_path, - vault_root=vault_root, - apply=True, - force_replace=True, - purge_before=True - ) \ No newline at end of file + async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: + target_dir = os.path.join(vault_root, folder) + os.makedirs(target_dir, exist_ok=True) + file_path = os.path.join(target_dir, filename) + try: + with open(file_path, "w", encoding="utf-8") as f: + f.write(markdown_content) + f.flush() + os.fsync(f.fileno()) + await asyncio.sleep(0.1) + logger.info(f"Written file to {file_path}") + except Exception as e: + return {"status": "error", "error": f"Disk write failed: {str(e)}"} + return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file