diff --git a/changelog/20.0/20.0.0/summary.md b/changelog/20.0/20.0.0/summary.md index 3845f8aced5..4665069e1fe 100644 --- a/changelog/20.0/20.0.0/summary.md +++ b/changelog/20.0/20.0.0/summary.md @@ -6,6 +6,7 @@ - **[Breaking changes](#breaking-changes)** - [`shutdown_grace_period` Default Change](#shutdown-grace-period-default) - [New `unmanaged` Flag and `disable_active_reparents` deprecation](#unmanaged-flag) + - [`recovery-period-block-duration` Flag deprecation](#recovery-block-deprecation) - [`mysqlctld` `onterm-timeout` Default Change](#mysqlctld-onterm-timeout) - [`Durabler` interface method renaming](#durabler-interface-method-renaming) - **[Query Compatibility](#query-compatibility)** @@ -40,6 +41,13 @@ New flag `--unmanaged` has been introduced in this release to make it easier to Starting this release, all unmanaged tablets should specify this flag. + +#### `recovery-period-block-duration` Flag deprecation + +The flag `--recovery-period-block-duration` has been deprecated in VTOrc from this release. Its value is now ignored and the flag will be removed in later releases. +VTOrc no longer blocks recoveries for a certain duration after a previous recovery has completed. Since VTOrc refreshes the required information after +acquiring a shard lock, blocking of recoveries is not required. + #### `mysqlctld` `onterm_timeout` Default Change The `--onterm_timeout` flag default value has changed for `mysqlctld`. It now is by default long enough to be able to wait for the default `--shutdown-wait-time` when shutting down on a `TERM` signal. diff --git a/go/cmd/vtorc/cli/cli.go b/go/cmd/vtorc/cli/cli.go index f521ae05e57..1233c1e2ac2 100644 --- a/go/cmd/vtorc/cli/cli.go +++ b/go/cmd/vtorc/cli/cli.go @@ -39,7 +39,6 @@ var ( --topo_global_root /vitess/global \ --log_dir $VTDATAROOT/tmp \ --port 15000 \ - --recovery-period-block-duration "10m" \ --instance-poll-time "1s" \ --topo-information-refresh-duration "30s" \ --alsologtostderr`, @@ -85,7 +84,7 @@ func run(cmd *cobra.Command, args []string) { // addStatusParts adds UI parts to the /debug/status page of VTOrc func addStatusParts() { servenv.AddStatusPart("Recent Recoveries", logic.TopologyRecoveriesTemplate, func() any { - recoveries, _ := logic.ReadRecentRecoveries(false, 0) + recoveries, _ := logic.ReadRecentRecoveries(0) return recoveries }) } diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 1e14056460e..187426a4afa 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -10,7 +10,6 @@ vtorc \ --topo_global_root /vitess/global \ --log_dir $VTDATAROOT/tmp \ --port 15000 \ - --recovery-period-block-duration "10m" \ --instance-poll-time "1s" \ --topo-information-refresh-duration "30s" \ --alsologtostderr @@ -65,7 +64,6 @@ Flags: --prevent-cross-cell-failover Prevent VTOrc from promoting a primary in a different cell than the current primary in case of a failover --purge_logs_interval duration how often try to remove old logs (default 1h0m0s) --reasonable-replication-lag duration Maximum replication lag on replicas which is deemed to be acceptable (default 10s) - --recovery-period-block-duration duration Duration for which a new recovery is blocked on an instance after running a recovery (default 30s) --recovery-poll-duration duration Timer duration on which VTOrc polls its database to run a recovery (default 1s) --remote_operation_timeout duration time to wait for a remote operation (default 15s) --security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only) diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index ba3c41ddc61..402c67870ba 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -44,7 +44,6 @@ const ( DiscoveryQueueMaxStatisticsSize = 120 DiscoveryCollectionRetentionSeconds = 120 UnseenInstanceForgetHours = 240 // Number of hours after which an unseen instance is forgotten - FailureDetectionPeriodBlockMinutes = 60 // The time for which an instance's failure discovery is kept "active", so as to avoid concurrent "discoveries" of the instance's failure; this precedes any recovery process, if any. ) var ( @@ -77,6 +76,7 @@ func RegisterFlags(fs *pflag.FlagSet) { fs.BoolVar(&auditToSyslog, "audit-to-syslog", auditToSyslog, "Whether to store the audit log in the syslog") fs.DurationVar(&auditPurgeDuration, "audit-purge-duration", auditPurgeDuration, "Duration for which audit logs are held before being purged. Should be in multiples of days") fs.DurationVar(&recoveryPeriodBlockDuration, "recovery-period-block-duration", recoveryPeriodBlockDuration, "Duration for which a new recovery is blocked on an instance after running a recovery") + fs.MarkDeprecated("recovery-period-block-duration", "As of v20 this is ignored and will be removed in a future release.") fs.BoolVar(&preventCrossCellFailover, "prevent-cross-cell-failover", preventCrossCellFailover, "Prevent VTOrc from promoting a primary in a different cell than the current primary in case of a failover") fs.DurationVar(&waitReplicasTimeout, "wait-replicas-timeout", waitReplicasTimeout, "Duration for which to wait for replica's to respond when issuing RPCs") fs.DurationVar(&tolerableReplicationLag, "tolerable-replication-lag", tolerableReplicationLag, "Amount of replication lag that is considered acceptable for a tablet to be eligible for promotion when Vitess makes the choice of a new primary in PRS") diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index 94daebbf7f0..fbb96ef75c0 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -24,8 +24,7 @@ var TableNames = []string{ "topology_recovery", "database_instance_topology_history", "candidate_database_instance", - "topology_failure_detection", - "blocked_topology_recovery", + "recovery_detection", "database_instance_last_analysis", "database_instance_analysis_changelog", "node_health_history", @@ -172,35 +171,19 @@ DROP TABLE IF EXISTS topology_recovery CREATE TABLE topology_recovery ( recovery_id integer, alias varchar(256) NOT NULL, - in_active_period tinyint NOT NULL DEFAULT 0, - start_active_period timestamp not null default (''), - end_active_period_unixtime int, + start_recovery timestamp NOT NULL DEFAULT (''), end_recovery timestamp NULL DEFAULT NULL, - processing_node_hostname varchar(128) NOT NULL, - processcing_node_token varchar(128) NOT NULL, successor_alias varchar(256) DEFAULT NULL, analysis varchar(128) not null default '', keyspace varchar(128) NOT NULL, shard varchar(128) NOT NULL, - count_affected_replicas int not null default 0, is_successful TINYint NOT NULL DEFAULT 0, - acknowledged TINYint NOT NULL DEFAULT 0, - acknowledged_by varchar(128) not null default '', - acknowledge_comment text not null default '', all_errors text not null default '', - acknowledged_at TIMESTAMP NULL, - last_detection_id bigint not null default 0, - uid varchar(128) not null default '', + detection_id bigint not null default 0, PRIMARY KEY (recovery_id) )`, ` -CREATE INDEX in_active_start_period_idx_topology_recovery ON topology_recovery (in_active_period, start_active_period) - `, - ` -CREATE INDEX start_active_period_idx_topology_recovery ON topology_recovery (start_active_period) - `, - ` -CREATE UNIQUE INDEX alias_active_period_uidx_topology_recovery ON topology_recovery (alias, in_active_period, end_active_period_unixtime) +CREATE INDEX start_recovery_idx_topology_recovery ON topology_recovery (start_recovery) `, ` DROP TABLE IF EXISTS database_instance_topology_history @@ -236,44 +219,19 @@ CREATE TABLE candidate_database_instance ( CREATE INDEX last_suggested_idx_candidate_database_instance ON candidate_database_instance (last_suggested) `, ` -DROP TABLE IF EXISTS topology_failure_detection +DROP TABLE IF EXISTS recovery_detection `, ` -CREATE TABLE topology_failure_detection ( +CREATE TABLE recovery_detection ( detection_id integer, alias varchar(256) NOT NULL, - in_active_period tinyint NOT NULL DEFAULT '0', - start_active_period timestamp not null default (''), - end_active_period_unixtime int NOT NULL, - processing_node_hostname varchar(128) NOT NULL, - processcing_node_token varchar(128) NOT NULL, analysis varchar(128) NOT NULL, keyspace varchar(128) NOT NULL, shard varchar(128) NOT NULL, - count_affected_replicas int NOT NULL, - is_actionable tinyint not null default 0, + detection_timestamp timestamp NOT NULL default (''), PRIMARY KEY (detection_id) )`, ` -CREATE INDEX in_active_start_period_idx_topology_failure_detection ON topology_failure_detection (in_active_period, start_active_period) - `, - ` -DROP TABLE IF EXISTS blocked_topology_recovery -`, - ` -CREATE TABLE blocked_topology_recovery ( - alias varchar(256) NOT NULL, - keyspace varchar(128) NOT NULL, - shard varchar(128) NOT NULL, - analysis varchar(128) NOT NULL, - last_blocked_timestamp timestamp not null default (''), - blocking_recovery_id bigint, - PRIMARY KEY (alias) -)`, - ` -CREATE INDEX keyspace_shard_blocked_idx_blocked_topology_recovery ON blocked_topology_recovery (keyspace, shard, last_blocked_timestamp) - `, - ` DROP TABLE IF EXISTS database_instance_last_analysis `, ` @@ -343,7 +301,7 @@ DROP TABLE IF EXISTS topology_recovery_steps ` CREATE TABLE topology_recovery_steps ( recovery_step_id integer, - recovery_uid varchar(128) NOT NULL, + recovery_id integer NOT NULL, audit_at timestamp not null default (''), message text NOT NULL, PRIMARY KEY (recovery_step_id) @@ -409,33 +367,21 @@ CREATE TABLE vitess_shard ( CREATE INDEX source_host_port_idx_database_instance_database_instance on database_instance (source_host, source_port) `, ` -CREATE INDEX keyspace_shard_in_active_idx_topology_recovery on topology_recovery (keyspace, shard, in_active_period) +CREATE INDEX keyspace_shard_idx_topology_recovery on topology_recovery (keyspace, shard) `, ` CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recovery) `, ` -CREATE INDEX acknowledged_idx_topology_recovery on topology_recovery (acknowledged, acknowledged_at) - `, - ` -CREATE INDEX last_blocked_idx_blocked_topology_recovery on blocked_topology_recovery (last_blocked_timestamp) - `, - ` CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp) `, ` -CREATE INDEX last_detection_idx_topology_recovery on topology_recovery (last_detection_id) +CREATE INDEX detection_idx_topology_recovery on topology_recovery (detection_id) `, ` CREATE INDEX last_seen_active_idx_node_health on node_health (last_seen_active) `, ` -CREATE INDEX uid_idx_topology_recovery ON topology_recovery(uid) - `, - ` -CREATE INDEX recovery_uid_idx_topology_recovery_steps ON topology_recovery_steps(recovery_uid) - `, - ` -CREATE UNIQUE INDEX alias_active_recoverable_uidx_topology_failure_detection ON topology_failure_detection (alias, in_active_period, end_active_period_unixtime, is_actionable) +CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps(recovery_id) `, } diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go index 54500621cb9..328b43df0c5 100644 --- a/go/vt/vtorc/inst/analysis.go +++ b/go/vt/vtorc/inst/analysis.go @@ -142,9 +142,7 @@ type ReplicationAnalysis struct { CountDelayedReplicas uint CountLaggingReplicas uint IsActionableRecovery bool - ProcessingNodeHostname string - ProcessingNodeToken string - StartActivePeriod string + RecoveryId int64 GTIDMode string MinReplicaGTIDMode string MaxReplicaGTIDMode string diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 749827f006c..b348d17d45f 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -31,7 +31,6 @@ import ( "vitess.io/vitess/go/vt/vtctl/reparentutil" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" - "vitess.io/vitess/go/vt/vtorc/process" "vitess.io/vitess/go/vt/vtorc/util" "github.com/patrickmn/go-cache" @@ -302,9 +301,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna clusters := make(map[string]*clusterAnalysis) err := db.Db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { a := &ReplicationAnalysis{ - Analysis: NoProblem, - ProcessingNodeHostname: process.ThisHostname, - ProcessingNodeToken: util.ProcessToken.Hash, + Analysis: NoProblem, } tablet := &topodatapb.Tablet{} diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 2edd68847bb..cd6406e2599 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -119,9 +119,11 @@ func ExecDBWriteFunc(f func() error) error { } func ExpireTableData(tableName string, timestampColumn string) error { - query := fmt.Sprintf("delete from %s where %s < NOW() - INTERVAL ? DAY", tableName, timestampColumn) writeFunc := func() error { - _, err := db.ExecVTOrc(query, config.Config.AuditPurgeDays) + _, err := db.ExecVTOrc( + fmt.Sprintf("delete from %s where %s < NOW() - INTERVAL ? DAY", tableName, timestampColumn), + config.Config.AuditPurgeDays, + ) return err } return ExecDBWriteFunc(writeFunc) diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index 2346173ab4f..d6c53db11e4 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -715,3 +715,60 @@ func TestGetDatabaseState(t *testing.T) { require.NoError(t, err) require.Contains(t, ds, `"alias": "zone1-0000000112"`) } + +func TestExpireTableData(t *testing.T) { + oldVal := config.Config.AuditPurgeDays + config.Config.AuditPurgeDays = 10 + defer func() { + config.Config.AuditPurgeDays = oldVal + }() + + tests := []struct { + name string + tableName string + insertQuery string + timestampColumn string + expectedRowCount int + }{ + { + name: "ExpireAudit", + tableName: "audit", + timestampColumn: "audit_timestamp", + expectedRowCount: 1, + insertQuery: `insert into audit (audit_id, audit_timestamp, audit_type, alias, message, keyspace, shard) values +(1, NOW() - INTERVAL 50 DAY, 'a','a','a','a','a'), +(2, NOW() - INTERVAL 5 DAY, 'a','a','a','a','a')`, + }, + { + name: "ExpireRecoveryDetectionHistory", + tableName: "recovery_detection", + timestampColumn: "detection_timestamp", + expectedRowCount: 2, + insertQuery: `insert into recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) values +(1, NOW() - INTERVAL 3 DAY,'a','a','a','a'), +(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), +(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + _, err := db.ExecVTOrc(tt.insertQuery) + require.NoError(t, err) + + err = ExpireTableData(tt.tableName, tt.timestampColumn) + require.NoError(t, err) + + rowsCount := 0 + err = db.QueryVTOrc(`select * from `+tt.tableName, nil, func(rowMap sqlutils.RowMap) error { + rowsCount++ + return nil + }) + require.NoError(t, err) + require.EqualValues(t, tt.expectedRowCount, rowsCount) + }) + } +} diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 8a1c3af5ac9..4c1e4264b5d 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -35,8 +35,6 @@ import ( "vitess.io/vitess/go/vt/vtorc/util" ) -type RecoveryType string - const ( CheckAndRecoverGenericProblemRecoveryName string = "CheckAndRecoverGenericProblem" RecoverDeadPrimaryRecoveryName string = "RecoverDeadPrimary" @@ -102,30 +100,17 @@ const ( // TopologyRecovery represents an entry in the topology_recovery table type TopologyRecovery struct { ID int64 - UID string AnalysisEntry inst.ReplicationAnalysis - SuccessorHostname string - SuccessorPort int SuccessorAlias string - IsActive bool IsSuccessful bool AllErrors []string RecoveryStartTimestamp string RecoveryEndTimestamp string - ProcessingNodeHostname string - ProcessingNodeToken string - Acknowledged bool - AcknowledgedAt string - AcknowledgedBy string - AcknowledgedComment string - LastDetectionID int64 - RelatedRecoveryID int64 - Type RecoveryType + DetectionID int64 } func NewTopologyRecovery(replicationAnalysis inst.ReplicationAnalysis) *TopologyRecovery { topologyRecovery := &TopologyRecovery{} - topologyRecovery.UID = util.PrettyUniqueToken() topologyRecovery.AnalysisEntry = replicationAnalysis topologyRecovery.AllErrors = []string{} return topologyRecovery @@ -145,16 +130,16 @@ func (topologyRecovery *TopologyRecovery) AddErrors(errs []error) { } type TopologyRecoveryStep struct { - ID int64 - RecoveryUID string - AuditAt string - Message string + ID int64 + RecoveryID int64 + AuditAt string + Message string } -func NewTopologyRecoveryStep(uid string, message string) *TopologyRecoveryStep { +func NewTopologyRecoveryStep(id int64, message string) *TopologyRecoveryStep { return &TopologyRecoveryStep{ - RecoveryUID: uid, - Message: message, + RecoveryID: id, + Message: message, } } @@ -173,7 +158,7 @@ func AuditTopologyRecovery(topologyRecovery *TopologyRecovery, message string) e return nil } - recoveryStep := NewTopologyRecoveryStep(topologyRecovery.UID, message) + recoveryStep := NewTopologyRecoveryStep(topologyRecovery.ID, message) return writeTopologyRecoveryStep(recoveryStep) } @@ -187,7 +172,7 @@ func resolveRecovery(topologyRecovery *TopologyRecovery, successorInstance *inst // recoverPrimaryHasPrimary resets the replication on the primary instance func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimaryHasPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -223,7 +208,7 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.Replication return false, nil, err } - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, true, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another %v.", analysisEntry.AnalyzedInstanceAlias, recoveryName)) return false, nil, err @@ -300,19 +285,6 @@ func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.Repl return false, nil, nil } -// checkAndExecuteFailureDetectionProcesses tries to register for failure detection and potentially executes -// failure-detection processes. -func checkAndExecuteFailureDetectionProcesses(analysisEntry *inst.ReplicationAnalysis) (detectionRegistrationSuccess bool, processesExecutionAttempted bool, err error) { - if ok, _ := AttemptFailureDetectionRegistration(analysisEntry); !ok { - if util.ClearToLog("checkAndExecuteFailureDetectionProcesses", analysisEntry.AnalyzedInstanceAlias) { - log.Infof("checkAndExecuteFailureDetectionProcesses: could not register %+v detection on %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) - } - return false, false, nil - } - log.Infof("topology_recovery: detected %+v failure on %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) - return true, false, nil -} - // getCheckAndRecoverFunctionCode gets the recovery function code to use for the given analysis. func getCheckAndRecoverFunctionCode(analysisCode inst.AnalysisCode, tabletAlias string) recoveryFunction { switch analysisCode { @@ -500,17 +472,12 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er } // At this point we have validated there's a failure scenario for which we have a recovery path. - - // Initiate detection: - _, _, err = checkAndExecuteFailureDetectionProcesses(analysisEntry) + // Record the failure detected in the logs. + err = InsertRecoveryDetection(analysisEntry) if err != nil { - log.Errorf("executeCheckAndRecoverFunction: error on failure detection: %+v", err) + log.Errorf("executeCheckAndRecoverFunction: error on inserting recovery detection record: %+v", err) return err } - // We don't mind whether detection really executed the processes or not - // (it may have been silenced due to previous detection). We only care there's no error. - - // We're about to embark on recovery shortly... // Check for recovery being disabled globally if recoveryDisabledGlobally, err := IsRecoveryDisabled(); err != nil { @@ -705,7 +672,7 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.R // electNewPrimary elects a new primary while none were present before. func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false /*failIfFailedInstanceInActiveRecovery*/, true /*failIfClusterInActiveRecovery*/) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil || err != nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another electNewPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -754,7 +721,7 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi // fixPrimary sets the primary as read-write. func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -785,7 +752,7 @@ func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (r // fixReplica sets the replica as read-only and points it at the current primary. func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixReplica.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -826,7 +793,7 @@ func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (r // recoverErrantGTIDDetected changes the tablet type of a replica tablet that has errant GTIDs. func recoverErrantGTIDDetected(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another recoverErrantGTIDDetected.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 4a7a6c77ef1..dd5f8a96430 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -25,87 +25,41 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" - "vitess.io/vitess/go/vt/vtorc/process" - "vitess.io/vitess/go/vt/vtorc/util" ) -// AttemptFailureDetectionRegistration tries to add a failure-detection entry; if this fails that means the problem has already been detected -func AttemptFailureDetectionRegistration(analysisEntry *inst.ReplicationAnalysis) (registrationSuccessful bool, err error) { - args := sqlutils.Args( - analysisEntry.AnalyzedInstanceAlias, - process.ThisHostname, - util.ProcessToken.Hash, - string(analysisEntry.Analysis), - analysisEntry.ClusterDetails.Keyspace, - analysisEntry.ClusterDetails.Shard, - analysisEntry.CountReplicas, - analysisEntry.IsActionableRecovery, - ) - startActivePeriodHint := "now()" - if analysisEntry.StartActivePeriod != "" { - startActivePeriodHint = "?" - args = append(args, analysisEntry.StartActivePeriod) - } - - query := fmt.Sprintf(` +// InsertRecoveryDetection inserts the recovery analysis that has been detected. +func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error { + sqlResult, err := db.ExecVTOrc(` insert ignore - into topology_failure_detection ( + into recovery_detection ( alias, - in_active_period, - end_active_period_unixtime, - processing_node_hostname, - processcing_node_token, analysis, keyspace, shard, - count_affected_replicas, - is_actionable, - start_active_period + detection_timestamp ) values ( - ?, - 1, - 0, - ?, - ?, ?, ?, ?, ?, - ?, - %s - ) - `, startActivePeriodHint) - - sqlResult, err := db.ExecVTOrc(query, args...) + now() + )`, + analysisEntry.AnalyzedInstanceAlias, + string(analysisEntry.Analysis), + analysisEntry.ClusterDetails.Keyspace, + analysisEntry.ClusterDetails.Shard, + ) if err != nil { log.Error(err) - return false, err + return err } - rows, err := sqlResult.RowsAffected() - if err != nil { - log.Error(err) - return false, err - } - return (rows > 0), nil -} - -// ClearActiveFailureDetections clears the "in_active_period" flag for old-enough detections, thereby allowing for -// further detections on cleared instances. -func ClearActiveFailureDetections() error { - _, err := db.ExecVTOrc(` - update topology_failure_detection set - in_active_period = 0, - end_active_period_unixtime = UNIX_TIMESTAMP() - where - in_active_period = 1 - AND start_active_period < NOW() - INTERVAL ? MINUTE - `, - config.FailureDetectionPeriodBlockMinutes, - ) + id, err := sqlResult.LastInsertId() if err != nil { log.Error(err) + return err } - return err + analysisEntry.RecoveryId = id + return nil } func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecovery, error) { @@ -114,43 +68,29 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover insert ignore into topology_recovery ( recovery_id, - uid, alias, - in_active_period, - start_active_period, - end_active_period_unixtime, - processing_node_hostname, - processcing_node_token, + start_recovery, analysis, keyspace, shard, - count_affected_replicas, - last_detection_id + detection_id ) values ( ?, ?, - ?, - 1, NOW(), - 0, - ?, ?, ?, ?, - ?, - ?, - (select ifnull(max(detection_id), 0) from topology_failure_detection where alias = ?) + ? ) `, sqlutils.NilIfZero(topologyRecovery.ID), - topologyRecovery.UID, analysisEntry.AnalyzedInstanceAlias, - process.ThisHostname, util.ProcessToken.Hash, string(analysisEntry.Analysis), analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, - analysisEntry.CountReplicas, analysisEntry.AnalyzedInstanceAlias, + analysisEntry.RecoveryId, ) if err != nil { return nil, err @@ -171,224 +111,27 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover } // AttemptRecoveryRegistration tries to add a recovery entry; if this fails that means recovery is already in place. -func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis, failIfFailedInstanceInActiveRecovery bool, failIfClusterInActiveRecovery bool) (*TopologyRecovery, error) { - if failIfFailedInstanceInActiveRecovery { - // Let's check if this instance has just been promoted recently and is still in active period. - // If so, we reject recovery registration to avoid flapping. - recoveries, err := ReadInActivePeriodSuccessorInstanceRecovery(analysisEntry.AnalyzedInstanceAlias) - if err != nil { - log.Error(err) - return nil, err - } - if len(recoveries) > 0 { - _ = RegisterBlockedRecoveries(analysisEntry, recoveries) - errMsg := fmt.Sprintf("AttemptRecoveryRegistration: tablet %+v has recently been promoted (by failover of %+v) and is in active period. It will not be failed over. You may acknowledge the failure on %+v (-c ack-instance-recoveries) to remove this blockage", analysisEntry.AnalyzedInstanceAlias, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias) - log.Errorf(errMsg) - return nil, fmt.Errorf(errMsg) - } - } - if failIfClusterInActiveRecovery { - // Let's check if this cluster has just experienced a failover of the same analysis and is still in active period. - // If so, we reject recovery registration to avoid flapping. - recoveries, err := ReadInActivePeriodClusterRecovery(analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, string(analysisEntry.Analysis)) - if err != nil { - log.Error(err) - return nil, err - } - if len(recoveries) > 0 { - _ = RegisterBlockedRecoveries(analysisEntry, recoveries) - errMsg := fmt.Sprintf("AttemptRecoveryRegistration: keyspace %+v shard %+v has recently experienced a failover (of %+v) and is in active period. It will not be failed over again. You may acknowledge the failure on this cluster (-c ack-cluster-recoveries) or on %+v (-c ack-instance-recoveries) to remove this blockage", analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias) - log.Errorf(errMsg) - return nil, fmt.Errorf(errMsg) - } - } - if !failIfFailedInstanceInActiveRecovery { - // Implicitly acknowledge this instance's possibly existing active recovery, provided they are completed. - _, _ = AcknowledgeInstanceCompletedRecoveries(analysisEntry.AnalyzedInstanceAlias, "vtorc", fmt.Sprintf("implicit acknowledge due to user invocation of recovery on same instance: %+v", analysisEntry.AnalyzedInstanceAlias)) - // The fact we only acknowledge a completed recovery solves the possible case of two DBAs simultaneously - // trying to recover the same instance at the same time - } - - topologyRecovery := NewTopologyRecovery(*analysisEntry) - - topologyRecovery, err := writeTopologyRecovery(topologyRecovery) +func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis) (*TopologyRecovery, error) { + // Check if there is an active recovery in progress for the cluster of the given instance. + recoveries, err := ReadActiveClusterRecoveries(analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard) if err != nil { log.Error(err) return nil, err } - return topologyRecovery, nil -} - -// ClearActiveRecoveries clears the "in_active_period" flag for old-enough recoveries, thereby allowing for -// further recoveries on cleared instances. -func ClearActiveRecoveries() error { - _, err := db.ExecVTOrc(` - update topology_recovery set - in_active_period = 0, - end_active_period_unixtime = UNIX_TIMESTAMP() - where - in_active_period = 1 - AND start_active_period < NOW() - INTERVAL ? SECOND - `, - config.Config.RecoveryPeriodBlockSeconds, - ) - if err != nil { - log.Error(err) - } - return err -} - -// RegisterBlockedRecoveries writes down currently blocked recoveries, and indicates what recovery they are blocked on. -// Recoveries are blocked thru the in_active_period flag, which comes to avoid flapping. -func RegisterBlockedRecoveries(analysisEntry *inst.ReplicationAnalysis, blockingRecoveries []*TopologyRecovery) error { - for _, recovery := range blockingRecoveries { - _, err := db.ExecVTOrc(` - insert - into blocked_topology_recovery ( - alias, - keyspace, - shard, - analysis, - last_blocked_timestamp, - blocking_recovery_id - ) values ( - ?, - ?, - ?, - ?, - NOW(), - ? - ) - on duplicate key update - keyspace=values(keyspace), - shard=values(shard), - analysis=values(analysis), - last_blocked_timestamp=values(last_blocked_timestamp), - blocking_recovery_id=values(blocking_recovery_id) - `, analysisEntry.AnalyzedInstanceAlias, - analysisEntry.ClusterDetails.Keyspace, - analysisEntry.ClusterDetails.Shard, - string(analysisEntry.Analysis), - recovery.ID, - ) - if err != nil { - log.Error(err) - } - } - return nil -} - -// ExpireBlockedRecoveries clears listing of blocked recoveries that are no longer actually blocked. -func ExpireBlockedRecoveries() error { - // Older recovery is acknowledged by now, hence blocked recovery should be released. - // Do NOTE that the data in blocked_topology_recovery is only used for auditing: it is NOT the data - // based on which we make automated decisions. - - query := ` - select - blocked_topology_recovery.alias - from - blocked_topology_recovery - left join topology_recovery on (blocking_recovery_id = topology_recovery.recovery_id and acknowledged = 0) - where - acknowledged is null - ` - var expiredAliases []string - err := db.QueryVTOrc(query, sqlutils.Args(), func(m sqlutils.RowMap) error { - expiredAliases = append(expiredAliases, m.GetString("alias")) - return nil - }) - - for _, expiredAlias := range expiredAliases { - _, err := db.ExecVTOrc(` - delete - from blocked_topology_recovery - where - alias = ? - `, - expiredAlias, - ) - if err != nil { - log.Error(err) - return err - } + if len(recoveries) > 0 { + errMsg := fmt.Sprintf("AttemptRecoveryRegistration: Active recovery (id:%v) in the cluster %s:%s for %s", recoveries[0].ID, analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, recoveries[0].AnalysisEntry.Analysis) + log.Errorf(errMsg) + return nil, fmt.Errorf(errMsg) } - if err != nil { - log.Error(err) - return err - } - // Some oversampling, if a problem has not been noticed for some time (e.g. the server came up alive - // before action was taken), expire it. - // Recall that RegisterBlockedRecoveries continuously updates the last_blocked_timestamp column. - _, err = db.ExecVTOrc(` - delete - from blocked_topology_recovery - where - last_blocked_timestamp < NOW() - interval ? second - `, config.Config.RecoveryPollSeconds*2, - ) - if err != nil { - log.Error(err) - } - return err -} + topologyRecovery := NewTopologyRecovery(*analysisEntry) -// acknowledgeRecoveries sets acknowledged* details and clears the in_active_period flags from a set of entries -func acknowledgeRecoveries(owner string, comment string, markEndRecovery bool, whereClause string, args []any) (countAcknowledgedEntries int64, err error) { - additionalSet := `` - if markEndRecovery { - additionalSet = ` - end_recovery=IFNULL(end_recovery, NOW()), - ` - } - query := fmt.Sprintf(` - update topology_recovery set - in_active_period = 0, - end_active_period_unixtime = case when end_active_period_unixtime = 0 then UNIX_TIMESTAMP() else end_active_period_unixtime end, - %s - acknowledged = 1, - acknowledged_at = NOW(), - acknowledged_by = ?, - acknowledge_comment = ? - where - acknowledged = 0 - and - %s - `, additionalSet, whereClause) - args = append(sqlutils.Args(owner, comment), args...) - sqlResult, err := db.ExecVTOrc(query, args...) - if err != nil { - log.Error(err) - return 0, err - } - rows, err := sqlResult.RowsAffected() + topologyRecovery, err = writeTopologyRecovery(topologyRecovery) if err != nil { log.Error(err) + return nil, err } - return rows, err -} - -// AcknowledgeInstanceCompletedRecoveries marks active and COMPLETED recoveries for given instance as acknowledged. -// This also implied clearing their active period, which in turn enables further recoveries on those topologies -func AcknowledgeInstanceCompletedRecoveries(tabletAlias string, owner string, comment string) (countAcknowledgedEntries int64, err error) { - whereClause := ` - alias = ? - and end_recovery is not null - ` - return acknowledgeRecoveries(owner, comment, false, whereClause, sqlutils.Args(tabletAlias)) -} - -// AcknowledgeCrashedRecoveries marks recoveries whose processing nodes has crashed as acknowledged. -func AcknowledgeCrashedRecoveries() (countAcknowledgedEntries int64, err error) { - whereClause := ` - in_active_period = 1 - and end_recovery is null - and concat(processing_node_hostname, ':', processcing_node_token) not in ( - select concat(hostname, ':', token) from node_health - ) - ` - return acknowledgeRecoveries("vtorc", "detected crashed recovery", true, whereClause, sqlutils.Args()) + return topologyRecovery, nil } // ResolveRecovery is called on completion of a recovery process and updates the recovery status. @@ -401,11 +144,11 @@ func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { all_errors = ?, end_recovery = NOW() where - uid = ? + recovery_id = ? `, topologyRecovery.IsSuccessful, topologyRecovery.SuccessorAlias, strings.Join(topologyRecovery.AllErrors, "\n"), - topologyRecovery.UID, + topologyRecovery.ID, ) if err != nil { log.Error(err) @@ -419,26 +162,16 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog query := fmt.Sprintf(` select recovery_id, - uid, alias, - (IFNULL(end_active_period_unixtime, 0) = 0) as is_active, - start_active_period, - IFNULL(end_active_period_unixtime, 0) as end_active_period_unixtime, + start_recovery, IFNULL(end_recovery, '') AS end_recovery, is_successful, - processing_node_hostname, - processcing_node_token, ifnull(successor_alias, '') as successor_alias, analysis, keyspace, shard, - count_affected_replicas, all_errors, - acknowledged, - acknowledged_at, - acknowledged_by, - acknowledge_comment, - last_detection_id + detection_id from topology_recovery %s @@ -449,20 +182,15 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { topologyRecovery := *NewTopologyRecovery(inst.ReplicationAnalysis{}) topologyRecovery.ID = m.GetInt64("recovery_id") - topologyRecovery.UID = m.GetString("uid") - topologyRecovery.IsActive = m.GetBool("is_active") - topologyRecovery.RecoveryStartTimestamp = m.GetString("start_active_period") + topologyRecovery.RecoveryStartTimestamp = m.GetString("start_recovery") topologyRecovery.RecoveryEndTimestamp = m.GetString("end_recovery") topologyRecovery.IsSuccessful = m.GetBool("is_successful") - topologyRecovery.ProcessingNodeHostname = m.GetString("processing_node_hostname") - topologyRecovery.ProcessingNodeToken = m.GetString("processcing_node_token") topologyRecovery.AnalysisEntry.AnalyzedInstanceAlias = m.GetString("alias") topologyRecovery.AnalysisEntry.Analysis = inst.AnalysisCode(m.GetString("analysis")) topologyRecovery.AnalysisEntry.ClusterDetails.Keyspace = m.GetString("keyspace") topologyRecovery.AnalysisEntry.ClusterDetails.Shard = m.GetString("shard") - topologyRecovery.AnalysisEntry.CountReplicas = m.GetUint("count_affected_replicas") topologyRecovery.SuccessorAlias = m.GetString("successor_alias") @@ -470,12 +198,7 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog topologyRecovery.AllErrors = strings.Split(m.GetString("all_errors"), "\n") - topologyRecovery.Acknowledged = m.GetBool("acknowledged") - topologyRecovery.AcknowledgedAt = m.GetString("acknowledged_at") - topologyRecovery.AcknowledgedBy = m.GetString("acknowledged_by") - topologyRecovery.AcknowledgedComment = m.GetString("acknowledge_comment") - - topologyRecovery.LastDetectionID = m.GetInt64("last_detection_id") + topologyRecovery.DetectionID = m.GetInt64("detection_id") res = append(res, &topologyRecovery) return nil @@ -487,37 +210,21 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog return res, err } -// ReadInActivePeriodClusterRecovery reads recoveries (possibly complete!) that are in active period for the analysis. -// (may be used to block further recoveries of the same analysis on this cluster) -func ReadInActivePeriodClusterRecovery(keyspace string, shard, analysis string) ([]*TopologyRecovery, error) { +// ReadActiveClusterRecoveries reads recoveries that are ongoing for the given cluster. +func ReadActiveClusterRecoveries(keyspace string, shard string) ([]*TopologyRecovery, error) { whereClause := ` where - in_active_period=1 + end_recovery IS NULL and keyspace=? - and shard=? - and analysis=?` - return readRecoveries(whereClause, ``, sqlutils.Args(keyspace, shard, analysis)) -} - -// ReadInActivePeriodSuccessorInstanceRecovery reads completed recoveries for a given instance, where said instance -// was promoted as result, still in active period (may be used to block further recoveries should this instance die) -func ReadInActivePeriodSuccessorInstanceRecovery(tabletAlias string) ([]*TopologyRecovery, error) { - whereClause := ` - where - in_active_period=1 - and - successor_alias=?` - return readRecoveries(whereClause, ``, sqlutils.Args(tabletAlias)) + and shard=?` + return readRecoveries(whereClause, ``, sqlutils.Args(keyspace, shard)) } // ReadRecentRecoveries reads latest recovery entries from topology_recovery -func ReadRecentRecoveries(unacknowledgedOnly bool, page int) ([]*TopologyRecovery, error) { +func ReadRecentRecoveries(page int) ([]*TopologyRecovery, error) { whereConditions := []string{} whereClause := "" var args []any - if unacknowledgedOnly { - whereConditions = append(whereConditions, `acknowledged=0`) - } if len(whereConditions) > 0 { whereClause = fmt.Sprintf("where %s", strings.Join(whereConditions, " and ")) } @@ -533,9 +240,9 @@ func writeTopologyRecoveryStep(topologyRecoveryStep *TopologyRecoveryStep) error sqlResult, err := db.ExecVTOrc(` insert ignore into topology_recovery_steps ( - recovery_step_id, recovery_uid, audit_at, message + recovery_step_id, recovery_id, audit_at, message ) values (?, ?, now(), ?) - `, sqlutils.NilIfZero(topologyRecoveryStep.ID), topologyRecoveryStep.RecoveryUID, topologyRecoveryStep.Message, + `, sqlutils.NilIfZero(topologyRecoveryStep.ID), topologyRecoveryStep.RecoveryID, topologyRecoveryStep.Message, ) if err != nil { log.Error(err) @@ -548,17 +255,17 @@ func writeTopologyRecoveryStep(topologyRecoveryStep *TopologyRecoveryStep) error return err } -// ExpireFailureDetectionHistory removes old rows from the topology_failure_detection table -func ExpireFailureDetectionHistory() error { - return inst.ExpireTableData("topology_failure_detection", "start_active_period") +// ExpireRecoveryDetectionHistory removes old rows from the recovery_detection table +func ExpireRecoveryDetectionHistory() error { + return inst.ExpireTableData("recovery_detection", "detection_timestamp") } -// ExpireTopologyRecoveryHistory removes old rows from the topology_failure_detection table +// ExpireTopologyRecoveryHistory removes old rows from the topology_recovery table func ExpireTopologyRecoveryHistory() error { - return inst.ExpireTableData("topology_recovery", "start_active_period") + return inst.ExpireTableData("topology_recovery", "start_recovery") } -// ExpireTopologyRecoveryStepsHistory removes old rows from the topology_failure_detection table +// ExpireTopologyRecoveryStepsHistory removes old rows from the topology_recovery_steps table func ExpireTopologyRecoveryStepsHistory() error { return inst.ExpireTableData("topology_recovery_steps", "audit_at") } diff --git a/go/vt/vtorc/logic/topology_recovery_dao_test.go b/go/vt/vtorc/logic/topology_recovery_dao_test.go index f9a9026a4a1..354af82e2b3 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao_test.go +++ b/go/vt/vtorc/logic/topology_recovery_dao_test.go @@ -17,11 +17,13 @@ limitations under the License. package logic import ( + "strconv" "testing" "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/external/golib/sqlutils" + "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" ) @@ -59,7 +61,7 @@ func TestTopologyRecovery(t *testing.T) { }) t.Run("read recoveries", func(t *testing.T) { - recoveries, err := ReadRecentRecoveries(false, 0) + recoveries, err := ReadRecentRecoveries(0) require.NoError(t, err) require.Len(t, recoveries, 1) // Assert that the ID field matches the one that we just wrote @@ -67,35 +69,102 @@ func TestTopologyRecovery(t *testing.T) { }) } -// TestBlockedRecoveryInsertion tests that we are able to insert into the blocked_recovery table. -func TestBlockedRecoveryInsertion(t *testing.T) { - orcDb, err := db.OpenVTOrc() - require.NoError(t, err) +func TestExpireTableData(t *testing.T) { + oldVal := config.Config.AuditPurgeDays + config.Config.AuditPurgeDays = 10 defer func() { - _, err = orcDb.Exec("delete from blocked_topology_recovery") - require.NoError(t, err) + config.Config.AuditPurgeDays = oldVal }() - analysisEntry := &inst.ReplicationAnalysis{ - AnalyzedInstanceAlias: "zone1-0000000100", - ClusterDetails: inst.ClusterInfo{ - Keyspace: "ks", - Shard: "0", + tests := []struct { + name string + tableName string + insertQuery string + expectedRowCount int + expireFunc func() error + }{ + { + name: "ExpireRecoveryDetectionHistory", + tableName: "recovery_detection", + expectedRowCount: 2, + insertQuery: `insert into recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) values +(1, NOW() - INTERVAL 3 DAY,'a','a','a','a'), +(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), +(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, + expireFunc: ExpireRecoveryDetectionHistory, + }, + { + name: "ExpireTopologyRecoveryHistory", + tableName: "topology_recovery", + expectedRowCount: 1, + insertQuery: `insert into topology_recovery (recovery_id, start_recovery, alias, analysis, keyspace, shard) values +(1, NOW() - INTERVAL 13 DAY,'a','a','a','a'), +(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), +(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, + expireFunc: ExpireTopologyRecoveryHistory, }, - Analysis: inst.DeadPrimaryAndSomeReplicas, + { + name: "ExpireTopologyRecoveryStepsHistory", + tableName: "topology_recovery_steps", + expectedRowCount: 1, + insertQuery: `insert into topology_recovery_steps (recovery_step_id, audit_at, recovery_id, message) values +(1, NOW() - INTERVAL 13 DAY, 1, 'a'), +(2, NOW() - INTERVAL 5 DAY, 2, 'a'), +(3, NOW() - INTERVAL 15 DAY, 3, 'a')`, + expireFunc: ExpireTopologyRecoveryStepsHistory, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + _, err := db.ExecVTOrc(tt.insertQuery) + require.NoError(t, err) + + err = tt.expireFunc() + require.NoError(t, err) + + rowsCount := 0 + err = db.QueryVTOrc(`select * from `+tt.tableName, nil, func(rowMap sqlutils.RowMap) error { + rowsCount++ + return nil + }) + require.NoError(t, err) + require.EqualValues(t, tt.expectedRowCount, rowsCount) + }) } - blockedRecovery := &TopologyRecovery{ - ID: 1, +} + +func TestInsertRecoveryDetection(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + ra := &inst.ReplicationAnalysis{ + AnalyzedInstanceAlias: "alias-1", + Analysis: inst.ClusterHasNoPrimary, + ClusterDetails: inst.ClusterInfo{ + Keyspace: keyspace, + Shard: shard, + }, } - err = RegisterBlockedRecoveries(analysisEntry, []*TopologyRecovery{blockedRecovery}) + err := InsertRecoveryDetection(ra) require.NoError(t, err) + require.NotEqual(t, 0, ra.RecoveryId) - totalBlockedRecoveries := 0 - err = db.QueryVTOrc("select count(*) as blocked_recoveries from blocked_topology_recovery", nil, func(rowMap sqlutils.RowMap) error { - totalBlockedRecoveries = rowMap.GetInt("blocked_recoveries") + var rows []map[string]sqlutils.CellData + err = db.QueryVTOrc("select * from recovery_detection", nil, func(rowMap sqlutils.RowMap) error { + rows = append(rows, rowMap) return nil }) require.NoError(t, err) - // There should be 1 blocked recovery after insertion - require.Equal(t, 1, totalBlockedRecoveries) + require.Len(t, rows, 1) + require.EqualValues(t, ra.AnalyzedInstanceAlias, rows[0]["alias"].String) + require.EqualValues(t, ra.Analysis, rows[0]["analysis"].String) + require.EqualValues(t, keyspace, rows[0]["keyspace"].String) + require.EqualValues(t, shard, rows[0]["shard"].String) + require.EqualValues(t, strconv.Itoa(int(ra.RecoveryId)), rows[0]["detection_id"].String) + require.NotEqual(t, "", rows[0]["detection_timestamp"].String) } diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go index f636a194283..f7658060b95 100644 --- a/go/vt/vtorc/logic/topology_recovery_test.go +++ b/go/vt/vtorc/logic/topology_recovery_test.go @@ -131,7 +131,7 @@ func TestElectNewPrimaryPanic(t *testing.T) { require.Error(t, err) } -func TestDifferentAnalysescHaveDifferentCooldowns(t *testing.T) { +func TestRecoveryRegistration(t *testing.T) { orcDb, err := db.OpenVTOrc() require.NoError(t, err) oldTs := ts @@ -181,13 +181,20 @@ func TestDifferentAnalysescHaveDifferentCooldowns(t *testing.T) { defer cancel() ts = memorytopo.NewServer(ctx, "zone1") - _, err = AttemptRecoveryRegistration(&replicaAnalysisEntry, false, true) - require.Nil(t, err) + tp, err := AttemptRecoveryRegistration(&replicaAnalysisEntry) + require.NoError(t, err) + + // because there is another recovery in progress for this shard, this will fail. + _, err = AttemptRecoveryRegistration(&primaryAnalysisEntry) + require.ErrorContains(t, err, "Active recovery") - // even though this is another recovery on the same cluster, allow it to go through - // because the analysis is different (ReplicationStopped vs DeadPrimary) - _, err = AttemptRecoveryRegistration(&primaryAnalysisEntry, true, true) - require.Nil(t, err) + // Lets say the recovery finishes after some time. + err = resolveRecovery(tp, nil) + require.NoError(t, err) + + // now this recovery registration should be successful. + _, err = AttemptRecoveryRegistration(&primaryAnalysisEntry) + require.NoError(t, err) } func TestGetCheckAndRecoverFunctionCode(t *testing.T) { diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 66c5590831b..b9e5795a31f 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -374,7 +374,7 @@ func ContinuousDiscovery() { go inst.ExpireStaleInstanceBinlogCoordinates() go process.ExpireNodesHistory() go process.ExpireAvailableNodes() - go ExpireFailureDetectionHistory() + go ExpireRecoveryDetectionHistory() go ExpireTopologyRecoveryHistory() go ExpireTopologyRecoveryStepsHistory() } @@ -382,10 +382,6 @@ func ContinuousDiscovery() { case <-recoveryTick: go func() { if IsLeaderOrActive() { - go ClearActiveFailureDetections() - go ClearActiveRecoveries() - go ExpireBlockedRecoveries() - go AcknowledgeCrashedRecoveries() go inst.ExpireInstanceAnalysisChangelog() go func() { diff --git a/go/vt/vtorc/util/token.go b/go/vt/vtorc/util/token.go index 940f7a44698..b3e61594c29 100644 --- a/go/vt/vtorc/util/token.go +++ b/go/vt/vtorc/util/token.go @@ -20,8 +20,6 @@ import ( "crypto/rand" "crypto/sha256" "encoding/hex" - "fmt" - "time" ) func toHash(input []byte) string { @@ -53,7 +51,3 @@ func NewToken() *Token { Hash: RandomHash(), } } - -func PrettyUniqueToken() string { - return fmt.Sprintf("%d:%s", time.Now().UnixNano(), NewToken().Hash) -}