Import Script und Logging für WP15b
This commit is contained in:
parent
f6b2375d65
commit
c676c8263f
|
|
@ -4,8 +4,10 @@ DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen.
|
||||||
WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free).
|
WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free).
|
||||||
WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash.
|
WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash.
|
||||||
WP-15b: Two-Pass Ingestion mit LocalBatchCache & Candidate-Validation.
|
WP-15b: Two-Pass Ingestion mit LocalBatchCache & Candidate-Validation.
|
||||||
FIX: Beibehaltung der Deep Fallback Logic (v2.11.14) zur JSON-Recovery.
|
FIX: Deep Fallback Logic (v2.11.14). Erkennt Policy Violations auch in validen
|
||||||
VERSION: 2.12.0
|
JSON-Objekten und erzwingt den lokalen Ollama-Sprung, um Kantenverlust
|
||||||
|
bei umfangreichen Protokollen zu verhindern.
|
||||||
|
VERSION: 2.12.1
|
||||||
STATUS: Active
|
STATUS: Active
|
||||||
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker,
|
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker,
|
||||||
app.services.llm_service, app.services.edge_registry
|
app.services.llm_service, app.services.edge_registry
|
||||||
|
|
@ -128,16 +130,16 @@ class IngestionService:
|
||||||
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
|
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
WP-15b: Implementiert den Two-Pass Ingestion Workflow.
|
WP-15b: Implementiert den Two-Pass Ingestion Workflow.
|
||||||
Pass 1: Pre-Scan baut Kontext-Cache auf.
|
Pass 1: Pre-Scan baut flüchtigen Kontext-Cache auf.
|
||||||
Pass 2: Processing führt semantische Validierung durch.
|
Pass 2: Processing führt die eigentliche semantische Validierung durch.
|
||||||
"""
|
"""
|
||||||
logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Batch Cache...")
|
logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...")
|
||||||
for path in file_paths:
|
for path in file_paths:
|
||||||
ctx = pre_scan_markdown(path)
|
ctx = pre_scan_markdown(path)
|
||||||
if ctx:
|
if ctx:
|
||||||
self.batch_cache[ctx.note_id] = ctx
|
self.batch_cache[ctx.note_id] = ctx
|
||||||
|
|
||||||
logger.info(f"🚀 [Pass 2] Processing {len(file_paths)} files...")
|
logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...")
|
||||||
results = []
|
results = []
|
||||||
for path in file_paths:
|
for path in file_paths:
|
||||||
res = await self.process_file(path, vault_root, apply=True)
|
res = await self.process_file(path, vault_root, apply=True)
|
||||||
|
|
@ -152,14 +154,17 @@ class IngestionService:
|
||||||
target_id = edge.get("to")
|
target_id = edge.get("to")
|
||||||
target_ctx = self.batch_cache.get(target_id)
|
target_ctx = self.batch_cache.get(target_id)
|
||||||
|
|
||||||
# Falls Zielnotiz nicht im aktuellen Batch ist: 'explicit' durchlassen (Hard-Link Integrity)
|
# Sicherheits-Fallback: Wenn Zielnotiz nicht im aktuellen Batch ist,
|
||||||
|
# lassen wir die Kante als 'explicit' durch (Hard-Link Integrity).
|
||||||
if not target_ctx:
|
if not target_ctx:
|
||||||
|
logger.info(f"ℹ️ [VALIDATION SKIP] No cache context for '{target_id}' - allowing link.")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
provider = self.settings.MINDNET_LLM_PROVIDER
|
provider = self.settings.MINDNET_LLM_PROVIDER
|
||||||
template = self.llm.get_prompt("edge_validation", provider)
|
template = self.llm.get_prompt("edge_validation", provider)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}'...")
|
||||||
prompt = template.format(
|
prompt = template.format(
|
||||||
chunk_text=chunk_text[:1500],
|
chunk_text=chunk_text[:1500],
|
||||||
target_title=target_ctx.title,
|
target_title=target_ctx.title,
|
||||||
|
|
@ -168,7 +173,14 @@ class IngestionService:
|
||||||
)
|
)
|
||||||
|
|
||||||
response = await self.llm.generate_raw_response(prompt, priority="background")
|
response = await self.llm.generate_raw_response(prompt, priority="background")
|
||||||
return "YES" in response.upper()
|
is_valid = "YES" in response.upper()
|
||||||
|
|
||||||
|
if is_valid:
|
||||||
|
logger.info(f"✅ [VALIDATED] Relation '{edge.get('kind')}' to '{target_id}' confirmed.")
|
||||||
|
else:
|
||||||
|
logger.info(f"🚫 [REJECTED] WP-15b Candidate: '{edge.get('kind')}' -> '{target_id}' not relevant.")
|
||||||
|
|
||||||
|
return is_valid
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"⚠️ Semantic validation error for {target_id}: {e}")
|
logger.warning(f"⚠️ Semantic validation error for {target_id}: {e}")
|
||||||
return True # Fallback: Im Zweifel Link behalten
|
return True # Fallback: Im Zweifel Link behalten
|
||||||
|
|
@ -244,44 +256,49 @@ class IngestionService:
|
||||||
# Chunker Resolution
|
# Chunker Resolution
|
||||||
profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard"
|
profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard"
|
||||||
chunk_cfg = self._get_chunk_config_by_profile(profile, note_type)
|
chunk_cfg = self._get_chunk_config_by_profile(profile, note_type)
|
||||||
|
enable_smart_edges = chunk_cfg.get("enable_smart_edge_allocation", False)
|
||||||
|
|
||||||
|
# WP-15b: Chunker bereitet nun den Candidate-Pool vor.
|
||||||
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg)
|
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg)
|
||||||
|
|
||||||
|
# WP-15b: Validierung der Kandidaten aus dem Global Pool.
|
||||||
|
for ch_obj in chunks:
|
||||||
|
filtered_pool = []
|
||||||
|
for cand in getattr(ch_obj, "candidate_pool", []):
|
||||||
|
# Nur 'global_pool' (Unzugeordnete Kanten) erfordern LLM-Validierung.
|
||||||
|
# Sektions-Kanten ('inherited') werden direkt akzeptiert.
|
||||||
|
if cand.get("provenance") == "global_pool" and enable_smart_edges:
|
||||||
|
if await self._validate_candidate(ch_obj.text, cand):
|
||||||
|
filtered_pool.append(cand)
|
||||||
|
else:
|
||||||
|
filtered_pool.append(cand)
|
||||||
|
ch_obj.candidate_pool = filtered_pool
|
||||||
|
|
||||||
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
|
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
|
||||||
|
|
||||||
# Embeddings
|
# Embeddings generieren
|
||||||
vecs = []
|
vecs = []
|
||||||
if chunk_pls:
|
if chunk_pls:
|
||||||
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
|
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
|
||||||
vecs = await self.embedder.embed_documents(texts)
|
vecs = await self.embedder.embed_documents(texts)
|
||||||
|
|
||||||
# Kanten-Extraktion & WP-15b Validierung
|
# Kanten finalisieren via derive_edges Aggregator (WP-15b kompatibel)
|
||||||
edges = []
|
# Nutzt das Provenance-Ranking (v2.1.0).
|
||||||
context = {"file": file_path, "note_id": note_id}
|
edges = build_edges_for_note(
|
||||||
|
note_id,
|
||||||
# A. Explizite Kandidaten (Wikilinks)
|
chunk_pls,
|
||||||
raw_candidates = extract_edges_with_context(parsed)
|
note_level_references=note_pl.get("references", []),
|
||||||
for cand in raw_candidates:
|
include_note_scope_refs=note_scope_refs
|
||||||
# Semantische Prüfung gegen Pass 1 Cache
|
|
||||||
if await self._validate_candidate(body_text, cand):
|
|
||||||
cand["kind"] = edge_registry.resolve(
|
|
||||||
edge_type=cand["kind"],
|
|
||||||
provenance="explicit",
|
|
||||||
context={**context, "line": cand.get("line")}
|
|
||||||
)
|
)
|
||||||
edges.append(cand)
|
|
||||||
else:
|
|
||||||
logger.info(f"🚫 WP-15b: Candidate rejected: {cand['kind']} -> {cand['to']}")
|
|
||||||
|
|
||||||
# B. System Kanten (Struktur)
|
# Alias-Auflösung & Registry Enforcement
|
||||||
try:
|
context = {"file": file_path, "note_id": note_id}
|
||||||
sys_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs)
|
for e in edges:
|
||||||
except:
|
e["kind"] = edge_registry.resolve(
|
||||||
sys_edges = build_edges_for_note(note_id, chunk_pls)
|
edge_type=e.get("kind", "related_to"),
|
||||||
|
provenance=e.get("provenance", "explicit"),
|
||||||
for e in sys_edges:
|
context={**context, "line": e.get("line", "system")}
|
||||||
valid_kind = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"})
|
)
|
||||||
if valid_kind:
|
|
||||||
e["kind"] = valid_kind
|
|
||||||
edges.append(e)
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Processing failed for {file_path}: {e}", exc_info=True)
|
logger.error(f"Processing failed for {file_path}: {e}", exc_info=True)
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,9 @@
|
||||||
"""
|
"""
|
||||||
scripts/import_markdown.py
|
scripts/import_markdown.py
|
||||||
CLI-Tool zum Importieren von Markdown-Dateien in Qdrant.
|
CLI-Tool zum Importieren von Markdown-Dateien in Qdrant.
|
||||||
Updated for Mindnet v2.3.6 (Async Ingestion Support).
|
WP-15b: Implementiert den Two-Pass Workflow (Pre-Scan + Processing).
|
||||||
|
Sorgt dafür, dass der LocalBatchCache vor der Verarbeitung gefüllt wird.
|
||||||
|
VERSION: 2.4.0
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
|
|
@ -11,21 +13,16 @@ import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
import logging
|
|
||||||
# Setzt das Level global auf INFO, damit Sie den Fortschritt sehen
|
# Setzt das Level global auf INFO, damit Sie den Fortschritt sehen
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
|
||||||
|
|
||||||
# Wenn Sie TIEFE Einblicke wollen, setzen Sie den SemanticAnalyzer spezifisch auf DEBUG:
|
|
||||||
logging.getLogger("app.services.semantic_analyzer").setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
# Importiere den neuen Async Service
|
# Importiere den neuen Async Service
|
||||||
# Stellen wir sicher, dass der Pfad stimmt (Pythonpath)
|
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(os.getcwd())
|
sys.path.append(os.getcwd())
|
||||||
|
|
||||||
from app.core.ingestion import IngestionService
|
from app.core.ingestion import IngestionService
|
||||||
|
from app.core.parser import pre_scan_markdown
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
|
|
||||||
logger = logging.getLogger("importer")
|
logger = logging.getLogger("importer")
|
||||||
|
|
||||||
async def main_async(args):
|
async def main_async(args):
|
||||||
|
|
@ -34,7 +31,7 @@ async def main_async(args):
|
||||||
logger.error(f"Vault path does not exist: {vault_path}")
|
logger.error(f"Vault path does not exist: {vault_path}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Service initialisieren (startet Async Clients)
|
# 1. Service initialisieren
|
||||||
logger.info(f"Initializing IngestionService (Prefix: {args.prefix})")
|
logger.info(f"Initializing IngestionService (Prefix: {args.prefix})")
|
||||||
service = IngestionService(collection_prefix=args.prefix)
|
service = IngestionService(collection_prefix=args.prefix)
|
||||||
|
|
||||||
|
|
@ -46,14 +43,31 @@ async def main_async(args):
|
||||||
|
|
||||||
logger.info(f"Found {len(files)} markdown files.")
|
logger.info(f"Found {len(files)} markdown files.")
|
||||||
|
|
||||||
stats = {"processed": 0, "skipped": 0, "errors": 0}
|
# =========================================================================
|
||||||
|
# PASS 1: Global Pre-Scan (WP-15b)
|
||||||
|
# Füllt den LocalBatchCache für die semantische Kanten-Validierung.
|
||||||
|
# =========================================================================
|
||||||
|
logger.info(f"🔍 [Pass 1] Pre-scanning {len(files)} files for global context cache...")
|
||||||
|
for f_path in files:
|
||||||
|
try:
|
||||||
|
ctx = pre_scan_markdown(str(f_path))
|
||||||
|
if ctx:
|
||||||
|
service.batch_cache[ctx.note_id] = ctx
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"⚠️ Could not pre-scan {f_path}: {e}")
|
||||||
|
|
||||||
# Wir nutzen eine Semaphore, um nicht zu viele Files gleichzeitig zu öffnen/embedden
|
logger.info(f"✅ Cache populated with {len(service.batch_cache)} note contexts.")
|
||||||
sem = asyncio.Semaphore(5) # Max 5 concurrent files to avoid OOM or Rate Limit
|
|
||||||
|
# =========================================================================
|
||||||
|
# PASS 2: Processing (Batch-Verarbeitung)
|
||||||
|
# =========================================================================
|
||||||
|
stats = {"processed": 0, "skipped": 0, "errors": 0}
|
||||||
|
sem = asyncio.Semaphore(5) # Max 5 parallele Dateien für Stabilität
|
||||||
|
|
||||||
async def process_with_limit(f_path):
|
async def process_with_limit(f_path):
|
||||||
async with sem:
|
async with sem:
|
||||||
try:
|
try:
|
||||||
|
# Nutzt den nun gefüllten Batch-Cache für die Validierung
|
||||||
res = await service.process_file(
|
res = await service.process_file(
|
||||||
file_path=str(f_path),
|
file_path=str(f_path),
|
||||||
vault_root=str(vault_path),
|
vault_root=str(vault_path),
|
||||||
|
|
@ -65,8 +79,8 @@ async def main_async(args):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {"status": "error", "error": str(e), "path": str(f_path)}
|
return {"status": "error", "error": str(e), "path": str(f_path)}
|
||||||
|
|
||||||
# Batch Processing
|
logger.info(f"🚀 [Pass 2] Starting semantic processing in batches...")
|
||||||
# Wir verarbeiten in Chunks, um den Progress zu sehen
|
|
||||||
batch_size = 20
|
batch_size = 20
|
||||||
for i in range(0, len(files), batch_size):
|
for i in range(0, len(files), batch_size):
|
||||||
batch = files[i:i+batch_size]
|
batch = files[i:i+batch_size]
|
||||||
|
|
@ -92,7 +106,7 @@ def main():
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
|
default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description="Import Vault to Qdrant (Async)")
|
parser = argparse.ArgumentParser(description="Import Vault to Qdrant (Two-Pass Ingestion)")
|
||||||
parser.add_argument("--vault", default="./vault", help="Path to vault root")
|
parser.add_argument("--vault", default="./vault", help="Path to vault root")
|
||||||
parser.add_argument("--prefix", default=default_prefix, help="Collection prefix")
|
parser.add_argument("--prefix", default=default_prefix, help="Collection prefix")
|
||||||
parser.add_argument("--force", action="store_true", help="Force re-index all files")
|
parser.add_argument("--force", action="store_true", help="Force re-index all files")
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user