bug fix
This commit is contained in:
parent
1fde4ed72a
commit
5fa02aed2d
|
|
@ -1,72 +1,94 @@
|
||||||
"""
|
"""
|
||||||
app/services/discovery.py
|
app/services/discovery.py
|
||||||
Updated for WP-11: Sliding Window Analysis.
|
Service für Link-Vorschläge und Knowledge-Discovery (WP-11).
|
||||||
|
Implementiert Sliding Window für lange Texte und Late Binding für Edge-Typen.
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
from typing import List, Dict, Any
|
import os # <--- Added missing import
|
||||||
|
from typing import List, Dict, Any, Optional
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from app.core.qdrant import QdrantConfig, get_client
|
from app.core.qdrant import QdrantConfig, get_client
|
||||||
from app.models.dto import QueryRequest
|
from app.models.dto import QueryRequest
|
||||||
|
# Hinweis: hybrid_retrieve ist aktuell synchron. In einer reinen Async-Welt
|
||||||
|
# würde man dies refactorn, aber hier wrappen wir es.
|
||||||
from app.core.retriever import hybrid_retrieve
|
from app.core.retriever import hybrid_retrieve
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class DiscoveryService:
    """Link-suggestion and knowledge-discovery service (WP-11).

    Implements sliding-window analysis for long drafts and late binding
    of edge types via the YAML type registry.
    """

    def __init__(self, collection_prefix: Optional[str] = None):
        """Set up Qdrant access and load the edge-type registry.

        :param collection_prefix: overrides the collection prefix.
            Priority: argument > env config > "mindnet" default.
        """
        # 1. Load config
        self.cfg = QdrantConfig.from_env()
        self.prefix = collection_prefix or self.cfg.prefix or "mindnet"
        self.client = get_client(self.cfg)

        # 2. Load registry for late binding (edge defaults)
        self.registry = self._load_type_registry()
||||||
async def analyze_draft(self, text: str, current_type: str) -> Dict[str, Any]:
    """Analyze a draft text and suggest links to existing notes.

    Uses an exact-match scan over the full text for entity recognition
    and a sliding-window semantic search for related notes.

    :param text: the draft's full text.
    :param current_type: note type of the draft; selects the default
        edge type from the registry.
    :returns: dict with draft stats and up to 10 suggestions, sorted by
        descending confidence.
    """
    suggestions = []

    # Default edge type from config (e.g. 'depends_on' for projects)
    default_edge_type = self._get_default_edge_type(current_type)

    # ---------------------------------------------------------
    # 1. Exact match: find titles/aliases in the text
    # ---------------------------------------------------------
    known_entities = self._fetch_all_titles_and_aliases()
    found_entities = self._find_entities_in_text(text, known_entities)

    existing_target_ids = set()

    for entity in found_entities:
        existing_target_ids.add(entity["id"])
        target_title = entity["title"]
        suggested_md = f"[[rel:{default_edge_type} {target_title}]]"

        suggestions.append({
            "type": "exact_match",
            "text_found": entity["match"],
            "target_title": target_title,
            "target_id": entity["id"],
            "suggested_edge_type": default_edge_type,
            "suggested_markdown": suggested_md,
            "confidence": 1.0,
            "reason": f"Exakter Treffer: '{entity['match']}'"
        })

    # ---------------------------------------------------------
    # 2. Semantic match: sliding-window analysis
    # ---------------------------------------------------------
    # Split the text into meaningful sections for the embedding search.
    search_queries = self._generate_search_queries(text)

    # Query all windows in parallel.
    tasks = [self._get_semantic_suggestions_async(q) for q in search_queries]
    results_list = await asyncio.gather(*tasks)

    # Merge and deduplicate results.
    seen_semantic_ids = set()

    for hits in results_list:
        for hit in hits:
            # Skip duplicates (already an exact match or seen in another window).
            if hit.node_id in existing_target_ids or hit.node_id in seen_semantic_ids:
                continue

            # Threshold: 0.50 is a reasonable starting point for
            # semantic closeness with the current embedding model.
            if hit.total_score > 0.50:
                seen_semantic_ids.add(hit.node_id)

                # Title comes from the payload; fall back to the node id.
                target_title = hit.payload.get("title") or hit.node_id
                suggested_md = f"[[rel:{default_edge_type} {target_title}]]"

                suggestions.append({
                    "type": "semantic_match",
                    # NOTE(review): one dict entry was elided here by a
                    # diff hunk boundary in the scraped source (likely a
                    # "text_found" field) — recover it from the full file.
                    "target_title": target_title,
                    "target_id": hit.node_id,
                    "suggested_edge_type": default_edge_type,
                    "suggested_markdown": suggested_md,
                    "confidence": round(hit.total_score, 2),
                    "reason": f"Semantisch ähnlich ({hit.total_score:.2f})"
                })

    # Sort by confidence (highest first).
    suggestions.sort(key=lambda x: x["confidence"], reverse=True)

    return {
        "draft_length": len(text),
        "analyzed_windows": len(search_queries),
        "suggestions_count": len(suggestions),
        "suggestions": suggestions[:10]  # top 10 is enough
    }
||||||
|
|
||||||
# --- Helpers ---
|
# --- Interne Helfer ---
|
||||||
|
|
||||||
def _generate_search_queries(self, text: str) -> List[str]:
|
def _generate_search_queries(self, text: str) -> List[str]:
|
||||||
"""
|
"""Erzeugt Sliding Windows über den Text."""
|
||||||
Zerlegt den Text in bis zu 3 Such-Queries:
|
if not text: return []
|
||||||
1. Der Anfang (Kontext/Einleitung)
|
if len(text) < 600: return [text]
|
||||||
2. Die Mitte (Details)
|
|
||||||
3. Das Ende (Fazit/Zusammenfassung)
|
|
||||||
"""
|
|
||||||
if len(text) < 600:
|
|
||||||
return [text]
|
|
||||||
|
|
||||||
queries = []
|
queries = []
|
||||||
# Query 1: Die ersten 400 Zeichen
|
# 1. Anfang (Kontext)
|
||||||
queries.append(text[:400])
|
queries.append(text[:500])
|
||||||
|
|
||||||
# Query 2: Ein Fenster aus der Mitte
|
# 2. Mitte
|
||||||
mid = len(text) // 2
|
mid = len(text) // 2
|
||||||
queries.append(text[mid-200 : mid+200])
|
queries.append(text[mid-250 : mid+250])
|
||||||
|
|
||||||
# Query 3: Die letzten 400 Zeichen
|
# 3. Ende (Fazit)
|
||||||
if len(text) > 800:
|
if len(text) > 800:
|
||||||
queries.append(text[-400:])
|
queries.append(text[-500:])
|
||||||
|
|
||||||
return queries
|
return queries
|
||||||
|
|
||||||
async def _get_semantic_suggestions_async(self, text: str):
    """Run the (synchronous) hybrid retriever for one text window.

    hybrid_retrieve is sync; running it via the default executor keeps
    the event loop free, so analyze_draft's asyncio.gather actually
    queries the windows in parallel.

    :param text: query text for one sliding window.
    :returns: list of retriever hits, or [] on any error (best-effort).
    """
    req = QueryRequest(query=text, top_k=5, explain=False)
    try:
        loop = asyncio.get_running_loop()
        res = await loop.run_in_executor(None, hybrid_retrieve, req)
        return res.results
    except Exception as e:
        logger.error(f"Semantic suggestion error: {e}")
        return []
||||||
|
|
||||||
# ... (Restliche Methoden wie _fetch_all_titles_and_aliases bleiben gleich) ...
|
|
||||||
# Füge hier die Methoden aus dem vorherigen Artefakt ein (fetch_all..., find_entities..., load_type...)
|
|
||||||
# Der Kürze halber lasse ich sie im Snippet weg, da sie unverändert sind.
|
|
||||||
|
|
||||||
def _load_type_registry(self) -> dict:
    """Load the type registry (edge defaults etc.) from YAML.

    Path resolution: MINDNET_TYPES_FILE env var, else config/types.yaml,
    else a types.yaml in the working directory.

    :returns: parsed registry dict, or {} when no file exists or it
        cannot be parsed (best-effort, never raises).
    """
    path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
    if not os.path.exists(path):
        if os.path.exists("types.yaml"):
            path = "types.yaml"
        else:
            return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f) or {}
    except Exception as e:
        # A broken registry must not break the service, but the
        # failure should be visible instead of silently swallowed.
        logger.warning(f"Could not load type registry from {path}: {e}")
        return {}
||||||
|
|
||||||
def _get_default_edge_type(self, note_type: str) -> str:
|
def _get_default_edge_type(self, note_type: str) -> str:
|
||||||
types_cfg = self.registry.get("types", {})
|
types_cfg = self.registry.get("types", {})
|
||||||
type_def = types_cfg.get(note_type, {})
|
type_def = types_cfg.get(note_type, {})
|
||||||
defaults = type_def.get("edge_defaults")
|
defaults = type_def.get("edge_defaults")
|
||||||
return defaults[0] if defaults else "related_to"
|
if defaults and isinstance(defaults, list) and len(defaults) > 0:
|
||||||
|
return defaults[0]
|
||||||
|
return "related_to"
|
||||||
|
|
||||||
def _fetch_all_titles_and_aliases(self) -> List[Dict]:
|
def _fetch_all_titles_and_aliases(self) -> List[Dict]:
|
||||||
notes = []
|
notes = []
|
||||||
next_page = None
|
next_page = None
|
||||||
col = f"{self.prefix}_notes"
|
col_name = f"{self.prefix}_notes"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
res, next_page = self.client.scroll(collection_name=col, limit=1000, offset=next_page, with_payload=True, with_vectors=False)
|
res, next_page = self.client.scroll(
|
||||||
|
collection_name=col_name,
|
||||||
|
limit=1000,
|
||||||
|
offset=next_page,
|
||||||
|
with_payload=True,
|
||||||
|
with_vectors=False
|
||||||
|
)
|
||||||
for point in res:
|
for point in res:
|
||||||
pl = point.payload or {}
|
pl = point.payload or {}
|
||||||
aliases = pl.get("aliases") or []
|
aliases = pl.get("aliases") or []
|
||||||
if isinstance(aliases, str): aliases = [aliases]
|
if isinstance(aliases, str): aliases = [aliases]
|
||||||
notes.append({"id": pl.get("note_id"), "title": pl.get("title"), "aliases": aliases})
|
|
||||||
|
notes.append({
|
||||||
|
"id": pl.get("note_id"),
|
||||||
|
"title": pl.get("title"),
|
||||||
|
"aliases": aliases
|
||||||
|
})
|
||||||
|
|
||||||
if next_page is None: break
|
if next_page is None: break
|
||||||
except Exception: pass
|
except Exception as e:
|
||||||
|
logger.error(f"Error fetching titles: {e}")
|
||||||
|
return []
|
||||||
return notes
|
return notes
|
||||||
|
|
||||||
def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]:
|
def _find_entities_in_text(self, text: str, entities: List[Dict]) -> List[Dict]:
|
||||||
found = []
|
found = []
|
||||||
text_lower = text.lower()
|
text_lower = text.lower()
|
||||||
for entity in entities:
|
for entity in entities:
|
||||||
|
# Title
|
||||||
title = entity.get("title")
|
title = entity.get("title")
|
||||||
if title and title.lower() in text_lower:
|
if title and title.lower() in text_lower:
|
||||||
found.append({"match": title, "title": title, "id": entity["id"]})
|
found.append({"match": title, "title": title, "id": entity["id"]})
|
||||||
continue
|
continue
|
||||||
for alias in entity.get("aliases", []):
|
# Aliases
|
||||||
if str(alias).lower() in text_lower:
|
aliases = entity.get("aliases", [])
|
||||||
|
for alias in aliases:
|
||||||
|
if alias and str(alias).lower() in text_lower:
|
||||||
found.append({"match": alias, "title": title, "id": entity["id"]})
|
found.append({"match": alias, "title": title, "id": entity["id"]})
|
||||||
break
|
break
|
||||||
return found
|
return found
|
||||||
Loading…
Reference in New Issue
Block a user