Updates Serf to pick up intent queue fix.

This fixes #1062 by storing intents per-node instead of in a small, fixed-
size circular buffer.
This commit is contained in:
James Phillips 2016-08-08 18:58:44 -07:00
parent 1a5b2abe79
commit 954c32e6ee
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
3 changed files with 87 additions and 72 deletions

View File

@ -121,12 +121,12 @@ type Config struct {
// prevent an unbounded growth of memory utilization // prevent an unbounded growth of memory utilization
MaxQueueDepth int MaxQueueDepth int
// RecentIntentBuffer is used to set the size of recent join and leave intent // RecentIntentTimeout is used to determine how long we store recent
// messages that will be buffered. This is used to guard against // join and leave intents. This is used to guard against the case where
// the case where Serf broadcasts an intent that arrives before the // Serf broadcasts an intent that arrives before the Memberlist event.
// Memberlist event. It is important that this not be too small to avoid // It is important that this not be too short to avoid continuous
// continuous rebroadcasting of dead events. // rebroadcasting of dead events.
RecentIntentBuffer int RecentIntentTimeout time.Duration
// EventBuffer is used to control how many events are buffered. // EventBuffer is used to control how many events are buffered.
// This is used to prevent re-delivery of events to a client. The buffer // This is used to prevent re-delivery of events to a client. The buffer
@ -242,7 +242,7 @@ func DefaultConfig() *Config {
LogOutput: os.Stderr, LogOutput: os.Stderr,
ProtocolVersion: ProtocolVersionMax, ProtocolVersion: ProtocolVersionMax,
ReapInterval: 15 * time.Second, ReapInterval: 15 * time.Second,
RecentIntentBuffer: 128, RecentIntentTimeout: 5 * time.Minute,
ReconnectInterval: 30 * time.Second, ReconnectInterval: 30 * time.Second,
ReconnectTimeout: 24 * time.Hour, ReconnectTimeout: 24 * time.Hour,
QueueDepthWarning: 128, QueueDepthWarning: 128,

View File

@ -65,12 +65,11 @@ type Serf struct {
memberLock sync.RWMutex memberLock sync.RWMutex
members map[string]*memberState members map[string]*memberState
// Circular buffers for recent intents, used // recentIntents the lamport time and type of intent for a given node in
// in case we get the intent before the relevant event // case we get an intent before the relevant memberlist event. This is
recentLeave []nodeIntent // indexed by node, and always store the latest lamport time / intent
recentLeaveIndex int // we've seen. The memberLock protects this structure.
recentJoin []nodeIntent recentIntents map[string]nodeIntent
recentJoinIndex int
eventBroadcasts *memberlist.TransmitLimitedQueue eventBroadcasts *memberlist.TransmitLimitedQueue
eventBuffer []*userEvents eventBuffer []*userEvents
@ -179,10 +178,18 @@ type memberState struct {
leaveTime time.Time // wall clock time of leave leaveTime time.Time // wall clock time of leave
} }
// nodeIntent is used to buffer intents for out-of-order deliveries // nodeIntent is used to buffer intents for out-of-order deliveries.
type nodeIntent struct { type nodeIntent struct {
// Type is the intent being tracked. Only messageJoinType and
// messageLeaveType are tracked.
Type messageType
// WallTime is the wall clock time we saw this intent in order to
// expire it from the buffer.
WallTime time.Time
// LTime is the Lamport time, used for cluster-wide ordering of events.
LTime LamportTime LTime LamportTime
Node string
} }
// userEvent is used to buffer events to prevent re-delivery // userEvent is used to buffer events to prevent re-delivery
@ -340,8 +347,7 @@ func Create(conf *Config) (*Serf, error) {
} }
// Create the buffer for recent intents // Create the buffer for recent intents
serf.recentJoin = make([]nodeIntent, conf.RecentIntentBuffer) serf.recentIntents = make(map[string]nodeIntent)
serf.recentLeave = make([]nodeIntent, conf.RecentIntentBuffer)
// Create a buffer for events and queries // Create a buffer for events and queries
serf.eventBuffer = make([]*userEvents, conf.EventBuffer) serf.eventBuffer = make([]*userEvents, conf.EventBuffer)
@ -855,17 +861,15 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) {
}, },
} }
// Check if we have a join intent and use the LTime // Check if we have a join or leave intent. The intent buffer
if join := recentIntent(s.recentJoin, n.Name); join != nil { // will only hold one event for this node, so the more recent
member.statusLTime = join.LTime // one will take effect.
if join, ok := recentIntent(s.recentIntents, n.Name, messageJoinType); ok {
member.statusLTime = join
} }
if leave, ok := recentIntent(s.recentIntents, n.Name, messageLeaveType); ok {
// Check if we have a leave intent
if leave := recentIntent(s.recentLeave, n.Name); leave != nil {
if leave.LTime > member.statusLTime {
member.Status = StatusLeaving member.Status = StatusLeaving
member.statusLTime = leave.LTime member.statusLTime = leave
}
} }
s.members[n.Name] = member s.members[n.Name] = member
@ -1016,18 +1020,8 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
member, ok := s.members[leaveMsg.Node] member, ok := s.members[leaveMsg.Node]
if !ok { if !ok {
// If we've already seen this message don't rebroadcast // Rebroadcast only if this was an update we hadn't seen before.
if recentIntent(s.recentLeave, leaveMsg.Node) != nil { return upsertIntent(s.recentIntents, leaveMsg.Node, messageLeaveType, leaveMsg.LTime, time.Now)
return false
}
// We don't know this member so store it in a buffer for now
s.recentLeave[s.recentLeaveIndex] = nodeIntent{
LTime: leaveMsg.LTime,
Node: leaveMsg.Node,
}
s.recentLeaveIndex = (s.recentLeaveIndex + 1) % len(s.recentLeave)
return true
} }
// If the message is old, then it is irrelevant and we can skip it // If the message is old, then it is irrelevant and we can skip it
@ -1087,15 +1081,8 @@ func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
member, ok := s.members[joinMsg.Node] member, ok := s.members[joinMsg.Node]
if !ok { if !ok {
// If we've already seen this message don't rebroadcast // Rebroadcast only if this was an update we hadn't seen before.
if recentIntent(s.recentJoin, joinMsg.Node) != nil { return upsertIntent(s.recentIntents, joinMsg.Node, messageJoinType, joinMsg.LTime, time.Now)
return false
}
// We don't know this member so store it in a buffer for now
s.recentJoin[s.recentJoinIndex] = nodeIntent{LTime: joinMsg.LTime, Node: joinMsg.Node}
s.recentJoinIndex = (s.recentJoinIndex + 1) % len(s.recentJoin)
return true
} }
// Check if this time is newer than what we have // Check if this time is newer than what we have
@ -1387,14 +1374,17 @@ func (s *Serf) resolveNodeConflict() {
} }
} }
// handleReap periodically reaps the list of failed and left members. // handleReap periodically reaps the list of failed and left members, as well
// as old buffered intents.
func (s *Serf) handleReap() { func (s *Serf) handleReap() {
for { for {
select { select {
case <-time.After(s.config.ReapInterval): case <-time.After(s.config.ReapInterval):
s.memberLock.Lock() s.memberLock.Lock()
s.failedMembers = s.reap(s.failedMembers, s.config.ReconnectTimeout) now := time.Now()
s.leftMembers = s.reap(s.leftMembers, s.config.TombstoneTimeout) s.failedMembers = s.reap(s.failedMembers, now, s.config.ReconnectTimeout)
s.leftMembers = s.reap(s.leftMembers, now, s.config.TombstoneTimeout)
reapIntents(s.recentIntents, now, s.config.RecentIntentTimeout)
s.memberLock.Unlock() s.memberLock.Unlock()
case <-s.shutdownCh: case <-s.shutdownCh:
return return
@ -1418,8 +1408,7 @@ func (s *Serf) handleReconnect() {
// reap is called with a list of old members and a timeout, and removes // reap is called with a list of old members and a timeout, and removes
// members that have exceeded the timeout. The members are removed from // members that have exceeded the timeout. The members are removed from
// both the old list and the members itself. Locking is left to the caller. // both the old list and the members itself. Locking is left to the caller.
func (s *Serf) reap(old []*memberState, timeout time.Duration) []*memberState { func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []*memberState {
now := time.Now()
n := len(old) n := len(old)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
m := old[i] m := old[i]
@ -1490,7 +1479,7 @@ func (s *Serf) reconnect() {
} }
// Select a random member to try and join // Select a random member to try and join
idx := int(rand.Uint32() % uint32(n)) idx := rand.Int31n(int32(n))
mem := s.failedMembers[idx] mem := s.failedMembers[idx]
s.memberLock.RUnlock() s.memberLock.RUnlock()
@ -1538,24 +1527,46 @@ func removeOldMember(old []*memberState, name string) []*memberState {
return old return old
} }
// recentIntent checks the recent intent buffer for a matching // reapIntents clears out any intents that are older than the timeout. Make sure
// entry for a given node, and either returns the message or nil // the memberLock is held when passing in the Serf instance's recentIntents
func recentIntent(recent []nodeIntent, node string) (intent *nodeIntent) { // member.
for i := 0; i < len(recent); i++ { func reapIntents(intents map[string]nodeIntent, now time.Time, timeout time.Duration) {
// Break fast if we hit a zero entry for node, intent := range intents {
if recent[i].LTime == 0 { if now.Sub(intent.WallTime) > timeout {
break delete(intents, node)
}
}
}
// upsertIntent will update an existing intent with the supplied Lamport time,
// or create a new entry. This will return true if a new entry was added. The
// stamper is used to capture the wall clock time for expiring these buffered
// intents. Make sure the memberLock is held when passing in the Serf instance's
// recentIntents member.
func upsertIntent(intents map[string]nodeIntent, node string, itype messageType,
ltime LamportTime, stamper func() time.Time) bool {
if intent, ok := intents[node]; !ok || ltime > intent.LTime {
intents[node] = nodeIntent{
Type: itype,
WallTime: stamper(),
LTime: ltime,
}
return true
} }
// Check for a node match return false
if recent[i].Node == node { }
// Take the most recent entry
if intent == nil || recent[i].LTime > intent.LTime { // recentIntent checks the recent intent buffer for a matching entry for a given
intent = &recent[i] // node, and returns the Lamport time, if an intent is present, indicated by the
// returned boolean. Make sure the memberLock is held for read when passing in
// the Serf instance's recentIntents member.
func recentIntent(intents map[string]nodeIntent, node string, itype messageType) (LamportTime, bool) {
if intent, ok := intents[node]; ok && intent.Type == itype {
return intent.LTime, true
} }
}
} return LamportTime(0), false
return
} }
// handleRejoin attempts to reconnect to previously known alive nodes // handleRejoin attempts to reconnect to previously known alive nodes

8
vendor/vendor.json vendored
View File

@ -276,14 +276,18 @@
"revision": "84989fd23ad4cc0e7ad44d6a871fd793eb9beb0a" "revision": "84989fd23ad4cc0e7ad44d6a871fd793eb9beb0a"
}, },
{ {
"checksumSHA1": "E3Xcanc9ouQwL+CZGOUyA/+giLg=",
"comment": "v0.7.0-66-g6c4672d", "comment": "v0.7.0-66-g6c4672d",
"path": "github.com/hashicorp/serf/coordinate", "path": "github.com/hashicorp/serf/coordinate",
"revision": "6c4672d66fc6312ddde18399262943e21175d831" "revision": "114430d8210835d66defdc31cdc176c58e060005",
"revisionTime": "2016-08-09T01:42:04Z"
}, },
{ {
"checksumSHA1": "vLyudzMEdik8IpRY1H2vRa2PeLU=",
"comment": "v0.7.0-66-g6c4672d", "comment": "v0.7.0-66-g6c4672d",
"path": "github.com/hashicorp/serf/serf", "path": "github.com/hashicorp/serf/serf",
"revision": "6c4672d66fc6312ddde18399262943e21175d831" "revision": "114430d8210835d66defdc31cdc176c58e060005",
"revisionTime": "2016-08-09T01:42:04Z"
}, },
{ {
"path": "github.com/hashicorp/yamux", "path": "github.com/hashicorp/yamux",