Skip to content

Commit

Permalink
VReplication: improve reliability of log management (#15374)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Mar 10, 2024
1 parent 7de897f commit b99e150
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 58 deletions.
31 changes: 31 additions & 0 deletions go/textutil/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package textutil

import (
"fmt"
"net/url"
"regexp"
"strings"
Expand All @@ -28,6 +29,13 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// TruncationLocation identifies which part of a string TruncateText removes
// when the string is longer than the requested limit.
type TruncationLocation int

const (
// TruncationLocationMiddle removes text from the middle of the string,
// keeping its beginning and its end.
TruncationLocationMiddle TruncationLocation = iota
// TruncationLocationEnd removes text from the end of the string, keeping
// its beginning.
TruncationLocationEnd
)

var (
delimitedListRegexp = regexp.MustCompile(`[ ,;]+`)
SimulatedNullString = sqltypes.NULL.String()
Expand Down Expand Up @@ -133,3 +141,26 @@ func Title(s string) string {
},
s)
}

// TruncateText truncates the provided text, if needed, to the specified maximum
// length using the provided truncation indicator in place of the truncated text
// in the specified location of the original string.
//
// All lengths are measured in bytes, so a multi-byte UTF-8 sequence that
// straddles a cut point may be split. Text that already fits within limit is
// returned unchanged, and a truncated result never exceeds limit bytes.
//
// An empty string and an error are returned when the text needs truncating but
// the indicator leaves fewer than three bytes of room within limit, or when
// location is not a known TruncationLocation.
func TruncateText(text string, limit int, location TruncationLocation, indicator string) (string, error) {
	ol := len(text)
	if ol <= limit {
		return text, nil
	}
	if len(indicator)+2 >= limit {
		return "", fmt.Errorf("the truncation indicator is too long for the provided text")
	}
	switch location {
	case TruncationLocationMiddle:
		// The indicator is charged against the first half of the budget; the
		// second half (less one byte) is spent on the end of the text.
		prefix := limit/2 - len(indicator)
		tail := limit/2 - 1
		if prefix < 0 {
			// The indicator alone is longer than half the limit. Slicing with
			// a negative index would panic, so keep no leading text at all and
			// cap the tail at whatever room is left after the indicator.
			prefix = 0
			if room := limit - len(indicator); tail > room {
				tail = room
			}
		}
		return text[:prefix] + indicator + text[ol-tail:], nil
	case TruncationLocationEnd:
		return text[:limit-(len(indicator)+1)] + indicator, nil
	default:
		return "", fmt.Errorf("invalid truncation location: %d", location)
	}
}
77 changes: 77 additions & 0 deletions go/textutil/strings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ limitations under the License.
package textutil

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -208,3 +210,78 @@ func TestTitle(t *testing.T) {
})
}
}

// TestTruncateText exercises TruncateText with input that needs no truncation,
// input that is one byte over the limit for both truncation locations, a limit
// too small to hold the indicator, and an unknown truncation location.
func TestTruncateText(t *testing.T) {
	const (
		maxLen    = 100
		indicator = "..."
	)
	middle := TruncationLocationMiddle

	type testCase struct {
		name      string
		text      string
		limit     int
		location  TruncationLocation
		indicator string
		want      string
		wantErr   string
	}
	testCases := []testCase{
		{
			name:     "no truncation",
			text:     "hello world",
			limit:    maxLen,
			location: middle,
			want:     "hello world",
		},
		{
			name:     "no truncation - exact",
			text:     strings.Repeat("a", maxLen),
			limit:    maxLen,
			location: middle,
			want:     strings.Repeat("a", maxLen),
		},
		{
			name:      "barely too long - mid",
			text:      strings.Repeat("a", maxLen+1),
			limit:     maxLen,
			location:  middle,
			indicator: indicator,
			want:      "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
		},
		{
			name:      "barely too long - end",
			text:      strings.Repeat("a", maxLen+1),
			limit:     maxLen,
			location:  TruncationLocationEnd,
			indicator: indicator,
			want:      "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...",
		},
		{
			name:      "too small",
			text:      strings.Repeat("a", maxLen),
			limit:     4,
			location:  middle,
			indicator: indicator,
			wantErr:   "the truncation indicator is too long for the provided text",
		},
		{
			name:      "bad location",
			text:      strings.Repeat("a", maxLen+1),
			limit:     maxLen,
			location:  100,
			indicator: indicator,
			wantErr:   "invalid truncation location: 100",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := TruncateText(tc.text, tc.limit, tc.location, tc.indicator)
			if tc.wantErr != "" {
				// Failure cases only pin the error text.
				require.EqualError(t, err, tc.wantErr)
				return
			}
			require.NoError(t, err)
			require.Equal(t, tc.want, got)
			// Whatever comes back must respect the requested limit.
			require.LessOrEqual(t, len(got), tc.limit)
		})
	}
}
8 changes: 8 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ import (
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand All @@ -65,8 +67,14 @@ var (
BlplTransaction = "Transaction"
// BlplBatchTransaction is the key for the stats map.
BlplBatchTransaction = "BatchTransaction"

// Truncate values in the middle to preserve the end of the message which
// typically contains the error text.
TruncationLocation = textutil.TruncationLocationMiddle
)

// TruncationIndicator is the marker passed to textutil.TruncateText to stand in
// for the portion of a value that was removed. It is built around
// sqlparser.TruncationText.
var TruncationIndicator = fmt.Sprintf(" ... %s ... ", sqlparser.TruncationText)

// Stats is the internal stats of a player. It is a different
// structure that is passed in so stats can be collected over the life
// of multiple individual players.
Expand Down
9 changes: 7 additions & 2 deletions go/vt/binlog/binlogplayer/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -129,10 +130,14 @@ func LogError(msg string, err error) {

// LimitString truncates string to specified size
func LimitString(s string, limit int) string {
if len(s) > limit {
ts, err := textutil.TruncateText(s, limit, TruncationLocation, TruncationIndicator)
if err != nil { // Fallback to simple truncation
if len(s) <= limit {
return s
}
return s[:limit]
}
return s
return ts
}

func (dc *dbClientImpl) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
Expand Down
14 changes: 14 additions & 0 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,17 @@ func (dc *MockDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltype
}
return results, nil
}

// AddInvariant stores result under query in the mock client's invariants map,
// replacing any entry already registered for that exact query string. The map
// is updated while holding expectMu.
// NOTE(review): presumably a query registered here is answered with result no
// matter where it falls among the ordered expectations; confirm against the
// mock's query-execution methods.
func (dc *MockDBClient) AddInvariant(query string, result *sqltypes.Result) {
dc.expectMu.Lock()
defer dc.expectMu.Unlock()
dc.invariants[query] = result
}

// RemoveInvariant deletes the entry registered for query from the mock
// client's invariants map; it is a no-op when no such entry exists. The map is
// updated while holding expectMu.
func (dc *MockDBClient) RemoveInvariant(query string) {
dc.expectMu.Lock()
defer dc.expectMu.Unlock()
delete(dc.invariants, query)
}
3 changes: 2 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,8 @@ ORDER BY

if stream.Id > streamLog.StreamId {
log.Warningf("Found stream log for nonexistent stream: %+v", streamLog)
break
// This can happen on manual/failed workflow cleanup so keep going.
continue
}

// stream.Id == streamLog.StreamId
Expand Down
12 changes: 3 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, err
}
vre.controllers[id] = ct
if err := insertLogWithParams(vdbc, LogStreamCreate, id, params); err != nil {
return nil, err
}
insertLogWithParams(vdbc, LogStreamCreate, id, params)
}
return qr, nil
case updateQuery:
Expand Down Expand Up @@ -475,9 +473,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, err
}
vre.controllers[id] = ct
if err := insertLog(vdbc, LogStateChange, id, params["state"], ""); err != nil {
return nil, err
}
insertLog(vdbc, LogStateChange, id, params["state"], "")
}
return qr, nil
case deleteQuery:
Expand All @@ -495,9 +491,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
ct.Stop()
delete(vre.controllers, id)
}
if err := insertLogWithParams(vdbc, LogStreamDelete, id, nil); err != nil {
return nil, err
}
insertLogWithParams(vdbc, LogStreamDelete, id, nil)
}
if err := dbClient.Begin(); err != nil {
return nil, err
Expand Down
32 changes: 20 additions & 12 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand All @@ -33,6 +35,8 @@ import (

const (
vreplicationLogTableName = "vreplication_log"
// maxVReplicationLogMessageLen is the longest message, in bytes, written to
// a vreplication_log record; longer messages are truncated to fit. The
// message column is of type TEXT, which MySQL caps at 65,535 (2^16 - 1)
// bytes.
maxVReplicationLogMessageLen = 65535
)

const (
Expand Down Expand Up @@ -82,46 +86,50 @@ func getLastLog(dbClient *vdbClient, vreplID int32) (id int64, typ, state, messa
return id, typ, state, message, nil
}

func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message string) error {
func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message string) {
// getLastLog returns the last log for a stream. During insertion, if the type/state/message match we do not insert
// a new log but increment the count. This prevents spamming of the log table in case the same message is logged continuously.
id, _, lastLogState, lastLogMessage, err := getLastLog(dbClient, vreplID)
if err != nil {
return err
log.Errorf("Could not insert vreplication_log record because we failed to get the last log record: %v", err)
return
}
if typ == LogStateChange && state == lastLogState {
// handles case where current state is Running, controller restarts after an error and initializes the state Running
return nil
return
}
var query string
if id > 0 && message == lastLogMessage {
query = fmt.Sprintf("update %s.vreplication_log set count = count + 1 where id = %d", sidecar.GetIdentifier(), id)
} else {
buf := sqlparser.NewTrackedBuffer(nil)
if len(message) > maxVReplicationLogMessageLen {
message, err = textutil.TruncateText(message, maxVReplicationLogMessageLen, binlogplayer.TruncationLocation, binlogplayer.TruncationIndicator)
if err != nil {
log.Errorf("Could not insert vreplication_log record because we failed to truncate the message: %v", err)
return
}
}
buf.Myprintf("insert into %s.vreplication_log(vrepl_id, type, state, message) values(%s, %s, %s, %s)",
sidecar.GetIdentifier(), strconv.Itoa(int(vreplID)), encodeString(typ), encodeString(state), encodeString(message))
query = buf.ParsedQuery().Query
}
if _, err = dbClient.ExecuteFetch(query, 10000); err != nil {
return fmt.Errorf("could not insert into log table: %v: %v", query, err)
log.Errorf("Could not insert into vreplication_log table: %v: %v", query, err)
}
return nil
}

// insertLogWithParams is called when a stream is created. The attributes of the stream are stored as a json string
func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, params map[string]string) error {
// insertLogWithParams is called when a stream is created. The attributes of the stream are stored as a json string.
func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, params map[string]string) {
var message string
if params != nil {
obj, _ := json.Marshal(params)
message = string(obj)
}
if err := insertLog(dbClient, action, vreplID, params["state"], message); err != nil {
return err
}
return nil
insertLog(dbClient, action, vreplID, params["state"], message)
}

// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate
// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate.
func isUnrecoverableError(err error) bool {
if err == nil {
return false
Expand Down
Loading

0 comments on commit b99e150

Please sign in to comment.