From a6d37c92d2f6af47766e8367f95d4ca10838a842 Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 27 Dec 2025 10:40:44 +0100 Subject: [PATCH] Integration von payload modulen in die neue Struktur --- app/core/ingestion/ingestion_chunk_payload.py | 46 ++++++++ app/core/ingestion/ingestion_note_payload.py | 82 +++++++++++++ app/core/ingestion/ingestion_processor.py | 110 ++++++++++++------ 3 files changed, 205 insertions(+), 33 deletions(-) create mode 100644 app/core/ingestion/ingestion_chunk_payload.py create mode 100644 app/core/ingestion/ingestion_note_payload.py diff --git a/app/core/ingestion/ingestion_chunk_payload.py b/app/core/ingestion/ingestion_chunk_payload.py new file mode 100644 index 0000000..67c48fb --- /dev/null +++ b/app/core/ingestion/ingestion_chunk_payload.py @@ -0,0 +1,46 @@ +""" +FILE: app/core/ingestion/ingestion_chunk_payload.py +DESCRIPTION: Baut das JSON-Objekt für mindnet_chunks. +VERSION: 2.4.0 +""" +from __future__ import annotations +from typing import Any, Dict, List, Optional + +def _as_list(x): + if x is None: return [] + return x if isinstance(x, list) else [x] + +def make_chunk_payloads(note: Dict[str, Any], note_path: str, chunks_from_chunker: List[Any], **kwargs) -> List[Dict[str, Any]]: + """Erstellt die Payloads für die Chunks eines Dokuments.""" + if isinstance(note, dict) and "frontmatter" in note: fm = note["frontmatter"] + else: fm = note or {} + + note_type = fm.get("type") or "concept" + title = fm.get("title") or fm.get("id") or "Untitled" + tags = _as_list(fm.get("tags") or []) + cp = fm.get("chunking_profile") or fm.get("chunk_profile") or "sliding_standard" + rw = float(fm.get("retriever_weight", 1.0)) + + out: List[Dict[str, Any]] = [] + for idx, ch in enumerate(chunks_from_chunker): + text = getattr(ch, "text", "") or ch.get("text", "") + pl: Dict[str, Any] = { + "note_id": getattr(ch, "note_id", None) or fm.get("id"), + "chunk_id": getattr(ch, "id", None), + "title": title, + "index": int(getattr(ch, "index", idx)), + "ord": int(getattr(ch, "index", idx)) + 1, + "type": note_type, + "tags": tags, + "text": text, + "window": getattr(ch, "window", text), + "neighbors_prev": _as_list(getattr(ch, "neighbors_prev", None)), + "neighbors_next": _as_list(getattr(ch, "neighbors_next", None)), + "section": getattr(ch, "section_title", "") or ch.get("section", ""), + "path": note_path, + "source_path": kwargs.get("file_path") or note_path, + "retriever_weight": rw, + "chunk_profile": cp + } + out.append(pl) + return out \ No newline at end of file diff --git a/app/core/ingestion/ingestion_note_payload.py b/app/core/ingestion/ingestion_note_payload.py new file mode 100644 index 0000000..045efdd --- /dev/null +++ b/app/core/ingestion/ingestion_note_payload.py @@ -0,0 +1,82 @@ +""" +FILE: app/core/ingestion/ingestion_note_payload.py +DESCRIPTION: Baut das JSON-Objekt für mindnet_notes. +FEATURES: Multi-Hash (body/full), Config-Fix für chunking_profile. +VERSION: 2.4.0 +""" +from __future__ import annotations +from typing import Any, Dict, Tuple, Optional +import os +import json +import pathlib +import hashlib +import yaml + +def _as_dict(x) -> Dict[str, Any]: + if isinstance(x, dict): return dict(x) + out: Dict[str, Any] = {} + for attr in ("frontmatter", "body", "id", "note_id", "title", "path", "tags", "type", "created", "modified", "date"): + if hasattr(x, attr): + val = getattr(x, attr) + if val is not None: out[attr] = val + if not out: out["raw"] = str(x) + return out + +def _ensure_list(x) -> list: + if x is None: return [] + if isinstance(x, list): return [str(i) for i in x] + if isinstance(x, (set, tuple)): return [str(i) for i in x] + return [str(x)] + +def _compute_hash(content: str) -> str: + if not content: return "" + return hashlib.sha256(content.encode("utf-8")).hexdigest() + +def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str: + body = str(n.get("body") or "") + if mode == "body": return body + if mode == "full": + fm = n.get("frontmatter") or {} + meta_parts = [] + for k in sorted(["title", "type", "status", "tags", "chunking_profile", "chunk_profile", "retriever_weight"]): + val = fm.get(k) + if val is not None: meta_parts.append(f"{k}:{val}") + return f" {'|'.join(meta_parts)}||{body}" + return body + +def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]: + """Baut das Note-Payload inklusive Multi-Hash.""" + n = _as_dict(note) + reg = kwargs.get("types_cfg") or {} + hash_source = kwargs.get("hash_source", "parsed") + hash_normalize = kwargs.get("hash_normalize", "canonical") + + fm = n.get("frontmatter") or {} + note_type = str(fm.get("type") or n.get("type") or "concept") + + # Weights & Profiles + retriever_weight = fm.get("retriever_weight", 1.0) + chunk_profile = fm.get("chunking_profile") or fm.get("chunk_profile") or "sliding_standard" + + payload: Dict[str, Any] = { + "note_id": n.get("note_id") or n.get("id") or fm.get("id"), + "title": n.get("title") or fm.get("title") or "", + "type": note_type, + "path": str(n.get("path") or kwargs.get("path") or ""), + "retriever_weight": float(retriever_weight), + "chunk_profile": chunk_profile, + "hashes": {} + } + + for mode in ["body", "full"]: + key = f"{mode}:{hash_source}:{hash_normalize}" + payload["hashes"][key] = _compute_hash(_get_hash_source_content(n, mode)) + + if fm.get("tags") or n.get("tags"): payload["tags"] = _ensure_list(fm.get("tags") or n.get("tags")) + if fm.get("aliases"): payload["aliases"] = _ensure_list(fm.get("aliases")) + for k in ("created", "modified", "date"): + v = fm.get(k) or n.get(k) + if v: payload[k] = str(v) + if n.get("body"): payload["fulltext"] = str(n["body"]) + + return payload \ No newline at end of file diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 06c292d..a31185f 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -1,31 +1,38 @@ """ FILE: app/core/ingestion/ingestion_processor.py DESCRIPTION: Orchestriert den Ingestion-Prozess (Parsing -> Chunking -> Validierung -> DB). + WP-14: Modularisiert. Nutzt interne Module für DB, Validierung und Payloads. + WP-15b: Implementiert den Two-Pass Workflow via run_batch. +VERSION: 2.13.2 +STATUS: Active """ import logging import asyncio +import os from typing import Dict, List, Optional, Tuple, Any +# Core Module Imports from app.core.parser import ( read_markdown, pre_scan_markdown, normalize_frontmatter, validate_required_frontmatter, NoteContext ) -from app.core.note_payload import make_note_payload from app.core.chunker import assemble_chunks -from app.core.chunk_payload import make_chunk_payloads from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch +# Services from app.services.embeddings_client import EmbeddingsClient from app.services.edge_registry import registry as edge_registry from app.services.llm_service import LLMService -# Package-Interne Imports +# Package-Interne Imports (Refactoring WP-14) from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts from .ingestion_validation import validate_edge_candidate +from .ingestion_note_payload import make_note_payload +from .ingestion_chunk_payload import make_chunk_payloads -# Fallback für Edges +# Fallback für Edges (Struktur-Verknüpfung) try: from app.core.derive_edges import build_edges_for_note except ImportError: @@ -35,8 +42,10 @@ logger = logging.getLogger(__name__) class IngestionService: def __init__(self, collection_prefix: str = None): + """Initialisiert den Service und stellt die DB-Verbindung bereit.""" from app.config import get_settings self.settings = get_settings() + self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.cfg = QdrantConfig.from_env() self.cfg.prefix = self.prefix @@ -45,28 +54,37 @@ class IngestionService: self.registry = load_type_registry() self.embedder = EmbeddingsClient() self.llm = LLMService() + self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE - self.batch_cache: Dict[str, NoteContext] = {} + self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache try: ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) - except Exception as e: logger.warning(f"DB init warning: {e}") + except Exception as e: + logger.warning(f"DB initialization warning: {e}") async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: - """WP-15b: Two-Pass Ingestion Workflow.""" + """ + WP-15b: Implementiert den Two-Pass Ingestion Workflow. + Pass 1: Pre-Scan füllt den Context-Cache. + Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung. + """ logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") for path in file_paths: - ctx = pre_scan_markdown(path) - if ctx: - self.batch_cache[ctx.note_id] = ctx - self.batch_cache[ctx.title] = ctx - import os - fname = os.path.splitext(os.path.basename(path))[0] - self.batch_cache[fname] = ctx + try: + ctx = pre_scan_markdown(path) + if ctx: + # Mehrfache Indizierung für robusten Look-up (ID, Titel, Dateiname) + self.batch_cache[ctx.note_id] = ctx + self.batch_cache[ctx.title] = ctx + fname = os.path.splitext(os.path.basename(path))[0] + self.batch_cache[fname] = ctx + except Exception as e: + logger.warning(f"⚠️ Pre-scan failed for {path}: {e}") logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") - return [await self.process_file(p, vault_root, apply=True) for p in file_paths] + return [await self.process_file(p, vault_root, apply=True, purge_before=True) for p in file_paths] async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: """Transformiert eine Markdown-Datei in den Graphen.""" @@ -78,18 +96,19 @@ class IngestionService: result = {"path": file_path, "status": "skipped", "changed": False, "error": None} - # 1. Parse & Lifecycle + # 1. Parse & Lifecycle Gate try: parsed = read_markdown(file_path) if not parsed: return {**result, "error": "Empty file"} fm = normalize_frontmatter(parsed.frontmatter) validate_required_frontmatter(fm) - except Exception as e: return {**result, "error": f"Validation failed: {str(e)}"} + except Exception as e: + return {**result, "error": f"Validation failed: {str(e)}"} if fm.get("status", "draft").lower().strip() in ["system", "template", "archive", "hidden"]: return {**result, "status": "skipped", "reason": "lifecycle_filter"} - # 2. Payload & Change Detection + # 2. Payload & Change Detection (Multi-Hash) note_type = resolve_note_type(self.registry, fm.get("type")) note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, hash_source=hash_source, hash_normalize=hash_normalize) note_id = note_pl["note_id"] @@ -103,9 +122,10 @@ class IngestionService: if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss): return {**result, "status": "unchanged", "note_id": note_id} - if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} + if not apply: + return {**result, "status": "dry-run", "changed": True, "note_id": note_id} - # 3. Processing + # 3. Deep Processing (Chunking, Validation, Embedding) try: body_text = getattr(parsed, "body", "") or "" edge_registry.ensure_latest() @@ -113,40 +133,64 @@ class IngestionService: chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) + # WP-15b: Chunker-Aufruf bereitet Candidate-Pool vor chunks = await assemble_chunks(fm["id"], body_text, note_type, config=chunk_cfg) for ch in chunks: filtered = [] for cand in getattr(ch, "candidate_pool", []): + # Nur global_pool Kandidaten erfordern binäre Validierung if cand.get("provenance") == "global_pool" and enable_smart: if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, self.settings.MINDNET_LLM_PROVIDER): filtered.append(cand) - else: filtered.append(cand) + else: + filtered.append(cand) ch.candidate_pool = filtered - chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) + # Payload-Erstellung via interne Module + chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path) vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] + # Kanten-Aggregation edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", [])) for e in edges: - e["kind"] = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"), context={"file": file_path, "note_id": note_id}) + e["kind"] = edge_registry.resolve( + e.get("kind", "related_to"), + provenance=e.get("provenance", "explicit"), + context={"file": file_path, "note_id": note_id} + ) # 4. DB Upsert - if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) + if purge_before and old_payload: + purge_artifacts(self.client, self.prefix, note_id) + n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) upsert_batch(self.client, n_name, n_pts) - if chunk_pls and vecs: upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1]) - if edges: upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, edges)[1]) - return {"path": file_path, "status": "success", "changed": True, "note_id": note_id, "chunks_count": len(chunk_pls), "edges_count": len(edges)} + if chunk_pls and vecs: + c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] + upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) + + if edges: + e_pts = points_for_edges(self.prefix, edges)[1] + upsert_batch(self.client, f"{self.prefix}_edges", e_pts) + + return { + "path": file_path, + "status": "success", + "changed": True, + "note_id": note_id, + "chunks_count": len(chunk_pls), + "edges_count": len(edges) + } except Exception as e: logger.error(f"Processing failed: {e}", exc_info=True) return {**result, "error": str(e)} async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: - import os - target_dir = os.path.join(vault_root, folder) - os.makedirs(target_dir, exist_ok=True) - file_path = os.path.join(target_dir, filename) - with open(file_path, "w", encoding="utf-8") as f: f.write(markdown_content) + """Erstellt eine Note aus einem Textstream und triggert die Ingestion.""" + target_path = os.path.join(vault_root, folder, filename) + os.makedirs(os.path.dirname(target_path), exist_ok=True) + with open(target_path, "w", encoding="utf-8") as f: + f.write(markdown_content) await asyncio.sleep(0.1) - return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file + return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file