293 lines
9.5 KiB
Python
293 lines
9.5 KiB
Python
"""
|
|
FILE: app/core/parser.py
|
|
DESCRIPTION: Liest Markdown-Dateien fehlertolerant (Encoding-Fallback). Trennt Frontmatter (YAML) vom Body.
|
|
WP-22 Erweiterung: Kanten-Extraktion mit Zeilennummern für die EdgeRegistry.
|
|
WP-15b: Implementierung NoteContext und pre_scan_markdown für Pass 1 Ingestion.
|
|
VERSION: 1.9.0
|
|
STATUS: Active
|
|
DEPENDENCIES: yaml, re, dataclasses, json, io, os
|
|
LAST_ANALYSIS: 2025-12-26
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, Optional, Tuple, Iterable, List
|
|
import io
|
|
import json
|
|
import os
|
|
import re
|
|
|
|
try:
|
|
import yaml # PyYAML
|
|
except Exception as e: # pragma: no cover
|
|
yaml = None # Fehler wird zur Laufzeit geworfen, falls wirklich benötigt
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Datamodell
|
|
# ---------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class ParsedNote:
|
|
frontmatter: Dict[str, Any]
|
|
body: str
|
|
path: str
|
|
|
|
@dataclass
|
|
class NoteContext:
|
|
"""Metadaten-Container für den flüchtigen LocalBatchCache (Pass 1)."""
|
|
note_id: str
|
|
title: str
|
|
type: str
|
|
summary: str
|
|
tags: List[str]
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Frontmatter-Erkennung
|
|
# ---------------------------------------------------------------------
|
|
|
|
# Öffentliche Kompatibilitäts-Konstante: frühere Skripte importieren FRONTMATTER_RE
|
|
FRONTMATTER_RE = re.compile(r"^\s*---\s*$") # <— public
|
|
# Zusätzlich interner Alias (falls jemand ihn referenziert)
|
|
FRONTMATTER_END = FRONTMATTER_RE # <— public alias
|
|
|
|
# interne Namen bleiben bestehen
|
|
_FRONTMATTER_HEAD = FRONTMATTER_RE
|
|
_FRONTMATTER_END = FRONTMATTER_RE
|
|
|
|
|
|
def _split_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
|
|
"""
|
|
Zerlegt Text in (frontmatter: dict, body: str).
|
|
Erkennt Frontmatter nur, wenn die erste Zeile '---' ist und später ein zweites '---' folgt.
|
|
YAML-Fehler im Frontmatter führen NICHT zum Abbruch: es wird dann ein leeres dict benutzt.
|
|
"""
|
|
lines = text.splitlines(True) # keep line endings
|
|
if not lines:
|
|
return {}, ""
|
|
|
|
if not _FRONTMATTER_HEAD.match(lines[0]):
|
|
# kein Frontmatter-Header → gesamter Text ist Body
|
|
return {}, text
|
|
|
|
end_idx = None
|
|
# Suche nach nächstem '---' (max. 2000 Zeilen als Sicherheitslimit)
|
|
for i in range(1, min(len(lines), 2000)):
|
|
if _FRONTMATTER_END.match(lines[i]):
|
|
end_idx = i
|
|
break
|
|
|
|
if end_idx is None:
|
|
# unvollständiger Frontmatter-Block → behandle alles als Body
|
|
return {}, text
|
|
|
|
fm_raw = "".join(lines[1:end_idx])
|
|
body = "".join(lines[end_idx + 1:])
|
|
|
|
data: Dict[str, Any] = {}
|
|
if yaml is None:
|
|
raise RuntimeError("PyYAML ist nicht installiert (pip install pyyaml).")
|
|
|
|
try:
|
|
loaded = yaml.safe_load(fm_raw) or {}
|
|
if isinstance(loaded, dict):
|
|
data = loaded
|
|
else:
|
|
data = {}
|
|
except Exception as e:
|
|
# YAML-Fehler nicht fatal machen
|
|
print(json.dumps({"warn": "frontmatter_yaml_parse_failed", "error": str(e)}))
|
|
data = {}
|
|
|
|
# optionales kosmetisches Trim: eine führende Leerzeile im Body entfernen
|
|
if body.startswith("\n"):
|
|
body = body[1:]
|
|
|
|
return data, body
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Robustes Lesen mit Encoding-Fallback
|
|
# ---------------------------------------------------------------------
|
|
|
|
_FALLBACK_ENCODINGS: Tuple[str, ...] = ("utf-8", "utf-8-sig", "cp1252", "latin-1")
|
|
|
|
|
|
def _read_text_with_fallback(path: str) -> Tuple[str, str, bool]:
|
|
"""
|
|
Liest Datei mit mehreren Decodierungsversuchen.
|
|
Rückgabe: (text, used_encoding, had_fallback)
|
|
- had_fallback=True, falls NICHT 'utf-8' verwendet wurde (oder 'utf-8-sig').
|
|
"""
|
|
last_err: Optional[str] = None
|
|
for enc in _FALLBACK_ENCODINGS:
|
|
try:
|
|
with io.open(path, "r", encoding=enc, errors="strict") as f:
|
|
text = f.read()
|
|
# 'utf-8-sig' zählt hier als Fallback (weil BOM), aber ist unproblematisch
|
|
return text, enc, (enc != "utf-8")
|
|
except UnicodeDecodeError as e:
|
|
last_err = f"{type(e).__name__}: {e}"
|
|
continue
|
|
|
|
# Letzter, extrem defensiver Fallback: Bytes → UTF-8 mit REPLACE (keine Exception)
|
|
with open(path, "rb") as fb:
|
|
raw = fb.read()
|
|
text = raw.decode("utf-8", errors="replace")
|
|
print(json.dumps({
|
|
"path": path,
|
|
"warn": "encoding_fallback_exhausted",
|
|
"info": last_err or "unknown"
|
|
}, ensure_ascii=False))
|
|
return text, "utf-8(replace)", True
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Öffentliche API
|
|
# ---------------------------------------------------------------------
|
|
|
|
def read_markdown(path: str) -> Optional[ParsedNote]:
|
|
"""
|
|
Liest eine Markdown-Datei fehlertolerant.
|
|
"""
|
|
if not os.path.exists(path):
|
|
return None
|
|
|
|
text, enc, had_fb = _read_text_with_fallback(path)
|
|
if had_fb:
|
|
print(json.dumps({"path": path, "warn": "encoding_fallback_used", "used": enc}, ensure_ascii=False))
|
|
|
|
fm, body = _split_frontmatter(text)
|
|
return ParsedNote(frontmatter=fm or {}, body=body or "", path=path)
|
|
|
|
|
|
def pre_scan_markdown(path: str) -> Optional[NoteContext]:
|
|
"""
|
|
WP-15b: Schneller Scan für den LocalBatchCache (Pass 1).
|
|
Extrahiert nur Identität und Kurz-Kontext zur semantischen Validierung.
|
|
"""
|
|
parsed = read_markdown(path)
|
|
if not parsed:
|
|
return None
|
|
|
|
fm = parsed.frontmatter
|
|
# ID-Findung: Frontmatter ID oder Dateiname als Fallback
|
|
note_id = str(fm.get("id") or os.path.splitext(os.path.basename(path))[0])
|
|
|
|
# Erstelle Kurz-Zusammenfassung (erste 500 Zeichen des Body, bereinigt)
|
|
clean_body = re.sub(r'[#*`>]', '', parsed.body[:600]).strip()
|
|
summary = clean_body[:500] + "..." if len(clean_body) > 500 else clean_body
|
|
|
|
return NoteContext(
|
|
note_id=note_id,
|
|
title=str(fm.get("title", note_id)),
|
|
type=str(fm.get("type", "concept")),
|
|
summary=summary,
|
|
tags=fm.get("tags", []) if isinstance(fm.get("tags"), list) else []
|
|
)
|
|
|
|
|
|
def validate_required_frontmatter(fm: Dict[str, Any],
|
|
required: Tuple[str, ...] = ("id", "title")) -> None:
|
|
"""
|
|
Prüft, ob alle Pflichtfelder vorhanden sind.
|
|
"""
|
|
if fm is None:
|
|
fm = {}
|
|
missing = []
|
|
for k in required:
|
|
v = fm.get(k)
|
|
if v is None:
|
|
missing.append(k)
|
|
elif isinstance(v, str) and not v.strip():
|
|
missing.append(k)
|
|
if missing:
|
|
raise ValueError(f"Missing required frontmatter fields: {', '.join(missing)}")
|
|
|
|
if "tags" in fm and fm["tags"] not in (None, "") and not isinstance(fm["tags"], (list, tuple)):
|
|
raise ValueError("frontmatter 'tags' must be a list of strings")
|
|
|
|
|
|
def normalize_frontmatter(fm: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Normalisierung von Tags und anderen Feldern.
|
|
"""
|
|
out = dict(fm or {})
|
|
if "tags" in out:
|
|
if isinstance(out["tags"], str):
|
|
out["tags"] = [out["tags"].strip()] if out["tags"].strip() else []
|
|
elif isinstance(out["tags"], list):
|
|
out["tags"] = [str(t).strip() for t in out["tags"] if t is not None]
|
|
else:
|
|
out["tags"] = [str(out["tags"]).strip()] if out["tags"] not in (None, "") else []
|
|
if "embedding_exclude" in out:
|
|
out["embedding_exclude"] = bool(out["embedding_exclude"])
|
|
return out
|
|
|
|
|
|
# ------------------------------ Wikilinks ---------------------------- #
|
|
|
|
_WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
|
|
|
|
|
|
def extract_wikilinks(text: str) -> List[str]:
|
|
"""
|
|
Extrahiert Wikilinks als einfache Liste von IDs.
|
|
"""
|
|
if not text:
|
|
return []
|
|
out: List[str] = []
|
|
for m in _WIKILINK_RE.finditer(text):
|
|
raw = (m.group(1) or "").strip()
|
|
if not raw:
|
|
continue
|
|
if "|" in raw:
|
|
raw = raw.split("|", 1)[0].strip()
|
|
if "#" in raw:
|
|
raw = raw.split("#", 1)[0].strip()
|
|
if raw:
|
|
out.append(raw)
|
|
return out
|
|
|
|
|
|
def extract_edges_with_context(parsed: ParsedNote) -> List[Dict[str, Any]]:
|
|
"""
|
|
WP-22: Extrahiert Wikilinks [[Ziel|Typ]] aus dem Body und speichert die Zeilennummer.
|
|
Gibt eine Liste von Dictionaries zurück, die direkt von der Ingestion verarbeitet werden können.
|
|
"""
|
|
edges = []
|
|
if not parsed or not parsed.body:
|
|
return edges
|
|
|
|
# Wir nutzen splitlines(True), um Zeilenumbrüche für die Positionsberechnung zu erhalten,
|
|
# oder einfaches splitlines() für die reine Zeilennummerierung.
|
|
lines = parsed.body.splitlines()
|
|
|
|
for line_num, line_content in enumerate(lines, 1):
|
|
for match in _WIKILINK_RE.finditer(line_content):
|
|
raw = (match.group(1) or "").strip()
|
|
if not raw:
|
|
continue
|
|
|
|
# Syntax: [[Ziel|Typ]]
|
|
if "|" in raw:
|
|
parts = raw.split("|", 1)
|
|
target = parts[0].strip()
|
|
kind = parts[1].strip()
|
|
else:
|
|
target = raw.strip()
|
|
kind = "related_to" # Default-Typ
|
|
|
|
# Anchor (#) entfernen, da Relationen auf Notiz-Ebene (ID) basieren
|
|
if "#" in target:
|
|
target = target.split("#", 1)[0].strip()
|
|
|
|
if target:
|
|
edges.append({
|
|
"to": target,
|
|
"kind": kind,
|
|
"line": line_num,
|
|
"provenance": "explicit"
|
|
})
|
|
return edges |