diff --git a/docs/superpowers/plans/2026-05-31-auto-reverse.md b/docs/superpowers/plans/2026-05-31-auto-reverse.md new file mode 100644 index 0000000..598eaf1 --- /dev/null +++ b/docs/superpowers/plans/2026-05-31-auto-reverse.md @@ -0,0 +1,2450 @@ +# auto-reverse Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build `auto-reverse`, a conversational CLI that reverse-engineers a website's API by driving a headed browser with an LLM while an embedded mitmproxy captures and documents real traffic into a live OpenAPI spec + markdown. + +**Architecture:** Single free-threaded Python 3.14 process. A main-thread Claude tool-use agent ("the brain") acts on a Playwright headed browser and queries/commands a thread-safe Flow Store; an embedded mitmproxy `DumpMaster` runs in its own thread capturing every flow; a Doc Worker thread turns new endpoint *signatures* into OpenAPI/markdown (deterministic schema inference + LLM enrichment only on novelty). Build bottom-up: pure core (store/schema/doc) first, then I/O integration (proxy/browser), then tools/agent/REPL/CLI. + +**Tech Stack:** Python 3.14 (free-threaded), uv, Playwright, mitmproxy, anthropic SDK, genson (schema inference), pytest + pytest-asyncio. Default model `claude-opus-4-8`. 
+ +**Spec:** `docs/superpowers/specs/2026-05-31-auto-reverse-design.md` + +--- + +## File Structure + +``` +src/auto_reverse/ + __init__.py # main() entrypoint (delegates to cli.run) + cli.py # arg parsing, wiring, thread lifecycle + config.py # Config dataclass + pluggable auth stub + models.py # CapturedFlow, Signature, EndpointRecord, helpers + store.py # FlowStore (thread-safe), ScopeFilter, path templating + proxy.py # embedded mitmproxy master + CaptureAddon + archive + browser.py # Playwright headed browser wrapper + take-over + agent.py # Claude tool-use loop + repl.py # chat loop + /meta-commands + take-over + tools/ + __init__.py # tool registry assembly + browser_tools.py # browser.* tool schemas + handlers + flows_tools.py # flows.* tool schemas + handlers + doc_tools.py # doc.* tool schemas + handlers + doc/ + __init__.py + schema.py # deterministic JSON Schema inference + merge (genson wrap) + engine.py # DocEngine: consumes new-signature events, writes outputs + openapi.py # OpenAPI assembly from EndpointRecords + markdown.py # human-readable API.md rendering + client.py # optional --gen-client codegen + +tests/ + conftest.py # fixtures: fixture HTTP site, sample flows + fixture_site.py # stdlib http.server JSON app for integration tests + test_models.py + test_store.py + test_scope.py + test_schema.py + test_openapi.py + test_markdown.py + test_config.py + test_proxy.py + test_browser.py # requires playwright browsers; skipped if absent + test_agent.py # mocked anthropic client + test_tools.py + test_e2e_smoke.py # fixture site end-to-end; skipped if browsers absent +``` + +--- + +## Task 0: Dependencies and test scaffolding + +**Files:** +- Modify: `pyproject.toml` +- Create: `tests/__init__.py`, `tests/conftest.py`, `tests/fixture_site.py` + +- [ ] **Step 1: Add runtime + dev dependencies** + +Run: +```bash +cd /home/df/projects/reverse_engineer +uv add playwright mitmproxy anthropic genson +uv add --dev pytest pytest-asyncio +``` +Expected: 
`pyproject.toml` gains a populated `dependencies` list and a `[dependency-groups]`/dev group; `uv.lock` updates. If any package lacks a free-threaded 3.14 wheel and the resolve fails, re-run with the GIL interpreter selected (`uv add --python 3.14 ...`) and record the fallback in `README.md` per the spec's free-threading caveat. + +- [ ] **Step 2: Install Playwright's Chromium** + +Run: +```bash +uv run playwright install chromium +``` +Expected: downloads the Chromium build. If it fails in this environment, that only affects browser/E2E tests (which are guarded to skip); core tasks proceed regardless. + +- [ ] **Step 3: Add pytest config to `pyproject.toml`** + +Append: +```toml +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] +``` + +- [ ] **Step 4: Create the stdlib fixture site** + +Create `tests/fixture_site.py`: +```python +"""A tiny dependency-free JSON site for integration tests, served over HTTP.""" + +from __future__ import annotations + +import json +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + + +class _Handler(BaseHTTPRequestHandler): + def log_message(self, *args: object) -> None: # silence test output + pass + + def _send_json(self, status: int, payload: object) -> None: + body = json.dumps(payload).encode() + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self) -> None: + if self.path == "/": + html = b"
" + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header("Content-Length", str(len(html))) + self.end_headers() + self.wfile.write(html) + elif self.path == "/api/users": + self._send_json(200, [{"id": 1, "name": "Ada"}]) + elif self.path.startswith("/api/users/"): + self._send_json(200, {"id": int(self.path.rsplit("/", 1)[1]), "name": "Ada"}) + else: + self._send_json(404, {"error": "not found"}) + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length", "0")) + raw = self.rfile.read(length) if length else b"{}" + self._send_json(201, {"received": json.loads(raw or b"{}")}) + + +def start_fixture_site() -> tuple[ThreadingHTTPServer, str]: + """Start the site on an ephemeral port; return (server, base_url).""" + server = ThreadingHTTPServer(("127.0.0.1", 0), _Handler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + host, port = server.server_address + return server, f"http://{host}:{port}" +``` + +- [ ] **Step 5: Create test package marker and conftest fixtures** + +Create `tests/__init__.py` (empty). + +Create `tests/conftest.py`: +```python +from __future__ import annotations + +from collections.abc import Iterator + +import pytest + +from tests.fixture_site import start_fixture_site + + +@pytest.fixture +def fixture_site() -> Iterator[str]: + server, base_url = start_fixture_site() + try: + yield base_url + finally: + server.shutdown() +``` + +- [ ] **Step 6: Verify the toolchain runs** + +Run: +```bash +uv run pytest -q +``` +Expected: pytest collects 0 tests and exits with code 5 ("no tests collected") because there are no tests yet; at this stage that is the success signal. Any other non-zero exit code indicates a configuration or import error. Confirms config is valid. 
+ +- [ ] **Step 7: Commit** + +```bash +git add pyproject.toml uv.lock tests/ +git commit -m "build: add deps and test scaffolding for auto-reverse" +``` + +--- + +## Task 1: Core data models + +**Files:** +- Create: `src/auto_reverse/models.py` +- Test: `tests/test_models.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_models.py`: +```python +from auto_reverse.models import CapturedFlow, Signature, status_class + + +def test_status_class_buckets(): + assert status_class(200) == "2xx" + assert status_class(201) == "2xx" + assert status_class(404) == "4xx" + assert status_class(503) == "5xx" + + +def test_signature_is_hashable_and_equal(): + a = Signature("GET", "ex.com", "/api/users/{id}", "2xx") + b = Signature("GET", "ex.com", "/api/users/{id}", "2xx") + assert a == b + assert {a, b} == {a} + + +def test_captured_flow_json_body_parsing(): + flow = CapturedFlow( + method="POST", host="ex.com", path="/api/x", query={}, + req_headers={"content-type": "application/json"}, req_body=b'{"a": 1}', + status=201, resp_headers={"content-type": "application/json"}, + resp_body=b'{"ok": true}', timestamp=0.0, + ) + assert flow.request_json() == {"a": 1} + assert flow.response_json() == {"ok": True} + + +def test_captured_flow_non_json_body_returns_none(): + flow = CapturedFlow( + method="GET", host="ex.com", path="/x", query={}, + req_headers={}, req_body=None, status=200, + resp_headers={"content-type": "text/html"}, resp_body=b"", + timestamp=0.0, + ) + assert flow.response_json() is None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_models.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'auto_reverse.models'`. 
+ +- [ ] **Step 3: Implement `models.py`** + +Create `src/auto_reverse/models.py`: +```python +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from typing import Any + + +def status_class(status: int) -> str: + return f"{status // 100}xx" + + +@dataclass(frozen=True) +class Signature: + method: str + host: str + path_template: str + status_class: str + + +@dataclass +class CapturedFlow: + method: str + host: str + path: str + query: dict[str, list[str]] + req_headers: dict[str, str] + req_body: bytes | None + status: int + resp_headers: dict[str, str] + resp_body: bytes | None + timestamp: float + + def _json(self, body: bytes | None, headers: dict[str, str]) -> Any | None: + if body is None: + return None + ctype = headers.get("content-type", "").lower() + if "json" not in ctype: + return None + try: + return json.loads(body) + except (ValueError, UnicodeDecodeError): + return None + + def request_json(self) -> Any | None: + return self._json(self.req_body, self.req_headers) + + def response_json(self) -> Any | None: + return self._json(self.resp_body, self.resp_headers) + + +@dataclass +class EndpointRecord: + signature: Signature + sample_count: int = 0 + query_params: set[str] = field(default_factory=set) + request_schema: dict[str, Any] | None = None + response_schema: dict[str, Any] | None = None + # LLM-enriched fields (filled by the doc engine): + summary: str = "" + description: str = "" + tag: str = "" + documented: bool = False +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_models.py -q` +Expected: PASS (4 tests). 
+ +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/models.py tests/test_models.py +git commit -m "feat: core data models (Signature, CapturedFlow, EndpointRecord)" +``` + +--- + +## Task 2: Path templating + +**Files:** +- Create: `src/auto_reverse/store.py` +- Test: `tests/test_store.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_store.py`: +```python +from auto_reverse.store import path_template + + +def test_collapses_numeric_ids(): + assert path_template("/api/users/4812/orders/99") == "/api/users/{id}/orders/{id}" + + +def test_collapses_uuid(): + p = "/api/items/550e8400-e29b-41d4-a716-446655440000" + assert path_template(p) == "/api/items/{id}" + + +def test_collapses_long_hex_token(): + assert path_template("/files/a1b2c3d4e5f60718293a4b5c") == "/files/{id}" + + +def test_keeps_short_words(): + assert path_template("/api/users/me/settings") == "/api/users/me/settings" + + +def test_root_and_empty(): + assert path_template("/") == "/" + assert path_template("") == "/" +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_store.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'auto_reverse.store'`. 
+ +- [ ] **Step 3: Implement `path_template` in `store.py`** + +Create `src/auto_reverse/store.py`: +```python +from __future__ import annotations + +import re + +_UUID = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$") +_HEX_TOKEN = re.compile(r"^[0-9a-fA-F]{16,}$") +_LONG_OPAQUE = re.compile(r"^[A-Za-z0-9_\-]{20,}$") + + +def _is_variable(segment: str) -> bool: + if segment.isdigit(): + return True + if _UUID.match(segment): + return True + if _HEX_TOKEN.match(segment): + return True + if _LONG_OPAQUE.match(segment) and any(c.isdigit() for c in segment): + return True + return False + + +def path_template(path: str) -> str: + """Collapse variable path segments (ids, UUIDs, hashes, opaque tokens) to {id}.""" + if not path or path == "/": + return "/" + parts = path.split("/") + out = ["{id}" if part and _is_variable(part) else part for part in parts] + return "/".join(out) +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_store.py -q` +Expected: PASS (5 tests). 
+ +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/store.py tests/test_store.py +git commit -m "feat: path templating for endpoint signatures" +``` + +--- + +## Task 3: Scope filter + +**Files:** +- Modify: `src/auto_reverse/store.py` +- Test: `tests/test_scope.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_scope.py`: +```python +from auto_reverse.models import CapturedFlow +from auto_reverse.store import ScopeFilter + + +def _flow(host: str, path: str, ctype: str = "application/json") -> CapturedFlow: + return CapturedFlow( + method="GET", host=host, path=path, query={}, req_headers={}, + req_body=None, status=200, resp_headers={"content-type": ctype}, + resp_body=b"{}", timestamp=0.0, + ) + + +def test_target_host_in_scope(): + f = ScopeFilter(target_hosts={"app.example.com"}) + assert f.is_in_scope(_flow("app.example.com", "/api/users")) + + +def test_other_host_out_of_scope(): + f = ScopeFilter(target_hosts={"app.example.com"}) + assert not f.is_in_scope(_flow("cdn.other.com", "/x")) + + +def test_static_asset_dropped(): + f = ScopeFilter(target_hosts={"app.example.com"}) + assert not f.is_in_scope(_flow("app.example.com", "/main.js", "application/javascript")) + + +def test_analytics_host_dropped_by_default(): + f = ScopeFilter(target_hosts={"app.example.com"}) + assert not f.is_in_scope(_flow("www.google-analytics.com", "/collect")) + + +def test_extra_allow_host(): + f = ScopeFilter(target_hosts={"app.example.com"}, allow_hosts={"api.example.com"}) + assert f.is_in_scope(_flow("api.example.com", "/v1/data")) + + +def test_explicit_deny_overrides(): + f = ScopeFilter(target_hosts={"app.example.com"}, deny_hosts={"app.example.com"}) + assert not f.is_in_scope(_flow("app.example.com", "/api/users")) +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_scope.py -q` +Expected: FAIL — `ImportError: cannot import name 'ScopeFilter'`. 
+ +- [ ] **Step 3: Append `ScopeFilter` to `store.py`** + +Add to `src/auto_reverse/store.py`: +```python +from auto_reverse.models import CapturedFlow + +_ASSET_SUFFIXES = (".js", ".mjs", ".css", ".png", ".jpg", ".jpeg", ".gif", + ".svg", ".woff", ".woff2", ".ttf", ".ico", ".map", ".webp") +_DEFAULT_ANALYTICS = frozenset({ + "www.google-analytics.com", "google-analytics.com", "analytics.google.com", + "stats.g.doubleclick.net", "api.segment.io", "cdn.segment.com", + "browser.sentry-cdn.com", "js.stripe.com", +}) + + +class ScopeFilter: + def __init__( + self, + target_hosts: set[str], + allow_hosts: set[str] | None = None, + deny_hosts: set[str] | None = None, + ) -> None: + self.target_hosts = set(target_hosts) + self.allow_hosts = set(allow_hosts or set()) + self.deny_hosts = set(deny_hosts or set()) + + def is_in_scope(self, flow: CapturedFlow) -> bool: + host = flow.host + if host in self.deny_hosts: + return False + if host in _DEFAULT_ANALYTICS: + return False + if host not in self.target_hosts and host not in self.allow_hosts: + return False + if flow.path.split("?")[0].lower().endswith(_ASSET_SUFFIXES): + return False + ctype = flow.resp_headers.get("content-type", "").lower() + if ctype.startswith(("text/css", "image/", "font/", "application/javascript")): + return False + return True +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_scope.py -q` +Expected: PASS (6 tests). 
+ +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/store.py tests/test_scope.py +git commit -m "feat: scope filter (host allow/deny, asset and analytics drop)" +``` + +--- + +## Task 4: Flow Store (thread-safe dedup) + +**Files:** +- Modify: `src/auto_reverse/store.py` +- Test: `tests/test_store.py` + +- [ ] **Step 1: Write failing tests** + +Append to `tests/test_store.py`: +```python +from auto_reverse.models import CapturedFlow, Signature +from auto_reverse.store import FlowStore, ScopeFilter + + +def _post(host: str, path: str, body: bytes) -> CapturedFlow: + return CapturedFlow( + method="POST", host=host, path=path, query={}, + req_headers={"content-type": "application/json"}, req_body=body, + status=201, resp_headers={"content-type": "application/json"}, + resp_body=b'{"ok": true}', timestamp=0.0, + ) + + +def test_ingest_new_signature_returns_true_once(): + store = FlowStore(ScopeFilter(target_hosts={"ex.com"})) + assert store.ingest(_post("ex.com", "/api/cart/1", b'{"q": 1}')).is_new is True + # same template, different id -> not new + assert store.ingest(_post("ex.com", "/api/cart/2", b'{"q": 2}')).is_new is False + assert len(store.endpoints()) == 1 + + +def test_out_of_scope_flow_ignored(): + store = FlowStore(ScopeFilter(target_hosts={"ex.com"})) + result = store.ingest(_post("other.com", "/x", b"{}")) + assert result.is_new is False + assert result.in_scope is False + assert store.endpoints() == [] + + +def test_new_signature_callback_fires_with_signature(): + seen: list[Signature] = [] + store = FlowStore(ScopeFilter(target_hosts={"ex.com"}), on_new_signature=seen.append) + store.ingest(_post("ex.com", "/api/cart/1", b"{}")) + store.ingest(_post("ex.com", "/api/cart/2", b"{}")) + assert len(seen) == 1 + assert seen[0].path_template == "/api/cart/{id}" + + +def test_search_filters_by_substring(): + store = FlowStore(ScopeFilter(target_hosts={"ex.com"})) + store.ingest(_post("ex.com", "/api/cart/1", b"{}")) + store.ingest(_post("ex.com", 
"/api/login", b"{}")) + results = store.search("cart") + assert len(results) == 1 + assert results[0].signature.path_template == "/api/cart/{id}" +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_store.py -q` +Expected: FAIL — `ImportError: cannot import name 'FlowStore'`. + +- [ ] **Step 3: Append `FlowStore` and `IngestResult` to `store.py`** + +Add to `src/auto_reverse/store.py`: +```python +import threading +from collections.abc import Callable +from dataclasses import dataclass +from urllib.parse import urlsplit + +from auto_reverse.models import EndpointRecord, Signature, status_class + +MAX_SAMPLES = 5 + + +@dataclass +class IngestResult: + in_scope: bool + is_new: bool + signature: Signature | None + + +class FlowStore: + """Thread-safe store: dedup by signature, retain bounded samples per endpoint.""" + + def __init__( + self, + scope: ScopeFilter, + on_new_signature: Callable[[Signature], None] | None = None, + ) -> None: + self._scope = scope + self._on_new = on_new_signature + self._lock = threading.Lock() + self._records: dict[Signature, EndpointRecord] = {} + self._samples: dict[Signature, list[CapturedFlow]] = {} + + def signature_of(self, flow: CapturedFlow) -> Signature: + return Signature( + method=flow.method.upper(), + host=flow.host, + path_template=path_template(flow.path), + status_class=status_class(flow.status), + ) + + def ingest(self, flow: CapturedFlow) -> IngestResult: + if not self._scope.is_in_scope(flow): + return IngestResult(in_scope=False, is_new=False, signature=None) + sig = self.signature_of(flow) + with self._lock: + is_new = sig not in self._records + if is_new: + self._records[sig] = EndpointRecord(signature=sig) + self._samples[sig] = [] + record = self._records[sig] + record.sample_count += 1 + record.query_params.update(self._query_keys(flow)) + samples = self._samples[sig] + if len(samples) < MAX_SAMPLES: + samples.append(flow) + if is_new and self._on_new is not None: + self._on_new(sig) + 
return IngestResult(in_scope=True, is_new=is_new, signature=sig) + + @staticmethod + def _query_keys(flow: CapturedFlow) -> set[str]: + return set(flow.query.keys()) + + def endpoints(self) -> list[EndpointRecord]: + with self._lock: + return list(self._records.values()) + + def samples(self, sig: Signature) -> list[CapturedFlow]: + with self._lock: + return list(self._samples.get(sig, [])) + + def get(self, sig: Signature) -> EndpointRecord | None: + with self._lock: + return self._records.get(sig) + + def search(self, query: str) -> list[EndpointRecord]: + q = query.lower() + with self._lock: + return [ + r for r in self._records.values() + if q in r.signature.path_template.lower() + or q in r.signature.method.lower() + ] +``` + +Note: the `urlsplit` import is reserved for proxy use and nothing in `store.py` calls it; keep it only if used — if pyright or ruff flags it unused here, remove it. (The capture addon, Task 9, splits URLs and imports `urlsplit` itself in `proxy.py`.) + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_store.py -q` +Expected: PASS (9 tests total in file). + +- [ ] **Step 5: Lint check** + +Run: `uv run ruff check src/auto_reverse/store.py` +Expected: clean (remove any unused import it reports). 
+ +- [ ] **Step 6: Commit** + +```bash +git add src/auto_reverse/store.py tests/test_store.py +git commit -m "feat: thread-safe FlowStore with signature dedup and samples" +``` + +--- + +## Task 5: Deterministic schema inference + +**Files:** +- Create: `src/auto_reverse/doc/__init__.py`, `src/auto_reverse/doc/schema.py` +- Test: `tests/test_schema.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_schema.py`: +```python +from auto_reverse.doc.schema import SchemaAccumulator + + +def test_single_object_schema(): + acc = SchemaAccumulator() + acc.add({"id": 1, "name": "Ada"}) + schema = acc.schema() + assert schema["type"] == "object" + assert set(schema["properties"]) == {"id", "name"} + + +def test_merge_widens_optional_fields(): + acc = SchemaAccumulator() + acc.add({"id": 1, "name": "Ada"}) + acc.add({"id": 2}) # name missing -> becomes optional + schema = acc.schema() + assert "id" in schema.get("required", []) + assert "name" not in schema.get("required", []) + + +def test_array_schema(): + acc = SchemaAccumulator() + acc.add([{"id": 1}, {"id": 2}]) + schema = acc.schema() + assert schema["type"] == "array" + assert schema["items"]["type"] == "object" + + +def test_empty_accumulator_returns_none(): + assert SchemaAccumulator().schema() is None +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_schema.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'auto_reverse.doc'`. + +- [ ] **Step 3: Implement schema wrapper** + +Create `src/auto_reverse/doc/__init__.py` (empty). 
+ +Create `src/auto_reverse/doc/schema.py`: +```python +from __future__ import annotations + +from typing import Any + +from genson import SchemaBuilder + + +class SchemaAccumulator: + """Accumulate JSON samples into a widening JSON Schema (genson-backed).""" + + def __init__(self) -> None: + self._builder = SchemaBuilder() + self._count = 0 + + def add(self, value: Any) -> None: + self._builder.add_object(value) + self._count += 1 + + def schema(self) -> dict[str, Any] | None: + if self._count == 0: + return None + result: dict[str, Any] = self._builder.to_schema() + result.pop("$schema", None) + return result +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_schema.py -q` +Expected: PASS (4 tests). + +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/doc/__init__.py src/auto_reverse/doc/schema.py tests/test_schema.py +git commit -m "feat: deterministic JSON schema inference (genson wrapper)" +``` + +--- + +## Task 6: OpenAPI assembly + +**Files:** +- Create: `src/auto_reverse/doc/openapi.py` +- Test: `tests/test_openapi.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_openapi.py`: +```python +from auto_reverse.doc.openapi import build_openapi +from auto_reverse.models import EndpointRecord, Signature + + +def _record(method: str, template: str, **kw) -> EndpointRecord: + rec = EndpointRecord(signature=Signature(method, "ex.com", template, "2xx")) + for k, v in kw.items(): + setattr(rec, k, v) + return rec + + +def test_builds_paths_and_methods(): + records = [ + _record("GET", "/api/users", summary="List users", + response_schema={"type": "array"}), + _record("POST", "/api/users", summary="Create user", + request_schema={"type": "object"}), + ] + spec = build_openapi(records, title="ex.com API") + assert spec["openapi"].startswith("3.") + assert spec["info"]["title"] == "ex.com API" + assert set(spec["paths"]["/api/users"]) == {"get", "post"} + assert spec["paths"]["/api/users"]["get"]["summary"] == "List 
users" + + +def test_path_param_declared_for_template(): + rec = _record("GET", "/api/users/{id}", summary="Get user") + spec = build_openapi([rec], title="x") + params = spec["paths"]["/api/users/{id}"]["get"]["parameters"] + assert any(p["name"] == "id" and p["in"] == "path" for p in params) + + +def test_request_body_included_when_schema_present(): + rec = _record("POST", "/api/x", request_schema={"type": "object"}) + op = build_openapi([rec], title="x")["paths"]["/api/x"]["post"] + assert op["requestBody"]["content"]["application/json"]["schema"] == {"type": "object"} +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_openapi.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'auto_reverse.doc.openapi'`. + +- [ ] **Step 3: Implement `build_openapi`** + +Create `src/auto_reverse/doc/openapi.py`: +```python +from __future__ import annotations + +import re +from typing import Any + +from auto_reverse.models import EndpointRecord + +_PARAM = re.compile(r"\{([^}]+)\}") + + +def _path_params(template: str) -> list[dict[str, Any]]: + return [ + {"name": name, "in": "path", "required": True, "schema": {"type": "string"}} + for name in _PARAM.findall(template) + ] + + +def _operation(rec: EndpointRecord) -> dict[str, Any]: + op: dict[str, Any] = {} + if rec.summary: + op["summary"] = rec.summary + if rec.description: + op["description"] = rec.description + if rec.tag: + op["tags"] = [rec.tag] + params = _path_params(rec.signature.path_template) + params += [ + {"name": q, "in": "query", "required": False, "schema": {"type": "string"}} + for q in sorted(rec.query_params) + ] + if params: + op["parameters"] = params + if rec.request_schema is not None: + op["requestBody"] = { + "content": {"application/json": {"schema": rec.request_schema}} + } + status = rec.signature.status_class.replace("x", "X") + response: dict[str, Any] = {"description": rec.summary or "Response"} + if rec.response_schema is not None: + response["content"] = 
{"application/json": {"schema": rec.response_schema}} + op["responses"] = {status[0] + "XX": response} + return op + + +def build_openapi(records: list[EndpointRecord], title: str) -> dict[str, Any]: + paths: dict[str, dict[str, Any]] = {} + for rec in records: + template = rec.signature.path_template + method = rec.signature.method.lower() + paths.setdefault(template, {})[method] = _operation(rec) + return { + "openapi": "3.1.0", + "info": {"title": title, "version": "0.0.0"}, + "paths": paths, + } +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_openapi.py -q` +Expected: PASS (3 tests). + +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/doc/openapi.py tests/test_openapi.py +git commit -m "feat: OpenAPI assembly from endpoint records" +``` + +--- + +## Task 7: Markdown rendering + +**Files:** +- Create: `src/auto_reverse/doc/markdown.py` +- Test: `tests/test_markdown.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_markdown.py`: +```python +from auto_reverse.doc.markdown import render_markdown +from auto_reverse.models import EndpointRecord, Signature + + +def _rec(method, template, **kw): + rec = EndpointRecord(signature=Signature(method, "ex.com", template, "2xx")) + for k, v in kw.items(): + setattr(rec, k, v) + return rec + + +def test_renders_heading_and_endpoints(): + md = render_markdown([_rec("GET", "/api/users", summary="List users")], title="ex.com API") + assert "# ex.com API" in md + assert "`GET /api/users`" in md + assert "List users" in md + + +def test_groups_by_tag(): + records = [ + _rec("GET", "/api/users", tag="Users"), + _rec("GET", "/api/cart", tag="Cart"), + ] + md = render_markdown(records, title="x") + assert "## Users" in md + assert "## Cart" in md + + +def test_untagged_go_under_general(): + md = render_markdown([_rec("GET", "/api/x")], title="x") + assert "## General" in md +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_markdown.py -q` 
+Expected: FAIL — module not found. + +- [ ] **Step 3: Implement `render_markdown`** + +Create `src/auto_reverse/doc/markdown.py`: +```python +from __future__ import annotations + +from collections import defaultdict + +from auto_reverse.models import EndpointRecord + + +def render_markdown(records: list[EndpointRecord], title: str) -> str: + groups: dict[str, list[EndpointRecord]] = defaultdict(list) + for rec in records: + groups[rec.tag or "General"].append(rec) + + lines = [f"# {title}", ""] + for tag in sorted(groups): + lines.append(f"## {tag}") + lines.append("") + for rec in sorted(groups[tag], key=lambda r: r.signature.path_template): + sig = rec.signature + lines.append(f"### `{sig.method} {sig.path_template}`") + if rec.summary: + lines.append(f"**{rec.summary}**") + if rec.description: + lines.append("") + lines.append(rec.description) + lines.append(f"\n_Seen {rec.sample_count} time(s)._") + lines.append("") + return "\n".join(lines) +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_markdown.py -q` +Expected: PASS (3 tests). 
+ +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/doc/markdown.py tests/test_markdown.py +git commit -m "feat: markdown API documentation rendering" +``` + +--- + +## Task 8: Config and pluggable auth stub + +**Files:** +- Create: `src/auto_reverse/config.py` +- Test: `tests/test_config.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_config.py`: +```python +import pytest + +from auto_reverse.config import Config, ManualPauseAuth, NoAuth, make_auth + + +def test_config_derives_target_host(): + cfg = Config(target_url="https://app.example.com/dashboard") + assert cfg.target_host == "app.example.com" + + +def test_config_scope_hosts_includes_target_plus_extra(): + cfg = Config(target_url="https://app.example.com", scope_hosts={"api.example.com"}) + assert cfg.all_scope_hosts() == {"app.example.com", "api.example.com"} + + +def test_default_model(): + assert Config(target_url="https://x.com").model == "claude-opus-4-8" + + +def test_make_auth_returns_strategy(): + assert isinstance(make_auth("manual"), ManualPauseAuth) + assert isinstance(make_auth("none"), NoAuth) + + +def test_make_auth_unknown_raises(): + with pytest.raises(ValueError): + make_auth("oauth-magic") +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_config.py -q` +Expected: FAIL — module not found. 
+ +- [ ] **Step 3: Implement `config.py`** + +Create `src/auto_reverse/config.py`: +```python +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Protocol +from urllib.parse import urlsplit + + +@dataclass +class Config: + target_url: str + out_dir: str | None = None + proxy_port: int = 8080 + headless: bool = False + profile: str | None = None + gen_client: bool = False + model: str = "claude-opus-4-8" + scope_hosts: set[str] = field(default_factory=set) + no_llm_doc: bool = False + resume: str | None = None + auth: str = "manual" + + @property + def target_host(self) -> str: + return urlsplit(self.target_url).hostname or "" + + def all_scope_hosts(self) -> set[str]: + return {self.target_host, *self.scope_hosts} + + +class AuthStrategy(Protocol): + name: str + + async def authenticate(self, page: object) -> None: + """Prepare an authenticated session on the given Playwright page.""" + ... + + +class NoAuth: + name = "none" + + async def authenticate(self, page: object) -> None: + return None + + +class ManualPauseAuth: + """Default stub: pause so the human can log in by hand, then continue.""" + + name = "manual" + + async def authenticate(self, page: object) -> None: + # Implemented against the real page in browser.py wiring; the stub + # simply records intent. The REPL prompts the user to log in and + # press enter before autonomous exploration begins. + return None + + +def make_auth(name: str) -> AuthStrategy: + strategies: dict[str, AuthStrategy] = {"manual": ManualPauseAuth(), "none": NoAuth()} + if name not in strategies: + raise ValueError(f"unknown auth strategy: {name!r}") + return strategies[name] +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_config.py -q` +Expected: PASS (5 tests). 
+ +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/config.py tests/test_config.py +git commit -m "feat: Config and pluggable auth stub (manual/none)" +``` + +--- + +## Task 9: Embedded proxy + capture addon + +**Files:** +- Create: `src/auto_reverse/proxy.py` +- Test: `tests/test_proxy.py` + +The capture addon converts a mitmproxy `HTTPFlow` into our `CapturedFlow`, feeds the store, and appends raw flows to an archive. We unit-test the pure conversion (`flow_from_mitm`) with a fake mitmproxy-shaped object so no proxy needs to run; the live proxy is exercised in the E2E smoke (Task 15). + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_proxy.py`: +```python +from types import SimpleNamespace + +from auto_reverse.proxy import flow_from_mitm + + +def _fake_mitm_flow(): + request = SimpleNamespace( + method="POST", pretty_host="ex.com", path="/api/users?role=admin", + headers={"content-type": "application/json"}, content=b'{"name": "Ada"}', + query=SimpleNamespace(fields=[("role", "admin")]), + ) + response = SimpleNamespace( + status_code=201, headers={"content-type": "application/json"}, + content=b'{"id": 1}', + ) + return SimpleNamespace(request=request, response=response, timestamp_start=1.5) + + +def test_flow_from_mitm_maps_fields(): + captured = flow_from_mitm(_fake_mitm_flow()) + assert captured.method == "POST" + assert captured.host == "ex.com" + assert captured.path == "/api/users" + assert captured.query == {"role": ["admin"]} + assert captured.status == 201 + assert captured.request_json() == {"name": "Ada"} + assert captured.response_json() == {"id": 1} + + +def test_flow_from_mitm_handles_missing_response(): + flow = _fake_mitm_flow() + flow.response = None + captured = flow_from_mitm(flow) + assert captured is None +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_proxy.py -q` +Expected: FAIL — module not found. 
+ +- [ ] **Step 3: Implement `proxy.py`** + +Create `src/auto_reverse/proxy.py`: +```python +from __future__ import annotations + +import asyncio +import threading +from pathlib import Path +from typing import Any +from urllib.parse import urlsplit + +from auto_reverse.models import CapturedFlow +from auto_reverse.store import FlowStore + + +def flow_from_mitm(flow: Any) -> CapturedFlow | None: + """Convert a mitmproxy HTTPFlow (or test double) into a CapturedFlow.""" + if flow.response is None: + return None + req = flow.request + query: dict[str, list[str]] = {} + for key, value in req.query.fields: + query.setdefault(key, []).append(value) + return CapturedFlow( + method=req.method, + host=req.pretty_host, + path=urlsplit(req.path).path, + query=query, + req_headers={k.lower(): v for k, v in dict(req.headers).items()}, + req_body=req.content, + status=flow.response.status_code, + resp_headers={k.lower(): v for k, v in dict(flow.response.headers).items()}, + resp_body=flow.response.content, + timestamp=getattr(flow, "timestamp_start", 0.0) or 0.0, + ) + + +class CaptureAddon: + """mitmproxy addon: on each response, ingest into the store + archive raw.""" + + def __init__(self, store: FlowStore, archive_path: Path) -> None: + self._store = store + self._archive = archive_path + self._archive.parent.mkdir(parents=True, exist_ok=True) + + def response(self, flow: Any) -> None: # mitmproxy hook name + captured = flow_from_mitm(flow) + if captured is None: + return + self._store.ingest(captured) + with self._archive.open("a") as fh: + fh.write(f"{captured.method} {captured.host}{captured.path} {captured.status}\n") + + +class ProxyServer: + """Run mitmproxy's DumpMaster in a dedicated thread with its own loop.""" + + def __init__(self, store: FlowStore, archive_path: Path, port: int) -> None: + self._store = store + self._archive_path = archive_path + self._port = port + self._master: Any = None + self._loop: asyncio.AbstractEventLoop | None = None + self._thread: 
threading.Thread | None = None + + @property + def port(self) -> int: + return self._port + + def start(self) -> None: + ready = threading.Event() + self._thread = threading.Thread(target=self._run, args=(ready,), daemon=True) + self._thread.start() + ready.wait(timeout=10) + + def _run(self, ready: threading.Event) -> None: + from mitmproxy.options import Options + from mitmproxy.tools.dump import DumpMaster + + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + opts = Options(listen_host="127.0.0.1", listen_port=self._port) + self._master = DumpMaster(opts, with_termlog=False, with_dumper=False) + self._master.addons.add(CaptureAddon(self._store, self._archive_path)) + + async def _serve() -> None: + ready.set() + await self._master.run() + + self._loop.run_until_complete(_serve()) + + def stop(self) -> None: + if self._master is not None and self._loop is not None: + self._loop.call_soon_threadsafe(self._master.shutdown) +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_proxy.py -q` +Expected: PASS (2 tests). 
+ +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/proxy.py tests/test_proxy.py +git commit -m "feat: embedded mitmproxy capture addon and proxy server" +``` + +--- + +## Task 10: Doc engine (event consumer) + +**Files:** +- Create: `src/auto_reverse/doc/engine.py` +- Test: `tests/test_engine.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_engine.py`: +```python +from pathlib import Path + +from auto_reverse.doc.engine import DocEngine +from auto_reverse.models import CapturedFlow +from auto_reverse.store import FlowStore, ScopeFilter + + +def _flow(path: str, resp: bytes) -> CapturedFlow: + return CapturedFlow( + method="GET", host="ex.com", path=path, query={}, req_headers={}, + req_body=None, status=200, + resp_headers={"content-type": "application/json"}, resp_body=resp, + timestamp=0.0, + ) + + +def test_engine_writes_spec_and_markdown(tmp_path: Path): + store = FlowStore(ScopeFilter(target_hosts={"ex.com"})) + engine = DocEngine(store, out_dir=tmp_path, title="ex.com API", use_llm=False) + store.ingest(_flow("/api/users", b'[{"id": 1}]')) + sig = store.endpoints()[0].signature + engine.document(sig) + spec = (tmp_path / "openapi.yaml").read_text() + assert "/api/users" in spec + assert (tmp_path / "API.md").read_text().startswith("# ex.com API") + + +def test_engine_infers_response_schema(tmp_path: Path): + store = FlowStore(ScopeFilter(target_hosts={"ex.com"})) + engine = DocEngine(store, out_dir=tmp_path, title="x", use_llm=False) + store.ingest(_flow("/api/users", b'[{"id": 1, "name": "Ada"}]')) + sig = store.endpoints()[0].signature + engine.document(sig) + rec = store.get(sig) + assert rec is not None + assert rec.response_schema is not None + assert rec.response_schema["type"] == "array" +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_engine.py -q` +Expected: FAIL — module not found. 
+ +- [ ] **Step 3: Implement `engine.py`** + +Create `src/auto_reverse/doc/engine.py`: +```python +from __future__ import annotations + +import threading +from pathlib import Path + +import yaml # provided transitively by mitmproxy (ruamel/pyyaml); see note + +from auto_reverse.doc.markdown import render_markdown +from auto_reverse.doc.openapi import build_openapi +from auto_reverse.doc.schema import SchemaAccumulator +from auto_reverse.models import Signature +from auto_reverse.store import FlowStore + + +class DocEngine: + """Turn a new endpoint signature into inferred schemas + written outputs.""" + + def __init__( + self, store: FlowStore, out_dir: Path, title: str, use_llm: bool = True + ) -> None: + self._store = store + self._out = out_dir + self._title = title + self._use_llm = use_llm + self._lock = threading.Lock() + self._out.mkdir(parents=True, exist_ok=True) + + def document(self, sig: Signature) -> None: + record = self._store.get(sig) + if record is None: + return + req_acc = SchemaAccumulator() + resp_acc = SchemaAccumulator() + for flow in self._store.samples(sig): + rj = flow.request_json() + if rj is not None: + req_acc.add(rj) + sj = flow.response_json() + if sj is not None: + resp_acc.add(sj) + record.request_schema = req_acc.schema() + record.response_schema = resp_acc.schema() + if not record.summary: + record.summary = f"{sig.method} {sig.path_template}" + record.documented = True + self._write() + + def _write(self) -> None: + with self._lock: + records = self._store.endpoints() + spec = build_openapi(records, title=self._title) + (self._out / "openapi.yaml").write_text(yaml.safe_dump(spec, sort_keys=False)) + (self._out / "API.md").write_text(render_markdown(records, title=self._title)) +``` + +**Note on `yaml`:** if `import yaml` fails (PyYAML not present transitively), add it explicitly: `uv add pyyaml`, then re-run. Do this in Step 4 if needed. 
+ +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_engine.py -q` +Expected: PASS (2 tests). If `ModuleNotFoundError: yaml`, run `uv add pyyaml` and re-run. + +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/doc/engine.py tests/test_engine.py pyproject.toml uv.lock +git commit -m "feat: doc engine writes openapi.yaml and API.md from samples" +``` + +--- + +## Task 11: Browser wrapper + +**Files:** +- Create: `src/auto_reverse/browser.py` +- Test: `tests/test_browser.py` + +The browser wrapper uses Playwright's sync API (runs cleanly in a worker thread, separate from the proxy's asyncio loop). The test is guarded to skip when browsers are not installed, and routes through the live fixture site (no proxy, direct) to verify navigation + snapshot. + +- [ ] **Step 1: Write failing test** + +Create `tests/test_browser.py`: +```python +import pytest + +playwright = pytest.importorskip("playwright.sync_api") + +from auto_reverse.browser import Browser # noqa: E402 + + +@pytest.fixture +def browser(): + try: + b = Browser(proxy_port=None, headless=True) + b.start() + except Exception as exc: # browser binary missing, etc. + pytest.skip(f"browser unavailable: {exc}") + yield b + b.stop() + + +def test_navigate_and_snapshot(browser, fixture_site): + browser.navigate(fixture_site + "/") + snap = browser.snapshot() + assert snap["url"].endswith("/") + assert "title" in snap +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_browser.py -q` +Expected: FAIL — module not found (or SKIP if Playwright import missing). A skip here is acceptable; the implementation step still applies. 
+ +- [ ] **Step 3: Implement `browser.py`** + +Create `src/auto_reverse/browser.py`: +```python +from __future__ import annotations + +from typing import Any + + +class Browser: + """Headed (or headless) Playwright Chromium, optionally routed via proxy.""" + + def __init__(self, proxy_port: int | None, headless: bool = False) -> None: + self._proxy_port = proxy_port + self._headless = headless + self._pw: Any = None + self._browser: Any = None + self._page: Any = None + + def start(self) -> None: + from playwright.sync_api import sync_playwright + + self._pw = sync_playwright().start() + launch_kwargs: dict[str, Any] = { + "headless": self._headless, + "args": ["--ignore-certificate-errors"], + } + if self._proxy_port is not None: + launch_kwargs["proxy"] = {"server": f"http://127.0.0.1:{self._proxy_port}"} + self._browser = self._pw.chromium.launch(**launch_kwargs) + context = self._browser.new_context(ignore_https_errors=True) + self._page = context.new_page() + + def navigate(self, url: str) -> dict[str, Any]: + self._page.goto(url, wait_until="networkidle") + return self.snapshot() + + def click(self, selector: str) -> dict[str, Any]: + self._page.click(selector, timeout=5000) + self._page.wait_for_load_state("networkidle") + return self.snapshot() + + def type_text(self, selector: str, text: str) -> dict[str, Any]: + self._page.fill(selector, text) + return self.snapshot() + + def snapshot(self) -> dict[str, Any]: + """Compact view for the agent: url, title, and visible interactive elements.""" + elements = self._page.eval_on_selector_all( + "a, button, input, [role=button], [role=link]", + """els => els.slice(0, 40).map(e => ({ + tag: e.tagName.toLowerCase(), + text: (e.innerText || e.value || e.getAttribute('aria-label') || '').slice(0, 60), + id: e.id || null, + }))""", + ) + return { + "url": self._page.url, + "title": self._page.title(), + "elements": elements, + } + + def pause_for_human(self) -> None: + """Surface the headed browser for manual control 
(Playwright inspector).""" + self._page.pause() + + def stop(self) -> None: + if self._browser is not None: + self._browser.close() + if self._pw is not None: + self._pw.stop() +``` + +- [ ] **Step 4: Run to verify pass (or skip)** + +Run: `uv run pytest tests/test_browser.py -q` +Expected: PASS if Chromium is installed; SKIP otherwise. Both are acceptable to proceed. + +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/browser.py tests/test_browser.py +git commit -m "feat: Playwright browser wrapper with compact snapshot" +``` + +--- + +## Task 12: Tool definitions and handlers + +**Files:** +- Create: `src/auto_reverse/tools/__init__.py`, `src/auto_reverse/tools/browser_tools.py`, `src/auto_reverse/tools/flows_tools.py`, `src/auto_reverse/tools/doc_tools.py` +- Test: `tests/test_tools.py` + +Tools are plain callables plus an Anthropic tool schema. Each handler takes a JSON-able `input` dict and returns a JSON-able result. The registry maps tool name → (schema, handler). Browser handlers use a fake browser in tests. 
+ +- [ ] **Step 1: Write failing tests** + +Create `tests/test_tools.py`: +```python +from auto_reverse.doc.engine import DocEngine +from auto_reverse.models import CapturedFlow +from auto_reverse.store import FlowStore, ScopeFilter +from auto_reverse.tools import build_registry + + +class FakeBrowser: + def navigate(self, url): + return {"url": url, "title": "T", "elements": []} + + def click(self, selector): + return {"url": "u", "title": "T", "elements": []} + + def type_text(self, selector, text): + return {"url": "u", "title": "T", "elements": []} + + def snapshot(self): + return {"url": "u", "title": "T", "elements": []} + + +def _store_with_endpoint(tmp_path): + store = FlowStore(ScopeFilter(target_hosts={"ex.com"})) + store.ingest(CapturedFlow( + method="GET", host="ex.com", path="/api/users", query={}, req_headers={}, + req_body=None, status=200, + resp_headers={"content-type": "application/json"}, resp_body=b"[]", + timestamp=0.0, + )) + engine = DocEngine(store, out_dir=tmp_path, title="x", use_llm=False) + return store, engine + + +def test_registry_has_expected_tools(tmp_path): + store, engine = _store_with_endpoint(tmp_path) + reg = build_registry(FakeBrowser(), store, engine) + names = {schema["name"] for schema, _ in reg.values()} + assert {"browser_navigate", "browser_click", "flows_search", "doc_document"} <= names + + +def test_flows_search_handler_returns_matches(tmp_path): + store, engine = _store_with_endpoint(tmp_path) + reg = build_registry(FakeBrowser(), store, engine) + _, handler = reg["flows_search"] + result = handler({"query": "users"}) + assert any("/api/users" in ep["path"] for ep in result["endpoints"]) + + +def test_browser_navigate_handler(tmp_path): + store, engine = _store_with_endpoint(tmp_path) + reg = build_registry(FakeBrowser(), store, engine) + _, handler = reg["browser_navigate"] + result = handler({"url": "http://x"}) + assert result["url"] == "http://x" +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest 
tests/test_tools.py -q` +Expected: FAIL — module not found. + +- [ ] **Step 3: Implement tool modules** + +Create `src/auto_reverse/tools/browser_tools.py`: +```python +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +Handler = Callable[[dict[str, Any]], dict[str, Any]] + + +def browser_tools(browser: Any) -> dict[str, tuple[dict[str, Any], Handler]]: + return { + "browser_navigate": ( + { + "name": "browser_navigate", + "description": "Navigate the browser to a URL and return a page snapshot.", + "input_schema": { + "type": "object", + "properties": {"url": {"type": "string"}}, + "required": ["url"], + }, + }, + lambda inp: browser.navigate(inp["url"]), + ), + "browser_click": ( + { + "name": "browser_click", + "description": "Click an element by CSS selector; returns a new snapshot.", + "input_schema": { + "type": "object", + "properties": {"selector": {"type": "string"}}, + "required": ["selector"], + }, + }, + lambda inp: browser.click(inp["selector"]), + ), + "browser_type": ( + { + "name": "browser_type", + "description": "Fill a form field (CSS selector) with text.", + "input_schema": { + "type": "object", + "properties": { + "selector": {"type": "string"}, + "text": {"type": "string"}, + }, + "required": ["selector", "text"], + }, + }, + lambda inp: browser.type_text(inp["selector"], inp["text"]), + ), + "browser_snapshot": ( + { + "name": "browser_snapshot", + "description": "Return the current page snapshot without acting.", + "input_schema": {"type": "object", "properties": {}}, + }, + lambda inp: browser.snapshot(), + ), + } +``` + +Create `src/auto_reverse/tools/flows_tools.py`: +```python +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +from auto_reverse.models import EndpointRecord +from auto_reverse.store import FlowStore + +Handler = Callable[[dict[str, Any]], dict[str, Any]] + + +def _record_view(rec: EndpointRecord) -> dict[str, Any]: + 
return { + "method": rec.signature.method, + "path": rec.signature.path_template, + "status": rec.signature.status_class, + "samples": rec.sample_count, + "documented": rec.documented, + } + + +def flows_tools(store: FlowStore) -> dict[str, tuple[dict[str, Any], Handler]]: + def search(inp: dict[str, Any]) -> dict[str, Any]: + query = inp.get("query", "") + records = store.search(query) if query else store.endpoints() + return {"endpoints": [_record_view(r) for r in records]} + + return { + "flows_search": ( + { + "name": "flows_search", + "description": "List/search discovered API endpoints captured so far.", + "input_schema": { + "type": "object", + "properties": {"query": {"type": "string"}}, + }, + }, + search, + ), + } +``` + +Create `src/auto_reverse/tools/doc_tools.py`: +```python +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +from auto_reverse.doc.engine import DocEngine +from auto_reverse.store import FlowStore + +Handler = Callable[[dict[str, Any]], dict[str, Any]] + + +def doc_tools(store: FlowStore, engine: DocEngine) -> dict[str, tuple[dict[str, Any], Handler]]: + def document(inp: dict[str, Any]) -> dict[str, Any]: + path = inp["path_template"] + for rec in store.endpoints(): + if rec.signature.path_template == path: + if inp.get("summary"): + rec.summary = inp["summary"] + if inp.get("description"): + rec.description = inp["description"] + if inp.get("tag"): + rec.tag = inp["tag"] + engine.document(rec.signature) + return {"documented": path} + return {"error": f"no endpoint matching {path}"} + + return { + "doc_document": ( + { + "name": "doc_document", + "description": ( + "Enrich and (re)write docs for an endpoint by path template, " + "optionally setting a human summary/description/tag." 
+ ), + "input_schema": { + "type": "object", + "properties": { + "path_template": {"type": "string"}, + "summary": {"type": "string"}, + "description": {"type": "string"}, + "tag": {"type": "string"}, + }, + "required": ["path_template"], + }, + }, + document, + ), + } +``` + +Create `src/auto_reverse/tools/__init__.py`: +```python +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +from auto_reverse.doc.engine import DocEngine +from auto_reverse.store import FlowStore +from auto_reverse.tools.browser_tools import browser_tools +from auto_reverse.tools.doc_tools import doc_tools +from auto_reverse.tools.flows_tools import flows_tools + +Handler = Callable[[dict[str, Any]], dict[str, Any]] +Registry = dict[str, tuple[dict[str, Any], Handler]] + + +def build_registry(browser: Any, store: FlowStore, engine: DocEngine) -> Registry: + registry: Registry = {} + registry.update(browser_tools(browser)) + registry.update(flows_tools(store)) + registry.update(doc_tools(store, engine)) + return registry + + +def tool_schemas(registry: Registry) -> list[dict[str, Any]]: + return [schema for schema, _ in registry.values()] +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_tools.py -q` +Expected: PASS (3 tests). + +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/tools/ tests/test_tools.py +git commit -m "feat: agent tool definitions (browser, flows, doc) and registry" +``` + +--- + +## Task 13: Agent (Claude tool-use loop) + +**Files:** +- Create: `src/auto_reverse/agent.py` +- Test: `tests/test_agent.py` + +The agent runs the tool-use loop: send messages + tools to Claude, execute any `tool_use` blocks via the registry, feed `tool_result`s back, repeat until the model returns text with no tool calls. Tested with a fake client returning scripted responses. 
+ +- [ ] **Step 1: Write failing tests** + +Create `tests/test_agent.py`: +```python +from types import SimpleNamespace + +from auto_reverse.agent import Agent + + +def _text_block(text): + return SimpleNamespace(type="text", text=text) + + +def _tool_use(tool_id, name, inp): + return SimpleNamespace(type="tool_use", id=tool_id, name=name, input=inp) + + +class FakeMessages: + def __init__(self, scripted): + self._scripted = list(scripted) + self.calls = [] + + def create(self, **kwargs): + self.calls.append(kwargs) + content = self._scripted.pop(0) + stop = "tool_use" if any(b.type == "tool_use" for b in content) else "end_turn" + return SimpleNamespace(content=content, stop_reason=stop, role="assistant") + + +class FakeClient: + def __init__(self, scripted): + self.messages = FakeMessages(scripted) + + +def test_agent_executes_tool_then_returns_text(): + scripted = [ + [_tool_use("t1", "flows_search", {"query": "users"})], + [_text_block("Found the users endpoint.")], + ] + client = FakeClient(scripted) + registry = { + "flows_search": ( + {"name": "flows_search", "input_schema": {"type": "object"}}, + lambda inp: {"endpoints": [{"path": "/api/users"}]}, + ) + } + agent = Agent(client, registry, model="m", system="s") + reply = agent.run_turn("map users") + assert "users endpoint" in reply + # the tool result was fed back: second create call has >= 3 messages + assert len(client.messages.calls[1]["messages"]) >= 3 + + +def test_agent_plain_text_no_tools(): + client = FakeClient([[_text_block("Hello!")]]) + agent = Agent(client, {}, model="m", system="s") + assert agent.run_turn("hi") == "Hello!" +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_agent.py -q` +Expected: FAIL — module not found. 
+ +- [ ] **Step 3: Implement `agent.py`** + +Create `src/auto_reverse/agent.py`: +```python +from __future__ import annotations + +from typing import Any + +from auto_reverse.tools import Registry, tool_schemas + +MAX_ITERATIONS = 25 + + +class Agent: + """Conversational Claude tool-use loop driving browser/flows/doc tools.""" + + def __init__(self, client: Any, registry: Registry, model: str, system: str) -> None: + self._client = client + self._registry = registry + self._model = model + self._system = system + self._messages: list[dict[str, Any]] = [] + + def run_turn(self, user_message: str) -> str: + self._messages.append({"role": "user", "content": user_message}) + for _ in range(MAX_ITERATIONS): + response = self._client.messages.create( + model=self._model, + max_tokens=4096, + system=self._system, + tools=tool_schemas(self._registry), + messages=self._messages, + ) + self._messages.append( + {"role": "assistant", "content": self._serialize(response.content)} + ) + tool_uses = [b for b in response.content if b.type == "tool_use"] + if not tool_uses: + return self._text_of(response.content) + results = [] + for block in tool_uses: + results.append(self._run_tool(block)) + self._messages.append({"role": "user", "content": results}) + return "(stopped: reached max tool iterations)" + + def _run_tool(self, block: Any) -> dict[str, Any]: + entry = self._registry.get(block.name) + if entry is None: + output: Any = {"error": f"unknown tool {block.name}"} + else: + _, handler = entry + try: + output = handler(block.input) + except Exception as exc: # tool failure -> structured error, agent re-plans + output = {"error": str(exc)} + return { + "type": "tool_result", + "tool_use_id": block.id, + "content": __import__("json").dumps(output), + } + + @staticmethod + def _serialize(content: list[Any]) -> list[dict[str, Any]]: + out: list[dict[str, Any]] = [] + for b in content: + if b.type == "text": + out.append({"type": "text", "text": b.text}) + elif b.type == 
"tool_use": + out.append( + {"type": "tool_use", "id": b.id, "name": b.name, "input": b.input} + ) + return out + + @staticmethod + def _text_of(content: list[Any]) -> str: + return "".join(b.text for b in content if b.type == "text").strip() +``` + +Note: replace `__import__("json")` with a top-level `import json` and `json.dumps(output)` — written inline here only to keep the snippet self-contained. Add `import json` to the imports and use `json.dumps(output)`. + +- [ ] **Step 4: Apply the json import cleanup** + +Edit `src/auto_reverse/agent.py`: add `import json` under `from __future__`, and change the `content` line to `"content": json.dumps(output),`. + +- [ ] **Step 5: Run to verify pass** + +Run: `uv run pytest tests/test_agent.py -q` +Expected: PASS (2 tests). + +- [ ] **Step 6: Commit** + +```bash +git add src/auto_reverse/agent.py tests/test_agent.py +git commit -m "feat: Claude tool-use agent loop with graceful tool-error handling" +``` + +--- + +## Task 14: Optional client generation + +**Files:** +- Create: `src/auto_reverse/doc/client.py` +- Test: `tests/test_client.py` (unit test on the command builder; actual client generation is covered by manual verification) + +- [ ] **Step 1: Write failing test** + +Create `tests/test_client.py`: +```python +from pathlib import Path + +from auto_reverse.doc.client import client_gen_command + + +def test_command_references_spec_and_out(tmp_path: Path): + spec = tmp_path / "openapi.yaml" + out = tmp_path / "client" + cmd = client_gen_command(spec, out) + assert str(spec) in cmd + assert "openapi-python-client" in " ".join(cmd) +``` + +- [ ] **Step 2: Run to verify fail** + +Run: `uv run pytest tests/test_client.py -q` +Expected: FAIL — module not found. 
+ +- [ ] **Step 3: Implement `client.py`** + +Create `src/auto_reverse/doc/client.py`: +```python +from __future__ import annotations + +import subprocess +from pathlib import Path + + +def client_gen_command(spec_path: Path, out_dir: Path) -> list[str]: + return [ + "uvx", + "openapi-python-client", + "generate", + "--path", + str(spec_path), + "--output-path", + str(out_dir), + ] + + +def generate_client(spec_path: Path, out_dir: Path) -> bool: + """Run the deterministic codegen; returns True on success.""" + try: + subprocess.run(client_gen_command(spec_path, out_dir), check=True) + return True + except (subprocess.CalledProcessError, FileNotFoundError): + return False +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `uv run pytest tests/test_client.py -q` +Expected: PASS (1 test). + +- [ ] **Step 5: Commit** + +```bash +git add src/auto_reverse/doc/client.py tests/test_client.py +git commit -m "feat: optional typed client generation from openapi spec" +``` + +--- + +## Task 15: REPL and CLI wiring + +**Files:** +- Create: `src/auto_reverse/repl.py`, `src/auto_reverse/cli.py` +- Modify: `src/auto_reverse/__init__.py` +- Test: `tests/test_e2e_smoke.py` + +The REPL handles `/meta-commands` locally and routes plain text to the agent. The CLI parses args, builds the object graph, wires the `on_new_signature` callback to the doc engine on a worker thread, starts the proxy, launches the browser, and runs the REPL. End-to-end smoke drives the fixture site through the proxy and asserts an endpoint lands in the spec. 
+ +- [ ] **Step 1: Write failing E2E smoke test** + +Create `tests/test_e2e_smoke.py`: +```python +import threading +from pathlib import Path + +import pytest + +playwright = pytest.importorskip("playwright.sync_api") + +from auto_reverse.browser import Browser # noqa: E402 +from auto_reverse.doc.engine import DocEngine # noqa: E402 +from auto_reverse.proxy import ProxyServer # noqa: E402 +from auto_reverse.store import FlowStore, ScopeFilter # noqa: E402 + + +def test_capture_to_spec_end_to_end(tmp_path: Path, fixture_site: str): + from urllib.parse import urlsplit + + host = urlsplit(fixture_site).hostname + port = urlsplit(fixture_site).port + scope = ScopeFilter(target_hosts={f"{host}:{port}", host}) + + engine_holder: dict = {} + + def on_new(sig): + engine_holder["engine"].document(sig) + + store = FlowStore(scope, on_new_signature=on_new) + engine = DocEngine(store, out_dir=tmp_path, title="fixture", use_llm=False) + engine_holder["engine"] = engine + + proxy = ProxyServer(store, archive_path=tmp_path / "archive.log", port=0) + try: + proxy.start() + except Exception as exc: + pytest.skip(f"proxy unavailable: {exc}") + + try: + browser = Browser(proxy_port=proxy.port, headless=True) + browser.start() + except Exception as exc: + proxy.stop() + pytest.skip(f"browser unavailable: {exc}") + + try: + browser.navigate(fixture_site + "/") # triggers fetch('/api/users') + # allow capture to settle + threading.Event().wait(1.0) + assert any( + "/api/users" in r.signature.path_template for r in store.endpoints() + ) + finally: + browser.stop() + proxy.stop() +``` + +Note: as written in Task 9, `ProxyServer.port` returns the configured value, so with `ProxyServer(port=0)` it reports `0` rather than the OS-assigned port and the browser would be pointed at port 0. Either make `ProxyServer` expose the actual bound port after startup, or — if mitmproxy does not support port 0 ephemeral selection — pick a fixed high port (e.g. 18080) in this test and pass it to both proxy and browser. + +- [ ] **Step 2: Run to verify fail or skip** + +Run: `uv run pytest tests/test_e2e_smoke.py -q` +Expected: FAIL (imports resolve but modules incomplete) or SKIP (no browser/proxy). 
Acceptable to proceed; this test is the integration safety net. + +- [ ] **Step 3: Implement `repl.py`** + +Create `src/auto_reverse/repl.py`: +```python +from __future__ import annotations + +from auto_reverse.agent import Agent +from auto_reverse.store import FlowStore + +HELP = """\ +Commands: +