Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add conclude transaction command to vtctld service #16693

Merged
merged 10 commits into from
Sep 16, 2024
29 changes: 0 additions & 29 deletions go/cmd/vtctldclient/command/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,6 @@ var (
RunE: commandExecuteMultiFetchAsDBA,
Aliases: []string{"ExecuteMultiFetchAsDba"},
}
// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "GetUnresolvedTransactions <keyspace>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Args: cobra.ExactArgs(1),
RunE: commandGetUnresolvedTransactions,
}
)

var executeFetchAsAppOptions = struct {
Expand Down Expand Up @@ -205,26 +198,6 @@ func commandExecuteMultiFetchAsDBA(cmd *cobra.Command, args []string) error {
return nil
}

func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp.Transactions)
if err != nil {
return err
}
fmt.Printf("%s\n", data)
return nil
}

func init() {
ExecuteFetchAsApp.Flags().Int64Var(&executeFetchAsAppOptions.MaxRows, "max-rows", 10_000, "The maximum number of rows to fetch from the remote tablet.")
ExecuteFetchAsApp.Flags().BoolVar(&executeFetchAsAppOptions.UsePool, "use-pool", false, "Use the tablet connection pool instead of creating a fresh connection.")
Expand All @@ -242,6 +215,4 @@ func init() {
ExecuteMultiFetchAsDBA.Flags().BoolVar(&executeMultiFetchAsDBAOptions.ReloadSchema, "reload-schema", false, "Instructs the tablet to reload its schema after executing the query.")
ExecuteMultiFetchAsDBA.Flags().BoolVarP(&executeMultiFetchAsDBAOptions.JSON, "json", "j", false, "Output the results in JSON instead of a human-readable table.")
Root.AddCommand(ExecuteMultiFetchAsDBA)

Root.AddCommand(GetUnresolvedTransactions)
}
134 changes: 134 additions & 0 deletions go/cmd/vtctldclient/command/transactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package command

import (
"fmt"

"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
querypb "vitess.io/vitess/go/vt/proto/query"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var (
DistributedTransaction = &cobra.Command{
Use: "DistributedTransaction <cmd>",
Short: "Perform commands on distributed transaction",
Args: cobra.MinimumNArgs(2),

DisableFlagsInUseLine: true,
}

// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "list <keyspace>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Aliases: []string{"List"},
Args: cobra.ExactArgs(1),
RunE: commandGetUnresolvedTransactions,

DisableFlagsInUseLine: true,
}

// ConcludeTransaction makes a ConcludeTransaction gRPC call to a vtctld.
ConcludeTransaction = &cobra.Command{
Use: "conclude <dtid> [<keyspace/shard> ...]",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we even take in <keyspace/shard> list? Why don't we just conclude the transaction everywhere. Like the user only has one option, which is to conclude the transaction on all the shards, not on some shards.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking about it that if the list of keyspace/shard is not provided we will get it and then conclude on all.
This is more from if the user gets the unresolved transactions list then it knows which all shards are involved and can provide the list to conclude.

We can make the flow better. Let's do that in a follow up PR.

Short: "Concludes the unresolved transaction by rolling back the prepared transaction on each participating shard and removing the transaction metadata record.",
Aliases: []string{"Conclude"},
Args: cobra.MinimumNArgs(1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the examples it is not entirely clear to me what the arguments to conclude will look like. But we have found it more useful to specify the parameters as flags rather than depend on args because it is easy to forget the order or delimiters etc. We had that problem a lot with the earlier vreplication commands. It is a bit more typing

For example it could be
DistributedTransactions Conclude --keyspace ks1 --shards -40,a0- --dtid 82345 OR with shorthand codes

DistributedTransactions Conclude -k ks1 -s -40,a0- --dtid 82345

You can decide if this works better or not ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A transaction can span multiple keyspaces so keyspace and shard will go in pair like keyspace/shard

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not find a good way to record the participants list.
We keep it like this for now.

RunE: commandConcludeTransaction,

DisableFlagsInUseLine: true,
}
)

type ConcludeTransactionOutput struct {
Dtid string `json:"dtid"`
Message string `json:"message"`
Error string `json:"error,omitempty"`
}

const (
concludeSuccess = "Successfully concluded the distributed transaction"
concludeFailure = "Failed to conclude the distributed transaction"
)

func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp.Transactions)
if err != nil {
return err
}
fmt.Println(string(data))
return nil
}

func commandConcludeTransaction(cmd *cobra.Command, args []string) error {
allArgs := cmd.Flags().Args()
shards, err := cli.ParseKeyspaceShards(allArgs[1:])
if err != nil {
return err
}
cli.FinishedParsing(cmd)

dtid := allArgs[0]
var participants []*querypb.Target
for _, shard := range shards {
participants = append(participants, &querypb.Target{
Keyspace: shard.Keyspace,
Shard: shard.Name,
})
}
output := ConcludeTransactionOutput{
Dtid: dtid,
Message: concludeSuccess,
}

_, err = client.ConcludeTransaction(commandCtx,
&vtctldatapb.ConcludeTransactionRequest{
Dtid: dtid,
Participants: participants,
})
if err != nil {
output.Message = concludeFailure
output.Error = err.Error()
}

data, _ := cli.MarshalJSON(output)
fmt.Println(string(data))

return err
}

func init() {
DistributedTransaction.AddCommand(GetUnresolvedTransactions)
DistributedTransaction.AddCommand(ConcludeTransaction)

Root.AddCommand(DistributedTransaction)
}
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Available Commands:
DeleteShards Deletes the specified shards from the topology.
DeleteSrvVSchema Deletes the SrvVSchema object in the given cell.
DeleteTablets Deletes tablet(s) from the topology.
DistributedTransaction Perform commands on distributed transaction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be plural? I see from the discussion below that there is just one dt per shard: is that for all users and connections?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plural makes more sense to me too, especially for DistributedTransactions list since there can be multiple transactions that are listed.

EmergencyReparentShard Reparents the shard to the new primary. Assumes the old primary is dead and not responding.
ExecuteFetchAsApp Executes the given query as the App user on the remote tablet.
ExecuteFetchAsDBA Executes the given query as the DBA user on the remote tablet.
Expand Down Expand Up @@ -59,7 +60,6 @@ Available Commands:
GetTablets Looks up tablets according to filter criteria.
GetThrottlerStatus Get the throttler status for the given tablet.
GetTopologyPath Gets the value associated with the particular path (key) in the topology server.
GetUnresolvedTransactions Retrieves unresolved transactions for the given keyspace.
GetVSchema Prints a JSON representation of a keyspace's topo record.
GetWorkflows Gets all vreplication workflows (Reshard, MoveTables, etc) in the given keyspace.
LegacyVtctlCommand Invoke a legacy vtctlclient command. Flag parsing is best effort.
Expand Down
12 changes: 11 additions & 1 deletion go/test/endtoend/tabletmanager/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,19 @@ func TestTabletCommands(t *testing.T) {
})

t.Run("GetUnresolvedTransactions", func(t *testing.T) {
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetUnresolvedTransactions", keyspaceName)
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "list", keyspaceName)
require.NoError(t, err)
})
t.Run("ConcludeTransaction", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
})
t.Run("ConcludeTransaction with participants", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234", "ks/0")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
})
// check Ping / RefreshState / RefreshStateByShard
err = clusterInstance.VtctldClientProcess.ExecuteCommand("PingTablet", primaryTablet.Alias)
require.Nil(t, err, "error should be Nil")
Expand Down
Loading
Loading