diff --git a/pkg/app/piped/executor/ecs/deploy.go b/pkg/app/piped/executor/ecs/deploy.go
index fc3e31a390..2c684a2556 100644
--- a/pkg/app/piped/executor/ecs/deploy.go
+++ b/pkg/app/piped/executor/ecs/deploy.go
@@ -106,6 +106,11 @@ func (e *deployExecutor) ensureSync(ctx context.Context) model.StageStatus {
 		return model.StageStatus_STAGE_FAILURE
 	}
 
+	if err := serviceStable(ctx, &e.Input, e.platformProviderName, e.platformProviderCfg, servicedefinition); err != nil {
+		e.LogPersister.Errorf("Failed while waiting for service stable: %v", err)
+		return model.StageStatus_STAGE_FAILURE
+	}
+
 	return model.StageStatus_STAGE_SUCCESS
 }
 
@@ -132,6 +137,11 @@ func (e *deployExecutor) ensurePrimaryRollout(ctx context.Context) model.StageSt
 		return model.StageStatus_STAGE_FAILURE
 	}
 
+	if err := serviceStable(ctx, &e.Input, e.platformProviderName, e.platformProviderCfg, servicedefinition); err != nil {
+		e.LogPersister.Errorf("Failed while waiting for service stable: %v", err)
+		return model.StageStatus_STAGE_FAILURE
+	}
+
 	return model.StageStatus_STAGE_SUCCESS
 }
 
@@ -158,6 +168,11 @@ func (e *deployExecutor) ensureCanaryRollout(ctx context.Context) model.StageSta
 		return model.StageStatus_STAGE_FAILURE
 	}
 
+	if err := serviceStable(ctx, &e.Input, e.platformProviderName, e.platformProviderCfg, servicedefinition); err != nil {
+		e.LogPersister.Errorf("Failed while waiting for service stable: %v", err)
+		return model.StageStatus_STAGE_FAILURE
+	}
+
 	return model.StageStatus_STAGE_SUCCESS
 }
 
diff --git a/pkg/app/piped/executor/ecs/ecs.go b/pkg/app/piped/executor/ecs/ecs.go
index 6b5f87f86d..c2e87fd904 100644
--- a/pkg/app/piped/executor/ecs/ecs.go
+++ b/pkg/app/piped/executor/ecs/ecs.go
@@ -20,6 +20,7 @@ import (
 	"errors"
 	"fmt"
 	"strconv"
+	"time"
 
 	"github.com/aws/aws-sdk-go-v2/service/ecs/types"
 	"go.uber.org/zap"
@@ -28,6 +29,7 @@ import (
 	"github.com/pipe-cd/pipecd/pkg/app/piped/executor"
 	"github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider"
 	provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/ecs"
+	"github.com/pipe-cd/pipecd/pkg/backoff"
 	"github.com/pipe-cd/pipecd/pkg/config"
 	"github.com/pipe-cd/pipecd/pkg/model"
 )
@@ -39,6 +41,13 @@ const (
 	trafficRoutePrimaryMetadataKey = "primary-percentage"
 	trafficRouteCanaryMetadataKey  = "canary-percentage"
 	canaryScaleMetadataKey         = "canary-scale"
+
+	// Follow the implementation of the AWS cli for service-stable command.
+	// ref: https://docs.aws.amazon.com/cli/latest/reference/ecs/wait/services-stable.html
+	// Retry wait service stable check. This value is set to 40.
+	retryServiceStable = 40
+	// Interval wait service stable check. This value is set to 15 seconds.
+	retryServiceStableInterval = 15 * time.Second
 )
 
 type registerer interface {
@@ -346,6 +355,45 @@ func rollout(ctx context.Context, in *executor.Input, platformProviderName strin
 	return true
 }
 
+// serviceStable waits for the given ECS service to become stable.
+// It calls IsServiceStable through a backoff retry configured with
+// retryServiceStable attempts and a constant retryServiceStableInterval delay.
+// It returns nil once the service is reported stable; otherwise it returns the
+// client creation error, the error from the retry, or a "not stable" error.
+func serviceStable(ctx context.Context, in *executor.Input, platformProviderName string, platformProviderCfg *config.PlatformProviderECSConfig, serviceDefinition types.Service) error {
+	client, err := provider.DefaultRegistry().Client(platformProviderName, platformProviderCfg, in.Logger)
+	if err != nil {
+		in.LogPersister.Errorf("Unable to create ECS client for the provider %s: %v", platformProviderName, err)
+		return err
+	}
+
+	in.LogPersister.Infof("Wait for service to reach stable state")
+	// Follow the implementation of the AWS cli for service-stable command.
+	// ref: https://docs.aws.amazon.com/cli/latest/reference/ecs/wait/services-stable.html
+	retry := backoff.NewRetry(retryServiceStable, backoff.NewConstant(retryServiceStableInterval))
+	check, err := retry.Do(ctx, func() (interface{}, error) {
+		err := client.IsServiceStable(ctx, serviceDefinition)
+		// A missing service is mapped to (false, nil): there is no service to
+		// check, so the code below reports it as not stable.
+		if errors.Is(err, platformprovider.ErrNotFound) {
+			return false, nil
+		}
+		return err == nil, err
+	})
+	if err != nil {
+		return err
+	}
+
+	// The comma-ok assertion avoids a panic should retry.Do ever return a
+	// nil value, which an unchecked check.(bool) cannot handle.
+	if stable, ok := check.(bool); !ok || !stable {
+		return fmt.Errorf("service %s is not stable", *serviceDefinition.ServiceName)
+	}
+
+	in.LogPersister.Infof("Successfully waited for service to reach stable state")
+	return nil
+}
+
 func clean(ctx context.Context, in *executor.Input, platformProviderName string, platformProviderCfg *config.PlatformProviderECSConfig) bool {
 	client, err := provider.DefaultRegistry().Client(platformProviderName, platformProviderCfg, in.Logger)
 	if err != nil {
diff --git a/pkg/app/piped/platformprovider/ecs/client.go b/pkg/app/piped/platformprovider/ecs/client.go
index f7853e2b97..39882c9b11 100644
--- a/pkg/app/piped/platformprovider/ecs/client.go
+++ b/pkg/app/piped/platformprovider/ecs/client.go
@@ -229,6 +229,33 @@ func (c *client) GetPrimaryTaskSet(ctx context.Context, service types.Service) (
 	return nil, platformprovider.ErrNotFound
 }
 
+// IsServiceStable checks whether the given service is stable or not.
+// It returns nil if the service is stable, otherwise it returns an error.
+func (c *client) IsServiceStable(ctx context.Context, service types.Service) error {
+	input := &ecs.DescribeServicesInput{
+		Cluster:  service.ClusterArn,
+		Services: []string{*service.ServiceArn},
+	}
+
+	output, err := c.ecsClient.DescribeServices(ctx, input)
+	if err != nil {
+		return fmt.Errorf("failed to get service %s: %w", *service.ServiceName, err)
+	}
+
+	if len(output.Services) == 0 {
+		return platformprovider.ErrNotFound
+	}
+
+	svc := output.Services[0]
+	// The logic follows the implementation of the AWS CLI.
+	// ref: https://docs.aws.amazon.com/cli/latest/reference/ecs/wait/services-stable.html#description
+	if len(svc.Deployments) == 1 && svc.RunningCount == svc.DesiredCount {
+		return nil
+	}
+
+	return fmt.Errorf("service %s is not stable", *service.ServiceName)
+}
+
 func (c *client) DeleteTaskSet(ctx context.Context, service types.Service, taskSetArn string) error {
 	input := &ecs.DeleteTaskSetInput{
 		Cluster: service.ClusterArn,
diff --git a/pkg/app/piped/platformprovider/ecs/ecs.go b/pkg/app/piped/platformprovider/ecs/ecs.go
index 64efaa959e..ae6d862296 100644
--- a/pkg/app/piped/platformprovider/ecs/ecs.go
+++ b/pkg/app/piped/platformprovider/ecs/ecs.go
@@ -45,6 +45,7 @@ type ECS interface {
 	ServiceExists(ctx context.Context, clusterName string, servicesName string) (bool, error)
 	CreateService(ctx context.Context, service types.Service) (*types.Service, error)
 	UpdateService(ctx context.Context, service types.Service) (*types.Service, error)
+	IsServiceStable(ctx context.Context, service types.Service) error
 	RegisterTaskDefinition(ctx context.Context, taskDefinition types.TaskDefinition) (*types.TaskDefinition, error)
 	RunTask(ctx context.Context, taskDefinition types.TaskDefinition, clusterArn string, launchType string, awsVpcConfiguration *config.ECSVpcConfiguration, tags []types.Tag) error
 	GetPrimaryTaskSet(ctx context.Context, service types.Service) (*types.TaskSet, error)