new ingestion, async writing, robust

This commit is contained in:
Lars 2025-12-11 10:23:23 +01:00
parent a7cda3f51c
commit e7cec6acc6

View File

@ -3,13 +3,14 @@ app/core/ingestion.py
Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges). Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges).
Dient als Shared Logic für: Dient als Shared Logic für:
1. CLI-Imports (scripts/import_markdown.py) 1. CLI-Imports (scripts/import_markdown.py) - muss ggf. angepasst werden auf Async!
2. API-Uploads (WP-11) 2. API-Uploads (WP-11)
""" """
import os import os
import json import logging
from typing import Dict, List, Optional, Tuple, Any, Set from typing import Dict, List, Optional, Tuple, Any
# Core Module Imports
from app.core.parser import ( from app.core.parser import (
read_markdown, read_markdown,
normalize_frontmatter, normalize_frontmatter,
@ -18,10 +19,12 @@ from app.core.parser import (
from app.core.note_payload import make_note_payload from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads from app.core.chunk_payload import make_chunk_payloads
# Fallback Imports wie im Original-Skript
# Fallback für Edges Import
try: try:
from app.core.derive_edges import build_edges_for_note from app.core.derive_edges import build_edges_for_note
except ImportError: except ImportError:
# Fallback falls Dateiname anders ist
from app.core.edges import build_edges_for_note # type: ignore from app.core.edges import build_edges_for_note # type: ignore
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
@ -32,18 +35,16 @@ from app.core.qdrant_points import (
upsert_batch, upsert_batch,
) )
# Optionales Embedding # WICHTIG: Wir nutzen den API-Client für Embeddings
try: from app.services.embeddings_client import EmbeddingsClient
from app.core.embed import embed_texts
except ImportError:
embed_texts = None
# --- Helper für Type-Registry (ausgelagert aus Script) --- logger = logging.getLogger(__name__)
# --- Helper für Type-Registry ---
def load_type_registry(custom_path: Optional[str] = None) -> dict: def load_type_registry(custom_path: Optional[str] = None) -> dict:
import yaml import yaml
path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml") path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
if not os.path.exists(path): if not os.path.exists(path):
# Fallback auf Root-Ebene (für Tests/CLI)
if os.path.exists("types.yaml"): if os.path.exists("types.yaml"):
path = "types.yaml" path = "types.yaml"
else: else:
@ -58,14 +59,12 @@ def resolve_note_type(requested: Optional[str], reg: dict) -> str:
types = reg.get("types", {}) types = reg.get("types", {})
if requested and requested in types: if requested and requested in types:
return requested return requested
return "concept" # Default Fallback return "concept"
def effective_chunk_profile(note_type: str, reg: dict) -> str: def effective_chunk_profile(note_type: str, reg: dict) -> str:
# 1. Specific Type
t_cfg = reg.get("types", {}).get(note_type, {}) t_cfg = reg.get("types", {}).get(note_type, {})
if t_cfg and t_cfg.get("chunk_profile"): if t_cfg and t_cfg.get("chunk_profile"):
return t_cfg.get("chunk_profile") return t_cfg.get("chunk_profile")
# 2. Defaults
return reg.get("defaults", {}).get("chunk_profile", "default") return reg.get("defaults", {}).get("chunk_profile", "default")
def effective_retriever_weight(note_type: str, reg: dict) -> float: def effective_retriever_weight(note_type: str, reg: dict) -> float:
@ -79,18 +78,21 @@ class IngestionService:
def __init__(self, collection_prefix: str = "mindnet"): def __init__(self, collection_prefix: str = "mindnet"):
self.prefix = collection_prefix self.prefix = collection_prefix
self.cfg = QdrantConfig.from_env() self.cfg = QdrantConfig.from_env()
self.cfg.prefix = collection_prefix # Override env if needed self.cfg.prefix = collection_prefix
self.client = get_client(self.cfg) self.client = get_client(self.cfg)
self.dim = self.cfg.dim self.dim = self.cfg.dim
# Registry laden # Registry laden
self.registry = load_type_registry() self.registry = load_type_registry()
# Embedding Service initialisieren
self.embedder = EmbeddingsClient()
# Init DB Checks # Init DB Checks
ensure_collections(self.client, self.prefix, self.dim) ensure_collections(self.client, self.prefix, self.dim)
ensure_payload_indexes(self.client, self.prefix) ensure_payload_indexes(self.client, self.prefix)
def process_file( async def process_file(
self, self,
file_path: str, file_path: str,
vault_root: str, vault_root: str,
@ -103,8 +105,7 @@ class IngestionService:
hash_normalize: str = "canonical" hash_normalize: str = "canonical"
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Verarbeitet eine einzelne Datei. Verarbeitet eine einzelne Datei (ASYNC).
Return: Summary Dict (Erfolg, Änderungen, Stats).
""" """
result = { result = {
"path": file_path, "path": file_path,
@ -129,7 +130,6 @@ class IngestionService:
fm["type"] = note_type fm["type"] = note_type
fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry) fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry)
# Weight Resolution (Frontmatter override > Registry)
weight = fm.get("retriever_weight") weight = fm.get("retriever_weight")
if weight is None: if weight is None:
weight = effective_retriever_weight(note_type, self.registry) weight = effective_retriever_weight(note_type, self.registry)
@ -145,7 +145,6 @@ class IngestionService:
hash_source=hash_source, hash_source=hash_source,
file_path=file_path file_path=file_path
) )
# Ensure fulltext & weight
if not note_pl.get("fulltext"): if not note_pl.get("fulltext"):
note_pl["fulltext"] = getattr(parsed, "body", "") or "" note_pl["fulltext"] = getattr(parsed, "body", "") or ""
note_pl["retriever_weight"] = fm["retriever_weight"] note_pl["retriever_weight"] = fm["retriever_weight"]
@ -154,21 +153,17 @@ class IngestionService:
except Exception as e: except Exception as e:
return {**result, "error": f"Payload build failed: {str(e)}"} return {**result, "error": f"Payload build failed: {str(e)}"}
# 4. Change Detection (Hash Check) # 4. Change Detection
# Wir holen den alten Payload aus Qdrant, wenn wir nicht forcen
old_payload = None old_payload = None
if not force_replace: if not force_replace:
old_payload = self._fetch_note_payload(note_id) old_payload = self._fetch_note_payload(note_id)
has_old = old_payload is not None has_old = old_payload is not None
key_current = f"{hash_mode}:{hash_source}:{hash_normalize}" key_current = f"{hash_mode}:{hash_source}:{hash_normalize}"
old_hash = (old_payload or {}).get("hashes", {}).get(key_current) old_hash = (old_payload or {}).get("hashes", {}).get(key_current)
new_hash = note_pl.get("hashes", {}).get(key_current) new_hash = note_pl.get("hashes", {}).get(key_current)
hash_changed = (old_hash != new_hash) hash_changed = (old_hash != new_hash)
# Artefakte prüfen (Chunks/Edges)
chunks_missing, edges_missing = self._artifacts_missing(note_id) chunks_missing, edges_missing = self._artifacts_missing(note_id)
should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing
@ -185,13 +180,28 @@ class IngestionService:
chunks = assemble_chunks(fm["id"], body_text, fm["type"]) chunks = assemble_chunks(fm["id"], body_text, fm["type"])
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
# Embeddings # --- EMBEDDING FIX ---
vecs = [] vecs = []
if embed_texts and chunk_pls: if chunk_pls:
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
vecs = embed_texts(texts) try:
# Async Aufruf des Embedders
if hasattr(self.embedder, 'embed_documents'):
vecs = await self.embedder.embed_documents(texts)
else: else:
vecs = [[0.0] * self.dim for _ in chunk_pls] # Fallback Loop falls Client kein Batch unterstützt
for t in texts:
v = await self.embedder.embed_query(t)
vecs.append(v)
# Validierung der Dimensionen
if vecs and len(vecs) > 0:
dim_got = len(vecs[0])
if dim_got != self.dim:
raise ValueError(f"Vector dimension mismatch. Expected {self.dim}, got {dim_got}")
except Exception as e:
logger.error(f"Embedding generation failed: {e}")
raise RuntimeError(f"Embedding failed: {e}")
# Edges # Edges
note_refs = note_pl.get("references") or [] note_refs = note_pl.get("references") or []
@ -208,16 +218,13 @@ class IngestionService:
if purge_before and has_old: if purge_before and has_old:
self._purge_artifacts(note_id) self._purge_artifacts(note_id)
# Upsert Note
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, n_name, n_pts) upsert_batch(self.client, n_name, n_pts)
# Upsert Chunks
if chunk_pls: if chunk_pls:
c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs) c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)
upsert_batch(self.client, c_name, c_pts) upsert_batch(self.client, c_name, c_pts)
# Upsert Edges
if edges: if edges:
e_name, e_pts = points_for_edges(self.prefix, edges) e_name, e_pts = points_for_edges(self.prefix, edges)
upsert_batch(self.client, e_name, e_pts) upsert_batch(self.client, e_name, e_pts)
@ -245,12 +252,8 @@ class IngestionService:
c_col = f"{self.prefix}_chunks" c_col = f"{self.prefix}_chunks"
e_col = f"{self.prefix}_edges" e_col = f"{self.prefix}_edges"
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
# Check Chunks
c_pts, _ = self.client.scroll(collection_name=c_col, scroll_filter=f, limit=1) c_pts, _ = self.client.scroll(collection_name=c_col, scroll_filter=f, limit=1)
# Check Edges
e_pts, _ = self.client.scroll(collection_name=e_col, scroll_filter=f, limit=1) e_pts, _ = self.client.scroll(collection_name=e_col, scroll_filter=f, limit=1)
return (not bool(c_pts)), (not bool(e_pts)) return (not bool(c_pts)), (not bool(e_pts))
def _purge_artifacts(self, note_id: str): def _purge_artifacts(self, note_id: str):
@ -263,7 +266,7 @@ class IngestionService:
except Exception: except Exception:
pass pass
def create_from_text( async def create_from_text(
self, self,
markdown_content: str, markdown_content: str,
filename: str, filename: str,
@ -272,20 +275,18 @@ class IngestionService:
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
WP-11 Persistence: Schreibt Text sicher und indiziert ihn. WP-11 Persistence: Schreibt Text sicher und indiziert ihn.
Erstellt Verzeichnisse automatisch.
""" """
# 1. Zielordner vorbereiten # 1. Zielordner
target_dir = os.path.join(vault_root, folder) target_dir = os.path.join(vault_root, folder)
try: try:
os.makedirs(target_dir, exist_ok=True) os.makedirs(target_dir, exist_ok=True)
except Exception as e: except Exception as e:
return {"status": "error", "error": f"Could not create folder {target_dir}: {e}"} return {"status": "error", "error": f"Could not create folder {target_dir}: {e}"}
# 2. Dateiname bereinigen # 2. Dateiname
safe_filename = os.path.basename(filename) safe_filename = os.path.basename(filename)
if not safe_filename.endswith(".md"): if not safe_filename.endswith(".md"):
safe_filename += ".md" safe_filename += ".md"
file_path = os.path.join(target_dir, safe_filename) file_path = os.path.join(target_dir, safe_filename)
# 3. Schreiben # 3. Schreiben
@ -295,8 +296,8 @@ class IngestionService:
except Exception as e: except Exception as e:
return {"status": "error", "error": f"Disk write failed at {file_path}: {str(e)}"} return {"status": "error", "error": f"Disk write failed at {file_path}: {str(e)}"}
# 4. Indizieren (Single File Upsert) # 4. Indizieren (Async Aufruf!)
return self.process_file( return await self.process_file(
file_path=file_path, file_path=file_path,
vault_root=vault_root, vault_root=vault_root,
apply=True, apply=True,