From 212bde305bd8e1305622d69e214cbfa879ce8390 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Thu, 21 Nov 2024 13:53:33 +0900 Subject: [PATCH] deployment: JD improvements (#15239) * Fix ineffective assign * jd: If a node is already registered, just proceed * jd: only set transport credentials if actually set * web/client: Clearer error if feeds manager fails to create * Add some idempotency and robustness to the node registration --- core/services/feeds/service.go | 4 +-- deployment/environment/devenv/don.go | 30 ++++++++++++++++--- deployment/environment/devenv/jd.go | 5 ++-- .../environment/web/sdk/client/client.go | 6 +++- 4 files changed, 36 insertions(+), 9 deletions(-) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index f986d1753af..61b2d53f2d5 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -362,8 +362,8 @@ func (s *service) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsMa return nil, errors.Wrap(err, "failed to list managers by IDs") } - for _, manager := range managers { - manager.IsConnectionActive = s.connMgr.IsConnected(manager.ID) + for i, manager := range managers { + managers[i].IsConnectionActive = s.connMgr.IsConnected(manager.ID) } return managers, nil diff --git a/deployment/environment/devenv/don.go b/deployment/environment/devenv/don.go index 830f5b921bc..05a3d5bea08 100644 --- a/deployment/environment/devenv/don.go +++ b/deployment/environment/devenv/don.go @@ -335,8 +335,30 @@ func (n *Node) RegisterNodeToJobDistributor(ctx context.Context, jd JobDistribut Labels: n.labels, Name: n.Name, }) - - if err != nil { + // node already registered, fetch it's id + // TODO: check for rpc code = "AlreadyExists" instead + if err != nil && strings.Contains(err.Error(), "AlreadyExists") { + nodesResponse, err := jd.ListNodes(ctx, &nodev1.ListNodesRequest{ + Filter: &nodev1.ListNodesRequest_Filter{ + Selectors: []*ptypes.Selector{ + { + Key: "p2p_id", + Op: ptypes.SelectorOp_EQ, + Value: peerID, + }, + }, + }, + }) + if err != nil { + return err + } + nodes := nodesResponse.GetNodes() + if len(nodes) == 0 { + return fmt.Errorf("failed to find node: %v", n.Name) + } + n.NodeId = nodes[0].Id + return nil + } else if err != nil { return fmt.Errorf("failed to register node %s: %w", n.Name, err) } if registerResponse.GetNode().GetId() == "" { @@ -372,7 +394,7 @@ func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor } // now create the job distributor in the node id, err := n.CreateJobDistributor(ctx, jd) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "DuplicateFeedsManagerError") { return err } // wait for the node to connect to the job distributor @@ -381,7 +403,7 @@ func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor Id: n.NodeId, }) if err != nil { - return fmt.Errorf("failed to get node %s: %w", n.Name, err) + return retry.RetryableError(fmt.Errorf("failed to get node %s: %w", n.Name, err)) } if getRes.GetNode() == nil { return fmt.Errorf("no node found for node id %s", n.NodeId) diff --git a/deployment/environment/devenv/jd.go b/deployment/environment/devenv/jd.go index 9af8412d61e..48150340cae 100644 --- a/deployment/environment/devenv/jd.go +++ b/deployment/environment/devenv/jd.go @@ -45,8 +45,9 @@ func authTokenInterceptor(source oauth2.TokenSource) grpc.UnaryClientInterceptor } func NewJDConnection(cfg JDConfig) (*grpc.ClientConn, error) { - opts := []grpc.DialOption{ - grpc.WithTransportCredentials(cfg.Creds), + opts := []grpc.DialOption{} + if cfg.Creds != nil { + opts = append(opts, grpc.WithTransportCredentials(cfg.Creds)) } if cfg.Auth != nil { opts = append(opts, grpc.WithUnaryInterceptor(authTokenInterceptor(cfg.Auth))) diff --git a/deployment/environment/web/sdk/client/client.go b/deployment/environment/web/sdk/client/client.go index 011eb0cce31..5472591ef94 100644 --- a/deployment/environment/web/sdk/client/client.go +++ b/deployment/environment/web/sdk/client/client.go @@ -202,7 +202,11 @@ func (c *client) CreateJobDistributor(ctx context.Context, in JobDistributorInpu feedsManager := success.GetFeedsManager() return feedsManager.GetId(), nil } - return "", fmt.Errorf("failed to create feeds manager") + if err, ok := response.GetCreateFeedsManager().(*generated.CreateFeedsManagerCreateFeedsManagerSingleFeedsManagerError); ok { + msg := err.GetMessage() + return "", fmt.Errorf("failed to create feeds manager: %v", msg) + } + return "", fmt.Errorf("failed to create feeds manager: %v", response.GetCreateFeedsManager().GetTypename()) } func (c *client) UpdateJobDistributor(ctx context.Context, id string, in JobDistributorInput) error {