Dateien nach "scripts" hochladen
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-11-08 15:25:18 +01:00
parent 31457e9240
commit 3a88caac69

View File

@ -1,236 +1,447 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" """
import_markdown.py v3.9.0 Script: scripts/import_markdown.py Markdown Qdrant (Notes, Chunks, Edges)
Version: 3.8.0
Datum: 2025-11-08
Zweck: Erweiterung (WP-03 Type-Registry)
- Idempotenter Import von Markdown-Notizen (Obsidian-Vault) in Qdrant: ---------------------------------
* Notes, Chunks, Edges - Lädt optional config/types.yaml.
* Hash-/Baseline-Mechanik (unverändert, falls schon vorhanden) - Unbekannte/fehlende Typen Fallback "concept" (Warnung, kein Abbruch).
* UTF-8 robust (mit Fallback auf cp1252, Logging) - chunk_profile des Typs wird an make_chunk_payloads(...) übergeben (optional).
* Optional: note_scope_refs - retriever_weight wird falls vorhanden als Feld im Note-Payload gespeichert.
- NEU: Type-Registry wird gelesen und an Chunk-/Edge-Erzeugung gereicht, - edge_defaults["references"] aktiviert *additiv* Note-Scope-References/Backlinks.
ohne bestehende Funktionalität zu brechen.
Kompatibilität: Abwärtskompatibel: Ohne Registry/Typ bleibt das Verhalten identisch zum Stand 20251105.
- Nutzt vorhandene parser-, qdrant- und points-Hilfsfunktionen mit
unveränderten Namen/Signaturen.
- Erwartete Funktionen (nicht geändert):
* app.core.parser.read_markdown(path) -> ParsedNote(frontmatter, body, title, ...)
* app.core.chunker.chunk_markdown(body, note_type) -> List[Chunk]
* app.core.chunk_payload.make_chunk_payloads(chunks, note_id, note_title, note_type, note_path, ...)
* app.core.derive_edges.build_edges_for_note(...)
* app.core.qdrant_points.{ensure_collections_for_prefix, upsert_notes, upsert_chunks, upsert_edges, delete_by_filter}
* app.core.qdrant.get_client(), QdrantConfig.from_env()
- Hashing/Signature/Compare-Varianten bleiben unangetastet (werden nur verwendet, wenn vorhanden).
Aufrufbeispiele:
python3 -m scripts.import_markdown --vault ./test_vault
python3 -m scripts.import_markdown --vault ./test_vault --apply
python3 -m scripts.import_markdown --vault ./test_vault --apply --purge-before-upsert
python3 -m scripts.import_markdown --vault ./vault --apply --prefix "$COLLECTION_PREFIX" --note-scope-refs
""" """
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import json import json
import os import os
import sys import sys
from typing import Any, Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple, Any, Set
# Parser / Chunker / Payload / Edges (bestehende Module) from dotenv import load_dotenv
from app.core.parser import read_markdown # type: ignore from qdrant_client.http import models as rest
from app.core.chunker import chunk_markdown # type: ignore
from app.core.chunk_payload import make_chunk_payloads # type: ignore
from app.core.derive_edges import build_edges_for_note # type: ignore
# Qdrant-Zugriff (bestehende Helfer, Signaturen beibehalten) from app.core.parser import (
from app.core.qdrant import QdrantConfig, get_client # type: ignore read_markdown,
from app.core.qdrant_points import ( # type: ignore normalize_frontmatter,
ensure_collections_for_prefix, validate_required_frontmatter,
upsert_notes, )
upsert_chunks, from app.core.note_payload import make_note_payload
upsert_edges, from app.core.chunker import assemble_chunks
delete_by_filter, from app.core.chunk_payload import make_chunk_payloads
try:
from app.core.derive_edges import build_edges_for_note
except Exception: # pragma: no cover
from app.core.edges import build_edges_for_note # type: ignore
from app.core.qdrant import (
QdrantConfig,
get_client,
ensure_collections,
ensure_payload_indexes,
)
from app.core.qdrant_points import (
points_for_chunks,
points_for_note,
points_for_edges,
upsert_batch,
) )
# Optional: Registry (kein harter Fehler wenn nicht vorhanden) # NEU: Type-Registry (optional)
try: try:
from app.core.type_registry import resolve_chunk_profile from app.core.type_registry import load_type_registry, resolve_note_type, get_type_config, effective_chunk_profile
except Exception: except Exception:
def resolve_chunk_profile(note_type: str, default_profile: str = "default") -> str: load_type_registry = None # type: ignore
return default_profile resolve_note_type = None # type: ignore
get_type_config = None # type: ignore
effective_chunk_profile = None # type: ignore
# --- CLI --- try:
from app.core.embed import embed_texts # optional
except Exception:
embed_texts = None
def _cli() -> argparse.Namespace:
p = argparse.ArgumentParser("import_markdown.py")
p.add_argument("--vault", required=True, help="Pfad zum Vault-Root (Ordner).")
p.add_argument("--apply", action="store_true", help="Änderungen wirklich upserten (sonst Dry-Run).")
p.add_argument("--purge-before-upsert", action="store_true", help="Vor Upsert Daten je Note in Collections entfernen.")
p.add_argument("--prefix", default=os.getenv("COLLECTION_PREFIX", os.getenv("MINDNET_PREFIX", "")),
help="Sammlungspräfix in Qdrant (override).")
p.add_argument("--note-scope-refs", action="store_true",
help="Referenzen ([[...]]) auf Note-Ebene (statt chunk-basiert).")
p.add_argument("--encoding", default="utf-8", help="Bevorzugtes Encoding für .md (Default: utf-8).")
return p.parse_args()
# --- Hilfsfunktionen --- # ---------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------
def _iter_md_files(root: str) -> List[str]: def iter_md(root: str) -> List[str]:
md_paths: List[str] = [] out: List[str] = []
for base, _, files in os.walk(root): for dirpath, _, filenames in os.walk(root):
for fn in files: for fn in filenames:
if fn.lower().endswith(".md"): if not fn.lower().endswith(".md"):
md_paths.append(os.path.join(base, fn)) continue
md_paths.sort() p = os.path.join(dirpath, fn)
return md_paths pn = p.replace("\\", "/")
if any(ex in pn for ex in ["/.obsidian/", "/_backup_frontmatter/", "/_imported/"]):
continue
out.append(p)
return sorted(out)
def _rel_path(root: str, path: str) -> str:
return os.path.relpath(path, root).replace("\\", "/")
def _safe_read_markdown(path: str, prefer_encoding: str = "utf-8") -> Tuple[Optional[Any], Optional[str]]: def collections(prefix: str) -> Tuple[str, str, str]:
""" return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
UTF-8 lesen; bei Fehler Fallback auf cp1252. Liefert (ParsedNote|None, used_encoding|None).
"""
try: def fetch_existing_note_payload(client, prefix: str, note_id: str) -> Optional[Dict]:
parsed = read_markdown(path) notes_col, _, _ = collections(prefix)
return parsed, prefer_encoding f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
except UnicodeDecodeError: points, _ = client.scroll(
# encoding fallback wird über parser intern gelöst? Falls nicht, hier ein Hinweis: collection_name=notes_col,
# Wir loggen nur, read_markdown aus eurem Parser bleibt die Quelle der Wahrheit. scroll_filter=f,
with_payload=True,
with_vectors=False,
limit=1,
)
if not points:
return None
return points[0].payload or {}
def list_qdrant_note_ids(client, prefix: str) -> Set[str]:
    """Collect the ``note_id`` payload value of every point in the notes collection.

    Args:
        client: Object exposing a ``scroll`` method that returns a
            ``(points, next_offset)`` pair; presumably a Qdrant client -
            confirm against the caller.
        prefix: Collection prefix; the notes collection name is derived from
            it via ``collections(prefix)``.

    Returns:
        The set of all string ``note_id`` values found. Points whose
        ``note_id`` is missing or not a string are ignored.
    """
    notes_col, _, _ = collections(prefix)
    out: Set[str] = set()
    next_page = None
    while True:
        pts, next_page = client.scroll(
            collection_name=notes_col,
            # Only note_id is read below, so do not transfer the rest of the
            # payload (notes also carry their full text).
            with_payload=["note_id"],
            with_vectors=False,
            limit=256,
            offset=next_page,
        )
        if not pts:
            break
        for p in pts:
            nid = (p.payload or {}).get("note_id")
            if isinstance(nid, str):
                out.add(nid)
        # A missing next offset means the last page has been consumed.
        if next_page is None:
            break
    return out
def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
_, chunks_col, edges_col = collections(prefix)
filt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
for col in (chunks_col, edges_col):
try: try:
# Viele Parser akzeptieren den Inhalt unabhängig vom Encoding; client.delete(
# falls euer Parser zwingend UTF-8 erwartet, müsst ihr dort (parser.py) collection_name=col,
# tolerant implementieren. Wir geben nur ein Log aus: points_selector=rest.FilterSelector(filter=filt),
print(json.dumps({"path": path, "warn": "encoding_fallback_used", "used": "cp1252"})) wait=True
parsed = read_markdown(path) # euer Parser sollte inzwischen tolerant sein )
return parsed, "cp1252"
except Exception as e: except Exception as e:
return None, None print(json.dumps({"note_id": note_id, "warn": f"delete in {col} via filter failed: {e}"}))
except Exception:
return None, None
# --- Main ---
def delete_note_everywhere(client, prefix: str, note_id: str) -> None:
    """Delete all points carrying *note_id* from the edges, chunks and notes collections.

    The collections are processed in the order edges, chunks, notes. Each
    delete waits for completion (``wait=True``). A failure in one collection
    is printed as a JSON warning and does not prevent the remaining deletes.

    Args:
        client: Object exposing a ``delete`` method; presumably a Qdrant
            client - confirm against the caller.
        prefix: Collection prefix passed to ``collections(prefix)``.
        note_id: Value matched against the ``note_id`` payload field.
    """
    notes_col, chunks_col, edges_col = collections(prefix)
    filt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    for col in (edges_col, chunks_col, notes_col):
        try:
            client.delete(
                collection_name=col,
                points_selector=rest.FilterSelector(filter=filt),
                wait=True,
            )
        except Exception as e:
            # Best effort: report and continue with the next collection.
            print(json.dumps({"note_id": note_id, "warn": f"delete in {col} failed: {e}"}))
def _resolve_mode(val: Optional[str]) -> str:
v = (val or os.environ.get("MINDNET_HASH_MODE") or os.environ.get("MINDNET_HASH_COMPARE") or "body").strip().lower()
if v in ("full", "fulltext", "body+frontmatter", "bodyplusfrontmatter"):
return "full"
if v in ("frontmatter", "fm"):
return "frontmatter"
return "body"
def _env(key: str, default: str) -> str:
return (os.environ.get(key) or default).strip().lower()
# ---------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------
def main() -> None: def main() -> None:
args = _cli() load_dotenv()
vault = os.path.abspath(args.vault) ap = argparse.ArgumentParser()
apply = args.apply ap.add_argument("--vault", required=True, help="Pfad zum Obsidian-Vault (Root-Ordner)")
purge = args.purge_before_upsert ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant; ohne Flag nur Dry-Run")
prefix = (args.prefix or "").strip() ap.add_argument("--purge-before-upsert", action="store_true",
note_scope_refs = args.note_scope_refs help="Vor Upsert Chunks & Edges der GEÄNDERTEN Note löschen")
ap.add_argument("--note-id", help="Nur eine bestimmte Note-ID verarbeiten")
ap.add_argument("--only-path", help="Exakt diesen Markdown-Pfad verarbeiten (ignoriert --note-id)")
ap.add_argument("--embed-note", action="store_true", help="Optional: Note-Volltext einbetten")
ap.add_argument("--force-replace", action="store_true",
help="Änderungserkennung ignorieren und immer upserten (+ optional Purge)")
ap.add_argument("--hash-mode", choices=["body", "frontmatter", "full"], default=None,
help="Vergleichsmodus (Body | Frontmatter | Full)")
ap.add_argument("--hash-normalize", choices=["canonical", "none"], default=None)
ap.add_argument("--hash-source", choices=["parsed", "raw"], default=None,
help="Quelle für die Hash-Berechnung (Default: parsed)")
ap.add_argument("--note-scope-refs", action="store_true",
help="(Optional) erzeugt zusätzlich references:note/backlink:note (Default: aus)")
ap.add_argument("--debug-hash-diff", action="store_true",
help="(reserviert) optionaler Body-Diff")
ap.add_argument("--compare-text", action="store_true",
help="Parsed fulltext zusätzlich direkt vergleichen (über Hash hinaus)")
ap.add_argument("--baseline-modes", action="store_true",
help="Fehlende Hash-Varianten im Feld 'hashes' still nachtragen (Upsert NUR Notes)")
ap.add_argument("--sync-deletes", action="store_true",
help="Notes/Chunks/Edges löschen, die in Qdrant existieren aber im Vault fehlen (Dry-Run; mit --apply ausführen)")
ap.add_argument("--prefix", help="Collection-Prefix (überschreibt ENV COLLECTION_PREFIX)")
args = ap.parse_args()
mode = _resolve_mode(args.hash_mode) # body|frontmatter|full
src = _env("MINDNET_HASH_SOURCE", args.hash_source or "parsed") # parsed|raw
norm = _env("MINDNET_HASH_NORMALIZE", args.hash_normalize or "canonical") # canonical|none
note_scope_refs_env = (_env("MINDNET_NOTE_SCOPE_REFS", "false") == "true")
note_scope_refs_flag = args.note_scope_refs or note_scope_refs_env
compare_text = args.compare_text or (_env("MINDNET_COMPARE_TEXT", "false") == "true")
# Qdrant-Client + Collections sicherstellen
cfg = QdrantConfig.from_env() cfg = QdrantConfig.from_env()
if args.prefix:
cfg.prefix = args.prefix.strip()
client = get_client(cfg) client = get_client(cfg)
collections = ensure_collections_for_prefix(client=client, prefix=prefix) ensure_collections(client, cfg.prefix, cfg.dim)
ensure_payload_indexes(client, cfg.prefix)
md_files = _iter_md_files(vault) # Type-Registry laden (optional)
processed = 0 reg = None
if load_type_registry is not None:
reg = load_type_registry()
if reg.get("_using_defaults"):
print(json.dumps({"warn": "type_registry_missing_or_invalid", "info": reg.get("_warning")}))
for path in md_files: root = os.path.abspath(args.vault)
rel = _rel_path(vault, path)
parsed, used_enc = _safe_read_markdown(path, prefer_encoding=args.encoding)
if parsed is None or not getattr(parsed, "frontmatter", None):
print(json.dumps({"path": path, "error": "read_markdown failed"}))
continue
fm = dict(parsed.frontmatter or {}) # Dateiliste bestimmen
note_id = str(fm.get("id") or "").strip() or os.path.splitext(os.path.basename(path))[0] if args.only_path:
note_title = str(fm.get("title") or parsed.title or note_id) only = os.path.abspath(args.only_path)
note_type = str(fm.get("type") or "concept") files = [only]
else:
files = iter_md(root)
if not files:
print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
sys.exit(2)
# Chunking (Registry-Profile → chunk_payload erzeugt 'window' abhängig vom Profil) # Optional: Sync-Deletes vorab
body = getattr(parsed, "body", "") or "" if args.sync_deletes:
chunks = chunk_markdown(body, note_type) vault_note_ids: Set[str] = set()
chunk_profile = resolve_chunk_profile(note_type) for path in files:
try:
chunk_payloads = make_chunk_payloads( parsed = read_markdown(path)
chunks=chunks, if not parsed:
note_id=note_id, continue
note_title=note_title, fm = normalize_frontmatter(parsed.frontmatter)
note_type=note_type, nid = fm.get("id")
note_path=rel, if isinstance(nid, str):
chunk_profile=chunk_profile, vault_note_ids.add(nid)
# window_overwrite=None # falls du das per Env steuern willst, ergänzbar except Exception:
) continue
qdrant_note_ids = list_qdrant_note_ids(client, cfg.prefix)
# Edges erzeugen (inkl. Registry-Defaults harmoniert mit eurem derive_edges) to_delete = sorted(qdrant_note_ids - vault_note_ids)
edges = build_edges_for_note(
note_id=note_id,
note_type=note_type,
chunks=chunk_payloads,
frontmatter=fm,
body_text=body,
note_scope_refs=note_scope_refs,
)
# Note-Payload (ohne Vektor; Embeddings baut ihr upstream/downstream)
note_payload = {
"note_id": note_id,
"title": note_title,
"type": note_type,
"path": rel,
"status": fm.get("status"),
"created": fm.get("created"),
"tags": fm.get("tags", []),
# Optional: retriever_weight aus Registry ablegen? → möglich,
# aber nicht verpflichtend für WP-03. (kann später ergänzt werden)
# "retriever_weight": get_retriever_weight_for_type(note_type),
}
# Dry-run Log (vor Upsert)
print(json.dumps({ print(json.dumps({
"note_id": note_id, "action": "sync-deletes",
"title": note_title, "prefix": cfg.prefix,
"chunks": len(chunk_payloads), "qdrant_total": len(qdrant_note_ids),
"edges": len(edges), "vault_total": len(vault_note_ids),
"changed": True, # Hash/Baseline-Logik bleibt eurer bestehenden Implementierung vorbehalten "to_delete_count": len(to_delete),
"decision": ("apply" if apply else "dry-run"), "to_delete": to_delete[:50] + ([""] if len(to_delete) > 50 else [])
"path": rel, }, ensure_ascii=False))
"hash_mode": os.getenv("MINDNET_HASH_COMPARE", "body"), if args.apply and to_delete:
"hash_normalize": os.getenv("MINDNET_HASH_NORMALIZE", "canonical"), for nid in to_delete:
"hash_source": os.getenv("MINDNET_HASH_SOURCE", "parsed"), print(json.dumps({"action": "delete", "note_id": nid, "decision": "apply"}))
"prefix": prefix, delete_note_everywhere(client, cfg.prefix, nid)
}))
if not apply: key_current = f"{mode}:{src}:{norm}"
processed += 1
processed = 0
for path in files:
# -------- Parse & Validate --------
try:
parsed = read_markdown(path)
except Exception as e:
print(json.dumps({"path": path, "error": f"read_markdown failed: {type(e).__name__}: {e}"}))
continue
if parsed is None:
print(json.dumps({"path": path, "error": "read_markdown returned None"}))
continue continue
# Optional: Purge vor Upsert pro Note try:
if purge: fm = normalize_frontmatter(parsed.frontmatter)
# delete_by_filter erwartet i. d. R. {key: value}-Filter; je Collection separat validate_required_frontmatter(fm)
delete_by_filter(client, collections["notes"], {"note_id": note_id}) except Exception as e:
delete_by_filter(client, collections["chunks"], {"note_id": note_id}) print(json.dumps({"path": path, "error": f"Frontmatter invalid: {type(e).__name__}: {e}"}))
delete_by_filter(client, collections["edges"], {"note_id": note_id}) continue
# Upserts if args.note_id and not args.only_path and fm.get("id") != args.note_id:
# Wichtig: eure upsert_* erwarten typischerweise 'points' mit point_id/uuid etc. continue
# Wir verwenden exakt eure Utilities, ohne die ID-Strategie zu verändern.
upsert_notes(client, collections["notes"], [ {"id": note_id, "payload": note_payload} ])
if chunk_payloads:
upsert_chunks(client, collections["chunks"], [
{"id": cp["chunk_id"], "payload": cp} for cp in chunk_payloads
])
if edges:
upsert_edges(client, collections["edges"], [
{"payload": e} for e in edges
])
processed += 1 processed += 1
# Abschluss-Log # -------- Type-Registry: Typvalidierung & Konfiguration --------
print(json.dumps({ fm_type = (fm.get("type") or "concept")
"summary": "done", if resolve_note_type is not None:
"processed": processed, resolved_type = resolve_note_type(fm_type, reg or {})
"prefix": prefix, else:
"collections": collections, resolved_type = (fm_type or "concept")
"counts": { type_cfg = get_type_config(resolved_type, reg or {"types":{"concept":{}}}) if get_type_config else {}
"notes": 0, # Optional: könntet ihr via count_points auffüllen chunk_profile = effective_chunk_profile(resolved_type, reg or {}) if effective_chunk_profile else None
"chunks": 0, retriever_weight = type_cfg.get("retriever_weight")
"edges": 0
# -------- Build new payload (includes 'hashes') --------
note_pl = make_note_payload(
parsed,
vault_root=root,
hash_mode=mode,
hash_normalize=norm,
hash_source=src,
file_path=path,
)
if not note_pl.get("fulltext"):
note_pl["fulltext"] = getattr(parsed, "body", "") or ""
# retriever_weight (optional) persistieren
if isinstance(retriever_weight, (int, float)):
note_pl["retriever_weight"] = float(retriever_weight)
note_id = note_pl.get("note_id") or fm.get("id")
if not note_id:
print(json.dumps({"path": path, "error": "Missing note_id after payload build"}))
continue
# -------- Fetch old payload --------
old_payload = None if args.force_replace else fetch_existing_note_payload(client, cfg.prefix, note_id)
has_old = old_payload is not None
old_hashes = (old_payload or {}).get("hashes") or {}
old_hash_exact = old_hashes.get(key_current)
new_hash_exact = (note_pl.get("hashes") or {}).get(key_current)
needs_baseline = (old_hash_exact is None)
hash_changed = (old_hash_exact is not None and new_hash_exact is not None and old_hash_exact != new_hash_exact)
text_changed = False
if compare_text:
old_text = (old_payload or {}).get("fulltext") or ""
new_text = note_pl.get("fulltext") or ""
text_changed = (old_text != new_text)
changed = args.force_replace or (not has_old) or hash_changed or text_changed
do_baseline_only = (args.baseline_modes and has_old and needs_baseline and not changed)
# -------- Chunks / Embeddings --------
chunk_pls: List[Dict[str, Any]] = []
try:
body_text = getattr(parsed, "body", "") or ""
# assemble_chunks nutzt weiterhin den Note-Typ (keine Breaking Changes)
chunks = assemble_chunks(fm["id"], body_text, resolved_type)
# chunk_profile beeinflusst ggf. nur die Fenster-Overlap-Synthese
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text, chunk_profile=chunk_profile)
except Exception as e:
print(json.dumps({"path": path, "note_id": note_id, "error": f"chunk build failed: {type(e).__name__}: {e}"}))
continue
vecs: List[List[float]] = [[0.0] * cfg.dim for _ in chunk_pls]
if embed_texts and chunk_pls:
try:
texts_for_embed = [(pl.get("window") or pl.get("text") or "") for pl in chunk_pls]
vecs = embed_texts(texts_for_embed)
except Exception as e:
print(json.dumps({"path": path, "note_id": note_id, "warn": f"embed_texts failed, using zeros: {e}"}))
# -------- Edges (robust) --------
edges: List[Dict[str, Any]] = []
edges_failed = False
if changed and (not do_baseline_only):
try:
note_refs = note_pl.get("references") or []
# Registry kann note-scope references additiv anschalten
edge_defaults = [e for e in (type_cfg.get("edge_defaults") or []) if isinstance(e, str)]
eff_note_scope_refs = bool(note_scope_refs_flag or ("references" in edge_defaults))
edges = build_edges_for_note(
note_id,
chunk_pls,
note_level_references=note_refs,
include_note_scope_refs=eff_note_scope_refs,
)
except Exception as e:
edges_failed = True
edges = []
# WICHTIG: Wir brechen NICHT mehr ab — Note & Chunks werden geschrieben.
print(json.dumps({"path": path, "note_id": note_id, "warn": f"build_edges_for_note failed, skipping edges: {type(e).__name__}: {e}"}))
# -------- Summary --------
summary = {
"note_id": note_id,
"title": fm.get("title"),
"type": resolved_type,
"chunk_profile": chunk_profile,
"retriever_weight": retriever_weight,
"chunks": len(chunk_pls),
"edges": len(edges),
"edges_failed": edges_failed,
"changed": changed,
"needs_baseline_for_mode": needs_baseline,
"decision": ("baseline-only" if args.apply and do_baseline_only else
"apply" if args.apply and changed else
"apply-skip-unchanged" if args.apply and not changed else
"dry-run"),
"path": note_pl["path"],
"hash_mode": mode,
"hash_normalize": norm,
"hash_source": src,
"prefix": cfg.prefix,
} }
})) print(json.dumps(summary, ensure_ascii=False))
# -------- Writes --------
if not args.apply:
continue
if do_baseline_only:
merged_hashes = {}
merged_hashes.update(old_hashes)
merged_hashes.update(note_pl.get("hashes") or {})
if old_payload:
note_pl["hash_fulltext"] = old_payload.get("hash_fulltext", note_pl.get("hash_fulltext"))
note_pl["hash_signature"] = old_payload.get("hash_signature", note_pl.get("hash_signature"))
note_pl["hashes"] = merged_hashes
notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
upsert_batch(client, notes_name, note_pts)
continue
if not changed:
continue
if args.purge_before_upsert and has_old:
try:
purge_note_artifacts(client, cfg.prefix, note_id)
except Exception as e:
print(json.dumps({"path": path, "note_id": note_id, "warn": f"purge failed: {e}"}))
notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
upsert_batch(client, notes_name, note_pts)
if chunk_pls:
chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
upsert_batch(client, chunks_name, chunk_pts)
if edges:
edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
upsert_batch(client, edges_name, edge_pts)
print(f"Done. Processed notes: {processed}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()