From f272951695421388e90ec3f2939c10e5220bfe4d Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Thu, 28 Sep 2023 10:52:33 +0200 Subject: [PATCH 1/2] feat(metrics): Shift flushing by up to a rollup window --- sentry_sdk/metrics.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/metrics.py b/sentry_sdk/metrics.py index 018c680750..3e5312eb06 100644 --- a/sentry_sdk/metrics.py +++ b/sentry_sdk/metrics.py @@ -2,6 +2,7 @@ import io import re import threading +import random import time import zlib from functools import wraps, partial @@ -303,6 +304,14 @@ def __init__( self._flush_event = Event() self._force_flush = False + # The aggregator shifts it's flushing by up to an entire rollup window to + # avoid multiple clients trampling on end of a 10 second window as all the + # buckets are anchored to multiples of ROLLUP seconds. We randomize this + # number once per aggregator boot to achieve some level of offsetting + # across a fleet of deployed SDKs. Relay itself will also apply independent + # jittering. + self._flush_shift = random.random() * self.ROLLUP + self._flusher = None # type: Optional[Thread] self._flusher_pid = None # type: Optional[int] self._ensure_thread() @@ -339,7 +348,7 @@ def _flushable_buckets(self): # type: (...) -> (Iterable[Tuple[int, Dict[BucketKey, Metric]]]) with self._lock: force_flush = self._force_flush - cutoff = time.time() - self.ROLLUP_IN_SECONDS + cutoff = time.time() - self.ROLLUP_IN_SECONDS - self._flush_shift flushable_buckets = () # type: Iterable[Tuple[int, Dict[BucketKey, Metric]]] weight_to_remove = 0 From ac5dd68cedafa14f8402519333e8cf80c03eb15b Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Thu, 28 Sep 2023 12:28:56 +0200 Subject: [PATCH 2/2] fix: incorrect const --- sentry_sdk/metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/metrics.py b/sentry_sdk/metrics.py index 3e5312eb06..debce9755f 100644 --- a/sentry_sdk/metrics.py +++ b/sentry_sdk/metrics.py @@ -310,7 +310,7 @@ def __init__( # number once per aggregator boot to achieve some level of offsetting # across a fleet of deployed SDKs. Relay itself will also apply independent # jittering. - self._flush_shift = random.random() * self.ROLLUP + self._flush_shift = random.random() * self.ROLLUP_IN_SECONDS self._flusher = None # type: Optional[Thread] self._flusher_pid = None # type: Optional[int]