From 6aa6b32a6cb9c874b5db000dfedf7a11ddc24d64 Mon Sep 17 00:00:00 2001 From: Lars Date: Tue, 30 Dec 2025 08:40:19 +0100 Subject: [PATCH] Update chunking system to version 3.9.9, synchronizing parameters with the orchestrator and enhancing edge detection. Implement robust parsing to prevent duplicate edges in section propagation. Adjust comments for clarity and consistency across the codebase. --- app/core/chunking/chunking_processor.py | 20 ++++++++---- app/core/chunking/chunking_propagation.py | 19 +++++++---- app/core/chunking/chunking_strategies.py | 22 +++++++------ app/core/ingestion/ingestion_note_payload.py | 2 ++ app/core/ingestion/ingestion_processor.py | 34 ++++++++++++++++---- 5 files changed, 68 insertions(+), 29 deletions(-) diff --git a/app/core/chunking/chunking_processor.py b/app/core/chunking/chunking_processor.py index 1a17acb..26c2b68 100644 --- a/app/core/chunking/chunking_processor.py +++ b/app/core/chunking/chunking_processor.py @@ -1,7 +1,8 @@ """ FILE: app/core/chunking/chunking_processor.py DESCRIPTION: Der zentrale Orchestrator für das Chunking-System. - AUDIT v3.3.3: Wiederherstellung der "Gold-Standard" Qualität. + AUDIT v3.3.4: Wiederherstellung der "Gold-Standard" Qualität. + - Fix: Synchronisierung der Parameter (context_prefix) für alle Strategien. - Integriert physikalische Kanten-Injektion (Propagierung). - Stellt H1-Kontext-Fenster sicher. - Baut den Candidate-Pool für die WP-15b Ingestion auf. @@ -30,16 +31,19 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op fm, body_text = extract_frontmatter_from_text(md_text) blocks, doc_title = parse_blocks(md_text) - # Vorbereitung des H1-Präfix für die Embedding-Fenster + # Vorbereitung des H1-Präfix für die Embedding-Fenster (Breadcrumbs) h1_prefix = f"# {doc_title}" if doc_title else "" # 2. Anwendung der Splitting-Strategie - # Wir übergeben den Dokument-Titel/Präfix für die Window-Bildung. + # Alle Strategien nutzen nun einheitlich context_prefix für die Window-Bildung. if config.get("strategy") == "by_heading": - chunks = await asyncio.to_thread(strategy_by_heading, blocks, config, note_id, doc_title) + chunks = await asyncio.to_thread( + strategy_by_heading, blocks, config, note_id, context_prefix=h1_prefix + ) else: - # sliding_window nutzt nun den context_prefix für das Window-Feld. - chunks = await asyncio.to_thread(strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix) + chunks = await asyncio.to_thread( + strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix + ) if not chunks: return [] @@ -52,6 +56,7 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op # Zuerst die explizit im Text vorhandenen Kanten sammeln. for ch in chunks: # Wir extrahieren aus dem bereits (durch Propagation) angereicherten Text. + # ch.candidate_pool wird im Modell-Konstruktor als leere Liste initialisiert. for e_str in parse_edges_robust(ch.text): parts = e_str.split(':', 1) if len(parts) == 2: @@ -71,7 +76,7 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op parts = e_str.split(':', 1) if len(parts) == 2: k, t = parts - # Diese Kanten werden als "Global Pool" markiert für die spätere KI-Prüfung. + # Diese Kanten werden als "global_pool" markiert für die spätere KI-Prüfung. for ch in chunks: ch.candidate_pool.append({"kind": k, "to": t, "provenance": "global_pool"}) @@ -80,6 +85,7 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op seen = set() unique = [] for c in ch.candidate_pool: + # Eindeutigkeit über Typ, Ziel und Herkunft (Provenance) key = (c["kind"], c["to"], c["provenance"]) if key not in seen: seen.add(key) diff --git a/app/core/chunking/chunking_propagation.py b/app/core/chunking/chunking_propagation.py index af68442..890b89e 100644 --- a/app/core/chunking/chunking_propagation.py +++ b/app/core/chunking/chunking_propagation.py @@ -1,7 +1,8 @@ """ FILE: app/core/chunking/chunking_propagation.py DESCRIPTION: Injiziert Sektions-Kanten physisch in den Text (Embedding-Enrichment). - Fix v3.3.5: Erkennt Wikilink-Targets, um Dopplungen zu verhindern. + Fix v3.3.6: Nutzt robustes Parsing zur Erkennung vorhandener Kanten, + um Dopplungen direkt hinter [!edge] Callouts format-agnostisch zu verhindern. """ from typing import List, Dict, Set from .chunking_models import Chunk @@ -34,15 +35,19 @@ def propagate_section_edges(chunks: List[Chunk]) -> List[Chunk]: if not edges_to_add: continue + # Vorhandene Kanten (Typ:Ziel) in DIESEM Chunk ermitteln, + # um Dopplungen (z.B. durch Callouts) zu vermeiden. + existing_edges = parse_edges_robust(ch.text) + injections = [] - for e_str in edges_to_add: - kind, target = e_str.split(':', 1) - - # DER FIX: Wir prüfen, ob das Ziel (target) bereits im Text vorkommt. - # Wir suchen nach [[target]] (Callout-Stil) oder |target]] (Rel-Stil). - if f"[[{target}]]" in ch.text or f"|{target}]]" in ch.text: + # Sortierung für deterministische Ergebnisse + for e_str in sorted(list(edges_to_add)): + # Wenn die Kante (Typ + Ziel) bereits vorhanden ist (egal welches Format), + # überspringen wir die Injektion für diesen Chunk. + if e_str in existing_edges: continue + kind, target = e_str.split(':', 1) injections.append(f"[[rel:{kind}|{target}]]") if injections: diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index e16121a..5ca68fe 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -1,25 +1,29 @@ """ FILE: app/core/chunking/chunking_strategies.py -DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.8. +DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.9. Implementiert das 'Pack-and-Carry-Over' Verfahren nach Regel 1-3. - Keine redundante Kanten-Injektion. - Strikte Einhaltung von Sektionsgrenzen via Look-Ahead. + - Fix: Synchronisierung der Parameter mit dem Orchestrator (context_prefix). """ from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk from .chunking_utils import estimate_tokens from .chunking_parser import split_sentences -def _create_win(doc_title: str, sec_title: Optional[str], text: str) -> str: +def _create_win(context_prefix: str, sec_title: Optional[str], text: str) -> str: """Baut den Breadcrumb-Kontext für das Embedding-Fenster.""" - parts = [doc_title] if doc_title else [] - if sec_title and sec_title != doc_title: parts.append(sec_title) + parts = [context_prefix] if context_prefix else [] + # Verhindert Dopplung, falls der Context-Prefix (H1) bereits den Sektionsnamen enthält + if sec_title and f"# {sec_title}" != context_prefix and sec_title not in (context_prefix or ""): + parts.append(sec_title) prefix = " > ".join(parts) return f"{prefix}\n{text}".strip() if prefix else text -def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: +def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: """ Universelle Heading-Strategie mit Carry-Over Logik. + Synchronisiert auf context_prefix für Kompatibilität mit dem Orchestrator. """ smart_edge = config.get("enable_smart_edge_allocation", True) strict = config.get("strict_heading_split", False) @@ -34,7 +38,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: def _emit(txt, title, path): """Schreibt den finalen Chunk ohne Text-Modifikationen.""" idx = len(chunks) - win = _create_win(doc_title, title, txt) + win = _create_win(context_prefix, title, txt) chunks.append(Chunk( id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), @@ -139,7 +143,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: return chunks -def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: +def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: """Standard-Sliding-Window für flache Texte ohne Sektionsfokus.""" target = config.get("target", 400); max_tokens = config.get("max", 600) chunks: List[Chunk] = []; buf: List[RawBlock] = [] @@ -149,14 +153,14 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0 if curr_tokens + b_tokens > max_tokens and buf: txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) - win = _create_win(doc_title, buf[0].section_title, txt) + win = _create_win(context_prefix, buf[0].section_title, txt) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=curr_tokens, section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) buf = [] buf.append(b) if buf: txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) - win = _create_win(doc_title, buf[0].section_title, txt) + win = _create_win(context_prefix, buf[0].section_title, txt) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) return chunks \ No newline at end of file diff --git a/app/core/ingestion/ingestion_note_payload.py b/app/core/ingestion/ingestion_note_payload.py index 3df4d4a..5d30707 100644 --- a/app/core/ingestion/ingestion_note_payload.py +++ b/app/core/ingestion/ingestion_note_payload.py @@ -54,6 +54,7 @@ def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str: fm = n.get("frontmatter") or {} meta_parts = [] # Wir inkludieren alle Felder, die das Chunking oder Retrieval beeinflussen + # Jede Änderung hier führt nun zwingend zu einem neuen Full-Hash keys = [ "title", "type", "status", "tags", "chunking_profile", "chunk_profile", @@ -143,6 +144,7 @@ def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]: } # --- MULTI-HASH --- + # Generiert Hashes für Change Detection (WP-15b) for mode in ["body", "full"]: content = _get_hash_source_content(n, mode) payload["hashes"][f"{mode}:{hash_source}:{hash_normalize}"] = _compute_hash(content) diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 8d6114d..e868401 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -4,8 +4,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-14: Modularisierung der Datenbank-Ebene (app.core.database). WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v2.13.10: Umstellung auf app.core.database Infrastruktur. -VERSION: 2.13.10 + AUDIT v2.13.11: Synchronisierung mit Atomic-Chunking v3.9.9. +VERSION: 2.13.11 STATUS: Active """ import logging @@ -60,6 +60,7 @@ class IngestionService: self.embedder = EmbeddingsClient() self.llm = LLMService() + # Festlegen, welcher Hash für die Change-Detection maßgeblich ist self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache @@ -130,12 +131,18 @@ class IngestionService: ) note_id = note_pl["note_id"] + # Abgleich mit der Datenbank (Qdrant) old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) + + # Prüfung gegen den konfigurierten Hash-Modus (body vs. full) check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" old_hash = (old_payload or {}).get("hashes", {}).get(check_key) new_hash = note_pl.get("hashes", {}).get(check_key) + # Check ob Chunks oder Kanten in der DB fehlen (Reparatur-Modus) c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) + + # Wenn Hash identisch und Artefakte vorhanden -> Skip if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss): return {**result, "status": "unchanged", "note_id": note_id} @@ -146,36 +153,46 @@ class IngestionService: try: body_text = getattr(parsed, "body", "") or "" edge_registry.ensure_latest() + + # Profil-Auflösung via Registry profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard" chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) - # WP-15b: Chunker-Aufruf bereitet Candidate-Pool vor + # WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor. + # assemble_chunks (v3.3.4) führt intern auch die Propagierung durch. chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) + + # Semantische Kanten-Validierung (Smart Edge Allocation) for ch in chunks: filtered = [] for cand in getattr(ch, "candidate_pool", []): - # WP-15b: Nur global_pool Kandidaten erfordern binäre Validierung + # Nur global_pool Kandidaten (aus dem Pool am Ende) erfordern KI-Validierung if cand.get("provenance") == "global_pool" and enable_smart: if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, self.settings.MINDNET_LLM_PROVIDER): filtered.append(cand) else: + # Explizite Kanten (Wikilinks/Callouts) werden ungeprüft übernommen filtered.append(cand) ch.candidate_pool = filtered - # Payload-Erstellung via interne Module + # Payload-Erstellung für die Chunks chunk_pls = make_chunk_payloads( fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry ) + + # Vektorisierung der Fenster-Texte vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] - # Kanten-Aggregation + # Aggregation aller finalen Kanten (Edges) edges = build_edges_for_note( note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs ) + + # Kanten-Typen via Registry validieren/auflösen for e in edges: e["kind"] = edge_registry.resolve( e.get("kind", "related_to"), @@ -184,16 +201,20 @@ class IngestionService: ) # 4. DB Upsert via modularisierter Points-Logik + # WICHTIG: Wenn sich der Inhalt geändert hat, löschen wir erst alle alten Fragmente. if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) + # Speichern der Haupt-Note n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) upsert_batch(self.client, n_name, n_pts) + # Speichern der Chunks if chunk_pls and vecs: c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) + # Speichern der Kanten if edges: e_pts = points_for_edges(self.prefix, edges)[1] upsert_batch(self.client, f"{self.prefix}_edges", e_pts) @@ -217,4 +238,5 @@ class IngestionService: with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content) await asyncio.sleep(0.1) + # Triggert sofortigen Import mit force_replace/purge_before return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file