neuer Importer mit eigenem Service

This commit is contained in:
Lars 2025-12-10 21:41:43 +01:00
parent 62aa24a4bc
commit 865b261294
2 changed files with 309 additions and 575 deletions

264
app/core/ingestion.py Normal file
View File

@ -0,0 +1,264 @@
"""
app/core/ingestion.py
Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges).
Dient als Shared Logic für:
1. CLI-Imports (scripts/import_markdown.py)
2. API-Uploads (WP-11)
"""
import os
import json
from typing import Dict, List, Optional, Tuple, Any, Set
from app.core.parser import (
read_markdown,
normalize_frontmatter,
validate_required_frontmatter,
)
from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
# Fallback Imports wie im Original-Skript
try:
from app.core.derive_edges import build_edges_for_note
except ImportError:
from app.core.edges import build_edges_for_note # type: ignore
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.qdrant_points import (
points_for_chunks,
points_for_note,
points_for_edges,
upsert_batch,
)
# Optionales Embedding
try:
from app.core.embed import embed_texts
except ImportError:
embed_texts = None
# --- Helper für Type-Registry (ausgelagert aus Script) ---
def load_type_registry(custom_path: Optional[str] = None) -> dict:
    """Load the note-type registry (types.yaml) as a dictionary.

    The file location is ``custom_path`` if given, otherwise the value of the
    ``MINDNET_TYPES_FILE`` environment variable, otherwise
    ``config/types.yaml``. If that path does not exist, ``types.yaml`` in the
    current working directory is tried (useful for tests and the CLI).

    The registry is optional, so this function never raises: it returns an
    empty dict when no file is found, PyYAML is not installed, the file cannot
    be read or parsed, or the document is not a mapping.
    """
    path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
    if not os.path.exists(path):
        # Fallback to the project root (tests / CLI).
        if os.path.exists("types.yaml"):
            path = "types.yaml"
        else:
            return {}
    try:
        # Imported lazily inside the try block so that a missing PyYAML
        # degrades to "no registry" instead of crashing the caller.
        import yaml

        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception:
        # Deliberately broad: any import, I/O or parse problem means
        # "registry unavailable", which callers treat as an empty registry.
        return {}
    # An empty file yields None and a top-level list/scalar is not a registry;
    # callers rely on dict methods, so normalise both to {}.
    return data if isinstance(data, dict) else {}
def resolve_note_type(requested: Optional[str], reg: dict) -> str:
    """Return the note type to use for a note.

    Args:
        requested: Type taken from the note's frontmatter (may be missing).
        reg: Type registry as loaded from types.yaml.

    Returns:
        ``requested`` if it is a non-empty string that is declared under the
        registry's ``types`` mapping, otherwise the fallback ``"concept"``.
    """
    # ``types:`` may be absent, empty (None) or malformed in a hand-written
    # YAML file; treat all of these as "no types declared".
    types = reg.get("types") if isinstance(reg, dict) else None
    if (
        isinstance(types, dict)
        and isinstance(requested, str)
        and requested
        and requested in types
    ):
        return requested
    return "concept"  # default fallback
def effective_chunk_profile(note_type: str, reg: dict) -> str:
    """Resolve the chunk profile name for a note type.

    Lookup order:
    1. ``reg["types"][note_type]["chunk_profile"]``
    2. ``reg["defaults"]["chunk_profile"]``
    3. the literal ``"default"``

    Missing, empty (None) or malformed sections are skipped, so the function
    always returns a non-empty value.
    """
    # 1. Type-specific setting
    types = reg.get("types") if isinstance(reg, dict) else None
    t_cfg = types.get(note_type) if isinstance(types, dict) else None
    if isinstance(t_cfg, dict) and t_cfg.get("chunk_profile"):
        return t_cfg["chunk_profile"]
    # 2. Registry-wide default
    defaults = reg.get("defaults") if isinstance(reg, dict) else None
    if isinstance(defaults, dict) and defaults.get("chunk_profile"):
        return defaults["chunk_profile"]
    # 3. Hard-coded fallback
    return "default"
def effective_retriever_weight(note_type: str, reg: dict) -> float:
    """Resolve the retriever weight for a note type.

    Lookup order:
    1. ``reg["types"][note_type]["retriever_weight"]``
    2. ``reg["defaults"]["retriever_weight"]``
    3. ``1.0``

    A candidate that is missing, None or not convertible to ``float`` is
    skipped and the next one is tried, so a typo in types.yaml cannot abort an
    import run.
    """
    types = reg.get("types") if isinstance(reg, dict) else None
    t_cfg = types.get(note_type) if isinstance(types, dict) else None
    defaults = reg.get("defaults") if isinstance(reg, dict) else None

    candidates = []
    if isinstance(t_cfg, dict):
        candidates.append(t_cfg.get("retriever_weight"))
    if isinstance(defaults, dict):
        candidates.append(defaults.get("retriever_weight"))

    for raw in candidates:
        # Compare against None explicitly: a weight of 0 is a valid setting.
        if raw is None:
            continue
        try:
            return float(raw)
        except (TypeError, ValueError):
            continue
    return 1.0
class IngestionService:
    """Turn Markdown notes into Qdrant points (notes, chunks, edges).

    Shared logic for the CLI importer and API uploads.
    """

    def __init__(self, collection_prefix: str = "mindnet"):
        """Create a Qdrant client and make sure the target collections exist.

        Args:
            collection_prefix: Prefix of the ``<prefix>_notes``,
                ``<prefix>_chunks`` and ``<prefix>_edges`` collections. It
                replaces the prefix of the config returned by
                ``QdrantConfig.from_env()``.
        """
        self.prefix = collection_prefix
        self.cfg = QdrantConfig.from_env()
        # The explicit argument always wins over the environment-derived prefix.
        self.cfg.prefix = collection_prefix
        self.client = get_client(self.cfg)
        self.dim = self.cfg.dim
        # Type registry (chunk profiles / retriever weights per note type).
        self.registry = load_type_registry()
        # Make sure collections and payload indexes exist before any write.
        ensure_collections(self.client, self.prefix, self.dim)
        ensure_payload_indexes(self.client, self.prefix)

    def process_file(
        self,
        file_path: str,
        vault_root: str,
        force_replace: bool = False,
        apply: bool = False,
        purge_before: bool = False,
        note_scope_refs: bool = False,
        hash_mode: str = "body",
        hash_source: str = "parsed",
        hash_normalize: str = "canonical"
    ) -> Dict[str, Any]:
        """Process a single Markdown file and optionally write it to Qdrant.

        Args:
            file_path: Path of the Markdown file.
            vault_root: Vault root, forwarded to ``make_note_payload``.
            force_replace: Rewrite the note regardless of the stored hash.
            apply: Actually write to Qdrant; otherwise only report (dry run).
            purge_before: Delete the note's existing chunks/edges before the
                upsert.
            note_scope_refs: Forwarded to ``build_edges_for_note`` as
                ``include_note_scope_refs``.
            hash_mode, hash_source, hash_normalize: Select the hash variant
                ``"<mode>:<source>:<normalize>"`` used for change detection.

        Returns:
            Summary dict with at least ``path``, ``status``, ``changed`` and
            ``error``. ``status`` is ``"success"``, ``"dry-run"``,
            ``"unchanged"`` or ``"skipped"``. Every failure is reported as
            ``"skipped"`` with ``error`` set; this method does not raise for
            per-file problems.
        """
        result = {
            "path": file_path,
            "status": "skipped",
            "changed": False,
            "error": None
        }

        # 1. Parse & frontmatter
        try:
            parsed = read_markdown(file_path)
            if not parsed:
                return {**result, "error": "Empty or unreadable file"}
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)
        except Exception as e:
            return {**result, "error": f"Validation failed: {str(e)}"}

        # 2. Type & config resolution
        # Guarded like every other stage: a non-numeric ``retriever_weight`` in
        # one note's frontmatter must not abort the whole import run.
        try:
            note_type = resolve_note_type(fm.get("type"), self.registry)
            fm["type"] = note_type
            fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry)
            # Weight resolution: frontmatter override > registry
            weight = fm.get("retriever_weight")
            if weight is None:
                weight = effective_retriever_weight(note_type, self.registry)
            fm["retriever_weight"] = float(weight)
        except Exception as e:
            return {**result, "error": f"Type resolution failed: {str(e)}"}

        # 3. Build note payload
        try:
            note_pl = make_note_payload(
                parsed,
                vault_root=vault_root,
                hash_mode=hash_mode,
                hash_normalize=hash_normalize,
                hash_source=hash_source,
                file_path=file_path
            )
            # Ensure fulltext & weight
            if not note_pl.get("fulltext"):
                note_pl["fulltext"] = getattr(parsed, "body", "") or ""
            note_pl["retriever_weight"] = fm["retriever_weight"]
            note_id = note_pl["note_id"]
        except Exception as e:
            return {**result, "error": f"Payload build failed: {str(e)}"}

        # 4. Change detection (hash check)
        key_current = f"{hash_mode}:{hash_source}:{hash_normalize}"
        try:
            # The stored payload is only needed when we are not forcing.
            old_payload = None if force_replace else self._fetch_note_payload(note_id)
            has_old = old_payload is not None
            # ``or {}`` also covers payloads that store ``hashes: null``.
            old_hash = ((old_payload or {}).get("hashes") or {}).get(key_current)
            new_hash = (note_pl.get("hashes") or {}).get(key_current)
            should_write = force_replace or (not has_old) or (old_hash != new_hash)
            if not should_write:
                # Only now are the two extra existence queries needed: an
                # unchanged note is still rewritten if its chunks or edges are
                # missing in Qdrant.
                chunks_missing, edges_missing = self._artifacts_missing(note_id)
                should_write = chunks_missing or edges_missing
        except Exception as e:
            return {**result, "note_id": note_id, "error": f"Qdrant lookup failed: {str(e)}"}

        if not should_write:
            return {**result, "status": "unchanged", "note_id": note_id}
        if not apply:
            return {**result, "status": "dry-run", "changed": True, "note_id": note_id}

        # 5. Processing (chunking, embedding, edges)
        try:
            body_text = getattr(parsed, "body", "") or ""
            chunks = assemble_chunks(fm["id"], body_text, fm["type"])
            chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
            # Mirror the resolved weight onto chunk payloads that do not carry
            # one yet, as the previous CLI importer did.
            for chunk_pl in chunk_pls:
                chunk_pl.setdefault("retriever_weight", fm["retriever_weight"])

            # Embeddings (zero vectors when no embedder is available)
            if embed_texts and chunk_pls:
                texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
                vecs = embed_texts(texts)
            else:
                vecs = [[0.0] * self.dim for _ in chunk_pls]

            # Edges
            note_refs = note_pl.get("references") or []
            edges = build_edges_for_note(
                note_id,
                chunk_pls,
                note_level_references=note_refs,
                include_note_scope_refs=note_scope_refs
            )
        except Exception as e:
            return {**result, "note_id": note_id, "error": f"Processing failed: {str(e)}"}

        # 6. Upsert
        try:
            # With force_replace the old payload is never fetched, so has_old
            # is always False; purge anyway, otherwise stale chunks/edges of a
            # forced rewrite would survive.
            if purge_before and (has_old or force_replace):
                self._purge_artifacts(note_id)

            n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
            upsert_batch(self.client, n_name, n_pts)

            if chunk_pls:
                c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)
                upsert_batch(self.client, c_name, c_pts)

            if edges:
                e_name, e_pts = points_for_edges(self.prefix, edges)
                upsert_batch(self.client, e_name, e_pts)
        except Exception as e:
            return {**result, "note_id": note_id, "error": f"Upsert failed: {str(e)}"}

        return {
            **result,
            "status": "success",
            "changed": True,
            "note_id": note_id,
            "chunks_count": len(chunk_pls),
            "edges_count": len(edges)
        }

    # --- Internal Qdrant helpers ---

    def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
        """Return the stored payload of the note with ``note_id``, or None."""
        from qdrant_client.http import models as rest

        col = f"{self.prefix}_notes"
        f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
        pts, _ = self.client.scroll(collection_name=col, scroll_filter=f, limit=1, with_payload=True)
        return pts[0].payload if pts else None

    def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]:
        """Return ``(chunks_missing, edges_missing)`` for ``note_id``."""
        from qdrant_client.http import models as rest

        c_col = f"{self.prefix}_chunks"
        e_col = f"{self.prefix}_edges"
        f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
        # Pure existence checks: one point is enough and payloads are not needed.
        c_pts, _ = self.client.scroll(collection_name=c_col, scroll_filter=f, limit=1, with_payload=False)
        e_pts, _ = self.client.scroll(collection_name=e_col, scroll_filter=f, limit=1, with_payload=False)
        return (not bool(c_pts)), (not bool(e_pts))

    def _purge_artifacts(self, note_id: str):
        """Delete all chunks and edges of ``note_id`` (best effort).

        A failing delete does not abort the import, but it is logged so that
        stale chunks/edges left behind can be noticed.
        """
        import logging

        from qdrant_client.http import models as rest

        f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
        selector = rest.FilterSelector(filter=f)
        for suffix in ["chunks", "edges"]:
            col = f"{self.prefix}_{suffix}"
            try:
                self.client.delete(collection_name=col, points_selector=selector)
            except Exception as e:
                logging.getLogger(__name__).warning(
                    "purge of %s for note %s failed: %s", col, note_id, e
                )

View File

@ -1,593 +1,63 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" """
scripts/import_markdown.py scripts/import_markdown.py
Refactored CLI-Wrapper für den IngestionService.
Zweck
-----
- Liest Markdown-Notizen aus einem Vault ein
- Erzeugt Note-Payload, Chunk-Payloads (+ optionale Embeddings) und Edges
- Schreibt alles idempotent in Qdrant (Notes, Chunks, Edges)
- Integriert eine optionale Type-Registry (types.yaml), um z. B. chunk_profile
und retriever_weight pro Notiz-Typ zu steuern.
Wesentliche Fixes ggü. vorherigen fehlerhaften Ständen
------------------------------------------------------
- `embed_texts` wird optional importiert und defensiv geprüft (kein NameError mehr)
- `effective_chunk_profile` / `effective_retriever_weight` und Registry-Helfer
sind VOR `main()` definiert (kein NameError mehr)
- `retriever_weight` wird in Note- und Chunk-Payload zuverlässig gesetzt
- Robuste Kantenbildung; Fehler bei Edges blockieren Notes/Chunks nicht
- Korrekte Verwendung von `scroll_filter` beim Qdrant-Client
- `--purge-before-upsert` entfernt alte Chunks/Edges einer Note vor dem Upsert
Qdrant / ENV
------------
- QDRANT_URL | QDRANT_HOST/QDRANT_PORT | QDRANT_API_KEY
- COLLECTION_PREFIX (Default: mindnet), via --prefix überschreibbar
- VECTOR_DIM (Default: 384)
- MINDNET_NOTE_SCOPE_REFS: true|false (Default: false)
- MINDNET_TYPES_FILE: Pfad zu types.yaml (optional; Default: ./types.yaml)
Beispiele
---------
# Standard (Body, parsed, canonical)
python3 -m scripts.import_markdown --vault ./vault
# Erstimport nach truncate (Create-Fall)
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
# Nur eine Datei (Diagnose)
python3 -m scripts.import_markdown --vault ./vault --only-path ./vault/30_projects/project-demo.md --apply
# Sync-Deletes (Dry-Run → Apply)
python3 -m scripts.import_markdown --vault ./vault --sync-deletes
python3 -m scripts.import_markdown --vault ./vault --sync-deletes --apply
""" """
from __future__ import annotations
import argparse import argparse
import json
import os import os
import json
import sys import sys
from typing import Dict, List, Optional, Tuple, Any, Set
from dotenv import load_dotenv from dotenv import load_dotenv
from qdrant_client.http import models as rest from app.core.ingestion import IngestionService
# --- Projekt-Imports --- def iter_md(root: str):
from app.core.parser import ( out = []
read_markdown, for dp, _, fns in os.walk(root):
normalize_frontmatter, for fn in fns:
validate_required_frontmatter, if fn.endswith(".md") and "/.obsidian/" not in dp:
) out.append(os.path.join(dp, fn).replace("\\", "/"))
from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
try:
from app.core.derive_edges import build_edges_for_note
except Exception: # pragma: no cover
from app.core.edges import build_edges_for_note # type: ignore
from app.core.qdrant import (
QdrantConfig,
get_client,
ensure_collections,
ensure_payload_indexes,
)
from app.core.qdrant_points import (
points_for_chunks,
points_for_note,
points_for_edges,
upsert_batch,
)
# embeddings sind optional (z. B. im reinen Payload-Backfill)
try:
from app.core.embed import embed_texts # optional
except Exception: # pragma: no cover
embed_texts = None
# ---------------------------------------------------------------------
# Type-Registry (types.yaml) Helper (robust, optional)
# ---------------------------------------------------------------------
def _env(name: str, default: Optional[str] = None) -> Optional[str]:
v = os.getenv(name)
return v if v is not None else default
def _load_json_or_yaml(path: str) -> dict:
import io
data: dict = {}
if not path or not os.path.exists(path):
return data
try:
import yaml # type: ignore
with io.open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
if not isinstance(data, dict):
return {}
return data
except Exception:
# YAML evtl. nicht installiert versuche JSON
try:
with io.open(path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
return {}
return data
except Exception:
return {}
def load_type_registry() -> dict:
    """Load the type registry.

    Order: the file named by ``MINDNET_TYPES_FILE`` (if set and existing),
    then ``./config/types.yaml`` if it exists, otherwise ``./types.yaml``
    (both relative to the current working directory).
    """
    env_path = _env("MINDNET_TYPES_FILE", None)
    if env_path and os.path.exists(env_path):
        return _load_json_or_yaml(env_path)

    if os.path.exists("./config/types.yaml"):
        candidate = "./config/types.yaml"
    else:
        candidate = "./types.yaml"
    return _load_json_or_yaml(os.path.abspath(candidate))
def get_type_config(note_type: Optional[str], reg: dict) -> dict:
    """Return the registry entry for ``note_type``.

    Falls back to the ``concept`` entry when the type is unknown, and to ``{}``
    when the registry is empty or malformed or the entry itself is empty.
    """
    if not reg or not isinstance(reg, dict):
        return {}

    type_map = reg.get("types")
    if not isinstance(type_map, dict):
        type_map = {}

    known = isinstance(note_type, str) and bool(note_type) and note_type in type_map
    if known:
        return type_map[note_type] or {}
    # Fallback: concept
    return type_map.get("concept", {}) or {}
def resolve_note_type(requested: Optional[str], reg: dict) -> str:
    """Return the note type to use: ``requested`` or the fallback ``"concept"``.

    Args:
        requested: Type from the note's frontmatter; used as-is when it is a
            non-empty string.
        reg: Type registry. Kept for interface compatibility; the fallback is
            ``"concept"`` whether or not the registry declares it.

    Returns:
        Always a ``str``. A missing, empty or non-string ``requested`` value
        yields ``"concept"`` instead of leaking the original object, and a
        malformed registry (e.g. ``None``) no longer raises.
    """
    if isinstance(requested, str) and requested:
        return requested
    # Fallback when nothing usable is set
    return "concept"
def effective_chunk_profile(note_type: str, reg: dict) -> Optional[str]:
    """Resolve the chunk profile for a note type.

    The type's own ``chunk_profile`` wins; otherwise the first non-empty
    string found in the registry sections ``defaults``, ``default`` and
    ``global`` (in that order) is used; otherwise ``"default"``.
    Accepts symbolic profiles: short|medium|long|default.
    """
    type_cfg = get_type_config(note_type, reg)
    if isinstance(type_cfg, dict):
        own = type_cfg.get("chunk_profile")
        if isinstance(own, str) and own:
            return own

    # Registry-wide fallbacks
    if isinstance(reg, dict):
        for section_name in ("defaults", "default", "global"):
            section = reg.get(section_name)
            if not isinstance(section, dict):
                continue
            fallback = section.get("chunk_profile")
            if isinstance(fallback, str) and fallback:
                return fallback
    return "default"
def effective_retriever_weight(note_type: str, reg: dict) -> Optional[float]:
    """Resolve the retriever weight for a note type as a float.

    The type's own ``retriever_weight`` wins; otherwise the first convertible
    value from the registry sections ``defaults``, ``default`` and ``global``
    (in that order) is used; otherwise ``1.0``. Values that are None or cannot
    be converted to float are skipped.
    """

    def _as_float(value):
        # None marks "not set"; unconvertible values are treated the same way.
        if value is None:
            return None
        try:
            return float(value)
        except Exception:
            return None

    type_cfg = get_type_config(note_type, reg)
    own = _as_float(type_cfg.get("retriever_weight") if isinstance(type_cfg, dict) else None)
    if own is not None:
        return own

    # Registry-wide fallbacks
    if isinstance(reg, dict):
        for section_name in ("defaults", "default", "global"):
            section = reg.get(section_name)
            if isinstance(section, dict):
                fallback = _as_float(section.get("retriever_weight"))
                if fallback is not None:
                    return fallback
    return 1.0
# ---------------------------------------------------------------------
# Sonstige Helper
# ---------------------------------------------------------------------
def iter_md(root: str) -> List[str]:
out: List[str] = []
for dirpath, _, filenames in os.walk(root):
for fn in filenames:
if not fn.lower().endswith(".md"):
continue
p = os.path.join(dirpath, fn)
pn = p.replace("\\", "/")
if any(ex in pn for ex in ["/.obsidian/", "/_backup_frontmatter/", "/_imported/"]):
continue
out.append(p)
return sorted(out) return sorted(out)
def collections(prefix: str) -> Tuple[str, str, str]: def main():
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
def fetch_existing_note_payload(client, prefix: str, note_id: str) -> Optional[Dict]:
    """Return the stored payload of note ``note_id``, or None if it is unknown.

    A stored point without payload yields ``{}``.
    """
    notes_col = collections(prefix)[0]
    by_note_id = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    # Note: the keyword is ``scroll_filter`` (not ``filter``).
    hits, _next = client.scroll(
        collection_name=notes_col,
        scroll_filter=by_note_id,
        with_payload=True,
        with_vectors=False,
        limit=1,
    )
    return (hits[0].payload or {}) if hits else None
def list_qdrant_note_ids(client, prefix: str) -> Set[str]:
    """Collect the ``note_id`` values of all points in the notes collection.

    Pages through the collection 256 points at a time; payloads without a
    string ``note_id`` are ignored.
    """
    notes_col = collections(prefix)[0]
    found: Set[str] = set()
    cursor = None
    while True:
        page, cursor = client.scroll(
            collection_name=notes_col,
            with_payload=True,
            with_vectors=False,
            limit=256,
            offset=cursor,
        )
        if not page:
            break
        candidate_ids = ((pt.payload or {}).get("note_id") for pt in page)
        found.update(nid for nid in candidate_ids if isinstance(nid, str))
        # No further offset means this was the last page.
        if cursor is None:
            break
    return found
def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """Delete all chunks and edges of ``note_id``; the note itself is kept.

    A failing delete is reported as a JSON warning on stdout and does not stop
    the remaining deletes.
    """
    chunks_col, edges_col = collections(prefix)[1:]
    by_note_id = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    for col in (chunks_col, edges_col):
        try:
            client.delete(
                collection_name=col,
                points_selector=rest.FilterSelector(filter=by_note_id),
                wait=True,
            )
        except Exception as e:
            warning = {"note_id": note_id, "warn": f"delete in {col} via filter failed: {e}"}
            print(json.dumps(warning))
def delete_note_everywhere(client, prefix: str, note_id: str) -> None:
    """Delete ``note_id`` from the edges, chunks and notes collections.

    Collections are processed in that order. A failing delete is reported as a
    JSON warning on stdout and does not stop the remaining deletes.
    """
    notes_col, chunks_col, edges_col = collections(prefix)
    by_note_id = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    for col in (edges_col, chunks_col, notes_col):
        try:
            client.delete(
                collection_name=col,
                points_selector=rest.FilterSelector(filter=by_note_id),
                wait=True,
            )
        except Exception as e:
            warning = {"note_id": note_id, "warn": f"delete in {col} failed: {e}"}
            print(json.dumps(warning))
# --- Neu: Existenz-Checks für Artefakte (fehlertoleranter Rebuild) ---
def _has_any_point(client, collection: str, note_id: str) -> bool:
    """Return True if ``collection`` holds at least one point with ``note_id``."""
    by_note_id = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    # Existence check only: one point, no payload, no vectors.
    hits, _next = client.scroll(
        collection_name=collection,
        scroll_filter=by_note_id,
        with_payload=False,
        with_vectors=False,
        limit=1,
    )
    return True if hits else False
def artifacts_missing(client, prefix: str, note_id: str) -> Tuple[bool, bool]:
    """Return ``(chunks_missing, edges_missing)`` for ``note_id``."""
    _notes_col, chunks_col, edges_col = collections(prefix)
    has_chunks = _has_any_point(client, chunks_col, note_id)
    has_edges = _has_any_point(client, edges_col, note_id)
    return (not has_chunks, not has_edges)
# ---------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------
def _resolve_mode(m: Optional[str]) -> str:
m = (m or "body").strip().lower()
return m if m in {"body", "frontmatter", "full"} else "body"
def main() -> None:
load_dotenv() load_dotenv()
ap = argparse.ArgumentParser()
ap = argparse.ArgumentParser( ap.add_argument("--vault", required=True)
prog="scripts.import_markdown", ap.add_argument("--apply", action="store_true")
description="Importiert Markdown-Notizen in Qdrant (Notes/Chunks/Edges)." ap.add_argument("--purge-before-upsert", action="store_true")
) ap.add_argument("--force-replace", action="store_true")
ap.add_argument("--vault", required=True, help="Pfad zum Vault (Ordner mit .md-Dateien)") ap.add_argument("--prefix", default="mindnet")
ap.add_argument("--only-path", help="Nur diese Datei verarbeiten (absolut oder relativ)") # Weitere Argumente (compat) können hier hinzugefügt werden, wenn nötig
ap.add_argument("--apply", action="store_true", help="Schreibt nach Qdrant (sonst Dry-Run)")
ap.add_argument("--purge-before-upsert", action="store_true", help="Alte Chunks/Edges der Note vorher löschen")
ap.add_argument("--force-replace", action="store_true", help="Note/Chunks/Edges unabhängig von Hash neu schreiben")
ap.add_argument("--note-id", help="Nur Notes mit dieser ID verarbeiten (Filter)")
ap.add_argument("--note-scope-refs", action="store_true", help="Note-scope References/Backlinks erzeugen")
ap.add_argument("--hash-mode", help="body|frontmatter|full (Default body)")
ap.add_argument("--hash-source", help="parsed|raw (Default parsed)")
ap.add_argument("--hash-normalize", help="canonical|none (Default canonical)")
ap.add_argument("--compare-text", action="store_true", help="Parsed fulltext zusätzlich direkt vergleichen")
ap.add_argument("--baseline-modes", action="store_true", help="Fehlende Hash-Varianten still nachtragen (Notes)")
ap.add_argument("--sync-deletes", action="store_true", help="Qdrant->Vault Lösch-Sync (Dry-Run; mit --apply ausführen)")
ap.add_argument("--prefix", help="Collection-Prefix (überschreibt ENV COLLECTION_PREFIX)")
args = ap.parse_args() args = ap.parse_args()
mode = _resolve_mode(args.hash_mode) # body|frontmatter|full print(f"Init IngestionService (Prefix: {args.prefix})...")
src = _env("MINDNET_HASH_SOURCE", args.hash_source or "parsed") # parsed|raw service = IngestionService(collection_prefix=args.prefix)
norm = _env("MINDNET_HASH_NORMALIZE", args.hash_normalize or "canonical") # canonical|none
note_scope_refs_env = (_env("MINDNET_NOTE_SCOPE_REFS", "false") == "true") files = iter_md(os.path.abspath(args.vault))
note_scope_refs = args.note_scope_refs or note_scope_refs_env print(f"Found {len(files)} files in vault.")
compare_text = args.compare_text or (_env("MINDNET_COMPARE_TEXT", "false") == "true")
# Qdrant
cfg = QdrantConfig.from_env()
if args.prefix:
cfg.prefix = args.prefix.strip()
client = get_client(cfg)
ensure_collections(client, cfg.prefix, cfg.dim)
ensure_payload_indexes(client, cfg.prefix)
# Type-Registry laden (optional)
reg = load_type_registry()
root = os.path.abspath(args.vault)
# Dateiliste
if args.only_path:
only = os.path.abspath(args.only_path)
files = [only]
else:
files = iter_md(root)
if not files:
print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
sys.exit(2)
# Optional: Sync-Deletes vorab
if args.sync_deletes:
vault_note_ids: Set[str] = set()
for path in files:
try:
parsed = read_markdown(path)
if not parsed:
continue
fm = normalize_frontmatter(parsed.frontmatter)
nid = fm.get("id")
if isinstance(nid, str):
vault_note_ids.add(nid)
except Exception:
continue
qdrant_note_ids = list_qdrant_note_ids(client, cfg.prefix)
to_delete = sorted(qdrant_note_ids - vault_note_ids)
print(json.dumps({
"action": "sync-deletes",
"prefix": cfg.prefix,
"qdrant_total": len(qdrant_note_ids),
"vault_total": len(vault_note_ids),
"to_delete_count": len(to_delete),
"to_delete": to_delete[:50] + ([""] if len(to_delete) > 50 else [])
}, ensure_ascii=False))
if args.apply and to_delete:
for nid in to_delete:
print(json.dumps({"action": "delete", "note_id": nid, "decision": "apply"}))
delete_note_everywhere(client, cfg.prefix, nid)
key_current = f"{mode}:{src}:{norm}"
processed = 0 processed = 0
for path in files: errors = 0
try:
parsed = read_markdown(path) for f in files:
if not parsed: # Hier rufen wir den neuen Service
continue res = service.process_file(
except Exception as e: file_path=f,
print(json.dumps({"path": path, "error": f"read_markdown failed: {type(e).__name__}: {e}"})) vault_root=os.path.abspath(args.vault),
continue apply=args.apply,
force_replace=args.force_replace,
# --- Frontmatter prüfen --- purge_before=args.purge_before_upsert
try: )
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm) # Output nur wenn relevant (nicht skipped/unchanged)
except Exception as e: if res.get("status") not in ["skipped", "unchanged"]:
print(json.dumps({"path": path, "error": f"Frontmatter invalid: {type(e).__name__}: {e}"})) print(json.dumps(res, ensure_ascii=False))
continue processed += 1
if args.note_id and not args.only_path and fm.get("id") != args.note_id: if res.get("error"):
continue print(json.dumps(res, ensure_ascii=False), file=sys.stderr)
errors += 1
processed += 1
# --- Type-Registry anwenden (chunk_profile / retriever_weight) ---
try:
note_type = resolve_note_type(fm.get("type"), reg)
except Exception:
note_type = (fm.get("type") or "concept")
fm["type"] = note_type or fm.get("type") or "concept"
prof = effective_chunk_profile(note_type, reg)
if prof:
fm["chunk_profile"] = prof
weight = effective_retriever_weight(note_type, reg)
if weight is not None:
try:
fm["retriever_weight"] = float(weight)
except Exception:
pass # falls FM string-inkonsistent ist
# --- Payload aufbauen (inkl. Hashes) ---
try:
note_pl = make_note_payload(
parsed,
vault_root=root,
hash_mode=mode,
hash_normalize=norm,
hash_source=src,
file_path=path,
)
except Exception as e:
print(json.dumps({"path": path, "error": f"make_note_payload failed: {type(e).__name__}: {e}"}))
continue
if not note_pl.get("fulltext"):
note_pl["fulltext"] = getattr(parsed, "body", "") or ""
# retriever_weight sicher in Note-Payload spiegeln (für spätere Filter)
if "retriever_weight" not in note_pl and fm.get("retriever_weight") is not None:
try:
note_pl["retriever_weight"] = float(fm.get("retriever_weight"))
except Exception:
pass
note_id = note_pl.get("note_id") or fm.get("id")
if not note_id:
print(json.dumps({"path": path, "error": "Missing note_id after payload build"}))
continue
# --- bestehenden Payload laden (zum Diff) ---
old_payload = None if args.force_replace else fetch_existing_note_payload(client, cfg.prefix, note_id)
has_old = old_payload is not None
old_hashes = (old_payload or {}).get("hashes") or {}
old_hash_exact = old_hashes.get(key_current)
new_hash_exact = (note_pl.get("hashes") or {}).get(key_current)
needs_baseline = (old_hash_exact is None)
hash_changed = (old_hash_exact is not None and new_hash_exact is not None and old_hash_exact != new_hash_exact)
text_changed = False
if compare_text:
old_text = (old_payload or {}).get("fulltext") or ""
new_text = note_pl.get("fulltext") or ""
text_changed = (old_text != new_text)
changed = args.force_replace or (not has_old) or hash_changed or text_changed
do_baseline_only = (args.baseline_modes and has_old and needs_baseline and not changed)
# --- Chunks + Embeddings vorbereiten ---
try:
body_text = getattr(parsed, "body", "") or ""
chunks = assemble_chunks(fm["id"], body_text, fm.get("type", "concept"))
chunk_pls: List[Dict[str, Any]] = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
except Exception as e:
print(json.dumps({"path": path, "note_id": note_id, "error": f"chunk build failed: {type(e).__name__}: {e}"}))
continue
# retriever_weight auf Chunk-Payload spiegeln
if fm.get("retriever_weight") is not None:
try:
rw = float(fm.get("retriever_weight"))
for pl in chunk_pls:
# Feld nur setzen, wenn noch nicht vorhanden
if "retriever_weight" not in pl:
pl["retriever_weight"] = rw
except Exception:
pass
# Embeddings (fallback: Nullvektoren)
vecs: List[List[float]] = [[0.0] * int(cfg.dim) for _ in chunk_pls]
if embed_texts and chunk_pls:
try:
texts_for_embed = [(pl.get("window") or pl.get("text") or "") for pl in chunk_pls]
vecs = embed_texts(texts_for_embed)
except Exception as e:
print(json.dumps({"path": path, "note_id": note_id, "warn": f"embed_texts failed, using zeros: {e}"}))
# --- Fehlende Artefakte in Qdrant ermitteln ---
chunks_missing, edges_missing = artifacts_missing(client, cfg.prefix, note_id)
# --- Edges (robust) ---
edges: List[Dict[str, Any]] = []
edges_failed = False
should_build_edges = (changed and (not do_baseline_only)) or edges_missing
if should_build_edges:
try:
note_refs = note_pl.get("references") or []
edges = build_edges_for_note(
note_id,
chunk_pls,
note_level_references=note_refs,
include_note_scope_refs=note_scope_refs,
)
except Exception as e:
edges_failed = True
edges = []
print(json.dumps({"path": path, "note_id": note_id, "warn": f"build_edges_for_note failed, skipping edges: {type(e).__name__}: {e}"}))
# --- Summary (stdout) ---
summary = {
"note_id": note_id,
"title": fm.get("title"),
"chunks": len(chunk_pls),
"edges": len(edges),
"edges_failed": edges_failed,
"changed": changed,
"chunks_missing": chunks_missing,
"edges_missing": edges_missing,
"needs_baseline_for_mode": needs_baseline,
"decision": ("baseline-only" if args.apply and do_baseline_only else
"apply" if args.apply and (changed or chunks_missing or edges_missing) else
"apply-skip-unchanged" if args.apply and not (changed or chunks_missing or edges_missing) else
"dry-run"),
"path": note_pl["path"],
"hash_mode": mode,
"hash_normalize": norm,
"hash_source": src,
"prefix": cfg.prefix,
}
print(json.dumps(summary, ensure_ascii=False))
# --- Writes ---
if not args.apply:
continue
if do_baseline_only:
merged_hashes = {}
merged_hashes.update(old_hashes)
merged_hashes.update(note_pl.get("hashes") or {})
if old_payload:
note_pl["hash_fulltext"] = old_payload.get("hash_fulltext", note_pl.get("hash_fulltext"))
note_pl["hash_signature"] = old_payload.get("hash_signature", note_pl.get("hash_signature"))
note_pl["hashes"] = merged_hashes
notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
upsert_batch(client, notes_name, note_pts)
continue
# Wenn nichts geändert und keine Artefakte fehlen → nichts zu tun
if not changed and not (chunks_missing or edges_missing):
continue
# Purge nur bei echten Änderungen (unverändert + fehlende Artefakte ≠ Purge)
if args.purge_before_upsert and has_old and changed:
try:
purge_note_artifacts(client, cfg.prefix, note_id)
except Exception as e:
print(json.dumps({"path": path, "note_id": note_id, "warn": f"purge failed: {e}"}))
# Note nur bei Änderungen neu schreiben
if changed:
notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
upsert_batch(client, notes_name, note_pts)
# Chunks schreiben, wenn geändert ODER vorher fehlend
if chunk_pls and (changed or chunks_missing):
chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
upsert_batch(client, chunks_name, chunk_pts)
# Edges schreiben, wenn vorhanden und (geändert ODER vorher fehlend)
if edges and (changed or edges_missing):
edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
upsert_batch(client, edges_name, edge_pts)
print(f"Done. Processed notes: {processed}")
print(f"Done. Processed/Changed: {processed}. Errors: {errors}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()