Skip to content

Commit

Permalink
Remove setDeadline.
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Schreiber <arthurschreiber@github.com>
  • Loading branch information
arthurschreiber committed Nov 9, 2023
1 parent 1bd34db commit a0f8d66
Showing 1 changed file with 53 additions and 54 deletions.
107 changes: 53 additions & 54 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,38 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi

defer dbc.stats.MySQLTimings.Record("Exec", time.Now())

done, wg := dbc.setDeadline(ctx)
qr, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
resultChan := make(chan *sqltypes.Result, 1)
errChan := make(chan error, 1)

startTime := time.Now()
go func() {
result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
if err != nil {
errChan <- err
} else {
resultChan <- result
}
}()

if done != nil {
close(done)
wg.Wait()
var err error
var result *sqltypes.Result

select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(startTime))
return nil, dbc.Err()
case err = <-errChan:
case result = <-resultChan:
}
if dbcerr := dbc.Err(); dbcerr != nil {
return nil, dbcerr

if dbcErr := dbc.Err(); dbcErr != nil {
return nil, dbcErr
}
return qr, err

return result, err
}

// ExecOnce executes the specified query, but does not retry on connection errors.
Expand Down Expand Up @@ -254,16 +275,29 @@ func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sq
dbc.current.Store(query)
defer dbc.current.Store("")

done, wg := dbc.setDeadline(ctx)
err := dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
errChan := make(chan error, 1)
startTime := time.Now()

go func() {
errChan <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
}()

if done != nil {
close(done)
wg.Wait()
var err error

select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(startTime))
return dbc.Err()
case err = <-errChan:
}
if dbcerr := dbc.Err(); dbcerr != nil {
return dbcerr

if dbcErr := dbc.Err(); dbcErr != nil {
return dbcErr
}

return err
}

Expand Down Expand Up @@ -409,6 +443,10 @@ func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed tim
select {
case <-ctx.Done():
killConn.Close()

dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())

return context.Cause(ctx)
case err := <-errChan:
log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err)
Expand Down Expand Up @@ -462,45 +500,6 @@ func (dbc *Conn) Reconnect(ctx context.Context) error {
return nil
}

// setDeadline starts a goroutine that will kill the currently executing query
// if the deadline is exceeded. It returns a channel and a waitgroup. After the
// query is done executing, the caller is required to close the done channel
// and wait for the waitgroup to make sure that the necessary cleanup is done.
func (dbc *Conn) setDeadline(ctx context.Context) (chan bool, *sync.WaitGroup) {
if ctx.Done() == nil {
return nil, nil
}
done := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
startTime := time.Now()
select {
case <-ctx.Done():
dbc.Kill(ctx.Err().Error(), time.Since(startTime))
case <-done:
return
}
elapsed := time.Since(startTime)

// Give 2x the elapsed time and some buffer as grace period
// for the query to get killed.
tmr2 := time.NewTimer(2*elapsed + 5*time.Second)
defer tmr2.Stop()
select {
case <-tmr2.C:
dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())
case <-done:
return
}
<-done
log.Warningf("Hung query returned")
}()
return done, &wg
}

// CurrentForLogging applies transformations to the query making it suitable to log.
// It applies sanitization rules based on tablet settings and limits the max length of
// queries.
Expand Down

0 comments on commit a0f8d66

Please sign in to comment.