"""
FILE: app/core/ingestion/ingestion_note_payload.py
DESCRIPTION: Builds the JSON object for mindnet_notes.
             WP-14: Integration of the central registry.
             WP-24c: Dynamic resolution of edge_defaults from the graph schema.
VERSION: 2.5.0 (WP-24c: Dynamic Topology Integration)
STATUS: Active
"""
from __future__ import annotations

from typing import Any, Dict, Tuple, Optional
import os
import json
import pathlib
import hashlib

# Central registry logic
from app.core.registry import load_type_registry
# WP-24c: access to the dynamic graph schema
from app.services.edge_registry import registry as edge_registry

# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------

# Attributes copied from a note object when it is not already a dict.
_NOTE_ATTRS = (
    "frontmatter", "body", "id", "note_id", "title", "path",
    "tags", "type", "created", "modified", "date",
)

# Frontmatter fields that feed the "full" hash, in the (sorted) order in which
# they are serialized. Changing this order changes every "full" hash.
_HASH_META_KEYS = tuple(sorted((
    "title", "type", "status", "tags",
    "chunking_profile", "chunk_profile",
    "retriever_weight", "split_level", "strict_heading_split",
)))

# Weight used when neither the environment nor the registry supplies one.
_FALLBACK_RETRIEVER_WEIGHT = 1.0


def _as_dict(x) -> Dict[str, Any]:
    """Convert a note-like object into a plain dict.

    A dict is shallow-copied. For any other object the known note attributes
    that exist and are not ``None`` are collected; if none are found, the
    result is ``{"raw": str(x)}``.
    """
    if isinstance(x, dict):
        return dict(x)
    out: Dict[str, Any] = {}
    for attr in _NOTE_ATTRS:
        if hasattr(x, attr):
            val = getattr(x, attr)
            if val is not None:
                out[attr] = val
    if not out:
        out["raw"] = str(x)
    return out


def _ensure_list(x) -> list:
    """Return *x* as a list of strings.

    ``None`` becomes ``[]``; lists, sets and tuples are converted element by
    element; any other value becomes a single-element list.
    """
    if x is None:
        return []
    if isinstance(x, (list, set, tuple)):
        return [str(i) for i in x]
    return [str(x)]


def _compute_hash(content: str) -> str:
    """Return the SHA-256 hex digest of *content*, or ``""`` for empty input."""
    if not content:
        return ""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str:
    """Build the string that is hashed for change detection.

    Args:
        n: Note dict; ``body`` and ``frontmatter`` are read.
        mode: ``"body"`` hashes only the stripped body. ``"full"`` prefixes
            the body with the profile-relevant frontmatter fields as
            ``key:value`` pairs joined by ``|``. Any other mode falls back to
            the stripped body.

    Returns:
        The hash input string.
    """
    body = str(n.get("body") or "").strip()
    if mode == "full":
        fm = n.get("frontmatter") or {}
        meta_parts = []
        for k in _HASH_META_KEYS:
            val = fm.get(k)
            if val is not None:
                meta_parts.append(f"{k}:{val}")
        return f"{'|'.join(meta_parts)}||{body}"
    return body


def _cfg_for_type(note_type: str, reg: dict) -> dict:
    """Return the type-specific config for *note_type* from the registry.

    The lookup uses ``reg["types"]`` when that is a dict, otherwise the
    registry itself. A non-dict registry or a non-dict entry yields ``{}`` so
    that callers can always use ``.get`` on the result.
    """
    if not isinstance(reg, dict):
        return {}
    types = reg.get("types") if isinstance(reg.get("types"), dict) else reg
    entry = types.get(note_type, {})
    return entry if isinstance(entry, dict) else {}


def _cfg_defaults(reg: dict) -> dict:
    """Return the global defaults section of the registry.

    The first of ``defaults``, ``default``, ``global`` whose value is a dict
    wins; otherwise ``{}`` is returned.
    """
    if not isinstance(reg, dict):
        return {}
    for key in ("defaults", "default", "global"):
        v = reg.get(key)
        if isinstance(v, dict):
            return v
    return {}


def _default_retriever_weight() -> float:
    """Read MINDNET_DEFAULT_RETRIEVER_WEIGHT, falling back to 1.0.

    An unset, empty or non-numeric value must not abort payload building, so
    it falls back to the built-in default instead of raising.
    """
    raw = os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT")
    if raw is None:
        return _FALLBACK_RETRIEVER_WEIGHT
    try:
        return float(raw)
    except ValueError:
        return _FALLBACK_RETRIEVER_WEIGHT


# ---------------------------------------------------------------------------
# Main API
# ---------------------------------------------------------------------------

def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]:
    """Build the note payload including multi-hash and audited settings.

    WP-24c: uses the EdgeRegistry to resolve the typical edges dynamically.

    Args:
        note: A dict or a note-like object (see ``_as_dict``).
        *args: Accepted for call compatibility; not used.
        **kwargs: Optional settings read by this function:
            ``types_cfg`` (registry; ``load_type_registry()`` is called when
            missing or falsy), ``hash_source`` (default ``"parsed"``),
            ``hash_normalize`` (default ``"canonical"``) and ``file_path``
            (used when the note carries no path).

    Returns:
        The payload dict with ``note_id``, ``title``, ``type``, ``path``,
        ``retriever_weight``, ``chunk_profile``, ``edge_defaults`` and
        ``hashes``, plus ``tags``, ``aliases``, ``created``, ``modified``,
        ``date`` and ``fulltext`` when the note provides them.

    Raises:
        TypeError, ValueError: From ``json.dumps`` when the payload is not
            JSON-serializable.
    """
    n = _as_dict(note)

    # Registry & context settings
    reg = kwargs.get("types_cfg") or load_type_registry()
    hash_source = kwargs.get("hash_source", "parsed")
    hash_normalize = kwargs.get("hash_normalize", "canonical")

    fm = n.get("frontmatter") or {}
    note_type = str(fm.get("type") or n.get("type") or "concept")

    cfg_type = _cfg_for_type(note_type, reg)
    cfg_def = _cfg_defaults(reg)
    # Tolerate a non-dict registry or an empty "ingestion_settings" entry,
    # in line with the other registry helpers.
    ingest_cfg = reg.get("ingestion_settings") if isinstance(reg, dict) else None
    if not isinstance(ingest_cfg, dict):
        ingest_cfg = {}

    # --- retriever_weight audit ---
    # Precedence: frontmatter > type config > global defaults > environment.
    default_rw = _default_retriever_weight()
    retriever_weight = fm.get("retriever_weight")
    if retriever_weight is None:
        retriever_weight = cfg_type.get(
            "retriever_weight", cfg_def.get("retriever_weight", default_rw)
        )
    try:
        retriever_weight = float(retriever_weight)
    except (TypeError, ValueError, OverflowError):
        # Unusable value (e.g. None, text, oversized int): use the default.
        retriever_weight = default_rw

    # --- chunk_profile audit ---
    # Precedence: frontmatter > type config > ingestion settings > defaults.
    chunk_profile = fm.get("chunking_profile") or fm.get("chunk_profile")
    if chunk_profile is None:
        chunk_profile = cfg_type.get("chunking_profile") or cfg_type.get("chunk_profile")
    if chunk_profile is None:
        chunk_profile = ingest_cfg.get(
            "default_chunk_profile",
            cfg_def.get("chunking_profile", "sliding_standard"),
        )

    # --- WP-24c: dynamic edge_defaults ---
    # 1st priority: manual definition in the frontmatter.
    edge_defaults = fm.get("edge_defaults")

    # 2nd priority: query the 'typical edges' from the graph schema.
    # NOTE(review): assumes get_topology_info returns a mapping with an
    # optional "typical" entry - confirm against the EdgeRegistry.
    if edge_defaults is None:
        topology = edge_registry.get_topology_info(note_type, "any")
        edge_defaults = topology.get("typical", [])

    # 3rd fallback: empty list when no schema entry exists.
    edge_defaults = _ensure_list(edge_defaults)

    # --- Base metadata ---
    note_id = n.get("note_id") or n.get("id") or fm.get("id")
    title = n.get("title") or fm.get("title") or ""
    path = n.get("path") or kwargs.get("file_path") or ""
    if isinstance(path, pathlib.Path):
        path = str(path)

    payload: Dict[str, Any] = {
        "note_id": note_id,
        "title": title,
        "type": note_type,
        "path": path,
        "retriever_weight": retriever_weight,
        "chunk_profile": chunk_profile,
        "edge_defaults": edge_defaults,
        "hashes": {}
    }

    # --- MULTI-HASH ---
    # Hashes for change detection (WP-15b), keyed "<mode>:<source>:<normalize>".
    for mode in ["body", "full"]:
        content = _get_hash_source_content(n, mode)
        payload["hashes"][f"{mode}:{hash_source}:{hash_normalize}"] = _compute_hash(content)

    # Metadata enrichment (tags, aliases, timestamps)
    tags = fm.get("tags") or fm.get("keywords") or n.get("tags")
    if tags:
        payload["tags"] = _ensure_list(tags)

    aliases = fm.get("aliases")
    if aliases:
        payload["aliases"] = _ensure_list(aliases)

    for k in ("created", "modified", "date"):
        v = fm.get(k) or n.get(k)
        if v:
            payload[k] = str(v)

    if n.get("body"):
        payload["fulltext"] = str(n["body"])

    # Final audit: fail here if the payload cannot be serialized to JSON.
    json.loads(json.dumps(payload, ensure_ascii=False))

    return payload