bug fix
This commit is contained in:
parent
d34f388ee1
commit
1fe9582cbe
|
|
@ -1,13 +1,8 @@
|
||||||
"""
|
"""
|
||||||
app/core/ingestion.py
|
app/core/ingestion.py
|
||||||
|
|
||||||
Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte (Notes, Chunks, Edges).
|
Zentraler Service für die Transformation von Markdown-Dateien in Qdrant-Objekte.
|
||||||
Features:
|
Version: 2.5.2 (Full Feature: Change Detection + Robust IO + Clean Config)
|
||||||
- Incremental Update (Hashing) für Massenimport.
|
|
||||||
- Force Smart Edges für Single-File (UI).
|
|
||||||
- Async Embedding & Chunking.
|
|
||||||
|
|
||||||
Version: 2.5.1 (Full Restore + WP-15 Logic)
|
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
|
@ -22,11 +17,10 @@ from app.core.parser import (
|
||||||
validate_required_frontmatter,
|
validate_required_frontmatter,
|
||||||
)
|
)
|
||||||
from app.core.note_payload import make_note_payload
|
from app.core.note_payload import make_note_payload
|
||||||
# ASYNC CHUNKER (WP-15)
|
|
||||||
from app.core.chunker import assemble_chunks, get_chunk_config
|
from app.core.chunker import assemble_chunks, get_chunk_config
|
||||||
from app.core.chunk_payload import make_chunk_payloads
|
from app.core.chunk_payload import make_chunk_payloads
|
||||||
|
|
||||||
# Fallback für Edges Import
|
# Fallback für Edges
|
||||||
try:
|
try:
|
||||||
from app.core.derive_edges import build_edges_for_note
|
from app.core.derive_edges import build_edges_for_note
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
|
@ -58,6 +52,19 @@ def resolve_note_type(requested: Optional[str], reg: dict) -> str:
|
||||||
if requested and requested in types: return requested
|
if requested and requested in types: return requested
|
||||||
return "concept"
|
return "concept"
|
||||||
|
|
||||||
|
def effective_chunk_profile(note_type: str, reg: dict) -> str:
|
||||||
|
t_cfg = reg.get("types", {}).get(note_type, {})
|
||||||
|
if t_cfg and t_cfg.get("chunk_profile"):
|
||||||
|
return t_cfg.get("chunk_profile")
|
||||||
|
return reg.get("defaults", {}).get("chunk_profile", "default")
|
||||||
|
|
||||||
|
def effective_retriever_weight(note_type: str, reg: dict) -> float:
|
||||||
|
t_cfg = reg.get("types", {}).get(note_type, {})
|
||||||
|
if t_cfg and "retriever_weight" in t_cfg:
|
||||||
|
return float(t_cfg["retriever_weight"])
|
||||||
|
return float(reg.get("defaults", {}).get("retriever_weight", 1.0))
|
||||||
|
|
||||||
|
|
||||||
class IngestionService:
|
class IngestionService:
|
||||||
def __init__(self, collection_prefix: str = None):
|
def __init__(self, collection_prefix: str = None):
|
||||||
env_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
|
env_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
|
||||||
|
|
@ -70,7 +77,6 @@ class IngestionService:
|
||||||
self.registry = load_type_registry()
|
self.registry = load_type_registry()
|
||||||
self.embedder = EmbeddingsClient()
|
self.embedder = EmbeddingsClient()
|
||||||
|
|
||||||
# Init Checks
|
|
||||||
try:
|
try:
|
||||||
ensure_collections(self.client, self.prefix, self.dim)
|
ensure_collections(self.client, self.prefix, self.dim)
|
||||||
ensure_payload_indexes(self.client, self.prefix)
|
ensure_payload_indexes(self.client, self.prefix)
|
||||||
|
|
@ -87,12 +93,11 @@ class IngestionService:
|
||||||
note_scope_refs: bool = False,
|
note_scope_refs: bool = False,
|
||||||
hash_mode: str = "body",
|
hash_mode: str = "body",
|
||||||
hash_source: str = "parsed",
|
hash_source: str = "parsed",
|
||||||
hash_normalize: str = "canonical",
|
hash_normalize: str = "canonical"
|
||||||
force_smart_edges: bool = False # NEU: Override für UI
|
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Verarbeitet eine einzelne Datei (ASYNC).
|
Verarbeitet eine einzelne Datei (ASYNC).
|
||||||
Enthält Hashing-Logik für inkrementelle Updates.
|
Inklusive Change Detection (Hash-Check) gegen Qdrant.
|
||||||
"""
|
"""
|
||||||
result = {
|
result = {
|
||||||
"path": file_path,
|
"path": file_path,
|
||||||
|
|
@ -101,10 +106,11 @@ class IngestionService:
|
||||||
"error": None
|
"error": None
|
||||||
}
|
}
|
||||||
|
|
||||||
# 1. Parse & Validate
|
# 1. Parse & Frontmatter Validation
|
||||||
try:
|
try:
|
||||||
parsed = read_markdown(file_path)
|
parsed = read_markdown(file_path)
|
||||||
if not parsed: return {**result, "error": "Empty/Unreadable"}
|
if not parsed:
|
||||||
|
return {**result, "error": "Empty or unreadable file"}
|
||||||
|
|
||||||
fm = normalize_frontmatter(parsed.frontmatter)
|
fm = normalize_frontmatter(parsed.frontmatter)
|
||||||
validate_required_frontmatter(fm)
|
validate_required_frontmatter(fm)
|
||||||
|
|
@ -112,42 +118,46 @@ class IngestionService:
|
||||||
logger.error(f"Validation failed for {file_path}: {e}")
|
logger.error(f"Validation failed for {file_path}: {e}")
|
||||||
return {**result, "error": f"Validation failed: {str(e)}"}
|
return {**result, "error": f"Validation failed: {str(e)}"}
|
||||||
|
|
||||||
# 2. Resolve Type
|
# 2. Type & Config Resolution
|
||||||
note_type = resolve_note_type(fm.get("type"), self.registry)
|
note_type = resolve_note_type(fm.get("type"), self.registry)
|
||||||
fm["type"] = note_type
|
fm["type"] = note_type
|
||||||
|
fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry)
|
||||||
|
|
||||||
# 3. Build Payload & Hash
|
weight = fm.get("retriever_weight")
|
||||||
|
if weight is None:
|
||||||
|
weight = effective_retriever_weight(note_type, self.registry)
|
||||||
|
fm["retriever_weight"] = float(weight)
|
||||||
|
|
||||||
|
# 3. Build Note Payload
|
||||||
try:
|
try:
|
||||||
note_pl = make_note_payload(
|
note_pl = make_note_payload(
|
||||||
parsed,
|
parsed,
|
||||||
vault_root=vault_root,
|
vault_root=vault_root,
|
||||||
file_path=file_path,
|
|
||||||
hash_mode=hash_mode,
|
hash_mode=hash_mode,
|
||||||
hash_normalize=hash_normalize,
|
hash_normalize=hash_normalize,
|
||||||
hash_source=hash_source
|
hash_source=hash_source,
|
||||||
|
file_path=file_path
|
||||||
)
|
)
|
||||||
if not note_pl.get("fulltext"):
|
if not note_pl.get("fulltext"):
|
||||||
note_pl["fulltext"] = getattr(parsed, "body", "") or ""
|
note_pl["fulltext"] = getattr(parsed, "body", "") or ""
|
||||||
|
note_pl["retriever_weight"] = fm["retriever_weight"]
|
||||||
|
|
||||||
note_id = note_pl["note_id"]
|
note_id = note_pl["note_id"]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
logger.error(f"Payload build failed: {e}")
|
||||||
return {**result, "error": f"Payload build failed: {str(e)}"}
|
return {**result, "error": f"Payload build failed: {str(e)}"}
|
||||||
|
|
||||||
# 4. Change Detection (Das Herzstück für Massenimport)
|
# 4. Change Detection (Das fehlende Stück!)
|
||||||
old_payload = None
|
old_payload = None
|
||||||
if not force_replace:
|
if not force_replace:
|
||||||
old_payload = self._fetch_note_payload(note_id)
|
old_payload = self._fetch_note_payload(note_id)
|
||||||
|
|
||||||
has_old = old_payload is not None
|
has_old = old_payload is not None
|
||||||
|
|
||||||
# Hash Vergleich
|
|
||||||
key_current = f"{hash_mode}:{hash_source}:{hash_normalize}"
|
key_current = f"{hash_mode}:{hash_source}:{hash_normalize}"
|
||||||
old_hash = (old_payload or {}).get("hashes", {}).get(key_current)
|
old_hash = (old_payload or {}).get("hashes", {}).get(key_current)
|
||||||
new_hash = note_pl.get("hashes", {}).get(key_current)
|
new_hash = note_pl.get("hashes", {}).get(key_current)
|
||||||
|
|
||||||
hash_changed = (old_hash != new_hash)
|
hash_changed = (old_hash != new_hash)
|
||||||
|
|
||||||
# Check ob Chunks/Edges in DB fehlen (Reparatur-Modus)
|
|
||||||
chunks_missing, edges_missing = self._artifacts_missing(note_id)
|
chunks_missing, edges_missing = self._artifacts_missing(note_id)
|
||||||
|
|
||||||
should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing
|
should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing
|
||||||
|
|
@ -158,18 +168,14 @@ class IngestionService:
|
||||||
if not apply:
|
if not apply:
|
||||||
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
|
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
|
||||||
|
|
||||||
# 5. Processing (Chunking, Embedding)
|
# 5. Processing (Chunking, Embedding, Edges)
|
||||||
try:
|
try:
|
||||||
body_text = getattr(parsed, "body", "") or ""
|
body_text = getattr(parsed, "body", "") or ""
|
||||||
|
|
||||||
# --- WP-15 LOGIC ---
|
# --- Config Loading (Clean) ---
|
||||||
# Config laden und ggf. überschreiben
|
|
||||||
chunk_config = get_chunk_config(note_type)
|
chunk_config = get_chunk_config(note_type)
|
||||||
if force_smart_edges:
|
# Hier greift die Logik aus types.yaml (smart=True/False)
|
||||||
logger.info(f"Ingestion: Forcing Smart Edges for {note_id}")
|
|
||||||
chunk_config["enable_smart_edge_allocation"] = True
|
|
||||||
|
|
||||||
# Async Chunking
|
|
||||||
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config)
|
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config)
|
||||||
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
|
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
|
||||||
|
|
||||||
|
|
@ -181,11 +187,12 @@ class IngestionService:
|
||||||
if hasattr(self.embedder, 'embed_documents'):
|
if hasattr(self.embedder, 'embed_documents'):
|
||||||
vecs = await self.embedder.embed_documents(texts)
|
vecs = await self.embedder.embed_documents(texts)
|
||||||
else:
|
else:
|
||||||
for t in texts: vecs.append(await self.embedder.embed_query(t))
|
for t in texts:
|
||||||
|
v = await self.embedder.embed_query(t)
|
||||||
|
vecs.append(v)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Embedding Fehler sind kritisch
|
|
||||||
logger.error(f"Embedding failed: {e}")
|
logger.error(f"Embedding failed: {e}")
|
||||||
raise e
|
raise RuntimeError(f"Embedding failed: {e}")
|
||||||
|
|
||||||
# Edges
|
# Edges
|
||||||
try:
|
try:
|
||||||
|
|
@ -202,7 +209,7 @@ class IngestionService:
|
||||||
logger.error(f"Processing failed: {e}", exc_info=True)
|
logger.error(f"Processing failed: {e}", exc_info=True)
|
||||||
return {**result, "error": f"Processing failed: {str(e)}"}
|
return {**result, "error": f"Processing failed: {str(e)}"}
|
||||||
|
|
||||||
# 6. Upsert (Atomic Write recommended, here Batch)
|
# 6. Upsert Action
|
||||||
try:
|
try:
|
||||||
if purge_before and has_old:
|
if purge_before and has_old:
|
||||||
self._purge_artifacts(note_id)
|
self._purge_artifacts(note_id)
|
||||||
|
|
@ -230,7 +237,7 @@ class IngestionService:
|
||||||
logger.error(f"Upsert failed: {e}", exc_info=True)
|
logger.error(f"Upsert failed: {e}", exc_info=True)
|
||||||
return {**result, "error": f"DB Upsert failed: {e}"}
|
return {**result, "error": f"DB Upsert failed: {e}"}
|
||||||
|
|
||||||
# --- Interne Qdrant Helper (Wichtig für Change Detection) ---
|
# --- Qdrant Helper (Restored) ---
|
||||||
|
|
||||||
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
|
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
|
|
@ -270,7 +277,6 @@ class IngestionService:
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
WP-11 Persistence API Entrypoint.
|
WP-11 Persistence API Entrypoint.
|
||||||
Speichert Text und erzwingt sofortige Indizierung mit Smart Edges.
|
|
||||||
"""
|
"""
|
||||||
target_dir = os.path.join(vault_root, folder)
|
target_dir = os.path.join(vault_root, folder)
|
||||||
os.makedirs(target_dir, exist_ok=True)
|
os.makedirs(target_dir, exist_ok=True)
|
||||||
|
|
@ -278,25 +284,22 @@ class IngestionService:
|
||||||
file_path = os.path.join(target_dir, filename)
|
file_path = os.path.join(target_dir, filename)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Robust Write: Ensure Flush
|
# Robust Write: Ensure Flush & Sync
|
||||||
with open(file_path, "w", encoding="utf-8") as f:
|
with open(file_path, "w", encoding="utf-8") as f:
|
||||||
f.write(markdown_content)
|
f.write(markdown_content)
|
||||||
f.flush()
|
f.flush()
|
||||||
os.fsync(f.fileno())
|
os.fsync(f.fileno())
|
||||||
|
|
||||||
# Kurzer Sleep für OS Filesystem Latenz
|
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
logger.info(f"Written file to {file_path}")
|
logger.info(f"Written file to {file_path}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {"status": "error", "error": f"Disk write failed: {str(e)}"}
|
return {"status": "error", "error": f"Disk write failed: {str(e)}"}
|
||||||
|
|
||||||
# Hier aktivieren wir die Smart Edges explizit für den Single-File Import
|
|
||||||
return await self.process_file(
|
return await self.process_file(
|
||||||
file_path=file_path,
|
file_path=file_path,
|
||||||
vault_root=vault_root,
|
vault_root=vault_root,
|
||||||
apply=True,
|
apply=True,
|
||||||
force_replace=True,
|
force_replace=True,
|
||||||
purge_before=True,
|
purge_before=True
|
||||||
force_smart_edges=True # <--- HIER: Intelligence Override
|
|
||||||
)
|
)
|
||||||
|
|
@ -30,7 +30,7 @@ chunking_profiles:
|
||||||
# bei der Generierung nicht zu überlasten. Später wieder aktivieren.
|
# bei der Generierung nicht zu überlasten. Später wieder aktivieren.
|
||||||
sliding_smart_edges:
|
sliding_smart_edges:
|
||||||
strategy: sliding_window
|
strategy: sliding_window
|
||||||
enable_smart_edge_allocation: false
|
enable_smart_edge_allocation: true
|
||||||
target: 400
|
target: 400
|
||||||
max: 600
|
max: 600
|
||||||
overlap: [50, 80]
|
overlap: [50, 80]
|
||||||
|
|
@ -39,7 +39,7 @@ chunking_profiles:
|
||||||
# Für Profile, Werte, Prinzipien. Trennt hart an Überschriften (H2).
|
# Für Profile, Werte, Prinzipien. Trennt hart an Überschriften (H2).
|
||||||
structured_smart_edges:
|
structured_smart_edges:
|
||||||
strategy: by_heading
|
strategy: by_heading
|
||||||
enable_smart_edge_allocation: false
|
enable_smart_edge_allocation: true
|
||||||
split_level: 2
|
split_level: 2
|
||||||
max: 600
|
max: 600
|
||||||
target: 400
|
target: 400
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user