refactoring parser

This commit is contained in:
Lars 2025-12-27 14:26:42 +01:00
parent e3858e8bc3
commit 21cda0072a
6 changed files with 210 additions and 288 deletions

app/core/parser.py

@@ -1,293 +1,22 @@
"""
FILE: app/core/parser.py
DESCRIPTION: Reads Markdown files fault-tolerantly (encoding fallback). Separates frontmatter (YAML) from the body.
WP-22 extension: edge extraction with line numbers for the EdgeRegistry.
WP-15b: implements NoteContext and pre_scan_markdown for Pass 1 ingestion.
VERSION: 1.9.0
STATUS: Active
DEPENDENCIES: yaml, re, dataclasses, json, io, os
LAST_ANALYSIS: 2025-12-26
DESCRIPTION: Facade for the parsing package. Guarantees 100% backward compatibility.
WP-14: modularization complete.
VERSION: 1.10.0
"""
from __future__ import annotations
from .parsing.parsing_models import ParsedNote, NoteContext
from .parsing.parsing_utils import (
FRONTMATTER_RE, validate_required_frontmatter,
normalize_frontmatter, extract_wikilinks, extract_edges_with_context
)
from .parsing.parsing_markdown import read_markdown
from .parsing.parsing_scanner import pre_scan_markdown
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple, Iterable, List
import io
import json
import os
import re
# Compatibility aliases
FRONTMATTER_END = FRONTMATTER_RE
try:
import yaml # PyYAML
except Exception as e: # pragma: no cover
yaml = None # error is raised at runtime only if YAML is actually needed
# ---------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------
@dataclass
class ParsedNote:
frontmatter: Dict[str, Any]
body: str
path: str
@dataclass
class NoteContext:
"""Metadaten-Container für den flüchtigen LocalBatchCache (Pass 1)."""
note_id: str
title: str
type: str
summary: str
tags: List[str]
# ---------------------------------------------------------------------
# Frontmatter detection
# ---------------------------------------------------------------------
# Public compatibility constant: older scripts import FRONTMATTER_RE
FRONTMATTER_RE = re.compile(r"^\s*---\s*$") # <— public
# Additional internal alias (in case anyone references it)
FRONTMATTER_END = FRONTMATTER_RE # <— public alias
# internal names stay in place
_FRONTMATTER_HEAD = FRONTMATTER_RE
_FRONTMATTER_END = FRONTMATTER_RE
def _split_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
"""
Splits text into (frontmatter: dict, body: str).
Frontmatter is only recognized when the first line is '---' and a second '---' follows later.
YAML errors in the frontmatter do NOT abort parsing: an empty dict is used instead.
"""
lines = text.splitlines(True) # keep line endings
if not lines:
return {}, ""
if not _FRONTMATTER_HEAD.match(lines[0]):
# no frontmatter header → the whole text is body
return {}, text
end_idx = None
# search for the next '---' (max. 2000 lines as a safety limit)
for i in range(1, min(len(lines), 2000)):
if _FRONTMATTER_END.match(lines[i]):
end_idx = i
break
if end_idx is None:
# incomplete frontmatter block → treat everything as body
return {}, text
fm_raw = "".join(lines[1:end_idx])
body = "".join(lines[end_idx + 1:])
data: Dict[str, Any] = {}
if yaml is None:
raise RuntimeError("PyYAML ist nicht installiert (pip install pyyaml).")
try:
loaded = yaml.safe_load(fm_raw) or {}
if isinstance(loaded, dict):
data = loaded
else:
data = {}
except Exception as e:
# do not make YAML errors fatal
print(json.dumps({"warn": "frontmatter_yaml_parse_failed", "error": str(e)}))
data = {}
# optional cosmetic trim: drop one leading blank line from the body
if body.startswith("\n"):
body = body[1:]
return data, body
# ---------------------------------------------------------------------
# Robust reading with encoding fallback
# ---------------------------------------------------------------------
_FALLBACK_ENCODINGS: Tuple[str, ...] = ("utf-8", "utf-8-sig", "cp1252", "latin-1")
def _read_text_with_fallback(path: str) -> Tuple[str, str, bool]:
"""
Reads a file with multiple decoding attempts.
Returns: (text, used_encoding, had_fallback)
- had_fallback=True if anything other than 'utf-8' was used ('utf-8-sig' also counts).
"""
last_err: Optional[str] = None
for enc in _FALLBACK_ENCODINGS:
try:
with io.open(path, "r", encoding=enc, errors="strict") as f:
text = f.read()
# 'utf-8-sig' counts as a fallback here (because of the BOM) but is harmless
return text, enc, (enc != "utf-8")
except UnicodeDecodeError as e:
last_err = f"{type(e).__name__}: {e}"
continue
# Last, extremely defensive fallback: bytes → UTF-8 with REPLACE (never raises)
with open(path, "rb") as fb:
raw = fb.read()
text = raw.decode("utf-8", errors="replace")
print(json.dumps({
"path": path,
"warn": "encoding_fallback_exhausted",
"info": last_err or "unknown"
}, ensure_ascii=False))
return text, "utf-8(replace)", True
# ---------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------
def read_markdown(path: str) -> Optional[ParsedNote]:
"""
Reads a Markdown file fault-tolerantly.
"""
if not os.path.exists(path):
return None
text, enc, had_fb = _read_text_with_fallback(path)
if had_fb:
print(json.dumps({"path": path, "warn": "encoding_fallback_used", "used": enc}, ensure_ascii=False))
fm, body = _split_frontmatter(text)
return ParsedNote(frontmatter=fm or {}, body=body or "", path=path)
def pre_scan_markdown(path: str) -> Optional[NoteContext]:
"""
WP-15b: fast scan for the LocalBatchCache (Pass 1).
Extracts only identity and short context for semantic validation.
"""
parsed = read_markdown(path)
if not parsed:
return None
fm = parsed.frontmatter
# ID resolution: frontmatter ID, or the file name as fallback
note_id = str(fm.get("id") or os.path.splitext(os.path.basename(path))[0])
# build a short summary (first 500 characters of the body, cleaned)
clean_body = re.sub(r'[#*`>]', '', parsed.body[:600]).strip()
summary = clean_body[:500] + "..." if len(clean_body) > 500 else clean_body
return NoteContext(
note_id=note_id,
title=str(fm.get("title", note_id)),
type=str(fm.get("type", "concept")),
summary=summary,
tags=fm.get("tags", []) if isinstance(fm.get("tags"), list) else []
)
def validate_required_frontmatter(fm: Dict[str, Any],
required: Tuple[str, ...] = ("id", "title")) -> None:
"""
Checks that all required fields are present.
"""
if fm is None:
fm = {}
missing = []
for k in required:
v = fm.get(k)
if v is None:
missing.append(k)
elif isinstance(v, str) and not v.strip():
missing.append(k)
if missing:
raise ValueError(f"Missing required frontmatter fields: {', '.join(missing)}")
if "tags" in fm and fm["tags"] not in (None, "") and not isinstance(fm["tags"], (list, tuple)):
raise ValueError("frontmatter 'tags' must be a list of strings")
def normalize_frontmatter(fm: Dict[str, Any]) -> Dict[str, Any]:
"""
Normalizes tags and other fields.
"""
out = dict(fm or {})
if "tags" in out:
if isinstance(out["tags"], str):
out["tags"] = [out["tags"].strip()] if out["tags"].strip() else []
elif isinstance(out["tags"], list):
out["tags"] = [str(t).strip() for t in out["tags"] if t is not None]
else:
out["tags"] = [str(out["tags"]).strip()] if out["tags"] not in (None, "") else []
if "embedding_exclude" in out:
out["embedding_exclude"] = bool(out["embedding_exclude"])
return out
# ------------------------------ Wikilinks ---------------------------- #
_WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
def extract_wikilinks(text: str) -> List[str]:
"""
Extracts wikilinks as a plain list of IDs.
"""
if not text:
return []
out: List[str] = []
for m in _WIKILINK_RE.finditer(text):
raw = (m.group(1) or "").strip()
if not raw:
continue
if "|" in raw:
raw = raw.split("|", 1)[0].strip()
if "#" in raw:
raw = raw.split("#", 1)[0].strip()
if raw:
out.append(raw)
return out
def extract_edges_with_context(parsed: ParsedNote) -> List[Dict[str, Any]]:
"""
WP-22: extracts wikilinks [[target|type]] from the body and records the line number.
Returns a list of dictionaries that the ingestion pipeline can process directly.
"""
edges = []
if not parsed or not parsed.body:
return edges
# Plain splitlines() is enough here: only the line numbers matter.
# (splitlines(True) would keep the line endings, needed only for character-offset math.)
lines = parsed.body.splitlines()
for line_num, line_content in enumerate(lines, 1):
for match in _WIKILINK_RE.finditer(line_content):
raw = (match.group(1) or "").strip()
if not raw:
continue
# Syntax: [[target|type]]
if "|" in raw:
parts = raw.split("|", 1)
target = parts[0].strip()
kind = parts[1].strip()
else:
target = raw.strip()
kind = "related_to" # Default-Typ
# Strip the anchor (#): relations operate at note level (ID)
if "#" in target:
target = target.split("#", 1)[0].strip()
if target:
edges.append({
"to": target,
"kind": kind,
"line": line_num,
"provenance": "explicit"
})
return edges
__all__ = [
"ParsedNote", "NoteContext", "FRONTMATTER_RE", "FRONTMATTER_END",
"read_markdown", "pre_scan_markdown", "validate_required_frontmatter",
"normalize_frontmatter", "extract_wikilinks", "extract_edges_with_context"
]
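The facade above keeps every legacy import path alive while the implementation moves into app/core/parsing. A minimal sanity check, as a sketch (not part of this commit; assumes the app package is importable):

from app.core import parser, parsing

# Both paths must hand out the very same objects, not copies.
assert parser.read_markdown is parsing.read_markdown
assert parser.FRONTMATTER_RE is parsing.FRONTMATTER_RE
assert parser.FRONTMATTER_END is parser.FRONTMATTER_RE  # legacy alias preserved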

app/core/parsing/__init__.py

@@ -0,0 +1,17 @@
"""
FILE: app/core/parsing/__init__.py
DESCRIPTION: Package exports for the parser.
"""
from .parsing_models import ParsedNote, NoteContext
from .parsing_utils import (
FRONTMATTER_RE, validate_required_frontmatter,
normalize_frontmatter, extract_wikilinks, extract_edges_with_context
)
from .parsing_markdown import read_markdown
from .parsing_scanner import pre_scan_markdown
__all__ = [
"ParsedNote", "NoteContext", "FRONTMATTER_RE", "read_markdown",
"pre_scan_markdown", "validate_required_frontmatter",
"normalize_frontmatter", "extract_wikilinks", "extract_edges_with_context"
]

app/core/parsing/parsing_markdown.py

@@ -0,0 +1,60 @@
"""
FILE: app/core/parsing/parsing_markdown.py
DESCRIPTION: Fault-tolerant reading of Markdown files and frontmatter splitting.
"""
import io
import os
import json
from typing import Any, Dict, Optional, Tuple
from .parsing_models import ParsedNote
from .parsing_utils import FRONTMATTER_RE
try:
import yaml
except ImportError:
yaml = None
_FALLBACK_ENCODINGS: Tuple[str, ...] = ("utf-8", "utf-8-sig", "cp1252", "latin-1")
def _split_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
"""Zerlegt Text in Frontmatter-Dict und Body."""
lines = text.splitlines(True)
if not lines or not FRONTMATTER_RE.match(lines[0]):
return {}, text
end_idx = None
for i in range(1, min(len(lines), 2000)):
if FRONTMATTER_RE.match(lines[i]):
end_idx = i
break
if end_idx is None: return {}, text
fm_raw = "".join(lines[1:end_idx])
body = "".join(lines[end_idx + 1:])
if yaml is None: raise RuntimeError("PyYAML not installed.")
try:
loaded = yaml.safe_load(fm_raw) or {}
data = loaded if isinstance(loaded, dict) else {}
except Exception as e:
print(json.dumps({"warn": "frontmatter_yaml_parse_failed", "error": str(e)}))
data = {}
if body.startswith("\n"): body = body[1:]
return data, body
def _read_text_with_fallback(path: str) -> Tuple[str, str, bool]:
"""Liest Datei mit Encoding-Fallback-Kette."""
last_err = None
for enc in _FALLBACK_ENCODINGS:
try:
with io.open(path, "r", encoding=enc, errors="strict") as f:
return f.read(), enc, (enc != "utf-8")
except UnicodeDecodeError as e:
last_err = str(e); continue
with open(path, "rb") as fb:
text = fb.read().decode("utf-8", errors="replace")
return text, "utf-8(replace)", True
def read_markdown(path: str) -> Optional[ParsedNote]:
"""Öffentliche API zum Einlesen einer Datei."""
if not os.path.exists(path): return None
text, enc, had_fb = _read_text_with_fallback(path)
if had_fb:
    # preserve the legacy warning log for non-UTF-8 files
    print(json.dumps({"path": path, "warn": "encoding_fallback_used", "used": enc}, ensure_ascii=False))
fm, body = _split_frontmatter(text)
return ParsedNote(frontmatter=fm or {}, body=body or "", path=path)
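A usage sketch (not part of the commit; the sample contents are invented) exercising the frontmatter split, the leading-blank-line trim, and the cp1252 leg of the fallback chain:

import os, tempfile
from app.core.parsing.parsing_markdown import read_markdown

sample = "---\nid: note-1\ntitle: Demo\n---\n\nBody with a [[Link]].\n"
with tempfile.NamedTemporaryFile("w", suffix=".md", delete=False, encoding="utf-8") as f:
    f.write(sample)
    path = f.name
note = read_markdown(path)
assert note.frontmatter["id"] == "note-1"
assert note.body.startswith("Body")  # blank line after the closing '---' is trimmed
os.unlink(path)

# Non-UTF-8 bytes do not raise: 0xE9 is 'é' in cp1252 but invalid UTF-8 here.
with tempfile.NamedTemporaryFile("wb", suffix=".md", delete=False) as f:
    f.write("Café without frontmatter".encode("cp1252"))
    path = f.name
assert read_markdown(path).body.startswith("Caf")
os.unlink(path)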

app/core/parsing/parsing_models.py

@@ -0,0 +1,22 @@
"""
FILE: app/core/parsing/parsing_models.py
DESCRIPTION: Data classes for the parsing system.
"""
from dataclasses import dataclass
from typing import Any, Dict, List
@dataclass
class ParsedNote:
"""Container für eine vollständig eingelesene Markdown-Datei."""
frontmatter: Dict[str, Any]
body: str
path: str
@dataclass
class NoteContext:
"""Metadaten-Container für den flüchtigen LocalBatchCache (Pass 1)."""
note_id: str
title: str
type: str
summary: str
tags: List[str]

app/core/parsing/parsing_scanner.py

@@ -0,0 +1,25 @@
"""
FILE: app/core/parsing/parsing_scanner.py
DESCRIPTION: Pre-scan for the LocalBatchCache (Pass 1).
"""
import os
import re
from typing import Optional
from .parsing_models import NoteContext
from .parsing_markdown import read_markdown
def pre_scan_markdown(path: str) -> Optional[NoteContext]:
"""Extrahiert Identität und Kurz-Kontext zur Validierung."""
parsed = read_markdown(path)
if not parsed: return None
fm = parsed.frontmatter
note_id = str(fm.get("id") or os.path.splitext(os.path.basename(path))[0])
clean_body = re.sub(r'[#*`>]', '', parsed.body[:600]).strip()
summary = clean_body[:500] + "..." if len(clean_body) > 500 else clean_body
return NoteContext(
note_id=note_id,
title=str(fm.get("title", note_id)),
type=str(fm.get("type", "concept")),
summary=summary,
tags=fm.get("tags", []) if isinstance(fm.get("tags"), list) else []
)
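For illustration (hypothetical vault path; the field semantics follow the code above):

from app.core.parsing.parsing_scanner import pre_scan_markdown

ctx = pre_scan_markdown("vault/python-basics.md")
if ctx is not None:
    print(ctx.note_id)  # frontmatter 'id', else the file stem "python-basics"
    print(ctx.type)     # frontmatter 'type', defaulting to "concept"
    print(ctx.summary)  # first ~500 body chars, '#*`>' stripped, "..." when truncated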

app/core/parsing/parsing_utils.py

@@ -0,0 +1,69 @@
"""
FILE: app/core/parsing/parsing_utils.py
DESCRIPTION: Utilities for validation, normalization, and wikilink extraction.
"""
import re
from typing import Any, Dict, List, Tuple, Optional
from .parsing_models import ParsedNote
# Public constants for backward compatibility
FRONTMATTER_RE = re.compile(r"^\s*---\s*$")
_WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
def validate_required_frontmatter(fm: Dict[str, Any], required: Tuple[str, ...] = ("id", "title")) -> None:
"""Prüft, ob alle Pflichtfelder vorhanden sind."""
if fm is None: fm = {}
missing = []
for k in required:
v = fm.get(k)
if v is None or (isinstance(v, str) and not v.strip()):
missing.append(k)
if missing:
raise ValueError(f"Missing required frontmatter fields: {', '.join(missing)}")
if "tags" in fm and fm["tags"] not in (None, "") and not isinstance(fm["tags"], (list, tuple)):
raise ValueError("frontmatter 'tags' must be a list of strings")
def normalize_frontmatter(fm: Dict[str, Any]) -> Dict[str, Any]:
"""Normalisierung von Tags und Boolean-Feldern."""
out = dict(fm or {})
if "tags" in out:
if isinstance(out["tags"], str):
out["tags"] = [out["tags"].strip()] if out["tags"].strip() else []
elif isinstance(out["tags"], list):
out["tags"] = [str(t).strip() for t in out["tags"] if t is not None]
else:
out["tags"] = [str(out["tags"]).strip()] if out["tags"] not in (None, "") else []
if "embedding_exclude" in out:
out["embedding_exclude"] = bool(out["embedding_exclude"])
return out
def extract_wikilinks(text: str) -> List[str]:
"""Extrahiert Wikilinks als einfache Liste von IDs."""
if not text: return []
out: List[str] = []
for m in _WIKILINK_RE.finditer(text):
raw = (m.group(1) or "").strip()
if not raw: continue
if "|" in raw: raw = raw.split("|", 1)[0].strip()
if "#" in raw: raw = raw.split("#", 1)[0].strip()
if raw: out.append(raw)
return out
def extract_edges_with_context(parsed: ParsedNote) -> List[Dict[str, Any]]:
"""WP-22: Extrahiert Wikilinks mit Zeilennummern für die EdgeRegistry."""
edges = []
if not parsed or not parsed.body: return edges
lines = parsed.body.splitlines()
for line_num, line_content in enumerate(lines, 1):
for match in _WIKILINK_RE.finditer(line_content):
raw = (match.group(1) or "").strip()
if not raw: continue
if "|" in raw:
parts = raw.split("|", 1)
target, kind = parts[0].strip(), parts[1].strip()
else:
target, kind = raw.strip(), "related_to"
if "#" in target: target = target.split("#", 1)[0].strip()
if target:
edges.append({"to": target, "kind": kind, "line": line_num, "provenance": "explicit"})
return edges
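A worked example of the WP-22 extraction (a sketch; the note body is invented): [[target|type]] yields a typed edge, a bare [[target]] falls back to "related_to", and anchors are cut at note level:

from app.core.parsing.parsing_models import ParsedNote
from app.core.parsing.parsing_utils import extract_edges_with_context, extract_wikilinks

note = ParsedNote(
    frontmatter={"id": "n1", "title": "Demo"},
    body="See [[Target Note|depends_on]] first.\nAlso [[Other#Section]].",
    path="demo.md",
)
print(extract_wikilinks(note.body))
# ['Target Note', 'Other']
print(extract_edges_with_context(note))
# [{'to': 'Target Note', 'kind': 'depends_on', 'line': 1, 'provenance': 'explicit'},
#  {'to': 'Other', 'kind': 'related_to', 'line': 2, 'provenance': 'explicit'}]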