Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into unresolved-tx-age
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Sep 18, 2024
2 parents 6f131d8 + 3182049 commit d360548
Show file tree
Hide file tree
Showing 54 changed files with 9,878 additions and 9,201 deletions.
1 change: 1 addition & 0 deletions examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func main() {
// MinimizeSkew: false,
// HeartbeatInterval: 60, //seconds
// StopOnReshard: true,
// IncludeReshardJournalEvents: true,
}
reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
Expand Down
21 changes: 19 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ require (
github.com/HdrHistogram/hdrhistogram-go v0.9.0 // indirect
github.com/aquarapid/vaultlib v0.5.1
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-sdk-go v1.55.5
github.com/buger/jsonparser v1.1.1
github.com/cespare/xxhash/v2 v2.3.0
github.com/corpix/uarand v0.1.1 // indirect
Expand All @@ -32,7 +31,6 @@ require (
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.9
github.com/klauspost/pgzip v1.2.6
github.com/krishicks/yaml-patch v0.0.10
Expand Down Expand Up @@ -92,6 +90,11 @@ require (
require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/Shopify/toxiproxy/v2 v2.9.0
github.com/aws/aws-sdk-go-v2 v1.30.4
github.com/aws/aws-sdk-go-v2/config v1.27.31
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.15
github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1
github.com/aws/smithy-go v1.20.4
github.com/bndr/gotabulate v1.1.2
github.com/gammazero/deque v0.2.1
github.com/google/safehtml v0.1.0
Expand Down Expand Up @@ -124,6 +127,20 @@ require (
github.com/DataDog/go-tuf v1.1.0-0.5.2 // indirect
github.com/DataDog/sketches-go v1.4.6 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.30 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
Expand Down
44 changes: 38 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,44 @@ github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJ
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8=
github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw=
github.com/aws/aws-sdk-go-v2/config v1.27.31 h1:kxBoRsjhT3pq0cKthgj6RU6bXTm/2SgdoUMyrVw0rAI=
github.com/aws/aws-sdk-go-v2/config v1.27.31/go.mod h1:z04nZdSWFPaDwK3DdJOG2r+scLQzMYuJeW0CujEm9FM=
github.com/aws/aws-sdk-go-v2/credentials v1.17.30 h1:aau/oYFtibVovr2rDt8FHlU17BTicFEMAi29V1U+L5Q=
github.com/aws/aws-sdk-go-v2/credentials v1.17.30/go.mod h1:BPJ/yXV92ZVq6G8uYvbU0gSl8q94UB63nMT5ctNO38g=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 h1:yjwoSyDZF8Jth+mUk5lSPJCkMC0lMy6FaCD51jm6ayE=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12/go.mod h1:fuR57fAgMk7ot3WcNQfb6rSEn+SUffl7ri+aa8uKysI=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.15 h1:ijB7hr56MngOiELJe0C5aQRaBQ11LveNgWFyG02AUto=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.15/go.mod h1:0QEmQSSWMVfiAk93l1/ayR9DQ9+jwni7gHS2NARZXB0=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 h1:mimdLQkIX1zr8GIPY1ZtALdBQGxcASiBd2MOp8m/dMc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16/go.mod h1:YHk6owoSwrIsok+cAH9PENCOGoH5PU2EllX4vLtSrsY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 h1:GckUnpm4EJOAio1c8o25a+b3lVfwVzC9gnSBqiiNmZM=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18/go.mod h1:Br6+bxfG33Dk3ynmkhsW2Z/t9D4+lRqdLDNCKi85w0U=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 h1:tJ5RnkHCiSH0jyd6gROjlJtNwov0eGYNz8s8nFcR0jQ=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18/go.mod h1:++NHzT+nAF7ZPrHPsA+ENvsXkOO8wEu+C6RXltAG4/c=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 h1:jg16PhLPUiHIj8zYIW6bqzeQSuHVEiWnGA0Brz5Xv2I=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16/go.mod h1:Uyk1zE1VVdsHSU7096h/rwnXDzOzYQVl+FNPhPw7ShY=
github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 h1:mx2ucgtv+MWzJesJY9Ig/8AFHgoE5FwLXwUVgW/FGdI=
github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1/go.mod h1:BSPI0EfnYUuNHPS0uqIo5VrRwzie+Fp+YhQOUs16sKI=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 h1:zCsFCKvbj25i7p1u94imVoO447I/sFv8qq+lGJhRN0c=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.5/go.mod h1:ZeDX1SnKsVlejeuz41GiajjZpRSWR7/42q/EyA/QEiM=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 h1:SKvPgvdvmiTWoi0GAJ7AsJfOz3ngVkD/ERbs5pUnHNI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5/go.mod h1:20sz31hv/WsPa3HhU3hfrIet2kxM4Pe0r20eBZ20Tac=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 h1:OMsEmCyz2i89XwRwPouAJvhj81wINh+4UK+k/0Yo/q8=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5/go.mod h1:vmSqFK+BVIwVpDAGZB3CoCXHzurt4qBE8lf+I/kRTh0=
github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -288,10 +324,6 @@ github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 h1:Mo9W14pwbO9VfRe+y
github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428/go.mod h1:uhpZMVGznybq1itEKXj6RYw9I71qK4kH+OGMjRC4KEo=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ var SwitchTrafficOptions = struct {
Direction workflow.TrafficSwitchDirection
InitializeTargetSequences bool
Shards []string
Force bool
}{}

func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences bool) {
Expand All @@ -259,6 +260,7 @@ func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences b
cmd.Flags().DurationVar(&SwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", MaxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.Force, "force", false, "Force the traffic switch even if some potentially non-critical actions cannot be performed; for example the tablet refresh fails on some tablets in the keyspace. WARNING: this should be used with extreme caution and only in emergency situations!")
if initializeTargetSequences {
cmd.Flags().BoolVar(&SwitchTrafficOptions.InitializeTargetSequences, "initialize-target-sequences", false, "When moving tables from an unsharded keyspace to a sharded keyspace, initialize any sequences that are being used on the target when switching writes.")
}
Expand Down
10 changes: 8 additions & 2 deletions go/test/endtoend/onlineddl/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func CreateTempScript(t *testing.T, content string) (fileName string) {
func MysqlClientExecFile(t *testing.T, mysqlParams *mysql.ConnParams, testDataPath, testName string, fileName string) (output string) {
t.Helper()

errorFile, err := os.CreateTemp("", "onlineddl-test-")
require.NoError(t, err)
defer os.Remove(errorFile.Name())

bashPath, err := exec.LookPath("bash")
require.NoError(t, err)
mysqlPath, err := exec.LookPath("mysql")
Expand All @@ -55,13 +59,15 @@ func MysqlClientExecFile(t *testing.T, mysqlParams *mysql.ConnParams, testDataPa
if !filepath.IsAbs(fileName) {
filePath, _ = filepath.Abs(path.Join(testDataPath, testName, fileName))
}
bashCommand := fmt.Sprintf(`%s -u%s --socket=%s --database=%s -s -s < %s 2> /tmp/error.log`, mysqlPath, mysqlParams.Uname, mysqlParams.UnixSocket, mysqlParams.DbName, filePath)
bashCommand := fmt.Sprintf(`%s -u%s --socket=%s --database=%s -s -s < %s 2> %s`, mysqlPath, mysqlParams.Uname, mysqlParams.UnixSocket, mysqlParams.DbName, filePath, errorFile.Name())
cmd, err := exec.Command(
bashPath,
"-c",
bashCommand,
).Output()

require.NoError(t, err)
errorContent, readerr := os.ReadFile(errorFile.Name())
require.NoError(t, readerr)
require.NoError(t, err, "error details: %s", errorContent)
return string(cmd)
}
10 changes: 9 additions & 1 deletion go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,10 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
Match: "/customer.*",
}},
}
flags := &vtgatepb.VStreamFlags{}
flags := &vtgatepb.VStreamFlags{
IncludeReshardJournalEvents: true,
}
journalEvents := 0

// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
Expand Down Expand Up @@ -678,6 +681,9 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_JOURNAL:
require.True(t, ev.Journal.MigrationType == binlogdatapb.MigrationType_SHARDS)
journalEvents++
}
}
default:
Expand All @@ -694,6 +700,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
// We should have a mix of events across the old and new shards.
require.Greater(t, oldShardRowEvents, 0)
require.Greater(t, newShardRowEvents, 0)
// We should have seen a reshard journal event.
require.Greater(t, journalEvents, 0)

// The number of row events streamed by the VStream API should match the number of rows inserted.
customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
Expand Down
84 changes: 54 additions & 30 deletions go/vt/mysqlctl/s3backupstorage/retryer.go
Original file line number Diff line number Diff line change
@@ -1,51 +1,75 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package s3backupstorage

import (
"context"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go-v2/aws"
)

// ClosedConnectionRetryer implements the aws request.Retryer interface
// ClosedConnectionRetryer implements the aws.Retryer interface
// and is used to retry closed connection errors during MultipartUpload
// operations.
// operations. It is a simplified version of the RetryableConnectionError
// implementation, which always retry on any type of connection error.
type ClosedConnectionRetryer struct {
awsRetryer request.Retryer
awsRetryer aws.Retryer
}

// RetryRules is part of the Retryer interface. It defers to the underlying
// aws Retryer to compute backoff rules.
func (retryer *ClosedConnectionRetryer) RetryRules(r *request.Request) time.Duration {
return retryer.awsRetryer.RetryRules(r)
}

// ShouldRetry is part of the Retryer interface. It retries on errors that occur
// due to a closed network connection, and then falls back to the underlying aws
// Retryer for checking additional retry conditions.
func (retryer *ClosedConnectionRetryer) ShouldRetry(r *request.Request) bool {
if retryer.MaxRetries() == 0 {
// IsErrorRetryable returns true if the error should be retried. We first try
// to see if the error is due to the use of a closed connection, if it is,
// we retry, and if not, we default to what the aws.Retryer would do.
func (retryer *ClosedConnectionRetryer) IsErrorRetryable(err error) bool {
if retryer.MaxAttempts() == 0 {
return false
}

if r.Retryable != nil {
return *r.Retryable
}

if r.Error != nil {
if awsErr, ok := r.Error.(awserr.Error); ok {
if strings.Contains(awsErr.Error(), "use of closed network connection") {
return true
}
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
return true
}
}

return retryer.awsRetryer.ShouldRetry(r)
return retryer.awsRetryer.IsErrorRetryable(err)
}

// MaxAttempts returns the maximum number of attempts that can be made for
// an attempt before failing. A value of 0 implies that the attempt should
// be retried until it succeeds if the errors are retryable.
func (retryer *ClosedConnectionRetryer) MaxAttempts() int {
return retryer.awsRetryer.MaxAttempts()
}

// RetryDelay returns the delay that should be used before retrying the
// attempt. Will return error if the delay could not be determined.
func (retryer *ClosedConnectionRetryer) RetryDelay(attempt int, opErr error) (time.Duration, error) {
return retryer.awsRetryer.RetryDelay(attempt, opErr)
}

// GetRetryToken attempts to deduct the retry cost from the retry token pool.
// Returning the token release function, or error.
func (retryer *ClosedConnectionRetryer) GetRetryToken(ctx context.Context, opErr error) (releaseToken func(error) error, err error) {
return retryer.awsRetryer.GetRetryToken(ctx, opErr)
}

// MaxRetries is part of the Retryer interface. It defers to the
// underlying aws Retryer for the max number of retries.
func (retryer *ClosedConnectionRetryer) MaxRetries() int {
return retryer.awsRetryer.MaxRetries()
// GetInitialToken returns the initial attempt token that can increment the
// retry token pool if the attempt is successful.
func (retryer *ClosedConnectionRetryer) GetInitialToken() (releaseToken func(error) error) {
return retryer.awsRetryer.GetInitialToken()
}
Loading

0 comments on commit d360548

Please sign in to comment.