Skip to content

Commit

Permalink
Improve handling of vplayer stalls
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Apr 25, 2024
1 parent 1de3daa commit 7be3fd4
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 2 deletions.
19 changes: 19 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/relaylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vreplication

import (
"errors"
"io"
"sync"
"time"
Expand All @@ -26,6 +27,16 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// sendTimeout bounds how long a relay log write may wait before it is
// treated as stalled and an error is returned. Such a stall can occur when
// vplayer is stuck in a loop trying to process the previous relay log
// contents, e.g. because the queries it is executing are doing table scans
// and the transaction wrapping those contents keeps getting terminated for
// crossing the query server transaction timeout before it can finish.
const sendTimeout = 5 * time.Minute

type relayLog struct {
ctx context.Context
maxItems int
Expand Down Expand Up @@ -69,11 +80,19 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
rl.mu.Lock()
defer rl.mu.Unlock()

ctx, cancel := context.WithTimeout(rl.ctx, sendTimeout)
defer cancel()
if err := rl.checkDone(); err != nil {
return err
}
for rl.curSize > rl.maxSize || len(rl.items) >= rl.maxItems {
rl.canAccept.Wait()
select {
case <-ctx.Done():
return errors.New("relay log write stalled")
default:
}
// Be sure that the vplayer context is not done.
if err := rl.checkDone(); err != nil {
return err
}
Expand Down
14 changes: 13 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -497,8 +498,19 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
return nil
}
}

// The parent context is a cancelable context. We create a new one here with a timeout
// to unblock the case where we're never able to complete applying the current vstream
// packet last read from the relay log.
iCtx, iCancel := context.WithTimeout(ctx, tabletenv.NewCurrentConfig().Oltp.TxTimeout)
defer iCancel()
for i, events := range items {
for j, event := range events {
select {
case <-iCtx.Done():
return errors.New("timed out replaying events in the relay log")
default:
}
if event.Timestamp != 0 {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
Expand All @@ -523,7 +535,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
continue
}
}
if err := vp.applyEvent(ctx, event, mustSave); err != nil {
if err := vp.applyEvent(iCtx, event, mustSave); err != nil {
if err != io.EOF {
vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
log.Errorf("Error applying event: %s", err.Error())
Expand Down
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func (vse *Engine) validateBinlogRowImage(ctx context.Context, db dbconfigs.Conn
// Stream starts a new stream.
// This streams events from the binary logs
func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error {

if err := vse.validateBinlogRowImage(ctx, vse.se.GetDBConnector()); err != nil {
return err
}
Expand Down

0 comments on commit 7be3fd4

Please sign in to comment.