From 8f3b93ba826d56628cc7e1b747158284c821e58a Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Mon, 27 Apr 2026 14:19:34 +0200 Subject: [PATCH 01/20] . --- sentry_sdk/_span_batcher.py | 109 +++++++++++++++++++++++------------- 1 file changed, 71 insertions(+), 38 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 78204f1a3a..e610c457eb 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -1,4 +1,6 @@ +import random import threading +from _queue import Queue from collections import defaultdict from datetime import datetime, timezone from typing import TYPE_CHECKING @@ -14,10 +16,10 @@ class SpanBatcher(Batcher["StreamedSpan"]): # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is - # a bit of a buffer for spans that appear between setting the flush event + # a bit of a buffer for spans that appear between the trigger to flush # and actually flushing the buffer. # - # The max limits are all per trace. + # The max limits are all per trace (per bucket). MAX_ENVELOPE_SIZE = 1000 # spans MAX_BEFORE_FLUSH = 1000 MAX_BEFORE_DROP = 2000 @@ -45,11 +47,24 @@ def __init__( self._lock = threading.Lock() self._active: "threading.local" = threading.local() - self._flush_event: "threading.Event" = threading.Event() + self._flush_queue: Queue = Queue() self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None + def _flush_loop(self) -> None: + # Mark the flush-loop thread as active for its entire lifetime so + # that any re-entrant add() triggered by GC warnings during wait(), + # flush(), or Event operations is silently dropped instead of + # deadlocking on internal locks. + self._active.flag = True + while self._running: + # XXX: the timeout should also be per bucket + trace_id = self._flush_queue.get( + timeout=self.FLUSH_WAIT_TIME + random.random() + ) + self._flush(trace_id=trace_id) + def add(self, span: "StreamedSpan") -> None: # Bail out if the current thread is already executing batcher code. # This prevents deadlocks when code running inside the batcher (e.g. @@ -79,15 +94,23 @@ def add(self, span: "StreamedSpan") -> None: self._running_size[span.trace_id] += self._estimate_size(span) if size + 1 >= self.MAX_BEFORE_FLUSH: - self._flush_event.set() + self._flush_queue.put(span.trace_id) return if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH: - self._flush_event.set() + self._flush_queue.put(span.trace_id) return finally: self._active.flag = False + def kill(self) -> None: + if self._flusher is None: + return + + self._running = False + self._flush_queue.put(None) + self._flusher = None + @staticmethod def _estimate_size(item: "StreamedSpan") -> int: # Rough estimate of serialized span size that's quick to compute. @@ -128,50 +151,60 @@ def _to_transport_format(item: "StreamedSpan") -> "Any": return res - def _flush(self) -> None: + def _flush(self, trace_id: "Optional[str]") -> None: with self._lock: if len(self._span_buffer) == 0: return + if trace_id is None: + # flush whole buffer, e.g. if the SDK is shutting down + buckets = self._span_buffer.keys() + else: + buckets = [trace_id] + envelopes = [] - for spans in self._span_buffer.values(): - if spans: - dsc = spans[0]._dynamic_sampling_context() - # Max per envelope is 1000, so if we happen to have more than - # 1000 spans in one bucket, we'll need to separate them. - for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE): - end = min(start + self.MAX_ENVELOPE_SIZE, len(spans)) + for trace_id in buckets: + spans = self._span_buffer.get(trace_id) + if not spans: + continue - envelope = Envelope( - headers={ - "sent_at": format_timestamp(datetime.now(timezone.utc)), - "trace": dsc, - } - ) + dsc = spans[0]._dynamic_sampling_context() + + # Max per envelope is 1000, so if we happen to have more than + # 1000 spans in one bucket, we'll need to separate them. + for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE): + end = min(start + self.MAX_ENVELOPE_SIZE, len(spans)) + + envelope = Envelope( + headers={ + "sent_at": format_timestamp(datetime.now(timezone.utc)), + "trace": dsc, + } + ) - envelope.add_item( - Item( - type=self.TYPE, - content_type=self.CONTENT_TYPE, - headers={ - "item_count": end - start, - }, - payload=PayloadRef( - json={ - "items": [ - self._to_transport_format(spans[j]) - for j in range(start, end) - ] - } - ), - ) + envelope.add_item( + Item( + type=self.TYPE, + content_type=self.CONTENT_TYPE, + headers={ + "item_count": end - start, + }, + payload=PayloadRef( + json={ + "items": [ + self._to_transport_format(spans[j]) + for j in range(start, end) + ] + } + ), ) + ) - envelopes.append(envelope) + envelopes.append(envelope) - self._span_buffer.clear() - self._running_size.clear() + del self._span_buffer[trace_id] + del self._running_size[trace_id] for envelope in envelopes: self._capture_func(envelope) From c20f3bcd75cc85c5c8b107f44d68c3d44b604e01 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Tue, 28 Apr 2026 09:48:00 +0200 Subject: [PATCH 02/20] . --- sentry_sdk/_span_batcher.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index e610c457eb..d2c0e5a08e 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -53,10 +53,6 @@ def __init__( self._flusher_pid: "Optional[int]" = None def _flush_loop(self) -> None: - # Mark the flush-loop thread as active for its entire lifetime so - # that any re-entrant add() triggered by GC warnings during wait(), - # flush(), or Event operations is silently dropped instead of - # deadlocking on internal locks. self._active.flag = True while self._running: # XXX: the timeout should also be per bucket From cef35476d8f1263c7700ac8b5f1aa5a0b940f259 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 29 Apr 2026 10:55:52 +0200 Subject: [PATCH 03/20] . --- sentry_sdk/_span_batcher.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index d2c0e5a08e..64e710e2ba 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -1,6 +1,6 @@ import random import threading -from _queue import Queue +from _queue import Empty, Queue from collections import defaultdict from datetime import datetime, timezone from typing import TYPE_CHECKING @@ -55,11 +55,13 @@ def __init__( def _flush_loop(self) -> None: self._active.flag = True while self._running: - # XXX: the timeout should also be per bucket - trace_id = self._flush_queue.get( - timeout=self.FLUSH_WAIT_TIME + random.random() - ) - self._flush(trace_id=trace_id) + try: + trace_id = self._flush_queue.get( + timeout=self.FLUSH_WAIT_TIME + random.random() + ) + self._flush(trace_id=trace_id) + except Empty: + self._flush() def add(self, span: "StreamedSpan") -> None: # Bail out if the current thread is already executing batcher code. From 13f6e033642a7a8dd27c274f7ecb557fd947bcca Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 29 Apr 2026 11:04:24 +0200 Subject: [PATCH 04/20] . --- sentry_sdk/_span_batcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 64e710e2ba..6c59f1b983 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -1,11 +1,11 @@ import random import threading -from _queue import Empty, Queue from collections import defaultdict from datetime import datetime, timezone from typing import TYPE_CHECKING from sentry_sdk._batcher import Batcher +from sentry_sdk._queue import EmptyError, Queue from sentry_sdk.envelope import Envelope, Item, PayloadRef from sentry_sdk.utils import format_timestamp, serialize_attribute @@ -60,7 +60,7 @@ def _flush_loop(self) -> None: timeout=self.FLUSH_WAIT_TIME + random.random() ) self._flush(trace_id=trace_id) - except Empty: + except EmptyError: self._flush() def add(self, span: "StreamedSpan") -> None: From b7dd5271a4f7833c28fc74194f79005c432e6880 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 29 Apr 2026 11:05:10 +0200 Subject: [PATCH 05/20] . --- sentry_sdk/_span_batcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 6c59f1b983..cd71337bee 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -149,7 +149,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any": return res - def _flush(self, trace_id: "Optional[str]") -> None: + def _flush(self, trace_id: "Optional[str]" = None) -> None: with self._lock: if len(self._span_buffer) == 0: return From f7f8facfa9f23bdf7b1c058be45940b36cf7a8f8 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 29 Apr 2026 11:13:55 +0200 Subject: [PATCH 06/20] . --- sentry_sdk/_span_batcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index cd71337bee..71aa2c3df2 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -156,7 +156,7 @@ def _flush(self, trace_id: "Optional[str]" = None) -> None: if trace_id is None: # flush whole buffer, e.g. if the SDK is shutting down - buckets = self._span_buffer.keys() + buckets = list(self._span_buffer.keys()) else: buckets = [trace_id] From 3a5b70c08ae3004081c96a6f1a4e2574c7d482fc Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 29 Apr 2026 12:46:10 +0200 Subject: [PATCH 07/20] global timeout --- sentry_sdk/_span_batcher.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 71aa2c3df2..6bab70e0c2 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -1,5 +1,6 @@ import random import threading +import time from collections import defaultdict from datetime import datetime, timezone from typing import TYPE_CHECKING @@ -24,6 +25,7 @@ class SpanBatcher(Batcher["StreamedSpan"]): MAX_BEFORE_FLUSH = 1000 MAX_BEFORE_DROP = 2000 MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB + FLUSH_WAIT_TIME = 5.0 TYPE = "span" @@ -47,6 +49,7 @@ def __init__( self._lock = threading.Lock() self._active: "threading.local" = threading.local() + self._last_full_flush: float = time.monotonic() self._flush_queue: Queue = Queue() self._flusher: "Optional[threading.Thread]" = None @@ -55,13 +58,19 @@ def __init__( def _flush_loop(self) -> None: self._active.flag = True while self._running: + jitter = random.random() try: - trace_id = self._flush_queue.get( - timeout=self.FLUSH_WAIT_TIME + random.random() - ) + trace_id = self._flush_queue.get(timeout=self.FLUSH_WAIT_TIME + jitter) self._flush(trace_id=trace_id) except EmptyError: + pass + + if ( + time.monotonic() - self._last_full_flush + >= self.FLUSH_WAIT_TIME + jitter + ): self._flush() + self._last_full_flush = time.monotonic() def add(self, span: "StreamedSpan") -> None: # Bail out if the current thread is already executing batcher code. From ba2af6a30b31a7c2369136a5a53a1267417a4e35 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 29 Apr 2026 16:35:28 +0200 Subject: [PATCH 08/20] tests --- sentry_sdk/_span_batcher.py | 12 +- tests/tracing/test_span_buffer.py | 248 ++++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+), 5 deletions(-) create mode 100644 tests/tracing/test_span_buffer.py diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 6bab70e0c2..04b830d842 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -58,7 +58,7 @@ def __init__( def _flush_loop(self) -> None: self._active.flag = True while self._running: - jitter = random.random() + jitter = random.random() * self.FLUSH_WAIT_TIME * 0.1 try: trace_id = self._flush_queue.get(timeout=self.FLUSH_WAIT_TIME + jitter) self._flush(trace_id=trace_id) @@ -171,8 +171,8 @@ def _flush(self, trace_id: "Optional[str]" = None) -> None: envelopes = [] - for trace_id in buckets: - spans = self._span_buffer.get(trace_id) + for bucket_id in buckets: + spans = self._span_buffer.get(bucket_id) if not spans: continue @@ -210,8 +210,10 @@ def _flush(self, trace_id: "Optional[str]" = None) -> None: envelopes.append(envelope) - del self._span_buffer[trace_id] - del self._running_size[trace_id] + del self._span_buffer[bucket_id] + del self._running_size[bucket_id] + + # XXX remove trace_id from queue for envelope in envelopes: self._capture_func(envelope) diff --git a/tests/tracing/test_span_buffer.py b/tests/tracing/test_span_buffer.py new file mode 100644 index 0000000000..49f4108c48 --- /dev/null +++ b/tests/tracing/test_span_buffer.py @@ -0,0 +1,248 @@ +import time + +import sentry_sdk +from sentry_sdk._span_batcher import SpanBatcher + + +def test_envelope_by_trace_id(sentry_init, capture_envelopes, monkeypatch): + """Envelopes only contain spans of one trace ID.""" + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + + sentry_sdk.traces.new_trace() + + with sentry_sdk.traces.start_span(name="span 1a") as span1: + trace_id1 = span1.trace_id + with sentry_sdk.traces.start_span(name="span 1b"): + pass + + sentry_sdk.traces.new_trace() + + with sentry_sdk.traces.start_span(name="span 2a") as span2: + trace_id2 = span2.trace_id + with sentry_sdk.traces.start_span(name="span 2b"): + pass + + sentry_sdk.flush() + + assert len(envelopes) == 2 + + assert envelopes[0].headers["trace"]["trace_id"] == trace_id1 + assert len(envelopes[0].items[0].payload.json["items"]) == 2 + assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1a" + assert envelopes[0].items[0].payload.json["items"][0]["trace_id"] == trace_id1 + assert envelopes[0].items[0].payload.json["items"][1]["name"] == "span 1b" + assert envelopes[0].items[0].payload.json["items"][1]["trace_id"] == trace_id1 + + assert envelopes[1].headers["trace"]["trace_id"] == trace_id2 + assert len(envelopes[1].items[0].payload.json["items"]) == 2 + assert envelopes[1].items[0].payload.json["items"][0]["name"] == "span 2a" + assert envelopes[1].items[0].payload.json["items"][0]["trace_id"] == trace_id2 + assert envelopes[1].items[0].payload.json["items"][1]["name"] == "span 2b" + assert envelopes[1].items[0].payload.json["items"][1]["trace_id"] == trace_id2 + + +def test_length_based_flushing(sentry_init, capture_items, monkeypatch): + """A flush event is triggered when a bucket contains MAX_BEFORE_FLUSH spans.""" + monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 1) + # set the time-based flush limit to something huge so that we're not hitting + # it since we want to test MAX_BEFORE_FLUSH instead + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 1000) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + items = capture_items("span") + + with sentry_sdk.traces.start_span(name="span"): + pass + + time.sleep(0.5) + + assert len(items) == 1 + assert items[0].payload["name"] == "span" + + +def test_max_envelope_size(sentry_init, capture_envelopes, monkeypatch): + """Envelope max size is respected.""" + monkeypatch.setattr(SpanBatcher, "MAX_ENVELOPE_SIZE", 2) + # set the time-based flush limit to something huge so that we're not hitting + # it + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 1000) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + + with sentry_sdk.traces.start_span(name="span 1"): + pass + with sentry_sdk.traces.start_span(name="span 2"): + pass + with sentry_sdk.traces.start_span(name="span 3"): + pass + with sentry_sdk.traces.start_span(name="span 4"): + pass + with sentry_sdk.traces.start_span(name="span 5"): + pass + + sentry_sdk.flush() + + assert len(envelopes) == 3 + + assert len(envelopes[0].items[0].payload.json["items"]) == 2 + assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1" + assert envelopes[0].items[0].payload.json["items"][1]["name"] == "span 2" + assert len(envelopes[1].items[0].payload.json["items"]) == 2 + assert envelopes[1].items[0].payload.json["items"][0]["name"] == "span 3" + assert envelopes[1].items[0].payload.json["items"][1]["name"] == "span 4" + assert len(envelopes[2].items[0].payload.json["items"]) == 1 + assert envelopes[2].items[0].payload.json["items"][0]["name"] == "span 5" + + +def test_drop_after_max_reached(sentry_init, capture_envelopes, monkeypatch): + """New spans are dropped if a bucket reaches MAX_BEFORE_DROP spans.""" + monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_DROP", 2) + # set the time-based flush limit to something huge so that we're not hitting + # it + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 1000) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + + with sentry_sdk.traces.start_span(name="span 1"): + pass + with sentry_sdk.traces.start_span(name="span 2"): + pass + with sentry_sdk.traces.start_span(name="span 3"): + pass + + sentry_sdk.flush() + + assert len(envelopes) == 1 + + assert len(envelopes[0].items[0].payload.json["items"]) == 2 + assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1" + assert envelopes[0].items[0].payload.json["items"][1]["name"] == "span 2" + + # XXX client reports? + + +def test_weight_based_flushing(sentry_init, capture_envelopes, monkeypatch): + monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", 1) + # set the time-based flush limit to something huge so that we're not hitting it + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 1000) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + + with sentry_sdk.traces.start_span(name="span 1"): + pass + + time.sleep(0.1) + + with sentry_sdk.traces.start_span(name="span 2"): + pass + + time.sleep(0.5) + + assert len(envelopes) == 2 + + assert len(envelopes[0].items[0].payload.json["items"]) == 1 + assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1" + + assert len(envelopes[1].items[0].payload.json["items"]) == 1 + assert envelopes[1].items[0].payload.json["items"][0]["name"] == "span 2" + + +def test_quiet_buckets_flush_eventually(sentry_init, capture_envelopes, monkeypatch): + """Even if a bucket doesn't trigger any size limits, it'll get flushed eventually.""" + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 0.1) + # these are purposefully high so as to never trigger in this test scenario + monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 10000000) + monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", 10000000) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + + with sentry_sdk.traces.start_span(name="span 1"): + pass + + time.sleep(0.5) + + assert len(envelopes) == 1 + + assert len(envelopes[0].items[0].payload.json["items"]) == 1 + assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1" + + +def test_quiet_buckets_flushed_with_busy_neighbors( + sentry_init, capture_envelopes, monkeypatch +): + """When there's a combination of busy and quiet buckets, the quiet ones get flushed eventually.""" + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 0.1) + monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 5) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + + sentry_sdk.traces.new_trace() + + with sentry_sdk.traces.start_span(name="span 1") as span1: + trace_id1 = span1.trace_id + + sentry_sdk.traces.new_trace() + + with sentry_sdk.traces.start_span(name="span 2") as span2: + trace_id2 = span2.trace_id + + for i in range(3, 10): + with sentry_sdk.traces.start_span(name=f"span {i}"): + pass + + time.sleep(0.3) + + assert len(envelopes) >= 2 + + # we don't care how exactly the spans are distributed over envelopes + # (since both the time and length based limits are effective, it might not + # be stable); just check whether the spans are all there and that each + # envelope only contains spans from one trace + seen = set() + for envelope in envelopes: + assert envelope.headers["trace"]["trace_id"] in (trace_id1, trace_id2) + + if envelope.headers["trace"]["trace_id"] == trace_id1: + assert len(envelope.items[0].payload.json["items"]) == 1 + assert envelope.items[0].payload.json["items"][0]["name"] == "span 1" + seen.add(envelope.items[0].payload.json["items"][0]["name"]) + + elif envelope.headers["trace"]["trace_id"] == trace_id2: + for span in envelope.items[0].payload.json["items"]: + seen.add(span["name"]) + + assert seen == {f"span {i}" for i in range(1, 10)} From 173c0df396b5ac1dbd8a773497ff60a804692844 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 11:11:28 +0200 Subject: [PATCH 09/20] more tests --- tests/tracing/test_span_buffer.py | 264 +++++++++++++++++++++++---- tests/tracing/test_span_streaming.py | 51 ------ 2 files changed, 225 insertions(+), 90 deletions(-) diff --git a/tests/tracing/test_span_buffer.py b/tests/tracing/test_span_buffer.py index 49f4108c48..a219c8cec0 100644 --- a/tests/tracing/test_span_buffer.py +++ b/tests/tracing/test_span_buffer.py @@ -1,4 +1,5 @@ import time +from unittest import mock import sentry_sdk from sentry_sdk._span_batcher import SpanBatcher @@ -46,35 +47,12 @@ def test_envelope_by_trace_id(sentry_init, capture_envelopes, monkeypatch): assert envelopes[1].items[0].payload.json["items"][1]["trace_id"] == trace_id2 -def test_length_based_flushing(sentry_init, capture_items, monkeypatch): - """A flush event is triggered when a bucket contains MAX_BEFORE_FLUSH spans.""" - monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 1) - # set the time-based flush limit to something huge so that we're not hitting - # it since we want to test MAX_BEFORE_FLUSH instead - monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 1000) - - sentry_init( - traces_sample_rate=1.0, - _experiments={"trace_lifecycle": "stream"}, - ) - - items = capture_items("span") - - with sentry_sdk.traces.start_span(name="span"): - pass - - time.sleep(0.5) - - assert len(items) == 1 - assert items[0].payload["name"] == "span" - - def test_max_envelope_size(sentry_init, capture_envelopes, monkeypatch): """Envelope max size is respected.""" monkeypatch.setattr(SpanBatcher, "MAX_ENVELOPE_SIZE", 2) - # set the time-based flush limit to something huge so that we're not hitting - # it - monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 1000) + # set the time-based flush limit to something huge so that we're not observing + # time-based flushes + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000) sentry_init( traces_sample_rate=1.0, @@ -108,12 +86,14 @@ def test_max_envelope_size(sentry_init, capture_envelopes, monkeypatch): assert envelopes[2].items[0].payload.json["items"][0]["name"] == "span 5" -def test_drop_after_max_reached(sentry_init, capture_envelopes, monkeypatch): +def test_drop_after_max_reached( + sentry_init, capture_envelopes, capture_record_lost_event_calls, monkeypatch +): """New spans are dropped if a bucket reaches MAX_BEFORE_DROP spans.""" monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_DROP", 2) - # set the time-based flush limit to something huge so that we're not hitting - # it - monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 1000) + # set the time-based flush limit to something huge so that we're not flushing + # prematurely + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000) sentry_init( traces_sample_rate=1.0, @@ -121,6 +101,7 @@ def test_drop_after_max_reached(sentry_init, capture_envelopes, monkeypatch): ) envelopes = capture_envelopes() + record_lost_event_calls = capture_record_lost_event_calls() with sentry_sdk.traces.start_span(name="span 1"): pass @@ -137,13 +118,84 @@ def test_drop_after_max_reached(sentry_init, capture_envelopes, monkeypatch): assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1" assert envelopes[0].items[0].payload.json["items"][1]["name"] == "span 2" - # XXX client reports? + assert ("queue_overflow", "span", None, 1) in record_lost_event_calls + + +def test_drop_isolated_per_bucket( + sentry_init, capture_envelopes, capture_record_lost_event_calls, monkeypatch +): + """A bucket reaching MAX_BEFORE_DROP only drops spans from that trace.""" + monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_DROP", 2) + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + record_lost_event_calls = capture_record_lost_event_calls() + + sentry_sdk.traces.new_trace() + with sentry_sdk.traces.start_span(name="a1") as span_a: + trace_id_a = span_a.trace_id + with sentry_sdk.traces.start_span(name="a2"): + pass + with sentry_sdk.traces.start_span(name="a3"): + pass + + sentry_sdk.traces.new_trace() + with sentry_sdk.traces.start_span(name="b1") as span_b: + trace_id_b = span_b.trace_id + with sentry_sdk.traces.start_span(name="b2"): + pass + + sentry_sdk.flush() + + assert len(envelopes) == 2 + + assert envelopes[0].headers["trace"]["trace_id"] == trace_id_a + assert len(envelopes[0].items[0].payload.json["items"]) == 2 + assert envelopes[0].items[0].payload.json["items"][0]["name"] == "a1" + assert envelopes[0].items[0].payload.json["items"][1]["name"] == "a2" + + assert envelopes[1].headers["trace"]["trace_id"] == trace_id_b + assert len(envelopes[1].items[0].payload.json["items"]) == 2 + assert envelopes[1].items[0].payload.json["items"][0]["name"] == "b1" + assert envelopes[1].items[0].payload.json["items"][1]["name"] == "b2" + + assert record_lost_event_calls.count(("queue_overflow", "span", None, 1)) == 1 + + +def test_length_based_flushing(sentry_init, capture_items, monkeypatch): + """A flush event is triggered when a bucket contains MAX_BEFORE_FLUSH spans.""" + monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 1) + # set the time-based flush limit to something huge so that we're not hitting + # it since we want to test MAX_BEFORE_FLUSH instead + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + items = capture_items("span") + + with sentry_sdk.traces.start_span(name="span"): + pass + + time.sleep(0.1) + + assert len(items) == 1 + assert items[0].payload["name"] == "span" def test_weight_based_flushing(sentry_init, capture_envelopes, monkeypatch): + """When a bucket reaches MAX_BYTES_BEFORE_FLUSH, it'll be flushed.""" monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", 1) - # set the time-based flush limit to something huge so that we're not hitting it - monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 1000) + # set the time-based flush limit to something huge so that it doesn't + # interfere + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000) sentry_init( traces_sample_rate=1.0, @@ -160,7 +212,7 @@ def test_weight_based_flushing(sentry_init, capture_envelopes, monkeypatch): with sentry_sdk.traces.start_span(name="span 2"): pass - time.sleep(0.5) + time.sleep(0.1) assert len(envelopes) == 2 @@ -171,6 +223,89 @@ def test_weight_based_flushing(sentry_init, capture_envelopes, monkeypatch): assert envelopes[1].items[0].payload.json["items"][0]["name"] == "span 2" +def test_weight_based_flushing_by_attribute_size( + sentry_init, capture_envelopes, monkeypatch +): + """A bucket is flushed once the estimated bytes (driven by attribute size) exceed MAX_BYTES_BEFORE_FLUSH.""" + # _estimate_size adds 210 base + (50 + len(str(value))) per attribute. + # With this limit, a span without attributes (~210) won't trigger a flush, + # but a span carrying a large attribute value will push the bucket over. + monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", 500) + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000) + monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 100000) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + + with sentry_sdk.traces.start_span(name="small span"): + pass + + time.sleep(0.1) + + # The first span alone is well under the byte limit, so no flush yet. + assert len(envelopes) == 0 + + with sentry_sdk.traces.start_span(name="big span", attributes={"big": "x" * 300}): + pass + + time.sleep(0.1) + + assert len(envelopes) == 1 + assert envelopes[0].items[0].payload.json["items"][0]["name"] == "small span" + assert envelopes[0].items[0].payload.json["items"][1]["name"] == "big span" + + +def test_bucket_recreated_after_flush(sentry_init, capture_envelopes, monkeypatch): + """Spans for a trace that arrive after that trace's bucket was flushed land in a fresh bucket.""" + monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 2) + monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000) + + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + + sentry_sdk.traces.new_trace() + + with sentry_sdk.traces.start_span(name="span 1") as span1: + trace_id = span1.trace_id + with sentry_sdk.traces.start_span(name="span 2"): + pass + + time.sleep(0.1) + + assert len(envelopes) == 1 + + with sentry_sdk.traces.start_span(name="span 3"): + pass + with sentry_sdk.traces.start_span(name="span 4"): + pass + + time.sleep(0.1) + + assert len(envelopes) == 2 + + assert envelopes[0].headers["trace"]["trace_id"] == trace_id + assert len(envelopes[0].items[0].payload.json["items"]) == 2 + assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1" + assert envelopes[0].items[0].payload.json["items"][0]["trace_id"] == trace_id + assert envelopes[0].items[0].payload.json["items"][1]["name"] == "span 2" + assert envelopes[0].items[0].payload.json["items"][1]["trace_id"] == trace_id + + assert envelopes[1].headers["trace"]["trace_id"] == trace_id + assert len(envelopes[1].items[0].payload.json["items"]) == 2 + assert envelopes[1].items[0].payload.json["items"][0]["name"] == "span 3" + assert envelopes[1].items[0].payload.json["items"][0]["trace_id"] == trace_id + assert envelopes[1].items[0].payload.json["items"][1]["name"] == "span 4" + assert envelopes[1].items[0].payload.json["items"][1]["trace_id"] == trace_id + + def test_quiet_buckets_flush_eventually(sentry_init, capture_envelopes, monkeypatch): """Even if a bucket doesn't trigger any size limits, it'll get flushed eventually.""" monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 0.1) @@ -188,7 +323,7 @@ def test_quiet_buckets_flush_eventually(sentry_init, capture_envelopes, monkeypa with sentry_sdk.traces.start_span(name="span 1"): pass - time.sleep(0.5) + time.sleep(0.3) assert len(envelopes) == 1 @@ -199,7 +334,7 @@ def test_quiet_buckets_flush_eventually(sentry_init, capture_envelopes, monkeypa def test_quiet_buckets_flushed_with_busy_neighbors( sentry_init, capture_envelopes, monkeypatch ): - """When there's a combination of busy and quiet buckets, the quiet ones get flushed eventually.""" + """When there are both busy and quiet buckets, all buckets get flushed eventually.""" monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 0.1) monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 5) @@ -229,9 +364,9 @@ def test_quiet_buckets_flushed_with_busy_neighbors( assert len(envelopes) >= 2 # we don't care how exactly the spans are distributed over envelopes - # (since both the time and length based limits are effective, it might not - # be stable); just check whether the spans are all there and that each - # envelope only contains spans from one trace + # (since both the time and length based limits are effective, the + # distribution might not be stable); just check whether the spans are all + # there and that each envelope only contains spans from one trace seen = set() for envelope in envelopes: assert envelope.headers["trace"]["trace_id"] in (trace_id1, trace_id2) @@ -246,3 +381,54 @@ def test_quiet_buckets_flushed_with_busy_neighbors( seen.add(span["name"]) assert seen == {f"span {i}" for i in range(1, 10)} + + +def test_transport_format(sentry_init, capture_envelopes): + sentry_init( + server_name="test-server", + release="1.0.0", + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + envelopes = capture_envelopes() + + with sentry_sdk.traces.start_span(name="test"): + ... + + sentry_sdk.get_client().flush() + + assert len(envelopes) == 1 + assert len(envelopes[0].items) == 1 + item = envelopes[0].items[0] + + assert item.type == "span" + assert item.headers == { + "type": "span", + "item_count": 1, + "content_type": "application/vnd.sentry.items.span.v2+json", + } + assert item.payload.json == { + "items": [ + { + "trace_id": mock.ANY, + "span_id": mock.ANY, + "name": "test", + "status": "ok", + "is_segment": True, + "start_timestamp": mock.ANY, + "end_timestamp": mock.ANY, + "attributes": mock.ANY, + } + ] + } + for attribute, value in item.payload.json["items"][0]["attributes"].items(): + assert isinstance(attribute, str) + + assert isinstance(value, dict) + assert ( + len(value) == 2 + ) # technically, "unit" is also supported, but we don't currently set it anywhere + assert "value" in value + assert "type" in value + assert value["type"] in ("string", "boolean", "integer", "double", "array") diff --git a/tests/tracing/test_span_streaming.py b/tests/tracing/test_span_streaming.py index 849c0a5fb5..1746487175 100644 --- a/tests/tracing/test_span_streaming.py +++ b/tests/tracing/test_span_streaming.py @@ -1542,54 +1542,3 @@ def test_default_attributes(sentry_init, capture_envelopes): "sentry.release": {"value": "1.0.0", "type": "string"}, "sentry.origin": {"value": "manual", "type": "string"}, } - - -def test_transport_format(sentry_init, capture_envelopes): - sentry_init( - server_name="test-server", - release="1.0.0", - traces_sample_rate=1.0, - _experiments={"trace_lifecycle": "stream"}, - ) - - envelopes = capture_envelopes() - - with sentry_sdk.traces.start_span(name="test"): - ... - - sentry_sdk.get_client().flush() - - assert len(envelopes) == 1 - assert len(envelopes[0].items) == 1 - item = envelopes[0].items[0] - - assert item.type == "span" - assert item.headers == { - "type": "span", - "item_count": 1, - "content_type": "application/vnd.sentry.items.span.v2+json", - } - assert item.payload.json == { - "items": [ - { - "trace_id": mock.ANY, - "span_id": mock.ANY, - "name": "test", - "status": "ok", - "is_segment": True, - "start_timestamp": mock.ANY, - "end_timestamp": mock.ANY, - "attributes": mock.ANY, - } - ] - } - for attribute, value in item.payload.json["items"][0]["attributes"].items(): - assert isinstance(attribute, str) - - assert isinstance(value, dict) - assert ( - len(value) == 2 - ) # technically, "unit" is also supported, but we don't currently set it anywhere - assert "value" in value - assert "type" in value - assert value["type"] in ("string", "boolean", "integer", "double", "array") From d0cd8afe25a5db85b307ab35915fbbba031ffe64 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 11:25:13 +0200 Subject: [PATCH 10/20] . --- sentry_sdk/_span_batcher.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 04b830d842..581a57b706 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -213,7 +213,5 @@ def _flush(self, trace_id: "Optional[str]" = None) -> None: del self._span_buffer[bucket_id] del self._running_size[bucket_id] - # XXX remove trace_id from queue - for envelope in envelopes: self._capture_func(envelope) From 252e3e32f7a428056f8ebf2036b52d16e149c50f Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 11:26:31 +0200 Subject: [PATCH 11/20] . --- tests/tracing/test_span_streaming.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/tracing/test_span_streaming.py b/tests/tracing/test_span_streaming.py index 1746487175..fea4eeaf81 100644 --- a/tests/tracing/test_span_streaming.py +++ b/tests/tracing/test_span_streaming.py @@ -1532,7 +1532,6 @@ def test_default_attributes(sentry_init, capture_envelopes): assert item.payload.json["items"][0]["attributes"] == { "thread.id": {"value": mock.ANY, "type": "string"}, "thread.name": {"value": "MainThread", "type": "string"}, - "process.command_args": {"value": mock.ANY, "type": "array"}, "sentry.segment.id": {"value": mock.ANY, "type": "string"}, "sentry.segment.name": {"value": "test", "type": "string"}, "sentry.sdk.name": {"value": "sentry.python", "type": "string"}, From 0df0d3e9cba70300fb90fb6361e59bfb8676c40b Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 11:40:24 +0200 Subject: [PATCH 12/20] . --- tests/tracing/test_span_buffer.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/tracing/test_span_buffer.py b/tests/tracing/test_span_buffer.py index a219c8cec0..98debaea71 100644 --- a/tests/tracing/test_span_buffer.py +++ b/tests/tracing/test_span_buffer.py @@ -227,10 +227,6 @@ def test_weight_based_flushing_by_attribute_size( sentry_init, capture_envelopes, monkeypatch ): """A bucket is flushed once the estimated bytes (driven by attribute size) exceed MAX_BYTES_BEFORE_FLUSH.""" - # _estimate_size adds 210 base + (50 + len(str(value))) per attribute. - # With this limit, a span without attributes (~210) won't trigger a flush, - # but a span carrying a large attribute value will push the bucket over. - monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", 500) monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000) monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 100000) @@ -241,9 +237,12 @@ def test_weight_based_flushing_by_attribute_size( envelopes = capture_envelopes() - with sentry_sdk.traces.start_span(name="small span"): + with sentry_sdk.traces.start_span(name="small span") as bare_span: pass + bare_span_size = SpanBatcher._estimate_size(bare_span) + monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", bare_span_size * 2) + time.sleep(0.1) # The first span alone is well under the byte limit, so no flush yet. From c815130ee21d16e3b8d49524cf473cb0a89933f5 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 11:47:10 +0200 Subject: [PATCH 13/20] . --- tests/tracing/test_span_buffer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/tracing/test_span_buffer.py b/tests/tracing/test_span_buffer.py index 98debaea71..5e9dc803ff 100644 --- a/tests/tracing/test_span_buffer.py +++ b/tests/tracing/test_span_buffer.py @@ -241,14 +241,16 @@ def test_weight_based_flushing_by_attribute_size( pass bare_span_size = SpanBatcher._estimate_size(bare_span) - monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", bare_span_size * 2) + big_attr = "x" * bare_span_size + + monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", bare_span_size * 3) time.sleep(0.1) # The first span alone is well under the byte limit, so no flush yet. assert len(envelopes) == 0 - with sentry_sdk.traces.start_span(name="big span", attributes={"big": "x" * 300}): + with sentry_sdk.traces.start_span(name="big span", attributes={"big": big_attr}): pass time.sleep(0.1) From 2158bad0158026645b82997e25f6103e1f056414 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 11:47:36 +0200 Subject: [PATCH 14/20] . --- tests/tracing/test_span_buffer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tracing/test_span_buffer.py b/tests/tracing/test_span_buffer.py index 5e9dc803ff..b59ef8421d 100644 --- a/tests/tracing/test_span_buffer.py +++ b/tests/tracing/test_span_buffer.py @@ -226,7 +226,7 @@ def test_weight_based_flushing(sentry_init, capture_envelopes, monkeypatch): def test_weight_based_flushing_by_attribute_size( sentry_init, capture_envelopes, monkeypatch ): - """A bucket is flushed once the estimated bytes (driven by attribute size) exceed MAX_BYTES_BEFORE_FLUSH.""" + """Attribute size is taken into account when using MAX_BYTES_BEFORE_FLUSH.""" monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000) monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 100000) From c3ee7530c6d3423792d1e71847bfe6dc3b85b540 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 11:54:53 +0200 Subject: [PATCH 15/20] . --- sentry_sdk/_span_batcher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 581a57b706..3bd2ee1941 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -114,6 +114,7 @@ def kill(self) -> None: if self._flusher is None: return + self._flush() self._running = False self._flush_queue.put(None) self._flusher = None From 41a98c025c43c2d268afc7656368e6af90b206e0 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 12:11:49 +0200 Subject: [PATCH 16/20] . --- sentry_sdk/_span_batcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 3bd2ee1941..22edb8df75 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -114,7 +114,7 @@ def kill(self) -> None: if self._flusher is None: return - self._flush() + self.flush() self._running = False self._flush_queue.put(None) self._flusher = None From 80f5400522bbf0fdd7ba8dda4fa7dc968a8c5efc Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 12:54:24 +0200 Subject: [PATCH 17/20] replace queue --- sentry_sdk/_span_batcher.py | 58 +++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 22edb8df75..f893444088 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -6,7 +6,6 @@ from typing import TYPE_CHECKING from sentry_sdk._batcher import Batcher -from sentry_sdk._queue import EmptyError, Queue from sentry_sdk.envelope import Envelope, Item, PayloadRef from sentry_sdk.utils import format_timestamp, serialize_attribute @@ -49,8 +48,9 @@ def __init__( self._lock = threading.Lock() self._active: "threading.local" = threading.local() - self._last_full_flush: float = time.monotonic() - self._flush_queue: Queue = Queue() + self._last_full_flush: float = time.monotonic() # drives time-based flushes + self._flush_event: threading.Event = threading.Event() + self._pending_flush: set[str] = set() # buckets to be flushed self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None @@ -59,11 +59,10 @@ def _flush_loop(self) -> None: self._active.flag = True while self._running: jitter = random.random() * self.FLUSH_WAIT_TIME * 0.1 - try: - trace_id = self._flush_queue.get(timeout=self.FLUSH_WAIT_TIME + jitter) - self._flush(trace_id=trace_id) - except EmptyError: - pass + self._flush_event.wait(timeout=self.FLUSH_WAIT_TIME + jitter) + self._flush_event.clear() + + self._flush(only_pending=True) if ( time.monotonic() - self._last_full_flush @@ -100,25 +99,20 @@ def add(self, span: "StreamedSpan") -> None: self._span_buffer[span.trace_id].append(span) self._running_size[span.trace_id] += self._estimate_size(span) - if size + 1 >= self.MAX_BEFORE_FLUSH: - self._flush_queue.put(span.trace_id) - return - - if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH: - self._flush_queue.put(span.trace_id) - return + if ( + size + 1 >= self.MAX_BEFORE_FLUSH + or self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH + ): + self._pending_flush.add(span.trace_id) + notify = True + else: + notify = False + + if notify: + self._flush_event.set() finally: self._active.flag = False - def kill(self) -> None: - if self._flusher is None: - return - - self.flush() - self._running = False - self._flush_queue.put(None) - self._flusher = None - @staticmethod def _estimate_size(item: "StreamedSpan") -> int: # Rough estimate of serialized span size that's quick to compute. @@ -159,16 +153,18 @@ def _to_transport_format(item: "StreamedSpan") -> "Any": return res - def _flush(self, trace_id: "Optional[str]" = None) -> None: + def _flush(self, only_pending: bool = False) -> None: with self._lock: - if len(self._span_buffer) == 0: - return - - if trace_id is None: + if only_pending: + buckets = list(self._pending_flush) + else: # flush whole buffer, e.g. if the SDK is shutting down buckets = list(self._span_buffer.keys()) - else: - buckets = [trace_id] + + self._pending_flush.clear() + + if not buckets: + return envelopes = [] From a1745817ffe7fcbe9268efac7ce7560df7b54439 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 14:02:58 +0200 Subject: [PATCH 18/20] update _reset_thread_state --- sentry_sdk/_span_batcher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index c8707632dc..6cb8e17a5c 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -51,7 +51,7 @@ def __init__( self._active: "threading.local" = threading.local() self._last_full_flush: float = time.monotonic() # drives time-based flushes - self._flush_event: threading.Event = threading.Event() + self._flush_event = threading.Event() self._pending_flush: set[str] = set() # buckets to be flushed self._flusher: "Optional[threading.Thread]" = None @@ -76,7 +76,9 @@ def _reset_thread_state(self) -> None: self._lock = threading.Lock() self._active = threading.local() + self._last_full_flush: float = time.monotonic() self._flush_event = threading.Event() + self._pending_flush: set[str] = set() self._flusher = None self._flusher_pid = None From 77d3f6cb416d5f7740dd1a5735a7c275902badc0 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 14:09:51 +0200 Subject: [PATCH 19/20] . --- sentry_sdk/_span_batcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 6cb8e17a5c..762b377af9 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -76,9 +76,9 @@ def _reset_thread_state(self) -> None: self._lock = threading.Lock() self._active = threading.local() - self._last_full_flush: float = time.monotonic() + self._last_full_flush = time.monotonic() self._flush_event = threading.Event() - self._pending_flush: set[str] = set() + self._pending_flush = set() self._flusher = None self._flusher_pid = None From e88a50da1f4ec520262e4e9e3f376aff97e7a16a Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 30 Apr 2026 16:08:52 +0200 Subject: [PATCH 20/20] simplify test --- tests/tracing/test_span_buffer.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/tests/tracing/test_span_buffer.py b/tests/tracing/test_span_buffer.py index b59ef8421d..15823a421c 100644 --- a/tests/tracing/test_span_buffer.py +++ b/tests/tracing/test_span_buffer.py @@ -204,23 +204,15 @@ def test_weight_based_flushing(sentry_init, capture_envelopes, monkeypatch): envelopes = capture_envelopes() - with sentry_sdk.traces.start_span(name="span 1"): - pass - - time.sleep(0.1) - - with sentry_sdk.traces.start_span(name="span 2"): + with sentry_sdk.traces.start_span(name="span"): pass time.sleep(0.1) - assert len(envelopes) == 2 + assert len(envelopes) == 1 assert len(envelopes[0].items[0].payload.json["items"]) == 1 - assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1" - - assert len(envelopes[1].items[0].payload.json["items"]) == 1 - assert envelopes[1].items[0].payload.json["items"][0]["name"] == "span 2" + assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span" def test_weight_based_flushing_by_attribute_size(