diff --git a/app/core/chunking/chunking_processor.py b/app/core/chunking/chunking_processor.py index 1a17acb..26c2b68 100644 --- a/app/core/chunking/chunking_processor.py +++ b/app/core/chunking/chunking_processor.py @@ -1,7 +1,8 @@ """ FILE: app/core/chunking/chunking_processor.py DESCRIPTION: Der zentrale Orchestrator für das Chunking-System. - AUDIT v3.3.3: Wiederherstellung der "Gold-Standard" Qualität. + AUDIT v3.3.4: Wiederherstellung der "Gold-Standard" Qualität. + - Fix: Synchronisierung der Parameter (context_prefix) für alle Strategien. - Integriert physikalische Kanten-Injektion (Propagierung). - Stellt H1-Kontext-Fenster sicher. - Baut den Candidate-Pool für die WP-15b Ingestion auf. @@ -30,16 +31,19 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op fm, body_text = extract_frontmatter_from_text(md_text) blocks, doc_title = parse_blocks(md_text) - # Vorbereitung des H1-Präfix für die Embedding-Fenster + # Vorbereitung des H1-Präfix für die Embedding-Fenster (Breadcrumbs) h1_prefix = f"# {doc_title}" if doc_title else "" # 2. Anwendung der Splitting-Strategie - # Wir übergeben den Dokument-Titel/Präfix für die Window-Bildung. + # Alle Strategien nutzen nun einheitlich context_prefix für die Window-Bildung. if config.get("strategy") == "by_heading": - chunks = await asyncio.to_thread(strategy_by_heading, blocks, config, note_id, doc_title) + chunks = await asyncio.to_thread( + strategy_by_heading, blocks, config, note_id, context_prefix=h1_prefix + ) else: - # sliding_window nutzt nun den context_prefix für das Window-Feld. - chunks = await asyncio.to_thread(strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix) + chunks = await asyncio.to_thread( + strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix + ) if not chunks: return [] @@ -52,6 +56,7 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op # Zuerst die explizit im Text vorhandenen Kanten sammeln. for ch in chunks: # Wir extrahieren aus dem bereits (durch Propagation) angereicherten Text. + # ch.candidate_pool wird im Modell-Konstruktor als leere Liste initialisiert. for e_str in parse_edges_robust(ch.text): parts = e_str.split(':', 1) if len(parts) == 2: @@ -71,7 +76,7 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op parts = e_str.split(':', 1) if len(parts) == 2: k, t = parts - # Diese Kanten werden als "Global Pool" markiert für die spätere KI-Prüfung. + # Diese Kanten werden als "global_pool" markiert für die spätere KI-Prüfung. for ch in chunks: ch.candidate_pool.append({"kind": k, "to": t, "provenance": "global_pool"}) @@ -80,6 +85,7 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op seen = set() unique = [] for c in ch.candidate_pool: + # Eindeutigkeit über Typ, Ziel und Herkunft (Provenance) key = (c["kind"], c["to"], c["provenance"]) if key not in seen: seen.add(key) diff --git a/app/core/chunking/chunking_propagation.py b/app/core/chunking/chunking_propagation.py index af68442..890b89e 100644 --- a/app/core/chunking/chunking_propagation.py +++ b/app/core/chunking/chunking_propagation.py @@ -1,7 +1,8 @@ """ FILE: app/core/chunking/chunking_propagation.py DESCRIPTION: Injiziert Sektions-Kanten physisch in den Text (Embedding-Enrichment). - Fix v3.3.5: Erkennt Wikilink-Targets, um Dopplungen zu verhindern. + Fix v3.3.6: Nutzt robustes Parsing zur Erkennung vorhandener Kanten, + um Dopplungen direkt hinter [!edge] Callouts format-agnostisch zu verhindern. """ from typing import List, Dict, Set from .chunking_models import Chunk @@ -34,15 +35,19 @@ def propagate_section_edges(chunks: List[Chunk]) -> List[Chunk]: if not edges_to_add: continue + # Vorhandene Kanten (Typ:Ziel) in DIESEM Chunk ermitteln, + # um Dopplungen (z.B. durch Callouts) zu vermeiden. + existing_edges = parse_edges_robust(ch.text) + injections = [] - for e_str in edges_to_add: - kind, target = e_str.split(':', 1) - - # DER FIX: Wir prüfen, ob das Ziel (target) bereits im Text vorkommt. - # Wir suchen nach [[target]] (Callout-Stil) oder |target]] (Rel-Stil). - if f"[[{target}]]" in ch.text or f"|{target}]]" in ch.text: + # Sortierung für deterministische Ergebnisse + for e_str in sorted(list(edges_to_add)): + # Wenn die Kante (Typ + Ziel) bereits vorhanden ist (egal welches Format), + # überspringen wir die Injektion für diesen Chunk. + if e_str in existing_edges: continue + kind, target = e_str.split(':', 1) injections.append(f"[[rel:{kind}|{target}]]") if injections: diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index e16121a..5ca68fe 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -1,25 +1,29 @@ """ FILE: app/core/chunking/chunking_strategies.py -DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.8. +DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.9. Implementiert das 'Pack-and-Carry-Over' Verfahren nach Regel 1-3. - Keine redundante Kanten-Injektion. - Strikte Einhaltung von Sektionsgrenzen via Look-Ahead. + - Fix: Synchronisierung der Parameter mit dem Orchestrator (context_prefix). """ from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk from .chunking_utils import estimate_tokens from .chunking_parser import split_sentences -def _create_win(doc_title: str, sec_title: Optional[str], text: str) -> str: +def _create_win(context_prefix: str, sec_title: Optional[str], text: str) -> str: """Baut den Breadcrumb-Kontext für das Embedding-Fenster.""" - parts = [doc_title] if doc_title else [] - if sec_title and sec_title != doc_title: parts.append(sec_title) + parts = [context_prefix] if context_prefix else [] + # Verhindert Dopplung, falls der Context-Prefix (H1) bereits den Sektionsnamen enthält + if sec_title and f"# {sec_title}" != context_prefix and sec_title not in (context_prefix or ""): + parts.append(sec_title) prefix = " > ".join(parts) return f"{prefix}\n{text}".strip() if prefix else text -def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: +def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: """ Universelle Heading-Strategie mit Carry-Over Logik. + Synchronisiert auf context_prefix für Kompatibilität mit dem Orchestrator. """ smart_edge = config.get("enable_smart_edge_allocation", True) strict = config.get("strict_heading_split", False) @@ -34,7 +38,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: def _emit(txt, title, path): """Schreibt den finalen Chunk ohne Text-Modifikationen.""" idx = len(chunks) - win = _create_win(doc_title, title, txt) + win = _create_win(context_prefix, title, txt) chunks.append(Chunk( id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), @@ -139,7 +143,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: return chunks -def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: +def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: """Standard-Sliding-Window für flache Texte ohne Sektionsfokus.""" target = config.get("target", 400); max_tokens = config.get("max", 600) chunks: List[Chunk] = []; buf: List[RawBlock] = [] @@ -149,14 +153,14 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0 if curr_tokens + b_tokens > max_tokens and buf: txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) - win = _create_win(doc_title, buf[0].section_title, txt) + win = _create_win(context_prefix, buf[0].section_title, txt) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=curr_tokens, section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) buf = [] buf.append(b) if buf: txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) - win = _create_win(doc_title, buf[0].section_title, txt) + win = _create_win(context_prefix, buf[0].section_title, txt) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) return chunks \ No newline at end of file diff --git a/app/core/ingestion/ingestion_note_payload.py b/app/core/ingestion/ingestion_note_payload.py index 3df4d4a..5d30707 100644 --- a/app/core/ingestion/ingestion_note_payload.py +++ b/app/core/ingestion/ingestion_note_payload.py @@ -54,6 +54,7 @@ def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str: fm = n.get("frontmatter") or {} meta_parts = [] # Wir inkludieren alle Felder, die das Chunking oder Retrieval beeinflussen + # Jede Änderung hier führt nun zwingend zu einem neuen Full-Hash keys = [ "title", "type", "status", "tags", "chunking_profile", "chunk_profile", @@ -143,6 +144,7 @@ def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]: } # --- MULTI-HASH --- + # Generiert Hashes für Change Detection (WP-15b) for mode in ["body", "full"]: content = _get_hash_source_content(n, mode) payload["hashes"][f"{mode}:{hash_source}:{hash_normalize}"] = _compute_hash(content) diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 8d6114d..e868401 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -4,8 +4,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-14: Modularisierung der Datenbank-Ebene (app.core.database). WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v2.13.10: Umstellung auf app.core.database Infrastruktur. -VERSION: 2.13.10 + AUDIT v2.13.11: Synchronisierung mit Atomic-Chunking v3.9.9. +VERSION: 2.13.11 STATUS: Active """ import logging @@ -60,6 +60,7 @@ class IngestionService: self.embedder = EmbeddingsClient() self.llm = LLMService() + # Festlegen, welcher Hash für die Change-Detection maßgeblich ist self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache @@ -130,12 +131,18 @@ class IngestionService: ) note_id = note_pl["note_id"] + # Abgleich mit der Datenbank (Qdrant) old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) + + # Prüfung gegen den konfigurierten Hash-Modus (body vs. full) check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" old_hash = (old_payload or {}).get("hashes", {}).get(check_key) new_hash = note_pl.get("hashes", {}).get(check_key) + # Check ob Chunks oder Kanten in der DB fehlen (Reparatur-Modus) c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) + + # Wenn Hash identisch und Artefakte vorhanden -> Skip if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss): return {**result, "status": "unchanged", "note_id": note_id} @@ -146,36 +153,46 @@ class IngestionService: try: body_text = getattr(parsed, "body", "") or "" edge_registry.ensure_latest() + + # Profil-Auflösung via Registry profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard" chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) - # WP-15b: Chunker-Aufruf bereitet Candidate-Pool vor + # WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor. + # assemble_chunks (v3.3.4) führt intern auch die Propagierung durch. chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) + + # Semantische Kanten-Validierung (Smart Edge Allocation) for ch in chunks: filtered = [] for cand in getattr(ch, "candidate_pool", []): - # WP-15b: Nur global_pool Kandidaten erfordern binäre Validierung + # Nur global_pool Kandidaten (aus dem Pool am Ende) erfordern KI-Validierung if cand.get("provenance") == "global_pool" and enable_smart: if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, self.settings.MINDNET_LLM_PROVIDER): filtered.append(cand) else: + # Explizite Kanten (Wikilinks/Callouts) werden ungeprüft übernommen filtered.append(cand) ch.candidate_pool = filtered - # Payload-Erstellung via interne Module + # Payload-Erstellung für die Chunks chunk_pls = make_chunk_payloads( fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry ) + + # Vektorisierung der Fenster-Texte vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] - # Kanten-Aggregation + # Aggregation aller finalen Kanten (Edges) edges = build_edges_for_note( note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs ) + + # Kanten-Typen via Registry validieren/auflösen for e in edges: e["kind"] = edge_registry.resolve( e.get("kind", "related_to"), @@ -184,16 +201,20 @@ class IngestionService: ) # 4. DB Upsert via modularisierter Points-Logik + # WICHTIG: Wenn sich der Inhalt geändert hat, löschen wir erst alle alten Fragmente. if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) + # Speichern der Haupt-Note n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) upsert_batch(self.client, n_name, n_pts) + # Speichern der Chunks if chunk_pls and vecs: c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) + # Speichern der Kanten if edges: e_pts = points_for_edges(self.prefix, edges)[1] upsert_batch(self.client, f"{self.prefix}_edges", e_pts) @@ -217,4 +238,5 @@ class IngestionService: with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content) await asyncio.sleep(0.1) + # Triggert sofortigen Import mit force_replace/purge_before return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file