Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tatlat committed Mar 15, 2024
1 parent d9ac93a commit d0ff4de
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 38 deletions.
53 changes: 17 additions & 36 deletions pkg/clustermanager/eksa_mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (

"github.com/go-logr/logr"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aws/eks-anywhere/pkg/api/v1alpha1"
"github.com/aws/eks-anywhere/pkg/clients/kubernetes"
Expand All @@ -24,20 +22,19 @@ type MoverOpt func(*Mover)
// Mover applies the cluster spec to the management cluster and waits
// until the changes are fully reconciled.
type Mover struct {
log logr.Logger
clientFactory ClientFactory
applyClusterTimeout, waitForFailureMessage time.Duration
retryBackOff time.Duration
log logr.Logger
clientFactory ClientFactory
moveClusterTimeout time.Duration
retryBackOff time.Duration
}

// NewMover builds an Mover.
func NewMover(log logr.Logger, clientFactory ClientFactory, opts ...MoverOpt) *Mover {
m := &Mover{
log: log,
clientFactory: clientFactory,
applyClusterTimeout: applyClusterSpecTimeout,
waitForFailureMessage: waitForFailureMessageErrorTimeout,
retryBackOff: retryBackOff,
log: log,
clientFactory: clientFactory,
moveClusterTimeout: applyClusterSpecTimeout,
retryBackOff: retryBackOff,
}

for _, opt := range opts {
Expand All @@ -51,8 +48,7 @@ func NewMover(log logr.Logger, clientFactory ClientFactory, opts ...MoverOpt) *M
func WithMoverNoTimeouts() MoverOpt {
return func(a *Mover) {
maxTime := time.Duration(math.MaxInt64)
a.applyClusterTimeout = maxTime
a.waitForFailureMessage = maxTime
a.moveClusterTimeout = maxTime
}
}

Expand All @@ -61,7 +57,7 @@ func WithMoverNoTimeouts() MoverOpt {
// Generally only used in tests.
func WithMoverApplyClusterTimeout(timeout time.Duration) MoverOpt {
return func(m *Mover) {
m.applyClusterTimeout = timeout
m.moveClusterTimeout = timeout
}
}

Expand All @@ -78,7 +74,7 @@ func WithMoverRetryBackOff(backOff time.Duration) MoverOpt {
func (m *Mover) Move(ctx context.Context, spec *cluster.Spec, fromCluster, toCluster *types.Cluster) error {
m.log.V(3).Info("Moving the cluster object")
err := retrier.New(
m.applyClusterTimeout,
m.moveClusterTimeout,
retrier.WithRetryPolicy(retrier.BackOffPolicy(m.retryBackOff)),
).Retry(func() error {
var err error
Expand All @@ -100,15 +96,15 @@ func (m *Mover) Move(ctx context.Context, spec *cluster.Spec, fromCluster, toClu

// pause cluster on bootstrap
cluster.PauseReconcile()
if err := fromClient.ApplyServerSide(ctx, defaultFieldManager, cluster, kubernetes.ApplyServerSideOptions{ForceOwnership: true}); err != nil {
return errors.Wrapf(err, "getting cluster from %s", fromCluster.Name)
if err := fromClient.Update(ctx, cluster); err != nil {
return errors.Wrapf(err, "updating cluster on %s", fromCluster.Name)
}

if err := moveClusterResource(ctx, cluster, toClient); err != nil {
return err
}

if err := moveChildObjects(ctx, spec, fromClient, toClient); err != nil {
if err := moveChildObjects(ctx, *spec, fromClient, toClient); err != nil {
return err
}

Expand All @@ -119,33 +115,18 @@ func (m *Mover) Move(ctx context.Context, spec *cluster.Spec, fromCluster, toClu
}

func moveClusterResource(ctx context.Context, cluster *v1alpha1.Cluster, client kubernetes.Client) error {
// create namespace on mgmt
ns := &v1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Namespace,
},
}

if err := client.Create(ctx, ns); err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "creating cluster namespace")
}

cluster.ResourceVersion = ""
cluster.UID = ""

// move eksa cluster
if err := client.ApplyServerSide(ctx, defaultFieldManager, cluster, kubernetes.ApplyServerSideOptions{ForceOwnership: true}); err != nil {
if err := client.Create(ctx, cluster); err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "moving cluster %s", cluster.Name)
}

return nil
}

func moveChildObjects(ctx context.Context, spec *cluster.Spec, fromClient, toClient kubernetes.Client) error {
func moveChildObjects(ctx context.Context, spec cluster.Spec, fromClient, toClient kubernetes.Client) error {
// read and move child objects
for _, child := range spec.ChildObjects() {
if err := fromClient.Get(ctx, child.GetName(), child.GetNamespace(), child); err != nil {
Expand All @@ -156,7 +137,7 @@ func moveChildObjects(ctx context.Context, spec *cluster.Spec, fromClient, toCli
child.SetUID("")
child.SetOwnerReferences(nil)

if err := toClient.ApplyServerSide(ctx, defaultFieldManager, child, kubernetes.ApplyServerSideOptions{ForceOwnership: true}); err != nil {
if err := toClient.Create(ctx, child); err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "moving child object %s %s", child.GetObjectKind().GroupVersionKind().Kind, child.GetName())
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/clustermanager/eksa_mover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func TestMoverSuccess(t *testing.T) {
)

tt.Expect(m.Move(tt.ctx, tt.spec, tt.bootstrap, tt.mgmtCluster)).To(gomega.Succeed())

for _, obj := range objs {
tt.Expect(tt.toClient.Get(tt.ctx, obj.GetName(), obj.GetNamespace(), obj)).To(gomega.Succeed())
}
}

func TestMoverFailBuildFromClient(t *testing.T) {
Expand Down Expand Up @@ -112,7 +116,7 @@ func TestMoverFailCreateNamespace(t *testing.T) {
tt.Expect(err).To(gomega.HaveOccurred())
}

func TestMoverGetChildren(t *testing.T) {
func TestMoverFailGetChildren(t *testing.T) {
tt := newMoverTest(t)
objs := []kubernetes.Object{tt.spec.Cluster}
tt.buildClient(nil, nil, objs...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/dependencies/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,7 @@ func (f *Factory) WithClusterMover() *Factory {
f.buildSteps = append(f.buildSteps, func(_ context.Context) error {
var opts []clustermanager.MoverOpt
if f.config.noTimeouts {
opts = append(opts, clustermanager.WithMoverApplyClusterTimeout(time.Hour))
opts = append(opts, clustermanager.WithMoverNoTimeouts())
}

f.dependencies.ClusterMover = clustermanager.NewMover(
Expand Down

0 comments on commit d0ff4de

Please sign in to comment.