from __future__ import annotations import threading from typing import TYPE_CHECKING if TYPE_CHECKING: from pathlib import Path import pytest playwright = pytest.importorskip("playwright.sync_api") from auto_reverse.browser import Browser # noqa: E402 from auto_reverse.doc.engine import DocEngine # noqa: E402 from auto_reverse.proxy import ProxyServer # noqa: E402 from auto_reverse.store import FlowStore, ScopeFilter # noqa: E402 def test_capture_to_spec_end_to_end(tmp_path: Path, fixture_site: str): from urllib.parse import urlsplit host = urlsplit(fixture_site).hostname port = urlsplit(fixture_site).port scope = ScopeFilter(target_hosts={f"{host}:{port}", host}) engine_holder: dict = {} def on_new(sig): engine_holder["engine"].document(sig) store = FlowStore(scope, on_new_signature=on_new) engine = DocEngine(store, out_dir=tmp_path, title="fixture", use_llm=False) engine_holder["engine"] = engine proxy = ProxyServer(store, archive_path=tmp_path / "archive.log", port=0) try: proxy.start() except Exception as exc: pytest.skip(f"proxy unavailable: {exc}") try: browser = Browser(proxy_port=proxy.port, headless=True) browser.start() except Exception as exc: proxy.stop() pytest.skip(f"browser unavailable: {exc}") try: browser.navigate(fixture_site + "/") # triggers fetch('/api/users') # allow capture to settle threading.Event().wait(1.0) assert any( "/api/users" in r.signature.path_template for r in store.endpoints() ) finally: browser.stop() proxy.stop()