-
Notifications
You must be signed in to change notification settings - Fork 609
feat(sqlalchemy): Support span streaming #6132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
a139626
38b5933
3d89b9e
b1fa3b5
9929f55
75c0cc1
ad4e14d
2bf7c77
f873d09
56ee084
2c22c16
f9faa2f
064dd83
4b965c4
98c67f0
2877f7b
f20dfc6
6322ad0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,16 @@ | ||
| from sentry_sdk.consts import SPANSTATUS, SPANDATA | ||
| from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable | ||
| from sentry_sdk.tracing_utils import add_query_source, record_sql_queries | ||
| from sentry_sdk.tracing_utils import ( | ||
| add_query_source, | ||
| record_sql_queries_supporting_streaming, | ||
| ) | ||
| from sentry_sdk.utils import ( | ||
| capture_internal_exceptions, | ||
| ensure_integration_enabled, | ||
| parse_version, | ||
| ) | ||
| from sentry_sdk.traces import StreamedSpan, SpanStatus | ||
| from sentry_sdk.tracing import Span | ||
|
|
||
| try: | ||
| from sqlalchemy.engine import Engine # type: ignore | ||
|
|
@@ -20,8 +25,7 @@ | |
| from typing import Any | ||
| from typing import ContextManager | ||
| from typing import Optional | ||
|
|
||
| from sentry_sdk.tracing import Span | ||
| from typing import Union | ||
|
|
||
|
|
||
| class SqlalchemyIntegration(Integration): | ||
|
|
@@ -48,7 +52,7 @@ def _before_cursor_execute( | |
| executemany: bool, | ||
| *args: "Any", | ||
| ) -> None: | ||
| ctx_mgr = record_sql_queries( | ||
| ctx_mgr = record_sql_queries_supporting_streaming( | ||
| cursor, | ||
| statement, | ||
| parameters, | ||
|
|
@@ -78,12 +82,19 @@ def _after_cursor_execute( | |
| context, "_sentry_sql_span_manager", None | ||
| ) | ||
|
|
||
| # Record query source immediately before span is finished: accurate end timestamp and before the span is flushed. | ||
| span: "Optional[Union[Span, StreamedSpan]]" = getattr( | ||
| context, "_sentry_sql_span", None | ||
| ) | ||
| if isinstance(span, StreamedSpan): | ||
| with capture_internal_exceptions(): | ||
| add_query_source(span) | ||
|
|
||
| if ctx_mgr is not None: | ||
| context._sentry_sql_span_manager = None | ||
| ctx_mgr.__exit__(None, None, None) | ||
|
|
||
| span: "Optional[Span]" = getattr(context, "_sentry_sql_span", None) | ||
| if span is not None: | ||
| if isinstance(span, Span): | ||
| with capture_internal_exceptions(): | ||
| add_query_source(span) | ||
|
|
||
|
|
@@ -96,7 +107,10 @@ def _handle_error(context: "Any", *args: "Any") -> None: | |
| span: "Optional[Span]" = getattr(execution_context, "_sentry_sql_span", None) | ||
|
|
||
| if span is not None: | ||
| span.set_status(SPANSTATUS.INTERNAL_ERROR) | ||
| if isinstance(span, StreamedSpan): | ||
| span.status = SpanStatus.ERROR | ||
| else: | ||
| span.set_status(SPANSTATUS.INTERNAL_ERROR) | ||
|
|
||
| # _after_cursor_execute does not get called for crashing SQL stmts. Judging | ||
| # from SQLAlchemy codebase it does seem like any error coming into this | ||
|
alexander-alderman-webb marked this conversation as resolved.
|
||
|
|
@@ -132,15 +146,20 @@ def _get_db_system(name: str) -> "Optional[str]": | |
| return None | ||
|
|
||
|
|
||
| def _set_db_data(span: "Span", conn: "Any") -> None: | ||
| def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None: | ||
| if isinstance(span, StreamedSpan): | ||
| set_on_span = span.set_attribute | ||
| else: | ||
| set_on_span = span.set_data | ||
|
|
||
| db_system = _get_db_system(conn.engine.name) | ||
| if db_system is not None: | ||
| span.set_data(SPANDATA.DB_SYSTEM, db_system) | ||
| set_on_span(SPANDATA.DB_SYSTEM, db_system) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| try: | ||
| driver = conn.dialect.driver | ||
| if driver: | ||
| span.set_data(SPANDATA.DB_DRIVER_NAME, driver) | ||
| set_on_span(SPANDATA.DB_DRIVER_NAME, driver) | ||
| except Exception: | ||
| pass | ||
|
|
||
|
|
@@ -149,12 +168,12 @@ def _set_db_data(span: "Span", conn: "Any") -> None: | |
|
|
||
| db_name = conn.engine.url.database | ||
| if db_name is not None: | ||
| span.set_data(SPANDATA.DB_NAME, db_name) | ||
| set_on_span(SPANDATA.DB_NAME, db_name) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be |
||
|
|
||
| server_address = conn.engine.url.host | ||
| if server_address is not None: | ||
| span.set_data(SPANDATA.SERVER_ADDRESS, server_address) | ||
| set_on_span(SPANDATA.SERVER_ADDRESS, server_address) | ||
|
|
||
| server_port = conn.engine.url.port | ||
| if server_port is not None: | ||
| span.set_data(SPANDATA.SERVER_PORT, server_port) | ||
| set_on_span(SPANDATA.SERVER_PORT, server_port) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -170,6 +170,65 @@ def record_sql_queries( | |
| yield span | ||
|
|
||
|
|
||
| @contextlib.contextmanager | ||
| def record_sql_queries_supporting_streaming( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason not to adapt the existing |
||
| cursor: "Any", | ||
| query: "Any", | ||
| params_list: "Any", | ||
| paramstyle: "Optional[str]", | ||
| executemany: bool, | ||
| record_cursor_repr: bool = False, | ||
| span_origin: str = "manual", | ||
| ) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]": | ||
| # TODO: Bring back capturing of params by default | ||
| client = sentry_sdk.get_client() | ||
| if client.options["_experiments"].get("record_sql_params", False): | ||
| if not params_list or params_list == [None]: | ||
| params_list = None | ||
|
|
||
| if paramstyle == "pyformat": | ||
| paramstyle = "format" | ||
| else: | ||
| params_list = None | ||
| paramstyle = None | ||
|
|
||
| query = _format_sql(cursor, query) | ||
|
|
||
| data = {} | ||
| if params_list is not None: | ||
| data["db.params"] = params_list | ||
| if paramstyle is not None: | ||
| data["db.paramstyle"] = paramstyle | ||
| if executemany: | ||
| data["db.executemany"] = True | ||
| if record_cursor_repr and cursor is not None: | ||
| data["db.cursor"] = cursor | ||
|
Comment on lines
+198
to
+205
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're setting these as attributes couple lines later, but none of them seems to be in conventions? |
||
|
|
||
| with capture_internal_exceptions(): | ||
| sentry_sdk.add_breadcrumb(message=query, category="query", data=data) | ||
|
|
||
| if has_span_streaming_enabled(client.options): | ||
| with sentry_sdk.traces.start_span( | ||
| name="<unknown SQL query>" if query is None else query, | ||
| attributes={ | ||
| "sentry.origin": span_origin, | ||
| "sentry.op": OP.DB, | ||
| }, | ||
| ) as span: | ||
| for k, v in data.items(): | ||
| span.set_attribute(k, v) | ||
| yield span | ||
|
alexander-alderman-webb marked this conversation as resolved.
|
||
| else: | ||
| with sentry_sdk.start_span( | ||
| op=OP.DB, | ||
| name=query, | ||
|
alexander-alderman-webb marked this conversation as resolved.
|
||
| origin=span_origin, | ||
| ) as span: | ||
| for k, v in data.items(): | ||
| span.set_data(k, v) | ||
| yield span | ||
|
|
||
|
|
||
| def maybe_create_breadcrumbs_from_span( | ||
| scope: "sentry_sdk.Scope", span: "sentry_sdk.tracing.Span" | ||
| ) -> None: | ||
|
|
@@ -316,22 +375,36 @@ def add_source( | |
| span.set_attribute(SPANDATA.CODE_FUNCTION, frame.f_code.co_name) | ||
|
|
||
|
|
||
| def add_query_source(span: "sentry_sdk.tracing.Span") -> None: | ||
| def add_query_source( | ||
| span: "Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan]", | ||
| ) -> None: | ||
| """ | ||
| Adds OTel compatible source code information to a database query span | ||
| """ | ||
| client = sentry_sdk.get_client() | ||
| if not client.is_active(): | ||
| return | ||
|
|
||
| if span.timestamp is None or span.start_timestamp is None: | ||
| if isinstance(span, LegacySpan): | ||
| if not client.is_active(): | ||
| return | ||
|
Comment on lines
+387
to
+388
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this check should apply to both streaming and non streaming modes, so I'd move it outside of the |
||
|
|
||
| # In the StreamedSpan case, we need to add the extra span information before | ||
| # the span finishes, so it's expected that this will be None. In the LegacySpan case, | ||
| # it should already be finished. | ||
| if span.timestamp is None: | ||
| return | ||
|
|
||
| if span.start_timestamp is None: | ||
| return | ||
|
|
||
| should_add_query_source = client.options.get("enable_db_query_source", True) | ||
| if not should_add_query_source: | ||
| return | ||
|
|
||
| duration = span.timestamp - span.start_timestamp | ||
| end_timestamp = ( | ||
| datetime.now(timezone.utc) if span.timestamp is None else span.timestamp | ||
| ) | ||
|
|
||
| duration = end_timestamp - span.start_timestamp | ||
| threshold = client.options.get("db_query_source_threshold_ms", 0) | ||
| slow_query = duration / timedelta(milliseconds=1) > threshold | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.