diff --git a/api/api.go b/api/api.go index 1c9f1e2b7..ff06c5cc1 100644 --- a/api/api.go +++ b/api/api.go @@ -105,6 +105,10 @@ type QueryOptions struct { // relayed back to the sender through N other random nodes. Must be // a value from 0 to 5 (inclusive). RelayFactor uint8 + + // Context (optional) is passed through to the underlying http request layer, can be used + // to set timeouts and deadlines as well as to cancel requests + Context context.Context } // WriteOptions are used to parameterize a write @@ -457,6 +461,7 @@ type request struct { body io.Reader header http.Header obj interface{} + ctx context.Context } // setQueryOptions is used to annotate the request with @@ -494,6 +499,7 @@ func (r *request) setQueryOptions(q *QueryOptions) { if q.RelayFactor != 0 { r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor))) } + r.ctx = q.Context } // durToMsec converts a duration to a millisecond specified string. If the @@ -569,8 +575,11 @@ func (r *request) toHTTP() (*http.Request, error) { if r.config.HttpAuth != nil { req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password) } - - return req, nil + if r.ctx != nil { + return req.WithContext(r.ctx), nil + } else { + return req, nil + } } // newRequest is used to create a new request diff --git a/watch/funcs.go b/watch/funcs.go index 0fd7fdb9e..738343009 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -1,6 +1,7 @@ package watch import ( + "context" "fmt" consulapi "github.com/hashicorp/consul/api" @@ -41,7 +42,8 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() pair, meta, err := kv.Get(key, &opts) if err != nil { return 0, nil, err @@ -70,7 +72,8 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() pairs, meta, err := kv.List(prefix, &opts) if err != nil { return 0, nil, err @@ -89,7 +92,8 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() services, meta, err := catalog.Services(&opts) if err != nil { return 0, nil, err @@ -108,7 +112,8 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() nodes, meta, err := catalog.Nodes(&opts) if err != nil { return 0, nil, err @@ -144,7 +149,8 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() nodes, meta, err := health.Service(service, tag, passingOnly, &opts) if err != nil { return 0, nil, err @@ -177,7 +183,8 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() var checks []*consulapi.HealthCheck var meta *consulapi.QueryMeta var err error @@ -205,7 +212,8 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { event := p.client.Event() - opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + opts := makeQueryOptionsWithContext(p, false) + defer p.cancelFunc() events, meta, err := event.List(name, &opts) if err != nil { return 0, nil, err @@ -222,3 +230,10 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { } return fn, nil } + +func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions { + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + return opts +} diff --git a/watch/plan.go b/watch/plan.go index 2b92ca94a..c925425f6 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -107,6 +107,9 @@ func (p *Plan) Stop() { return } p.stop = true + if p.cancelFunc != nil { + p.cancelFunc() + } close(p.stopCh) } diff --git a/watch/watch.go b/watch/watch.go index 79a8bbcde..bfc33628a 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -1,6 +1,7 @@ package watch import ( + "context" "fmt" "io" "sync" @@ -27,9 +28,10 @@ type Plan struct { lastIndex uint64 lastResult interface{} - stop bool - stopCh chan struct{} - stopLock sync.Mutex + stop bool + stopCh chan struct{} + stopLock sync.Mutex + cancelFunc context.CancelFunc } // WatcherFunc is used to watch for a diff