feat: scope filter (host allow/deny, asset and analytics drop)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,10 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from auto_reverse.models import CapturedFlow
|
||||||
|
|
||||||
# Matches a string that is exactly one UUID written as 8-4-4-4-12 groups of
# hexadecimal digits (upper or lower case), anchored with ^ and $.
_UUID = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
|
_UUID = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
|
||||||
# Matches a string made up solely of 16 or more hexadecimal digits
# (upper or lower case), anchored with ^ and $.
_HEX_TOKEN = re.compile(r"^[0-9a-fA-F]{16,}$")
|
_HEX_TOKEN = re.compile(r"^[0-9a-fA-F]{16,}$")
|
||||||
@@ -24,3 +28,37 @@ def path_template(path: str) -> str:
|
|||||||
parts = path.split("/")
|
parts = path.split("/")
|
||||||
out = ["{id}" if part and _is_variable(part) else part for part in parts]
|
out = ["{id}" if part and _is_variable(part) else part for part in parts]
|
||||||
return "/".join(out)
|
return "/".join(out)
|
||||||
|
|
||||||
|
|
||||||
|
# Path suffixes treated as static assets. Matched against the lower-cased
# path with the query string removed.
_ASSET_SUFFIXES = (
    ".js", ".mjs", ".css", ".png", ".jpg", ".jpeg", ".gif",
    ".svg", ".woff", ".woff2", ".ttf", ".ico", ".map", ".webp",
)

# Response content-type prefixes treated as static assets. `text/javascript`
# is the standard JavaScript media type (RFC 9239); `application/javascript`
# and `application/x-javascript` are legacy spellings still seen in the wild.
_ASSET_CONTENT_TYPES = (
    "text/css", "image/", "font/",
    "application/javascript", "text/javascript", "application/x-javascript",
)

# Hosts that are always dropped, even if they are listed as target/allow hosts.
_DEFAULT_ANALYTICS = frozenset({
    "www.google-analytics.com", "google-analytics.com", "analytics.google.com",
    "stats.g.doubleclick.net", "api.segment.io", "cdn.segment.com",
    "browser.sentry-cdn.com", "js.stripe.com",
})


def _normalize_hosts(hosts: set[str] | None) -> set[str]:
    """Return a fresh lower-cased copy of *hosts* (empty set for ``None``)."""
    return {host.lower() for host in hosts or ()}


def _content_type(headers) -> str:
    """Return the lower-cased, stripped content-type value, or ``""`` if absent.

    The exact key ``"content-type"`` is tried first; if it is missing, the
    header names are scanned case-insensitively so that a plain dict holding
    ``"Content-Type"`` is still recognised.
    NOTE(review): assumes *headers* is a mapping of str to str -- confirm
    against the definition of ``CapturedFlow.resp_headers``.
    """
    value = headers.get("content-type")
    if value is None:
        value = next(
            (v for name, v in headers.items() if name.lower() == "content-type"),
            "",
        )
    return (value or "").strip().lower()


class ScopeFilter:
    """Decide whether a captured flow is in scope.

    A flow is in scope when its host is a target or explicitly allowed host,
    is neither explicitly denied nor a built-in analytics host, and the
    response does not look like a static asset (by path suffix or by
    content-type). Host names are compared case-insensitively.
    """

    def __init__(
        self,
        target_hosts: set[str],
        allow_hosts: set[str] | None = None,
        deny_hosts: set[str] | None = None,
    ) -> None:
        """Store lower-cased copies of the host sets.

        Args:
            target_hosts: Hosts that are in scope.
            allow_hosts: Extra hosts accepted in addition to the targets.
            deny_hosts: Hosts rejected even if they are targets or allowed.
        """
        # Copies are taken so later mutation of the caller's sets has no effect.
        self.target_hosts = _normalize_hosts(target_hosts)
        self.allow_hosts = _normalize_hosts(allow_hosts)
        self.deny_hosts = _normalize_hosts(deny_hosts)

    def is_in_scope(self, flow: CapturedFlow) -> bool:
        """Return ``True`` if *flow* should be kept.

        Reads ``flow.host``, ``flow.path`` and ``flow.resp_headers``.
        """
        # DNS names are case-insensitive, so compare in lower case.
        host = flow.host.lower()

        # Deny rules (explicit and built-in) win over any allow rule.
        if host in self.deny_hosts or host in _DEFAULT_ANALYTICS:
            return False
        if host not in self.target_hosts and host not in self.allow_hosts:
            return False

        # Drop static assets by extension; ignore anything after "?".
        path = flow.path.split("?", 1)[0].lower()
        if path.endswith(_ASSET_SUFFIXES):
            return False

        # Drop static assets served from extension-less URLs.
        return not _content_type(flow.resp_headers).startswith(_ASSET_CONTENT_TYPES)
|
||||||
|
|||||||
@@ -0,0 +1,40 @@
|
|||||||
|
from auto_reverse.models import CapturedFlow
|
||||||
|
from auto_reverse.store import ScopeFilter
|
||||||
|
|
||||||
|
|
||||||
|
def _flow(host: str, path: str, ctype: str = "application/json") -> CapturedFlow:
    """Build a minimal 200-status GET flow for *host*/*path* with the given content-type."""
    response_headers = {"content-type": ctype}
    return CapturedFlow(
        method="GET",
        host=host,
        path=path,
        query={},
        req_headers={},
        req_body=None,
        status=200,
        resp_headers=response_headers,
        resp_body=b"{}",
        timestamp=0.0,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def test_target_host_in_scope():
    """A flow to a configured target host is kept."""
    scope = ScopeFilter(target_hosts={"app.example.com"})
    flow = _flow("app.example.com", "/api/users")
    assert scope.is_in_scope(flow)
|
||||||
|
|
||||||
|
|
||||||
|
def test_other_host_out_of_scope():
    """A flow to a host that is neither target nor allowed is dropped."""
    scope = ScopeFilter(target_hosts={"app.example.com"})
    flow = _flow("cdn.other.com", "/x")
    assert not scope.is_in_scope(flow)
|
||||||
|
|
||||||
|
|
||||||
|
def test_static_asset_dropped():
    """A JavaScript asset on the target host is dropped."""
    scope = ScopeFilter(target_hosts={"app.example.com"})
    flow = _flow("app.example.com", "/main.js", "application/javascript")
    assert not scope.is_in_scope(flow)
|
||||||
|
|
||||||
|
|
||||||
|
def test_analytics_host_dropped_by_default():
    """A built-in analytics host is dropped without any explicit deny rule."""
    scope = ScopeFilter(target_hosts={"app.example.com"})
    flow = _flow("www.google-analytics.com", "/collect")
    assert not scope.is_in_scope(flow)
|
||||||
|
|
||||||
|
|
||||||
|
def test_extra_allow_host():
    """A host listed in allow_hosts is kept even though it is not a target."""
    scope = ScopeFilter(
        target_hosts={"app.example.com"},
        allow_hosts={"api.example.com"},
    )
    flow = _flow("api.example.com", "/v1/data")
    assert scope.is_in_scope(flow)
|
||||||
|
|
||||||
|
|
||||||
|
def test_explicit_deny_overrides():
    """A deny rule wins even when the same host is a target."""
    scope = ScopeFilter(
        target_hosts={"app.example.com"},
        deny_hosts={"app.example.com"},
    )
    flow = _flow("app.example.com", "/api/users")
    assert not scope.is_in_scope(flow)
|
||||||
Reference in New Issue
Block a user