scriptAudit #11

Merged
Lars merged 24 commits from scriptAudit into main 2025-12-16 18:55:45 +01:00
Showing only changes of commit bf8a814c58


@@ -3,241 +3,222 @@
"""
resolve_unresolved_references.py  Resolve unresolved wikilinks in Qdrant after the fact
Version: 1.0.0 (2025-09-05)
Version: 1.1.0 (Fixed for v2.6 Architecture)
Purpose
-------
- Finds edges in {prefix}_edges with payload.status=="unresolved" and tries to resolve the
target node against notes that already exist in {prefix}_notes.
- Updates the edges (sets target_id, removes status, sets resolution) and creates the
symmetric 'backlink' edge ONLY for note-level 'references'.
Why?
----
- On the first import, links can point to notes that do not exist (yet).
- Once the target note exists later, this script can repair those edges.
- Finds edges in {prefix}_edges with payload.status=="unresolved".
- Builds an in-memory index of all notes (title/alias -> ID).
- Updates the edges (sets target_id, removes status).
- Creates symmetric 'backlink' edges for 'references'.
Usage
-----
# Dry run (default):
python3 -m scripts.resolve_unresolved_references --prefix mindnet
# Apply:
python3 -m scripts.resolve_unresolved_references --prefix mindnet --apply
# Optional: only touch X edges
python3 -m scripts.resolve_unresolved_references --prefix mindnet --apply --limit 500
Parameters
----------
--prefix : collection prefix (default: from env COLLECTION_PREFIX or "mindnet")
--apply  : actually write changes (without --apply = dry run)
--limit  : max. number of unresolved edges processed in this run (default: no limit)
--batch  : upsert batch size (default: 512)
Prerequisites / notes
---------------------
- Please run inside the activated venv (your environment: `.venv`).
- Qdrant URL/key/prefix/vector dim are read from ENV as usual (see app/core/qdrant.py).
- Uses the existing utilities:
- app/core/qdrant.py (Client/Collections)
- app/core/qdrant_points.py (points_for_edges/upsert_batch)
- app/core/derive_edges.py (build_note_index/resolve_target)
Change history
--------------
1.0.0 Initial release.
python3 -m scripts.resolve_unresolved_references --apply
"""
from __future__ import annotations
import argparse
import logging
import json
from typing import Any, Dict, List, Tuple, Iterable
import uuid
from typing import List, Dict, Any, Iterable
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, collection_names
from app.core.qdrant_points import points_for_edges, upsert_batch
from app.core.derive_edges import build_note_index, resolve_target
def _scroll(client: QdrantClient, **kwargs):
"""
Wrapper around qdrant_client.scroll() for different client versions:
newer: (points, next_offset)
older: (points, next_page_offset, _)
"""
res = client.scroll(**kwargs)
if isinstance(res, tuple):
if len(res) == 2:
points, next_off = res
else:
# older signature: (points, next_off, _)
points, next_off, _ = res[0], res[1], res[2]
else:
# very old clients -> handle conservatively
points, next_off = res, None
return points, next_off
def _load_all_notes(client: QdrantClient, notes_col: str) -> List[Dict[str, Any]]:
notes: List[Dict[str, Any]] = []
next_off = None
while True:
pts, next_off = _scroll(
client,
collection_name=notes_col,
with_payload=True,
with_vectors=False,
limit=1024,
offset=next_off,
)
for p in pts or []:
pl = getattr(p, "payload", {}) or {}
# Expected fields: note_id, title, path etc. (per schema)
if pl.get("note_id"):
notes.append(pl)
if not next_off:
break
return notes
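# Illustrative sketch: each entry returned here is a note payload dict such as
# {"note_id": "note-456", "title": "My Note", "path": "notes/my-note.md"}
# (made-up values; the expected fields follow the schema comment above).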
def _iter_unresolved_edges(client: QdrantClient, edges_col: str) -> Iterable[rest.Record]:
"""
Yields all edge records with payload.status == 'unresolved' and a string 'target_label'.
"""
f = rest.Filter(
must=[
rest.FieldCondition(key="status", match=rest.MatchValue(value="unresolved")),
]
)
next_off = None
while True:
pts, next_off = _scroll(
client,
collection_name=edges_col,
scroll_filter=f,
with_payload=True,
with_vectors=False,
limit=1024,
offset=next_off,
)
for p in pts or []:
pl = getattr(p, "payload", {}) or {}
if isinstance(pl.get("target_label"), str):
yield p
if not next_off:
break
from qdrant_client import models
from app.core.qdrant import QdrantConfig, get_client
from app.core.qdrant_points import points_for_edges
# Logging Setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def _make_backlink(source_note_id: str, target_note_id: str, extra: Dict[str, Any]) -> Dict[str, Any]:
"""
Build a 'backlink' edge payload source <- target (note-level).
Helper: creates the payload for the backlink.
"""
e = {
"kind": "backlink",
return {
"source_id": target_note_id,
"target_id": source_note_id,
"kind": "backlink",
"scope": "note",
"text": f"Backlink from {extra.get('alias') or 'note'}",
"rule_id": "derived:backlink",
"confidence": 0.9
}
# Carry over meta fields from the original (without status)
copy_keys = ["raw", "alias", "heading", "resolution"]
for k in copy_keys:
if k in extra:
e[k] = extra[k]
return e
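# Illustrative sketch: for source_note_id="note-123", target_note_id="note-456" and
# extra={"alias": "My Note"} this helper returns roughly
#   {"source_id": "note-456", "target_id": "note-123", "kind": "backlink",
#    "scope": "note", "text": "Backlink from My Note",
#    "rule_id": "derived:backlink", "confidence": 0.9}
# plus any copied meta fields such as 'alias' (values are made up).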
def build_lookup_index(client, collection_name: str) -> Dict[str, str]:
"""
Loads ALL notes and builds a mapping:
lower(title) -> note_id
lower(alias) -> note_id
"""
logger.info("Building lookup index from existing notes...")
lookup = {}
# Scroll over all notes
next_offset = None
count = 0
while True:
records, next_offset = client.scroll(
collection_name=collection_name,
limit=1000,
offset=next_offset,
with_payload=True,
with_vectors=False
)
for record in records:
pl = record.payload or {}
nid = pl.get("note_id")
if not nid: continue
# 1. Title
title = pl.get("title")
if title:
lookup[str(title).lower().strip()] = nid
# 2. Aliases (WP-11)
aliases = pl.get("aliases", [])
if isinstance(aliases, str): aliases = [aliases]
for a in aliases:
lookup[str(a).lower().strip()] = nid
count += len(records)
if next_offset is None:
break
logger.info(f"Index built. Mapped {len(lookup)} terms to {count} unique notes.")
return lookup
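# Illustrative sketch: the resulting lookup maps lowercased titles and aliases to note IDs,
# e.g. {"my note": "note-456", "mn": "note-456"} (made-up values), so resolution later is
# a single dict lookup on the lowercased candidate.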
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--prefix", help="Collection-Prefix (Default: Env/COLLECTION_PREFIX oder 'mindnet')")
ap.add_argument("--apply", action="store_true", help="Änderungen schreiben (ohne Flag = Dry-Run)")
ap.add_argument("--limit", type=int, default=0, help="Max. Anzahl unaufgelöster Edges bearbeiten (0 = alle)")
ap.add_argument("--batch", type=int, default=512, help="Upsert-Batchgröße")
args = ap.parse_args()
parser = argparse.ArgumentParser()
parser.add_argument("--prefix", default=None, help="Collection prefix")
parser.add_argument("--apply", action="store_true", help="Write changes to DB")
parser.add_argument("--limit", type=int, default=0, help="Max edges to process (0=all)")
parser.add_argument("--batch", type=int, default=100, help="Upsert batch size")
args = parser.parse_args()
# Qdrant-Setup
cfg = QdrantConfig.from_env()
if args.prefix:
cfg.prefix = args.prefix
client = get_client(cfg)
ensure_collections(client, cfg.prefix, cfg.dim)  # also ensures the 1-D vector collection for edges
notes_col, _, edges_col = collection_names(cfg.prefix)
edges_col = f"{cfg.prefix}_edges"
notes_col = f"{cfg.prefix}_notes"
# Load notes & build the index
notes = _load_all_notes(client, notes_col)
idx = build_note_index(notes)  # (by_id, by_slug, by_file_slug)
# Scan unresolved edges
to_fix: List[dict] = []
backlinks: List[dict] = []
processed = 0
resolved = 0
for rec in _iter_unresolved_edges(client, edges_col):
if args.limit and processed >= args.limit:
break
processed += 1
pl = dict(rec.payload or {})
kind = pl.get("kind") or "references"
src = pl.get("source_id")
tgt_label = pl.get("target_label") or pl.get("target_id") # Fallback
# Target resolution
resolved_id, how = resolve_target(str(tgt_label), idx)
if not resolved_id:
continue  # still unresolved
# Edge update
new_pl = dict(pl)
new_pl["target_id"] = resolved_id
new_pl["resolution"] = how
if "status" in new_pl:
del new_pl["status"]
# Keep the ID stable -> points_for_edges derives a UUID from edge_id/fallback
if "edge_id" not in new_pl:
# stable key from (kind, src, tgt, optional seq)
seq = new_pl.get("seq") or new_pl.get("order") or ""
new_pl["edge_id"] = f"{kind}:{src}->{resolved_id}#{seq}"
to_fix.append(new_pl)
resolved += 1
# Only for note-level references (not references_at) -> create a backlink
if kind == "references":
extra = {k: new_pl.get(k) for k in ("raw", "alias", "heading")}
extra["resolution"] = how
backlinks.append(_make_backlink(source_note_id=src, target_note_id=resolved_id, extra=extra))
# Print the result
summary = {
"prefix": cfg.prefix,
"scanned_unresolved": processed,
"resolved": resolved,
"backlinks_to_create": len(backlinks),
"apply": bool(args.apply),
}
print(json.dumps(summary, ensure_ascii=False))
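# Illustrative dry-run output (values made up):
#   {"prefix": "mindnet", "scanned_unresolved": 120, "resolved": 95,
#    "backlinks_to_create": 60, "apply": false}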
if not args.apply:
# 1. Build the index
try:
lookup_index = build_lookup_index(client, notes_col)
except Exception as e:
logger.error(f"Failed to build index: {e}")
return
# Upserts (in Batches)
def _batched(items: List[dict], n: int) -> Iterable[List[dict]]:
for i in range(0, len(items), n):
yield items[i : i + n]
# 2. Find unresolved edges
logger.info(f"Scanning for unresolved edges in {edges_col}...")
scroll_filter = models.Filter(
must=[
models.FieldCondition(key="status", match=models.MatchValue(value="unresolved"))
]
)
# 1) Updates for repaired edges
for chunk in _batched(to_fix, args.batch):
col, pts = points_for_edges(cfg.prefix, chunk)  # takes care of edge UUID & dummy vector
upsert_batch(client, col, pts)
unresolved_edges = []
next_page = None
while True:
res, next_page = client.scroll(
collection_name=edges_col,
scroll_filter=scroll_filter,
limit=500,
with_payload=True,
offset=next_page
)
unresolved_edges.extend(res)
if next_page is None or (args.limit > 0 and len(unresolved_edges) >= args.limit):
break
if args.limit > 0:
unresolved_edges = unresolved_edges[:args.limit]
# 2) Backlinks (references only)
for chunk in _batched(backlinks, args.batch):
col, pts = points_for_edges(cfg.prefix, chunk)
upsert_batch(client, col, pts)
logger.info(f"Found {len(unresolved_edges)} unresolved edges.")
# 3. Resolve
to_fix = []
backlinks = []
resolved_count = 0
for pt in unresolved_edges:
pl = pt.payload
# The term we are looking for is often in 'raw_target' (if the parser stores it),
# or we use the 'target_id' if it temporarily holds the name (legacy parser behavior).
# In the v2.6 parser the target_id of unresolved links is often the slug or the name.
# Strategy: we inspect the payload.
# Case A: derive_edges set target_id="[[Missing Note]]" (rare)
# Case B: target_id is the slug/title in lowercase (common)
# Case C: there is a 'raw' or 'text' field
candidate = pl.get("target_id")
# Attempt resolution
target_nid = lookup_index.get(str(candidate).lower().strip())
if target_nid:
# HIT!
new_pl = pl.copy()
new_pl["target_id"] = target_nid
new_pl.pop("status", None) # Status entfernen -> ist jetzt resolved
new_pl["resolution"] = "healed_by_script"
# Edge ID: we could generate a new one (the old ID often encodes the target_id and we
# want to avoid duplicates), but here we simply update the existing point under its ID.
to_fix.append({
"id": pt.id,
"payload": new_pl
})
# Create a backlink? Only if it is a reference
if pl.get("kind") == "references":
backlinks.append(_make_backlink(
source_note_id=pl.get("source_id"),
target_note_id=target_nid,
extra={"alias": candidate}
))
resolved_count += 1
logger.info(f"Resolvable: {resolved_count}/{len(unresolved_edges)}")
if not args.apply:
logger.info("DRY RUN. Use --apply to execute.")
return
# 4. Write
if to_fix:
logger.info(f"Updating {len(to_fix)} edges...")
# Qdrant update: we overwrite the point.
# Note: client.upsert expects PointStructs.
points_to_upsert = [
models.PointStruct(id=u["id"], payload=u["payload"], vector={})
for u in to_fix
]
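# Note (assumption): points_for_edges normally adds a dummy vector for edge points;
# if the edges collection is configured with a plain 1-D vector, vector={} above may
# need to be replaced by a dummy vector such as [0.0].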
# In batches
for i in range(0, len(points_to_upsert), args.batch):
batch = points_to_upsert[i:i+args.batch]
client.upsert(collection_name=edges_col, points=batch)
if backlinks:
logger.info(f"Creating {len(backlinks)} backlinks...")
# Here we use the helper from qdrant_points for clean IDs
col, bl_points = points_for_edges(cfg.prefix, backlinks)
# in batches
for i in range(0, len(bl_points), args.batch):
batch = bl_points[i:i+args.batch]
client.upsert(collection_name=col, points=batch)
logger.info("Done.")
if __name__ == "__main__":
main()