diff --git a/pkg/lockservice/lock_table_remote.go b/pkg/lockservice/lock_table_remote.go
index 0c1fe42dc68fc..d63bd83bee9ef 100644
--- a/pkg/lockservice/lock_table_remote.go
+++ b/pkg/lockservice/lock_table_remote.go
@@ -33,6 +33,15 @@ import (
 	"go.uber.org/zap"
 )
 
+var (
+	// Backoff parameters for remote lock table retry loops (unlock, getLock).
+	// These prevent CPU spinning when the remote lock service is temporarily
+	// unavailable.
+	remoteRetryInitialBackoff = 100 * time.Millisecond
+	remoteRetryMaxBackoff     = 5 * time.Second
+	remoteRetryMaxDuration    = 30 * time.Second
+)
+
 // remoteLockTable the lock corresponding to the Table is managed by a remote LockTable.
 // And the remoteLockTable acts as a proxy for this LockTable locally.
 type remoteLockTable struct {
@@ -150,6 +159,8 @@ func (l *remoteLockTable) unlock(
 		txn,
 		l.bind,
 	)
+	backoff := remoteRetryInitialBackoff
+	deadline := time.Now().Add(remoteRetryMaxDuration)
 	for {
 		err := l.doUnlock(txn, commitTS, mutations...)
 		if err == nil {
@@ -171,6 +182,18 @@
 		if err := l.handleError(err, false); err == nil {
 			return
 		}
+		if time.Now().After(deadline) {
+			l.logger.Error("unlock retry budget exhausted, giving up",
+				zap.Uint64("table-id", l.bind.Table),
+				zap.String("txn", hex.EncodeToString(txn.txnID)),
+				zap.Duration("budget", remoteRetryMaxDuration))
+			return
+		}
+		time.Sleep(backoff)
+		backoff *= 2
+		if backoff > remoteRetryMaxBackoff {
+			backoff = remoteRetryMaxBackoff
+		}
 	}
 }
 
@@ -178,6 +201,8 @@ func (l *remoteLockTable) getLock(
 	key []byte,
 	txn pb.WaitTxn,
 	fn func(Lock)) {
+	backoff := remoteRetryInitialBackoff
+	deadline := time.Now().Add(remoteRetryMaxDuration)
 	for {
 		lock, ok, err := l.doGetLock(key, txn)
 		if err == nil {
@@ -192,6 +217,18 @@
 		if err = l.handleError(err, false); err == nil {
 			return
 		}
+		if time.Now().After(deadline) {
+			l.logger.Error("getLock retry budget exhausted, giving up",
+				zap.Uint64("table-id", l.bind.Table),
+				zap.String("txn", hex.EncodeToString(txn.TxnID)),
+				zap.Duration("budget", remoteRetryMaxDuration))
+			return
+		}
+		time.Sleep(backoff)
+		backoff *= 2
+		if backoff > remoteRetryMaxBackoff {
+			backoff = remoteRetryMaxBackoff
+		}
 	}
 }
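
Both loops above share one pattern: capped exponential backoff under a total time budget, with the deadline checked before each sleep. A minimal standalone sketch of that pattern outside the diff (retryWithBudget and op are illustrative names, not part of this patch):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBudget retries op, doubling the sleep after each failure up to
// maxBackoff, and gives up once the total budget has elapsed. This mirrors
// the loops added to unlock/getLock above.
func retryWithBudget(op func() error, initial, maxBackoff, budget time.Duration) error {
	backoff := initial
	deadline := time.Now().Add(budget)
	for {
		err := op()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("retry budget exhausted: %w", err)
		}
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

func main() {
	n := 0
	err := retryWithBudget(func() error {
		n++
		if n < 3 {
			return errors.New("transient")
		}
		return nil
	}, 100*time.Millisecond, 5*time.Second, 30*time.Second)
	fmt.Println(n, err) // 3 <nil>
}

Because the deadline is tested before sleeping, the loop can overshoot the budget by at most one maxBackoff interval plus the cost of the final attempt.
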
diff --git a/pkg/lockservice/lock_table_remote_test.go b/pkg/lockservice/lock_table_remote_test.go
index e00c48eb41829..f778904d31701 100644
--- a/pkg/lockservice/lock_table_remote_test.go
+++ b/pkg/lockservice/lock_table_remote_test.go
@@ -212,6 +212,127 @@ func TestUnlockRemoteWithRetry(t *testing.T) {
 	)
 }
 
+func TestUnlockRemoteStopsWhenRetryBudgetExhausted(t *testing.T) {
+	oldInitial := remoteRetryInitialBackoff
+	oldMaxBackoff := remoteRetryMaxBackoff
+	oldMaxDuration := remoteRetryMaxDuration
+	remoteRetryInitialBackoff = time.Millisecond
+	remoteRetryMaxBackoff = time.Millisecond
+	remoteRetryMaxDuration = -time.Millisecond
+	defer func() {
+		remoteRetryInitialBackoff = oldInitial
+		remoteRetryMaxBackoff = oldMaxBackoff
+		remoteRetryMaxDuration = oldMaxDuration
+	}()
+
+	n := 0
+	runRemoteLockTableTests(
+		t,
+		pb.LockTable{ServiceID: "s1"},
+		func(s Server) {
+			s.RegisterMethodHandler(
+				pb.Method_Unlock,
+				func(
+					ctx context.Context,
+					cancel context.CancelFunc,
+					req *pb.Request,
+					resp *pb.Response,
+					cs morpc.ClientSession) {
+					n++
+					writeResponse(getLogger(""), cancel, resp, moerr.NewRPCTimeout(ctx), cs)
+				},
+			)
+			s.RegisterMethodHandler(
+				pb.Method_GetBind,
+				func(
+					ctx context.Context,
+					cancel context.CancelFunc,
+					req *pb.Request,
+					resp *pb.Response,
+					cs morpc.ClientSession) {
+					resp.GetBind.LockTable = pb.LockTable{
+						ServiceID: "s1",
+						Valid:     true,
+					}
+					writeResponse(getLogger(""), cancel, resp, nil, cs)
+				},
+			)
+		},
+		func(l *remoteLockTable, s Server) {
+			txnID := []byte("txn1")
+			txn := newActiveTxn(txnID, string(txnID), newFixedSlicePool(32), "")
+			l.unlock(txn, nil, timestamp.Timestamp{})
+			assert.Equal(t, 1, n)
+			reuse.Free(txn, nil)
+		},
+		func(lt pb.LockTable) {},
+	)
+}
+
+func TestGetLockRemoteWithRetry(t *testing.T) {
+	oldInitial := remoteRetryInitialBackoff
+	oldMaxBackoff := remoteRetryMaxBackoff
+	oldMaxDuration := remoteRetryMaxDuration
+	remoteRetryInitialBackoff = time.Millisecond
+	remoteRetryMaxBackoff = time.Millisecond
+	remoteRetryMaxDuration = time.Second
+	defer func() {
+		remoteRetryInitialBackoff = oldInitial
+		remoteRetryMaxBackoff = oldMaxBackoff
+		remoteRetryMaxDuration = oldMaxDuration
+	}()
+
+	n := 0
+	called := false
+	runRemoteLockTableTests(
+		t,
+		pb.LockTable{ServiceID: "s1"},
+		func(s Server) {
+			s.RegisterMethodHandler(
+				pb.Method_GetTxnLock,
+				func(
+					ctx context.Context,
+					cancel context.CancelFunc,
+					req *pb.Request,
+					resp *pb.Response,
+					cs morpc.ClientSession) {
+					n++
+					if n == 1 {
+						writeResponse(getLogger(""), cancel, resp, moerr.NewRPCTimeout(ctx), cs)
+						return
+					}
+					resp.GetTxnLock.Value = int32(pb.Granularity_Row)
+					writeResponse(getLogger(""), cancel, resp, nil, cs)
+				},
+			)
+			s.RegisterMethodHandler(
+				pb.Method_GetBind,
+				func(
+					ctx context.Context,
+					cancel context.CancelFunc,
+					req *pb.Request,
+					resp *pb.Response,
+					cs morpc.ClientSession) {
+					resp.GetBind.LockTable = pb.LockTable{
+						ServiceID: "s1",
+						Valid:     true,
+					}
+					writeResponse(getLogger(""), cancel, resp, nil, cs)
+				},
+			)
+		},
+		func(l *remoteLockTable, s Server) {
+			l.getLock([]byte("row1"), pb.WaitTxn{TxnID: []byte("txn1")}, func(lock Lock) {
+				called = true
+				assert.Equal(t, byte(pb.Granularity_Row), lock.value)
+			})
+			assert.True(t, called)
+			assert.Equal(t, 2, n)
+		},
+		func(lt pb.LockTable) {},
+	)
+}
+
 func TestRemoteWithBindChanged(t *testing.T) {
 	newBind := pb.LockTable{
 		ServiceID: "s2",
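
The tests above (and the lockop tests below) all override package-level knobs and restore them in a defer. A tiny generic helper, hypothetical and not part of the patch, that keeps each save/restore pair in one place:

package main

import (
	"fmt"
	"time"
)

var remoteRetryMaxDuration = 30 * time.Second // stand-in for the real knob

// setForTest overrides *p with v and returns a func that restores the old
// value; callers defer the result.
func setForTest[T any](p *T, v T) (restore func()) {
	old := *p
	*p = v
	return func() { *p = old }
}

func main() {
	func() {
		defer setForTest(&remoteRetryMaxDuration, time.Second)()
		fmt.Println(remoteRetryMaxDuration) // 1s inside the "test"
	}()
	fmt.Println(remoteRetryMaxDuration) // 30s restored afterwards
}
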
diff --git a/pkg/sql/colexec/lockop/lock_op.go b/pkg/sql/colexec/lockop/lock_op.go
index da921ba981ba4..439a7f3e02b88 100644
--- a/pkg/sql/colexec/lockop/lock_op.go
+++ b/pkg/sql/colexec/lockop/lock_op.go
@@ -869,8 +869,12 @@ func waitToRetryLock(ctx context.Context, wait time.Duration) bool {
 }
 
 func getRetryWaitDuration(err error, retryState *lockRetryState) (time.Duration, bool) {
+	// When backend budget is disabled, only non-backend errors may retry.
 	if defaultMaxWaitTimeOnRetryBackendLock <= 0 {
-		return 0, false
+		if isBoundedRetryLockError(err) {
+			return 0, false
+		}
+		return defaultWaitTimeOnRetryLock, true
 	}
 
 	now := time.Now()
diff --git a/pkg/sql/colexec/lockop/lock_op_test.go b/pkg/sql/colexec/lockop/lock_op_test.go
index 1245109f4882c..ba2ea8a4aa7bf 100644
--- a/pkg/sql/colexec/lockop/lock_op_test.go
+++ b/pkg/sql/colexec/lockop/lock_op_test.go
@@ -404,6 +404,58 @@ func TestLockWithRetryFailsFastWhenBackendRetryBudgetDisabled(t *testing.T) {
 	require.Less(t, time.Since(start), defaultWaitTimeOnRetryLock)
 }
 
+// TestLockWithRetryStillRetriesBindChangeWhenBackendBudgetDisabled verifies that
+// disabling the backend retry budget (setting it to 0) only prevents retries for
+// backend availability errors, not for normal retryable errors like
+// ErrLockTableBindChanged.
+func TestLockWithRetryStillRetriesBindChangeWhenBackendBudgetDisabled(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	lockSvc := mock_lock.NewMockLockService(ctrl)
+	txnOp := mock_frontend.NewMockTxnOperator(ctrl)
+
+	oldWait := defaultWaitTimeOnRetryLock
+	oldBudget := defaultMaxWaitTimeOnRetryBackendLock
+	defaultWaitTimeOnRetryLock = 50 * time.Millisecond
+	defaultMaxWaitTimeOnRetryBackendLock = 0
+	defer func() {
+		defaultWaitTimeOnRetryLock = oldWait
+		defaultMaxWaitTimeOnRetryBackendLock = oldBudget
+	}()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+	defer cancel()
+
+	gomock.InOrder(
+		// First call: ErrLockTableBindChanged should still be retried
+		lockSvc.EXPECT().
+			Lock(ctx, uint64(1), gomock.Nil(), []byte("txn1"), lock.LockOptions{}).
+			Return(lock.Result{}, moerr.NewLockTableBindChangedNoCtx()),
+		txnOp.EXPECT().HasLockTable(uint64(1)).Return(false),
+		// Second call: succeed
+		lockSvc.EXPECT().
+			Lock(ctx, uint64(1), gomock.Nil(), []byte("txn1"), lock.LockOptions{}).
+			Return(lock.Result{Timestamp: timestamp.Timestamp{PhysicalTime: 1}}, nil),
+	)
+
+	result, err := lockWithRetry(
+		ctx,
+		lockSvc,
+		1,
+		nil,
+		[]byte("txn1"),
+		lock.LockOptions{},
+		txnOp,
+		nil,
+		nil,
+		LockOptions{},
+		types.Type{},
+	)
+	require.NoError(t, err)
+	require.Equal(t, int64(1), result.Timestamp.PhysicalTime)
+}
+
 func TestLockWithRetryRetriesInsideLoopAndReturnsSecondResult(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
diff --git a/pkg/txn/service/service.go b/pkg/txn/service/service.go
index 02d8af0776ab3..4e9bf905c235c 100644
--- a/pkg/txn/service/service.go
+++ b/pkg/txn/service/service.go
@@ -238,6 +238,11 @@ func (s *service) parallelSendWithRetry(
 	ctx context.Context,
 	requests []txn.TxnRequest,
 	ignoreTxnErrorCodes map[uint16]struct{}) *rpc.SendResult {
+	const (
+		initialBackoff = 100 * time.Millisecond
+		maxBackoff     = time.Second
+	)
+	backoff := initialBackoff
 	for {
 		select {
 		case <-ctx.Done():
@@ -248,8 +253,22 @@
 		if err != nil {
 			err = moerr.AttachCause(ctx, err)
 			util.LogTxnSendRequestsFailed(s.logger, requests, err)
+			timer := time.NewTimer(backoff)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return nil
+			case <-timer.C:
+			}
+			if backoff < maxBackoff {
+				backoff *= 2
+				if backoff > maxBackoff {
+					backoff = maxBackoff
+				}
+			}
 			continue
 		}
+		backoff = initialBackoff
 		util.LogTxnReceivedResponses(s.logger, result.Responses)
 		hasError := false
 		for _, resp := range result.Responses {
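
Unlike the lockservice loops, which block in time.Sleep, this retry waits on a timer inside a select, so context cancellation interrupts the backoff immediately; that behavior is what the two tests below pin down. The same idea as a reusable sketch (sleepCtx is a hypothetical name, not part of the patch):

package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx waits for d or until ctx is canceled, whichever comes first,
// and reports whether the full duration elapsed. time.NewTimer plus a
// select is the same shape as the wait added to parallelSendWithRetry.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel()
	}()
	fmt.Println(sleepCtx(ctx, time.Hour)) // false: canceled long before the hour
}
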
diff --git a/pkg/txn/service/service_test.go b/pkg/txn/service/service_test.go
index b05b9bca4e619..766a3b651eef4 100644
--- a/pkg/txn/service/service_test.go
+++ b/pkg/txn/service/service_test.go
@@ -23,8 +23,21 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
 	"github.com/matrixorigin/matrixone/pkg/pb/txn"
+	"github.com/matrixorigin/matrixone/pkg/txn/rpc"
 )
 
+type retryTestSender struct {
+	send func(context.Context, []txn.TxnRequest) (*rpc.SendResult, error)
+}
+
+func (s *retryTestSender) Send(ctx context.Context, requests []txn.TxnRequest) (*rpc.SendResult, error) {
+	return s.send(ctx, requests)
+}
+
+func (s *retryTestSender) Close() error {
+	return nil
+}
+
 func TestGCZombie(t *testing.T) {
 	sender := NewTestSender()
 	defer func() {
@@ -142,3 +155,54 @@ func Test_parallelSendWithRetry(t *testing.T) {
 	sender.action = "return_err_and_reset"
 	s.parallelSendWithRetry(ctx, nil, nil)
 }
+
+func Test_parallelSendWithRetryStopsDuringBackoffOnContextCancel(t *testing.T) {
+	firstSend := make(chan struct{})
+	sender := &retryTestSender{
+		send: func(ctx context.Context, requests []txn.TxnRequest) (*rpc.SendResult, error) {
+			select {
+			case <-firstSend:
+				return &rpc.SendResult{}, nil
+			default:
+				close(firstSend)
+				return nil, moerr.NewInternalErrorNoCtx("return error")
+			}
+		},
+	}
+	s := NewTestTxnServiceWithLogAndZombie(t, 1, sender, NewTestClock(1), nil, time.Millisecond*3).(*service)
+	assert.NoError(t, s.Start())
+	defer func() {
+		assert.NoError(t, s.Close(false))
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	resultC := make(chan *rpc.SendResult, 1)
+	go func() {
+		resultC <- s.parallelSendWithRetry(ctx, nil, nil)
+	}()
+
+	<-firstSend
+	cancel()
+	assert.Nil(t, <-resultC)
+}
+
+func Test_parallelSendWithRetryReturnsNilWhenContextAlreadyCanceled(t *testing.T) {
+	called := false
+	sender := &retryTestSender{
+		send: func(ctx context.Context, requests []txn.TxnRequest) (*rpc.SendResult, error) {
+			called = true
+			return &rpc.SendResult{}, nil
+		},
+	}
+	s := NewTestTxnServiceWithLogAndZombie(t, 1, sender, NewTestClock(1), nil, time.Millisecond*3).(*service)
+	assert.NoError(t, s.Start())
+	defer func() {
+		assert.NoError(t, s.Close(false))
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	assert.Nil(t, s.parallelSendWithRetry(ctx, nil, nil))
+	assert.False(t, called)
+}