Skip to content

Commit

Permalink
add a warmup time option to discovery (#465)
Browse files Browse the repository at this point in the history
When there are a lot more targets than the number of connections in the pool,
then it's possible that if the list of hosts changes, the builder might pick a
totally new set of hosts than the previously selected ones, none of which will
have established subconns.

Instead of giving this new list to the picker immediately, first combine it
with the list of hosts that were previously selected, so that those subconns
have some time to warm up while the current set is still in the list.
  • Loading branch information
demmer authored and dedelala committed Jul 30, 2024
1 parent 231d7a8 commit e08ecae
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
37 changes: 28 additions & 9 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ const PoolTypeAttr = "PoolType"

// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type JSONGateResolver struct {
target resolver.Target
clientConn resolver.ClientConn
poolType string
target resolver.Target
clientConn resolver.ClientConn
poolType string
currentAddrs []resolver.Address
}

func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {}
Expand Down Expand Up @@ -210,15 +211,24 @@ func (b *JSONGateResolverBuilder) start() error {
}
parseCount.Add("changed", 1)

var wg sync.WaitGroup

// notify all the resolvers that the targets changed in parallel, since each update might sleep for
// the warmup time
b.mu.RLock()
// notify all the resolvers that the targets changed
for _, r := range b.resolvers {
err = b.update(r)
if err != nil {
log.Errorf("Failed to update resolver: %v", err)
}
wg.Add(1)
go func(r *JSONGateResolver) {
defer wg.Done()

err = b.update(r)
if err != nil {
log.Errorf("Failed to update resolver: %v", err)
}
}(r)
}
b.mu.RUnlock()
wg.Wait()
}
}()

Expand Down Expand Up @@ -393,8 +403,17 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error {
addrs = append(addrs, resolver.Address{Addr: target.Addr, Attributes: attributes.New(PoolTypeAttr, r.poolType)})
}

log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets)
// If we've already selected some targets, give the new addresses some time to warm up before removing
// the old ones from the list
if r.currentAddrs != nil && warmupTime.Seconds() > 0 {
combined := append(r.currentAddrs, addrs...)
log.V(100).Infof("updating targets for %s to warmup %v", r.target.URL.String(), targets)
r.clientConn.UpdateState(resolver.State{Addresses: combined})
time.Sleep(*warmupTime)
}

log.V(100).Infof("updating targets for %s after warmup to %v", r.target.URL.String(), targets)
r.currentAddrs = addrs
return r.clientConn.UpdateState(resolver.State{Addresses: addrs})
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var (
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
portField = flag.String("port_field", "port", "field name in the json file containing the port")
balancerType = flag.String("balancer", "round_robin", "load balancing algorithm to use")
warmupTime = flag.Duration("warmup_time", 30*time.Second, "time to maintain connections to previously selected hosts")

timings = stats.NewTimings("Timings", "proxy timings by operation", "operation")

Expand Down

0 comments on commit e08ecae

Please sign in to comment.