Add workflow_streams samples#300
Open
jssmith wants to merge 8 commits intotemporalio:mainfrom
Open
Conversation
Initial samples directory for temporalio.contrib.workflow_streams, the workflow-hosted durable event stream contrib (experimental, contrib/pubsub branch of sdk-python). The order_workflow scenario covers the basic publisher path: a workflow binds a typed topic in @workflow.init, an activity publishes events via the topic handle, and a starter subscribes with WorkflowStreamClient and prints events as they arrive. Also enables the uv supply-chain cooldown options in the lockfile.
Adds a second scenario demonstrating the central Workflow Streams use case: a consumer disconnects mid-stream and resumes later via subscribe(from_offset=...), with no events lost or duplicated. The existing OrderWorkflow finishes too quickly to make the pattern visible, so this introduces a multi-stage PipelineWorkflow paced with workflow.sleep between stages. The runner reads a couple of events, persists item.offset + 1 to a temp file, sleeps "disconnected" while the workflow keeps publishing, then opens a fresh Client + WorkflowStreamClient and resumes from the persisted offset — the same shape that works across actual process restarts. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a third scenario covering the third publisher shape: a backend service or scheduled job pushing events into a workflow it didn't itself start. The earlier scenarios publish either from inside the workflow or from one of its activities; this one uses WorkflowStreamClient.create() externally. HubWorkflow is a passive stream host — it does no work of its own and just waits to be told to close, fitting the event-bus pattern. The runner publishes a series of news headlines, runs a subscriber task alongside, signals close, and exits when both tasks complete. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a fourth scenario for long-running workflows that need to bound their event log: the workflow publishes events at a fixed cadence and calls self.stream.truncate(...) periodically to keep only the most recent entries. The runner subscribes twice — fast and slow — to make the trade visible: the fast subscriber sees every offset in order; the slow one falls behind a truncation, has its iterator transparently jump forward to the new base offset, and shows the offset gap that intermediate events fell into. This is the model for high-volume long-running streams: bounded log size, slow consumers may miss intermediate events but always see the most recent state. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…handles - Directory and module path renamed to plural to match sdk-python `temporalio.contrib.workflow_streams` rename. - Workflow-side: bind a typed topic handle in `@workflow.init` and call `topic.publish(value)` — the removed `WorkflowStream.publish` form is gone. Same change applied to the activity and external-publisher. - Activity: `WorkflowStreamClient.from_activity()` → `from_within_activity()`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- README: fix scenario count (two -> four), document subscriber start position and continue-as-new semantics for stream_state - hub_workflow: drop stale comment referencing a README race note that does not exist in this sample - payment_activity: trim long publisher_id/dedup caveat — moved out of the first sample's docstring to keep it approachable
…be shape End-to-end runs of the four workflow_streams scenarios surfaced two sample-side issues, both fixed here. run_publisher's consumer asserted ``isinstance(item.data, Payload)`` and called ``payload_converter.from_payload(item.data, T)``. The contrib's ``subscribe()`` defaults to converter-decoded data, not raw payloads, so this assertion fired on the first run. Switch to ``result_type=RawValue`` (the documented escape hatch for heterogeneous topics) and read ``item.data.payload``. Items published in the same workflow task that returns from ``@workflow.run`` were not delivered to subscribers — the in-memory log dies with the workflow and the next subscriber poll lands on a completed workflow. Fix: each scenario now uses an in-band terminator that subscribers break on, and each workflow holds the run open with ``await workflow.sleep(timedelta(milliseconds=500))`` so that final publish is fetched before the workflow exits: - OrderWorkflow / PipelineWorkflow: the workflow's own ``StatusEvent(kind="complete")`` / ``StageEvent(stage="complete")`` is the terminator (consumers already broke on it). - HubWorkflow: the *publisher* in run_external_publisher emits a sentinel ``NewsEvent(headline="__done__")`` immediately before signaling close; the consumer breaks on the sentinel. - TickerWorkflow: the final tick (n == count - 1) is the terminator; ``keep_last`` guarantees that offset survives the last truncation, so even slow consumers reach it. Because subscribers stop polling on the terminator, by the time ``workflow.run`` returns there are no in-flight poll handlers — no ``UnfinishedUpdateHandlersWarning`` from the SDK and no need for ``detach_pollers()`` / ``wait_condition(all_handlers_finished)`` in the workflow exit path. Two consecutive end-to-end runs of all four scenarios pass cleanly against ``temporal server start-dev --headless``.
Subscribers don't exit on their own when the host workflow completes — they need an in-band terminator, and the workflow needs to hold open briefly so the final publish is fetched before run() returns. Both pieces show up in every scenario here, so document them in one place and update scenario 3's description to mention the sentinel headline the publisher emits.
Comment on lines
+33
to
+35
| # Hold the run open briefly so subscribers' final poll | ||
| # delivers any items still in the log. | ||
| await workflow.sleep(timedelta(milliseconds=500)) |
There was a problem hiding this comment.
Are there more dependable ways of checking if the log is not empty?
Contributor
Author
There was a problem hiding this comment.
Sometimes, but not always. We are not just checking that the log has been read, we need to know that the application is done reading before we end the workflow. Real applications may want to leave the workflow open for a longer period of time.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds samples for
temporalio.contrib.workflow_streams, the workflow-hosted durable event stream contrib (experimental,contrib/pubsubbranch of sdk-python).Four scenarios under
workflow_streams/, each exercising more of the API:order_workflow— minimal publisher driven by an activity, with two typed topics on a single streampipeline_workflow— reconnecting subscriber across activity stepshub_workflow— external publisher driving a long-lived workflowticker_workflow— truncating publisher with a bounded ring bufferWorkflows bind typed topic handles in
@workflow.initand publish viatopic.publish(value). Subscribers iterate from aWorkflowStreamClient. A smallrace_with_workflowhelper races the consumer againsthandle.result()so a workflow failure surfaces as an exception rather than blocking the subscriber forever.This is one half of #299, split out so the workflow_streams basics can land independently of the openai_agents streaming sample (separate PR).
Test plan
contrib/pubsuband install into the samples uv environmentuv run workflow_streams/run_worker.pyand run eachrun_*.pystarter; verify expected output