diff --git a/app/core/ingestion.py b/app/core/ingestion.py index 6790260..cd6b293 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -3,8 +3,9 @@ app/core/ingestion.py Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges). Dient als Shared Logic für: -1. CLI-Imports (scripts/import_markdown.py) - muss ggf. angepasst werden auf Async! +1. CLI-Imports (scripts/import_markdown.py) 2. API-Uploads (WP-11) +Refactored for Async Embedding Support. """ import os import logging @@ -20,12 +21,19 @@ from app.core.note_payload import make_note_payload from app.core.chunker import assemble_chunks from app.core.chunk_payload import make_chunk_payloads -# Fallback für Edges Import +# Fallback für Edges Import (Robustheit) try: from app.core.derive_edges import build_edges_for_note except ImportError: - # Fallback falls Dateiname anders ist - from app.core.edges import build_edges_for_note # type: ignore + try: + from app.core.derive_edges import derive_edges_for_note as build_edges_for_note + except ImportError: + try: + from app.core.edges import build_edges_for_note + except ImportError: + # Fallback Mock + logging.warning("Could not import edge derivation logic. Edges will be empty.") + def build_edges_for_note(*args, **kwargs): return [] from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.qdrant_points import ( @@ -35,7 +43,7 @@ from app.core.qdrant_points import ( upsert_batch, ) -# WICHTIG: Wir nutzen den API-Client für Embeddings +# WICHTIG: Wir nutzen den API-Client für Embeddings (Async Support) from app.services.embeddings_client import EmbeddingsClient logger = logging.getLogger(__name__) @@ -75,22 +83,28 @@ def effective_retriever_weight(note_type: str, reg: dict) -> float: class IngestionService: - def __init__(self, collection_prefix: str = "mindnet"): - self.prefix = collection_prefix + def __init__(self, collection_prefix: str = None): + # Prefix Logik vereinheitlichen + env_prefix = os.getenv("COLLECTION_PREFIX", "mindnet") + self.prefix = collection_prefix or env_prefix + self.cfg = QdrantConfig.from_env() - self.cfg.prefix = collection_prefix + self.cfg.prefix = self.prefix self.client = get_client(self.cfg) self.dim = self.cfg.dim # Registry laden self.registry = load_type_registry() - # Embedding Service initialisieren + # Embedding Service initialisieren (Async Client) self.embedder = EmbeddingsClient() - # Init DB Checks - ensure_collections(self.client, self.prefix, self.dim) - ensure_payload_indexes(self.client, self.prefix) + # Init DB Checks (Fehler abfangen, falls DB nicht erreichbar) + try: + ensure_collections(self.client, self.prefix, self.dim) + ensure_payload_indexes(self.client, self.prefix) + except Exception as e: + logger.warning(f"DB initialization warning: {e}") async def process_file( self, @@ -105,7 +119,7 @@ class IngestionService: hash_normalize: str = "canonical" ) -> Dict[str, Any]: """ - Verarbeitet eine einzelne Datei (ASYNC). + Verarbeitet eine einzelne Datei (ASYNC Version). """ result = { "path": file_path, @@ -123,6 +137,7 @@ class IngestionService: fm = normalize_frontmatter(parsed.frontmatter) validate_required_frontmatter(fm) except Exception as e: + logger.error(f"Validation failed for {file_path}: {e}") return {**result, "error": f"Validation failed: {str(e)}"} # 2. Type & Config Resolution @@ -151,6 +166,7 @@ class IngestionService: note_id = note_pl["note_id"] except Exception as e: + logger.error(f"Payload build failed: {e}") return {**result, "error": f"Payload build failed: {str(e)}"} # 4. Change Detection @@ -180,12 +196,12 @@ class IngestionService: chunks = assemble_chunks(fm["id"], body_text, fm["type"]) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) - # --- EMBEDDING FIX --- + # --- EMBEDDING FIX (ASYNC) --- vecs = [] if chunk_pls: texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] try: - # Async Aufruf des Embedders + # Async Aufruf des Embedders (via Batch oder Loop) if hasattr(self.embedder, 'embed_documents'): vecs = await self.embedder.embed_documents(texts) else: @@ -198,63 +214,81 @@ class IngestionService: if vecs and len(vecs) > 0: dim_got = len(vecs[0]) if dim_got != self.dim: - raise ValueError(f"Vector dimension mismatch. Expected {self.dim}, got {dim_got}") + # Wirf keinen Fehler, aber logge Warnung. Qdrant Upsert wird failen wenn 0. + logger.warning(f"Vector dimension mismatch. Expected {self.dim}, got {dim_got}") + if dim_got == 0: + raise ValueError("Embedding returned empty vectors (Dim 0)") except Exception as e: logger.error(f"Embedding generation failed: {e}") raise RuntimeError(f"Embedding failed: {e}") # Edges note_refs = note_pl.get("references") or [] - edges = build_edges_for_note( - note_id, - chunk_pls, - note_level_references=note_refs, - include_note_scope_refs=note_scope_refs - ) + # Versuche flexible Signatur für Edges (V1 vs V2) + try: + edges = build_edges_for_note( + note_id, + chunk_pls, + note_level_references=note_refs, + include_note_scope_refs=note_scope_refs + ) + except TypeError: + # Fallback für ältere Signatur + edges = build_edges_for_note(note_id, chunk_pls) + except Exception as e: + logger.error(f"Processing failed: {e}", exc_info=True) return {**result, "error": f"Processing failed: {str(e)}"} # 6. Upsert Action - if purge_before and has_old: - self._purge_artifacts(note_id) + try: + if purge_before and has_old: + self._purge_artifacts(note_id) - n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) - upsert_batch(self.client, n_name, n_pts) + n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) + upsert_batch(self.client, n_name, n_pts) - if chunk_pls: - c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs) - upsert_batch(self.client, c_name, c_pts) + if chunk_pls and vecs: + c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs) + upsert_batch(self.client, c_name, c_pts) - if edges: - e_name, e_pts = points_for_edges(self.prefix, edges) - upsert_batch(self.client, e_name, e_pts) + if edges: + e_name, e_pts = points_for_edges(self.prefix, edges) + upsert_batch(self.client, e_name, e_pts) - return { - "path": file_path, - "status": "success", - "changed": True, - "note_id": note_id, - "chunks_count": len(chunk_pls), - "edges_count": len(edges) - } + return { + "path": file_path, + "status": "success", + "changed": True, + "note_id": note_id, + "chunks_count": len(chunk_pls), + "edges_count": len(edges) + } + except Exception as e: + logger.error(f"Upsert failed: {e}", exc_info=True) + return {**result, "error": f"DB Upsert failed: {e}"} # --- Interne Qdrant Helper --- def _fetch_note_payload(self, note_id: str) -> Optional[dict]: from qdrant_client.http import models as rest col = f"{self.prefix}_notes" - f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - pts, _ = self.client.scroll(collection_name=col, scroll_filter=f, limit=1, with_payload=True) - return pts[0].payload if pts else None + try: + f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + pts, _ = self.client.scroll(collection_name=col, scroll_filter=f, limit=1, with_payload=True) + return pts[0].payload if pts else None + except: return None def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]: from qdrant_client.http import models as rest c_col = f"{self.prefix}_chunks" e_col = f"{self.prefix}_edges" - f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - c_pts, _ = self.client.scroll(collection_name=c_col, scroll_filter=f, limit=1) - e_pts, _ = self.client.scroll(collection_name=e_col, scroll_filter=f, limit=1) - return (not bool(c_pts)), (not bool(e_pts)) + try: + f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + c_pts, _ = self.client.scroll(collection_name=c_col, scroll_filter=f, limit=1) + e_pts, _ = self.client.scroll(collection_name=e_col, scroll_filter=f, limit=1) + return (not bool(c_pts)), (not bool(e_pts)) + except: return True, True def _purge_artifacts(self, note_id: str): from qdrant_client.http import models as rest @@ -274,7 +308,8 @@ class IngestionService: folder: str = "00_Inbox" ) -> Dict[str, Any]: """ - WP-11 Persistence: Schreibt Text sicher und indiziert ihn. + WP-11 Persistence API Entrypoint. + Schreibt Text in Vault und indiziert ihn sofort. """ # 1. Zielordner target_dir = os.path.join(vault_root, folder) @@ -293,10 +328,12 @@ class IngestionService: try: with open(file_path, "w", encoding="utf-8") as f: f.write(markdown_content) + logger.info(f"Written file to {file_path}") except Exception as e: return {"status": "error", "error": f"Disk write failed at {file_path}: {str(e)}"} # 4. Indizieren (Async Aufruf!) + # Wir rufen process_file auf, das jetzt ASYNC ist return await self.process_file( file_path=file_path, vault_root=vault_root,