Implement WP-26 v1.0 - Phase 2: Enhance edge scoring and aggregation configuration

- Introduced configurable edge scoring with internal and external boosts for intra-note edges.
- Added aggregation configuration to support note-level and chunk-level retrieval strategies.
- Updated retriever and graph subgraph modules to utilize new scoring and aggregation logic.
- Enhanced YAML configuration to include new parameters for edge scoring and aggregation levels.
- Added boolean indexing for filtering based on edge properties in the setup script.
This commit is contained in:
Lars 2026-01-25 21:06:13 +01:00
parent e86e9f2313
commit c5215e22e7
5 changed files with 384 additions and 19 deletions

View File

@ -5,18 +5,56 @@ DESCRIPTION: In-Memory Repräsentation eines Graphen für Scoring und Analyse.
WP-15c Update: Erhalt von Metadaten (target_section, provenance)
für präzises Retrieval-Reasoning.
WP-24c v4.1.0: Scope-Awareness und Section-Filtering Support.
VERSION: 1.3.0 (WP-24c: Gold-Standard v4.1.0)
WP-26 v1.0: is_internal-Boost für Intra-Note-Edges.
VERSION: 1.4.0 (WP-26: Intra-Note-Edge-Boost)
STATUS: Active
"""
import os
import math
from functools import lru_cache
from collections import defaultdict
from typing import Dict, List, Optional, DefaultDict, Any, Set
from qdrant_client import QdrantClient
try:
import yaml
except ImportError:
yaml = None
# Lokale Paket-Imports
from .graph_weights import EDGE_BASE_WEIGHTS, calculate_edge_weight
from .graph_db_adapter import fetch_edges_from_qdrant
import logging
logger = logging.getLogger(__name__)
@lru_cache
def get_edge_scoring_config() -> Dict[str, float]:
"""
WP-26 v1.0: Lädt Edge-Scoring-Konfiguration aus retriever.yaml.
Returns:
Dict mit internal_edge_boost und external_edge_boost
"""
defaults = {
"internal_edge_boost": 1.2, # +20% Boost für Intra-Note-Edges
"external_edge_boost": 1.0 # Standard für Inter-Note-Edges
}
config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml")
if yaml and os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
edge_scoring = data.get("edge_scoring", {})
defaults["internal_edge_boost"] = float(edge_scoring.get("internal_edge_boost", defaults["internal_edge_boost"]))
defaults["external_edge_boost"] = float(edge_scoring.get("external_edge_boost", defaults["external_edge_boost"]))
except Exception as e:
logger.warning(f"Edge-Scoring-Konfiguration konnte nicht geladen werden: {e}")
return defaults
class Subgraph:
"""
Leichtgewichtiger Subgraph mit Adjazenzlisten & Kennzahlen.
@ -36,24 +74,39 @@ class Subgraph:
"""
Fügt eine Kante hinzu und aktualisiert Indizes.
WP-15c: Speichert das vollständige Payload für den Explanation Layer.
WP-26 v1.0: Wendet is_internal-Boost auf Intra-Note-Edges an.
"""
src = e.get("source")
tgt = e.get("target")
kind = e.get("kind")
# Basis-Gewicht aus Payload oder Edge-Weights
base_weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0))
# WP-26 v1.0: is_internal-Boost anwenden
is_internal = e.get("is_internal", False)
edge_scoring = get_edge_scoring_config()
if is_internal:
weight_multiplier = edge_scoring["internal_edge_boost"]
else:
weight_multiplier = edge_scoring["external_edge_boost"]
final_weight = base_weight * weight_multiplier
# Das gesamte Payload wird als Kanten-Objekt behalten
# Wir stellen sicher, dass alle relevanten Metadaten vorhanden sind
edge_data = {
"source": src,
"target": tgt,
"kind": kind,
"weight": e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0)),
"weight": final_weight,
"provenance": e.get("provenance", "rule"),
"confidence": e.get("confidence", 1.0),
"target_section": e.get("target_section"), # Essentiell für Präzision
"is_super_edge": e.get("is_super_edge", False),
"virtual": e.get("virtual", False), # WP-24c v4.1.0: Für Authority-Priorisierung
"chunk_id": e.get("chunk_id") # WP-24c v4.1.0: Für RAG-Kontext
"chunk_id": e.get("chunk_id"), # WP-24c v4.1.0: Für RAG-Kontext
"is_internal": is_internal # WP-26 v1.0: Flag für Debugging
}
owner = e.get("note_id")

View File

@ -3,7 +3,8 @@ FILE: app/core/retrieval/retriever.py
DESCRIPTION: Haupt-Schnittstelle für die Suche. Orchestriert Vektorsuche und Graph-Expansion.
WP-15c Update: Note-Level Diversity Pooling & Super-Edge Aggregation.
WP-24c v4.1.0: Gold-Standard - Scope-Awareness, Section-Filtering, Authority-Priorisierung.
VERSION: 0.8.0 (WP-24c: Gold-Standard v4.1.0)
WP-26 v1.0: Konfigurierbare Aggregation (note/chunk Level).
VERSION: 0.9.0 (WP-26: Aggregation-Level)
STATUS: Active
DEPENDENCIES: app.config, app.models.dto, app.core.database*, app.core.graph_adapter
"""
@ -34,8 +35,39 @@ from qdrant_client.http import models as rest
# Mathematische Engine importieren
from app.core.retrieval.retriever_scoring import get_weights, compute_wp22_score
try:
import yaml
except ImportError:
yaml = None
logger = logging.getLogger(__name__)
def _get_aggregation_config() -> Dict[str, Any]:
"""
WP-26 v1.0: Lädt Aggregation-Konfiguration aus retriever.yaml.
Returns:
Dict mit level ("note" oder "chunk") und max_chunks_per_note
"""
defaults = {
"level": "note", # "note" (Default) oder "chunk"
"max_chunks_per_note": 3 # Limit bei "note"-Level
}
config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml")
if yaml and os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
aggregation = data.get("aggregation", {})
defaults["level"] = str(aggregation.get("level", defaults["level"])).lower()
defaults["max_chunks_per_note"] = int(aggregation.get("max_chunks_per_note", defaults["max_chunks_per_note"]))
except Exception as e:
logger.warning(f"Aggregation-Konfiguration konnte nicht geladen werden: {e}")
return defaults
# ==============================================================================
# 1. CORE HELPERS & CONFIG LOADERS
# ==============================================================================
@ -316,22 +348,33 @@ def _build_hits_from_semantic(
# 1. Sortierung nach finalem mathematischen Score
enriched_sorted = sorted(enriched, key=lambda h: h[3]["total"], reverse=True)
# 2. WP-15c: Note-Level Diversity Pooling
# Wir behalten pro note_id nur den Hit mit dem höchsten total_score.
# Dies verhindert, dass 10 Chunks derselben Note andere KeyNotes verdrängen.
unique_note_hits = []
seen_notes = set()
# 2. WP-26 v1.0: Konfigurierbare Aggregation (Note-Level oder Chunk-Level)
aggregation_config = _get_aggregation_config()
aggregation_level = aggregation_config["level"]
max_chunks_per_note = aggregation_config["max_chunks_per_note"]
for item in enriched_sorted:
_, _, payload, _ = item
note_id = str(payload.get("note_id", "unknown"))
if aggregation_level == "chunk":
# WP-26 v1.0: Chunk-Level - alle Chunks individuell ranken (keine Deduplizierung)
logger.debug(f"📊 [AGGREGATION] Chunk-Level: Alle {len(enriched_sorted)} Chunks individuell")
pooled_hits = enriched_sorted
else:
# WP-15c: Note-Level Diversity Pooling (Default)
# Behalten pro note_id bis zu max_chunks_per_note Hits
pooled_hits = []
note_chunk_count: Dict[str, int] = defaultdict(int)
if note_id not in seen_notes:
unique_note_hits.append(item)
seen_notes.add(note_id)
for item in enriched_sorted:
_, _, payload, _ = item
note_id = str(payload.get("note_id", "unknown"))
if note_chunk_count[note_id] < max_chunks_per_note:
pooled_hits.append(item)
note_chunk_count[note_id] += 1
logger.debug(f"📊 [AGGREGATION] Note-Level: {len(pooled_hits)} Chunks (max {max_chunks_per_note}/Note)")
# 3. Begrenzung auf top_k nach dem Diversity-Pooling
limited_hits = unique_note_hits[: max(1, top_k)]
limited_hits = pooled_hits[: max(1, top_k)]
results: List[QueryHit] = []
for pid, s_score, pl, dbg in limited_hits:

View File

@ -1,4 +1,16 @@
version: 1.2
version: 1.3
# WP-26 Phase 2: Aggregation-Level für Retrieval
# - note: Beste Chunk pro Note (Default, wie bisher)
# - chunk: Alle Chunks individuell ranken
aggregation:
level: note # "note" (default) oder "chunk"
max_chunks_per_note: 3 # Optional: Limit bei "note"-Level
# WP-26 Phase 2: Edge-Scoring mit Intra-Note-Boost
edge_scoring:
internal_edge_boost: 1.2 # +20% Boost für Intra-Note-Edges (is_internal=true)
external_edge_boost: 1.0 # Standard für Inter-Note-Edges
scoring:
# W_sem: skaliert den Term (semantic_score * retriever_weight)

View File

@ -2,9 +2,9 @@
# -*- coding: utf-8 -*-
"""
FILE: scripts/setup_mindnet_collections.py
VERSION: 2.1.0 (2025-12-15)
VERSION: 2.2.0 (2026-01-25)
STATUS: Active
COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b)
COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b), WP-26 (Intra-Note-Edges)
Zweck:
-------
@ -107,6 +107,12 @@ class QdrantHTTP:
payload = {"field_name": field, "field_schema": {"type": "text"}}
self.rq("PUT", f"/collections/{collection}/index", json=payload)
print(f"[+] Index text on {collection}.{field}")
def create_bool_index(self, collection: str, field: str) -> None:
"""WP-26 v1.0: Boolean-Index für Filterung (z.B. is_internal)."""
payload = {"field_name": field, "field_schema": "bool"}
self.rq("PUT", f"/collections/{collection}/index", json=payload)
print(f"[+] Index bool on {collection}.{field}")
def list_collections(self) -> Dict[str, Any]:
r = self.rq("GET", "/collections")
@ -129,6 +135,9 @@ def setup_mindnet_collections(q: QdrantHTTP, prefix: str, dim: int, distance: st
q.create_keyword_index(chunks, f)
for f in ["tags", "Rolle", "links"]:
q.create_keyword_index(chunks, f)
# WP-26 v1.0: note_type für Filterung (Section-Type vs Note-Type)
q.create_keyword_index(chunks, "note_type")
q.create_keyword_index(chunks, "type") # Effektiver Typ (section_type || note_type)
q.create_text_index(chunks, "text") # Volltextsuche auf dem Textfeld
# mindnet_notes: Metadaten der Notizen
@ -145,8 +154,16 @@ def setup_mindnet_collections(q: QdrantHTTP, prefix: str, dim: int, distance: st
"dst_chunk_id",
"link_text",
"relation",
"kind", # WP-26 v1.0: Kantentyp für Filterung
"source_id", # WP-26 v1.0: Source-ID für Graph-Queries
"target_id", # WP-26 v1.0: Target-ID für Graph-Queries
"scope", # WP-26 v1.0: "chunk" oder "note"
"provenance", # WP-26 v1.0: Herkunft der Kante
]:
q.create_keyword_index(edges, f)
# WP-26 v1.0: Boolean-Index für is_internal (Intra-Note-Edge-Filterung)
q.create_bool_index(edges, "is_internal")
def parse_args() -> argparse.Namespace:

View File

@ -0,0 +1,240 @@
"""
FILE: tests/test_wp26_phase2_retriever.py
DESCRIPTION: Unit-Tests für WP-26 Phase 2: Retriever-Anpassungen
- is_internal-Boost für Intra-Note-Edges
- Konfigurierbare Aggregation (Note/Chunk Level)
VERSION: 1.0.0
"""
import pytest
from unittest.mock import patch, MagicMock
import os
class TestEdgeScoringConfig:
"""UT-19: Edge-Scoring-Konfiguration"""
def test_get_edge_scoring_config_defaults(self):
"""Default-Werte werden korrekt geladen"""
from app.core.graph.graph_subgraph import get_edge_scoring_config
# Cache leeren
get_edge_scoring_config.cache_clear()
# Mit nicht-existierender Config-Datei
with patch.dict(os.environ, {"MINDNET_RETRIEVER_CONFIG": "/nonexistent/path.yaml"}):
get_edge_scoring_config.cache_clear()
config = get_edge_scoring_config()
assert config["internal_edge_boost"] == 1.2
assert config["external_edge_boost"] == 1.0
def test_get_edge_scoring_config_from_yaml(self):
"""Werte werden aus YAML geladen"""
from app.core.graph.graph_subgraph import get_edge_scoring_config
# Cache leeren und echte Config laden
get_edge_scoring_config.cache_clear()
# Mit echter Config-Datei
config = get_edge_scoring_config()
# Die Werte sollten den Defaults entsprechen (aus retriever.yaml)
assert config["internal_edge_boost"] >= 1.0
assert config["external_edge_boost"] >= 1.0
class TestIsInternalBoost:
"""UT-20: is_internal-Boost im Subgraph"""
def test_internal_edge_gets_boost(self):
"""Intra-Note-Edges erhalten höheres Gewicht"""
from app.core.graph.graph_subgraph import Subgraph, get_edge_scoring_config
# Cache leeren
get_edge_scoring_config.cache_clear()
sg = Subgraph()
# Interne Edge (innerhalb derselben Note)
sg.add_edge({
"source": "note1#c01",
"target": "note1#c02",
"kind": "derives",
"weight": 1.0,
"is_internal": True
})
# Prüfe, dass das Gewicht erhöht wurde
edges = sg.adj.get("note1#c01", [])
assert len(edges) == 1
internal_boost = get_edge_scoring_config()["internal_edge_boost"]
assert edges[0]["weight"] == 1.0 * internal_boost
assert edges[0]["is_internal"] is True
def test_external_edge_no_boost(self):
"""Inter-Note-Edges erhalten keinen Boost"""
from app.core.graph.graph_subgraph import Subgraph, get_edge_scoring_config
# Cache leeren
get_edge_scoring_config.cache_clear()
sg = Subgraph()
# Externe Edge (zwischen verschiedenen Notes)
sg.add_edge({
"source": "note1#c01",
"target": "note2#c01",
"kind": "references",
"weight": 1.0,
"is_internal": False
})
edges = sg.adj.get("note1#c01", [])
assert len(edges) == 1
external_boost = get_edge_scoring_config()["external_edge_boost"]
assert edges[0]["weight"] == 1.0 * external_boost
assert edges[0]["is_internal"] is False
def test_edge_bonus_aggregation_with_internal(self):
"""Edge-Bonus aggregiert korrekt mit is_internal-Boost"""
from app.core.graph.graph_subgraph import Subgraph, get_edge_scoring_config
get_edge_scoring_config.cache_clear()
sg = Subgraph()
# Zwei Edges: eine interne, eine externe
sg.add_edge({
"source": "note1",
"target": "note2",
"kind": "solves",
"weight": 1.5,
"is_internal": True
})
sg.add_edge({
"source": "note1",
"target": "note3",
"kind": "references",
"weight": 0.1,
"is_internal": False
})
# Aggregierter Bonus
bonus = sg.edge_bonus("note1")
# Sollte > 0 sein
assert bonus > 0
class TestAggregationConfig:
"""UT-21: Aggregation-Konfiguration"""
def test_get_aggregation_config_defaults(self):
"""Default-Werte werden korrekt geladen"""
from app.core.retrieval.retriever import _get_aggregation_config
# Mit nicht-existierender Config-Datei
with patch.dict(os.environ, {"MINDNET_RETRIEVER_CONFIG": "/nonexistent/path.yaml"}):
config = _get_aggregation_config()
assert config["level"] == "note"
assert config["max_chunks_per_note"] == 3
def test_get_aggregation_config_from_yaml(self):
"""Werte werden aus YAML geladen"""
from app.core.retrieval.retriever import _get_aggregation_config
config = _get_aggregation_config()
# Die Werte sollten aus retriever.yaml kommen
assert config["level"] in ["note", "chunk"]
assert config["max_chunks_per_note"] >= 1
class TestNoteLevelAggregation:
"""UT-22: Note-Level Aggregation mit max_chunks_per_note"""
def test_note_level_limits_chunks(self):
"""Note-Level-Aggregation limitiert Chunks pro Note"""
# Mock-Daten: 5 Chunks von Note1, 3 Chunks von Note2
mock_hits = [
("c1", 0.9, {"note_id": "note1", "chunk_id": "c1"}),
("c2", 0.85, {"note_id": "note1", "chunk_id": "c2"}),
("c3", 0.8, {"note_id": "note2", "chunk_id": "c3"}),
("c4", 0.75, {"note_id": "note1", "chunk_id": "c4"}),
("c5", 0.7, {"note_id": "note2", "chunk_id": "c5"}),
("c6", 0.65, {"note_id": "note1", "chunk_id": "c6"}),
("c7", 0.6, {"note_id": "note1", "chunk_id": "c7"}),
]
# Simuliere Note-Level-Aggregation mit max_chunks_per_note=2
max_chunks_per_note = 2
pooled = []
note_count = {}
for pid, score, payload in sorted(mock_hits, key=lambda x: x[1], reverse=True):
note_id = payload["note_id"]
if note_count.get(note_id, 0) < max_chunks_per_note:
pooled.append((pid, score, payload))
note_count[note_id] = note_count.get(note_id, 0) + 1
# Erwartung: 2 von note1, 2 von note2 = 4 Chunks
assert len(pooled) == 4
# Prüfe, dass jede Note maximal 2 Chunks hat
note1_chunks = [p for p in pooled if p[2]["note_id"] == "note1"]
note2_chunks = [p for p in pooled if p[2]["note_id"] == "note2"]
assert len(note1_chunks) == 2
assert len(note2_chunks) == 2
class TestChunkLevelAggregation:
"""UT-23: Chunk-Level Aggregation (keine Deduplizierung)"""
def test_chunk_level_no_dedup(self):
"""Chunk-Level-Aggregation gibt alle Chunks zurück"""
mock_hits = [
("c1", 0.9, {"note_id": "note1"}),
("c2", 0.85, {"note_id": "note1"}),
("c3", 0.8, {"note_id": "note1"}),
("c4", 0.75, {"note_id": "note1"}),
("c5", 0.7, {"note_id": "note1"}),
]
# Chunk-Level: Keine Deduplizierung
aggregation_level = "chunk"
if aggregation_level == "chunk":
pooled = mock_hits
else:
pooled = [] # Note-Level würde nur 1 behalten
# Alle 5 Chunks sollten erhalten bleiben
assert len(pooled) == 5
class TestQdrantIndexSetup:
"""UT-24: Qdrant-Index-Setup"""
def test_bool_index_method_exists(self):
"""create_bool_index Methode existiert"""
from scripts.setup_mindnet_collections import QdrantHTTP
q = QdrantHTTP("http://localhost:6333")
assert hasattr(q, "create_bool_index")
def test_setup_includes_is_internal_index(self):
"""Setup-Funktion enthält is_internal Index"""
import inspect
from scripts.setup_mindnet_collections import setup_mindnet_collections
# Prüfe den Quellcode der Funktion
source = inspect.getsource(setup_mindnet_collections)
assert "is_internal" in source
assert "create_bool_index" in source
if __name__ == "__main__":
pytest.main([__file__, "-v"])