Merge pull request #939 from hashicorp/f-leak

Fixing memory leak caused by blocking query
This commit is contained in:
Armon Dadgar 2015-05-14 18:32:30 -07:00
commit 490c4e1f8c
8 changed files with 166 additions and 25 deletions

View File

@ -10,7 +10,7 @@ import (
// notify list.
type NotifyGroup struct {
// l guards notify; Notify, Wait and Clear each hold it while touching the field.
l sync.Mutex
// NOTE(review): this diff hunk shows the field twice. Presumably the slice
// form is the removed line and the map form (a set keyed by the channel
// itself) is its replacement; confirm against the real file.
notify []chan struct{}
notify map[chan struct{}]struct{}
}
// Notify will do a non-blocking send to all waiting channels, and
@ -18,20 +18,33 @@ type NotifyGroup struct {
func (n *NotifyGroup) Notify() {
n.l.Lock()
defer n.l.Unlock()
// NOTE(review): the two loop headers below look like the old (slice) and
// new (map) sides of the diff; presumably only one exists in the real file.
for _, ch := range n.notify {
for ch, _ := range n.notify {
select {
// Non-blocking send: if ch cannot accept a value right now, fall through
// to default and move on instead of stalling while holding the lock.
case ch <- struct{}{}:
default:
}
}
// Every registered channel is dropped once the loop finishes. The reset is
// likewise shown in two forms (truncate the slice / nil out the map).
n.notify = n.notify[:0]
n.notify = nil
}
// Wait adds a channel to the notify group. The group lock is held for the
// duration of the registration.
func (n *NotifyGroup) Wait(ch chan struct{}) {
n.l.Lock()
defer n.l.Unlock()
// NOTE(review): presumably the removed slice-based line of this diff hunk.
n.notify = append(n.notify, ch)
// Notify leaves the map nil after firing, so allocate it lazily here;
// assigning into a nil map would panic.
if n.notify == nil {
n.notify = make(map[chan struct{}]struct{})
}
// The channel itself is the key; the empty struct value carries no data.
n.notify[ch] = struct{}{}
}
// Clear unsubscribes ch from the notify group. It is a no-op when ch is not
// currently registered, including when nothing has been registered at all.
func (n *NotifyGroup) Clear(ch chan struct{}) {
	n.l.Lock()
	// delete on a nil map is a no-op, so no explicit nil guard is required.
	delete(n.notify, ch)
	n.l.Unlock()
}
// WaitCh allocates a channel that is subscribed to notifications

View File

@ -54,3 +54,19 @@ func TestNotifyGroup(t *testing.T) {
t.Fatalf("should not block")
}
}
// TestNotifyGroup_Clear verifies that a channel removed with Clear is not
// signalled by a subsequent Notify.
func TestNotifyGroup_Clear(t *testing.T) {
	group := new(NotifyGroup)

	// Subscribe, then immediately unsubscribe.
	waiter := group.WaitCh()
	group.Clear(waiter)

	group.Notify()

	// The cleared channel must still be empty.
	select {
	case <-waiter:
		t.Fatalf("should not get message")
	default:
	}
}

View File

@ -30,6 +30,15 @@ const (
// maxQueryTime is used to bound the limit of a blocking query
maxQueryTime = 600 * time.Second
// defaultQueryTime is the amount of time we block waiting for a change
// if no time is specified. Previously we would wait the maxQueryTime.
defaultQueryTime = 300 * time.Second
// jitterFraction is the limit to the amount of jitter we apply
// to a user specified MaxQueryTime. We divide the specified time by
// the fraction. So 16 == 6.25% limit of jitter
jitterFraction = 16
// Warn if the Raft command is larger than this.
// If it's over 1MB something is probably being abusive.
raftWarnSize = 1024 * 1024
@ -314,8 +323,9 @@ type blockingRPCOptions struct {
// blockingRPCOpt is the replacement for blockingRPC as it allows
// for more parameterization easily. It should be preferred over blockingRPC.
func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
var timeout <-chan time.Time
var timeout *time.Timer
var notifyCh chan struct{}
var state *StateStore
// Fast path non-blocking
if opts.queryOpts.MinQueryIndex == 0 {
@ -327,31 +337,39 @@ func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
panic("no tables to block on")
}
// Restrict the max query time
// Restrict the max query time, and ensure there is always one
if opts.queryOpts.MaxQueryTime > maxQueryTime {
opts.queryOpts.MaxQueryTime = maxQueryTime
} else if opts.queryOpts.MaxQueryTime <= 0 {
opts.queryOpts.MaxQueryTime = defaultQueryTime
}
// Ensure a time limit is set if we have an index
if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 {
opts.queryOpts.MaxQueryTime = maxQueryTime
}
// Apply a small amount of jitter to the request
opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction)
// Setup a query timeout
if opts.queryOpts.MaxQueryTime > 0 {
timeout = time.After(opts.queryOpts.MaxQueryTime)
}
timeout = time.NewTimer(opts.queryOpts.MaxQueryTime)
// Setup a notification channel for changes
SETUP_NOTIFY:
if opts.queryOpts.MinQueryIndex > 0 {
// Setup the notify channel
notifyCh = make(chan struct{}, 1)
state := s.fsm.State()
// Ensure we tear down any watchers on return
state = s.fsm.State()
defer func() {
timeout.Stop()
state.StopWatch(opts.tables, notifyCh)
if opts.kvWatch {
state.StopWatchKV(opts.kvPrefix, notifyCh)
}
}()
REGISTER_NOTIFY:
// Register the notification channel. This may be done
// multiple times if we have not reached the target wait index.
state.Watch(opts.tables, notifyCh)
if opts.kvWatch {
state.WatchKV(opts.kvPrefix, notifyCh)
}
}
RUN_QUERY:
// Update the query meta data
@ -372,8 +390,8 @@ RUN_QUERY:
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto SETUP_NOTIFY
case <-timeout:
goto REGISTER_NOTIFY
case <-timeout.C:
}
}
return err

View File

@ -421,6 +421,13 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) {
}
}
// StopWatch unsubscribes notify from the notify group of every table in
// tables.
func (s *StateStore) StopWatch(tables MDBTables, notify chan struct{}) {
	for _, table := range tables {
		group := s.watch[table]
		group.Clear(notify)
	}
}
// WatchKV is used to subscribe a channel to changes in KV data
func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
s.kvWatchLock.Lock()
@ -439,6 +446,18 @@ func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
s.kvWatch.Insert(prefix, grp)
}
// StopWatchKV unsubscribes notify from KV change notifications registered
// under prefix. It does nothing when no notify group is stored for prefix.
func (s *StateStore) StopWatchKV(prefix string, notify chan struct{}) {
	s.kvWatchLock.Lock()
	defer s.kvWatchLock.Unlock()

	// Nothing to unsubscribe from if the prefix has no notify group.
	raw, found := s.kvWatch.Get(prefix)
	if !found {
		return
	}
	raw.(*NotifyGroup).Clear(notify)
}
// notifyKV is used to notify any KV listeners of a change
// on a prefix
func (s *StateStore) notifyKV(path string, prefix bool) {

View File

@ -127,6 +127,37 @@ func TestGetNodes(t *testing.T) {
}
}
// TestGetNodes_Watch_StopWatch registers two channels on the "Nodes" query
// tables, stops the watch for one of them, and checks that a node
// registration signals only the channel that is still watching.
func TestGetNodes_Watch_StopWatch(t *testing.T) {
	store, err := testStateStore()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer store.Close()

	watching := make(chan struct{}, 1)
	stopped := make(chan struct{}, 1)

	store.Watch(store.QueryTables("Nodes"), watching)
	store.Watch(store.QueryTables("Nodes"), stopped)
	store.StopWatch(store.QueryTables("Nodes"), stopped)

	// Trigger a change on the watched tables.
	if ensureErr := store.EnsureNode(40, structs.Node{"foo", "127.0.0.1"}); ensureErr != nil {
		t.Fatalf("err: %v", ensureErr)
	}

	// The channel that is still subscribed must have fired.
	select {
	case <-watching:
	default:
		t.Fatalf("should be notified")
	}

	// The unsubscribed channel must stay silent.
	select {
	case <-stopped:
		t.Fatalf("should not be notified")
	default:
	}
}
func BenchmarkGetNodes(b *testing.B) {
store, err := testStateStore()
if err != nil {
@ -1429,6 +1460,32 @@ func TestKVSSet_Watch(t *testing.T) {
}
}
// TestKVSSet_Watch_Stop subscribes a channel to the empty KV prefix,
// unsubscribes it again, and checks that a subsequent KVSSet does not signal
// the channel.
func TestKVSSet_Watch_Stop(t *testing.T) {
	store, err := testStateStore()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer store.Close()

	// Subscribe and then unsubscribe on the same prefix.
	stopped := make(chan struct{}, 1)
	store.WatchKV("", stopped)
	store.StopWatchKV("", stopped)

	// Write an entry.
	entry := &structs.DirEntry{
		Key:   "foo/baz",
		Flags: 42,
		Value: []byte("test"),
	}
	if setErr := store.KVSSet(1000, entry); setErr != nil {
		t.Fatalf("err: %v", setErr)
	}

	// The stopped watcher must not have been signalled.
	select {
	case <-stopped:
		t.Fatalf("should not notify ")
	default:
	}
}
func TestKVSSet_Get(t *testing.T) {
store, err := testStateStore()
if err != nil {

View File

@ -4,12 +4,14 @@ import (
crand "crypto/rand"
"encoding/binary"
"fmt"
"math/rand"
"net"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
"github.com/hashicorp/serf/serf"
)
@ -222,3 +224,8 @@ func generateUUID() string {
buf[8:10],
buf[10:16])
}
// randomStagger returns a random stagger interval in the half-open range
// [0, intv).
//
// A zero or negative intv yields 0. A zero interval would otherwise make the
// modulo below divide by zero and panic, and a negative one would wrap to a
// huge unsigned modulus and produce a value unrelated to intv.
func randomStagger(intv time.Duration) time.Duration {
	if intv <= 0 {
		return 0
	}
	// rand.Int63 is non-negative, so the unsigned conversion is lossless and
	// the remainder always fits back into a Duration below intv.
	return time.Duration(uint64(rand.Int63()) % uint64(intv))
}

View File

@ -4,6 +4,7 @@ import (
"net"
"regexp"
"testing"
"time"
"github.com/hashicorp/serf/serf"
)
@ -124,3 +125,13 @@ func TestGenerateUUID(t *testing.T) {
}
}
}
// TestRandomStagger calls randomStagger repeatedly with a one-minute
// interval and fails if any result falls outside [0, intv).
func TestRandomStagger(t *testing.T) {
	const attempts = 10
	intv := time.Minute
	for n := 0; n < attempts; n++ {
		got := randomStagger(intv)
		inRange := got >= 0 && got < intv
		if !inRange {
			t.Fatalf("Bad: %v", got)
		}
	}
}

View File

@ -39,9 +39,9 @@ query string parameter to the value of `X-Consul-Index`, indicating that the cli
to wait for any changes subsequent to that index.
In addition to `index`, endpoints that support blocking will also honor a `wait`
parameter specifying a maximum duration for the blocking request. If not set, it will
default to 10 minutes. This value can be specified in the form of "10s" or "5m" (i.e.,
10 seconds or 5 minutes, respectively).
parameter specifying a maximum duration for the blocking request. This is limited to
10 minutes. If not set, the wait time defaults to 5 minutes. This value can be specified
in the form of "10s" or "5m" (i.e., 10 seconds or 5 minutes, respectively).
A critical note is that the return of a blocking request is **no guarantee** of a change. It
is possible that the timeout was reached or that there was an idempotent write that does