58 lines
1.6 KiB
Python
58 lines
1.6 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import threading
|
||
|
|
from typing import TYPE_CHECKING
|
||
|
|
|
||
|
|
if TYPE_CHECKING:
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
playwright = pytest.importorskip("playwright.sync_api")
|
||
|
|
|
||
|
|
from auto_reverse.browser import Browser # noqa: E402
|
||
|
|
from auto_reverse.doc.engine import DocEngine # noqa: E402
|
||
|
|
from auto_reverse.proxy import ProxyServer # noqa: E402
|
||
|
|
from auto_reverse.store import FlowStore, ScopeFilter # noqa: E402
|
||
|
|
|
||
|
|
|
||
|
|
def test_capture_to_spec_end_to_end(tmp_path: Path, fixture_site: str):
    """Browse the fixture site through the proxy and check the API call is captured.

    A ``FlowStore`` scoped to the fixture host is wired to a ``DocEngine``
    (writing under ``tmp_path``) and fed by a ``ProxyServer``.  A headless
    ``Browser`` routed through that proxy loads the fixture's root page, and
    the test asserts that the store reports an endpoint whose path template
    contains ``/api/users``.

    The test is skipped, not failed, when the proxy or the browser cannot be
    started in the current environment.
    """
    from urllib.parse import urlsplit

    # Parse the fixture URL once; the scope accepts both "host:port" and bare host.
    site = urlsplit(fixture_site)
    host = site.hostname
    port = site.port
    scope = ScopeFilter(target_hosts={f"{host}:{port}", host})

    # The store needs its callback at construction time, but the engine is
    # built from the store afterwards, so the callback reaches the engine
    # through this late-bound holder.
    engine_holder: dict = {}

    def on_new(sig):
        engine_holder["engine"].document(sig)

    store = FlowStore(scope, on_new_signature=on_new)
    engine = DocEngine(store, out_dir=tmp_path, title="fixture", use_llm=False)
    engine_holder["engine"] = engine

    proxy = ProxyServer(store, archive_path=tmp_path / "archive.log", port=0)
    try:
        proxy.start()
    except Exception as exc:
        pytest.skip(f"proxy unavailable: {exc}")

    try:
        browser = Browser(proxy_port=proxy.port, headless=True)
        browser.start()
    except Exception as exc:
        # The proxy is already running at this point; release it before skipping.
        proxy.stop()
        pytest.skip(f"browser unavailable: {exc}")

    try:
        browser.navigate(fixture_site + "/")  # triggers fetch('/api/users')
        # allow capture to settle
        threading.Event().wait(1.0)
        assert any(
            "/api/users" in r.signature.path_template for r in store.endpoints()
        )
    finally:
        # Nested so that a failure while stopping the browser cannot leave the
        # proxy running for the remainder of the test session.
        try:
            browser.stop()
        finally:
            proxy.stop()
|