WP11 #8
|
|
@ -3,8 +3,9 @@ app/core/ingestion.py
|
||||||
|
|
||||||
Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges).
|
Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges).
|
||||||
Dient als Shared Logic für:
|
Dient als Shared Logic für:
|
||||||
1. CLI-Imports (scripts/import_markdown.py) - muss ggf. angepasst werden auf Async!
|
1. CLI-Imports (scripts/import_markdown.py)
|
||||||
2. API-Uploads (WP-11)
|
2. API-Uploads (WP-11)
|
||||||
|
Refactored for Async Embedding Support.
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
|
@ -20,12 +21,19 @@ from app.core.note_payload import make_note_payload
|
||||||
from app.core.chunker import assemble_chunks
|
from app.core.chunker import assemble_chunks
|
||||||
from app.core.chunk_payload import make_chunk_payloads
|
from app.core.chunk_payload import make_chunk_payloads
|
||||||
|
|
||||||
# Fallback für Edges Import
|
# Fallback für Edges Import (Robustheit)
|
||||||
try:
|
try:
|
||||||
from app.core.derive_edges import build_edges_for_note
|
from app.core.derive_edges import build_edges_for_note
|
||||||
except ImportError:
|
except ImportError:
|
||||||
# Fallback falls Dateiname anders ist
|
try:
|
||||||
from app.core.edges import build_edges_for_note # type: ignore
|
from app.core.derive_edges import derive_edges_for_note as build_edges_for_note
|
||||||
|
except ImportError:
|
||||||
|
try:
|
||||||
|
from app.core.edges import build_edges_for_note
|
||||||
|
except ImportError:
|
||||||
|
# Fallback Mock
|
||||||
|
logging.warning("Could not import edge derivation logic. Edges will be empty.")
|
||||||
|
def build_edges_for_note(*args, **kwargs): return []
|
||||||
|
|
||||||
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
|
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
|
||||||
from app.core.qdrant_points import (
|
from app.core.qdrant_points import (
|
||||||
|
|
@ -35,7 +43,7 @@ from app.core.qdrant_points import (
|
||||||
upsert_batch,
|
upsert_batch,
|
||||||
)
|
)
|
||||||
|
|
||||||
# WICHTIG: Wir nutzen den API-Client für Embeddings
|
# WICHTIG: Wir nutzen den API-Client für Embeddings (Async Support)
|
||||||
from app.services.embeddings_client import EmbeddingsClient
|
from app.services.embeddings_client import EmbeddingsClient
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -75,22 +83,28 @@ def effective_retriever_weight(note_type: str, reg: dict) -> float:
|
||||||
|
|
||||||
|
|
||||||
class IngestionService:
|
class IngestionService:
|
||||||
def __init__(self, collection_prefix: str = "mindnet"):
|
def __init__(self, collection_prefix: str = None):
|
||||||
self.prefix = collection_prefix
|
# Prefix Logik vereinheitlichen
|
||||||
|
env_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
|
||||||
|
self.prefix = collection_prefix or env_prefix
|
||||||
|
|
||||||
self.cfg = QdrantConfig.from_env()
|
self.cfg = QdrantConfig.from_env()
|
||||||
self.cfg.prefix = collection_prefix
|
self.cfg.prefix = self.prefix
|
||||||
self.client = get_client(self.cfg)
|
self.client = get_client(self.cfg)
|
||||||
self.dim = self.cfg.dim
|
self.dim = self.cfg.dim
|
||||||
|
|
||||||
# Registry laden
|
# Registry laden
|
||||||
self.registry = load_type_registry()
|
self.registry = load_type_registry()
|
||||||
|
|
||||||
# Embedding Service initialisieren
|
# Embedding Service initialisieren (Async Client)
|
||||||
self.embedder = EmbeddingsClient()
|
self.embedder = EmbeddingsClient()
|
||||||
|
|
||||||
# Init DB Checks
|
# Init DB Checks (Fehler abfangen, falls DB nicht erreichbar)
|
||||||
ensure_collections(self.client, self.prefix, self.dim)
|
try:
|
||||||
ensure_payload_indexes(self.client, self.prefix)
|
ensure_collections(self.client, self.prefix, self.dim)
|
||||||
|
ensure_payload_indexes(self.client, self.prefix)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"DB initialization warning: {e}")
|
||||||
|
|
||||||
async def process_file(
|
async def process_file(
|
||||||
self,
|
self,
|
||||||
|
|
@ -105,7 +119,7 @@ class IngestionService:
|
||||||
hash_normalize: str = "canonical"
|
hash_normalize: str = "canonical"
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Verarbeitet eine einzelne Datei (ASYNC).
|
Verarbeitet eine einzelne Datei (ASYNC Version).
|
||||||
"""
|
"""
|
||||||
result = {
|
result = {
|
||||||
"path": file_path,
|
"path": file_path,
|
||||||
|
|
@ -123,6 +137,7 @@ class IngestionService:
|
||||||
fm = normalize_frontmatter(parsed.frontmatter)
|
fm = normalize_frontmatter(parsed.frontmatter)
|
||||||
validate_required_frontmatter(fm)
|
validate_required_frontmatter(fm)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
logger.error(f"Validation failed for {file_path}: {e}")
|
||||||
return {**result, "error": f"Validation failed: {str(e)}"}
|
return {**result, "error": f"Validation failed: {str(e)}"}
|
||||||
|
|
||||||
# 2. Type & Config Resolution
|
# 2. Type & Config Resolution
|
||||||
|
|
@ -151,6 +166,7 @@ class IngestionService:
|
||||||
|
|
||||||
note_id = note_pl["note_id"]
|
note_id = note_pl["note_id"]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
logger.error(f"Payload build failed: {e}")
|
||||||
return {**result, "error": f"Payload build failed: {str(e)}"}
|
return {**result, "error": f"Payload build failed: {str(e)}"}
|
||||||
|
|
||||||
# 4. Change Detection
|
# 4. Change Detection
|
||||||
|
|
@ -180,12 +196,12 @@ class IngestionService:
|
||||||
chunks = assemble_chunks(fm["id"], body_text, fm["type"])
|
chunks = assemble_chunks(fm["id"], body_text, fm["type"])
|
||||||
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
|
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
|
||||||
|
|
||||||
# --- EMBEDDING FIX ---
|
# --- EMBEDDING FIX (ASYNC) ---
|
||||||
vecs = []
|
vecs = []
|
||||||
if chunk_pls:
|
if chunk_pls:
|
||||||
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
|
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
|
||||||
try:
|
try:
|
||||||
# Async Aufruf des Embedders
|
# Async Aufruf des Embedders (via Batch oder Loop)
|
||||||
if hasattr(self.embedder, 'embed_documents'):
|
if hasattr(self.embedder, 'embed_documents'):
|
||||||
vecs = await self.embedder.embed_documents(texts)
|
vecs = await self.embedder.embed_documents(texts)
|
||||||
else:
|
else:
|
||||||
|
|
@ -198,63 +214,81 @@ class IngestionService:
|
||||||
if vecs and len(vecs) > 0:
|
if vecs and len(vecs) > 0:
|
||||||
dim_got = len(vecs[0])
|
dim_got = len(vecs[0])
|
||||||
if dim_got != self.dim:
|
if dim_got != self.dim:
|
||||||
raise ValueError(f"Vector dimension mismatch. Expected {self.dim}, got {dim_got}")
|
# Wirf keinen Fehler, aber logge Warnung. Qdrant Upsert wird failen wenn 0.
|
||||||
|
logger.warning(f"Vector dimension mismatch. Expected {self.dim}, got {dim_got}")
|
||||||
|
if dim_got == 0:
|
||||||
|
raise ValueError("Embedding returned empty vectors (Dim 0)")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Embedding generation failed: {e}")
|
logger.error(f"Embedding generation failed: {e}")
|
||||||
raise RuntimeError(f"Embedding failed: {e}")
|
raise RuntimeError(f"Embedding failed: {e}")
|
||||||
|
|
||||||
# Edges
|
# Edges
|
||||||
note_refs = note_pl.get("references") or []
|
note_refs = note_pl.get("references") or []
|
||||||
edges = build_edges_for_note(
|
# Versuche flexible Signatur für Edges (V1 vs V2)
|
||||||
note_id,
|
try:
|
||||||
chunk_pls,
|
edges = build_edges_for_note(
|
||||||
note_level_references=note_refs,
|
note_id,
|
||||||
include_note_scope_refs=note_scope_refs
|
chunk_pls,
|
||||||
)
|
note_level_references=note_refs,
|
||||||
|
include_note_scope_refs=note_scope_refs
|
||||||
|
)
|
||||||
|
except TypeError:
|
||||||
|
# Fallback für ältere Signatur
|
||||||
|
edges = build_edges_for_note(note_id, chunk_pls)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
logger.error(f"Processing failed: {e}", exc_info=True)
|
||||||
return {**result, "error": f"Processing failed: {str(e)}"}
|
return {**result, "error": f"Processing failed: {str(e)}"}
|
||||||
|
|
||||||
# 6. Upsert Action
|
# 6. Upsert Action
|
||||||
if purge_before and has_old:
|
try:
|
||||||
self._purge_artifacts(note_id)
|
if purge_before and has_old:
|
||||||
|
self._purge_artifacts(note_id)
|
||||||
|
|
||||||
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
|
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
|
||||||
upsert_batch(self.client, n_name, n_pts)
|
upsert_batch(self.client, n_name, n_pts)
|
||||||
|
|
||||||
if chunk_pls:
|
if chunk_pls and vecs:
|
||||||
c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)
|
c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)
|
||||||
upsert_batch(self.client, c_name, c_pts)
|
upsert_batch(self.client, c_name, c_pts)
|
||||||
|
|
||||||
if edges:
|
if edges:
|
||||||
e_name, e_pts = points_for_edges(self.prefix, edges)
|
e_name, e_pts = points_for_edges(self.prefix, edges)
|
||||||
upsert_batch(self.client, e_name, e_pts)
|
upsert_batch(self.client, e_name, e_pts)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"path": file_path,
|
"path": file_path,
|
||||||
"status": "success",
|
"status": "success",
|
||||||
"changed": True,
|
"changed": True,
|
||||||
"note_id": note_id,
|
"note_id": note_id,
|
||||||
"chunks_count": len(chunk_pls),
|
"chunks_count": len(chunk_pls),
|
||||||
"edges_count": len(edges)
|
"edges_count": len(edges)
|
||||||
}
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Upsert failed: {e}", exc_info=True)
|
||||||
|
return {**result, "error": f"DB Upsert failed: {e}"}
|
||||||
|
|
||||||
# --- Interne Qdrant Helper ---
|
# --- Interne Qdrant Helper ---
|
||||||
|
|
||||||
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
|
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
col = f"{self.prefix}_notes"
|
col = f"{self.prefix}_notes"
|
||||||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
try:
|
||||||
pts, _ = self.client.scroll(collection_name=col, scroll_filter=f, limit=1, with_payload=True)
|
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||||
return pts[0].payload if pts else None
|
pts, _ = self.client.scroll(collection_name=col, scroll_filter=f, limit=1, with_payload=True)
|
||||||
|
return pts[0].payload if pts else None
|
||||||
|
except: return None
|
||||||
|
|
||||||
def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]:
|
def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]:
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
c_col = f"{self.prefix}_chunks"
|
c_col = f"{self.prefix}_chunks"
|
||||||
e_col = f"{self.prefix}_edges"
|
e_col = f"{self.prefix}_edges"
|
||||||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
try:
|
||||||
c_pts, _ = self.client.scroll(collection_name=c_col, scroll_filter=f, limit=1)
|
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||||
e_pts, _ = self.client.scroll(collection_name=e_col, scroll_filter=f, limit=1)
|
c_pts, _ = self.client.scroll(collection_name=c_col, scroll_filter=f, limit=1)
|
||||||
return (not bool(c_pts)), (not bool(e_pts))
|
e_pts, _ = self.client.scroll(collection_name=e_col, scroll_filter=f, limit=1)
|
||||||
|
return (not bool(c_pts)), (not bool(e_pts))
|
||||||
|
except: return True, True
|
||||||
|
|
||||||
def _purge_artifacts(self, note_id: str):
|
def _purge_artifacts(self, note_id: str):
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
|
|
@ -274,7 +308,8 @@ class IngestionService:
|
||||||
folder: str = "00_Inbox"
|
folder: str = "00_Inbox"
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
WP-11 Persistence: Schreibt Text sicher und indiziert ihn.
|
WP-11 Persistence API Entrypoint.
|
||||||
|
Schreibt Text in Vault und indiziert ihn sofort.
|
||||||
"""
|
"""
|
||||||
# 1. Zielordner
|
# 1. Zielordner
|
||||||
target_dir = os.path.join(vault_root, folder)
|
target_dir = os.path.join(vault_root, folder)
|
||||||
|
|
@ -293,10 +328,12 @@ class IngestionService:
|
||||||
try:
|
try:
|
||||||
with open(file_path, "w", encoding="utf-8") as f:
|
with open(file_path, "w", encoding="utf-8") as f:
|
||||||
f.write(markdown_content)
|
f.write(markdown_content)
|
||||||
|
logger.info(f"Written file to {file_path}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {"status": "error", "error": f"Disk write failed at {file_path}: {str(e)}"}
|
return {"status": "error", "error": f"Disk write failed at {file_path}: {str(e)}"}
|
||||||
|
|
||||||
# 4. Indizieren (Async Aufruf!)
|
# 4. Indizieren (Async Aufruf!)
|
||||||
|
# Wir rufen process_file auf, das jetzt ASYNC ist
|
||||||
return await self.process_file(
|
return await self.process_file(
|
||||||
file_path=file_path,
|
file_path=file_path,
|
||||||
vault_root=vault_root,
|
vault_root=vault_root,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user