WP20 openrouter
This commit is contained in:
parent
2a98c37ca1
commit
c60aba63a4
|
|
@ -1,7 +1,7 @@
|
|||
"""
|
||||
FILE: app/core/ingestion.py
|
||||
DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen (Notes, Chunks, Edges).
|
||||
WP-20: Integration von Smart Edge Allocation via Hybrid LLM (Gemini/Ollama).
|
||||
WP-20: Integration von Smart Edge Allocation via Hybrid LLM (Gemini/Gemma/OpenRouter).
|
||||
WP-22: Integration von Content Lifecycle (Status Gate) und Edge Registry Validation.
|
||||
WP-22: Kontextsensitive Kanten-Validierung mit Fundort-Reporting (Zeilennummern).
|
||||
WP-22: Multi-Hash Refresh für konsistente Change Detection.
|
||||
|
|
@ -22,7 +22,7 @@ from app.core.parser import (
|
|||
read_markdown,
|
||||
normalize_frontmatter,
|
||||
validate_required_frontmatter,
|
||||
extract_edges_with_context, #
|
||||
extract_edges_with_context, # WP-22: Funktion für Zeilennummern
|
||||
)
|
||||
from app.core.note_payload import make_note_payload
|
||||
from app.core.chunker import assemble_chunks, get_chunk_config
|
||||
|
|
@ -44,7 +44,7 @@ from app.core.qdrant_points import (
|
|||
|
||||
from app.services.embeddings_client import EmbeddingsClient
|
||||
from app.services.edge_registry import registry as edge_registry
|
||||
from app.services.llm_service import LLMService #
|
||||
from app.services.llm_service import LLMService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -52,7 +52,9 @@ logger = logging.getLogger(__name__)
|
|||
def load_type_registry(custom_path: Optional[str] = None) -> dict:
|
||||
"""Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion."""
|
||||
import yaml
|
||||
path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
|
||||
from app.config import get_settings
|
||||
settings = get_settings()
|
||||
path = custom_path or getattr(settings, "MINDNET_TYPES_FILE", "config/types.yaml")
|
||||
if not os.path.exists(path): return {}
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {}
|
||||
|
|
@ -100,17 +102,18 @@ def effective_retriever_weight(fm: dict, note_type: str, reg: dict) -> float:
|
|||
class IngestionService:
|
||||
def __init__(self, collection_prefix: str = None):
|
||||
from app.config import get_settings
|
||||
self.settings = get_settings() #
|
||||
self.settings = get_settings()
|
||||
|
||||
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
|
||||
self.cfg = QdrantConfig.from_env()
|
||||
self.cfg.prefix = self.prefix
|
||||
self.client = get_client(self.cfg)
|
||||
self.dim = self.cfg.VECTOR_SIZE #
|
||||
self.type_registry = load_type_registry()
|
||||
self.dim = self.cfg.dim if hasattr(self.cfg, 'dim') else self.settings.VECTOR_SIZE
|
||||
self.registry = load_type_registry()
|
||||
self.embedder = EmbeddingsClient()
|
||||
self.llm = LLMService() #
|
||||
self.llm = LLMService() # WP-20 Integration
|
||||
|
||||
# Change Detection Modus (full oder body)
|
||||
self.active_hash_mode = os.getenv("MINDNET_CHANGE_DETECTION_MODE", "full")
|
||||
|
||||
try:
|
||||
|
|
@ -120,8 +123,8 @@ class IngestionService:
|
|||
logger.warning(f"DB init warning: {e}")
|
||||
|
||||
def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]:
|
||||
"""Holt die Chunker-Parameter für ein spezifisches Profil."""
|
||||
profiles = self.type_registry.get("chunking_profiles", {})
|
||||
"""Holt die Chunker-Parameter (max, target, overlap) für ein spezifisches Profil."""
|
||||
profiles = self.registry.get("chunking_profiles", {})
|
||||
if profile_name in profiles:
|
||||
cfg = profiles[profile_name].copy()
|
||||
if "overlap" in cfg and isinstance(cfg["overlap"], list):
|
||||
|
|
@ -134,30 +137,24 @@ class IngestionService:
|
|||
WP-20: Nutzt den Hybrid LLM Service für die semantische Kanten-Extraktion.
|
||||
Verwendet provider-spezifische Prompts aus der config.
|
||||
"""
|
||||
provider = self.settings.MINDNET_LLM_PROVIDER #
|
||||
# Wir priorisieren Gemma für Ingestion, falls verfügbar (OpenRouter/Cloud)
|
||||
model = getattr(self.settings, "GEMMA_MODEL", None)
|
||||
provider = self.settings.MINDNET_LLM_PROVIDER
|
||||
|
||||
# Prompt-Lookup (Fallback auf Standard-Struktur falls Key fehlt)
|
||||
prompt_data = self.llm.prompts.get("edge_extraction", {})
|
||||
if isinstance(prompt_data, dict):
|
||||
template = prompt_data.get(provider, prompt_data.get("ollama", ""))
|
||||
else:
|
||||
template = str(prompt_data)
|
||||
|
||||
if not template:
|
||||
template = "Extrahiere semantische Relationen aus: {text}. Antworte als JSON: [{\"to\": \"X\", \"kind\": \"Y\"}]"
|
||||
|
||||
template = self.llm.get_prompt("edge_extraction")
|
||||
prompt = template.format(text=text[:6000], note_id=note_id)
|
||||
|
||||
try:
|
||||
# Nutzt die Semaphore für Hintergrund-Tasks
|
||||
# Hintergrund-Task mit Semaphore
|
||||
response_json = await self.llm.generate_raw_response(
|
||||
prompt=prompt,
|
||||
priority="background",
|
||||
force_json=True
|
||||
force_json=True,
|
||||
model_override=model
|
||||
)
|
||||
data = json.loads(response_json)
|
||||
|
||||
# Anreicherung mit Provenance-Metadaten für WP-22 Registry
|
||||
# Provenance für die EdgeRegistry
|
||||
for item in data:
|
||||
item["provenance"] = "semantic_ai"
|
||||
item["line"] = f"ai-{provider}"
|
||||
|
|
@ -197,16 +194,15 @@ class IngestionService:
|
|||
return {**result, "status": "skipped", "reason": f"lifecycle_status_{status}"}
|
||||
|
||||
# 2. Type & Config Resolution
|
||||
note_type = resolve_note_type(fm.get("type"), self.type_registry)
|
||||
note_type = resolve_note_type(fm.get("type"), self.registry)
|
||||
fm["type"] = note_type
|
||||
|
||||
effective_profile = effective_chunk_profile_name(fm, note_type, self.type_registry)
|
||||
effective_weight = effective_retriever_weight(fm, note_type, self.type_registry)
|
||||
effective_profile = effective_chunk_profile_name(fm, note_type, self.registry)
|
||||
effective_weight = effective_retriever_weight(fm, note_type, self.registry)
|
||||
|
||||
fm["chunk_profile"] = effective_profile
|
||||
fm["retriever_weight"] = effective_weight
|
||||
|
||||
# 3. Build Note Payload (Inkl. Multi-Hash für WP-22)
|
||||
# 3. Build Note Payload
|
||||
try:
|
||||
note_pl = make_note_payload(parsed, vault_root=vault_root, hash_normalize=hash_normalize, hash_source=hash_source, file_path=file_path)
|
||||
if not note_pl.get("fulltext"): note_pl["fulltext"] = getattr(parsed, "body", "") or ""
|
||||
|
|
@ -244,7 +240,7 @@ class IngestionService:
|
|||
# 5. Processing (Chunking, Embedding, Edge Generation)
|
||||
try:
|
||||
body_text = getattr(parsed, "body", "") or ""
|
||||
edge_registry.ensure_latest() #
|
||||
edge_registry.ensure_latest()
|
||||
|
||||
chunk_config = self._get_chunk_config_by_profile(effective_profile, note_type)
|
||||
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config)
|
||||
|
|
@ -255,11 +251,11 @@ class IngestionService:
|
|||
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
|
||||
vecs = await self.embedder.embed_documents(texts)
|
||||
|
||||
# --- WP-22: Kanten-Extraktion & Validierung ---
|
||||
# --- WP-22/WP-20: Kanten-Extraktion & Validierung ---
|
||||
edges = []
|
||||
context = {"file": file_path, "note_id": note_id}
|
||||
|
||||
# A. Explizite User-Kanten mit Zeilennummern
|
||||
# A. Explizite User-Kanten
|
||||
explicit_edges = extract_edges_with_context(parsed)
|
||||
for e in explicit_edges:
|
||||
e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")})
|
||||
|
|
@ -271,20 +267,24 @@ class IngestionService:
|
|||
e["kind"] = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")})
|
||||
edges.append(e)
|
||||
|
||||
# C. System-Kanten (Struktur: belongs_to, next, prev)
|
||||
raw_system_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []))
|
||||
# C. System-Kanten
|
||||
try:
|
||||
raw_system_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs)
|
||||
except TypeError:
|
||||
raw_system_edges = build_edges_for_note(note_id, chunk_pls)
|
||||
|
||||
for e in raw_system_edges:
|
||||
e["kind"] = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"})
|
||||
if e["kind"]: edges.append(e)
|
||||
valid_kind = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"})
|
||||
e["kind"] = valid_kind
|
||||
if valid_kind: edges.append(e)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Processing failed: {e}", exc_info=True)
|
||||
return {**result, "error": f"Processing failed: {str(e)}"}
|
||||
|
||||
# 6. Upsert in Qdrant
|
||||
# 6. Upsert
|
||||
try:
|
||||
if purge_before and has_old:
|
||||
self._purge_artifacts(note_id)
|
||||
if purge_before and has_old: self._purge_artifacts(note_id)
|
||||
|
||||
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
|
||||
upsert_batch(self.client, n_name, n_pts)
|
||||
|
|
@ -297,10 +297,7 @@ class IngestionService:
|
|||
e_name, e_pts = points_for_edges(self.prefix, edges)
|
||||
upsert_batch(self.client, e_name, e_pts)
|
||||
|
||||
return {
|
||||
"path": file_path, "status": "success", "changed": True, "note_id": note_id,
|
||||
"chunks_count": len(chunk_pls), "edges_count": len(edges)
|
||||
}
|
||||
return {"path": file_path, "status": "success", "changed": True, "note_id": note_id, "chunks_count": len(chunk_pls), "edges_count": len(edges)}
|
||||
except Exception as e:
|
||||
logger.error(f"Upsert failed: {e}", exc_info=True)
|
||||
return {**result, "error": f"DB Upsert failed: {e}"}
|
||||
|
|
@ -331,8 +328,7 @@ class IngestionService:
|
|||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||
selector = rest.FilterSelector(filter=f)
|
||||
for suffix in ["chunks", "edges"]:
|
||||
try:
|
||||
self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector)
|
||||
try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector)
|
||||
except Exception: pass
|
||||
|
||||
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
|
||||
|
|
@ -346,6 +342,7 @@ class IngestionService:
|
|||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
await asyncio.sleep(0.1)
|
||||
logger.info(f"Written file to {file_path}")
|
||||
except Exception as e:
|
||||
return {"status": "error", "error": f"Disk write failed: {str(e)}"}
|
||||
return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)
|
||||
Loading…
Reference in New Issue
Block a user