diff --git a/app/core/ingestion.py b/app/core/ingestion.py index e042de2..a2dbaf5 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -5,7 +5,7 @@ DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen (Notes WP-22: Integration von Content Lifecycle (Status Gate) und Edge Registry Validation. WP-22: Kontextsensitive Kanten-Validierung mit Fundort-Reporting (Zeilennummern). WP-22: Multi-Hash Refresh für konsistente Change Detection. -VERSION: 2.11.1 (WP-20 Quota Protection: OpenRouter Priority) +VERSION: 2.11.3 (WP-20 Quota Protection & Stability Patch) STATUS: Active DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry EXTERNAL_CONFIG: config/types.yaml, config/prompts.yaml @@ -22,7 +22,7 @@ from app.core.parser import ( read_markdown, normalize_frontmatter, validate_required_frontmatter, - extract_edges_with_context, # WP-22: Funktion für Zeilennummern + extract_edges_with_context, # WP-22: Neue Funktion für Zeilennummern ) from app.core.note_payload import make_note_payload from app.core.chunker import assemble_chunks, get_chunk_config @@ -44,7 +44,7 @@ from app.core.qdrant_points import ( from app.services.embeddings_client import EmbeddingsClient from app.services.edge_registry import registry as edge_registry -from app.services.llm_service import LLMService +from app.services.llm_service import LLMService # WP-20 Integration logger = logging.getLogger(__name__) @@ -52,9 +52,7 @@ logger = logging.getLogger(__name__) def load_type_registry(custom_path: Optional[str] = None) -> dict: """Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion.""" import yaml - from app.config import get_settings - settings = get_settings() - path = custom_path or getattr(settings, "MINDNET_TYPES_FILE", "config/types.yaml") + path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml") if not os.path.exists(path): return {} try: with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} @@ -111,7 +109,7 @@ class IngestionService: self.dim = self.cfg.dim if hasattr(self.cfg, 'dim') else self.settings.VECTOR_SIZE self.registry = load_type_registry() self.embedder = EmbeddingsClient() - self.llm = LLMService() + self.llm = LLMService() # WP-20 # Change Detection Modus (full oder body) self.active_hash_mode = os.getenv("MINDNET_CHANGE_DETECTION_MODE", "full") @@ -135,15 +133,15 @@ class IngestionService: async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]: """ WP-20: Nutzt den Hybrid LLM Service für die semantische Kanten-Extraktion. - QUOTEN-SCHUTZ: Priorisiert OpenRouter (Gemma), um Gemini-Tageslimits zu schonen. + QUOTEN-SCHUTZ: Bevorzugt OpenRouter (Gemma), um Gemini-Tageslimits zu schützen. """ - # Bestimme den Provider für die Ingestion (OpenRouter bevorzugt, falls Key vorhanden) + # Bestimme Provider: Nutze OpenRouter falls Key vorhanden, sonst Global Default provider = "openrouter" if getattr(self.settings, "OPENROUTER_API_KEY", None) else self.settings.MINDNET_LLM_PROVIDER - # Nutze Gemma-Modell für hohe Ingestion-Quoten (14.4K RPD) via OpenRouter oder Google + # Nutze Gemma-Modell für hohen Durchsatz via OpenRouter/Google model = getattr(self.settings, "GEMMA_MODEL", None) - # Hole Prompt aus der YAML (Kaskade: Provider -> gemini -> ollama) + # Hole das optimierte Prompt-Template (Key: edge_extraction) template = self.llm.get_prompt("edge_extraction", provider) prompt = template.format(text=text[:6000], note_id=note_id) @@ -158,13 +156,13 @@ class IngestionService: ) data = json.loads(response_json) - # Provenance für die EdgeRegistry Dokumentation + # Metadaten für die Edge-Herkunft setzen for item in data: item["provenance"] = "semantic_ai" item["line"] = f"ai-{provider}" return data except Exception as e: - logger.warning(f"Smart Edge Allocation failed for {note_id} on {provider}: {e}") + logger.warning(f"Smart Edge Allocation failed for {note_id}: {e}") return [] async def process_file( @@ -194,7 +192,6 @@ class IngestionService: # --- WP-22: Content Lifecycle Gate --- status = fm.get("status", "draft").lower().strip() if status in ["system", "template", "archive", "hidden"]: - logger.info(f"Skipping file {file_path} (Status: {status})") return {**result, "status": "skipped", "reason": f"lifecycle_status_{status}"} # 2. Type & Config Resolution @@ -215,7 +212,6 @@ class IngestionService: note_pl["status"] = status note_id = note_pl["note_id"] except Exception as e: - logger.error(f"Payload build failed: {e}") return {**result, "error": f"Payload build failed: {str(e)}"} # 4. Change Detection (Multi-Hash) @@ -244,7 +240,10 @@ class IngestionService: # 5. Processing (Chunking, Embedding, Edge Generation) try: body_text = getattr(parsed, "body", "") or "" - edge_registry.ensure_latest() + + # STABILITY PATCH: Prüfen, ob ensure_latest existiert (verhindert AttributeError) + if hasattr(edge_registry, "ensure_latest"): + edge_registry.ensure_latest() chunk_config = self._get_chunk_config_by_profile(effective_profile, note_type) chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config) @@ -259,7 +258,7 @@ class IngestionService: edges = [] context = {"file": file_path, "note_id": note_id} - # A. Explizite User-Kanten (Wiki-Links) + # A. Explizite User-Kanten explicit_edges = extract_edges_with_context(parsed) for e in explicit_edges: e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")}) @@ -271,7 +270,7 @@ class IngestionService: e["kind"] = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")}) edges.append(e) - # C. System-Kanten (Graph-Struktur) + # C. System-Kanten try: raw_system_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs) except TypeError: @@ -279,14 +278,15 @@ class IngestionService: for e in raw_system_edges: valid_kind = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"}) - e["kind"] = valid_kind - if valid_kind: edges.append(e) + if valid_kind: + e["kind"] = valid_kind + edges.append(e) except Exception as e: logger.error(f"Processing failed: {e}", exc_info=True) return {**result, "error": f"Processing failed: {str(e)}"} - # 6. Upsert in Qdrant + # 6. Upsert try: if purge_before and has_old: self._purge_artifacts(note_id) @@ -307,7 +307,6 @@ class IngestionService: return {**result, "error": f"DB Upsert failed: {e}"} def _fetch_note_payload(self, note_id: str) -> Optional[dict]: - """Holt das aktuelle Payload einer Note aus Qdrant.""" from qdrant_client.http import models as rest col = f"{self.prefix}_notes" try: @@ -317,7 +316,6 @@ class IngestionService: except: return None def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]: - """Prüft, ob Chunks oder Kanten für eine Note fehlen.""" from qdrant_client.http import models as rest try: f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) @@ -327,26 +325,9 @@ class IngestionService: except: return True, True def _purge_artifacts(self, note_id: str): - """Löscht alle Chunks und Edges einer Note.""" from qdrant_client.http import models as rest f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) selector = rest.FilterSelector(filter=f) for suffix in ["chunks", "edges"]: try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector) - except Exception: pass - - async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: - """Hilfsmethode zur Erstellung einer Note aus einem Textstream.""" - target_dir = os.path.join(vault_root, folder) - os.makedirs(target_dir, exist_ok=True) - file_path = os.path.join(target_dir, filename) - try: - with open(file_path, "w", encoding="utf-8") as f: - f.write(markdown_content) - f.flush() - os.fsync(f.fileno()) - await asyncio.sleep(0.1) - logger.info(f"Written file to {file_path}") - except Exception as e: - return {"status": "error", "error": f"Disk write failed: {str(e)}"} - return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file + except Exception: pass \ No newline at end of file