From 69a727f014e9f7d3dc50f6025e97345b031b4af9 Mon Sep 17 00:00:00 2001
From: XuPeng-SH
Date: Thu, 9 Apr 2026 19:57:15 +0800
Subject: [PATCH 1/2] retry: add backoff and bounds to unbounded retry loops

Three retry patterns in the codebase spin without sleep or upper bound,
causing CPU waste and potential cascading failures when remote services
are temporarily unavailable.

1. lock_table_remote.go unlock()/getLock(): add exponential backoff
   (100ms->5s) with a 30s deadline to prevent tight-loop CPU spinning
   when the remote lock service is unreachable.

2. txn/service/service.go parallelSendWithRetry(): add exponential
   backoff (100ms->1s) between Send failures, reset on success. The loop
   is already bounded by ctx.Done(), but without a sleep it hammers the
   sender unnecessarily during outages.

3. lockop/lock_op.go getRetryWaitDuration(): fix the budget<=0 guard
   from PR #24105 so that disabling the backend retry budget (budget=0)
   only prevents retries for backend availability errors
   (ErrBackendCannotConnect, ErrBackendClosed, etc.), not for all
   errors. Normal retryable errors like ErrLockTableBindChanged still
   retry as before.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 pkg/lockservice/lock_table_remote.go   | 37 ++++++++++++++++++
 pkg/sql/colexec/lockop/lock_op.go      |  6 ++-
 pkg/sql/colexec/lockop/lock_op_test.go | 52 ++++++++++++++++++++++++++
 pkg/txn/service/service.go             | 19 ++++++++++
 4 files changed, 113 insertions(+), 1 deletion(-)

diff --git a/pkg/lockservice/lock_table_remote.go b/pkg/lockservice/lock_table_remote.go
index 0c1fe42dc68fc..d63bd83bee9ef 100644
--- a/pkg/lockservice/lock_table_remote.go
+++ b/pkg/lockservice/lock_table_remote.go
@@ -33,6 +33,15 @@ import (
 	"go.uber.org/zap"
 )
 
+var (
+	// Backoff parameters for remote lock table retry loops (unlock, getLock).
+	// These prevent CPU spinning when the remote lock service is temporarily
+	// unavailable.
+	remoteRetryInitialBackoff = 100 * time.Millisecond
+	remoteRetryMaxBackoff     = 5 * time.Second
+	remoteRetryMaxDuration    = 30 * time.Second
+)
+
 // remoteLockTable the lock corresponding to the Table is managed by a remote LockTable.
 // And the remoteLockTable acts as a proxy for this LockTable locally.
 type remoteLockTable struct {
@@ -150,6 +159,8 @@ func (l *remoteLockTable) unlock(
 		txn,
 		l.bind,
 	)
+	backoff := remoteRetryInitialBackoff
+	deadline := time.Now().Add(remoteRetryMaxDuration)
 	for {
 		err := l.doUnlock(txn, commitTS, mutations...)
 		if err == nil {
@@ -171,6 +182,18 @@
 		if err := l.handleError(err, false); err == nil {
 			return
 		}
+		if time.Now().After(deadline) {
+			l.logger.Error("unlock retry budget exhausted, giving up",
+				zap.Uint64("table-id", l.bind.Table),
+				zap.String("txn", hex.EncodeToString(txn.txnID)),
+				zap.Duration("budget", remoteRetryMaxDuration))
+			return
+		}
+		time.Sleep(backoff)
+		backoff *= 2
+		if backoff > remoteRetryMaxBackoff {
+			backoff = remoteRetryMaxBackoff
+		}
 	}
 }
 
@@ -178,6 +201,8 @@ func (l *remoteLockTable) getLock(
 	key []byte,
 	txn pb.WaitTxn,
 	fn func(Lock)) {
+	backoff := remoteRetryInitialBackoff
+	deadline := time.Now().Add(remoteRetryMaxDuration)
 	for {
 		lock, ok, err := l.doGetLock(key, txn)
 		if err == nil {
@@ -192,6 +217,18 @@
 		if err = l.handleError(err, false); err == nil {
 			return
 		}
+		if time.Now().After(deadline) {
+			l.logger.Error("getLock retry budget exhausted, giving up",
+				zap.Uint64("table-id", l.bind.Table),
+				zap.String("txn", hex.EncodeToString(txn.TxnID)),
+				zap.Duration("budget", remoteRetryMaxDuration))
+			return
+		}
+		time.Sleep(backoff)
+		backoff *= 2
+		if backoff > remoteRetryMaxBackoff {
+			backoff = remoteRetryMaxBackoff
+		}
 	}
 }
 
diff --git a/pkg/sql/colexec/lockop/lock_op.go b/pkg/sql/colexec/lockop/lock_op.go
index da921ba981ba4..439a7f3e02b88 100644
--- a/pkg/sql/colexec/lockop/lock_op.go
+++ b/pkg/sql/colexec/lockop/lock_op.go
@@ -869,8 +869,12 @@ func waitToRetryLock(ctx context.Context, wait time.Duration) bool {
 }
 
 func getRetryWaitDuration(err error, retryState *lockRetryState) (time.Duration, bool) {
+	// When backend budget is disabled, only non-backend errors may retry.
 	if defaultMaxWaitTimeOnRetryBackendLock <= 0 {
-		return 0, false
+		if isBoundedRetryLockError(err) {
+			return 0, false
+		}
+		return defaultWaitTimeOnRetryLock, true
 	}
 
 	now := time.Now()
diff --git a/pkg/sql/colexec/lockop/lock_op_test.go b/pkg/sql/colexec/lockop/lock_op_test.go
index 1245109f4882c..ba2ea8a4aa7bf 100644
--- a/pkg/sql/colexec/lockop/lock_op_test.go
+++ b/pkg/sql/colexec/lockop/lock_op_test.go
@@ -404,6 +404,58 @@ func TestLockWithRetryFailsFastWhenBackendRetryBudgetDisabled(t *testing.T) {
 	require.Less(t, time.Since(start), defaultWaitTimeOnRetryLock)
 }
 
+// TestLockWithRetryStillRetriesBindChangeWhenBackendBudgetDisabled verifies that
+// disabling the backend retry budget (setting it to 0) only prevents retries for
+// backend availability errors, not for normal retryable errors like
+// ErrLockTableBindChanged.
+func TestLockWithRetryStillRetriesBindChangeWhenBackendBudgetDisabled(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	lockSvc := mock_lock.NewMockLockService(ctrl)
+	txnOp := mock_frontend.NewMockTxnOperator(ctrl)
+
+	oldWait := defaultWaitTimeOnRetryLock
+	oldBudget := defaultMaxWaitTimeOnRetryBackendLock
+	defaultWaitTimeOnRetryLock = 50 * time.Millisecond
+	defaultMaxWaitTimeOnRetryBackendLock = 0
+	defer func() {
+		defaultWaitTimeOnRetryLock = oldWait
+		defaultMaxWaitTimeOnRetryBackendLock = oldBudget
+	}()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+	defer cancel()
+
+	gomock.InOrder(
+		// First call: ErrLockTableBindChanged should still be retried
+		lockSvc.EXPECT().
+			Lock(ctx, uint64(1), gomock.Nil(), []byte("txn1"), lock.LockOptions{}).
+			Return(lock.Result{}, moerr.NewLockTableBindChangedNoCtx()),
+		txnOp.EXPECT().HasLockTable(uint64(1)).Return(false),
+		// Second call: succeed
+		lockSvc.EXPECT().
+			Lock(ctx, uint64(1), gomock.Nil(), []byte("txn1"), lock.LockOptions{}).
+			Return(lock.Result{Timestamp: timestamp.Timestamp{PhysicalTime: 1}}, nil),
+	)
+
+	result, err := lockWithRetry(
+		ctx,
+		lockSvc,
+		1,
+		nil,
+		[]byte("txn1"),
+		lock.LockOptions{},
+		txnOp,
+		nil,
+		nil,
+		LockOptions{},
+		types.Type{},
+	)
+	require.NoError(t, err)
+	require.Equal(t, int64(1), result.Timestamp.PhysicalTime)
+}
+
 func TestLockWithRetryRetriesInsideLoopAndReturnsSecondResult(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
diff --git a/pkg/txn/service/service.go b/pkg/txn/service/service.go
index 02d8af0776ab3..4e9bf905c235c 100644
--- a/pkg/txn/service/service.go
+++ b/pkg/txn/service/service.go
@@ -238,6 +238,11 @@ func (s *service) parallelSendWithRetry(
 	ctx context.Context,
 	requests []txn.TxnRequest,
 	ignoreTxnErrorCodes map[uint16]struct{}) *rpc.SendResult {
+	const (
+		initialBackoff = 100 * time.Millisecond
+		maxBackoff     = time.Second
+	)
+	backoff := initialBackoff
 	for {
 		select {
 		case <-ctx.Done():
@@ -248,8 +253,22 @@ func (s *service) parallelSendWithRetry(
 		if err != nil {
 			err = moerr.AttachCause(ctx, err)
 			util.LogTxnSendRequestsFailed(s.logger, requests, err)
+			timer := time.NewTimer(backoff)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return nil
+			case <-timer.C:
+			}
+			if backoff < maxBackoff {
+				backoff *= 2
+				if backoff > maxBackoff {
+					backoff = maxBackoff
+				}
+			}
 			continue
 		}
+		backoff = initialBackoff
 		util.LogTxnReceivedResponses(s.logger, result.Responses)
 		hasError := false
 		for _, resp := range result.Responses {

From 76ed2b52251aa4a13ed64f17c3237a3cb276ab0d Mon Sep 17 00:00:00 2001
From: XuPeng-SH
Date: Wed, 15 Apr 2026 18:25:03 +0800
Subject: [PATCH 2/2] test: cover retry backoff branches

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 pkg/lockservice/lock_table_remote_test.go | 121 ++++++++++++++++++++++
 pkg/txn/service/service_test.go           |  64 ++++++++++++
 2 files changed, 185 insertions(+)

diff --git a/pkg/lockservice/lock_table_remote_test.go b/pkg/lockservice/lock_table_remote_test.go
index e00c48eb41829..f778904d31701 100644
--- a/pkg/lockservice/lock_table_remote_test.go
+++ b/pkg/lockservice/lock_table_remote_test.go
@@ -212,6 +212,127 @@ func TestUnlockRemoteWithRetry(t *testing.T) {
 	)
 }
 
+func TestUnlockRemoteStopsWhenRetryBudgetExhausted(t *testing.T) {
+	oldInitial := remoteRetryInitialBackoff
+	oldMaxBackoff := remoteRetryMaxBackoff
+	oldMaxDuration := remoteRetryMaxDuration
+	remoteRetryInitialBackoff = time.Millisecond
+	remoteRetryMaxBackoff = time.Millisecond
+	remoteRetryMaxDuration = -time.Millisecond
+	defer func() {
+		remoteRetryInitialBackoff = oldInitial
+		remoteRetryMaxBackoff = oldMaxBackoff
+		remoteRetryMaxDuration = oldMaxDuration
+	}()
+
+	n := 0
+	runRemoteLockTableTests(
+		t,
+		pb.LockTable{ServiceID: "s1"},
+		func(s Server) {
+			s.RegisterMethodHandler(
+				pb.Method_Unlock,
+				func(
+					ctx context.Context,
+					cancel context.CancelFunc,
+					req *pb.Request,
+					resp *pb.Response,
+					cs morpc.ClientSession) {
+					n++
+					writeResponse(getLogger(""), cancel, resp, moerr.NewRPCTimeout(ctx), cs)
+				},
+			)
+			s.RegisterMethodHandler(
+				pb.Method_GetBind,
+				func(
+					ctx context.Context,
+					cancel context.CancelFunc,
+					req *pb.Request,
+					resp *pb.Response,
+					cs morpc.ClientSession) {
+					resp.GetBind.LockTable = pb.LockTable{
+						ServiceID: "s1",
+						Valid:     true,
+					}
+					writeResponse(getLogger(""), cancel, resp, nil, cs)
+				},
+			)
+		},
+		func(l *remoteLockTable, s Server) {
+			txnID := []byte("txn1")
+			txn := newActiveTxn(txnID,
+				string(txnID), newFixedSlicePool(32), "")
+			l.unlock(txn, nil, timestamp.Timestamp{})
+			assert.Equal(t, 1, n)
+			reuse.Free(txn, nil)
+		},
+		func(lt pb.LockTable) {},
+	)
+}
+
+func TestGetLockRemoteWithRetry(t *testing.T) {
+	oldInitial := remoteRetryInitialBackoff
+	oldMaxBackoff := remoteRetryMaxBackoff
+	oldMaxDuration := remoteRetryMaxDuration
+	remoteRetryInitialBackoff = time.Millisecond
+	remoteRetryMaxBackoff = time.Millisecond
+	remoteRetryMaxDuration = time.Second
+	defer func() {
+		remoteRetryInitialBackoff = oldInitial
+		remoteRetryMaxBackoff = oldMaxBackoff
+		remoteRetryMaxDuration = oldMaxDuration
+	}()
+
+	n := 0
+	called := false
+	runRemoteLockTableTests(
+		t,
+		pb.LockTable{ServiceID: "s1"},
+		func(s Server) {
+			s.RegisterMethodHandler(
+				pb.Method_GetTxnLock,
+				func(
+					ctx context.Context,
+					cancel context.CancelFunc,
+					req *pb.Request,
+					resp *pb.Response,
+					cs morpc.ClientSession) {
+					n++
+					if n == 1 {
+						writeResponse(getLogger(""), cancel, resp, moerr.NewRPCTimeout(ctx), cs)
+						return
+					}
+					resp.GetTxnLock.Value = int32(pb.Granularity_Row)
+					writeResponse(getLogger(""), cancel, resp, nil, cs)
+				},
+			)
+			s.RegisterMethodHandler(
+				pb.Method_GetBind,
+				func(
+					ctx context.Context,
+					cancel context.CancelFunc,
+					req *pb.Request,
+					resp *pb.Response,
+					cs morpc.ClientSession) {
+					resp.GetBind.LockTable = pb.LockTable{
+						ServiceID: "s1",
+						Valid:     true,
+					}
+					writeResponse(getLogger(""), cancel, resp, nil, cs)
+				},
+			)
+		},
+		func(l *remoteLockTable, s Server) {
+			l.getLock([]byte("row1"), pb.WaitTxn{TxnID: []byte("txn1")}, func(lock Lock) {
+				called = true
+				assert.Equal(t, byte(pb.Granularity_Row), lock.value)
+			})
+			assert.True(t, called)
+			assert.Equal(t, 2, n)
+		},
+		func(lt pb.LockTable) {},
+	)
+}
+
 func TestRemoteWithBindChanged(t *testing.T) {
 	newBind := pb.LockTable{
 		ServiceID: "s2",
diff --git a/pkg/txn/service/service_test.go b/pkg/txn/service/service_test.go
index b05b9bca4e619..766a3b651eef4 100644
--- a/pkg/txn/service/service_test.go
+++ b/pkg/txn/service/service_test.go
@@ -23,8 +23,21 @@ import (
 
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
 	"github.com/matrixorigin/matrixone/pkg/pb/txn"
+	"github.com/matrixorigin/matrixone/pkg/txn/rpc"
 )
 
+type retryTestSender struct {
+	send func(context.Context, []txn.TxnRequest) (*rpc.SendResult, error)
+}
+
+func (s *retryTestSender) Send(ctx context.Context, requests []txn.TxnRequest) (*rpc.SendResult, error) {
+	return s.send(ctx, requests)
+}
+
+func (s *retryTestSender) Close() error {
+	return nil
+}
+
 func TestGCZombie(t *testing.T) {
 	sender := NewTestSender()
 	defer func() {
@@ -142,3 +155,54 @@ func Test_parallelSendWithRetry(t *testing.T) {
 	sender.action = "return_err_and_reset"
 	s.parallelSendWithRetry(ctx, nil, nil)
 }
+
+func Test_parallelSendWithRetryStopsDuringBackoffOnContextCancel(t *testing.T) {
+	firstSend := make(chan struct{})
+	sender := &retryTestSender{
+		send: func(ctx context.Context, requests []txn.TxnRequest) (*rpc.SendResult, error) {
+			select {
+			case <-firstSend:
+				return &rpc.SendResult{}, nil
+			default:
+				close(firstSend)
+				return nil, moerr.NewInternalErrorNoCtx("return error")
+			}
+		},
+	}
+	s := NewTestTxnServiceWithLogAndZombie(t, 1, sender, NewTestClock(1), nil, time.Millisecond*3).(*service)
+	assert.NoError(t, s.Start())
+	defer func() {
+		assert.NoError(t, s.Close(false))
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	resultC := make(chan *rpc.SendResult, 1)
+	go func() {
+		resultC <- s.parallelSendWithRetry(ctx, nil, nil)
+	}()
+
+	<-firstSend
+	cancel()
+	assert.Nil(t, <-resultC)
+}
+
+func Test_parallelSendWithRetryReturnsNilWhenContextAlreadyCanceled(t *testing.T) {
+	called := false
+	sender := &retryTestSender{
+		send: func(ctx context.Context, requests []txn.TxnRequest) (*rpc.SendResult, error) {
+			called = true
+			return &rpc.SendResult{}, nil
+		},
+	}
+	s := NewTestTxnServiceWithLogAndZombie(t, 1, sender, NewTestClock(1), nil, time.Millisecond*3).(*service)
+	assert.NoError(t, s.Start())
+	defer func() {
+		assert.NoError(t, s.Close(false))
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	assert.Nil(t, s.parallelSendWithRetry(ctx, nil, nil))
+	assert.False(t, called)
+}
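
Note (illustrative sketch, not part of either patch): unlock()/getLock() and
parallelSendWithRetry() above all share the same capped-exponential-backoff
shape. A minimal standalone Go version of that pattern, using hypothetical
names (retryWithBackoff, op) and the 100ms/5s/30s bounds chosen for
lock_table_remote.go, might look like:

    package main

    import (
    	"errors"
    	"fmt"
    	"time"
    )

    // retryWithBackoff calls op until it succeeds or the overall budget is
    // exhausted, sleeping between attempts with a doubling, capped backoff.
    // Hypothetical helper: it only illustrates the loop shape added in the patch.
    func retryWithBackoff(op func() error, initial, max, budget time.Duration) error {
    	backoff := initial
    	deadline := time.Now().Add(budget)
    	for {
    		if err := op(); err == nil {
    			return nil
    		}
    		if time.Now().After(deadline) {
    			return errors.New("retry budget exhausted")
    		}
    		time.Sleep(backoff)
    		backoff *= 2
    		if backoff > max {
    			backoff = max
    		}
    	}
    }

    func main() {
    	calls := 0
    	err := retryWithBackoff(func() error {
    		calls++
    		if calls < 3 {
    			return errors.New("transient failure")
    		}
    		return nil
    	}, 100*time.Millisecond, 5*time.Second, 30*time.Second)
    	fmt.Println(calls, err) // prints: 3 <nil>
    }

Doubling with a cap keeps the first retries fast while bounding steady-state
load on the remote service; the deadline turns an otherwise unbounded loop into
one that eventually gives up, which is the role remoteRetryMaxDuration plays in
the patch.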