mindnet/app/core/parser.py
2025-12-23 07:31:33 +01:00

250 lines
8.5 KiB
Python

"""
FILE: app/core/parser.py
DESCRIPTION: Liest Markdown-Dateien fehlertolerant (Encoding-Fallback). Trennt Frontmatter (YAML) vom Body.
VERSION: 1.7.1
STATUS: Active
DEPENDENCIES: yaml, re, dataclasses, json, io, os
LAST_ANALYSIS: 2025-12-15
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple, Iterable, List
import io
import json
import os
import re
try:
import yaml # PyYAML
except Exception as e: # pragma: no cover
yaml = None # Fehler wird zur Laufzeit geworfen, falls wirklich benötigt
# ---------------------------------------------------------------------
# Datamodell
# ---------------------------------------------------------------------
@dataclass
class ParsedNote:
frontmatter: Dict[str, Any]
body: str
path: str
# ---------------------------------------------------------------------
# Frontmatter-Erkennung
# ---------------------------------------------------------------------
# Öffentliche Kompatibilitäts-Konstante: frühere Skripte importieren FRONTMATTER_RE
FRONTMATTER_RE = re.compile(r"^\s*---\s*$") # <— public
# Zusätzlich interner Alias (falls jemand ihn referenziert)
FRONTMATTER_END = FRONTMATTER_RE # <— public alias
# interne Namen bleiben bestehen
_FRONTMATTER_HEAD = FRONTMATTER_RE
_FRONTMATTER_END = FRONTMATTER_RE
def _split_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
"""
Zerlegt Text in (frontmatter: dict, body: str).
Erkennt Frontmatter nur, wenn die erste Zeile '---' ist und später ein zweites '---' folgt.
YAML-Fehler im Frontmatter führen NICHT zum Abbruch: es wird dann ein leeres dict benutzt.
"""
lines = text.splitlines(True) # keep line endings
if not lines:
return {}, ""
if not _FRONTMATTER_HEAD.match(lines[0]):
# kein Frontmatter-Header → gesamter Text ist Body
return {}, text
end_idx = None
# Suche nach nächstem '---' (max. 2000 Zeilen als Sicherheitslimit)
for i in range(1, min(len(lines), 2000)):
if _FRONTMATTER_END.match(lines[i]):
end_idx = i
break
if end_idx is None:
# unvollständiger Frontmatter-Block → behandle alles als Body
return {}, text
fm_raw = "".join(lines[1:end_idx])
body = "".join(lines[end_idx + 1:])
data: Dict[str, Any] = {}
if yaml is None:
raise RuntimeError("PyYAML ist nicht installiert (pip install pyyaml).")
try:
loaded = yaml.safe_load(fm_raw) or {}
if isinstance(loaded, dict):
data = loaded
else:
data = {}
except Exception as e:
# YAML-Fehler nicht fatal machen
print(json.dumps({"warn": "frontmatter_yaml_parse_failed", "error": str(e)}))
data = {}
# optionales kosmetisches Trim: eine führende Leerzeile im Body entfernen
if body.startswith("\n"):
body = body[1:]
return data, body
# ---------------------------------------------------------------------
# Robustes Lesen mit Encoding-Fallback
# ---------------------------------------------------------------------
_FALLBACK_ENCODINGS: Tuple[str, ...] = ("utf-8", "utf-8-sig", "cp1252", "latin-1")
def _read_text_with_fallback(path: str) -> Tuple[str, str, bool]:
"""
Liest Datei mit mehreren Decodierungsversuchen.
Rückgabe: (text, used_encoding, had_fallback)
- had_fallback=True, falls NICHT 'utf-8' verwendet wurde (oder 'utf-8-sig').
"""
last_err: Optional[str] = None
for enc in _FALLBACK_ENCODINGS:
try:
with io.open(path, "r", encoding=enc, errors="strict") as f:
text = f.read()
# 'utf-8-sig' zählt hier als Fallback (weil BOM), aber ist unproblematisch
return text, enc, (enc != "utf-8")
except UnicodeDecodeError as e:
last_err = f"{type(e).__name__}: {e}"
continue
# Letzter, extrem defensiver Fallback: Bytes → UTF-8 mit REPLACE (keine Exception)
with open(path, "rb") as fb:
raw = fb.read()
text = raw.decode("utf-8", errors="replace")
print(json.dumps({
"path": path,
"warn": "encoding_fallback_exhausted",
"info": last_err or "unknown"
}, ensure_ascii=False))
return text, "utf-8(replace)", True
# ---------------------------------------------------------------------
# Öffentliche API
# ---------------------------------------------------------------------
def read_markdown(path: str) -> Optional[ParsedNote]:
"""
Liest eine Markdown-Datei fehlertolerant:
- Erlaubt verschiedene Encodings (UTF-8 bevorzugt, cp1252/latin-1 als Fallback).
- Schlägt NICHT mit UnicodeDecodeError fehl.
- Gibt ParsedNote(frontmatter, body, path) zurück oder None, falls die Datei nicht existiert.
Bei Decoding-Fallback wird ein JSON-Warnhinweis geloggt:
{"path": "...", "warn": "encoding_fallback_used", "used": "cp1252"}
"""
if not os.path.exists(path):
return None
text, enc, had_fb = _read_text_with_fallback(path)
if had_fb:
print(json.dumps({"path": path, "warn": "encoding_fallback_used", "used": enc}, ensure_ascii=False))
fm, body = _split_frontmatter(text)
return ParsedNote(frontmatter=fm or {}, body=body or "", path=path)
def validate_required_frontmatter(fm: Dict[str, Any],
required: Tuple[str, ...] = ("id", "title")) -> None:
"""
Prüft, ob alle Pflichtfelder vorhanden sind.
Default-kompatibel: ('id', 'title'), kann aber vom Aufrufer erweitert werden, z. B.:
validate_required_frontmatter(fm, required=("id","title","type","status","created"))
Hebt ValueError, falls Felder fehlen oder leer sind.
"""
if fm is None:
fm = {}
missing = []
for k in required:
v = fm.get(k)
if v is None:
missing.append(k)
elif isinstance(v, str) and not v.strip():
missing.append(k)
if missing:
raise ValueError(f"Missing required frontmatter fields: {', '.join(missing)}")
# Plausibilitäten: 'tags' sollte eine Liste sein, wenn vorhanden
if "tags" in fm and fm["tags"] not in (None, "") and not isinstance(fm["tags"], (list, tuple)):
raise ValueError("frontmatter 'tags' must be a list of strings")
def normalize_frontmatter(fm: Dict[str, Any]) -> Dict[str, Any]:
"""
Sanfte Normalisierung ohne Semantikänderung:
- 'tags' → Liste von Strings (Trim)
- 'embedding_exclude' → bool
- andere Felder unverändert
"""
out = dict(fm or {})
if "tags" in out:
if isinstance(out["tags"], str):
out["tags"] = [out["tags"].strip()] if out["tags"].strip() else []
elif isinstance(out["tags"], list):
out["tags"] = [str(t).strip() for t in out["tags"] if t is not None]
else:
out["tags"] = [str(out["tags"]).strip()] if out["tags"] not in (None, "") else []
if "embedding_exclude" in out:
out["embedding_exclude"] = bool(out["embedding_exclude"])
return out
# ------------------------------ Wikilinks ---------------------------- #
# Basismuster für [[...]]; die Normalisierung (id vor '#', vor '|') macht extract_wikilinks
_WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
def extract_wikilinks(text: str) -> List[str]:
"""
Extrahiert Wikilinks wie [[id]], [[id#anchor]], [[id|label]], [[id#anchor|label]].
Rückgabe sind NUR die Ziel-IDs (ohne Anchor/Label), führend/folgend getrimmt.
Keine aggressive Slug-Normalisierung (die kann später im Resolver erfolgen).
"""
if not text:
return []
out: List[str] = []
for m in _WIKILINK_RE.finditer(text):
raw = (m.group(1) or "").strip()
if not raw:
continue
# Split an Pipe (Label) → links vor '|'
if "|" in raw:
raw = raw.split("|", 1)[0].strip()
# Split an Anchor
if "#" in raw:
raw = raw.split("#", 1)[0].strip()
if raw:
out.append(raw)
return out
def extract_edges_with_context(note: ParsedNote) -> List[Dict[str, Any]]:
"""Extrahiert Kanten-Typen, Ziele und Zeilennummern."""
edges = []
lines = note.body.splitlines()
# Erkennt [[rel:typ Ziel]]
rel_pattern = re.compile(r"\[\[rel:([a-zA-Z0-9_-]+)\s+([^\]|#]+)\]\]")
for i, line in enumerate(lines):
for match in rel_pattern.finditer(line):
edges.append({
"kind": match.group(1).strip(),
"target": match.group(2).strip(),
"line": i + 1,
"provenance": "explicit"
})
return edges