Updates Serf to pull in new queue depth controls.

This commit is contained in:
James Phillips 2017-12-06 17:06:08 -08:00
parent 23e5dba7d8
commit 3ac47e3b1c
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
3 changed files with 32 additions and 5 deletions

View File

@ -112,6 +112,10 @@ type Config struct {
// node. // node.
FlapTimeout time.Duration FlapTimeout time.Duration
// QueueCheckInterval is the interval at which we check the message
// queue to apply the warning and max depth.
QueueCheckInterval time.Duration
// QueueDepthWarning is used to generate warning message if the // QueueDepthWarning is used to generate warning message if the
// number of queued messages to broadcast exceeds this number. This // number of queued messages to broadcast exceeds this number. This
// is to provide the user feedback if events are being triggered // is to provide the user feedback if events are being triggered
@ -123,6 +127,12 @@ type Config struct {
// prevent an unbounded growth of memory utilization // prevent an unbounded growth of memory utilization
MaxQueueDepth int MaxQueueDepth int
// MinQueueDepth, if >0 will enforce a lower limit for dropping messages
// and then the max will be max(MinQueueDepth, 2*SizeOfCluster). This
// defaults to 0 which disables this dynamic sizing feature. If this is
// >0 then MaxQueueDepth will be ignored.
MinQueueDepth int
// RecentIntentTimeout is used to determine how long we store recent // RecentIntentTimeout is used to determine how long we store recent
// join and leave intents. This is used to guard against the case where // join and leave intents. This is used to guard against the case where
// Serf broadcasts an intent that arrives before the Memberlist event. // Serf broadcasts an intent that arrives before the Memberlist event.
@ -253,6 +263,7 @@ func DefaultConfig() *Config {
RecentIntentTimeout: 5 * time.Minute, RecentIntentTimeout: 5 * time.Minute,
ReconnectInterval: 30 * time.Second, ReconnectInterval: 30 * time.Second,
ReconnectTimeout: 24 * time.Hour, ReconnectTimeout: 24 * time.Hour,
QueueCheckInterval: 30 * time.Second,
QueueDepthWarning: 128, QueueDepthWarning: 128,
MaxQueueDepth: 4096, MaxQueueDepth: 4096,
TombstoneTimeout: 24 * time.Hour, TombstoneTimeout: 24 * time.Hour,

View File

@ -1515,21 +1515,37 @@ func (s *Serf) reconnect() {
s.memberlist.Join([]string{addr.String()}) s.memberlist.Join([]string{addr.String()})
} }
// getQueueMax will get the maximum queue depth, which might be dynamic depending
// on how Serf is configured.
func (s *Serf) getQueueMax() int {
max := s.config.MaxQueueDepth
if s.config.MinQueueDepth > 0 {
s.memberLock.RLock()
max = 2 * len(s.members)
s.memberLock.RUnlock()
if max < s.config.MinQueueDepth {
max = s.config.MinQueueDepth
}
}
return max
}
// checkQueueDepth periodically checks the size of a queue to see if // checkQueueDepth periodically checks the size of a queue to see if
// it is too large // it is too large
func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) { func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) {
for { for {
select { select {
case <-time.After(time.Second): case <-time.After(s.config.QueueCheckInterval):
numq := queue.NumQueued() numq := queue.NumQueued()
metrics.AddSample([]string{"serf", "queue", name}, float32(numq)) metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
if numq >= s.config.QueueDepthWarning { if numq >= s.config.QueueDepthWarning {
s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq) s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
} }
if numq > s.config.MaxQueueDepth { if max := s.getQueueMax(); numq > max {
s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!", s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!",
name, numq, s.config.MaxQueueDepth) name, numq, max)
queue.Prune(s.config.MaxQueueDepth) queue.Prune(max)
} }
case <-s.shutdownCh: case <-s.shutdownCh:
return return

2
vendor/vendor.json vendored
View File

@ -64,7 +64,7 @@
{"path":"github.com/hashicorp/raft","checksumSHA1":"JjJtGJi1ywWhVhs/PvTXxe4TeD8=","revision":"6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f","revisionTime":"2017-10-03T22:09:13Z","version":"v1.0.0","versionExact":"v1.0.0"}, {"path":"github.com/hashicorp/raft","checksumSHA1":"JjJtGJi1ywWhVhs/PvTXxe4TeD8=","revision":"6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f","revisionTime":"2017-10-03T22:09:13Z","version":"v1.0.0","versionExact":"v1.0.0"},
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"mS15CkImPzXYsgNwl3Mt9Gh3Vb0=","comment":"v0.7.0-66-g6c4672d","revision":"c20a0b1b1ea9eb8168bcdec0116688fa9254e449","revisionTime":"2017-10-22T02:00:50Z"}, {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"mS15CkImPzXYsgNwl3Mt9Gh3Vb0=","comment":"v0.7.0-66-g6c4672d","revision":"c20a0b1b1ea9eb8168bcdec0116688fa9254e449","revisionTime":"2017-10-22T02:00:50Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"iYhCWgAAUcQjU0JocsKgak5C8tY=","comment":"v0.7.0-66-g6c4672d","revision":"c20a0b1b1ea9eb8168bcdec0116688fa9254e449","revisionTime":"2017-10-22T02:00:50Z"}, {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"NegZzEwYOlfkbvy+jTBkX3OBcRM=","comment":"v0.7.0-66-g6c4672d","revision":"a110af454b635c75adc2b7eee541af2c68666d97","revisionTime":"2017-12-07T01:04:04Z"},
{"path":"github.com/hashicorp/yamux","checksumSHA1":"ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=","revision":"d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd","revisionTime":"2016-07-20T23:31:40Z"}, {"path":"github.com/hashicorp/yamux","checksumSHA1":"ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=","revision":"d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd","revisionTime":"2016-07-20T23:31:40Z"},
{"path":"github.com/mattn/go-isatty","checksumSHA1":"xZuhljnmBysJPta/lMyYmJdujCg=","revision":"66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8","revisionTime":"2016-08-06T12:27:52Z"}, {"path":"github.com/mattn/go-isatty","checksumSHA1":"xZuhljnmBysJPta/lMyYmJdujCg=","revision":"66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8","revisionTime":"2016-08-06T12:27:52Z"},
{"path":"github.com/miekg/dns","checksumSHA1":"Jo+pItYOocIRdoFL0fc4nHhUEJY=","revision":"bbca4873b326f5dc54bfe31148446d4ed79a5a02","revisionTime":"2017-08-08T22:19:10Z"}, {"path":"github.com/miekg/dns","checksumSHA1":"Jo+pItYOocIRdoFL0fc4nHhUEJY=","revision":"bbca4873b326f5dc54bfe31148446d4ed79a5a02","revisionTime":"2017-08-08T22:19:10Z"},