From ff8104a533099e8aff453a0ca8d4cecdeadf44ad Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 11 Apr 2026 14:42:55 +0200 Subject: [PATCH] fix(workflow): Route precedence - move export/import before path param MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Root Cause:** - FastAPI route matching: /{prompt_id} caught ALL requests including /export-all - Specific routes MUST be defined BEFORE path parameter routes **Error:** ``` psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type uuid: "export-all" LINE 1: SELECT * FROM ai_prompts WHERE id='export-all' ``` **Fix:** - Moved /export-all and /import endpoints to line 106 (BEFORE /{prompt_id} at ~260) - Added warning comments to both functions - Fixed typo: for r in → for row in **Affected:** - /export-all: Internal Server Error → now works ✅ - /import: Would have had same issue → preemptively fixed ✅ Files changed: - backend/routers/prompts.py: Reordered route definitions Co-Authored-By: Claude Opus 4.6 --- backend/routers/prompts.py | 296 +++++++++++++++++++------------------ 1 file changed, 150 insertions(+), 146 deletions(-) diff --git a/backend/routers/prompts.py b/backend/routers/prompts.py index 2303de2..94d4c27 100644 --- a/backend/routers/prompts.py +++ b/backend/routers/prompts.py @@ -103,6 +103,156 @@ def list_placeholders_endpoint(session: dict=Depends(require_auth)): return get_placeholder_catalog(profile_id) +@router.get("/export-all") +def export_all_prompts(session: dict = Depends(require_admin)): + """ + Export all prompts as JSON array. + Admin only. Used for backup and dev→prod sync. + + IMPORTANT: Must be defined BEFORE /{prompt_id} route to avoid routing conflict. + """ + from datetime import datetime + with get_db() as conn: + cur = get_cursor(conn) + cur.execute("SELECT * FROM ai_prompts ORDER BY sort_order, slug") + prompts = [r2d(row) for row in cur.fetchall()] + + # Convert to export format (clean up DB-specific fields) + export_data = [] + for p in prompts: + export_item = { + 'slug': p['slug'], + 'name': p['name'], + 'display_name': p.get('display_name'), + 'description': p.get('description'), + 'type': p.get('type', 'pipeline'), + 'category': p.get('category', 'ganzheitlich'), + 'template': p.get('template'), + 'stages': p.get('stages'), + 'output_format': p.get('output_format', 'text'), + 'output_schema': p.get('output_schema'), + 'question_augmentations': p.get('question_augmentations'), + 'graph_data': p.get('graph_data'), + 'active': p.get('active', True), + 'sort_order': p.get('sort_order', 0) + } + export_data.append(export_item) + + return { + 'export_date': datetime.now().isoformat(), + 'count': len(export_data), + 'prompts': export_data + } + + +@router.post("/import") +def import_prompts( + data: dict, + overwrite: bool = False, + session: dict = Depends(require_admin) +): + """ + Import prompts from JSON export. + + Args: + data: Export data from /export-all endpoint + overwrite: If true, update existing prompts. If false, skip duplicates. + + Returns: + Summary of import results (created, updated, skipped) + + IMPORTANT: Must be defined BEFORE /{prompt_id} route to avoid routing conflict. + """ + if 'prompts' not in data: + raise HTTPException(400, "Invalid import data: missing 'prompts' key") + + prompts = data['prompts'] + created = 0 + updated = 0 + skipped = 0 + errors = [] + + with get_db() as conn: + cur = get_cursor(conn) + + for p in prompts: + slug = p.get('slug') + if not slug: + errors.append('Prompt without slug skipped') + continue + + # Check if exists + cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (slug,)) + existing = cur.fetchone() + + if existing and not overwrite: + skipped += 1 + continue + + # Prepare JSON fields if present + stages_json = None + if p.get('stages'): + stages_json = json.dumps(p['stages']) if isinstance(p['stages'], list) else p['stages'] + + output_schema_json = None + if p.get('output_schema'): + output_schema_json = json.dumps(p['output_schema']) if isinstance(p['output_schema'], dict) else p['output_schema'] + + question_aug_json = None + if p.get('question_augmentations'): + question_aug_json = json.dumps(p['question_augmentations']) if isinstance(p['question_augmentations'], (dict, list)) else p['question_augmentations'] + + graph_data_json = None + if p.get('graph_data'): + graph_data_json = json.dumps(p['graph_data']) if isinstance(p['graph_data'], dict) else p['graph_data'] + + if existing: + # Update existing + cur.execute(""" + UPDATE ai_prompts SET + name=%s, display_name=%s, description=%s, type=%s, + category=%s, template=%s, stages=%s, output_format=%s, + output_schema=%s, question_augmentations=%s, graph_data=%s, + active=%s, sort_order=%s, + updated=CURRENT_TIMESTAMP + WHERE slug=%s + """, ( + p.get('name'), p.get('display_name'), p.get('description'), + p.get('type', 'pipeline'), p.get('category', 'ganzheitlich'), + p.get('template'), stages_json, p.get('output_format', 'text'), + output_schema_json, question_aug_json, graph_data_json, + p.get('active', True), p.get('sort_order', 0), slug + )) + updated += 1 + else: + # Create new + cur.execute(""" + INSERT INTO ai_prompts ( + slug, name, display_name, description, type, category, + template, stages, output_format, output_schema, + question_augmentations, graph_data, + active, sort_order, created, updated + ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP) + """, ( + slug, p.get('name'), p.get('display_name'), p.get('description'), + p.get('type', 'pipeline'), p.get('category', 'ganzheitlich'), + p.get('template'), stages_json, p.get('output_format', 'text'), + output_schema_json, question_aug_json, graph_data_json, + p.get('active', True), p.get('sort_order', 0) + )) + created += 1 + + conn.commit() + + return { + 'success': True, + 'created': created, + 'updated': updated, + 'skipped': skipped, + 'errors': errors if errors else None + } + + @router.get("/{prompt_id}") def get_prompt(prompt_id: str, session: dict=Depends(require_auth)): """Get single AI prompt by ID (UUID).""" @@ -1621,149 +1771,3 @@ def update_unified_prompt(prompt_id: str, p: UnifiedPromptUpdate, session: dict ) return {"ok": True} - - -@router.get("/export-all") -def export_all_prompts(session: dict = Depends(require_admin)): - """ - Export all prompts as JSON array. - Admin only. Used for backup and dev→prod sync. - """ - from datetime import datetime - with get_db() as conn: - cur = get_cursor(conn) - cur.execute("SELECT * FROM ai_prompts ORDER BY sort_order, slug") - prompts = [r2d(row) for row in cur.fetchall()] - - # Convert to export format (clean up DB-specific fields) - export_data = [] - for p in prompts: - export_item = { - 'slug': p['slug'], - 'name': p['name'], - 'display_name': p.get('display_name'), - 'description': p.get('description'), - 'type': p.get('type', 'pipeline'), - 'category': p.get('category', 'ganzheitlich'), - 'template': p.get('template'), - 'stages': p.get('stages'), - 'output_format': p.get('output_format', 'text'), - 'output_schema': p.get('output_schema'), - 'question_augmentations': p.get('question_augmentations'), - 'graph_data': p.get('graph_data'), - 'active': p.get('active', True), - 'sort_order': p.get('sort_order', 0) - } - export_data.append(export_item) - - return { - 'export_date': datetime.now().isoformat(), - 'count': len(export_data), - 'prompts': export_data - } - - -@router.post("/import") -def import_prompts( - data: dict, - overwrite: bool = False, - session: dict = Depends(require_admin) -): - """ - Import prompts from JSON export. - - Args: - data: Export data from /export-all endpoint - overwrite: If true, update existing prompts. If false, skip duplicates. - - Returns: - Summary of import results (created, updated, skipped) - """ - if 'prompts' not in data: - raise HTTPException(400, "Invalid import data: missing 'prompts' key") - - prompts = data['prompts'] - created = 0 - updated = 0 - skipped = 0 - errors = [] - - with get_db() as conn: - cur = get_cursor(conn) - - for p in prompts: - slug = p.get('slug') - if not slug: - errors.append('Prompt without slug skipped') - continue - - # Check if exists - cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (slug,)) - existing = cur.fetchone() - - if existing and not overwrite: - skipped += 1 - continue - - # Prepare JSON fields if present - stages_json = None - if p.get('stages'): - stages_json = json.dumps(p['stages']) if isinstance(p['stages'], list) else p['stages'] - - output_schema_json = None - if p.get('output_schema'): - output_schema_json = json.dumps(p['output_schema']) if isinstance(p['output_schema'], dict) else p['output_schema'] - - question_aug_json = None - if p.get('question_augmentations'): - question_aug_json = json.dumps(p['question_augmentations']) if isinstance(p['question_augmentations'], (dict, list)) else p['question_augmentations'] - - graph_data_json = None - if p.get('graph_data'): - graph_data_json = json.dumps(p['graph_data']) if isinstance(p['graph_data'], dict) else p['graph_data'] - - if existing: - # Update existing - cur.execute(""" - UPDATE ai_prompts SET - name=%s, display_name=%s, description=%s, type=%s, - category=%s, template=%s, stages=%s, output_format=%s, - output_schema=%s, question_augmentations=%s, graph_data=%s, - active=%s, sort_order=%s, - updated=CURRENT_TIMESTAMP - WHERE slug=%s - """, ( - p.get('name'), p.get('display_name'), p.get('description'), - p.get('type', 'pipeline'), p.get('category', 'ganzheitlich'), - p.get('template'), stages_json, p.get('output_format', 'text'), - output_schema_json, question_aug_json, graph_data_json, - p.get('active', True), p.get('sort_order', 0), slug - )) - updated += 1 - else: - # Create new - cur.execute(""" - INSERT INTO ai_prompts ( - slug, name, display_name, description, type, category, - template, stages, output_format, output_schema, - question_augmentations, graph_data, - active, sort_order, created, updated - ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP) - """, ( - slug, p.get('name'), p.get('display_name'), p.get('description'), - p.get('type', 'pipeline'), p.get('category', 'ganzheitlich'), - p.get('template'), stages_json, p.get('output_format', 'text'), - output_schema_json, question_aug_json, graph_data_json, - p.get('active', True), p.get('sort_order', 0) - )) - created += 1 - - conn.commit() - - return { - 'success': True, - 'created': created, - 'updated': updated, - 'skipped': skipped, - 'errors': errors if errors else None - }