Allow cancelling blocking queries in response to shutting down.
This commit is contained in:
parent
341aedbce9
commit
90e741c6d2
|
@ -4424,7 +4424,8 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura
|
||||||
// If we are not blocking we can skip tracking and allocating - nil WatchSet
|
// If we are not blocking we can skip tracking and allocating - nil WatchSet
|
||||||
// is still valid to call Add on and will just be a no op.
|
// is still valid to call Add on and will just be a no op.
|
||||||
var ws memdb.WatchSet
|
var ws memdb.WatchSet
|
||||||
var timeout *time.Timer
|
var ctx context.Context = &lib.StopChannelContext{StopCh: a.shutdownCh}
|
||||||
|
shouldBlock := false
|
||||||
|
|
||||||
if alwaysBlock || hash != "" {
|
if alwaysBlock || hash != "" {
|
||||||
if wait == 0 {
|
if wait == 0 {
|
||||||
|
@ -4435,7 +4436,11 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura
|
||||||
}
|
}
|
||||||
// Apply a small amount of jitter to the request.
|
// Apply a small amount of jitter to the request.
|
||||||
wait += lib.RandomStagger(wait / 16)
|
wait += lib.RandomStagger(wait / 16)
|
||||||
timeout = time.NewTimer(wait)
|
var cancel func()
|
||||||
|
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(wait))
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
shouldBlock = true
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -4453,7 +4458,7 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura
|
||||||
// WatchSet immediately returns false which would incorrectly cause this to
|
// WatchSet immediately returns false which would incorrectly cause this to
|
||||||
// loop and repeat again, however we rely on the invariant that ws == nil
|
// loop and repeat again, however we rely on the invariant that ws == nil
|
||||||
// IFF timeout == nil in which case the Watch call is never invoked.
|
// IFF timeout == nil in which case the Watch call is never invoked.
|
||||||
if timeout == nil || hash != curHash || ws.Watch(timeout.C) {
|
if !shouldBlock || hash != curHash || ws.WatchCtx(ctx) != nil {
|
||||||
return curHash, curResp, err
|
return curHash, curResp, err
|
||||||
}
|
}
|
||||||
// Watch returned false indicating a change was detected, loop and repeat
|
// Watch returned false indicating a change was detected, loop and repeat
|
||||||
|
@ -4465,7 +4470,7 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura
|
||||||
if syncPauseCh := a.SyncPausedCh(); syncPauseCh != nil {
|
if syncPauseCh := a.SyncPausedCh(); syncPauseCh != nil {
|
||||||
select {
|
select {
|
||||||
case <-syncPauseCh:
|
case <-syncPauseCh:
|
||||||
case <-timeout.C:
|
case <-ctx.Done():
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -752,7 +753,9 @@ type queryFn func(memdb.WatchSet, *state.Store) error
|
||||||
|
|
||||||
// blockingQuery is used to process a potentially blocking query operation.
|
// blockingQuery is used to process a potentially blocking query operation.
|
||||||
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
|
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
|
||||||
var timeout *time.Timer
|
var cancel func()
|
||||||
|
var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}
|
||||||
|
|
||||||
var queriesBlocking uint64
|
var queriesBlocking uint64
|
||||||
var queryTimeout time.Duration
|
var queryTimeout time.Duration
|
||||||
|
|
||||||
|
@ -776,9 +779,9 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
|
||||||
// Apply a small amount of jitter to the request.
|
// Apply a small amount of jitter to the request.
|
||||||
queryTimeout += lib.RandomStagger(queryTimeout / jitterFraction)
|
queryTimeout += lib.RandomStagger(queryTimeout / jitterFraction)
|
||||||
|
|
||||||
// Setup a query timeout.
|
// wrap the base context with a deadline
|
||||||
timeout = time.NewTimer(queryTimeout)
|
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout))
|
||||||
defer timeout.Stop()
|
defer cancel()
|
||||||
|
|
||||||
// instrument blockingQueries
|
// instrument blockingQueries
|
||||||
// atomic inc our server's count of in-flight blockingQueries and store the new value
|
// atomic inc our server's count of in-flight blockingQueries and store the new value
|
||||||
|
@ -833,7 +836,9 @@ RUN_QUERY:
|
||||||
}
|
}
|
||||||
// block up to the timeout if we don't see anything fresh.
|
// block up to the timeout if we don't see anything fresh.
|
||||||
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
|
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
|
||||||
if expired := ws.Watch(timeout.C); !expired {
|
if err := ws.WatchCtx(ctx); err == nil {
|
||||||
|
// a non-nil error only occurs when the context is cancelled
|
||||||
|
|
||||||
// If a restore may have woken us up then bail out from
|
// If a restore may have woken us up then bail out from
|
||||||
// the query immediately. This is slightly race-ey since
|
// the query immediately. This is slightly race-ey since
|
||||||
// this might have been interrupted for other reasons,
|
// this might have been interrupted for other reasons,
|
||||||
|
|
Loading…
Reference in a new issue