"""
Scout logic: scan a domain, extract its links, and identify the publication URL
via OpenRouter.

Uses stealth and an HTTP/2 fallback to work around aggressive bot detection.
"""
import json
import re
from typing import Any
from urllib.parse import urljoin, urlparse

import httpx
from playwright.async_api import async_playwright
from playwright_stealth import Stealth
# OpenRouter base URL and default model
OPENROUTER_BASE = "https://openrouter.ai/api/v1"
DEFAULT_MODEL = "google/gemini-flash-1.5-8b"

# Browser configuration against bot detection (e.g. McKinsey) and HTTP/2 errors
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"
)
EXTRA_HEADERS = {
    "Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
    "Referer": "https://www.google.com/",
}
VIEWPORT = {"width": 1920, "height": 1080}
GOTO_TIMEOUT = 60_000  # ms
def _normalize_domain(domain: str) -> str:
|
||
"""Extrahiert die reine Domain (Host) ohne Schema und Pfad."""
|
||
s = domain.strip()
|
||
if s.startswith("http://"):
|
||
s = s[7:]
|
||
elif s.startswith("https://"):
|
||
s = s[8:]
|
||
if "/" in s:
|
||
s = s.split("/", 1)[0]
|
||
return s.lower() or domain
|
||
|
||
|
||
def _url_with_www(domain: str) -> str:
|
||
"""Immer https://www.{domain} für einheitlichen Aufruf."""
|
||
host = _normalize_domain(domain)
|
||
return f"https://www.{host}"
|
||
|
||
|
||
def _url_without_www(domain: str) -> str:
|
||
"""https://{domain} für Fallback ohne www."""
|
||
host = _normalize_domain(domain)
|
||
return f"https://{host}"
|
||
|
||
|
||
async def _fetch_links_impl(domain: str, disable_http2: bool = False) -> list[dict[str, str]]:
    """
    Internal implementation: start a browser, apply stealth, load the target
    page, and extract its ``<a>`` links.

    With ``disable_http2=True`` Chromium is launched with
    ``args=['--disable-http2']`` (fallback for ERR_HTTP2_PROTOCOL_ERROR).

    Returns a list of ``{"text": ..., "href": ...}`` dicts; raises the last
    navigation error if every goto attempt fails.
    """
    url_primary = _url_with_www(domain)
    # Go straight to the insights page first (e.g. McKinsey).
    url_insights = f"{url_primary.rstrip('/')}/featured-insights"
    url_fallback = _url_without_www(domain)

    # Navigation attempts in priority order; only the LAST failure propagates.
    # This replaces the previous 5-deep nested try/except pyramid with the
    # identical attempt sequence.
    attempts: list[tuple[str, str, int]] = [
        (url_insights, "networkidle", GOTO_TIMEOUT),
        (url_primary, "networkidle", GOTO_TIMEOUT),
        (url_fallback, "networkidle", GOTO_TIMEOUT),
        # Last resorts: domcontentloaded with a shorter timeout.
        (url_primary, "domcontentloaded", 30_000),
        (url_fallback, "domcontentloaded", 30_000),
    ]

    launch_options: dict[str, Any] = {"headless": True}
    if disable_http2:
        launch_options["args"] = ["--disable-http2"]

    links: list[dict[str, str]] = []
    async with async_playwright() as p:
        browser = await p.chromium.launch(**launch_options)
        try:
            # Context: current user agent, JS on, headers, viewport (evade bot detection).
            context = await browser.new_context(
                user_agent=USER_AGENT,
                extra_http_headers=EXTRA_HEADERS,
                viewport=VIEWPORT,
                java_script_enabled=True,
            )
            # Apply stealth to the context (API 2.x: apply_stealth_async(context)).
            await Stealth().apply_stealth_async(context)
            page = await context.new_page()

            last_index = len(attempts) - 1
            for index, (url, wait_until, timeout) in enumerate(attempts):
                try:
                    await page.goto(url, wait_until=wait_until, timeout=timeout)
                    break
                except Exception:
                    if index == last_index:
                        raise

            # Read all <a href="..."> elements.
            links = await page.evaluate(
                """() => {
                    const anchors = document.querySelectorAll('a[href]');
                    return Array.from(anchors).map(a => ({
                        text: (a.textContent || '').trim().slice(0, 200),
                        href: a.getAttribute('href') || ''
                    })).filter(x => x.href);
                }"""
            )
            await context.close()
        finally:
            await browser.close()

    return links
async def _fetch_links_with_playwright(domain: str) -> list[dict[str, str]]:
    """
    Load the domain with Playwright (stealth context against bot detection)
    and extract its ``<a>`` tags.

    On ERR_HTTP2_PROTOCOL_ERROR (or any net::ERR_* failure) the scrape is
    retried once with ``--disable-http2``.
    """
    try:
        return await _fetch_links_impl(domain, disable_http2=False)
    except Exception as exc:
        message = str(exc)
        is_chromium_net_error = (
            "ERR_HTTP2_PROTOCOL_ERROR" in message or "net::ERR_" in message
        )
        if not is_chromium_net_error:
            raise
        # Second attempt with HTTP/2 disabled.
        return await _fetch_links_impl(domain, disable_http2=True)
def _make_absolute(href: str, base_url: str) -> str:
|
||
"""Macht relative URLs absolut (einfache Heuristik)."""
|
||
if not href or href.startswith("#"):
|
||
return ""
|
||
if href.startswith("http://") or href.startswith("https://"):
|
||
return href
|
||
base = base_url.rstrip("/")
|
||
if href.startswith("/"):
|
||
parsed = urlparse(base)
|
||
return f"{parsed.scheme}://{parsed.netloc}{href}"
|
||
return f"{base}/{href}"
|
||
|
||
|
||
async def _ask_openrouter(api_key: str, links: list[dict[str, str]], domain: str) -> str | None:
    """
    Send the link list to OpenRouter and ask for the best publication URL.

    The model is instructed to answer in the format {"url": "..."}. Returns
    the extracted URL, or None when no usable answer came back. Raises
    ``httpx.HTTPStatusError`` on non-2xx responses.
    """
    base_url = _url_with_www(domain)
    prompt = (
        "Analysiere diese Links einer Unternehmensberatung. "
        "Welcher Link führt zur Seite mit Reports, Insights oder Fachartikeln? "
        "Antworte NUR mit der absoluten URL im JSON-Format: {'url': '...'}"
    )
    links_text = "\n".join(
        f"- {link.get('text', '')} -> {_make_absolute(link.get('href', ''), base_url)}"
        for link in links[:80]  # cap the list to protect the token limit
    )
    user_content = f"Links von {domain}:\n{links_text}"

    async with httpx.AsyncClient(timeout=30.0) as client:
        resp = await client.post(
            f"{OPENROUTER_BASE}/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
                "HTTP-Referer": base_url,
            },
            json={
                "model": DEFAULT_MODEL,
                "messages": [
                    {"role": "system", "content": prompt},
                    {"role": "user", "content": user_content},
                ],
                "max_tokens": 256,
            },
        )
        resp.raise_for_status()
        data = resp.json()

    choice = (data.get("choices") or [None])[0]
    if not choice:
        return None
    content = ((choice.get("message") or {}).get("content") or "").strip()
    if not content:
        return None

    # Extract JSON from the answer (the model may wrap it in Markdown).
    json_match = re.search(r"\{[^{}]*\"url\"[^{}]*\}", content)
    if json_match:
        try:
            obj = json.loads(json_match.group())
            # Guard: a matched span could parse to a non-object in theory.
            if isinstance(obj, dict):
                return (obj.get("url") or "").strip() or None
        except json.JSONDecodeError:
            pass
    try:
        obj = json.loads(content)
    except json.JSONDecodeError:
        return None
    # The full answer may be valid JSON but not an object (e.g. a bare string);
    # previously this crashed with AttributeError on .get().
    if not isinstance(obj, dict):
        return None
    return (obj.get("url") or "").strip() or None
async def get_publication_url(domain: str, *, api_key: str | None = None) -> dict[str, Any]:
    """
    Main entry point: scan the domain, send its links to OpenRouter, and
    return the discovered publication URL.

    Returns a dict with keys "url" and "error"; exactly one of them is set.
    """
    import os

    key = api_key or os.getenv("OPENROUTER_API_KEY")
    if not key:
        return {"url": None, "error": "OPENROUTER_API_KEY nicht gesetzt"}

    try:
        links = await _fetch_links_with_playwright(domain)
    except Exception as scrape_error:
        return {"url": None, "error": f"Playwright/Scrape-Fehler: {scrape_error!s}"}

    if not links:
        return {"url": None, "error": "Keine Links auf der Seite gefunden"}

    try:
        found_url = await _ask_openrouter(key, links, domain)
    except Exception as llm_error:
        return {"url": None, "error": f"OpenRouter-Fehler: {llm_error!s}"}
    return {"url": found_url, "error": None}