diff --git a/examples/local/vstream_client.go b/examples/local/vstream_client.go index 98d2129f898..939178159f4 100644 --- a/examples/local/vstream_client.go +++ b/examples/local/vstream_client.go @@ -38,7 +38,7 @@ import ( */ func main() { ctx := context.Background() - streamCustomer := true + streamCustomer := false var vgtid *binlogdatapb.VGtid if streamCustomer { vgtid = &binlogdatapb.VGtid{ diff --git a/go/vt/schema/parser.go b/go/vt/schema/parser.go index 7ed820a3687..78ec4ec36e6 100644 --- a/go/vt/schema/parser.go +++ b/go/vt/schema/parser.go @@ -19,7 +19,6 @@ package schema import ( "fmt" "regexp" - "strconv" "strings" "vitess.io/vitess/go/sqltypes" @@ -171,12 +170,13 @@ func parseEnumOrSetTokens(enumOrSetValues string) []string { } // ParseEnumOrSetTokensMap parses the comma delimited part of an enum column definition -// and returns a map where ["1"] is the first token, and [""] is th elast token -func ParseEnumOrSetTokensMap(enumOrSetValues string) map[string]string { +// and returns a map where [1] is the first token, and [] is the last. +func ParseEnumOrSetTokensMap(enumOrSetValues string) map[int]string { tokens := parseEnumOrSetTokens(enumOrSetValues) - tokensMap := map[string]string{} + tokensMap := map[int]string{} for i, token := range tokens { - tokensMap[strconv.Itoa(i+1)] = token + // SET and ENUM values are 1 indexed. + tokensMap[i+1] = token } return tokensMap } diff --git a/go/vt/schema/parser_test.go b/go/vt/schema/parser_test.go index d251a195d1d..fe9264c29b9 100644 --- a/go/vt/schema/parser_test.go +++ b/go/vt/schema/parser_test.go @@ -167,12 +167,12 @@ func TestParseEnumTokensMap(t *testing.T) { input := `'x-small','small','medium','large','x-large'` enumTokensMap := ParseEnumOrSetTokensMap(input) - expect := map[string]string{ - "1": "x-small", - "2": "small", - "3": "medium", - "4": "large", - "5": "x-large", + expect := map[int]string{ + 1: "x-small", + 2: "small", + 3: "medium", + 4: "large", + 5: "x-large", } assert.Equal(t, expect, enumTokensMap) } @@ -183,7 +183,7 @@ func TestParseEnumTokensMap(t *testing.T) { } for _, input := range inputs { enumTokensMap := ParseEnumOrSetTokensMap(input) - expect := map[string]string{} + expect := map[int]string{} assert.Equal(t, expect, enumTokensMap) } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 424daad4871..0b17a718ce7 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -200,7 +200,6 @@ type TablePlan struct { Delete *sqlparser.ParsedQuery MultiDelete *sqlparser.ParsedQuery Fields []*querypb.Field - EnumValuesMap map[string](map[string]string) ConvertIntToEnum map[string]bool // PKReferences is used to check if an event changed // a primary key column (row move). @@ -335,34 +334,6 @@ func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*q // An integer converted to an enum. We must write the textual value of the int. i.e. 0 turns to '0' return sqltypes.StringBindVariable(val.ToString()), nil } - if enumValues, ok := tp.EnumValuesMap[field.Name]; ok && !val.IsNull() { - // The fact that this field has a EnumValuesMap entry, means we must - // use the enum's text value as opposed to the enum's numerical value. - // This may be needed in Online DDL, when the enum column could be modified: - // - Either from ENUM to a text type (VARCHAR/TEXT) - // - Or from ENUM to another ENUM with different value ordering, - // e.g. from `('red', 'green', 'blue')` to `('red', 'blue')`. - // By applying the textual value of an enum we eliminate the ordering concern. - // In non-Online DDL this shouldn't be a concern because the schema is static, - // and so passing the enum's numerical value is sufficient. - enumValue, enumValueOK := enumValues[val.ToString()] - if !enumValueOK { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Invalid enum value: %v for field %s", val, field.Name) - } - // get the enum text for this val - return sqltypes.StringBindVariable(enumValue), nil - } - if field.Type == querypb.Type_ENUM { - // This is an ENUM w/o a values map, which means that we are most likely using - // the index value -- what is stored and binlogged vs. the list of strings - // defined in the table schema -- and we must use an int bindvar or we'll have - // invalid/incorrect predicates like WHERE enumcol='2'. - // This will be the case when applying binlog events. - enumIndexVal := sqltypes.MakeTrusted(querypb.Type_UINT64, val.Raw()) - if enumIndex, err := enumIndexVal.ToUint64(); err == nil { - return sqltypes.Uint64BindVariable(enumIndex), nil - } - } return sqltypes.ValueBindVariable(*val), nil } diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 17afe030d11..8f156101a7d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet" @@ -231,12 +230,6 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum Match: fromTable, } - enumValuesMap := map[string](map[string]string){} - for k, v := range rule.ConvertEnumToText { - tokensMap := schema.ParseEnumOrSetTokensMap(v) - enumValuesMap[k] = tokensMap - } - if expr, ok := sel.SelectExprs[0].(*sqlparser.StarExpr); ok { // If it's a "select *", we return a partial plan, and complete // it when we get back field info from the stream. @@ -252,7 +245,6 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum SendRule: sendRule, Lastpk: lastpk, Stats: stats, - EnumValuesMap: enumValuesMap, ConvertCharset: rule.ConvertCharset, ConvertIntToEnum: rule.ConvertIntToEnum, CollationEnv: collationEnv, @@ -335,7 +327,6 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum tablePlan := tpb.generate() tablePlan.SendRule = sendRule - tablePlan.EnumValuesMap = enumValuesMap tablePlan.ConvertCharset = rule.ConvertCharset tablePlan.ConvertIntToEnum = rule.ConvertIntToEnum return tablePlan, nil diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index c3e1975c0a1..51f07303163 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -58,6 +58,11 @@ type Plan struct { // of the table. Filters []Filter + // Convert any ordinal values seen in the binlog events for ENUM + // or SET columns to the string value. The map is keyed on the + // column number, with the value being the map of ordinal to string. + EnumValuesMap map[int](map[int]string) + env *vtenv.Environment } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 3aede20f650..e23f3787723 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -21,6 +21,8 @@ import ( "context" "fmt" "io" + "strconv" + "strings" "time" "google.golang.org/protobuf/encoding/prototext" @@ -35,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" + schemautils "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet" @@ -753,6 +756,22 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap Plan: plan, TableMap: tm, } + // Add any necessary ENUM and SET ordinal to string mappings. + for i, col := range cols { + if col.Type == querypb.Type_ENUM || col.Type == querypb.Type_SET { + if plan.EnumValuesMap == nil { + plan.EnumValuesMap = make(map[int]map[int]string) + } + // Strip the enum() / set() parts out. + begin := strings.Index(col.ColumnType, "(") + end := strings.LastIndex(col.ColumnType, ")") + if begin == -1 || end == -1 { + return nil, fmt.Errorf("enum or set column %s does not have valid string values: %s", + col.Name, col.ColumnType) + } + plan.EnumValuesMap[i] = schemautils.ParseEnumOrSetTokensMap(col.ColumnType[begin+1 : end]) + } + } return &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_FIELD, FieldEvent: &binlogdatapb.FieldEvent{ @@ -827,7 +846,7 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er // initially using collations for the column types based on the *connection // collation* and not the actual *column collation*. // But because we now get the correct collation for the actual column from - // mysqld in getExtColsInfo we know this is the correct one for the vstream + // mysqld in getExtColInfos we know this is the correct one for the vstream // target and we use that rather than any that were in the binlog events, // which were for the source and which can be using a different collation // than the target. @@ -835,7 +854,6 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er if err != nil { return nil, err } - return fieldsCopy, nil } @@ -1046,6 +1064,49 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo } pos += l + // Convert the ordinal values in the binlog event for SET and ENUM fields into their + // string representations. + if plan.Table.Fields[colNum].Type == querypb.Type_ENUM { + ordinalValue, err := value.ToInt() + if err != nil { + log.Errorf("extractRowAndFilter: %s, table: %s, colNum: %d, fields: %+v, current values: %+v", + err, plan.Table.Name, colNum, plan.Table.Fields, values) + return false, nil, false, err + } + strVal := plan.EnumValuesMap[colNum][ordinalValue] + value = sqltypes.MakeTrusted(plan.Table.Fields[colNum].Type, []byte(strVal)) + log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %d: %v", colNum, strVal) + } + if plan.Table.Fields[colNum].Type == querypb.Type_SET { + val := bytes.Buffer{} + // A SET column can have 64 unique values: https://dev.mysql.com/doc/refman/en/set.html + // For this reason the binlog event contains the values encoded as a 64-bit integer. + // This value can then be converted to a binary / base 2 integer where it becomes + // a bitmap of the values specified. + bv, err := value.ToUint64() + if err != nil { + log.Errorf("extractRowAndFilter: %s, table: %s, colNum: %d, fields: %+v, current values: %+v", + err, plan.Table.Name, colNum, plan.Table.Fields, values) + return false, nil, false, err + } + // Convert it to a base2 integer / binary value. Finally, strconv converts this to a + // string of 1s and 0s and we can then loop through the bytes. Note that this map is + // in reverse order as this was a little endian integer. + valMap := strconv.FormatUint(bv, 2) + valLen := len(valMap) + for i := 0; i < valLen; i++ { + if valMap[i] == '1' { + strVal := plan.EnumValuesMap[colNum][valLen-i] + if val.Len() > 0 { + val.WriteByte(',') + } + val.WriteString(strVal) + } + } + value = sqltypes.MakeTrusted(plan.Table.Fields[colNum].Type, val.Bytes()) + log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %d: %v", colNum, val.String()) + } + charsets[colNum] = collations.ID(plan.Table.Fields[colNum].Charset) values[colNum] = value valueIndex++