Skip to content

Commit

Permalink
VReplication: use proper column collations in vstreamer (#15313)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Feb 26, 2024
1 parent 47e1375 commit 8de1c91
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 84 deletions.
9 changes: 9 additions & 0 deletions go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package mysql
import (
"fmt"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

Expand Down Expand Up @@ -216,6 +218,13 @@ type TableMap struct {
// - If the metadata is one byte, only the lower 8 bits are used.
// - If the metadata is two bytes, all 16 bits are used.
Metadata []uint16

// ColumnCollationIDs contains information about the inherited
// or implied column default collation and any explicit per-column
// override for text based columns ONLY. This means that the
// array position needs to be mapped to the ordered list of
// text based columns in the table.
ColumnCollationIDs []collations.ID
}

// Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT.
Expand Down
15 changes: 9 additions & 6 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"

"vitess.io/vitess/go/mysql/binlog"
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestTableMapEvent(t *testing.T) {
0,
384, // Length of the varchar field.
},
ColumnCollationIDs: []collations.ID{},
}
tm.CanBeNull.Set(1, true)
tm.CanBeNull.Set(2, true)
Expand Down Expand Up @@ -258,12 +260,13 @@ func TestLargeTableMapEvent(t *testing.T) {
}

tm := &TableMap{
Flags: 0x8090,
Database: "my_database",
Name: "my_table",
Types: types,
CanBeNull: NewServerBitmap(colLen),
Metadata: metadata,
Flags: 0x8090,
Database: "my_database",
Name: "my_table",
Types: types,
CanBeNull: NewServerBitmap(colLen),
Metadata: metadata,
ColumnCollationIDs: []collations.ID{},
}
tm.CanBeNull.Set(1, true)
tm.CanBeNull.Set(2, true)
Expand Down
87 changes: 78 additions & 9 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,35 @@ import (
"encoding/binary"

"vitess.io/vitess/go/mysql/binlog"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// These are the TABLE_MAP_EVENT's optional metadata field types from
// MySQL's libbinlogevents/include/rows_event.h.
// See also: https://dev.mysql.com/doc/dev/mysql-server/8.0.34/structbinary__log_1_1Table__map__event_1_1Optional__metadata__fields.html
const (
tableMapSignedness uint8 = iota + 1
tableMapDefaultCharset
tableMapColumnCharset
tableMapColumnName
tableMapSetStrValue
tableMapEnumStrValue
tableMapGeometryType
tableMapSimplePrimaryKey
tableMapPrimaryKeyWithPrefix
tableMapEnumAndSetDefaultCharset
tableMapEnumAndSetColumnCharset
tableMapColumnVisibility
)

// This byte in the optional metadata indicates that we should
// read the next 2 bytes as a collation ID.
const readTwoByteCollationID = 252

// TableMap implements BinlogEvent.TableMap().
//
// Expected format (L = total length of event data):
Expand All @@ -43,6 +66,7 @@ import (
// cc column-def, one byte per column
// <var> column-meta-def (var-len encoded string)
// n NULL-bitmask, length: (cc + 7) / 8
// n Optional Metadata
func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) {
data := ev.Bytes()[f.HeaderLength:]

Expand All @@ -64,7 +88,7 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) {

columnCount, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
}
pos = read

Expand All @@ -73,7 +97,7 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) {

metaLen, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data)
}
pos = read

Expand All @@ -88,11 +112,19 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) {
}
}
if pos != expectedEnd {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data)
}

// A bit array that says if each column can be NULL.
result.CanBeNull, _ = newBitmap(data, pos, int(columnCount))
result.CanBeNull, read = newBitmap(data, pos, int(columnCount))
pos = read

// Read any text based column collation values provided in the optional metadata.
// The binlog_row_metadata only contains this info for text based columns.
var err error
if result.ColumnCollationIDs, err = readColumnCollationIDs(data, pos, int(columnCount)); err != nil {
return nil, err
}

return result, nil
}
Expand All @@ -118,7 +150,7 @@ func metadataLength(typ byte) int {

default:
// Unknown type. This is used in tests only, so panic.
panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ))
panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ))
}
}

Expand Down Expand Up @@ -154,7 +186,7 @@ func metadataRead(data []byte, pos int, typ byte) (uint16, int, error) {

default:
// Unknown types, we can't go on.
return 0, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)
return 0, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)
}
}

Expand Down Expand Up @@ -185,8 +217,45 @@ func metadataWrite(data []byte, pos int, typ byte, value uint16) int {

default:
// Unknown type. This is used in tests only, so panic.
panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ))
panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ))
}
}

// readColumnCollationIDs reads from the optional metadata that exists.
// See: https://github.com/mysql/mysql-server/blob/8.0/libbinlogevents/include/rows_event.h
// What's included depends on the server configuration:
// https://dev.mysql.com/doc/refman/en/replication-options-binary-log.html#sysvar_binlog_row_metadata
// and the table definition.
// We only care about any collation IDs in the optional metadata and
// this info is provided in all binlog_row_metadata formats. Note that
// this info is only provided for text based columns.
func readColumnCollationIDs(data []byte, pos, count int) ([]collations.ID, error) {
collationIDs := make([]collations.ID, 0, count)
for pos < len(data) {
fieldType := uint8(data[pos])
pos++

fieldLen, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "error reading optional metadata field length")
}
pos = read

fieldVal := data[pos : pos+int(fieldLen)]
pos += int(fieldLen)

if fieldType == tableMapDefaultCharset || fieldType == tableMapColumnCharset { // It's one or the other
for i := uint64(0); i < fieldLen; i++ {
v := uint16(fieldVal[i])
if v == readTwoByteCollationID { // The ID is the subsequent 2 bytes
v = binary.LittleEndian.Uint16(fieldVal[i+1 : i+3])
i += 2
}
collationIDs = append(collationIDs, collations.ID(v))
}
}
}
return collationIDs, nil
}

// Rows implements BinlogEvent.TableMap().
Expand Down Expand Up @@ -235,7 +304,7 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) {

columnCount, read, ok := readLenEncInt(data, pos)
if !ok {
return result, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
return result, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
}
pos = read

Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func testPlayerCopyCharPK(t *testing.T) {
defer func() { waitRetryTime = savedWaitRetryTime }()

execStatements(t, []string{
"create table src(idc binary(2) , val int, primary key(idc))",
"create table src(idc binary(2), val int, primary key(idc))",
"insert into src values('a', 1), ('c', 2)",
fmt.Sprintf("create table %s.dst(idc binary(2), val int, primary key(idc))", vrepldb),
})
Expand Down Expand Up @@ -215,7 +215,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
defer func() { waitRetryTime = savedWaitRetryTime }()

execStatements(t, []string{
"create table src(idc varchar(20), val int, primary key(idc))",
"create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb3", // Use utf8mb3 to get a consistent default collation across MySQL versions
"insert into src values('a', 1), ('c', 2)",
fmt.Sprintf("create table %s.dst(idc varchar(20), val int, primary key(idc))", vrepldb),
})
Expand Down Expand Up @@ -285,7 +285,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
"/update _vt.vreplication set state='Copying'",
// Copy mode.
"insert into dst(idc,val) values ('a',1)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"a\\"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"a\\"}'.*`,
// Copy-catchup mode.
`/insert into dst\(idc,val\) select 'B', 3 from dual where \( .* 'B' COLLATE .* \) <= \( .* 'a' COLLATE .* \)`,
).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer {
Expand All @@ -295,11 +295,11 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
//upd1 := expect.
upd1 := expect.Then(qh.Eventually(
"insert into dst(idc,val) values ('B',3)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"B\\"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"B\\"}'.*`,
))
upd2 := expect.Then(qh.Eventually(
"insert into dst(idc,val) values ('c',2)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"c\\"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"c\\"}'.*`,
))
upd1.Then(upd2.Eventually())
return upd2
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletserver/repltracker/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import (
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/schema/historian.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

"vitess.io/vitess/go/vt/sqlparser"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const getInitialSchemaVersions = "select id, pos, ddl, time_updated, schemax from %s.schema_version where time_updated > %d order by id asc"
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ import (

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

// VStreamer defines the functions of VStreamer
// VStreamer defines the functions of VStreamer
// that the replicationWatcher needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error
Expand Down
Loading

0 comments on commit 8de1c91

Please sign in to comment.