Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres:13.18
image: postgres:16
env:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
Expand All @@ -50,21 +50,21 @@ jobs:

steps:
# Checks-out your repository under $GITHUB_WORKSPACE
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
fetch-depth: 1
submodules: true

- name: Setup Go environment
uses: actions/setup-go@v3.3.0
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}

- name: Install dependencies
run: go get .

- name: Install Protoc
uses: arduino/setup-protoc@v2
uses: arduino/setup-protoc@v3
with:
version: ${{ env.PROTOC_VERSION }}

Expand All @@ -76,6 +76,12 @@ jobs:
- name: Generate grpc code
run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto

- name: Run go vet
run: go vet ./...

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v7

- name: Run integration tests
env:
POSTGRES_ENABLED: "true"
Expand Down
23 changes: 23 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
version: "2"

run:
timeout: 5m

linters:
enable:
- errcheck
- govet
- ineffassign
- staticcheck
- unused
- gocritic
- errorlint
settings:
errorlint:
errorf: true
asserts: true
comparison: true
exclusions:
paths:
- internal/protos
- tests/mocks
2 changes: 1 addition & 1 deletion backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ loop:
}

// mustEmbedUnimplementedTaskHubSidecarServiceServer implements protos.TaskHubSidecarServiceServer
func (grpcExecutor) mustEmbedUnimplementedTaskHubSidecarServiceServer() {
func (grpcExecutor) mustEmbedUnimplementedTaskHubSidecarServiceServer() { //nolint:unused
}

func createGetInstanceResponse(req *protos.GetInstanceRequest, metadata *api.OrchestrationMetadata) *protos.GetInstanceResponse {
Expand Down
23 changes: 14 additions & 9 deletions backend/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -143,21 +144,24 @@ func (p *orchestratorProcessor) AbandonWorkItem(ctx context.Context, wi WorkItem

func (w *orchestratorProcessor) applyWorkItem(ctx context.Context, wi *OrchestrationWorkItem) (context.Context, trace.Span, bool) {
// Ignore work items for orchestrations that are completed or are in a corrupted state.
if !wi.State.IsValid() {
switch {
case !wi.State.IsValid():
w.logger.Warnf("%v: orchestration state is invalid; dropping work item", wi.InstanceID)
return nil, nil, false
} else if wi.State.IsCompleted() {
case wi.State.IsCompleted():
w.logger.Warnf("%v: orchestration already completed; dropping work item", wi.InstanceID)
return nil, nil, false
} else if len(wi.NewEvents) == 0 {
case len(wi.NewEvents) == 0:
w.logger.Warnf("%v: the work item had no events!", wi.InstanceID)
}

// The orchestrator started event is used primarily for updating the current time as reported
// by the orchestration context APIs.
wi.State.AddEvent(helpers.NewOrchestratorStartedEvent())
if err := wi.State.AddEvent(helpers.NewOrchestratorStartedEvent()); err != nil {
w.logger.Warnf("%v: failed to add orchestrator started event: %v", wi.InstanceID, err)
}

// Each orchestration instance gets its own distributed tracing span. However, the implementation of
// Each orchestration instancegets its own distributed tracing span. However, the implementation of
// endOrchestratorSpan will "cancel" the span mark the span as "unsampled" if the orchestration isn't
// complete. This is part of the strategy for producing one span for the entire orchestration execution,
// which isn't something that's natively supported by OTel today.
Expand All @@ -169,7 +173,7 @@ func (w *orchestratorProcessor) applyWorkItem(ctx context.Context, wi *Orchestra
added := 0
for _, e := range wi.NewEvents {
if err := wi.State.AddEvent(e); err != nil {
if err == ErrDuplicateEvent {
if errors.Is(err, ErrDuplicateEvent) {
w.logger.Warnf("%v: dropping duplicate event: %v", wi.InstanceID, e)
} else {
w.logger.Warnf("%v: dropping event: %v, %v", wi.InstanceID, e, err)
Expand Down Expand Up @@ -270,7 +274,8 @@ func (w *orchestratorProcessor) startOrResumeOrchestratorSpan(ctx context.Contex
}

func (w *orchestratorProcessor) endOrchestratorSpan(ctx context.Context, wi *OrchestrationWorkItem, span trace.Span, continuedAsNew bool) {
if wi.State.IsCompleted() {
switch {
case wi.State.IsCompleted():
if fd, err := wi.State.FailureDetails(); err == nil {
span.SetStatus(codes.Error, fd.ErrorMessage)
}
Expand All @@ -280,12 +285,12 @@ func (w *orchestratorProcessor) endOrchestratorSpan(ctx context.Context, wi *Orc
})
addNotableEventsToSpan(wi.State.OldEvents(), span)
addNotableEventsToSpan(wi.State.NewEvents(), span)
} else if continuedAsNew {
case continuedAsNew:
span.SetAttributes(attribute.KeyValue{
Key: "durabletask.runtime_status",
Value: attribute.StringValue(helpers.ToRuntimeStatusString(protos.OrchestrationStatus_ORCHESTRATION_STATUS_CONTINUED_AS_NEW)),
})
} else {
default:
// Cancel the span - we want to publish it only when an orchestration
// completes or when it continue-as-new's.
helpers.CancelSpan(span)
Expand Down
37 changes: 19 additions & 18 deletions backend/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewPostgresOptions(host string, port uint16, database string, user string,
if err != nil {
panic(fmt.Errorf("failed to parse the postgres connection string: %w", err))
}
conf.ConnConfig.Config.ConnectTimeout = 2 * time.Minute
conf.ConnConfig.ConnectTimeout = 2 * time.Minute
conf.MaxConnLifetime = 2 * time.Minute
conf.MaxConnIdleTime = 2 * time.Minute
conf.MaxConns = 1
Expand Down Expand Up @@ -138,9 +138,9 @@ func (be *postgresBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi
if err != nil {
return err
}
defer tx.Rollback(ctx)
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

var visibleTime *time.Time = nil
var visibleTime*time.Time = nil
if delay := wi.GetAbandonDelay(); delay > 0 {
t := time.Now().UTC().Add(delay)
visibleTime = &t
Expand Down Expand Up @@ -199,7 +199,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi
if err != nil {
return err
}
defer tx.Rollback(ctx)
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

now := time.Now().UTC()

Expand All @@ -219,7 +219,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi
continue
}
isCreated = true
sqlSB.WriteString(fmt.Sprintf("CreatedTime = $%d, Input = $%d, ", currIndex, currIndex+1))
fmt.Fprintf(&sqlSB, "CreatedTime = $%d, Input = $%d, ", currIndex, currIndex+1)
currIndex += 2
sqlUpdateArgs = append(sqlUpdateArgs, e.Timestamp.AsTime())
sqlUpdateArgs = append(sqlUpdateArgs, es.Input.GetValue())
Expand All @@ -229,7 +229,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi
continue
}
isCompleted = true
sqlSB.WriteString(fmt.Sprintf("CompletedTime = $%d, Output = $%d, FailureDetails = $%d, ", currIndex, currIndex+1, currIndex+2))
fmt.Fprintf(&sqlSB, "CompletedTime = $%d, Output = $%d, FailureDetails = $%d, ", currIndex, currIndex+1, currIndex+2)
currIndex += 3
sqlUpdateArgs = append(sqlUpdateArgs, now)
sqlUpdateArgs = append(sqlUpdateArgs, ec.Result.GetValue())
Expand All @@ -247,14 +247,13 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi
}

if wi.State.CustomStatus != nil {
sqlSB.WriteString(fmt.Sprintf("CustomStatus = $%d, ", currIndex))
fmt.Fprintf(&sqlSB, "CustomStatus = $%d, ", currIndex)
currIndex++
sqlUpdateArgs = append(sqlUpdateArgs, wi.State.CustomStatus.Value)
}

// TODO: Support for stickiness, which would extend the LockExpiration
sqlSB.WriteString(fmt.Sprintf("RuntimeStatus = $%d, LastUpdatedTime = $%d, LockExpiration = NULL WHERE InstanceID = $%d AND LockedBy = $%d", currIndex, currIndex+1, currIndex+2, currIndex+3))
currIndex += 4
fmt.Fprintf(&sqlSB, "RuntimeStatus = $%d, LastUpdatedTime = $%d, LockExpiration = NULL WHERE InstanceID = $%d AND LockedBy = $%d", currIndex, currIndex+1, currIndex+2, currIndex+3)
sqlUpdateArgs = append(sqlUpdateArgs, helpers.ToRuntimeStatusString(wi.State.RuntimeStatus()), now, string(wi.InstanceID), wi.LockedBy)

result, err := tx.Exec(ctx, sqlSB.String(), sqlUpdateArgs...)
Expand Down Expand Up @@ -282,7 +281,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi
builder := strings.Builder{}
builder.WriteString("INSERT INTO History (InstanceID, SequenceNumber, EventPayload) VALUES ")
for i := 0; i < newHistoryCount; i++ {
builder.WriteString(fmt.Sprintf("($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3))
fmt.Fprintf(&builder, "($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3)
if i < newHistoryCount-1 {
builder.WriteString(", ")
}
Expand Down Expand Up @@ -313,7 +312,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi
builder := strings.Builder{}
builder.WriteString("INSERT INTO NewTasks (InstanceID, EventPayload) VALUES ")
for i := 0; i < newActivityCount; i++ {
builder.WriteString(fmt.Sprintf("($%d, $%d)", 2*i+1, 2*i+2))
fmt.Fprintf(&builder, "($%d, $%d)", 2*i+1, 2*i+2)
if i < newActivityCount-1 {
builder.WriteString(", ")
}
Expand Down Expand Up @@ -342,7 +341,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi
builder := strings.Builder{}
builder.WriteString("INSERT INTO NewEvents (InstanceID, EventPayload, VisibleTime) VALUES ")
for i := 0; i < newEventCount; i++ {
builder.WriteString(fmt.Sprintf("($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3))
fmt.Fprintf(&builder, "($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3)
if i < newEventCount-1 {
builder.WriteString(", ")
}
Expand Down Expand Up @@ -431,7 +430,7 @@ func (be *postgresBackend) CreateOrchestrationInstance(ctx context.Context, e *b
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.Rollback(ctx)
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

var instanceID string
if instanceID, err = be.createOrchestrationInstanceInternal(ctx, e, tx, opts...); errors.Is(err, api.ErrIgnoreInstance) {
Expand Down Expand Up @@ -480,7 +479,9 @@ func (be *postgresBackend) createOrchestrationInstanceInternal(ctx context.Conte
policy := &protos.OrchestrationIdReusePolicy{}

for _, opt := range opts {
opt(policy)
if err := opt(policy); err != nil {
return "", err
}
}

rows, err := insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent)
Expand Down Expand Up @@ -765,10 +766,10 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

now := time.Now().UTC()
newLockExpiration := now.Add(be.options.OrchestrationLockTimeout)
newLockExpiration:= now.Add(be.options.OrchestrationLockTimeout)

// Place a lock on an orchestration instance that has new events that are ready to be executed.
row := tx.QueryRow(
Expand Down Expand Up @@ -913,7 +914,7 @@ func (be *postgresBackend) CompleteActivityWorkItem(ctx context.Context, wi *bac
if err != nil {
return err
}
defer tx.Rollback(ctx)
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

bytes, err := backend.MarshalHistoryEvent(wi.Result)
if err != nil {
Expand Down Expand Up @@ -978,7 +979,7 @@ func (be *postgresBackend) PurgeOrchestrationState(ctx context.Context, id api.I
if err != nil {
return err
}
defer tx.Rollback(ctx)
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

if err := be.cleanupOrchestrationStateInternal(ctx, tx, id, true); err != nil {
return err
Expand Down
Loading
Loading