diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index 7c412f30bd7..a5b7e9e49a2 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -326,6 +326,8 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 	}()
 
 	g, stdCtx := errgroup.WithContext(stdCtx)
+	stdCtx, cancel := context.WithCancel(stdCtx)
+
 	ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
 		CaptureInfo:       c.info,
 		EtcdClient:        c.EtcdClient,
@@ -335,7 +337,6 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 		SorterSystem:      c.sorterSystem,
 		SortEngineFactory: c.sortEngineFactory,
 	})
-
 	g.Go(func() error {
 		// when the campaignOwner returns an error, it means that the owner throws
 		// an unrecoverable serious errors (recoverable errors are intercepted in the owner tick)
@@ -351,9 +352,20 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 	})
 
 	g.Go(func() error {
+		// Processor manager should be closed as soon as possible to prevent double write issue.
+		defer func() {
+			if cancel != nil {
+				// Propagate the cancel signal to the owner and other goroutines.
+				cancel()
+			}
+			if c.processorManager != nil {
+				c.processorManager.AsyncClose()
+			}
+			log.Info("processor manager closed", zap.String("captureID", c.info.ID))
+		}()
 		processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval)
 
-		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
+		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)
 
 		globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
 			c.MessageRouter.AddPeer(captureID, addr)
@@ -419,7 +431,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
 		}
 		// Campaign to be the owner, it blocks until it been elected.
 		if err := c.campaign(ctx); err != nil {
-
 			rootErr := errors.Cause(err)
 			if rootErr == context.Canceled {
 				return nil
@@ -467,7 +478,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
 		owner := c.newOwner(c.upstreamManager)
 		c.setOwner(owner)
 
-		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
+		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)
 
 		globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
 			c.MessageRouter.AddPeer(captureID, addr)
@@ -485,27 +496,27 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
 			}
 		})
 
-		err = c.runEtcdWorker(ownerCtx, owner,
-			orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()),
-			ownerFlushInterval, util.RoleOwner.String())
+		err = c.runEtcdWorker(ownerCtx, owner, globalState, ownerFlushInterval, util.RoleOwner.String())
 		c.owner.AsyncStop()
 		c.setOwner(nil)
 
-		// if owner exits, resign the owner key,
-		// use a new context to prevent the context from being cancelled.
-		resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-		if resignErr := c.resign(resignCtx); resignErr != nil {
-			if errors.Cause(resignErr) != context.DeadlineExceeded {
-				log.Info("owner resign failed", zap.String("captureID", c.info.ID),
+		if !cerror.ErrNotOwner.Equal(err) {
+			// if owner exits, resign the owner key,
+			// use a new context to prevent the context from being cancelled.
+			resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			if resignErr := c.resign(resignCtx); resignErr != nil {
+				if errors.Cause(resignErr) != context.DeadlineExceeded {
+					log.Info("owner resign failed", zap.String("captureID", c.info.ID),
+						zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
+					cancel()
+					return errors.Trace(resignErr)
+				}
+
+				log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
 					zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
-				cancel()
-				return errors.Trace(resignErr)
 			}
-
-			log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
-				zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
+			cancel()
 		}
-		cancel()
 
 		log.Info("owner resigned successfully", zap.String("captureID", c.info.ID),
 			zap.Int64("ownerRev", ownerRev))
@@ -622,10 +633,6 @@ func (c *captureImpl) AsyncClose() {
 	c.captureMu.Lock()
 	defer c.captureMu.Unlock()
 
-	if c.processorManager != nil {
-		c.processorManager.AsyncClose()
-	}
-	log.Info("processor manager closed", zap.String("captureID", c.info.ID))
 
 	c.grpcService.Reset(nil)
 	if c.MessageRouter != nil {
diff --git a/cdc/capture/election.go b/cdc/capture/election.go
index 9012d78e596..6388b1f0696 100644
--- a/cdc/capture/election.go
+++ b/cdc/capture/election.go
@@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election {
 	}
 }
 
-func (e *electionImpl) campaign(ctx context.Context, key string) error {
+func (e *electionImpl) campaign(ctx context.Context, val string) error {
 	failpoint.Inject("capture-campaign-compacted-error", func() {
 		failpoint.Return(errors.Trace(mvcc.ErrCompacted))
 	})
-	return e.election.Campaign(ctx, key)
+	return e.election.Campaign(ctx, val)
 }
 
 func (e *electionImpl) resign(ctx context.Context) error {
diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go
index 9b4756d551c..96d80ba934b 100644
--- a/cdc/owner/owner.go
+++ b/cdc/owner/owner.go
@@ -411,14 +411,6 @@ func (o *ownerImpl) updateMetrics() {
 			changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).
 				Set(float64(cf.state.Info.State.ToInt()))
 		}
-
-		// The InfoProvider is a proxy object returning information
-		// from the scheduler.
-		infoProvider := cf.GetInfoProvider()
-		if infoProvider == nil {
-			// The scheduler has not been initialized yet.
-			continue
-		}
 	}
 }
 
diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go
index 3803928185e..a8f5a4d9e11 100644
--- a/cdc/owner/owner_test.go
+++ b/cdc/owner/owner_test.go
@@ -120,7 +120,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
 	o := owner.(*ownerImpl)
 	o.upstreamManager = upstream.NewManager4Test(pdClient)
 
-	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
 	tester := orchestrator.NewReactorStateTester(t, state, nil)
 
 	// set captures
@@ -430,7 +430,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
 	ctx := cdcContext.NewBackendContext4Test(true)
 	ctx, cancel := cdcContext.WithCancel(ctx)
 	defer cancel()
-	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
 	tester := orchestrator.NewReactorStateTester(t, state, nil)
 
 	// no changefeed, the gc safe point should be max uint64
@@ -667,7 +667,7 @@ WorkLoop:
 }
 
 func TestCalculateGCSafepointTs(t *testing.T) {
-	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
 	expectMinTsMap := make(map[uint64]uint64)
 	expectForceUpdateMap := make(map[uint64]interface{})
 	o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go
index 08416785177..9d13e3c5cc8 100644
--- a/cdc/processor/manager.go
+++ b/cdc/processor/manager.go
@@ -227,6 +227,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error {
 		for changefeedID := range m.processors {
 			m.closeProcessor(changefeedID, ctx)
 		}
+		log.Info("All processors are closed in processor manager")
 		// FIXME: we should drain command queue and signal callers an error.
 		return cerrors.ErrReactorFinished
 	case commandTpWriteDebugInfo:
diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go
index 6da2ed12439..4835dbd43ff 100644
--- a/cdc/processor/manager_test.go
+++ b/cdc/processor/manager_test.go
@@ -71,7 +71,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
 				checkpointTs: replicaInfo.StartTs,
 			}, nil
 		}, &s.liveness)
-	s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
 	captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
 	require.Nil(t, err)
 	s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 6d073634b26..a695844318f 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -273,6 +273,20 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
 				zap.Error(err))
 			m.clearSinkFactory()
 			sinkFactoryErrors = make(chan error, 16)
+
+			start := time.Now()
+			log.Info("Sink manager is closing all table sinks",
+				zap.String("namespace", m.changefeedID.Namespace),
+				zap.String("changefeed", m.changefeedID.ID))
+			m.tableSinks.Range(func(key, value interface{}) bool {
+				value.(*tableSinkWrapper).closeTableSink()
+				m.sinkMemQuota.ClearTable(key.(model.TableID))
+				return true
+			})
+			log.Info("Sink manager has closed all table sinks",
+				zap.String("namespace", m.changefeedID.Namespace),
+				zap.String("changefeed", m.changefeedID.ID),
+				zap.Duration("cost", time.Since(start)))
 		}
 
 		if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
@@ -413,22 +427,17 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
 	}()
 }
 
-// generateSinkTasks generates tasks to fetch data from the source manager.
-func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
-	// Task upperbound is limited by barrierTs and schemaResolvedTs.
-	// But receivedSorterResolvedTs can be less than barrierTs, in which case
-	// the table is just scheduled to this node.
-	getUpperBound := func(
-		tableSinkUpperBoundTs model.Ts,
-	) engine.Position {
-		schemaTs := m.schemaStorage.ResolvedTs()
-		if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
-			// schemaTs == math.MaxUint64 means it's in tests.
-			tableSinkUpperBoundTs = schemaTs + 1
-		}
-		return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
+func (m *SinkManager) getUpperBound(tableSinkUpperBoundTs model.Ts) engine.Position {
+	schemaTs := m.schemaStorage.ResolvedTs()
+	if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
+		// schemaTs == math.MaxUint64 means it's in tests.
+		tableSinkUpperBoundTs = schemaTs + 1
 	}
+	return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
+}
 
+// generateSinkTasks generates tasks to fetch data from the source manager.
+func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
 	dispatchTasks := func() error {
 		tables := make([]*tableSinkWrapper, 0, sinkWorkerNum)
 		progs := make([]*progress, 0, sinkWorkerNum)
@@ -476,7 +485,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
 			tableSink := tables[i]
 			slowestTableProgress := progs[i]
 			lowerBound := slowestTableProgress.nextLowerBoundPos
-			upperBound := getUpperBound(tableSink.getUpperBoundTs())
+			upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
 			// The table has no available progress.
 			if lowerBound.Compare(upperBound) >= 0 {
 				m.sinkProgressHeap.push(slowestTableProgress)
@@ -502,7 +511,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
 			t := &sinkTask{
 				tableID:       tableSink.tableID,
 				lowerBound:    lowerBound,
-				getUpperBound: getUpperBound,
+				getUpperBound: m.getUpperBound,
 				tableSink:     tableSink,
 				callback: func(lastWrittenPos engine.Position) {
 					p := &progress{
@@ -566,18 +575,6 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
 }
 
 func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
-	// We use the table's resolved ts as the upper bound to fetch events.
-	getUpperBound := func(tableSinkUpperBoundTs model.Ts) engine.Position {
-		// If a task carries events after schemaResolvedTs, mounter group threads
-		// can be blocked on waiting schemaResolvedTs get advanced.
-		schemaTs := m.schemaStorage.ResolvedTs()
-		if tableSinkUpperBoundTs > schemaTs+1 {
-			tableSinkUpperBoundTs = schemaTs + 1
-		}
-
-		return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
-	}
-
 	dispatchTasks := func() error {
 		tables := make([]*tableSinkWrapper, 0, redoWorkerNum)
 		progs := make([]*progress, 0, redoWorkerNum)
@@ -624,7 +621,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
 			tableSink := tables[i]
 			slowestTableProgress := progs[i]
 			lowerBound := slowestTableProgress.nextLowerBoundPos
-			upperBound := getUpperBound(tableSink.getReceivedSorterResolvedTs())
+			upperBound := m.getUpperBound(tableSink.getReceivedSorterResolvedTs())
 
 			// The table has no available progress.
 			if lowerBound.Compare(upperBound) >= 0 {
@@ -646,7 +643,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
 			t := &redoTask{
 				tableID:       tableSink.tableID,
 				lowerBound:    lowerBound,
-				getUpperBound: getUpperBound,
+				getUpperBound: m.getUpperBound,
 				tableSink:     tableSink,
 				callback: func(lastWrittenPos engine.Position) {
 					p := &progress{
@@ -840,7 +837,7 @@ func (m *SinkManager) AsyncStopTable(tableID model.TableID) bool {
 			zap.String("changefeed", m.changefeedID.ID),
 			zap.Int64("tableID", tableID))
 	}
-	if tableSink.(*tableSinkWrapper).asyncClose() {
+	if tableSink.(*tableSinkWrapper).asyncStop() {
 		cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
 		cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
 		log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
@@ -910,7 +907,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
 	// again or not if it returns false. So we must retry `tableSink.asyncClose` here
 	// if necessary. It's better to remove the dirty logic in the future.
 	tableSink := wrapper.(*tableSinkWrapper)
-	if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() {
+	if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncStop() {
 		cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
 		cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
 		log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
@@ -982,14 +979,6 @@ func (m *SinkManager) Close() {
 		zap.String("changefeed", m.changefeedID.ID))
 	start := time.Now()
 	m.waitSubroutines()
-	m.tableSinks.Range(func(_, value interface{}) bool {
-		sink := value.(*tableSinkWrapper)
-		sink.close()
-		if m.eventCache != nil {
-			m.eventCache.removeTable(sink.tableID)
-		}
-		return true
-	})
 	m.clearSinkFactory()
 
 	log.Info("Closed sink manager",
diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index 0b7064568fc..d8b16523963 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -312,7 +312,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 			// events have been reported. Then we can continue the table
 			// at the checkpoint position.
 			case tablesink.SinkInternalError:
-				task.tableSink.clearTableSink()
+				task.tableSink.closeAndClearTableSink()
 				// After the table sink is cleared all pending events are sent out or dropped.
 				// So we can re-add the table into sinkMemQuota.
 				w.sinkMemQuota.ClearTable(task.tableSink.tableID)
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index 3472d617755..558d83076d4 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -50,7 +50,7 @@ type tableSinkWrapper struct {
 	// tableSink is the underlying sink.
 	tableSink             tablesink.TableSink
 	tableSinkCheckpointTs model.ResolvedTs
-	tableSinkMu           sync.Mutex
+	tableSinkMu           sync.RWMutex
 
 	// state used to control the lifecycle of the table.
 	state *tablepb.TableState
@@ -282,21 +282,15 @@ func (t *tableSinkWrapper) markAsClosed() {
 	}
 }
 
-func (t *tableSinkWrapper) asyncClose() bool {
+func (t *tableSinkWrapper) asyncStop() bool {
 	t.markAsClosing()
-	if t.asyncClearTableSink() {
+	if t.asyncCloseAndClearTableSink() {
 		t.markAsClosed()
 		return true
 	}
 	return false
 }
 
-func (t *tableSinkWrapper) close() {
-	t.markAsClosing()
-	t.clearTableSink()
-	t.markAsClosed()
-}
-
 // Return true means the internal table sink has been initialized.
 func (t *tableSinkWrapper) initTableSink() bool {
 	t.tableSinkMu.Lock()
@@ -308,23 +302,38 @@ func (t *tableSinkWrapper) initTableSink() bool {
 	return true
 }
 
-func (t *tableSinkWrapper) asyncClearTableSink() bool {
-	t.tableSinkMu.Lock()
-	defer t.tableSinkMu.Unlock()
-	if t.tableSink != nil {
-		if !t.tableSink.AsyncClose() {
-			return false
-		}
-		checkpointTs := t.tableSink.GetCheckpointTs()
-		if t.tableSinkCheckpointTs.Less(checkpointTs) {
-			t.tableSinkCheckpointTs = checkpointTs
-		}
-		t.tableSink = nil
+func (t *tableSinkWrapper) asyncCloseTableSink() bool {
+	t.tableSinkMu.RLock()
+	defer t.tableSinkMu.RUnlock()
+	if t.tableSink == nil {
+		return true
 	}
-	return true
+	return t.tableSink.AsyncClose()
+}
+
+func (t *tableSinkWrapper) closeTableSink() {
+	t.tableSinkMu.RLock()
+	defer t.tableSinkMu.RUnlock()
+	if t.tableSink == nil {
+		return
+	}
+	t.tableSink.Close()
+}
+
+func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
+	closed := t.asyncCloseTableSink()
+	if closed {
+		t.doTableSinkClear()
+	}
+	return closed
+}
+
+func (t *tableSinkWrapper) closeAndClearTableSink() {
+	t.closeTableSink()
+	t.doTableSinkClear()
 }
 
-func (t *tableSinkWrapper) clearTableSink() {
+func (t *tableSinkWrapper) doTableSinkClear() {
 	t.tableSinkMu.Lock()
 	defer t.tableSinkMu.Unlock()
 	if t.tableSink != nil {
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go
index 209801244b5..18e8302b103 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -65,6 +65,22 @@ func (m *mockSink) Dead() <-chan struct{} {
 	return make(chan struct{})
 }
 
+type mockDelayedTableSink struct {
+	tablesink.TableSink
+
+	closeCnt    int
+	closeTarget int
+}
+
+func (t *mockDelayedTableSink) AsyncClose() bool {
+	t.closeCnt++
+	if t.closeCnt >= t.closeTarget {
+		t.TableSink.Close()
+		return true
+	}
+	return false
+}
+
 //nolint:unparam
 func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.TableID) (*tableSinkWrapper, *mockSink) {
 	tableState := tablepb.TableStatePreparing
@@ -84,13 +100,26 @@ func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.Table
 	return wrapper, sink
 }
 
-func TestTableSinkWrapperClose(t *testing.T) {
+func TestTableSinkWrapperStop(t *testing.T) {
 	t.Parallel()
 
 	wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
+	wrapper.tableSink = &mockDelayedTableSink{
+		TableSink:   wrapper.tableSink,
+		closeCnt:    0,
+		closeTarget: 10,
+	}
 	require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
-	wrapper.close()
+
+	closeCnt := 0
+	for {
+		closeCnt++
+		if wrapper.asyncStop() {
+			break
+		}
+	}
 	require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped")
+	require.Equal(t, 10, closeCnt, "table sink should be closed 10 times")
 }
 
 func TestUpdateReceivedSorterResolvedTs(t *testing.T) {
diff --git a/cdc/sinkv2/tablesink/table_sink_impl.go b/cdc/sinkv2/tablesink/table_sink_impl.go
index affe7af76a4..a096725659e 100644
--- a/cdc/sinkv2/tablesink/table_sink_impl.go
+++ b/cdc/sinkv2/tablesink/table_sink_impl.go
@@ -130,7 +130,9 @@ func (e *EventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) error
 // GetCheckpointTs returns the checkpoint ts of the table sink.
 func (e *EventTableSink[E]) GetCheckpointTs() model.ResolvedTs {
 	if e.state.Load() == state.TableSinkStopping {
-		e.progressTracker.checkClosed(e.backendSink.Dead())
+		if e.progressTracker.checkClosed(e.backendSink.Dead()) {
+			e.markAsClosed()
+		}
 	}
 	return e.progressTracker.advance()
 }
diff --git a/cdc/sinkv2/tablesink/table_sink_impl_test.go b/cdc/sinkv2/tablesink/table_sink_impl_test.go
index 8b0c2e1200a..d42c282946c 100644
--- a/cdc/sinkv2/tablesink/table_sink_impl_test.go
+++ b/cdc/sinkv2/tablesink/table_sink_impl_test.go
@@ -366,26 +366,14 @@ func TestCheckpointTsFrozenWhenStopping(t *testing.T) {
 	require.Nil(t, err)
 	require.Len(t, sink.events, 7, "all events should be flushed")
 
-	go func() {
-		time.Sleep(time.Millisecond * 10)
-		sink.Close()
-	}()
+	// Table sink close should return even if callbacks are not called,
+	// because the backend sink is closed.
+	sink.Close()
+	tb.Close()
 
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		tb.Close()
-	}()
-	require.Eventually(t, func() bool {
-		return state.TableSinkStopping == tb.state.Load()
-	}, time.Second, time.Microsecond, "table should be stopping")
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		currentTs := tb.GetCheckpointTs()
-		sink.acknowledge(105)
-		require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
-	}()
-	wg.Wait()
+	require.Equal(t, state.TableSinkStopped, tb.state.Load())
+
+	currentTs := tb.GetCheckpointTs()
+	sink.acknowledge(105)
+	require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
 }
diff --git a/errors.toml b/errors.toml
index 89125e14737..bb647eabc15 100755
--- a/errors.toml
+++ b/errors.toml
@@ -396,6 +396,11 @@ error = '''
 event is larger than the total memory quota, size: %d, quota: %d
 '''
 
+["CDC:ErrGCTTLExceeded"]
+error = '''
+the checkpoint-ts(%d) lag of the changefeed(%s) has exceeded the GC TTL and the changefeed is blocking global GC progression
+'''
+
 ["CDC:ErrGRPCDialFailed"]
 error = '''
 grpc dial failed
diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go
index 09cab2eae4b..00313654619 100644
--- a/pkg/errors/cdc_errors.go
+++ b/pkg/errors/cdc_errors.go
@@ -756,6 +756,11 @@ var (
 		" caused by GC. checkpoint-ts %d is earlier than or equal to GC safepoint at %d",
 		errors.RFCCodeText("CDC:ErrSnapshotLostByGC"),
 	)
+	ErrGCTTLExceeded = errors.Normalize(
+		"the checkpoint-ts(%d) lag of the changefeed(%s) has exceeded "+
+			"the GC TTL and the changefeed is blocking global GC progression",
+		errors.RFCCodeText("CDC:ErrGCTTLExceeded"),
+	)
 	ErrNotOwner = errors.Normalize(
 		"this capture is not a owner",
 		errors.RFCCodeText("CDC:ErrNotOwner"),
diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go
index 2ddb072a48d..bd2cac03ec5 100644
--- a/pkg/errors/helper.go
+++ b/pkg/errors/helper.go
@@ -39,7 +39,7 @@ func WrapError(rfcError *errors.Error, err error, args ...interface{}) error {
 // wants to replicate has been or will be GC. So it makes no sense to try to
 // resume the changefeed, and the changefeed should immediately be failed.
 var changeFeedFastFailError = []*errors.Error{
-	ErrSnapshotLostByGC, ErrStartTsBeforeGC,
+	ErrGCTTLExceeded, ErrSnapshotLostByGC, ErrStartTsBeforeGC,
 }
 
 // IsChangefeedFastFailError checks if an error is a ChangefeedFastFailError
diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go
index f71f425ac1c..0a1e9e67115 100644
--- a/pkg/orchestrator/etcd_worker.go
+++ b/pkg/orchestrator/etcd_worker.go
@@ -232,7 +232,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 			if err != nil {
 				// This error means owner is resigned by itself,
 				// and we should exit etcd worker and campaign owner again.
-				return nil
+				return err
 			}
 		}
 
@@ -512,6 +512,7 @@ func (worker *EtcdWorker) applyUpdates() error {
 			return errors.Trace(err)
 		}
 	}
+	worker.state.UpdatePendingChange()
 
 	worker.pendingUpdates = worker.pendingUpdates[:0]
 	return nil
diff --git a/pkg/orchestrator/etcd_worker_bank_test.go b/pkg/orchestrator/etcd_worker_bank_test.go
index 6448521974d..81438e7f02d 100644
--- a/pkg/orchestrator/etcd_worker_bank_test.go
+++ b/pkg/orchestrator/etcd_worker_bank_test.go
@@ -43,6 +43,9 @@ type bankReactorState struct {
 
 const bankTestPrefix = "/ticdc/test/bank/"
 
+func (b *bankReactorState) UpdatePendingChange() {
+}
+
 func (b *bankReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	require.True(b.t, strings.HasPrefix(key.String(), bankTestPrefix))
 	indexStr := key.String()[len(bankTestPrefix):]
diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go
index e4038c487f7..95c49907b00 100644
--- a/pkg/orchestrator/etcd_worker_test.go
+++ b/pkg/orchestrator/etcd_worker_test.go
@@ -147,6 +147,9 @@ func (s *simpleReactorState) SetSum(sum int) {
 	s.patches = append(s.patches, patch)
 }
 
+func (s *simpleReactorState) UpdatePendingChange() {
+}
+
 func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	subMatches := keyParseRegexp.FindSubmatch(key.Bytes())
 	if len(subMatches) != 2 {
@@ -283,6 +286,9 @@ type intReactorState struct {
 	lastVal int
 }
 
+func (s *intReactorState) UpdatePendingChange() {
+}
+
 func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	var err error
 	s.val, err = strconv.Atoi(string(value))
@@ -372,6 +378,9 @@ type commonReactorState struct {
 	pendingPatches []DataPatch
 }
 
+func (s *commonReactorState) UpdatePendingChange() {
+}
+
 func (s *commonReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	s.state[key.String()] = string(value)
 	return nil
diff --git a/pkg/orchestrator/interfaces.go b/pkg/orchestrator/interfaces.go
index 20452cd6f26..7bb6d7cf3e0 100644
--- a/pkg/orchestrator/interfaces.go
+++ b/pkg/orchestrator/interfaces.go
@@ -35,6 +35,9 @@ type ReactorState interface {
 	// Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state.
 	Update(key util.EtcdKey, value []byte, isInit bool) error
 
+	// UpdatePendingChange is called by EtcdWorker to notify the Reactor to apply the pending changes.
+	UpdatePendingChange()
+
 	// GetPatches is called by EtcdWorker, and should return many slices of data patches that represents the changes
 	// that a Reactor wants to apply to Etcd.
 	// a slice of DataPatch will be committed as one ETCD txn
diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go
index 18fb88c353c..17300c59ecb 100644
--- a/pkg/orchestrator/reactor_state.go
+++ b/pkg/orchestrator/reactor_state.go
@@ -16,6 +16,7 @@ package orchestrator
 import (
 	"encoding/json"
 	"reflect"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
@@ -26,6 +27,8 @@ import (
 	"go.uber.org/zap"
 )
 
+const defaultCaptureRemoveTTL = 5
+
 // GlobalReactorState represents a global state which stores all key-value pairs in ETCD
 type GlobalReactorState struct {
 	ClusterID string
@@ -39,16 +42,44 @@ type GlobalReactorState struct {
 	// to be called when captures are added and removed.
 	onCaptureAdded   func(captureID model.CaptureID, addr string)
 	onCaptureRemoved func(captureID model.CaptureID)
+
+	captureRemoveTTL int
+	toRemoveCaptures map[model.CaptureID]time.Time
 }
 
-// NewGlobalState creates a new global state
-func NewGlobalState(clusterID string) *GlobalReactorState {
+// NewGlobalState creates a new global state.
+func NewGlobalState(clusterID string, captureSessionTTL int) *GlobalReactorState {
+	captureRemoveTTL := captureSessionTTL / 2
+	if captureRemoveTTL < defaultCaptureRemoveTTL {
+		captureRemoveTTL = defaultCaptureRemoveTTL
+	}
 	return &GlobalReactorState{
-		ClusterID:   clusterID,
-		Owner:       map[string]struct{}{},
-		Captures:    make(map[model.CaptureID]*model.CaptureInfo),
-		Upstreams:   make(map[model.UpstreamID]*model.UpstreamInfo),
-		Changefeeds: make(map[model.ChangeFeedID]*ChangefeedReactorState),
+		ClusterID:        clusterID,
+		Owner:            map[string]struct{}{},
+		Captures:         make(map[model.CaptureID]*model.CaptureInfo),
+		Upstreams:        make(map[model.UpstreamID]*model.UpstreamInfo),
+		Changefeeds:      make(map[model.ChangeFeedID]*ChangefeedReactorState),
+		captureRemoveTTL: captureRemoveTTL,
+		toRemoveCaptures: make(map[model.CaptureID]time.Time),
+	}
+}
+
+// NewGlobalStateForTest creates a new global state for test.
+func NewGlobalStateForTest(clusterID string) *GlobalReactorState {
+	return NewGlobalState(clusterID, 0)
+}
+
+// UpdatePendingChange implements the ReactorState interface
+func (s *GlobalReactorState) UpdatePendingChange() {
+	for c, t := range s.toRemoveCaptures {
+		if time.Since(t) >= time.Duration(s.captureRemoveTTL)*time.Second {
+			log.Info("remote capture offline", zap.Any("info", s.Captures[c]))
+			delete(s.Captures, c)
+			if s.onCaptureRemoved != nil {
+				s.onCaptureRemoved(c)
+			}
+			delete(s.toRemoveCaptures, c)
+		}
 	}
 }
@@ -59,6 +90,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
 	if err != nil {
 		return errors.Trace(err)
 	}
+
 	switch k.Tp {
 	case etcd.CDCKeyTypeOwner:
 		if value != nil {
@@ -69,11 +101,8 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
 		return nil
 	case etcd.CDCKeyTypeCapture:
 		if value == nil {
-			log.Info("remote capture offline", zap.Any("info", s.Captures[k.CaptureID]))
-			delete(s.Captures, k.CaptureID)
-			if s.onCaptureRemoved != nil {
-				s.onCaptureRemoved(k.CaptureID)
-			}
+			log.Info("remote capture offline detected", zap.Any("info", s.Captures[k.CaptureID]))
+			s.toRemoveCaptures[k.CaptureID] = time.Now()
 			return nil
 		}
 
@@ -174,6 +203,10 @@ func NewChangefeedReactorState(clusterID string,
 	}
 }
 
+// UpdatePendingChange implements the ReactorState interface
+func (s *ChangefeedReactorState) UpdatePendingChange() {
+}
+
 // Update implements the ReactorState interface
 func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
 	k := new(etcd.CDCKey)
diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go
index 0eddd6e023e..da217d9066a 100644
--- a/pkg/orchestrator/reactor_state_test.go
+++ b/pkg/orchestrator/reactor_state_test.go
@@ -427,10 +427,13 @@ func TestPatchTaskPosition(t *testing.T) {
 }
 
 func TestGlobalStateUpdate(t *testing.T) {
+	t.Parallel()
+
 	testCases := []struct {
 		updateKey   []string
 		updateValue []string
 		expected    GlobalReactorState
+		timeout     int
 	}{
 		{ // common case
 			updateKey: []string{
@@ -512,13 +515,14 @@ func TestGlobalStateUpdate(t *testing.T) {
 				`55551111`,
 				`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`,
 				`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,
-"admin-job-type":0}`,
+ "admin-job-type":0}`,
 				`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,
-"admin-job-type":0}`,
+ "admin-job-type":0}`,
 				``,
 				``,
 				``,
 			},
+			timeout: 6,
 			expected: GlobalReactorState{
 				ClusterID: etcd.DefaultCDCClusterID,
 				Owner:     map[string]struct{}{"22317526c4fc9a38": {}},
@@ -540,7 +544,7 @@ func TestGlobalStateUpdate(t *testing.T) {
 		},
 	}
 	for _, tc := range testCases {
-		state := NewGlobalState(etcd.DefaultCDCClusterID)
+		state := NewGlobalState(etcd.DefaultCDCClusterID, 10)
 		for i, k := range tc.updateKey {
 			value := []byte(tc.updateValue[i])
 			if len(value) == 0 {
@@ -549,13 +553,17 @@ func TestGlobalStateUpdate(t *testing.T) {
 			err := state.Update(util.NewEtcdKey(k), value, false)
 			require.Nil(t, err)
 		}
+		time.Sleep(time.Duration(tc.timeout) * time.Second)
+		state.UpdatePendingChange()
 		require.True(t, cmp.Equal(state, &tc.expected, cmpopts.IgnoreUnexported(GlobalReactorState{}, ChangefeedReactorState{})),
 			cmp.Diff(state, &tc.expected, cmpopts.IgnoreUnexported(GlobalReactorState{}, ChangefeedReactorState{})))
 	}
 }
 
 func TestCaptureChangeHooks(t *testing.T) {
-	state := NewGlobalState(etcd.DefaultCDCClusterID)
+	t.Parallel()
+
+	state := NewGlobalState(etcd.DefaultCDCClusterID, 10)
 
 	var callCount int
 	state.onCaptureAdded = func(captureID model.CaptureID, addr string) {
@@ -579,13 +587,18 @@ func TestCaptureChangeHooks(t *testing.T) {
 		etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"),
 		captureInfoBytes, false)
 	require.Nil(t, err)
-	require.Equal(t, callCount, 1)
+	require.Eventually(t, func() bool {
+		return callCount == 1
+	}, time.Second*3, 10*time.Millisecond)
 
 	err = state.Update(util.NewEtcdKey(
 		etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"),
 		nil /* delete */, false)
 	require.Nil(t, err)
-	require.Equal(t, callCount, 2)
+	require.Eventually(t, func() bool {
+		state.UpdatePendingChange()
+		return callCount == 2
+	}, time.Second*10, 10*time.Millisecond)
 }
 
 func TestCheckChangefeedNormal(t *testing.T) {
diff --git a/pkg/p2p/server.go b/pkg/p2p/server.go
index 7bcf2c87a77..207f9bf9a97 100755
--- a/pkg/p2p/server.go
+++ b/pkg/p2p/server.go
@@ -378,6 +378,7 @@ func (m *MessageServer) deregisterPeerByID(ctx context.Context, peerID string) {
 	m.peerLock.Unlock()
 	if !ok {
 		log.Warn("peer not found", zap.String("peerID", peerID))
+		return
 	}
 	m.deregisterPeer(ctx, peer, nil)
 }
diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go
index 349e32b20b0..639aba6e2be 100644
--- a/pkg/txnutil/gc/gc_manager.go
+++ b/pkg/txnutil/gc/gc_manager.go
@@ -28,11 +28,6 @@ import (
 	"go.uber.org/zap"
 )
 
-// gcTTL is the duration during which data related to a
-// failed feed will be retained, and beyond which point the data will be deleted
-// by garbage collection.
-const gcTTL = 24 * time.Hour
-
 // gcSafepointUpdateInterval is the minimum interval that CDC can update gc safepoint
 var gcSafepointUpdateInterval = 1 * time.Minute
 
@@ -57,6 +52,7 @@ type gcManager struct {
 	lastUpdatedTime   time.Time
 	lastSucceededTime time.Time
 	lastSafePointTs   uint64
+	isTiCDCBlockGC    bool
 }
 
 // NewManager creates a new Manager.
@@ -103,6 +99,10 @@ func (m *gcManager) TryUpdateGCSafePoint(
 		log.Warn("update gc safe point failed, the gc safe point is larger than checkpointTs",
 			zap.Uint64("actual", actual), zap.Uint64("checkpointTs", checkpointTs))
 	}
+	// if the min checkpoint ts is equal to the current gc safe point, it
+	// means that the service gc safe point set by TiCDC is the min service
+	// gc safe point
+	m.isTiCDCBlockGC = actual == checkpointTs
 	m.lastSafePointTs = actual
 	m.lastSucceededTime = time.Now()
 	return nil
@@ -112,13 +112,30 @@ func (m *gcManager) CheckStaleCheckpointTs(
 	ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
 ) error {
 	gcSafepointUpperBound := checkpointTs - 1
-	// if there is another service gc point less than the min checkpoint ts.
-	if gcSafepointUpperBound < m.lastSafePointTs {
-		return cerror.ErrSnapshotLostByGC.
-			GenWithStackByArgs(
-				checkpointTs,
-				m.lastSafePointTs,
-			)
+	if m.isTiCDCBlockGC {
+		pdTime, err := m.pdClock.CurrentTime()
+		if err != nil {
+			return err
+		}
+		if pdTime.Sub(
+			oracle.GetTimeFromTS(gcSafepointUpperBound),
+		) > time.Duration(m.gcTTL)*time.Second {
+			return cerror.ErrGCTTLExceeded.
+				GenWithStackByArgs(
+					checkpointTs,
+					changefeedID,
+				)
+		}
+	} else {
+		// if `isTiCDCBlockGC` is false, it means there is another service gc
+		// point less than the min checkpoint ts.
+		if gcSafepointUpperBound < m.lastSafePointTs {
+			return cerror.ErrSnapshotLostByGC.
+				GenWithStackByArgs(
+					checkpointTs,
+					m.lastSafePointTs,
+				)
+		}
 	}
 	return nil
 }
@@ -139,5 +156,5 @@ func (m *gcManager) IgnoreFailedChangeFeed(
 	gcSafepointUpperBound := checkpointTs - 1
 	return pdTime.Sub(
 		oracle.GetTimeFromTS(gcSafepointUpperBound),
-	) > gcTTL
+	) > time.Duration(m.gcTTL)*time.Second
 }
diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go
index fddd5a094cb..990c936faa5 100644
--- a/pkg/txnutil/gc/gc_manager_test.go
+++ b/pkg/txnutil/gc/gc_manager_test.go
@@ -112,6 +112,7 @@ func TestIgnoreFailedFeed(t *testing.T) {
 	pdClock := pdutil.NewClock4Test()
 	gcManager := NewManager(etcd.GcServiceIDForTest(),
 		mockPDClient, pdClock).(*gcManager)
+	gcManager.gcTTL = 24 * 60 * 60
 
 	// 5 hours ago
 	ts1 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 5))