254 lines
9.8 KiB
Python
254 lines
9.8 KiB
Python
"""
|
|
FILE: app/services/discovery.py
|
|
DESCRIPTION: Service für WP-11. Analysiert Texte, findet Entitäten und schlägt typisierte Verbindungen vor ("Matrix-Logic").
|
|
VERSION: 0.6.0
|
|
STATUS: Active
|
|
DEPENDENCIES: app.core.qdrant, app.models.dto, app.core.retriever
|
|
EXTERNAL_CONFIG: config/types.yaml
|
|
LAST_ANALYSIS: 2025-12-15
|
|
"""
|
|
import logging
|
|
import asyncio
|
|
import os
|
|
from typing import List, Dict, Any, Optional, Set
|
|
import yaml
|
|
|
|
from app.core.qdrant import QdrantConfig, get_client
|
|
from app.models.dto import QueryRequest
|
|
from app.core.retriever import hybrid_retrieve
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DiscoveryService:
|
|
def __init__(self, collection_prefix: str = None):
|
|
self.cfg = QdrantConfig.from_env()
|
|
self.prefix = collection_prefix or self.cfg.prefix or "mindnet"
|
|
self.client = get_client(self.cfg)
|
|
self.registry = self._load_type_registry()
|
|
|
|
async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]:
|
|
"""
|
|
Analysiert den Text und liefert Vorschläge mit kontext-sensitiven Kanten-Typen.
|
|
"""
|
|
suggestions = []
|
|
|
|
# Fallback, falls keine spezielle Regel greift
|
|
default_edge_type = self._get_default_edge_type(current_type)
|
|
|
|
# Tracking-Sets für Deduplizierung (Wir merken uns NOTE-IDs)
|
|
seen_target_note_ids = set()
|
|
|
|
# ---------------------------------------------------------
|
|
# 1. Exact Match: Titel/Aliases
|
|
# ---------------------------------------------------------
|
|
# Holt Titel, Aliases UND Typen aus dem Index
|
|
known_entities = self._fetch_all_titles_and_aliases()
|
|
found_entities = self._find_entities_in_text(text, known_entities)
|
|
|
|
for entity in found_entities:
|
|
if entity["id"] in seen_target_note_ids:
|
|
continue
|
|
seen_target_note_ids.add(entity["id"])
|
|
|
|
# INTELLIGENTE KANTEN-LOGIK (MATRIX)
|
|
target_type = entity.get("type", "concept")
|
|
smart_edge = self._resolve_edge_type(current_type, target_type)
|
|
|
|
suggestions.append({
|
|
"type": "exact_match",
|
|
"text_found": entity["match"],
|
|
"target_title": entity["title"],
|
|
"target_id": entity["id"],
|
|
"suggested_edge_type": smart_edge,
|
|
"suggested_markdown": f"[[rel:{smart_edge} {entity['title']}]]",
|
|
"confidence": 1.0,
|
|
"reason": f"Exakter Treffer: '{entity['match']}' ({target_type})"
|
|
})
|
|
|
|
# ---------------------------------------------------------
|
|
# 2. Semantic Match: Sliding Window & Footer Focus
|
|
# ---------------------------------------------------------
|
|
search_queries = self._generate_search_queries(text)
|
|
|
|
# Async parallel abfragen
|
|
tasks = [self._get_semantic_suggestions_async(q) for q in search_queries]
|
|
results_list = await asyncio.gather(*tasks)
|
|
|
|
# Ergebnisse verarbeiten
|
|
for hits in results_list:
|
|
for hit in hits:
|
|
note_id = hit.payload.get("note_id")
|
|
if not note_id: continue
|
|
|
|
# Deduplizierung (Notiz-Ebene)
|
|
if note_id in seen_target_note_ids:
|
|
continue
|
|
|
|
# Score Check (Threshold 0.50 für nomic-embed-text)
|
|
if hit.total_score > 0.50:
|
|
seen_target_note_ids.add(note_id)
|
|
|
|
target_title = hit.payload.get("title") or "Unbekannt"
|
|
|
|
# INTELLIGENTE KANTEN-LOGIK (MATRIX)
|
|
# Den Typ der gefundenen Notiz aus dem Payload lesen
|
|
target_type = hit.payload.get("type", "concept")
|
|
smart_edge = self._resolve_edge_type(current_type, target_type)
|
|
|
|
suggestions.append({
|
|
"type": "semantic_match",
|
|
"text_found": (hit.source.get("text") or "")[:60] + "...",
|
|
"target_title": target_title,
|
|
"target_id": note_id,
|
|
"suggested_edge_type": smart_edge,
|
|
"suggested_markdown": f"[[rel:{smart_edge} {target_title}]]",
|
|
"confidence": round(hit.total_score, 2),
|
|
"reason": f"Semantisch ähnlich zu {target_type} ({hit.total_score:.2f})"
|
|
})
|
|
|
|
# Sortieren nach Confidence
|
|
suggestions.sort(key=lambda x: x["confidence"], reverse=True)
|
|
|
|
return {
|
|
"draft_length": len(text),
|
|
"analyzed_windows": len(search_queries),
|
|
"suggestions_count": len(suggestions),
|
|
"suggestions": suggestions[:10]
|
|
}
|
|
|
|
# ---------------------------------------------------------
|
|
# Core Logic: Die Matrix
|
|
# ---------------------------------------------------------
|
|
|
|
def _resolve_edge_type(self, source_type: str, target_type: str) -> str:
|
|
"""
|
|
Entscheidungsmatrix für komplexe Verbindungen.
|
|
Definiert, wie Typ A auf Typ B verlinken sollte.
|
|
"""
|
|
st = source_type.lower()
|
|
tt = target_type.lower()
|
|
|
|
# Regeln für 'experience' (Erfahrungen)
|
|
if st == "experience":
|
|
if tt == "value": return "based_on"
|
|
if tt == "principle": return "derived_from"
|
|
if tt == "trip": return "part_of"
|
|
if tt == "lesson": return "learned"
|
|
if tt == "project": return "related_to" # oder belongs_to
|
|
|
|
# Regeln für 'project'
|
|
if st == "project":
|
|
if tt == "decision": return "depends_on"
|
|
if tt == "concept": return "uses"
|
|
if tt == "person": return "managed_by"
|
|
|
|
# Regeln für 'decision' (ADR)
|
|
if st == "decision":
|
|
if tt == "principle": return "compliant_with"
|
|
if tt == "requirement": return "addresses"
|
|
|
|
# Fallback: Standard aus der types.yaml für den Source-Typ
|
|
return self._get_default_edge_type(st)
|
|
|
|
# ---------------------------------------------------------
|
|
# Sliding Windows
|
|
# ---------------------------------------------------------
|
|
|
|
def _generate_search_queries(self, text: str) -> List[str]:
|
|
"""
|
|
Erzeugt intelligente Fenster + Footer Scan.
|
|
"""
|
|
text_len = len(text)
|
|
if not text: return []
|
|
|
|
queries = []
|
|
|
|
# 1. Start / Gesamtkontext
|
|
queries.append(text[:600])
|
|
|
|
# 2. Footer-Scan (Wichtig für "Projekt"-Referenzen am Ende)
|
|
if text_len > 150:
|
|
footer = text[-250:]
|
|
if footer not in queries:
|
|
queries.append(footer)
|
|
|
|
# 3. Sliding Window für lange Texte
|
|
if text_len > 800:
|
|
window_size = 500
|
|
step = 1500
|
|
for i in range(window_size, text_len - window_size, step):
|
|
end_pos = min(i + window_size, text_len)
|
|
chunk = text[i:end_pos]
|
|
if len(chunk) > 100:
|
|
queries.append(chunk)
|
|
|
|
return queries
|
|
|
|
# ---------------------------------------------------------
|
|
# Standard Helpers
|
|
# ---------------------------------------------------------
|
|
|
|
async def _get_semantic_suggestions_async(self, text: str):
|
|
req = QueryRequest(query=text, top_k=5, explain=False)
|
|
try:
|
|
res = hybrid_retrieve(req)
|
|
return res.results
|
|
except Exception as e:
|
|
logger.error(f"Semantic suggestion error: {e}")
|
|
return []
|
|
|
|
def _load_type_registry(self) -> dict:
|
|
path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
|
|
if not os.path.exists(path):
|
|
if os.path.exists("types.yaml"): path = "types.yaml"
|
|
else: return {}
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {}
|
|
except Exception: return {}
|
|
|
|
def _get_default_edge_type(self, note_type: str) -> str:
|
|
types_cfg = self.registry.get("types", {})
|
|
type_def = types_cfg.get(note_type, {})
|
|
defaults = type_def.get("edge_defaults")
|
|
return defaults[0] if defaults else "related_to"
|
|
|
|
def _fetch_all_titles_and_aliases(self) -> List[Dict]:
|
|
notes = []
|
|
next_page = None
|
|
col = f"{self.prefix}_notes"
|
|
try:
|
|
while True:
|
|
res, next_page = self.client.scroll(
|
|
collection_name=col, limit=1000, offset=next_page,
|
|
with_payload=True, with_vectors=False
|
|
)
|
|
for point in res:
|
|
pl = point.payload or {}
|
|
aliases = pl.get("aliases") or []
|
|
if isinstance(aliases, str): aliases = [aliases]
|
|
|
|
notes.append({
|
|
"id": pl.get("note_id"),
|
|
"title": pl.get("title"),
|
|
"aliases": aliases,
|
|
"type": pl.get("type", "concept") # WICHTIG: Typ laden für Matrix
|
|
})
|
|
if next_page is None: break
|
|
except Exception: pass
|
|
return notes
|
|
|
|
def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]:
|
|
found = []
|
|
text_lower = text.lower()
|
|
for entity in entities:
|
|
# Title Check
|
|
title = entity.get("title")
|
|
if title and title.lower() in text_lower:
|
|
found.append({"match": title, "title": title, "id": entity["id"], "type": entity["type"]})
|
|
continue
|
|
# Alias Check
|
|
for alias in entity.get("aliases", []):
|
|
if str(alias).lower() in text_lower:
|
|
found.append({"match": alias, "title": title, "id": entity["id"], "type": entity["type"]})
|
|
break
|
|
return found |