diff --git a/src/auto_reverse/proxy.py b/src/auto_reverse/proxy.py
new file mode 100644
index 0000000..4c65cb9
--- /dev/null
+++ b/src/auto_reverse/proxy.py
@@ -0,0 +1,167 @@
+from __future__ import annotations
+
+import asyncio
+import threading
+from typing import TYPE_CHECKING, Any
+
+from auto_reverse.models import CapturedFlow
+
+if TYPE_CHECKING:
+    from pathlib import Path
+
+    from auto_reverse.store import FlowStore
+
+# Seconds start() waits for the proxy thread to report that it is ready.
+_STARTUP_TIMEOUT_S = 10.0
+# Seconds stop() waits for the proxy thread to exit after requesting shutdown.
+_SHUTDOWN_TIMEOUT_S = 5.0
+
+
+def flow_from_mitm(flow: Any) -> CapturedFlow | None:
+    """Convert a mitmproxy HTTPFlow (or test double) into a CapturedFlow.
+
+    Returns ``None`` when the flow has no response. Header names are
+    lower-cased, and repeated query keys keep every value in order.
+    """
+    if flow.response is None:
+        return None
+    req = flow.request
+    resp = flow.response
+    query: dict[str, list[str]] = {}
+    for key, value in req.query.fields:
+        query.setdefault(key, []).append(value)
+    # Cut the query/fragment off by hand: urlsplit() would read a path that
+    # starts with "//" as a network location and drop its first segment.
+    raw_path: str = req.path
+    path = raw_path.partition("#")[0].partition("?")[0]
+    return CapturedFlow(
+        method=req.method,
+        host=req.pretty_host,
+        path=path,
+        query=query,
+        req_headers={k.lower(): v for k, v in dict(req.headers).items()},
+        req_body=req.content,
+        status=resp.status_code,
+        resp_headers={k.lower(): v for k, v in dict(resp.headers).items()},
+        resp_body=resp.content,
+        # A missing or falsy start time is recorded as 0.0.
+        timestamp=getattr(flow, "timestamp_start", 0.0) or 0.0,
+    )
+
+
+class CaptureAddon:
+    """mitmproxy addon: ingest each response into the store and archive it.
+
+    Each captured flow is appended to the archive file as one
+    ``METHOD host/path status`` line.
+    """
+
+    def __init__(self, store: FlowStore, archive_path: Path) -> None:
+        self._store = store
+        self._archive = archive_path
+        # Create the archive's directory up front so response() can append.
+        self._archive.parent.mkdir(parents=True, exist_ok=True)
+
+    def response(self, flow: Any) -> None:  # mitmproxy hook name
+        """Ingest the flow and append its summary line to the archive."""
+        captured = flow_from_mitm(flow)
+        if captured is None:
+            return
+        self._store.ingest(captured)
+        # Pin the encoding so the archive does not depend on the platform's
+        # default text encoding.
+        with self._archive.open("a", encoding="utf-8") as fh:
+            fh.write(
+                f"{captured.method} {captured.host}{captured.path} {captured.status}\n"
+            )
+
+
+class ProxyServer:
+    """Run mitmproxy's DumpMaster in a dedicated thread with its own loop."""
+
+    def __init__(self, store: FlowStore, archive_path: Path, port: int) -> None:
+        self._store = store
+        self._archive_path = archive_path
+        self._port = port
+        self._master: Any = None
+        self._loop: asyncio.AbstractEventLoop | None = None
+        self._thread: threading.Thread | None = None
+        # Exception raised by the proxy thread before it became ready, if any.
+        self._startup_error: Exception | None = None
+
+    @property
+    def port(self) -> int:
+        """Port the proxy is configured to listen on."""
+        return self._port
+
+    def start(self) -> None:
+        """Start the proxy thread and block until it reports ready.
+
+        Raises:
+            RuntimeError: If the proxy thread failed while starting up.
+            TimeoutError: If the thread did not report ready in time.
+        """
+        ready = threading.Event()
+        self._startup_error = None
+        self._thread = threading.Thread(target=self._run, args=(ready,), daemon=True)
+        self._thread.start()
+        if not ready.wait(timeout=_STARTUP_TIMEOUT_S):
+            raise TimeoutError(
+                f"proxy on port {self._port} not ready after {_STARTUP_TIMEOUT_S}s"
+            )
+        if self._startup_error is not None:
+            raise RuntimeError(
+                f"proxy on port {self._port} failed to start"
+            ) from self._startup_error
+
+    def _run(self, ready: threading.Event) -> None:
+        """Thread target: serve on a private event loop until shutdown."""
+        loop: asyncio.AbstractEventLoop | None = None
+        try:
+            from mitmproxy.options import Options
+            from mitmproxy.tools.dump import DumpMaster
+
+            loop = asyncio.new_event_loop()
+            self._loop = loop
+            asyncio.set_event_loop(loop)
+
+            async def _serve() -> None:
+                # DumpMaster grabs the running loop on construction, so it must be
+                # built inside the coroutine rather than before run_until_complete.
+                opts = Options(listen_host="127.0.0.1", listen_port=self._port)
+                master: Any = DumpMaster(opts, with_termlog=False, with_dumper=False)
+                master.addons.add(CaptureAddon(self._store, self._archive_path))
+                self._master = master
+                ready.set()
+                await master.run()
+
+            loop.run_until_complete(_serve())
+        except Exception as exc:
+            if ready.is_set():
+                # Failed after start() was released: re-raise so the failure is
+                # still reported by the thread's exception hook.
+                raise
+            # Failed during startup: hand the error to start() and wake it now
+            # rather than leaving it to wait out the full timeout.
+            self._startup_error = exc
+            ready.set()
+        finally:
+            # Release the loop's resources once serving ends, however it ends.
+            if loop is not None:
+                loop.close()
+
+    def stop(self) -> None:
+        """Request shutdown and wait briefly for the proxy thread to exit.
+
+        Safe to call when the proxy never started or has already stopped.
+        """
+        master, loop, thread = self._master, self._loop, self._thread
+        if master is not None and loop is not None and not loop.is_closed():
+            try:
+                loop.call_soon_threadsafe(master.shutdown)
+            except RuntimeError:
+                # The loop closed between the check and the call, so the
+                # proxy has already finished on its own.
+                pass
+        if thread is not None and thread is not threading.current_thread():
+            thread.join(timeout=_SHUTDOWN_TIMEOUT_S)
diff --git a/tests/test_proxy.py b/tests/test_proxy.py
new file mode 100644
index 0000000..66a8dfd
--- /dev/null
+++ b/tests/test_proxy.py
@@ -0,0 +1,49 @@
+from types import SimpleNamespace
+
+from auto_reverse.proxy import flow_from_mitm
+
+
+def _fake_mitm_flow():
+    """Build a SimpleNamespace stand-in for a completed mitmproxy flow."""
+    request = SimpleNamespace(
+        method="POST", pretty_host="ex.com", path="/api/users?role=admin",
+        headers={"content-type": "application/json"}, content=b'{"name": "Ada"}',
+        query=SimpleNamespace(fields=[("role", "admin")]),
+    )
+    response = SimpleNamespace(
+        status_code=201, headers={"content-type": "application/json"},
+        content=b'{"id": 1}',
+    )
+    return SimpleNamespace(request=request, response=response, timestamp_start=1.5)
+
+
+def test_flow_from_mitm_maps_fields():
+    captured = flow_from_mitm(_fake_mitm_flow())
+    assert captured.method == "POST"
+    assert captured.host == "ex.com"
+    assert captured.path == "/api/users"
+    assert captured.query == {"role": ["admin"]}
+    assert captured.status == 201
+    assert captured.request_json() == {"name": "Ada"}
+    assert captured.response_json() == {"id": 1}
+
+
+def test_flow_from_mitm_handles_missing_response():
+    flow = _fake_mitm_flow()
+    flow.response = None
+    captured = flow_from_mitm(flow)
+    assert captured is None
+
+
+def test_flow_from_mitm_keeps_leading_double_slash_in_path():
+    flow = _fake_mitm_flow()
+    flow.request.path = "//api/users?role=admin"
+    captured = flow_from_mitm(flow)
+    assert captured.path == "//api/users"
+
+
+def test_flow_from_mitm_groups_repeated_query_keys():
+    flow = _fake_mitm_flow()
+    flow.request.query = SimpleNamespace(fields=[("tag", "a"), ("tag", "b")])
+    captured = flow_from_mitm(flow)
+    assert captured.query == {"tag": ["a", "b"]}