Skip to content

Commit

Permalink
test: add testing for shard locking
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed May 24, 2024
1 parent 95ac32b commit dd13879
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 0 deletions.
44 changes: 44 additions & 0 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"testing"
"time"

topoUtils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -140,6 +142,48 @@ func TestTopoRestart(t *testing.T) {
}
}

// TestShardLocking tests that shard locking works as intended.
func TestShardLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a shard lock.
ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.NoError(t, err)
// Check that we can't reacquire it from the same context.
_, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "lock for shard customer/0 is already held")
// Also check that TryLockShard is non-blocking and returns an error.
_, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0")
// Check that CheckShardLocked doesn't return an error.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.NoError(t, err)

// We'll now try to acquire the lock from a different thread.
secondThreadLockAcquired := false
go func() {
_, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
defer unlock(&err)
require.NoError(t, err)
secondThreadLockAcquired = true
}()

// Wait for some time and ensure that the second acquiring of lock shard is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondThreadLockAcquired)

// Unlock the shard.
unlock(&err)
// Check that we no longer have shard lock acquired.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)")

// Wait to see that the second thread was able to acquire the shard lock.
topoUtils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand Down
44 changes: 44 additions & 0 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"testing"
"time"

topoUtils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/topo"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -115,6 +117,48 @@ func TestTopoDownServingQuery(t *testing.T) {
utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

// TestShardLocking tests that shard locking works as intended.
func TestShardLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a shard lock.
ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.NoError(t, err)
// Check that we can't reacquire it from the same context.
_, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "lock for shard customer/0 is already held")
// Also check that TryLockShard is non-blocking and returns an error.
_, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0")
// Check that CheckShardLocked doesn't return an error.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.NoError(t, err)

// We'll now try to acquire the lock from a different thread.
secondThreadLockAcquired := false
go func() {
_, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
defer unlock(&err)
require.NoError(t, err)
secondThreadLockAcquired = true
}()

// Wait for some time and ensure that the second acquiring of lock shard is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondThreadLockAcquired)

// Unlock the shard.
unlock(&err)
// Check that we no longer have shard lock acquired.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)")

// Wait to see that the second thread was able to acquire the shard lock.
topoUtils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
39 changes: 39 additions & 0 deletions go/test/endtoend/topotest/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"testing"
"time"
)

// WaitForBoolValue takes a pointer to a boolean and waits for it to reach a certain value.
func WaitForBoolValue(t *testing.T, val *bool, waitFor bool) {
timeout := time.After(15 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("Timed out waiting for the boolean to become %v", waitFor)
return
default:
if *val == waitFor {
return
}
time.Sleep(100 * time.Millisecond)
}
}
}
44 changes: 44 additions & 0 deletions go/test/endtoend/topotest/zk2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"testing"
"time"

topoUtils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/topo"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -116,6 +118,48 @@ func TestTopoDownServingQuery(t *testing.T) {
utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

// TestShardLocking tests that shard locking works as intended.
func TestShardLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a shard lock.
ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.NoError(t, err)
// Check that we can't reacquire it from the same context.
_, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "lock for shard customer/0 is already held")
// Also check that TryLockShard is non-blocking and returns an error.
_, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0")
// Check that CheckShardLocked doesn't return an error.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.NoError(t, err)

// We'll now try to acquire the lock from a different thread.
secondThreadLockAcquired := false
go func() {
_, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
defer unlock(&err)
require.NoError(t, err)
secondThreadLockAcquired = true
}()

// Wait for some time and ensure that the second acquiring of lock shard is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondThreadLockAcquired)

// Unlock the shard.
unlock(&err)
// Check that we no longer have shard lock acquired.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)")

// Wait to see that the second thread was able to acquire the shard lock.
topoUtils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down

0 comments on commit dd13879

Please sign in to comment.