Fix socket file handle leaks from old blocking queries upon consul reload. This fixes issue #3018
This commit is contained in:
parent
5500eb95eb
commit
82bf05c888
13
api/api.go
13
api/api.go
|
@ -105,6 +105,10 @@ type QueryOptions struct {
|
||||||
// relayed back to the sender through N other random nodes. Must be
|
// relayed back to the sender through N other random nodes. Must be
|
||||||
// a value from 0 to 5 (inclusive).
|
// a value from 0 to 5 (inclusive).
|
||||||
RelayFactor uint8
|
RelayFactor uint8
|
||||||
|
|
||||||
|
// Context (optional) is passed through to the underlying http request layer, can be used
|
||||||
|
// to set timeouts and deadlines as well as to cancel requests
|
||||||
|
Context context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteOptions are used to parameterize a write
|
// WriteOptions are used to parameterize a write
|
||||||
|
@ -457,6 +461,7 @@ type request struct {
|
||||||
body io.Reader
|
body io.Reader
|
||||||
header http.Header
|
header http.Header
|
||||||
obj interface{}
|
obj interface{}
|
||||||
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// setQueryOptions is used to annotate the request with
|
// setQueryOptions is used to annotate the request with
|
||||||
|
@ -494,6 +499,7 @@ func (r *request) setQueryOptions(q *QueryOptions) {
|
||||||
if q.RelayFactor != 0 {
|
if q.RelayFactor != 0 {
|
||||||
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
|
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
|
||||||
}
|
}
|
||||||
|
r.ctx = q.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// durToMsec converts a duration to a millisecond specified string. If the
|
// durToMsec converts a duration to a millisecond specified string. If the
|
||||||
|
@ -569,8 +575,11 @@ func (r *request) toHTTP() (*http.Request, error) {
|
||||||
if r.config.HttpAuth != nil {
|
if r.config.HttpAuth != nil {
|
||||||
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
|
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
|
||||||
}
|
}
|
||||||
|
if r.ctx != nil {
|
||||||
return req, nil
|
return req.WithContext(r.ctx), nil
|
||||||
|
} else {
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newRequest is used to create a new request
|
// newRequest is used to create a new request
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package watch
|
package watch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
|
@ -41,7 +42,9 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
}
|
}
|
||||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
kv := p.client.KV()
|
kv := p.client.KV()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
p.cancelFunc = cancel
|
||||||
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
|
||||||
pair, meta, err := kv.Get(key, &opts)
|
pair, meta, err := kv.Get(key, &opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
|
@ -70,7 +73,9 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
}
|
}
|
||||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
kv := p.client.KV()
|
kv := p.client.KV()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
p.cancelFunc = cancel
|
||||||
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
|
||||||
pairs, meta, err := kv.List(prefix, &opts)
|
pairs, meta, err := kv.List(prefix, &opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
|
@ -89,7 +94,9 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
|
|
||||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
catalog := p.client.Catalog()
|
catalog := p.client.Catalog()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
p.cancelFunc = cancel
|
||||||
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
|
||||||
services, meta, err := catalog.Services(&opts)
|
services, meta, err := catalog.Services(&opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
|
@ -108,7 +115,9 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
|
|
||||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
catalog := p.client.Catalog()
|
catalog := p.client.Catalog()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
p.cancelFunc = cancel
|
||||||
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
|
||||||
nodes, meta, err := catalog.Nodes(&opts)
|
nodes, meta, err := catalog.Nodes(&opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
|
@ -144,7 +153,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
|
|
||||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
health := p.client.Health()
|
health := p.client.Health()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
p.cancelFunc = cancel
|
||||||
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
|
||||||
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
|
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
|
@ -177,7 +188,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
|
|
||||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
health := p.client.Health()
|
health := p.client.Health()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
p.cancelFunc = cancel
|
||||||
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
|
||||||
var checks []*consulapi.HealthCheck
|
var checks []*consulapi.HealthCheck
|
||||||
var meta *consulapi.QueryMeta
|
var meta *consulapi.QueryMeta
|
||||||
var err error
|
var err error
|
||||||
|
@ -205,7 +218,9 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
|
|
||||||
fn := func(p *Plan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
event := p.client.Event()
|
event := p.client.Event()
|
||||||
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
p.cancelFunc = cancel
|
||||||
|
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex, Context: ctx}
|
||||||
events, meta, err := event.List(name, &opts)
|
events, meta, err := event.List(name, &opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
|
|
|
@ -107,6 +107,9 @@ func (p *Plan) Stop() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.stop = true
|
p.stop = true
|
||||||
|
if p.cancelFunc != nil {
|
||||||
|
p.cancelFunc()
|
||||||
|
}
|
||||||
close(p.stopCh)
|
close(p.stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,8 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"context"
|
||||||
|
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,9 +29,10 @@ type Plan struct {
|
||||||
lastIndex uint64
|
lastIndex uint64
|
||||||
lastResult interface{}
|
lastResult interface{}
|
||||||
|
|
||||||
stop bool
|
stop bool
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
stopLock sync.Mutex
|
stopLock sync.Mutex
|
||||||
|
cancelFunc context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatcherFunc is used to watch for a diff
|
// WatcherFunc is used to watch for a diff
|
||||||
|
|
Loading…
Reference in New Issue