feat: embedded mitmproxy capture addon and proxy server

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-01 00:04:58 +08:00
parent c6d349c505
commit 20c7e0275f
2 changed files with 134 additions and 0 deletions
+100
View File
@@ -0,0 +1,100 @@
from __future__ import annotations
import asyncio
import threading
from typing import TYPE_CHECKING, Any
from urllib.parse import urlsplit
from auto_reverse.models import CapturedFlow
if TYPE_CHECKING:
from pathlib import Path
from auto_reverse.store import FlowStore
def flow_from_mitm(flow: Any) -> CapturedFlow | None:
"""Convert a mitmproxy HTTPFlow (or test double) into a CapturedFlow."""
if flow.response is None:
return None
req = flow.request
query: dict[str, list[str]] = {}
for key, value in req.query.fields:
query.setdefault(key, []).append(value)
raw_path: str = req.path
return CapturedFlow(
method=req.method,
host=req.pretty_host,
path=urlsplit(raw_path).path,
query=query,
req_headers={k.lower(): v for k, v in dict(req.headers).items()},
req_body=req.content,
status=flow.response.status_code,
resp_headers={k.lower(): v for k, v in dict(flow.response.headers).items()},
resp_body=flow.response.content,
timestamp=getattr(flow, "timestamp_start", 0.0) or 0.0,
)
class CaptureAddon:
"""mitmproxy addon: on each response, ingest into the store + archive raw."""
def __init__(self, store: FlowStore, archive_path: Path) -> None:
self._store = store
self._archive = archive_path
self._archive.parent.mkdir(parents=True, exist_ok=True)
def response(self, flow: Any) -> None: # mitmproxy hook name
captured = flow_from_mitm(flow)
if captured is None:
return
self._store.ingest(captured)
with self._archive.open("a") as fh:
fh.write(
f"{captured.method} {captured.host}{captured.path} {captured.status}\n"
)
class ProxyServer:
"""Run mitmproxy's DumpMaster in a dedicated thread with its own loop."""
def __init__(self, store: FlowStore, archive_path: Path, port: int) -> None:
self._store = store
self._archive_path = archive_path
self._port = port
self._master: Any = None
self._loop: asyncio.AbstractEventLoop | None = None
self._thread: threading.Thread | None = None
@property
def port(self) -> int:
return self._port
def start(self) -> None:
ready = threading.Event()
self._thread = threading.Thread(target=self._run, args=(ready,), daemon=True)
self._thread.start()
ready.wait(timeout=10)
def _run(self, ready: threading.Event) -> None:
from mitmproxy.options import Options
from mitmproxy.tools.dump import DumpMaster
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
async def _serve() -> None:
# DumpMaster grabs the running loop on construction, so it must be
# built inside the coroutine rather than before run_until_complete.
opts = Options(listen_host="127.0.0.1", listen_port=self._port)
master: Any = DumpMaster(opts, with_termlog=False, with_dumper=False)
master.addons.add(CaptureAddon(self._store, self._archive_path))
self._master = master
ready.set()
await master.run()
self._loop.run_until_complete(_serve())
def stop(self) -> None:
if self._master is not None and self._loop is not None:
self._loop.call_soon_threadsafe(self._master.shutdown)
+34
View File
@@ -0,0 +1,34 @@
from types import SimpleNamespace
from auto_reverse.proxy import flow_from_mitm
def _fake_mitm_flow():
request = SimpleNamespace(
method="POST", pretty_host="ex.com", path="/api/users?role=admin",
headers={"content-type": "application/json"}, content=b'{"name": "Ada"}',
query=SimpleNamespace(fields=[("role", "admin")]),
)
response = SimpleNamespace(
status_code=201, headers={"content-type": "application/json"},
content=b'{"id": 1}',
)
return SimpleNamespace(request=request, response=response, timestamp_start=1.5)
def test_flow_from_mitm_maps_fields():
captured = flow_from_mitm(_fake_mitm_flow())
assert captured.method == "POST"
assert captured.host == "ex.com"
assert captured.path == "/api/users"
assert captured.query == {"role": ["admin"]}
assert captured.status == 201
assert captured.request_json() == {"name": "Ada"}
assert captured.response_json() == {"id": 1}
def test_flow_from_mitm_handles_missing_response():
flow = _fake_mitm_flow()
flow.response = None
captured = flow_from_mitm(flow)
assert captured is None