Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal: employ Core sequencer for pglogical & mylogical #1060

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 85 additions & 12 deletions internal/conveyor/conveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,54 @@
tableAcceptor *apply.Acceptor // Writes batches of mutations into target tables.
watchers types.Watchers // Target schema access.

batchReader types.BatchReader

mu struct {
sync.RWMutex
targets ident.SchemaMap[*Conveyor] // Conveyor cache
}
}

func (c *Conveyors) GetCheckPointer(schema ident.Schema) (*CheckPointer, error) {

Check failure on line 62 in internal/conveyor/conveyor.go

View workflow job for this annotation

GitHub Actions / go-tests / Code Quality

exported method Conveyors.GetCheckPointer should have comment or be unexported
w, err := c.watchers.Get(schema)
if err != nil {
return nil, err
}

var tables []ident.Table
for tbl := range w.Get().Columns.Keys() {
tables = append(tables, tbl)
}

tableGroup := &types.TableGroup{
Enclosing: schema,
Name: ident.New(schema.Raw()),
Tables: tables,
}

ret := &CheckPointer{
watcher: w,
}

var opts []checkpoint.Option
if c.cfg.DisableCheckpointStream {
opts = append(opts, checkpoint.DisableStream())
}
if l := c.cfg.LimitLookahead; l > 0 {
opts = append(opts, checkpoint.LimitLookahead(l))
}
if s := c.cfg.SkipBackwardsDataCheck; s {
opts = append(opts, checkpoint.SkipBackwardsDataCheck())
}

ret.checkpoint, err = c.checkpoints.Start(c.stopper, tableGroup, &ret.resolvingRange, opts...)
if err != nil {
return nil, err
}

return ret, nil
}

// Get returns a conveyor for a specific schema.
func (c *Conveyors) Get(schema ident.Schema) (*Conveyor, error) {
c.mu.RLock()
Expand Down Expand Up @@ -120,25 +162,28 @@
return nil, err
}

ret.acceptor, ret.stat, err = seq.Start(
labels := []string{c.kind, schema.Raw()}

_, ret.stat, err = seq.Start(
c.stopper,
&sequencer.StartOptions{
Bounds: &ret.resolvingRange,
Delegate: types.OrderedAcceptorFrom(c.tableAcceptor, c.watchers),
Group: tableGroup,
BatchReader: c.batchReader,
Bounds: &ret.resolvingRange,
Delegate: types.OrderedAcceptorFrom(c.tableAcceptor, c.watchers),
Group: tableGroup,
// Add top-of-funnel reporting.
WrapperFunc: func(acc types.MultiAcceptor) {
acc = types.CountingAcceptor(acc,

Check failure on line 176 in internal/conveyor/conveyor.go

View workflow job for this annotation

GitHub Actions / go-tests / Code Quality

this value of acc is never used (SA4006)

Check warning

Code scanning / CodeQL

Useless assignment to local variable Warning

This definition of acc is never used.
mutationsErrorCount.WithLabelValues(labels...),
mutationsReceivedCount.WithLabelValues(labels...),
mutationsSuccessCount.WithLabelValues(labels...),
)
},
})
if err != nil {
return nil, err
}

// Add top-of-funnel reporting.
labels := []string{c.kind, schema.Raw()}
ret.acceptor = types.CountingAcceptor(ret.acceptor,
mutationsErrorCount.WithLabelValues(labels...),
mutationsReceivedCount.WithLabelValues(labels...),
mutationsSuccessCount.WithLabelValues(labels...),
)

// Advance the stored resolved timestamps.
ret.updateResolved(c.stopper)

Expand All @@ -159,6 +204,13 @@
return res
}

// WithBatchReader returns a new Conveyors factory for the given batch reader.
func (c *Conveyors) WithBatchReader(br types.BatchReader) *Conveyors {
res := c.clone()
res.batchReader = br
return res
}

// Bootstrap existing schemas for recovery cases.
func (c *Conveyors) Bootstrap() error {
schemas, err := c.checkpoints.ScanForTargetSchemas(c.stopper)
Expand Down Expand Up @@ -189,6 +241,27 @@
}
}

type CheckPointer struct {

Check failure on line 244 in internal/conveyor/conveyor.go

View workflow job for this annotation

GitHub Actions / go-tests / Code Quality

exported type CheckPointer should have comment or be unexported
checkpoint *checkpoint.Group
watcher types.Watcher
resolvingRange notify.Var[hlc.Range]
}

// Advance the checkpoint for all the named partitions.
func (c *CheckPointer) Advance(ctx context.Context, partition ident.Ident, ts hlc.Time) error {
return c.checkpoint.Advance(ctx, partition, ts)
}

// Ensure that a checkpoint exists for all named partitions.
func (c *CheckPointer) Ensure(ctx context.Context, partitions []ident.Ident) error {
return c.checkpoint.Ensure(ctx, partitions)
}

// Watcher is used for testing to gain access to the underlying schema.
func (c *CheckPointer) Watcher() types.Watcher {
return c.watcher
}

// A Conveyor delivers mutations to a target, possibly asynchronously.
// It provides an abstraction over various delivery strategies and it
// manages checkpoints across multiple partitions for a table group.
Expand Down
6 changes: 6 additions & 0 deletions internal/sequencer/decorators/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
var Set = wire.NewSet(
ProvideMarker,
ProvideOnce,
ProvideRekey,
ProvideRetryTarget,
)

Expand All @@ -44,6 +45,11 @@ func ProvideOnce(pool *types.StagingPool, stagers types.Stagers) *Once {
}
}

// ProvideRekey is called by Wire.
func ProvideRekey(watchers types.Watchers) *Rekey {
return &Rekey{watchers: watchers}
}

// ProvideRetryTarget is called by Wire.
func ProvideRetryTarget(target *types.TargetPool) *RetryTarget {
return &RetryTarget{
Expand Down
224 changes: 224 additions & 0 deletions internal/sequencer/decorators/rekey.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright 2024 The Cockroach Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package decorators

import (
"bytes"
"context"
"encoding/json"

"github.com/cockroachdb/field-eng-powertools/notify"
"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/sequencer"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/merge"
"github.com/pkg/errors"
)

// Rekey reencodes the [types.Mutation.Key] field as mutations are
// processed, based on the destination table. This is primarily to
// support the pglogical REPLICA IDENTITIY FULL option, but it could be
// used for any other case where the replication key does not actually
// preserve replication identity.
type Rekey struct {
watchers types.Watchers
}

var _ sequencer.Shim = (*Rekey)(nil)

// MultiAcceptor returns a rekeying facade around the acceptor.
func (r *Rekey) MultiAcceptor(acceptor types.MultiAcceptor) types.MultiAcceptor {
return &rekeyAcceptor{
base: base{
multiAcceptor: acceptor,
tableAcceptor: acceptor,
temporalAcceptor: acceptor,
},
Rekey: r,
}
}

// TableAcceptor returns a rekeying facade around the delegate.
func (r *Rekey) TableAcceptor(acceptor types.TableAcceptor) types.TableAcceptor {
return &rekeyAcceptor{
base: base{
tableAcceptor: acceptor,
},
Rekey: r,
}
}

// TemporalAcceptor returns a marking facade around the delegate.
func (r *Rekey) TemporalAcceptor(acceptor types.TemporalAcceptor) types.TemporalAcceptor {
return &rekeyAcceptor{
base: base{
tableAcceptor: acceptor,
temporalAcceptor: acceptor,
},
Rekey: r,
}
}

// Wrap implements [sequencer.Shim].
func (r *Rekey) Wrap(
_ *stopper.Context, delegate sequencer.Sequencer,
) (sequencer.Sequencer, error) {
return &rekeyShim{r, delegate}, nil
}

type rekeyShim struct {
*Rekey
delegate sequencer.Sequencer
}

var _ sequencer.Sequencer = (*rekeyShim)(nil)

// Start will inject the facade at the top of the stack.
func (r *rekeyShim) Start(
ctx *stopper.Context, opts *sequencer.StartOptions,
) (types.MultiAcceptor, *notify.Var[sequencer.Stat], error) {
acc, stat, err := r.delegate.Start(ctx, opts)
return r.MultiAcceptor(acc), stat, err
}

type rekeyAcceptor struct {
*Rekey
base
}

var _ types.MultiAcceptor = (*rekeyAcceptor)(nil)

func (r *rekeyAcceptor) AcceptMultiBatch(
ctx context.Context, batch *types.MultiBatch, opts *types.AcceptOptions,
) error {
if r.multiAcceptor == nil {
return errors.New("not a MultiAcceptor")
}
next, err := r.multi(batch)
if err != nil {
return err
}
return r.multiAcceptor.AcceptMultiBatch(ctx, next, opts)
}

func (r *rekeyAcceptor) AcceptTableBatch(
ctx context.Context, batch *types.TableBatch, opts *types.AcceptOptions,
) error {
if r.tableAcceptor == nil {
return errors.New("not a TableAcceptor")
}
next, err := r.table(batch)
if err != nil {
return err
}
return r.tableAcceptor.AcceptTableBatch(ctx, next, opts)
}

func (r *rekeyAcceptor) AcceptTemporalBatch(
ctx context.Context, batch *types.TemporalBatch, opts *types.AcceptOptions,
) error {
if r.temporalAcceptor == nil {
return errors.New("not a TemporalAcceptor")
}
next, err := r.temporal(batch)
if err != nil {
return err
}
return r.temporalAcceptor.AcceptTemporalBatch(ctx, next, opts)
}

func (r *rekeyAcceptor) multi(batch *types.MultiBatch) (*types.MultiBatch, error) {
ret := batch.Empty()
ret.ByTime = make(map[hlc.Time]*types.TemporalBatch, len(batch.Data))
ret.Data = make([]*types.TemporalBatch, len(batch.Data))

for idx, temp := range batch.Data {
next, err := r.temporal(temp)
if err != nil {
return nil, err
}
ret.ByTime[next.Time] = next
ret.Data[idx] = next
}
return ret, nil
}

func (r *rekeyAcceptor) table(batch *types.TableBatch) (*types.TableBatch, error) {
watcher, err := r.watchers.Get(batch.Table.Schema())
if err != nil {
return nil, err
}
colData, ok := watcher.Get().Columns.Get(batch.Table)
if !ok {
return nil, errors.Errorf("unknown table %s", batch.Table)
}
bagSpec := &merge.BagSpec{Columns: colData}

ret := batch.Empty()

if err := func() error {
for table, mut := range batch.Mutations() {
// Shortest useful json we could decode is {"a":0}
if mut.IsDelete() && len(mut.Data) < 7 {
return ret.Accumulate(table, mut)
}
bag := merge.NewBag(bagSpec)
dec := json.NewDecoder(bytes.NewReader(mut.Data))
dec.UseNumber()
if err := dec.Decode(&bag); err != nil {
return errors.WithStack(err)
}
var jsKey []any
for _, col := range colData {
if !col.Primary {
break
}
keyVal, ok := bag.Get(col.Name)
if !ok {
return errors.Errorf("could not rekey mutation; missing PK column %s", col.Name)
}
jsKey = append(jsKey, keyVal)
}
var err error
mut.Key, err = json.Marshal(jsKey)
if err != nil {
return errors.WithStack(err)
}
ret.Data = append(ret.Data, mut)
return nil
}
return nil
}(); err != nil {
return ret, err
}

return ret, nil
}

func (r *rekeyAcceptor) temporal(batch *types.TemporalBatch) (*types.TemporalBatch, error) {
ret := batch.Empty()
for table, tableBatch := range batch.Data.All() {
var err error
tableBatch, err = r.table(tableBatch)
if err != nil {
return nil, err
}
ret.Data.Put(table, tableBatch)
}
return ret, nil
}
1 change: 1 addition & 0 deletions internal/sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type StartOptions struct {
Delegate types.MultiAcceptor // The acceptor to use when continuing to process mutations.
Group *types.TableGroup // The tables that should be operated on.
MaxDeferred int // Back off after deferring this many mutations.
WrapperFunc func(acc types.MultiAcceptor)
}

// Copy returns a deep copy of the options.
Expand Down
Loading
Loading