"""
FILE: app/core/chunking/chunking_parser.py

DESCRIPTION: Zerlegt Markdown in Blöcke. Hält H1-Überschriften im Stream.
"""
|
|
import re
|
|
from typing import List, Tuple, Set
|
|
from .chunking_models import RawBlock
|
|
from .chunking_utils import extract_frontmatter_from_text
|
|
|
|
# Collapses any whitespace run to a single space.
_WS = re.compile(r'\s+')
# Sentence boundary: ./!/? followed by whitespace and an uppercase letter,
# digit, German opening quote or an opening parenthesis.
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')


def split_sentences(text: str) -> list[str]:
    """Split *text* into sentences.

    Whitespace is normalized first; empty or whitespace-only input
    yields an empty list. Each returned sentence is stripped.
    """
    normalized = _WS.sub(' ', text.strip())
    if not normalized:
        return []
    sentences = []
    for piece in _SENT_SPLIT.split(normalized):
        piece = piece.strip()
        if piece:
            sentences.append(piece)
    return sentences
|
|
|
|
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """Split markdown text into logical units, keeping every heading in the stream.

    Returns a tuple ``(blocks, h1_title)`` where *blocks* is the ordered list of
    ``RawBlock`` instances ("heading" / "paragraph") and *h1_title* is the text
    of the first H1 heading, or ``"Dokument"`` if none exists.
    """
    blocks: List[RawBlock] = []
    h1_title = "Dokument"
    section_path = "/"
    current_section_title = None

    # Frontmatter is stripped before parsing; its content is not needed here.
    _fm, text_without_fm = extract_frontmatter_from_text(md_text)

    # Extract the first H1 as the note title (metadata only; the heading itself
    # still appears in the block stream below).
    h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
    if h1_match:
        h1_title = h1_match.group(1).strip()

    buffer: List[str] = []

    def flush() -> None:
        # Close the pending paragraph buffer; skip if it holds no visible text.
        content = "\n".join(buffer).strip()
        if content:
            blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title))
        buffer.clear()

    for line in text_without_fm.split('\n'):
        stripped = line.strip()

        # Heading detection (H1 through H6).
        heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped)
        if heading_match:
            flush()
            level = len(heading_match.group(1))
            title = heading_match.group(2).strip()

            # Only H1/H2 update the section path; deeper headings (H3-H6)
            # inherit the current path and title.
            if level == 1:
                current_section_title = title
                section_path = "/"
            elif level == 2:
                current_section_title = title
                section_path = f"/{current_section_title}"

            blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title))
            continue

        # Blank lines separate paragraph blocks.
        # NOTE(review): the original also tested `not line.startswith('>')`
        # (intent: keep blanks inside callouts), but a line whose strip() is
        # empty can never start with '>' — the condition was vacuous and has
        # been dropped without behavior change.
        if not stripped:
            flush()
        else:
            buffer.append(line)

    flush()
    return blocks, h1_title
|
|
|
|
def parse_edges_robust(text: str) -> Set[str]:
    """Extract edge candidates ("kind:target") from wikilinks and callouts.

    Two syntaxes are recognized:
      * inline:  ``[[rel:kind|target]]`` anywhere in the text
      * callout: a ``> [!edge] kind`` header line followed by quoted
        (``>``-prefixed) lines containing plain ``[[target]]`` wikilinks
    """
    edges: Set[str] = set()

    # Inline form: [[rel:kind|target]] — both kind and target must be non-empty.
    for raw_kind, raw_target in re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text):
        kind = raw_kind.strip().lower()
        target = raw_target.strip()
        if kind and target:
            edges.add(f"{kind}:{target}")

    # Callout form: the edge type applies to links on the header line and on
    # every following '>'-quoted line, until a non-quoted line ends the callout.
    edge_type = None
    for raw_line in text.split('\n'):
        line = raw_line.strip()
        header = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', line)
        if header:
            edge_type = header.group(1).strip().lower()
            for target in re.findall(r'\[\[([^\]]+)\]\]', line):
                if "rel:" not in target:
                    edges.add(f"{edge_type}:{target}")
        elif edge_type and line.startswith('>'):
            for target in re.findall(r'\[\[([^\]]+)\]\]', line):
                if "rel:" not in target:
                    edges.add(f"{edge_type}:{target}")
        elif not line.startswith('>'):
            # Any non-quoted line closes the current callout.
            edge_type = None

    return edges