diff --git a/app/core/derive_edges.py b/app/core/derive_edges.py index 31204c9..392d05a 100644 --- a/app/core/derive_edges.py +++ b/app/core/derive_edges.py @@ -1,394 +1,10 @@ """ FILE: app/core/derive_edges.py -DESCRIPTION: Extrahiert Graph-Kanten aus Text. Unterstützt Wikilinks, Inline-Relations ([[rel:type|target]]) und Obsidian Callouts. - WP-15b: Integration des Candidate-Pools und Provenance-Priorisierung. - Sichert die Graph-Integrität durch confidence-basiertes De-Duplicating. -VERSION: 2.1.0 -STATUS: Active -DEPENDENCIES: re, os, yaml, typing, hashlib -EXTERNAL_CONFIG: config/types.yaml -LAST_ANALYSIS: 2025-12-26 +DESCRIPTION: Facade für das neue graph Package. + WP-14: Modularisierung abgeschlossen. +VERSION: 2.2.0 """ +from .graph.graph_derive_edges import build_edges_for_note +from .graph.graph_utils import PROVENANCE_PRIORITY -from __future__ import annotations - -import os -import re -import hashlib -from typing import Iterable, List, Optional, Tuple, Set, Dict - -try: - import yaml # optional, nur für types.yaml -except Exception: # pragma: no cover - yaml = None - -# --------------------------------------------------------------------------- # -# 1. Utilities & ID Generation -# --------------------------------------------------------------------------- # - -def _get(d: dict, *keys, default=None): - """Sicherer Zugriff auf verschachtelte Dictionary-Keys.""" - for k in keys: - if isinstance(d, dict) and k in d and d[k] is not None: - return d[k] - return default - -def _chunk_text_for_refs(chunk: dict) -> str: - """Extrahiert den relevanten Text für die Referenzsuche (bevorzugt Window).""" - return ( - _get(chunk, "window") - or _get(chunk, "text") - or _get(chunk, "content") - or _get(chunk, "raw") - or "" - ) - -def _dedupe_seq(seq: Iterable[str]) -> List[str]: - """Dedupliziert eine Sequenz von Strings unter Beibehaltung der Reihenfolge.""" - seen: Set[str] = set() - out: List[str] = [] - for s in seq: - if s not in seen: - seen.add(s) - out.append(s) - return out - -def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: - """Konstruiert ein valides Kanten-Payload-Objekt für Qdrant.""" - pl = { - "kind": kind, - "relation": kind, # Alias für Abwärtskompatibilität (v2) - "scope": scope, # "chunk" | "note" - "source_id": source_id, - "target_id": target_id, - "note_id": note_id, # Träger-Note der Kante - } - if extra: - pl.update(extra) - return pl - -def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: - """Erzeugt eine deterministische 12-Byte ID mittels BLAKE2s.""" - base = f"{kind}:{s}->{t}#{scope}" - if rule_id: - base += f"|{rule_id}" - try: - return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() - except Exception: # pragma: no cover - return base - -# --------------------------------------------------------------------------- # -# 2. Konfiguration & Provenance-Skala -# --------------------------------------------------------------------------- # - -# WP-15b: Prioritäten-Ranking für die De-Duplizierung -PROVENANCE_PRIORITY = { - "explicit:wikilink": 1.00, - "inline:rel": 0.95, - "callout:edge": 0.90, - "semantic_ai": 0.90, # Validierte KI-Kanten - "structure:belongs_to": 1.00, - "structure:order": 0.95, # next/prev - "explicit:note_scope": 1.00, - "derived:backlink": 0.90, - "edge_defaults": 0.70 # Heuristik (types.yaml) -} - -def _env(n: str, default: Optional[str] = None) -> str: - v = os.getenv(n) - return v if v is not None else (default or "") - -def _load_types_registry() -> dict: - """Lädt die YAML-Registry zur Ermittlung von Standard-Kanten.""" - p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") - if not os.path.isfile(p) or yaml is None: - return {} - try: - with open(p, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) or {} - return data - except Exception: - return {} - -def _get_types_map(reg: dict) -> dict: - if isinstance(reg, dict) and isinstance(reg.get("types"), dict): - return reg["types"] - return reg if isinstance(reg, dict) else {} - -def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: - """Liefert die edge_defaults-Liste für den gegebenen Notiztyp.""" - types_map = _get_types_map(reg) - if note_type and isinstance(types_map, dict): - t = types_map.get(note_type) - if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): - return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] - for key in ("defaults", "default", "global"): - v = reg.get(key) - if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): - return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] - return [] - -# --------------------------------------------------------------------------- # -# 3. Parser für Links / Relationen (Core Logik v2.0.0) -# --------------------------------------------------------------------------- # - -# Normale Wikilinks (Fallback) -_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") - -# Getypte Inline-Relationen -_REL_PIPE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s*\|\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) -_REL_SPACE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s+(?P[^\]]+?)\s*\]\]", re.IGNORECASE) -_REL_TEXT = re.compile(r"rel\s*:\s*(?P[a-z_]+)\s*\[\[\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) - -def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: - """Extrahiert [[rel:KIND|Target]] und entfernt sie zur Vermeidung von Dubletten.""" - pairs: List[Tuple[str,str]] = [] - def _collect(m): - k = (m.group("kind") or "").strip().lower() - t = (m.group("target") or "").strip() - if k and t: - pairs.append((k, t)) - return "" # Link entfernen - - text = _REL_PIPE.sub(_collect, text) - text = _REL_SPACE.sub(_collect, text) - text = _REL_TEXT.sub(_collect, text) - return pairs, text - -# Obsidian Callout Parser für mehrzeilige Blöcke -_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE) -_REL_LINE = re.compile(r"^(?P[a-z_]+)\s*:\s*(?P.+?)\s*$", re.IGNORECASE) -_WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]") - -def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: - """Verarbeitet [!edge]-Callouts und entfernt diese aus dem Textfluss.""" - if not text: - return [], text - - lines = text.splitlines() - out_pairs: List[Tuple[str,str]] = [] - keep_lines: List[str] = [] - i = 0 - - while i < len(lines): - m = _CALLOUT_START.match(lines[i]) - if not m: - keep_lines.append(lines[i]) - i += 1 - continue - - block_lines: List[str] = [] - first_rest = m.group(1) or "" - if first_rest.strip(): - block_lines.append(first_rest) - - i += 1 - while i < len(lines) and lines[i].lstrip().startswith('>'): - block_lines.append(lines[i].lstrip()[1:].lstrip()) - i += 1 - - for bl in block_lines: - mrel = _REL_LINE.match(bl) - if not mrel: - continue - kind = (mrel.group("kind") or "").strip().lower() - targets = mrel.group("targets") or "" - found = _WIKILINKS_IN_LINE.findall(targets) - if found: - for t in found: - t = t.strip() - if t: - out_pairs.append((kind, t)) - else: - for raw in re.split(r"[,;]", targets): - t = raw.strip() - if t: - out_pairs.append((kind, t)) - continue - - remainder = "\n".join(keep_lines) - return out_pairs, remainder - -def _extract_wikilinks(text: str) -> List[str]: - """Extrahiert Standard-Wikilinks aus dem verbleibenden Text.""" - ids: List[str] = [] - for m in _WIKILINK_RE.finditer(text or ""): - ids.append(m.group(1).strip()) - return ids - -# --------------------------------------------------------------------------- # -# 4. Hauptfunktion (build_edges_for_note) -# --------------------------------------------------------------------------- # - -def build_edges_for_note( - note_id: str, - chunks: List[dict], - note_level_references: Optional[List[str]] = None, - include_note_scope_refs: bool = False, -) -> List[dict]: - """ - Erzeugt und aggregiert alle Kanten für eine Note inklusive WP-15b Candidate-Processing. - Setzt Provenance-Ranking zur Graph-Stabilisierung ein. - """ - edges: List[dict] = [] - note_type = _get(chunks[0], "type") if chunks else "concept" - - # 1) Struktur-Kanten: belongs_to (Chunk -> Note) - for ch in chunks: - cid = _get(ch, "chunk_id", "id") - if not cid: - continue - edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to"), - "provenance": "structure", - "rule_id": "structure:belongs_to", - "confidence": PROVENANCE_PRIORITY["structure:belongs_to"], - })) - - # 2) Struktur-Kanten: next / prev (Sequenz) - for i in range(len(chunks) - 1): - a, b = chunks[i], chunks[i + 1] - a_id = _get(a, "chunk_id", "id") - b_id = _get(b, "chunk_id", "id") - if not a_id or not b_id: - continue - edges.append(_edge("next", "chunk", a_id, b_id, note_id, { - "chunk_id": a_id, - "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order"), - "provenance": "structure", - "rule_id": "structure:order", - "confidence": PROVENANCE_PRIORITY["structure:order"], - })) - edges.append(_edge("prev", "chunk", b_id, a_id, note_id, { - "chunk_id": b_id, - "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order"), - "provenance": "structure", - "rule_id": "structure:order", - "confidence": PROVENANCE_PRIORITY["structure:order"], - })) - - # 3) Inhaltliche Kanten (Refs, Inlines, Callouts, Candidates) - reg = _load_types_registry() - defaults = _edge_defaults_for(note_type, reg) - refs_all: List[str] = [] - - for ch in chunks: - cid = _get(ch, "chunk_id", "id") - if not cid: - continue - raw = _chunk_text_for_refs(ch) - - # 3a) Typed Inline Relations - typed, remainder = _extract_typed_relations(raw) - for kind, target in typed: - k = kind.strip().lower() - if not k or not target: continue - edges.append(_edge(k, "chunk", cid, target, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(k, cid, target, "chunk", "inline:rel"), - "provenance": "explicit", - "rule_id": "inline:rel", - "confidence": PROVENANCE_PRIORITY["inline:rel"], - })) - - # 3b) WP-15b Candidate Pool Integration (KI-validierte Kanten) - # Verarbeitet Kanten, die bereits in der Ingestion semantisch geprüft wurden. - pool = ch.get("candidate_pool") or ch.get("candidate_edges") or [] - for cand in pool: - target = cand.get("to") - kind = cand.get("kind", "related_to") - prov = cand.get("provenance", "semantic_ai") - if not target: continue - edges.append(_edge(kind, "chunk", cid, target, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(kind, cid, target, "chunk", f"candidate:{prov}"), - "provenance": prov, - "rule_id": f"candidate:{prov}", - "confidence": PROVENANCE_PRIORITY.get(prov, 0.90), - })) - - # 3c) Obsidian Callouts - call_pairs, remainder2 = _extract_callout_relations(remainder) - for kind, target in call_pairs: - k = (kind or "").strip().lower() - if not k or not target: continue - edges.append(_edge(k, "chunk", cid, target, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(k, cid, target, "chunk", "callout:edge"), - "provenance": "explicit", - "rule_id": "callout:edge", - "confidence": PROVENANCE_PRIORITY["callout:edge"], - })) - - # 3d) Standard-Wikilinks -> references (+ defaults) - refs = _extract_wikilinks(remainder2) - for r in refs: - edges.append(_edge("references", "chunk", cid, r, note_id, { - "chunk_id": cid, - "ref_text": r, - "edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink"), - "provenance": "explicit", - "rule_id": "explicit:wikilink", - "confidence": PROVENANCE_PRIORITY["explicit:wikilink"], - })) - # Regelbasierte Kanten aus types.yaml anhängen - for rel in defaults: - if rel == "references": continue - edges.append(_edge(rel, "chunk", cid, r, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": PROVENANCE_PRIORITY["edge_defaults"], - })) - - refs_all.extend(refs) - - # 4) Optionale Note-Scope Referenzen & Backlinks - if include_note_scope_refs: - refs_note = list(refs_all or []) - if note_level_references: - refs_note.extend([r for r in note_level_references if isinstance(r, str) and r]) - refs_note = _dedupe_seq(refs_note) - - for r in refs_note: - edges.append(_edge("references", "note", note_id, r, note_id, { - "edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope"), - "provenance": "explicit", - "rule_id": "explicit:note_scope", - "confidence": PROVENANCE_PRIORITY["explicit:note_scope"], - })) - # Backlink-Erzeugung zur Graphen-Stärkung - edges.append(_edge("backlink", "note", r, note_id, note_id, { - "edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink"), - "provenance": "rule", - "rule_id": "derived:backlink", - "confidence": PROVENANCE_PRIORITY["derived:backlink"], - })) - for rel in defaults: - if rel == "references": continue - edges.append(_edge(rel, "note", note_id, r, note_id, { - "edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": PROVENANCE_PRIORITY["edge_defaults"], - })) - - # 5) WP-15b: Confidence-basierte De-Duplizierung - # Wenn dieselbe Relation mehrfach existiert, gewinnt die mit der höchsten Confidence. - unique_map: Dict[Tuple[str, str, str], dict] = {} - - for e in edges: - s, t = str(e.get("source_id")), str(e.get("target_id")) - rel = str(e.get("relation") or e.get("kind") or "edge") - key = (s, t, rel) - - if key not in unique_map: - unique_map[key] = e - else: - # Vergleich der Vertrauenswürdigkeit (Provenance Ranking) - if e.get("confidence", 0) > unique_map[key].get("confidence", 0): - unique_map[key] = e - - return list(unique_map.values()) \ No newline at end of file +__all__ = ["build_edges_for_note", "PROVENANCE_PRIORITY"] \ No newline at end of file diff --git a/app/core/graph/__init__.py b/app/core/graph/__init__.py new file mode 100644 index 0000000..e7b7ceb --- /dev/null +++ b/app/core/graph/__init__.py @@ -0,0 +1,16 @@ +""" +FILE: app/core/graph/__init__.py +DESCRIPTION: Unified Graph Package. Exportiert Kanten-Ableitung und Graph-Adapter. +""" +from .graph_derive_edges import build_edges_for_note +from .graph_utils import PROVENANCE_PRIORITY +from .graph_subgraph import Subgraph, expand +from .graph_weights import EDGE_BASE_WEIGHTS + +__all__ = [ + "build_edges_for_note", + "PROVENANCE_PRIORITY", + "Subgraph", + "expand", + "EDGE_BASE_WEIGHTS" +] \ No newline at end of file diff --git a/app/core/graph/graph_db_adapter.py b/app/core/graph/graph_db_adapter.py new file mode 100644 index 0000000..e3fff2f --- /dev/null +++ b/app/core/graph/graph_db_adapter.py @@ -0,0 +1,56 @@ +""" +FILE: app/core/graph/graph_db_adapter.py +DESCRIPTION: Datenbeschaffung aus Qdrant für den Graphen. +""" +from typing import List, Dict, Optional +from qdrant_client import QdrantClient +from qdrant_client.http import models as rest +from app.core.qdrant import collection_names + +def fetch_edges_from_qdrant( + client: QdrantClient, + prefix: str, + seeds: List[str], + edge_types: Optional[List[str]] = None, + limit: int = 2048, +) -> List[Dict]: + """ + Holt Edges aus der Datenbank basierend auf Seed-IDs. + Filtert auf source_id, target_id oder note_id. + """ + if not seeds or limit <= 0: + return [] + + _, _, edges_col = collection_names(prefix) + + seed_conditions = [] + for field in ("source_id", "target_id", "note_id"): + for s in seeds: + seed_conditions.append( + rest.FieldCondition(key=field, match=rest.MatchValue(value=str(s))) + ) + seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None + + type_filter = None + if edge_types: + type_conds = [ + rest.FieldCondition(key="kind", match=rest.MatchValue(value=str(k))) + for k in edge_types + ] + type_filter = rest.Filter(should=type_conds) + + must = [] + if seeds_filter: must.append(seeds_filter) + if type_filter: must.append(type_filter) + + flt = rest.Filter(must=must) if must else None + + pts, _ = client.scroll( + collection_name=edges_col, + scroll_filter=flt, + limit=limit, + with_payload=True, + with_vectors=False, + ) + + return [dict(p.payload) for p in pts if p.payload] \ No newline at end of file diff --git a/app/core/graph/graph_derive_edges.py b/app/core/graph/graph_derive_edges.py new file mode 100644 index 0000000..284e789 --- /dev/null +++ b/app/core/graph/graph_derive_edges.py @@ -0,0 +1,112 @@ +""" +FILE: app/core/graph/graph_derive_edges.py +DESCRIPTION: Hauptlogik zur Kanten-Aggregation und De-Duplizierung. +""" +from typing import List, Optional, Dict, Tuple +from .graph_utils import ( + _get, _edge, _mk_edge_id, _dedupe_seq, + PROVENANCE_PRIORITY, load_types_registry, get_edge_defaults_for +) +from .graph_extractors import ( + extract_typed_relations, extract_callout_relations, extract_wikilinks +) + +def build_edges_for_note( + note_id: str, + chunks: List[dict], + note_level_references: Optional[List[str]] = None, + include_note_scope_refs: bool = False, +) -> List[dict]: + """Erzeugt und aggregiert alle Kanten für eine Note (WP-15b).""" + edges: List[dict] = [] + note_type = _get(chunks[0], "type") if chunks else "concept" + + # 1) Struktur-Kanten (belongs_to, next/prev) + for idx, ch in enumerate(chunks): + cid = _get(ch, "chunk_id", "id") + if not cid: continue + edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, { + "chunk_id": cid, "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to"), + "provenance": "structure", "rule_id": "structure:belongs_to", "confidence": PROVENANCE_PRIORITY["structure:belongs_to"] + })) + if idx < len(chunks) - 1: + next_id = _get(chunks[idx+1], "chunk_id", "id") + if next_id: + edges.append(_edge("next", "chunk", cid, next_id, note_id, { + "chunk_id": cid, "edge_id": _mk_edge_id("next", cid, next_id, "chunk", "structure:order"), + "provenance": "structure", "rule_id": "structure:order", "confidence": PROVENANCE_PRIORITY["structure:order"] + })) + edges.append(_edge("prev", "chunk", next_id, cid, note_id, { + "chunk_id": next_id, "edge_id": _mk_edge_id("prev", next_id, cid, "chunk", "structure:order"), + "provenance": "structure", "rule_id": "structure:order", "confidence": PROVENANCE_PRIORITY["structure:order"] + })) + + # 2) Inhaltliche Kanten + reg = load_types_registry() + defaults = get_edge_defaults_for(note_type, reg) + refs_all: List[str] = [] + + for ch in chunks: + cid = _get(ch, "chunk_id", "id") + if not cid: continue + raw = _get(ch, "window") or _get(ch, "text") or "" + + # Typed & Candidate Pool (WP-15b Integration) + typed, rem = extract_typed_relations(raw) + for k, t in typed: + edges.append(_edge(k, "chunk", cid, t, note_id, { + "chunk_id": cid, "edge_id": _mk_edge_id(k, cid, t, "chunk", "inline:rel"), + "provenance": "explicit", "rule_id": "inline:rel", "confidence": PROVENANCE_PRIORITY["inline:rel"] + })) + + pool = ch.get("candidate_pool") or ch.get("candidate_edges") or [] + for cand in pool: + t, k, p = cand.get("to"), cand.get("kind", "related_to"), cand.get("provenance", "semantic_ai") + if t: + edges.append(_edge(k, "chunk", cid, t, note_id, { + "chunk_id": cid, "edge_id": _mk_edge_id(k, cid, t, "chunk", f"candidate:{p}"), + "provenance": p, "rule_id": f"candidate:{p}", "confidence": PROVENANCE_PRIORITY.get(p, 0.90) + })) + + # Callouts & Wikilinks + call_pairs, rem2 = extract_callout_relations(rem) + for k, t in call_pairs: + edges.append(_edge(k, "chunk", cid, t, note_id, { + "chunk_id": cid, "edge_id": _mk_edge_id(k, cid, t, "chunk", "callout:edge"), + "provenance": "explicit", "rule_id": "callout:edge", "confidence": PROVENANCE_PRIORITY["callout:edge"] + })) + + refs = extract_wikilinks(rem2) + for r in refs: + edges.append(_edge("references", "chunk", cid, r, note_id, { + "chunk_id": cid, "ref_text": r, "edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink"), + "provenance": "explicit", "rule_id": "explicit:wikilink", "confidence": PROVENANCE_PRIORITY["explicit:wikilink"] + })) + for rel in defaults: + if rel != "references": + edges.append(_edge(rel, "chunk", cid, r, note_id, { + "chunk_id": cid, "edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{rel}"), + "provenance": "rule", "rule_id": f"edge_defaults:{rel}", "confidence": PROVENANCE_PRIORITY["edge_defaults"] + })) + refs_all.extend(refs) + + # 3) Note-Scope & De-Duplizierung + if include_note_scope_refs: + refs_note = _dedupe_seq((refs_all or []) + (note_level_references or [])) + for r in refs_note: + edges.append(_edge("references", "note", note_id, r, note_id, { + "edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope"), + "provenance": "explicit", "confidence": PROVENANCE_PRIORITY["explicit:note_scope"] + })) + edges.append(_edge("backlink", "note", r, note_id, note_id, { + "edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink"), + "provenance": "rule", "confidence": PROVENANCE_PRIORITY["derived:backlink"] + })) + + unique_map: Dict[Tuple[str, str, str], dict] = {} + for e in edges: + key = (str(e.get("source_id")), str(e.get("target_id")), str(e.get("kind"))) + if key not in unique_map or e.get("confidence", 0) > unique_map[key].get("confidence", 0): + unique_map[key] = e + + return list(unique_map.values()) \ No newline at end of file diff --git a/app/core/graph/graph_extractors.py b/app/core/graph/graph_extractors.py new file mode 100644 index 0000000..9c1fedf --- /dev/null +++ b/app/core/graph/graph_extractors.py @@ -0,0 +1,55 @@ +""" +FILE: app/core/graph/graph_extractors.py +DESCRIPTION: Regex-basierte Extraktion von Relationen aus Text. +""" +import re +from typing import List, Tuple + +_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") +_REL_PIPE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s*\|\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) +_REL_SPACE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s+(?P[^\]]+?)\s*\]\]", re.IGNORECASE) +_REL_TEXT = re.compile(r"rel\s*:\s*(?P[a-z_]+)\s*\[\[\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) + +_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE) +_REL_LINE = re.compile(r"^(?P[a-z_]+)\s*:\s*(?P.+?)\s*$", re.IGNORECASE) +_WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]") + +def extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: + """Extrahiert [[rel:KIND|Target]].""" + pairs = [] + def _collect(m): + k, t = (m.group("kind") or "").strip().lower(), (m.group("target") or "").strip() + if k and t: pairs.append((k, t)) + return "" + text = _REL_PIPE.sub(_collect, text) + text = _REL_SPACE.sub(_collect, text) + text = _REL_TEXT.sub(_collect, text) + return pairs, text + +def extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: + """Verarbeitet Obsidian [!edge]-Callouts.""" + if not text: return [], text + lines = text.splitlines(); out_pairs, keep_lines, i = [], [], 0 + while i < len(lines): + m = _CALLOUT_START.match(lines[i]) + if not m: + keep_lines.append(lines[i]); i += 1; continue + block_lines = [m.group(1)] if m.group(1).strip() else [] + i += 1 + while i < len(lines) and lines[i].lstrip().startswith('>'): + block_lines.append(lines[i].lstrip()[1:].lstrip()); i += 1 + for bl in block_lines: + mrel = _REL_LINE.match(bl) + if not mrel: continue + kind, targets = mrel.group("kind").strip().lower(), mrel.group("targets") or "" + found = _WIKILINKS_IN_LINE.findall(targets) + if found: + for t in found: out_pairs.append((kind, t.strip())) + else: + for raw in re.split(r"[,;]", targets): + if raw.strip(): out_pairs.append((kind, raw.strip())) + return out_pairs, "\n".join(keep_lines) + +def extract_wikilinks(text: str) -> List[str]: + """Extrahiert Standard-Wikilinks.""" + return [m.group(1).strip() for m in _WIKILINK_RE.finditer(text or "")] \ No newline at end of file diff --git a/app/core/graph/graph_subgraph.py b/app/core/graph/graph_subgraph.py new file mode 100644 index 0000000..593b09e --- /dev/null +++ b/app/core/graph/graph_subgraph.py @@ -0,0 +1,106 @@ +""" +FILE: app/core/graph/graph_subgraph.py +DESCRIPTION: In-Memory Repräsentation eines Graphen für Scoring und Analyse. +""" +import math +from collections import defaultdict +from typing import Dict, List, Optional, DefaultDict, Any, Set +from qdrant_client import QdrantClient +from .graph_weights import EDGE_BASE_WEIGHTS, calculate_edge_weight +from .graph_db_adapter import fetch_edges_from_qdrant + +class Subgraph: + """Leichtgewichtiger Subgraph mit Adjazenzlisten & Kennzahlen.""" + + def __init__(self) -> None: + self.adj: DefaultDict[str, List[Dict]] = defaultdict(list) + self.reverse_adj: DefaultDict[str, List[Dict]] = defaultdict(list) + self.in_degree: DefaultDict[str, int] = defaultdict(int) + self.out_degree: DefaultDict[str, int] = defaultdict(int) + + def add_edge(self, e: Dict) -> None: + """Fügt eine Kante hinzu und aktualisiert Indizes.""" + src = e.get("source") + tgt = e.get("target") + kind = e.get("kind") + weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0)) + owner = e.get("note_id") + + if not src or not tgt: + return + + # 1. Forward + self.adj[src].append({"target": tgt, "kind": kind, "weight": weight}) + self.out_degree[src] += 1 + self.in_degree[tgt] += 1 + + # 2. Reverse (WP-04b Explanation) + self.reverse_adj[tgt].append({"source": src, "kind": kind, "weight": weight}) + + # 3. Kontext-Note Handling + if owner and owner != src: + self.adj[owner].append({"target": tgt, "kind": kind, "weight": weight}) + self.out_degree[owner] += 1 + if owner != tgt: + self.reverse_adj[tgt].append({"source": owner, "kind": kind, "weight": weight, "via_context": True}) + self.in_degree[owner] += 1 + + def aggregate_edge_bonus(self, node_id: str) -> float: + """Summe der ausgehenden Kantengewichte (Hub-Score).""" + return sum(edge["weight"] for edge in self.adj.get(node_id, [])) + + def edge_bonus(self, node_id: str) -> float: + """API für Retriever (WP-04a Kompatibilität).""" + return self.aggregate_edge_bonus(node_id) + + def centrality_bonus(self, node_id: str) -> float: + """Log-gedämpfte Zentralität (In-Degree).""" + indeg = self.in_degree.get(node_id, 0) + if indeg <= 0: + return 0.0 + return min(math.log1p(indeg) / 10.0, 0.15) + + def get_outgoing_edges(self, node_id: str) -> List[Dict[str, Any]]: + return self.adj.get(node_id, []) + + def get_incoming_edges(self, node_id: str) -> List[Dict[str, Any]]: + return self.reverse_adj.get(node_id, []) + + +def expand( + client: QdrantClient, + prefix: str, + seeds: List[str], + depth: int = 1, + edge_types: Optional[List[str]] = None, +) -> Subgraph: + """Expandiert ab Seeds entlang von Edges bis zu einer bestimmten Tiefe.""" + sg = Subgraph() + frontier = set(seeds) + visited = set() + + for _ in range(max(depth, 0)): + if not frontier: + break + + payloads = fetch_edges_from_qdrant(client, prefix, list(frontier), edge_types) + next_frontier: Set[str] = set() + + for pl in payloads: + src, tgt = pl.get("source_id"), pl.get("target_id") + if not src or not tgt: continue + + sg.add_edge({ + "source": src, "target": tgt, + "kind": pl.get("kind", "edge"), + "weight": calculate_edge_weight(pl), + "note_id": pl.get("note_id"), + }) + + if tgt not in visited: + next_frontier.add(str(tgt)) + + visited |= frontier + frontier = next_frontier - visited + + return sg \ No newline at end of file diff --git a/app/core/graph/graph_utils.py b/app/core/graph/graph_utils.py new file mode 100644 index 0000000..5f295ed --- /dev/null +++ b/app/core/graph/graph_utils.py @@ -0,0 +1,81 @@ +""" +FILE: app/core/graph/graph_utils.py +DESCRIPTION: Basale Werkzeuge, ID-Generierung und Provenance-Konfiguration für den Graphen. +""" +import os +import hashlib +from typing import Iterable, List, Optional, Set, Any + +try: + import yaml +except ImportError: + yaml = None + +# WP-15b: Prioritäten-Ranking für die De-Duplizierung +PROVENANCE_PRIORITY = { + "explicit:wikilink": 1.00, + "inline:rel": 0.95, + "callout:edge": 0.90, + "semantic_ai": 0.90, # Validierte KI-Kanten + "structure:belongs_to": 1.00, + "structure:order": 0.95, # next/prev + "explicit:note_scope": 1.00, + "derived:backlink": 0.90, + "edge_defaults": 0.70 # Heuristik (types.yaml) +} + +def _get(d: dict, *keys, default=None): + """Sicherer Zugriff auf verschachtelte Keys.""" + for k in keys: + if isinstance(d, dict) and k in d and d[k] is not None: + return d[k] + return default + +def _dedupe_seq(seq: Iterable[str]) -> List[str]: + """Dedupliziert Strings unter Beibehaltung der Reihenfolge.""" + seen: Set[str] = set() + out: List[str] = [] + for s in seq: + if s not in seen: + seen.add(s); out.append(s) + return out + +def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: + """Erzeugt eine deterministische 12-Byte ID mittels BLAKE2s.""" + base = f"{kind}:{s}->{t}#{scope}" + if rule_id: base += f"|{rule_id}" + return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() + +def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: + """Konstruiert ein Kanten-Payload für Qdrant.""" + pl = { + "kind": kind, + "relation": kind, + "scope": scope, + "source_id": source_id, + "target_id": target_id, + "note_id": note_id, + } + if extra: pl.update(extra) + return pl + +def load_types_registry() -> dict: + """Lädt die YAML-Registry.""" + p = os.getenv("MINDNET_TYPES_FILE", "./config/types.yaml") + if not os.path.isfile(p) or yaml is None: return {} + try: + with open(p, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} + except Exception: return {} + +def get_edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: + """Ermittelt Standard-Kanten für einen Typ.""" + types_map = reg.get("types", reg) if isinstance(reg, dict) else {} + if note_type and isinstance(types_map, dict): + t = types_map.get(note_type) + if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): + return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] + for key in ("defaults", "default", "global"): + v = reg.get(key) + if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): + return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] + return [] \ No newline at end of file diff --git a/app/core/graph/graph_weights.py b/app/core/graph/graph_weights.py new file mode 100644 index 0000000..5fc2f68 --- /dev/null +++ b/app/core/graph/graph_weights.py @@ -0,0 +1,39 @@ +""" +FILE: app/core/graph/graph_weights.py +DESCRIPTION: Definition der Basisgewichte und Berechnung der Kanteneffektivität. +""" +from typing import Dict + +# Basisgewichte je Edge-Typ (WP-04a Config) +EDGE_BASE_WEIGHTS: Dict[str, float] = { + # Struktur + "belongs_to": 0.10, + "next": 0.06, + "prev": 0.06, + "backlink": 0.04, + "references_at": 0.08, + + # Wissen + "references": 0.20, + "depends_on": 0.18, + "related_to": 0.15, + "similar_to": 0.12, +} + +def calculate_edge_weight(pl: Dict) -> float: + """Berechnet das effektive Edge-Gewicht aus kind + confidence.""" + kind = pl.get("kind", "edge") + base = EDGE_BASE_WEIGHTS.get(kind, 0.0) + + conf_raw = pl.get("confidence", None) + try: + conf = float(conf_raw) if conf_raw is not None else None + except Exception: + conf = None + + if conf is None: + return base + + # Clamp confidence 0.0 - 1.0 + conf = max(0.0, min(1.0, conf)) + return base * conf \ No newline at end of file diff --git a/app/core/graph_adapter.py b/app/core/graph_adapter.py index e4b2cb7..ee36f9e 100644 --- a/app/core/graph_adapter.py +++ b/app/core/graph_adapter.py @@ -1,249 +1,10 @@ """ FILE: app/core/graph_adapter.py -DESCRIPTION: Lädt Kanten aus Qdrant und baut einen In-Memory Subgraphen für Scoring (Centrality) und Explanation. -VERSION: 0.4.0 -STATUS: Active -DEPENDENCIES: qdrant_client, app.core.qdrant -LAST_ANALYSIS: 2025-12-15 +DESCRIPTION: Facade für das neue graph Package (Adapter-Teil). + WP-14: Modularisierung abgeschlossen. +VERSION: 0.5.0 """ +from .graph.graph_subgraph import Subgraph, expand +from .graph.graph_weights import EDGE_BASE_WEIGHTS -from __future__ import annotations - -from typing import Dict, List, Optional, DefaultDict, Any -from collections import defaultdict - -from qdrant_client import QdrantClient -from qdrant_client.http import models as rest - -from app.core.qdrant import collection_names - -# Legacy-Import Fallback -try: # pragma: no cover - from app.core.qdrant_points import get_edges_for_sources # type: ignore -except Exception: # pragma: no cover - get_edges_for_sources = None # type: ignore - - -# Basisgewichte je Edge-Typ (WP-04a Config) -EDGE_BASE_WEIGHTS: Dict[str, float] = { - # Struktur - "belongs_to": 0.10, - "next": 0.06, - "prev": 0.06, - "backlink": 0.04, - "references_at": 0.08, - - # Wissen - "references": 0.20, - "depends_on": 0.18, - "related_to": 0.15, - "similar_to": 0.12, -} - - -def _edge_weight(pl: Dict) -> float: - """Berechnet das effektive Edge-Gewicht aus kind + confidence.""" - kind = pl.get("kind", "edge") - base = EDGE_BASE_WEIGHTS.get(kind, 0.0) - - conf_raw = pl.get("confidence", None) - try: - conf = float(conf_raw) if conf_raw is not None else None - except Exception: - conf = None - - if conf is None: - return base - - if conf < 0.0: conf = 0.0 - if conf > 1.0: conf = 1.0 - - return base * conf - - -def _fetch_edges( - client: QdrantClient, - prefix: str, - seeds: List[str], - edge_types: Optional[List[str]] = None, - limit: int = 2048, -) -> List[Dict]: - """ - Holt Edges direkt aus der *_edges Collection. - Filter: source_id IN seeds OR target_id IN seeds OR note_id IN seeds - """ - if not seeds or limit <= 0: - return [] - - _, _, edges_col = collection_names(prefix) - - seed_conditions = [] - for field in ("source_id", "target_id", "note_id"): - for s in seeds: - seed_conditions.append( - rest.FieldCondition(key=field, match=rest.MatchValue(value=str(s))) - ) - seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None - - type_filter = None - if edge_types: - type_conds = [ - rest.FieldCondition(key="kind", match=rest.MatchValue(value=str(k))) - for k in edge_types - ] - type_filter = rest.Filter(should=type_conds) - - must = [] - if seeds_filter: must.append(seeds_filter) - if type_filter: must.append(type_filter) - - flt = rest.Filter(must=must) if must else None - - pts, _ = client.scroll( - collection_name=edges_col, - scroll_filter=flt, - limit=limit, - with_payload=True, - with_vectors=False, - ) - - out: List[Dict] = [] - for p in pts or []: - pl = dict(p.payload or {}) - if pl: - out.append(pl) - return out - - -class Subgraph: - """Leichtgewichtiger Subgraph mit Adjazenzlisten & Kennzahlen.""" - - def __init__(self) -> None: - # Forward: source -> [targets] - self.adj: DefaultDict[str, List[Dict]] = defaultdict(list) - # Reverse: target -> [sources] (Neu für WP-04b Explanation) - self.reverse_adj: DefaultDict[str, List[Dict]] = defaultdict(list) - - self.in_degree: DefaultDict[str, int] = defaultdict(int) - self.out_degree: DefaultDict[str, int] = defaultdict(int) - - def add_edge(self, e: Dict) -> None: - """ - Fügt eine Kante hinzu und aktualisiert Forward/Reverse Indizes. - e muss enthalten: source, target, kind, weight. - """ - src = e.get("source") - tgt = e.get("target") - kind = e.get("kind") - weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0)) - owner = e.get("note_id") - - if not src or not tgt: - return - - # 1. Primäre Adjazenz (Forward) - edge_data = {"target": tgt, "kind": kind, "weight": weight} - self.adj[src].append(edge_data) - self.out_degree[src] += 1 - self.in_degree[tgt] += 1 - - # 2. Reverse Adjazenz (Neu für Explanation) - # Wir speichern, woher die Kante kam. - rev_data = {"source": src, "kind": kind, "weight": weight} - self.reverse_adj[tgt].append(rev_data) - - # 3. Kontext-Note Handling (Forward & Reverse) - # Wenn eine Kante "im Kontext einer Note" (owner) definiert ist, - # schreiben wir sie der Note gut, damit der Retriever Scores auf Note-Ebene findet. - if owner and owner != src: - # Forward: Owner -> Target - self.adj[owner].append(edge_data) - self.out_degree[owner] += 1 - - # Reverse: Target wird vom Owner referenziert (indirekt) - if owner != tgt: - rev_owner_data = {"source": owner, "kind": kind, "weight": weight, "via_context": True} - self.reverse_adj[tgt].append(rev_owner_data) - self.in_degree[owner] += 1 # Leichter Centrality Boost für den Owner - - def aggregate_edge_bonus(self, node_id: str) -> float: - """Summe der ausgehenden Kantengewichte (Hub-Score).""" - return sum(edge["weight"] for edge in self.adj.get(node_id, [])) - - def edge_bonus(self, node_id: str) -> float: - """API für Retriever (WP-04a Kompatibilität).""" - return self.aggregate_edge_bonus(node_id) - - def centrality_bonus(self, node_id: str) -> float: - """Log-gedämpfte Zentralität (In-Degree).""" - import math - indeg = self.in_degree.get(node_id, 0) - if indeg <= 0: - return 0.0 - return min(math.log1p(indeg) / 10.0, 0.15) - - # --- WP-04b Explanation Helpers --- - - def get_outgoing_edges(self, node_id: str) -> List[Dict[str, Any]]: - """Liefert Liste aller Ziele, auf die dieser Knoten zeigt.""" - return self.adj.get(node_id, []) - - def get_incoming_edges(self, node_id: str) -> List[Dict[str, Any]]: - """Liefert Liste aller Quellen, die auf diesen Knoten zeigen.""" - return self.reverse_adj.get(node_id, []) - - -def expand( - client: QdrantClient, - prefix: str, - seeds: List[str], - depth: int = 1, - edge_types: Optional[List[str]] = None, -) -> Subgraph: - """ - Expandiert ab Seeds entlang von Edges (bis `depth`). - """ - sg = Subgraph() - frontier = set(seeds) - visited = set() - - max_depth = max(depth, 0) - - for _ in range(max_depth): - if not frontier: - break - - edges_payloads = _fetch_edges( - client=client, - prefix=prefix, - seeds=list(frontier), - edge_types=edge_types, - limit=2048, - ) - - next_frontier = set() - for pl in edges_payloads: - src = pl.get("source_id") - tgt = pl.get("target_id") - - # Skip invalid edges - if not src or not tgt: - continue - - e = { - "source": src, - "target": tgt, - "kind": pl.get("kind", "edge"), - "weight": _edge_weight(pl), - "note_id": pl.get("note_id"), - } - sg.add_edge(e) - - # Nur weitersuchen, wenn Target noch nicht besucht - if tgt and tgt not in visited: - next_frontier.add(tgt) - - visited |= frontier - frontier = next_frontier - visited - - return sg \ No newline at end of file +__all__ = ["Subgraph", "expand", "EDGE_BASE_WEIGHTS"] \ No newline at end of file