Update chunking system to version 3.9.9, synchronizing parameters with the orchestrator and enhancing edge detection. Implement robust parsing to prevent duplicate edges in section propagation. Adjust comments for clarity and consistency across the codebase.

This commit is contained in:
Lars 2025-12-30 08:40:19 +01:00
parent 33dff04d47
commit 6aa6b32a6c
5 changed files with 68 additions and 29 deletions

View File

@ -1,7 +1,8 @@
""" """
FILE: app/core/chunking/chunking_processor.py FILE: app/core/chunking/chunking_processor.py
DESCRIPTION: Der zentrale Orchestrator für das Chunking-System. DESCRIPTION: Der zentrale Orchestrator für das Chunking-System.
AUDIT v3.3.3: Wiederherstellung der "Gold-Standard" Qualität. AUDIT v3.3.4: Wiederherstellung der "Gold-Standard" Qualität.
- Fix: Synchronisierung der Parameter (context_prefix) für alle Strategien.
- Integriert physikalische Kanten-Injektion (Propagierung). - Integriert physikalische Kanten-Injektion (Propagierung).
- Stellt H1-Kontext-Fenster sicher. - Stellt H1-Kontext-Fenster sicher.
- Baut den Candidate-Pool für die WP-15b Ingestion auf. - Baut den Candidate-Pool für die WP-15b Ingestion auf.
@ -30,16 +31,19 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op
fm, body_text = extract_frontmatter_from_text(md_text) fm, body_text = extract_frontmatter_from_text(md_text)
blocks, doc_title = parse_blocks(md_text) blocks, doc_title = parse_blocks(md_text)
# Vorbereitung des H1-Präfix für die Embedding-Fenster # Vorbereitung des H1-Präfix für die Embedding-Fenster (Breadcrumbs)
h1_prefix = f"# {doc_title}" if doc_title else "" h1_prefix = f"# {doc_title}" if doc_title else ""
# 2. Anwendung der Splitting-Strategie # 2. Anwendung der Splitting-Strategie
# Wir übergeben den Dokument-Titel/Präfix für die Window-Bildung. # Alle Strategien nutzen nun einheitlich context_prefix für die Window-Bildung.
if config.get("strategy") == "by_heading": if config.get("strategy") == "by_heading":
chunks = await asyncio.to_thread(strategy_by_heading, blocks, config, note_id, doc_title) chunks = await asyncio.to_thread(
strategy_by_heading, blocks, config, note_id, context_prefix=h1_prefix
)
else: else:
# sliding_window nutzt nun den context_prefix für das Window-Feld. chunks = await asyncio.to_thread(
chunks = await asyncio.to_thread(strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix) strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix
)
if not chunks: if not chunks:
return [] return []
@ -52,6 +56,7 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op
# Zuerst die explizit im Text vorhandenen Kanten sammeln. # Zuerst die explizit im Text vorhandenen Kanten sammeln.
for ch in chunks: for ch in chunks:
# Wir extrahieren aus dem bereits (durch Propagation) angereicherten Text. # Wir extrahieren aus dem bereits (durch Propagation) angereicherten Text.
# ch.candidate_pool wird im Modell-Konstruktor als leere Liste initialisiert.
for e_str in parse_edges_robust(ch.text): for e_str in parse_edges_robust(ch.text):
parts = e_str.split(':', 1) parts = e_str.split(':', 1)
if len(parts) == 2: if len(parts) == 2:
@ -71,7 +76,7 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op
parts = e_str.split(':', 1) parts = e_str.split(':', 1)
if len(parts) == 2: if len(parts) == 2:
k, t = parts k, t = parts
# Diese Kanten werden als "Global Pool" markiert für die spätere KI-Prüfung. # Diese Kanten werden als "global_pool" markiert für die spätere KI-Prüfung.
for ch in chunks: for ch in chunks:
ch.candidate_pool.append({"kind": k, "to": t, "provenance": "global_pool"}) ch.candidate_pool.append({"kind": k, "to": t, "provenance": "global_pool"})
@ -80,6 +85,7 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op
seen = set() seen = set()
unique = [] unique = []
for c in ch.candidate_pool: for c in ch.candidate_pool:
# Eindeutigkeit über Typ, Ziel und Herkunft (Provenance)
key = (c["kind"], c["to"], c["provenance"]) key = (c["kind"], c["to"], c["provenance"])
if key not in seen: if key not in seen:
seen.add(key) seen.add(key)

View File

@ -1,7 +1,8 @@
""" """
FILE: app/core/chunking/chunking_propagation.py FILE: app/core/chunking/chunking_propagation.py
DESCRIPTION: Injiziert Sektions-Kanten physisch in den Text (Embedding-Enrichment). DESCRIPTION: Injiziert Sektions-Kanten physisch in den Text (Embedding-Enrichment).
Fix v3.3.5: Erkennt Wikilink-Targets, um Dopplungen zu verhindern. Fix v3.3.6: Nutzt robustes Parsing zur Erkennung vorhandener Kanten,
um Dopplungen direkt hinter [!edge] Callouts format-agnostisch zu verhindern.
""" """
from typing import List, Dict, Set from typing import List, Dict, Set
from .chunking_models import Chunk from .chunking_models import Chunk
@ -34,15 +35,19 @@ def propagate_section_edges(chunks: List[Chunk]) -> List[Chunk]:
if not edges_to_add: if not edges_to_add:
continue continue
injections = [] # Vorhandene Kanten (Typ:Ziel) in DIESEM Chunk ermitteln,
for e_str in edges_to_add: # um Dopplungen (z.B. durch Callouts) zu vermeiden.
kind, target = e_str.split(':', 1) existing_edges = parse_edges_robust(ch.text)
# DER FIX: Wir prüfen, ob das Ziel (target) bereits im Text vorkommt. injections = []
# Wir suchen nach [[target]] (Callout-Stil) oder |target]] (Rel-Stil). # Sortierung für deterministische Ergebnisse
if f"[[{target}]]" in ch.text or f"|{target}]]" in ch.text: for e_str in sorted(list(edges_to_add)):
# Wenn die Kante (Typ + Ziel) bereits vorhanden ist (egal welches Format),
# überspringen wir die Injektion für diesen Chunk.
if e_str in existing_edges:
continue continue
kind, target = e_str.split(':', 1)
injections.append(f"[[rel:{kind}|{target}]]") injections.append(f"[[rel:{kind}|{target}]]")
if injections: if injections:

View File

@ -1,25 +1,29 @@
""" """
FILE: app/core/chunking/chunking_strategies.py FILE: app/core/chunking/chunking_strategies.py
DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.8. DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.9.
Implementiert das 'Pack-and-Carry-Over' Verfahren nach Regel 1-3. Implementiert das 'Pack-and-Carry-Over' Verfahren nach Regel 1-3.
- Keine redundante Kanten-Injektion. - Keine redundante Kanten-Injektion.
- Strikte Einhaltung von Sektionsgrenzen via Look-Ahead. - Strikte Einhaltung von Sektionsgrenzen via Look-Ahead.
- Fix: Synchronisierung der Parameter mit dem Orchestrator (context_prefix).
""" """
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from .chunking_models import RawBlock, Chunk from .chunking_models import RawBlock, Chunk
from .chunking_utils import estimate_tokens from .chunking_utils import estimate_tokens
from .chunking_parser import split_sentences from .chunking_parser import split_sentences
def _create_win(doc_title: str, sec_title: Optional[str], text: str) -> str: def _create_win(context_prefix: str, sec_title: Optional[str], text: str) -> str:
"""Baut den Breadcrumb-Kontext für das Embedding-Fenster.""" """Baut den Breadcrumb-Kontext für das Embedding-Fenster."""
parts = [doc_title] if doc_title else [] parts = [context_prefix] if context_prefix else []
if sec_title and sec_title != doc_title: parts.append(sec_title) # Verhindert Dopplung, falls der Context-Prefix (H1) bereits den Sektionsnamen enthält
if sec_title and f"# {sec_title}" != context_prefix and sec_title not in (context_prefix or ""):
parts.append(sec_title)
prefix = " > ".join(parts) prefix = " > ".join(parts)
return f"{prefix}\n{text}".strip() if prefix else text return f"{prefix}\n{text}".strip() if prefix else text
def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]:
""" """
Universelle Heading-Strategie mit Carry-Over Logik. Universelle Heading-Strategie mit Carry-Over Logik.
Synchronisiert auf context_prefix für Kompatibilität mit dem Orchestrator.
""" """
smart_edge = config.get("enable_smart_edge_allocation", True) smart_edge = config.get("enable_smart_edge_allocation", True)
strict = config.get("strict_heading_split", False) strict = config.get("strict_heading_split", False)
@ -34,7 +38,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
def _emit(txt, title, path): def _emit(txt, title, path):
"""Schreibt den finalen Chunk ohne Text-Modifikationen.""" """Schreibt den finalen Chunk ohne Text-Modifikationen."""
idx = len(chunks) idx = len(chunks)
win = _create_win(doc_title, title, txt) win = _create_win(context_prefix, title, txt)
chunks.append(Chunk( chunks.append(Chunk(
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
text=txt, window=win, token_count=estimate_tokens(txt), text=txt, window=win, token_count=estimate_tokens(txt),
@ -139,7 +143,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
return chunks return chunks
def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]:
"""Standard-Sliding-Window für flache Texte ohne Sektionsfokus.""" """Standard-Sliding-Window für flache Texte ohne Sektionsfokus."""
target = config.get("target", 400); max_tokens = config.get("max", 600) target = config.get("target", 400); max_tokens = config.get("max", 600)
chunks: List[Chunk] = []; buf: List[RawBlock] = [] chunks: List[Chunk] = []; buf: List[RawBlock] = []
@ -149,14 +153,14 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note
curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0 curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0
if curr_tokens + b_tokens > max_tokens and buf: if curr_tokens + b_tokens > max_tokens and buf:
txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) txt = "\n\n".join([x.text for x in buf]); idx = len(chunks)
win = _create_win(doc_title, buf[0].section_title, txt) win = _create_win(context_prefix, buf[0].section_title, txt)
chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=curr_tokens, section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=curr_tokens, section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None))
buf = [] buf = []
buf.append(b) buf.append(b)
if buf: if buf:
txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) txt = "\n\n".join([x.text for x in buf]); idx = len(chunks)
win = _create_win(doc_title, buf[0].section_title, txt) win = _create_win(context_prefix, buf[0].section_title, txt)
chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None))
return chunks return chunks

View File

@ -54,6 +54,7 @@ def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str:
fm = n.get("frontmatter") or {} fm = n.get("frontmatter") or {}
meta_parts = [] meta_parts = []
# Wir inkludieren alle Felder, die das Chunking oder Retrieval beeinflussen # Wir inkludieren alle Felder, die das Chunking oder Retrieval beeinflussen
# Jede Änderung hier führt nun zwingend zu einem neuen Full-Hash
keys = [ keys = [
"title", "type", "status", "tags", "title", "type", "status", "tags",
"chunking_profile", "chunk_profile", "chunking_profile", "chunk_profile",
@ -143,6 +144,7 @@ def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]:
} }
# --- MULTI-HASH --- # --- MULTI-HASH ---
# Generiert Hashes für Change Detection (WP-15b)
for mode in ["body", "full"]: for mode in ["body", "full"]:
content = _get_hash_source_content(n, mode) content = _get_hash_source_content(n, mode)
payload["hashes"][f"{mode}:{hash_source}:{hash_normalize}"] = _compute_hash(content) payload["hashes"][f"{mode}:{hash_source}:{hash_normalize}"] = _compute_hash(content)

View File

@ -4,8 +4,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-14: Modularisierung der Datenbank-Ebene (app.core.database). WP-14: Modularisierung der Datenbank-Ebene (app.core.database).
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v2.13.10: Umstellung auf app.core.database Infrastruktur. AUDIT v2.13.11: Synchronisierung mit Atomic-Chunking v3.9.9.
VERSION: 2.13.10 VERSION: 2.13.11
STATUS: Active STATUS: Active
""" """
import logging import logging
@ -60,6 +60,7 @@ class IngestionService:
self.embedder = EmbeddingsClient() self.embedder = EmbeddingsClient()
self.llm = LLMService() self.llm = LLMService()
# Festlegen, welcher Hash für die Change-Detection maßgeblich ist
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache
@ -130,12 +131,18 @@ class IngestionService:
) )
note_id = note_pl["note_id"] note_id = note_pl["note_id"]
# Abgleich mit der Datenbank (Qdrant)
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
# Prüfung gegen den konfigurierten Hash-Modus (body vs. full)
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
old_hash = (old_payload or {}).get("hashes", {}).get(check_key) old_hash = (old_payload or {}).get("hashes", {}).get(check_key)
new_hash = note_pl.get("hashes", {}).get(check_key) new_hash = note_pl.get("hashes", {}).get(check_key)
# Check ob Chunks oder Kanten in der DB fehlen (Reparatur-Modus)
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
# Wenn Hash identisch und Artefakte vorhanden -> Skip
if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss): if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss):
return {**result, "status": "unchanged", "note_id": note_id} return {**result, "status": "unchanged", "note_id": note_id}
@ -146,36 +153,46 @@ class IngestionService:
try: try:
body_text = getattr(parsed, "body", "") or "" body_text = getattr(parsed, "body", "") or ""
edge_registry.ensure_latest() edge_registry.ensure_latest()
# Profil-Auflösung via Registry
profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard" profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard"
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
# WP-15b: Chunker-Aufruf bereitet Candidate-Pool vor # WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor.
# assemble_chunks (v3.3.4) führt intern auch die Propagierung durch.
chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg)
# Semantische Kanten-Validierung (Smart Edge Allocation)
for ch in chunks: for ch in chunks:
filtered = [] filtered = []
for cand in getattr(ch, "candidate_pool", []): for cand in getattr(ch, "candidate_pool", []):
# WP-15b: Nur global_pool Kandidaten erfordern binäre Validierung # Nur global_pool Kandidaten (aus dem Pool am Ende) erfordern KI-Validierung
if cand.get("provenance") == "global_pool" and enable_smart: if cand.get("provenance") == "global_pool" and enable_smart:
if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, self.settings.MINDNET_LLM_PROVIDER): if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, self.settings.MINDNET_LLM_PROVIDER):
filtered.append(cand) filtered.append(cand)
else: else:
# Explizite Kanten (Wikilinks/Callouts) werden ungeprüft übernommen
filtered.append(cand) filtered.append(cand)
ch.candidate_pool = filtered ch.candidate_pool = filtered
# Payload-Erstellung via interne Module # Payload-Erstellung für die Chunks
chunk_pls = make_chunk_payloads( chunk_pls = make_chunk_payloads(
fm, note_pl["path"], chunks, file_path=file_path, fm, note_pl["path"], chunks, file_path=file_path,
types_cfg=self.registry types_cfg=self.registry
) )
# Vektorisierung der Fenster-Texte
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Kanten-Aggregation # Aggregation aller finalen Kanten (Edges)
edges = build_edges_for_note( edges = build_edges_for_note(
note_id, chunk_pls, note_id, chunk_pls,
note_level_references=note_pl.get("references", []), note_level_references=note_pl.get("references", []),
include_note_scope_refs=note_scope_refs include_note_scope_refs=note_scope_refs
) )
# Kanten-Typen via Registry validieren/auflösen
for e in edges: for e in edges:
e["kind"] = edge_registry.resolve( e["kind"] = edge_registry.resolve(
e.get("kind", "related_to"), e.get("kind", "related_to"),
@ -184,16 +201,20 @@ class IngestionService:
) )
# 4. DB Upsert via modularisierter Points-Logik # 4. DB Upsert via modularisierter Points-Logik
# WICHTIG: Wenn sich der Inhalt geändert hat, löschen wir erst alle alten Fragmente.
if purge_before and old_payload: if purge_before and old_payload:
purge_artifacts(self.client, self.prefix, note_id) purge_artifacts(self.client, self.prefix, note_id)
# Speichern der Haupt-Note
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, n_name, n_pts) upsert_batch(self.client, n_name, n_pts)
# Speichern der Chunks
if chunk_pls and vecs: if chunk_pls and vecs:
c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1]
upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) upsert_batch(self.client, f"{self.prefix}_chunks", c_pts)
# Speichern der Kanten
if edges: if edges:
e_pts = points_for_edges(self.prefix, edges)[1] e_pts = points_for_edges(self.prefix, edges)[1]
upsert_batch(self.client, f"{self.prefix}_edges", e_pts) upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
@ -217,4 +238,5 @@ class IngestionService:
with open(target_path, "w", encoding="utf-8") as f: with open(target_path, "w", encoding="utf-8") as f:
f.write(markdown_content) f.write(markdown_content)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
# Triggert sofortigen Import mit force_replace/purge_before
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)