diff --git a/pkg/apis/mysql/v1alpha1/cluster.go b/pkg/apis/mysql/v1alpha1/cluster.go index cccacd5a5..65e25ab21 100644 --- a/pkg/apis/mysql/v1alpha1/cluster.go +++ b/pkg/apis/mysql/v1alpha1/cluster.go @@ -259,7 +259,7 @@ func (c *MysqlCluster) GetBackupCandidate() string { return node.Name } } - glog.Warning("No healthy slave node found so returns the master node: %s.", c.GetPodHostname(0)) + glog.Warningf("No healthy slave node found so returns the master node: %s.", c.GetPodHostname(0)) return c.GetPodHostname(0) } diff --git a/pkg/backupfactory/backupfactory.go b/pkg/backupfactory/backupfactory.go index dcdc7d55b..5273bea8c 100644 --- a/pkg/backupfactory/backupfactory.go +++ b/pkg/backupfactory/backupfactory.go @@ -88,7 +88,7 @@ func (f *bFactory) Sync(ctx context.Context) error { } func (f *bFactory) getJobName() string { - return fmt.Sprintf("%s-%s-backup", f.backup.Name, f.backup.Spec.ClusterName) + return fmt.Sprintf("%s-backupjob", f.backup.Name) } func (f *bFactory) ensurePodSpec(in core.PodSpec) core.PodSpec { diff --git a/pkg/controller/clustercontroller/backups.go b/pkg/controller/clustercontroller/backups.go index ff17abf0c..8ede28cbe 100644 --- a/pkg/controller/clustercontroller/backups.go +++ b/pkg/controller/clustercontroller/backups.go @@ -60,7 +60,7 @@ func (c *Controller) registerClusterInBackupCron(cluster *api.MysqlCluster) erro schedule, err := cron.Parse(cluster.Spec.BackupSchedule) if err != nil { - return fmt.Errorf("fail to parse schedule: %s", err) + return fmt.Errorf("failed to parse schedule: %s", err) } lockJobRegister.Lock() diff --git a/pkg/mysqlcluster/orc_reconciliation.go b/pkg/mysqlcluster/orc_reconciliation.go index adf6047dd..1b002581a 100644 --- a/pkg/mysqlcluster/orc_reconciliation.go +++ b/pkg/mysqlcluster/orc_reconciliation.go @@ -72,23 +72,18 @@ func (f *cFactory) SyncOrchestratorStatus(ctx context.Context) error { } func (f *cFactory) updateStatusFromOrc(insts []orc.Instance) { - for i := 0; i < int(f.cluster.Spec.Replicas); i++ { - host := f.cluster.GetPodHostname(i) - // select instance from orchestrator - var node *orc.Instance - for _, inst := range insts { - if inst.Key.Hostname == host { - node = &inst - break + updatedNodes := []string{} + for _, node := range insts { + host := node.Key.Hostname + updatedNodes = append(updatedNodes, host) + + if !node.IsUpToDate { + if !node.IsLastCheckValid { + f.updateNodeCondition(host, api.NodeConditionLagged, core.ConditionUnknown) + f.updateNodeCondition(host, api.NodeConditionReplicating, core.ConditionUnknown) + f.updateNodeCondition(host, api.NodeConditionMaster, core.ConditionUnknown) } - } - - if node == nil { - f.updateNodeCondition(host, api.NodeConditionLagged, core.ConditionUnknown) - f.updateNodeCondition(host, api.NodeConditionReplicating, core.ConditionUnknown) - f.updateNodeCondition(host, api.NodeConditionMaster, core.ConditionUnknown) - - return + continue } maxSlaveLatency := defaultMaxSlaveLatency @@ -116,6 +111,8 @@ func (f *cFactory) updateStatusFromOrc(insts []orc.Instance) { f.updateNodeCondition(host, api.NodeConditionMaster, core.ConditionFalse) } } + + f.removeNodeConditionNotIn(updatedNodes) } func (f *cFactory) updateStatusForRecoveries(recoveries []orc.TopologyRecovery) { @@ -230,3 +227,20 @@ func (f *cFactory) updateNodeCondition(host string, cType api.NodeConditionType, } } } + +func (f *cFactory) removeNodeConditionNotIn(hosts []string) { + for _, ns := range f.cluster.Status.Nodes { + updated := false + for _, h := range hosts { + if h == ns.Name { + updated = true + } + } + + if !updated { + f.updateNodeCondition(ns.Name, api.NodeConditionLagged, core.ConditionUnknown) + f.updateNodeCondition(ns.Name, api.NodeConditionReplicating, core.ConditionUnknown) + f.updateNodeCondition(ns.Name, api.NodeConditionMaster, core.ConditionUnknown) + } + } +} diff --git a/pkg/mysqlcluster/orc_reconciliation_test.go b/pkg/mysqlcluster/orc_reconciliation_test.go index 9db03ca9e..ad7e9400f 100644 --- a/pkg/mysqlcluster/orc_reconciliation_test.go +++ b/pkg/mysqlcluster/orc_reconciliation_test.go @@ -83,7 +83,7 @@ var _ = Describe("Mysql cluster reconcilation", func() { It("should update status", func() { orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), - true, -1, false) + true, -1, false, true) orcClient.AddRecoveries("asd.default", 1, true) factory.createPod("asd-mysql-0") @@ -104,7 +104,7 @@ var _ = Describe("Mysql cluster reconcilation", func() { It("should have pending recoveries", func() { orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), - true, -1, false) + true, -1, false, true) orcClient.AddRecoveries("asd.default", 11, false) Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) Expect(getCCond( @@ -114,7 +114,7 @@ var _ = Describe("Mysql cluster reconcilation", func() { It("should have pending recoveries but cluster not ready enough", func() { orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), - true, -1, false) + true, -1, false, true) orcClient.AddRecoveries("asd.default", 111, false) cluster.UpdateStatusCondition(api.ClusterConditionReady, core.ConditionTrue, "", "") Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) @@ -126,7 +126,7 @@ var _ = Describe("Mysql cluster reconcilation", func() { It("should have pending recoveries that will be recovered", func() { orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), - true, -1, false) + true, -1, false, true) orcClient.AddRecoveries("asd.default", 112, false) min20, _ := time.ParseDuration("-20m") cluster.Status.Conditions = []api.ClusterCondition{ @@ -148,6 +148,31 @@ var _ = Describe("Mysql cluster reconcilation", func() { Expect(event).To(ContainSubstring("RecoveryAcked")) }) + It("node not uptodate in orc", func() { + orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), + true, -1, false, false) + Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + + Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To( + Equal(core.ConditionUnknown)) + }) + + It("node not in orc", func() { + orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), + true, -1, false, true) + Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + + Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To( + Equal(core.ConditionTrue)) + + orcClient.RemoveInstance("asd.default", cluster.GetPodHostname(0)) + Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + + Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To( + Equal(core.ConditionUnknown)) + + }) + }) }) }) diff --git a/pkg/util/orchestrator/fake/client.go b/pkg/util/orchestrator/fake/client.go index 5abd27ac9..333821ae6 100644 --- a/pkg/util/orchestrator/fake/client.go +++ b/pkg/util/orchestrator/fake/client.go @@ -34,7 +34,7 @@ func New() *FakeOrc { return &FakeOrc{} } -func (o *FakeOrc) AddInstance(cluster string, host string, master bool, sls int64, slaveR bool) { +func (o *FakeOrc) AddInstance(cluster, host string, master bool, sls int64, slaveR, upToDate bool) { valid := true if sls < 0 { valid = false @@ -52,6 +52,8 @@ func (o *FakeOrc) AddInstance(cluster string, host string, master bool, sls int6 ClusterName: cluster, Slave_SQL_Running: slaveR, Slave_IO_Running: slaveR, + IsUpToDate: upToDate, + IsLastCheckValid: upToDate, } if o.Clusters == nil { o.Clusters = make(map[string][]Instance) @@ -63,6 +65,25 @@ func (o *FakeOrc) AddInstance(cluster string, host string, master bool, sls int6 o.Clusters[cluster] = []Instance{inst} } +func (o *FakeOrc) RemoveInstance(cluster, host string) { + instances, ok := o.Clusters[cluster] + if !ok { + return + } + index := -1 + for i, inst := range instances { + if inst.Key.Hostname == host { + index = i + } + } + + if index == -1 { + return + } + + o.Clusters[cluster] = append(instances[:index], instances[index+1:]...) +} + func (o *FakeOrc) AddRecoveries(cluster string, id int64, ack bool) { tr := TopologyRecovery{ Id: id,