Nodes that are not present in orc are set to unknown.
AMecea committed Jun 12, 2018
1 parent 4ca37ce commit 354f7b1
Showing 6 changed files with 84 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/mysql/v1alpha1/cluster.go
@@ -259,7 +259,7 @@ func (c *MysqlCluster) GetBackupCandidate() string {
 			return node.Name
 		}
 	}
-	glog.Warning("No healthy slave node found so returns the master node: %s.", c.GetPodHostname(0))
+	glog.Warningf("No healthy slave node found so returns the master node: %s.", c.GetPodHostname(0))
 	return c.GetPodHostname(0)
 }

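For context on the one-line fix above: glog.Warning formats its arguments in the style of fmt.Print, so the %s verb is emitted literally, while glog.Warningf formats in the style of fmt.Printf and expands it. A minimal standalone sketch (the hostname and flag setup here are illustrative, not from this repository):

package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	// glog writes to files by default; -logtostderr makes the output visible here.
	flag.Set("logtostderr", "true")
	flag.Parse()

	host := "asd-mysql-0" // hypothetical hostname, for illustration only

	// Print-style: the verb is not expanded -> "... master node: %s.asd-mysql-0"
	glog.Warning("No healthy slave node found so returns the master node: %s.", host)

	// Printf-style: the verb is expanded -> "... master node: asd-mysql-0."
	glog.Warningf("No healthy slave node found so returns the master node: %s.", host)

	glog.Flush()
}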
2 changes: 1 addition & 1 deletion pkg/backupfactory/backupfactory.go
@@ -88,7 +88,7 @@ func (f *bFactory) Sync(ctx context.Context) error {
 }
 
 func (f *bFactory) getJobName() string {
-	return fmt.Sprintf("%s-%s-backup", f.backup.Name, f.backup.Spec.ClusterName)
+	return fmt.Sprintf("%s-backupjob", f.backup.Name)
 }
 
 func (f *bFactory) ensurePodSpec(in core.PodSpec) core.PodSpec {
2 changes: 1 addition & 1 deletion pkg/controller/clustercontroller/backups.go
@@ -60,7 +60,7 @@ func (c *Controller) registerClusterInBackupCron(cluster *api.MysqlCluster) erro
 
 	schedule, err := cron.Parse(cluster.Spec.BackupSchedule)
 	if err != nil {
-		return fmt.Errorf("fail to parse schedule: %s", err)
+		return fmt.Errorf("failed to parse schedule: %s", err)
 	}
 
 	lockJobRegister.Lock()
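For context, a small sketch of the parsing step that this error message wraps, assuming the robfig/cron v1 API used above (the schedule spec is a hypothetical value, not taken from the diff):

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// "@every 1h" stands in for a cluster's Spec.BackupSchedule value.
	schedule, err := cron.Parse("@every 1h")
	if err != nil {
		fmt.Printf("failed to parse schedule: %s\n", err)
		return
	}

	// Schedule.Next returns the first activation time after the given instant.
	fmt.Println("next backup at:", schedule.Next(time.Now()))
}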
46 changes: 30 additions & 16 deletions pkg/mysqlcluster/orc_reconciliation.go
@@ -72,23 +72,18 @@ func (f *cFactory) SyncOrchestratorStatus(ctx context.Context) error {
 }
 
 func (f *cFactory) updateStatusFromOrc(insts []orc.Instance) {
-	for i := 0; i < int(f.cluster.Spec.Replicas); i++ {
-		host := f.cluster.GetPodHostname(i)
-		// select instance from orchestrator
-		var node *orc.Instance
-		for _, inst := range insts {
-			if inst.Key.Hostname == host {
-				node = &inst
-				break
+	updatedNodes := []string{}
+	for _, node := range insts {
+		host := node.Key.Hostname
+		updatedNodes = append(updatedNodes, host)
+
+		if !node.IsUpToDate {
+			if !node.IsLastCheckValid {
+				f.updateNodeCondition(host, api.NodeConditionLagged, core.ConditionUnknown)
+				f.updateNodeCondition(host, api.NodeConditionReplicating, core.ConditionUnknown)
+				f.updateNodeCondition(host, api.NodeConditionMaster, core.ConditionUnknown)
 			}
-		}
-
-		if node == nil {
-			f.updateNodeCondition(host, api.NodeConditionLagged, core.ConditionUnknown)
-			f.updateNodeCondition(host, api.NodeConditionReplicating, core.ConditionUnknown)
-			f.updateNodeCondition(host, api.NodeConditionMaster, core.ConditionUnknown)
-
-			return
+			continue
 		}
 
 		maxSlaveLatency := defaultMaxSlaveLatency
@@ -116,6 +111,8 @@ func (f *cFactory) updateStatusFromOrc(insts []orc.Instance) {
 			f.updateNodeCondition(host, api.NodeConditionMaster, core.ConditionFalse)
 		}
 	}
+
+	f.removeNodeConditionNotIn(updatedNodes)
 }
 
 func (f *cFactory) updateStatusForRecoveries(recoveries []orc.TopologyRecovery) {
Expand Down Expand Up @@ -230,3 +227,20 @@ func (f *cFactory) updateNodeCondition(host string, cType api.NodeConditionType,
}
}
}

func (f *cFactory) removeNodeConditionNotIn(hosts []string) {
for _, ns := range f.cluster.Status.Nodes {
updated := false
for _, h := range hosts {
if h == ns.Name {
updated = true
}
}

if !updated {
f.updateNodeCondition(ns.Name, api.NodeConditionLagged, core.ConditionUnknown)
f.updateNodeCondition(ns.Name, api.NodeConditionReplicating, core.ConditionUnknown)
f.updateNodeCondition(ns.Name, api.NodeConditionMaster, core.ConditionUnknown)
}
}
}
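The net effect of the two hunks above: nodes reported by orchestrator get their conditions updated (or set to Unknown when the last check is stale), and nodes missing from the report are reset to Unknown by removeNodeConditionNotIn instead of being silently skipped. A self-contained sketch of that pattern with simplified types (none of these names are the operator's API):

package main

import "fmt"

// condition stands in for the operator's per-node condition status.
type condition string

const (
	conditionTrue    condition = "True"
	conditionUnknown condition = "Unknown"
)

// syncFromReport updates every reported node, then resets any node the
// report no longer mentions, mirroring updateStatusFromOrc plus
// removeNodeConditionNotIn above.
func syncFromReport(status map[string]condition, reported []string) {
	seen := make(map[string]bool)
	for _, host := range reported {
		seen[host] = true
		status[host] = conditionTrue // stands in for the per-condition updates
	}
	for host := range status {
		if !seen[host] {
			status[host] = conditionUnknown // node absent from orchestrator
		}
	}
}

func main() {
	status := map[string]condition{
		"asd-mysql-0": conditionTrue,
		"asd-mysql-1": conditionTrue,
	}
	syncFromReport(status, []string{"asd-mysql-0"})
	fmt.Println(status) // map[asd-mysql-0:True asd-mysql-1:Unknown]
}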
33 changes: 29 additions & 4 deletions pkg/mysqlcluster/orc_reconciliation_test.go
@@ -83,7 +83,7 @@ var _ = Describe("Mysql cluster reconcilation", func() {
 
 			It("should update status", func() {
 				orcClient.AddInstance("asd.default", cluster.GetPodHostname(0),
-					true, -1, false)
+					true, -1, false, true)
 				orcClient.AddRecoveries("asd.default", 1, true)
 				factory.createPod("asd-mysql-0")
@@ -104,7 +104,7 @@ var _ = Describe("Mysql cluster reconcilation", func() {
 
 			It("should have pending recoveries", func() {
 				orcClient.AddInstance("asd.default", cluster.GetPodHostname(0),
-					true, -1, false)
+					true, -1, false, true)
 				orcClient.AddRecoveries("asd.default", 11, false)
 				Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed())
 				Expect(getCCond(
@@ -114,7 +114,7 @@ var _ = Describe("Mysql cluster reconcilation", func() {
 
 			It("should have pending recoveries but cluster not ready enough", func() {
 				orcClient.AddInstance("asd.default", cluster.GetPodHostname(0),
-					true, -1, false)
+					true, -1, false, true)
 				orcClient.AddRecoveries("asd.default", 111, false)
 				cluster.UpdateStatusCondition(api.ClusterConditionReady, core.ConditionTrue, "", "")
 				Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed())
@@ -126,7 +126,7 @@ var _ = Describe("Mysql cluster reconcilation", func() {
 
 			It("should have pending recoveries that will be recovered", func() {
 				orcClient.AddInstance("asd.default", cluster.GetPodHostname(0),
-					true, -1, false)
+					true, -1, false, true)
 				orcClient.AddRecoveries("asd.default", 112, false)
 				min20, _ := time.ParseDuration("-20m")
 				cluster.Status.Conditions = []api.ClusterCondition{
@@ -148,6 +148,31 @@ var _ = Describe("Mysql cluster reconcilation", func() {
 				Expect(event).To(ContainSubstring("RecoveryAcked"))
 			})
 
+			It("node not uptodate in orc", func() {
+				orcClient.AddInstance("asd.default", cluster.GetPodHostname(0),
+					true, -1, false, false)
+				Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed())
+
+				Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To(
+					Equal(core.ConditionUnknown))
+			})
+
+			It("node not in orc", func() {
+				orcClient.AddInstance("asd.default", cluster.GetPodHostname(0),
+					true, -1, false, true)
+				Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed())
+
+				Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To(
+					Equal(core.ConditionTrue))
+
+				orcClient.RemoveInstance("asd.default", cluster.GetPodHostname(0))
+				Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed())
+
+				Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To(
+					Equal(core.ConditionUnknown))
+
+			})
+
 		})
 	})
 })
23 changes: 22 additions & 1 deletion pkg/util/orchestrator/fake/client.go
@@ -34,7 +34,7 @@ func New() *FakeOrc {
 	return &FakeOrc{}
 }
 
-func (o *FakeOrc) AddInstance(cluster string, host string, master bool, sls int64, slaveR bool) {
+func (o *FakeOrc) AddInstance(cluster, host string, master bool, sls int64, slaveR, upToDate bool) {
 	valid := true
 	if sls < 0 {
 		valid = false
@@ -52,6 +52,8 @@ func (o *FakeOrc) AddInstance(cluster string, host string, master bool, sls int6
 		ClusterName:       cluster,
 		Slave_SQL_Running: slaveR,
 		Slave_IO_Running:  slaveR,
+		IsUpToDate:        upToDate,
+		IsLastCheckValid:  upToDate,
 	}
 	if o.Clusters == nil {
 		o.Clusters = make(map[string][]Instance)
@@ -63,6 +65,25 @@ func (o *FakeOrc) AddInstance(cluster string, host string, master bool, sls int6
 	o.Clusters[cluster] = []Instance{inst}
 }
 
+func (o *FakeOrc) RemoveInstance(cluster, host string) {
+	instances, ok := o.Clusters[cluster]
+	if !ok {
+		return
+	}
+	index := -1
+	for i, inst := range instances {
+		if inst.Key.Hostname == host {
+			index = i
+		}
+	}
+
+	if index == -1 {
+		return
+	}
+
+	o.Clusters[cluster] = append(instances[:index], instances[index+1:]...)
+}
+
 func (o *FakeOrc) AddRecoveries(cluster string, id int64, ack bool) {
 	tr := TopologyRecovery{
 		Id: id,
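The new RemoveInstance helper above deletes an element with the standard append(s[:i], s[i+1:]...) idiom. A minimal sketch of just that step (values are illustrative):

package main

import "fmt"

func main() {
	instances := []string{"asd-mysql-0", "asd-mysql-1", "asd-mysql-2"}
	index := 1

	// Shift the tail left over position index; the slice shrinks by one.
	instances = append(instances[:index], instances[index+1:]...)

	fmt.Println(instances) // [asd-mysql-0 asd-mysql-2]
}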
