auto-reverse/docs/superpowers/plans/2026-05-31-auto-reverse.md
tomatocream a484ea4f8e docs: add auto-reverse implementation plan
16 bite-sized TDD tasks, bottom-up: core models/store/schema/doc,
then proxy/browser integration, then tools/agent/REPL/CLI, plus
end-to-end capture-to-spec smoke test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 23:26:42 +08:00

auto-reverse Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build auto-reverse, a conversational CLI that reverse-engineers a website's API by driving a headed browser with an LLM while an embedded mitmproxy captures and documents real traffic into a live OpenAPI spec + markdown.

Architecture: Single free-threaded Python 3.14 process. A main-thread Claude tool-use agent ("the brain") acts on a Playwright headed browser and queries/commands a thread-safe Flow Store; an embedded mitmproxy DumpMaster runs in its own thread capturing every flow; a Doc Worker thread turns new endpoint signatures into OpenAPI/markdown (deterministic schema inference + LLM enrichment only on novelty). Build bottom-up: pure core (store/schema/doc) first, then I/O integration (proxy/browser), then tools/agent/REPL/CLI.
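The capture-to-documentation hand-off described above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the plan's implementation: the names `capture`, `doc_worker`, and `new_signatures` are hypothetical stand-ins for the mitmproxy addon, the Doc Worker, and the new-signature event channel, and plain path strings stand in for signatures.

```python
from __future__ import annotations

import queue
import threading

# Hypothetical stand-ins: the plan itself uses FlowStore, a mitmproxy addon,
# and a DocEngine. None of those are used in this sketch.
new_signatures: queue.Queue[str | None] = queue.Queue()
seen: set[str] = set()
seen_lock = threading.Lock()
documented: list[str] = []


def capture(paths: list[str]) -> None:
    """Proxy-thread role: record each flow, enqueue only unseen signatures."""
    for path in paths:
        with seen_lock:
            is_new = path not in seen
            seen.add(path)
        if is_new:
            new_signatures.put(path)


def doc_worker() -> None:
    """Doc-worker role: drain the queue until the None sentinel arrives."""
    while (sig := new_signatures.get()) is not None:
        documented.append(f"documented {sig}")


worker = threading.Thread(target=doc_worker)
producer = threading.Thread(target=capture, args=(["/a", "/b", "/a"],))
worker.start()
producer.start()
producer.join()
new_signatures.put(None)  # sentinel: tell the worker to stop
worker.join()
print(documented)  # ['documented /a', 'documented /b']
```

The repeated `/a` never reaches the worker, which mirrors the "LLM enrichment only on novelty" idea: expensive work is triggered per new signature, not per flow.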

Tech Stack: Python 3.14 (free-threaded), uv, Playwright, mitmproxy, anthropic SDK, genson (schema inference), pytest + pytest-asyncio. Default model claude-opus-4-8.

Spec: docs/superpowers/specs/2026-05-31-auto-reverse-design.md


File Structure

src/auto_reverse/
  __init__.py        # main() entrypoint (delegates to cli.run)
  cli.py             # arg parsing, wiring, thread lifecycle
  config.py          # Config dataclass + pluggable auth stub
  models.py          # CapturedFlow, Signature, EndpointRecord, helpers
  store.py           # FlowStore (thread-safe), ScopeFilter, path templating
  proxy.py           # embedded mitmproxy master + CaptureAddon + archive
  browser.py         # Playwright headed browser wrapper + take-over
  agent.py           # Claude tool-use loop
  repl.py            # chat loop + /meta-commands + take-over
  tools/
    __init__.py      # tool registry assembly
    browser_tools.py # browser.* tool schemas + handlers
    flows_tools.py   # flows.* tool schemas + handlers
    doc_tools.py     # doc.* tool schemas + handlers
  doc/
    __init__.py
    schema.py        # deterministic JSON Schema inference + merge (genson wrap)
    engine.py        # DocEngine: consumes new-signature events, writes outputs
    openapi.py       # OpenAPI assembly from EndpointRecords
    markdown.py      # human-readable API.md rendering
    client.py        # optional --gen-client codegen

tests/
  conftest.py        # fixtures: fixture HTTP site, sample flows
  fixture_site.py    # stdlib http.server JSON app for integration tests
  test_models.py
  test_store.py
  test_scope.py
  test_schema.py
  test_openapi.py
  test_markdown.py
  test_config.py
  test_proxy.py
  test_browser.py        # requires playwright browsers; skipped if absent
  test_agent.py          # mocked anthropic client
  test_tools.py
  test_e2e_smoke.py      # fixture site end-to-end; skipped if browsers absent

Task 0: Dependencies and test scaffolding

Files:

  • Modify: pyproject.toml

  • Create: tests/__init__.py, tests/conftest.py, tests/fixture_site.py

  • Step 1: Add runtime + dev dependencies

Run:

cd /home/df/projects/reverse_engineer
uv add playwright mitmproxy anthropic genson
uv add --dev pytest pytest-asyncio

Expected: pyproject.toml gains a populated dependencies list and a [dependency-groups]/dev group; uv.lock updates. If any package lacks a free-threaded 3.14 wheel and the resolve fails, re-run with the GIL interpreter selected (uv add --python 3.14 ...) and record the fallback in README.md per the spec's free-threading caveat.
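To know which case to record in README.md, it may help to check what the selected interpreter actually runs with. A small sketch, assuming it is saved under a hypothetical name such as check_gil.py and run with uv run python check_gil.py; `gil_status` is an illustrative helper, not part of the plan:

```python
import sys
import sysconfig


def gil_status() -> str:
    """Describe whether this interpreter build and process run with the GIL."""
    # Py_GIL_DISABLED is 1 on free-threaded builds, 0 or None otherwise.
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    # sys._is_gil_enabled() exists on Python 3.13+; assume enabled if absent.
    gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)()
    if free_threaded_build and not gil_enabled:
        return "free-threaded (GIL disabled)"
    if free_threaded_build:
        return "free-threaded build, GIL re-enabled at runtime"
    return "standard build (GIL enabled)"


print(sys.version_info[:2], gil_status())
```

The middle case matters because a free-threaded build can switch the GIL back on at runtime, for example when an extension module does not declare free-threading support.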

  • Step 2: Install Playwright's Chromium

Run:

uv run playwright install chromium

Expected: downloads the Chromium build. If it fails in this environment, that only affects browser/E2E tests (which are guarded to skip); core tasks proceed regardless.

  • Step 3: Add pytest config to pyproject.toml

Append:

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

  • Step 4: Create the stdlib fixture site

Create tests/fixture_site.py:

"""A tiny dependency-free JSON site for integration tests, served over HTTP."""

from __future__ import annotations

import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class _Handler(BaseHTTPRequestHandler):
    def log_message(self, *args: object) -> None:  # silence test output
        pass

    def _send_json(self, status: int, payload: object) -> None:
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self) -> None:
        if self.path == "/":
            html = b"<html><body><script>fetch('/api/users')</script></body></html>"
            self.send_response(200)
            self.send_header("Content-Type", "text/html")
            self.send_header("Content-Length", str(len(html)))
            self.end_headers()
            self.wfile.write(html)
        elif self.path == "/api/users":
            self._send_json(200, [{"id": 1, "name": "Ada"}])
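        elif self.path.startswith("/api/users/"):
            try:
                user_id = int(self.path.rsplit("/", 1)[1])
            except ValueError:  # non-numeric id: answer 404 instead of crashing
                self._send_json(404, {"error": "not found"})
            else:
                self._send_json(200, {"id": user_id, "name": "Ada"})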
        else:
            self._send_json(404, {"error": "not found"})

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", "0"))
        raw = self.rfile.read(length) if length else b"{}"
        self._send_json(201, {"received": json.loads(raw or b"{}")})


def start_fixture_site() -> tuple[ThreadingHTTPServer, str]:
    """Start the site on an ephemeral port; return (server, base_url)."""
    server = ThreadingHTTPServer(("127.0.0.1", 0), _Handler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    host, port = server.server_address
    return server, f"http://{host}:{port}"

  • Step 5: Create test package marker and conftest fixtures

Create tests/__init__.py (empty).

Create tests/conftest.py:

from __future__ import annotations

from collections.abc import Iterator

import pytest

from tests.fixture_site import start_fixture_site


@pytest.fixture
def fixture_site() -> Iterator[str]:
    server, base_url = start_fixture_site()
    try:
        yield base_url
    finally:
        server.shutdown()
        server.server_close()  # release the listening socket, not just the serve loop

  • Step 6: Verify the toolchain runs

Run:

uv run pytest -q

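Expected: pytest collects 0 tests and exits with code 5 (pytest's "no tests collected" status), which is normal here because no tests exist yet. What matters is that no configuration or import errors are reported; that confirms the config is valid.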

  • Step 7: Commit

git add pyproject.toml uv.lock tests/
git commit -m "build: add deps and test scaffolding for auto-reverse"

Task 1: Core data models

Files:

  • Create: src/auto_reverse/models.py

  • Test: tests/test_models.py

  • Step 1: Write failing tests

Create tests/test_models.py:

from auto_reverse.models import CapturedFlow, Signature, status_class


def test_status_class_buckets():
    assert status_class(200) == "2xx"
    assert status_class(201) == "2xx"
    assert status_class(404) == "4xx"
    assert status_class(503) == "5xx"


def test_signature_is_hashable_and_equal():
    a = Signature("GET", "ex.com", "/api/users/{id}", "2xx")
    b = Signature("GET", "ex.com", "/api/users/{id}", "2xx")
    assert a == b
    assert {a, b} == {a}


def test_captured_flow_json_body_parsing():
    flow = CapturedFlow(
        method="POST", host="ex.com", path="/api/x", query={},
        req_headers={"content-type": "application/json"}, req_body=b'{"a": 1}',
        status=201, resp_headers={"content-type": "application/json"},
        resp_body=b'{"ok": true}', timestamp=0.0,
    )
    assert flow.request_json() == {"a": 1}
    assert flow.response_json() == {"ok": True}


def test_captured_flow_non_json_body_returns_none():
    flow = CapturedFlow(
        method="GET", host="ex.com", path="/x", query={},
        req_headers={}, req_body=None, status=200,
        resp_headers={"content-type": "text/html"}, resp_body=b"<html>",
        timestamp=0.0,
    )
    assert flow.response_json() is None

  • Step 2: Run tests to verify they fail

Run:

uv run pytest tests/test_models.py -q

Expected: FAIL with ModuleNotFoundError: No module named 'auto_reverse.models'.

  • Step 3: Implement models.py

Create src/auto_reverse/models.py:

from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any


def status_class(status: int) -> str:
    return f"{status // 100}xx"


@dataclass(frozen=True)
class Signature:
    method: str
    host: str
    path_template: str
    status_class: str


@dataclass
class CapturedFlow:
    method: str
    host: str
    path: str
    query: dict[str, list[str]]
    req_headers: dict[str, str]
    req_body: bytes | None
    status: int
    resp_headers: dict[str, str]
    resp_body: bytes | None
    timestamp: float

    def _json(self, body: bytes | None, headers: dict[str, str]) -> Any | None:
        if body is None:
            return None
        ctype = headers.get("content-type", "").lower()
        if "json" not in ctype:
            return None
        try:
            return json.loads(body)
        except (ValueError, UnicodeDecodeError):
            return None

    def request_json(self) -> Any | None:
        return self._json(self.req_body, self.req_headers)

    def response_json(self) -> Any | None:
        return self._json(self.resp_body, self.resp_headers)


@dataclass
class EndpointRecord:
    signature: Signature
    sample_count: int = 0
    query_params: set[str] = field(default_factory=set)
    request_schema: dict[str, Any] | None = None
    response_schema: dict[str, Any] | None = None
    # LLM-enriched fields (filled by the doc engine):
    summary: str = ""
    description: str = ""
    tag: str = ""
    documented: bool = False

  • Step 4: Run tests to verify they pass

Run:

uv run pytest tests/test_models.py -q

Expected: PASS (4 tests).
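Signature is declared with frozen=True because it is later used as a dictionary key for dedup. The sketch below shows the difference with two hypothetical classes, `FrozenSig` and `MutableSig`, that mirror a subset of Signature's fields; they are illustrations only and not part of models.py.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FrozenSig:  # hypothetical mirror of Signature
    method: str
    path_template: str


@dataclass
class MutableSig:  # same fields, but not frozen
    method: str
    path_template: str


# Equal frozen instances hash alike, so they collapse onto one dict key.
counts: dict[FrozenSig, int] = {}
for _ in range(2):
    key = FrozenSig("GET", "/api/users/{id}")
    counts[key] = counts.get(key, 0) + 1
print(counts)

# A non-frozen dataclass with the default eq=True gets __hash__ = None.
try:
    hash(MutableSig("GET", "/api/users/{id}"))
    mutable_hashable = True
except TypeError:
    mutable_hashable = False
print(mutable_hashable)  # False
```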

  • Step 5: Commit

git add src/auto_reverse/models.py tests/test_models.py
git commit -m "feat: core data models (Signature, CapturedFlow, EndpointRecord)"

Task 2: Path templating

Files:

  • Create: src/auto_reverse/store.py

  • Test: tests/test_store.py

  • Step 1: Write failing tests

Create tests/test_store.py:

from auto_reverse.store import path_template


def test_collapses_numeric_ids():
    assert path_template("/api/users/4812/orders/99") == "/api/users/{id}/orders/{id}"


def test_collapses_uuid():
    p = "/api/items/550e8400-e29b-41d4-a716-446655440000"
    assert path_template(p) == "/api/items/{id}"


def test_collapses_long_hex_token():
    assert path_template("/files/a1b2c3d4e5f60718293a4b5c") == "/files/{id}"


def test_keeps_short_words():
    assert path_template("/api/users/me/settings") == "/api/users/me/settings"


def test_root_and_empty():
    assert path_template("/") == "/"
    assert path_template("") == "/"

  • Step 2: Run to verify fail

Run:

uv run pytest tests/test_store.py -q

Expected: FAIL with ModuleNotFoundError: No module named 'auto_reverse.store'.

  • Step 3: Implement path_template in store.py

Create src/auto_reverse/store.py:

from __future__ import annotations

import re

_UUID = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
_HEX_TOKEN = re.compile(r"^[0-9a-fA-F]{16,}$")
_LONG_OPAQUE = re.compile(r"^[A-Za-z0-9_\-]{20,}$")


def _is_variable(segment: str) -> bool:
    if segment.isdigit():
        return True
    if _UUID.match(segment):
        return True
    if _HEX_TOKEN.match(segment):
        return True
    if _LONG_OPAQUE.match(segment) and any(c.isdigit() for c in segment):
        return True
    return False


def path_template(path: str) -> str:
    """Collapse variable path segments (ids, UUIDs, hashes, opaque tokens) to {id}."""
    if not path or path == "/":
        return "/"
    parts = path.split("/")
    out = ["{id}" if part and _is_variable(part) else part for part in parts]
    return "/".join(out)

  • Step 4: Run to verify pass

Run:

uv run pytest tests/test_store.py -q

Expected: PASS (5 tests).
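A few edge cases are worth probing beyond the five tests. The snippet below restates the same rules compactly so it can run by itself; `is_variable` and `template` are illustrative names, not the module's code, and the probe paths are made up.

```python
import re

_UUID = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
_HEX = re.compile(r"^[0-9a-fA-F]{16,}$")
_OPAQUE = re.compile(r"^[A-Za-z0-9_\-]{20,}$")


def is_variable(seg: str) -> bool:
    """Same four rules as above: digits, UUID, long hex, long opaque with a digit."""
    return bool(
        seg.isdigit()
        or _UUID.match(seg)
        or _HEX.match(seg)
        or (_OPAQUE.match(seg) and any(c.isdigit() for c in seg))
    )


def template(path: str) -> str:
    if not path or path == "/":
        return "/"
    return "/".join("{id}" if s and is_variable(s) else s for s in path.split("/"))


# A long slug without digits is kept, because the opaque rule requires a digit.
print(template("/blog/how-to-reverse-engineer-an-api"))
# "v2" mixes letters and digits but is short, so only "42" collapses.
print(template("/v2/users/42"))        # /v2/users/{id}
# Sixteen hex letters collapse even without any digit.
print(template("/t/deadbeefdeadbeef"))  # /t/{id}
# A query string left on the path defeats the digit check for the last segment.
print(template("/api/users/4812?page=2"))  # /api/users/4812?page=2
```

The last probe suggests stripping the query string before templating; whether callers always do so is an assumption to verify against the capture code.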

  • Step 5: Commit

git add src/auto_reverse/store.py tests/test_store.py
git commit -m "feat: path templating for endpoint signatures"

Task 3: Scope filter

Files:

  • Modify: src/auto_reverse/store.py

  • Test: tests/test_scope.py

  • Step 1: Write failing tests

Create tests/test_scope.py:

from auto_reverse.models import CapturedFlow
from auto_reverse.store import ScopeFilter


def _flow(host: str, path: str, ctype: str = "application/json") -> CapturedFlow:
    return CapturedFlow(
        method="GET", host=host, path=path, query={}, req_headers={},
        req_body=None, status=200, resp_headers={"content-type": ctype},
        resp_body=b"{}", timestamp=0.0,
    )


def test_target_host_in_scope():
    f = ScopeFilter(target_hosts={"app.example.com"})
    assert f.is_in_scope(_flow("app.example.com", "/api/users"))


def test_other_host_out_of_scope():
    f = ScopeFilter(target_hosts={"app.example.com"})
    assert not f.is_in_scope(_flow("cdn.other.com", "/x"))


def test_static_asset_dropped():
    f = ScopeFilter(target_hosts={"app.example.com"})
    assert not f.is_in_scope(_flow("app.example.com", "/main.js", "application/javascript"))


def test_analytics_host_dropped_by_default():
    f = ScopeFilter(target_hosts={"app.example.com"})
    assert not f.is_in_scope(_flow("www.google-analytics.com", "/collect"))


def test_extra_allow_host():
    f = ScopeFilter(target_hosts={"app.example.com"}, allow_hosts={"api.example.com"})
    assert f.is_in_scope(_flow("api.example.com", "/v1/data"))


def test_explicit_deny_overrides():
    f = ScopeFilter(target_hosts={"app.example.com"}, deny_hosts={"app.example.com"})
    assert not f.is_in_scope(_flow("app.example.com", "/api/users"))

  • Step 2: Run to verify fail

Run:

uv run pytest tests/test_scope.py -q

Expected: FAIL with ImportError: cannot import name 'ScopeFilter'.

  • Step 3: Append ScopeFilter to store.py

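Add to src/auto_reverse/store.py (place the import with the existing imports at the top of the file so linters do not flag a late import):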

from auto_reverse.models import CapturedFlow

_ASSET_SUFFIXES = (".js", ".mjs", ".css", ".png", ".jpg", ".jpeg", ".gif",
                   ".svg", ".woff", ".woff2", ".ttf", ".ico", ".map", ".webp")
_DEFAULT_ANALYTICS = frozenset({
    "www.google-analytics.com", "google-analytics.com", "analytics.google.com",
    "stats.g.doubleclick.net", "api.segment.io", "cdn.segment.com",
    "browser.sentry-cdn.com", "js.stripe.com",
})


class ScopeFilter:
    def __init__(
        self,
        target_hosts: set[str],
        allow_hosts: set[str] | None = None,
        deny_hosts: set[str] | None = None,
    ) -> None:
        self.target_hosts = set(target_hosts)
        self.allow_hosts = set(allow_hosts or set())
        self.deny_hosts = set(deny_hosts or set())

    def is_in_scope(self, flow: CapturedFlow) -> bool:
        host = flow.host
        if host in self.deny_hosts:
            return False
        if host in _DEFAULT_ANALYTICS:
            return False
        if host not in self.target_hosts and host not in self.allow_hosts:
            return False
        if flow.path.split("?")[0].lower().endswith(_ASSET_SUFFIXES):
            return False
        ctype = flow.resp_headers.get("content-type", "").lower()
        if ctype.startswith(("text/css", "image/", "font/", "application/javascript")):
            return False
        return True

  • Step 4: Run to verify pass

Run:

uv run pytest tests/test_scope.py -q

Expected: PASS (6 tests).

  • Step 5: Commit

git add src/auto_reverse/store.py tests/test_scope.py
git commit -m "feat: scope filter (host allow/deny, asset and analytics drop)"

Task 4: Flow Store (thread-safe dedup)

Files:

  • Modify: src/auto_reverse/store.py

  • Test: tests/test_store.py

  • Step 1: Write failing tests

Append to tests/test_store.py:

from auto_reverse.models import CapturedFlow, Signature
from auto_reverse.store import FlowStore, ScopeFilter


def _post(host: str, path: str, body: bytes) -> CapturedFlow:
    return CapturedFlow(
        method="POST", host=host, path=path, query={},
        req_headers={"content-type": "application/json"}, req_body=body,
        status=201, resp_headers={"content-type": "application/json"},
        resp_body=b'{"ok": true}', timestamp=0.0,
    )


def test_ingest_new_signature_returns_true_once():
    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
    assert store.ingest(_post("ex.com", "/api/cart/1", b'{"q": 1}')).is_new is True
    # same template, different id -> not new
    assert store.ingest(_post("ex.com", "/api/cart/2", b'{"q": 2}')).is_new is False
    assert len(store.endpoints()) == 1


def test_out_of_scope_flow_ignored():
    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
    result = store.ingest(_post("other.com", "/x", b"{}"))
    assert result.is_new is False
    assert result.in_scope is False
    assert store.endpoints() == []


def test_new_signature_callback_fires_with_signature():
    seen: list[Signature] = []
    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}), on_new_signature=seen.append)
    store.ingest(_post("ex.com", "/api/cart/1", b"{}"))
    store.ingest(_post("ex.com", "/api/cart/2", b"{}"))
    assert len(seen) == 1
    assert seen[0].path_template == "/api/cart/{id}"


def test_search_filters_by_substring():
    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
    store.ingest(_post("ex.com", "/api/cart/1", b"{}"))
    store.ingest(_post("ex.com", "/api/login", b"{}"))
    results = store.search("cart")
    assert len(results) == 1
    assert results[0].signature.path_template == "/api/cart/{id}"

  • Step 2: Run to verify fail

Run:

uv run pytest tests/test_store.py -q

Expected: FAIL with ImportError: cannot import name 'FlowStore'.

  • Step 3: Append FlowStore and IngestResult to store.py

Add to src/auto_reverse/store.py (merge the imports into the import block at the top of the file; the rest goes below ScopeFilter):

import threading
from collections.abc import Callable
from dataclasses import dataclass
from urllib.parse import urlsplit

from auto_reverse.models import EndpointRecord, Signature, status_class

MAX_SAMPLES = 5


@dataclass
class IngestResult:
    in_scope: bool
    is_new: bool
    signature: Signature | None


class FlowStore:
    """Thread-safe store: dedup by signature, retain bounded samples per endpoint."""

    def __init__(
        self,
        scope: ScopeFilter,
        on_new_signature: Callable[[Signature], None] | None = None,
    ) -> None:
        self._scope = scope
        self._on_new = on_new_signature
        self._lock = threading.Lock()
        self._records: dict[Signature, EndpointRecord] = {}
        self._samples: dict[Signature, list[CapturedFlow]] = {}

    def signature_of(self, flow: CapturedFlow) -> Signature:
        return Signature(
            method=flow.method.upper(),
            host=flow.host,
            path_template=path_template(flow.path),
            status_class=status_class(flow.status),
        )

    def ingest(self, flow: CapturedFlow) -> IngestResult:
        if not self._scope.is_in_scope(flow):
            return IngestResult(in_scope=False, is_new=False, signature=None)
        sig = self.signature_of(flow)
        with self._lock:
            is_new = sig not in self._records
            if is_new:
                self._records[sig] = EndpointRecord(signature=sig)
                self._samples[sig] = []
            record = self._records[sig]
            record.sample_count += 1
            record.query_params.update(self._query_keys(flow))
            samples = self._samples[sig]
            if len(samples) < MAX_SAMPLES:
                samples.append(flow)
        if is_new and self._on_new is not None:
            self._on_new(sig)
        return IngestResult(in_scope=True, is_new=is_new, signature=sig)

    @staticmethod
    def _query_keys(flow: CapturedFlow) -> set[str]:
        return set(flow.query.keys())

    def endpoints(self) -> list[EndpointRecord]:
        with self._lock:
            return list(self._records.values())

    def samples(self, sig: Signature) -> list[CapturedFlow]:
        with self._lock:
            return list(self._samples.get(sig, []))

    def get(self, sig: Signature) -> EndpointRecord | None:
        with self._lock:
            return self._records.get(sig)

    def search(self, query: str) -> list[EndpointRecord]:
        q = query.lower()
        with self._lock:
            return [
                r for r in self._records.values()
                if q in r.signature.path_template.lower()
                or q in r.signature.method.lower()
            ]

Note: nothing in store.py uses the urlsplit import yet; URL splitting happens in the capture addon (Task 9). Keep the import only if it is used, and remove it if ruff or pyright flags it as unused.
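One detail of ingest worth calling out is that the new-signature callback runs after the lock is released. A reduced sketch of why that matters, assuming a hypothetical `MiniStore` that keeps only the counting and callback logic (string keys stand in for Signature objects):

```python
import threading
from collections.abc import Callable

KEY = "GET /api/cart/{id}"


class MiniStore:
    """Hypothetical reduction of FlowStore: dedup keys under a lock."""

    def __init__(self, on_new: Callable[[str], None]) -> None:
        self._lock = threading.Lock()
        self._counts: dict[str, int] = {}
        self._on_new = on_new

    def ingest(self, key: str) -> bool:
        with self._lock:
            is_new = key not in self._counts
            self._counts[key] = self._counts.get(key, 0) + 1
        # The callback runs outside the lock, so it may call back into the
        # store without deadlocking on the non-reentrant threading.Lock.
        if is_new:
            self._on_new(key)
        return is_new

    def count(self, key: str) -> int:
        with self._lock:
            return self._counts.get(key, 0)


new_keys: list[tuple[str, int]] = []
store = MiniStore(on_new=lambda k: new_keys.append((k, store.count(k))))


def hammer() -> None:
    for _ in range(1000):
        store.ingest(KEY)


threads = [threading.Thread(target=hammer) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(store.count(KEY), len(new_keys))  # 8000 1
```

Had the callback been invoked inside the `with` block, the `store.count` call in the lambda would block forever on the already-held lock.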

  • Step 4: Run to verify pass

Run:

uv run pytest tests/test_store.py -q

Expected: PASS (9 tests total in file).

  • Step 5: Lint check

Run:

uv run ruff check src/auto_reverse/store.py

Expected: clean (remove any unused import it reports).

  • Step 6: Commit

git add src/auto_reverse/store.py tests/test_store.py
git commit -m "feat: thread-safe FlowStore with signature dedup and samples"

Task 5: Deterministic schema inference

Files:

  • Create: src/auto_reverse/doc/__init__.py, src/auto_reverse/doc/schema.py

  • Test: tests/test_schema.py

  • Step 1: Write failing tests

Create tests/test_schema.py:

from auto_reverse.doc.schema import SchemaAccumulator


def test_single_object_schema():
    acc = SchemaAccumulator()
    acc.add({"id": 1, "name": "Ada"})
    schema = acc.schema()
    assert schema["type"] == "object"
    assert set(schema["properties"]) == {"id", "name"}


def test_merge_widens_optional_fields():
    acc = SchemaAccumulator()
    acc.add({"id": 1, "name": "Ada"})
    acc.add({"id": 2})  # name missing -> becomes optional
    schema = acc.schema()
    assert "id" in schema.get("required", [])
    assert "name" not in schema.get("required", [])


def test_array_schema():
    acc = SchemaAccumulator()
    acc.add([{"id": 1}, {"id": 2}])
    schema = acc.schema()
    assert schema["type"] == "array"
    assert schema["items"]["type"] == "object"


def test_empty_accumulator_returns_none():
    assert SchemaAccumulator().schema() is None

  • Step 2: Run to verify fail

Run:

uv run pytest tests/test_schema.py -q

Expected: FAIL with ModuleNotFoundError: No module named 'auto_reverse.doc'.

  • Step 3: Implement schema wrapper

Create src/auto_reverse/doc/__init__.py (empty).

Create src/auto_reverse/doc/schema.py:

from __future__ import annotations

from typing import Any

from genson import SchemaBuilder


class SchemaAccumulator:
    """Accumulate JSON samples into a widening JSON Schema (genson-backed)."""

    def __init__(self) -> None:
        self._builder = SchemaBuilder()
        self._count = 0

    def add(self, value: Any) -> None:
        self._builder.add_object(value)
        self._count += 1

    def schema(self) -> dict[str, Any] | None:
        if self._count == 0:
            return None
        result: dict[str, Any] = self._builder.to_schema()
        result.pop("$schema", None)
        return result

  • Step 4: Run to verify pass

Run:

uv run pytest tests/test_schema.py -q

Expected: PASS (4 tests).
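The behaviour that test_merge_widens_optional_fields relies on can be stated as a rule: a key stays required only if every sample contains it. The sketch below illustrates that rule in plain Python; `required_keys` is a hypothetical helper written for this explanation and is not genson's code.

```python
def required_keys(samples: list[dict]) -> set[str]:
    """Keys present in every sample; anything else is treated as optional."""
    if not samples:
        return set()
    required = set(samples[0])
    for sample in samples[1:]:
        required &= set(sample)  # intersect: drop keys missing from this sample
    return required


samples = [{"id": 1, "name": "Ada"}, {"id": 2}]
all_keys = set().union(*samples)
print(sorted(required_keys(samples)))             # ['id']
print(sorted(all_keys - required_keys(samples)))  # ['name']
```

A consequence for captured traffic: with a single sample every key looks required, so the inferred schema only becomes trustworthy once several responses for the same endpoint have been merged.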

  • Step 5: Commit

git add src/auto_reverse/doc/__init__.py src/auto_reverse/doc/schema.py tests/test_schema.py
git commit -m "feat: deterministic JSON schema inference (genson wrapper)"

Task 6: OpenAPI assembly

Files:

  • Create: src/auto_reverse/doc/openapi.py

  • Test: tests/test_openapi.py

  • Step 1: Write failing tests

Create tests/test_openapi.py:

from auto_reverse.doc.openapi import build_openapi
from auto_reverse.models import EndpointRecord, Signature


def _record(method: str, template: str, **kw) -> EndpointRecord:
    rec = EndpointRecord(signature=Signature(method, "ex.com", template, "2xx"))
    for k, v in kw.items():
        setattr(rec, k, v)
    return rec


def test_builds_paths_and_methods():
    records = [
        _record("GET", "/api/users", summary="List users",
                response_schema={"type": "array"}),
        _record("POST", "/api/users", summary="Create user",
                request_schema={"type": "object"}),
    ]
    spec = build_openapi(records, title="ex.com API")
    assert spec["openapi"].startswith("3.")
    assert spec["info"]["title"] == "ex.com API"
    assert set(spec["paths"]["/api/users"]) == {"get", "post"}
    assert spec["paths"]["/api/users"]["get"]["summary"] == "List users"


def test_path_param_declared_for_template():
    rec = _record("GET", "/api/users/{id}", summary="Get user")
    spec = build_openapi([rec], title="x")
    params = spec["paths"]["/api/users/{id}"]["get"]["parameters"]
    assert any(p["name"] == "id" and p["in"] == "path" for p in params)


def test_request_body_included_when_schema_present():
    rec = _record("POST", "/api/x", request_schema={"type": "object"})
    op = build_openapi([rec], title="x")["paths"]["/api/x"]["post"]
    assert op["requestBody"]["content"]["application/json"]["schema"] == {"type": "object"}

  • Step 2: Run to verify fail

Run:

uv run pytest tests/test_openapi.py -q

Expected: FAIL with ModuleNotFoundError: No module named 'auto_reverse.doc.openapi'.

  • Step 3: Implement build_openapi

Create src/auto_reverse/doc/openapi.py:

from __future__ import annotations

import re
from typing import Any

from auto_reverse.models import EndpointRecord

_PARAM = re.compile(r"\{([^}]+)\}")


def _path_params(template: str) -> list[dict[str, Any]]:
    return [
        {"name": name, "in": "path", "required": True, "schema": {"type": "string"}}
        for name in _PARAM.findall(template)
    ]


def _operation(rec: EndpointRecord) -> dict[str, Any]:
    op: dict[str, Any] = {}
    if rec.summary:
        op["summary"] = rec.summary
    if rec.description:
        op["description"] = rec.description
    if rec.tag:
        op["tags"] = [rec.tag]
    params = _path_params(rec.signature.path_template)
    params += [
        {"name": q, "in": "query", "required": False, "schema": {"type": "string"}}
        for q in sorted(rec.query_params)
    ]
    if params:
        op["parameters"] = params
    if rec.request_schema is not None:
        op["requestBody"] = {
            "content": {"application/json": {"schema": rec.request_schema}}
        }
    status = rec.signature.status_class.upper()  # "2xx" -> "2XX" range key
    response: dict[str, Any] = {"description": rec.summary or "Response"}
    if rec.response_schema is not None:
        response["content"] = {"application/json": {"schema": rec.response_schema}}
    op["responses"] = {status: response}
    return op


def build_openapi(records: list[EndpointRecord], title: str) -> dict[str, Any]:
    paths: dict[str, dict[str, Any]] = {}
    for rec in records:
        template = rec.signature.path_template
        method = rec.signature.method.lower()
        paths.setdefault(template, {})[method] = _operation(rec)
    return {
        "openapi": "3.1.0",
        "info": {"title": title, "version": "0.0.0"},
        "paths": paths,
    }

  • Step 4: Run to verify pass

Run:

uv run pytest tests/test_openapi.py -q

Expected: PASS (3 tests).

  • Step 5: Commit

git add src/auto_reverse/doc/openapi.py tests/test_openapi.py
git commit -m "feat: OpenAPI assembly from endpoint records"

Task 7: Markdown rendering

Files:

  • Create: src/auto_reverse/doc/markdown.py

  • Test: tests/test_markdown.py

  • Step 1: Write failing tests

Create tests/test_markdown.py:

from auto_reverse.doc.markdown import render_markdown
from auto_reverse.models import EndpointRecord, Signature


def _rec(method, template, **kw):
    rec = EndpointRecord(signature=Signature(method, "ex.com", template, "2xx"))
    for k, v in kw.items():
        setattr(rec, k, v)
    return rec


def test_renders_heading_and_endpoints():
    md = render_markdown([_rec("GET", "/api/users", summary="List users")], title="ex.com API")
    assert "# ex.com API" in md
    assert "`GET /api/users`" in md
    assert "List users" in md


def test_groups_by_tag():
    records = [
        _rec("GET", "/api/users", tag="Users"),
        _rec("GET", "/api/cart", tag="Cart"),
    ]
    md = render_markdown(records, title="x")
    assert "## Users" in md
    assert "## Cart" in md


def test_untagged_go_under_general():
    md = render_markdown([_rec("GET", "/api/x")], title="x")
    assert "## General" in md

  • Step 2: Run to verify fail

Run:

uv run pytest tests/test_markdown.py -q

Expected: FAIL with ModuleNotFoundError: No module named 'auto_reverse.doc.markdown'.

  • Step 3: Implement render_markdown

Create src/auto_reverse/doc/markdown.py:

from __future__ import annotations

from collections import defaultdict

from auto_reverse.models import EndpointRecord


def render_markdown(records: list[EndpointRecord], title: str) -> str:
    groups: dict[str, list[EndpointRecord]] = defaultdict(list)
    for rec in records:
        groups[rec.tag or "General"].append(rec)

    lines = [f"# {title}", ""]
    for tag in sorted(groups):
        lines.append(f"## {tag}")
        lines.append("")
        for rec in sorted(groups[tag], key=lambda r: r.signature.path_template):
            sig = rec.signature
            lines.append(f"### `{sig.method} {sig.path_template}`")
            if rec.summary:
                lines.append(f"**{rec.summary}**")
            if rec.description:
                lines.append("")
                lines.append(rec.description)
            lines.append(f"\n_Seen {rec.sample_count} time(s)._")
            lines.append("")
    return "\n".join(lines)

  • Step 4: Run to verify pass

Run:

uv run pytest tests/test_markdown.py -q

Expected: PASS (3 tests).

  • Step 5: Commit

git add src/auto_reverse/doc/markdown.py tests/test_markdown.py
git commit -m "feat: markdown API documentation rendering"

Task 8: Config and pluggable auth stub

Files:

  • Create: src/auto_reverse/config.py

  • Test: tests/test_config.py

  • Step 1: Write failing tests

Create tests/test_config.py:

import pytest

from auto_reverse.config import Config, ManualPauseAuth, NoAuth, make_auth


def test_config_derives_target_host():
    cfg = Config(target_url="https://app.example.com/dashboard")
    assert cfg.target_host == "app.example.com"


def test_config_scope_hosts_includes_target_plus_extra():
    cfg = Config(target_url="https://app.example.com", scope_hosts={"api.example.com"})
    assert cfg.all_scope_hosts() == {"app.example.com", "api.example.com"}


def test_default_model():
    assert Config(target_url="https://x.com").model == "claude-opus-4-8"


def test_make_auth_returns_strategy():
    assert isinstance(make_auth("manual"), ManualPauseAuth)
    assert isinstance(make_auth("none"), NoAuth)


def test_make_auth_unknown_raises():
    with pytest.raises(ValueError):
        make_auth("oauth-magic")

  • Step 2: Run to verify fail

Run:

uv run pytest tests/test_config.py -q

Expected: FAIL with ModuleNotFoundError: No module named 'auto_reverse.config'.

  • Step 3: Implement config.py

Create src/auto_reverse/config.py:

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Protocol
from urllib.parse import urlsplit


@dataclass
class Config:
    target_url: str
    out_dir: str | None = None
    proxy_port: int = 8080
    headless: bool = False
    profile: str | None = None
    gen_client: bool = False
    model: str = "claude-opus-4-8"
    scope_hosts: set[str] = field(default_factory=set)
    no_llm_doc: bool = False
    resume: str | None = None
    auth: str = "manual"

    @property
    def target_host(self) -> str:
        return urlsplit(self.target_url).hostname or ""

    def all_scope_hosts(self) -> set[str]:
        return {self.target_host, *self.scope_hosts}


class AuthStrategy(Protocol):
    name: str

    async def authenticate(self, page: object) -> None:
        """Prepare an authenticated session on the given Playwright page."""
        ...


class NoAuth:
    name = "none"

    async def authenticate(self, page: object) -> None:
        return None


class ManualPauseAuth:
    """Default stub: pause so the human can log in by hand, then continue."""

    name = "manual"

    async def authenticate(self, page: object) -> None:
        # Implemented against the real page in browser.py wiring; the stub
        # simply records intent. The REPL prompts the user to log in and
        # press enter before autonomous exploration begins.
        return None


def make_auth(name: str) -> AuthStrategy:
    strategies: dict[str, AuthStrategy] = {"manual": ManualPauseAuth(), "none": NoAuth()}
    if name not in strategies:
        raise ValueError(f"unknown auth strategy: {name!r}")
    return strategies[name]
  • Step 4: Run to verify pass

Run: uv run pytest tests/test_config.py -q Expected: PASS (5 tests).

  • Step 5: Commit
git add src/auto_reverse/config.py tests/test_config.py
git commit -m "feat: Config and pluggable auth stub (manual/none)"
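Config.target_host in Task 8 relies on urlsplit(...).hostname, which drops the port and lowercases the host. The sketch below uses only the standard library to show the difference from netloc, which matters later when scope hosts are compared against captured flows. The standalone target_host function is a hypothetical mirror of the property, not part of the plan's module.

```python
from urllib.parse import urlsplit


def target_host(url: str) -> str:
    """Hypothetical mirror of Config.target_host: hostname only, or '' if absent."""
    return urlsplit(url).hostname or ""


# hostname strips the port and lowercases; netloc keeps both as written.
print(target_host("https://App.Example.com:8443/dashboard"))
print(urlsplit("https://App.Example.com:8443/dashboard").netloc)

# A string without a scheme/authority has no hostname, so the fallback applies.
print(repr(target_host("not a url")))
```

If a target is served on a non-default port, a scope check keyed on netloc would not match a set built from target_host, so it is worth deciding early which form the store compares against.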

Task 9: Embedded proxy + capture addon

Files:

  • Create: src/auto_reverse/proxy.py
  • Test: tests/test_proxy.py

The capture addon converts a mitmproxy HTTPFlow into our CapturedFlow, feeds the store, and appends a one-line summary of each flow (method, host, path, status) to an archive log. We unit-test the pure conversion (flow_from_mitm) with a fake mitmproxy-shaped object so no proxy needs to run; the live proxy is exercised in the E2E smoke test (Task 15).
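The conversion has two small normalization steps: stripping the query string off the request path, and folding repeated query keys into lists. A minimal sketch of both is below. It uses parse_qsl from the standard library as a stand-in for mitmproxy's req.query.fields, and split_path_and_query is a hypothetical helper name used only for illustration.

```python
from urllib.parse import parse_qsl, urlsplit


def split_path_and_query(raw_path: str) -> tuple[str, dict[str, list[str]]]:
    """Split '/path?a=1&a=2' into ('/path', {'a': ['1', '2']})."""
    parts = urlsplit(raw_path)
    query: dict[str, list[str]] = {}
    # parse_qsl yields (key, value) pairs in order, like a multidict's fields.
    for key, value in parse_qsl(parts.query, keep_blank_values=True):
        query.setdefault(key, []).append(value)
    return parts.path, query


path, query = split_path_and_query("/api/users?role=admin&role=owner&page=2")
print(path)
print(query)
```

Keeping every value as a list means a key that appears once and a key that appears several times share the same shape, which simplifies later schema inference.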

  • Step 1: Write failing tests

Create tests/test_proxy.py:

from types import SimpleNamespace

from auto_reverse.proxy import flow_from_mitm


def _fake_mitm_flow():
    request = SimpleNamespace(
        method="POST", pretty_host="ex.com", path="/api/users?role=admin",
        headers={"content-type": "application/json"}, content=b'{"name": "Ada"}',
        query=SimpleNamespace(fields=[("role", "admin")]),
    )
    response = SimpleNamespace(
        status_code=201, headers={"content-type": "application/json"},
        content=b'{"id": 1}',
    )
    return SimpleNamespace(request=request, response=response, timestamp_start=1.5)


def test_flow_from_mitm_maps_fields():
    captured = flow_from_mitm(_fake_mitm_flow())
    assert captured.method == "POST"
    assert captured.host == "ex.com"
    assert captured.path == "/api/users"
    assert captured.query == {"role": ["admin"]}
    assert captured.status == 201
    assert captured.request_json() == {"name": "Ada"}
    assert captured.response_json() == {"id": 1}


def test_flow_from_mitm_handles_missing_response():
    flow = _fake_mitm_flow()
    flow.response = None
    captured = flow_from_mitm(flow)
    assert captured is None
  • Step 2: Run to verify fail

Run: uv run pytest tests/test_proxy.py -q Expected: FAIL — module not found.

  • Step 3: Implement proxy.py

Create src/auto_reverse/proxy.py:

from __future__ import annotations

import asyncio
import threading
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from auto_reverse.models import CapturedFlow
from auto_reverse.store import FlowStore


def flow_from_mitm(flow: Any) -> CapturedFlow | None:
    """Convert a mitmproxy HTTPFlow (or test double) into a CapturedFlow."""
    if flow.response is None:
        return None
    req = flow.request
    query: dict[str, list[str]] = {}
    for key, value in req.query.fields:
        query.setdefault(key, []).append(value)
    return CapturedFlow(
        method=req.method,
        host=req.pretty_host,
        path=urlsplit(req.path).path,
        query=query,
        req_headers={k.lower(): v for k, v in dict(req.headers).items()},
        req_body=req.content,
        status=flow.response.status_code,
        resp_headers={k.lower(): v for k, v in dict(flow.response.headers).items()},
        resp_body=flow.response.content,
        timestamp=getattr(flow, "timestamp_start", 0.0) or 0.0,
    )


class CaptureAddon:
    """mitmproxy addon: on each response, ingest into the store + archive raw."""

    def __init__(self, store: FlowStore, archive_path: Path) -> None:
        self._store = store
        self._archive = archive_path
        self._archive.parent.mkdir(parents=True, exist_ok=True)

    def response(self, flow: Any) -> None:  # mitmproxy hook name
        captured = flow_from_mitm(flow)
        if captured is None:
            return
        self._store.ingest(captured)
        with self._archive.open("a") as fh:
            fh.write(f"{captured.method} {captured.host}{captured.path} {captured.status}\n")


class ProxyServer:
    """Run mitmproxy's DumpMaster in a dedicated thread with its own loop."""

    def __init__(self, store: FlowStore, archive_path: Path, port: int) -> None:
        self._store = store
        self._archive_path = archive_path
        self._port = port
        self._master: Any = None
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None

    @property
    def port(self) -> int:
        return self._port

    def start(self) -> None:
        ready = threading.Event()
        self._thread = threading.Thread(target=self._run, args=(ready,), daemon=True)
        self._thread.start()
        ready.wait(timeout=10)

    def _run(self, ready: threading.Event) -> None:
        from mitmproxy.options import Options
        from mitmproxy.tools.dump import DumpMaster

        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)

        async def _serve() -> None:
            # Recent mitmproxy releases bind the master to the *running* loop,
            # so construct DumpMaster inside the coroutine, not before it.
            opts = Options(listen_host="127.0.0.1", listen_port=self._port)
            self._master = DumpMaster(opts, with_termlog=False, with_dumper=False)
            self._master.addons.add(CaptureAddon(self._store, self._archive_path))
            ready.set()
            await self._master.run()

        self._loop.run_until_complete(_serve())

    def stop(self) -> None:
        if self._master is not None and self._loop is not None:
            self._loop.call_soon_threadsafe(self._master.shutdown)
  • Step 4: Run to verify pass

Run: uv run pytest tests/test_proxy.py -q Expected: PASS (2 tests).

  • Step 5: Commit
git add src/auto_reverse/proxy.py tests/test_proxy.py
git commit -m "feat: embedded mitmproxy capture addon and proxy server"

Task 10: Doc engine (event consumer)

Files:

  • Create: src/auto_reverse/doc/engine.py

  • Test: tests/test_engine.py

  • Step 1: Write failing tests

Create tests/test_engine.py:

from pathlib import Path

from auto_reverse.doc.engine import DocEngine
from auto_reverse.models import CapturedFlow
from auto_reverse.store import FlowStore, ScopeFilter


def _flow(path: str, resp: bytes) -> CapturedFlow:
    return CapturedFlow(
        method="GET", host="ex.com", path=path, query={}, req_headers={},
        req_body=None, status=200,
        resp_headers={"content-type": "application/json"}, resp_body=resp,
        timestamp=0.0,
    )


def test_engine_writes_spec_and_markdown(tmp_path: Path):
    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
    engine = DocEngine(store, out_dir=tmp_path, title="ex.com API", use_llm=False)
    store.ingest(_flow("/api/users", b'[{"id": 1}]'))
    sig = store.endpoints()[0].signature
    engine.document(sig)
    spec = (tmp_path / "openapi.yaml").read_text()
    assert "/api/users" in spec
    assert (tmp_path / "API.md").read_text().startswith("# ex.com API")


def test_engine_infers_response_schema(tmp_path: Path):
    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
    engine = DocEngine(store, out_dir=tmp_path, title="x", use_llm=False)
    store.ingest(_flow("/api/users", b'[{"id": 1, "name": "Ada"}]'))
    sig = store.endpoints()[0].signature
    engine.document(sig)
    rec = store.get(sig)
    assert rec is not None
    assert rec.response_schema is not None
    assert rec.response_schema["type"] == "array"
  • Step 2: Run to verify fail

Run: uv run pytest tests/test_engine.py -q Expected: FAIL — module not found.

  • Step 3: Implement engine.py

Create src/auto_reverse/doc/engine.py:

from __future__ import annotations

import threading
from pathlib import Path

import yaml  # PyYAML; may not be installed transitively, see note

from auto_reverse.doc.markdown import render_markdown
from auto_reverse.doc.openapi import build_openapi
from auto_reverse.doc.schema import SchemaAccumulator
from auto_reverse.models import Signature
from auto_reverse.store import FlowStore


class DocEngine:
    """Turn a new endpoint signature into inferred schemas + written outputs."""

    def __init__(
        self, store: FlowStore, out_dir: Path, title: str, use_llm: bool = True
    ) -> None:
        self._store = store
        self._out = out_dir
        self._title = title
        self._use_llm = use_llm
        self._lock = threading.Lock()
        self._out.mkdir(parents=True, exist_ok=True)

    def document(self, sig: Signature) -> None:
        record = self._store.get(sig)
        if record is None:
            return
        req_acc = SchemaAccumulator()
        resp_acc = SchemaAccumulator()
        for flow in self._store.samples(sig):
            rj = flow.request_json()
            if rj is not None:
                req_acc.add(rj)
            sj = flow.response_json()
            if sj is not None:
                resp_acc.add(sj)
        record.request_schema = req_acc.schema()
        record.response_schema = resp_acc.schema()
        if not record.summary:
            record.summary = f"{sig.method} {sig.path_template}"
        record.documented = True
        self._write()

    def _write(self) -> None:
        with self._lock:
            records = self._store.endpoints()
            spec = build_openapi(records, title=self._title)
            (self._out / "openapi.yaml").write_text(yaml.safe_dump(spec, sort_keys=False))
            (self._out / "API.md").write_text(render_markdown(records, title=self._title))

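Note on yaml: import yaml requires PyYAML. mitmproxy depends on ruamel.yaml, which is a separate package with a different import name, so PyYAML may not be present transitively. If import yaml fails, add it explicitly with uv add pyyaml, then re-run. Do this in Step 4 if needed.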

  • Step 4: Run to verify pass

Run: uv run pytest tests/test_engine.py -q Expected: PASS (2 tests). If ModuleNotFoundError: yaml, run uv add pyyaml and re-run.

  • Step 5: Commit
git add src/auto_reverse/doc/engine.py tests/test_engine.py pyproject.toml uv.lock
git commit -m "feat: doc engine writes openapi.yaml and API.md from samples"

Task 11: Browser wrapper

Files:

  • Create: src/auto_reverse/browser.py
  • Test: tests/test_browser.py

The browser wrapper uses Playwright's sync API, which runs cleanly in a worker thread separate from the proxy's asyncio loop. The test skips when browsers are not installed; otherwise it loads the live fixture site directly (no proxy) to verify navigation and snapshot.

  • Step 1: Write failing test

Create tests/test_browser.py:

import pytest

playwright = pytest.importorskip("playwright.sync_api")

from auto_reverse.browser import Browser  # noqa: E402


@pytest.fixture
def browser():
    try:
        b = Browser(proxy_port=None, headless=True)
        b.start()
    except Exception as exc:  # browser binary missing, etc.
        pytest.skip(f"browser unavailable: {exc}")
    yield b
    b.stop()


def test_navigate_and_snapshot(browser, fixture_site):
    browser.navigate(fixture_site + "/")
    snap = browser.snapshot()
    assert snap["url"].endswith("/")
    assert "title" in snap
  • Step 2: Run to verify fail

Run: uv run pytest tests/test_browser.py -q Expected: FAIL — module not found (or SKIP if Playwright import missing). A skip here is acceptable; the implementation step still applies.

  • Step 3: Implement browser.py

Create src/auto_reverse/browser.py:

from __future__ import annotations

from typing import Any


class Browser:
    """Headed (or headless) Playwright Chromium, optionally routed via proxy."""

    def __init__(self, proxy_port: int | None, headless: bool = False) -> None:
        self._proxy_port = proxy_port
        self._headless = headless
        self._pw: Any = None
        self._browser: Any = None
        self._page: Any = None

    def start(self) -> None:
        from playwright.sync_api import sync_playwright

        self._pw = sync_playwright().start()
        launch_kwargs: dict[str, Any] = {
            "headless": self._headless,
            "args": ["--ignore-certificate-errors"],
        }
        if self._proxy_port is not None:
            launch_kwargs["proxy"] = {"server": f"http://127.0.0.1:{self._proxy_port}"}
        self._browser = self._pw.chromium.launch(**launch_kwargs)
        context = self._browser.new_context(ignore_https_errors=True)
        self._page = context.new_page()

    def navigate(self, url: str) -> dict[str, Any]:
        self._page.goto(url, wait_until="networkidle")
        return self.snapshot()

    def click(self, selector: str) -> dict[str, Any]:
        self._page.click(selector, timeout=5000)
        self._page.wait_for_load_state("networkidle")
        return self.snapshot()

    def type_text(self, selector: str, text: str) -> dict[str, Any]:
        self._page.fill(selector, text)
        return self.snapshot()

    def snapshot(self) -> dict[str, Any]:
        """Compact view for the agent: url, title, and visible interactive elements."""
        elements = self._page.eval_on_selector_all(
            "a, button, input, [role=button], [role=link]",
            """els => els.slice(0, 40).map(e => ({
                tag: e.tagName.toLowerCase(),
                text: (e.innerText || e.value || e.getAttribute('aria-label') || '').slice(0, 60),
                id: e.id || null,
            }))""",
        )
        return {
            "url": self._page.url,
            "title": self._page.title(),
            "elements": elements,
        }

    def pause_for_human(self) -> None:
        """Surface the headed browser for manual control (Playwright inspector)."""
        self._page.pause()

    def stop(self) -> None:
        if self._browser is not None:
            self._browser.close()
        if self._pw is not None:
            self._pw.stop()
  • Step 4: Run to verify pass (or skip)

Run: uv run pytest tests/test_browser.py -q Expected: PASS if Chromium is installed; SKIP otherwise. Both are acceptable to proceed.

  • Step 5: Commit
git add src/auto_reverse/browser.py tests/test_browser.py
git commit -m "feat: Playwright browser wrapper with compact snapshot"

Task 12: Tool definitions and handlers

Files:

  • Create: src/auto_reverse/tools/__init__.py, src/auto_reverse/tools/browser_tools.py, src/auto_reverse/tools/flows_tools.py, src/auto_reverse/tools/doc_tools.py
  • Test: tests/test_tools.py

Tools are plain callables plus an Anthropic tool schema. Each handler takes a JSON-able input dict and returns a JSON-able result. The registry maps tool name → (schema, handler). Browser handlers use a fake browser in tests.
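The registry shape described above can be sketched without any of the plan's modules. The example below builds a one-tool registry and dispatches a call by name. The echo_upper tool and the dispatch helper are hypothetical, chosen only to illustrate the name → (schema, handler) mapping.

```python
from collections.abc import Callable
from typing import Any

Handler = Callable[[dict[str, Any]], dict[str, Any]]
Registry = dict[str, tuple[dict[str, Any], Handler]]


def echo_tools() -> Registry:
    """Hypothetical one-tool registry with the same shape as the plan's tools."""
    schema = {
        "name": "echo_upper",
        "description": "Return the input text upper-cased.",
        "input_schema": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    }
    return {"echo_upper": (schema, lambda inp: {"text": inp["text"].upper()})}


def dispatch(registry: Registry, name: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Look up a handler by tool name; unknown names become a structured error."""
    entry = registry.get(name)
    if entry is None:
        return {"error": f"unknown tool {name}"}
    _, handler = entry
    return handler(payload)


registry = echo_tools()
print(dispatch(registry, "echo_upper", {"text": "hi"}))
print(dispatch(registry, "missing", {}))
```

Because each group of tools returns a plain dict, combining them is a sequence of dict.update calls, and a later group silently overrides an earlier one if two tools share a name.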

  • Step 1: Write failing tests

Create tests/test_tools.py:

from auto_reverse.doc.engine import DocEngine
from auto_reverse.models import CapturedFlow
from auto_reverse.store import FlowStore, ScopeFilter
from auto_reverse.tools import build_registry


class FakeBrowser:
    def navigate(self, url):
        return {"url": url, "title": "T", "elements": []}

    def click(self, selector):
        return {"url": "u", "title": "T", "elements": []}

    def type_text(self, selector, text):
        return {"url": "u", "title": "T", "elements": []}

    def snapshot(self):
        return {"url": "u", "title": "T", "elements": []}


def _store_with_endpoint(tmp_path):
    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
    store.ingest(CapturedFlow(
        method="GET", host="ex.com", path="/api/users", query={}, req_headers={},
        req_body=None, status=200,
        resp_headers={"content-type": "application/json"}, resp_body=b"[]",
        timestamp=0.0,
    ))
    engine = DocEngine(store, out_dir=tmp_path, title="x", use_llm=False)
    return store, engine


def test_registry_has_expected_tools(tmp_path):
    store, engine = _store_with_endpoint(tmp_path)
    reg = build_registry(FakeBrowser(), store, engine)
    names = {schema["name"] for schema, _ in reg.values()}
    assert {"browser_navigate", "browser_click", "flows_search", "doc_document"} <= names


def test_flows_search_handler_returns_matches(tmp_path):
    store, engine = _store_with_endpoint(tmp_path)
    reg = build_registry(FakeBrowser(), store, engine)
    _, handler = reg["flows_search"]
    result = handler({"query": "users"})
    assert any("/api/users" in ep["path"] for ep in result["endpoints"])


def test_browser_navigate_handler(tmp_path):
    store, engine = _store_with_endpoint(tmp_path)
    reg = build_registry(FakeBrowser(), store, engine)
    _, handler = reg["browser_navigate"]
    result = handler({"url": "http://x"})
    assert result["url"] == "http://x"
  • Step 2: Run to verify fail

Run: uv run pytest tests/test_tools.py -q Expected: FAIL — module not found.

  • Step 3: Implement tool modules

Create src/auto_reverse/tools/browser_tools.py:

from __future__ import annotations

from collections.abc import Callable
from typing import Any

Handler = Callable[[dict[str, Any]], dict[str, Any]]


def browser_tools(browser: Any) -> dict[str, tuple[dict[str, Any], Handler]]:
    return {
        "browser_navigate": (
            {
                "name": "browser_navigate",
                "description": "Navigate the browser to a URL and return a page snapshot.",
                "input_schema": {
                    "type": "object",
                    "properties": {"url": {"type": "string"}},
                    "required": ["url"],
                },
            },
            lambda inp: browser.navigate(inp["url"]),
        ),
        "browser_click": (
            {
                "name": "browser_click",
                "description": "Click an element by CSS selector; returns a new snapshot.",
                "input_schema": {
                    "type": "object",
                    "properties": {"selector": {"type": "string"}},
                    "required": ["selector"],
                },
            },
            lambda inp: browser.click(inp["selector"]),
        ),
        "browser_type": (
            {
                "name": "browser_type",
                "description": "Fill a form field (CSS selector) with text.",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "selector": {"type": "string"},
                        "text": {"type": "string"},
                    },
                    "required": ["selector", "text"],
                },
            },
            lambda inp: browser.type_text(inp["selector"], inp["text"]),
        ),
        "browser_snapshot": (
            {
                "name": "browser_snapshot",
                "description": "Return the current page snapshot without acting.",
                "input_schema": {"type": "object", "properties": {}},
            },
            lambda inp: browser.snapshot(),
        ),
    }

Create src/auto_reverse/tools/flows_tools.py:

from __future__ import annotations

from collections.abc import Callable
from typing import Any

from auto_reverse.models import EndpointRecord
from auto_reverse.store import FlowStore

Handler = Callable[[dict[str, Any]], dict[str, Any]]


def _record_view(rec: EndpointRecord) -> dict[str, Any]:
    return {
        "method": rec.signature.method,
        "path": rec.signature.path_template,
        "status": rec.signature.status_class,
        "samples": rec.sample_count,
        "documented": rec.documented,
    }


def flows_tools(store: FlowStore) -> dict[str, tuple[dict[str, Any], Handler]]:
    def search(inp: dict[str, Any]) -> dict[str, Any]:
        query = inp.get("query", "")
        records = store.search(query) if query else store.endpoints()
        return {"endpoints": [_record_view(r) for r in records]}

    return {
        "flows_search": (
            {
                "name": "flows_search",
                "description": "List/search discovered API endpoints captured so far.",
                "input_schema": {
                    "type": "object",
                    "properties": {"query": {"type": "string"}},
                },
            },
            search,
        ),
    }

Create src/auto_reverse/tools/doc_tools.py:

from __future__ import annotations

from collections.abc import Callable
from typing import Any

from auto_reverse.doc.engine import DocEngine
from auto_reverse.store import FlowStore

Handler = Callable[[dict[str, Any]], dict[str, Any]]


def doc_tools(store: FlowStore, engine: DocEngine) -> dict[str, tuple[dict[str, Any], Handler]]:
    def document(inp: dict[str, Any]) -> dict[str, Any]:
        path = inp["path_template"]
        for rec in store.endpoints():
            if rec.signature.path_template == path:
                if inp.get("summary"):
                    rec.summary = inp["summary"]
                if inp.get("description"):
                    rec.description = inp["description"]
                if inp.get("tag"):
                    rec.tag = inp["tag"]
                engine.document(rec.signature)
                return {"documented": path}
        return {"error": f"no endpoint matching {path}"}

    return {
        "doc_document": (
            {
                "name": "doc_document",
                "description": (
                    "Enrich and (re)write docs for an endpoint by path template, "
                    "optionally setting a human summary/description/tag."
                ),
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "path_template": {"type": "string"},
                        "summary": {"type": "string"},
                        "description": {"type": "string"},
                        "tag": {"type": "string"},
                    },
                    "required": ["path_template"],
                },
            },
            document,
        ),
    }

Create src/auto_reverse/tools/__init__.py:

from __future__ import annotations

from collections.abc import Callable
from typing import Any

from auto_reverse.doc.engine import DocEngine
from auto_reverse.store import FlowStore
from auto_reverse.tools.browser_tools import browser_tools
from auto_reverse.tools.doc_tools import doc_tools
from auto_reverse.tools.flows_tools import flows_tools

Handler = Callable[[dict[str, Any]], dict[str, Any]]
Registry = dict[str, tuple[dict[str, Any], Handler]]


def build_registry(browser: Any, store: FlowStore, engine: DocEngine) -> Registry:
    registry: Registry = {}
    registry.update(browser_tools(browser))
    registry.update(flows_tools(store))
    registry.update(doc_tools(store, engine))
    return registry


def tool_schemas(registry: Registry) -> list[dict[str, Any]]:
    return [schema for schema, _ in registry.values()]
  • Step 4: Run to verify pass

Run: uv run pytest tests/test_tools.py -q Expected: PASS (3 tests).

  • Step 5: Commit
git add src/auto_reverse/tools/ tests/test_tools.py
git commit -m "feat: agent tool definitions (browser, flows, doc) and registry"

Task 13: Agent (Claude tool-use loop)

Files:

  • Create: src/auto_reverse/agent.py
  • Test: tests/test_agent.py

The agent runs the tool-use loop: send messages + tools to Claude, execute any tool_use blocks via the registry, feed tool_results back, repeat until the model returns text with no tool calls. Tested with a fake client returning scripted responses.
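To make the message history concrete, here is a minimal sketch of the same loop with plain dicts standing in for SDK content blocks and a scripted callable standing in for the client. The run_loop function and the scripted responses are illustrative assumptions, not the plan's Agent class.

```python
import json
from collections.abc import Callable
from typing import Any

Block = dict[str, Any]


def run_loop(
    create: Callable[[list[dict[str, Any]]], list[Block]],
    handlers: dict[str, Callable[[dict[str, Any]], Any]],
    user_message: str,
    max_iterations: int = 5,
) -> list[dict[str, Any]]:
    """Alternate model calls and tool executions until no tool_use block remains."""
    messages: list[dict[str, Any]] = [{"role": "user", "content": user_message}]
    for _ in range(max_iterations):
        content = create(messages)
        messages.append({"role": "assistant", "content": content})
        tool_uses = [b for b in content if b["type"] == "tool_use"]
        if not tool_uses:
            break
        # Every tool_use gets a matching tool_result in one user message.
        results = [
            {
                "type": "tool_result",
                "tool_use_id": b["id"],
                "content": json.dumps(handlers[b["name"]](b["input"])),
            }
            for b in tool_uses
        ]
        messages.append({"role": "user", "content": results})
    return messages


# Scripted stand-in for the model: first a tool call, then plain text.
_script = iter([
    [{"type": "tool_use", "id": "t1", "name": "flows_search", "input": {"query": "users"}}],
    [{"type": "text", "text": "Found the users endpoint."}],
])

history = run_loop(
    create=lambda messages: next(_script),
    handlers={"flows_search": lambda inp: {"endpoints": [{"path": "/api/users"}]}},
    user_message="map users",
)
print([m["role"] for m in history])  # ['user', 'assistant', 'user', 'assistant']
```

One tool round-trip therefore adds two messages (the assistant's tool_use and the user's tool_result) before the final assistant text.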

  • Step 1: Write failing tests

Create tests/test_agent.py:

from types import SimpleNamespace

from auto_reverse.agent import Agent


def _text_block(text):
    return SimpleNamespace(type="text", text=text)


def _tool_use(tool_id, name, inp):
    return SimpleNamespace(type="tool_use", id=tool_id, name=name, input=inp)


class FakeMessages:
    def __init__(self, scripted):
        self._scripted = list(scripted)
        self.calls = []

    def create(self, **kwargs):
        self.calls.append(kwargs)
        content = self._scripted.pop(0)
        stop = "tool_use" if any(b.type == "tool_use" for b in content) else "end_turn"
        return SimpleNamespace(content=content, stop_reason=stop, role="assistant")


class FakeClient:
    def __init__(self, scripted):
        self.messages = FakeMessages(scripted)


def test_agent_executes_tool_then_returns_text():
    scripted = [
        [_tool_use("t1", "flows_search", {"query": "users"})],
        [_text_block("Found the users endpoint.")],
    ]
    client = FakeClient(scripted)
    registry = {
        "flows_search": (
            {"name": "flows_search", "input_schema": {"type": "object"}},
            lambda inp: {"endpoints": [{"path": "/api/users"}]},
        )
    }
    agent = Agent(client, registry, model="m", system="s")
    reply = agent.run_turn("map users")
    assert "users endpoint" in reply
    # the tool result was fed back: second create call has >= 3 messages
    assert len(client.messages.calls[1]["messages"]) >= 3


def test_agent_plain_text_no_tools():
    client = FakeClient([[_text_block("Hello!")]])
    agent = Agent(client, {}, model="m", system="s")
    assert agent.run_turn("hi") == "Hello!"
  • Step 2: Run to verify fail

Run: uv run pytest tests/test_agent.py -q Expected: FAIL — module not found.

  • Step 3: Implement agent.py

Create src/auto_reverse/agent.py:

from __future__ import annotations

from typing import Any

from auto_reverse.tools import Registry, tool_schemas

MAX_ITERATIONS = 25


class Agent:
    """Conversational Claude tool-use loop driving browser/flows/doc tools."""

    def __init__(self, client: Any, registry: Registry, model: str, system: str) -> None:
        self._client = client
        self._registry = registry
        self._model = model
        self._system = system
        self._messages: list[dict[str, Any]] = []

    def run_turn(self, user_message: str) -> str:
        self._messages.append({"role": "user", "content": user_message})
        for _ in range(MAX_ITERATIONS):
            response = self._client.messages.create(
                model=self._model,
                max_tokens=4096,
                system=self._system,
                tools=tool_schemas(self._registry),
                messages=self._messages,
            )
            self._messages.append(
                {"role": "assistant", "content": self._serialize(response.content)}
            )
            tool_uses = [b for b in response.content if b.type == "tool_use"]
            if not tool_uses:
                return self._text_of(response.content)
            results = []
            for block in tool_uses:
                results.append(self._run_tool(block))
            self._messages.append({"role": "user", "content": results})
        return "(stopped: reached max tool iterations)"

    def _run_tool(self, block: Any) -> dict[str, Any]:
        entry = self._registry.get(block.name)
        if entry is None:
            output: Any = {"error": f"unknown tool {block.name}"}
        else:
            _, handler = entry
            try:
                output = handler(block.input)
            except Exception as exc:  # tool failure -> structured error, agent re-plans
                output = {"error": str(exc)}
        return {
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": __import__("json").dumps(output),
        }

    @staticmethod
    def _serialize(content: list[Any]) -> list[dict[str, Any]]:
        out: list[dict[str, Any]] = []
        for b in content:
            if b.type == "text":
                out.append({"type": "text", "text": b.text})
            elif b.type == "tool_use":
                out.append(
                    {"type": "tool_use", "id": b.id, "name": b.name, "input": b.input}
                )
        return out

    @staticmethod
    def _text_of(content: list[Any]) -> str:
        return "".join(b.text for b in content if b.type == "text").strip()

Note: __import__("json") is written inline here only to keep the snippet self-contained. Step 4 replaces it with a top-level import json and a plain json.dumps(output) call.

  • Step 4: Apply the json import cleanup

Edit src/auto_reverse/agent.py: add import json under from __future__, and change the content line to "content": json.dumps(output),.

  • Step 5: Run to verify pass

Run: uv run pytest tests/test_agent.py -q Expected: PASS (2 tests).

  • Step 6: Commit
git add src/auto_reverse/agent.py tests/test_agent.py
git commit -m "feat: Claude tool-use agent loop with graceful tool-error handling"

Task 14: Optional client generation

Files:

  • Create: src/auto_reverse/doc/client.py

  • Test: tests/test_client.py (unit test on the command builder; actual generation is verified manually)

  • Step 1: Write failing test

Create tests/test_client.py:

from pathlib import Path

from auto_reverse.doc.client import client_gen_command


def test_command_references_spec_and_out(tmp_path: Path):
    spec = tmp_path / "openapi.yaml"
    out = tmp_path / "client"
    cmd = client_gen_command(spec, out)
    assert str(spec) in cmd
    assert "openapi-python-client" in " ".join(cmd)
  • Step 2: Run to verify fail

Run: uv run pytest tests/test_client.py -q Expected: FAIL — module not found.

  • Step 3: Implement client.py

Create src/auto_reverse/doc/client.py:

from __future__ import annotations

import subprocess
from pathlib import Path


def client_gen_command(spec_path: Path, out_dir: Path) -> list[str]:
    return [
        "uvx",
        "openapi-python-client",
        "generate",
        "--path",
        str(spec_path),
        "--output-path",
        str(out_dir),
    ]


def generate_client(spec_path: Path, out_dir: Path) -> bool:
    """Run the deterministic codegen; returns True on success."""
    try:
        subprocess.run(client_gen_command(spec_path, out_dir), check=True)
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False
  • Step 4: Run to verify pass

Run: uv run pytest tests/test_client.py -q Expected: PASS (1 test).

  • Step 5: Commit
git add src/auto_reverse/doc/client.py tests/test_client.py
git commit -m "feat: optional typed client generation from openapi spec"

Task 15: REPL and CLI wiring

Files:

  • Create: src/auto_reverse/repl.py, src/auto_reverse/cli.py
  • Modify: src/auto_reverse/__init__.py
  • Test: tests/test_e2e_smoke.py

The REPL handles /meta-commands locally and routes plain text to the agent. The CLI parses args, builds the object graph, wires the on_new_signature callback to the doc engine on a worker thread, starts the proxy, launches the browser, and runs the REPL. End-to-end smoke drives the fixture site through the proxy and asserts an endpoint lands in the spec.
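One way to run the on_new_signature callback on a worker thread is a queue drained by a single consumer, so the proxy thread only enqueues and never blocks on doc writing. The sketch below is an assumption about how that wiring could look. SignatureWorker is a hypothetical helper, and in the real CLI the handle argument would be something like engine.document.

```python
import queue
import threading
from collections.abc import Callable
from typing import Any


class SignatureWorker:
    """Hypothetical helper: run a callback for each queued item on one thread."""

    _STOP = object()

    def __init__(self, handle: Callable[[Any], None]) -> None:
        self._handle = handle
        self._queue: queue.Queue[Any] = queue.Queue()
        self._thread = threading.Thread(target=self._drain, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def submit(self, sig: Any) -> None:
        """Cheap and thread-safe; suitable as the store's on_new_signature."""
        self._queue.put(sig)

    def _drain(self) -> None:
        while True:
            item = self._queue.get()
            if item is self._STOP:
                return
            try:
                self._handle(item)
            except Exception as exc:  # keep the worker alive on a bad item
                print(f"doc worker error: {exc}")

    def stop(self) -> None:
        # The sentinel is queued last, so earlier items are handled first.
        self._queue.put(self._STOP)
        self._thread.join(timeout=5)


seen: list[str] = []
worker = SignatureWorker(seen.append)
worker.start()
worker.submit("GET /api/users")
worker.submit("POST /api/users")
worker.stop()
print(seen)
```

A single consumer also serializes document calls, which keeps writes to the output files from interleaving.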

  • Step 1: Write failing E2E smoke test

Create tests/test_e2e_smoke.py:

import threading
from pathlib import Path

import pytest

playwright = pytest.importorskip("playwright.sync_api")

from auto_reverse.browser import Browser  # noqa: E402
from auto_reverse.doc.engine import DocEngine  # noqa: E402
from auto_reverse.proxy import ProxyServer  # noqa: E402
from auto_reverse.store import FlowStore, ScopeFilter  # noqa: E402


def test_capture_to_spec_end_to_end(tmp_path: Path, fixture_site: str):
    from urllib.parse import urlsplit

    host = urlsplit(fixture_site).hostname
    port = urlsplit(fixture_site).port
    scope = ScopeFilter(target_hosts={f"{host}:{port}", host})

    engine_holder: dict = {}

    def on_new(sig):
        engine_holder["engine"].document(sig)

    store = FlowStore(scope, on_new_signature=on_new)
    engine = DocEngine(store, out_dir=tmp_path, title="fixture", use_llm=False)
    engine_holder["engine"] = engine

    proxy = ProxyServer(store, archive_path=tmp_path / "archive.log", port=0)
    try:
        proxy.start()
    except Exception as exc:
        pytest.skip(f"proxy unavailable: {exc}")

    try:
        browser = Browser(proxy_port=proxy.port, headless=True)
        browser.start()
    except Exception as exc:
        proxy.stop()
        pytest.skip(f"browser unavailable: {exc}")

    try:
        browser.navigate(fixture_site + "/")  # triggers fetch('/api/users')
        # allow capture to settle
        threading.Event().wait(1.0)
        assert any(
            "/api/users" in r.signature.path_template for r in store.endpoints()
        )
    finally:
        browser.stop()
        proxy.stop()

Note: with port=0, ProxyServer must expose the OS-assigned port as proxy.port so the test can pass it to Browser. If mitmproxy does not support ephemeral port selection via port 0, pick a fixed high port (e.g. 18080) in this test and pass it to both the proxy and the browser.
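If a fixed port turns out to be necessary, a third option is to ask the OS for a free port up front and pass that number to both sides. This is a minimal sketch using only the standard library; `find_free_port` is a hypothetical helper, not something the plan defines.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Bind to port 0 so the OS picks an unused TCP port, then release it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        # getsockname() returns (host, port) for AF_INET sockets.
        return sock.getsockname()[1]
```

The port is released before the proxy binds it, so another process could in principle grab it in between. For a local smoke test that window is usually acceptable, but it is a trade-off rather than a guarantee.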

  • Step 2: Run to verify fail or skip

Run: uv run pytest tests/test_e2e_smoke.py -q
Expected: FAIL (imports resolve but modules incomplete) or SKIP (no browser/proxy). Either outcome is acceptable at this point; this test is the integration safety net.
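Because the smoke test waits a fixed second for capture to settle, it may be flaky on slow machines and slower than needed on fast ones. A bounded poll is one alternative. The sketch below assumes nothing about the project; `wait_until` is a hypothetical helper.

```python
import time
from collections.abc import Callable


def wait_until(
    predicate: Callable[[], bool],
    timeout: float = 5.0,
    interval: float = 0.05,
) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds elapse.

    Returns True as soon as the predicate holds, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

In the test, the settle wait and assertion could then collapse into a single `assert wait_until(lambda: any("/api/users" in r.signature.path_template for r in store.endpoints()))`.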

  • Step 3: Implement repl.py

Create src/auto_reverse/repl.py:

from __future__ import annotations

from auto_reverse.agent import Agent
from auto_reverse.store import FlowStore

HELP = """\
Commands:
  <text>      state intent in natural language (sent to the agent)
  /flows [q]  list/search discovered endpoints (local)
  /spec       show spec path + endpoint count
  /help       this help
  /quit       exit
"""


class Repl:
    def __init__(self, agent: Agent, store: FlowStore, spec_path: str) -> None:
        self._agent = agent
        self._store = store
        self._spec_path = spec_path

    def handle(self, line: str) -> str | None:
        """Process one input line. Returns output text, or None to signal quit."""
        line = line.strip()
        if not line:
            return ""
        if line in ("/quit", "/exit"):
            return None
        if line == "/help":
            return HELP
        if line == "/spec":
            # Separate the path from the count; without it they run together.
            return f"{self._spec_path}: {len(self._store.endpoints())} endpoint(s)"
        if line.startswith("/flows"):
            query = line[len("/flows"):].strip()
            records = self._store.search(query) if query else self._store.endpoints()
            return "\n".join(
                f"{r.signature.method} {r.signature.path_template}" for r in records
            ) or "(no endpoints yet)"
        return self._agent.run_turn(line)

    def run(self) -> None:  # pragma: no cover - interactive loop
        print(HELP)
        while True:
            try:
                line = input("> ")
            except (EOFError, KeyboardInterrupt):
                print()
                break
            out = self.handle(line)
            if out is None:
                break
            if out:
                print(out)
  • Step 4: Add a REPL unit test

Create tests/test_repl.py:

from auto_reverse.models import CapturedFlow
from auto_reverse.repl import Repl
from auto_reverse.store import FlowStore, ScopeFilter


class FakeAgent:
    def run_turn(self, msg):
        return f"agent saw: {msg}"


def _store():
    s = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
    s.ingest(CapturedFlow(
        method="GET", host="ex.com", path="/api/users", query={}, req_headers={},
        req_body=None, status=200, resp_headers={}, resp_body=None, timestamp=0.0,
    ))
    return s


def test_quit_returns_none():
    repl = Repl(FakeAgent(), _store(), "openapi.yaml")
    assert repl.handle("/quit") is None


def test_flows_lists_endpoints():
    repl = Repl(FakeAgent(), _store(), "openapi.yaml")
    assert "/api/users" in repl.handle("/flows")


def test_plain_text_goes_to_agent():
    repl = Repl(FakeAgent(), _store(), "openapi.yaml")
    assert repl.handle("map users") == "agent saw: map users"

Run: uv run pytest tests/test_repl.py -q
Expected: PASS (3 tests).

  • Step 5: Implement cli.py

Create src/auto_reverse/cli.py:

from __future__ import annotations

import argparse
import os
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path

from anthropic import Anthropic

from auto_reverse.agent import Agent
from auto_reverse.browser import Browser
from auto_reverse.config import Config
from auto_reverse.doc.client import generate_client
from auto_reverse.doc.engine import DocEngine
from auto_reverse.proxy import ProxyServer
from auto_reverse.repl import Repl
from auto_reverse.store import FlowStore, ScopeFilter
from auto_reverse.tools import build_registry

SYSTEM_PROMPT = """\
You are auto-reverse, an assistant that reverse-engineers a website's API.
Drive the browser toward the user's stated intent using the browser_* tools.
After actions, inspect captured traffic with flows_search and enrich notable
endpoints with doc_document (give a short summary, description, and tag).
Pursue the intent to a sensible depth, then summarize what you found and ask
what to do next. Be concise.
"""


def _parse_args(argv: list[str]) -> Config:
    p = argparse.ArgumentParser(prog="auto-reverse")
    p.add_argument("target_url")
    p.add_argument("--out")
    p.add_argument("--proxy-port", type=int, default=8080)
    p.add_argument("--headless", action="store_true")
    p.add_argument("--profile")
    p.add_argument("--gen-client", action="store_true")
    p.add_argument("--model", default="claude-opus-4-8")
    p.add_argument("--scope", default="")
    p.add_argument("--no-llm-doc", action="store_true")
    p.add_argument("--resume")
    a = p.parse_args(argv)
    return Config(
        target_url=a.target_url,
        out_dir=a.out,
        proxy_port=a.proxy_port,
        headless=a.headless,
        profile=a.profile,
        gen_client=a.gen_client,
        model=a.model,
        scope_hosts={h for h in a.scope.split(",") if h},
        no_llm_doc=a.no_llm_doc,
        resume=a.resume,
    )


def run(argv: list[str] | None = None) -> int:
    cfg = _parse_args(argv if argv is not None else sys.argv[1:])
    out_dir = Path(
        cfg.out_dir
        or f"./auto-reverse-out/{cfg.target_host}-{datetime.now(timezone.utc):%Y%m%d-%H%M%S}"
    )
    out_dir.mkdir(parents=True, exist_ok=True)

    scope = ScopeFilter(target_hosts=cfg.all_scope_hosts())
    title = f"{cfg.target_host} API"

    engine_box: dict[str, DocEngine] = {}

    def on_new(sig) -> None:
        engine = engine_box.get("engine")
        if engine is not None:
            threading.Thread(target=engine.document, args=(sig,), daemon=True).start()

    store = FlowStore(scope, on_new_signature=on_new)
    engine = DocEngine(store, out_dir=out_dir, title=title, use_llm=not cfg.no_llm_doc)
    engine_box["engine"] = engine

    proxy = ProxyServer(store, archive_path=out_dir / "archive.log", port=cfg.proxy_port)
    proxy.start()

    # Use the port the proxy reports (as the E2E test does) so the two cannot drift.
    browser = Browser(proxy_port=proxy.port, headless=cfg.headless)
    browser.start()
    browser.navigate(cfg.target_url)

    if cfg.auth == "manual" and not cfg.headless:
        input("Log in if needed, then press Enter to begin exploration... ")

    client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
    registry = build_registry(browser, store, engine)
    agent = Agent(client, registry, model=cfg.model, system=SYSTEM_PROMPT)
    repl = Repl(agent, store, spec_path=str(out_dir / "openapi.yaml"))

    try:
        repl.run()
    finally:
        try:
            browser.stop()
        finally:
            # Stop the proxy even if browser shutdown raises.
            proxy.stop()
        if cfg.gen_client:
            ok = generate_client(out_dir / "openapi.yaml", out_dir / "client")
            print("client generated" if ok else "client generation skipped/failed")
    print(f"Outputs in {out_dir}")
    return 0
  • Step 6: Wire main()

Replace src/auto_reverse/__init__.py:

from auto_reverse.cli import run


def main() -> None:
    raise SystemExit(run())
  • Step 7: Add a CLI arg-parsing unit test

Create tests/test_cli.py:

from auto_reverse.cli import _parse_args


def test_parse_minimal():
    cfg = _parse_args(["https://app.example.com"])
    assert cfg.target_url == "https://app.example.com"
    assert cfg.model == "claude-opus-4-8"
    assert cfg.headless is False


def test_parse_scope_and_flags():
    cfg = _parse_args(["https://x.com", "--scope", "a.com,b.com", "--headless", "--gen-client"])
    assert cfg.scope_hosts == {"a.com", "b.com"}
    assert cfg.headless is True
    assert cfg.gen_client is True

Run: uv run pytest tests/test_cli.py tests/test_repl.py -q
Expected: PASS.
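The `--scope` handling in `_parse_args` splits on commas and drops empty entries, but it keeps surrounding whitespace, so `--scope "a.com, b.com"` would produce the host `" b.com"`. If that matters, a stricter parser could look like the sketch below. `parse_scope` is a hypothetical helper, and lowercasing host names is an added assumption, not something the plan specifies.

```python
def parse_scope(raw: str) -> set[str]:
    """Split a comma-separated --scope value into normalized host names.

    Strips whitespace around each entry, lowercases it, and drops empties.
    """
    return {part.strip().lower() for part in raw.split(",") if part.strip()}
```

The existing `test_parse_scope_and_flags` case would still pass against this, since `"a.com,b.com"` normalizes to the same set.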

  • Step 8: Run the full suite

Run: uv run pytest -q
Expected: all non-skipped tests PASS. Browser/proxy/E2E tests may SKIP if Chromium or the proxy is unavailable in this environment.

  • Step 9: Lint and type-check

Run:

uv run ruff check src tests
uv run pyright src

Expected: ruff clean; pyright clean (fix any strict-mode complaints — add annotations as needed). Where third-party libs lack stubs (mitmproxy/playwright accessed via Any), confirm those are isolated to proxy.py/browser.py as designed.
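Keeping `Any` confined to one module usually means wrapping the untyped object behind a small typed surface. The sketch below shows the general pattern only; `PageLike`, `TypedPage`, and `describe` are hypothetical names and do not reflect the actual contents of `proxy.py` or `browser.py`.

```python
from typing import Any, Protocol


class PageLike(Protocol):
    """Hypothetical minimal typed surface the rest of the code depends on."""

    def title(self) -> str: ...


class TypedPage:
    """Wraps an untyped third-party object so `Any` stays inside this class."""

    def __init__(self, raw: Any) -> None:
        self._raw = raw

    def title(self) -> str:
        # Coerce at the boundary so callers see a real `str`, not `Any`.
        return str(self._raw.title())


def describe(page: PageLike) -> str:
    """Example caller: fully typed and unaware of the wrapped library."""
    return f"page: {page.title()}"
```

Under strict type checking, only the wrapper module then needs to tolerate `Any`, and callers are checked against the protocol.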

  • Step 10: Commit
git add src/auto_reverse tests/test_e2e_smoke.py tests/test_repl.py tests/test_cli.py
git commit -m "feat: REPL and CLI wiring; end-to-end capture-to-spec smoke test"

Task 16: README and manual verification

Files:

  • Modify: README.md

  • Step 1: Write README.md

Replace README.md with usage: install (uv sync, uv run playwright install chromium), the free-threaded fallback note, ANTHROPIC_API_KEY requirement, example invocation (uv run auto-reverse https://example.com), the REPL commands, and where outputs land (auto-reverse-out/<host>-<ts>/openapi.yaml, API.md, archive.log).
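For the free-threaded note, the README could include a quick way to check which mode the interpreter is actually running in. This is a sketch that assumes the fallback note is about running with the GIL still enabled; `threading_mode` is a hypothetical helper. `sys._is_gil_enabled()` exists on Python 3.13 and later, so it is looked up defensively.

```python
import sys
import sysconfig


def threading_mode() -> str:
    """Report whether this interpreter is running with or without the GIL."""
    # 1 on free-threaded builds; 0 or None on standard builds.
    built_free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    is_gil_enabled = getattr(sys, "_is_gil_enabled", None)
    if is_gil_enabled is None:
        return "gil"  # interpreter predates free-threaded builds
    if not is_gil_enabled():
        return "free-threaded"
    # A free-threaded build can still run with the GIL re-enabled at runtime.
    return "free-threaded build, GIL enabled" if built_free_threaded else "gil"


if __name__ == "__main__":
    print(threading_mode())
```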

  • Step 2: Manual end-to-end check (requires API key + browser)

Run:

ANTHROPIC_API_KEY=... uv run auto-reverse https://<a-real-test-site> --headless

Then type an intent such as "list the public endpoints" and confirm that openapi.yaml and API.md appear in the output directory. (Skip this step if no key or test site is available; the automated E2E smoke test already covers the capture→spec path.)
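The "confirm the files appear" check can also be scripted. This is a small sketch that uses the three output names listed in Step 1; `missing_outputs` is a hypothetical helper, and it only checks that the files exist, not that their contents are valid.

```python
from pathlib import Path

# Output file names as listed for the README in Step 1.
EXPECTED_OUTPUTS = ("openapi.yaml", "API.md", "archive.log")


def missing_outputs(out_dir: Path) -> list[str]:
    """Return the expected output files that are absent from `out_dir`."""
    return [name for name in EXPECTED_OUTPUTS if not (out_dir / name).is_file()]
```

An empty list means all three outputs are present. Anything else names the files to investigate.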

  • Step 3: Commit
git add README.md
git commit -m "docs: README with setup, usage, and free-threading notes"

Self-Review Notes (already applied)

  • Spec coverage: hybrid control (manual-pause prompt + pause_for_human), single free-threaded process (proxy thread + doc-worker threads + main agent), dedup signature (Task 2/4), scope filtering (Task 3), OpenAPI + markdown + raw archive outputs (Tasks 6/7/9/10), optional client gen (Task 14), --resume/--no-llm-doc/--profile flags parsed (Task 15). Note: --resume and --profile are parsed and stored, but v1 implements only minimal reuse logic for them. This is a known gap; deepen if needed.
  • Type consistency: EndpointRecord field names (summary/description/tag/request_schema/response_schema/documented/sample_count/query_params) are used identically across store, engine, openapi, markdown, tools. Tool names (browser_navigate, browser_click, browser_type, browser_snapshot, flows_search, doc_document) are consistent between registry and agent tests.
  • LLM enrichment: v1 wires deterministic schema inference plus agent-driven enrichment through doc_document. An automatic per-signature LLM call inside DocEngine is intentionally deferred: the agent enriches via the tool, use_llm remains the on/off switch, and --no-llm-doc is honored. This satisfies the spec's "LLM enrich only on novelty" through the agent loop rather than through a second, hidden LLM consumer.