WP4d #16
|
|
@ -2,8 +2,8 @@
|
||||||
FILE: app/core/graph/graph_extractors.py
|
FILE: app/core/graph/graph_extractors.py
|
||||||
DESCRIPTION: Regex-basierte Extraktion von Relationen aus Text.
|
DESCRIPTION: Regex-basierte Extraktion von Relationen aus Text.
|
||||||
AUDIT:
|
AUDIT:
|
||||||
- FIX: extract_callout_relations stoppt nun korrekt bei neuem Header.
|
|
||||||
- Regex für Wikilinks liberalisiert (Umlaute, Sonderzeichen).
|
- Regex für Wikilinks liberalisiert (Umlaute, Sonderzeichen).
|
||||||
|
- Callout-Parser erweitert für Multi-Line-Listen und Header-Typen.
|
||||||
"""
|
"""
|
||||||
import re
|
import re
|
||||||
from typing import List, Tuple
|
from typing import List, Tuple
|
||||||
|
|
@ -16,8 +16,10 @@ _REL_SPACE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s+(?P<target>[^\]]+?)\s*\
|
||||||
_REL_TEXT = re.compile(r"rel\s*:\s*(?P<kind>[a-z_]+)\s*\[\[\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
_REL_TEXT = re.compile(r"rel\s*:\s*(?P<kind>[a-z_]+)\s*\[\[\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
||||||
|
|
||||||
_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE)
|
_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE)
|
||||||
|
# Erkennt "kind: targets..."
|
||||||
_REL_LINE = re.compile(r"^(?P<kind>[a-z_]+)\s*:\s*(?P<targets>.+?)\s*$", re.IGNORECASE)
|
_REL_LINE = re.compile(r"^(?P<kind>[a-z_]+)\s*:\s*(?P<targets>.+?)\s*$", re.IGNORECASE)
|
||||||
_SIMPLE_KIND = re.compile(r"^[a-z_\-]+$", re.IGNORECASE)
|
# Erkennt reine Typen (z.B. "depends_on" im Header)
|
||||||
|
_SIMPLE_KIND = re.compile(r"^[a-z_]+$", re.IGNORECASE)
|
||||||
|
|
||||||
def extract_typed_relations(text: str) -> Tuple[List[Tuple[str, str]], str]:
|
def extract_typed_relations(text: str) -> Tuple[List[Tuple[str, str]], str]:
|
||||||
"""
|
"""
|
||||||
|
|
@ -38,7 +40,9 @@ def extract_typed_relations(text: str) -> Tuple[List[Tuple[str, str]], str]:
|
||||||
def extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]:
|
def extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]:
|
||||||
"""
|
"""
|
||||||
Verarbeitet Obsidian [!edge]-Callouts.
|
Verarbeitet Obsidian [!edge]-Callouts.
|
||||||
Stoppt korrekt, wenn ein neuer Header innerhalb eines Blocks gefunden wird.
|
Unterstützt zwei Formate:
|
||||||
|
1. Explizit: "kind: [[Target]]"
|
||||||
|
2. Implizit (Header): "> [!edge] kind" gefolgt von "[[Target]]" Zeilen
|
||||||
"""
|
"""
|
||||||
if not text: return [], text
|
if not text: return [], text
|
||||||
lines = text.splitlines()
|
lines = text.splitlines()
|
||||||
|
|
@ -48,88 +52,76 @@ def extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]:
|
||||||
|
|
||||||
while i < len(lines):
|
while i < len(lines):
|
||||||
line = lines[i]
|
line = lines[i]
|
||||||
|
|
||||||
# 1. Start eines Blocks erkannt
|
|
||||||
m = _CALLOUT_START.match(line)
|
m = _CALLOUT_START.match(line)
|
||||||
if m:
|
if not m:
|
||||||
|
keep_lines.append(line)
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Callout-Block gefunden. Wir sammeln alle relevanten Zeilen.
|
||||||
block_lines = []
|
block_lines = []
|
||||||
|
|
||||||
|
# Header Content prüfen (z.B. "type" aus "> [!edge] type")
|
||||||
header_raw = m.group(1).strip()
|
header_raw = m.group(1).strip()
|
||||||
if header_raw:
|
if header_raw:
|
||||||
block_lines.append(header_raw)
|
block_lines.append(header_raw)
|
||||||
|
|
||||||
i += 1
|
i += 1
|
||||||
# Sammle Folgezeilen, solange sie mit '>' beginnen UND KEIN neuer Header sind
|
|
||||||
while i < len(lines) and lines[i].lstrip().startswith('>'):
|
while i < len(lines) and lines[i].lstrip().startswith('>'):
|
||||||
# STOP-CHECK: Ist das ein neuer Header?
|
# Entferne '>' und führende Leerzeichen
|
||||||
if _CALLOUT_START.match(lines[i]):
|
|
||||||
break # Breche inneren Loop ab -> Outer Loop behandelt den neuen Header
|
|
||||||
|
|
||||||
content = lines[i].lstrip()[1:].lstrip()
|
content = lines[i].lstrip()[1:].lstrip()
|
||||||
if content:
|
if content:
|
||||||
block_lines.append(content)
|
block_lines.append(content)
|
||||||
i += 1
|
i += 1
|
||||||
|
|
||||||
_process_block(block_lines, out_pairs)
|
# Verarbeitung des Blocks
|
||||||
continue # Weiter im Outer Loop (i steht jetzt auf dem nächsten Header oder Text)
|
current_kind = None
|
||||||
|
|
||||||
# 2. "Headless" Block / Zerschnittener Chunk
|
# Heuristik: Ist die allererste Zeile (meist aus dem Header) ein reiner Typ?
|
||||||
# Wenn Zeile mit '>' beginnt, Links hat, aber wir nicht in einem Header-Block sind
|
# Dann setzen wir diesen als Default für den Block.
|
||||||
if line.lstrip().startswith('>'):
|
if block_lines:
|
||||||
if _WIKILINK_RE.search(line):
|
first = block_lines[0]
|
||||||
block_lines = []
|
# Wenn es NICHT wie "Key: Value" aussieht, aber wie ein Wort:
|
||||||
# Sammeln bis Ende oder neuer Header
|
|
||||||
while i < len(lines) and lines[i].lstrip().startswith('>'):
|
|
||||||
if _CALLOUT_START.match(lines[i]):
|
|
||||||
break
|
|
||||||
|
|
||||||
content = lines[i].lstrip()[1:].lstrip()
|
|
||||||
if content:
|
|
||||||
block_lines.append(content)
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
# Als 'related_to' retten, falls Typ fehlt
|
|
||||||
_process_block(block_lines, out_pairs, default_kind="related_to")
|
|
||||||
continue
|
|
||||||
|
|
||||||
keep_lines.append(line)
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
return out_pairs, "\n".join(keep_lines)
|
|
||||||
|
|
||||||
def _process_block(lines: List[str], out_pairs: List[Tuple[str, str]], default_kind: str = None):
|
|
||||||
"""Parsen eines isolierten Blocks."""
|
|
||||||
current_kind = default_kind
|
|
||||||
|
|
||||||
if lines:
|
|
||||||
first = lines[0]
|
|
||||||
# Ist die erste Zeile ein Typ? (z.B. "based_on")
|
|
||||||
if not _REL_LINE.match(first) and _SIMPLE_KIND.match(first):
|
if not _REL_LINE.match(first) and _SIMPLE_KIND.match(first):
|
||||||
current_kind = first.lower()
|
current_kind = first.lower()
|
||||||
|
|
||||||
for bl in lines:
|
for bl in block_lines:
|
||||||
# Format "kind: [[Target]]"
|
# 1. Prüfen auf explizites "Kind: Targets" (überschreibt Header-Typ für diese Zeile)
|
||||||
mrel = _REL_LINE.match(bl)
|
mrel = _REL_LINE.match(bl)
|
||||||
if mrel:
|
if mrel:
|
||||||
k = mrel.group("kind").strip().lower()
|
line_kind = mrel.group("kind").strip().lower()
|
||||||
targets = mrel.group("targets")
|
targets = mrel.group("targets")
|
||||||
|
|
||||||
|
# Links extrahieren
|
||||||
found = _WIKILINK_RE.findall(targets)
|
found = _WIKILINK_RE.findall(targets)
|
||||||
if found:
|
if found:
|
||||||
for t in found: out_pairs.append((k, t.strip()))
|
for t in found: out_pairs.append((line_kind, t.strip()))
|
||||||
else:
|
else:
|
||||||
|
# Fallback für kommagetrennten Plaintext
|
||||||
for raw in re.split(r"[,;]", targets):
|
for raw in re.split(r"[,;]", targets):
|
||||||
if raw.strip(): out_pairs.append((k, raw.strip()))
|
if raw.strip(): out_pairs.append((line_kind, raw.strip()))
|
||||||
|
|
||||||
|
# Wenn wir eine explizite Zeile gefunden haben, aktualisieren wir NICHT
|
||||||
|
# den current_kind für nachfolgende Zeilen (Design-Entscheidung: lokal scope),
|
||||||
|
# oder wir machen es doch?
|
||||||
|
# Üblicher ist: Header setzt Default, Zeile überschreibt lokal.
|
||||||
|
# Wir lassen current_kind also unangetastet.
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Format "[[Target]]" (nutzt current_kind)
|
# 2. Kein Key:Value Muster -> Prüfen auf Links, die den current_kind nutzen
|
||||||
found = _WIKILINK_RE.findall(bl)
|
found = _WIKILINK_RE.findall(bl)
|
||||||
if found:
|
if found:
|
||||||
if current_kind:
|
if current_kind:
|
||||||
for t in found: out_pairs.append((current_kind, t.strip()))
|
for t in found: out_pairs.append((current_kind, t.strip()))
|
||||||
else:
|
else:
|
||||||
# Fallback ohne Typ
|
# Link ohne Typ und ohne Header-Typ.
|
||||||
for t in found: out_pairs.append(("related_to", t.strip()))
|
# Wird ignoriert oder könnte als 'related_to' fallback dienen.
|
||||||
|
# Aktuell: Ignorieren, um False Positives zu vermeiden.
|
||||||
|
pass
|
||||||
|
|
||||||
|
return out_pairs, "\n".join(keep_lines)
|
||||||
|
|
||||||
def extract_wikilinks(text: str) -> List[str]:
|
def extract_wikilinks(text: str) -> List[str]:
|
||||||
"""Findet Standard-Wikilinks."""
|
"""Findet Standard-Wikilinks [[Target]] oder [[Alias|Target]]."""
|
||||||
if not text: return []
|
if not text: return []
|
||||||
return [m.strip() for m in _WIKILINK_RE.findall(text) if m.strip()]
|
return [m.strip() for m in _WIKILINK_RE.findall(text) if m.strip()]
|
||||||
Loading…
Reference in New Issue
Block a user