neue Wp20

This commit is contained in:
Lars 2025-12-23 17:56:44 +01:00
parent 49b454d2ec
commit a733212c0f

View File

@ -5,7 +5,7 @@ DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen (Notes
WP-22: Integration von Content Lifecycle (Status Gate) und Edge Registry Validation. WP-22: Integration von Content Lifecycle (Status Gate) und Edge Registry Validation.
WP-22: Kontextsensitive Kanten-Validierung mit Fundort-Reporting (Zeilennummern). WP-22: Kontextsensitive Kanten-Validierung mit Fundort-Reporting (Zeilennummern).
WP-22: Multi-Hash Refresh für konsistente Change Detection. WP-22: Multi-Hash Refresh für konsistente Change Detection.
VERSION: 2.11.1 (WP-20 Quota Protection: OpenRouter Priority) VERSION: 2.11.3 (WP-20 Quota Protection & Stability Patch)
STATUS: Active STATUS: Active
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry
EXTERNAL_CONFIG: config/types.yaml, config/prompts.yaml EXTERNAL_CONFIG: config/types.yaml, config/prompts.yaml
@ -22,7 +22,7 @@ from app.core.parser import (
read_markdown, read_markdown,
normalize_frontmatter, normalize_frontmatter,
validate_required_frontmatter, validate_required_frontmatter,
extract_edges_with_context, # WP-22: Funktion für Zeilennummern extract_edges_with_context, # WP-22: Neue Funktion für Zeilennummern
) )
from app.core.note_payload import make_note_payload from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks, get_chunk_config from app.core.chunker import assemble_chunks, get_chunk_config
@ -44,7 +44,7 @@ from app.core.qdrant_points import (
from app.services.embeddings_client import EmbeddingsClient from app.services.embeddings_client import EmbeddingsClient
from app.services.edge_registry import registry as edge_registry from app.services.edge_registry import registry as edge_registry
from app.services.llm_service import LLMService from app.services.llm_service import LLMService # WP-20 Integration
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -52,9 +52,7 @@ logger = logging.getLogger(__name__)
def load_type_registry(custom_path: Optional[str] = None) -> dict: def load_type_registry(custom_path: Optional[str] = None) -> dict:
"""Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion.""" """Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion."""
import yaml import yaml
from app.config import get_settings path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
settings = get_settings()
path = custom_path or getattr(settings, "MINDNET_TYPES_FILE", "config/types.yaml")
if not os.path.exists(path): return {} if not os.path.exists(path): return {}
try: try:
with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {}
@ -111,7 +109,7 @@ class IngestionService:
self.dim = self.cfg.dim if hasattr(self.cfg, 'dim') else self.settings.VECTOR_SIZE self.dim = self.cfg.dim if hasattr(self.cfg, 'dim') else self.settings.VECTOR_SIZE
self.registry = load_type_registry() self.registry = load_type_registry()
self.embedder = EmbeddingsClient() self.embedder = EmbeddingsClient()
self.llm = LLMService() self.llm = LLMService() # WP-20
# Change Detection Modus (full oder body) # Change Detection Modus (full oder body)
self.active_hash_mode = os.getenv("MINDNET_CHANGE_DETECTION_MODE", "full") self.active_hash_mode = os.getenv("MINDNET_CHANGE_DETECTION_MODE", "full")
@ -135,15 +133,15 @@ class IngestionService:
async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]: async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]:
""" """
WP-20: Nutzt den Hybrid LLM Service für die semantische Kanten-Extraktion. WP-20: Nutzt den Hybrid LLM Service für die semantische Kanten-Extraktion.
QUOTEN-SCHUTZ: Priorisiert OpenRouter (Gemma), um Gemini-Tageslimits zu schonen. QUOTEN-SCHUTZ: Bevorzugt OpenRouter (Gemma), um Gemini-Tageslimits zu schützen.
""" """
# Bestimme den Provider für die Ingestion (OpenRouter bevorzugt, falls Key vorhanden) # Bestimme Provider: Nutze OpenRouter falls Key vorhanden, sonst Global Default
provider = "openrouter" if getattr(self.settings, "OPENROUTER_API_KEY", None) else self.settings.MINDNET_LLM_PROVIDER provider = "openrouter" if getattr(self.settings, "OPENROUTER_API_KEY", None) else self.settings.MINDNET_LLM_PROVIDER
# Nutze Gemma-Modell für hohe Ingestion-Quoten (14.4K RPD) via OpenRouter oder Google # Nutze Gemma-Modell für hohen Durchsatz via OpenRouter/Google
model = getattr(self.settings, "GEMMA_MODEL", None) model = getattr(self.settings, "GEMMA_MODEL", None)
# Hole Prompt aus der YAML (Kaskade: Provider -> gemini -> ollama) # Hole das optimierte Prompt-Template (Key: edge_extraction)
template = self.llm.get_prompt("edge_extraction", provider) template = self.llm.get_prompt("edge_extraction", provider)
prompt = template.format(text=text[:6000], note_id=note_id) prompt = template.format(text=text[:6000], note_id=note_id)
@ -158,13 +156,13 @@ class IngestionService:
) )
data = json.loads(response_json) data = json.loads(response_json)
# Provenance für die EdgeRegistry Dokumentation # Metadaten für die Edge-Herkunft setzen
for item in data: for item in data:
item["provenance"] = "semantic_ai" item["provenance"] = "semantic_ai"
item["line"] = f"ai-{provider}" item["line"] = f"ai-{provider}"
return data return data
except Exception as e: except Exception as e:
logger.warning(f"Smart Edge Allocation failed for {note_id} on {provider}: {e}") logger.warning(f"Smart Edge Allocation failed for {note_id}: {e}")
return [] return []
async def process_file( async def process_file(
@ -194,7 +192,6 @@ class IngestionService:
# --- WP-22: Content Lifecycle Gate --- # --- WP-22: Content Lifecycle Gate ---
status = fm.get("status", "draft").lower().strip() status = fm.get("status", "draft").lower().strip()
if status in ["system", "template", "archive", "hidden"]: if status in ["system", "template", "archive", "hidden"]:
logger.info(f"Skipping file {file_path} (Status: {status})")
return {**result, "status": "skipped", "reason": f"lifecycle_status_{status}"} return {**result, "status": "skipped", "reason": f"lifecycle_status_{status}"}
# 2. Type & Config Resolution # 2. Type & Config Resolution
@ -215,7 +212,6 @@ class IngestionService:
note_pl["status"] = status note_pl["status"] = status
note_id = note_pl["note_id"] note_id = note_pl["note_id"]
except Exception as e: except Exception as e:
logger.error(f"Payload build failed: {e}")
return {**result, "error": f"Payload build failed: {str(e)}"} return {**result, "error": f"Payload build failed: {str(e)}"}
# 4. Change Detection (Multi-Hash) # 4. Change Detection (Multi-Hash)
@ -244,7 +240,10 @@ class IngestionService:
# 5. Processing (Chunking, Embedding, Edge Generation) # 5. Processing (Chunking, Embedding, Edge Generation)
try: try:
body_text = getattr(parsed, "body", "") or "" body_text = getattr(parsed, "body", "") or ""
edge_registry.ensure_latest()
# STABILITY PATCH: Prüfen, ob ensure_latest existiert (verhindert AttributeError)
if hasattr(edge_registry, "ensure_latest"):
edge_registry.ensure_latest()
chunk_config = self._get_chunk_config_by_profile(effective_profile, note_type) chunk_config = self._get_chunk_config_by_profile(effective_profile, note_type)
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config) chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config)
@ -259,7 +258,7 @@ class IngestionService:
edges = [] edges = []
context = {"file": file_path, "note_id": note_id} context = {"file": file_path, "note_id": note_id}
# A. Explizite User-Kanten (Wiki-Links) # A. Explizite User-Kanten
explicit_edges = extract_edges_with_context(parsed) explicit_edges = extract_edges_with_context(parsed)
for e in explicit_edges: for e in explicit_edges:
e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")}) e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")})
@ -271,7 +270,7 @@ class IngestionService:
e["kind"] = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")}) e["kind"] = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")})
edges.append(e) edges.append(e)
# C. System-Kanten (Graph-Struktur) # C. System-Kanten
try: try:
raw_system_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs) raw_system_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs)
except TypeError: except TypeError:
@ -279,14 +278,15 @@ class IngestionService:
for e in raw_system_edges: for e in raw_system_edges:
valid_kind = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"}) valid_kind = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"})
e["kind"] = valid_kind if valid_kind:
if valid_kind: edges.append(e) e["kind"] = valid_kind
edges.append(e)
except Exception as e: except Exception as e:
logger.error(f"Processing failed: {e}", exc_info=True) logger.error(f"Processing failed: {e}", exc_info=True)
return {**result, "error": f"Processing failed: {str(e)}"} return {**result, "error": f"Processing failed: {str(e)}"}
# 6. Upsert in Qdrant # 6. Upsert
try: try:
if purge_before and has_old: self._purge_artifacts(note_id) if purge_before and has_old: self._purge_artifacts(note_id)
@ -307,7 +307,6 @@ class IngestionService:
return {**result, "error": f"DB Upsert failed: {e}"} return {**result, "error": f"DB Upsert failed: {e}"}
def _fetch_note_payload(self, note_id: str) -> Optional[dict]: def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
"""Holt das aktuelle Payload einer Note aus Qdrant."""
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
col = f"{self.prefix}_notes" col = f"{self.prefix}_notes"
try: try:
@ -317,7 +316,6 @@ class IngestionService:
except: return None except: return None
def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]: def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]:
"""Prüft, ob Chunks oder Kanten für eine Note fehlen."""
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
try: try:
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
@ -327,26 +325,9 @@ class IngestionService:
except: return True, True except: return True, True
def _purge_artifacts(self, note_id: str): def _purge_artifacts(self, note_id: str):
"""Löscht alle Chunks und Edges einer Note."""
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
selector = rest.FilterSelector(filter=f) selector = rest.FilterSelector(filter=f)
for suffix in ["chunks", "edges"]: for suffix in ["chunks", "edges"]:
try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector) try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector)
except Exception: pass except Exception: pass
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
"""Hilfsmethode zur Erstellung einer Note aus einem Textstream."""
target_dir = os.path.join(vault_root, folder)
os.makedirs(target_dir, exist_ok=True)
file_path = os.path.join(target_dir, filename)
try:
with open(file_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
f.flush()
os.fsync(f.fileno())
await asyncio.sleep(0.1)
logger.info(f"Written file to {file_path}")
except Exception as e:
return {"status": "error", "error": f"Disk write failed: {str(e)}"}
return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)