From c4cf5cdee2274ad1460194de45ef7c12642e451a Mon Sep 17 00:00:00 2001
From: Tanvir Tatla
Date: Thu, 21 Mar 2024 15:29:23 -0700
Subject: [PATCH] add cluster mover

---
 pkg/clustermanager/eksa_mover.go      | 137 ++++++++++++++++++++++++++
 pkg/clustermanager/eksa_mover_test.go | 126 +++++++++++++++++++++++
 pkg/dependencies/factory.go           |  21 ++++
 pkg/dependencies/factory_test.go      |  12 +++
 4 files changed, 296 insertions(+)
 create mode 100644 pkg/clustermanager/eksa_mover.go
 create mode 100644 pkg/clustermanager/eksa_mover_test.go

diff --git a/pkg/clustermanager/eksa_mover.go b/pkg/clustermanager/eksa_mover.go
new file mode 100644
index 000000000000..f2a72f66bb94
--- /dev/null
+++ b/pkg/clustermanager/eksa_mover.go
@@ -0,0 +1,137 @@
+package clustermanager
+
+import (
+	"context"
+	"math"
+	"time"
+
+	"github.com/go-logr/logr"
+	"github.com/pkg/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+
+	"github.com/aws/eks-anywhere/pkg/api/v1alpha1"
+	"github.com/aws/eks-anywhere/pkg/clients/kubernetes"
+	"github.com/aws/eks-anywhere/pkg/cluster"
+	"github.com/aws/eks-anywhere/pkg/retrier"
+)
+
+// MoverOpt allows customizing a Mover on construction.
+type MoverOpt func(*Mover)
+
+// Mover moves an EKS-A Cluster object and its child objects from a
+// source cluster to a destination cluster.
+type Mover struct {
+	log                logr.Logger
+	clientFactory      ClientFactory
+	moveClusterTimeout time.Duration
+	retryBackOff       time.Duration
+}
+
+// NewMover builds a Mover.
+func NewMover(log logr.Logger, clientFactory ClientFactory, opts ...MoverOpt) *Mover {
+	m := &Mover{
+		log:                log,
+		clientFactory:      clientFactory,
+		moveClusterTimeout: applyClusterSpecTimeout,
+		retryBackOff:       retryBackOff,
+	}
+
+	for _, opt := range opts {
+		opt(m)
+	}
+
+	return m
+}
+
+// WithMoverNoTimeouts disables the timeout for all the waits and retries in the mover.
+func WithMoverNoTimeouts() MoverOpt {
+	return func(m *Mover) {
+		maxTime := time.Duration(math.MaxInt64)
+		m.moveClusterTimeout = maxTime
+	}
+}
+
+// WithMoverApplyClusterTimeout allows configuring how long the mover retries
+// to move the cluster objects in case of failure.
+// Generally only used in tests.
+func WithMoverApplyClusterTimeout(timeout time.Duration) MoverOpt {
+	return func(m *Mover) {
+		m.moveClusterTimeout = timeout
+	}
+}
+
+// WithMoverRetryBackOff allows configuring how long the mover waits between
+// retries when reading, pausing, and moving the cluster objects.
+// Generally only used in tests.
+func WithMoverRetryBackOff(backOff time.Duration) MoverOpt {
+	return func(m *Mover) {
+		m.retryBackOff = backOff
+	}
+}
+
+// Move pauses reconciliation of the cluster on the source, then moves the EKS-A Cluster object and its child objects to the destination.
+func (m *Mover) Move(ctx context.Context, spec *cluster.Spec, fromClient, toClient kubernetes.Client) error {
+	m.log.V(3).Info("Moving the cluster object")
+	err := retrier.New(
+		m.moveClusterTimeout,
+		retrier.WithRetryPolicy(retrier.BackOffPolicy(m.retryBackOff)),
+	).Retry(func() error {
+		// read the cluster from the source cluster
+		cluster := &v1alpha1.Cluster{}
+		if err := fromClient.Get(ctx, spec.Cluster.Name, spec.Cluster.Namespace, cluster); err != nil {
+			return errors.Wrapf(err, "reading cluster from source")
+		}
+
+		// pause reconciliation of the cluster on the source
+		cluster.PauseReconcile()
+		if err := fromClient.Update(ctx, cluster); err != nil {
+			return errors.Wrapf(err, "updating cluster on source")
+		}
+
+		if err := moveClusterResource(ctx, cluster, toClient); err != nil {
+			return err
+		}
+
+		if err := moveChildObjects(ctx, spec, fromClient, toClient); err != nil {
+			return err
+		}
+
+		return nil
+	})
+
+	return err
+}
+
+func moveClusterResource(ctx context.Context, cluster *v1alpha1.Cluster, client kubernetes.Client) error {
+	cluster.ResourceVersion = ""
+	cluster.UID = ""
+
+	// create the eksa cluster on the destination; tolerate AlreadyExists so retries are idempotent
+	if err := client.Create(ctx, cluster); err != nil && !apierrors.IsAlreadyExists(err) {
+		return errors.Wrapf(err, "moving cluster %s", cluster.Name)
+	}
+
+	return nil
+}
+
+func moveChildObjects(ctx context.Context, spec *cluster.Spec, fromClient, toClient kubernetes.Client) error {
+	// read and move child objects
+	for _, child := range spec.ChildObjects() {
+		obj := &unstructured.Unstructured{}
+		obj.SetGroupVersionKind(child.GetObjectKind().GroupVersionKind())
+		if err := fromClient.Get(ctx, child.GetName(), child.GetNamespace(), obj); err != nil {
+			return errors.Wrapf(err, "reading child object %s %s", child.GetObjectKind().GroupVersionKind().Kind, child.GetName())
+		}
+
+		obj.SetResourceVersion("")
+		obj.SetUID("")
+		obj.SetOwnerReferences(nil)
+
+		if err := toClient.Create(ctx, obj); err != nil && !apierrors.IsAlreadyExists(err) {
+			return errors.Wrapf(err, "moving child object %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/clustermanager/eksa_mover_test.go b/pkg/clustermanager/eksa_mover_test.go
new file mode 100644
index 000000000000..e151358f9d29
--- /dev/null
+++ b/pkg/clustermanager/eksa_mover_test.go
@@ -0,0 +1,126 @@
+package clustermanager_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/go-logr/logr"
+	"github.com/golang/mock/gomock"
+	"github.com/onsi/gomega"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+
+	"github.com/aws/eks-anywhere/internal/test"
+	"github.com/aws/eks-anywhere/pkg/clients/kubernetes"
+	"github.com/aws/eks-anywhere/pkg/cluster"
+	"github.com/aws/eks-anywhere/pkg/clustermanager"
+	"github.com/aws/eks-anywhere/pkg/clustermanager/mocks"
+	"github.com/aws/eks-anywhere/pkg/controller/clientutil"
+	"github.com/aws/eks-anywhere/pkg/types"
+)
+
+type moverTest struct {
+	gomega.Gomega
+	tb            testing.TB
+	clientFactory *mocks.MockClientFactory
+	ctx           context.Context
+	spec          *cluster.Spec
+	fromClient    kubernetes.Client
+	toClient      kubernetes.Client
+	log           logr.Logger
+	mgmtCluster   *types.Cluster
+	bootstrap     *types.Cluster
+}
+
+func newMoverTest(tb testing.TB) *moverTest {
+	ctrl := gomock.NewController(tb)
+	return &moverTest{
+		tb:            tb,
+		Gomega:        gomega.NewWithT(tb),
+		clientFactory: mocks.NewMockClientFactory(ctrl),
+		ctx:           context.Background(),
+		spec:          test.VSphereClusterSpec(tb, tb.Name()),
+		log:           test.NewNullLogger(),
+		bootstrap: &types.Cluster{
+			KubeconfigFile: "bootstrap-config",
+		},
+		mgmtCluster: &types.Cluster{
+			KubeconfigFile: "my-config",
+		},
+	}
+}
+
+func (a *moverTest) buildClients(fromObjs, toObjs []kubernetes.Object) {
+	a.fromClient = test.NewFakeKubeClient(clientutil.ObjectsToClientObjects(fromObjs)...)
+	a.toClient = test.NewFakeKubeClient(clientutil.ObjectsToClientObjects(toObjs)...)
+}
+
+func TestMoverSuccess(t *testing.T) {
+	tt := newMoverTest(t)
+	objs := tt.spec.ClusterAndChildren()
+	tt.buildClients(objs, nil)
+	m := clustermanager.NewMover(tt.log, tt.clientFactory,
+		clustermanager.WithMoverRetryBackOff(time.Millisecond),
+		clustermanager.WithMoverNoTimeouts(),
+	)
+
+	tt.Expect(m.Move(tt.ctx, tt.spec, tt.fromClient, tt.toClient)).To(gomega.Succeed())
+
+	for _, obj := range objs {
+		u := &unstructured.Unstructured{}
+		u.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
+		tt.Expect(tt.toClient.Get(tt.ctx, obj.GetName(), obj.GetNamespace(), u)).To(gomega.Succeed())
+		original, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+		tt.Expect(err).To(gomega.Succeed())
+		tt.Expect(u.Object["spec"]).To(gomega.BeComparableTo(original["spec"]))
+	}
+}
+
+func TestMoverFailReadCluster(t *testing.T) {
+	tt := newMoverTest(t)
+	tt.buildClients(nil, nil)
+	m := clustermanager.NewMover(tt.log, tt.clientFactory,
+		clustermanager.WithMoverRetryBackOff(time.Millisecond),
+		clustermanager.WithMoverApplyClusterTimeout(time.Millisecond),
+	)
+	err := m.Move(tt.ctx, tt.spec, tt.fromClient, tt.toClient)
+
+	tt.Expect(err).To(gomega.MatchError(gomega.ContainSubstring("reading cluster from source")))
+}
+
+func TestMoverFailGetChildren(t *testing.T) {
+	tt := newMoverTest(t)
+	objs := []kubernetes.Object{tt.spec.Cluster}
+	tt.buildClients(objs, nil)
+	m := clustermanager.NewMover(tt.log, tt.clientFactory,
+		clustermanager.WithMoverRetryBackOff(time.Millisecond),
+		clustermanager.WithMoverApplyClusterTimeout(time.Millisecond),
+	)
+
+	err := m.Move(tt.ctx, tt.spec, tt.fromClient, tt.toClient)
+	tt.Expect(err).To(gomega.MatchError(gomega.ContainSubstring("reading child object")))
+}
+
+func TestMoverAlreadyMoved(t *testing.T) {
+	tt := newMoverTest(t)
+	objs := tt.spec.ClusterAndChildren()
+	tt.buildClients(objs, objs)
+	m := clustermanager.NewMover(tt.log, tt.clientFactory,
+		clustermanager.WithMoverRetryBackOff(time.Millisecond),
+		clustermanager.WithMoverApplyClusterTimeout(time.Millisecond),
+	)
+
+	err := m.Move(tt.ctx, tt.spec, tt.fromClient, tt.toClient)
+	tt.Expect(err).To(gomega.Succeed())
+
+	for _, obj := range objs {
+		u := &unstructured.Unstructured{}
+		u.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
+		tt.Expect(tt.toClient.Get(tt.ctx, obj.GetName(), obj.GetNamespace(), u)).To(gomega.Succeed())
+		original, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+		tt.Expect(err).To(gomega.Succeed())
+		// the entire object including metadata/status should be equal if the object already exists in dst
+		tt.Expect(u.Object).To(gomega.BeComparableTo(original))
+	}
+}
diff --git a/pkg/dependencies/factory.go b/pkg/dependencies/factory.go
index 66966a432b0d..3f56d8f64f02 100644
--- a/pkg/dependencies/factory.go
+++ b/pkg/dependencies/factory.go
@@ -115,6 +115,7 @@ type Dependencies struct {
 	EksaInstaller          *clustermanager.EKSAInstaller
 	DeleteClusterDefaulter cli.DeleteClusterDefaulter
 	ClusterDeleter         clustermanager.Deleter
+	ClusterMover           *clustermanager.Mover
 }
 
 // KubeClients defines super struct that exposes all behavior.
@@ -1216,6 +1217,26 @@ func (f *Factory) WithClusterDeleter() *Factory {
 	return f
 }
 
+// WithClusterMover builds a cluster mover.
+func (f *Factory) WithClusterMover() *Factory {
+	f.WithLogger().WithUnAuthKubeClient()
+
+	f.buildSteps = append(f.buildSteps, func(_ context.Context) error {
+		var opts []clustermanager.MoverOpt
+		if f.config.noTimeouts {
+			opts = append(opts, clustermanager.WithMoverNoTimeouts())
+		}
+
+		f.dependencies.ClusterMover = clustermanager.NewMover(
+			f.dependencies.Logger,
+			f.dependencies.UnAuthKubeClient,
+			opts...,
+		)
+		return nil
+	})
+	return f
+}
+
 // WithValidatorClients builds KubeClients.
 func (f *Factory) WithValidatorClients() *Factory {
 	f.WithKubectl().WithUnAuthKubeClient()
diff --git a/pkg/dependencies/factory_test.go b/pkg/dependencies/factory_test.go
index 626e412574df..d8d0e1c9bb81 100644
--- a/pkg/dependencies/factory_test.go
+++ b/pkg/dependencies/factory_test.go
@@ -645,6 +645,18 @@ func TestFactoryBuildWithClusterDeleterNoTimeout(t *testing.T) {
 	tt.Expect(deps.ClusterApplier).NotTo(BeNil())
 }
 
+func TestFactoryBuildWithClusterMoverNoTimeout(t *testing.T) {
+	tt := newTest(t, vsphere)
+	deps, err := dependencies.NewFactory().
+		WithLocalExecutables().
+		WithNoTimeouts().
+		WithClusterMover().
+		Build(context.Background())
+
+	tt.Expect(err).To(BeNil())
+	tt.Expect(deps.ClusterMover).NotTo(BeNil())
+}
+
 func TestFactoryBuildWithAwsIamAuthNoTimeout(t *testing.T) {
 	tt := newTest(t, vsphere)
 	deps, err := dependencies.NewFactory().
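
Reviewer note, not part of the patch: below is a minimal sketch of how the new Mover is intended to be driven once this lands. It assumes the caller already holds a *cluster.Spec plus kubernetes.Client instances for the source (bootstrap) and destination (management) clusters; the package name and the moveCluster helper are hypothetical, while the factory options and the Move signature come from this patch.

package task

import (
	"context"

	"github.com/aws/eks-anywhere/pkg/clients/kubernetes"
	"github.com/aws/eks-anywhere/pkg/cluster"
	"github.com/aws/eks-anywhere/pkg/dependencies"
)

// moveCluster is a hypothetical helper showing how the Mover built by
// WithClusterMover would be invoked from a CLI task.
func moveCluster(ctx context.Context, spec *cluster.Spec, fromClient, toClient kubernetes.Client) error {
	// WithClusterMover pulls in the logger and unauthenticated kube client
	// it depends on, so only the executables need to be requested explicitly.
	deps, err := dependencies.NewFactory().
		WithLocalExecutables().
		WithClusterMover().
		Build(ctx)
	if err != nil {
		return err
	}

	// Move pauses reconciliation on the source cluster, then recreates the
	// EKS-A Cluster object and its child objects on the destination.
	return deps.ClusterMover.Move(ctx, spec, fromClient, toClient)
}

Because Move tolerates AlreadyExists on both the Cluster and its child objects, a helper like this can be retried safely if it fails partway through.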