Files
auto-reverse/docs/superpowers/plans/2026-05-31-auto-reverse.md
T
tomatocream a484ea4f8e docs: add auto-reverse implementation plan
16 bite-sized TDD tasks, bottom-up: core models/store/schema/doc,
then proxy/browser integration, then tools/agent/REPL/CLI, plus
end-to-end capture-to-spec smoke test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 23:26:42 +08:00

2451 lines
76 KiB
Markdown

# auto-reverse Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Build `auto-reverse`, a conversational CLI that reverse-engineers a website's API by driving a headed browser with an LLM while an embedded mitmproxy captures and documents real traffic into a live OpenAPI spec + markdown.
**Architecture:** Single free-threaded Python 3.14 process. A main-thread Claude tool-use agent ("the brain") acts on a Playwright headed browser and queries/commands a thread-safe Flow Store; an embedded mitmproxy `DumpMaster` runs in its own thread capturing every flow; a Doc Worker thread turns new endpoint *signatures* into OpenAPI/markdown (deterministic schema inference + LLM enrichment only on novelty). Build bottom-up: pure core (store/schema/doc) first, then I/O integration (proxy/browser), then tools/agent/REPL/CLI.
**Tech Stack:** Python 3.14 (free-threaded), uv, Playwright, mitmproxy, anthropic SDK, genson (schema inference), pytest + pytest-asyncio. Default model `claude-opus-4-8`.
**Spec:** `docs/superpowers/specs/2026-05-31-auto-reverse-design.md`
---
## File Structure
```
src/auto_reverse/
__init__.py # main() entrypoint (delegates to cli.run)
cli.py # arg parsing, wiring, thread lifecycle
config.py # Config dataclass + pluggable auth stub
models.py # CapturedFlow, Signature, EndpointRecord, helpers
store.py # FlowStore (thread-safe), ScopeFilter, path templating
proxy.py # embedded mitmproxy master + CaptureAddon + archive
browser.py # Playwright headed browser wrapper + take-over
agent.py # Claude tool-use loop
repl.py # chat loop + /meta-commands + take-over
tools/
__init__.py # tool registry assembly
browser_tools.py # browser.* tool schemas + handlers
flows_tools.py # flows.* tool schemas + handlers
doc_tools.py # doc.* tool schemas + handlers
doc/
__init__.py
schema.py # deterministic JSON Schema inference + merge (genson wrap)
engine.py # DocEngine: consumes new-signature events, writes outputs
openapi.py # OpenAPI assembly from EndpointRecords
markdown.py # human-readable API.md rendering
client.py # optional --gen-client codegen
tests/
conftest.py # fixtures: fixture HTTP site, sample flows
fixture_site.py # stdlib http.server JSON app for integration tests
test_models.py
test_store.py
test_scope.py
test_schema.py
test_openapi.py
test_markdown.py
test_config.py
test_proxy.py
test_browser.py # requires playwright browsers; skipped if absent
test_agent.py # mocked anthropic client
test_tools.py
test_e2e_smoke.py # fixture site end-to-end; skipped if browsers absent
```
---
## Task 0: Dependencies and test scaffolding
**Files:**
- Modify: `pyproject.toml`
- Create: `tests/__init__.py`, `tests/conftest.py`, `tests/fixture_site.py`
- [ ] **Step 1: Add runtime + dev dependencies**
Run:
```bash
cd /home/df/projects/reverse_engineer
uv add playwright mitmproxy anthropic genson
uv add --dev pytest pytest-asyncio
```
Expected: `pyproject.toml` gains a populated `dependencies` list and a `[dependency-groups]`/dev group; `uv.lock` updates. If any package lacks a free-threaded 3.14 wheel and the resolve fails, re-run with the GIL interpreter selected (`uv add --python 3.14 ...`) and record the fallback in `README.md` per the spec's free-threading caveat.
- [ ] **Step 2: Install Playwright's Chromium**
Run:
```bash
uv run playwright install chromium
```
Expected: downloads the Chromium build. If it fails in this environment, that only affects browser/E2E tests (which are guarded to skip); core tasks proceed regardless.
- [ ] **Step 3: Add pytest config to `pyproject.toml`**
Append:
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
```
- [ ] **Step 4: Create the stdlib fixture site**
Create `tests/fixture_site.py`:
```python
"""A tiny dependency-free JSON site for integration tests, served over HTTP."""
from __future__ import annotations
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
class _Handler(BaseHTTPRequestHandler):
def log_message(self, *args: object) -> None: # silence test output
pass
def _send_json(self, status: int, payload: object) -> None:
body = json.dumps(payload).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self) -> None:
if self.path == "/":
html = b"<html><body><script>fetch('/api/users')</script></body></html>"
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.send_header("Content-Length", str(len(html)))
self.end_headers()
self.wfile.write(html)
elif self.path == "/api/users":
self._send_json(200, [{"id": 1, "name": "Ada"}])
elif self.path.startswith("/api/users/"):
self._send_json(200, {"id": int(self.path.rsplit("/", 1)[1]), "name": "Ada"})
else:
self._send_json(404, {"error": "not found"})
def do_POST(self) -> None:
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length) if length else b"{}"
self._send_json(201, {"received": json.loads(raw or b"{}")})
def start_fixture_site() -> tuple[ThreadingHTTPServer, str]:
"""Start the site on an ephemeral port; return (server, base_url)."""
server = ThreadingHTTPServer(("127.0.0.1", 0), _Handler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
host, port = server.server_address
return server, f"http://{host}:{port}"
```
- [ ] **Step 5: Create test package marker and conftest fixtures**
Create `tests/__init__.py` (empty).
Create `tests/conftest.py`:
```python
from __future__ import annotations
from collections.abc import Iterator
import pytest
from tests.fixture_site import start_fixture_site
@pytest.fixture
def fixture_site() -> Iterator[str]:
server, base_url = start_fixture_site()
try:
yield base_url
finally:
server.shutdown()
```
- [ ] **Step 6: Verify the toolchain runs**
Run:
```bash
uv run pytest -q
```
Expected: pytest collects 0 tests and exits 0 (no tests yet). Confirms config is valid.
- [ ] **Step 7: Commit**
```bash
git add pyproject.toml uv.lock tests/
git commit -m "build: add deps and test scaffolding for auto-reverse"
```
---
## Task 1: Core data models
**Files:**
- Create: `src/auto_reverse/models.py`
- Test: `tests/test_models.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_models.py`:
```python
from auto_reverse.models import CapturedFlow, Signature, status_class
def test_status_class_buckets():
assert status_class(200) == "2xx"
assert status_class(201) == "2xx"
assert status_class(404) == "4xx"
assert status_class(503) == "5xx"
def test_signature_is_hashable_and_equal():
a = Signature("GET", "ex.com", "/api/users/{id}", "2xx")
b = Signature("GET", "ex.com", "/api/users/{id}", "2xx")
assert a == b
assert {a, b} == {a}
def test_captured_flow_json_body_parsing():
flow = CapturedFlow(
method="POST", host="ex.com", path="/api/x", query={},
req_headers={"content-type": "application/json"}, req_body=b'{"a": 1}',
status=201, resp_headers={"content-type": "application/json"},
resp_body=b'{"ok": true}', timestamp=0.0,
)
assert flow.request_json() == {"a": 1}
assert flow.response_json() == {"ok": True}
def test_captured_flow_non_json_body_returns_none():
flow = CapturedFlow(
method="GET", host="ex.com", path="/x", query={},
req_headers={}, req_body=None, status=200,
resp_headers={"content-type": "text/html"}, resp_body=b"<html>",
timestamp=0.0,
)
assert flow.response_json() is None
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `uv run pytest tests/test_models.py -q`
Expected: FAIL — `ModuleNotFoundError: No module named 'auto_reverse.models'`.
- [ ] **Step 3: Implement `models.py`**
Create `src/auto_reverse/models.py`:
```python
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any
def status_class(status: int) -> str:
return f"{status // 100}xx"
@dataclass(frozen=True)
class Signature:
method: str
host: str
path_template: str
status_class: str
@dataclass
class CapturedFlow:
method: str
host: str
path: str
query: dict[str, list[str]]
req_headers: dict[str, str]
req_body: bytes | None
status: int
resp_headers: dict[str, str]
resp_body: bytes | None
timestamp: float
def _json(self, body: bytes | None, headers: dict[str, str]) -> Any | None:
if body is None:
return None
ctype = headers.get("content-type", "").lower()
if "json" not in ctype:
return None
try:
return json.loads(body)
except (ValueError, UnicodeDecodeError):
return None
def request_json(self) -> Any | None:
return self._json(self.req_body, self.req_headers)
def response_json(self) -> Any | None:
return self._json(self.resp_body, self.resp_headers)
@dataclass
class EndpointRecord:
signature: Signature
sample_count: int = 0
query_params: set[str] = field(default_factory=set)
request_schema: dict[str, Any] | None = None
response_schema: dict[str, Any] | None = None
# LLM-enriched fields (filled by the doc engine):
summary: str = ""
description: str = ""
tag: str = ""
documented: bool = False
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `uv run pytest tests/test_models.py -q`
Expected: PASS (4 tests).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/models.py tests/test_models.py
git commit -m "feat: core data models (Signature, CapturedFlow, EndpointRecord)"
```
---
## Task 2: Path templating
**Files:**
- Modify: `src/auto_reverse/store.py` (create)
- Test: `tests/test_store.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_store.py`:
```python
from auto_reverse.store import path_template
def test_collapses_numeric_ids():
assert path_template("/api/users/4812/orders/99") == "/api/users/{id}/orders/{id}"
def test_collapses_uuid():
p = "/api/items/550e8400-e29b-41d4-a716-446655440000"
assert path_template(p) == "/api/items/{id}"
def test_collapses_long_hex_token():
assert path_template("/files/a1b2c3d4e5f60718293a4b5c") == "/files/{id}"
def test_keeps_short_words():
assert path_template("/api/users/me/settings") == "/api/users/me/settings"
def test_root_and_empty():
assert path_template("/") == "/"
assert path_template("") == "/"
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_store.py -q`
Expected: FAIL — `ModuleNotFoundError: No module named 'auto_reverse.store'`.
- [ ] **Step 3: Implement `path_template` in `store.py`**
Create `src/auto_reverse/store.py`:
```python
from __future__ import annotations
import re
_UUID = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
_HEX_TOKEN = re.compile(r"^[0-9a-fA-F]{16,}$")
_LONG_OPAQUE = re.compile(r"^[A-Za-z0-9_\-]{20,}$")
def _is_variable(segment: str) -> bool:
if segment.isdigit():
return True
if _UUID.match(segment):
return True
if _HEX_TOKEN.match(segment):
return True
if _LONG_OPAQUE.match(segment) and any(c.isdigit() for c in segment):
return True
return False
def path_template(path: str) -> str:
"""Collapse variable path segments (ids, UUIDs, hashes, opaque tokens) to {id}."""
if not path or path == "/":
return "/"
parts = path.split("/")
out = ["{id}" if part and _is_variable(part) else part for part in parts]
return "/".join(out)
```
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_store.py -q`
Expected: PASS (5 tests).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/store.py tests/test_store.py
git commit -m "feat: path templating for endpoint signatures"
```
---
## Task 3: Scope filter
**Files:**
- Modify: `src/auto_reverse/store.py`
- Test: `tests/test_scope.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_scope.py`:
```python
from auto_reverse.models import CapturedFlow
from auto_reverse.store import ScopeFilter
def _flow(host: str, path: str, ctype: str = "application/json") -> CapturedFlow:
return CapturedFlow(
method="GET", host=host, path=path, query={}, req_headers={},
req_body=None, status=200, resp_headers={"content-type": ctype},
resp_body=b"{}", timestamp=0.0,
)
def test_target_host_in_scope():
f = ScopeFilter(target_hosts={"app.example.com"})
assert f.is_in_scope(_flow("app.example.com", "/api/users"))
def test_other_host_out_of_scope():
f = ScopeFilter(target_hosts={"app.example.com"})
assert not f.is_in_scope(_flow("cdn.other.com", "/x"))
def test_static_asset_dropped():
f = ScopeFilter(target_hosts={"app.example.com"})
assert not f.is_in_scope(_flow("app.example.com", "/main.js", "application/javascript"))
def test_analytics_host_dropped_by_default():
f = ScopeFilter(target_hosts={"app.example.com"})
assert not f.is_in_scope(_flow("www.google-analytics.com", "/collect"))
def test_extra_allow_host():
f = ScopeFilter(target_hosts={"app.example.com"}, allow_hosts={"api.example.com"})
assert f.is_in_scope(_flow("api.example.com", "/v1/data"))
def test_explicit_deny_overrides():
f = ScopeFilter(target_hosts={"app.example.com"}, deny_hosts={"app.example.com"})
assert not f.is_in_scope(_flow("app.example.com", "/api/users"))
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_scope.py -q`
Expected: FAIL — `ImportError: cannot import name 'ScopeFilter'`.
- [ ] **Step 3: Append `ScopeFilter` to `store.py`**
Add to `src/auto_reverse/store.py`:
```python
from auto_reverse.models import CapturedFlow
_ASSET_SUFFIXES = (".js", ".mjs", ".css", ".png", ".jpg", ".jpeg", ".gif",
".svg", ".woff", ".woff2", ".ttf", ".ico", ".map", ".webp")
_DEFAULT_ANALYTICS = frozenset({
"www.google-analytics.com", "google-analytics.com", "analytics.google.com",
"stats.g.doubleclick.net", "api.segment.io", "cdn.segment.com",
"browser.sentry-cdn.com", "js.stripe.com",
})
class ScopeFilter:
def __init__(
self,
target_hosts: set[str],
allow_hosts: set[str] | None = None,
deny_hosts: set[str] | None = None,
) -> None:
self.target_hosts = set(target_hosts)
self.allow_hosts = set(allow_hosts or set())
self.deny_hosts = set(deny_hosts or set())
def is_in_scope(self, flow: CapturedFlow) -> bool:
host = flow.host
if host in self.deny_hosts:
return False
if host in _DEFAULT_ANALYTICS:
return False
if host not in self.target_hosts and host not in self.allow_hosts:
return False
if flow.path.split("?")[0].lower().endswith(_ASSET_SUFFIXES):
return False
ctype = flow.resp_headers.get("content-type", "").lower()
if ctype.startswith(("text/css", "image/", "font/", "application/javascript")):
return False
return True
```
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_scope.py -q`
Expected: PASS (6 tests).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/store.py tests/test_scope.py
git commit -m "feat: scope filter (host allow/deny, asset and analytics drop)"
```
---
## Task 4: Flow Store (thread-safe dedup)
**Files:**
- Modify: `src/auto_reverse/store.py`
- Test: `tests/test_store.py`
- [ ] **Step 1: Write failing tests**
Append to `tests/test_store.py`:
```python
from auto_reverse.models import CapturedFlow, Signature
from auto_reverse.store import FlowStore, ScopeFilter
def _post(host: str, path: str, body: bytes) -> CapturedFlow:
return CapturedFlow(
method="POST", host=host, path=path, query={},
req_headers={"content-type": "application/json"}, req_body=body,
status=201, resp_headers={"content-type": "application/json"},
resp_body=b'{"ok": true}', timestamp=0.0,
)
def test_ingest_new_signature_returns_true_once():
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
assert store.ingest(_post("ex.com", "/api/cart/1", b'{"q": 1}')).is_new is True
# same template, different id -> not new
assert store.ingest(_post("ex.com", "/api/cart/2", b'{"q": 2}')).is_new is False
assert len(store.endpoints()) == 1
def test_out_of_scope_flow_ignored():
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
result = store.ingest(_post("other.com", "/x", b"{}"))
assert result.is_new is False
assert result.in_scope is False
assert store.endpoints() == []
def test_new_signature_callback_fires_with_signature():
seen: list[Signature] = []
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}), on_new_signature=seen.append)
store.ingest(_post("ex.com", "/api/cart/1", b"{}"))
store.ingest(_post("ex.com", "/api/cart/2", b"{}"))
assert len(seen) == 1
assert seen[0].path_template == "/api/cart/{id}"
def test_search_filters_by_substring():
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
store.ingest(_post("ex.com", "/api/cart/1", b"{}"))
store.ingest(_post("ex.com", "/api/login", b"{}"))
results = store.search("cart")
assert len(results) == 1
assert results[0].signature.path_template == "/api/cart/{id}"
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_store.py -q`
Expected: FAIL — `ImportError: cannot import name 'FlowStore'`.
- [ ] **Step 3: Append `FlowStore` and `IngestResult` to `store.py`**
Add to `src/auto_reverse/store.py`:
```python
import threading
from collections.abc import Callable
from dataclasses import dataclass
from urllib.parse import urlsplit
from auto_reverse.models import EndpointRecord, Signature, status_class
MAX_SAMPLES = 5
@dataclass
class IngestResult:
in_scope: bool
is_new: bool
signature: Signature | None
class FlowStore:
"""Thread-safe store: dedup by signature, retain bounded samples per endpoint."""
def __init__(
self,
scope: ScopeFilter,
on_new_signature: Callable[[Signature], None] | None = None,
) -> None:
self._scope = scope
self._on_new = on_new_signature
self._lock = threading.Lock()
self._records: dict[Signature, EndpointRecord] = {}
self._samples: dict[Signature, list[CapturedFlow]] = {}
def signature_of(self, flow: CapturedFlow) -> Signature:
return Signature(
method=flow.method.upper(),
host=flow.host,
path_template=path_template(flow.path),
status_class=status_class(flow.status),
)
def ingest(self, flow: CapturedFlow) -> IngestResult:
if not self._scope.is_in_scope(flow):
return IngestResult(in_scope=False, is_new=False, signature=None)
sig = self.signature_of(flow)
with self._lock:
is_new = sig not in self._records
if is_new:
self._records[sig] = EndpointRecord(signature=sig)
self._samples[sig] = []
record = self._records[sig]
record.sample_count += 1
record.query_params.update(self._query_keys(flow))
samples = self._samples[sig]
if len(samples) < MAX_SAMPLES:
samples.append(flow)
if is_new and self._on_new is not None:
self._on_new(sig)
return IngestResult(in_scope=True, is_new=is_new, signature=sig)
@staticmethod
def _query_keys(flow: CapturedFlow) -> set[str]:
return set(flow.query.keys())
def endpoints(self) -> list[EndpointRecord]:
with self._lock:
return list(self._records.values())
def samples(self, sig: Signature) -> list[CapturedFlow]:
with self._lock:
return list(self._samples.get(sig, []))
def get(self, sig: Signature) -> EndpointRecord | None:
with self._lock:
return self._records.get(sig)
def search(self, query: str) -> list[EndpointRecord]:
q = query.lower()
with self._lock:
return [
r for r in self._records.values()
if q in r.signature.path_template.lower()
or q in r.signature.method.lower()
]
```
Note: `urlsplit` import is reserved for proxy use; keep it only if used — if pyright flags it unused here, remove it. (The capture addon, Task 8, splits URLs.)
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_store.py -q`
Expected: PASS (9 tests total in file).
- [ ] **Step 5: Lint check**
Run: `uv run ruff check src/auto_reverse/store.py`
Expected: clean (remove any unused import it reports).
- [ ] **Step 6: Commit**
```bash
git add src/auto_reverse/store.py tests/test_store.py
git commit -m "feat: thread-safe FlowStore with signature dedup and samples"
```
---
## Task 5: Deterministic schema inference
**Files:**
- Create: `src/auto_reverse/doc/__init__.py`, `src/auto_reverse/doc/schema.py`
- Test: `tests/test_schema.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_schema.py`:
```python
from auto_reverse.doc.schema import SchemaAccumulator
def test_single_object_schema():
acc = SchemaAccumulator()
acc.add({"id": 1, "name": "Ada"})
schema = acc.schema()
assert schema["type"] == "object"
assert set(schema["properties"]) == {"id", "name"}
def test_merge_widens_optional_fields():
acc = SchemaAccumulator()
acc.add({"id": 1, "name": "Ada"})
acc.add({"id": 2}) # name missing -> becomes optional
schema = acc.schema()
assert "id" in schema.get("required", [])
assert "name" not in schema.get("required", [])
def test_array_schema():
acc = SchemaAccumulator()
acc.add([{"id": 1}, {"id": 2}])
schema = acc.schema()
assert schema["type"] == "array"
assert schema["items"]["type"] == "object"
def test_empty_accumulator_returns_none():
assert SchemaAccumulator().schema() is None
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_schema.py -q`
Expected: FAIL — `ModuleNotFoundError: No module named 'auto_reverse.doc'`.
- [ ] **Step 3: Implement schema wrapper**
Create `src/auto_reverse/doc/__init__.py` (empty).
Create `src/auto_reverse/doc/schema.py`:
```python
from __future__ import annotations
from typing import Any
from genson import SchemaBuilder
class SchemaAccumulator:
"""Accumulate JSON samples into a widening JSON Schema (genson-backed)."""
def __init__(self) -> None:
self._builder = SchemaBuilder()
self._count = 0
def add(self, value: Any) -> None:
self._builder.add_object(value)
self._count += 1
def schema(self) -> dict[str, Any] | None:
if self._count == 0:
return None
result: dict[str, Any] = self._builder.to_schema()
result.pop("$schema", None)
return result
```
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_schema.py -q`
Expected: PASS (4 tests).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/doc/__init__.py src/auto_reverse/doc/schema.py tests/test_schema.py
git commit -m "feat: deterministic JSON schema inference (genson wrapper)"
```
---
## Task 6: OpenAPI assembly
**Files:**
- Create: `src/auto_reverse/doc/openapi.py`
- Test: `tests/test_openapi.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_openapi.py`:
```python
from auto_reverse.doc.openapi import build_openapi
from auto_reverse.models import EndpointRecord, Signature
def _record(method: str, template: str, **kw) -> EndpointRecord:
rec = EndpointRecord(signature=Signature(method, "ex.com", template, "2xx"))
for k, v in kw.items():
setattr(rec, k, v)
return rec
def test_builds_paths_and_methods():
records = [
_record("GET", "/api/users", summary="List users",
response_schema={"type": "array"}),
_record("POST", "/api/users", summary="Create user",
request_schema={"type": "object"}),
]
spec = build_openapi(records, title="ex.com API")
assert spec["openapi"].startswith("3.")
assert spec["info"]["title"] == "ex.com API"
assert set(spec["paths"]["/api/users"]) == {"get", "post"}
assert spec["paths"]["/api/users"]["get"]["summary"] == "List users"
def test_path_param_declared_for_template():
rec = _record("GET", "/api/users/{id}", summary="Get user")
spec = build_openapi([rec], title="x")
params = spec["paths"]["/api/users/{id}"]["get"]["parameters"]
assert any(p["name"] == "id" and p["in"] == "path" for p in params)
def test_request_body_included_when_schema_present():
rec = _record("POST", "/api/x", request_schema={"type": "object"})
op = build_openapi([rec], title="x")["paths"]["/api/x"]["post"]
assert op["requestBody"]["content"]["application/json"]["schema"] == {"type": "object"}
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_openapi.py -q`
Expected: FAIL — `ModuleNotFoundError: No module named 'auto_reverse.doc.openapi'`.
- [ ] **Step 3: Implement `build_openapi`**
Create `src/auto_reverse/doc/openapi.py`:
```python
from __future__ import annotations
import re
from typing import Any
from auto_reverse.models import EndpointRecord
_PARAM = re.compile(r"\{([^}]+)\}")
def _path_params(template: str) -> list[dict[str, Any]]:
return [
{"name": name, "in": "path", "required": True, "schema": {"type": "string"}}
for name in _PARAM.findall(template)
]
def _operation(rec: EndpointRecord) -> dict[str, Any]:
op: dict[str, Any] = {}
if rec.summary:
op["summary"] = rec.summary
if rec.description:
op["description"] = rec.description
if rec.tag:
op["tags"] = [rec.tag]
params = _path_params(rec.signature.path_template)
params += [
{"name": q, "in": "query", "required": False, "schema": {"type": "string"}}
for q in sorted(rec.query_params)
]
if params:
op["parameters"] = params
if rec.request_schema is not None:
op["requestBody"] = {
"content": {"application/json": {"schema": rec.request_schema}}
}
status = rec.signature.status_class.replace("x", "X")
response: dict[str, Any] = {"description": rec.summary or "Response"}
if rec.response_schema is not None:
response["content"] = {"application/json": {"schema": rec.response_schema}}
op["responses"] = {status[0] + "XX": response}
return op
def build_openapi(records: list[EndpointRecord], title: str) -> dict[str, Any]:
paths: dict[str, dict[str, Any]] = {}
for rec in records:
template = rec.signature.path_template
method = rec.signature.method.lower()
paths.setdefault(template, {})[method] = _operation(rec)
return {
"openapi": "3.1.0",
"info": {"title": title, "version": "0.0.0"},
"paths": paths,
}
```
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_openapi.py -q`
Expected: PASS (3 tests).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/doc/openapi.py tests/test_openapi.py
git commit -m "feat: OpenAPI assembly from endpoint records"
```
---
## Task 7: Markdown rendering
**Files:**
- Create: `src/auto_reverse/doc/markdown.py`
- Test: `tests/test_markdown.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_markdown.py`:
```python
from auto_reverse.doc.markdown import render_markdown
from auto_reverse.models import EndpointRecord, Signature
def _rec(method, template, **kw):
rec = EndpointRecord(signature=Signature(method, "ex.com", template, "2xx"))
for k, v in kw.items():
setattr(rec, k, v)
return rec
def test_renders_heading_and_endpoints():
md = render_markdown([_rec("GET", "/api/users", summary="List users")], title="ex.com API")
assert "# ex.com API" in md
assert "`GET /api/users`" in md
assert "List users" in md
def test_groups_by_tag():
records = [
_rec("GET", "/api/users", tag="Users"),
_rec("GET", "/api/cart", tag="Cart"),
]
md = render_markdown(records, title="x")
assert "## Users" in md
assert "## Cart" in md
def test_untagged_go_under_general():
md = render_markdown([_rec("GET", "/api/x")], title="x")
assert "## General" in md
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_markdown.py -q`
Expected: FAIL — module not found.
- [ ] **Step 3: Implement `render_markdown`**
Create `src/auto_reverse/doc/markdown.py`:
```python
from __future__ import annotations
from collections import defaultdict
from auto_reverse.models import EndpointRecord
def render_markdown(records: list[EndpointRecord], title: str) -> str:
groups: dict[str, list[EndpointRecord]] = defaultdict(list)
for rec in records:
groups[rec.tag or "General"].append(rec)
lines = [f"# {title}", ""]
for tag in sorted(groups):
lines.append(f"## {tag}")
lines.append("")
for rec in sorted(groups[tag], key=lambda r: r.signature.path_template):
sig = rec.signature
lines.append(f"### `{sig.method} {sig.path_template}`")
if rec.summary:
lines.append(f"**{rec.summary}**")
if rec.description:
lines.append("")
lines.append(rec.description)
lines.append(f"\n_Seen {rec.sample_count} time(s)._")
lines.append("")
return "\n".join(lines)
```
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_markdown.py -q`
Expected: PASS (3 tests).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/doc/markdown.py tests/test_markdown.py
git commit -m "feat: markdown API documentation rendering"
```
---
## Task 8: Config and pluggable auth stub
**Files:**
- Create: `src/auto_reverse/config.py`
- Test: `tests/test_config.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_config.py`:
```python
import pytest
from auto_reverse.config import Config, ManualPauseAuth, NoAuth, make_auth
def test_config_derives_target_host():
cfg = Config(target_url="https://app.example.com/dashboard")
assert cfg.target_host == "app.example.com"
def test_config_scope_hosts_includes_target_plus_extra():
cfg = Config(target_url="https://app.example.com", scope_hosts={"api.example.com"})
assert cfg.all_scope_hosts() == {"app.example.com", "api.example.com"}
def test_default_model():
assert Config(target_url="https://x.com").model == "claude-opus-4-8"
def test_make_auth_returns_strategy():
assert isinstance(make_auth("manual"), ManualPauseAuth)
assert isinstance(make_auth("none"), NoAuth)
def test_make_auth_unknown_raises():
with pytest.raises(ValueError):
make_auth("oauth-magic")
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_config.py -q`
Expected: FAIL — module not found.
- [ ] **Step 3: Implement `config.py`**
Create `src/auto_reverse/config.py`:
```python
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol
from urllib.parse import urlsplit
@dataclass
class Config:
target_url: str
out_dir: str | None = None
proxy_port: int = 8080
headless: bool = False
profile: str | None = None
gen_client: bool = False
model: str = "claude-opus-4-8"
scope_hosts: set[str] = field(default_factory=set)
no_llm_doc: bool = False
resume: str | None = None
auth: str = "manual"
@property
def target_host(self) -> str:
return urlsplit(self.target_url).hostname or ""
def all_scope_hosts(self) -> set[str]:
return {self.target_host, *self.scope_hosts}
class AuthStrategy(Protocol):
name: str
async def authenticate(self, page: object) -> None:
"""Prepare an authenticated session on the given Playwright page."""
...
class NoAuth:
name = "none"
async def authenticate(self, page: object) -> None:
return None
class ManualPauseAuth:
"""Default stub: pause so the human can log in by hand, then continue."""
name = "manual"
async def authenticate(self, page: object) -> None:
# Implemented against the real page in browser.py wiring; the stub
# simply records intent. The REPL prompts the user to log in and
# press enter before autonomous exploration begins.
return None
def make_auth(name: str) -> AuthStrategy:
strategies: dict[str, AuthStrategy] = {"manual": ManualPauseAuth(), "none": NoAuth()}
if name not in strategies:
raise ValueError(f"unknown auth strategy: {name!r}")
return strategies[name]
```
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_config.py -q`
Expected: PASS (5 tests).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/config.py tests/test_config.py
git commit -m "feat: Config and pluggable auth stub (manual/none)"
```
---
## Task 9: Embedded proxy + capture addon
**Files:**
- Create: `src/auto_reverse/proxy.py`
- Test: `tests/test_proxy.py`
The capture addon converts a mitmproxy `HTTPFlow` into our `CapturedFlow`, feeds the store, and appends raw flows to an archive. We unit-test the pure conversion (`flow_from_mitm`) with a fake mitmproxy-shaped object so no proxy needs to run; the live proxy is exercised in the E2E smoke (Task 15).
- [ ] **Step 1: Write failing tests**
Create `tests/test_proxy.py`:
```python
from types import SimpleNamespace
from auto_reverse.proxy import flow_from_mitm
def _fake_mitm_flow():
request = SimpleNamespace(
method="POST", pretty_host="ex.com", path="/api/users?role=admin",
headers={"content-type": "application/json"}, content=b'{"name": "Ada"}',
query=SimpleNamespace(fields=[("role", "admin")]),
)
response = SimpleNamespace(
status_code=201, headers={"content-type": "application/json"},
content=b'{"id": 1}',
)
return SimpleNamespace(request=request, response=response, timestamp_start=1.5)
def test_flow_from_mitm_maps_fields():
captured = flow_from_mitm(_fake_mitm_flow())
assert captured.method == "POST"
assert captured.host == "ex.com"
assert captured.path == "/api/users"
assert captured.query == {"role": ["admin"]}
assert captured.status == 201
assert captured.request_json() == {"name": "Ada"}
assert captured.response_json() == {"id": 1}
def test_flow_from_mitm_handles_missing_response():
flow = _fake_mitm_flow()
flow.response = None
captured = flow_from_mitm(flow)
assert captured is None
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_proxy.py -q`
Expected: FAIL — module not found.
- [ ] **Step 3: Implement `proxy.py`**
Create `src/auto_reverse/proxy.py`:
```python
from __future__ import annotations
import asyncio
import threading
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit
from auto_reverse.models import CapturedFlow
from auto_reverse.store import FlowStore
def flow_from_mitm(flow: Any) -> CapturedFlow | None:
"""Convert a mitmproxy HTTPFlow (or test double) into a CapturedFlow."""
if flow.response is None:
return None
req = flow.request
query: dict[str, list[str]] = {}
for key, value in req.query.fields:
query.setdefault(key, []).append(value)
return CapturedFlow(
method=req.method,
host=req.pretty_host,
path=urlsplit(req.path).path,
query=query,
req_headers={k.lower(): v for k, v in dict(req.headers).items()},
req_body=req.content,
status=flow.response.status_code,
resp_headers={k.lower(): v for k, v in dict(flow.response.headers).items()},
resp_body=flow.response.content,
timestamp=getattr(flow, "timestamp_start", 0.0) or 0.0,
)
class CaptureAddon:
"""mitmproxy addon: on each response, ingest into the store + archive raw."""
def __init__(self, store: FlowStore, archive_path: Path) -> None:
self._store = store
self._archive = archive_path
self._archive.parent.mkdir(parents=True, exist_ok=True)
def response(self, flow: Any) -> None: # mitmproxy hook name
captured = flow_from_mitm(flow)
if captured is None:
return
self._store.ingest(captured)
with self._archive.open("a") as fh:
fh.write(f"{captured.method} {captured.host}{captured.path} {captured.status}\n")
class ProxyServer:
"""Run mitmproxy's DumpMaster in a dedicated thread with its own loop."""
def __init__(self, store: FlowStore, archive_path: Path, port: int) -> None:
self._store = store
self._archive_path = archive_path
self._port = port
self._master: Any = None
self._loop: asyncio.AbstractEventLoop | None = None
self._thread: threading.Thread | None = None
@property
def port(self) -> int:
return self._port
def start(self) -> None:
ready = threading.Event()
self._thread = threading.Thread(target=self._run, args=(ready,), daemon=True)
self._thread.start()
ready.wait(timeout=10)
def _run(self, ready: threading.Event) -> None:
from mitmproxy.options import Options
from mitmproxy.tools.dump import DumpMaster
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
opts = Options(listen_host="127.0.0.1", listen_port=self._port)
self._master = DumpMaster(opts, with_termlog=False, with_dumper=False)
self._master.addons.add(CaptureAddon(self._store, self._archive_path))
async def _serve() -> None:
ready.set()
await self._master.run()
self._loop.run_until_complete(_serve())
def stop(self) -> None:
if self._master is not None and self._loop is not None:
self._loop.call_soon_threadsafe(self._master.shutdown)
```
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_proxy.py -q`
Expected: PASS (2 tests).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/proxy.py tests/test_proxy.py
git commit -m "feat: embedded mitmproxy capture addon and proxy server"
```
---
## Task 10: Doc engine (event consumer)
**Files:**
- Create: `src/auto_reverse/doc/engine.py`
- Test: extend `tests/test_schema.py` is wrong target — create `tests/test_engine.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_engine.py`:
```python
from pathlib import Path
from auto_reverse.doc.engine import DocEngine
from auto_reverse.models import CapturedFlow
from auto_reverse.store import FlowStore, ScopeFilter
def _flow(path: str, resp: bytes) -> CapturedFlow:
return CapturedFlow(
method="GET", host="ex.com", path=path, query={}, req_headers={},
req_body=None, status=200,
resp_headers={"content-type": "application/json"}, resp_body=resp,
timestamp=0.0,
)
def test_engine_writes_spec_and_markdown(tmp_path: Path):
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
engine = DocEngine(store, out_dir=tmp_path, title="ex.com API", use_llm=False)
store.ingest(_flow("/api/users", b'[{"id": 1}]'))
sig = store.endpoints()[0].signature
engine.document(sig)
spec = (tmp_path / "openapi.yaml").read_text()
assert "/api/users" in spec
assert (tmp_path / "API.md").read_text().startswith("# ex.com API")
def test_engine_infers_response_schema(tmp_path: Path):
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
engine = DocEngine(store, out_dir=tmp_path, title="x", use_llm=False)
store.ingest(_flow("/api/users", b'[{"id": 1, "name": "Ada"}]'))
sig = store.endpoints()[0].signature
engine.document(sig)
rec = store.get(sig)
assert rec is not None
assert rec.response_schema is not None
assert rec.response_schema["type"] == "array"
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_engine.py -q`
Expected: FAIL — module not found.
- [ ] **Step 3: Implement `engine.py`**
Create `src/auto_reverse/doc/engine.py`:
```python
from __future__ import annotations
import threading
from pathlib import Path
import yaml # provided transitively by mitmproxy (ruamel/pyyaml); see note
from auto_reverse.doc.markdown import render_markdown
from auto_reverse.doc.openapi import build_openapi
from auto_reverse.doc.schema import SchemaAccumulator
from auto_reverse.models import Signature
from auto_reverse.store import FlowStore
class DocEngine:
"""Turn a new endpoint signature into inferred schemas + written outputs."""
def __init__(
self, store: FlowStore, out_dir: Path, title: str, use_llm: bool = True
) -> None:
self._store = store
self._out = out_dir
self._title = title
self._use_llm = use_llm
self._lock = threading.Lock()
self._out.mkdir(parents=True, exist_ok=True)
def document(self, sig: Signature) -> None:
record = self._store.get(sig)
if record is None:
return
req_acc = SchemaAccumulator()
resp_acc = SchemaAccumulator()
for flow in self._store.samples(sig):
rj = flow.request_json()
if rj is not None:
req_acc.add(rj)
sj = flow.response_json()
if sj is not None:
resp_acc.add(sj)
record.request_schema = req_acc.schema()
record.response_schema = resp_acc.schema()
if not record.summary:
record.summary = f"{sig.method} {sig.path_template}"
record.documented = True
self._write()
def _write(self) -> None:
with self._lock:
records = self._store.endpoints()
spec = build_openapi(records, title=self._title)
(self._out / "openapi.yaml").write_text(yaml.safe_dump(spec, sort_keys=False))
(self._out / "API.md").write_text(render_markdown(records, title=self._title))
```
**Note on `yaml`:** if `import yaml` fails (PyYAML not present transitively), add it explicitly: `uv add pyyaml`, then re-run. Do this in Step 4 if needed.
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_engine.py -q`
Expected: PASS (2 tests). If `ModuleNotFoundError: yaml`, run `uv add pyyaml` and re-run.
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/doc/engine.py tests/test_engine.py pyproject.toml uv.lock
git commit -m "feat: doc engine writes openapi.yaml and API.md from samples"
```
---
## Task 11: Browser wrapper
**Files:**
- Create: `src/auto_reverse/browser.py`
- Test: `tests/test_browser.py`
The browser wrapper uses Playwright's sync API (runs cleanly in a worker thread, separate from the proxy's asyncio loop). The test is guarded to skip when browsers are not installed, and routes through the live fixture site (no proxy, direct) to verify navigation + snapshot.
- [ ] **Step 1: Write failing test**
Create `tests/test_browser.py`:
```python
import pytest
playwright = pytest.importorskip("playwright.sync_api")
from auto_reverse.browser import Browser # noqa: E402
@pytest.fixture
def browser():
try:
b = Browser(proxy_port=None, headless=True)
b.start()
except Exception as exc: # browser binary missing, etc.
pytest.skip(f"browser unavailable: {exc}")
yield b
b.stop()
def test_navigate_and_snapshot(browser, fixture_site):
browser.navigate(fixture_site + "/")
snap = browser.snapshot()
assert snap["url"].endswith("/")
assert "title" in snap
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_browser.py -q`
Expected: FAIL — module not found (or SKIP if Playwright import missing). A skip here is acceptable; the implementation step still applies.
- [ ] **Step 3: Implement `browser.py`**
Create `src/auto_reverse/browser.py`:
```python
from __future__ import annotations
from typing import Any
class Browser:
"""Headed (or headless) Playwright Chromium, optionally routed via proxy."""
def __init__(self, proxy_port: int | None, headless: bool = False) -> None:
self._proxy_port = proxy_port
self._headless = headless
self._pw: Any = None
self._browser: Any = None
self._page: Any = None
def start(self) -> None:
from playwright.sync_api import sync_playwright
self._pw = sync_playwright().start()
launch_kwargs: dict[str, Any] = {
"headless": self._headless,
"args": ["--ignore-certificate-errors"],
}
if self._proxy_port is not None:
launch_kwargs["proxy"] = {"server": f"http://127.0.0.1:{self._proxy_port}"}
self._browser = self._pw.chromium.launch(**launch_kwargs)
context = self._browser.new_context(ignore_https_errors=True)
self._page = context.new_page()
def navigate(self, url: str) -> dict[str, Any]:
self._page.goto(url, wait_until="networkidle")
return self.snapshot()
def click(self, selector: str) -> dict[str, Any]:
self._page.click(selector, timeout=5000)
self._page.wait_for_load_state("networkidle")
return self.snapshot()
def type_text(self, selector: str, text: str) -> dict[str, Any]:
self._page.fill(selector, text)
return self.snapshot()
def snapshot(self) -> dict[str, Any]:
"""Compact view for the agent: url, title, and visible interactive elements."""
elements = self._page.eval_on_selector_all(
"a, button, input, [role=button], [role=link]",
"""els => els.slice(0, 40).map(e => ({
tag: e.tagName.toLowerCase(),
text: (e.innerText || e.value || e.getAttribute('aria-label') || '').slice(0, 60),
id: e.id || null,
}))""",
)
return {
"url": self._page.url,
"title": self._page.title(),
"elements": elements,
}
def pause_for_human(self) -> None:
"""Surface the headed browser for manual control (Playwright inspector)."""
self._page.pause()
def stop(self) -> None:
if self._browser is not None:
self._browser.close()
if self._pw is not None:
self._pw.stop()
```
- [ ] **Step 4: Run to verify pass (or skip)**
Run: `uv run pytest tests/test_browser.py -q`
Expected: PASS if Chromium is installed; SKIP otherwise. Both are acceptable to proceed.
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/browser.py tests/test_browser.py
git commit -m "feat: Playwright browser wrapper with compact snapshot"
```
---
## Task 12: Tool definitions and handlers
**Files:**
- Create: `src/auto_reverse/tools/__init__.py`, `src/auto_reverse/tools/browser_tools.py`, `src/auto_reverse/tools/flows_tools.py`, `src/auto_reverse/tools/doc_tools.py`
- Test: `tests/test_tools.py`
Tools are plain callables plus an Anthropic tool schema. Each handler takes a JSON-able `input` dict and returns a JSON-able result. The registry maps tool name → (schema, handler). Browser handlers use a fake browser in tests.
- [ ] **Step 1: Write failing tests**
Create `tests/test_tools.py`:
```python
from auto_reverse.doc.engine import DocEngine
from auto_reverse.models import CapturedFlow
from auto_reverse.store import FlowStore, ScopeFilter
from auto_reverse.tools import build_registry
class FakeBrowser:
def navigate(self, url):
return {"url": url, "title": "T", "elements": []}
def click(self, selector):
return {"url": "u", "title": "T", "elements": []}
def type_text(self, selector, text):
return {"url": "u", "title": "T", "elements": []}
def snapshot(self):
return {"url": "u", "title": "T", "elements": []}
def _store_with_endpoint(tmp_path):
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
store.ingest(CapturedFlow(
method="GET", host="ex.com", path="/api/users", query={}, req_headers={},
req_body=None, status=200,
resp_headers={"content-type": "application/json"}, resp_body=b"[]",
timestamp=0.0,
))
engine = DocEngine(store, out_dir=tmp_path, title="x", use_llm=False)
return store, engine
def test_registry_has_expected_tools(tmp_path):
store, engine = _store_with_endpoint(tmp_path)
reg = build_registry(FakeBrowser(), store, engine)
names = {schema["name"] for schema, _ in reg.values()}
assert {"browser_navigate", "browser_click", "flows_search", "doc_document"} <= names
def test_flows_search_handler_returns_matches(tmp_path):
store, engine = _store_with_endpoint(tmp_path)
reg = build_registry(FakeBrowser(), store, engine)
_, handler = reg["flows_search"]
result = handler({"query": "users"})
assert any("/api/users" in ep["path"] for ep in result["endpoints"])
def test_browser_navigate_handler(tmp_path):
store, engine = _store_with_endpoint(tmp_path)
reg = build_registry(FakeBrowser(), store, engine)
_, handler = reg["browser_navigate"]
result = handler({"url": "http://x"})
assert result["url"] == "http://x"
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_tools.py -q`
Expected: FAIL — module not found.
- [ ] **Step 3: Implement tool modules**
Create `src/auto_reverse/tools/browser_tools.py`:
```python
from __future__ import annotations
from collections.abc import Callable
from typing import Any
Handler = Callable[[dict[str, Any]], dict[str, Any]]
def browser_tools(browser: Any) -> dict[str, tuple[dict[str, Any], Handler]]:
return {
"browser_navigate": (
{
"name": "browser_navigate",
"description": "Navigate the browser to a URL and return a page snapshot.",
"input_schema": {
"type": "object",
"properties": {"url": {"type": "string"}},
"required": ["url"],
},
},
lambda inp: browser.navigate(inp["url"]),
),
"browser_click": (
{
"name": "browser_click",
"description": "Click an element by CSS selector; returns a new snapshot.",
"input_schema": {
"type": "object",
"properties": {"selector": {"type": "string"}},
"required": ["selector"],
},
},
lambda inp: browser.click(inp["selector"]),
),
"browser_type": (
{
"name": "browser_type",
"description": "Fill a form field (CSS selector) with text.",
"input_schema": {
"type": "object",
"properties": {
"selector": {"type": "string"},
"text": {"type": "string"},
},
"required": ["selector", "text"],
},
},
lambda inp: browser.type_text(inp["selector"], inp["text"]),
),
"browser_snapshot": (
{
"name": "browser_snapshot",
"description": "Return the current page snapshot without acting.",
"input_schema": {"type": "object", "properties": {}},
},
lambda inp: browser.snapshot(),
),
}
```
Create `src/auto_reverse/tools/flows_tools.py`:
```python
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from auto_reverse.models import EndpointRecord
from auto_reverse.store import FlowStore
Handler = Callable[[dict[str, Any]], dict[str, Any]]
def _record_view(rec: EndpointRecord) -> dict[str, Any]:
return {
"method": rec.signature.method,
"path": rec.signature.path_template,
"status": rec.signature.status_class,
"samples": rec.sample_count,
"documented": rec.documented,
}
def flows_tools(store: FlowStore) -> dict[str, tuple[dict[str, Any], Handler]]:
def search(inp: dict[str, Any]) -> dict[str, Any]:
query = inp.get("query", "")
records = store.search(query) if query else store.endpoints()
return {"endpoints": [_record_view(r) for r in records]}
return {
"flows_search": (
{
"name": "flows_search",
"description": "List/search discovered API endpoints captured so far.",
"input_schema": {
"type": "object",
"properties": {"query": {"type": "string"}},
},
},
search,
),
}
```
Create `src/auto_reverse/tools/doc_tools.py`:
```python
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from auto_reverse.doc.engine import DocEngine
from auto_reverse.store import FlowStore
Handler = Callable[[dict[str, Any]], dict[str, Any]]
def doc_tools(store: FlowStore, engine: DocEngine) -> dict[str, tuple[dict[str, Any], Handler]]:
def document(inp: dict[str, Any]) -> dict[str, Any]:
path = inp["path_template"]
for rec in store.endpoints():
if rec.signature.path_template == path:
if inp.get("summary"):
rec.summary = inp["summary"]
if inp.get("description"):
rec.description = inp["description"]
if inp.get("tag"):
rec.tag = inp["tag"]
engine.document(rec.signature)
return {"documented": path}
return {"error": f"no endpoint matching {path}"}
return {
"doc_document": (
{
"name": "doc_document",
"description": (
"Enrich and (re)write docs for an endpoint by path template, "
"optionally setting a human summary/description/tag."
),
"input_schema": {
"type": "object",
"properties": {
"path_template": {"type": "string"},
"summary": {"type": "string"},
"description": {"type": "string"},
"tag": {"type": "string"},
},
"required": ["path_template"],
},
},
document,
),
}
```
Create `src/auto_reverse/tools/__init__.py`:
```python
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from auto_reverse.doc.engine import DocEngine
from auto_reverse.store import FlowStore
from auto_reverse.tools.browser_tools import browser_tools
from auto_reverse.tools.doc_tools import doc_tools
from auto_reverse.tools.flows_tools import flows_tools
Handler = Callable[[dict[str, Any]], dict[str, Any]]
Registry = dict[str, tuple[dict[str, Any], Handler]]
def build_registry(browser: Any, store: FlowStore, engine: DocEngine) -> Registry:
registry: Registry = {}
registry.update(browser_tools(browser))
registry.update(flows_tools(store))
registry.update(doc_tools(store, engine))
return registry
def tool_schemas(registry: Registry) -> list[dict[str, Any]]:
return [schema for schema, _ in registry.values()]
```
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_tools.py -q`
Expected: PASS (3 tests).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/tools/ tests/test_tools.py
git commit -m "feat: agent tool definitions (browser, flows, doc) and registry"
```
---
## Task 13: Agent (Claude tool-use loop)
**Files:**
- Create: `src/auto_reverse/agent.py`
- Test: `tests/test_agent.py`
The agent runs the tool-use loop: send messages + tools to Claude, execute any `tool_use` blocks via the registry, feed `tool_result`s back, repeat until the model returns text with no tool calls. Tested with a fake client returning scripted responses.
- [ ] **Step 1: Write failing tests**
Create `tests/test_agent.py`:
```python
from types import SimpleNamespace
from auto_reverse.agent import Agent
def _text_block(text):
return SimpleNamespace(type="text", text=text)
def _tool_use(tool_id, name, inp):
return SimpleNamespace(type="tool_use", id=tool_id, name=name, input=inp)
class FakeMessages:
def __init__(self, scripted):
self._scripted = list(scripted)
self.calls = []
def create(self, **kwargs):
self.calls.append(kwargs)
content = self._scripted.pop(0)
stop = "tool_use" if any(b.type == "tool_use" for b in content) else "end_turn"
return SimpleNamespace(content=content, stop_reason=stop, role="assistant")
class FakeClient:
def __init__(self, scripted):
self.messages = FakeMessages(scripted)
def test_agent_executes_tool_then_returns_text():
scripted = [
[_tool_use("t1", "flows_search", {"query": "users"})],
[_text_block("Found the users endpoint.")],
]
client = FakeClient(scripted)
registry = {
"flows_search": (
{"name": "flows_search", "input_schema": {"type": "object"}},
lambda inp: {"endpoints": [{"path": "/api/users"}]},
)
}
agent = Agent(client, registry, model="m", system="s")
reply = agent.run_turn("map users")
assert "users endpoint" in reply
# the tool result was fed back: second create call has >= 3 messages
assert len(client.messages.calls[1]["messages"]) >= 3
def test_agent_plain_text_no_tools():
client = FakeClient([[_text_block("Hello!")]])
agent = Agent(client, {}, model="m", system="s")
assert agent.run_turn("hi") == "Hello!"
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_agent.py -q`
Expected: FAIL — module not found.
- [ ] **Step 3: Implement `agent.py`**
Create `src/auto_reverse/agent.py`:
```python
from __future__ import annotations
from typing import Any
from auto_reverse.tools import Registry, tool_schemas
MAX_ITERATIONS = 25
class Agent:
"""Conversational Claude tool-use loop driving browser/flows/doc tools."""
def __init__(self, client: Any, registry: Registry, model: str, system: str) -> None:
self._client = client
self._registry = registry
self._model = model
self._system = system
self._messages: list[dict[str, Any]] = []
def run_turn(self, user_message: str) -> str:
self._messages.append({"role": "user", "content": user_message})
for _ in range(MAX_ITERATIONS):
response = self._client.messages.create(
model=self._model,
max_tokens=4096,
system=self._system,
tools=tool_schemas(self._registry),
messages=self._messages,
)
self._messages.append(
{"role": "assistant", "content": self._serialize(response.content)}
)
tool_uses = [b for b in response.content if b.type == "tool_use"]
if not tool_uses:
return self._text_of(response.content)
results = []
for block in tool_uses:
results.append(self._run_tool(block))
self._messages.append({"role": "user", "content": results})
return "(stopped: reached max tool iterations)"
def _run_tool(self, block: Any) -> dict[str, Any]:
entry = self._registry.get(block.name)
if entry is None:
output: Any = {"error": f"unknown tool {block.name}"}
else:
_, handler = entry
try:
output = handler(block.input)
except Exception as exc: # tool failure -> structured error, agent re-plans
output = {"error": str(exc)}
return {
"type": "tool_result",
"tool_use_id": block.id,
"content": __import__("json").dumps(output),
}
@staticmethod
def _serialize(content: list[Any]) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for b in content:
if b.type == "text":
out.append({"type": "text", "text": b.text})
elif b.type == "tool_use":
out.append(
{"type": "tool_use", "id": b.id, "name": b.name, "input": b.input}
)
return out
@staticmethod
def _text_of(content: list[Any]) -> str:
return "".join(b.text for b in content if b.type == "text").strip()
```
Note: replace `__import__("json")` with a top-level `import json` and `json.dumps(output)` — written inline here only to keep the snippet self-contained. Add `import json` to the imports and use `json.dumps(output)`.
- [ ] **Step 4: Apply the json import cleanup**
Edit `src/auto_reverse/agent.py`: add `import json` under `from __future__`, and change the `content` line to `"content": json.dumps(output),`.
- [ ] **Step 5: Run to verify pass**
Run: `uv run pytest tests/test_agent.py -q`
Expected: PASS (2 tests).
- [ ] **Step 6: Commit**
```bash
git add src/auto_reverse/agent.py tests/test_agent.py
git commit -m "feat: Claude tool-use agent loop with graceful tool-error handling"
```
---
## Task 14: Optional client generation
**Files:**
- Create: `src/auto_reverse/doc/client.py`
- Test: covered by manual verification + a unit test on the command builder
- [ ] **Step 1: Write failing test**
Create `tests/test_client.py`:
```python
from pathlib import Path
from auto_reverse.doc.client import client_gen_command
def test_command_references_spec_and_out(tmp_path: Path):
spec = tmp_path / "openapi.yaml"
out = tmp_path / "client"
cmd = client_gen_command(spec, out)
assert str(spec) in cmd
assert "openapi-python-client" in " ".join(cmd)
```
- [ ] **Step 2: Run to verify fail**
Run: `uv run pytest tests/test_client.py -q`
Expected: FAIL — module not found.
- [ ] **Step 3: Implement `client.py`**
Create `src/auto_reverse/doc/client.py`:
```python
from __future__ import annotations
import subprocess
from pathlib import Path
def client_gen_command(spec_path: Path, out_dir: Path) -> list[str]:
return [
"uvx",
"openapi-python-client",
"generate",
"--path",
str(spec_path),
"--output-path",
str(out_dir),
]
def generate_client(spec_path: Path, out_dir: Path) -> bool:
"""Run the deterministic codegen; returns True on success."""
try:
subprocess.run(client_gen_command(spec_path, out_dir), check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
```
- [ ] **Step 4: Run to verify pass**
Run: `uv run pytest tests/test_client.py -q`
Expected: PASS (1 test).
- [ ] **Step 5: Commit**
```bash
git add src/auto_reverse/doc/client.py tests/test_client.py
git commit -m "feat: optional typed client generation from openapi spec"
```
---
## Task 15: REPL and CLI wiring
**Files:**
- Create: `src/auto_reverse/repl.py`, `src/auto_reverse/cli.py`
- Modify: `src/auto_reverse/__init__.py`
- Test: `tests/test_e2e_smoke.py`
The REPL handles `/meta-commands` locally and routes plain text to the agent. The CLI parses args, builds the object graph, wires the `on_new_signature` callback to the doc engine on a worker thread, starts the proxy, launches the browser, and runs the REPL. End-to-end smoke drives the fixture site through the proxy and asserts an endpoint lands in the spec.
- [ ] **Step 1: Write failing E2E smoke test**
Create `tests/test_e2e_smoke.py`:
```python
import threading
from pathlib import Path
import pytest
playwright = pytest.importorskip("playwright.sync_api")
from auto_reverse.browser import Browser # noqa: E402
from auto_reverse.doc.engine import DocEngine # noqa: E402
from auto_reverse.proxy import ProxyServer # noqa: E402
from auto_reverse.store import FlowStore, ScopeFilter # noqa: E402
def test_capture_to_spec_end_to_end(tmp_path: Path, fixture_site: str):
from urllib.parse import urlsplit
host = urlsplit(fixture_site).hostname
port = urlsplit(fixture_site).port
scope = ScopeFilter(target_hosts={f"{host}:{port}", host})
engine_holder: dict = {}
def on_new(sig):
engine_holder["engine"].document(sig)
store = FlowStore(scope, on_new_signature=on_new)
engine = DocEngine(store, out_dir=tmp_path, title="fixture", use_llm=False)
engine_holder["engine"] = engine
proxy = ProxyServer(store, archive_path=tmp_path / "archive.log", port=0)
try:
proxy.start()
except Exception as exc:
pytest.skip(f"proxy unavailable: {exc}")
try:
browser = Browser(proxy_port=proxy.port, headless=True)
browser.start()
except Exception as exc:
proxy.stop()
pytest.skip(f"browser unavailable: {exc}")
try:
browser.navigate(fixture_site + "/") # triggers fetch('/api/users')
# allow capture to settle
threading.Event().wait(1.0)
assert any(
"/api/users" in r.signature.path_template for r in store.endpoints()
)
finally:
browser.stop()
proxy.stop()
```
Note: `ProxyServer(port=0)` needs to expose the OS-assigned port. If mitmproxy does not support port 0 ephemeral selection, pick a fixed high port (e.g. 18080) in this test and pass it to both proxy and browser.
- [ ] **Step 2: Run to verify fail or skip**
Run: `uv run pytest tests/test_e2e_smoke.py -q`
Expected: FAIL (imports resolve but modules incomplete) or SKIP (no browser/proxy). Acceptable to proceed; this test is the integration safety net.
- [ ] **Step 3: Implement `repl.py`**
Create `src/auto_reverse/repl.py`:
```python
from __future__ import annotations
from auto_reverse.agent import Agent
from auto_reverse.store import FlowStore
HELP = """\
Commands:
<text> state intent in natural language (sent to the agent)
/flows [q] list/search discovered endpoints (local)
/spec show spec path + endpoint count
/help this help
/quit exit
"""
class Repl:
def __init__(self, agent: Agent, store: FlowStore, spec_path: str) -> None:
self._agent = agent
self._store = store
self._spec_path = spec_path
def handle(self, line: str) -> str | None:
"""Process one input line. Returns output text, or None to signal quit."""
line = line.strip()
if not line:
return ""
if line in ("/quit", "/exit"):
return None
if line == "/help":
return HELP
if line == "/spec":
return f"{self._spec_path}{len(self._store.endpoints())} endpoint(s)"
if line.startswith("/flows"):
query = line[len("/flows"):].strip()
records = self._store.search(query) if query else self._store.endpoints()
return "\n".join(
f"{r.signature.method} {r.signature.path_template}" for r in records
) or "(no endpoints yet)"
return self._agent.run_turn(line)
def run(self) -> None: # pragma: no cover - interactive loop
print(HELP)
while True:
try:
line = input("> ")
except (EOFError, KeyboardInterrupt):
print()
break
out = self.handle(line)
if out is None:
break
if out:
print(out)
```
- [ ] **Step 4: Add a REPL unit test**
Create `tests/test_repl.py`:
```python
from auto_reverse.models import CapturedFlow
from auto_reverse.repl import Repl
from auto_reverse.store import FlowStore, ScopeFilter
class FakeAgent:
def run_turn(self, msg):
return f"agent saw: {msg}"
def _store():
s = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
s.ingest(CapturedFlow(
method="GET", host="ex.com", path="/api/users", query={}, req_headers={},
req_body=None, status=200, resp_headers={}, resp_body=None, timestamp=0.0,
))
return s
def test_quit_returns_none():
repl = Repl(FakeAgent(), _store(), "openapi.yaml")
assert repl.handle("/quit") is None
def test_flows_lists_endpoints():
repl = Repl(FakeAgent(), _store(), "openapi.yaml")
assert "/api/users" in repl.handle("/flows")
def test_plain_text_goes_to_agent():
repl = Repl(FakeAgent(), _store(), "openapi.yaml")
assert repl.handle("map users") == "agent saw: map users"
```
Run: `uv run pytest tests/test_repl.py -q`
Expected: PASS (3 tests).
- [ ] **Step 5: Implement `cli.py`**
Create `src/auto_reverse/cli.py`:
```python
from __future__ import annotations
import argparse
import os
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path
from anthropic import Anthropic
from auto_reverse.agent import Agent
from auto_reverse.browser import Browser
from auto_reverse.config import Config
from auto_reverse.doc.client import generate_client
from auto_reverse.doc.engine import DocEngine
from auto_reverse.proxy import ProxyServer
from auto_reverse.repl import Repl
from auto_reverse.store import FlowStore, ScopeFilter
from auto_reverse.tools import build_registry
SYSTEM_PROMPT = """\
You are auto-reverse, an assistant that reverse-engineers a website's API.
Drive the browser toward the user's stated intent using the browser_* tools.
After actions, inspect captured traffic with flows_search and enrich notable
endpoints with doc_document (give a short summary, description, and tag).
Pursue the intent to a sensible depth, then summarize what you found and ask
what to do next. Be concise.
"""
def _parse_args(argv: list[str]) -> Config:
p = argparse.ArgumentParser(prog="auto-reverse")
p.add_argument("target_url")
p.add_argument("--out")
p.add_argument("--proxy-port", type=int, default=8080)
p.add_argument("--headless", action="store_true")
p.add_argument("--profile")
p.add_argument("--gen-client", action="store_true")
p.add_argument("--model", default="claude-opus-4-8")
p.add_argument("--scope", default="")
p.add_argument("--no-llm-doc", action="store_true")
p.add_argument("--resume")
a = p.parse_args(argv)
return Config(
target_url=a.target_url,
out_dir=a.out,
proxy_port=a.proxy_port,
headless=a.headless,
profile=a.profile,
gen_client=a.gen_client,
model=a.model,
scope_hosts={h for h in a.scope.split(",") if h},
no_llm_doc=a.no_llm_doc,
resume=a.resume,
)
def run(argv: list[str] | None = None) -> int:
cfg = _parse_args(argv if argv is not None else sys.argv[1:])
out_dir = Path(
cfg.out_dir
or f"./auto-reverse-out/{cfg.target_host}-{datetime.now(timezone.utc):%Y%m%d-%H%M%S}"
)
out_dir.mkdir(parents=True, exist_ok=True)
scope = ScopeFilter(target_hosts=cfg.all_scope_hosts())
title = f"{cfg.target_host} API"
engine_box: dict[str, DocEngine] = {}
def on_new(sig) -> None:
engine = engine_box.get("engine")
if engine is not None:
threading.Thread(target=engine.document, args=(sig,), daemon=True).start()
store = FlowStore(scope, on_new_signature=on_new)
engine = DocEngine(store, out_dir=out_dir, title=title, use_llm=not cfg.no_llm_doc)
engine_box["engine"] = engine
proxy = ProxyServer(store, archive_path=out_dir / "archive.log", port=cfg.proxy_port)
proxy.start()
browser = Browser(proxy_port=cfg.proxy_port, headless=cfg.headless)
browser.start()
browser.navigate(cfg.target_url)
if cfg.auth == "manual" and not cfg.headless:
input("Log in if needed, then press Enter to begin exploration... ")
client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
registry = build_registry(browser, store, engine)
agent = Agent(client, registry, model=cfg.model, system=SYSTEM_PROMPT)
repl = Repl(agent, store, spec_path=str(out_dir / "openapi.yaml"))
try:
repl.run()
finally:
browser.stop()
proxy.stop()
if cfg.gen_client:
ok = generate_client(out_dir / "openapi.yaml", out_dir / "client")
print("client generated" if ok else "client generation skipped/failed")
print(f"Outputs in {out_dir}")
return 0
```
- [ ] **Step 6: Wire `main()`**
Replace `src/auto_reverse/__init__.py`:
```python
from auto_reverse.cli import run
def main() -> None:
raise SystemExit(run())
```
- [ ] **Step 7: Add a CLI arg-parsing unit test**
Create `tests/test_cli.py`:
```python
from auto_reverse.cli import _parse_args
def test_parse_minimal():
cfg = _parse_args(["https://app.example.com"])
assert cfg.target_url == "https://app.example.com"
assert cfg.model == "claude-opus-4-8"
assert cfg.headless is False
def test_parse_scope_and_flags():
cfg = _parse_args(["https://x.com", "--scope", "a.com,b.com", "--headless", "--gen-client"])
assert cfg.scope_hosts == {"a.com", "b.com"}
assert cfg.headless is True
assert cfg.gen_client is True
```
Run: `uv run pytest tests/test_cli.py tests/test_repl.py -q`
Expected: PASS.
- [ ] **Step 8: Run the full suite**
Run: `uv run pytest -q`
Expected: all non-skipped tests PASS. Browser/proxy/E2E tests may SKIP if Chromium/proxy unavailable in this environment.
- [ ] **Step 9: Lint and type-check**
Run:
```bash
uv run ruff check src tests
uv run pyright src
```
Expected: ruff clean; pyright clean (fix any strict-mode complaints — add annotations as needed). Where third-party libs lack stubs (mitmproxy/playwright accessed via `Any`), confirm those are isolated to `proxy.py`/`browser.py` as designed.
- [ ] **Step 10: Commit**
```bash
git add src/auto_reverse tests/test_e2e_smoke.py tests/test_repl.py tests/test_cli.py
git commit -m "feat: REPL and CLI wiring; end-to-end capture-to-spec smoke test"
```
---
## Task 16: README and manual verification
**Files:**
- Modify: `README.md`
- [ ] **Step 1: Write `README.md`**
Replace `README.md` with usage: install (`uv sync`, `uv run playwright install chromium`), the free-threaded fallback note, `ANTHROPIC_API_KEY` requirement, example invocation (`uv run auto-reverse https://example.com`), the REPL commands, and where outputs land (`auto-reverse-out/<host>-<ts>/openapi.yaml`, `API.md`, `archive.log`).
- [ ] **Step 2: Manual end-to-end check (requires API key + browser)**
Run:
```bash
ANTHROPIC_API_KEY=... uv run auto-reverse https://<a-real-test-site> --headless
```
Then type an intent like `list the public endpoints` and confirm `openapi.yaml` + `API.md` appear in the output dir. (Skip if no key/site available; the automated E2E smoke already covers the capture→spec path.)
- [ ] **Step 3: Commit**
```bash
git add README.md
git commit -m "docs: README with setup, usage, and free-threading notes"
```
---
## Self-Review Notes (already applied)
- **Spec coverage:** hybrid control (manual-pause prompt + `pause_for_human`), single free-threaded process (proxy thread + doc-worker threads + main agent), dedup signature (Task 2/4), scope filtering (Task 3), OpenAPI + markdown + raw archive outputs (Tasks 6/7/9/10), optional client gen (Task 14), `--resume`/`--no-llm-doc`/`--profile` flags parsed (Task 15). Note: `--resume` and `--profile` are parsed and stored but full reuse logic is left minimal in v1 — flagged here as a known partial; deepen if needed.
- **Type consistency:** `EndpointRecord` field names (`summary`/`description`/`tag`/`request_schema`/`response_schema`/`documented`/`sample_count`/`query_params`) are used identically across store, engine, openapi, markdown, tools. Tool names (`browser_navigate`, `browser_click`, `browser_type`, `browser_snapshot`, `flows_search`, `doc_document`) are consistent between registry and agent tests.
- **LLM enrichment:** v1 wires deterministic schema inference + agent-driven `doc_document` enrichment; an automatic per-signature LLM call inside `DocEngine` is intentionally deferred (the agent enriches via the tool), keeping `use_llm` as the on/off switch and `--no-llm-doc` honored. This satisfies the spec's "LLM enrich only on novelty" via the agent loop rather than a second hidden LLM consumer.