
DS 2359 observation loop atomic pipeline output #21237

Merged

brunotm merged 14 commits into develop from DS-2359-observation-loop-atomic-pipeline-output on Feb 23, 2026

Conversation

@akuzni2 (Contributor) commented Feb 19, 2026

  • ensure pipeline streams are extracted atomically
  • changeset

@trunk-io (Bot) commented Feb 19, 2026

Failed test: TestIntegration_LogEventProvider_Backfill
Failure summary: the test failed due to a timeout while waiting for the log poller to update or reach the specified partition.

Copilot AI (Contributor) left a comment

Pull request overview

This PR ensures pipeline streams are extracted atomically to prevent partial failures where some streams from a multi-stream pipeline have values while others don't. The change addresses a concurrency issue in the LLO observation loop where BridgeTask's use of overtimeContext could cause waiters to bail early with errors while the executor eventually succeeds.
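For context, here is a minimal sketch of the waiter-blocking pattern the overview describes. All names (execution, pipelineResult, run) are illustrative, not the actual observation_context.go code:

```go
package observation

import "sync"

type pipelineResult struct {
	values map[uint32]any // streamID -> extracted value
	err    error
}

type execution struct {
	once   sync.Once
	done   chan struct{}
	result pipelineResult
}

func newExecution() *execution {
	return &execution{done: make(chan struct{})}
}

// run executes the pipeline at most once; every other caller blocks until
// that execution completes, then reads the identical result. The waiter
// path deliberately does not select on ctx.Done(), so a waiter can never
// bail out early with an error while the executor eventually succeeds.
func (e *execution) run(exec func() pipelineResult) pipelineResult {
	e.once.Do(func() {
		e.result = exec()
		close(e.done)
	})
	<-e.done
	return e.result
}
```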

Changes:

  • Modified observation context to block waiters until pipeline execution completes, ensuring all goroutines receive identical results
  • Added removeIncompleteGroups function to enforce all-or-nothing cache writes per pipeline group (see the sketch after this list)
  • Extracted StreamValueCache interface to enable test verification of atomic behavior
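A rough sketch of the all-or-nothing filter named above; the real removeIncompleteGroups in data_source.go may differ in signature and grouping model:

```go
package observation

type StreamID uint32

// removeIncompleteGroups drops every value belonging to a pipeline group
// unless ALL stream IDs in that group produced a value, so the cache is
// only ever written with complete groups. It returns the dropped IDs.
func removeIncompleteGroups(values map[StreamID]any, groups [][]StreamID) map[StreamID]struct{} {
	dropped := make(map[StreamID]struct{})
	for _, group := range groups {
		complete := true
		for _, sid := range group {
			if _, ok := values[sid]; !ok {
				complete = false
				break
			}
		}
		if complete {
			continue
		}
		// Discard every value the incomplete group did manage to produce.
		for _, sid := range group {
			if _, ok := values[sid]; ok {
				delete(values, sid)
				dropped[sid] = struct{}{}
			}
		}
	}
	return dropped
}
```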

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

Summary per file:

  • core/services/llo/observation/observation_context.go: Removed ctx.Done() handling from waiter path to prevent partial results
  • core/services/llo/observation/data_source.go: Added validation to drop incomplete pipeline groups before caching
  • core/services/llo/observation/cache.go: Extracted StreamValueCache interface for testability
  • core/services/llo/observation/observation_context_test.go: Added tests for partial extraction failures and concurrent atomic output
  • core/services/llo/observation/data_source_test.go: Added comprehensive tests for atomic cache writes and removeIncompleteGroups edge cases
  • .changeset/warm-roses-argue.md: Added changeset with minor formatting issue
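On the cache.go change: extracting an interface over the concrete cache lets tests substitute a recording fake to assert all-or-nothing writes. The method set below is assumed for illustration, not copied from cache.go:

```go
package observation

type StreamID uint32

// StreamValueCache is the extracted interface; the actual method set in
// cache.go may differ.
type StreamValueCache interface {
	Put(sid StreamID, v any)
	Get(sid StreamID) (any, bool)
}

// recordingCache is a test double that records every write, letting a test
// assert that either all stream IDs of a group were written or none were.
type recordingCache struct {
	puts []StreamID
}

func (r *recordingCache) Put(sid StreamID, v any)  { r.puts = append(r.puts, sid) }
func (r *recordingCache) Get(StreamID) (any, bool) { return nil, false }
```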


Comment thread: .changeset/warm-roses-argue.md

```go
mu.Lock()
observedValues[streamID] = val
successfulStreamIDs = append(successfulStreamIDs, streamID)
```
Contributor
Do we also want to adjust the successful stream count based on the streams dropped by removeIncompleteGroups?

Contributor Author
Sure, I've added a separate field to log the count of dropped stream IDs; the successfulStreamIDs field is now just the count of values post-filtering.
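Roughly, the logging shape would look like this (field names illustrative, using the same lggr.Debugw style as the quoted code):

```go
lggr.Debugw("Observation pipeline complete",
	"successfulStreamIDs", len(observedValues), // count after removeIncompleteGroups
	"droppedStreamIDs", len(dropped),           // values discarded by the filter
)
```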

Contributor
Is the original successfulStreamIDs even used anywhere then?

```go
	}
	delete(observedValues, sid)
}
lggr.Debugw("Discarding incomplete pipeline group",
```
Contributor
Do we know under what specific conditions this occurs, and would it be indicative of issues with data freshness or otherwise? I'm wondering if we should add metrics or upgrade to a warn log to keep an eye on it.

@akuzni2 (Contributor Author) commented Feb 20, 2026
This was occurring more frequently because of the race that's fixed in observation_context.go. Now it might occur if there are individual task-level failures, e.g. a task didn't complete in time, bad data, missing data, etc.

I don't think we need more metrics right now, and we can lean on stream values telemetry to figure out DON-wide issues.

Contributor
The skip logic still iterates over individual streams. To preserve atomicity, should we attempt to refresh/observe all the streams within a group if any value is expired?

@akuzni2 (Contributor Author) commented Feb 21, 2026
Very good suggestion, it makes the observation even more correct. Added.
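A minimal sketch of the group-level expiry check this exchange describes; the cache method set (Get returning a value with an expiry time) is assumed here, not taken from the PR:

```go
package observation

import "time"

type StreamID uint32

type cachedValue struct {
	val       any
	expiresAt time.Time
}

type valueCache interface {
	Get(sid StreamID) (cachedValue, bool)
}

// groupNeedsRefresh reports whether any stream in the group is missing or
// expired; the caller then re-observes the entire group rather than a
// single stream, keeping the group's values atomic.
func groupNeedsRefresh(cache valueCache, group []StreamID, now time.Time) bool {
	for _, sid := range group {
		v, ok := cache.Get(sid)
		if !ok || now.After(v.expiresAt) {
			return true
		}
	}
	return false
}
```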

@github-actions (Bot) commented Feb 21, 2026

✅ No conflicts with other open PRs targeting develop

@cl-sonarqube-production


