From 954c32e6ee2f04799df08ee5e2e02cf865fb12c3 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 8 Aug 2016 18:58:44 -0700 Subject: [PATCH] Updates Serf to pick up intent queue fix. This fixes #1062 by storing intents per-node instead of in a small, fixed- size circular buffer. --- .../github.com/hashicorp/serf/serf/config.go | 14 +- vendor/github.com/hashicorp/serf/serf/serf.go | 137 ++++++++++-------- vendor/vendor.json | 8 +- 3 files changed, 87 insertions(+), 72 deletions(-) diff --git a/vendor/github.com/hashicorp/serf/serf/config.go b/vendor/github.com/hashicorp/serf/serf/config.go index e8edd6902..dfe878bbc 100644 --- a/vendor/github.com/hashicorp/serf/serf/config.go +++ b/vendor/github.com/hashicorp/serf/serf/config.go @@ -121,12 +121,12 @@ type Config struct { // prevent an unbounded growth of memory utilization MaxQueueDepth int - // RecentIntentBuffer is used to set the size of recent join and leave intent - // messages that will be buffered. This is used to guard against - // the case where Serf broadcasts an intent that arrives before the - // Memberlist event. It is important that this not be too small to avoid - // continuous rebroadcasting of dead events. - RecentIntentBuffer int + // RecentIntentTimeout is used to determine how long we store recent + // join and leave intents. This is used to guard against the case where + // Serf broadcasts an intent that arrives before the Memberlist event. + // It is important that this not be too short to avoid continuous + // rebroadcasting of dead events. + RecentIntentTimeout time.Duration // EventBuffer is used to control how many events are buffered. // This is used to prevent re-delivery of events to a client. The buffer @@ -242,7 +242,7 @@ func DefaultConfig() *Config { LogOutput: os.Stderr, ProtocolVersion: ProtocolVersionMax, ReapInterval: 15 * time.Second, - RecentIntentBuffer: 128, + RecentIntentTimeout: 5 * time.Minute, ReconnectInterval: 30 * time.Second, ReconnectTimeout: 24 * time.Hour, QueueDepthWarning: 128, diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index 04b300a9b..424da0195 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -65,12 +65,11 @@ type Serf struct { memberLock sync.RWMutex members map[string]*memberState - // Circular buffers for recent intents, used - // in case we get the intent before the relevant event - recentLeave []nodeIntent - recentLeaveIndex int - recentJoin []nodeIntent - recentJoinIndex int + // recentIntents the lamport time and type of intent for a given node in + // case we get an intent before the relevant memberlist event. This is + // indexed by node, and always store the latest lamport time / intent + // we've seen. The memberLock protects this structure. + recentIntents map[string]nodeIntent eventBroadcasts *memberlist.TransmitLimitedQueue eventBuffer []*userEvents @@ -179,10 +178,18 @@ type memberState struct { leaveTime time.Time // wall clock time of leave } -// nodeIntent is used to buffer intents for out-of-order deliveries +// nodeIntent is used to buffer intents for out-of-order deliveries. type nodeIntent struct { + // Type is the intent being tracked. Only messageJoinType and + // messageLeaveType are tracked. + Type messageType + + // WallTime is the wall clock time we saw this intent in order to + // expire it from the buffer. + WallTime time.Time + + // LTime is the Lamport time, used for cluster-wide ordering of events. LTime LamportTime - Node string } // userEvent is used to buffer events to prevent re-delivery @@ -340,8 +347,7 @@ func Create(conf *Config) (*Serf, error) { } // Create the buffer for recent intents - serf.recentJoin = make([]nodeIntent, conf.RecentIntentBuffer) - serf.recentLeave = make([]nodeIntent, conf.RecentIntentBuffer) + serf.recentIntents = make(map[string]nodeIntent) // Create a buffer for events and queries serf.eventBuffer = make([]*userEvents, conf.EventBuffer) @@ -855,17 +861,15 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) { }, } - // Check if we have a join intent and use the LTime - if join := recentIntent(s.recentJoin, n.Name); join != nil { - member.statusLTime = join.LTime + // Check if we have a join or leave intent. The intent buffer + // will only hold one event for this node, so the more recent + // one will take effect. + if join, ok := recentIntent(s.recentIntents, n.Name, messageJoinType); ok { + member.statusLTime = join } - - // Check if we have a leave intent - if leave := recentIntent(s.recentLeave, n.Name); leave != nil { - if leave.LTime > member.statusLTime { - member.Status = StatusLeaving - member.statusLTime = leave.LTime - } + if leave, ok := recentIntent(s.recentIntents, n.Name, messageLeaveType); ok { + member.Status = StatusLeaving + member.statusLTime = leave } s.members[n.Name] = member @@ -1016,18 +1020,8 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { member, ok := s.members[leaveMsg.Node] if !ok { - // If we've already seen this message don't rebroadcast - if recentIntent(s.recentLeave, leaveMsg.Node) != nil { - return false - } - - // We don't know this member so store it in a buffer for now - s.recentLeave[s.recentLeaveIndex] = nodeIntent{ - LTime: leaveMsg.LTime, - Node: leaveMsg.Node, - } - s.recentLeaveIndex = (s.recentLeaveIndex + 1) % len(s.recentLeave) - return true + // Rebroadcast only if this was an update we hadn't seen before. + return upsertIntent(s.recentIntents, leaveMsg.Node, messageLeaveType, leaveMsg.LTime, time.Now) } // If the message is old, then it is irrelevant and we can skip it @@ -1087,15 +1081,8 @@ func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool { member, ok := s.members[joinMsg.Node] if !ok { - // If we've already seen this message don't rebroadcast - if recentIntent(s.recentJoin, joinMsg.Node) != nil { - return false - } - - // We don't know this member so store it in a buffer for now - s.recentJoin[s.recentJoinIndex] = nodeIntent{LTime: joinMsg.LTime, Node: joinMsg.Node} - s.recentJoinIndex = (s.recentJoinIndex + 1) % len(s.recentJoin) - return true + // Rebroadcast only if this was an update we hadn't seen before. + return upsertIntent(s.recentIntents, joinMsg.Node, messageJoinType, joinMsg.LTime, time.Now) } // Check if this time is newer than what we have @@ -1387,14 +1374,17 @@ func (s *Serf) resolveNodeConflict() { } } -// handleReap periodically reaps the list of failed and left members. +// handleReap periodically reaps the list of failed and left members, as well +// as old buffered intents. func (s *Serf) handleReap() { for { select { case <-time.After(s.config.ReapInterval): s.memberLock.Lock() - s.failedMembers = s.reap(s.failedMembers, s.config.ReconnectTimeout) - s.leftMembers = s.reap(s.leftMembers, s.config.TombstoneTimeout) + now := time.Now() + s.failedMembers = s.reap(s.failedMembers, now, s.config.ReconnectTimeout) + s.leftMembers = s.reap(s.leftMembers, now, s.config.TombstoneTimeout) + reapIntents(s.recentIntents, now, s.config.RecentIntentTimeout) s.memberLock.Unlock() case <-s.shutdownCh: return @@ -1418,8 +1408,7 @@ func (s *Serf) handleReconnect() { // reap is called with a list of old members and a timeout, and removes // members that have exceeded the timeout. The members are removed from // both the old list and the members itself. Locking is left to the caller. -func (s *Serf) reap(old []*memberState, timeout time.Duration) []*memberState { - now := time.Now() +func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []*memberState { n := len(old) for i := 0; i < n; i++ { m := old[i] @@ -1490,7 +1479,7 @@ func (s *Serf) reconnect() { } // Select a random member to try and join - idx := int(rand.Uint32() % uint32(n)) + idx := rand.Int31n(int32(n)) mem := s.failedMembers[idx] s.memberLock.RUnlock() @@ -1538,24 +1527,46 @@ func removeOldMember(old []*memberState, name string) []*memberState { return old } -// recentIntent checks the recent intent buffer for a matching -// entry for a given node, and either returns the message or nil -func recentIntent(recent []nodeIntent, node string) (intent *nodeIntent) { - for i := 0; i < len(recent); i++ { - // Break fast if we hit a zero entry - if recent[i].LTime == 0 { - break - } - - // Check for a node match - if recent[i].Node == node { - // Take the most recent entry - if intent == nil || recent[i].LTime > intent.LTime { - intent = &recent[i] - } +// reapIntents clears out any intents that are older than the timeout. Make sure +// the memberLock is held when passing in the Serf instance's recentIntents +// member. +func reapIntents(intents map[string]nodeIntent, now time.Time, timeout time.Duration) { + for node, intent := range intents { + if now.Sub(intent.WallTime) > timeout { + delete(intents, node) } } - return +} + +// upsertIntent will update an existing intent with the supplied Lamport time, +// or create a new entry. This will return true if a new entry was added. The +// stamper is used to capture the wall clock time for expiring these buffered +// intents. Make sure the memberLock is held when passing in the Serf instance's +// recentIntents member. +func upsertIntent(intents map[string]nodeIntent, node string, itype messageType, + ltime LamportTime, stamper func() time.Time) bool { + if intent, ok := intents[node]; !ok || ltime > intent.LTime { + intents[node] = nodeIntent{ + Type: itype, + WallTime: stamper(), + LTime: ltime, + } + return true + } + + return false +} + +// recentIntent checks the recent intent buffer for a matching entry for a given +// node, and returns the Lamport time, if an intent is present, indicated by the +// returned boolean. Make sure the memberLock is held for read when passing in +// the Serf instance's recentIntents member. +func recentIntent(intents map[string]nodeIntent, node string, itype messageType) (LamportTime, bool) { + if intent, ok := intents[node]; ok && intent.Type == itype { + return intent.LTime, true + } + + return LamportTime(0), false } // handleRejoin attempts to reconnect to previously known alive nodes diff --git a/vendor/vendor.json b/vendor/vendor.json index 6f2965982..a4773ad39 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -276,14 +276,18 @@ "revision": "84989fd23ad4cc0e7ad44d6a871fd793eb9beb0a" }, { + "checksumSHA1": "E3Xcanc9ouQwL+CZGOUyA/+giLg=", "comment": "v0.7.0-66-g6c4672d", "path": "github.com/hashicorp/serf/coordinate", - "revision": "6c4672d66fc6312ddde18399262943e21175d831" + "revision": "114430d8210835d66defdc31cdc176c58e060005", + "revisionTime": "2016-08-09T01:42:04Z" }, { + "checksumSHA1": "vLyudzMEdik8IpRY1H2vRa2PeLU=", "comment": "v0.7.0-66-g6c4672d", "path": "github.com/hashicorp/serf/serf", - "revision": "6c4672d66fc6312ddde18399262943e21175d831" + "revision": "114430d8210835d66defdc31cdc176c58e060005", + "revisionTime": "2016-08-09T01:42:04Z" }, { "path": "github.com/hashicorp/yamux",