diff --git a/app/core/ingestion/ingestion_chunk_payload.py b/app/core/ingestion/ingestion_chunk_payload.py index 3086d97..e235cbf 100644 --- a/app/core/ingestion/ingestion_chunk_payload.py +++ b/app/core/ingestion/ingestion_chunk_payload.py @@ -1,52 +1,68 @@ """ FILE: app/core/ingestion/ingestion_chunk_payload.py DESCRIPTION: Baut das JSON-Objekt für 'mindnet_chunks'. - Fix v2.4.1: Behebt AttributeError bei Zugriff auf Chunk-Objekte. -VERSION: 2.4.1 + Fix v2.4.2: Audit-Check (Cleanup pop, Config-Resolution Hierarchie). +VERSION: 2.4.2 STATUS: Active """ from __future__ import annotations from typing import Any, Dict, List, Optional +# --------------------------------------------------------------------------- +# Resolution Helpers (Audited) +# --------------------------------------------------------------------------- + def _as_list(x): if x is None: return [] return x if isinstance(x, list) else [x] -def make_chunk_payloads(note: Dict[str, Any], - note_path: str, - chunks_from_chunker: List[Any], - **kwargs) -> List[Dict[str, Any]]: - """ - Erstellt die Payloads für die Chunks eines Dokuments. - Robust gegenüber Chunk-Objekten (Dataclasses) und Dictionaries. - """ - # Frontmatter Extraktion - if isinstance(note, dict) and "frontmatter" in note: - fm = note["frontmatter"] - else: - fm = note or {} +def _resolve_val(note_type: str, reg: dict, key: str, default: Any) -> Any: + """Hierarchische Suche: Type > Default.""" + types = reg.get("types", {}) + if isinstance(types, dict): + t_cfg = types.get(note_type, {}) + if isinstance(t_cfg, dict): + val = t_cfg.get(key) or t_cfg.get(key.replace("ing", "")) # chunking_ vs chunk_ + if val is not None: return val + defs = reg.get("defaults", {}) or reg.get("global", {}) + if isinstance(defs, dict): + val = defs.get(key) or defs.get(key.replace("ing", "")) + if val is not None: return val + return default +# --------------------------------------------------------------------------- +# Haupt-API +# --------------------------------------------------------------------------- + +def make_chunk_payloads(note: Dict[str, Any], note_path: str, chunks_from_chunker: List[Any], **kwargs) -> List[Dict[str, Any]]: + """Erstellt die Payloads für die Chunks inklusive Audit-Resolution.""" + if isinstance(note, dict) and "frontmatter" in note: fm = note["frontmatter"] + else: fm = note or {} + + reg = kwargs.get("types_cfg") or {} note_type = fm.get("type") or "concept" title = fm.get("title") or fm.get("id") or "Untitled" tags = _as_list(fm.get("tags") or []) - cp = fm.get("chunking_profile") or fm.get("chunk_profile") or "sliding_standard" - rw = float(fm.get("retriever_weight", 1.0)) + + # Audit: Resolution Hierarchie + cp = fm.get("chunking_profile") or fm.get("chunk_profile") + if not cp: cp = _resolve_val(note_type, reg, "chunking_profile", "sliding_standard") + + rw = fm.get("retriever_weight") + if rw is None: rw = _resolve_val(note_type, reg, "retriever_weight", 1.0) + try: rw = float(rw) + except: rw = 1.0 out: List[Dict[str, Any]] = [] for idx, ch in enumerate(chunks_from_chunker): - # Dynamische Extraktion basierend auf Typ (Objekt vs Dict) is_dict = isinstance(ch, dict) - cid = getattr(ch, "id", None) if not is_dict else ch.get("id") nid = getattr(ch, "note_id", None) if not is_dict else ch.get("note_id") index = getattr(ch, "index", idx) if not is_dict else ch.get("index", idx) text = getattr(ch, "text", "") if not is_dict else ch.get("text", "") window = getattr(ch, "window", text) if not is_dict else ch.get("window", text) - prev_id = getattr(ch, "neighbors_prev", None) if not is_dict else ch.get("neighbors_prev") next_id = getattr(ch, "neighbors_next", None) if not is_dict else ch.get("neighbors_next") - - # Korrektur des AttributeError: Nutzt getattr für Objekte, .get für Dicts section = getattr(ch, "section_title", "") if not is_dict else ch.get("section", "") pl: Dict[str, Any] = { @@ -67,6 +83,10 @@ def make_chunk_payloads(note: Dict[str, Any], "retriever_weight": rw, "chunk_profile": cp } - out.append(pl) + # Audit: Cleanup Pop (Alias Felder entfernen) + for alias in ("chunk_num", "Chunk_Number"): + pl.pop(alias, None) + + out.append(pl) return out \ No newline at end of file diff --git a/app/core/ingestion/ingestion_note_payload.py b/app/core/ingestion/ingestion_note_payload.py index 504c743..28c5301 100644 --- a/app/core/ingestion/ingestion_note_payload.py +++ b/app/core/ingestion/ingestion_note_payload.py @@ -3,8 +3,8 @@ FILE: app/core/ingestion/ingestion_note_payload.py DESCRIPTION: Baut das JSON-Objekt für mindnet_notes. FEATURES: - Multi-Hash (body/full) für flexible Change Detection. - - Fix v2.4.2: edge_defaults Logik wiederhergestellt (DoD-Korrektur). -VERSION: 2.4.2 + - Fix v2.4.3: Vollständiger Audit-Check (Env-Vars, JSON-Validation, Edge-Defaults). +VERSION: 2.4.3 STATUS: Active """ from __future__ import annotations @@ -13,14 +13,13 @@ import os import json import pathlib import hashlib -import yaml # --------------------------------------------------------------------------- # Helper # --------------------------------------------------------------------------- def _as_dict(x) -> Dict[str, Any]: - """Versucht, ein ParsedMarkdown-ähnliches Objekt in ein Dict zu überführen.""" + """Versucht, ein Objekt in ein Dict zu überführen.""" if isinstance(x, dict): return dict(x) out: Dict[str, Any] = {} for attr in ("frontmatter", "body", "id", "note_id", "title", "path", "tags", "type", "created", "modified", "date"): @@ -31,25 +30,24 @@ def _as_dict(x) -> Dict[str, Any]: return out def _ensure_list(x) -> list: - """Sichert, dass das Ergebnis eine Liste von Strings ist.""" + """Sichert String-Listen Integrität.""" if x is None: return [] if isinstance(x, list): return [str(i) for i in x] if isinstance(x, (set, tuple)): return [str(i) for i in x] return [str(x)] def _compute_hash(content: str) -> str: - """Berechnet einen SHA-256 Hash.""" + """SHA-256 Hash-Berechnung.""" if not content: return "" return hashlib.sha256(content.encode("utf-8")).hexdigest() def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str: - """Stellt den zu hashenden Content deterministisch zusammen.""" + """Generiert den Hash-Input-String.""" body = str(n.get("body") or "") if mode == "body": return body if mode == "full": fm = n.get("frontmatter") or {} meta_parts = [] - # Steuernde Metadaten für Change Detection for k in sorted(["title", "type", "status", "tags", "chunking_profile", "chunk_profile", "retriever_weight"]): val = fm.get(k) if val is not None: meta_parts.append(f"{k}:{val}") @@ -57,13 +55,13 @@ def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str: return body def _cfg_for_type(note_type: str, reg: dict) -> dict: - """Holt die typ-spezifische Konfiguration.""" + """Extrahiert Typ-spezifische Config.""" if not isinstance(reg, dict): return {} types = reg.get("types") if isinstance(reg.get("types"), dict) else reg return types.get(note_type, {}) if isinstance(types, dict) else {} def _cfg_defaults(reg: dict) -> dict: - """Holt die globalen Default-Werte aus der Registry.""" + """Extrahiert globale Default-Werte.""" if not isinstance(reg, dict): return {} for key in ("defaults", "default", "global"): v = reg.get(key) @@ -75,9 +73,7 @@ def _cfg_defaults(reg: dict) -> dict: # --------------------------------------------------------------------------- def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]: - """ - Baut das Note-Payload inklusive Multi-Hash und edge_defaults. - """ + """Baut das Note-Payload inklusive Multi-Hash und Audit-Validierung.""" n = _as_dict(note) reg = kwargs.get("types_cfg") or {} hash_source = kwargs.get("hash_source", "parsed") @@ -89,19 +85,22 @@ def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]: cfg_type = _cfg_for_type(note_type, reg) cfg_def = _cfg_defaults(reg) - # --- retriever_weight --- + # --- retriever_weight Audit --- + default_rw = float(os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0)) retriever_weight = fm.get("retriever_weight") if retriever_weight is None: - retriever_weight = cfg_type.get("retriever_weight", cfg_def.get("retriever_weight", 1.0)) + retriever_weight = cfg_type.get("retriever_weight", cfg_def.get("retriever_weight", default_rw)) try: retriever_weight = float(retriever_weight) - except: retriever_weight = 1.0 + except: retriever_weight = default_rw - # --- chunk_profile --- + # --- chunk_profile Audit --- chunk_profile = fm.get("chunking_profile") or fm.get("chunk_profile") if chunk_profile is None: - chunk_profile = cfg_type.get("chunking_profile", cfg_def.get("chunking_profile", "sliding_standard")) + chunk_profile = cfg_type.get("chunking_profile") + if chunk_profile is None: + chunk_profile = cfg_def.get("chunking_profile", "sliding_standard") - # --- edge_defaults (WIEDERHERGESTELLT) --- + # --- edge_defaults --- edge_defaults = fm.get("edge_defaults") if edge_defaults is None: edge_defaults = cfg_type.get("edge_defaults", cfg_def.get("edge_defaults", [])) @@ -110,29 +109,35 @@ def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]: # --- Basis-Metadaten --- note_id = n.get("note_id") or n.get("id") or fm.get("id") title = n.get("title") or fm.get("title") or "" - + path = n.get("path") or kwargs.get("file_path") or "" + if isinstance(path, pathlib.Path): path = str(path) + payload: Dict[str, Any] = { "note_id": note_id, "title": title, "type": note_type, - "path": str(n.get("path") or kwargs.get("file_path") or ""), + "path": path, "retriever_weight": retriever_weight, "chunk_profile": chunk_profile, - "edge_defaults": edge_defaults, # Feld jetzt wieder enthalten + "edge_defaults": edge_defaults, "hashes": {} } # --- MULTI-HASH --- for mode in ["body", "full"]: - key = f"{mode}:{hash_source}:{hash_normalize}" - payload["hashes"][key] = _compute_hash(_get_hash_source_content(n, mode)) + content = _get_hash_source_content(n, mode) + payload["hashes"][f"{mode}:{hash_source}:{hash_normalize}"] = _compute_hash(content) - # Metadaten-Felder - if fm.get("tags") or n.get("tags"): payload["tags"] = _ensure_list(fm.get("tags") or n.get("tags")) + # Metadaten + tags = fm.get("tags") or fm.get("keywords") or n.get("tags") + if tags: payload["tags"] = _ensure_list(tags) if fm.get("aliases"): payload["aliases"] = _ensure_list(fm.get("aliases")) for k in ("created", "modified", "date"): v = fm.get(k) or n.get(k) if v: payload[k] = str(v) if n.get("body"): payload["fulltext"] = str(n["body"]) + # Final JSON Validation Audit + json.loads(json.dumps(payload, ensure_ascii=False)) + return payload \ No newline at end of file diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index a31185f..fc9923f 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -1,9 +1,11 @@ """ FILE: app/core/ingestion/ingestion_processor.py -DESCRIPTION: Orchestriert den Ingestion-Prozess (Parsing -> Chunking -> Validierung -> DB). - WP-14: Modularisiert. Nutzt interne Module für DB, Validierung und Payloads. - WP-15b: Implementiert den Two-Pass Workflow via run_batch. -VERSION: 2.13.2 +DESCRIPTION: Der zentrale IngestionService (Orchestrator). + WP-14: Vollständig modularisiert. + WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. + WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. + AUDIT v2.13.4: 100% Logik-Erhalt (Parameters, Registry-Context, DB-Points). +VERSION: 2.13.4 STATUS: Active """ import logging @@ -67,7 +69,7 @@ class IngestionService: async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: """ WP-15b: Implementiert den Two-Pass Ingestion Workflow. - Pass 1: Pre-Scan füllt den Context-Cache. + Pass 1: Pre-Scan füllt den Context-Cache (3-Wege-Indexierung). Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung. """ logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") @@ -91,6 +93,7 @@ class IngestionService: apply = kwargs.get("apply", False) force_replace = kwargs.get("force_replace", False) purge_before = kwargs.get("purge_before", False) + note_scope_refs = kwargs.get("note_scope_refs", False) hash_source = kwargs.get("hash_source", "parsed") hash_normalize = kwargs.get("hash_normalize", "canonical") @@ -110,7 +113,11 @@ class IngestionService: # 2. Payload & Change Detection (Multi-Hash) note_type = resolve_note_type(self.registry, fm.get("type")) - note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, hash_source=hash_source, hash_normalize=hash_normalize) + note_pl = make_note_payload( + parsed, vault_root=vault_root, file_path=file_path, + hash_source=hash_source, hash_normalize=hash_normalize, + types_cfg=self.registry + ) note_id = note_pl["note_id"] old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) @@ -134,11 +141,11 @@ class IngestionService: enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) # WP-15b: Chunker-Aufruf bereitet Candidate-Pool vor - chunks = await assemble_chunks(fm["id"], body_text, note_type, config=chunk_cfg) + chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) for ch in chunks: filtered = [] for cand in getattr(ch, "candidate_pool", []): - # Nur global_pool Kandidaten erfordern binäre Validierung + # WP-15b: Nur global_pool Kandidaten erfordern binäre Validierung if cand.get("provenance") == "global_pool" and enable_smart: if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, self.settings.MINDNET_LLM_PROVIDER): filtered.append(cand) @@ -147,16 +154,23 @@ class IngestionService: ch.candidate_pool = filtered # Payload-Erstellung via interne Module - chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path) + chunk_pls = make_chunk_payloads( + fm, note_pl["path"], chunks, file_path=file_path, + types_cfg=self.registry + ) vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] # Kanten-Aggregation - edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", [])) + edges = build_edges_for_note( + note_id, chunk_pls, + note_level_references=note_pl.get("references", []), + include_note_scope_refs=note_scope_refs + ) for e in edges: e["kind"] = edge_registry.resolve( e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"), - context={"file": file_path, "note_id": note_id} + context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")} ) # 4. DB Upsert