135 lines
5.2 KiB
Python
135 lines
5.2 KiB
Python
"""
|
|
FILE: app/core/graph/graph_extractors.py
|
|
DESCRIPTION: Regex-basierte Extraktion von Relationen aus Text.
|
|
AUDIT:
|
|
- FIX: extract_callout_relations stoppt nun korrekt bei neuem Header.
|
|
- Regex für Wikilinks liberalisiert (Umlaute, Sonderzeichen).
|
|
"""
|
|
import re
|
|
from typing import List, Tuple
|
|
|
|
# Erlaube alle Zeichen außer ']' im Target (fängt Umlaute, Emojis, '&', '#' ab)
|
|
_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([^\]]+)\]\]")
|
|
|
|
_REL_PIPE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s*\|\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
|
_REL_SPACE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s+(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
|
_REL_TEXT = re.compile(r"rel\s*:\s*(?P<kind>[a-z_]+)\s*\[\[\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
|
|
|
_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE)
|
|
_REL_LINE = re.compile(r"^(?P<kind>[a-z_]+)\s*:\s*(?P<targets>.+?)\s*$", re.IGNORECASE)
|
|
_SIMPLE_KIND = re.compile(r"^[a-z_\-]+$", re.IGNORECASE)
|
|
|
|
def extract_typed_relations(text: str) -> Tuple[List[Tuple[str, str]], str]:
|
|
"""
|
|
Findet Inline-Relationen wie [[rel:depends_on Target]].
|
|
Gibt (Liste[(kind, target)], bereinigter_text) zurück.
|
|
"""
|
|
if not text: return [], ""
|
|
pairs = []
|
|
def _collect(m):
|
|
k, t = m.group("kind").strip().lower(), m.group("target").strip()
|
|
pairs.append((k, t))
|
|
return ""
|
|
text = _REL_PIPE.sub(_collect, text)
|
|
text = _REL_SPACE.sub(_collect, text)
|
|
text = _REL_TEXT.sub(_collect, text)
|
|
return pairs, text
|
|
|
|
def extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]:
|
|
"""
|
|
Verarbeitet Obsidian [!edge]-Callouts.
|
|
Stoppt korrekt, wenn ein neuer Header innerhalb eines Blocks gefunden wird.
|
|
"""
|
|
if not text: return [], text
|
|
lines = text.splitlines()
|
|
out_pairs = []
|
|
keep_lines = []
|
|
i = 0
|
|
|
|
while i < len(lines):
|
|
line = lines[i]
|
|
|
|
# 1. Start eines Blocks erkannt
|
|
m = _CALLOUT_START.match(line)
|
|
if m:
|
|
block_lines = []
|
|
header_raw = m.group(1).strip()
|
|
if header_raw:
|
|
block_lines.append(header_raw)
|
|
|
|
i += 1
|
|
# Sammle Folgezeilen, solange sie mit '>' beginnen UND KEIN neuer Header sind
|
|
while i < len(lines) and lines[i].lstrip().startswith('>'):
|
|
# STOP-CHECK: Ist das ein neuer Header?
|
|
if _CALLOUT_START.match(lines[i]):
|
|
break # Breche inneren Loop ab -> Outer Loop behandelt den neuen Header
|
|
|
|
content = lines[i].lstrip()[1:].lstrip()
|
|
if content:
|
|
block_lines.append(content)
|
|
i += 1
|
|
|
|
_process_block(block_lines, out_pairs)
|
|
continue # Weiter im Outer Loop (i steht jetzt auf dem nächsten Header oder Text)
|
|
|
|
# 2. "Headless" Block / Zerschnittener Chunk
|
|
# Wenn Zeile mit '>' beginnt, Links hat, aber wir nicht in einem Header-Block sind
|
|
if line.lstrip().startswith('>'):
|
|
if _WIKILINK_RE.search(line):
|
|
block_lines = []
|
|
# Sammeln bis Ende oder neuer Header
|
|
while i < len(lines) and lines[i].lstrip().startswith('>'):
|
|
if _CALLOUT_START.match(lines[i]):
|
|
break
|
|
|
|
content = lines[i].lstrip()[1:].lstrip()
|
|
if content:
|
|
block_lines.append(content)
|
|
i += 1
|
|
|
|
# Als 'related_to' retten, falls Typ fehlt
|
|
_process_block(block_lines, out_pairs, default_kind="related_to")
|
|
continue
|
|
|
|
keep_lines.append(line)
|
|
i += 1
|
|
|
|
return out_pairs, "\n".join(keep_lines)
|
|
|
|
def _process_block(lines: List[str], out_pairs: List[Tuple[str, str]], default_kind: str = None):
|
|
"""Parsen eines isolierten Blocks."""
|
|
current_kind = default_kind
|
|
|
|
if lines:
|
|
first = lines[0]
|
|
# Ist die erste Zeile ein Typ? (z.B. "based_on")
|
|
if not _REL_LINE.match(first) and _SIMPLE_KIND.match(first):
|
|
current_kind = first.lower()
|
|
|
|
for bl in lines:
|
|
# Format "kind: [[Target]]"
|
|
mrel = _REL_LINE.match(bl)
|
|
if mrel:
|
|
k = mrel.group("kind").strip().lower()
|
|
targets = mrel.group("targets")
|
|
found = _WIKILINK_RE.findall(targets)
|
|
if found:
|
|
for t in found: out_pairs.append((k, t.strip()))
|
|
else:
|
|
for raw in re.split(r"[,;]", targets):
|
|
if raw.strip(): out_pairs.append((k, raw.strip()))
|
|
continue
|
|
|
|
# Format "[[Target]]" (nutzt current_kind)
|
|
found = _WIKILINK_RE.findall(bl)
|
|
if found:
|
|
if current_kind:
|
|
for t in found: out_pairs.append((current_kind, t.strip()))
|
|
else:
|
|
# Fallback ohne Typ
|
|
for t in found: out_pairs.append(("related_to", t.strip()))
|
|
|
|
def extract_wikilinks(text: str) -> List[str]:
|
|
"""Findet Standard-Wikilinks."""
|
|
if not text: return []
|
|
return [m.strip() for m in _WIKILINK_RE.findall(text) if m.strip()] |