WP15 #9
|
|
@ -2,13 +2,17 @@
|
|||
app/core/ingestion.py
|
||||
|
||||
Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges).
|
||||
Dient als Shared Logic für:
|
||||
1. CLI-Imports (scripts/import_markdown.py)
|
||||
2. API-Uploads (WP-11)
|
||||
Refactored for Async Embedding & Async Chunking (WP-15).
|
||||
Features:
|
||||
- Incremental Update (Hashing) für Massenimport.
|
||||
- Force Smart Edges für Single-File (UI).
|
||||
- Async Embedding & Chunking.
|
||||
|
||||
Version: 2.5.1 (Full Restore + WP-15 Logic)
|
||||
"""
|
||||
import os
|
||||
import logging
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
|
||||
# Core Module Imports
|
||||
|
|
@ -19,21 +23,13 @@ from app.core.parser import (
|
|||
)
|
||||
from app.core.note_payload import make_note_payload
|
||||
# ASYNC CHUNKER (WP-15)
|
||||
from app.core.chunker import assemble_chunks
|
||||
from app.core.chunker import assemble_chunks, get_chunk_config
|
||||
from app.core.chunk_payload import make_chunk_payloads
|
||||
|
||||
# Fallback für Edges Import (Robustheit)
|
||||
# Fallback für Edges Import
|
||||
try:
|
||||
from app.core.derive_edges import build_edges_for_note
|
||||
except ImportError:
|
||||
try:
|
||||
from app.core.derive_edges import derive_edges_for_note as build_edges_for_note
|
||||
except ImportError:
|
||||
try:
|
||||
from app.core.edges import build_edges_for_note
|
||||
except ImportError:
|
||||
# Fallback Mock
|
||||
logging.warning("Could not import edge derivation logic. Edges will be empty.")
|
||||
def build_edges_for_note(*args, **kwargs): return []
|
||||
|
||||
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
|
||||
|
|
@ -44,48 +40,26 @@ from app.core.qdrant_points import (
|
|||
upsert_batch,
|
||||
)
|
||||
|
||||
# WICHTIG: Wir nutzen den API-Client für Embeddings (Async Support)
|
||||
from app.services.embeddings_client import EmbeddingsClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# --- Helper für Type-Registry ---
|
||||
# --- Helper ---
|
||||
def load_type_registry(custom_path: Optional[str] = None) -> dict:
|
||||
import yaml
|
||||
path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
|
||||
if not os.path.exists(path):
|
||||
if os.path.exists("types.yaml"):
|
||||
path = "types.yaml"
|
||||
else:
|
||||
return {}
|
||||
if not os.path.exists(path): return {}
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
return yaml.safe_load(f) or {}
|
||||
except Exception:
|
||||
return {}
|
||||
with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {}
|
||||
except Exception: return {}
|
||||
|
||||
def resolve_note_type(requested: Optional[str], reg: dict) -> str:
|
||||
types = reg.get("types", {})
|
||||
if requested and requested in types:
|
||||
return requested
|
||||
if requested and requested in types: return requested
|
||||
return "concept"
|
||||
|
||||
def effective_chunk_profile(note_type: str, reg: dict) -> str:
|
||||
t_cfg = reg.get("types", {}).get(note_type, {})
|
||||
if t_cfg and t_cfg.get("chunk_profile"):
|
||||
return t_cfg.get("chunk_profile")
|
||||
return reg.get("defaults", {}).get("chunk_profile", "default")
|
||||
|
||||
def effective_retriever_weight(note_type: str, reg: dict) -> float:
|
||||
t_cfg = reg.get("types", {}).get(note_type, {})
|
||||
if t_cfg and "retriever_weight" in t_cfg:
|
||||
return float(t_cfg["retriever_weight"])
|
||||
return float(reg.get("defaults", {}).get("retriever_weight", 1.0))
|
||||
|
||||
|
||||
class IngestionService:
|
||||
def __init__(self, collection_prefix: str = None):
|
||||
# Prefix Logik vereinheitlichen
|
||||
env_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
|
||||
self.prefix = collection_prefix or env_prefix
|
||||
|
||||
|
|
@ -93,19 +67,15 @@ class IngestionService:
|
|||
self.cfg.prefix = self.prefix
|
||||
self.client = get_client(self.cfg)
|
||||
self.dim = self.cfg.dim
|
||||
|
||||
# Registry laden
|
||||
self.registry = load_type_registry()
|
||||
|
||||
# Embedding Service initialisieren (Async Client)
|
||||
self.embedder = EmbeddingsClient()
|
||||
|
||||
# Init DB Checks (Fehler abfangen, falls DB nicht erreichbar)
|
||||
# Init Checks
|
||||
try:
|
||||
ensure_collections(self.client, self.prefix, self.dim)
|
||||
ensure_payload_indexes(self.client, self.prefix)
|
||||
except Exception as e:
|
||||
logger.warning(f"DB initialization warning: {e}")
|
||||
logger.warning(f"DB init warning: {e}")
|
||||
|
||||
async def process_file(
|
||||
self,
|
||||
|
|
@ -117,10 +87,12 @@ class IngestionService:
|
|||
note_scope_refs: bool = False,
|
||||
hash_mode: str = "body",
|
||||
hash_source: str = "parsed",
|
||||
hash_normalize: str = "canonical"
|
||||
hash_normalize: str = "canonical",
|
||||
force_smart_edges: bool = False # NEU: Override für UI
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Verarbeitet eine einzelne Datei (ASYNC Version).
|
||||
Verarbeitet eine einzelne Datei (ASYNC).
|
||||
Enthält Hashing-Logik für inkrementelle Updates.
|
||||
"""
|
||||
result = {
|
||||
"path": file_path,
|
||||
|
|
@ -129,11 +101,10 @@ class IngestionService:
|
|||
"error": None
|
||||
}
|
||||
|
||||
# 1. Parse & Frontmatter
|
||||
# 1. Parse & Validate
|
||||
try:
|
||||
parsed = read_markdown(file_path)
|
||||
if not parsed:
|
||||
return {**result, "error": "Empty or unreadable file"}
|
||||
if not parsed: return {**result, "error": "Empty/Unreadable"}
|
||||
|
||||
fm = normalize_frontmatter(parsed.frontmatter)
|
||||
validate_required_frontmatter(fm)
|
||||
|
|
@ -141,46 +112,42 @@ class IngestionService:
|
|||
logger.error(f"Validation failed for {file_path}: {e}")
|
||||
return {**result, "error": f"Validation failed: {str(e)}"}
|
||||
|
||||
# 2. Type & Config Resolution
|
||||
# 2. Resolve Type
|
||||
note_type = resolve_note_type(fm.get("type"), self.registry)
|
||||
fm["type"] = note_type
|
||||
fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry)
|
||||
|
||||
weight = fm.get("retriever_weight")
|
||||
if weight is None:
|
||||
weight = effective_retriever_weight(note_type, self.registry)
|
||||
fm["retriever_weight"] = float(weight)
|
||||
|
||||
# 3. Build Note Payload
|
||||
# 3. Build Payload & Hash
|
||||
try:
|
||||
note_pl = make_note_payload(
|
||||
parsed,
|
||||
vault_root=vault_root,
|
||||
file_path=file_path,
|
||||
hash_mode=hash_mode,
|
||||
hash_normalize=hash_normalize,
|
||||
hash_source=hash_source,
|
||||
file_path=file_path
|
||||
hash_source=hash_source
|
||||
)
|
||||
if not note_pl.get("fulltext"):
|
||||
note_pl["fulltext"] = getattr(parsed, "body", "") or ""
|
||||
note_pl["retriever_weight"] = fm["retriever_weight"]
|
||||
|
||||
note_id = note_pl["note_id"]
|
||||
except Exception as e:
|
||||
logger.error(f"Payload build failed: {e}")
|
||||
return {**result, "error": f"Payload build failed: {str(e)}"}
|
||||
|
||||
# 4. Change Detection
|
||||
# 4. Change Detection (Das Herzstück für Massenimport)
|
||||
old_payload = None
|
||||
if not force_replace:
|
||||
old_payload = self._fetch_note_payload(note_id)
|
||||
|
||||
has_old = old_payload is not None
|
||||
|
||||
# Hash Vergleich
|
||||
key_current = f"{hash_mode}:{hash_source}:{hash_normalize}"
|
||||
old_hash = (old_payload or {}).get("hashes", {}).get(key_current)
|
||||
new_hash = note_pl.get("hashes", {}).get(key_current)
|
||||
|
||||
hash_changed = (old_hash != new_hash)
|
||||
|
||||
# Check ob Chunks/Edges in DB fehlen (Reparatur-Modus)
|
||||
chunks_missing, edges_missing = self._artifacts_missing(note_id)
|
||||
|
||||
should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing
|
||||
|
|
@ -191,62 +158,51 @@ class IngestionService:
|
|||
if not apply:
|
||||
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
|
||||
|
||||
# 5. Processing (Chunking, Embedding, Edges)
|
||||
# 5. Processing (Chunking, Embedding)
|
||||
try:
|
||||
body_text = getattr(parsed, "body", "") or ""
|
||||
|
||||
# --- FIX: AWAIT ASYNC CHUNKER (WP-15 Update) ---
|
||||
# assemble_chunks ist jetzt eine Coroutine und muss mit await aufgerufen werden.
|
||||
chunks = await assemble_chunks(fm["id"], body_text, fm["type"])
|
||||
# -----------------------------------------------
|
||||
# --- WP-15 LOGIC ---
|
||||
# Config laden und ggf. überschreiben
|
||||
chunk_config = get_chunk_config(note_type)
|
||||
if force_smart_edges:
|
||||
logger.info(f"Ingestion: Forcing Smart Edges for {note_id}")
|
||||
chunk_config["enable_smart_edge_allocation"] = True
|
||||
|
||||
# Async Chunking
|
||||
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config)
|
||||
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
|
||||
|
||||
# --- EMBEDDING (ASYNC) ---
|
||||
# Embedding
|
||||
vecs = []
|
||||
if chunk_pls:
|
||||
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
|
||||
try:
|
||||
# Async Aufruf des Embedders (via Batch oder Loop)
|
||||
if hasattr(self.embedder, 'embed_documents'):
|
||||
vecs = await self.embedder.embed_documents(texts)
|
||||
else:
|
||||
# Fallback Loop falls Client kein Batch unterstützt
|
||||
for t in texts:
|
||||
v = await self.embedder.embed_query(t)
|
||||
vecs.append(v)
|
||||
|
||||
# Validierung der Dimensionen
|
||||
if vecs and len(vecs) > 0:
|
||||
dim_got = len(vecs[0])
|
||||
if dim_got != self.dim:
|
||||
# Wirf keinen Fehler, aber logge Warnung. Qdrant Upsert wird failen wenn 0.
|
||||
logger.warning(f"Vector dimension mismatch. Expected {self.dim}, got {dim_got}")
|
||||
if dim_got == 0:
|
||||
raise ValueError("Embedding returned empty vectors (Dim 0)")
|
||||
for t in texts: vecs.append(await self.embedder.embed_query(t))
|
||||
except Exception as e:
|
||||
logger.error(f"Embedding generation failed: {e}")
|
||||
raise RuntimeError(f"Embedding failed: {e}")
|
||||
# Embedding Fehler sind kritisch
|
||||
logger.error(f"Embedding failed: {e}")
|
||||
raise e
|
||||
|
||||
# Edges
|
||||
note_refs = note_pl.get("references") or []
|
||||
# Versuche flexible Signatur für Edges (V1 vs V2)
|
||||
try:
|
||||
edges = build_edges_for_note(
|
||||
note_id,
|
||||
chunk_pls,
|
||||
note_level_references=note_refs,
|
||||
note_level_references=note_pl.get("references", []),
|
||||
include_note_scope_refs=note_scope_refs
|
||||
)
|
||||
except TypeError:
|
||||
# Fallback für ältere Signatur
|
||||
edges = build_edges_for_note(note_id, chunk_pls)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Processing failed: {e}", exc_info=True)
|
||||
return {**result, "error": f"Processing failed: {str(e)}"}
|
||||
|
||||
# 6. Upsert Action
|
||||
# 6. Upsert (Atomic Write recommended, here Batch)
|
||||
try:
|
||||
if purge_before and has_old:
|
||||
self._purge_artifacts(note_id)
|
||||
|
|
@ -274,7 +230,7 @@ class IngestionService:
|
|||
logger.error(f"Upsert failed: {e}", exc_info=True)
|
||||
return {**result, "error": f"DB Upsert failed: {e}"}
|
||||
|
||||
# --- Interne Qdrant Helper ---
|
||||
# --- Interne Qdrant Helper (Wichtig für Change Detection) ---
|
||||
|
||||
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
|
||||
from qdrant_client.http import models as rest
|
||||
|
|
@ -303,8 +259,7 @@ class IngestionService:
|
|||
for suffix in ["chunks", "edges"]:
|
||||
try:
|
||||
self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception: pass
|
||||
|
||||
async def create_from_text(
|
||||
self,
|
||||
|
|
@ -315,35 +270,33 @@ class IngestionService:
|
|||
) -> Dict[str, Any]:
|
||||
"""
|
||||
WP-11 Persistence API Entrypoint.
|
||||
Schreibt Text in Vault und indiziert ihn sofort.
|
||||
Speichert Text und erzwingt sofortige Indizierung mit Smart Edges.
|
||||
"""
|
||||
# 1. Zielordner
|
||||
target_dir = os.path.join(vault_root, folder)
|
||||
try:
|
||||
os.makedirs(target_dir, exist_ok=True)
|
||||
except Exception as e:
|
||||
return {"status": "error", "error": f"Could not create folder {target_dir}: {e}"}
|
||||
|
||||
# 2. Dateiname
|
||||
safe_filename = os.path.basename(filename)
|
||||
if not safe_filename.endswith(".md"):
|
||||
safe_filename += ".md"
|
||||
file_path = os.path.join(target_dir, safe_filename)
|
||||
file_path = os.path.join(target_dir, filename)
|
||||
|
||||
# 3. Schreiben
|
||||
try:
|
||||
# Robust Write: Ensure Flush
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(markdown_content)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
|
||||
# Kurzer Sleep für OS Filesystem Latenz
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
logger.info(f"Written file to {file_path}")
|
||||
except Exception as e:
|
||||
return {"status": "error", "error": f"Disk write failed at {file_path}: {str(e)}"}
|
||||
return {"status": "error", "error": f"Disk write failed: {str(e)}"}
|
||||
|
||||
# 4. Indizieren (Async Aufruf!)
|
||||
# Wir rufen process_file auf, das jetzt ASYNC ist
|
||||
# Hier aktivieren wir die Smart Edges explizit für den Single-File Import
|
||||
return await self.process_file(
|
||||
file_path=file_path,
|
||||
vault_root=vault_root,
|
||||
apply=True,
|
||||
force_replace=True,
|
||||
purge_before=True
|
||||
purge_before=True,
|
||||
force_smart_edges=True # <--- HIER: Intelligence Override
|
||||
)
|
||||
Loading…
Reference in New Issue
Block a user