Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 84 additions & 45 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import random
import threading
import time
from collections import defaultdict
from datetime import datetime, timezone
import os
import threading
from typing import TYPE_CHECKING
import weakref

Expand All @@ -16,14 +18,15 @@

class SpanBatcher(Batcher["StreamedSpan"]):
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
# a bit of a buffer for spans that appear between the trigger to flush
# and actually flushing the buffer.
#
# The max limits are all per trace (per bucket).
MAX_ENVELOPE_SIZE = 1000 # spans
MAX_BEFORE_FLUSH = 1000
MAX_BEFORE_DROP = 2000
MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB

FLUSH_WAIT_TIME = 5.0

TYPE = "span"
Expand All @@ -47,7 +50,9 @@ def __init__(
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()
self._last_full_flush: float = time.monotonic() # drives time-based flushes
self._flush_event = threading.Event()
self._pending_flush: set[str] = set() # buckets to be flushed

self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None
Comment thread
sentrivana marked this conversation as resolved.
Expand All @@ -71,11 +76,29 @@ def _reset_thread_state(self) -> None:
self._lock = threading.Lock()
self._active = threading.local()

self._last_full_flush = time.monotonic()
self._flush_event = threading.Event()
self._pending_flush = set()

self._flusher = None
self._flusher_pid = None

def _flush_loop(self) -> None:
    """Run the flusher loop until ``self._running`` becomes false.

    Each iteration waits on ``self._flush_event`` for at most
    ``FLUSH_WAIT_TIME`` seconds plus up to 10% random jitter, then:

    1. flushes the buckets recorded in ``self._pending_flush``, and
    2. flushes the whole buffer if at least ``FLUSH_WAIT_TIME + jitter``
       seconds have passed since the last full flush.
    """
    # Flag this thread as executing batcher code. add() is documented to
    # bail out on such threads to avoid deadlocks.
    self._active.flag = True

    while self._running:
        jitter = random.random() * self.FLUSH_WAIT_TIME * 0.1
        self._flush_event.wait(timeout=self.FLUSH_WAIT_TIME + jitter)
        # Clear before flushing: a set() that arrives from here on either
        # has its bucket picked up by the flush below or leaves the event
        # set, so the next wait() returns immediately.
        self._flush_event.clear()

        # Size-triggered flush of the buckets that asked for it.
        self._flush(only_pending=True)

        # Time-triggered flush of everything still buffered.
        since_full_flush = time.monotonic() - self._last_full_flush
        if since_full_flush >= self.FLUSH_WAIT_TIME + jitter:
            self._flush()
            self._last_full_flush = time.monotonic()

def add(self, span: "StreamedSpan") -> None:
# Bail out if the current thread is already executing batcher code.
# This prevents deadlocks when code running inside the batcher (e.g.
Expand Down Expand Up @@ -104,13 +127,17 @@ def add(self, span: "StreamedSpan") -> None:
self._span_buffer[span.trace_id].append(span)
self._running_size[span.trace_id] += self._estimate_size(span)

if size + 1 >= self.MAX_BEFORE_FLUSH:
self._flush_event.set()
return

if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
self._flush_event.set()
return
if (
size + 1 >= self.MAX_BEFORE_FLUSH
or self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH
):
self._pending_flush.add(span.trace_id)
notify = True
else:
notify = False

if notify:
self._flush_event.set()
finally:
self._active.flag = False

Expand Down Expand Up @@ -154,50 +181,62 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":

return res

def _flush(self, only_pending: bool = False) -> None:
    """Turn buffered spans into envelopes and pass them to ``self._capture_func``.

    :param only_pending: If true, only the buckets recorded in
        ``self._pending_flush`` are flushed. Otherwise every bucket
        currently in ``self._span_buffer`` is flushed.

    Flushed buckets are removed from ``self._span_buffer`` and
    ``self._running_size``. ``self._pending_flush`` is reset in both modes.
    """
    envelopes = []

    with self._lock:
        if only_pending:
            buckets = list(self._pending_flush)
        else:
            # Flush the whole buffer, e.g. if the SDK is shutting down.
            buckets = list(self._span_buffer.keys())

        self._pending_flush.clear()

        if not buckets:
            return

        for bucket_id in buckets:
            spans = self._span_buffer.get(bucket_id)
            if not spans:
                continue

            # All envelopes of a bucket share the trace header taken from
            # the bucket's first span.
            dsc = spans[0]._dynamic_sampling_context()

            # Max per envelope is 1000, so if we happen to have more than
            # 1000 spans in one bucket, we'll need to separate them.
            for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
                chunk = spans[start : start + self.MAX_ENVELOPE_SIZE]

                envelope = Envelope(
                    headers={
                        "sent_at": format_timestamp(datetime.now(timezone.utc)),
                        "trace": dsc,
                    }
                )

                envelope.add_item(
                    Item(
                        type=self.TYPE,
                        content_type=self.CONTENT_TYPE,
                        headers={
                            "item_count": len(chunk),
                        },
                        payload=PayloadRef(
                            json={
                                "items": [
                                    self._to_transport_format(span)
                                    for span in chunk
                                ]
                            }
                        ),
                    )
                )

                envelopes.append(envelope)

            # Only the flushed bucket is dropped; other buckets keep
            # accumulating until they are flushed themselves.
            del self._span_buffer[bucket_id]
            del self._running_size[bucket_id]

    # Capture after the lock has been released (presumably so capture code
    # never runs while the buffer lock is held -- confirm against the file).
    for envelope in envelopes:
        self._capture_func(envelope)
Loading
Loading