From 22cebb65b1cfc221e52425c9072d88de8e92fbe1 Mon Sep 17 00:00:00 2001 From: frairon Date: Wed, 18 Sep 2024 12:52:03 +0200 Subject: [PATCH] add recover-forever option --- options.go | 10 +++ partition_processor.go | 22 +++++- processor.go | 26 +++++-- systemtest/processor_test.go | 143 +++++++++++++++++++++++++++++++++-- 4 files changed, 188 insertions(+), 13 deletions(-) diff --git a/options.go b/options.go index 1e0771cf..f94a7c6d 100644 --- a/options.go +++ b/options.go @@ -147,6 +147,7 @@ type poptions struct { backoffResetTime time.Duration hotStandby bool recoverAhead bool + recoverForever bool producerDefaultHeaders Headers builders struct { @@ -274,6 +275,15 @@ func WithRecoverAhead() ProcessorOption { } } +// WithRecoverForever configures the processor to recover joins and the processor table forever, without ever joining a group. +// Using this option is highly experimental and is typically used for cluster-migration-scenarios where mirror-maker cannot synchronize +// the consumer-group-offsets. +func WithRecoverForever() ProcessorOption { + return func(o *poptions, gg *GroupGraph) { + o.recoverForever = true + } +} + // WithGroupGraphHook allows a function to obtain the group graph when a processor is started. func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption { return func(o *poptions, gg *GroupGraph) { diff --git a/partition_processor.go b/partition_processor.go index c9b19afe..765d3ec5 100644 --- a/partition_processor.go +++ b/partition_processor.go @@ -36,6 +36,8 @@ const ( runModePassive // the processor only recovers once and then stops. This is used for recover-ahead-option runModeRecoverOnly + // the processor only recovers forever, never stops. 
This is used for the RecoverForever option to keep the processor busy + runModeRecoverForever ) type visit struct { @@ -211,7 +213,15 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error { setupErrg.Go(func() error { pp.log.Debugf("catching up table") defer pp.log.Debugf("catching up table done") - return pp.table.SetupAndRecover(setupCtx, false) + + err := pp.table.SetupAndRecover(setupCtx, false) + if err != nil { + return err + } + if pp.runMode == runModeRecoverForever { + return pp.table.CatchupForever(setupCtx, false) + } + return nil }) } @@ -229,7 +239,15 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error { pp.joins[join.Topic()] = table setupErrg.Go(func() error { - return table.SetupAndRecover(setupCtx, false) + err := table.SetupAndRecover(setupCtx, false) + if err != nil { + return err + } + + if pp.runMode == runModeRecoverForever { + return table.CatchupForever(setupCtx, false) + } + return nil }) } diff --git a/processor.go b/processor.go index 32e4e24c..88b1722f 100644 --- a/processor.go +++ b/processor.go @@ -323,6 +323,11 @@ func (g *Processor) Run(ctx context.Context) (rerr error) { return fmt.Errorf("error waiting for start up tables: %w", err) } + if ctx.Err() != nil { + g.log.Printf("Shutting down processor before it starts, context was cancelled") + return nil + } + // run the main rebalance-consume-loop errg.Go(func() error { return g.rebalanceLoop(ctx) @@ -466,17 +471,22 @@ func (g *Processor) waitForStartupTables(ctx context.Context) error { } g.mTables.RUnlock() - // If we recover ahead, we'll also start all partition processors once in recover-only-mode + // If we recover ahead (or forever), we'll also start all partition processors once in recover-only-mode (or recover-forever-mode) // and do the same boilerplate to keep the waitmap up to date. 
- if g.opts.recoverAhead { + if g.opts.recoverAhead || g.opts.recoverForever { + + mode := runModeRecoverOnly + if g.opts.recoverForever { + mode = runModeRecoverForever + } partitions, err := g.findStatefulPartitions() if err != nil { return fmt.Errorf("error finding dependent partitions: %w", err) } for _, part := range partitions { part := part - pproc, err := g.createPartitionProcessor(ctx, part, runModeRecoverOnly, func(msg *message, meta string) { - panic("a partition processor in recover-only-mode never commits a message") + pproc, err := g.createPartitionProcessor(ctx, part, mode, func(msg *message, meta string) { + panic("a partition processor in recover-only-mode/recover-forever-mode never commits a message") }) if err != nil { return fmt.Errorf("Error creating partition processor for recover-ahead %s/%d: %v", g.Graph().Group(), part, err) @@ -517,7 +527,13 @@ func (g *Processor) waitForStartupTables(ctx context.Context) error { select { // the context has closed, no point in waiting case <-ctx.Done(): - g.log.Debugf("Stopping to wait for views to get up, context closed") + g.log.Debugf("Stopping to wait for tables to get up, context closed") + + // if we're in recover-forever-mode, this is the normal way of shutdown, so no error here. 
+ if g.opts.recoverForever { + return nil + } + return fmt.Errorf("context closed while waiting for startup tables to become ready") // the error group is done, which means diff --git a/systemtest/processor_test.go b/systemtest/processor_test.go index c6abd702..04a222d4 100644 --- a/systemtest/processor_test.go +++ b/systemtest/processor_test.go @@ -6,6 +6,7 @@ import ( "log" "os" "strings" + "sync/atomic" "testing" "time" @@ -241,9 +242,7 @@ func TestRecoverAhead(t *testing.T) { return proc2.Run(ctx) }) - pollTimed(t, "procs 1&2 recovered", func() bool { - return true - }, proc1.Recovered, proc2.Recovered) + pollTimed(t, "procs 1&2 recovered", proc1.Recovered, proc2.Recovered) // check the storages that were initalized by the processors: // both have each 2 storages, because all tables only have 1 partition @@ -259,9 +258,6 @@ func TestRecoverAhead(t *testing.T) { // wait until the keys are present pollTimed(t, "key-values are present", - func() bool { - return true - }, func() bool { has, _ := tableStorage1.Has("key1") return has @@ -297,6 +293,141 @@ func TestRecoverAhead(t *testing.T) { require.NoError(t, errg.Wait().ErrorOrNil()) } +func TestRecoverForever(t *testing.T) { + brokers := initSystemTest(t) + var ( + group = goka.Group(fmt.Sprintf("goka-systemtest-recoverforever-%d", time.Now().Unix())) + inputStream = fmt.Sprintf("%s-input", group) + table = string(goka.GroupTable(group)) + ) + + tmc := goka.NewTopicManagerConfig() + tmc.Table.Replication = 1 + tmc.Stream.Replication = 1 + cfg := goka.DefaultConfig() + cfg.Consumer.Offsets.Initial = sarama.OffsetOldest + goka.ReplaceGlobalConfig(cfg) + + tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers) + require.NoError(t, err) + + err = tm.EnsureStreamExists(inputStream, 1) + require.NoError(t, err) + err = tm.EnsureTableExists(string(table), 1) + require.NoError(t, err) + + // emit something into the state table (like simulating a processor ctx.SetValue()). 
+ // Our test processor should recover this value into its table storage + tableEmitter, err := goka.NewEmitter(brokers, goka.Stream(table), new(codec.String)) + require.NoError(t, err) + require.NoError(t, tableEmitter.EmitSync("key1", "tableval1")) + require.NoError(t, tableEmitter.Finish()) + + // emit an input-message + inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String)) + require.NoError(t, err) + require.NoError(t, inputEmitter.EmitSync("key1", "input-value")) + require.NoError(t, inputEmitter.Finish()) + + storageTracker := newStorageTracker() + + var ( + processed atomic.Int64 + itemsRecovered atomic.Int64 + ) + + createProc := func(recoverForever bool) *goka.Processor { + opts := []goka.ProcessorOption{ + + goka.WithUpdateCallback(func(ctx goka.UpdateContext, s storage.Storage, key string, value []byte) error { + itemsRecovered.Add(1) + return goka.DefaultUpdate(ctx, s, key, value) + }), + goka.WithStorageBuilder(storageTracker.Build), + } + if recoverForever { + opts = append(opts, goka.WithRecoverForever()) + } + proc, err := goka.NewProcessor(brokers, + goka.DefineGroup( + group, + goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { + processed.Add(1) + }), + goka.Persist(new(codec.String)), + ), + opts..., + ) + require.NoError(t, err) + return proc + } + + proc1 := createProc(true) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errg, ctx := multierr.NewErrGroup(ctx) + + errg.Go(func() error { + return proc1.Run(ctx) + }) + + pollTimed(t, "procs 1 storage initialized", func() bool { + return len(storageTracker.storages) == 1 + }) + + // get the storage of the group table's partition 0 + tableStorage1 := storageTracker.storages[storageTracker.key(string(table), 0)] + + // wait until the keys are present + pollTimed(t, "key-values are present", + func() bool { + has, _ := tableStorage1.Has("key1") + return has + }, + ) + + // 
check the table-values + val1, _ := tableStorage1.Get("key1") + require.Equal(t, "tableval1", string(val1)) + + // stop everything and wait until it's shut down + cancel() + require.NoError(t, errg.Wait().ErrorOrNil()) + + require.EqualValues(t, 0, processed.Load()) + require.EqualValues(t, 1, itemsRecovered.Load()) + + // run processor a second time, without recover forever + itemsRecovered.Store(0) + proc2 := createProc(false) + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + errg, ctx = multierr.NewErrGroup(ctx) + errg.Go(func() error { + return proc2.Run(ctx) + }) + + pollTimed(t, "procs 2 storage initialized", func() bool { + return len(storageTracker.storages) == 1 + }) + + pollTimed(t, "procs recovered", proc2.Recovered) + + // wait until the input is actually processed + pollTimed(t, "input processed", func() bool { + return processed.Load() == 1 + }) + + // at this point we know the processor started, recovered, and consumed the one message in the input stream. + // Now make sure the processor did not recover again (because it did already in the first run) + require.EqualValues(t, 0, itemsRecovered.Load()) + + // stop everything and wait until it's shut down + cancel() + require.NoError(t, errg.Wait().ErrorOrNil()) +} + // TestRebalance runs some processors to test rebalance. It's merely a // runs-without-errors test, not a real functional test. func TestRebalance(t *testing.T) {