From 7eed4a25ff18b012ac76683a50971535148c27ab Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 13 May 2024 15:21:22 +0530 Subject: [PATCH 1/4] refactor: remove unnecessary functions Signed-off-by: Manan Gupta --- go/vt/topo/locks.go | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index 6325124c429..1e8f9385524 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -339,11 +339,7 @@ func (ts *Server) internalLockShard(ctx context.Context, keyspace, shard, action l := newLock(action) var lockDescriptor LockDescriptor var err error - if isBlocking { - lockDescriptor, err = l.lockShard(ctx, ts, keyspace, shard) - } else { - lockDescriptor, err = l.tryLockShard(ctx, ts, keyspace, shard) - } + lockDescriptor, err = l.internalLockShard(ctx, ts, keyspace, shard, isBlocking) if err != nil { return nil, nil, err } @@ -401,18 +397,6 @@ func CheckShardLocked(ctx context.Context, keyspace, shard string) error { return li.lockDescriptor.Check(ctx) } -// lockShard will lock the shard in the topology server. -// UnlockShard should be called if this returns no error. -func (l *Lock) lockShard(ctx context.Context, ts *Server, keyspace, shard string) (LockDescriptor, error) { - return l.internalLockShard(ctx, ts, keyspace, shard, true) -} - -// tryLockShard will lock the shard in the topology server but unlike `lockShard` it fail-fast if not able to get lock -// UnlockShard should be called if this returns no error. -func (l *Lock) tryLockShard(ctx context.Context, ts *Server, keyspace, shard string) (LockDescriptor, error) { - return l.internalLockShard(ctx, ts, keyspace, shard, false) -} - func (l *Lock) internalLockShard(ctx context.Context, ts *Server, keyspace, shard string, isBlocking bool) (LockDescriptor, error) { log.Infof("Locking shard %v/%v for action %v", keyspace, shard, l.Action) From df06cbdd321d438c44d92240042b8fe066b46391 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 13 May 2024 15:21:55 +0530 Subject: [PATCH 2/4] test: add failing test Signed-off-by: Manan Gupta --- go/vt/topo/memorytopo/server_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/go/vt/topo/memorytopo/server_test.go b/go/vt/topo/memorytopo/server_test.go index c2d1cf6cfb5..0d3d04f88c6 100644 --- a/go/vt/topo/memorytopo/server_test.go +++ b/go/vt/topo/memorytopo/server_test.go @@ -20,6 +20,8 @@ import ( "context" "testing" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/test" ) @@ -32,3 +34,17 @@ func TestMemoryTopo(t *testing.T) { return NewServer(ctx, test.LocalCellName) }, []string{"checkTryLock", "checkShardWithLock"}) } + +func TestLockShardContextHasDeadline(t *testing.T) { + cell := "cell-1" + ks := "ks" + shard := "-" + ts := NewServer(context.Background(), cell) + _, err := ts.GetOrCreateShard(context.Background(), ks, shard) + require.NoError(t, err) + ctx, unlock, err := ts.LockShard(context.Background(), ks, shard, "action") + require.NoError(t, err) + defer unlock(&err) + _, hasDeadline := ctx.Deadline() + require.True(t, hasDeadline) +} From 77fd2dab1cf396a9c34c7841ace7c9afbc11133e Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 13 May 2024 15:25:36 +0530 Subject: [PATCH 3/4] feat: fix lock shard to return the correct context Signed-off-by: Manan Gupta --- go/vt/topo/locks.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index 1e8f9385524..9792d4a1551 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -339,7 +339,7 @@ func (ts *Server) internalLockShard(ctx context.Context, keyspace, shard, action l := newLock(action) var lockDescriptor LockDescriptor var err error - lockDescriptor, err = l.internalLockShard(ctx, ts, keyspace, shard, isBlocking) + ctx, lockDescriptor, err = l.internalLockShard(ctx, ts, keyspace, shard, isBlocking) if err != nil { return nil, nil, err } @@ -397,7 +397,7 @@ func CheckShardLocked(ctx context.Context, keyspace, shard string) error { return li.lockDescriptor.Check(ctx) } -func (l *Lock) internalLockShard(ctx context.Context, ts *Server, keyspace, shard string, isBlocking bool) (LockDescriptor, error) { +func (l *Lock) internalLockShard(ctx context.Context, ts *Server, keyspace, shard string, isBlocking bool) (context.Context, LockDescriptor, error) { log.Infof("Locking shard %v/%v for action %v", keyspace, shard, l.Action) ctx, cancel := context.WithTimeout(ctx, getLockTimeout()) @@ -412,12 +412,15 @@ func (l *Lock) internalLockShard(ctx context.Context, ts *Server, keyspace, shar shardPath := path.Join(KeyspacesPath, keyspace, ShardsPath, shard) j, err := l.ToJSON() if err != nil { - return nil, err + return ctx, nil, err } + var ld LockDescriptor if isBlocking { - return ts.globalCell.Lock(ctx, shardPath, j) + ld, err = ts.globalCell.Lock(ctx, shardPath, j) + } else { + ld, err = ts.globalCell.TryLock(ctx, shardPath, j) } - return ts.globalCell.TryLock(ctx, shardPath, j) + return ctx, ld, err } // unlockShard unlocks a previously locked shard. From 449a06a993bdf3dc00951c33bbf72c065f91ab5e Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 13 May 2024 16:14:57 +0530 Subject: [PATCH 4/4] feat: cancel function should be called on unlock Signed-off-by: Manan Gupta --- go/vt/topo/locks.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index 9792d4a1551..6e6843a2c2c 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -338,8 +338,9 @@ func (ts *Server) internalLockShard(ctx context.Context, keyspace, shard, action // lock l := newLock(action) var lockDescriptor LockDescriptor + var cancel context.CancelFunc var err error - ctx, lockDescriptor, err = l.internalLockShard(ctx, ts, keyspace, shard, isBlocking) + ctx, cancel, lockDescriptor, err = l.internalLockShard(ctx, ts, keyspace, shard, isBlocking) if err != nil { return nil, nil, err } @@ -352,6 +353,7 @@ func (ts *Server) internalLockShard(ctx context.Context, keyspace, shard, action return ctx, func(finalErr *error) { i.mu.Lock() defer i.mu.Unlock() + cancel() if _, ok := i.info[mapKey]; !ok { if *finalErr != nil { @@ -397,11 +399,10 @@ func CheckShardLocked(ctx context.Context, keyspace, shard string) error { return li.lockDescriptor.Check(ctx) } -func (l *Lock) internalLockShard(ctx context.Context, ts *Server, keyspace, shard string, isBlocking bool) (context.Context, LockDescriptor, error) { +func (l *Lock) internalLockShard(ctx context.Context, ts *Server, keyspace, shard string, isBlocking bool) (context.Context, context.CancelFunc, LockDescriptor, error) { log.Infof("Locking shard %v/%v for action %v", keyspace, shard, l.Action) ctx, cancel := context.WithTimeout(ctx, getLockTimeout()) - defer cancel() span, ctx := trace.NewSpan(ctx, "TopoServer.LockShardForAction") span.Annotate("action", l.Action) @@ -412,7 +413,7 @@ func (l *Lock) internalLockShard(ctx context.Context, ts *Server, keyspace, shar shardPath := path.Join(KeyspacesPath, keyspace, ShardsPath, shard) j, err := l.ToJSON() if err != nil { - return ctx, nil, err + return ctx, cancel, nil, err } var ld LockDescriptor if isBlocking { @@ -420,7 +421,7 @@ func (l *Lock) internalLockShard(ctx context.Context, ts *Server, keyspace, shar } else { ld, err = ts.globalCell.TryLock(ctx, shardPath, j) } - return ctx, ld, err + return ctx, cancel, ld, err } // unlockShard unlocks a previously locked shard.