Skip to content

Commit

Permalink
fix: txpool size change to update normal and foundrows pool
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Nov 6, 2024
1 parent b45d7c9 commit 43af039
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go/vt/vttablet/endtoend/call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
Expand Down Expand Up @@ -136,6 +135,7 @@ func TestCallProcedureLeakTx(t *testing.T) {

func TestCallProcedureChangedTx(t *testing.T) {
client := framework.NewClient()
defer client.Release()

_, err := client.Execute(`call proc_tx_begin()`, nil)
require.EqualError(t, err, "Transaction not concluded inside the stored procedure, leaking transaction from stored procedure is not allowed (CallerID: dev)")
Expand Down
43 changes: 42 additions & 1 deletion go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -73,6 +72,48 @@ func TestStreamPoolSize(t *testing.T) {
verifyIntValue(t, vstart, "StreamConnPoolCapacity", 1)
}

// TestTxPoolSize starts 2 transactions, one in normal pool and one in found rows pool of transaction pool.
// Changing the pool size to 1, we verify that the pool size is updated and the pool is full when we try to acquire next transaction.
func TestTxPoolSize(t *testing.T) {
vstart := framework.DebugVars()

verifyIntValue(t, vstart, "TransactionPoolCapacity", 20)
verifyIntValue(t, vstart, "FoundRowsPoolCapacity", 20)

client1 := framework.NewClient()
err := client1.Begin( /* found rows pool*/ false)
require.NoError(t, err)
defer client1.Rollback()
verifyIntValue(t, framework.DebugVars(), "TransactionPoolAvailable", framework.FetchInt(vstart, "TransactionPoolAvailable")-1)

client2 := framework.NewClient()
err = client2.Begin( /* found rows pool*/ true)
require.NoError(t, err)
defer client2.Rollback()
verifyIntValue(t, framework.DebugVars(), "FoundRowsPoolAvailable", framework.FetchInt(vstart, "FoundRowsPoolAvailable")-1)

revert := changeVar(t, "TxPoolSize", "1")
defer revert()
vend := framework.DebugVars()
verifyIntValue(t, vend, "TransactionPoolAvailable", 0)
verifyIntValue(t, vend, "TransactionPoolCapacity", 1)
verifyIntValue(t, vend, "FoundRowsPoolAvailable", 0)
verifyIntValue(t, vend, "FoundRowsPoolCapacity", 1)
assert.Equal(t, 1, framework.Server.TxPoolSize())

client3 := framework.NewClient()

// tx pool - normal
err = client3.Begin( /* found rows pool*/ false)
require.ErrorContains(t, err, "connection limit exceeded")
compareIntDiff(t, framework.DebugVars(), "Errors/RESOURCE_EXHAUSTED", vstart, 1)

// tx pool - found rows
err = client3.Begin( /* found rows pool*/ true)
require.ErrorContains(t, err, "connection limit exceeded")
compareIntDiff(t, framework.DebugVars(), "Errors/RESOURCE_EXHAUSTED", vstart, 2)
}

func TestDisableConsolidator(t *testing.T) {
totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
Expand Down
30 changes: 1 addition & 29 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,16 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv/tabletenvtest"

"google.golang.org/protobuf/proto"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/utils"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

func TestCommit(t *testing.T) {
Expand Down Expand Up @@ -202,30 +198,6 @@ func TestAutoCommit(t *testing.T) {
}
}

func TestTxPoolSize(t *testing.T) {
tabletenvtest.LoadTabletEnvFlags()

vstart := framework.DebugVars()

client1 := framework.NewClient()
err := client1.Begin(false)
require.NoError(t, err)
defer client1.Rollback()
verifyIntValue(t, framework.DebugVars(), "TransactionPoolAvailable", tabletenv.NewCurrentConfig().TxPool.Size-1)

revert := changeVar(t, "TxPoolSize", "1")
defer revert()
vend := framework.DebugVars()
verifyIntValue(t, vend, "TransactionPoolAvailable", 0)
verifyIntValue(t, vend, "TransactionPoolCapacity", 1)

client2 := framework.NewClient()
err = client2.Begin(false)
require.Error(t, err)
require.Contains(t, err.Error(), "connection limit exceeded")
compareIntDiff(t, framework.DebugVars(), "Errors/RESOURCE_EXHAUSTED", vstart, 1)
}

func TestForUpdate(t *testing.T) {
for _, mode := range []string{"for update", "lock in share mode"} {
client := framework.NewClient()
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,11 @@ func (tsv *TabletServer) StreamPoolSize() int {

// SetTxPoolSize changes the tx pool size to the specified value.
func (tsv *TabletServer) SetTxPoolSize(ctx context.Context, val int) error {
return tsv.te.txPool.scp.conns.SetCapacity(ctx, int64(val))
// TxPool manages two pools, one for normal connections and one for CLIENT_FOUND_ROWS capability enabled on the connections.
if err := tsv.te.txPool.scp.conns.SetCapacity(ctx, int64(val)); err != nil {
return err
}
return tsv.te.txPool.scp.foundRowsPool.SetCapacity(ctx, int64(val))
}

// TxPoolSize returns the tx pool size.
Expand Down

0 comments on commit 43af039

Please sign in to comment.