diff --git a/app/core/chunker.py b/app/core/chunker.py index c77a43c..d8ea589 100644 --- a/app/core/chunker.py +++ b/app/core/chunker.py @@ -1,393 +1,36 @@ """ FILE: app/core/chunker.py -DESCRIPTION: Zerlegt Texte in Chunks (Sliding Window oder nach Headings). - WP-15b: Implementiert Edge-Inheritance und Candidate-Pool Vorbereitung. - Zentralisiert die Kanten-Vorbereitung für die spätere binäre Validierung. - Bietet volle Unterstützung für Hybrid-Chunking (Strict/Soft/Safety-Net). -VERSION: 3.2.0 +DESCRIPTION: Facade für das Chunking-Package. Stellt 100% Abwärtskompatibilität sicher. + WP-14: Modularisierung abgeschlossen. + WP-15b: Edge-Inheritance und Candidate-Pool Logik integriert. + Verwendet neue 'chunking_' Präfixe für Untermodule. +VERSION: 3.3.0 STATUS: Active -DEPENDENCIES: re, math, yaml, pathlib, asyncio, logging """ - -from __future__ import annotations -from dataclasses import dataclass, field -from typing import List, Dict, Optional, Tuple, Any, Set +import asyncio import re -import math -import yaml -from pathlib import Path -import asyncio import logging +from typing import List, Dict, Optional -# Services -# In WP-15b wird die KI-Validierung in die ingestion.py verlagert. -# Wir behalten den Import für Abwärtskompatibilität, falls Legacy-Skripte ihn benötigen. +# Interne Package-Imports mit neuer Präfix-Konvention +from .chunking.chunking_models import Chunk, RawBlock +from .chunking.chunking_utils import get_chunk_config, extract_frontmatter_from_text +from .chunking.chunking_parser import parse_blocks, parse_edges_robust +from .chunking.chunking_strategies import strategy_sliding_window, strategy_by_heading +from .chunking.chunking_propagation import propagate_section_edges + +logger = logging.getLogger(__name__) + +# Legacy Support für SemanticAnalyzer (Optional für andere Skripte) try: from app.services.semantic_analyzer import get_semantic_analyzer except ImportError: def get_semantic_analyzer(): return None -# Core Imports -try: - from app.core.derive_edges import build_edges_for_note -except ImportError: - # Fallback für Standalone-Betrieb oder Tests - def build_edges_for_note(note_id, chunks, note_level_references=None, include_note_scope_refs=False): return [] - -logger = logging.getLogger(__name__) - -# ========================================== -# 1. HELPER & CONFIG -# ========================================== - -BASE_DIR = Path(__file__).resolve().parent.parent.parent -CONFIG_PATH = BASE_DIR / "config" / "types.yaml" -# Fallback Default, falls types.yaml fehlt -DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)} -_CONFIG_CACHE = None - -def _load_yaml_config() -> Dict[str, Any]: - global _CONFIG_CACHE - if _CONFIG_CACHE is not None: return _CONFIG_CACHE - if not CONFIG_PATH.exists(): return {} - try: - with open(CONFIG_PATH, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) - _CONFIG_CACHE = data - return data - except Exception: return {} - -def get_chunk_config(note_type: str) -> Dict[str, Any]: - """ - Lädt die Chunking-Strategie basierend auf dem Note-Type aus types.yaml. - Sichert die Kompatibilität zu WP-15 Profilen. - """ - full_config = _load_yaml_config() - profiles = full_config.get("chunking_profiles", {}) - type_def = full_config.get("types", {}).get(note_type.lower(), {}) - - # Welches Profil nutzt dieser Typ? (z.B. 'sliding_smart_edges') - profile_name = type_def.get("chunking_profile") - - if not profile_name: - profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard") - - config = profiles.get(profile_name, DEFAULT_PROFILE).copy() - - # Tupel-Konvertierung für Overlap (YAML liest oft Listen) - if "overlap" in config and isinstance(config["overlap"], list): - config["overlap"] = tuple(config["overlap"]) - - return config - -def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]: - """Trennt YAML-Frontmatter vom eigentlichen Text.""" - fm_match = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL) - if not fm_match: return {}, md_text - try: - frontmatter = yaml.safe_load(fm_match.group(1)) - if not isinstance(frontmatter, dict): frontmatter = {} - except yaml.YAMLError: - frontmatter = {} - text_without_fm = re.sub(r'^\s*---\s*\n(.*?)\n---', '', md_text, flags=re.DOTALL) - return frontmatter, text_without_fm.strip() - -# ========================================== -# 2. DATA CLASSES & TEXT TOOLS -# ========================================== - -_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') -_WS = re.compile(r'\s+') - -def estimate_tokens(text: str) -> int: - """Grobe Schätzung der Token-Anzahl (4 Zeichen pro Token).""" - return max(1, math.ceil(len(text.strip()) / 4)) - -def split_sentences(text: str) -> list[str]: - """Teilt Text in Sätze auf unter Berücksichtigung von Interpunktion.""" - text = _WS.sub(' ', text.strip()) - if not text: return [] - parts = _SENT_SPLIT.split(text) - return [p.strip() for p in parts if p.strip()] - -@dataclass -class RawBlock: - kind: str - text: str - level: Optional[int] - section_path: str - section_title: Optional[str] - -@dataclass -class Chunk: - id: str - note_id: str - index: int - text: str - window: str - token_count: int - section_title: Optional[str] - section_path: str - neighbors_prev: Optional[str] - neighbors_next: Optional[str] - # WP-15b: Liste von Kandidaten für die semantische Validierung - candidate_pool: List[Dict[str, Any]] = field(default_factory=list) - suggested_edges: Optional[List[str]] = None - -# ========================================== -# 3. PARSING & STRATEGIES -# ========================================== - -def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: - """ - Zerlegt Text in logische Blöcke (Absätze, Header). - Wichtig für die Strategie 'by_heading' und die Edge-Inheritance. - """ - blocks = [] - h1_title = "Dokument" - section_path = "/" - current_h2 = None - - fm, text_without_fm = extract_frontmatter_from_text(md_text) - - h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) - if h1_match: - h1_title = h1_match.group(1).strip() - - lines = text_without_fm.split('\n') - buffer = [] - - for line in lines: - stripped = line.strip() - if stripped.startswith('# '): - continue - elif stripped.startswith('## '): - if buffer: - content = "\n".join(buffer).strip() - if content: - blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) - buffer = [] - current_h2 = stripped[3:].strip() - section_path = f"/{current_h2}" - blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2)) - elif not stripped: - if buffer: - content = "\n".join(buffer).strip() - if content: - blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) - buffer = [] - else: - buffer.append(line) - - if buffer: - content = "\n".join(buffer).strip() - if content: - blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) - - return blocks, h1_title - -def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]: - """ - Standard-Strategie aus WP-15. - Fasst Blöcke zusammen und schneidet bei 'target' Tokens. - """ - target = config.get("target", 400) - max_tokens = config.get("max", 600) - overlap_val = config.get("overlap", (50, 80)) - overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val - chunks = [] - buf = [] - - def _create_chunk(txt, win, sec, path): - idx = len(chunks) - chunks.append(Chunk( - id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, - text=txt, window=win, token_count=estimate_tokens(txt), - section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None, - candidate_pool=[] - )) - - def flush_buffer(): - nonlocal buf - if not buf: return - - text_body = "\n\n".join([b.text for b in buf]) - sec_title = buf[-1].section_title if buf else None - sec_path = buf[-1].section_path if buf else "/" - win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body - - if estimate_tokens(text_body) <= max_tokens: - _create_chunk(text_body, win_body, sec_title, sec_path) - else: - sentences = split_sentences(text_body) - current_chunk_sents = [] - current_len = 0 - - for sent in sentences: - sent_len = estimate_tokens(sent) - if current_len + sent_len > target and current_chunk_sents: - c_txt = " ".join(current_chunk_sents) - c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt - _create_chunk(c_txt, c_win, sec_title, sec_path) - - overlap_sents = [] - ov_len = 0 - for s in reversed(current_chunk_sents): - if ov_len + estimate_tokens(s) < overlap: - overlap_sents.insert(0, s) - ov_len += estimate_tokens(s) - else: break - - current_chunk_sents = list(overlap_sents) - current_chunk_sents.append(sent) - current_len = ov_len + sent_len - else: - current_chunk_sents.append(sent) - current_len += sent_len - - if current_chunk_sents: - c_txt = " ".join(current_chunk_sents) - c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt - _create_chunk(c_txt, c_win, sec_title, sec_path) - buf = [] - - for b in blocks: - if b.kind == "heading": continue - current_buf_text = "\n\n".join([x.text for x in buf]) - if estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target: - flush_buffer() - buf.append(b) - if estimate_tokens(b.text) >= target: - flush_buffer() - - flush_buffer() - return chunks - -def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: - """ - Hybrid-Strategie v2.9 (Strict/Soft/Safety-Net). - """ - strict = config.get("strict_heading_split", False) - target = config.get("target", 400) - max_tokens = config.get("max", 600) - split_level = config.get("split_level", 2) - - chunks = [] - current_buf = [] - current_tokens = 0 - - def _flush(sec_title, sec_path): - nonlocal current_buf, current_tokens - if not current_buf: return - txt = "\n\n".join(current_buf) - win = f"# {doc_title}\n## {sec_title}\n{txt}".strip() if sec_title else txt - idx = len(chunks) - chunks.append(Chunk( - id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, - text=txt, window=win, token_count=estimate_tokens(txt), - section_title=sec_title, section_path=sec_path, - neighbors_prev=None, neighbors_next=None, - candidate_pool=[] - )) - current_buf = [] - current_tokens = 0 - - for b in blocks: - if b.kind == "heading": - # Hierarchie-Check: Split bei Überschriften oberhalb des Split-Levels - if b.level < split_level: - _flush(b.section_title, b.section_path) - elif b.level == split_level: - if strict or current_tokens >= target: - _flush(b.section_title, b.section_path) - continue - - block_tokens = estimate_tokens(b.text) - if current_tokens + block_tokens > max_tokens and current_buf: - _flush(b.section_title, b.section_path) - - current_buf.append(b.text) - current_tokens += block_tokens - - if current_buf: - last = blocks[-1] if blocks else None - _flush(last.section_title if last else None, last.section_path if last else "/") - - return chunks - -# ========================================== -# 4. ROBUST EDGE PARSING & PROPAGATION -# ========================================== - -def _parse_edges_robust(text: str) -> Set[str]: - """ - Findet Kanten im Text (Wikilinks, Inlines, Callouts). - Fix V3: Support für mehrzeilige Callouts. - """ - found_edges = set() - - # A. Inline [[rel:type|target]] - inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text) - for kind, target in inlines: - k = kind.strip().lower() - t = target.strip() - if k and t: found_edges.add(f"{k}:{t}") - - # B. Multiline Callouts Parsing (WP-15 Fix) - lines = text.split('\n') - current_edge_type = None - for line in lines: - stripped = line.strip() - callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped) - if callout_match: - current_edge_type = callout_match.group(1).strip().lower() - links = re.findall(r'\[\[([^\]]+)\]\]', stripped) - for l in links: - if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") - continue - - if current_edge_type and stripped.startswith('>'): - links = re.findall(r'\[\[([^\]]+)\]\]', stripped) - for l in links: - if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") - elif not stripped.startswith('>'): - current_edge_type = None - - return found_edges - -def _propagate_section_edges(chunks: List[Chunk], blocks: List[RawBlock]) -> List[Chunk]: - """ - WP-15b: Implementiert Edge-Inheritance. - Kanten aus Überschriften werden an untergeordnete Chunks vererbt. - """ - section_inheritance: Dict[str, Set[str]] = {} - - # 1. Sammeln aus den Heading-Blöcken - for b in blocks: - if b.kind == "heading": - edges = _parse_edges_robust(b.text) - if edges: - if b.section_path not in section_inheritance: - section_inheritance[b.section_path] = set() - section_inheritance[b.section_path].update(edges) - - # 2. Injektion in den Candidate-Pool - for ch in chunks: - inherited = section_inheritance.get(ch.section_path, set()) - for e_str in inherited: - kind, target = e_str.split(':', 1) - ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "inherited"}) - - return chunks - -# ========================================== -# 5. ORCHESTRATION (WP-15b) -# ========================================== - async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]: """ - Hauptfunktion zur Chunk-Generierung. - Baut den Candidate-Pool für die semantische Validierung auf. + Hauptfunktion zur Chunk-Generierung. Orchestriert die modularisierten Komponenten. + Sichert die Kompatibilität zum bestehenden Ingestion-Prozess. """ if config is None: config = get_chunk_config(note_type) @@ -395,51 +38,47 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op fm, body_text = extract_frontmatter_from_text(md_text) primary_strategy = config.get("strategy", "sliding_window") - # 1. Parsing & Splitting + # 1. Parsing blocks, doc_title = parse_blocks(md_text) + # 2. Splitting via Thread-Offloading if primary_strategy == "by_heading": - chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title) + chunks = await asyncio.to_thread(strategy_by_heading, blocks, config, note_id, doc_title) else: - chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title) + chunks = await asyncio.to_thread(strategy_sliding_window, blocks, config, note_id) if not chunks: return [] - # 2. WP-15b: Candidate Pool Vorbereitung - + # 3. WP-15b: Candidate Pool Vorbereitung # A. Edge Inheritance (Sektions-Propagation) - chunks = _propagate_section_edges(chunks, blocks) + chunks = propagate_section_edges(chunks, blocks) - # B. Explicit Edges (Direkt im Chunk-Text enthalten) + # B. Explicit Edges (Direkt im Chunk-Text) for ch in chunks: - explicit = _parse_edges_robust(ch.text) + explicit = parse_edges_robust(ch.text) for e_str in explicit: kind, target = e_str.split(':', 1) ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "explicit"}) - # C. Global "Unassigned Pool" Detection (Safety Net) - # Sucht nach einer Sektion "Unzugeordnete Kanten" im Body - unassigned_pool = set() + # C. Global Pool Detection (Sektion 'Unzugeordnete Kanten') pool_match = re.search(r'###?\s*(?:Unzugeordnete Kanten|Edge Pool|Candidates)\s*\n(.*?)(?:\n#|$)', body_text, re.DOTALL | re.IGNORECASE) if pool_match: - unassigned_pool = _parse_edges_robust(pool_match.group(1)) + unassigned = parse_edges_robust(pool_match.group(1)) for ch in chunks: - for e_str in unassigned_pool: + for e_str in unassigned: kind, target = e_str.split(':', 1) ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "global_pool"}) - # D. De-Duplikation des Pools + # D. Eindeutigkeit sicherstellen for ch in chunks: - seen = set() - unique_pool = [] + seen = set(); unique_pool = [] for cand in ch.candidate_pool: key = (cand["kind"], cand["to"]) if key not in seen: - seen.add(key) - unique_pool.append(cand) + seen.add(key); unique_pool.append(cand) ch.candidate_pool = unique_pool - # 3. Nachbarschafts-Verkettung (Struktur-Kanten) + # 4. Graph-Struktur (Nachbarschaft) for i, ch in enumerate(chunks): ch.neighbors_prev = chunks[i-1].id if i > 0 else None ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None diff --git a/app/core/chunking/__init__.py b/app/core/chunking/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/core/chunking/chunking_models.py b/app/core/chunking/chunking_models.py new file mode 100644 index 0000000..d64c4e7 --- /dev/null +++ b/app/core/chunking/chunking_models.py @@ -0,0 +1,31 @@ +""" +FILE: app/core/chunking/chunking_models.py +DESCRIPTION: Datenklassen für das Chunking-System. +""" +from dataclasses import dataclass, field +from typing import List, Dict, Optional, Any + +@dataclass +class RawBlock: + """Repräsentiert einen logischen Block aus dem Markdown-Parsing.""" + kind: str + text: str + level: Optional[int] + section_path: str + section_title: Optional[str] + +@dataclass +class Chunk: + """Das finale Chunk-Objekt für Embedding und Graph-Speicherung.""" + id: str + note_id: str + index: int + text: str + window: str + token_count: int + section_title: Optional[str] + section_path: str + neighbors_prev: Optional[str] + neighbors_next: Optional[str] + candidate_pool: List[Dict[str, Any]] = field(default_factory=list) + suggested_edges: Optional[List[str]] = None \ No newline at end of file diff --git a/app/core/chunking/chunking_parser.py b/app/core/chunking/chunking_parser.py new file mode 100644 index 0000000..0524484 --- /dev/null +++ b/app/core/chunking/chunking_parser.py @@ -0,0 +1,74 @@ +""" +FILE: app/core/chunking/chunking_parser.py +DESCRIPTION: Zerlegt Markdown in Blöcke und extrahiert Kanten-Strings. +""" +import re +from typing import List, Tuple, Set +from .chunking_models import RawBlock +from .chunking_utils import extract_frontmatter_from_text + +_WS = re.compile(r'\s+') +_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') + +def split_sentences(text: str) -> list[str]: + """Teilt Text in Sätze auf.""" + text = _WS.sub(' ', text.strip()) + if not text: return [] + return [p.strip() for p in _SENT_SPLIT.split(text) if p.strip()] + +def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: + """Zerlegt Text in logische Einheiten.""" + blocks = [] + h1_title = "Dokument"; section_path = "/"; current_h2 = None + fm, text_without_fm = extract_frontmatter_from_text(md_text) + h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) + if h1_match: h1_title = h1_match.group(1).strip() + lines = text_without_fm.split('\n') + buffer = [] + for line in lines: + stripped = line.strip() + if stripped.startswith('# '): continue + elif stripped.startswith('## '): + if buffer: + content = "\n".join(buffer).strip() + if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) + buffer = [] + current_h2 = stripped[3:].strip() + section_path = f"/{current_h2}" + blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2)) + elif not stripped: + if buffer: + content = "\n".join(buffer).strip() + if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) + buffer = [] + else: buffer.append(line) + if buffer: + content = "\n".join(buffer).strip() + if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) + return blocks, h1_title + +def parse_edges_robust(text: str) -> Set[str]: + """Extrahiert Kanten-Kandidaten (Wikilinks, Callouts).""" + found_edges = set() + inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text) + for kind, target in inlines: + k = kind.strip().lower() + t = target.strip() + if k and t: found_edges.add(f"{k}:{t}") + lines = text.split('\n') + current_edge_type = None + for line in lines: + stripped = line.strip() + callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped) + if callout_match: + current_edge_type = callout_match.group(1).strip().lower() + links = re.findall(r'\[\[([^\]]+)\]\]', stripped) + for l in links: + if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") + continue + if current_edge_type and stripped.startswith('>'): + links = re.findall(r'\[\[([^\]]+)\]\]', stripped) + for l in links: + if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") + elif not stripped.startswith('>'): current_edge_type = None + return found_edges \ No newline at end of file diff --git a/app/core/chunking/chunking_propagation.py b/app/core/chunking/chunking_propagation.py new file mode 100644 index 0000000..1aeb361 --- /dev/null +++ b/app/core/chunking/chunking_propagation.py @@ -0,0 +1,25 @@ +""" +FILE: app/core/chunking/chunking_propagation.py +DESCRIPTION: Vererbung von Kanten (Inheritance) über Sektions-Pfade. +""" +from typing import List, Dict, Set +from .chunking_models import Chunk, RawBlock +from .chunking_parser import parse_edges_robust + +def propagate_section_edges(chunks: List[Chunk], blocks: List[RawBlock]) -> List[Chunk]: + """WP-15b: Kanten aus Headings werden an Sub-Chunks vererbt.""" + section_inheritance: Dict[str, Set[str]] = {} + for b in blocks: + if b.kind == "heading": + edges = parse_edges_robust(b.text) + if edges: + if b.section_path not in section_inheritance: + section_inheritance[b.section_path] = set() + section_inheritance[b.section_path].update(edges) + + for ch in chunks: + inherited = section_inheritance.get(ch.section_path, set()) + for e_str in inherited: + kind, target = e_str.split(':', 1) + ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "inherited"}) + return chunks \ No newline at end of file diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py new file mode 100644 index 0000000..7684bd5 --- /dev/null +++ b/app/core/chunking/chunking_strategies.py @@ -0,0 +1,74 @@ +""" +FILE: app/core/chunking/chunking_strategies.py +DESCRIPTION: Implementierung der mathematischen Splitting-Strategien. +""" +from typing import List, Dict, Any +from .chunking_models import RawBlock, Chunk +from .chunking_utils import estimate_tokens +from .chunking_parser import split_sentences + +def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: + """Fasst Blöcke zusammen und schneidet bei 'target' Tokens.""" + target = config.get("target", 400); max_tokens = config.get("max", 600) + overlap_val = config.get("overlap", (50, 80)) + overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val + chunks = []; buf = [] + + def _add(txt, sec, path): + idx = len(chunks); win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt + chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None)) + + def flush(): + nonlocal buf + if not buf: return + text_body = "\n\n".join([b.text for b in buf]) + sec_title = buf[-1].section_title; sec_path = buf[-1].section_path + if estimate_tokens(text_body) <= max_tokens: _add(text_body, sec_title, sec_path) + else: + sents = split_sentences(text_body); cur_sents = []; cur_len = 0 + for s in sents: + slen = estimate_tokens(s) + if cur_len + slen > target and cur_sents: + _add(" ".join(cur_sents), sec_title, sec_path) + ov_s = []; ov_l = 0 + for os in reversed(cur_sents): + if ov_l + estimate_tokens(os) < overlap: ov_s.insert(0, os); ov_l += estimate_tokens(os) + else: break + cur_sents = list(ov_s); cur_sents.append(s); cur_len = ov_l + slen + else: cur_sents.append(s); cur_len += slen + if cur_sents: _add(" ".join(cur_sents), sec_title, sec_path) + buf = [] + + for b in blocks: + if b.kind == "heading": continue + if estimate_tokens("\n\n".join([x.text for x in buf])) + estimate_tokens(b.text) >= target: flush() + buf.append(b) + if estimate_tokens(b.text) >= target: flush() + flush() + return chunks + +def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: + """Splittet Text basierend auf Markdown-Überschriften.""" + strict = config.get("strict_heading_split", False); target = config.get("target", 400) + max_tokens = config.get("max", 600); split_level = config.get("split_level", 2) + chunks = []; buf = []; cur_tokens = 0 + + def _flush(title, path): + nonlocal buf, cur_tokens + if not buf: return + txt = "\n\n".join(buf); win = f"# {doc_title}\n## {title}\n{txt}".strip() if title else txt + idx = len(chunks) + chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=title, section_path=path, neighbors_prev=None, neighbors_next=None)) + buf = []; cur_tokens = 0 + + for b in blocks: + if b.kind == "heading": + if b.level < split_level: _flush(b.section_title, b.section_path) + elif b.level == split_level: + if strict or cur_tokens >= target: _flush(b.section_title, b.section_path) + continue + bt = estimate_tokens(b.text) + if cur_tokens + bt > max_tokens and buf: _flush(b.section_title, b.section_path) + buf.append(b.text); cur_tokens += bt + if buf: _flush(blocks[-1].section_title if blocks else None, blocks[-1].section_path if blocks else "/") + return chunks \ No newline at end of file diff --git a/app/core/chunking/chunking_utils.py b/app/core/chunking/chunking_utils.py new file mode 100644 index 0000000..da812aa --- /dev/null +++ b/app/core/chunking/chunking_utils.py @@ -0,0 +1,55 @@ +""" +FILE: app/core/chunking/chunking_utils.py +DESCRIPTION: Hilfswerkzeuge für Token-Schätzung und YAML-Konfiguration. +""" +import math +import yaml +import logging +from pathlib import Path +from typing import Dict, Any, Tuple + +logger = logging.getLogger(__name__) + +BASE_DIR = Path(__file__).resolve().parent.parent.parent.parent +CONFIG_PATH = BASE_DIR / "config" / "types.yaml" +DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)} + +_CONFIG_CACHE = None + +def load_yaml_config() -> Dict[str, Any]: + global _CONFIG_CACHE + if _CONFIG_CACHE is not None: return _CONFIG_CACHE + if not CONFIG_PATH.exists(): return {} + try: + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + _CONFIG_CACHE = data + return data + except Exception: return {} + +def get_chunk_config(note_type: str) -> Dict[str, Any]: + """Lädt die Chunking-Strategie basierend auf dem Note-Type.""" + full_config = load_yaml_config() + profiles = full_config.get("chunking_profiles", {}) + type_def = full_config.get("types", {}).get(note_type.lower(), {}) + profile_name = type_def.get("chunking_profile") or full_config.get("defaults", {}).get("chunking_profile", "sliding_standard") + config = profiles.get(profile_name, DEFAULT_PROFILE).copy() + if "overlap" in config and isinstance(config["overlap"], list): + config["overlap"] = tuple(config["overlap"]) + return config + +def estimate_tokens(text: str) -> int: + """Grobe Schätzung der Token-Anzahl.""" + return max(1, math.ceil(len(text.strip()) / 4)) + +def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]: + """Trennt YAML-Frontmatter vom Text.""" + import re + fm_match = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL) + if not fm_match: return {}, md_text + try: + frontmatter = yaml.safe_load(fm_match.group(1)) + if not isinstance(frontmatter, dict): frontmatter = {} + except Exception: frontmatter = {} + text_without_fm = re.sub(r'^\s*---\s*\n(.*?)\n---', '', md_text, flags=re.DOTALL) + return frontmatter, text_without_fm.strip() \ No newline at end of file diff --git a/app/core/ingestion.py b/app/core/ingestion.py index a5a80d8..a140178 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -1,373 +1,15 @@ """ FILE: app/core/ingestion.py -DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen. - WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free). - WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash. - WP-15b: Two-Pass Ingestion mit LocalBatchCache & Candidate-Validation. - Sichert, dass explizite Kanten direkt übernommen und nur Pool-Kanten validiert werden. -FIX: Deep Fallback Logic (v2.11.14) für JSON-Recovery. - Robust Lookup Fix: Adressiert Notizen im Cache via ID, Titel und Dateiname. -VERSION: 2.12.2 +DESCRIPTION: Facade für das Ingestion-Package. Stellt 100% Abwärtskompatibilität sicher. + WP-14: Modularisierung der Ingestion-Pipeline abgeschlossen. + Nutzt interne Module mit 'ingestion_' Präfix für maximale Wartbarkeit. +VERSION: 2.13.0 STATUS: Active -DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, - app.services.llm_service, app.services.edge_registry """ -import os -import json -import re -import logging -import asyncio -import time -from typing import Dict, List, Optional, Tuple, Any +# Export der Hauptklasse für externe Module (z.B. scripts/import_markdown.py) +from .ingestion.ingestion_processor import IngestionService -# Core Module Imports -from app.core.parser import ( - read_markdown, - pre_scan_markdown, - normalize_frontmatter, - validate_required_frontmatter, - extract_edges_with_context, - NoteContext -) -from app.core.note_payload import make_note_payload -from app.core.chunker import assemble_chunks, get_chunk_config -from app.core.chunk_payload import make_chunk_payloads +# Export der Hilfsfunktionen für Abwärtskompatibilität +from .ingestion.ingestion_utils import extract_json_from_response, load_type_registry -# Fallback für Edges -try: - from app.core.derive_edges import build_edges_for_note -except ImportError: - def build_edges_for_note(*args, **kwargs): return [] - -from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes -from app.core.qdrant_points import ( - points_for_chunks, - points_for_note, - points_for_edges, - upsert_batch, -) - -from app.services.embeddings_client import EmbeddingsClient -from app.services.edge_registry import registry as edge_registry -from app.services.llm_service import LLMService - -logger = logging.getLogger(__name__) - -# --- Global Helpers (Full Compatibility v2.11.14) --- -def extract_json_from_response(text: str) -> Any: - """ - Extrahiert JSON-Daten und bereinigt LLM-Steuerzeichen (Mistral/Llama). - Entfernt , [OUT], [/OUT] und Markdown-Blöcke für maximale Robustheit. - """ - if not text or not isinstance(text, str): - return [] - - # 1. Entferne Mistral/Llama Steuerzeichen und Tags - clean = text.replace("", "").replace("", "") - clean = clean.replace("[OUT]", "").replace("[/OUT]", "") - clean = clean.strip() - - # 2. Suche nach Markdown JSON-Blöcken (```json ... ```) - match = re.search(r"```(?:json)?\s*(.*?)\s*```", clean, re.DOTALL) - payload = match.group(1) if match else clean - - try: - return json.loads(payload.strip()) - except json.JSONDecodeError: - # 3. Recovery: Suche nach der ersten [ und letzten ] (Liste) - start = payload.find('[') - end = payload.rfind(']') + 1 - if start != -1 and end > start: - try: - return json.loads(payload[start:end]) - except: pass - - # 4. Zweite Recovery: Suche nach der ersten { und letzten } (Objekt) - start_obj = payload.find('{') - end_obj = payload.rfind('}') + 1 - if start_obj != -1 and end_obj > start_obj: - try: - return json.loads(payload[start_obj:end_obj]) - except: pass - - return [] - -def load_type_registry(custom_path: Optional[str] = None) -> dict: - """Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion.""" - import yaml - from app.config import get_settings - settings = get_settings() - path = custom_path or settings.MINDNET_TYPES_FILE - if not os.path.exists(path): return {} - try: - with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} - except Exception: return {} - -# --- Service Class --- -class IngestionService: - def __init__(self, collection_prefix: str = None): - from app.config import get_settings - self.settings = get_settings() - - self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX - self.cfg = QdrantConfig.from_env() - self.cfg.prefix = self.prefix - self.client = get_client(self.cfg) - self.dim = self.settings.VECTOR_SIZE - self.registry = load_type_registry() - self.embedder = EmbeddingsClient() - self.llm = LLMService() - - self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE - self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache - - try: - ensure_collections(self.client, self.prefix, self.dim) - ensure_payload_indexes(self.client, self.prefix) - except Exception as e: - logger.warning(f"DB init warning: {e}") - - async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: - """ - WP-15b: Implementiert den Two-Pass Ingestion Workflow. - Pass 1: Pre-Scan baut flüchtigen Kontext-Cache auf. - Pass 2: Processing führt die eigentliche semantische Validierung durch. - """ - logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") - for path in file_paths: - ctx = pre_scan_markdown(path) - if ctx: - # Mehrfache Indizierung für robusten Look-up (WP-15b Fix) - self.batch_cache[ctx.note_id] = ctx - self.batch_cache[ctx.title] = ctx - # Dateiname ohne Endung als dritter Schlüssel - fname = os.path.splitext(os.path.basename(path))[0] - self.batch_cache[fname] = ctx - - logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") - results = [] - for path in file_paths: - res = await self.process_file(path, vault_root, apply=True) - results.append(res) - return results - - async def _validate_candidate(self, chunk_text: str, edge: Dict) -> bool: - """ - WP-15b: Validiert einen Kanten-Kandidaten semantisch gegen das Ziel. - Nutzt den Cache aus Pass 1, um dem LLM Kontext der Ziel-Note zu geben. - """ - target_id = edge.get("to") - target_ctx = self.batch_cache.get(target_id) - - # Fallback Look-up für Links mit Ankern (Anchor entfernen) - if not target_ctx and "#" in target_id: - base_id = target_id.split("#")[0] - target_ctx = self.batch_cache.get(base_id) - - # Sicherheits-Fallback: Wenn Zielnotiz nicht im aktuellen Batch ist, - # lassen wir die Kante als 'explicit' durch (Hard-Link Integrity). - if not target_ctx: - logger.info(f"ℹ️ [VALIDATION SKIP] No context for '{target_id}' - allowing link.") - return True - - provider = self.settings.MINDNET_LLM_PROVIDER - template = self.llm.get_prompt("edge_validation", provider) - - try: - logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}'...") - prompt = template.format( - chunk_text=chunk_text[:1500], - target_title=target_ctx.title, - target_summary=target_ctx.summary, - edge_kind=edge.get("kind", "related_to") - ) - - response = await self.llm.generate_raw_response(prompt, priority="background") - is_valid = "YES" in response.upper() - - if is_valid: - logger.info(f"✅ [VALIDATED] Relation to '{target_id}' confirmed.") - else: - logger.info(f"🚫 [REJECTED] Relation to '{target_id}' irrelevant for this chunk.") - - return is_valid - except Exception as e: - logger.warning(f"⚠️ Semantic validation error for {target_id}: {e}") - return True # Fallback: Im Zweifel Link behalten - - def _resolve_note_type(self, requested: Optional[str]) -> str: - """Bestimmt den finalen Notiz-Typ (Fallback auf 'concept').""" - types = self.registry.get("types", {}) - if requested and requested in types: return requested - return "concept" - - def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]: - """Holt die Chunker-Parameter für ein spezifisches Profil aus der Registry.""" - profiles = self.registry.get("chunking_profiles", {}) - if profile_name in profiles: - cfg = profiles[profile_name].copy() - if "overlap" in cfg and isinstance(cfg["overlap"], list): - cfg["overlap"] = tuple(cfg["overlap"]) - return cfg - return get_chunk_config(note_type) - - async def process_file( - self, file_path: str, vault_root: str, - force_replace: bool = False, apply: bool = False, purge_before: bool = False, - note_scope_refs: bool = False, hash_source: str = "parsed", hash_normalize: str = "canonical" - ) -> Dict[str, Any]: - """Transformiert eine Markdown-Datei in den Graphen.""" - result = {"path": file_path, "status": "skipped", "changed": False, "error": None} - - # 1. Parse & Lifecycle Gate - try: - parsed = read_markdown(file_path) - if not parsed: return {**result, "error": "Empty file"} - fm = normalize_frontmatter(parsed.frontmatter) - validate_required_frontmatter(fm) - except Exception as e: - return {**result, "error": f"Validation failed: {str(e)}"} - - # Lifecycle Filter (WP-22) - status = fm.get("status", "draft").lower().strip() - if status in ["system", "template", "archive", "hidden"]: - return {**result, "status": "skipped", "reason": f"lifecycle_{status}"} - - # 2. Config Resolution & Payload - note_type = self._resolve_note_type(fm.get("type")) - fm["type"] = note_type - - try: - note_pl = make_note_payload(parsed, vault_root=vault_root, hash_normalize=hash_normalize, hash_source=hash_source, file_path=file_path) - note_id = note_pl["note_id"] - except Exception as e: - return {**result, "error": f"Payload failed: {str(e)}"} - - # 3. Change Detection (v2.11.14 Logic) - old_payload = None if force_replace else self._fetch_note_payload(note_id) - check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" - old_hash = (old_payload or {}).get("hashes", {}).get(check_key) - new_hash = note_pl.get("hashes", {}).get(check_key) - - chunks_missing, edges_missing = self._artifacts_missing(note_id) - should_write = force_replace or (not old_payload) or (old_hash != new_hash) or chunks_missing or edges_missing - - if not should_write: - return {**result, "status": "unchanged", "note_id": note_id} - - if not apply: - return {**result, "status": "dry-run", "changed": True, "note_id": note_id} - - # 4. Processing (Chunking, Embedding, Validated Edges) - try: - body_text = getattr(parsed, "body", "") or "" - edge_registry.ensure_latest() - - # Chunker Resolution - profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard" - chunk_cfg = self._get_chunk_config_by_profile(profile, note_type) - enable_smart_edges = chunk_cfg.get("enable_smart_edge_allocation", False) - - # WP-15b: Chunker bereitet nun den Candidate-Pool vor (inkl. Inheritance). - chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg) - - # WP-15b: Validierung NUR für Kandidaten aus dem global_pool (Unzugeordnete Kanten) - for ch_obj in chunks: - filtered_pool = [] - for cand in getattr(ch_obj, "candidate_pool", []): - # Nur 'global_pool' erfordert LLM-Validierung. - # 'explicit' und 'inherited' werden direkt akzeptiert. - if cand.get("provenance") == "global_pool" and enable_smart_edges: - if await self._validate_candidate(ch_obj.text, cand): - filtered_pool.append(cand) - else: - filtered_pool.append(cand) - ch_obj.candidate_pool = filtered_pool - - chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) - - # Embeddings generieren - vecs = [] - if chunk_pls: - texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] - vecs = await self.embedder.embed_documents(texts) - - # Kanten finalisieren via derive_edges Aggregator (WP-15b kompatibel) - # Nutzt das Provenance-Ranking (v2.1.0). - edges = build_edges_for_note( - note_id, - chunk_pls, - note_level_references=note_pl.get("references", []), - include_note_scope_refs=note_scope_refs - ) - - # Alias-Auflösung & Registry Enforcement - context = {"file": file_path, "note_id": note_id} - for e in edges: - e["kind"] = edge_registry.resolve( - edge_type=e.get("kind", "related_to"), - provenance=e.get("provenance", "explicit"), - context={**context, "line": e.get("line", "system")} - ) - - except Exception as e: - logger.error(f"Processing failed for {file_path}: {e}", exc_info=True) - return {**result, "error": f"Processing failed: {str(e)}"} - - # 5. DB Upsert - try: - if purge_before and old_payload: self._purge_artifacts(note_id) - - n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) - upsert_batch(self.client, n_name, n_pts) - - if chunk_pls and vecs: - # v2.11.14 Points-Extraction Logic - c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] - upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) - - if edges: - # v2.11.14 Points-Extraction Logic - e_pts = points_for_edges(self.prefix, edges)[1] - upsert_batch(self.client, f"{self.prefix}_edges", e_pts) - - return {"path": file_path, "status": "success", "changed": True, "note_id": note_id, "chunks_count": len(chunk_pls), "edges_count": len(edges)} - except Exception as e: - return {**result, "error": f"DB Upsert failed: {e}"} - - def _fetch_note_payload(self, note_id: str) -> Optional[dict]: - """Holt die Metadaten einer Note aus Qdrant.""" - from qdrant_client.http import models as rest - try: - f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - pts, _ = self.client.scroll(collection_name=f"{self.prefix}_notes", scroll_filter=f, limit=1, with_payload=True) - return pts[0].payload if pts else None - except: return None - - def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]: - """Prüft Qdrant aktiv auf vorhandene Chunks und Edges.""" - from qdrant_client.http import models as rest - try: - f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - c_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_chunks", scroll_filter=f, limit=1) - e_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_edges", scroll_filter=f, limit=1) - return (not bool(c_pts)), (not bool(e_pts)) - except: return True, True - - def _purge_artifacts(self, note_id: str): - """Löscht verwaiste Chunks/Edges vor einem Re-Import.""" - from qdrant_client.http import models as rest - f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - for suffix in ["chunks", "edges"]: - try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=rest.FilterSelector(filter=f)) - except: pass - - async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: - """Hilfsmethode zur Erstellung einer Note aus einem Textstream.""" - target_dir = os.path.join(vault_root, folder) - os.makedirs(target_dir, exist_ok=True) - file_path = os.path.join(target_dir, filename) - with open(file_path, "w", encoding="utf-8") as f: - f.write(markdown_content) - await asyncio.sleep(0.1) - return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file +__all__ = ["IngestionService", "extract_json_from_response", "load_type_registry"] \ No newline at end of file diff --git a/app/core/ingestion/__init__.py b/app/core/ingestion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/core/ingestion/ingestion_db.py b/app/core/ingestion/ingestion_db.py new file mode 100644 index 0000000..9acf096 --- /dev/null +++ b/app/core/ingestion/ingestion_db.py @@ -0,0 +1,31 @@ +""" +FILE: app/core/ingestion/ingestion_db.py +DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung. +""" +from typing import Optional, Tuple +from qdrant_client import QdrantClient +from qdrant_client.http import models as rest + +def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]: + """Holt die Metadaten einer Note aus Qdrant via Scroll.""" + try: + f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + pts, _ = client.scroll(collection_name=f"{prefix}_notes", scroll_filter=f, limit=1, with_payload=True) + return pts[0].payload if pts else None + except: return None + +def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]: + """Prüft Qdrant aktiv auf vorhandene Chunks und Edges.""" + try: + f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + c_pts, _ = client.scroll(collection_name=f"{prefix}_chunks", scroll_filter=f, limit=1) + e_pts, _ = client.scroll(collection_name=f"{prefix}_edges", scroll_filter=f, limit=1) + return (not bool(c_pts)), (not bool(e_pts)) + except: return True, True + +def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): + """Löscht verwaiste Chunks/Edges vor einem Re-Import.""" + f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + for suffix in ["chunks", "edges"]: + try: client.delete(collection_name=f"{prefix}_{suffix}", points_selector=rest.FilterSelector(filter=f)) + except: pass \ No newline at end of file diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py new file mode 100644 index 0000000..06c292d --- /dev/null +++ b/app/core/ingestion/ingestion_processor.py @@ -0,0 +1,152 @@ +""" +FILE: app/core/ingestion/ingestion_processor.py +DESCRIPTION: Orchestriert den Ingestion-Prozess (Parsing -> Chunking -> Validierung -> DB). +""" +import logging +import asyncio +from typing import Dict, List, Optional, Tuple, Any + +from app.core.parser import ( + read_markdown, pre_scan_markdown, normalize_frontmatter, + validate_required_frontmatter, NoteContext +) +from app.core.note_payload import make_note_payload +from app.core.chunker import assemble_chunks +from app.core.chunk_payload import make_chunk_payloads +from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes +from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch + +from app.services.embeddings_client import EmbeddingsClient +from app.services.edge_registry import registry as edge_registry +from app.services.llm_service import LLMService + +# Package-Interne Imports +from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile +from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts +from .ingestion_validation import validate_edge_candidate + +# Fallback für Edges +try: + from app.core.derive_edges import build_edges_for_note +except ImportError: + def build_edges_for_note(*args, **kwargs): return [] + +logger = logging.getLogger(__name__) + +class IngestionService: + def __init__(self, collection_prefix: str = None): + from app.config import get_settings + self.settings = get_settings() + self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX + self.cfg = QdrantConfig.from_env() + self.cfg.prefix = self.prefix + self.client = get_client(self.cfg) + self.dim = self.settings.VECTOR_SIZE + self.registry = load_type_registry() + self.embedder = EmbeddingsClient() + self.llm = LLMService() + self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE + self.batch_cache: Dict[str, NoteContext] = {} + + try: + ensure_collections(self.client, self.prefix, self.dim) + ensure_payload_indexes(self.client, self.prefix) + except Exception as e: logger.warning(f"DB init warning: {e}") + + async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: + """WP-15b: Two-Pass Ingestion Workflow.""" + logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") + for path in file_paths: + ctx = pre_scan_markdown(path) + if ctx: + self.batch_cache[ctx.note_id] = ctx + self.batch_cache[ctx.title] = ctx + import os + fname = os.path.splitext(os.path.basename(path))[0] + self.batch_cache[fname] = ctx + + logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") + return [await self.process_file(p, vault_root, apply=True) for p in file_paths] + + async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: + """Transformiert eine Markdown-Datei in den Graphen.""" + apply = kwargs.get("apply", False) + force_replace = kwargs.get("force_replace", False) + purge_before = kwargs.get("purge_before", False) + hash_source = kwargs.get("hash_source", "parsed") + hash_normalize = kwargs.get("hash_normalize", "canonical") + + result = {"path": file_path, "status": "skipped", "changed": False, "error": None} + + # 1. Parse & Lifecycle + try: + parsed = read_markdown(file_path) + if not parsed: return {**result, "error": "Empty file"} + fm = normalize_frontmatter(parsed.frontmatter) + validate_required_frontmatter(fm) + except Exception as e: return {**result, "error": f"Validation failed: {str(e)}"} + + if fm.get("status", "draft").lower().strip() in ["system", "template", "archive", "hidden"]: + return {**result, "status": "skipped", "reason": "lifecycle_filter"} + + # 2. Payload & Change Detection + note_type = resolve_note_type(self.registry, fm.get("type")) + note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, hash_source=hash_source, hash_normalize=hash_normalize) + note_id = note_pl["note_id"] + + old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) + check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" + old_hash = (old_payload or {}).get("hashes", {}).get(check_key) + new_hash = note_pl.get("hashes", {}).get(check_key) + + c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) + if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss): + return {**result, "status": "unchanged", "note_id": note_id} + + if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} + + # 3. Processing + try: + body_text = getattr(parsed, "body", "") or "" + edge_registry.ensure_latest() + profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard" + chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) + enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) + + chunks = await assemble_chunks(fm["id"], body_text, note_type, config=chunk_cfg) + for ch in chunks: + filtered = [] + for cand in getattr(ch, "candidate_pool", []): + if cand.get("provenance") == "global_pool" and enable_smart: + if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, self.settings.MINDNET_LLM_PROVIDER): + filtered.append(cand) + else: filtered.append(cand) + ch.candidate_pool = filtered + + chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) + vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] + + edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", [])) + for e in edges: + e["kind"] = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"), context={"file": file_path, "note_id": note_id}) + + # 4. DB Upsert + if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) + n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) + upsert_batch(self.client, n_name, n_pts) + if chunk_pls and vecs: upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1]) + if edges: upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, edges)[1]) + + return {"path": file_path, "status": "success", "changed": True, "note_id": note_id, "chunks_count": len(chunk_pls), "edges_count": len(edges)} + except Exception as e: + logger.error(f"Processing failed: {e}", exc_info=True) + return {**result, "error": str(e)} + + async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: + import os + target_dir = os.path.join(vault_root, folder) + os.makedirs(target_dir, exist_ok=True) + file_path = os.path.join(target_dir, filename) + with open(file_path, "w", encoding="utf-8") as f: f.write(markdown_content) + await asyncio.sleep(0.1) + return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file diff --git a/app/core/ingestion/ingestion_utils.py b/app/core/ingestion/ingestion_utils.py new file mode 100644 index 0000000..dadba30 --- /dev/null +++ b/app/core/ingestion/ingestion_utils.py @@ -0,0 +1,69 @@ +""" +FILE: app/core/ingestion/ingestion_utils.py +DESCRIPTION: Hilfswerkzeuge für JSON-Recovery, Typ-Registry und Konfigurations-Lookups. +""" +import os +import json +import re +import yaml +from typing import Any, Optional, Dict + +def extract_json_from_response(text: str) -> Any: + """ + Extrahiert JSON-Daten und bereinigt LLM-Steuerzeichen (v2.11.14 Logic). + Entfernt , [OUT], [/OUT] und Markdown-Blöcke für maximale Robustheit. + """ + if not text or not isinstance(text, str): + return [] + + clean = text.replace("", "").replace("", "") + clean = clean.replace("[OUT]", "").replace("[/OUT]", "") + clean = clean.strip() + + match = re.search(r"```(?:json)?\s*(.*?)\s*```", clean, re.DOTALL) + payload = match.group(1) if match else clean + + try: + return json.loads(payload.strip()) + except json.JSONDecodeError: + # Recovery: Suche nach Liste + start = payload.find('[') + end = payload.rfind(']') + 1 + if start != -1 and end > start: + try: return json.loads(payload[start:end]) + except: pass + + # Recovery: Suche nach Objekt + start_obj = payload.find('{') + end_obj = payload.rfind('}') + 1 + if start_obj != -1 and end_obj > start_obj: + try: return json.loads(payload[start_obj:end_obj]) + except: pass + return [] + +def load_type_registry(custom_path: Optional[str] = None) -> dict: + """Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion.""" + from app.config import get_settings + settings = get_settings() + path = custom_path or settings.MINDNET_TYPES_FILE + if not os.path.exists(path): return {} + try: + with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} + except Exception: return {} + +def resolve_note_type(registry: dict, requested: Optional[str]) -> str: + """Bestimmt den finalen Notiz-Typ (Fallback auf 'concept').""" + types = registry.get("types", {}) + if requested and requested in types: return requested + return "concept" + +def get_chunk_config_by_profile(registry: dict, profile_name: str, note_type: str) -> Dict[str, Any]: + """Holt die Chunker-Parameter für ein spezifisches Profil aus der Registry.""" + from app.core.chunker import get_chunk_config + profiles = registry.get("chunking_profiles", {}) + if profile_name in profiles: + cfg = profiles[profile_name].copy() + if "overlap" in cfg and isinstance(cfg["overlap"], list): + cfg["overlap"] = tuple(cfg["overlap"]) + return cfg + return get_chunk_config(note_type) \ No newline at end of file diff --git a/app/core/ingestion/ingestion_validation.py b/app/core/ingestion/ingestion_validation.py new file mode 100644 index 0000000..038eebf --- /dev/null +++ b/app/core/ingestion/ingestion_validation.py @@ -0,0 +1,53 @@ +""" +FILE: app/core/ingestion/ingestion_validation.py +DESCRIPTION: WP-15b semantische Validierung von Kanten gegen den LocalBatchCache. +""" +import logging +from typing import Dict, Any +from app.core.parser import NoteContext + +logger = logging.getLogger(__name__) + +async def validate_edge_candidate( + chunk_text: str, + edge: Dict, + batch_cache: Dict[str, NoteContext], + llm_service: Any, + provider: str +) -> bool: + """WP-15b: Validiert einen Kandidaten semantisch gegen das Ziel im Cache.""" + target_id = edge.get("to") + target_ctx = batch_cache.get(target_id) + + # Robust Lookup Fix (v2.12.2): Support für Anker + if not target_ctx and "#" in target_id: + base_id = target_id.split("#")[0] + target_ctx = batch_cache.get(base_id) + + # Sicherheits-Fallback (Hard-Link Integrity) + if not target_ctx: + logger.info(f"ℹ️ [VALIDATION SKIP] No context for '{target_id}' - allowing link.") + return True + + template = llm_service.get_prompt("edge_validation", provider) + + try: + logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}'...") + prompt = template.format( + chunk_text=chunk_text[:1500], + target_title=target_ctx.title, + target_summary=target_ctx.summary, + edge_kind=edge.get("kind", "related_to") + ) + + response = await llm_service.generate_raw_response(prompt, priority="background") + is_valid = "YES" in response.upper() + + if is_valid: + logger.info(f"✅ [VALIDATED] Relation to '{target_id}' confirmed.") + else: + logger.info(f"🚫 [REJECTED] Relation to '{target_id}' irrelevant for this chunk.") + return is_valid + except Exception as e: + logger.warning(f"⚠️ Validation error for {target_id}: {e}") + return True \ No newline at end of file