Skip to content

Commit

Permalink
Merge pull request #96 from cybozu-go/issue-686
Browse files Browse the repository at this point in the history
Issue 686
  • Loading branch information
shunki-fujita authored Jun 18, 2024
2 parents 18c6141 + 230da45 commit 62089ce
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 59 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
name: Small Tests
strategy:
matrix:
mysql-version: ["8.0.18", "8.0.25", "8.0.26", "8.0.27", "8.0.28", "8.0.30", "8.0.31", "8.0.32", "8.0.33", "8.0.34", "8.0.35", "8.0.36", "8.0.37"]
mysql-version: ["8.0.28", "8.0.36", "8.0.37", "8.4.0"]
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
MYSQL_VERSION = 8.0.37
MYSQL_VERSION = 8.4.0

# For Go
GOOS := $(shell go env GOOS)
Expand Down
24 changes: 18 additions & 6 deletions server/clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"path/filepath"
"strings"
"time"

mocoagent "github.com/cybozu-go/moco-agent"
Expand Down Expand Up @@ -40,8 +41,13 @@ var _ = Describe("clone", func() {
Expect(err).NotTo(HaveOccurred())
_, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (100), (101), (102), (103)")
Expect(err).NotTo(HaveOccurred())
_, err = donorDB.Exec(`RESET MASTER`)
Expect(err).NotTo(HaveOccurred())
if strings.HasPrefix(MySQLVersion, "8.4") {
_, err = donorDB.Exec(`RESET BINARY LOGS AND GTIDS`)
Expect(err).NotTo(HaveOccurred())
} else {
_, err = donorDB.Exec(`RESET MASTER`)
Expect(err).NotTo(HaveOccurred())
}
_, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (200), (800), (10000), (-3)")
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -99,10 +105,16 @@ var _ = Describe("clone", func() {
By("starting replication")
_, err = donorDB.Exec(`INSERT INTO foo.bar (i) VALUES (9), (999)`)
Expect(err).NotTo(HaveOccurred())
_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
_, err = replicaDB.Exec(`START SLAVE`)
if strings.HasPrefix(MySQLVersion, "8.4") {
_, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
} else {
_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
}
_, err = replicaDB.Exec(`START REPLICA`)
Expect(err).NotTo(HaveOccurred())

Eventually(func() int {
Expand Down
16 changes: 14 additions & 2 deletions server/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,20 @@ func Init(ctx context.Context, db *sqlx.DB, socket string) error {
return err
}

if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil {
return fmt.Errorf("failed to reset master: %w", err)
var version string
err := db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`)
if err != nil {
return fmt.Errorf("failed to get version: %w", err)
}
if version == "8.4" {
if _, err := db.ExecContext(ctx, "RESET BINARY LOGS AND GTIDS"); err != nil {
return fmt.Errorf("failed to reset binary logs and gtids: %w", err)
}

} else {
if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil {
return fmt.Errorf("failed to reset master: %w", err)
}
}
if _, err := db.ExecContext(ctx, "SET GLOBAL super_read_only=ON"); err != nil {
return fmt.Errorf("failed to enable super_read_only: %w", err)
Expand Down
97 changes: 58 additions & 39 deletions server/mysql_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,26 @@ type MySQLPrimaryStatus struct {

// MySQLReplicaStatus defines the observed state of a replica
type MySQLReplicaStatus struct {
LastIOErrno int `db:"Last_IO_Errno"`
LastIOError string `db:"Last_IO_Error"`
LastSQLErrno int `db:"Last_SQL_Errno"`
LastSQLError string `db:"Last_SQL_Error"`
MasterHost string `db:"Master_Host"`
RetrievedGtidSet string `db:"Retrieved_Gtid_Set"`
ExecutedGtidSet string `db:"Executed_Gtid_Set"`
SlaveIORunning string `db:"Slave_IO_Running"`
SlaveSQLRunning string `db:"Slave_SQL_Running"`
LastIOErrno int `db:"Last_IO_Errno"`
LastIOError string `db:"Last_IO_Error"`
LastSQLErrno int `db:"Last_SQL_Errno"`
LastSQLError string `db:"Last_SQL_Error"`
SourceHost string `db:"Source_Host"`
RetrievedGtidSet string `db:"Retrieved_Gtid_Set"`
ExecutedGtidSet string `db:"Executed_Gtid_Set"`
ReplicaIORunning string `db:"Replica_IO_Running"`
ReplicaSQLRunning string `db:"Replica_SQL_Running"`

// All of variables from here are NOT used in MOCO's reconcile
SlaveIOState string `db:"Slave_IO_State"`
MasterUser string `db:"Master_User"`
MasterPort int `db:"Master_Port"`
ReplicaIOState string `db:"Replica_IO_State"`
SourceUser string `db:"Source_User"`
SourcePort int `db:"Source_Port"`
ConnectRetry int `db:"Connect_Retry"`
MasterLogFile string `db:"Master_Log_File"`
ReadMasterLogPos int `db:"Read_Master_Log_Pos"`
SourceLogFile string `db:"Source_Log_File"`
ReadSourceLogPos int `db:"Read_Source_Log_Pos"`
RelayLogFile string `db:"Relay_Log_File"`
RelayLogPos int `db:"Relay_Log_Pos"`
RelayMasterLogFile string `db:"Relay_Master_Log_File"`
RelaySourceLogFile string `db:"Relay_Source_Log_File"`
ReplicateDoDB string `db:"Replicate_Do_DB"`
ReplicateIgnoreDB string `db:"Replicate_Ignore_DB"`
ReplicateDoTable string `db:"Replicate_Do_Table"`
Expand All @@ -64,38 +64,38 @@ type MySQLReplicaStatus struct {
LastErrno int `db:"Last_Errno"`
LastError string `db:"Last_Error"`
SkipCounter int `db:"Skip_Counter"`
ExecMasterLogPos int `db:"Exec_Master_Log_Pos"`
ExecSourceLogPos int `db:"Exec_Source_Log_Pos"`
RelayLogSpace int `db:"Relay_Log_Space"`
UntilCondition string `db:"Until_Condition"`
UntilLogFile string `db:"Until_Log_File"`
UntilLogPos int `db:"Until_Log_Pos"`
MasterSSLAllowed string `db:"Master_SSL_Allowed"`
MasterSSLCAFile string `db:"Master_SSL_CA_File"`
MasterSSLCAPath string `db:"Master_SSL_CA_Path"`
MasterSSLCert string `db:"Master_SSL_Cert"`
MasterSSLCipher string `db:"Master_SSL_Cipher"`
MasterSSLKey string `db:"Master_SSL_Key"`
SecondsBehindMaster sql.NullInt64 `db:"Seconds_Behind_Master"`
MasterSSLVerifyServerCert string `db:"Master_SSL_Verify_Server_Cert"`
SourceSSLAllowed string `db:"Source_SSL_Allowed"`
SourceSSLCAFile string `db:"Source_SSL_CA_File"`
SourceSSLCAPath string `db:"Source_SSL_CA_Path"`
SourceSSLCert string `db:"Source_SSL_Cert"`
SourceSSLCipher string `db:"Source_SSL_Cipher"`
SourceSSLKey string `db:"Source_SSL_Key"`
SecondsBehindSource sql.NullInt64 `db:"Seconds_Behind_Source"`
SourceSSLVerifyServerCert string `db:"Source_SSL_Verify_Server_Cert"`
ReplicateIgnoreServerIds string `db:"Replicate_Ignore_Server_Ids"`
MasterServerID int `db:"Master_Server_Id"`
MasterUUID string `db:"Master_UUID"`
MasterInfoFile string `db:"Master_Info_File"`
SourceServerID int `db:"Source_Server_Id"`
SourceUUID string `db:"Source_UUID"`
SourceInfoFile string `db:"Source_Info_File"`
SQLDelay int `db:"SQL_Delay"`
SQLRemainingDelay sql.NullInt64 `db:"SQL_Remaining_Delay"`
SlaveSQLRunningState string `db:"Slave_SQL_Running_State"`
MasterRetryCount int `db:"Master_Retry_Count"`
MasterBind string `db:"Master_Bind"`
ReplicaSQLRunningState string `db:"Replica_SQL_Running_State"`
SourceRetryCount int `db:"Source_Retry_Count"`
SourceBind string `db:"Source_Bind"`
LastIOErrorTimestamp string `db:"Last_IO_Error_Timestamp"`
LastSQLErrorTimestamp string `db:"Last_SQL_Error_Timestamp"`
MasterSSLCrl string `db:"Master_SSL_Crl"`
MasterSSLCrlpath string `db:"Master_SSL_Crlpath"`
SourceSSLCrl string `db:"Source_SSL_Crl"`
SourceSSLCrlpath string `db:"Source_SSL_Crlpath"`
AutoPosition string `db:"Auto_Position"`
ReplicateRewriteDB string `db:"Replicate_Rewrite_DB"`
ChannelName string `db:"Channel_Name"`
MasterTLSVersion string `db:"Master_TLS_Version"`
Masterpublickeypath string `db:"Master_public_key_path"`
Getmasterpublickey string `db:"Get_master_public_key"`
SourceTLSVersion string `db:"Source_TLS_Version"`
Sourcepublickeypath string `db:"Source_public_key_path"`
GetSourcepublickey string `db:"Get_Source_public_key"`
NetworkNamespace string `db:"Network_Namespace"`
}

Expand All @@ -120,18 +120,37 @@ func (a *Agent) GetMySQLCloneStateStatus(ctx context.Context) (*MySQLCloneStateS
return status, nil
}

func (a *Agent) IsMySQL84(ctx context.Context) (bool, error) {
var version string
err := a.db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`)
if err != nil {
return false, fmt.Errorf("failed to get version: %w", err)
}
return version == "8.4", nil
}

func (a *Agent) GetMySQLPrimaryStatus(ctx context.Context) (*MySQLPrimaryStatus, error) {
status := &MySQLPrimaryStatus{}
if err := a.db.GetContext(ctx, status, `SHOW MASTER STATUS`); err != nil {
return nil, fmt.Errorf("failed to show master status: %w", err)
isMySQL84, err := a.IsMySQL84(ctx)
if err != nil {
return nil, err
}
if isMySQL84 {
if err := a.db.GetContext(ctx, status, `SHOW BINARY LOG STATUS`); err != nil {
return nil, fmt.Errorf("failed to show binary log status: %w", err)
}
} else {
if err := a.db.GetContext(ctx, status, `SHOW MASTER STATUS`); err != nil {
return nil, fmt.Errorf("failed to show master status: %w", err)
}
}
return status, nil
}

func (a *Agent) GetMySQLReplicaStatus(ctx context.Context) (*MySQLReplicaStatus, error) {
status := &MySQLReplicaStatus{}
if err := a.db.GetContext(ctx, status, `SHOW SLAVE STATUS`); err != nil {
return nil, fmt.Errorf("failed to show slave status: %w", err)
if err := a.db.GetContext(ctx, status, `SHOW REPLICA STATUS`); err != nil {
return nil, fmt.Errorf("failed to show replica status: %w", err)
}
return status, nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var tmpBaseDir = path.Join(os.TempDir(), "moco-agent-test-server")

var MySQLVersion = func() string {
if ver := os.Getenv("MYSQL_VERSION"); ver == "" {
os.Setenv("MYSQL_VERSION", "8.0.28")
os.Setenv("MYSQL_VERSION", "8.4.0")
}
return os.Getenv("MYSQL_VERSION")
}()
Expand Down
2 changes: 1 addition & 1 deletion server/mysqld_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (a *Agent) MySQLDReady(w http.ResponseWriter, r *http.Request) {
return
}

if replicaStatus.SlaveIORunning != "Yes" || replicaStatus.SlaveSQLRunning != "Yes" {
if replicaStatus.ReplicaIORunning != "Yes" || replicaStatus.ReplicaSQLRunning != "Yes" {
a.logger.Info("replication threads are stopped")
http.Error(w, "replication thread are stopped", http.StatusServiceUnavailable)
return
Expand Down
29 changes: 21 additions & 8 deletions server/mysqld_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"time"

mocoagent "github.com/cybozu-go/moco-agent"
Expand Down Expand Up @@ -105,10 +106,16 @@ var _ = Describe("health", func() {
_, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (?), (?), (?), (?)", items...)
Expect(err).NotTo(HaveOccurred())

_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
_, err = replicaDB.Exec(`START SLAVE`)
if strings.HasPrefix(MySQLVersion, "8.4") {
_, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
} else {
_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
}
_, err = replicaDB.Exec(`START REPLICA`)
Expect(err).NotTo(HaveOccurred())

By("checking readiness")
Expand Down Expand Up @@ -173,10 +180,16 @@ var _ = Describe("health", func() {
_, err = donorDB.Exec("SET GLOBAL read_only=0")
Expect(err).NotTo(HaveOccurred())

_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
_, err = replicaDB.Exec(`START SLAVE`)
if strings.HasPrefix(MySQLVersion, "8.4") {
_, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
} else {
_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
}
_, err = replicaDB.Exec(`START REPLICA`)
Expect(err).NotTo(HaveOccurred())

By("checking readiness")
Expand Down

0 comments on commit 62089ce

Please sign in to comment.