diff --git a/deployment/environment/devenv/don.go b/deployment/environment/devenv/don.go index 92751ca94f6..05a3d5bea08 100644 --- a/deployment/environment/devenv/don.go +++ b/deployment/environment/devenv/don.go @@ -335,8 +335,30 @@ func (n *Node) RegisterNodeToJobDistributor(ctx context.Context, jd JobDistribut Labels: n.labels, Name: n.Name, }) - - if err != nil { + // node already registered, fetch its id + // TODO: check for rpc code = "AlreadyExists" instead + if err != nil && strings.Contains(err.Error(), "AlreadyExists") { + nodesResponse, err := jd.ListNodes(ctx, &nodev1.ListNodesRequest{ + Filter: &nodev1.ListNodesRequest_Filter{ + Selectors: []*ptypes.Selector{ + { + Key: "p2p_id", + Op: ptypes.SelectorOp_EQ, + Value: peerID, + }, + }, + }, + }) + if err != nil { + return err + } + nodes := nodesResponse.GetNodes() + if len(nodes) == 0 { + return fmt.Errorf("failed to find node: %v", n.Name) + } + n.NodeId = nodes[0].Id + return nil + } else if err != nil { return fmt.Errorf("failed to register node %s: %w", n.Name, err) } if registerResponse.GetNode().GetId() == "" { @@ -367,13 +389,12 @@ func (n *Node) CreateJobDistributor(ctx context.Context, jd JobDistributor) (str func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor) error { // register the node in the job distributor err := n.RegisterNodeToJobDistributor(ctx, jd) - // TODO: check for rpc code = "AlreadyExists" instead - if err != nil && !strings.Contains(err.Error(), "AlreadyExists") { + if err != nil { return err } // now create the job distributor in the node id, err := n.CreateJobDistributor(ctx, jd) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "DuplicateFeedsManagerError") { return err } // wait for the node to connect to the job distributor @@ -382,7 +403,7 @@ func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor Id: n.NodeId, }) if err != nil { - return fmt.Errorf("failed to get node %s: %w", n.Name, err) + return 
retry.RetryableError(fmt.Errorf("failed to get node %s: %w", n.Name, err)) } if getRes.GetNode() == nil { return fmt.Errorf("no node found for node id %s", n.NodeId) diff --git a/deployment/environment/web/sdk/client/client.go b/deployment/environment/web/sdk/client/client.go index 74d18c13acb..5472591ef94 100644 --- a/deployment/environment/web/sdk/client/client.go +++ b/deployment/environment/web/sdk/client/client.go @@ -206,7 +206,7 @@ func (c *client) CreateJobDistributor(ctx context.Context, in JobDistributorInpu msg := err.GetMessage() return "", fmt.Errorf("failed to create feeds manager: %v", msg) } - return "", fmt.Errorf("failed to create feeds manager") + return "", fmt.Errorf("failed to create feeds manager: %v", response.GetCreateFeedsManager().GetTypename()) } func (c *client) UpdateJobDistributor(ctx context.Context, id string, in JobDistributorInput) error {