diff --git a/app/core/chunker.py b/app/core/chunker.py index 07b5f47..c77a43c 100644 --- a/app/core/chunker.py +++ b/app/core/chunker.py @@ -1,13 +1,16 @@ """ FILE: app/core/chunker.py DESCRIPTION: Zerlegt Texte in Chunks (Sliding Window oder nach Headings). - Orchestriert die Smart-Edge-Allocation via SemanticAnalyzer. - FIX V3: Support für mehrzeilige Callouts und Section-Propagation. -VERSION: 3.1.0 (Full Compatibility Merge) + WP-15b: Implementiert Edge-Inheritance und Candidate-Pool Vorbereitung. + Zentralisiert die Kanten-Vorbereitung für die spätere binäre Validierung. + Bietet volle Unterstützung für Hybrid-Chunking (Strict/Soft/Safety-Net). +VERSION: 3.2.0 +STATUS: Active +DEPENDENCIES: re, math, yaml, pathlib, asyncio, logging """ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import List, Dict, Optional, Tuple, Any, Set import re import math @@ -17,15 +20,18 @@ import asyncio import logging # Services -from app.services.semantic_analyzer import get_semantic_analyzer +# In WP-15b wird die KI-Validierung in die ingestion.py verlagert. +# Wir behalten den Import für Abwärtskompatibilität, falls Legacy-Skripte ihn benötigen. +try: + from app.services.semantic_analyzer import get_semantic_analyzer +except ImportError: + def get_semantic_analyzer(): return None # Core Imports -# Wir importieren build_edges_for_note nur, um kompatibel zur Signatur zu bleiben -# oder für den Fallback. try: from app.core.derive_edges import build_edges_for_note except ImportError: - # Mock für Tests + # Fallback für Standalone-Betrieb oder Tests def build_edges_for_note(note_id, chunks, note_level_references=None, include_note_scope_refs=False): return [] logger = logging.getLogger(__name__) @@ -54,7 +60,7 @@ def _load_yaml_config() -> Dict[str, Any]: def get_chunk_config(note_type: str) -> Dict[str, Any]: """ Lädt die Chunking-Strategie basierend auf dem Note-Type aus types.yaml. 
- Dies sichert die Kompatibilität zu WP-15 (Profile). + Sichert die Kompatibilität zu WP-15 Profilen. """ full_config = _load_yaml_config() profiles = full_config.get("chunking_profiles", {}) @@ -75,6 +81,7 @@ def get_chunk_config(note_type: str) -> Dict[str, Any]: return config def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]: + """Trennt YAML-Frontmatter vom eigentlichen Text.""" fm_match = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL) if not fm_match: return {}, md_text try: @@ -89,12 +96,15 @@ def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]: # 2. DATA CLASSES & TEXT TOOLS # ========================================== -_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])'); _WS = re.compile(r'\s+') +_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') +_WS = re.compile(r'\s+') def estimate_tokens(text: str) -> int: + """Grobe Schätzung der Token-Anzahl (4 Zeichen pro Token).""" return max(1, math.ceil(len(text.strip()) / 4)) def split_sentences(text: str) -> list[str]: + """Teilt Text in Sätze auf unter Berücksichtigung von Interpunktion.""" text = _WS.sub(' ', text.strip()) if not text: return [] parts = _SENT_SPLIT.split(text) @@ -102,13 +112,26 @@ def split_sentences(text: str) -> list[str]: @dataclass class RawBlock: - kind: str; text: str; level: Optional[int]; section_path: str; section_title: Optional[str] + kind: str + text: str + level: Optional[int] + section_path: str + section_title: Optional[str] @dataclass class Chunk: - id: str; note_id: str; index: int; text: str; window: str; token_count: int - section_title: Optional[str]; section_path: str - neighbors_prev: Optional[str]; neighbors_next: Optional[str] + id: str + note_id: str + index: int + text: str + window: str + token_count: int + section_title: Optional[str] + section_path: str + neighbors_prev: Optional[str] + neighbors_next: Optional[str] + # WP-15b: Liste von Kandidaten für die semantische Validierung + 
candidate_pool: List[Dict[str, Any]] = field(default_factory=list) suggested_edges: Optional[List[str]] = None # ========================================== @@ -118,7 +141,7 @@ class Chunk: def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: """ Zerlegt Text in logische Blöcke (Absätze, Header). - Wichtig für die Strategie 'by_heading'. + Wichtig für die Strategie 'by_heading' und die Edge-Inheritance. """ blocks = [] h1_title = "Dokument" @@ -165,14 +188,15 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]: """ - Die Standard-Strategie aus WP-15. - Fasst Blöcke zusammen und schneidet bei 'target' Tokens (mit Satz-Rücksicht). + Standard-Strategie aus WP-15. + Fasst Blöcke zusammen und schneidet bei 'target' Tokens. """ target = config.get("target", 400) max_tokens = config.get("max", 600) overlap_val = config.get("overlap", (50, 80)) overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val - chunks = []; buf = [] + chunks = [] + buf = [] def _create_chunk(txt, win, sec, path): idx = len(chunks) @@ -180,7 +204,7 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None, - suggested_edges=[] + candidate_pool=[] )) def flush_buffer(): @@ -190,14 +214,11 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not text_body = "\n\n".join([b.text for b in buf]) sec_title = buf[-1].section_title if buf else None sec_path = buf[-1].section_path if buf else "/" - - # Context Prefix (z.B. 
H1) voranstellen für Embedding-Qualität win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body if estimate_tokens(text_body) <= max_tokens: _create_chunk(text_body, win_body, sec_title, sec_path) else: - # Zu groß -> Satzweiser Split sentences = split_sentences(text_body) current_chunk_sents = [] current_len = 0 @@ -209,15 +230,13 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt _create_chunk(c_txt, c_win, sec_title, sec_path) - # Overlap für nächsten Chunk overlap_sents = [] ov_len = 0 for s in reversed(current_chunk_sents): if ov_len + estimate_tokens(s) < overlap: overlap_sents.insert(0, s) ov_len += estimate_tokens(s) - else: - break + else: break current_chunk_sents = list(overlap_sents) current_chunk_sents.append(sent) @@ -226,12 +245,10 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not current_chunk_sents.append(sent) current_len += sent_len - # Rest if current_chunk_sents: c_txt = " ".join(current_chunk_sents) c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt _create_chunk(c_txt, c_win, sec_title, sec_path) - buf = [] for b in blocks: @@ -248,132 +265,137 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: """ - Strategie für strukturierte Daten (Profile, Werte). - Nutzt sliding_window, forciert aber Schnitte an Headings (via parse_blocks Vorarbeit). + Hybrid-Strategie v2.9 (Strict/Soft/Safety-Net). 
""" - return _strategy_sliding_window(blocks, config, note_id, doc_title, context_prefix=f"# {doc_title}") + strict = config.get("strict_heading_split", False) + target = config.get("target", 400) + max_tokens = config.get("max", 600) + split_level = config.get("split_level", 2) + + chunks = [] + current_buf = [] + current_tokens = 0 + + def _flush(sec_title, sec_path): + nonlocal current_buf, current_tokens + if not current_buf: return + txt = "\n\n".join(current_buf) + win = f"# {doc_title}\n## {sec_title}\n{txt}".strip() if sec_title else txt + idx = len(chunks) + chunks.append(Chunk( + id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, + text=txt, window=win, token_count=estimate_tokens(txt), + section_title=sec_title, section_path=sec_path, + neighbors_prev=None, neighbors_next=None, + candidate_pool=[] + )) + current_buf = [] + current_tokens = 0 + + for b in blocks: + if b.kind == "heading": + # Hierarchie-Check: Split bei Überschriften oberhalb des Split-Levels + if b.level < split_level: + _flush(b.section_title, b.section_path) + elif b.level == split_level: + if strict or current_tokens >= target: + _flush(b.section_title, b.section_path) + continue + + block_tokens = estimate_tokens(b.text) + if current_tokens + block_tokens > max_tokens and current_buf: + _flush(b.section_title, b.section_path) + + current_buf.append(b.text) + current_tokens += block_tokens + + if current_buf: + last = blocks[-1] if blocks else None + _flush(last.section_title if last else None, last.section_path if last else "/") + + return chunks # ========================================== -# 4. ROBUST EDGE PARSING & PROPAGATION (NEU) +# 4. ROBUST EDGE PARSING & PROPAGATION # ========================================== def _parse_edges_robust(text: str) -> Set[str]: """ - NEU: Findet Kanten im Text, auch wenn sie mehrzeilig oder 'kaputt' formatiert sind. 
- Erkennt: - > [!edge] type - > [[Link]] - Returns: Set von Strings "kind:target" + Findet Kanten im Text (Wikilinks, Inlines, Callouts). + Fix V3: Support für mehrzeilige Callouts. """ found_edges = set() - # A. Inline [[rel:type|target]] (Standard) + # A. Inline [[rel:type|target]] inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text) for kind, target in inlines: - k = kind.strip() + k = kind.strip().lower() t = target.strip() if k and t: found_edges.add(f"{k}:{t}") - # B. Multiline Callouts Parsing (Der Fix für dein Problem) + # B. Multiline Callouts Parsing (WP-15 Fix) lines = text.split('\n') current_edge_type = None - for line in lines: stripped = line.strip() - - # 1. Start Blockquote: > [!edge] type - # (Erlaubt optionalen Doppelpunkt) callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped) if callout_match: - current_edge_type = callout_match.group(1).strip() - - # Check: Sind Links noch in der GLEICHEN Zeile? + current_edge_type = callout_match.group(1).strip().lower() links = re.findall(r'\[\[([^\]]+)\]\]', stripped) for l in links: - if "rel:" not in l: - found_edges.add(f"{current_edge_type}:{l}") + if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") continue - # 2. Continuation Line: > [[Target]] - # Wenn wir noch im 'edge mode' sind und die Zeile ein Zitat ist if current_edge_type and stripped.startswith('>'): links = re.findall(r'\[\[([^\]]+)\]\]', stripped) for l in links: - if "rel:" not in l: - found_edges.add(f"{current_edge_type}:{l}") - - # 3. End of Blockquote (kein '>') -> Reset Type + if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") elif not stripped.startswith('>'): current_edge_type = None return found_edges -def _propagate_section_edges(chunks: List[Chunk]) -> List[Chunk]: +def _propagate_section_edges(chunks: List[Chunk], blocks: List[RawBlock]) -> List[Chunk]: """ - NEU: Verteilt Kanten innerhalb einer Sektion. 
- Löst das Problem: Callout steht oben im Kapitel, gilt aber für alle Chunks darunter. + WP-15b: Implementiert Edge-Inheritance. + Kanten aus Überschriften werden an untergeordnete Chunks vererbt. """ - # Step 1: Sammeln pro Sektion - section_map = {} # path -> set(kind:target) + section_inheritance: Dict[str, Set[str]] = {} + # 1. Sammeln aus den Heading-Blöcken + for b in blocks: + if b.kind == "heading": + edges = _parse_edges_robust(b.text) + if edges: + if b.section_path not in section_inheritance: + section_inheritance[b.section_path] = set() + section_inheritance[b.section_path].update(edges) + + # 2. Injektion in den Candidate-Pool for ch in chunks: - # Root-Level "/" ignorieren wir meist, da zu global - if not ch.section_path or ch.section_path == "/": continue - - edges = _parse_edges_robust(ch.text) - if edges: - if ch.section_path not in section_map: - section_map[ch.section_path] = set() - section_map[ch.section_path].update(edges) - - # Step 2: Injizieren (Broadcasting) - for ch in chunks: - if ch.section_path in section_map: - edges_to_add = section_map[ch.section_path] - if not edges_to_add: continue - - injections = [] - for e_str in edges_to_add: - kind, target = e_str.split(':', 1) - # Check: Kante schon im Text? - token = f"[[rel:{kind}|{target}]]" - if token not in ch.text: - injections.append(token) - - if injections: - # Wir schreiben die Kanten "hart" in den Text. - # Damit findet sie derive_edges.py später garantiert. - block = "\n\n\n" + " ".join(injections) - ch.text += block - # Auch ins Window schreiben für Embedding-Kontext - ch.window += block + inherited = section_inheritance.get(ch.section_path, set()) + for e_str in inherited: + kind, target = e_str.split(':', 1) + ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "inherited"}) return chunks # ========================================== -# 5. ORCHESTRATION (ASYNC) +# 5. 
ORCHESTRATION (WP-15b) # ========================================== async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]: """ - Hauptfunktion. Verbindet Parsing, Splitting und Edge-Allocation. + Hauptfunktion zur Chunk-Generierung. + Baut den Candidate-Pool für die semantische Validierung auf. """ - # 1. Config laden (WP-15 Kompatibilität) if config is None: config = get_chunk_config(note_type) fm, body_text = extract_frontmatter_from_text(md_text) - note_status = fm.get("status", "").lower() - primary_strategy = config.get("strategy", "sliding_window") - enable_smart_edges = config.get("enable_smart_edge_allocation", False) - # Drafts skippen LLM um Kosten/Zeit zu sparen - if enable_smart_edges and note_status in ["draft", "initial_gen"]: - logger.info(f"Chunker: Skipping Smart Edges for draft '{note_id}'.") - enable_smart_edges = False - - # 2. Parsing & Splitting + # 1. Parsing & Splitting blocks, doc_title = parse_blocks(md_text) if primary_strategy == "by_heading": @@ -381,94 +403,45 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op else: chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title) - if not chunks: - return [] + if not chunks: return [] - # 3. NEU: Propagation VOR Smart Edge Allocation - # Das repariert die fehlenden Kanten aus deinen Callouts. - chunks = _propagate_section_edges(chunks) + # 2. WP-15b: Candidate Pool Vorbereitung + + # A. Edge Inheritance (Sektions-Propagation) + chunks = _propagate_section_edges(chunks, blocks) + + # B. Explicit Edges (Direkt im Chunk-Text enthalten) + for ch in chunks: + explicit = _parse_edges_robust(ch.text) + for e_str in explicit: + kind, target = e_str.split(':', 1) + ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "explicit"}) - # 4. 
Smart Edges (LLM) - if enable_smart_edges: - chunks = await _run_smart_edge_allocation(chunks, md_text, note_id, note_type) + # C. Global "Unassigned Pool" Detection (Safety Net) + # Sucht nach einer Sektion "Unzugeordnete Kanten" im Body + unassigned_pool = set() + pool_match = re.search(r'###?\s*(?:Unzugeordnete Kanten|Edge Pool|Candidates)\s*\n(.*?)(?:\n#|$)', body_text, re.DOTALL | re.IGNORECASE) + if pool_match: + unassigned_pool = _parse_edges_robust(pool_match.group(1)) + for ch in chunks: + for e_str in unassigned_pool: + kind, target = e_str.split(':', 1) + ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "global_pool"}) - # 5. Linking + # D. De-Duplikation des Pools + for ch in chunks: + seen = set() + unique_pool = [] + for cand in ch.candidate_pool: + key = (cand["kind"], cand["to"]) + if key not in seen: + seen.add(key) + unique_pool.append(cand) + ch.candidate_pool = unique_pool + + # 3. Nachbarschafts-Verkettung (Struktur-Kanten) for i, ch in enumerate(chunks): ch.neighbors_prev = chunks[i-1].id if i > 0 else None ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None - return chunks - -def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> List[str]: - """ - Hilfsfunktion: Sammelt ALLE Kanten für den LLM-Kandidaten-Pool. - """ - # A. Via derive_edges (Standard) - dummy_chunk = { - "chunk_id": f"{note_id}#full", - "text": md_text, - "content": md_text, - "window": md_text, - "type": note_type - } - # Signatur-Anpassung beachten (WP-15 Fix) - raw_edges = build_edges_for_note( - note_id, - [dummy_chunk], - note_level_references=None, - include_note_scope_refs=False - ) - all_candidates = set() - for e in raw_edges: - kind = e.get("kind") - target = e.get("target_id") - if target and kind not in ["belongs_to", "next", "prev", "backlink"]: - all_candidates.add(f"{kind}:{target}") - - # B. 
Via Robust Parser (NEU) - fängt die multiline Callouts - robust_edges = _parse_edges_robust(md_text) - all_candidates.update(robust_edges) - - return list(all_candidates) - -async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str, note_type: str) -> List[Chunk]: - """ - Der LLM-Schritt (WP-15). Filtert irrelevante Kanten. - """ - analyzer = get_semantic_analyzer() - candidate_list = _extract_all_edges_from_md(full_text, note_id, note_type) - - if not candidate_list: - return chunks - - tasks = [] - for chunk in chunks: - tasks.append(analyzer.assign_edges_to_chunk(chunk.text, candidate_list, note_type)) - - results_per_chunk = await asyncio.gather(*tasks) - - assigned_edges_global = set() - - for i, confirmed_edges in enumerate(results_per_chunk): - chunk = chunks[i] - chunk.suggested_edges = confirmed_edges - assigned_edges_global.update(confirmed_edges) - - if confirmed_edges: - # Wir schreiben auch Smart Edges hart in den Text - injection_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in confirmed_edges if ':' in e]) - chunk.text += injection_str - chunk.window += injection_str - - # Fallback für Kanten, die das LLM nirgendwo zugeordnet hat - # (Damit nichts verloren geht -> Safety Fallback) - unassigned = set(candidate_list) - assigned_edges_global - if unassigned: - fallback_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in unassigned if ':' in e]) - for chunk in chunks: - chunk.text += fallback_str - chunk.window += fallback_str - if chunk.suggested_edges is None: chunk.suggested_edges = [] - chunk.suggested_edges.extend(list(unassigned)) - return chunks \ No newline at end of file diff --git a/app/core/derive_edges.py b/app/core/derive_edges.py index 96e0ad0..31204c9 100644 --- a/app/core/derive_edges.py +++ b/app/core/derive_edges.py @@ -1,17 +1,20 @@ """ FILE: app/core/derive_edges.py DESCRIPTION: Extrahiert Graph-Kanten aus Text. 
Unterstützt Wikilinks, Inline-Relations ([[rel:type|target]]) und Obsidian Callouts. -VERSION: 2.0.0 + WP-15b: Integration des Candidate-Pools und Provenance-Priorisierung. + Sichert die Graph-Integrität durch confidence-basiertes De-Duplicating. +VERSION: 2.1.0 STATUS: Active -DEPENDENCIES: re, os, yaml, typing +DEPENDENCIES: re, os, yaml, typing, hashlib EXTERNAL_CONFIG: config/types.yaml -LAST_ANALYSIS: 2025-12-15 +LAST_ANALYSIS: 2025-12-26 """ from __future__ import annotations import os import re +import hashlib from typing import Iterable, List, Optional, Tuple, Set, Dict try: @@ -20,17 +23,18 @@ except Exception: # pragma: no cover yaml = None # --------------------------------------------------------------------------- # -# Utilities +# 1. Utilities & ID Generation # --------------------------------------------------------------------------- # def _get(d: dict, *keys, default=None): + """Sicherer Zugriff auf verschachtelte Dictionary-Keys.""" for k in keys: if isinstance(d, dict) and k in d and d[k] is not None: return d[k] return default def _chunk_text_for_refs(chunk: dict) -> str: - # bevorzugt 'window' → dann 'text' → 'content' → 'raw' + """Extrahiert den relevanten Text für die Referenzsuche (bevorzugt Window).""" return ( _get(chunk, "window") or _get(chunk, "text") @@ -40,6 +44,7 @@ def _chunk_text_for_refs(chunk: dict) -> str: ) def _dedupe_seq(seq: Iterable[str]) -> List[str]: + """Dedupliziert eine Sequenz von Strings unter Beibehaltung der Reihenfolge.""" seen: Set[str] = set() out: List[str] = [] for s in seq: @@ -49,9 +54,10 @@ def _dedupe_seq(seq: Iterable[str]) -> List[str]: return out def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: + """Konstruiert ein valides Kanten-Payload-Objekt für Qdrant.""" pl = { "kind": kind, - "relation": kind, # Alias (v2) + "relation": kind, # Alias für Abwärtskompatibilität (v2) "scope": scope, # "chunk" | "note" "source_id": source_id, 
"target_id": target_id, @@ -62,25 +68,38 @@ def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, e return pl def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: + """Erzeugt eine deterministische 12-Byte ID mittels BLAKE2s.""" base = f"{kind}:{s}->{t}#{scope}" if rule_id: base += f"|{rule_id}" try: - import hashlib return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() except Exception: # pragma: no cover return base # --------------------------------------------------------------------------- # -# Typen-Registry (types.yaml) +# 2. Konfiguration & Provenance-Skala # --------------------------------------------------------------------------- # +# WP-15b: Prioritäten-Ranking für die De-Duplizierung +PROVENANCE_PRIORITY = { + "explicit:wikilink": 1.00, + "inline:rel": 0.95, + "callout:edge": 0.90, + "semantic_ai": 0.90, # Validierte KI-Kanten + "structure:belongs_to": 1.00, + "structure:order": 0.95, # next/prev + "explicit:note_scope": 1.00, + "derived:backlink": 0.90, + "edge_defaults": 0.70 # Heuristik (types.yaml) +} + def _env(n: str, default: Optional[str] = None) -> str: v = os.getenv(n) return v if v is not None else (default or "") def _load_types_registry() -> dict: - """Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml""" + """Lädt die YAML-Registry zur Ermittlung von Standard-Kanten.""" p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") if not os.path.isfile(p) or yaml is None: return {} @@ -97,13 +116,7 @@ def _get_types_map(reg: dict) -> dict: return reg if isinstance(reg, dict) else {} def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: - """ - Liefert die edge_defaults-Liste für den gegebenen Notiztyp. 
- Fallback-Reihenfolge: - 1) reg['types'][note_type]['edge_defaults'] - 2) reg['defaults']['edge_defaults'] (oder 'default'/'global') - 3) [] - """ + """Liefert die edge_defaults-Liste für den gegebenen Notiztyp.""" types_map = _get_types_map(reg) if note_type and isinstance(types_map, dict): t = types_map.get(note_type) @@ -116,29 +129,19 @@ def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: return [] # --------------------------------------------------------------------------- # -# Parser für Links / Relationen +# 3. Parser für Links / Relationen (Core Logik v2.0.0) # --------------------------------------------------------------------------- # # Normale Wikilinks (Fallback) _WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") -# Getypte Inline-Relationen: -# [[rel:KIND | Target]] -# [[rel:KIND Target]] +# Getypte Inline-Relationen _REL_PIPE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s*\|\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) _REL_SPACE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s+(?P[^\]]+?)\s*\]\]", re.IGNORECASE) -# rel: KIND [[Target]] (reines Textmuster) _REL_TEXT = re.compile(r"rel\s*:\s*(?P[a-z_]+)\s*\[\[\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: - """ - Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links, - damit die generische Wikilink-Erkennung sie nicht doppelt zählt. 
- Unterstützt drei Varianten: - - [[rel:KIND | Target]] - - [[rel:KIND Target]] - - rel: KIND [[Target]] - """ + """Extrahiert [[rel:KIND|Target]] und entfernt sie zur Vermeidung von Dubletten.""" pairs: List[Tuple[str,str]] = [] def _collect(m): k = (m.group("kind") or "").strip().lower() @@ -152,17 +155,13 @@ def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: text = _REL_TEXT.sub(_collect, text) return pairs, text -# Obsidian Callout Parser +# Obsidian Callout Parser für mehrzeilige Blöcke _CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE) _REL_LINE = re.compile(r"^(?P[a-z_]+)\s*:\s*(?P.+?)\s*$", re.IGNORECASE) _WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]") def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: - """ - Findet [!edge]-Callouts und extrahiert (kind, target). Entfernt den gesamten - Callout-Block aus dem Text (damit Wikilinks daraus nicht zusätzlich als - "references" gezählt werden). - """ + """Verarbeitet [!edge]-Callouts und entfernt diese aus dem Textfluss.""" if not text: return [], text @@ -205,21 +204,20 @@ def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: t = raw.strip() if t: out_pairs.append((kind, t)) - - # Callout wird NICHT in keep_lines übernommen continue remainder = "\n".join(keep_lines) return out_pairs, remainder def _extract_wikilinks(text: str) -> List[str]: + """Extrahiert Standard-Wikilinks aus dem verbleibenden Text.""" ids: List[str] = [] for m in _WIKILINK_RE.finditer(text or ""): ids.append(m.group(1).strip()) return ids # --------------------------------------------------------------------------- # -# Hauptfunktion +# 4. Hauptfunktion (build_edges_for_note) # --------------------------------------------------------------------------- # def build_edges_for_note( @@ -229,24 +227,13 @@ def build_edges_for_note( include_note_scope_refs: bool = False, ) -> List[dict]: """ - Erzeugt Kanten für eine Note. 
- - - belongs_to: für jeden Chunk (chunk -> note) - - next / prev: zwischen aufeinanderfolgenden Chunks - - references: pro Chunk aus window/text (via Wikilinks) - - typed inline relations: [[rel:KIND | Target]] / [[rel:KIND Target]] / rel: KIND [[Target]] - - Obsidian Callouts: > [!edge] KIND: [[Target]] [[Target2]] - - optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references - - typenbasierte Default-Kanten (edge_defaults) je gefundener Referenz + Erzeugt und aggregiert alle Kanten für eine Note inklusive WP-15b Candidate-Processing. + Setzt Provenance-Ranking zur Graph-Stabilisierung ein. """ edges: List[dict] = [] + note_type = _get(chunks[0], "type") if chunks else "concept" - # Note-Typ (aus erstem Chunk erwartet) - note_type = None - if chunks: - note_type = _get(chunks[0], "type") - - # 1) belongs_to + # 1) Struktur-Kanten: belongs_to (Chunk -> Note) for ch in chunks: cid = _get(ch, "chunk_id", "id") if not cid: @@ -254,12 +241,12 @@ def build_edges_for_note( edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, { "chunk_id": cid, "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to"), - "provenance": "rule", + "provenance": "structure", "rule_id": "structure:belongs_to", - "confidence": 1.0, + "confidence": PROVENANCE_PRIORITY["structure:belongs_to"], })) - # 2) next / prev + # 2) Struktur-Kanten: next / prev (Sequenz) for i in range(len(chunks) - 1): a, b = chunks[i], chunks[i + 1] a_id = _get(a, "chunk_id", "id") @@ -269,19 +256,19 @@ def build_edges_for_note( edges.append(_edge("next", "chunk", a_id, b_id, note_id, { "chunk_id": a_id, "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order"), - "provenance": "rule", + "provenance": "structure", "rule_id": "structure:order", - "confidence": 0.95, + "confidence": PROVENANCE_PRIORITY["structure:order"], })) edges.append(_edge("prev", "chunk", b_id, a_id, note_id, { "chunk_id": b_id, "edge_id": 
_mk_edge_id("prev", b_id, a_id, "chunk", "structure:order"), - "provenance": "rule", + "provenance": "structure", "rule_id": "structure:order", - "confidence": 0.95, + "confidence": PROVENANCE_PRIORITY["structure:order"], })) - # 3) references + typed inline + callouts + defaults (chunk-scope) + # 3) Inhaltliche Kanten (Refs, Inlines, Callouts, Candidates) reg = _load_types_registry() defaults = _edge_defaults_for(note_type, reg) refs_all: List[str] = [] @@ -292,51 +279,49 @@ def build_edges_for_note( continue raw = _chunk_text_for_refs(ch) - # 3a) typed inline relations + # 3a) Typed Inline Relations typed, remainder = _extract_typed_relations(raw) for kind, target in typed: - kind = kind.strip().lower() - if not kind or not target: - continue - edges.append(_edge(kind, "chunk", cid, target, note_id, { + k = kind.strip().lower() + if not k or not target: continue + edges.append(_edge(k, "chunk", cid, target, note_id, { "chunk_id": cid, - "edge_id": _mk_edge_id(kind, cid, target, "chunk", "inline:rel"), + "edge_id": _mk_edge_id(k, cid, target, "chunk", "inline:rel"), "provenance": "explicit", "rule_id": "inline:rel", - "confidence": 0.95, + "confidence": PROVENANCE_PRIORITY["inline:rel"], })) - if kind in {"related_to", "similar_to"}: - edges.append(_edge(kind, "chunk", target, cid, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(kind, target, cid, "chunk", "inline:rel"), - "provenance": "explicit", - "rule_id": "inline:rel", - "confidence": 0.95, - })) - # 3b) callouts + # 3b) WP-15b Candidate Pool Integration (KI-validierte Kanten) + # Verarbeitet Kanten, die bereits in der Ingestion semantisch geprüft wurden. 
+ pool = ch.get("candidate_pool") or ch.get("candidate_edges") or [] + for cand in pool: + target = cand.get("to") + kind = cand.get("kind", "related_to") + prov = cand.get("provenance", "semantic_ai") + if not target: continue + edges.append(_edge(kind, "chunk", cid, target, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(kind, cid, target, "chunk", f"candidate:{prov}"), + "provenance": prov, + "rule_id": f"candidate:{prov}", + "confidence": PROVENANCE_PRIORITY.get(prov, 0.90), + })) + + # 3c) Obsidian Callouts call_pairs, remainder2 = _extract_callout_relations(remainder) for kind, target in call_pairs: k = (kind or "").strip().lower() - if not k or not target: - continue + if not k or not target: continue edges.append(_edge(k, "chunk", cid, target, note_id, { "chunk_id": cid, "edge_id": _mk_edge_id(k, cid, target, "chunk", "callout:edge"), "provenance": "explicit", "rule_id": "callout:edge", - "confidence": 0.95, + "confidence": PROVENANCE_PRIORITY["callout:edge"], })) - if k in {"related_to", "similar_to"}: - edges.append(_edge(k, "chunk", target, cid, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(k, target, cid, "chunk", "callout:edge"), - "provenance": "explicit", - "rule_id": "callout:edge", - "confidence": 0.95, - })) - # 3c) generische Wikilinks → references (+ defaults je Ref) + # 3d) Standard-Wikilinks -> references (+ defaults) refs = _extract_wikilinks(remainder2) for r in refs: edges.append(_edge("references", "chunk", cid, r, note_id, { @@ -345,76 +330,65 @@ def build_edges_for_note( "edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink"), "provenance": "explicit", "rule_id": "explicit:wikilink", - "confidence": 1.0, + "confidence": PROVENANCE_PRIORITY["explicit:wikilink"], })) + # Regelbasierte Kanten aus types.yaml anhängen for rel in defaults: - if rel == "references": - continue + if rel == "references": continue edges.append(_edge(rel, "chunk", cid, r, note_id, { "chunk_id": cid, "edge_id": _mk_edge_id(rel, 
cid, r, "chunk", f"edge_defaults:{note_type}:{rel}"), "provenance": "rule", "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, + "confidence": PROVENANCE_PRIORITY["edge_defaults"], })) - if rel in {"related_to", "similar_to"}: - edges.append(_edge(rel, "chunk", r, cid, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(rel, r, cid, "chunk", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) refs_all.extend(refs) - # 4) optional note-scope refs/backlinks (+ defaults) + # 4) Optionale Note-Scope Referenzen & Backlinks if include_note_scope_refs: refs_note = list(refs_all or []) if note_level_references: refs_note.extend([r for r in note_level_references if isinstance(r, str) and r]) refs_note = _dedupe_seq(refs_note) + for r in refs_note: edges.append(_edge("references", "note", note_id, r, note_id, { "edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope"), "provenance": "explicit", "rule_id": "explicit:note_scope", - "confidence": 1.0, + "confidence": PROVENANCE_PRIORITY["explicit:note_scope"], })) + # Backlink-Erzeugung zur Graphen-Stärkung edges.append(_edge("backlink", "note", r, note_id, note_id, { "edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink"), "provenance": "rule", "rule_id": "derived:backlink", - "confidence": 0.9, + "confidence": PROVENANCE_PRIORITY["derived:backlink"], })) for rel in defaults: - if rel == "references": - continue + if rel == "references": continue edges.append(_edge(rel, "note", note_id, r, note_id, { "edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}"), "provenance": "rule", "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, + "confidence": PROVENANCE_PRIORITY["edge_defaults"], })) - if rel in {"related_to", "similar_to"}: - edges.append(_edge(rel, "note", r, note_id, note_id, { - "edge_id": _mk_edge_id(rel, r, note_id, "note", 
f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) - # 5) De-Dupe (source_id, target_id, relation, rule_id) - seen: Set[Tuple[str,str,str,str]] = set() - out: List[dict] = [] + # 5) WP-15b: Confidence-basierte De-Duplizierung + # Wenn dieselbe Relation mehrfach existiert, gewinnt die mit der höchsten Confidence. + unique_map: Dict[Tuple[str, str, str], dict] = {} + for e in edges: - s = str(e.get("source_id") or "") - t = str(e.get("target_id") or "") + s, t = str(e.get("source_id")), str(e.get("target_id")) rel = str(e.get("relation") or e.get("kind") or "edge") - rule = str(e.get("rule_id") or "") - key = (s, t, rel, rule) - if key in seen: - continue - seen.add(key) - out.append(e) - return out + key = (s, t, rel) + + if key not in unique_map: + unique_map[key] = e + else: + # Vergleich der Vertrauenswürdigkeit (Provenance Ranking) + if e.get("confidence", 0) > unique_map[key].get("confidence", 0): + unique_map[key] = e + + return list(unique_map.values()) \ No newline at end of file diff --git a/app/core/ingestion.py b/app/core/ingestion.py index fa71d1f..ce35daf 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -3,12 +3,12 @@ FILE: app/core/ingestion.py DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen. WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free). WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash. -FIX: Deep Fallback Logic (v2.11.14). Erkennt Policy Violations auch in validen - JSON-Objekten und erzwingt den lokalen Ollama-Sprung, um Kantenverlust - bei umfangreichen Protokollen zu verhindern. -VERSION: 2.11.14 + WP-15b: Two-Pass Ingestion mit LocalBatchCache & Candidate-Validation. + FIX: Beibehaltung der Deep Fallback Logic (v2.11.14) zur JSON-Recovery. 
+VERSION: 2.12.0 STATUS: Active -DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry +DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, + app.services.llm_service, app.services.edge_registry """ import os import json @@ -21,9 +21,11 @@ from typing import Dict, List, Optional, Tuple, Any # Core Module Imports from app.core.parser import ( read_markdown, + pre_scan_markdown, normalize_frontmatter, validate_required_frontmatter, extract_edges_with_context, + NoteContext ) from app.core.note_payload import make_note_payload from app.core.chunker import assemble_chunks, get_chunk_config @@ -49,7 +51,7 @@ from app.services.llm_service import LLMService logger = logging.getLogger(__name__) -# --- Global Helpers --- +# --- Global Helpers (Full Compatibility v2.11.14) --- def extract_json_from_response(text: str) -> Any: """ Extrahiert JSON-Daten und bereinigt LLM-Steuerzeichen (Mistral/Llama). @@ -115,6 +117,7 @@ class IngestionService: self.llm = LLMService() self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE + self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache try: ensure_collections(self.client, self.prefix, self.dim) @@ -122,6 +125,54 @@ class IngestionService: except Exception as e: logger.warning(f"DB init warning: {e}") + async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: + """ + WP-15b: Implementiert den Two-Pass Ingestion Workflow. + Pass 1: Pre-Scan baut Kontext-Cache auf. + Pass 2: Processing führt semantische Validierung durch. 
+ """ + logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Batch Cache...") + for path in file_paths: + ctx = pre_scan_markdown(path) + if ctx: + self.batch_cache[ctx.note_id] = ctx + + logger.info(f"🚀 [Pass 2] Processing {len(file_paths)} files...") + results = [] + for path in file_paths: + res = await self.process_file(path, vault_root, apply=True) + results.append(res) + return results + + async def _validate_candidate(self, chunk_text: str, edge: Dict) -> bool: + """ + WP-15b: Validiert einen Kanten-Kandidaten semantisch gegen das Ziel. + Nutzt den Cache aus Pass 1, um dem LLM Kontext der Ziel-Note zu geben. + """ + target_id = edge.get("to") + target_ctx = self.batch_cache.get(target_id) + + # Falls Zielnotiz nicht im aktuellen Batch ist: 'explicit' durchlassen (Hard-Link Integrity) + if not target_ctx: + return True + + provider = self.settings.MINDNET_LLM_PROVIDER + template = self.llm.get_prompt("edge_validation", provider) + + try: + prompt = template.format( + chunk_text=chunk_text[:1500], + target_title=target_ctx.title, + target_summary=target_ctx.summary, + edge_kind=edge.get("kind", "related_to") + ) + + response = await self.llm.generate_raw_response(prompt, priority="background") + return "YES" in response.upper() + except Exception as e: + logger.warning(f"⚠️ Semantic validation error for {target_id}: {e}") + return True # Fallback: Im Zweifel Link behalten + def _resolve_note_type(self, requested: Optional[str]) -> str: """Bestimmt den finalen Notiz-Typ (Fallback auf 'concept').""" types = self.registry.get("types", {}) @@ -138,109 +189,12 @@ class IngestionService: return cfg return get_chunk_config(note_type) - async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]: - """ - KI-Extraktion mit Deep-Fallback Logik. - Erzwingt den lokalen Ollama-Sprung, wenn die Cloud-Antwort keine verwertbaren - Kanten liefert (häufig bei Policy Violations auf OpenRouter). 
- """ - provider = self.settings.MINDNET_LLM_PROVIDER - model = self.settings.OPENROUTER_MODEL if provider == "openrouter" else self.settings.GEMINI_MODEL - - logger.info(f"🚀 [Ingestion] Turbo-Mode: Extracting edges for '{note_id}' using {model} on {provider}") - - edge_registry.ensure_latest() - valid_types_str = ", ".join(sorted(list(edge_registry.valid_types))) - - template = self.llm.get_prompt("edge_extraction", provider) - - try: - try: - # Wir begrenzen den Kontext auf 6000 Zeichen (ca. 1500 Token) - prompt = template.format( - text=text[:6000], - note_id=note_id, - valid_types=valid_types_str - ) - except KeyError as ke: - logger.error(f"❌ [Ingestion] Prompt-Template Fehler (Variable {ke} fehlt).") - return [] - - # 1. Versuch: Anfrage an den primären Cloud-Provider - response_json = await self.llm.generate_raw_response( - prompt=prompt, priority="background", force_json=True, - provider=provider, model_override=model - ) - - # Initiales Parsing - raw_data = extract_json_from_response(response_json) - - # 2. Dictionary Recovery (Versuche Liste aus Dict zu extrahieren) - candidates = [] - if isinstance(raw_data, list): - candidates = raw_data - elif isinstance(raw_data, dict): - logger.info(f"ℹ️ [Ingestion] LLM returned dict, checking for embedded lists in {note_id}") - for k in ["edges", "links", "results", "kanten", "matches", "edge_list"]: - if k in raw_data and isinstance(raw_data[k], list): - candidates = raw_data[k] - break - # Wenn immer noch keine Liste gefunden, versuche Key-Value Paare (Dict Recovery) - if not candidates: - for k, v in raw_data.items(): - if isinstance(v, str): candidates.append(f"{k}:{v}") - elif isinstance(v, list): [candidates.append(f"{k}:{i}") for i in v if isinstance(i, str)] - - # 3. DEEP FALLBACK: Wenn nach allen Recovery-Versuchen die Liste leer ist UND wir in der Cloud waren - # Triggert den Fallback bei "Data Policy Violations" (leere oder Fehler-JSONs). 
- if not candidates and provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED: - logger.warning( - f"🛑 [Ingestion] Cloud-Antwort für {note_id} lieferte keine verwertbaren Kanten. " - f"Mögliche Policy Violation oder Refusal. Erzwinge LOKALEN FALLBACK via Ollama..." - ) - response_json_local = await self.llm.generate_raw_response( - prompt=prompt, priority="background", force_json=True, provider="ollama" - ) - raw_data_local = extract_json_from_response(response_json_local) - - # Wiederhole Recovery für lokale Antwort - if isinstance(raw_data_local, list): - candidates = raw_data_local - elif isinstance(raw_data_local, dict): - for k in ["edges", "links", "results"]: - if k in raw_data_local and isinstance(raw_data_local[k], list): - candidates = raw_data_local[k]; break - - if not candidates: - logger.warning(f"⚠️ [Ingestion] Auch nach Fallback keine extrahierbaren Kanten für {note_id}") - return [] - - processed = [] - for item in candidates: - if isinstance(item, dict) and "to" in item: - item["provenance"] = "semantic_ai" - item["line"] = f"ai-{provider}" - processed.append(item) - elif isinstance(item, str) and ":" in item: - parts = item.split(":", 1) - processed.append({ - "to": parts[1].strip(), - "kind": parts[0].strip(), - "provenance": "semantic_ai", - "line": f"ai-{provider}" - }) - return processed - - except Exception as e: - logger.warning(f"⚠️ [Ingestion] Smart Edge Allocation failed for {note_id}: {e}") - return [] - async def process_file( self, file_path: str, vault_root: str, force_replace: bool = False, apply: bool = False, purge_before: bool = False, note_scope_refs: bool = False, hash_source: str = "parsed", hash_normalize: str = "canonical" ) -> Dict[str, Any]: - """Transformiert eine Markdown-Datei in den Graphen (Notes, Chunks, Edges).""" + """Transformiert eine Markdown-Datei in den Graphen.""" result = {"path": file_path, "status": "skipped", "changed": False, "error": None} # 1. 
Parse & Lifecycle Gate @@ -252,12 +206,12 @@ class IngestionService: except Exception as e: return {**result, "error": f"Validation failed: {str(e)}"} - # WP-22: Filter für Systemdateien und Entwürfe + # Lifecycle Filter (WP-22) status = fm.get("status", "draft").lower().strip() if status in ["system", "template", "archive", "hidden"]: return {**result, "status": "skipped", "reason": f"lifecycle_{status}"} - # 2. Config Resolution & Payload Construction + # 2. Config Resolution & Payload note_type = self._resolve_note_type(fm.get("type")) fm["type"] = note_type @@ -267,15 +221,13 @@ class IngestionService: except Exception as e: return {**result, "error": f"Payload failed: {str(e)}"} - # 3. Change Detection (Strikte DoD Umsetzung) + # 3. Change Detection (v2.11.14 Logic) old_payload = None if force_replace else self._fetch_note_payload(note_id) check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" old_hash = (old_payload or {}).get("hashes", {}).get(check_key) new_hash = note_pl.get("hashes", {}).get(check_key) - # Prüfung auf fehlende Artefakte in Qdrant chunks_missing, edges_missing = self._artifacts_missing(note_id) - should_write = force_replace or (not old_payload) or (old_hash != new_hash) or chunks_missing or edges_missing if not should_write: @@ -284,40 +236,42 @@ class IngestionService: if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} - # 4. Processing (Chunking, Embedding, AI Edges) + # 4. 
Processing (Chunking, Embedding, Validated Edges) try: body_text = getattr(parsed, "body", "") or "" edge_registry.ensure_latest() - # Profil-gesteuertes Chunking + # Chunker Resolution profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard" chunk_cfg = self._get_chunk_config_by_profile(profile, note_type) chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) - # Vektorisierung + # Embeddings vecs = [] if chunk_pls: texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] vecs = await self.embedder.embed_documents(texts) - # Kanten-Extraktion + # Kanten-Extraktion & WP-15b Validierung edges = [] context = {"file": file_path, "note_id": note_id} - # A. Explizite Kanten (User / Wikilinks) - for e in extract_edges_with_context(parsed): - e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")}) - edges.append(e) + # A. Explizite Kandidaten (Wikilinks) + raw_candidates = extract_edges_with_context(parsed) + for cand in raw_candidates: + # Semantische Prüfung gegen Pass 1 Cache + if await self._validate_candidate(body_text, cand): + cand["kind"] = edge_registry.resolve( + edge_type=cand["kind"], + provenance="explicit", + context={**context, "line": cand.get("line")} + ) + edges.append(cand) + else: + logger.info(f"🚫 WP-15b: Candidate rejected: {cand['kind']} -> {cand['to']}") - # B. KI Kanten (Turbo Mode mit v2.11.14 Fallback) - ai_edges = await self._perform_smart_edge_allocation(body_text, note_id) - for e in ai_edges: - valid_kind = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")}) - e["kind"] = valid_kind - edges.append(e) - - # C. System Kanten (Struktur) + # B. 
System Kanten (Struktur) try: sys_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs) except: diff --git a/app/core/parser.py b/app/core/parser.py index b47aeb7..7d183c0 100644 --- a/app/core/parser.py +++ b/app/core/parser.py @@ -2,10 +2,11 @@ FILE: app/core/parser.py DESCRIPTION: Liest Markdown-Dateien fehlertolerant (Encoding-Fallback). Trennt Frontmatter (YAML) vom Body. WP-22 Erweiterung: Kanten-Extraktion mit Zeilennummern für die EdgeRegistry. -VERSION: 1.8.0 + WP-15b: Implementierung NoteContext und pre_scan_markdown für Pass 1 Ingestion. +VERSION: 1.9.0 STATUS: Active DEPENDENCIES: yaml, re, dataclasses, json, io, os -LAST_ANALYSIS: 2025-12-23 +LAST_ANALYSIS: 2025-12-26 """ from __future__ import annotations @@ -32,6 +33,15 @@ class ParsedNote: body: str path: str +@dataclass +class NoteContext: + """Metadaten-Container für den flüchtigen LocalBatchCache (Pass 1).""" + note_id: str + title: str + type: str + summary: str + tags: List[str] + # --------------------------------------------------------------------- # Frontmatter-Erkennung @@ -152,6 +162,32 @@ def read_markdown(path: str) -> Optional[ParsedNote]: return ParsedNote(frontmatter=fm or {}, body=body or "", path=path) +def pre_scan_markdown(path: str) -> Optional[NoteContext]: + """ + WP-15b: Schneller Scan für den LocalBatchCache (Pass 1). + Extrahiert nur Identität und Kurz-Kontext zur semantischen Validierung. + """ + parsed = read_markdown(path) + if not parsed: + return None + + fm = parsed.frontmatter + # ID-Findung: Frontmatter ID oder Dateiname als Fallback + note_id = str(fm.get("id") or os.path.splitext(os.path.basename(path))[0]) + + # Erstelle Kurz-Zusammenfassung (erste 500 Zeichen des Body, bereinigt) + clean_body = re.sub(r'[#*`>]', '', parsed.body[:600]).strip() + summary = clean_body[:500] + "..." 
if len(clean_body) > 500 else clean_body + + return NoteContext( + note_id=note_id, + title=str(fm.get("title", note_id)), + type=str(fm.get("type", "concept")), + summary=summary, + tags=fm.get("tags", []) if isinstance(fm.get("tags"), list) else [] + ) + + def validate_required_frontmatter(fm: Dict[str, Any], required: Tuple[str, ...] = ("id", "title")) -> None: """ diff --git a/app/services/edge_registry.py b/app/services/edge_registry.py index 95be97b..0763370 100644 --- a/app/services/edge_registry.py +++ b/app/services/edge_registry.py @@ -1,11 +1,14 @@ """ FILE: app/services/edge_registry.py DESCRIPTION: Single Source of Truth für Kanten-Typen mit dynamischem Reload. + WP-15b: Erweiterte Provenance-Prüfung für die Candidate-Validation. + Sichert die Graph-Integrität durch strikte Trennung von System- und Inhaltskanten. WP-22: Fix für absolute Pfade außerhalb des Vaults (Prod-Dictionary). WP-20: Synchronisation mit zentralen Settings (v0.6.2). -VERSION: 0.7.5 +VERSION: 0.8.0 STATUS: Active DEPENDENCIES: re, os, json, logging, time, app.config +LAST_ANALYSIS: 2025-12-26 """ import re import os @@ -19,7 +22,12 @@ from app.config import get_settings logger = logging.getLogger(__name__) class EdgeRegistry: + """ + Zentraler Verwalter für das Kanten-Vokabular. + Implementiert das Singleton-Pattern für konsistente Validierung über alle Services. + """ _instance = None + # System-Kanten, die nicht durch User oder KI gesetzt werden dürfen FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"} def __new__(cls, *args, **kwargs): @@ -51,7 +59,7 @@ class EdgeRegistry: def ensure_latest(self): """ Prüft den Zeitstempel der Vokabular-Datei und lädt bei Bedarf neu. - Verhindert den AttributeError in der Ingestion-Pipeline. + Verhindert Inkonsistenzen bei Laufzeit-Updates des Dictionaries. """ if not os.path.exists(self.full_vocab_path): logger.error(f"!!! 
[EDGE-REGISTRY ERROR] File not found: {self.full_vocab_path} !!!") @@ -66,7 +74,10 @@ class EdgeRegistry: logger.error(f"!!! [EDGE-REGISTRY] Error checking file time: {e}") def _load_vocabulary(self): - """Parst das Markdown-Wörterbuch und baut die Canonical-Map auf.""" + """ + Parst das Markdown-Wörterbuch und baut die Canonical-Map auf. + Erkennt Tabellen-Strukturen und extrahiert fettgedruckte System-Typen. + """ self.canonical_map.clear() self.valid_types.clear() @@ -101,8 +112,8 @@ class EdgeRegistry: def resolve(self, edge_type: str, provenance: str = "explicit", context: dict = None) -> str: """ - Validiert einen Kanten-Typ gegen das Vokabular. - Loggt unbekannte Typen für die spätere manuelle Pflege. + WP-15b: Validiert einen Kanten-Typ gegen das Vokabular und prüft Berechtigungen. + Sichert, dass nur strukturelle Prozesse System-Kanten setzen dürfen. """ self.ensure_latest() if not edge_type: @@ -112,20 +123,23 @@ class EdgeRegistry: clean_type = edge_type.lower().strip().replace(" ", "_").replace("-", "_") ctx = context or {} - # System-Kanten dürfen nicht manuell vergeben werden - if provenance == "explicit" and clean_type in self.FORBIDDEN_SYSTEM_EDGES: - self._log_issue(clean_type, "forbidden_system_usage", ctx) + # WP-15b: System-Kanten dürfen weder manuell noch durch KI/Vererbung gesetzt werden. + # Nur Provenienz 'structure' (interne Prozesse) ist autorisiert. + # Wir blockieren hier alle Provenienzen außer 'structure'. 
+ restricted_provenance = ["explicit", "semantic_ai", "inherited", "global_pool", "rule"] + if provenance in restricted_provenance and clean_type in self.FORBIDDEN_SYSTEM_EDGES: + self._log_issue(clean_type, f"forbidden_usage_by_{provenance}", ctx) return "related_to" - # System-Kanten sind nur bei struktureller Provenienz erlaubt + # System-Kanten sind NUR bei struktureller Provenienz erlaubt if provenance == "structure" and clean_type in self.FORBIDDEN_SYSTEM_EDGES: return clean_type - # Mapping auf kanonischen Namen + # Mapping auf kanonischen Namen (Alias-Auflösung) if clean_type in self.canonical_map: return self.canonical_map[clean_type] - # Fallback und Logging + # Fallback und Logging unbekannter Typen für Admin-Review self._log_issue(clean_type, "unknown_type", ctx) return clean_type @@ -139,12 +153,13 @@ class EdgeRegistry: "error": error_kind, "file": ctx.get("file", "unknown"), "line": ctx.get("line", "unknown"), - "note_id": ctx.get("note_id", "unknown") + "note_id": ctx.get("note_id", "unknown"), + "provenance": ctx.get("provenance", "unknown") } with open(self.unknown_log_path, "a", encoding="utf-8") as f: f.write(json.dumps(entry) + "\n") except Exception: pass -# Singleton Export +# Singleton Export für systemweiten Zugriff registry = EdgeRegistry() \ No newline at end of file diff --git a/config/prompts.yaml b/config/prompts.yaml index 13b800d..f554155 100644 --- a/config/prompts.yaml +++ b/config/prompts.yaml @@ -1,6 +1,7 @@ -# config/prompts.yaml — Final V2.5.5 (OpenRouter Hardening) +# config/prompts.yaml — Final V2.6.0 (WP-15b Candidate-Validation) # WP-20: Optimierte Cloud-Templates zur Unterdrückung von Modell-Geschwätz. # FIX: Explizite Verbote für Einleitungstexte zur Vermeidung von JSON-Parsing-Fehlern. +# WP-15b: Integration der binären edge_validation für den Two-Pass Workflow. # OLLAMA: UNVERÄNDERT laut Benutzeranweisung. system_prompt: | @@ -215,7 +216,7 @@ edge_extraction: 4. 
Antworte AUSSCHLIESSLICH in validem JSON als Liste von Objekten.

    BEISPIEL:
-    [[ {{"to": "Ziel-Konzept", "kind": "beziehungs_typ"}} ]]
+    [[ {{"to": "Ziel-Konzept", "kind": "beziehungs_typ"}} ]]

    TEXT:
    """
@@ -227,13 +228,46 @@ edge_extraction:
     Analysiere '{note_id}'. Extrahiere semantische Beziehungen.
     ERLAUBTE TYPEN: {valid_types}
     TEXT: {text}
-    OUTPUT: STRIKT JSON-Array von Objekten: [[{{"to":"Ziel","kind":"typ"}}]]. Kein Text davor/danach. Wenn nichts: [].
+    OUTPUT: STRIKT JSON-Array von Objekten: [[{{"to":"Ziel","kind":"typ"}}]]. Kein Text davor/danach. Wenn nichts: [].
   openrouter: |
     TASK: Extrahiere semantische Relationen für '{note_id}'.
     ERLAUBTE TYPEN: {valid_types}
     TEXT: {text}
     ANWEISUNG: Antworte AUSSCHLIESSLICH mit einem JSON-Array von Objekten.
-    FORMAT: [[{{"to":"Ziel-Begriff","kind":"typ"}}]]
+    FORMAT: [[{{"to":"Ziel-Begriff","kind":"typ"}}]]
     STRIKTES VERBOT: Schreibe keine Einleitung, keine Analyse und keine Erklärungen.
     Wenn keine Relationen existieren, antworte NUR mit: []
-    OUTPUT:
\ No newline at end of file
+    OUTPUT:
+
+# ---------------------------------------------------------
+# 8. WP-15b: EDGE VALIDATION (Intent: VALIDATE)
+# ---------------------------------------------------------
+edge_validation:
+  gemini: |
+    Bewerte die semantische Validität dieser Verbindung im Wissensgraph.
+
+    KONTEXT DER QUELLE (Chunk):
+    "{chunk_text}"
+
+    ZIEL-NOTIZ: "{target_title}"
+    ZIEL-BESCHREIBUNG (Zusammenfassung):
+    "{target_summary}"
+
+    GEPLANTE RELATION: "{edge_kind}"
+
+    FRAGE: Bestätigt der Kontext der Quelle die Beziehung '{edge_kind}' zum Ziel?
+    REGEL: Antworte NUR mit 'YES' oder 'NO'. Keine Erklärungen oder Smalltalk.
+  openrouter: |
+    Verify semantic relation for graph construction.
+    Source Context: {chunk_text}
+    Target Note: {target_title}
+    Target Summary: {target_summary}
+    Proposed Relation: {edge_kind}
+    Instruction: Does the source context support this relation to the target?
+    Result: Respond ONLY with 'YES' or 'NO'. 
+ ollama: | + Bewerte die semantische Korrektheit dieser Verbindung. + QUELLE: {chunk_text} + ZIEL: {target_title} ({target_summary}) + BEZIEHUNG: {edge_kind} + Ist diese Verbindung valide? Antworte NUR mit YES oder NO. \ No newline at end of file