From 5b6ba751fde748eae4e11f85996e6443eff764c6 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Thu, 12 Sep 2024 15:45:13 -0600 Subject: [PATCH 1/2] Migrate from AWS SDK v1 to v2 Signed-off-by: Florent Poinsard --- go.mod | 21 ++- go.sum | 44 ++++- go/vt/mysqlctl/s3backupstorage/retryer.go | 84 +++++---- .../mysqlctl/s3backupstorage/retryer_test.go | 86 ++++------ go/vt/mysqlctl/s3backupstorage/s3.go | 161 ++++++++++-------- go/vt/mysqlctl/s3backupstorage/s3_test.go | 96 +++++++---- 6 files changed, 298 insertions(+), 194 deletions(-) diff --git a/go.mod b/go.mod index c241fa9175f..09ddfba8fb5 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/HdrHistogram/hdrhistogram-go v0.9.0 // indirect github.com/aquarapid/vaultlib v0.5.1 github.com/armon/go-metrics v0.4.1 // indirect - github.com/aws/aws-sdk-go v1.55.5 github.com/buger/jsonparser v1.1.1 github.com/cespare/xxhash/v2 v2.3.0 github.com/corpix/uarand v0.1.1 // indirect @@ -32,7 +31,6 @@ require ( github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/serf v0.10.1 // indirect github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 - github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.9 github.com/klauspost/pgzip v1.2.6 github.com/krishicks/yaml-patch v0.0.10 @@ -92,6 +90,11 @@ require ( require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Shopify/toxiproxy/v2 v2.9.0 + github.com/aws/aws-sdk-go-v2 v1.30.4 + github.com/aws/aws-sdk-go-v2/config v1.27.31 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.15 + github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 + github.com/aws/smithy-go v1.20.4 github.com/bndr/gotabulate v1.1.2 github.com/gammazero/deque v0.2.1 github.com/google/safehtml v0.1.0 @@ -124,6 +127,20 @@ require ( github.com/DataDog/go-tuf v1.1.0-0.5.2 // indirect github.com/DataDog/sketches-go v1.4.6 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.30 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect diff --git a/go.sum b/go.sum index 9471151982f..f554bd465ee 100644 --- a/go.sum +++ b/go.sum @@ -73,8 +73,44 @@ github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod 
h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= -github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= -github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= +github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw= +github.com/aws/aws-sdk-go-v2/config v1.27.31 h1:kxBoRsjhT3pq0cKthgj6RU6bXTm/2SgdoUMyrVw0rAI= +github.com/aws/aws-sdk-go-v2/config v1.27.31/go.mod h1:z04nZdSWFPaDwK3DdJOG2r+scLQzMYuJeW0CujEm9FM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.30 h1:aau/oYFtibVovr2rDt8FHlU17BTicFEMAi29V1U+L5Q= +github.com/aws/aws-sdk-go-v2/credentials v1.17.30/go.mod h1:BPJ/yXV92ZVq6G8uYvbU0gSl8q94UB63nMT5ctNO38g= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 h1:yjwoSyDZF8Jth+mUk5lSPJCkMC0lMy6FaCD51jm6ayE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12/go.mod h1:fuR57fAgMk7ot3WcNQfb6rSEn+SUffl7ri+aa8uKysI= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.15 h1:ijB7hr56MngOiELJe0C5aQRaBQ11LveNgWFyG02AUto= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.15/go.mod h1:0QEmQSSWMVfiAk93l1/ayR9DQ9+jwni7gHS2NARZXB0= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 h1:mimdLQkIX1zr8GIPY1ZtALdBQGxcASiBd2MOp8m/dMc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16/go.mod h1:YHk6owoSwrIsok+cAH9PENCOGoH5PU2EllX4vLtSrsY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 h1:GckUnpm4EJOAio1c8o25a+b3lVfwVzC9gnSBqiiNmZM= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18/go.mod h1:Br6+bxfG33Dk3ynmkhsW2Z/t9D4+lRqdLDNCKi85w0U= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 h1:tJ5RnkHCiSH0jyd6gROjlJtNwov0eGYNz8s8nFcR0jQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18/go.mod h1:++NHzT+nAF7ZPrHPsA+ENvsXkOO8wEu+C6RXltAG4/c= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 h1:jg16PhLPUiHIj8zYIW6bqzeQSuHVEiWnGA0Brz5Xv2I= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16/go.mod h1:Uyk1zE1VVdsHSU7096h/rwnXDzOzYQVl+FNPhPw7ShY= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 h1:mx2ucgtv+MWzJesJY9Ig/8AFHgoE5FwLXwUVgW/FGdI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1/go.mod h1:BSPI0EfnYUuNHPS0uqIo5VrRwzie+Fp+YhQOUs16sKI= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 
h1:zCsFCKvbj25i7p1u94imVoO447I/sFv8qq+lGJhRN0c= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.5/go.mod h1:ZeDX1SnKsVlejeuz41GiajjZpRSWR7/42q/EyA/QEiM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 h1:SKvPgvdvmiTWoi0GAJ7AsJfOz3ngVkD/ERbs5pUnHNI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5/go.mod h1:20sz31hv/WsPa3HhU3hfrIet2kxM4Pe0r20eBZ20Tac= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 h1:OMsEmCyz2i89XwRwPouAJvhj81wINh+4UK+k/0Yo/q8= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.5/go.mod h1:vmSqFK+BVIwVpDAGZB3CoCXHzurt4qBE8lf+I/kRTh0= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -290,10 +326,6 @@ github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 h1:Mo9W14pwbO9VfRe+y github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428/go.mod h1:uhpZMVGznybq1itEKXj6RYw9I71qK4kH+OGMjRC4KEo= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= -github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= -github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= -github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= diff --git a/go/vt/mysqlctl/s3backupstorage/retryer.go b/go/vt/mysqlctl/s3backupstorage/retryer.go index 052b1ef26d1..9e4e87da702 100644 --- a/go/vt/mysqlctl/s3backupstorage/retryer.go +++ b/go/vt/mysqlctl/s3backupstorage/retryer.go @@ -1,51 +1,75 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package s3backupstorage import ( + "context" "strings" "time" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go-v2/aws" ) -// ClosedConnectionRetryer implements the aws request.Retryer interface +// ClosedConnectionRetryer implements the aws.Retryer interface // and is used to retry closed connection errors during MultipartUpload -// operations. +// operations. It is a simplified version of the RetryableConnectionError +// implementation, which always retry on any type of connection error. 
type ClosedConnectionRetryer struct { - awsRetryer request.Retryer + awsRetryer aws.Retryer } -// RetryRules is part of the Retryer interface. It defers to the underlying -// aws Retryer to compute backoff rules. -func (retryer *ClosedConnectionRetryer) RetryRules(r *request.Request) time.Duration { - return retryer.awsRetryer.RetryRules(r) -} - -// ShouldRetry is part of the Retryer interface. It retries on errors that occur -// due to a closed network connection, and then falls back to the underlying aws -// Retryer for checking additional retry conditions. -func (retryer *ClosedConnectionRetryer) ShouldRetry(r *request.Request) bool { - if retryer.MaxRetries() == 0 { +// IsErrorRetryable returns true if the error should be retried. We first try +// to see if the error is due to the use of a closed connection, if it is, +// we retry, and if not, we default to what the aws.Retryer would do. +func (retryer *ClosedConnectionRetryer) IsErrorRetryable(err error) bool { + if retryer.MaxAttempts() == 0 { return false } - if r.Retryable != nil { - return *r.Retryable - } - - if r.Error != nil { - if awsErr, ok := r.Error.(awserr.Error); ok { - if strings.Contains(awsErr.Error(), "use of closed network connection") { - return true - } + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { + return true } } - return retryer.awsRetryer.ShouldRetry(r) + return retryer.awsRetryer.IsErrorRetryable(err) +} + +// MaxAttempts returns the maximum number of attempts that can be made for +// an attempt before failing. A value of 0 implies that the attempt should +// be retried until it succeeds if the errors are retryable. +func (retryer *ClosedConnectionRetryer) MaxAttempts() int { + return retryer.awsRetryer.MaxAttempts() +} + +// RetryDelay returns the delay that should be used before retrying the +// attempt. Will return error if the delay could not be determined. +func (retryer *ClosedConnectionRetryer) RetryDelay(attempt int, opErr error) (time.Duration, error) { + return retryer.awsRetryer.RetryDelay(attempt, opErr) +} + +// GetRetryToken attempts to deduct the retry cost from the retry token pool. +// Returning the token release function, or error. +func (retryer *ClosedConnectionRetryer) GetRetryToken(ctx context.Context, opErr error) (releaseToken func(error) error, err error) { + return retryer.awsRetryer.GetRetryToken(ctx, opErr) } -// MaxRetries is part of the Retryer interface. It defers to the -// underlying aws Retryer for the max number of retries. -func (retryer *ClosedConnectionRetryer) MaxRetries() int { - return retryer.awsRetryer.MaxRetries() +// GetInitialToken returns the initial attempt token that can increment the +// retry token pool if the attempt is successful. +func (retryer *ClosedConnectionRetryer) GetInitialToken() (releaseToken func(error) error) { + return retryer.awsRetryer.GetInitialToken() } diff --git a/go/vt/mysqlctl/s3backupstorage/retryer_test.go b/go/vt/mysqlctl/s3backupstorage/retryer_test.go index 9fe3004e6ff..666f565e7e4 100644 --- a/go/vt/mysqlctl/s3backupstorage/retryer_test.go +++ b/go/vt/mysqlctl/s3backupstorage/retryer_test.go @@ -1,79 +1,69 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package s3backupstorage import ( + "context" "errors" "testing" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/request" "github.com/stretchr/testify/assert" ) type testRetryer struct{ retry bool } -func (r *testRetryer) MaxRetries() int { return 5 } -func (r *testRetryer) RetryRules(req *request.Request) time.Duration { return time.Second } -func (r *testRetryer) ShouldRetry(req *request.Request) bool { return r.retry } +func (r *testRetryer) GetInitialToken() (releaseToken func(error) error) { panic("implement me") } +func (r *testRetryer) GetRetryToken(ctx context.Context, opErr error) (releaseToken func(error) error, err error) { + panic("implement me") +} +func (r *testRetryer) IsErrorRetryable(err error) bool { return r.retry } +func (r *testRetryer) MaxAttempts() int { return 5 } +func (r *testRetryer) RetryDelay(attempt int, opErr error) (time.Duration, error) { + return time.Second, nil +} func TestShouldRetry(t *testing.T) { tests := []struct { name string - r *request.Request + err error fallbackPolicy bool expected bool }{ - { - name: "non retryable request", - r: &request.Request{ - Retryable: aws.Bool(false), - }, + name: "no error", fallbackPolicy: false, expected: false, }, { - name: "retryable request", - r: &request.Request{ - Retryable: aws.Bool(true), - }, - fallbackPolicy: false, - expected: true, - }, - { - name: "non aws error", - r: &request.Request{ - Retryable: nil, - Error: errors.New("some error"), - }, + name: "non aws error", + err: errors.New("some error"), fallbackPolicy: false, expected: false, }, { - name: "closed connection error", - r: &request.Request{ - Retryable: nil, - Error: awserr.New("5xx", "use of closed network connection", nil), - }, - fallbackPolicy: false, - expected: true, - }, - { - name: "closed connection error (non nil origError)", - r: &request.Request{ - Retryable: nil, - Error: awserr.New("5xx", "use of closed network connection", errors.New("some error")), - }, + name: "closed connection error", + err: errors.New("use of closed network connection"), fallbackPolicy: false, expected: true, }, { - name: "other aws error hits fallback policy", - r: &request.Request{ - Retryable: nil, - Error: awserr.New("code", "not a closed network connectionn", errors.New("some error")), - }, + name: "other aws error hits fallback policy", + err: errors.New("not a closed network connection"), fallbackPolicy: true, expected: true, }, @@ -82,13 +72,7 @@ func TestShouldRetry(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { retryer := &ClosedConnectionRetryer{&testRetryer{test.fallbackPolicy}} - msg := "" - if test.r.Error != nil { - if awsErr, ok := test.r.Error.(awserr.Error); ok { - msg = awsErr.Error() - } - } - assert.Equal(t, test.expected, retryer.ShouldRetry(test.r), msg) + assert.Equal(t, test.expected, retryer.IsErrorRetryable(test.err)) }) } } diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index 9686b4a3978..6947f02fa6d 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ 
b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -39,13 +39,12 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/client" - "github.com/aws/aws-sdk-go/aws/request" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3iface" - "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go/middleware" "github.com/spf13/pflag" "vitess.io/vitess/go/vt/concurrency" @@ -105,15 +104,24 @@ func init() { servenv.OnParseFor("vttablet", registerFlags) } -type logNameToLogLevel map[string]aws.LogLevelType +type logNameToLogLevel map[string]aws.ClientLogMode var logNameMap logNameToLogLevel const sseCustomerPrefix = "sse_c:" +type iClient interface { + manager.UploadAPIClient + manager.DownloadAPIClient +} + +type clientWrapper struct { + *s3.Client +} + // S3BackupHandle implements the backupstorage.BackupHandle interface. type S3BackupHandle struct { - client s3iface.S3API + client iClient bs *S3BackupStorage dir string name string @@ -154,9 +162,9 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize } // Calculate s3 upload part size using the source filesize - partSizeBytes := s3manager.DefaultUploadPartSize + partSizeBytes := manager.DefaultUploadPartSize if filesize > 0 { - minimumPartSize := float64(filesize) / float64(s3manager.MaxUploadParts) + minimumPartSize := float64(filesize) / float64(manager.MaxUploadParts) // Round up to ensure large enough partsize calculatedPartSizeBytes := int64(math.Ceil(minimumPartSize)) if calculatedPartSizeBytes > partSizeBytes { @@ -169,25 +177,33 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize go func() { defer bh.waitGroup.Done() - uploader := s3manager.NewUploaderWithClient(bh.client, func(u *s3manager.Uploader) { + uploader := manager.NewUploader(bh.client, func(u *manager.Uploader) { u.PartSize = partSizeBytes }) object := objName(bh.dir, bh.name, filename) + log.Infof("objName: %v", object) sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send")) // Using UploadWithContext breaks uploading to Minio and Ceph https://github.com/vitessio/vitess/issues/14188 - _, err := uploader.Upload(&s3manager.UploadInput{ + _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{ Bucket: &bucket, - Key: object, + Key: &object, Body: reader, ServerSideEncryption: bh.bs.s3SSE.awsAlg, SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg, SSECustomerKey: bh.bs.s3SSE.customerKey, SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5, - }, s3manager.WithUploaderRequestOptions(func(r *request.Request) { - r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) { - sendStats.TimedIncrement(time.Since(r.AttemptTime)) + }, func(u *manager.Uploader) { + u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) { + o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error { + return stack.Finalize.Add(middleware.FinalizeMiddlewareFunc("CompleteAttemptMiddleware", func(ctx context.Context, input middleware.FinalizeInput, next middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) { + start := time.Now() + output, metadata, err := next.HandleFinalize(ctx, input) + sendStats.TimedIncrement(time.Since(start)) + return 
output, metadata, err + }), middleware.Before) + }) }) - })) + }) if err != nil { reader.CloseWithError(err) bh.RecordError(err) @@ -221,15 +237,20 @@ func (bh *S3BackupHandle) ReadFile(ctx context.Context, filename string) (io.Rea } object := objName(bh.dir, bh.name, filename) sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send")) - out, err := bh.client.GetObjectWithContext(ctx, &s3.GetObjectInput{ + out, err := bh.client.GetObject(ctx, &s3.GetObjectInput{ Bucket: &bucket, - Key: object, + Key: &object, SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg, SSECustomerKey: bh.bs.s3SSE.customerKey, SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5, - }, func(r *request.Request) { - r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) { - sendStats.TimedIncrement(time.Since(r.AttemptTime)) + }, func(o *s3.Options) { + o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error { + return stack.Finalize.Add(middleware.FinalizeMiddlewareFunc("CompleteAttemptMiddleware", func(ctx context.Context, input middleware.FinalizeInput, next middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) { + start := time.Now() + output, metadata, err := next.HandleFinalize(ctx, input) + sendStats.TimedIncrement(time.Since(start)) + return output, metadata, err + }), middleware.Before) }) }) if err != nil { @@ -241,7 +262,7 @@ func (bh *S3BackupHandle) ReadFile(ctx context.Context, filename string) (io.Rea var _ backupstorage.BackupHandle = (*S3BackupHandle)(nil) type S3ServerSideEncryption struct { - awsAlg *string + awsAlg types.ServerSideEncryption customerAlg *string customerKey *string customerMd5 *string @@ -268,13 +289,13 @@ func (s3ServerSideEncryption *S3ServerSideEncryption) init() error { s3ServerSideEncryption.customerKey = aws.String(string(decodedKey)) s3ServerSideEncryption.customerMd5 = aws.String(base64.StdEncoding.EncodeToString(md5Hash[:])) } else if sse != "" { - s3ServerSideEncryption.awsAlg = &sse + s3ServerSideEncryption.awsAlg = types.ServerSideEncryption(sse) } return nil } func (s3ServerSideEncryption *S3ServerSideEncryption) reset() { - s3ServerSideEncryption.awsAlg = nil + s3ServerSideEncryption.awsAlg = "" s3ServerSideEncryption.customerAlg = nil s3ServerSideEncryption.customerKey = nil s3ServerSideEncryption.customerMd5 = nil @@ -282,7 +303,7 @@ func (s3ServerSideEncryption *S3ServerSideEncryption) reset() { // S3BackupStorage implements the backupstorage.BackupStorage interface. 
type S3BackupStorage struct { - _client *s3.S3 + _client *s3.Client mu sync.Mutex s3SSE S3ServerSideEncryption params backupstorage.Params @@ -307,28 +328,28 @@ func (bs *S3BackupStorage) ListBackups(ctx context.Context, dir string) ([]backu return nil, err } - var searchPrefix *string + var searchPrefix string if dir == "/" { searchPrefix = objName("") } else { searchPrefix = objName(dir, "") } - log.Infof("objName: %v", *searchPrefix) + log.Infof("objName: %s", searchPrefix) query := &s3.ListObjectsV2Input{ Bucket: &bucket, Delimiter: &delimiter, - Prefix: searchPrefix, + Prefix: &searchPrefix, } var subdirs []string for { - objs, err := c.ListObjectsV2(query) + objs, err := c.ListObjectsV2(ctx, query) if err != nil { return nil, err } for _, prefix := range objs.CommonPrefixes { - subdir := strings.TrimPrefix(*prefix.Prefix, *searchPrefix) + subdir := strings.TrimPrefix(*prefix.Prefix, searchPrefix) subdir = strings.TrimSuffix(subdir, delimiter) subdirs = append(subdirs, subdir) } @@ -345,7 +366,7 @@ func (bs *S3BackupStorage) ListBackups(ctx context.Context, dir string) ([]backu result := make([]backupstorage.BackupHandle, 0, len(subdirs)) for _, subdir := range subdirs { result = append(result, &S3BackupHandle{ - client: c, + client: &clientWrapper{Client: c}, bs: bs, dir: dir, name: subdir, @@ -364,7 +385,7 @@ func (bs *S3BackupStorage) StartBackup(ctx context.Context, dir, name string) (b } return &S3BackupHandle{ - client: c, + client: &clientWrapper{Client: c}, bs: bs, dir: dir, name: name, @@ -381,28 +402,29 @@ func (bs *S3BackupStorage) RemoveBackup(ctx context.Context, dir, name string) e return err } + path := objName(dir, name) query := &s3.ListObjectsV2Input{ Bucket: &bucket, - Prefix: objName(dir, name), + Prefix: &path, } for { - objs, err := c.ListObjectsV2(query) + objs, err := c.ListObjectsV2(ctx, query) if err != nil { return err } - objIds := make([]*s3.ObjectIdentifier, 0, len(objs.Contents)) + objIds := make([]types.ObjectIdentifier, 0, len(objs.Contents)) for _, obj := range objs.Contents { - objIds = append(objIds, &s3.ObjectIdentifier{ + objIds = append(objIds, types.ObjectIdentifier{ Key: obj.Key, }) } quiet := true // return less in the Delete response - out, err := c.DeleteObjects(&s3.DeleteObjectsInput{ + out, err := c.DeleteObjects(ctx, &s3.DeleteObjectsInput{ Bucket: &bucket, - Delete: &s3.Delete{ + Delete: &types.Delete{ Objects: objIds, Quiet: &quiet, }, @@ -413,7 +435,7 @@ func (bs *S3BackupStorage) RemoveBackup(ctx context.Context, dir, name string) e } for _, objError := range out.Errors { - return errors.New(objError.String()) + return errors.New(*objError.Message) } if objs.NextContinuationToken == nil { @@ -442,16 +464,15 @@ func (bs *S3BackupStorage) WithParams(params backupstorage.Params) backupstorage var _ backupstorage.BackupStorage = (*S3BackupStorage)(nil) // getLogLevel converts the string loglevel to an aws.LogLevelType -func getLogLevel() *aws.LogLevelType { - l := new(aws.LogLevelType) - *l = aws.LogOff // default setting +func getLogLevel() aws.ClientLogMode { + var l aws.ClientLogMode if level, found := logNameMap[requiredLogLevel]; found { - *l = level // adjust as required + l = level // adjust as required } return l } -func (bs *S3BackupStorage) client() (*s3.S3, error) { +func (bs *S3BackupStorage) client() (*s3.Client, error) { bs.mu.Lock() defer bs.mu.Unlock() if bs._client == nil { @@ -459,34 +480,28 @@ func (bs *S3BackupStorage) client() (*s3.S3, error) { httpClient := &http.Client{Transport: bs.transport} - session, err := 
session.NewSession() + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(region), + config.WithClientLogMode(logLevel), + config.WithHTTPClient(httpClient), + ) if err != nil { return nil, err } - awsConfig := aws.Config{ - HTTPClient: httpClient, - LogLevel: logLevel, - Endpoint: aws.String(endpoint), - Region: aws.String(region), - S3ForcePathStyle: aws.Bool(forcePath), - } - - if retryCount >= 0 { - awsConfig = *request.WithRetryer(&awsConfig, &ClosedConnectionRetryer{ - awsRetryer: &client.DefaultRetryer{ - NumMaxRetries: retryCount, - }, - }) - } - - bs._client = s3.New(session, &awsConfig) + bs._client = s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = forcePath + if retryCount >= 0 { + o.RetryMaxAttempts = retryCount + o.Retryer = &ClosedConnectionRetryer{} + } + }) if len(bucket) == 0 { return nil, fmt.Errorf("--s3_backup_storage_bucket required") } - if _, err := bs._client.HeadBucket(&s3.HeadBucketInput{Bucket: &bucket}); err != nil { + if _, err := bs._client.HeadBucket(context.Background(), &s3.HeadBucketInput{Bucket: &bucket}); err != nil { return nil, err } @@ -497,24 +512,24 @@ func (bs *S3BackupStorage) client() (*s3.S3, error) { return bs._client, nil } -func objName(parts ...string) *string { +func objName(parts ...string) string { res := "" if root != "" { res += root + delimiter } res += strings.Join(parts, delimiter) - return &res + return res } func init() { backupstorage.BackupStorageMap["s3"] = newS3BackupStorage() logNameMap = logNameToLogLevel{ - "LogOff": aws.LogOff, - "LogDebug": aws.LogDebug, - "LogDebugWithSigning": aws.LogDebugWithSigning, - "LogDebugWithHTTPBody": aws.LogDebugWithHTTPBody, - "LogDebugWithRequestRetries": aws.LogDebugWithRequestRetries, - "LogDebugWithRequestErrors": aws.LogDebugWithRequestErrors, + "LogOff": 0, + "LogDebug": aws.LogRequest, + "LogDebugWithSigning": aws.LogSigning, + "LogDebugWithHTTPBody": aws.LogRequestWithBody, + "LogDebugWithRequestRetries": aws.LogRetries, + "LogDebugWithRequestErrors": aws.LogRequest | aws.LogRetries, } } diff --git a/go/vt/mysqlctl/s3backupstorage/s3_test.go b/go/vt/mysqlctl/s3backupstorage/s3_test.go index 6f4207a645f..1acfddcce1e 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3_test.go +++ b/go/vt/mysqlctl/s3backupstorage/s3_test.go @@ -1,22 +1,22 @@ package s3backupstorage import ( + "context" "crypto/md5" "crypto/rand" "encoding/base64" "errors" "fmt" "net/http" - "net/url" "os" "testing" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/client" - "github.com/aws/aws-sdk-go/aws/request" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go/middleware" + smithyhttp "github.com/aws/smithy-go/transport/http" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,29 +26,46 @@ import ( ) type s3FakeClient struct { - s3iface.S3API + *s3.Client err error delay time.Duration } -func (sfc *s3FakeClient) PutObjectRequest(in *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) { - u, _ := url.Parse("http://localhost:1234") - req := request.Request{ - HTTPRequest: &http.Request{ // without this we segfault \_(ツ)_/¯ (see https://github.com/aws/aws-sdk-go/blob/v1.28.8/aws/request/request_context.go#L13) - Header: make(http.Header), - URL: u, - }, - Retryer: client.DefaultRetryer{}, +type fakeClientDo struct { + delay 
time.Duration +} + +func (fcd fakeClientDo) Do(request *http.Request) (*http.Response, error) { + if fcd.delay > 0 { + time.Sleep(fcd.delay) + } + return nil, nil +} + +func (sfc *s3FakeClient) PutObject(ctx context.Context, in *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + var o s3.Options + for _, fn := range optFns { + fn(&o) } - req.Handlers.Send.PushBack(func(r *request.Request) { - r.Error = sfc.err - if sfc.delay > 0 { - time.Sleep(sfc.delay) - } - }) + stack := middleware.NewStack("PutObject", smithyhttp.NewStackRequest) + for _, apiOption := range o.APIOptions { + _ = apiOption(stack) + } - return &req, &s3.PutObjectOutput{} + handler := middleware.DecorateHandler(smithyhttp.NewClientHandler(&fakeClientDo{delay: sfc.delay}), stack) + _, _, err := handler.Handle(ctx, in) + if err != nil { + return nil, err + } + + if sfc.err != nil { + return nil, sfc.err + } + + return &s3.PutObjectOutput{ + ETag: aws.String("fake-etag"), + }, nil } func TestAddFileError(t *testing.T) { @@ -56,11 +73,16 @@ func TestAddFileError(t *testing.T) { client: &s3FakeClient{err: errors.New("some error")}, bs: &S3BackupStorage{ params: backupstorage.NoParams(), + s3SSE: S3ServerSideEncryption{ + customerAlg: new(string), + customerKey: new(string), + customerMd5: new(string), + }, }, readOnly: false, } - wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000) + wc, err := bh.AddFile(context.Background(), "somefile", 100000) require.NoErrorf(t, err, "AddFile() expected no error, got %s", err) assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser") @@ -87,12 +109,17 @@ func TestAddFileStats(t *testing.T) { Logger: logutil.NewMemoryLogger(), Stats: fakeStats, }, + s3SSE: S3ServerSideEncryption{ + customerAlg: new(string), + customerKey: new(string), + customerMd5: new(string), + }, }, readOnly: false, } for i := 0; i < 4; i++ { - wc, err := bh.AddFile(aws.BackgroundContext(), fmt.Sprintf("somefile-%d", i), 100000) + wc, err := bh.AddFile(context.Background(), fmt.Sprintf("somefile-%d", i), 100000) require.NoErrorf(t, err, "AddFile() expected no error, got %s", err) assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser") @@ -131,11 +158,16 @@ func TestAddFileErrorStats(t *testing.T) { Logger: logutil.NewMemoryLogger(), Stats: fakeStats, }, + s3SSE: S3ServerSideEncryption{ + customerAlg: new(string), + customerKey: new(string), + customerMd5: new(string), + }, }, readOnly: false, } - wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000) + wc, err := bh.AddFile(context.Background(), "somefile", 100000) require.NoErrorf(t, err, "AddFile() expected no error, got %s", err) assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser") @@ -163,7 +195,7 @@ func TestNoSSE(t *testing.T) { err := sseData.init() require.NoErrorf(t, err, "init() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Nil(t, sseData.customerAlg, "customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") @@ -178,7 +210,7 @@ func TestSSEAws(t *testing.T) { err := sseData.init() require.NoErrorf(t, err, "init() expected to succeed") - assert.Equal(t, aws.String("aws:kms"), sseData.awsAlg, "awsAlg expected to be aws:kms") + assert.Equal(t, types.ServerSideEncryption("aws:kms"), sseData.awsAlg, "awsAlg expected to be aws:kms") assert.Nil(t, sseData.customerAlg, 
"customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") @@ -186,7 +218,7 @@ func TestSSEAws(t *testing.T) { sseData.reset() require.NoErrorf(t, err, "reset() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Nil(t, sseData.customerAlg, "customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") @@ -227,7 +259,7 @@ func TestSSECustomerFileBinaryKey(t *testing.T) { err = sseData.init() require.NoErrorf(t, err, "init() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Equal(t, aws.String("AES256"), sseData.customerAlg, "customerAlg expected to be AES256") assert.Equal(t, aws.String(string(randomKey)), sseData.customerKey, "customerKey expected to be equal to the generated randomKey") md5Hash := md5.Sum(randomKey) @@ -236,7 +268,7 @@ func TestSSECustomerFileBinaryKey(t *testing.T) { sseData.reset() require.NoErrorf(t, err, "reset() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Nil(t, sseData.customerAlg, "customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") @@ -262,7 +294,7 @@ func TestSSECustomerFileBase64Key(t *testing.T) { err = sseData.init() require.NoErrorf(t, err, "init() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Equal(t, aws.String("AES256"), sseData.customerAlg, "customerAlg expected to be AES256") assert.Equal(t, aws.String(string(randomKey)), sseData.customerKey, "customerKey expected to be equal to the generated randomKey") md5Hash := md5.Sum(randomKey) @@ -271,7 +303,7 @@ func TestSSECustomerFileBase64Key(t *testing.T) { sseData.reset() require.NoErrorf(t, err, "reset() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Nil(t, sseData.customerAlg, "customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") From 02598ab50bf15a078904fa4dd44f6bb816ebe01a Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Thu, 12 Sep 2024 17:10:46 -0600 Subject: [PATCH 2/2] Small fix Signed-off-by: Florent Poinsard --- go/vt/mysqlctl/s3backupstorage/s3.go | 1 - go/vt/mysqlctl/xtrabackupengine.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index 6947f02fa6d..1af1362ae30 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -181,7 +181,6 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize u.PartSize = partSizeBytes }) object := objName(bh.dir, bh.name, filename) - log.Infof("objName: %v", object) sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send")) // Using UploadWithContext breaks uploading to Minio and Ceph 
https://github.com/vitessio/vitess/issues/14188 _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{ diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index e6d02eedc1d..784d718af26 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -277,7 +277,7 @@ func (be *XtrabackupEngine) executeFullBackup(ctx context.Context, params Backup if err != nil { return BackupUnusable, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName) } - if _, err := mwc.Write([]byte(data)); err != nil { + if _, err := mwc.Write(data); err != nil { return BackupUnusable, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName) }
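
---

For reference, a minimal sketch of wiring the new ClosedConnectionRetryer into the v2 client with a non-nil fallback. In the s3.go client() hunk above, o.Retryer = &ClosedConnectionRetryer{} leaves the embedded awsRetryer nil, so the first delegating call (MaxAttempts, or the fallback path in IsErrorRetryable) would panic; the removed v1 Endpoint setting also has no counterpart in that hunk. The sketch assumes the SDK's github.com/aws/aws-sdk-go-v2/aws/retry package and the s3.Options.BaseEndpoint field; the helper name and wiring are illustrative only and are not part of this patch.

package s3backupstorage

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/retry"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// newS3ClientSketch is an illustrative helper (not part of this patch) showing
// how the custom retryer and an explicit endpoint could be set on the v2 client.
func newS3ClientSketch(ctx context.Context, endpoint string, forcePath bool, retryCount int) (*s3.Client, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, err
	}
	return s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.UsePathStyle = forcePath
		if endpoint != "" {
			// v2 replacement for the removed v1 aws.Config.Endpoint field.
			o.BaseEndpoint = aws.String(endpoint)
		}
		if retryCount >= 0 {
			o.RetryMaxAttempts = retryCount
			// Give ClosedConnectionRetryer a non-nil fallback so that errors
			// other than closed connections defer to the standard retryer.
			o.Retryer = &ClosedConnectionRetryer{
				awsRetryer: retry.NewStandard(func(ro *retry.StandardOptions) {
					ro.MaxAttempts = retryCount
				}),
			}
		}
	}), nil
}

Which fallback retryer to use is a policy choice; the key point is that awsRetryer must be non-nil before IsErrorRetryable can delegate to it.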