diff --git a/.github/workflows/pr-validation.yml b/.github/workflows/pr-validation.yml index e319ced7..b57ec8d1 100644 --- a/.github/workflows/pr-validation.yml +++ b/.github/workflows/pr-validation.yml @@ -33,7 +33,7 @@ jobs: # Label used to access the service container postgres: # Docker Hub image - image: postgres:13.18 + image: postgres:16 env: POSTGRES_PASSWORD: postgres POSTGRES_USER: postgres @@ -50,13 +50,13 @@ jobs: steps: # Checks-out your repository under $GITHUB_WORKSPACE - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 1 submodules: true - name: Setup Go environment - uses: actions/setup-go@v3.3.0 + uses: actions/setup-go@v5 with: go-version: ${{ matrix.go-version }} @@ -64,7 +64,7 @@ jobs: run: go get . - name: Install Protoc - uses: arduino/setup-protoc@v2 + uses: arduino/setup-protoc@v3 with: version: ${{ env.PROTOC_VERSION }} @@ -76,6 +76,12 @@ jobs: - name: Generate grpc code run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto + - name: Run go vet + run: go vet ./... 
+ + - name: Run golangci-lint + uses: golangci/golangci-lint-action@v7 + - name: Run integration tests env: POSTGRES_ENABLED: "true" diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..5e180f2b --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,23 @@ +version: "2" + +run: + timeout: 5m + +linters: + enable: + - errcheck + - govet + - ineffassign + - staticcheck + - unused + - gocritic + - errorlint + settings: + errorlint: + errorf: true + asserts: true + comparison: true + exclusions: + paths: + - internal/protos + - tests/mocks diff --git a/backend/executor.go b/backend/executor.go index feef60d7..06f5287c 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -511,7 +511,7 @@ loop: } // mustEmbedUnimplementedTaskHubSidecarServiceServer implements protos.TaskHubSidecarServiceServer -func (grpcExecutor) mustEmbedUnimplementedTaskHubSidecarServiceServer() { +func (grpcExecutor) mustEmbedUnimplementedTaskHubSidecarServiceServer() { //nolint:unused } func createGetInstanceResponse(req *protos.GetInstanceRequest, metadata *api.OrchestrationMetadata) *protos.GetInstanceResponse { diff --git a/backend/orchestration.go b/backend/orchestration.go index c416da2b..567108c5 100644 --- a/backend/orchestration.go +++ b/backend/orchestration.go @@ -2,6 +2,7 @@ package backend import ( "context" + "errors" "fmt" "time" @@ -143,21 +144,24 @@ func (p *orchestratorProcessor) AbandonWorkItem(ctx context.Context, wi WorkItem func (w *orchestratorProcessor) applyWorkItem(ctx context.Context, wi *OrchestrationWorkItem) (context.Context, trace.Span, bool) { // Ignore work items for orchestrations that are completed or are in a corrupted state. 
- if !wi.State.IsValid() { + switch { + case !wi.State.IsValid(): w.logger.Warnf("%v: orchestration state is invalid; dropping work item", wi.InstanceID) return nil, nil, false - } else if wi.State.IsCompleted() { + case wi.State.IsCompleted(): w.logger.Warnf("%v: orchestration already completed; dropping work item", wi.InstanceID) return nil, nil, false - } else if len(wi.NewEvents) == 0 { + case len(wi.NewEvents) == 0: w.logger.Warnf("%v: the work item had no events!", wi.InstanceID) } // The orchestrator started event is used primarily for updating the current time as reported // by the orchestration context APIs. - wi.State.AddEvent(helpers.NewOrchestratorStartedEvent()) + if err := wi.State.AddEvent(helpers.NewOrchestratorStartedEvent()); err != nil { + w.logger.Warnf("%v: failed to add orchestrator started event: %v", wi.InstanceID, err) + } - // Each orchestration instance gets its own distributed tracing span. However, the implementation of + // Each orchestration instance gets its own distributed tracing span. However, the implementation of // endOrchestratorSpan will "cancel" the span mark the span as "unsampled" if the orchestration isn't // complete. This is part of the strategy for producing one span for the entire orchestration execution, // which isn't something that's natively supported by OTel today.
@@ -169,7 +173,7 @@ func (w *orchestratorProcessor) applyWorkItem(ctx context.Context, wi *Orchestra added := 0 for _, e := range wi.NewEvents { if err := wi.State.AddEvent(e); err != nil { - if err == ErrDuplicateEvent { + if errors.Is(err, ErrDuplicateEvent) { w.logger.Warnf("%v: dropping duplicate event: %v", wi.InstanceID, e) } else { w.logger.Warnf("%v: dropping event: %v, %v", wi.InstanceID, e, err) @@ -270,7 +274,8 @@ func (w *orchestratorProcessor) startOrResumeOrchestratorSpan(ctx context.Contex } func (w *orchestratorProcessor) endOrchestratorSpan(ctx context.Context, wi *OrchestrationWorkItem, span trace.Span, continuedAsNew bool) { - if wi.State.IsCompleted() { + switch { + case wi.State.IsCompleted(): if fd, err := wi.State.FailureDetails(); err == nil { span.SetStatus(codes.Error, fd.ErrorMessage) } @@ -280,12 +285,12 @@ func (w *orchestratorProcessor) endOrchestratorSpan(ctx context.Context, wi *Orc }) addNotableEventsToSpan(wi.State.OldEvents(), span) addNotableEventsToSpan(wi.State.NewEvents(), span) - } else if continuedAsNew { + case continuedAsNew: span.SetAttributes(attribute.KeyValue{ Key: "durabletask.runtime_status", Value: attribute.StringValue(helpers.ToRuntimeStatusString(protos.OrchestrationStatus_ORCHESTRATION_STATUS_CONTINUED_AS_NEW)), }) - } else { + default: // Cancel the span - we want to publish it only when an orchestration // completes or when it continue-as-new's. 
helpers.CancelSpan(span) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 4b23b1e0..db7670e8 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -44,7 +44,7 @@ func NewPostgresOptions(host string, port uint16, database string, user string, if err != nil { panic(fmt.Errorf("failed to parse the postgres connection string: %w", err)) } - conf.ConnConfig.Config.ConnectTimeout = 2 * time.Minute + conf.ConnConfig.ConnectTimeout = 2 * time.Minute conf.MaxConnLifetime = 2 * time.Minute conf.MaxConnIdleTime = 2 * time.Minute conf.MaxConns = 1 @@ -138,9 +138,9 @@ func (be *postgresBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi if err != nil { return err } - defer tx.Rollback(ctx) + defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op - var visibleTime *time.Time = nil + var visibleTime *time.Time = nil if delay := wi.GetAbandonDelay(); delay > 0 { t := time.Now().UTC().Add(delay) visibleTime = &t @@ -199,7 +199,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi if err != nil { return err } - defer tx.Rollback(ctx) + defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op now := time.Now().UTC() @@ -219,7 +219,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi continue } isCreated = true - sqlSB.WriteString(fmt.Sprintf("CreatedTime = $%d, Input = $%d, ", currIndex, currIndex+1)) + fmt.Fprintf(&sqlSB, "CreatedTime = $%d, Input = $%d, ", currIndex, currIndex+1) currIndex += 2 sqlUpdateArgs = append(sqlUpdateArgs, e.Timestamp.AsTime()) sqlUpdateArgs = append(sqlUpdateArgs, es.Input.GetValue()) @@ -229,7 +229,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi continue } isCompleted = true - sqlSB.WriteString(fmt.Sprintf("CompletedTime = $%d, Output = $%d, FailureDetails = $%d, ", currIndex, currIndex+1, currIndex+2)) + fmt.Fprintf(&sqlSB, "CompletedTime = $%d, 
Output = $%d, FailureDetails = $%d, ", currIndex, currIndex+1, currIndex+2) currIndex += 3 sqlUpdateArgs = append(sqlUpdateArgs, now) sqlUpdateArgs = append(sqlUpdateArgs, ec.Result.GetValue()) @@ -247,14 +247,13 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi } if wi.State.CustomStatus != nil { - sqlSB.WriteString(fmt.Sprintf("CustomStatus = $%d, ", currIndex)) + fmt.Fprintf(&sqlSB, "CustomStatus = $%d, ", currIndex) currIndex++ sqlUpdateArgs = append(sqlUpdateArgs, wi.State.CustomStatus.Value) } // TODO: Support for stickiness, which would extend the LockExpiration - sqlSB.WriteString(fmt.Sprintf("RuntimeStatus = $%d, LastUpdatedTime = $%d, LockExpiration = NULL WHERE InstanceID = $%d AND LockedBy = $%d", currIndex, currIndex+1, currIndex+2, currIndex+3)) - currIndex += 4 + fmt.Fprintf(&sqlSB, "RuntimeStatus = $%d, LastUpdatedTime = $%d, LockExpiration = NULL WHERE InstanceID = $%d AND LockedBy = $%d", currIndex, currIndex+1, currIndex+2, currIndex+3) sqlUpdateArgs = append(sqlUpdateArgs, helpers.ToRuntimeStatusString(wi.State.RuntimeStatus()), now, string(wi.InstanceID), wi.LockedBy) result, err := tx.Exec(ctx, sqlSB.String(), sqlUpdateArgs...) 
@@ -282,7 +281,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi builder := strings.Builder{} builder.WriteString("INSERT INTO History (InstanceID, SequenceNumber, EventPayload) VALUES ") for i := 0; i < newHistoryCount; i++ { - builder.WriteString(fmt.Sprintf("($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3)) + fmt.Fprintf(&builder, "($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3) if i < newHistoryCount-1 { builder.WriteString(", ") } @@ -313,7 +312,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi builder := strings.Builder{} builder.WriteString("INSERT INTO NewTasks (InstanceID, EventPayload) VALUES ") for i := 0; i < newActivityCount; i++ { - builder.WriteString(fmt.Sprintf("($%d, $%d)", 2*i+1, 2*i+2)) + fmt.Fprintf(&builder, "($%d, $%d)", 2*i+1, 2*i+2) if i < newActivityCount-1 { builder.WriteString(", ") } @@ -342,7 +341,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi builder := strings.Builder{} builder.WriteString("INSERT INTO NewEvents (InstanceID, EventPayload, VisibleTime) VALUES ") for i := 0; i < newEventCount; i++ { - builder.WriteString(fmt.Sprintf("($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3)) + fmt.Fprintf(&builder, "($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3) if i < newEventCount-1 { builder.WriteString(", ") } @@ -431,7 +430,7 @@ func (be *postgresBackend) CreateOrchestrationInstance(ctx context.Context, e *b if err != nil { return fmt.Errorf("failed to start transaction: %w", err) } - defer tx.Rollback(ctx) + defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op var instanceID string if instanceID, err = be.createOrchestrationInstanceInternal(ctx, e, tx, opts...); errors.Is(err, api.ErrIgnoreInstance) { @@ -480,7 +479,9 @@ func (be *postgresBackend) createOrchestrationInstanceInternal(ctx context.Conte policy := &protos.OrchestrationIdReusePolicy{} for _, opt := range opts { - opt(policy) + if err := opt(policy); err != nil { + return "", err + 
} } rows, err := insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent) @@ -765,10 +766,10 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe if err != nil { return nil, err } - defer tx.Rollback(ctx) + defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op now := time.Now().UTC() - newLockExpiration := now.Add(be.options.OrchestrationLockTimeout) + newLockExpiration := now.Add(be.options.OrchestrationLockTimeout) // Place a lock on an orchestration instance that has new events that are ready to be executed. row := tx.QueryRow( @@ -913,7 +914,7 @@ func (be *postgresBackend) CompleteActivityWorkItem(ctx context.Context, wi *bac if err != nil { return err } - defer tx.Rollback(ctx) + defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op bytes, err := backend.MarshalHistoryEvent(wi.Result) if err != nil { @@ -978,7 +979,7 @@ func (be *postgresBackend) PurgeOrchestrationState(ctx context.Context, id api.I if err != nil { return err } - defer tx.Rollback(ctx) + defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op if err := be.cleanupOrchestrationStateInternal(ctx, tx, id, true); err != nil { return err diff --git a/backend/runtimestate.go b/backend/runtimestate.go index 96a63fea..8dabe3ab 100644 --- a/backend/runtimestate.go +++ b/backend/runtimestate.go @@ -47,7 +47,7 @@ func NewOrchestrationRuntimeState(instanceID api.InstanceID, existingHistory []* } for _, e := range existingHistory { - s.addEvent(e, false) + _ = s.addEvent(e, false) } return s @@ -75,8 +75,6 @@ func (s *OrchestrationRuntimeState) addEvent(e *HistoryEvent, isNew bool) error s.isSuspended = true } else if e.GetExecutionResumed() != nil { s.isSuspended = false - } else { - // TODO: Check for other possible duplicates using task IDs } if isNew { @@ -107,10 +105,12 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA if completedAction.OrchestrationStatus == 
protos.OrchestrationStatus_ORCHESTRATION_STATUS_CONTINUED_AS_NEW { newState := NewOrchestrationRuntimeState(s.instanceID, []*protos.HistoryEvent{}) newState.continuedAsNew = true - newState.AddEvent(helpers.NewOrchestratorStartedEvent()) + if err := newState.AddEvent(helpers.NewOrchestratorStartedEvent()); err != nil { + return false, fmt.Errorf("failed to add orchestrator started event: %w", err) + } // Duplicate the start event info, updating just the input - newState.AddEvent( + if err := newState.AddEvent( helpers.NewExecutionStartedEvent( s.startEvent.Name, string(s.instanceID), @@ -119,11 +119,15 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA s.startEvent.ParentTraceContext, nil, ), - ) + ); err != nil { + return false, fmt.Errorf("failed to add execution started event: %w", err) + } // Unprocessed "carryover" events for _, e := range completedAction.CarryoverEvents { - newState.AddEvent(e) + if err := newState.AddEvent(e); err != nil { + return false, fmt.Errorf("failed to add carryover event: %w", err) + } } // Overwrite the current state object with a new one @@ -132,7 +136,9 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA // ignore all remaining actions return true, nil } else { - s.AddEvent(helpers.NewExecutionCompletedEvent(action.Id, completedAction.OrchestrationStatus, completedAction.Result, completedAction.FailureDetails)) + if err := s.AddEvent(helpers.NewExecutionCompletedEvent(action.Id, completedAction.OrchestrationStatus, completedAction.Result, completedAction.FailureDetails)); err != nil { + return false, fmt.Errorf("failed to add execution completed event: %w", err) + } if s.startEvent.GetParentInstance() != nil { msg := OrchestratorMessage{ HistoryEvent: &protos.HistoryEvent{EventId: -1, Timestamp: timestamppb.Now()}, @@ -158,7 +164,9 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA } } } else if createtimer := 
action.GetCreateTimer(); createtimer != nil { - s.AddEvent(helpers.NewTimerCreatedEvent(action.Id, createtimer.FireAt)) + if err := s.AddEvent(helpers.NewTimerCreatedEvent(action.Id, createtimer.FireAt)); err != nil { + return false, fmt.Errorf("failed to add timer created event: %w", err) + } s.pendingTimers = append(s.pendingTimers, helpers.NewTimerFiredEvent(action.Id, createtimer.FireAt, currentTraceContext)) } else if scheduleTask := action.GetScheduleTask(); scheduleTask != nil { scheduledEvent := helpers.NewTaskScheduledEvent( @@ -168,21 +176,25 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA scheduleTask.Input, currentTraceContext, ) - s.AddEvent(scheduledEvent) - s.pendingTasks = append(s.pendingTasks, scheduledEvent) + if err := s.AddEvent(scheduledEvent); err != nil { + return false, fmt.Errorf("failed to add task scheduled event: %w", err) + } + s.pendingTasks= append(s.pendingTasks, scheduledEvent) } else if createSO := action.GetCreateSubOrchestration(); createSO != nil { // Autogenerate an instance ID for the sub-orchestration if none is provided, using a // deterministic algorithm based on the parent instance ID to help enable de-duplication. 
if createSO.InstanceId == "" { createSO.InstanceId = fmt.Sprintf("%s:%04x", s.instanceID, action.Id) } - s.AddEvent(helpers.NewSubOrchestrationCreatedEvent( + if err := s.AddEvent(helpers.NewSubOrchestrationCreatedEvent( action.Id, createSO.Name, createSO.Version, createSO.Input, createSO.InstanceId, - currentTraceContext)) + currentTraceContext)); err != nil { + return false, fmt.Errorf("failed to add sub-orchestration created event: %w", err) + } startEvent := helpers.NewExecutionStartedEvent( createSO.Name, createSO.InstanceId, @@ -194,7 +206,9 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA s.pendingMessages = append(s.pendingMessages, OrchestratorMessage{HistoryEvent: startEvent, TargetInstanceID: createSO.InstanceId}) } else if sendEvent := action.GetSendEvent(); sendEvent != nil { e := helpers.NewSendEventEvent(action.Id, sendEvent.Instance.InstanceId, sendEvent.Name, sendEvent.Data) - s.AddEvent(e) + if err := s.AddEvent(e); err != nil { + return false, fmt.Errorf("failed to add send event: %w", err) + } s.pendingMessages = append(s.pendingMessages, OrchestratorMessage{HistoryEvent: e, TargetInstanceID: sendEvent.Instance.InstanceId}) } else if terminate := action.GetTerminateOrchestration(); terminate != nil { // Send a message to terminate the target orchestration @@ -242,11 +256,12 @@ func (s *OrchestrationRuntimeState) Output() (string, error) { } func (s *OrchestrationRuntimeState) RuntimeStatus() protos.OrchestrationStatus { - if s.startEvent == nil { + switch { + case s.startEvent == nil: return protos.OrchestrationStatus_ORCHESTRATION_STATUS_PENDING - } else if s.isSuspended { + case s.isSuspended: return protos.OrchestrationStatus_ORCHESTRATION_STATUS_SUSPENDED - } else if s.completedEvent != nil { + case s.completedEvent != nil: return s.completedEvent.GetOrchestrationStatus() } diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 271da9a9..a153d409 100644 --- a/backend/sqlite/sqlite.go +++ 
b/backend/sqlite/sqlite.go @@ -71,11 +71,12 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen if opts == nil { opts = NewSqliteOptions("") } - if opts.FilePath == "" { + switch { + case opts.FilePath == "": be.dsn = "file::memory:" - } else if !strings.HasPrefix(opts.FilePath, "file:") { + case !strings.HasPrefix(opts.FilePath, "file:"): be.dsn = "file:" + opts.FilePath - } else { + default: be.dsn = opts.FilePath } @@ -111,16 +112,17 @@ func (be *sqliteBackend) DeleteTaskHub(ctx context.Context) error { if be.options.FilePath == "" { // In-memory DB return nil - } else { - // File-system DB - err := os.Remove(be.options.FilePath) - if err == nil { - return nil - } else if os.IsNotExist(err) { - return backend.ErrTaskHubNotFound - } else { - return fmt.Errorf("failed to delete the database: %w", err) - } + } + + // File-system DB + err := os.Remove(be.options.FilePath) + switch { + case err == nil: + return nil + case os.IsNotExist(err): + return backend.ErrTaskHubNotFound + default: + return fmt.Errorf("failed to delete the database: %w", err) } } @@ -134,7 +136,7 @@ func (be *sqliteBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi *b if err != nil { return err } - defer tx.Rollback() + defer tx.Rollback() //nolint:errcheck // rollback after commit is a no-op var visibleTime *time.Time = nil if delay := wi.GetAbandonDelay(); delay > 0 { @@ -195,7 +197,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi * if err != nil { return err } - defer tx.Rollback() + defer tx.Rollback() //nolint:errcheck // rollback after commit is a no-op now := time.Now().UTC() @@ -337,7 +339,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi * OperationStatus: []protos.OrchestrationStatus{protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED}, Action: api.REUSE_ID_ACTION_TERMINATE, })); err != nil { - if err == backend.ErrDuplicateEvent { + if errors.Is(err, 
backend.ErrDuplicateEvent) { be.logger.Warnf( "%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.", wi.InstanceID, @@ -401,7 +403,7 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac if err != nil { return fmt.Errorf("failed to start transaction: %w", err) } - defer tx.Rollback() + defer tx.Rollback() //nolint:errcheck // rollback after commit is a no-op var instanceID string if instanceID, err = be.createOrchestrationInstanceInternal(ctx, e, tx, opts...); errors.Is(err, api.ErrIgnoreInstance) { @@ -450,7 +452,9 @@ func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context policy := &protos.OrchestrationIdReusePolicy{} for _, opt := range opts { - opt(policy) + if err := opt(policy); err != nil { + return "", err + } } rows, err := insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent) @@ -535,7 +539,7 @@ func (be *sqliteBackend) handleInstanceExists(ctx context.Context, tx *sql.Tx, s // should never happen, because we clean up instance before create new one if rows <= 0 { - return fmt.Errorf("failed to insert into [Instances] table because entry already exists.") + return fmt.Errorf("failed to insert into [Instances] table because entry already exists") } return nil } @@ -644,7 +648,7 @@ func (be *sqliteBackend) GetOrchestrationMetadata(ctx context.Context, iid api.I ) err := row.Err() - if err == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { return nil, api.ErrInstanceNotFound } else if err != nil { return nil, fmt.Errorf("failed to query the Instances table: %w", row.Err()) @@ -662,7 +666,7 @@ func (be *sqliteBackend) GetOrchestrationMetadata(ctx context.Context, iid api.I var failureDetailsPayload []byte err = row.Scan(&instanceID, &name, &runtimeStatus, &createdAt, &lastUpdatedAt, &input, &output, &customStatus, &failureDetailsPayload) - if err == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { return nil, api.ErrInstanceNotFound } 
else if err != nil { return nil, fmt.Errorf("failed to scan the Instances table result: %w", err) @@ -745,7 +749,7 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend if err != nil { return nil, err } - defer tx.Rollback() + defer tx.Rollback() //nolint:errcheck // rollback after commit is a no-op now := time.Now().UTC() newLockExpiration := now.Add(be.options.OrchestrationLockTimeout) @@ -897,7 +901,7 @@ func (be *sqliteBackend) CompleteActivityWorkItem(ctx context.Context, wi *backe if err != nil { return err } - defer tx.Rollback() + defer tx.Rollback() //nolint:errcheck // rollback after commit is a no-op bytes, err := backend.MarshalHistoryEvent(wi.Result) if err != nil { @@ -962,7 +966,7 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins if err != nil { return err } - defer tx.Rollback() + defer tx.Rollback() //nolint:errcheck // rollback after commit is a no-op if err := be.cleanupOrchestrationStateInternal(ctx, tx, id, true); err != nil { return err diff --git a/backend/taskhub.go b/backend/taskhub.go index 688cb735..fa4603b9 100644 --- a/backend/taskhub.go +++ b/backend/taskhub.go @@ -2,6 +2,7 @@ package backend import ( "context" + "errors" "sync" ) @@ -31,7 +32,7 @@ func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker, activityWorker func (w *taskHubWorker) Start(ctx context.Context) error { // TODO: Check for already started worker - if err := w.backend.CreateTaskHub(ctx); err != nil && err != ErrTaskHubExists { + if err := w.backend.CreateTaskHub(ctx); err != nil && !errors.Is(err, ErrTaskHubExists) { return err } if err := w.backend.Start(ctx); err != nil { diff --git a/client/client_grpc.go b/client/client_grpc.go index bf1ef199..996fd985 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -34,7 +34,9 @@ func NewTaskHubGrpcClient(cc grpc.ClientConnInterface, logger backend.Logger) *T func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, 
orchestrator string, opts ...api.NewOrchestrationOptions) (api.InstanceID, error) { req := &protos.CreateInstanceRequest{Name: orchestrator} for _, configure := range opts { - configure(req) + if err := configure(req); err != nil { + return api.EmptyInstanceID, fmt.Errorf("failed to configure orchestration request: %w", err) + } } if req.InstanceId == "" { req.InstanceId = uuid.NewString() diff --git a/internal/helpers/tracing.go b/internal/helpers/tracing.go index 12e62b86..492241bb 100644 --- a/internal/helpers/tracing.go +++ b/internal/helpers/tracing.go @@ -78,15 +78,16 @@ func startNewSpan( timestamp time.Time, ) (context.Context, trace.Span) { var spanName string - if taskVersion != "" { + switch { + case taskVersion != "": spanName = taskType + "||" + taskName + "||" + taskVersion attributes = append(attributes, attribute.KeyValue{ Key: "durabletask.task.version", Value: attribute.StringValue(taskVersion), }) - } else if taskName != "" { + case taskName != "": spanName = taskType + "||" + taskName - } else { + default: spanName = taskType } @@ -149,7 +150,7 @@ func SpanContextFromTraceContext(tc *protos.TraceContext) (trace.SpanContext, er } else { // backwards compatibility with older versions of the protobuf traceID = tc.GetTraceParent() - spanID = tc.GetSpanID() + spanID = tc.GetSpanID() //nolint:staticcheck // backwards compatibility with older versions of the protobuf traceFlags = "01" // sampled } diff --git a/samples/azurefunctions/middleware.go b/samples/azurefunctions/middleware.go index 88ba189d..980a8349 100644 --- a/samples/azurefunctions/middleware.go +++ b/samples/azurefunctions/middleware.go @@ -38,7 +38,10 @@ func MapOrchestrator(o task.Orchestrator) func(http.ResponseWriter, *http.Reques return func(w http.ResponseWriter, httpReq *http.Request) { var invokeRequest InvokeRequest d := json.NewDecoder(httpReq.Body) - d.Decode(&invokeRequest) + if err := d.Decode(&invokeRequest); err != nil { + fmt.Printf("ERROR: Failed to decode invoke request: 
%v\n", err) + return + } // TODO: Give the schema, construct the context object and invoke the orchestrator contextParam := invokeRequest.Data["context"] @@ -87,7 +90,9 @@ func MapOrchestrator(o task.Orchestrator) func(http.ResponseWriter, *http.Reques } fmt.Println("Sending response JSON:", string(responseJson)) w.Header().Set("Content-Type", "application/json") - w.Write(responseJson) + if _, err := w.Write(responseJson); err != nil { + fmt.Printf("ERROR: Failed to write response: %v\n", err) + } } } @@ -101,7 +106,10 @@ func MapActivity(a task.Activity) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { var invokeRequest InvokeRequest d := json.NewDecoder(r.Body) - d.Decode(&invokeRequest) + if err := d.Decode(&invokeRequest); err != nil { + fmt.Printf("ERROR: Failed to decode invoke request: %v\n", err) + return + } fmt.Println("Activity request:", invokeRequest) @@ -132,7 +140,7 @@ func MapActivity(a task.Activity) func(http.ResponseWriter, *http.Request) { fmt.Printf("Task failed: %v\n", tf.FailureDetails) statusCode = 500 } else { - panic(fmt.Errorf("Unexpected event type: %v", e)) + panic(fmt.Errorf("unexpected event type: %v", e)) } invokeResponse := &InvokeResponse{ReturnValue: []byte(returnValue)} @@ -144,6 +152,8 @@ func MapActivity(a task.Activity) func(http.ResponseWriter, *http.Request) { fmt.Printf("Sending %d response: %s\n", statusCode, string(responseJson)) w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) - w.Write(responseJson) + if _, err := w.Write(responseJson); err != nil { + fmt.Printf("ERROR: Failed to write response: %v\n", err) + } } } diff --git a/samples/distributedtracing/distributedtracing.go b/samples/distributedtracing/distributedtracing.go index 09de0744..e99d7ffb 100644 --- a/samples/distributedtracing/distributedtracing.go +++ b/samples/distributedtracing/distributedtracing.go @@ -32,9 +32,15 @@ func main() { // Create a new task registry and add the 
orchestrator and activities r := task.NewTaskRegistry() - r.AddOrchestrator(DistributedTraceSampleOrchestrator) - r.AddActivity(DoWorkActivity) - r.AddActivity(CallHttpEndpointActivity) + if err := r.AddOrchestrator(DistributedTraceSampleOrchestrator); err != nil { + log.Fatalf("Failed to register orchestrator: %v", err) //nolint:gocritic // Fatalf in sample main() is acceptable + } + if err := r.AddActivity(DoWorkActivity); err != nil { + log.Fatalf("Failed to register activity: %v", err) + } + if err := r.AddActivity(CallHttpEndpointActivity); err != nil { + log.Fatalf("Failed to register activity: %v", err) + } // Init the client ctx := context.Background() @@ -42,7 +48,11 @@ func main() { if err != nil { log.Fatalf("Failed to initialize the client: %v", err) } - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + log.Printf("Failed to shutdown worker: %v", err) + } + }() // Start a new orchestration id, err := client.ScheduleNewOrchestration(ctx, DistributedTraceSampleOrchestrator) diff --git a/samples/externalevents/externalevents.go b/samples/externalevents/externalevents.go index cba48eed..2ca4da5e 100644 --- a/samples/externalevents/externalevents.go +++ b/samples/externalevents/externalevents.go @@ -15,7 +15,9 @@ import ( func main() { // Create a new task registry and add the orchestrator and activities r := task.NewTaskRegistry() - r.AddOrchestrator(ExternalEventOrchestrator) + if err := r.AddOrchestrator(ExternalEventOrchestrator); err != nil { + log.Fatalf("Failed to register orchestrator: %v", err) + } // Init the client ctx := context.Background() @@ -23,14 +25,18 @@ func main() { if err != nil { log.Fatalf("Failed to initialize the client: %v", err) } - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + log.Printf("Failed to shutdown worker: %v", err) + } + }() // Start a new orchestration id, err := client.ScheduleNewOrchestration(ctx, "ExternalEventOrchestrator") if 
err != nil { - log.Fatalf("Failed to schedule new orchestration: %v", err) + log.Fatalf("Failed to schedule new orchestration: %v", err) //nolint:gocritic // Fatalf in sample main() is acceptable } - metadata, err := client.WaitForOrchestrationStart(ctx, id) + _, err = client.WaitForOrchestrationStart(ctx, id) if err != nil { log.Fatalf("Failed to wait for orchestration to start: %v", err) } @@ -39,14 +45,16 @@ func main() { go func() { fmt.Println("Enter your first name: ") var nameInput string - fmt.Scanln(&nameInput) + if _, err := fmt.Scanln(&nameInput); err != nil { + log.Fatalf("Failed to read input: %v", err) + } if err = client.RaiseEvent(ctx, id, "Name", api.WithEventPayload(nameInput)); err != nil { log.Fatalf("Failed to raise event: %v", err) } }() // After the orchestration receives the event, it should complete on its own - metadata, err = client.WaitForOrchestrationCompletion(ctx, id) + metadata, err := client.WaitForOrchestrationCompletion(ctx, id) if err != nil { log.Fatalf("Failed to wait for orchestration to complete: %v", err) } diff --git a/samples/heterogeneous/heterogeneous.go b/samples/heterogeneous/heterogeneous.go index b9b87fb1..01a0129f 100644 --- a/samples/heterogeneous/heterogeneous.go +++ b/samples/heterogeneous/heterogeneous.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log" "net" "time" @@ -143,7 +144,11 @@ func main() { if err := taskHubWorker.Start(ctx); err != nil { panic(err) } - defer taskHubWorker.Shutdown(ctx) + defer func() { + if err := taskHubWorker.Shutdown(ctx); err != nil { + log.Printf("Failed to shutdown worker: %v", err) + } + }() taskHubClient := backend.NewTaskHubClient(be) id, err := taskHubClient.ScheduleNewOrchestration(ctx, orchestratorName, api.WithInput(1)) diff --git a/samples/parallel/parallel.go b/samples/parallel/parallel.go index 3a4e6e8b..7465f37d 100644 --- a/samples/parallel/parallel.go +++ b/samples/parallel/parallel.go @@ -17,9 +17,15 @@ import ( func main() { // Create a new task registry 
and add the orchestrator and activities r := task.NewTaskRegistry() - r.AddOrchestrator(UpdateDevicesOrchestrator) - r.AddActivity(GetDevicesToUpdate) - r.AddActivity(UpdateDevice) + if err := r.AddOrchestrator(UpdateDevicesOrchestrator); err != nil { + log.Fatalf("Failed to register orchestrator: %v", err) + } + if err := r.AddActivity(GetDevicesToUpdate); err != nil { + log.Fatalf("Failed to register activity: %v", err) + } + if err := r.AddActivity(UpdateDevice); err != nil { + log.Fatalf("Failed to register activity: %v", err) + } // Init the client ctx := context.Background() @@ -27,12 +33,16 @@ func main() { if err != nil { log.Fatalf("Failed to initialize the client: %v", err) } - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + log.Printf("Failed to shutdown worker: %v", err) + } + }() // Start a new orchestration id, err := client.ScheduleNewOrchestration(ctx, UpdateDevicesOrchestrator) if err != nil { - log.Fatalf("Failed to schedule new orchestration: %v", err) + log.Fatalf("Failed to schedule new orchestration: %v", err) //nolint:gocritic // Fatalf in sample main() is acceptable } // Wait for the orchestration to complete diff --git a/samples/retries/retries.go b/samples/retries/retries.go index ee5a8b42..f305d185 100644 --- a/samples/retries/retries.go +++ b/samples/retries/retries.go @@ -16,8 +16,12 @@ import ( func main() { // Create a new task registry and add the orchestrator and activities r := task.NewTaskRegistry() - r.AddOrchestrator(RetryActivityOrchestrator) - r.AddActivity(RandomFailActivity) + if err := r.AddOrchestrator(RetryActivityOrchestrator); err != nil { + log.Fatalf("Failed to register orchestrator: %v", err) + } + if err := r.AddActivity(RandomFailActivity); err != nil { + log.Fatalf("Failed to register activity: %v", err) + } // Init the client ctx := context.Background() @@ -25,12 +29,16 @@ func main() { if err != nil { log.Fatalf("Failed to initialize the client: %v", err) } - defer 
worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + log.Printf("Failed to shutdown worker: %v", err) + } + }() // Start a new orchestration id, err := client.ScheduleNewOrchestration(ctx, RetryActivityOrchestrator) if err != nil { - log.Fatalf("Failed to schedule new orchestration: %v", err) + log.Fatalf("Failed to schedule new orchestration: %v", err) //nolint:gocritic // Fatalf in sample main() is acceptable } // Wait for the orchestration to complete diff --git a/samples/sequence/sequence.go b/samples/sequence/sequence.go index 4809bcb1..281d3287 100644 --- a/samples/sequence/sequence.go +++ b/samples/sequence/sequence.go @@ -14,8 +14,12 @@ import ( func main() { // Create a new task registry and add the orchestrator and activities r := task.NewTaskRegistry() - r.AddOrchestrator(ActivitySequenceOrchestrator) - r.AddActivity(SayHelloActivity) + if err := r.AddOrchestrator(ActivitySequenceOrchestrator); err != nil { + log.Fatalf("Failed to register orchestrator: %v", err) + } + if err := r.AddActivity(SayHelloActivity); err != nil { + log.Fatalf("Failed to register activity: %v", err) + } // Init the client ctx := context.Background() @@ -23,12 +27,16 @@ func main() { if err != nil { log.Fatalf("Failed to initialize the client: %v", err) } - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + log.Printf("Failed to shutdown worker: %v", err) + } + }() // Start a new orchestration id, err := client.ScheduleNewOrchestration(ctx, ActivitySequenceOrchestrator) if err != nil { - log.Fatalf("Failed to schedule new orchestration: %v", err) + log.Fatalf("Failed to schedule new orchestration: %v", err) //nolint:gocritic // Fatalf in sample main() is acceptable } // Wait for the orchestration to complete diff --git a/task/executor.go b/task/executor.go index 4cf3d65a..c210e9be 100644 --- a/task/executor.go +++ b/task/executor.go @@ -28,7 +28,7 @@ func (te *taskExecutor) ExecuteActivity(ctx 
context.Context, id api.InstanceID, ts := e.GetTaskScheduled() if ts == nil { // No clean way to deal with this other than to abandon it - return nil, fmt.Errorf("Unexpected event type for ExecuteActivity: %v", e.EventType) + return nil, fmt.Errorf("unexpected event type for ExecuteActivity: %v", e.EventType) } invoker, ok := te.Registry.activities[ts.Name] if !ok { @@ -97,12 +97,12 @@ func (te taskExecutor) Shutdown(ctx context.Context) error { } func unmarshalData(data []byte, v any) error { - if v == nil { + switch { + case v == nil: return nil - } else if len(data) == 0 { - v = nil + case len(data) == 0: return nil - } else { + default: return json.Unmarshal(data, v) } } diff --git a/task/orchestrator.go b/task/orchestrator.go index b418b0d3..0fc345af 100644 --- a/task/orchestrator.go +++ b/task/orchestrator.go @@ -3,6 +3,7 @@ package task import ( "container/list" "encoding/json" + "errors" "fmt" "math" "strings" @@ -132,7 +133,7 @@ func (ctx *OrchestrationContext) start() (actions []*protos.OrchestratorAction) defer func() { result := recover() - if result == ErrTaskBlocked { + if resultErr, ok := result.(error); ok && errors.Is(resultErr, ErrTaskBlocked) { // Expected, normal part of execution actions = ctx.actions() } else if result != nil { @@ -143,7 +144,9 @@ func (ctx *OrchestrationContext) start() (actions []*protos.OrchestratorAction) for { if ok, err := ctx.processNextEvent(); err != nil { - ctx.setFailed(err) + if setErr := ctx.setFailed(err); setErr != nil { + break + } break } else if !ok { // Orchestrator finished, break out of the loop and return any pending actions @@ -170,12 +173,13 @@ func (ctx *OrchestrationContext) processNextEvent() (bool, error) { func (ctx *OrchestrationContext) getNextHistoryEvent() (*protos.HistoryEvent, bool) { var historyList []*protos.HistoryEvent index := ctx.historyIndex - if ctx.historyIndex >= len(ctx.oldEvents)+len(ctx.newEvents) { + switch { + case ctx.historyIndex >= len(ctx.oldEvents)+len(ctx.newEvents): 
return nil, false - } else if ctx.historyIndex < len(ctx.oldEvents) { + case ctx.historyIndex < len(ctx.oldEvents): ctx.IsReplaying = true historyList = ctx.oldEvents - } else { + default: ctx.IsReplaying = false historyList = ctx.newEvents index -= len(ctx.oldEvents) @@ -328,8 +332,7 @@ func (ctx *OrchestrationContext) internalScheduleTaskWithRetries(initialAttempt timerErr := ctx.createTimerInternal(nextDelay).Await(nil) if timerErr != nil { - // TODO use errors.Join when updating golang - return fmt.Errorf("%v %w", timerErr, err) + return errors.Join(timerErr, err) } err = ctx.internalScheduleTaskWithRetries(initialAttempt, schedule, policy, retryCount+1).Await(v) @@ -451,11 +454,12 @@ func (ctx *OrchestrationContext) onExecutionStarted(es *protos.ExecutionStartedE output, appError := orchestrator(ctx) var err error - if appError != nil { + switch { + case appError != nil: err = ctx.setFailed(appError) - } else if ctx.continuedAsNew { + case ctx.continuedAsNew: err = ctx.setContinuedAsNew() - } else { + default: err = ctx.setComplete(output) } diff --git a/tests/backend_test.go b/tests/backend_test.go index ff9c22bb..1d72729a 100644 --- a/tests/backend_test.go +++ b/tests/backend_test.go @@ -140,7 +140,7 @@ func Test_CompleteOrchestration(t *testing.T) { expectedResult := "done!" 
stackTraceBuffer := make([]byte, 256) - var expectedStackTrace string = "" + var expectedStackTrace string // Produce an ExecutionCompleted event with a particular output getOrchestratorActions := func() []*protos.OrchestratorAction { @@ -196,7 +196,7 @@ func Test_ScheduleActivityTasks(t *testing.T) { for i, be := range backends { initTest(t, be, i, true) - wi, err := be.GetActivityWorkItem(ctx) + _, err := be.GetActivityWorkItem(ctx) if !assert.ErrorIs(t, err, backend.ErrNoWorkItems) { continue } @@ -221,7 +221,7 @@ func Test_ScheduleActivityTasks(t *testing.T) { assert.ErrorIs(t, err, backend.ErrNoWorkItems) // However, there should be an activity work item - wi, err = be.GetActivityWorkItem(ctx) + wi, err := be.GetActivityWorkItem(ctx) if assert.NoError(t, err) && assert.NotNil(t, wi) { assert.Equal(t, expectedName, wi.NewEvent.GetTaskScheduled().GetName()) assert.Equal(t, expectedInput, wi.NewEvent.GetTaskScheduled().GetInput().GetValue()) @@ -332,10 +332,12 @@ func Test_AbandonActivityWorkItem(t *testing.T) { if err := be.AbandonActivityWorkItem(ctx, wi); assert.NoError(t, err) { // Re-fetch the abandoned activity work item - wi, err = be.GetActivityWorkItem(ctx) - assert.Equal(t, "MyActivity", wi.NewEvent.GetTaskScheduled().GetName()) - assert.Equal(t, int32(123), wi.NewEvent.EventId) - assert.Nil(t, wi.NewEvent.GetTaskScheduled().GetInput()) + wi, err := be.GetActivityWorkItem(ctx) + if assert.NoError(t, err) { + assert.Equal(t, "MyActivity", wi.NewEvent.GetTaskScheduled().GetName()) + assert.Equal(t, int32(123), wi.NewEvent.EventId) + assert.Nil(t, wi.NewEvent.GetTaskScheduled().GetInput()) + } } } } @@ -451,7 +453,9 @@ func workItemProcessingTestLogic( if state, ok := getOrchestrationRuntimeState(t, be, wi); ok { // Update the state with new events. Normally the worker logic would do this. 
for _, e := range wi.NewEvents { - state.AddEvent(e) + if err := state.AddEvent(e); err != nil { + t.Fatalf("failed to add event: %v", err) + } } actions := getOrchestratorActions() diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index 7e085cb7..e5bf6d35 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -61,7 +61,11 @@ func TestMain(m *testing.M) { if err != nil { log.Fatalf("failed to connect to gRPC server: %v", err) } - defer conn.Close() + defer func() { + if err := conn.Close(); err != nil { + log.Printf("failed to close connection: %v", err) + } + }() grpcClient = client.NewTaskHubGrpcClient(conn, logger) // Run the test exitCode @@ -71,16 +75,16 @@ func TestMain(m *testing.M) { defer cancel() err = grpcExecutor.Shutdown(timeoutCtx) if err != nil { - log.Fatalf("failed to shutdown grpc Executor: %v", err) + log.Printf("failed to shutdown grpc Executor: %v", err) } timeoutCtx, cancel = context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := taskHubWorker.Shutdown(timeoutCtx); err != nil { - log.Fatalf("failed to shutdown worker: %v", err) + log.Printf("failed to shutdown worker: %v", err) } grpcServer.Stop() - os.Exit(exitCode) + os.Exit(exitCode) //nolint:gocritic // os.Exit in TestMain is required by Go testing } func startGrpcListener(t *testing.T, r *task.TaskRegistry) context.CancelFunc { @@ -91,11 +95,11 @@ func startGrpcListener(t *testing.T, r *task.TaskRegistry) context.CancelFunc { func Test_Grpc_WaitForInstanceStart_Timeout(t *testing.T) { r := task.NewTaskRegistry() - r.AddOrchestratorN("WaitForInstanceStartThrowsException", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("WaitForInstanceStartThrowsException", func(ctx *task.OrchestrationContext) (any, error) { // sleep 5 seconds time.Sleep(5 * time.Second) return 42, nil - }) + })) cancelListener := startGrpcListener(t, r) defer cancelListener() @@ -113,11 +117,11 @@ func 
Test_Grpc_WaitForInstanceStart_Timeout(t *testing.T) { func Test_Grpc_WaitForInstanceStart_ConnectionResume(t *testing.T) { r := task.NewTaskRegistry() - r.AddOrchestratorN("WaitForInstanceStartThrowsException", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("WaitForInstanceStartThrowsException", func(ctx *task.OrchestrationContext) (any, error) { // sleep 5 seconds time.Sleep(5 * time.Second) return 42, nil - }) + })) cancelListener := startGrpcListener(t, r) @@ -148,7 +152,7 @@ func Test_Grpc_WaitForInstanceStart_ConnectionResume(t *testing.T) { func Test_Grpc_HelloOrchestration(t *testing.T) { r := task.NewTaskRegistry() - r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return nil, err @@ -157,14 +161,14 @@ func Test_Grpc_HelloOrchestration(t *testing.T) { err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) ctx.SetCustomStatus("hello-test") return output, err - }) - r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { var name string if err := ctx.GetInput(&name); err != nil { return nil, err } return fmt.Sprintf("Hello, %s!", name), nil - }) + })) cancelListener := startGrpcListener(t, r) defer cancelListener() @@ -185,16 +189,18 @@ func Test_Grpc_SuspendResume(t *testing.T) { const eventCount = 10 r := task.NewTaskRegistry() - r.AddOrchestratorN("SuspendResumeOrchestration", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SuspendResumeOrchestration", func(ctx *task.OrchestrationContext) (any, error) { for i := 0; i < eventCount; i++ { var value int - ctx.WaitForSingleEvent("MyEvent", 5*time.Second).Await(&value) + 
if err := ctx.WaitForSingleEvent("MyEvent", 5*time.Second).Await(&value); err != nil { + return false, err + } if value != i { return false, errors.New("Unexpected value") } } return true, nil - }) + })) cancelListener := startGrpcListener(t, r) defer cancelListener() @@ -237,30 +243,38 @@ func Test_Grpc_Terminate_Recursive(t *testing.T) { delayTime := 4 * time.Second executedActivity := false r := task.NewTaskRegistry() - r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) { tasks := []task.Task{} for i := 0; i < 5; i++ { task := ctx.CallSubOrchestrator("L1") tasks = append(tasks, task) } for _, task := range tasks { - task.Await(nil) + if err := task.Await(nil); err != nil { + return nil, err + } } return nil, nil - }) - r.AddOrchestratorN("L1", func(ctx *task.OrchestrationContext) (any, error) { - ctx.CallSubOrchestrator("L2").Await(nil) + })) + require.NoError(t, r.AddOrchestratorN("L1", func(ctx *task.OrchestrationContext) (any, error) { + if err := ctx.CallSubOrchestrator("L2").Await(nil); err != nil { + return nil, err + } return nil, nil - }) - r.AddOrchestratorN("L2", func(ctx *task.OrchestrationContext) (any, error) { - ctx.CreateTimer(delayTime).Await(nil) - ctx.CallActivity("Fail").Await(nil) + })) + require.NoError(t, r.AddOrchestratorN("L2", func(ctx *task.OrchestrationContext) (any, error) { + if err := ctx.CreateTimer(delayTime).Await(nil); err != nil { + return nil, err + } + if err := ctx.CallActivity("Fail").Await(nil); err != nil { + return nil, err + } return nil, nil - }) - r.AddActivityN("Fail", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("Fail", func(ctx task.ActivityContext) (any, error) { executedActivity = true return nil, errors.New("Failed: Should not have executed the activity") - }) + })) cancelListener := startGrpcListener(t, r) defer cancelListener() @@ -297,23 
+311,25 @@ func Test_Grpc_Terminate_Recursive(t *testing.T) { func Test_Grpc_ReuseInstanceIDIgnore(t *testing.T) { delayTime := 2 * time.Second r := task.NewTaskRegistry() - r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return nil, err } var output string - ctx.CreateTimer(delayTime).Await(nil) + if err := ctx.CreateTimer(delayTime).Await(nil); err != nil { + return nil, err + } err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) return output, err - }) - r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { var name string if err := ctx.GetInput(&name); err != nil { return nil, err } return fmt.Sprintf("Hello, %s!", name), nil - }) + })) cancelListener := startGrpcListener(t, r) defer cancelListener() @@ -326,7 +342,8 @@ func Test_Grpc_ReuseInstanceIDIgnore(t *testing.T) { id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) require.NoError(t, err) // wait orchestration to start - grpcClient.WaitForOrchestrationStart(ctx, id) + _, err = grpcClient.WaitForOrchestrationStart(ctx, id) + require.NoError(t, err) pivotTime := time.Now() // schedule again, it should ignore creating the new orchestration id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIdPolicy)) @@ -345,23 +362,25 @@ func Test_Grpc_ReuseInstanceIDIgnore(t *testing.T) { func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) { delayTime := 2 * time.Second r := task.NewTaskRegistry() - r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + 
require.NoError(t, r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return nil, err } var output string - ctx.CreateTimer(delayTime).Await(nil) + if err := ctx.CreateTimer(delayTime).Await(nil); err != nil { + return nil, err + } err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) return output, err - }) - r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { var name string if err := ctx.GetInput(&name); err != nil { return nil, err } return fmt.Sprintf("Hello, %s!", name), nil - }) + })) cancelListener := startGrpcListener(t, r) defer cancelListener() @@ -374,7 +393,8 @@ func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) { id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) require.NoError(t, err) // wait orchestration to start - grpcClient.WaitForOrchestrationStart(ctx, id) + _, err = grpcClient.WaitForOrchestrationStart(ctx, id) + require.NoError(t, err) pivotTime := time.Now() // schedule again, it should terminate the first orchestration and start a new one id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIdPolicy)) @@ -393,31 +413,33 @@ func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) { func Test_Grpc_ReuseInstanceIDError(t *testing.T) { delayTime := 4 * time.Second r := task.NewTaskRegistry() - r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return nil, err } var output string - ctx.CreateTimer(delayTime).Await(nil) + 
if err := ctx.CreateTimer(delayTime).Await(nil); err != nil { + return nil, err + } err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) return output, err - }) - r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { var name string if err := ctx.GetInput(&name); err != nil { return nil, err } return fmt.Sprintf("Hello, %s!", name), nil - }) + })) cancelListener := startGrpcListener(t, r) defer cancelListener() instanceID := api.InstanceID("THROW_IF_RUNNING_OR_COMPLETED") - id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) + _, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) require.NoError(t, err) - id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id)) + _, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(instanceID)) if assert.Error(t, err) { assert.Contains(t, err.Error(), "orchestration instance already exists") } @@ -425,7 +447,7 @@ func Test_Grpc_ReuseInstanceIDError(t *testing.T) { func Test_Grpc_ActivityRetries(t *testing.T) { r := task.NewTaskRegistry() - r.AddOrchestratorN("ActivityRetries", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ActivityRetries", func(ctx *task.OrchestrationContext) (any, error) { if err := ctx.CallActivity("FailActivity", task.WithActivityRetryPolicy(&task.RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 10 * time.Millisecond, @@ -433,10 +455,10 @@ func Test_Grpc_ActivityRetries(t *testing.T) { return nil, err } return nil, nil - }) - r.AddActivityN("FailActivity", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("FailActivity", func(ctx 
task.ActivityContext) (any, error) { return nil, errors.New("activity failure") - }) + })) cancelListener := startGrpcListener(t, r) defer cancelListener() @@ -456,7 +478,7 @@ func Test_Grpc_ActivityRetries(t *testing.T) { func Test_Grpc_SubOrchestratorRetries(t *testing.T) { r := task.NewTaskRegistry() - r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) { err := ctx.CallSubOrchestrator( "Child", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_child"), @@ -466,10 +488,10 @@ func Test_Grpc_SubOrchestratorRetries(t *testing.T) { BackoffCoefficient: 2, })).Await(nil) return nil, err - }) - r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { + })) + require.NoError(t, r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { return nil, errors.New("Child failed") - }) + })) cancelListener := startGrpcListener(t, r) defer cancelListener() diff --git a/tests/orchestrations_test.go b/tests/orchestrations_test.go index 922f882f..f7b66c66 100644 --- a/tests/orchestrations_test.go +++ b/tests/orchestrations_test.go @@ -22,15 +22,19 @@ import ( func Test_EmptyOrchestration(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("EmptyOrchestrator", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("EmptyOrchestrator", func(ctx *task.OrchestrationContext) (any, error) { return nil, nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "EmptyOrchestrator") @@ -50,16 +54,20 @@ func Test_EmptyOrchestration(t *testing.T) { func Test_SingleTimer(t 
*testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("SingleTimer", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SingleTimer", func(ctx *task.OrchestrationContext) (any, error) { err := ctx.CreateTimer(time.Duration(0)).Await(nil) return nil, err - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "SingleTimer") @@ -83,7 +91,7 @@ func Test_SingleTimer(t *testing.T) { func Test_ConcurrentTimers(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("TimerFanOut", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("TimerFanOut", func(ctx *task.OrchestrationContext) (any, error) { tasks := []task.Task{} for i := 0; i < 3; i++ { tasks = append(tasks, ctx.CreateTimer(1*time.Second)) @@ -94,14 +102,18 @@ func Test_ConcurrentTimers(t *testing.T) { } } return nil, nil - }) + })) // Initialization ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "TimerFanOut") @@ -127,20 +139,28 @@ func Test_ConcurrentTimers(t *testing.T) { func Test_IsReplaying(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("IsReplayingOrch", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("IsReplayingOrch", func(ctx *task.OrchestrationContext) (any, error) { values := []bool{ctx.IsReplaying} - 
ctx.CreateTimer(time.Duration(0)).Await(nil) + if err := ctx.CreateTimer(time.Duration(0)).Await(nil); err != nil { + return nil, err + } values = append(values, ctx.IsReplaying) - ctx.CreateTimer(time.Duration(0)).Await(nil) + if err := ctx.CreateTimer(time.Duration(0)).Await(nil); err != nil { + return nil, err + } values = append(values, ctx.IsReplaying) return values, nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "IsReplayingOrch") @@ -165,7 +185,7 @@ func Test_IsReplaying(t *testing.T) { func Test_SingleActivity(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return nil, err @@ -173,20 +193,24 @@ func Test_SingleActivity(t *testing.T) { var output string err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) return output, err - }) - r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { var name string if err := ctx.GetInput(&name); err != nil { return nil, err } return fmt.Sprintf("Hello, %s!", name), nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", 
api.WithInput("世界")) @@ -210,7 +234,7 @@ func Test_SingleActivity(t *testing.T) { func Test_ActivityChain(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("ActivityChain", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ActivityChain", func(ctx *task.OrchestrationContext) (any, error) { val := 0 for i := 0; i < 10; i++ { if err := ctx.CallActivity("PlusOne", task.WithActivityInput(val)).Await(&val); err != nil { @@ -218,20 +242,24 @@ func Test_ActivityChain(t *testing.T) { } } return val, nil - }) - r.AddActivityN("PlusOne", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("PlusOne", func(ctx task.ActivityContext) (any, error) { var input int if err := ctx.GetInput(&input); err != nil { return nil, err } return input + 1, nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "ActivityChain") @@ -257,7 +285,7 @@ func Test_ActivityChain(t *testing.T) { func Test_ActivityRetries(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("ActivityRetries", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ActivityRetries", func(ctx *task.OrchestrationContext) (any, error) { if err := ctx.CallActivity("FailActivity", task.WithActivityRetryPolicy(&task.RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 10 * time.Millisecond, @@ -265,16 +293,20 @@ func Test_ActivityRetries(t *testing.T) { return nil, err } return nil, nil - }) - r.AddActivityN("FailActivity", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("FailActivity", func(ctx task.ActivityContext) (any, error) { return 
nil, errors.New("activity failure") - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "ActivityRetries") @@ -303,7 +335,7 @@ func Test_ActivityRetries(t *testing.T) { func Test_ActivityFanOut(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("ActivityFanOut", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ActivityFanOut", func(ctx *task.OrchestrationContext) (any, error) { tasks := []task.Task{} for i := 0; i < 10; i++ { tasks = append(tasks, ctx.CallActivity("ToString", task.WithActivityInput(i))) @@ -318,21 +350,25 @@ func Test_ActivityFanOut(t *testing.T) { } sort.Sort(sort.Reverse(sort.StringSlice(results))) return results, nil - }) - r.AddActivityN("ToString", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("ToString", func(ctx task.ActivityContext) (any, error) { var input int if err := ctx.GetInput(&input); err != nil { return nil, err } time.Sleep(1 * time.Second) return fmt.Sprintf("%d", input), nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r, backend.WithMaxParallelism(10)) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "ActivityFanOut") @@ -357,7 +393,7 @@ func Test_ActivityFanOut(t *testing.T) { func Test_SingleSubOrchestrator_Completed(t *testing.T) { r := task.NewTaskRegistry() - r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("Parent", func(ctx 
*task.OrchestrationContext) (any, error) { var input any if err := ctx.GetInput(&input); err != nil { return nil, err @@ -368,19 +404,23 @@ func Test_SingleSubOrchestrator_Completed(t *testing.T) { task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_child"), task.WithSubOrchestratorInput(input)).Await(&output) return output, err - }) - r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { + })) + require.NoError(t, r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return "", err } return input, nil - }) + })) ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() id, err := client.ScheduleNewOrchestration(ctx, "Parent", api.WithInput("Hello, world!")) require.NoError(t, err) @@ -399,20 +439,24 @@ func Test_SingleSubOrchestrator_Completed(t *testing.T) { func Test_SingleSubOrchestrator_Failed(t *testing.T) { r := task.NewTaskRegistry() - r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) { err := ctx.CallSubOrchestrator( "Child", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_child")).Await(nil) return nil, err - }) - r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { + })) + require.NoError(t, r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { return nil, errors.New("Child failed") - }) + })) ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() id, err := client.ScheduleNewOrchestration(ctx, "Parent") 
require.NoError(t, err) @@ -433,7 +477,7 @@ func Test_SingleSubOrchestrator_Failed(t *testing.T) { func Test_SingleSubOrchestrator_Failed_Retries(t *testing.T) { r := task.NewTaskRegistry() - r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) { err := ctx.CallSubOrchestrator( "Child", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_child"), @@ -443,15 +487,19 @@ func Test_SingleSubOrchestrator_Failed_Retries(t *testing.T) { BackoffCoefficient: 2, })).Await(nil) return nil, err - }) - r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { + })) + require.NoError(t, r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { return nil, errors.New("Child failed") - }) + })) ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() id, err := client.ScheduleNewOrchestration(ctx, "Parent") require.NoError(t, err) @@ -477,7 +525,7 @@ func Test_SingleSubOrchestrator_Failed_Retries(t *testing.T) { func Test_ContinueAsNew(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("ContinueAsNewTest", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ContinueAsNewTest", func(ctx *task.OrchestrationContext) (any, error) { var input int32 if err := ctx.GetInput(&input); err != nil { return nil, err @@ -490,13 +538,17 @@ func Test_ContinueAsNew(t *testing.T) { ctx.ContinueAsNew(input + 1) } return input, nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // 
Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "ContinueAsNewTest", api.WithInput(0)) @@ -529,7 +581,7 @@ func Test_ContinueAsNew(t *testing.T) { func Test_ContinueAsNew_Events(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("ContinueAsNewTest", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ContinueAsNewTest", func(ctx *task.OrchestrationContext) (any, error) { var input int32 if err := ctx.GetInput(&input); err != nil { return nil, err @@ -543,12 +595,16 @@ func Test_ContinueAsNew_Events(t *testing.T) { } ctx.ContinueAsNew(input+1, task.WithKeepUnprocessedEvents()) return nil, nil - }) + })) // Initialization ctx := context.Background() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "ContinueAsNewTest", api.WithInput(0)) @@ -566,7 +622,7 @@ func Test_ContinueAsNew_Events(t *testing.T) { func Test_ExternalEventContention(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("ContinueAsNewTest", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ContinueAsNewTest", func(ctx *task.OrchestrationContext) (any, error) { var data int32 if err := ctx.WaitForSingleEvent("MyEventData", 1*time.Second).Await(&data); err != nil && !errors.Is(err, task.ErrTaskCanceled) { return nil, err @@ -583,12 +639,16 @@ func Test_ExternalEventContention(t *testing.T) { ctx.ContinueAsNew(nil, task.WithKeepUnprocessedEvents()) return nil, nil - }) + })) // Initialization ctx := context.Background() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := 
client.ScheduleNewOrchestration(ctx, "ContinueAsNewTest") @@ -617,22 +677,28 @@ func Test_ExternalEventOrchestration(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("ExternalEventOrchestration", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ExternalEventOrchestration", func(ctx *task.OrchestrationContext) (any, error) { for i := 0; i < eventCount; i++ { var value int - ctx.WaitForSingleEvent("MyEvent", 5*time.Second).Await(&value) + if err := ctx.WaitForSingleEvent("MyEvent", 5*time.Second).Await(&value); err != nil { + return false, err + } if value != i { return false, errors.New("Unexpected value") } } return true, nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "ExternalEventOrchestration", api.WithInput(0)) @@ -674,18 +740,22 @@ func Test_ExternalEventOrchestration(t *testing.T) { func Test_ExternalEventTimeout(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("ExternalEventOrchestrationWithTimeout", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ExternalEventOrchestrationWithTimeout", func(ctx *task.OrchestrationContext) (any, error) { if err := ctx.WaitForSingleEvent("MyEvent", 2*time.Second).Await(nil); err != nil { return nil, err } return nil, nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run two variations, one where we raise the external event and one where we don't (timeout) for _, raiseEvent := 
range []bool{true, false} { @@ -742,22 +812,28 @@ func Test_SuspendResumeOrchestration(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("SuspendResumeOrchestration", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SuspendResumeOrchestration", func(ctx *task.OrchestrationContext) (any, error) { for i := 0; i < eventCount; i++ { var value int - ctx.WaitForSingleEvent("MyEvent", 5*time.Second).Await(&value) + if err := ctx.WaitForSingleEvent("MyEvent", 5*time.Second).Await(&value); err != nil { + return false, err + } if value != i { return false, errors.New("Unexpected value") } } return true, nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration, which will block waiting for external events id, err := client.ScheduleNewOrchestration(ctx, "SuspendResumeOrchestration", api.WithInput(0)) @@ -821,16 +897,22 @@ func Test_SuspendResumeOrchestration(t *testing.T) { func Test_TerminateOrchestration(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("MyOrchestrator", func(ctx *task.OrchestrationContext) (any, error) { - ctx.CreateTimer(3 * time.Second).Await(nil) + require.NoError(t, r.AddOrchestratorN("MyOrchestrator", func(ctx *task.OrchestrationContext) (any, error) { + if err := ctx.CreateTimer(3 * time.Second).Await(nil); err != nil { + return nil, err + } return nil, nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration, which will block waiting for external events id, err := 
client.ScheduleNewOrchestration(ctx, "MyOrchestrator") @@ -858,35 +940,47 @@ func Test_TerminateOrchestration_Recursive(t *testing.T) { delayTime := 4 * time.Second executedActivity := false r := task.NewTaskRegistry() - r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) { tasks := []task.Task{} for i := 0; i < 5; i++ { task := ctx.CallSubOrchestrator("L1", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L1_"+strconv.Itoa(i))) tasks = append(tasks, task) } for _, task := range tasks { - task.Await(nil) + if err := task.Await(nil); err != nil { + return nil, err + } } return nil, nil - }) - r.AddOrchestratorN("L1", func(ctx *task.OrchestrationContext) (any, error) { - ctx.CallSubOrchestrator("L2", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L2")).Await(nil) + })) + require.NoError(t, r.AddOrchestratorN("L1", func(ctx *task.OrchestrationContext) (any, error) { + if err := ctx.CallSubOrchestrator("L2", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L2")).Await(nil); err != nil { + return nil, err + } return nil, nil - }) - r.AddOrchestratorN("L2", func(ctx *task.OrchestrationContext) (any, error) { - ctx.CreateTimer(delayTime).Await(nil) - ctx.CallActivity("Fail").Await(nil) + })) + require.NoError(t, r.AddOrchestratorN("L2", func(ctx *task.OrchestrationContext) (any, error) { + if err := ctx.CreateTimer(delayTime).Await(nil); err != nil { + return nil, err + } + if err := ctx.CallActivity("Fail").Await(nil); err != nil { + return nil, err + } return nil, nil - }) - r.AddActivityN("Fail", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("Fail", func(ctx task.ActivityContext) (any, error) { executedActivity = true return nil, errors.New("Failed: Should not have executed the activity") - }) + })) // Initialization ctx := context.Background() client, worker := initTaskHubWorker(ctx, r) - 
defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Test terminating with and without recursion for _, recurse := range []bool{true, false} { @@ -942,27 +1036,37 @@ func Test_TerminateOrchestration_Recursive(t *testing.T) { func Test_TerminateOrchestration_Recursive_TerminateCompletedSubOrchestration(t *testing.T) { delayTime := 4 * time.Second r := task.NewTaskRegistry() - r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) { // Create L1 sub-orchestration and wait for it to complete - ctx.CallSubOrchestrator("L1", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L1")).Await(nil) - ctx.CreateTimer(delayTime).Await(nil) + if err := ctx.CallSubOrchestrator("L1", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L1")).Await(nil); err != nil { + return nil, err + } + if err := ctx.CreateTimer(delayTime).Await(nil); err != nil { + return nil, err + } return nil, nil - }) - r.AddOrchestratorN("L1", func(ctx *task.OrchestrationContext) (any, error) { + })) + require.NoError(t, r.AddOrchestratorN("L1", func(ctx *task.OrchestrationContext) (any, error) { // Create L2 sub-orchestration but don't wait for it to complete ctx.CallSubOrchestrator("L2", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L2")) return nil, nil - }) - r.AddOrchestratorN("L2", func(ctx *task.OrchestrationContext) (any, error) { + })) + require.NoError(t, r.AddOrchestratorN("L2", func(ctx *task.OrchestrationContext) (any, error) { // Wait for `delayTime` - ctx.CreateTimer(delayTime).Await(nil) + if err := ctx.CreateTimer(delayTime).Await(nil); err != nil { + return nil, err + } return nil, nil - }) + })) // Initialization ctx := context.Background() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + 
t.Logf("shutdown: %v", err) + } + }() // Test terminating with and without recursion for _, recurse := range []bool{true, false} { @@ -1032,17 +1136,21 @@ func Test_TerminateOrchestration_Recursive_TerminateCompletedSubOrchestration(t func Test_PurgeCompletedOrchestration(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("ExternalEventOrchestration", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("ExternalEventOrchestration", func(ctx *task.OrchestrationContext) (any, error) { if err := ctx.WaitForSingleEvent("MyEvent", 30*time.Second).Await(nil); err != nil { return nil, err } return nil, nil - }) + })) // Initialization ctx := context.Background() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the orchestration id, err := client.ScheduleNewOrchestration(ctx, "ExternalEventOrchestration") @@ -1085,23 +1193,33 @@ func Test_PurgeCompletedOrchestration(t *testing.T) { func Test_PurgeOrchestration_Recursive(t *testing.T) { delayTime := 4 * time.Second r := task.NewTaskRegistry() - r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) { - ctx.CallSubOrchestrator("L1", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L1")).Await(nil) + require.NoError(t, r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) { + if err := ctx.CallSubOrchestrator("L1", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L1")).Await(nil); err != nil { + return nil, err + } return nil, nil - }) - r.AddOrchestratorN("L1", func(ctx *task.OrchestrationContext) (any, error) { - ctx.CallSubOrchestrator("L2", task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L2")).Await(nil) + })) + require.NoError(t, r.AddOrchestratorN("L1", func(ctx *task.OrchestrationContext) (any, error) { + if err := ctx.CallSubOrchestrator("L2", 
task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_L2")).Await(nil); err != nil { + return nil, err + } return nil, nil - }) - r.AddOrchestratorN("L2", func(ctx *task.OrchestrationContext) (any, error) { - ctx.CreateTimer(delayTime).Await(nil) + })) + require.NoError(t, r.AddOrchestratorN("L2", func(ctx *task.OrchestrationContext) (any, error) { + if err := ctx.CreateTimer(delayTime).Await(nil); err != nil { + return nil, err + } return nil, nil - }) + })) // Initialization ctx := context.Background() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Test terminating with and without recursion for _, recurse := range []bool{true, false} { @@ -1149,7 +1267,7 @@ func Test_RecreateCompletedOrchestration(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("HelloOrchestration", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("HelloOrchestration", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return nil, err @@ -1157,20 +1275,24 @@ func Test_RecreateCompletedOrchestration(t *testing.T) { var output string err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) return output, err - }) - r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { var name string if err := ctx.GetInput(&name); err != nil { return nil, err } return fmt.Sprintf("Hello, %s!", name), nil - }) + })) // Initialization ctx := context.Background() exporter := initTracing() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() // Run the first orchestration id, err := 
client.ScheduleNewOrchestration(ctx, "HelloOrchestration", api.WithInput("世界")) @@ -1205,7 +1327,7 @@ func Test_RecreateCompletedOrchestration(t *testing.T) { func Test_SingleActivity_ReuseInstanceIDIgnore(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return nil, err @@ -1213,19 +1335,23 @@ func Test_SingleActivity_ReuseInstanceIDIgnore(t *testing.T) { var output string err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) return output, err - }) - r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { var name string if err := ctx.GetInput(&name); err != nil { return nil, err } return fmt.Sprintf("Hello, %s!", name), nil - }) + })) // Initialization ctx := context.Background() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED") reuseIdPolicy := &api.OrchestrationIdReusePolicy{ @@ -1237,7 +1363,8 @@ func Test_SingleActivity_ReuseInstanceIDIgnore(t *testing.T) { id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) require.NoError(t, err) // wait orchestration to start - client.WaitForOrchestrationStart(ctx, id) + _, err = client.WaitForOrchestrationStart(ctx, id) + require.NoError(t, err) pivotTime := time.Now() // schedule again, it should ignore creating the new orchestration id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), 
api.WithOrchestrationIdReusePolicy(reuseIdPolicy)) @@ -1256,7 +1383,7 @@ func Test_SingleActivity_ReuseInstanceIDIgnore(t *testing.T) { func Test_SingleActivity_ReuseInstanceIDTerminate(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return nil, err @@ -1264,19 +1391,23 @@ func Test_SingleActivity_ReuseInstanceIDTerminate(t *testing.T) { var output string err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) return output, err - }) - r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { var name string if err := ctx.GetInput(&name); err != nil { return nil, err } return fmt.Sprintf("Hello, %s!", name), nil - }) + })) // Initialization ctx := context.Background() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() instanceID := api.InstanceID("TERMINATE_IF_RUNNING_OR_COMPLETED") reuseIdPolicy := &api.OrchestrationIdReusePolicy{ @@ -1288,7 +1419,8 @@ func Test_SingleActivity_ReuseInstanceIDTerminate(t *testing.T) { id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) require.NoError(t, err) // wait orchestration to start - client.WaitForOrchestrationStart(ctx, id) + _, err = client.WaitForOrchestrationStart(ctx, id) + require.NoError(t, err) pivotTime := time.Now() // schedule again, it should terminate the first orchestration and start a new one id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), 
api.WithOrchestrationIdReusePolicy(reuseIdPolicy)) @@ -1307,7 +1439,7 @@ func Test_SingleActivity_ReuseInstanceIDTerminate(t *testing.T) { func Test_SingleActivity_ReuseInstanceIDError(t *testing.T) { // Registration r := task.NewTaskRegistry() - r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { var input string if err := ctx.GetInput(&input); err != nil { return nil, err @@ -1315,26 +1447,30 @@ func Test_SingleActivity_ReuseInstanceIDError(t *testing.T) { var output string err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) return output, err - }) - r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + })) + require.NoError(t, r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { var name string if err := ctx.GetInput(&name); err != nil { return nil, err } return fmt.Sprintf("Hello, %s!", name), nil - }) + })) // Initialization ctx := context.Background() client, worker := initTaskHubWorker(ctx, r) - defer worker.Shutdown(ctx) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() instanceID := api.InstanceID("ERROR_IF_RUNNING_OR_COMPLETED") // Run the orchestration - id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) + _, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) require.NoError(t, err) - id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id)) + _, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(instanceID)) if assert.Error(t, err) { assert.Contains(t, err.Error(), "orchestration instance already exists") } diff --git a/tests/task_executor_test.go 
b/tests/task_executor_test.go index 864bfb6d..9e2d1d2e 100644 --- a/tests/task_executor_test.go +++ b/tests/task_executor_test.go @@ -16,11 +16,13 @@ import ( func Test_Executor_WaitForEventSchedulesTimer(t *testing.T) { timerDuration := 5 * time.Second r := task.NewTaskRegistry() - r.AddOrchestratorN("Orchestration", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("Orchestration", func(ctx *task.OrchestrationContext) (any, error) { var value int - ctx.WaitForSingleEvent("MyEvent", timerDuration).Await(&value) + if err := ctx.WaitForSingleEvent("MyEvent", timerDuration).Await(&value); err != nil { + return nil, err + } return value, nil - }) + })) iid := api.InstanceID("abc123") startEvent := helpers.NewOrchestratorStartedEvent() // determines the current orchestrator time @@ -46,11 +48,13 @@ func Test_Executor_WaitForEventSchedulesTimer(t *testing.T) { // The correct behavior is that a suspended orchestration should not return any actions. func Test_Executor_SuspendStopsAllActions(t *testing.T) { r := task.NewTaskRegistry() - r.AddOrchestratorN("SuspendResumeOrchestration", func(ctx *task.OrchestrationContext) (any, error) { + require.NoError(t, r.AddOrchestratorN("SuspendResumeOrchestration", func(ctx *task.OrchestrationContext) (any, error) { var value int - ctx.WaitForSingleEvent("MyEvent", 5*time.Second).Await(&value) + if err := ctx.WaitForSingleEvent("MyEvent", 5*time.Second).Await(&value); err != nil { + return nil, err + } return value, nil - }) + })) executor := task.NewTaskExecutor(r) iid := api.InstanceID("abc123")