Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pkg/lockservice/lock_table_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ import (
"go.uber.org/zap"
)

var (
	// Backoff parameters for remote lock table retry loops (unlock, getLock).
	// These prevent CPU spinning when the remote lock service is temporarily
	// unavailable. Declared as vars (not consts) so tests can shrink the
	// backoff and budget to keep retry tests fast.
	remoteRetryInitialBackoff = 100 * time.Millisecond // first sleep between retries; doubled after each failure
	remoteRetryMaxBackoff     = 5 * time.Second        // upper bound on the exponential backoff
	remoteRetryMaxDuration    = 30 * time.Second       // total retry budget before the loop gives up
)

// remoteLockTable the lock corresponding to the Table is managed by a remote LockTable.
// And the remoteLockTable acts as a proxy for this LockTable locally.
type remoteLockTable struct {
Expand Down Expand Up @@ -150,6 +159,8 @@ func (l *remoteLockTable) unlock(
txn,
l.bind,
)
backoff := remoteRetryInitialBackoff
deadline := time.Now().Add(remoteRetryMaxDuration)
for {
err := l.doUnlock(txn, commitTS, mutations...)
if err == nil {
Expand All @@ -171,13 +182,27 @@ func (l *remoteLockTable) unlock(
if err := l.handleError(err, false); err == nil {
return
}
if time.Now().After(deadline) {
l.logger.Error("unlock retry budget exhausted, giving up",
zap.Uint64("table-id", l.bind.Table),
zap.String("txn", hex.EncodeToString(txn.txnID)),
zap.Duration("budget", remoteRetryMaxDuration))
return
}
time.Sleep(backoff)
backoff *= 2
if backoff > remoteRetryMaxBackoff {
backoff = remoteRetryMaxBackoff
}
}
}

func (l *remoteLockTable) getLock(
key []byte,
txn pb.WaitTxn,
fn func(Lock)) {
backoff := remoteRetryInitialBackoff
deadline := time.Now().Add(remoteRetryMaxDuration)
for {
lock, ok, err := l.doGetLock(key, txn)
if err == nil {
Expand All @@ -192,6 +217,18 @@ func (l *remoteLockTable) getLock(
if err = l.handleError(err, false); err == nil {
return
}
if time.Now().After(deadline) {
l.logger.Error("getLock retry budget exhausted, giving up",
zap.Uint64("table-id", l.bind.Table),
zap.String("txn", hex.EncodeToString(txn.TxnID)),
zap.Duration("budget", remoteRetryMaxDuration))
return
}
time.Sleep(backoff)
backoff *= 2
if backoff > remoteRetryMaxBackoff {
backoff = remoteRetryMaxBackoff
}
}
}

Expand Down
121 changes: 121 additions & 0 deletions pkg/lockservice/lock_table_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,127 @@ func TestUnlockRemoteWithRetry(t *testing.T) {
)
}

// TestUnlockRemoteStopsWhenRetryBudgetExhausted verifies that unlock gives up
// after its retry budget is spent instead of retrying forever.
func TestUnlockRemoteStopsWhenRetryBudgetExhausted(t *testing.T) {
	// Use a negative budget so the very first failure already exhausts it,
	// and a tiny backoff to keep the test fast. Restore the globals on exit.
	savedInitial, savedMax, savedBudget := remoteRetryInitialBackoff, remoteRetryMaxBackoff, remoteRetryMaxDuration
	remoteRetryInitialBackoff = time.Millisecond
	remoteRetryMaxBackoff = time.Millisecond
	remoteRetryMaxDuration = -time.Millisecond
	defer func() {
		remoteRetryInitialBackoff = savedInitial
		remoteRetryMaxBackoff = savedMax
		remoteRetryMaxDuration = savedBudget
	}()

	unlockCalls := 0
	runRemoteLockTableTests(
		t,
		pb.LockTable{ServiceID: "s1"},
		func(s Server) {
			// Every unlock RPC fails with a retryable timeout error.
			s.RegisterMethodHandler(
				pb.Method_Unlock,
				func(
					ctx context.Context,
					cancel context.CancelFunc,
					req *pb.Request,
					resp *pb.Response,
					cs morpc.ClientSession) {
					unlockCalls++
					writeResponse(getLogger(""), cancel, resp, moerr.NewRPCTimeout(ctx), cs)
				},
			)
			// The bind stays valid, so the failure is treated as retryable.
			s.RegisterMethodHandler(
				pb.Method_GetBind,
				func(
					ctx context.Context,
					cancel context.CancelFunc,
					req *pb.Request,
					resp *pb.Response,
					cs morpc.ClientSession) {
					resp.GetBind.LockTable = pb.LockTable{
						ServiceID: "s1",
						Valid:     true,
					}
					writeResponse(getLogger(""), cancel, resp, nil, cs)
				},
			)
		},
		func(l *remoteLockTable, s Server) {
			id := []byte("txn1")
			txn := newActiveTxn(id, string(id), newFixedSlicePool(32), "")
			l.unlock(txn, nil, timestamp.Timestamp{})
			// With a negative budget, exactly one attempt must have been made.
			assert.Equal(t, 1, unlockCalls)
			reuse.Free(txn, nil)
		},
		func(lt pb.LockTable) {},
	)
}

// TestGetLockRemoteWithRetry verifies that getLock retries a transient RPC
// failure and delivers the lock once a subsequent attempt succeeds.
func TestGetLockRemoteWithRetry(t *testing.T) {
	// Shrink the backoff but keep a positive budget so one retry fits.
	// Restore the package-level knobs when the test finishes.
	savedInitial, savedMax, savedBudget := remoteRetryInitialBackoff, remoteRetryMaxBackoff, remoteRetryMaxDuration
	remoteRetryInitialBackoff = time.Millisecond
	remoteRetryMaxBackoff = time.Millisecond
	remoteRetryMaxDuration = time.Second
	defer func() {
		remoteRetryInitialBackoff = savedInitial
		remoteRetryMaxBackoff = savedMax
		remoteRetryMaxDuration = savedBudget
	}()

	attempts := 0
	gotLock := false
	runRemoteLockTableTests(
		t,
		pb.LockTable{ServiceID: "s1"},
		func(s Server) {
			// First attempt times out; every later attempt returns a row lock.
			s.RegisterMethodHandler(
				pb.Method_GetTxnLock,
				func(
					ctx context.Context,
					cancel context.CancelFunc,
					req *pb.Request,
					resp *pb.Response,
					cs morpc.ClientSession) {
					attempts++
					if attempts > 1 {
						resp.GetTxnLock.Value = int32(pb.Granularity_Row)
						writeResponse(getLogger(""), cancel, resp, nil, cs)
						return
					}
					writeResponse(getLogger(""), cancel, resp, moerr.NewRPCTimeout(ctx), cs)
				},
			)
			// Keep the bind valid so the timeout is retried rather than dropped.
			s.RegisterMethodHandler(
				pb.Method_GetBind,
				func(
					ctx context.Context,
					cancel context.CancelFunc,
					req *pb.Request,
					resp *pb.Response,
					cs morpc.ClientSession) {
					resp.GetBind.LockTable = pb.LockTable{
						ServiceID: "s1",
						Valid:     true,
					}
					writeResponse(getLogger(""), cancel, resp, nil, cs)
				},
			)
		},
		func(l *remoteLockTable, s Server) {
			l.getLock([]byte("row1"), pb.WaitTxn{TxnID: []byte("txn1")}, func(lk Lock) {
				gotLock = true
				assert.Equal(t, byte(pb.Granularity_Row), lk.value)
			})
			assert.True(t, gotLock)
			// Exactly one failure plus one success.
			assert.Equal(t, 2, attempts)
		},
		func(lt pb.LockTable) {},
	)
}

func TestRemoteWithBindChanged(t *testing.T) {
newBind := pb.LockTable{
ServiceID: "s2",
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/colexec/lockop/lock_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,12 @@ func waitToRetryLock(ctx context.Context, wait time.Duration) bool {
}

func getRetryWaitDuration(err error, retryState *lockRetryState) (time.Duration, bool) {
// When backend budget is disabled, only non-backend errors may retry.
if defaultMaxWaitTimeOnRetryBackendLock <= 0 {
return 0, false
if isBoundedRetryLockError(err) {
return 0, false
}
return defaultWaitTimeOnRetryLock, true
}

now := time.Now()
Expand Down
52 changes: 52 additions & 0 deletions pkg/sql/colexec/lockop/lock_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,58 @@ func TestLockWithRetryFailsFastWhenBackendRetryBudgetDisabled(t *testing.T) {
require.Less(t, time.Since(start), defaultWaitTimeOnRetryLock)
}

// TestLockWithRetryStillRetriesBindChangeWhenBackendBudgetDisabled checks that
// a zero backend retry budget only disables retries for backend availability
// errors: an ordinary retryable error such as ErrLockTableBindChanged must
// still be retried and succeed on the following attempt.
func TestLockWithRetryStillRetriesBindChangeWhenBackendBudgetDisabled(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	svc := mock_lock.NewMockLockService(ctrl)
	op := mock_frontend.NewMockTxnOperator(ctrl)

	// Shorten the retry wait and disable the backend budget; restore both
	// package-level settings on exit.
	prevWait, prevBudget := defaultWaitTimeOnRetryLock, defaultMaxWaitTimeOnRetryBackendLock
	defaultWaitTimeOnRetryLock = 50 * time.Millisecond
	defaultMaxWaitTimeOnRetryBackendLock = 0
	defer func() {
		defaultWaitTimeOnRetryLock = prevWait
		defaultMaxWaitTimeOnRetryBackendLock = prevBudget
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	gomock.InOrder(
		// Attempt 1: bind-changed is still retryable with the budget disabled.
		svc.EXPECT().
			Lock(ctx, uint64(1), gomock.Nil(), []byte("txn1"), lock.LockOptions{}).
			Return(lock.Result{}, moerr.NewLockTableBindChangedNoCtx()),
		op.EXPECT().HasLockTable(uint64(1)).Return(false),
		// Attempt 2: the lock is granted.
		svc.EXPECT().
			Lock(ctx, uint64(1), gomock.Nil(), []byte("txn1"), lock.LockOptions{}).
			Return(lock.Result{Timestamp: timestamp.Timestamp{PhysicalTime: 1}}, nil),
	)

	result, err := lockWithRetry(
		ctx,
		svc,
		1,
		nil,
		[]byte("txn1"),
		lock.LockOptions{},
		op,
		nil,
		nil,
		LockOptions{},
		types.Type{},
	)
	require.NoError(t, err)
	require.Equal(t, int64(1), result.Timestamp.PhysicalTime)
}

func TestLockWithRetryRetriesInsideLoopAndReturnsSecondResult(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
19 changes: 19 additions & 0 deletions pkg/txn/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ func (s *service) parallelSendWithRetry(
ctx context.Context,
requests []txn.TxnRequest,
ignoreTxnErrorCodes map[uint16]struct{}) *rpc.SendResult {
const (
initialBackoff = 100 * time.Millisecond
maxBackoff = time.Second
)
backoff := initialBackoff
for {
select {
case <-ctx.Done():
Expand All @@ -248,8 +253,22 @@ func (s *service) parallelSendWithRetry(
if err != nil {
err = moerr.AttachCause(ctx, err)
util.LogTxnSendRequestsFailed(s.logger, requests, err)
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
timer.Stop()
return nil
case <-timer.C:
}
if backoff < maxBackoff {
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
continue
}
backoff = initialBackoff
util.LogTxnReceivedResponses(s.logger, result.Responses)
hasError := false
for _, resp := range result.Responses {
Expand Down
64 changes: 64 additions & 0 deletions pkg/txn/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,21 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
)

// retryTestSender is a test double for the txn RPC sender used by
// parallelSendWithRetry tests: its Send behavior is injected per test case.
type retryTestSender struct {
	// send is invoked for every Send call with the caller's context and
	// requests; tests supply failing/succeeding behavior as needed.
	send func(context.Context, []txn.TxnRequest) (*rpc.SendResult, error)
}

// Send delegates to the injected callback.
func (s *retryTestSender) Send(ctx context.Context, requests []txn.TxnRequest) (*rpc.SendResult, error) {
	return s.send(ctx, requests)
}

// Close is a no-op; the stub holds no resources.
func (s *retryTestSender) Close() error {
	return nil
}

func TestGCZombie(t *testing.T) {
sender := NewTestSender()
defer func() {
Expand Down Expand Up @@ -142,3 +155,54 @@ func Test_parallelSendWithRetry(t *testing.T) {
sender.action = "return_err_and_reset"
s.parallelSendWithRetry(ctx, nil, nil)
}

// Test_parallelSendWithRetryStopsDuringBackoffOnContextCancel verifies that
// cancelling the context while the retry loop is sleeping in its backoff
// makes parallelSendWithRetry return nil promptly.
func Test_parallelSendWithRetryStopsDuringBackoffOnContextCancel(t *testing.T) {
	failedOnce := make(chan struct{})
	sender := &retryTestSender{
		send: func(ctx context.Context, requests []txn.TxnRequest) (*rpc.SendResult, error) {
			select {
			case <-failedOnce:
				// Any attempt after the first one would succeed.
				return &rpc.SendResult{}, nil
			default:
				// First attempt: signal the test, then fail to trigger backoff.
				close(failedOnce)
				return nil, moerr.NewInternalErrorNoCtx("return error")
			}
		},
	}
	s := NewTestTxnServiceWithLogAndZombie(t, 1, sender, NewTestClock(1), nil, time.Millisecond*3).(*service)
	assert.NoError(t, s.Start())
	defer func() {
		assert.NoError(t, s.Close(false))
	}()

	ctx, cancel := context.WithCancel(context.Background())
	results := make(chan *rpc.SendResult, 1)
	go func() {
		results <- s.parallelSendWithRetry(ctx, nil, nil)
	}()

	// Once the first send has failed, cancel during the backoff sleep and
	// expect the retry loop to give up with a nil result.
	<-failedOnce
	cancel()
	assert.Nil(t, <-results)
}

// Test_parallelSendWithRetryReturnsNilWhenContextAlreadyCanceled verifies the
// fast path: a context that is canceled before the call returns nil without
// ever invoking the sender.
func Test_parallelSendWithRetryReturnsNilWhenContextAlreadyCanceled(t *testing.T) {
	sendInvoked := false
	sender := &retryTestSender{
		send: func(ctx context.Context, requests []txn.TxnRequest) (*rpc.SendResult, error) {
			sendInvoked = true
			return &rpc.SendResult{}, nil
		},
	}
	s := NewTestTxnServiceWithLogAndZombie(t, 1, sender, NewTestClock(1), nil, time.Millisecond*3).(*service)
	assert.NoError(t, s.Start())
	defer func() {
		assert.NoError(t, s.Close(false))
	}()

	// Cancel up front so the retry loop observes ctx.Done() immediately.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	assert.Nil(t, s.parallelSendWithRetry(ctx, nil, nil))
	assert.False(t, sendInvoked)
}
Loading