diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 78a454e1bc6c..f84c3928edbd 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -60,6 +60,7 @@ func main() { op.GetClient(), op.EventRecorder, op.UnavailableOfferingsCache, + op.SSMCache, cloudProvider, op.SubnetProvider, op.SecurityGroupProvider, diff --git a/pkg/apis/v1/ec2nodeclass.go b/pkg/apis/v1/ec2nodeclass.go index 39f790284619..d74ce18b5e0c 100644 --- a/pkg/apis/v1/ec2nodeclass.go +++ b/pkg/apis/v1/ec2nodeclass.go @@ -489,16 +489,40 @@ func (in *EC2NodeClass) AMIFamily() string { if in.Spec.AMIFamily != nil { return *in.Spec.AMIFamily } - if term, ok := lo.Find(in.Spec.AMISelectorTerms, func(t AMISelectorTerm) bool { - return t.Alias != "" - }); ok { - return AMIFamilyFromAlias(term.Alias) + if alias := in.Alias(); alias != nil { + return alias.Family } // Unreachable: validation enforces that one of the above conditions must be met return AMIFamilyCustom } -func AMIFamilyFromAlias(alias string) string { +type Alias struct { + Family string + Version string +} + +const ( + AliasVersionLatest = "latest" +) + +func (a *Alias) String() string { + return fmt.Sprintf("%s@%s", a.Family, a.Version) +} + +func (in *EC2NodeClass) Alias() *Alias { + term, ok := lo.Find(in.Spec.AMISelectorTerms, func(term AMISelectorTerm) bool { + return term.Alias != "" + }) + if !ok { + return nil + } + return &Alias{ + Family: amiFamilyFromAlias(term.Alias), + Version: amiVersionFromAlias(term.Alias), + } +} + +func amiFamilyFromAlias(alias string) string { components := strings.Split(alias, "@") if len(components) != 2 { log.Fatalf("failed to parse AMI alias %q, invalid format", alias) @@ -518,7 +542,7 @@ func AMIFamilyFromAlias(alias string) string { return family } -func AMIVersionFromAlias(alias string) string { +func amiVersionFromAlias(alias string) string { components := strings.Split(alias, "@") if len(components) != 2 { log.Fatalf("failed to parse AMI alias %q, invalid format", alias) diff --git a/pkg/apis/v1/ec2nodeclass_conversion.go b/pkg/apis/v1/ec2nodeclass_conversion.go index 38cf590c8b4f..77b2329c3a90 100644 --- a/pkg/apis/v1/ec2nodeclass_conversion.go +++ b/pkg/apis/v1/ec2nodeclass_conversion.go @@ -52,15 +52,10 @@ func (in *EC2NodeClass) ConvertTo(ctx context.Context, to apis.Convertible) erro v1beta1enc.Spec.AMIFamily = lo.ToPtr(in.AMIFamily()) } - if term, ok := lo.Find(in.Spec.AMISelectorTerms, func(term AMISelectorTerm) bool { - return term.Alias != "" - }); ok { - version := AMIVersionFromAlias(term.Alias) - if version != "latest" { - v1beta1enc.Annotations = lo.Assign(v1beta1enc.Annotations, map[string]string{ - AnnotationAliasVersionCompatibilityKey: version, - }) - } + if alias := in.Alias(); alias != nil && alias.Version != AliasVersionLatest { + v1beta1enc.Annotations = lo.Assign(v1beta1enc.Annotations, map[string]string{ + AnnotationAliasVersionCompatibilityKey: alias.Version, + }) } in.Spec.convertTo(&v1beta1enc.Spec) diff --git a/pkg/apis/v1/zz_generated.deepcopy.go b/pkg/apis/v1/zz_generated.deepcopy.go index 627c14cfad13..802b4929776c 100644 --- a/pkg/apis/v1/zz_generated.deepcopy.go +++ b/pkg/apis/v1/zz_generated.deepcopy.go @@ -69,6 +69,21 @@ func (in *AMISelectorTerm) DeepCopy() *AMISelectorTerm { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Alias) DeepCopyInto(out *Alias) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Alias. +func (in *Alias) DeepCopy() *Alias { + if in == nil { + return nil + } + out := new(Alias) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BlockDevice) DeepCopyInto(out *BlockDevice) { *out = *in diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 8f8c98d48c7b..00c221fa7897 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -35,9 +35,9 @@ const ( AvailableIPAddressTTL = 5 * time.Minute // AvailableIPAddressTTL is time to drop AssociatePublicIPAddressTTL data if it is not updated within the TTL AssociatePublicIPAddressTTL = 5 * time.Minute - // SSMGetParametersByPathTTL is the time to drop SSM Parameters by path data. This only queries EKS Optimized AMI + // SSMCacheTTL is the time to drop SSM Parameters by path data. This only queries EKS Optimized AMI // releases, so we should expect this to be updated relatively infrequently. - SSMGetParametersByPathTTL = 24 * time.Hour + SSMCacheTTL = 24 * time.Hour ) const ( diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 18b19c89237c..53c2999ee2b2 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -19,6 +19,7 @@ import ( "github.com/awslabs/operatorpkg/controller" "github.com/awslabs/operatorpkg/status" + "github.com/patrickmn/go-cache" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/karpenter/pkg/cloudprovider" migration "sigs.k8s.io/karpenter/pkg/controllers/migration/resource" @@ -29,6 +30,7 @@ import ( nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination" controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype" controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" + ssminvalidation "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/ssm/invalidation" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/aws-sdk-go/aws/session" @@ -40,7 +42,7 @@ import ( "sigs.k8s.io/karpenter/pkg/events" karpoptions "sigs.k8s.io/karpenter/pkg/operator/options" - "github.com/aws/karpenter-provider-aws/pkg/cache" + awscache "github.com/aws/karpenter-provider-aws/pkg/cache" "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption" nodeclaimgarbagecollection "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/garbagecollection" nodeclaimtagging "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/tagging" @@ -55,11 +57,25 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/providers/subnet" ) -func NewControllers(ctx context.Context, mgr manager.Manager, sess *session.Session, clk clock.Clock, kubeClient client.Client, recorder events.Recorder, - unavailableOfferings *cache.UnavailableOfferings, cloudProvider cloudprovider.CloudProvider, subnetProvider subnet.Provider, - securityGroupProvider securitygroup.Provider, instanceProfileProvider instanceprofile.Provider, instanceProvider instance.Provider, - pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider, instanceTypeProvider instancetype.Provider) []controller.Controller { - +func NewControllers( + ctx context.Context, + mgr manager.Manager, + sess *session.Session, + clk clock.Clock, + kubeClient client.Client, + recorder events.Recorder, + unavailableOfferings *awscache.UnavailableOfferings, + ssmCache *cache.Cache, + cloudProvider cloudprovider.CloudProvider, + subnetProvider subnet.Provider, + securityGroupProvider securitygroup.Provider, + instanceProfileProvider instanceprofile.Provider, + instanceProvider instance.Provider, + pricingProvider pricing.Provider, + amiProvider amifamily.Provider, + launchTemplateProvider launchtemplate.Provider, + instanceTypeProvider instancetype.Provider, +) []controller.Controller { controllers := []controller.Controller{ nodeclasshash.NewController(kubeClient), nodeclassstatus.NewController(kubeClient, subnetProvider, securityGroupProvider, amiProvider, instanceProfileProvider, launchTemplateProvider), @@ -68,6 +84,7 @@ func NewControllers(ctx context.Context, mgr manager.Manager, sess *session.Sess nodeclaimtagging.NewController(kubeClient, instanceProvider), controllerspricing.NewController(pricingProvider), controllersinstancetype.NewController(instanceTypeProvider), + ssminvalidation.NewController(ssmCache, amiProvider), status.NewController[*v1.EC2NodeClass](kubeClient, mgr.GetEventRecorderFor("karpenter")), } if !karpoptions.FromContext(ctx).DisableWebhook { diff --git a/pkg/controllers/providers/ssm/invalidation/controller.go b/pkg/controllers/providers/ssm/invalidation/controller.go new file mode 100644 index 000000000000..d57be1594f89 --- /dev/null +++ b/pkg/controllers/providers/ssm/invalidation/controller.go @@ -0,0 +1,95 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package invalidation + +import ( + "context" + "time" + + "github.com/awslabs/operatorpkg/singleton" + "github.com/patrickmn/go-cache" + "github.com/samber/lo" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/karpenter/pkg/operator/injection" + + v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" + "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/aws/karpenter-provider-aws/pkg/providers/ssm" +) + +// The SSM Invalidation controller is responsible for invalidating "latest" SSM parameters when they point to deprecated +// AMIs. This can occur when an EKS-optimized AMI with a regression is released, and the AMI team chooses to deprecate +// the AMI. Normally, SSM parameter cache entries expire after 24 hours to prevent a thundering herd upon a new AMI +// release, however Karpenter should react faster when an AMI is deprecated. This controller will ensure Karpenter +// reacts to AMI deprecations within it's polling period (30m). +type Controller struct { + cache *cache.Cache + amiProvider amifamily.Provider +} + +func NewController(ssmCache *cache.Cache, amiProvider amifamily.Provider) *Controller { + return &Controller{ + cache: ssmCache, + amiProvider: amiProvider, + } +} + +func (c *Controller) Name() string { + return "providers.ssm.invalidation" +} + +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, c.Name()) + + amiIDsToParameters := map[string]ssm.Parameter{} + for _, item := range c.cache.Items() { + entry := item.Object.(ssm.CacheEntry) + if !entry.Parameter.IsMutable { + continue + } + amiIDsToParameters[entry.Value] = entry.Parameter + } + amis := []amifamily.AMI{} + for _, nodeClass := range lo.Map(lo.Keys(amiIDsToParameters), func(amiID string, _ int) *v1.EC2NodeClass { + return &v1.EC2NodeClass{ + Spec: v1.EC2NodeClassSpec{ + AMISelectorTerms: []v1.AMISelectorTerm{{ID: amiID}}, + }, + } + }) { + resolvedAMIs, err := c.amiProvider.List(ctx, nodeClass) + if err != nil { + return reconcile.Result{}, err + } + amis = append(amis, resolvedAMIs...) + } + for _, ami := range amis { + if !ami.Deprecated { + continue + } + parameter := amiIDsToParameters[ami.AmiID] + c.cache.Delete(parameter.CacheKey()) + } + return reconcile.Result{RequeueAfter: 30 * time.Minute}, nil +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named(c.Name()). + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) +} diff --git a/pkg/controllers/providers/ssm/invalidation/suite_test.go b/pkg/controllers/providers/ssm/invalidation/suite_test.go new file mode 100644 index 000000000000..110fcc302eaf --- /dev/null +++ b/pkg/controllers/providers/ssm/invalidation/suite_test.go @@ -0,0 +1,163 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package invalidation_test + +import ( + "context" + "testing" + "time" + + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/samber/lo" + coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" + coretest "sigs.k8s.io/karpenter/pkg/test" + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + + "github.com/aws/karpenter-provider-aws/pkg/apis" + v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" + "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/ssm/invalidation" + "github.com/aws/karpenter-provider-aws/pkg/operator/options" + "github.com/aws/karpenter-provider-aws/pkg/providers/ssm" + "github.com/aws/karpenter-provider-aws/pkg/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "sigs.k8s.io/karpenter/pkg/test/expectations" + . "sigs.k8s.io/karpenter/pkg/utils/testing" +) + +var ctx context.Context +var stop context.CancelFunc +var env *coretest.Environment +var awsEnv *test.Environment +var invalidationController *invalidation.Controller + +func TestAWS(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "SSM Invalidation Controller") +} + +var _ = BeforeSuite(func() { + env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...)) + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + ctx, stop = context.WithCancel(ctx) + awsEnv = test.NewEnvironment(ctx, env) + + invalidationController = invalidation.NewController(awsEnv.SSMCache, awsEnv.AMIProvider) +}) + +var _ = AfterSuite(func() { + stop() + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = BeforeEach(func() { + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + awsEnv.Reset() +}) + +var _ = Describe("SSM Invalidation Controller", func() { + var nodeClass *v1.EC2NodeClass + BeforeEach(func() { + nodeClass = &v1.EC2NodeClass{ + Spec: v1.EC2NodeClassSpec{ + AMISelectorTerms: []v1.AMISelectorTerm{{Alias: "al2023@latest"}}, + }, + } + }) + It("shouldn't invalidate cache entries for non-deprecated AMIs", func() { + _, err := awsEnv.AMIProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + currentEntries := getSSMCacheEntries() + Expect(len(currentEntries)).To(Equal(4)) + awsEnv.EC2Cache.Flush() + ExpectSingletonReconciled(ctx, invalidationController) + awsEnv.SSMAPI.Reset() + _, err = awsEnv.AMIProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + updatedEntries := getSSMCacheEntries() + Expect(len(updatedEntries)).To(Equal(4)) + for parameter, amiID := range currentEntries { + updatedAMIID, ok := updatedEntries[parameter] + Expect(ok).To(BeTrue()) + Expect(updatedAMIID).To(Equal(amiID)) + } + }) + It("shouldn't invalidate cache entries for deprecated AMIs when the SSM parameter is immutable", func() { + nodeClass.Spec.AMISelectorTerms[0].Alias = "al2023@v20241024" + _, err := awsEnv.AMIProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + currentEntries := getSSMCacheEntries() + deprecateAMIs(lo.Values(currentEntries)...) + Expect(len(currentEntries)).To(Equal(4)) + awsEnv.EC2Cache.Flush() + ExpectSingletonReconciled(ctx, invalidationController) + awsEnv.SSMAPI.Reset() + _, err = awsEnv.AMIProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + updatedEntries := getSSMCacheEntries() + Expect(len(updatedEntries)).To(Equal(4)) + for parameter, amiID := range currentEntries { + updatedAMIID, ok := updatedEntries[parameter] + Expect(ok).To(BeTrue()) + Expect(updatedAMIID).To(Equal(amiID)) + } + }) + It("should invalidate cache entries for deprecated AMIs when the SSM parameter is mutable", func() { + _, err := awsEnv.AMIProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + currentEntries := getSSMCacheEntries() + deprecateAMIs(lo.Values(currentEntries)...) + Expect(len(currentEntries)).To(Equal(4)) + awsEnv.EC2Cache.Flush() + ExpectSingletonReconciled(ctx, invalidationController) + awsEnv.SSMAPI.Reset() + _, err = awsEnv.AMIProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + updatedEntries := getSSMCacheEntries() + Expect(len(updatedEntries)).To(Equal(4)) + for parameter, amiID := range currentEntries { + updatedAMIID, ok := updatedEntries[parameter] + Expect(ok).To(BeTrue()) + Expect(updatedAMIID).ToNot(Equal(amiID)) + } + }) +}) + +func getSSMCacheEntries() map[string]string { + entries := map[string]string{} + for _, item := range awsEnv.SSMCache.Items() { + entry := item.Object.(ssm.CacheEntry) + entries[entry.Parameter.Name] = entry.Value + } + return entries +} + +func deprecateAMIs(amiIDs ...string) { + awsEnv.EC2API.DescribeImagesOutput.Set(&ec2.DescribeImagesOutput{ + Images: lo.Map(amiIDs, func(amiID string, _ int) *ec2.Image { + return &ec2.Image{ + Name: lo.ToPtr(coretest.RandomName()), + ImageId: lo.ToPtr(amiID), + CreationDate: lo.ToPtr(awsEnv.Clock.Now().Add(-24 * time.Hour).Format(time.RFC3339)), + Architecture: lo.ToPtr("x86_64"), + DeprecationTime: lo.ToPtr(awsEnv.Clock.Now().Add(-12 * time.Hour).Format(time.RFC3339)), + } + }), + }) +} diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index fd7683fe8f4c..e248c1f96e3b 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -75,6 +75,7 @@ type Operator struct { Session *session.Session UnavailableOfferingsCache *awscache.UnavailableOfferings + SSMCache *cache.Cache EC2API ec2iface.EC2API SubnetProvider subnet.Provider SecurityGroupProvider securitygroup.Provider @@ -133,6 +134,8 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont } unavailableOfferingsCache := awscache.NewUnavailableOfferings() + ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval) + subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval)) securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) instanceProfileProvider := instanceprofile.NewDefaultProvider(*sess.Config.Region, iam.New(sess), cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval)) @@ -143,8 +146,8 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont *sess.Config.Region, ) versionProvider := version.NewDefaultProvider(operator.KubernetesInterface, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) - ssmProvider := ssmp.NewDefaultProvider(ssm.New(sess), cache.New(awscache.SSMGetParametersByPathTTL, awscache.DefaultCleanupInterval)) - amiProvider := amifamily.NewDefaultProvider(versionProvider, ssmProvider, ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) + ssmProvider := ssmp.NewDefaultProvider(ssm.New(sess), ssmCache) + amiProvider := amifamily.NewDefaultProvider(operator.Clock, versionProvider, ssmProvider, ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) amiResolver := amifamily.NewResolver(amiProvider) launchTemplateProvider := launchtemplate.NewDefaultProvider( ctx, @@ -181,6 +184,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont Operator: operator, Session: sess, UnavailableOfferingsCache: unavailableOfferingsCache, + SSMCache: ssmCache, EC2API: ec2api, SubnetProvider: subnetProvider, SecurityGroupProvider: securityGroupProvider, diff --git a/pkg/providers/amifamily/al2.go b/pkg/providers/amifamily/al2.go index ebd385250dc7..f5d7162ecfe4 100644 --- a/pkg/providers/amifamily/al2.go +++ b/pkg/providers/amifamily/al2.go @@ -43,22 +43,25 @@ func (a AL2) DescribeImageQuery(ctx context.Context, ssmProvider ssm.Provider, k ids := map[string][]Variant{} for path, variants := range map[string][]Variant{ fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2/%s/image_id", k8sVersion, lo.Ternary( - amiVersion == AMIVersionLatest, + amiVersion == v1.AliasVersionLatest, "recommended", fmt.Sprintf("amazon-eks-node-%s-%s", k8sVersion, amiVersion), )): {VariantStandard}, fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2-arm64/%s/image_id", k8sVersion, lo.Ternary( - amiVersion == AMIVersionLatest, + amiVersion == v1.AliasVersionLatest, "recommended", fmt.Sprintf("amazon-eks-arm64-node-%s-%s", k8sVersion, amiVersion), )): {VariantStandard}, fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2-gpu/%s/image_id", k8sVersion, lo.Ternary( - amiVersion == AMIVersionLatest, + amiVersion == v1.AliasVersionLatest, "recommended", fmt.Sprintf("amazon-eks-gpu-node-%s-%s", k8sVersion, amiVersion), )): {VariantNeuron, VariantNvidia}, } { - imageID, err := ssmProvider.Get(ctx, path) + imageID, err := ssmProvider.Get(ctx, ssm.Parameter{ + Name: path, + IsMutable: amiVersion == v1.AliasVersionLatest, + }) if err != nil { continue } diff --git a/pkg/providers/amifamily/al2023.go b/pkg/providers/amifamily/al2023.go index bde9161b0be9..b3b353696963 100644 --- a/pkg/providers/amifamily/al2023.go +++ b/pkg/providers/amifamily/al2023.go @@ -43,7 +43,10 @@ func (a AL2023) DescribeImageQuery(ctx context.Context, ssmProvider ssm.Provider } { for _, variant := range variants { path := a.resolvePath(arch, string(variant), k8sVersion, amiVersion) - imageID, err := ssmProvider.Get(ctx, path) + imageID, err := ssmProvider.Get(ctx, ssm.Parameter{ + Name: path, + IsMutable: amiVersion == v1.AliasVersionLatest, + }) if err != nil { continue } @@ -68,7 +71,7 @@ func (a AL2023) DescribeImageQuery(ctx context.Context, ssmProvider ssm.Provider func (a AL2023) resolvePath(architecture, variant, k8sVersion, amiVersion string) string { name := lo.Ternary( - amiVersion == AMIVersionLatest, + amiVersion == v1.AliasVersionLatest, "recommended", fmt.Sprintf("amazon-eks-node-al2023-%s-%s-%s-%s", architecture, variant, k8sVersion, amiVersion), ) diff --git a/pkg/providers/amifamily/ami.go b/pkg/providers/amifamily/ami.go index 2d41eb8e2838..acbd01ee249e 100644 --- a/pkg/providers/amifamily/ami.go +++ b/pkg/providers/amifamily/ami.go @@ -26,6 +26,7 @@ import ( "github.com/mitchellh/hashstructure/v2" "github.com/patrickmn/go-cache" "github.com/samber/lo" + "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/log" v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" @@ -49,15 +50,17 @@ type DefaultProvider struct { cm *pretty.ChangeMonitor versionProvider version.Provider ssmProvider ssm.Provider + clk clock.Clock } -func NewDefaultProvider(versionProvider version.Provider, ssmProvider ssm.Provider, ec2api ec2iface.EC2API, cache *cache.Cache) *DefaultProvider { +func NewDefaultProvider(clock clock.Clock, versionProvider version.Provider, ssmProvider ssm.Provider, ec2api ec2iface.EC2API, cache *cache.Cache) *DefaultProvider { return &DefaultProvider{ cache: cache, ec2api: ec2api, cm: pretty.NewChangeMonitor(), versionProvider: versionProvider, ssmProvider: ssmProvider, + clk: clock, } } @@ -71,12 +74,10 @@ func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1.EC2NodeClass) } // Discover deprecated AMIs if automatic AMI discovery and upgrade is enabled. This ensures we'll be able to // provision in the event of an EKS optimized AMI being deprecated. - includeDeprecated := lo.ContainsBy(nodeClass.Spec.AMISelectorTerms, func(term v1.AMISelectorTerm) bool { - if term.Alias == "" { - return false - } - return v1.AMIVersionFromAlias(term.Alias) == "latest" - }) + includeDeprecated := false + if alias := nodeClass.Alias(); alias != nil { + includeDeprecated = alias.Version == v1.AliasVersionLatest + } amis, err := p.amis(ctx, queries, includeDeprecated) if err != nil { return nil, err @@ -93,15 +94,12 @@ func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1.EC2NodeClass) func (p *DefaultProvider) DescribeImageQueries(ctx context.Context, nodeClass *v1.EC2NodeClass) ([]DescribeImageQuery, error) { // Aliases are mutually exclusive, both on the term level and field level within a term. // This is enforced by a CEL validation, we will treat this as an invariant. - if term, ok := lo.Find(nodeClass.Spec.AMISelectorTerms, func(term v1.AMISelectorTerm) bool { - return term.Alias != "" - }); ok { + if alias := nodeClass.Alias(); alias != nil { kubernetesVersion, err := p.versionProvider.Get(ctx) if err != nil { return nil, fmt.Errorf("getting kubernetes version, %w", err) } - amiFamily := GetAMIFamily(v1.AMIFamilyFromAlias(term.Alias), nil) - query, err := amiFamily.DescribeImageQuery(ctx, p.ssmProvider, kubernetesVersion, v1.AMIVersionFromAlias(term.Alias)) + query, err := GetAMIFamily(alias.Family, nil).DescribeImageQuery(ctx, p.ssmProvider, kubernetesVersion, alias.Version) if err != nil { return []DescribeImageQuery{}, err } @@ -191,6 +189,7 @@ func (p *DefaultProvider) amis(ctx context.Context, queries []DescribeImageQuery AmiID: lo.FromPtr(image.ImageId), CreationDate: lo.FromPtr(image.CreationDate), Requirements: reqs, + Deprecated: p.IsDeprecated(image), } } } @@ -219,3 +218,13 @@ func MapToInstanceTypes(instanceTypes []*cloudprovider.InstanceType, amis []v1.A } return amiIDs } + +func (p *DefaultProvider) IsDeprecated(image *ec2.Image) bool { + if image.DeprecationTime == nil { + return false + } + if deprecationTime := lo.Must(time.Parse(time.RFC3339, *image.DeprecationTime)); deprecationTime.After(p.clk.Now()) { + return false + } + return true +} diff --git a/pkg/providers/amifamily/bottlerocket.go b/pkg/providers/amifamily/bottlerocket.go index a6bfe1c3bd15..76d6b49cb505 100644 --- a/pkg/providers/amifamily/bottlerocket.go +++ b/pkg/providers/amifamily/bottlerocket.go @@ -49,7 +49,10 @@ func (b Bottlerocket) DescribeImageQuery(ctx context.Context, ssmProvider ssm.Pr fmt.Sprintf("/aws/service/bottlerocket/aws-k8s-%s-nvidia/x86_64/%s/image_id", k8sVersion, trimmedAMIVersion): {VariantNeuron, VariantNvidia}, fmt.Sprintf("/aws/service/bottlerocket/aws-k8s-%s-nvidia/arm64/%s/image_id", k8sVersion, trimmedAMIVersion): {VariantNeuron, VariantNvidia}, } { - imageID, err := ssmProvider.Get(ctx, path) + imageID, err := ssmProvider.Get(ctx, ssm.Parameter{ + Name: path, + IsMutable: amiVersion == v1.AliasVersionLatest, + }) if err != nil { continue } diff --git a/pkg/providers/amifamily/types.go b/pkg/providers/amifamily/types.go index 95caa67985a2..330001ce6394 100644 --- a/pkg/providers/amifamily/types.go +++ b/pkg/providers/amifamily/types.go @@ -29,17 +29,12 @@ import ( v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" ) -const ( - // AMIVersionLatest is the version used in EKS aliases to represent the latest version. This maps to different - // values in the SSM path, depending on the AMI type (e.g. "recommended" for AL2/AL2023)). - AMIVersionLatest = "latest" -) - type AMI struct { Name string AmiID string CreationDate string Requirements scheduling.Requirements + Deprecated bool } type AMIs []AMI diff --git a/pkg/providers/amifamily/windows.go b/pkg/providers/amifamily/windows.go index f1194dcf9d57..f9f20f1ca539 100644 --- a/pkg/providers/amifamily/windows.go +++ b/pkg/providers/amifamily/windows.go @@ -47,7 +47,10 @@ type Windows struct { } func (w Windows) DescribeImageQuery(ctx context.Context, ssmProvider ssm.Provider, k8sVersion string, amiVersion string) (DescribeImageQuery, error) { - imageID, err := ssmProvider.Get(ctx, fmt.Sprintf("/aws/service/ami-windows-latest/Windows_Server-%s-English-%s-EKS_Optimized-%s/image_id", w.Version, v1.WindowsCore, k8sVersion)) + imageID, err := ssmProvider.Get(ctx, ssm.Parameter{ + Name: fmt.Sprintf("/aws/service/ami-windows-latest/Windows_Server-%s-English-%s-EKS_Optimized-%s/image_id", w.Version, v1.WindowsCore, k8sVersion), + IsMutable: true, + }) if err != nil { return DescribeImageQuery{}, fmt.Errorf(`failed to discover any AMIs for alias "windows%s@%s"`, w.Version, amiVersion) } diff --git a/pkg/providers/ssm/provider.go b/pkg/providers/ssm/provider.go index 586a13bf47a3..66caae786d22 100644 --- a/pkg/providers/ssm/provider.go +++ b/pkg/providers/ssm/provider.go @@ -19,7 +19,6 @@ import ( "fmt" "sync" - "github.com/aws/aws-sdk-go/service/ssm" "github.com/aws/aws-sdk-go/service/ssm/ssmiface" "github.com/patrickmn/go-cache" "github.com/samber/lo" @@ -27,7 +26,7 @@ import ( ) type Provider interface { - Get(context.Context, string) (string, error) + Get(context.Context, Parameter) (string, error) } type DefaultProvider struct { @@ -43,19 +42,20 @@ func NewDefaultProvider(ssmapi ssmiface.SSMAPI, cache *cache.Cache) *DefaultProv } } -func (p *DefaultProvider) Get(ctx context.Context, parameter string) (string, error) { +func (p *DefaultProvider) Get(ctx context.Context, parameter Parameter) (string, error) { p.Lock() defer p.Unlock() - if result, ok := p.cache.Get(parameter); ok { - return result.(string), nil + if entry, ok := p.cache.Get(parameter.CacheKey()); ok { + return entry.(CacheEntry).Value, nil } - result, err := p.ssmapi.GetParameterWithContext(ctx, &ssm.GetParameterInput{ - Name: lo.ToPtr(parameter), - }) + result, err := p.ssmapi.GetParameterWithContext(ctx, parameter.GetParameterInput()) if err != nil { - return "", fmt.Errorf("getting ssm parameter %q, %w", parameter, err) + return "", fmt.Errorf("getting ssm parameter %q, %w", parameter.Name, err) } - p.cache.SetDefault(parameter, lo.FromPtr(result.Parameter.Value)) - log.FromContext(ctx).WithValues("parameter", parameter, "value", result.Parameter.Value).Info("discovered ssm parameter") + p.cache.SetDefault(parameter.CacheKey(), CacheEntry{ + Parameter: parameter, + Value: lo.FromPtr(result.Parameter.Value), + }) + log.FromContext(ctx).WithValues("parameter", parameter.Name, "value", result.Parameter.Value).Info("discovered ssm parameter") return lo.FromPtr(result.Parameter.Value), nil } diff --git a/pkg/providers/ssm/types.go b/pkg/providers/ssm/types.go new file mode 100644 index 000000000000..818f97027b8c --- /dev/null +++ b/pkg/providers/ssm/types.go @@ -0,0 +1,43 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ssm + +import ( + "github.com/aws/aws-sdk-go/service/ssm" + "github.com/samber/lo" +) + +type Parameter struct { + Name string + // IsMutable indicates if the value associated with an SSM parameter is expected to change. An example of a mutable + // parameter would be any of the "latest" or "recommended" AMI parameters which are updated each time a new AMI is + // released. On the otherhand, we would consider a parameter parameter for a specific AMI version to be immutable. + IsMutable bool +} + +func (p *Parameter) GetParameterInput() *ssm.GetParameterInput { + return &ssm.GetParameterInput{ + Name: lo.ToPtr(p.Name), + } +} + +func (p *Parameter) CacheKey() string { + return p.Name +} + +type CacheEntry struct { + Parameter Parameter + Value string +} diff --git a/pkg/test/environment.go b/pkg/test/environment.go index ac4c357bfee1..8067f578c1d1 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -17,10 +17,12 @@ package test import ( "context" "net" + "time" "github.com/patrickmn/go-cache" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" + clock "k8s.io/utils/clock/testing" karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" karpv1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" @@ -80,9 +82,13 @@ type Environment struct { AMIResolver *amifamily.Resolver VersionProvider *version.DefaultProvider LaunchTemplateProvider *launchtemplate.DefaultProvider + + Clock *clock.FakeClock } func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment { + clock := &clock.FakeClock{} + // API ec2api := fake.NewEC2API() eksapi := fake.NewEKSAPI() @@ -110,7 +116,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment versionProvider := version.NewDefaultProvider(env.KubernetesInterface, kubernetesVersionCache) instanceProfileProvider := instanceprofile.NewDefaultProvider(fake.DefaultRegion, iamapi, instanceProfileCache) ssmProvider := ssmp.NewDefaultProvider(ssmapi, ssmCache) - amiProvider := amifamily.NewDefaultProvider(versionProvider, ssmProvider, ec2api, ec2Cache) + amiProvider := amifamily.NewDefaultProvider(clock, versionProvider, ssmProvider, ec2api, ec2Cache) amiResolver := amifamily.NewResolver(amiProvider) instanceTypesProvider := instancetype.NewDefaultProvider(fake.DefaultRegion, instanceTypeCache, ec2api, subnetProvider, unavailableOfferingsCache, pricingProvider) launchTemplateProvider := @@ -165,10 +171,14 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment AMIProvider: amiProvider, AMIResolver: amiResolver, VersionProvider: versionProvider, + + Clock: clock, } } func (env *Environment) Reset() { + env.Clock.SetTime(time.Now()) + env.EC2API.Reset() env.EKSAPI.Reset() env.SSMAPI.Reset()