diff --git a/src/auto_reverse/store.py b/src/auto_reverse/store.py
index 7a310f6..d84df52 100644
--- a/src/auto_reverse/store.py
+++ b/src/auto_reverse/store.py
@@ -1,9 +1,15 @@
 from __future__ import annotations
 
 import re
+import threading
+from dataclasses import dataclass
 from typing import TYPE_CHECKING
 
+from auto_reverse.models import EndpointRecord, Signature, status_class
+
 if TYPE_CHECKING:
+    from collections.abc import Callable
+
     from auto_reverse.models import CapturedFlow
 
 _UUID = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
@@ -62,3 +68,119 @@ class ScopeFilter:
             return False
         ctype = flow.resp_headers.get("content-type", "").lower()
         return not ctype.startswith(("text/css", "image/", "font/", "application/javascript"))
+
+
+# Upper bound on the number of raw flows retained per signature.
+MAX_SAMPLES = 5
+
+
+@dataclass
+class IngestResult:
+    """Outcome of a single ``FlowStore.ingest`` call."""
+
+    # False when the scope filter rejected the flow; nothing was stored.
+    in_scope: bool
+    # True only for the first in-scope flow seen with this signature.
+    is_new: bool
+    # Dedup key of the flow, or None when the flow was out of scope.
+    signature: Signature | None
+
+
+class FlowStore:
+    """Thread-safe store: dedup by signature, retain bounded samples per endpoint.
+
+    All reads and writes of the internal maps happen under one lock. The
+    ``EndpointRecord`` objects handed out by ``endpoints``, ``get`` and
+    ``search`` are the live stored instances, not copies.
+    """
+
+    def __init__(
+        self,
+        scope: ScopeFilter,
+        on_new_signature: Callable[[Signature], None] | None = None,
+    ) -> None:
+        """Create an empty store.
+
+        Args:
+            scope: Filter deciding which flows are recorded at all.
+            on_new_signature: Optional hook called once per signature, the
+                first time an in-scope flow with that signature is ingested.
+        """
+        self._scope = scope
+        self._on_new = on_new_signature
+        self._lock = threading.Lock()
+        self._records: dict[Signature, EndpointRecord] = {}
+        self._samples: dict[Signature, list[CapturedFlow]] = {}
+
+    def signature_of(self, flow: CapturedFlow) -> Signature:
+        """Return the dedup key for *flow*.
+
+        The key combines the upper-cased method, the host, the templated
+        path and the status class derived from ``flow.status``.
+        """
+        return Signature(
+            method=flow.method.upper(),
+            host=flow.host,
+            path_template=path_template(flow.path),
+            status_class=status_class(flow.status),
+        )
+
+    def ingest(self, flow: CapturedFlow) -> IngestResult:
+        """Record *flow* if it is in scope and report whether it is a new endpoint.
+
+        Every in-scope flow bumps the endpoint's sample count and merges its
+        query-parameter names; only the first ``MAX_SAMPLES`` flows per
+        signature are kept as samples.
+        """
+        if not self._scope.is_in_scope(flow):
+            return IngestResult(in_scope=False, is_new=False, signature=None)
+        sig = self.signature_of(flow)
+        with self._lock:
+            is_new = sig not in self._records
+            if is_new:
+                self._records[sig] = EndpointRecord(signature=sig)
+                self._samples[sig] = []
+            record = self._records[sig]
+            record.sample_count += 1
+            record.query_params.update(self._query_keys(flow))
+            samples = self._samples[sig]
+            if len(samples) < MAX_SAMPLES:
+                samples.append(flow)
+        # Notify after releasing the lock: threading.Lock is not reentrant, so
+        # a callback that reads the store would otherwise deadlock.
+        if is_new and self._on_new is not None:
+            self._on_new(sig)
+        return IngestResult(in_scope=True, is_new=is_new, signature=sig)
+
+    @staticmethod
+    def _query_keys(flow: CapturedFlow) -> set[str]:
+        """Return the names of the query parameters present on *flow*."""
+        return set(flow.query)
+
+    def endpoints(self) -> list[EndpointRecord]:
+        """Return a snapshot list of every recorded endpoint."""
+        with self._lock:
+            return list(self._records.values())
+
+    def samples(self, sig: Signature) -> list[CapturedFlow]:
+        """Return a copy of the samples kept for *sig* (empty if unknown)."""
+        with self._lock:
+            return list(self._samples.get(sig, []))
+
+    def get(self, sig: Signature) -> EndpointRecord | None:
+        """Return the record for *sig*, or None if it was never ingested."""
+        with self._lock:
+            return self._records.get(sig)
+
+    def search(self, query: str) -> list[EndpointRecord]:
+        """Return endpoints whose path template or method contains *query*.
+
+        Matching is a case-insensitive substring test.
+        """
+        q = query.lower()
+        with self._lock:
+            return [
+                r for r in self._records.values()
+                if q in r.signature.path_template.lower()
+                or q in r.signature.method.lower()
+            ]
diff --git a/tests/test_store.py b/tests/test_store.py
index 995a05e..825b53e 100644
--- a/tests/test_store.py
+++ b/tests/test_store.py
@@ -1,4 +1,5 @@
-from auto_reverse.store import path_template
+from auto_reverse.models import CapturedFlow, Signature
+from auto_reverse.store import FlowStore, ScopeFilter, path_template
 
 
 def test_collapses_numeric_ids():
@@ -21,3 +22,47 @@ def test_keeps_short_words():
 def test_root_and_empty():
     assert path_template("/") == "/"
     assert path_template("") == "/"
+
+
+def _post(host: str, path: str, body: bytes) -> CapturedFlow:
+    """Build a POST flow with a JSON request body and a 201 JSON response."""
+    return CapturedFlow(
+        method="POST", host=host, path=path, query={},
+        req_headers={"content-type": "application/json"}, req_body=body,
+        status=201, resp_headers={"content-type": "application/json"},
+        resp_body=b'{"ok": true}', timestamp=0.0,
+    )
+
+
+def test_ingest_new_signature_returns_true_once():
+    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
+    assert store.ingest(_post("ex.com", "/api/cart/1", b'{"q": 1}')).is_new is True
+    # same template, different id -> not new
+    assert store.ingest(_post("ex.com", "/api/cart/2", b'{"q": 2}')).is_new is False
+    assert len(store.endpoints()) == 1
+
+
+def test_out_of_scope_flow_ignored():
+    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
+    result = store.ingest(_post("other.com", "/x", b"{}"))
+    assert result.is_new is False
+    assert result.in_scope is False
+    assert store.endpoints() == []
+
+
+def test_new_signature_callback_fires_with_signature():
+    seen: list[Signature] = []
+    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}), on_new_signature=seen.append)
+    store.ingest(_post("ex.com", "/api/cart/1", b"{}"))
+    store.ingest(_post("ex.com", "/api/cart/2", b"{}"))
+    assert len(seen) == 1
+    assert seen[0].path_template == "/api/cart/{id}"
+
+
+def test_search_filters_by_substring():
+    store = FlowStore(ScopeFilter(target_hosts={"ex.com"}))
+    store.ingest(_post("ex.com", "/api/cart/1", b"{}"))
+    store.ingest(_post("ex.com", "/api/login", b"{}"))
+    results = store.search("cart")
+    assert len(results) == 1
+    assert results[0].signature.path_template == "/api/cart/{id}"