Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #4580 #4581 #4582

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 38 additions & 22 deletions pkg/app/piped/executor/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
)

const (
activeServiceKeyName = "active-service-object"
// Canary task set metadata keys.
canaryTaskSetARNKeyName = "canary-taskset-arn"
canaryServiceKeyName = "canary-service-object"
// Stage metadata keys.
trafficRoutePrimaryMetadataKey = "primary-percentage"
trafficRouteCanaryMetadataKey = "canary-percentage"
Expand Down Expand Up @@ -265,6 +266,17 @@ func sync(ctx context.Context, in *executor.Input, platformProviderName string,
return false
}

// Store ACTIVE service to delete its unused TaskSet later.
serviceObjData, err := json.Marshal(service)
if err != nil {
in.LogPersister.Errorf("Unable to store applied service to metadata store: %v", err)
return false
}
if err := in.MetadataStore.Shared().Put(ctx, activeServiceKeyName, string(serviceObjData)); err != nil {
in.LogPersister.Errorf("Unable to store applied service to metadata store: %v", err)
return false
}

in.LogPersister.Infof("Start rolling out ECS task set")
if err := createPrimaryTaskSet(ctx, client, *service, *td, targetGroup); err != nil {
in.LogPersister.Errorf("Failed to rolling out ECS task set for service %s: %v", *serviceDefinition.ServiceName, err)
Expand Down Expand Up @@ -302,6 +314,17 @@ func rollout(ctx context.Context, in *executor.Input, platformProviderName strin
return false
}

// Store ACTIVE service to delete its unused TaskSet later.
serviceObjData, err := json.Marshal(service)
if err != nil {
in.LogPersister.Errorf("Unable to store applied service to metadata store: %v", err)
return false
}
if err := in.MetadataStore.Shared().Put(ctx, activeServiceKeyName, string(serviceObjData)); err != nil {
in.LogPersister.Errorf("Unable to store applied service to metadata store: %v", err)
return false
}

// Create a task set in the specified cluster and service.
in.LogPersister.Infof("Start rolling out ECS task set")
if in.StageConfig.Name == model.StageECSPrimaryRollout {
Expand Down Expand Up @@ -336,16 +359,6 @@ func rollout(ctx context.Context, in *executor.Input, platformProviderName strin
in.LogPersister.Errorf("Unable to store created active taskSet to metadata store: %v", err)
return false
}
// Store applied Service (CANARY variant) to delete its TaskSet later.
serviceObjData, err := json.Marshal(service)
if err != nil {
in.LogPersister.Errorf("Unable to store applied service to metadata store: %v", err)
return false
}
if err := in.MetadataStore.Shared().Put(ctx, canaryServiceKeyName, string(serviceObjData)); err != nil {
in.LogPersister.Errorf("Unable to store applied service to metadata store: %v", err)
return false
}
}

in.LogPersister.Infof("Wait service to reach stable state")
Expand All @@ -365,27 +378,30 @@ func clean(ctx context.Context, in *executor.Input, platformProviderName string,
return false
}

taskSetArn, ok := in.MetadataStore.Shared().Get(canaryTaskSetARNKeyName)
if !ok {
in.LogPersister.Errorf("Unable to restore CANARY task set to clean: Not found")
return false
}
serviceObjData, ok := in.MetadataStore.Shared().Get(canaryServiceKeyName)
// Get service object from metadata store.
serviceObjData, ok := in.MetadataStore.Shared().Get(activeServiceKeyName)
if !ok {
in.LogPersister.Errorf("Unable to restore CANARY service to clean: Not found")
in.LogPersister.Errorf("Unable to restore service to clean: Not found")
return false
}
service := &types.Service{}
if err := json.Unmarshal([]byte(serviceObjData), service); err != nil {
in.LogPersister.Errorf("Unable to restore CANARY service to clean: %v", err)
in.LogPersister.Errorf("Unable to restore service to clean: %v", err)
return false
}

if err := client.DeleteTaskSet(ctx, *service, taskSetArn); err != nil {
in.LogPersister.Errorf("Failed to clean CANARY task set %s: %v", taskSetArn, err)
return false
// Delete canary task set if present.
taskSetArn, ok := in.MetadataStore.Shared().Get(canaryTaskSetARNKeyName)
if ok {
in.LogPersister.Infof("Cleaning CANARY task set %s from service %s", taskSetArn, *service.ServiceName)
if err := client.DeleteTaskSet(ctx, *service, taskSetArn); err != nil {
in.LogPersister.Errorf("Failed to clean CANARY task set %s: %v", taskSetArn, err)
return false
}
return true
}

in.LogPersister.Info("No task set found in metadata store to clean")
return true
}

Expand Down
66 changes: 32 additions & 34 deletions pkg/app/piped/executor/ecs/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ func rollback(ctx context.Context, in *executor.Input, platformProviderName stri
return false
}

// Get current PRIMARY task set.
prevPrimaryTaskSet, err := client.GetPrimaryTaskSet(ctx, *service)
// Ignore error in case it's not found error, the prevPrimaryTaskSet doesn't exist for newly created Service.
// Get current PRIMARY/ACTIVE task set.
prevTaskSets, err := client.GetServiceTaskSets(ctx, *service)
// Ignore error in case it's not found error, the prevTaskSets doesn't exist for newly created Service.
if err != nil && !errors.Is(err, platformprovider.ErrNotFound) {
in.LogPersister.Errorf("Failed to determine current ECS PRIMARY taskSet of service %s for rollback: %v", *serviceDefinition.ServiceName, err)
in.LogPersister.Errorf("Failed to determine current ECS PRIMARY/ACTIVE taskSet of service %s for rollback: %v", *serviceDefinition.ServiceName, err)
return false
}

Expand All @@ -139,41 +139,39 @@ func rollback(ctx context.Context, in *executor.Input, platformProviderName stri
return false
}

// Remove old taskSet if existed.
if prevPrimaryTaskSet != nil {
if err = client.DeleteTaskSet(ctx, *service, *prevPrimaryTaskSet.TaskSetArn); err != nil {
in.LogPersister.Errorf("Failed to remove unused previous PRIMARY taskSet %s: %v", *prevPrimaryTaskSet.TaskSetArn, err)
return false
// Reset routing in case of rolling back progressive pipeline.
if primaryTargetGroup != nil && canaryTargetGroup != nil {
routingTrafficCfg := provider.RoutingTrafficConfig{
{
TargetGroupArn: *primaryTargetGroup.TargetGroupArn,
Weight: 100,
},
{
TargetGroupArn: *canaryTargetGroup.TargetGroupArn,
Weight: 0,
},
}
}

// Reset routing
routingTrafficCfg := provider.RoutingTrafficConfig{
{
TargetGroupArn: *primaryTargetGroup.TargetGroupArn,
Weight: 100,
},
{
TargetGroupArn: *canaryTargetGroup.TargetGroupArn,
Weight: 0,
},
}

currListenerArns, err := client.GetListenerArns(ctx, *primaryTargetGroup)
if err != nil {
in.LogPersister.Errorf("Failed to get current active listeners: %v", err)
return false
}
currListenerArns, err := client.GetListenerArns(ctx, *primaryTargetGroup)
if err != nil {
in.LogPersister.Errorf("Failed to get current active listeners: %v", err)
return false
}

if err := client.ModifyListeners(ctx, currListenerArns, routingTrafficCfg); err != nil {
in.LogPersister.Errorf("Failed to routing traffic to PRIMARY variant: %v", err)
return false
if err := client.ModifyListeners(ctx, currListenerArns, routingTrafficCfg); err != nil {
in.LogPersister.Errorf("Failed to routing traffic to PRIMARY variant: %v", err)
return false
}
}

// Delete Canary taskSet
if !clean(ctx, in, platformProviderName, platformProviderCfg) {
in.LogPersister.Error("Failed to delete CANARY TaskSet")
return false
// Delete previous ACTIVE taskSets
in.LogPersister.Infof("Start deleting previous ACTIVE taskSets")
for _, ts := range prevTaskSets {
in.LogPersister.Infof("Deleting previous ACTIVE taskSet %s", *ts.TaskSetArn)
if err := client.DeleteTaskSet(ctx, *service, *ts.TaskSetArn); err != nil {
in.LogPersister.Errorf("Failed to remove previous ACTIVE taskSet %s: %v", *ts.TaskSetArn, err)
return false
}
}

in.LogPersister.Infof("Rolled back the ECS service %s and task definition %s configuration to original stage", *serviceDefinition.ServiceName, *taskDefinition.Family)
Expand Down
25 changes: 25 additions & 0 deletions pkg/app/piped/platformprovider/ecs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,31 @@ func (c *client) GetPrimaryTaskSet(ctx context.Context, service types.Service) (
return nil, platformprovider.ErrNotFound
}

func (c *client) GetServiceTaskSets(ctx context.Context, service types.Service) ([]*types.TaskSet, error) {
input := &ecs.DescribeServicesInput{
Cluster: service.ClusterArn,
Services: []string{
*service.ServiceArn,
},
}
output, err := c.ecsClient.DescribeServices(ctx, input)
if err != nil {
return nil, fmt.Errorf("failed to get task sets of service %s: %w", *service.ServiceName, err)
}
if len(output.Services) == 0 {
return nil, fmt.Errorf("failed to get task sets of service %s: services empty", *service.ServiceName)
}
svc := output.Services[0]
taskSets := make([]*types.TaskSet, 0, len(svc.TaskSets))
for i := range svc.TaskSets {
if aws.ToString(svc.TaskSets[i].Status) == "DRAINING" {
continue
}
taskSets = append(taskSets, &svc.TaskSets[i])
}
return taskSets, nil
}

// WaitServiceStable blocks until the ECS service is stable.
// It returns nil if the service is stable, otherwise it returns an error.
// Note: This function follow the implementation of the AWS CLI.
Expand Down
1 change: 1 addition & 0 deletions pkg/app/piped/platformprovider/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type ECS interface {
RegisterTaskDefinition(ctx context.Context, taskDefinition types.TaskDefinition) (*types.TaskDefinition, error)
RunTask(ctx context.Context, taskDefinition types.TaskDefinition, clusterArn string, launchType string, awsVpcConfiguration *config.ECSVpcConfiguration, tags []types.Tag) error
GetPrimaryTaskSet(ctx context.Context, service types.Service) (*types.TaskSet, error)
GetServiceTaskSets(ctx context.Context, service types.Service) ([]*types.TaskSet, error)
CreateTaskSet(ctx context.Context, service types.Service, taskDefinition types.TaskDefinition, targetGroup *types.LoadBalancer, scale int) (*types.TaskSet, error)
DeleteTaskSet(ctx context.Context, service types.Service, taskSetArn string) error
UpdateServicePrimaryTaskSet(ctx context.Context, service types.Service, taskSet types.TaskSet) (*types.TaskSet, error)
Expand Down
3 changes: 2 additions & 1 deletion pkg/app/piped/platformprovider/ecs/target_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ func loadTargetGroups(targetGroups config.ECSTargetGroups) (*types.LoadBalancer,
return nil, nil, fmt.Errorf("invalid primary target group definition given: %v", err)
}

canary := &types.LoadBalancer{}
var canary *types.LoadBalancer
if len(targetGroups.Canary) > 0 {
canaryDecoder := json.NewDecoder(bytes.NewReader(targetGroups.Canary))
canaryDecoder.DisallowUnknownFields()
canary = &types.LoadBalancer{}
err := canaryDecoder.Decode(canary)
if err != nil {
return nil, nil, fmt.Errorf("invalid canary target group definition given: %v", err)
Expand Down
104 changes: 104 additions & 0 deletions pkg/app/piped/platformprovider/ecs/target_groups_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2023 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecs

import (
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ecs/types"
"github.com/stretchr/testify/assert"

"github.com/pipe-cd/pipecd/pkg/config"
)

func TestLoadTargetGroup(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
cfg config.ECSTargetGroups
expected []*types.LoadBalancer
expectedErr bool
}{
{
name: "no target group",
cfg: config.ECSTargetGroups{},
expected: []*types.LoadBalancer{nil, nil},
expectedErr: true,
},
{
name: "primary target group only",
cfg: config.ECSTargetGroups{
Primary: []byte(`{"targetGroupArn": "primary-target-group-arn", "containerName": "primary-container-name", "containerPort": 80}`),
},
expected: []*types.LoadBalancer{
{
TargetGroupArn: aws.String("primary-target-group-arn"),
ContainerName: aws.String("primary-container-name"),
ContainerPort: aws.Int32(80),
},
nil,
},
expectedErr: false,
},
{
name: "primary and canary target group",
cfg: config.ECSTargetGroups{
Primary: []byte(`{"targetGroupArn": "primary-target-group-arn", "containerName": "primary-container-name", "containerPort": 80}`),
Canary: []byte(`{"targetGroupArn": "canary-target-group-arn", "containerName": "canary-container-name", "containerPort": 80}`),
},
expected: []*types.LoadBalancer{
{
TargetGroupArn: aws.String("primary-target-group-arn"),
ContainerName: aws.String("primary-container-name"),
ContainerPort: aws.Int32(80),
},
{
TargetGroupArn: aws.String("canary-target-group-arn"),
ContainerName: aws.String("canary-container-name"),
ContainerPort: aws.Int32(80),
},
},
expectedErr: false,
},
{
name: "invalid primary target group",
cfg: config.ECSTargetGroups{
Primary: []byte(`{"invalidField": "primary-target-group-arn"}`),
},
expected: []*types.LoadBalancer{nil, nil},
expectedErr: true,
},
{
name: "invalid canary target group",
cfg: config.ECSTargetGroups{
Primary: []byte(`{"targetGroupArn": "primary-target-group-arn"`),
Canary: []byte(`{"invalidField": "canary-target-group-arn"}`),
},
expected: []*types.LoadBalancer{nil, nil},
expectedErr: true,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
primary, canary, err := loadTargetGroups(tc.cfg)
assert.Equal(t, tc.expectedErr, err != nil)
assert.Equal(t, tc.expected[0], primary)
assert.Equal(t, tc.expected[1], canary)
})
}
}
2 changes: 1 addition & 1 deletion tool/actions-plan-preview/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ require (
golang.org/x/net v0.7.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
gopkg.in/yaml.v3 v3.0.0 // indirect
)
3 changes: 2 additions & 1 deletion tool/actions-plan-preview/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscL
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=