Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement throttling for WS client #158

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ffapi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (hs *HandlerFactory) handleOutput(ctx context.Context, res http.ResponseWri
}
if marshalErr != nil {
err := i18n.WrapError(ctx, marshalErr, i18n.MsgResponseMarshalError)
log.L(ctx).Errorf(err.Error())
log.L(ctx).Error(err.Error())
return 500, err
}
return status, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ffapi/openapi3.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (sg *SwaggerGen) addRoute(ctx context.Context, doc *openapi3.T, route *Rout
} else {
routeDescription = i18n.Expand(ctx, route.Description)
if routeDescription == "" && sg.options.PanicOnMissingDescription {
log.Panicf(i18n.NewError(ctx, i18n.MsgRouteDescriptionMissing, route.Name).Error())
log.Panic(i18n.NewError(ctx, i18n.MsgRouteDescriptionMissing, route.Name).Error())
}
}
op := &openapi3.Operation{
Expand Down
19 changes: 16 additions & 3 deletions pkg/ffapi/query_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"database/sql/driver"
"encoding/json"
"fmt"
"math"
"math/big"
"reflect"
"strconv"
Expand Down Expand Up @@ -110,11 +111,11 @@ func (f *stringField) Scan(src interface{}) error {
case int64:
f.s = strconv.FormatInt(tv, 10)
case uint:
f.s = strconv.FormatInt(int64(tv), 10)
f.s = strconv.FormatUint(uint64(tv), 10)
case uint32:
f.s = strconv.FormatInt(int64(tv), 10)
f.s = strconv.FormatUint(uint64(tv), 10)
case uint64:
f.s = strconv.FormatInt(int64(tv), 10)
f.s = strconv.FormatUint(tv, 10)
case *fftypes.UUID:
if tv != nil {
f.s = tv.String()
Expand Down Expand Up @@ -242,10 +243,16 @@ func (f *int64Field) Scan(src interface{}) (err error) {
case int64:
f.i = tv
case uint:
if tv > math.MaxInt64 {
return i18n.NewError(context.Background(), i18n.MsgTypeRestoreFailed, src, f.i)
}
f.i = int64(tv)
case uint32:
f.i = int64(tv)
case uint64:
if tv > math.MaxInt64 {
return i18n.NewError(context.Background(), i18n.MsgTypeRestoreFailed, src, f.i)
}
f.i = int64(tv)
case string:
f.i, err = strconv.ParseInt(src.(string), 10, 64)
Expand Down Expand Up @@ -277,10 +284,16 @@ func (f *bigIntField) Scan(src interface{}) (err error) {
case int64:
f.i = fftypes.NewFFBigInt(tv)
case uint:
if tv > math.MaxInt64 {
return i18n.NewError(context.Background(), i18n.MsgTypeRestoreFailed, src, f.i)
}
f.i = fftypes.NewFFBigInt(int64(tv))
case uint32:
f.i = fftypes.NewFFBigInt(int64(tv))
case uint64:
if tv > math.MaxInt64 {
return i18n.NewError(context.Background(), i18n.MsgTypeRestoreFailed, src, f.i)
}
f.i = fftypes.NewFFBigInt(int64(tv))
case fftypes.FFBigInt:
i := tv
Expand Down
13 changes: 8 additions & 5 deletions pkg/ffresty/ffresty.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Config struct {
}

var (
rateLimiter *rate.Limiter
rateLimiterMap map[*resty.Client]*rate.Limiter
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
metricsManager metric.MetricsManager
onErrorHooks []resty.ErrorHook
onSuccessHooks []resty.SuccessHook
Expand Down Expand Up @@ -177,7 +177,7 @@ func New(ctx context.Context, staticConfig config.Section) (client *resty.Client
return NewWithConfig(ctx, *ffrestyConfig), nil
}

func getRateLimiter(rps, burst int) *rate.Limiter {
func GetRateLimiter(rps, burst int) *rate.Limiter {
if rps != 0 { // if rps is not set no need for a rate limiter
rpsLimiter := rate.Limit(rps)
if burst == 0 {
Expand Down Expand Up @@ -226,8 +226,11 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
}
client = resty.NewWithClient(httpClient)
}
if rateLimiterMap == nil {
rateLimiterMap = make(map[*resty.Client]*rate.Limiter)
}

rateLimiter = getRateLimiter(ffrestyConfig.ThrottleRequestsPerSecond, ffrestyConfig.ThrottleBurst)
rateLimiterMap[client] = GetRateLimiter(ffrestyConfig.ThrottleRequestsPerSecond, ffrestyConfig.ThrottleBurst)

url := strings.TrimSuffix(ffrestyConfig.URL, "/")
if url != "" {
Expand All @@ -242,9 +245,9 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client.SetTimeout(time.Duration(ffrestyConfig.HTTPRequestTimeout))

client.OnBeforeRequest(func(_ *resty.Client, req *resty.Request) error {
if rateLimiter != nil {
if rateLimiterMap[client] != nil {
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
// Wait for permission to proceed with the request
err := rateLimiter.Wait(req.Context())
err := rateLimiterMap[client].Wait(req.Context())
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ffresty/ffresty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestRequestWithRateLimiter(t *testing.T) {
}

func TestRequestWithRateLimiterHighBurst(t *testing.T) {
expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds
expectedNumberOfRequest := 20 // allow all requests to be processed within 1 second

customClient := &http.Client{}

Expand Down Expand Up @@ -225,12 +225,12 @@ func TestRateLimiterFailure(t *testing.T) {
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
rateLimiter = rate.NewLimiter(rate.Limit(1), 0) // artificially create an broken rate limiter, this is not possible with our config default
rateLimiterMap[c] = rate.NewLimiter(rate.Limit(1), 0) // artificially create an broken rate limiter, this is not possible with our config default
resp, err := c.R().Get("/test")
assert.Error(t, err)
assert.Regexp(t, "exceeds", err)
assert.Nil(t, resp)
rateLimiter = nil // reset limiter
rateLimiterMap = nil // reset limiter
}

func TestRequestRetry(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/fftypes/int.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -45,7 +45,7 @@ func (i *FFuint64) UnmarshalJSON(b []byte) error {
if !ok {
return i18n.NewError(context.Background(), i18n.MsgBigIntParseFailed, b)
}
*i = FFuint64(bi.Int64())
*i = FFuint64(bi.Uint64())
return nil
case float64:
*i = FFuint64(val)
Expand Down
4 changes: 2 additions & 2 deletions pkg/fftypes/jsonobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func (jd JSONObject) GetStringOk(key string) (string, bool) {
case int64:
return strconv.FormatInt(vt, 10), true
case uint:
return strconv.FormatInt(int64(vt), 10), true
return strconv.FormatUint(uint64(vt), 10), true
case uint8:
return strconv.FormatInt(int64(vt), 10), true
case uint16:
return strconv.FormatInt(int64(vt), 10), true
case uint32:
return strconv.FormatInt(int64(vt), 10), true
case uint64:
return strconv.FormatInt(int64(vt), 10), true
return strconv.FormatUint(vt, 10), true
case nil:
return "", false // no need to log for nil
default:
Expand Down
10 changes: 6 additions & 4 deletions pkg/fftypes/uuid.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -93,11 +93,13 @@ func (u *UUID) UnmarshalBinary(b []byte) error {
}

func (u *UUID) HashBucket(buckets int) int {
if u == nil {
if u == nil || buckets <= 0 {
return 0
EnriqueL8 marked this conversation as resolved.
Show resolved Hide resolved
}
// Take the last random 4 bytes and mod it against the bucket count to generate
// a deterministic hash bucket allocation for the UUID V4
// Explicitly cast `buckets` to uint64 to match the type used in the modulo operation
// and ensure the result is safely converted back to int.

// #nosec G115 - Safe because `buckets` is calculated using modulo operation
return int(binary.BigEndian.Uint64((*u)[8:]) % uint64(buckets))

}
Expand Down
4 changes: 3 additions & 1 deletion pkg/fftypes/uuid_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,6 +93,8 @@ func TestHashBucket(t *testing.T) {
assert.Equal(t, 0, u3.HashBucket(2))

assert.Equal(t, 0, ((*UUID)(nil)).HashBucket(12345))
assert.Equal(t, 0, u3.HashBucket(-1))
assert.Equal(t, 0, u3.HashBucket(0))

}

Expand Down
4 changes: 2 additions & 2 deletions pkg/i18n/errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -82,7 +82,7 @@ func ffWrap(err error, msgKey ErrorMessageKey) error {

// NewError creates a new error
func NewError(ctx context.Context, msg ErrorMessageKey, inserts ...interface{}) error {
return ffWrap(errors.Errorf(truncate(ExpandWithCode(ctx, MessageKey(msg), inserts...), 2048)), msg)
return ffWrap(errors.New(truncate(ExpandWithCode(ctx, MessageKey(msg), inserts...), 2048)), msg)
}

// WrapError wraps an error
Expand Down
4 changes: 2 additions & 2 deletions pkg/metric/prometheusMetricsManager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -50,7 +50,7 @@ func checkAndUpdateLabelNames(ctx context.Context, labelNames []string, withDefa
for _, labelName := range labelNames {
if strings.HasPrefix(labelName, fireflySystemLabelsPrefix) {
err := i18n.NewError(ctx, i18n.MsgMetricsInvalidLabel, labelName, fireflySystemLabelsPrefix)
log.L(ctx).Errorf(err.Error())
log.L(ctx).Error(err.Error())
} else {
validLabelNames = append(validLabelNames, labelName)
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/version/version_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
// Kaleido, Inc. CONFIDENTIAL
// Unpublished Copyright © 2023 Kaleido, Inc. All Rights Reserved.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package version

Expand Down
45 changes: 29 additions & 16 deletions pkg/wsclient/wsclient.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -29,28 +29,32 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-common/pkg/retry"
"golang.org/x/time/rate"
)

type WSConfig struct {
HTTPURL string `json:"httpUrl,omitempty"`
WebSocketURL string `json:"wsUrl,omitempty"`
WSKeyPath string `json:"wsKeyPath,omitempty"`
ReadBufferSize int `json:"readBufferSize,omitempty"`
WriteBufferSize int `json:"writeBufferSize,omitempty"`
InitialDelay time.Duration `json:"initialDelay,omitempty"`
MaximumDelay time.Duration `json:"maximumDelay,omitempty"`
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"`
DisableReconnect bool `json:"disableReconnect"`
AuthUsername string `json:"authUsername,omitempty"`
AuthPassword string `json:"authPassword,omitempty"`
HTTPHeaders fftypes.JSONObject `json:"headers,omitempty"`
HeartbeatInterval time.Duration `json:"heartbeatInterval,omitempty"`
TLSClientConfig *tls.Config `json:"tlsClientConfig,omitempty"`
ConnectionTimeout time.Duration `json:"connectionTimeout,omitempty"`
HTTPURL string `json:"httpUrl,omitempty"`
WebSocketURL string `json:"wsUrl,omitempty"`
WSKeyPath string `json:"wsKeyPath,omitempty"`
ReadBufferSize int `json:"readBufferSize,omitempty"`
WriteBufferSize int `json:"writeBufferSize,omitempty"`
InitialDelay time.Duration `json:"initialDelay,omitempty"`
MaximumDelay time.Duration `json:"maximumDelay,omitempty"`
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"`
DisableReconnect bool `json:"disableReconnect"`
AuthUsername string `json:"authUsername,omitempty"`
AuthPassword string `json:"authPassword,omitempty"`
ThrottleRequestsPerSecond int `json:"requestsPerSecond,omitempty"`
ThrottleBurst int `json:"burst,omitempty"`
HTTPHeaders fftypes.JSONObject `json:"headers,omitempty"`
HeartbeatInterval time.Duration `json:"heartbeatInterval,omitempty"`
TLSClientConfig *tls.Config `json:"tlsClientConfig,omitempty"`
ConnectionTimeout time.Duration `json:"connectionTimeout,omitempty"`
// This one cannot be set in JSON - must be configured on the code interface
ReceiveExt bool
}
Expand Down Expand Up @@ -109,6 +113,7 @@ type wsClient struct {
heartbeatMux sync.Mutex
activePingSent *time.Time
lastPingCompleted time.Time
rateLimiter *rate.Limiter
}

// WSPreConnectHandler will be called before every connect/reconnect. Any error returned will prevent the websocket from connecting.
Expand Down Expand Up @@ -146,6 +151,7 @@ func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandle
heartbeatInterval: config.HeartbeatInterval,
useReceiveExt: config.ReceiveExt,
disableReconnect: config.DisableReconnect,
rateLimiter: ffresty.GetRateLimiter(config.ThrottleRequestsPerSecond, config.ThrottleBurst),
}
if w.useReceiveExt {
w.receiveExt = make(chan *WSPayload)
Expand Down Expand Up @@ -220,6 +226,13 @@ func (w *wsClient) SetHeader(header, value string) {
}

func (w *wsClient) Send(ctx context.Context, message []byte) error {
if w.rateLimiter != nil {
// Wait for permission to proceed with the request
err := w.rateLimiter.Wait(ctx)
if err != nil {
return err
}
}
// Send
select {
case w.send <- message:
Expand Down
Loading
Loading