Merge pull request #3195 from hashicorp/issue-3018-filehandle-leaks
Fix socket file handle leaks from old blocking queries upon consul reload
This commit is contained in:
commit
793fda1101
11
api/api.go
11
api/api.go
|
@ -105,6 +105,10 @@ type QueryOptions struct {
|
|||
// relayed back to the sender through N other random nodes. Must be
|
||||
// a value from 0 to 5 (inclusive).
|
||||
RelayFactor uint8
|
||||
|
||||
// Context (optional) is passed through to the underlying http request layer, can be used
|
||||
// to set timeouts and deadlines as well as to cancel requests
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// WriteOptions are used to parameterize a write
|
||||
|
@ -457,6 +461,7 @@ type request struct {
|
|||
body io.Reader
|
||||
header http.Header
|
||||
obj interface{}
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// setQueryOptions is used to annotate the request with
|
||||
|
@ -494,6 +499,7 @@ func (r *request) setQueryOptions(q *QueryOptions) {
|
|||
if q.RelayFactor != 0 {
|
||||
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
|
||||
}
|
||||
r.ctx = q.Context
|
||||
}
|
||||
|
||||
// durToMsec converts a duration to a millisecond specified string. If the
|
||||
|
@ -569,8 +575,11 @@ func (r *request) toHTTP() (*http.Request, error) {
|
|||
if r.config.HttpAuth != nil {
|
||||
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
|
||||
}
|
||||
|
||||
if r.ctx != nil {
|
||||
return req.WithContext(r.ctx), nil
|
||||
} else {
|
||||
return req, nil
|
||||
}
|
||||
}
|
||||
|
||||
// newRequest is used to create a new request
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
|
@ -41,7 +42,8 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
}
|
||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||
kv := p.client.KV()
|
||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||
opts := makeQueryOptionsWithContext(p, stale)
|
||||
defer p.cancelFunc()
|
||||
pair, meta, err := kv.Get(key, &opts)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
|
@ -70,7 +72,8 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
}
|
||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||
kv := p.client.KV()
|
||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||
opts := makeQueryOptionsWithContext(p, stale)
|
||||
defer p.cancelFunc()
|
||||
pairs, meta, err := kv.List(prefix, &opts)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
|
@ -89,7 +92,8 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
|
||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||
catalog := p.client.Catalog()
|
||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||
opts := makeQueryOptionsWithContext(p, stale)
|
||||
defer p.cancelFunc()
|
||||
services, meta, err := catalog.Services(&opts)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
|
@ -108,7 +112,8 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
|
||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||
catalog := p.client.Catalog()
|
||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||
opts := makeQueryOptionsWithContext(p, stale)
|
||||
defer p.cancelFunc()
|
||||
nodes, meta, err := catalog.Nodes(&opts)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
|
@ -144,7 +149,8 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
|
||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||
health := p.client.Health()
|
||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||
opts := makeQueryOptionsWithContext(p, stale)
|
||||
defer p.cancelFunc()
|
||||
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
|
@ -177,7 +183,8 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
|
||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||
health := p.client.Health()
|
||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||
opts := makeQueryOptionsWithContext(p, stale)
|
||||
defer p.cancelFunc()
|
||||
var checks []*consulapi.HealthCheck
|
||||
var meta *consulapi.QueryMeta
|
||||
var err error
|
||||
|
@ -205,7 +212,8 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
|
||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||
event := p.client.Event()
|
||||
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
|
||||
opts := makeQueryOptionsWithContext(p, false)
|
||||
defer p.cancelFunc()
|
||||
events, meta, err := event.List(name, &opts)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
|
@ -222,3 +230,10 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
}
|
||||
return fn, nil
|
||||
}
|
||||
|
||||
func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
p.cancelFunc = cancel
|
||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
|
||||
return opts
|
||||
}
|
||||
|
|
|
@ -107,6 +107,9 @@ func (p *Plan) Stop() {
|
|||
return
|
||||
}
|
||||
p.stop = true
|
||||
if p.cancelFunc != nil {
|
||||
p.cancelFunc()
|
||||
}
|
||||
close(p.stopCh)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
@ -30,6 +31,7 @@ type Plan struct {
|
|||
stop bool
|
||||
stopCh chan struct{}
|
||||
stopLock sync.Mutex
|
||||
cancelFunc context.CancelFunc
|
||||
}
|
||||
|
||||
// WatcherFunc is used to watch for a diff
|
||||
|
|
Loading…
Reference in a new issue