From dd13879b65d949ed27cad9edaa3108a27177b560 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 24 May 2024 12:04:14 +0530 Subject: [PATCH] test: add testing for shard locking Signed-off-by: Manan Gupta --- go/test/endtoend/topotest/consul/main_test.go | 44 +++++++++++++++++++ go/test/endtoend/topotest/etcd2/main_test.go | 44 +++++++++++++++++++ go/test/endtoend/topotest/utils/utils.go | 39 ++++++++++++++++ go/test/endtoend/topotest/zk2/main_test.go | 44 +++++++++++++++++++ 4 files changed, 171 insertions(+) create mode 100644 go/test/endtoend/topotest/utils/utils.go diff --git a/go/test/endtoend/topotest/consul/main_test.go b/go/test/endtoend/topotest/consul/main_test.go index 1c278864ced..b01af94f406 100644 --- a/go/test/endtoend/topotest/consul/main_test.go +++ b/go/test/endtoend/topotest/consul/main_test.go @@ -24,7 +24,9 @@ import ( "testing" "time" + topoUtils "vitess.io/vitess/go/test/endtoend/topotest/utils" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -140,6 +142,48 @@ func TestTopoRestart(t *testing.T) { } } +// TestShardLocking tests that shard locking works as intended. +func TestShardLocking(t *testing.T) { + // create topo server connection + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + // Acquire a shard lock. + ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking") + require.NoError(t, err) + // Check that we can't reacquire it from the same context. + _, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking") + require.ErrorContains(t, err, "lock for shard customer/0 is already held") + // Also check that TryLockShard is non-blocking and returns an error. 
+ _, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking") + require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0") + // Check that CheckShardLocked doesn't return an error. + err = topo.CheckShardLocked(ctx, KeyspaceName, "0") + require.NoError(t, err) + + // We'll now try to acquire the lock from a different goroutine. + secondThreadLockAcquired := false + go func() { + _, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking") + defer unlock(&err) + require.NoError(t, err) + secondThreadLockAcquired = true + }() + + // Wait for some time and ensure that the second attempt to lock the shard is still blocked. + time.Sleep(100 * time.Millisecond) + require.False(t, secondThreadLockAcquired) + + // Unlock the shard. + unlock(&err) + // Check that we no longer hold the shard lock. + err = topo.CheckShardLocked(ctx, KeyspaceName, "0") + require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)") + + // Wait to see that the second goroutine was able to acquire the shard lock. 
+ topoUtils.WaitForBoolValue(t, &secondThreadLockAcquired, true) +} + func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { t.Helper() qr, err := conn.ExecuteFetch(query, 1000, true) diff --git a/go/test/endtoend/topotest/etcd2/main_test.go b/go/test/endtoend/topotest/etcd2/main_test.go index db34bd2ee86..071375fc4b2 100644 --- a/go/test/endtoend/topotest/etcd2/main_test.go +++ b/go/test/endtoend/topotest/etcd2/main_test.go @@ -23,7 +23,9 @@ import ( "testing" "time" + topoUtils "vitess.io/vitess/go/test/endtoend/topotest/utils" "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/log" @@ -115,6 +117,48 @@ func TestTopoDownServingQuery(t *testing.T) { utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) } +// TestShardLocking tests that shard locking works as intended. +func TestShardLocking(t *testing.T) { + // create topo server connection + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + // Acquire a shard lock. + ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking") + require.NoError(t, err) + // Check that we can't reacquire it from the same context. + _, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking") + require.ErrorContains(t, err, "lock for shard customer/0 is already held") + // Also check that TryLockShard is non-blocking and returns an error. + _, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking") + require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0") + // Check that CheckShardLocked doesn't return an error. 
+ err = topo.CheckShardLocked(ctx, KeyspaceName, "0") + require.NoError(t, err) + + // We'll now try to acquire the lock from a different goroutine. + secondThreadLockAcquired := false + go func() { + _, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking") + defer unlock(&err) + require.NoError(t, err) + secondThreadLockAcquired = true + }() + + // Wait for some time and ensure that the second attempt to lock the shard is still blocked. + time.Sleep(100 * time.Millisecond) + require.False(t, secondThreadLockAcquired) + + // Unlock the shard. + unlock(&err) + // Check that we no longer hold the shard lock. + err = topo.CheckShardLocked(ctx, KeyspaceName, "0") + require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)") + + // Wait to see that the second goroutine was able to acquire the shard lock. + topoUtils.WaitForBoolValue(t, &secondThreadLockAcquired, true) +} + func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { t.Helper() var res []*sqltypes.Result diff --git a/go/test/endtoend/topotest/utils/utils.go b/go/test/endtoend/topotest/utils/utils.go new file mode 100644 index 00000000000..6908696a93e --- /dev/null +++ b/go/test/endtoend/topotest/utils/utils.go @@ -0,0 +1,39 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package utils + +import ( + "testing" + "time" +) + +// WaitForBoolValue polls the boolean that val points to every 100ms until it equals waitFor, and fails the test if that has not happened within 15 seconds. +func WaitForBoolValue(t *testing.T, val *bool, waitFor bool) { + timeout := time.After(15 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timed out waiting for the boolean to become %v", waitFor) + return + default: + if *val == waitFor { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} diff --git a/go/test/endtoend/topotest/zk2/main_test.go b/go/test/endtoend/topotest/zk2/main_test.go index 816bbc72d72..b252001cdd5 100644 --- a/go/test/endtoend/topotest/zk2/main_test.go +++ b/go/test/endtoend/topotest/zk2/main_test.go @@ -23,7 +23,9 @@ import ( "testing" "time" + topoUtils "vitess.io/vitess/go/test/endtoend/topotest/utils" "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/log" @@ -116,6 +118,48 @@ func TestTopoDownServingQuery(t *testing.T) { utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) } +// TestShardLocking tests that shard locking works as intended. +func TestShardLocking(t *testing.T) { + // create topo server connection + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + // Acquire a shard lock. + ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking") + require.NoError(t, err) + // Check that we can't reacquire it from the same context. + _, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking") + require.ErrorContains(t, err, "lock for shard customer/0 is already held") + // Also check that TryLockShard is non-blocking and returns an error. 
+ _, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking") + require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0") + // Check that CheckShardLocked doesn't return an error. + err = topo.CheckShardLocked(ctx, KeyspaceName, "0") + require.NoError(t, err) + + // We'll now try to acquire the lock from a different goroutine. + secondThreadLockAcquired := false + go func() { + _, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking") + defer unlock(&err) + require.NoError(t, err) + secondThreadLockAcquired = true + }() + + // Wait for some time and ensure that the second attempt to lock the shard is still blocked. + time.Sleep(100 * time.Millisecond) + require.False(t, secondThreadLockAcquired) + + // Unlock the shard. + unlock(&err) + // Check that we no longer hold the shard lock. + err = topo.CheckShardLocked(ctx, KeyspaceName, "0") + require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)") + + // Wait to see that the second goroutine was able to acquire the shard lock. + topoUtils.WaitForBoolValue(t, &secondThreadLockAcquired, true) +} + func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { t.Helper() var res []*sqltypes.Result