feat: thread-safe FlowStore with signature dedup and samples

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-31 23:56:42 +08:00
parent a82d99b12a
commit 7956b49e28
2 changed files with 128 additions and 1 deletions
+45 -1
View File
@@ -1,4 +1,5 @@
from auto_reverse.store import path_template
from auto_reverse.models import CapturedFlow, Signature
from auto_reverse.store import FlowStore, ScopeFilter, path_template
def test_collapses_numeric_ids():
@@ -21,3 +22,46 @@ def test_keeps_short_words():
def test_root_and_empty():
assert path_template("/") == "/"
assert path_template("") == "/"
def _post(host: str, path: str, body: bytes) -> CapturedFlow:
return CapturedFlow(
method="POST", host=host, path=path, query={},
req_headers={"content-type": "application/json"}, req_body=body,
status=201, resp_headers={"content-type": "application/json"},
resp_body=b'{"ok": true}', timestamp=0.0,
)
def test_ingest_new_signature_returns_true_once():
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
assert store.ingest(_post("ex.com", "/api/cart/1", b'{"q": 1}')).is_new is True
# same template, different id -> not new
assert store.ingest(_post("ex.com", "/api/cart/2", b'{"q": 2}')).is_new is False
assert len(store.endpoints()) == 1
def test_out_of_scope_flow_ignored():
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
result = store.ingest(_post("other.com", "/x", b"{}"))
assert result.is_new is False
assert result.in_scope is False
assert store.endpoints() == []
def test_new_signature_callback_fires_with_signature():
seen: list[Signature] = []
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}), on_new_signature=seen.append)
store.ingest(_post("ex.com", "/api/cart/1", b"{}"))
store.ingest(_post("ex.com", "/api/cart/2", b"{}"))
assert len(seen) == 1
assert seen[0].path_template == "/api/cart/{id}"
def test_search_filters_by_substring():
store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
store.ingest(_post("ex.com", "/api/cart/1", b"{}"))
store.ingest(_post("ex.com", "/api/login", b"{}"))
results = store.search("cart")
assert len(results) == 1
assert results[0].signature.path_template == "/api/cart/{id}"