# mindnet/app/core/chunking/chunking_parser.py
#
# 115 lines
# 4.4 KiB
# Python

"""
FILE: app/core/chunking/chunking_parser.py
DESCRIPTION: Zerlegt Markdown in Blöcke. Erhält H1-Überschriften und
gewährleistet die Integrität von Callouts und Listen.
"""
import re
from typing import List, Tuple, Set
from .chunking_models import RawBlock
from .chunking_utils import extract_frontmatter_from_text
# Any run of whitespace (spaces, tabs, newlines); used to collapse it to one blank.
_WS = re.compile(r'\s+')
# Sentence boundary: whitespace preceded by '.', '!' or '?' and followed by an
# upper-case letter (A-Z, Ä, Ö, Ü), a digit, a low opening quote („) or '('.
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')


def split_sentences(text: str) -> list[str]:
    """Split *text* into sentences.

    Whitespace is normalised first (leading/trailing removed, inner runs
    collapsed to a single space); the result is then cut at every
    ``_SENT_SPLIT`` boundary.

    Args:
        text: Arbitrary text, may span several lines.

    Returns:
        The non-empty, stripped sentences in their original order; an empty
        list when *text* contains nothing but whitespace.
    """
    normalized = _WS.sub(' ', text.strip())
    if not normalized:
        return []
    return [
        sentence
        for fragment in _SENT_SPLIT.split(normalized)
        if (sentence := fragment.strip())
    ]
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """Split Markdown text into an ordered list of ``RawBlock`` items.

    The frontmatter is removed with ``extract_frontmatter_from_text`` and the
    remaining text is scanned line by line:

    * ATX headings (``#`` to ``######``) become ``"heading"`` blocks; H1 is
      included. H1 resets the section path to ``"/"``, H2 sets it to
      ``"/<title>"``; deeper levels leave path and section title untouched.
    * A blank line or a ``---`` line closes the paragraph collected so far;
      ``---`` additionally emits a ``"separator"`` block.
    * Every other line is buffered and emitted as part of a ``"paragraph"``
      block. Callout lines such as a bare ``>`` are not blank, so they stay
      inside the surrounding paragraph.

    Args:
        md_text: Raw Markdown text, optionally starting with frontmatter.

    Returns:
        A tuple ``(blocks, h1_title)``. ``h1_title`` is the text of the first
        non-empty H1 that starts in column 0, or ``"Dokument"`` if there is
        none.
    """
    blocks: List[RawBlock] = []
    h1_title = "Dokument"
    section_path = "/"
    current_section_title = None

    # Only the body is parsed here; the frontmatter data itself is not needed.
    _, text_without_fm = extract_frontmatter_from_text(md_text)

    # Note title from the first H1. The separator after '#' is restricted to
    # blanks/tabs and the title must start with a non-space character, so the
    # match can neither run across a line break (a bare "#" line would
    # otherwise pick up the following line as title) nor produce an empty title.
    h1_match = re.search(r'^#[ \t]+(\S.*)', text_without_fm, re.MULTILINE)
    if h1_match:
        h1_title = h1_match.group(1).strip()

    # NOTE(review): fenced code blocks are not tracked, so a line like
    # "# comment" inside a fence is still treated as a heading.
    heading_re = re.compile(r'^(#{1,6})\s+(.*)')
    buffer: List[str] = []

    def flush_paragraph() -> None:
        """Emit the buffered lines as one paragraph block and clear the buffer."""
        content = "\n".join(buffer).strip()
        if content:
            # Reads the section metadata valid at the time of the call.
            blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title))
        buffer.clear()

    for line in text_without_fm.split('\n'):
        stripped = line.strip()

        heading_match = heading_re.match(stripped)
        if heading_match:
            # The text collected so far still belongs to the previous section.
            flush_paragraph()
            level = len(heading_match.group(1))
            title = heading_match.group(2).strip()
            # Update path and title used as metadata for the following blocks.
            if level == 1:
                current_section_title = title
                section_path = "/"
            elif level == 2:
                current_section_title = title
                section_path = f"/{current_section_title}"
            # The heading itself is kept as a block (H1 is not filtered out).
            blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title))
            continue

        # Blank lines and horizontal rules terminate the current paragraph.
        if not stripped or stripped == "---":
            flush_paragraph()
            if stripped == "---":
                blocks.append(RawBlock("separator", "---", None, section_path, current_section_title))
        else:
            buffer.append(line)

    # Trailing text without a terminating blank line.
    flush_paragraph()
    return blocks, h1_title
def parse_edges_robust(text: str) -> Set[str]:
    """Extract edge candidates from inline rel-links and edge callouts.

    Two notations are recognised:

    1. Inline links ``[[rel:kind|target]]`` anywhere in *text*.
    2. Callouts opened by ``> [!edge] kind``: every plain wikilink
       ``[[target]]`` on the header line and on the directly following
       ``>`` lines is attributed to ``kind``. The first line that does not
       start with ``>`` ends the callout.

    Args:
        text: Markdown text to scan.

    Returns:
        A set of ``"kind:target"`` strings. ``kind`` is lower-cased, ``target``
        is stripped of surrounding whitespace; entries with an empty kind or
        target are skipped.
    """
    found_edges: Set[str] = set()

    # 1. Inline wikilinks [[rel:kind|target]]
    for kind, target in re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text):
        edge_kind = kind.strip().lower()
        edge_target = target.strip()
        if edge_kind and edge_target:
            found_edges.add(f"{edge_kind}:{edge_target}")

    # 2. Callout edges: > [!edge] kind
    callout_re = re.compile(r'>\s*\[!edge\]\s*([^:\s]+)')
    wikilink_re = re.compile(r'\[\[([^\]]+)\]\]')
    current_edge_type = None
    for line in text.split('\n'):
        stripped = line.strip()

        callout_match = callout_re.match(stripped)
        if callout_match:
            current_edge_type = callout_match.group(1).strip().lower()
        elif not stripped.startswith('>'):
            # Leaving the blockquote ends the callout.
            current_edge_type = None

        if current_edge_type is None:
            continue

        # Links on the header line and on continuation lines are handled alike.
        for link in wikilink_re.findall(stripped):
            link_target = link.strip()
            # Inline rel-links were already handled in step 1. Test the prefix
            # only: a substring test would also drop ordinary targets that
            # merely contain "rel:" (e.g. "Quarrel: ...").
            if link_target and not link_target.startswith("rel:"):
                found_edges.add(f"{current_edge_type}:{link_target}")

    return found_edges