diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py
index 9e1b96f0cf..762b377af9 100644
--- a/sentry_sdk/_span_batcher.py
+++ b/sentry_sdk/_span_batcher.py
@@ -1,7 +1,9 @@
+import random
+import threading
+import time
 from collections import defaultdict
 from datetime import datetime, timezone
 import os
-import threading
 from typing import TYPE_CHECKING
 import weakref
 
@@ -16,14 +18,15 @@ class SpanBatcher(Batcher["StreamedSpan"]):
     # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
-    # a bit of a buffer for spans that appear between setting the flush event
+    # a bit of a buffer for spans that appear between the trigger to flush
     # and actually flushing the buffer.
     #
-    # The max limits are all per trace.
+    # The max limits are all per trace (per bucket).
     MAX_ENVELOPE_SIZE = 1000  # spans
     MAX_BEFORE_FLUSH = 1000
     MAX_BEFORE_DROP = 2000
     MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024  # 5 MB
+    FLUSH_WAIT_TIME = 5.0
 
     TYPE = "span"
@@ -47,7 +50,9 @@ def __init__(
         self._lock = threading.Lock()
         self._active: "threading.local" = threading.local()
-        self._flush_event: "threading.Event" = threading.Event()
+        self._last_full_flush: float = time.monotonic()  # drives time-based flushes
+        self._flush_event = threading.Event()
+        self._pending_flush: set[str] = set()  # buckets to be flushed
 
         self._flusher: "Optional[threading.Thread]" = None
         self._flusher_pid: "Optional[int]" = None
@@ -71,11 +76,29 @@ def _reset_thread_state(self) -> None:
         self._lock = threading.Lock()
         self._active = threading.local()
+        self._last_full_flush = time.monotonic()
         self._flush_event = threading.Event()
+        self._pending_flush = set()
 
         self._flusher = None
         self._flusher_pid = None
 
+    def _flush_loop(self) -> None:
+        self._active.flag = True
+
+        while self._running:
+            jitter = random.random() * self.FLUSH_WAIT_TIME * 0.1
+            self._flush_event.wait(timeout=self.FLUSH_WAIT_TIME + jitter)
+            self._flush_event.clear()
+
+            self._flush(only_pending=True)
+
+            if (
+                time.monotonic() - self._last_full_flush
+                >= self.FLUSH_WAIT_TIME + jitter
+            ):
+                self._flush()
+                self._last_full_flush = time.monotonic()
+
     def add(self, span: "StreamedSpan") -> None:
         # Bail out if the current thread is already executing batcher code.
         # This prevents deadlocks when code running inside the batcher (e.g.
@@ -104,13 +127,17 @@ def add(self, span: "StreamedSpan") -> None:
             self._span_buffer[span.trace_id].append(span)
             self._running_size[span.trace_id] += self._estimate_size(span)
 
-            if size + 1 >= self.MAX_BEFORE_FLUSH:
-                self._flush_event.set()
-                return
-
-            if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
-                self._flush_event.set()
-                return
+            if (
+                size + 1 >= self.MAX_BEFORE_FLUSH
+                or self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH
+            ):
+                self._pending_flush.add(span.trace_id)
+                notify = True
+            else:
+                notify = False
+
+            if notify:
+                self._flush_event.set()
         finally:
             self._active.flag = False
@@ -154,50 +181,62 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
 
         return res
 
-    def _flush(self) -> None:
+    def _flush(self, only_pending: bool = False) -> None:
         with self._lock:
-            if len(self._span_buffer) == 0:
+            if only_pending:
+                buckets = list(self._pending_flush)
+            else:
+                # flush whole buffer, e.g. if the SDK is shutting down
+                buckets = list(self._span_buffer.keys())
+
+            self._pending_flush.clear()
+
+            if not buckets:
                 return
 
             envelopes = []
 
-            for spans in self._span_buffer.values():
-                if spans:
-                    dsc = spans[0]._dynamic_sampling_context()
-                    # Max per envelope is 1000, so if we happen to have more than
-                    # 1000 spans in one bucket, we'll need to separate them.
-                    for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
-                        end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))
+            for bucket_id in buckets:
+                spans = self._span_buffer.get(bucket_id)
+                if not spans:
+                    continue
 
-                        envelope = Envelope(
-                            headers={
-                                "sent_at": format_timestamp(datetime.now(timezone.utc)),
-                                "trace": dsc,
-                            }
-                        )
+                dsc = spans[0]._dynamic_sampling_context()
+
+                # Max per envelope is 1000, so if we happen to have more than
+                # 1000 spans in one bucket, we'll need to separate them.
+                for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
+                    end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))
 
-                        envelope.add_item(
-                            Item(
-                                type=self.TYPE,
-                                content_type=self.CONTENT_TYPE,
-                                headers={
-                                    "item_count": end - start,
-                                },
-                                payload=PayloadRef(
-                                    json={
-                                        "items": [
-                                            self._to_transport_format(spans[j])
-                                            for j in range(start, end)
-                                        ]
-                                    }
-                                ),
-                            )
+                    envelope = Envelope(
+                        headers={
+                            "sent_at": format_timestamp(datetime.now(timezone.utc)),
+                            "trace": dsc,
+                        }
+                    )
+
+                    envelope.add_item(
+                        Item(
+                            type=self.TYPE,
+                            content_type=self.CONTENT_TYPE,
+                            headers={
+                                "item_count": end - start,
+                            },
+                            payload=PayloadRef(
+                                json={
+                                    "items": [
+                                        self._to_transport_format(spans[j])
+                                        for j in range(start, end)
+                                    ]
+                                }
+                            ),
                         )
+                    )
 
-                        envelopes.append(envelope)
+                    envelopes.append(envelope)
 
-            self._span_buffer.clear()
-            self._running_size.clear()
+                del self._span_buffer[bucket_id]
+                del self._running_size[bucket_id]
 
         for envelope in envelopes:
             self._capture_func(envelope)
diff --git a/tests/tracing/test_span_buffer.py b/tests/tracing/test_span_buffer.py
new file mode 100644
index 0000000000..15823a421c
--- /dev/null
+++ b/tests/tracing/test_span_buffer.py
@@ -0,0 +1,427 @@
+import time
+from unittest import mock
+
+import sentry_sdk
+from sentry_sdk._span_batcher import SpanBatcher
+
+
+def test_envelope_by_trace_id(sentry_init, capture_envelopes, monkeypatch):
+    """Envelopes only contain spans of one trace ID."""
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+
+    sentry_sdk.traces.new_trace()
+
+    with sentry_sdk.traces.start_span(name="span 1a") as span1:
+        trace_id1 = span1.trace_id
+        with sentry_sdk.traces.start_span(name="span 1b"):
+            pass
+
+    sentry_sdk.traces.new_trace()
+
+    with sentry_sdk.traces.start_span(name="span 2a") as span2:
+        trace_id2 = span2.trace_id
+        with sentry_sdk.traces.start_span(name="span 2b"):
+            pass
+
+    sentry_sdk.flush()
+
+    assert len(envelopes) == 2
+
+    assert envelopes[0].headers["trace"]["trace_id"] == trace_id1
+    assert len(envelopes[0].items[0].payload.json["items"]) == 2
+    assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1a"
+    assert envelopes[0].items[0].payload.json["items"][0]["trace_id"] == trace_id1
+    assert envelopes[0].items[0].payload.json["items"][1]["name"] == "span 1b"
+    assert envelopes[0].items[0].payload.json["items"][1]["trace_id"] == trace_id1
+
+    assert envelopes[1].headers["trace"]["trace_id"] == trace_id2
+    assert len(envelopes[1].items[0].payload.json["items"]) == 2
+    assert envelopes[1].items[0].payload.json["items"][0]["name"] == "span 2a"
+    assert envelopes[1].items[0].payload.json["items"][0]["trace_id"] == trace_id2
+    assert envelopes[1].items[0].payload.json["items"][1]["name"] == "span 2b"
+    assert envelopes[1].items[0].payload.json["items"][1]["trace_id"] == trace_id2
+
+
+def test_max_envelope_size(sentry_init, capture_envelopes, monkeypatch):
+    """Envelope max size is respected."""
+    monkeypatch.setattr(SpanBatcher, "MAX_ENVELOPE_SIZE", 2)
+    # set the time-based flush limit to something huge so that we're not observing
+    # time-based flushes
+    monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000)
+
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+
+    with sentry_sdk.traces.start_span(name="span 1"):
+        pass
+    with sentry_sdk.traces.start_span(name="span 2"):
+        pass
+    with sentry_sdk.traces.start_span(name="span 3"):
+        pass
+    with sentry_sdk.traces.start_span(name="span 4"):
+        pass
+    with sentry_sdk.traces.start_span(name="span 5"):
+        pass
+
+    sentry_sdk.flush()
+
+    assert len(envelopes) == 3
+
+    assert len(envelopes[0].items[0].payload.json["items"]) == 2
+    assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1"
+    assert envelopes[0].items[0].payload.json["items"][1]["name"] == "span 2"
+    assert len(envelopes[1].items[0].payload.json["items"]) == 2
+    assert envelopes[1].items[0].payload.json["items"][0]["name"] == "span 3"
+    assert envelopes[1].items[0].payload.json["items"][1]["name"] == "span 4"
+    assert len(envelopes[2].items[0].payload.json["items"]) == 1
+    assert envelopes[2].items[0].payload.json["items"][0]["name"] == "span 5"
+
+
+def test_drop_after_max_reached(
+    sentry_init, capture_envelopes, capture_record_lost_event_calls, monkeypatch
+):
+    """New spans are dropped if a bucket reaches MAX_BEFORE_DROP spans."""
+    monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_DROP", 2)
+    # set the time-based flush limit to something huge so that we're not flushing
+    # prematurely
+    monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000)
+
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+    record_lost_event_calls = capture_record_lost_event_calls()
+
+    with sentry_sdk.traces.start_span(name="span 1"):
+        pass
+    with sentry_sdk.traces.start_span(name="span 2"):
+        pass
+    with sentry_sdk.traces.start_span(name="span 3"):
+        pass
+
+    sentry_sdk.flush()
+
+    assert len(envelopes) == 1
+
+    assert len(envelopes[0].items[0].payload.json["items"]) == 2
+    assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1"
+    assert envelopes[0].items[0].payload.json["items"][1]["name"] == "span 2"
+
+    assert ("queue_overflow", "span", None, 1) in record_lost_event_calls
+
+
+def test_drop_isolated_per_bucket(
+    sentry_init, capture_envelopes, capture_record_lost_event_calls, monkeypatch
+):
+    """A bucket reaching MAX_BEFORE_DROP only drops spans from that trace."""
+    monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_DROP", 2)
+    monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000)
+
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+    record_lost_event_calls = capture_record_lost_event_calls()
+
+    sentry_sdk.traces.new_trace()
+    with sentry_sdk.traces.start_span(name="a1") as span_a:
+        trace_id_a = span_a.trace_id
+        with sentry_sdk.traces.start_span(name="a2"):
+            pass
+        with sentry_sdk.traces.start_span(name="a3"):
+            pass
+
+    sentry_sdk.traces.new_trace()
+    with sentry_sdk.traces.start_span(name="b1") as span_b:
+        trace_id_b = span_b.trace_id
+        with sentry_sdk.traces.start_span(name="b2"):
+            pass
+
+    sentry_sdk.flush()
+
+    assert len(envelopes) == 2
+
+    assert envelopes[0].headers["trace"]["trace_id"] == trace_id_a
+    assert len(envelopes[0].items[0].payload.json["items"]) == 2
+    assert envelopes[0].items[0].payload.json["items"][0]["name"] == "a1"
+    assert envelopes[0].items[0].payload.json["items"][1]["name"] == "a2"
+
+    assert envelopes[1].headers["trace"]["trace_id"] == trace_id_b
+    assert len(envelopes[1].items[0].payload.json["items"]) == 2
+    assert envelopes[1].items[0].payload.json["items"][0]["name"] == "b1"
+    assert envelopes[1].items[0].payload.json["items"][1]["name"] == "b2"
+
+    assert record_lost_event_calls.count(("queue_overflow", "span", None, 1)) == 1
+
+
+def test_length_based_flushing(sentry_init, capture_items, monkeypatch):
+    """A flush event is triggered when a bucket contains MAX_BEFORE_FLUSH spans."""
+    monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 1)
+    # set the time-based flush limit to something huge so that we're not hitting
+    # it since we want to test MAX_BEFORE_FLUSH instead
+    monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000)
+
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    items = capture_items("span")
+
+    with sentry_sdk.traces.start_span(name="span"):
+        pass
+
+    time.sleep(0.1)
+
+    assert len(items) == 1
+    assert items[0].payload["name"] == "span"
+
+
+def test_weight_based_flushing(sentry_init, capture_envelopes, monkeypatch):
+    """When a bucket reaches MAX_BYTES_BEFORE_FLUSH, it'll be flushed."""
+    monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", 1)
+    # set the time-based flush limit to something huge so that it doesn't
+    # interfere
+    monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000)
+
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+
+    with sentry_sdk.traces.start_span(name="span"):
+        pass
+
+    time.sleep(0.1)
+
+    assert len(envelopes) == 1
+
+    assert len(envelopes[0].items[0].payload.json["items"]) == 1
+    assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span"
+
+
+def test_weight_based_flushing_by_attribute_size(
+    sentry_init, capture_envelopes, monkeypatch
+):
+    """Attribute size is taken into account when using MAX_BYTES_BEFORE_FLUSH."""
+    monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000)
+    monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 100000)
+
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+
+    with sentry_sdk.traces.start_span(name="small span") as bare_span:
+        pass
+
+    bare_span_size = SpanBatcher._estimate_size(bare_span)
+    big_attr = "x" * bare_span_size
+
+    monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", bare_span_size * 3)
+
+    time.sleep(0.1)
+
+    # The first span alone is well under the byte limit, so no flush yet.
+    assert len(envelopes) == 0
+
+    with sentry_sdk.traces.start_span(name="big span", attributes={"big": big_attr}):
+        pass
+
+    time.sleep(0.1)
+
+    assert len(envelopes) == 1
+    assert envelopes[0].items[0].payload.json["items"][0]["name"] == "small span"
+    assert envelopes[0].items[0].payload.json["items"][1]["name"] == "big span"
+
+
+def test_bucket_recreated_after_flush(sentry_init, capture_envelopes, monkeypatch):
+    """Spans for a trace that arrive after that trace's bucket was flushed land in a fresh bucket."""
+    monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 2)
+    monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 100000)
+
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+
+    sentry_sdk.traces.new_trace()
+
+    with sentry_sdk.traces.start_span(name="span 1") as span1:
+        trace_id = span1.trace_id
+        with sentry_sdk.traces.start_span(name="span 2"):
+            pass
+
+    time.sleep(0.1)
+
+    assert len(envelopes) == 1
+
+    with sentry_sdk.traces.start_span(name="span 3"):
+        pass
+    with sentry_sdk.traces.start_span(name="span 4"):
+        pass
+
+    time.sleep(0.1)
+
+    assert len(envelopes) == 2
+
+    assert envelopes[0].headers["trace"]["trace_id"] == trace_id
+    assert len(envelopes[0].items[0].payload.json["items"]) == 2
+    assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1"
+    assert envelopes[0].items[0].payload.json["items"][0]["trace_id"] == trace_id
+    assert envelopes[0].items[0].payload.json["items"][1]["name"] == "span 2"
+    assert envelopes[0].items[0].payload.json["items"][1]["trace_id"] == trace_id
+
+    assert envelopes[1].headers["trace"]["trace_id"] == trace_id
+    assert len(envelopes[1].items[0].payload.json["items"]) == 2
+    assert envelopes[1].items[0].payload.json["items"][0]["name"] == "span 3"
+    assert envelopes[1].items[0].payload.json["items"][0]["trace_id"] == trace_id
+    assert envelopes[1].items[0].payload.json["items"][1]["name"] == "span 4"
+    assert envelopes[1].items[0].payload.json["items"][1]["trace_id"] == trace_id
+
+
+def test_quiet_buckets_flush_eventually(sentry_init, capture_envelopes, monkeypatch):
+    """Even if a bucket doesn't trigger any size limits, it'll get flushed eventually."""
+    monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 0.1)
+    # these are purposefully high so as to never trigger in this test scenario
+    monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 10000000)
+    monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", 10000000)
+
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+
+    with sentry_sdk.traces.start_span(name="span 1"):
+        pass
+
+    time.sleep(0.3)
+
+    assert len(envelopes) == 1
+
+    assert len(envelopes[0].items[0].payload.json["items"]) == 1
+    assert envelopes[0].items[0].payload.json["items"][0]["name"] == "span 1"
+
+
+def test_quiet_buckets_flushed_with_busy_neighbors(
+    sentry_init, capture_envelopes, monkeypatch
+):
+    """When there are both busy and quiet buckets, all buckets get flushed eventually."""
+    monkeypatch.setattr(SpanBatcher, "FLUSH_WAIT_TIME", 0.1)
+    monkeypatch.setattr(SpanBatcher, "MAX_BEFORE_FLUSH", 5)
+
+    sentry_init(
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+
+    sentry_sdk.traces.new_trace()
+
+    with sentry_sdk.traces.start_span(name="span 1") as span1:
+        trace_id1 = span1.trace_id
+
+    sentry_sdk.traces.new_trace()
+
+    with sentry_sdk.traces.start_span(name="span 2") as span2:
+        trace_id2 = span2.trace_id
+
+        for i in range(3, 10):
+            with sentry_sdk.traces.start_span(name=f"span {i}"):
+                pass
+
+    time.sleep(0.3)
+
+    assert len(envelopes) >= 2
+
+    # we don't care how exactly the spans are distributed over envelopes
+    # (since both the time and length based limits are effective, the
+    # distribution might not be stable); just check whether the spans are all
+    # there and that each envelope only contains spans from one trace
+    seen = set()
+    for envelope in envelopes:
+        assert envelope.headers["trace"]["trace_id"] in (trace_id1, trace_id2)
+
+        if envelope.headers["trace"]["trace_id"] == trace_id1:
+            assert len(envelope.items[0].payload.json["items"]) == 1
+            assert envelope.items[0].payload.json["items"][0]["name"] == "span 1"
+            seen.add(envelope.items[0].payload.json["items"][0]["name"])
+
+        elif envelope.headers["trace"]["trace_id"] == trace_id2:
+            for span in envelope.items[0].payload.json["items"]:
+                seen.add(span["name"])
+
+    assert seen == {f"span {i}" for i in range(1, 10)}
+
+
+def test_transport_format(sentry_init, capture_envelopes):
+    sentry_init(
+        server_name="test-server",
+        release="1.0.0",
+        traces_sample_rate=1.0,
+        _experiments={"trace_lifecycle": "stream"},
+    )
+
+    envelopes = capture_envelopes()
+
+    with sentry_sdk.traces.start_span(name="test"):
+        ...
+
+    sentry_sdk.get_client().flush()
+
+    assert len(envelopes) == 1
+    assert len(envelopes[0].items) == 1
+    item = envelopes[0].items[0]
+
+    assert item.type == "span"
+    assert item.headers == {
+        "type": "span",
+        "item_count": 1,
+        "content_type": "application/vnd.sentry.items.span.v2+json",
+    }
+    assert item.payload.json == {
+        "items": [
+            {
+                "trace_id": mock.ANY,
+                "span_id": mock.ANY,
+                "name": "test",
+                "status": "ok",
+                "is_segment": True,
+                "start_timestamp": mock.ANY,
+                "end_timestamp": mock.ANY,
+                "attributes": mock.ANY,
+            }
+        ]
+    }
+    for attribute, value in item.payload.json["items"][0]["attributes"].items():
+        assert isinstance(attribute, str)
+
+        assert isinstance(value, dict)
+        assert (
+            len(value) == 2
+        )  # technically, "unit" is also supported, but we don't currently set it anywhere
+        assert "value" in value
+        assert "type" in value
+        assert value["type"] in ("string", "boolean", "integer", "double", "array")
diff --git a/tests/tracing/test_span_streaming.py b/tests/tracing/test_span_streaming.py
index 94bc2346ba..43df3c7e26 100644
--- a/tests/tracing/test_span_streaming.py
+++ b/tests/tracing/test_span_streaming.py
@@ -1505,7 +1505,7 @@ def test_profile_stops_when_segment_ends(
     assert get_profiler_id() is None, "profiler should have stopped"
 
 
-def test_transport_format(sentry_init, capture_envelopes):
+def test_default_attributes(sentry_init, capture_envelopes):
     sentry_init(
         server_name="test-server",
         release="1.0.0",
@@ -1530,30 +1530,17 @@ def test_transport_format(sentry_init, capture_envelopes):
         "item_count": 1,
         "content_type": "application/vnd.sentry.items.span.v2+json",
     }
-    assert item.payload.json == {
-        "items": [
-            {
-                "trace_id": mock.ANY,
-                "span_id": mock.ANY,
-                "name": "test",
-                "status": "ok",
-                "is_segment": True,
-                "start_timestamp": mock.ANY,
-                "end_timestamp": mock.ANY,
-                "attributes": {
-                    "thread.id": {"value": mock.ANY, "type": "string"},
-                    "thread.name": {"value": "MainThread", "type": "string"},
-                    "sentry.segment.id": {"value": mock.ANY, "type": "string"},
-                    "sentry.segment.name": {"value": "test", "type": "string"},
-                    "sentry.sdk.name": {"value": "sentry.python", "type": "string"},
-                    "sentry.sdk.version": {"value": mock.ANY, "type": "string"},
-                    "server.address": {"value": "test-server", "type": "string"},
-                    "sentry.environment": {"value": "production", "type": "string"},
-                    "sentry.release": {"value": "1.0.0", "type": "string"},
-                    "sentry.origin": {"value": "manual", "type": "string"},
-                },
-            }
-        ]
+    assert item.payload.json["items"][0]["attributes"] == {
+        "thread.id": {"value": mock.ANY, "type": "string"},
+        "thread.name": {"value": "MainThread", "type": "string"},
+        "sentry.segment.id": {"value": mock.ANY, "type": "string"},
+        "sentry.segment.name": {"value": "test", "type": "string"},
+        "sentry.sdk.name": {"value": "sentry.python", "type": "string"},
+        "sentry.sdk.version": {"value": mock.ANY, "type": "string"},
+        "server.address": {"value": "test-server", "type": "string"},
+        "sentry.environment": {"value": "production", "type": "string"},
+        "sentry.release": {"value": "1.0.0", "type": "string"},
+        "sentry.origin": {"value": "manual", "type": "string"},
     }