Skip to content

Commit

Permalink
Introduce service stable logic to ensure ECS workload available
Browse files Browse the repository at this point in the history
Signed-off-by: khanhtc1202 <khanhtc1202@gmail.com>
  • Loading branch information
khanhtc1202 committed Aug 18, 2023
1 parent b4a4f6c commit d6c46c7
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 0 deletions.
15 changes: 15 additions & 0 deletions pkg/app/piped/executor/ecs/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ func (e *deployExecutor) ensureSync(ctx context.Context) model.StageStatus {
return model.StageStatus_STAGE_FAILURE
}

if err := serviceStable(ctx, &e.Input, e.platformProviderName, e.platformProviderCfg, servicedefinition); err != nil {
e.LogPersister.Errorf("Failed while waiting for service stable: %v", err)
return model.StageStatus_STAGE_FAILURE
}

return model.StageStatus_STAGE_SUCCESS
}

Expand All @@ -132,6 +137,11 @@ func (e *deployExecutor) ensurePrimaryRollout(ctx context.Context) model.StageSt
return model.StageStatus_STAGE_FAILURE
}

if err := serviceStable(ctx, &e.Input, e.platformProviderName, e.platformProviderCfg, servicedefinition); err != nil {
e.LogPersister.Errorf("Failed while waiting for service stable: %v", err)
return model.StageStatus_STAGE_FAILURE
}

return model.StageStatus_STAGE_SUCCESS
}

Expand All @@ -158,6 +168,11 @@ func (e *deployExecutor) ensureCanaryRollout(ctx context.Context) model.StageSta
return model.StageStatus_STAGE_FAILURE
}

if err := serviceStable(ctx, &e.Input, e.platformProviderName, e.platformProviderCfg, servicedefinition); err != nil {
e.LogPersister.Errorf("Failed while waiting for service stable: %v", err)
return model.StageStatus_STAGE_FAILURE
}

return model.StageStatus_STAGE_SUCCESS
}

Expand Down
41 changes: 41 additions & 0 deletions pkg/app/piped/executor/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"strconv"
"time"

"github.com/aws/aws-sdk-go-v2/service/ecs/types"
"go.uber.org/zap"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pipe-cd/pipecd/pkg/app/piped/executor"
"github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider"
provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/ecs"
"github.com/pipe-cd/pipecd/pkg/backoff"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/model"
)
Expand All @@ -39,6 +41,13 @@ const (
trafficRoutePrimaryMetadataKey = "primary-percentage"
trafficRouteCanaryMetadataKey = "canary-percentage"
canaryScaleMetadataKey = "canary-scale"

// Follow the implementation of the AWS cli for service-stable command.
// ref: https://docs.aws.amazon.com/cli/latest/reference/ecs/wait/services-stable.html
// Maximum number of service-stable checks before giving up (40, as in the AWS CLI waiter).
retryServiceStable = 40
// Interval between two consecutive service-stable checks (15 seconds, as in the AWS CLI waiter).
retryServiceStableInterval = 15 * time.Second
)

type registerer interface {
Expand Down Expand Up @@ -346,6 +355,38 @@ func rollout(ctx context.Context, in *executor.Input, platformProviderName strin
return true
}

// serviceStable waits until the given ECS service is reported stable by the
// platform provider client, polling with a constant backoff of
// retryServiceStableInterval for at most retryServiceStable attempts.
//
// It returns nil once a check reports the service as stable. It returns an
// error when the ECS client cannot be created, when the retry gives up with an
// error, or when the service is not found / not stable.
func serviceStable(ctx context.Context, in *executor.Input, platformProviderName string, platformProviderCfg *config.PlatformProviderECSConfig, serviceDefinition types.Service) error {
	client, err := provider.DefaultRegistry().Client(platformProviderName, platformProviderCfg, in.Logger)
	if err != nil {
		in.LogPersister.Errorf("Unable to create ECS client for the provider %s: %v", platformProviderName, err)
		return err
	}

	in.LogPersister.Infof("Wait for service to reach stable state")
	// Follow the implementation of the AWS cli for service-stable command.
	// ref: https://docs.aws.amazon.com/cli/latest/reference/ecs/wait/services-stable.html
	retry := backoff.NewRetry(retryServiceStable, backoff.NewConstant(retryServiceStableInterval))
	result, err := retry.Do(ctx, func() (interface{}, error) {
		err := client.IsServiceStable(ctx, serviceDefinition)
		// A missing service is reported as "not stable" with a nil error, so the
		// not-found error itself is not propagated to the retry.
		// NOTE(review): a nil error presumably ends the retry loop right away —
		// confirm against backoff.Retry.Do.
		if errors.Is(err, platformprovider.ErrNotFound) {
			return false, nil
		}
		return err == nil, err
	})

	// Use the comma-ok form: the result may be nil (or not a bool) when the
	// retry gives up with an error, and an unchecked assertion would panic.
	if stable, ok := result.(bool); ok && stable {
		in.LogPersister.Infof("Successfully waited for service to reach stable state")
		return nil
	}

	if err != nil {
		return err
	}

	// Guard against a service definition without a name instead of
	// dereferencing a nil pointer while building the error message.
	name := "<unknown>"
	if serviceDefinition.ServiceName != nil {
		name = *serviceDefinition.ServiceName
	}
	return fmt.Errorf("service %s is not stable", name)
}

func clean(ctx context.Context, in *executor.Input, platformProviderName string, platformProviderCfg *config.PlatformProviderECSConfig) bool {
client, err := provider.DefaultRegistry().Client(platformProviderName, platformProviderCfg, in.Logger)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions pkg/app/piped/platformprovider/ecs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,33 @@ func (c *client) GetPrimaryTaskSet(ctx context.Context, service types.Service) (
return nil, platformprovider.ErrNotFound
}

// IsServiceStable check whether the given service is stable or not.
// It returns nil if the service is stable, otherwise it returns an error.
func (c *client) IsServiceStable(ctx context.Context, service types.Service) error {
input := &ecs.DescribeServicesInput{
Cluster: service.ClusterArn,
Services: []string{*service.ServiceArn},
}

output, err := c.ecsClient.DescribeServices(ctx, input)
if err != nil {
return fmt.Errorf("failed to get service %s: %w", *service.ServiceName, err)
}

Check warning on line 243 in pkg/app/piped/platformprovider/ecs/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/piped/platformprovider/ecs/client.go#L234-L243

Added lines #L234 - L243 were not covered by tests

if len(output.Services) == 0 {
return platformprovider.ErrNotFound
}

Check warning on line 247 in pkg/app/piped/platformprovider/ecs/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/piped/platformprovider/ecs/client.go#L245-L247

Added lines #L245 - L247 were not covered by tests

svc := output.Services[0]
// Logic follow implementation of the AWS CLI.
// ref: https://docs.aws.amazon.com/cli/latest/reference/ecs/wait/services-stable.html#description
if len(svc.Deployments) == 1 && svc.RunningCount == svc.DesiredCount {
return nil
}

Check warning on line 254 in pkg/app/piped/platformprovider/ecs/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/piped/platformprovider/ecs/client.go#L249-L254

Added lines #L249 - L254 were not covered by tests

return fmt.Errorf("service %s is not stable", *service.ServiceName)

Check warning on line 256 in pkg/app/piped/platformprovider/ecs/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/piped/platformprovider/ecs/client.go#L256

Added line #L256 was not covered by tests
}

func (c *client) DeleteTaskSet(ctx context.Context, service types.Service, taskSetArn string) error {
input := &ecs.DeleteTaskSetInput{
Cluster: service.ClusterArn,
Expand Down
1 change: 1 addition & 0 deletions pkg/app/piped/platformprovider/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type ECS interface {
ServiceExists(ctx context.Context, clusterName string, servicesName string) (bool, error)
CreateService(ctx context.Context, service types.Service) (*types.Service, error)
UpdateService(ctx context.Context, service types.Service) (*types.Service, error)
IsServiceStable(ctx context.Context, service types.Service) error
RegisterTaskDefinition(ctx context.Context, taskDefinition types.TaskDefinition) (*types.TaskDefinition, error)
RunTask(ctx context.Context, taskDefinition types.TaskDefinition, clusterArn string, launchType string, awsVpcConfiguration *config.ECSVpcConfiguration, tags []types.Tag) error
GetPrimaryTaskSet(ctx context.Context, service types.Service) (*types.TaskSet, error)
Expand Down

0 comments on commit d6c46c7

Please sign in to comment.