Skip to content

Commit

Permalink
deployment: JD improvements (#15239)
Browse files Browse the repository at this point in the history
* Fix ineffective assign

* jd: If a node is already registered, just proceed

* jd: only set transport credentials if actually set

* web/client: Clearer error if feeds manager fails to create

* Add some idempotency and robustness to the node registration
  • Loading branch information
archseer authored Nov 21, 2024
1 parent 9eceab5 commit 212bde3
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 9 deletions.
4 changes: 2 additions & 2 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ func (s *service) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsMa
return nil, errors.Wrap(err, "failed to list managers by IDs")
}

for _, manager := range managers {
manager.IsConnectionActive = s.connMgr.IsConnected(manager.ID)
for i, manager := range managers {
managers[i].IsConnectionActive = s.connMgr.IsConnected(manager.ID)
}

return managers, nil
Expand Down
30 changes: 26 additions & 4 deletions deployment/environment/devenv/don.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,30 @@ func (n *Node) RegisterNodeToJobDistributor(ctx context.Context, jd JobDistribut
Labels: n.labels,
Name: n.Name,
})

if err != nil {
// node already registered, fetch it's id
// TODO: check for rpc code = "AlreadyExists" instead
if err != nil && strings.Contains(err.Error(), "AlreadyExists") {
nodesResponse, err := jd.ListNodes(ctx, &nodev1.ListNodesRequest{
Filter: &nodev1.ListNodesRequest_Filter{
Selectors: []*ptypes.Selector{
{
Key: "p2p_id",
Op: ptypes.SelectorOp_EQ,
Value: peerID,
},
},
},
})
if err != nil {
return err
}
nodes := nodesResponse.GetNodes()
if len(nodes) == 0 {
return fmt.Errorf("failed to find node: %v", n.Name)
}
n.NodeId = nodes[0].Id
return nil
} else if err != nil {
return fmt.Errorf("failed to register node %s: %w", n.Name, err)
}
if registerResponse.GetNode().GetId() == "" {
Expand Down Expand Up @@ -372,7 +394,7 @@ func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor
}
// now create the job distributor in the node
id, err := n.CreateJobDistributor(ctx, jd)
if err != nil {
if err != nil && !strings.Contains(err.Error(), "DuplicateFeedsManagerError") {
return err
}
// wait for the node to connect to the job distributor
Expand All @@ -381,7 +403,7 @@ func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor
Id: n.NodeId,
})
if err != nil {
return fmt.Errorf("failed to get node %s: %w", n.Name, err)
return retry.RetryableError(fmt.Errorf("failed to get node %s: %w", n.Name, err))
}
if getRes.GetNode() == nil {
return fmt.Errorf("no node found for node id %s", n.NodeId)
Expand Down
5 changes: 3 additions & 2 deletions deployment/environment/devenv/jd.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func authTokenInterceptor(source oauth2.TokenSource) grpc.UnaryClientInterceptor
}

func NewJDConnection(cfg JDConfig) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(cfg.Creds),
opts := []grpc.DialOption{}
if cfg.Creds != nil {
opts = append(opts, grpc.WithTransportCredentials(cfg.Creds))
}
if cfg.Auth != nil {
opts = append(opts, grpc.WithUnaryInterceptor(authTokenInterceptor(cfg.Auth)))
Expand Down
6 changes: 5 additions & 1 deletion deployment/environment/web/sdk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,11 @@ func (c *client) CreateJobDistributor(ctx context.Context, in JobDistributorInpu
feedsManager := success.GetFeedsManager()
return feedsManager.GetId(), nil
}
return "", fmt.Errorf("failed to create feeds manager")
if err, ok := response.GetCreateFeedsManager().(*generated.CreateFeedsManagerCreateFeedsManagerSingleFeedsManagerError); ok {
msg := err.GetMessage()
return "", fmt.Errorf("failed to create feeds manager: %v", msg)
}
return "", fmt.Errorf("failed to create feeds manager: %v", response.GetCreateFeedsManager().GetTypename())
}

func (c *client) UpdateJobDistributor(ctx context.Context, id string, in JobDistributorInput) error {
Expand Down

0 comments on commit 212bde3

Please sign in to comment.