diff --git a/app/core/ingestion.py b/app/core/ingestion.py index fdd63f9..1fbbf5e 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -1,7 +1,7 @@ """ FILE: app/core/ingestion.py DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen (Notes, Chunks, Edges). - WP-20: Integration von Smart Edge Allocation via Hybrid LLM (Gemini/Ollama). + WP-20: Integration von Smart Edge Allocation via Hybrid LLM (Gemini/Gemma/OpenRouter). WP-22: Integration von Content Lifecycle (Status Gate) und Edge Registry Validation. WP-22: Kontextsensitive Kanten-Validierung mit Fundort-Reporting (Zeilennummern). WP-22: Multi-Hash Refresh für konsistente Change Detection. @@ -22,7 +22,7 @@ from app.core.parser import ( read_markdown, normalize_frontmatter, validate_required_frontmatter, - extract_edges_with_context, # + extract_edges_with_context, # WP-22: Funktion für Zeilennummern ) from app.core.note_payload import make_note_payload from app.core.chunker import assemble_chunks, get_chunk_config @@ -44,7 +44,7 @@ from app.core.qdrant_points import ( from app.services.embeddings_client import EmbeddingsClient from app.services.edge_registry import registry as edge_registry -from app.services.llm_service import LLMService # +from app.services.llm_service import LLMService logger = logging.getLogger(__name__) @@ -52,7 +52,9 @@ logger = logging.getLogger(__name__) def load_type_registry(custom_path: Optional[str] = None) -> dict: """Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion.""" import yaml - path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml") + from app.config import get_settings + settings = get_settings() + path = custom_path or getattr(settings, "MINDNET_TYPES_FILE", "config/types.yaml") if not os.path.exists(path): return {} try: with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} @@ -100,17 +102,18 @@ def effective_retriever_weight(fm: dict, note_type: str, reg: dict) -> float: class IngestionService: def __init__(self, collection_prefix: str = None): from app.config import get_settings - self.settings = get_settings() # + self.settings = get_settings() self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.cfg = QdrantConfig.from_env() self.cfg.prefix = self.prefix self.client = get_client(self.cfg) - self.dim = self.cfg.VECTOR_SIZE # - self.type_registry = load_type_registry() + self.dim = self.cfg.dim if hasattr(self.cfg, 'dim') else self.settings.VECTOR_SIZE + self.registry = load_type_registry() self.embedder = EmbeddingsClient() - self.llm = LLMService() # + self.llm = LLMService() # WP-20 Integration + # Change Detection Modus (full oder body) self.active_hash_mode = os.getenv("MINDNET_CHANGE_DETECTION_MODE", "full") try: @@ -120,8 +123,8 @@ class IngestionService: logger.warning(f"DB init warning: {e}") def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]: - """Holt die Chunker-Parameter für ein spezifisches Profil.""" - profiles = self.type_registry.get("chunking_profiles", {}) + """Holt die Chunker-Parameter (max, target, overlap) für ein spezifisches Profil.""" + profiles = self.registry.get("chunking_profiles", {}) if profile_name in profiles: cfg = profiles[profile_name].copy() if "overlap" in cfg and isinstance(cfg["overlap"], list): @@ -134,30 +137,24 @@ class IngestionService: WP-20: Nutzt den Hybrid LLM Service für die semantische Kanten-Extraktion. Verwendet provider-spezifische Prompts aus der config. """ - provider = self.settings.MINDNET_LLM_PROVIDER # + # Wir priorisieren Gemma für Ingestion, falls verfügbar (OpenRouter/Cloud) + model = getattr(self.settings, "GEMMA_MODEL", None) + provider = self.settings.MINDNET_LLM_PROVIDER - # Prompt-Lookup (Fallback auf Standard-Struktur falls Key fehlt) - prompt_data = self.llm.prompts.get("edge_extraction", {}) - if isinstance(prompt_data, dict): - template = prompt_data.get(provider, prompt_data.get("ollama", "")) - else: - template = str(prompt_data) - - if not template: - template = "Extrahiere semantische Relationen aus: {text}. Antworte als JSON: [{\"to\": \"X\", \"kind\": \"Y\"}]" - + template = self.llm.get_prompt("edge_extraction") prompt = template.format(text=text[:6000], note_id=note_id) try: - # Nutzt die Semaphore für Hintergrund-Tasks + # Hintergrund-Task mit Semaphore response_json = await self.llm.generate_raw_response( prompt=prompt, priority="background", - force_json=True + force_json=True, + model_override=model ) data = json.loads(response_json) - # Anreicherung mit Provenance-Metadaten für WP-22 Registry + # Provenance für die EdgeRegistry for item in data: item["provenance"] = "semantic_ai" item["line"] = f"ai-{provider}" @@ -197,16 +194,15 @@ class IngestionService: return {**result, "status": "skipped", "reason": f"lifecycle_status_{status}"} # 2. Type & Config Resolution - note_type = resolve_note_type(fm.get("type"), self.type_registry) + note_type = resolve_note_type(fm.get("type"), self.registry) fm["type"] = note_type - - effective_profile = effective_chunk_profile_name(fm, note_type, self.type_registry) - effective_weight = effective_retriever_weight(fm, note_type, self.type_registry) + effective_profile = effective_chunk_profile_name(fm, note_type, self.registry) + effective_weight = effective_retriever_weight(fm, note_type, self.registry) fm["chunk_profile"] = effective_profile fm["retriever_weight"] = effective_weight - # 3. Build Note Payload (Inkl. Multi-Hash für WP-22) + # 3. Build Note Payload try: note_pl = make_note_payload(parsed, vault_root=vault_root, hash_normalize=hash_normalize, hash_source=hash_source, file_path=file_path) if not note_pl.get("fulltext"): note_pl["fulltext"] = getattr(parsed, "body", "") or "" @@ -244,7 +240,7 @@ class IngestionService: # 5. Processing (Chunking, Embedding, Edge Generation) try: body_text = getattr(parsed, "body", "") or "" - edge_registry.ensure_latest() # + edge_registry.ensure_latest() chunk_config = self._get_chunk_config_by_profile(effective_profile, note_type) chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config) @@ -255,11 +251,11 @@ class IngestionService: texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] vecs = await self.embedder.embed_documents(texts) - # --- WP-22: Kanten-Extraktion & Validierung --- + # --- WP-22/WP-20: Kanten-Extraktion & Validierung --- edges = [] context = {"file": file_path, "note_id": note_id} - # A. Explizite User-Kanten mit Zeilennummern + # A. Explizite User-Kanten explicit_edges = extract_edges_with_context(parsed) for e in explicit_edges: e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")}) @@ -271,20 +267,24 @@ class IngestionService: e["kind"] = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")}) edges.append(e) - # C. System-Kanten (Struktur: belongs_to, next, prev) - raw_system_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", [])) + # C. System-Kanten + try: + raw_system_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs) + except TypeError: + raw_system_edges = build_edges_for_note(note_id, chunk_pls) + for e in raw_system_edges: - e["kind"] = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"}) - if e["kind"]: edges.append(e) + valid_kind = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"}) + e["kind"] = valid_kind + if valid_kind: edges.append(e) except Exception as e: logger.error(f"Processing failed: {e}", exc_info=True) return {**result, "error": f"Processing failed: {str(e)}"} - # 6. Upsert in Qdrant + # 6. Upsert try: - if purge_before and has_old: - self._purge_artifacts(note_id) + if purge_before and has_old: self._purge_artifacts(note_id) n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) upsert_batch(self.client, n_name, n_pts) @@ -297,10 +297,7 @@ class IngestionService: e_name, e_pts = points_for_edges(self.prefix, edges) upsert_batch(self.client, e_name, e_pts) - return { - "path": file_path, "status": "success", "changed": True, "note_id": note_id, - "chunks_count": len(chunk_pls), "edges_count": len(edges) - } + return {"path": file_path, "status": "success", "changed": True, "note_id": note_id, "chunks_count": len(chunk_pls), "edges_count": len(edges)} except Exception as e: logger.error(f"Upsert failed: {e}", exc_info=True) return {**result, "error": f"DB Upsert failed: {e}"} @@ -331,8 +328,7 @@ class IngestionService: f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) selector = rest.FilterSelector(filter=f) for suffix in ["chunks", "edges"]: - try: - self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector) + try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector) except Exception: pass async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: @@ -346,6 +342,7 @@ class IngestionService: f.flush() os.fsync(f.fileno()) await asyncio.sleep(0.1) + logger.info(f"Written file to {file_path}") except Exception as e: return {"status": "error", "error": f"Disk write failed: {str(e)}"} return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file