diff --git a/src/auto_reverse/store.py b/src/auto_reverse/store.py
index da0c662..7a310f6 100644
--- a/src/auto_reverse/store.py
+++ b/src/auto_reverse/store.py
@@ -1,6 +1,10 @@
 from __future__ import annotations
 
 import re
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from auto_reverse.models import CapturedFlow
 
 _UUID = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
 _HEX_TOKEN = re.compile(r"^[0-9a-fA-F]{16,}$")
@@ -24,3 +28,70 @@ def path_template(path: str) -> str:
     parts = path.split("/")
     out = ["{id}" if part and _is_variable(part) else part for part in parts]
     return "/".join(out)
+
+
+# Path suffixes (compared case-insensitively) that mark a static asset.
+_ASSET_SUFFIXES = (".js", ".mjs", ".css", ".png", ".jpg", ".jpeg", ".gif",
+                   ".svg", ".woff", ".woff2", ".ttf", ".ico", ".map", ".webp")
+# Response content-type prefixes that mark a static asset.  "text/javascript"
+# is the registered JavaScript media type (RFC 9239); the application/*
+# spellings are legacy forms still seen in the wild.
+_ASSET_CONTENT_TYPES = (
+    "text/css", "image/", "font/",
+    "application/javascript", "application/x-javascript", "text/javascript",
+)
+# Hosts ScopeFilter rejects before it consults target_hosts/allow_hosts.
+_DEFAULT_ANALYTICS = frozenset({
+    "www.google-analytics.com", "google-analytics.com", "analytics.google.com",
+    "stats.g.doubleclick.net", "api.segment.io", "cdn.segment.com",
+    "browser.sentry-cdn.com", "js.stripe.com",
+})
+
+
+class ScopeFilter:
+    """Decide whether a captured flow is relevant traffic for the chosen hosts."""
+
+    def __init__(
+        self,
+        target_hosts: set[str],
+        allow_hosts: set[str] | None = None,
+        deny_hosts: set[str] | None = None,
+    ) -> None:
+        """Store private copies of the host sets.
+
+        Args:
+            target_hosts: Hosts whose flows are in scope.
+            allow_hosts: Additional in-scope hosts; ``None`` means none.
+            deny_hosts: Hosts that are always out of scope; ``None`` means none.
+        """
+        self.target_hosts = set(target_hosts)
+        self.allow_hosts = set(allow_hosts or set())
+        self.deny_hosts = set(deny_hosts or set())
+
+    def is_in_scope(self, flow: CapturedFlow) -> bool:
+        """Return True when *flow* passes every host and static-asset check.
+
+        A flow is rejected when its host is denied, is in the built-in
+        analytics list, or is neither a target nor an allowed host, and
+        when its path suffix or response content type marks a static asset.
+        """
+        host = flow.host
+        if host in self.deny_hosts:
+            return False
+        # Runs before the allow checks, so these hosts are dropped even when
+        # a caller lists them explicitly.
+        if host in _DEFAULT_ANALYTICS:
+            return False
+        if host not in self.target_hosts and host not in self.allow_hosts:
+            return False
+        if flow.path.split("?")[0].lower().endswith(_ASSET_SUFFIXES):
+            return False
+        headers = flow.resp_headers
+        ctype = headers.get("content-type")
+        if ctype is None:
+            # HTTP header names are case-insensitive, so also accept
+            # spellings such as "Content-Type".
+            ctype = next(
+                (v for k, v in headers.items() if k.lower() == "content-type"), ""
+            )
+        return not ctype.lower().startswith(_ASSET_CONTENT_TYPES)
diff --git a/tests/test_scope.py b/tests/test_scope.py
new file mode 100644
index 0000000..021e46d
--- /dev/null
+++ b/tests/test_scope.py
@@ -0,0 +1,55 @@
+from auto_reverse.models import CapturedFlow
+from auto_reverse.store import ScopeFilter
+
+
+def _flow(
+    host: str,
+    path: str,
+    ctype: str = "application/json",
+    header: str = "content-type",
+) -> CapturedFlow:
+    return CapturedFlow(
+        method="GET", host=host, path=path, query={}, req_headers={},
+        req_body=None, status=200, resp_headers={header: ctype},
+        resp_body=b"{}", timestamp=0.0,
+    )
+
+
+def test_target_host_in_scope():
+    f = ScopeFilter(target_hosts={"app.example.com"})
+    assert f.is_in_scope(_flow("app.example.com", "/api/users"))
+
+
+def test_other_host_out_of_scope():
+    f = ScopeFilter(target_hosts={"app.example.com"})
+    assert not f.is_in_scope(_flow("cdn.other.com", "/x"))
+
+
+def test_static_asset_dropped():
+    f = ScopeFilter(target_hosts={"app.example.com"})
+    assert not f.is_in_scope(_flow("app.example.com", "/main.js", "application/javascript"))
+
+
+def test_analytics_host_dropped_by_default():
+    f = ScopeFilter(target_hosts={"app.example.com"})
+    assert not f.is_in_scope(_flow("www.google-analytics.com", "/collect"))
+
+
+def test_extra_allow_host():
+    f = ScopeFilter(target_hosts={"app.example.com"}, allow_hosts={"api.example.com"})
+    assert f.is_in_scope(_flow("api.example.com", "/v1/data"))
+
+
+def test_explicit_deny_overrides():
+    f = ScopeFilter(target_hosts={"app.example.com"}, deny_hosts={"app.example.com"})
+    assert not f.is_in_scope(_flow("app.example.com", "/api/users"))
+
+
+def test_text_javascript_content_type_dropped():
+    f = ScopeFilter(target_hosts={"app.example.com"})
+    assert not f.is_in_scope(_flow("app.example.com", "/bundle", "text/javascript"))
+
+
+def test_content_type_header_name_is_case_insensitive():
+    f = ScopeFilter(target_hosts={"app.example.com"})
+    assert not f.is_in_scope(_flow("app.example.com", "/logo", "image/png", header="Content-Type"))