Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gateway/connector): await gateway connection before request #15686

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils/matches"

cappkg "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
Expand Down Expand Up @@ -188,6 +189,7 @@ func TestComputeFetch(t *testing.T) {
th := setup(t, defaultConfig)

th.connector.EXPECT().DonID().Return("don-id")
th.connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
th.connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

msgID := strings.Join([]string{
Expand Down
11 changes: 9 additions & 2 deletions core/capabilities/webapi/outgoing_connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,15 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context,
}
sort.Strings(gatewayIDs)

err = c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
if err != nil {
selectedGateway := gatewayIDs[0]

l.Infow("selected gateway, awaiting connection", "gatewayID", selectedGateway)

if err := c.gc.AwaitConnection(ctx, selectedGateway); err != nil {
return nil, errors.Wrap(err, "await connection canceled")
}

if err := c.gc.SignAndSendToGateway(ctx, selectedGateway, body); err != nil {
return nil, errors.Wrap(err, "failed to send request to gateway")
}

Expand Down
3 changes: 3 additions & 0 deletions core/capabilities/webapi/outgoing_connector_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils/matches"

"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
Expand All @@ -36,6 +37,7 @@ func TestHandleSingleNodeRequest(t *testing.T) {
msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the default timeout
Expand Down Expand Up @@ -82,6 +84,7 @@ func TestHandleSingleNodeRequest(t *testing.T) {
msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the defined timeout
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/webapi/target/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestCapability_Execute(t *testing.T) {
require.NoError(t, err)

gatewayResp := gatewayResponse(t, msgID)

th.connector.EXPECT().AwaitConnection(mock.Anything, "gateway1").Return(nil)
th.connector.On("SignAndSendToGateway", mock.Anything, "gateway1", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
th.connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Once()
Expand Down
45 changes: 39 additions & 6 deletions core/services/gateway/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ type GatewayConnector interface {

AddHandler(methods []string, handler GatewayConnectorHandler) error
// SendToGateway takes a signed message as argument and sends it to the specified gateway
SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error
SendToGateway(ctx context.Context, gatewayID string, msg *api.Message) error
// SignAndSendToGateway signs the message and sends the message to the specified gateway
SignAndSendToGateway(ctx context.Context, gatewayID string, msg *api.MessageBody) error
// GatewayIDs returns the list of Gateway IDs
GatewayIDs() []string
// DonID returns the DON ID
DonID() string
AwaitConnection(ctx context.Context, gatewayID string) error
}

// Signer implementation needs to be provided by a GatewayConnector user (node)
Expand Down Expand Up @@ -78,12 +79,30 @@ func (c *gatewayConnector) HealthReport() map[string]error {
func (c *gatewayConnector) Name() string { return c.lggr.Name() }

type gatewayState struct {
// signal channel is closed once the gateway is connected
signalCh chan struct{}

conn network.WSConnectionWrapper
config ConnectorGatewayConfig
url *url.URL
wsClient network.WebSocketClient
}

// A gatewayState is connected when the signal channel is closed
func (gs *gatewayState) signal() {
close(gs.signalCh)
}

// awaitConn blocks until the gateway is connected or the context is done
func (gs *gatewayState) awaitConn(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("await connection failed: %w", ctx.Err())
case <-gs.signalCh:
return nil
}
}

func NewGatewayConnector(config *ConnectorConfig, signer Signer, clock clockwork.Clock, lggr logger.Logger) (GatewayConnector, error) {
if config == nil || signer == nil || clock == nil || lggr == nil {
return nil, errors.New("nil dependency")
Expand Down Expand Up @@ -125,6 +144,7 @@ func NewGatewayConnector(config *ConnectorConfig, signer Signer, clock clockwork
config: gw,
url: parsedURL,
wsClient: network.NewWebSocketClient(config.WsClientConfig, connector, lggr),
signalCh: make(chan struct{}),
}
gateways[gw.Id] = gateway
urlToId[gw.URL] = gw.Id
Expand All @@ -150,17 +170,25 @@ func (c *gatewayConnector) AddHandler(methods []string, handler GatewayConnector
return nil
}

func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error {
func (c *gatewayConnector) AwaitConnection(ctx context.Context, gatewayID string) error {
gateway, ok := c.gateways[gatewayID]
if !ok {
return fmt.Errorf("invalid Gateway ID %s", gatewayID)
}
return gateway.awaitConn(ctx)
}

func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayID string, msg *api.Message) error {
data, err := c.codec.EncodeResponse(msg)
if err != nil {
return fmt.Errorf("error encoding response for gateway %s: %v", gatewayId, err)
return fmt.Errorf("error encoding response for gateway %s: %w", gatewayID, err)
}
gateway, ok := c.gateways[gatewayId]
gateway, ok := c.gateways[gatewayID]
if !ok {
return fmt.Errorf("invalid Gateway ID %s", gatewayId)
return fmt.Errorf("invalid Gateway ID %s", gatewayID)
}
if gateway.conn == nil {
return fmt.Errorf("connector not started")
return errors.New("connector not started")
}
return gateway.conn.Write(ctx, websocket.BinaryMessage, data)
}
Expand Down Expand Up @@ -242,10 +270,15 @@ func (c *gatewayConnector) reconnectLoop(gatewayState *gatewayState) {
} else {
c.lggr.Infow("connected successfully", "url", gatewayState.url)
closeCh := gatewayState.conn.Reset(conn)
gatewayState.signal()
<-closeCh
c.lggr.Infow("connection closed", "url", gatewayState.url)

// reset backoff
redialBackoff = utils.NewRedialBackoff()

// reset signal channel
gatewayState.signalCh = make(chan struct{})
}
select {
case <-c.shutdownCh:
Expand Down
63 changes: 55 additions & 8 deletions core/services/gateway/connector/mocks/gateway_connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/services/workflows/syncer/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/utils/matches"
)

type wrapper struct {
Expand Down Expand Up @@ -48,6 +49,7 @@ func TestNewFetcherService(t *testing.T) {
fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Return(nil).Times(1)
connector.EXPECT().DonID().Return("don-id")
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

payload, err := fetcher.Fetch(ctx, url)
Expand Down
Loading