diff --git a/vendor/github.com/hashicorp/serf/serf/config.go b/vendor/github.com/hashicorp/serf/serf/config.go
index dfe878bbc..74f21ffbd 100644
--- a/vendor/github.com/hashicorp/serf/serf/config.go
+++ b/vendor/github.com/hashicorp/serf/serf/config.go
@@ -2,6 +2,7 @@ package serf
 
 import (
 	"io"
+	"log"
 	"os"
 	"time"
 
@@ -15,6 +16,7 @@ var ProtocolVersionMap map[uint8]uint8
 
 func init() {
 	ProtocolVersionMap = map[uint8]uint8{
+		5: 2,
 		4: 2,
 		3: 2,
 		2: 2,
@@ -182,6 +184,12 @@ type Config struct {
 	// logs will go to stderr.
 	LogOutput io.Writer
 
+	// Logger is a custom logger which you provide. If Logger is set, it will use
+	// this for the internal logger. If Logger is not set, it will fall back to the
+	// behavior for using LogOutput. You cannot specify both LogOutput and Logger
+	// at the same time.
+	Logger *log.Logger
+
 	// SnapshotPath if provided is used to snapshot live nodes as well
 	// as lamport clock values. When Serf is started with a snapshot,
 	// it will attempt to join all the previously known nodes until one
@@ -240,7 +248,7 @@ func DefaultConfig() *Config {
 		EventBuffer:         512,
 		QueryBuffer:         512,
 		LogOutput:           os.Stderr,
-		ProtocolVersion:     ProtocolVersionMax,
+		ProtocolVersion:     4,
 		ReapInterval:        15 * time.Second,
 		RecentIntentTimeout: 5 * time.Minute,
 		ReconnectInterval:   30 * time.Second,
diff --git a/vendor/github.com/hashicorp/serf/serf/delegate.go b/vendor/github.com/hashicorp/serf/serf/delegate.go
index d19ca3090..153531502 100644
--- a/vendor/github.com/hashicorp/serf/serf/delegate.go
+++ b/vendor/github.com/hashicorp/serf/serf/delegate.go
@@ -1,9 +1,11 @@
 package serf
 
 import (
+	"bytes"
 	"fmt"
 
 	"github.com/armon/go-metrics"
+	"github.com/hashicorp/go-msgpack/codec"
 )
 
 // delegate is the memberlist.Delegate implementation that Serf uses.
@@ -83,6 +85,25 @@ func (d *delegate) NotifyMsg(buf []byte) {
 		d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From)
 		d.serf.handleQueryResponse(&resp)
 
+	case messageRelayType:
+		var header relayHeader
+		var handle codec.MsgpackHandle
+		reader := bytes.NewReader(buf[1:])
+		decoder := codec.NewDecoder(reader, &handle)
+		if err := decoder.Decode(&header); err != nil {
+			d.serf.logger.Printf("[ERR] serf: Error decoding relay header: %s", err)
+			break
+		}
+
+		// The remaining contents are the message itself, so forward that
+		raw := make([]byte, reader.Len())
+		reader.Read(raw)
+		d.serf.logger.Printf("[DEBUG] serf: Relaying response to addr: %s", header.DestAddr.String())
+		if err := d.serf.memberlist.SendTo(&header.DestAddr, raw); err != nil {
+			d.serf.logger.Printf("[ERR] serf: Error forwarding message to %s: %s", header.DestAddr.String(), err)
+			break
+		}
+
 	default:
 		d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
 	}
@@ -230,7 +251,8 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
 	// If we are doing a join, and eventJoinIgnore is set
 	// then we set the eventMinTime to the EventLTime. This
 	// prevents any of the incoming events from being processed
-	if isJoin && d.serf.eventJoinIgnore {
+	eventJoinIgnore := d.serf.eventJoinIgnore.Load().(bool)
+	if isJoin && eventJoinIgnore {
 		d.serf.eventLock.Lock()
 		if pp.EventLTime > d.serf.eventMinTime {
 			d.serf.eventMinTime = pp.EventLTime
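The new messageRelayType case above peels a msgpack-encoded relayHeader off the front of the packet and forwards whatever bytes remain to the final destination. A minimal standalone sketch of that envelope follows; the local relayHeader mirror, the hard-coded type value, and the address are illustrative assumptions, not serf exports.

package main

import (
	"bytes"
	"fmt"
	"net"

	"github.com/hashicorp/go-msgpack/codec"
)

// relayHeader mirrors the private struct added in messages.go below.
type relayHeader struct {
	DestAddr net.UDPAddr
}

// messageRelayType's byte value is assumed here; in serf it comes from the
// messageType iota block.
const messageRelayType = 9

func main() {
	// Build a relay envelope by hand: type byte, msgpack header, raw payload.
	handle := codec.MsgpackHandle{}
	buf := bytes.NewBuffer(nil)
	buf.WriteByte(messageRelayType)
	dest := net.UDPAddr{IP: net.ParseIP("10.0.0.5"), Port: 7946}
	if err := codec.NewEncoder(buf, &handle).Encode(relayHeader{DestAddr: dest}); err != nil {
		panic(err)
	}
	buf.WriteString("inner message bytes")

	// Unwrap it the way NotifyMsg does: skip the type byte, decode the
	// header, and treat whatever remains as the message to forward.
	reader := bytes.NewReader(buf.Bytes()[1:])
	var header relayHeader
	if err := codec.NewDecoder(reader, &handle).Decode(&header); err != nil {
		panic(err)
	}
	raw := make([]byte, reader.Len())
	reader.Read(raw) // mirrors the delegate, which also ignores the short-read case
	fmt.Printf("would forward %q to %s\n", raw, header.DestAddr.String())
}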
diff --git a/vendor/github.com/hashicorp/serf/serf/event.go b/vendor/github.com/hashicorp/serf/serf/event.go
index 8337e95ea..29211393f 100644
--- a/vendor/github.com/hashicorp/serf/serf/event.go
+++ b/vendor/github.com/hashicorp/serf/serf/event.go
@@ -95,18 +95,19 @@ func (u UserEvent) String() string {
 	return fmt.Sprintf("user-event: %s", u.Name)
 }
 
-// Query is the struct used EventQuery type events
+// Query is the struct used by EventQuery type events
 type Query struct {
 	LTime   LamportTime
 	Name    string
 	Payload []byte
 
-	serf     *Serf
-	id       uint32    // ID is not exported, since it may change
-	addr     []byte    // Address to respond to
-	port     uint16    // Port to respond to
-	deadline time.Time // Must respond by this deadline
-	respLock sync.Mutex
+	serf        *Serf
+	id          uint32    // ID is not exported, since it may change
+	addr        []byte    // Address to respond to
+	port        uint16    // Port to respond to
+	deadline    time.Time // Must respond by this deadline
+	relayFactor uint8     // Number of duplicate responses to relay back to sender
+	respLock    sync.Mutex
 }
 
 func (q *Query) EventType() EventType {
@@ -129,12 +130,12 @@ func (q *Query) Respond(buf []byte) error {
 
 	// Check if we've already responded
 	if q.deadline.IsZero() {
-		return fmt.Errorf("Response already sent")
+		return fmt.Errorf("response already sent")
 	}
 
 	// Ensure we aren't past our response deadline
 	if time.Now().After(q.deadline) {
-		return fmt.Errorf("Response is past the deadline")
+		return fmt.Errorf("response is past the deadline")
 	}
 
 	// Create response
@@ -145,10 +146,10 @@ func (q *Query) Respond(buf []byte) error {
 		Payload: buf,
 	}
 
-	// Format the response
+	// Send a direct response
 	raw, err := encodeMessage(messageQueryResponseType, &resp)
 	if err != nil {
-		return fmt.Errorf("Failed to format response: %v", err)
+		return fmt.Errorf("failed to format response: %v", err)
 	}
 
 	// Check the size limit
@@ -156,13 +157,18 @@ func (q *Query) Respond(buf []byte) error {
 		return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
 	}
 
-	// Send the response
+	// Send the response directly to the originator
 	addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
 	if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
 		return err
 	}
 
-	// Clera the deadline, response sent
+	// Relay the response through up to relayFactor other nodes
+	if err := q.serf.relayResponse(q.relayFactor, addr, &resp); err != nil {
+		return err
+	}
+
+	// Clear the deadline, responses sent
 	q.deadline = time.Time{}
 	return nil
 }
diff --git a/vendor/github.com/hashicorp/serf/serf/internal_query.go b/vendor/github.com/hashicorp/serf/serf/internal_query.go
index 128b2cf21..04c984582 100644
--- a/vendor/github.com/hashicorp/serf/serf/internal_query.go
+++ b/vendor/github.com/hashicorp/serf/serf/internal_query.go
@@ -192,10 +192,12 @@ func (s *serfQueries) handleInstallKey(q *Query) {
 		goto SEND
 	}
 
-	if err := s.serf.writeKeyringFile(); err != nil {
-		response.Message = err.Error()
-		s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
-		goto SEND
+	if s.serf.config.KeyringFile != "" {
+		if err := s.serf.writeKeyringFile(); err != nil {
+			response.Message = err.Error()
+			s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
+			goto SEND
+		}
 	}
 
 	response.Result = true
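Respond, as changed above, now sends one copy straight to the originator and then relays up to relayFactor duplicates through other peers; the application-facing API is unchanged. A hedged sketch of a typical handler, assuming the event channel was wired up via Config.EventCh:

package example

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

// handleEvents answers incoming queries; relaying happens inside Respond, so
// the handler itself needs no changes to benefit from it.
func handleEvents(eventCh <-chan serf.Event) {
	for e := range eventCh {
		if q, ok := e.(*serf.Query); ok {
			if err := q.Respond([]byte("pong")); err != nil {
				log.Printf("query response failed: %v", err)
			}
		}
	}
}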
diff --git a/vendor/github.com/hashicorp/serf/serf/keymanager.go b/vendor/github.com/hashicorp/serf/serf/keymanager.go
index 72a319449..fd53182fc 100644
--- a/vendor/github.com/hashicorp/serf/serf/keymanager.go
+++ b/vendor/github.com/hashicorp/serf/serf/keymanager.go
@@ -33,6 +33,13 @@ type KeyResponse struct {
 	Keys map[string]int
 }
 
+// KeyRequestOptions is used to contain optional parameters for a keyring operation
+type KeyRequestOptions struct {
+	// RelayFactor is the number of duplicate query responses to send by relaying through
+	// other nodes, for redundancy
+	RelayFactor uint8
+}
+
 // streamKeyResp takes care of reading responses from a channel and composing
 // them into a KeyResponse. It will update a KeyResponse *in place* and
 // therefore has nothing to return.
@@ -83,7 +90,7 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
 // handleKeyRequest performs query broadcasting to all members for any type of
 // key operation and manages gathering responses and packing them up into a
 // KeyResponse for uniform response handling.
-func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
+func (k *KeyManager) handleKeyRequest(key, query string, opts *KeyRequestOptions) (*KeyResponse, error) {
 	resp := &KeyResponse{
 		Messages: make(map[string]string),
 		Keys:     make(map[string]int),
@@ -103,6 +110,9 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
 	}
 
 	qParam := k.serf.DefaultQueryParams()
+	if opts != nil {
+		qParam.RelayFactor = opts.RelayFactor
+	}
 	queryResp, err := k.serf.Query(qName, req, qParam)
 	if err != nil {
 		return resp, err
@@ -127,30 +137,42 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
 // responses from each of them, returning a list of messages from each node
 // and any applicable error conditions.
 func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) {
+	return k.InstallKeyWithOptions(key, nil)
+}
+
+func (k *KeyManager) InstallKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
 	k.l.Lock()
 	defer k.l.Unlock()
 
-	return k.handleKeyRequest(key, installKeyQuery)
+	return k.handleKeyRequest(key, installKeyQuery, opts)
 }
 
 // UseKey handles broadcasting a primary key change to all members in the
 // cluster, and gathering any response messages. If successful, there should
 // be an empty KeyResponse returned.
 func (k *KeyManager) UseKey(key string) (*KeyResponse, error) {
+	return k.UseKeyWithOptions(key, nil)
+}
+
+func (k *KeyManager) UseKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
 	k.l.Lock()
 	defer k.l.Unlock()
 
-	return k.handleKeyRequest(key, useKeyQuery)
+	return k.handleKeyRequest(key, useKeyQuery, opts)
 }
 
 // RemoveKey handles broadcasting a key to the cluster for removal. Each member
 // will receive this event, and if they have the key in their keyring, remove
 // it. If any errors are encountered, RemoveKey will collect and relay them.
 func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
+	return k.RemoveKeyWithOptions(key, nil)
+}
+
+func (k *KeyManager) RemoveKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
 	k.l.Lock()
 	defer k.l.Unlock()
 
-	return k.handleKeyRequest(key, removeKeyQuery)
+	return k.handleKeyRequest(key, removeKeyQuery, opts)
 }
 
 // ListKeys is used to collect installed keys from members in a Serf cluster
@@ -159,8 +181,12 @@ func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
 // Since having multiple keys installed can cause performance penalties in some
 // cases, it's important to verify this information and remove unneeded keys.
 func (k *KeyManager) ListKeys() (*KeyResponse, error) {
+	return k.ListKeysWithOptions(nil)
+}
+
+func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, error) {
 	k.l.RLock()
 	defer k.l.RUnlock()
 
-	return k.handleKeyRequest("", listKeysQuery)
-}
+	return k.handleKeyRequest("", listKeysQuery, opts)
+}
\ No newline at end of file
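A short usage sketch for the new *WithOptions variants, assuming `s` is an already-running *serf.Serf; the key value, relay factor, and function name are illustrative.

package example

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

// rotateKey installs and activates a new encryption key, asking responders to
// relay two duplicate responses through other nodes for redundancy.
func rotateKey(s *serf.Serf, newKey string) {
	opts := &serf.KeyRequestOptions{RelayFactor: 2}
	km := s.KeyManager()
	if resp, err := km.InstallKeyWithOptions(newKey, opts); err != nil {
		log.Printf("install failed: %v (node messages: %v)", err, resp.Messages)
		return
	}
	if _, err := km.UseKeyWithOptions(newKey, opts); err != nil {
		log.Printf("use-key failed: %v", err)
	}
}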
diff --git a/vendor/github.com/hashicorp/serf/serf/messages.go b/vendor/github.com/hashicorp/serf/serf/messages.go
index c90c96450..20df5b8e8 100644
--- a/vendor/github.com/hashicorp/serf/serf/messages.go
+++ b/vendor/github.com/hashicorp/serf/serf/messages.go
@@ -2,8 +2,10 @@ package serf
 
 import (
 	"bytes"
-	"github.com/hashicorp/go-msgpack/codec"
+	"net"
 	"time"
+
+	"github.com/hashicorp/go-msgpack/codec"
 )
 
 // messageType are the types of gossip messages Serf will send along
@@ -20,6 +22,7 @@ const (
 	messageConflictResponseType
 	messageKeyRequestType
 	messageKeyResponseType
+	messageRelayType
 )
 
 const (
@@ -75,15 +78,16 @@ type messageUserEvent struct {
 
 // messageQuery is used for query events
 type messageQuery struct {
-	LTime   LamportTime   // Event lamport time
-	ID      uint32        // Query ID, randomly generated
-	Addr    []byte        // Source address, used for a direct reply
-	Port    uint16        // Source port, used for a direct reply
-	Filters [][]byte      // Potential query filters
-	Flags   uint32        // Used to provide various flags
-	Timeout time.Duration // Maximum time between delivery and response
-	Name    string        // Query name
-	Payload []byte        // Query payload
+	LTime       LamportTime   // Event lamport time
+	ID          uint32        // Query ID, randomly generated
+	Addr        []byte        // Source address, used for a direct reply
+	Port        uint16        // Source port, used for a direct reply
+	Filters     [][]byte      // Potential query filters
+	Flags       uint32        // Used to provide various flags
+	RelayFactor uint8         // Used to set the number of duplicate relayed responses
+	Timeout     time.Duration // Maximum time between delivery and response
+	Name        string        // Query name
+	Payload     []byte        // Query payload
 }
 
 // Ack checks if the ack flag is set
@@ -136,6 +140,28 @@ func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
 	return buf.Bytes(), err
 }
 
+// relayHeader is used to store the end destination of a relayed message
+type relayHeader struct {
+	DestAddr net.UDPAddr
+}
+
+// encodeRelayMessage wraps a message in the messageRelayType, adding the length and
+// address of the end recipient to the front of the message
+func encodeRelayMessage(t messageType, addr net.UDPAddr, msg interface{}) ([]byte, error) {
+	buf := bytes.NewBuffer(nil)
+	handle := codec.MsgpackHandle{}
+	encoder := codec.NewEncoder(buf, &handle)
+
+	buf.WriteByte(uint8(messageRelayType))
+	if err := encoder.Encode(relayHeader{DestAddr: addr}); err != nil {
+		return nil, err
+	}
+
+	buf.WriteByte(uint8(t))
+	err := encoder.Encode(msg)
+	return buf.Bytes(), err
+}
+
 func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
 	buf := bytes.NewBuffer(nil)
 	buf.WriteByte(uint8(f))
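Adding RelayFactor in the middle of messageQuery is wire-safe because this msgpack codec encodes structs as maps keyed by field name, so an older decoder simply skips the key it does not know; the ProtocolMax < 5 filter in query.go below is what keeps relay envelopes away from nodes that cannot unwrap them. A standalone sketch with stand-in old/new shapes, assuming that default map-style encoding:

package main

import (
	"bytes"
	"fmt"

	"github.com/hashicorp/go-msgpack/codec"
)

// newQuery and oldQuery stand in for the v5 and pre-v5 messageQuery shapes.
type newQuery struct {
	ID          uint32
	RelayFactor uint8
	Name        string
}

type oldQuery struct {
	ID   uint32
	Name string
}

func main() {
	handle := codec.MsgpackHandle{}
	buf := bytes.NewBuffer(nil)
	if err := codec.NewEncoder(buf, &handle).Encode(newQuery{ID: 42, RelayFactor: 3, Name: "ping"}); err != nil {
		panic(err)
	}

	// An old decoder ignores the RelayFactor key it does not recognize.
	var old oldQuery
	if err := codec.NewDecoder(buf, &handle).Decode(&old); err != nil {
		panic(err)
	}
	fmt.Printf("old decoder still sees ID=%d Name=%q\n", old.ID, old.Name)
}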
diff --git a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go
index a482685a2..a4c44028e 100644
--- a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go
+++ b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go
@@ -2,7 +2,6 @@ package serf
 
 import (
 	"bytes"
-	"log"
 	"time"
 
 	"github.com/armon/go-metrics"
@@ -37,7 +36,7 @@ func (p *pingDelegate) AckPayload() []byte {
 	// The rest of the message is the serialized coordinate.
 	enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
 	if err := enc.Encode(p.serf.coordClient.GetCoordinate()); err != nil {
-		log.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
+		p.serf.logger.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
 	}
 	return buf.Bytes()
 }
@@ -52,7 +51,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
 	// Verify ping version in the header.
 	version := payload[0]
 	if version != PingVersion {
-		log.Printf("[ERR] serf: Unsupported ping version: %v", version)
+		p.serf.logger.Printf("[ERR] serf: Unsupported ping version: %v", version)
 		return
 	}
 
@@ -61,29 +60,30 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
 	dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
 	var coord coordinate.Coordinate
 	if err := dec.Decode(&coord); err != nil {
-		log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
+		p.serf.logger.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
+		return
 	}
 
-	// Apply the update. Since this is a coordinate coming from some place
-	// else we harden this and look for dimensionality problems proactively.
+	// Apply the update.
 	before := p.serf.coordClient.GetCoordinate()
-	if before.IsCompatibleWith(&coord) {
-		after := p.serf.coordClient.Update(other.Name, &coord, rtt)
-
-		// Publish some metrics to give us an idea of how much we are
-		// adjusting each time we update.
-		d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
-		metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
-
-		// Cache the coordinate for the other node, and add our own
-		// to the cache as well since it just got updated. This lets
-		// users call GetCachedCoordinate with our node name, which is
-		// more friendly.
-		p.serf.coordCacheLock.Lock()
-		p.serf.coordCache[other.Name] = &coord
-		p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
-		p.serf.coordCacheLock.Unlock()
-	} else {
-		log.Printf("[ERR] serf: Rejected bad coordinate: %v\n", coord)
+	after, err := p.serf.coordClient.Update(other.Name, &coord, rtt)
+	if err != nil {
+		p.serf.logger.Printf("[ERR] serf: Rejected coordinate from %s: %v\n",
+			other.Name, err)
+		return
 	}
+
+	// Publish some metrics to give us an idea of how much we are
+	// adjusting each time we update.
+	d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
+	metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
+
+	// Cache the coordinate for the other node, and add our own
+	// to the cache as well since it just got updated. This lets
+	// users call GetCachedCoordinate with our node name, which is
+	// more friendly.
+	p.serf.coordCacheLock.Lock()
+	p.serf.coordCache[other.Name] = &coord
+	p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
+	p.serf.coordCacheLock.Unlock()
 }
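The cache maintained above is what backs GetCachedCoordinate. A hedged sketch of consuming it to estimate RTT between the local node and a peer; the function name is illustrative.

package example

import (
	"time"

	"github.com/hashicorp/serf/serf"
)

// estimateRTT returns the network-coordinate RTT estimate to a peer, if both
// coordinates are present in the cache.
func estimateRTT(s *serf.Serf, peer string) (time.Duration, bool) {
	local, ok := s.GetCachedCoordinate(s.LocalMember().Name)
	if !ok {
		return 0, false
	}
	remote, ok := s.GetCachedCoordinate(peer)
	if !ok {
		return 0, false
	}
	return local.DistanceTo(remote), true
}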
diff --git a/vendor/github.com/hashicorp/serf/serf/query.go b/vendor/github.com/hashicorp/serf/serf/query.go
index f29a3b3c5..0bdbb3553 100644
--- a/vendor/github.com/hashicorp/serf/serf/query.go
+++ b/vendor/github.com/hashicorp/serf/serf/query.go
@@ -1,7 +1,11 @@
 package serf
 
 import (
+	"errors"
+	"fmt"
 	"math"
+	"math/rand"
+	"net"
 	"regexp"
 	"sync"
 	"time"
@@ -24,6 +28,10 @@ type QueryParam struct {
 	// send an ack.
 	RequestAck bool
 
+	// RelayFactor controls the number of duplicate responses to relay
+	// back to the sender through other nodes for redundancy.
+	RelayFactor uint8
+
 	// The timeout limits how long the query is left open. If not provided,
 	// then a default timeout is used based on the configuration of Serf
 	Timeout time.Duration
@@ -93,6 +101,10 @@ type QueryResponse struct {
 	// respCh is used to send a response from a node
 	respCh chan NodeResponse
 
+	// acks/responses are used to track the nodes that have sent an ack/response
+	acks      map[string]struct{}
+	responses map[string]struct{}
+
 	closed    bool
 	closeLock sync.Mutex
 }
@@ -100,13 +112,15 @@ type QueryResponse struct {
 // newQueryResponse is used to construct a new query response
 func newQueryResponse(n int, q *messageQuery) *QueryResponse {
 	resp := &QueryResponse{
-		deadline: time.Now().Add(q.Timeout),
-		id:       q.ID,
-		lTime:    q.LTime,
-		respCh:   make(chan NodeResponse, n),
+		deadline:  time.Now().Add(q.Timeout),
+		id:        q.ID,
+		lTime:     q.LTime,
+		respCh:    make(chan NodeResponse, n),
+		responses: make(map[string]struct{}),
 	}
 	if q.Ack() {
 		resp.ackCh = make(chan string, n)
+		resp.acks = make(map[string]struct{})
 	}
 	return resp
 }
@@ -135,6 +149,8 @@ func (r *QueryResponse) Deadline() time.Time {
 
 // Finished returns if the query is finished running
 func (r *QueryResponse) Finished() bool {
+	r.closeLock.Lock()
+	defer r.closeLock.Unlock()
 	return r.closed || time.Now().After(r.deadline)
 }
 
@@ -151,6 +167,22 @@ func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
 	return r.respCh
 }
 
+// sendResponse sends a response on the response channel ensuring the channel is not closed.
+func (r *QueryResponse) sendResponse(nr NodeResponse) error {
+	r.closeLock.Lock()
+	defer r.closeLock.Unlock()
+	if r.closed {
+		return nil
+	}
+	select {
+	case r.respCh <- nr:
+		r.responses[nr.From] = struct{}{}
+	default:
+		return errors.New("serf: Failed to deliver query response, dropping")
+	}
+	return nil
+}
+
 // NodeResponse is used to represent a single response from a node
 type NodeResponse struct {
 	From    string
@@ -208,3 +240,74 @@ func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
 	}
 	return true
 }
+
+// relayResponse will relay a copy of the given response to up to relayFactor
+// other members.
+func (s *Serf) relayResponse(relayFactor uint8, addr net.UDPAddr, resp *messageQueryResponse) error {
+	if relayFactor == 0 {
+		return nil
+	}
+
+	// Needs to be worth it; we need to have at least relayFactor *other*
+	// nodes. If you have a tiny cluster then the relayFactor shouldn't
+	// be needed.
+	members := s.Members()
+	if len(members) < int(relayFactor)+1 {
+		return nil
+	}
+
+	// Prep the relay message, which is a wrapped version of the original.
+	raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
+	if err != nil {
+		return fmt.Errorf("failed to format relayed response: %v", err)
+	}
+	if len(raw) > s.config.QueryResponseSizeLimit {
+		return fmt.Errorf("relayed response exceeds limit of %d bytes", s.config.QueryResponseSizeLimit)
+	}
+
+	// Relay to a random set of peers.
+	localName := s.LocalMember().Name
+	relayMembers := kRandomMembers(int(relayFactor), members, func(m Member) bool {
+		return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == localName
+	})
+	for _, m := range relayMembers {
+		relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
+		if err := s.memberlist.SendTo(&relayAddr, raw); err != nil {
+			return fmt.Errorf("failed to send relay response: %v", err)
+		}
+	}
+	return nil
+}
+
+// kRandomMembers selects up to k members from a given list, optionally
+// filtering by the given filterFunc
+func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
+	n := len(members)
+	kMembers := make([]Member, 0, k)
+OUTER:
+	// Probe up to 3*n times, with large n this is not necessary
+	// since k << n, but with small n we want search to be
+	// exhaustive
+	for i := 0; i < 3*n && len(kMembers) < k; i++ {
+		// Get random member
+		idx := rand.Intn(n)
+		member := members[idx]
+
+		// Give the filter a shot at it.
+		if filterFunc != nil && filterFunc(member) {
+			continue OUTER
+		}
+
+		// Check if we have this member already
+		for j := 0; j < len(kMembers); j++ {
+			if member.Name == kMembers[j].Name {
+				continue OUTER
+			}
+		}
+
+		// Append the member
+		kMembers = append(kMembers, member)
+	}
+
+	return kMembers
+}
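A hedged usage sketch for the new RelayFactor knob on queries, assuming a running *serf.Serf; the query name and payload are illustrative. Each responder sends its answer directly and relays two duplicates through other peers, so a single lost UDP packet no longer loses the response.

package example

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

// askCluster broadcasts a query and streams back the responses, including any
// that arrived only via a relay (duplicates are filtered by the sender).
func askCluster(s *serf.Serf) {
	params := s.DefaultQueryParams()
	params.RelayFactor = 2
	resp, err := s.Query("deploy-status", []byte("v2"), params)
	if err != nil {
		log.Printf("query failed: %v", err)
		return
	}
	for r := range resp.ResponseCh() {
		log.Printf("%s answered: %s", r.From, r.Payload)
	}
}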
diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go
index 424da0195..7256eafab 100644
--- a/vendor/github.com/hashicorp/serf/serf/serf.go
+++ b/vendor/github.com/hashicorp/serf/serf/serf.go
@@ -10,8 +10,10 @@ import (
 	"log"
 	"math/rand"
 	"net"
+	"os"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/armon/go-metrics"
@@ -25,7 +27,7 @@ import (
 // version to memberlist below.
 const (
 	ProtocolVersionMin uint8 = 2
-	ProtocolVersionMax       = 4
+	ProtocolVersionMax       = 5
 )
 
 const (
@@ -73,7 +75,7 @@ type Serf struct {
 	eventBroadcasts *memberlist.TransmitLimitedQueue
 	eventBuffer     []*userEvents
-	eventJoinIgnore bool
+	eventJoinIgnore atomic.Value
 	eventMinTime    LamportTime
 	eventLock       sync.RWMutex
@@ -240,14 +242,24 @@ func Create(conf *Config) (*Serf, error) {
 			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
 	}
 
+	logger := conf.Logger
+	if logger == nil {
+		logOutput := conf.LogOutput
+		if logOutput == nil {
+			logOutput = os.Stderr
+		}
+		logger = log.New(logOutput, "", log.LstdFlags)
+	}
+
 	serf := &Serf{
 		config:        conf,
-		logger:        log.New(conf.LogOutput, "", log.LstdFlags),
+		logger:        logger,
 		members:       make(map[string]*memberState),
 		queryResponse: make(map[LamportTime]*QueryResponse),
 		shutdownCh:    make(chan struct{}),
 		state:         SerfAlive,
 	}
+	serf.eventJoinIgnore.Store(false)
 
 	// Check that the meta data length is okay
 	if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize {
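The fallback chain added to Create above (Logger, then LogOutput, then stderr) makes the following minimal setup possible; the prefix and naming are illustrative.

package main

import (
	"log"
	"os"

	"github.com/hashicorp/serf/serf"
)

func main() {
	// Route Serf's internal logging through a prefixed *log.Logger instead of
	// a bare io.Writer; per the config docs above, set Logger or LogOutput,
	// never both.
	logger := log.New(os.Stderr, "serf-agent: ", log.LstdFlags)

	conf := serf.DefaultConfig()
	conf.Logger = logger

	s, err := serf.Create(conf)
	if err != nil {
		logger.Fatalf("failed to start serf: %v", err)
	}
	defer s.Shutdown()
}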
@@ -328,21 +340,15 @@
 	// Setup the various broadcast queues, which we use to send our own
 	// custom broadcasts along the gossip channel.
 	serf.broadcasts = &memberlist.TransmitLimitedQueue{
-		NumNodes: func() int {
-			return len(serf.members)
-		},
+		NumNodes:       serf.NumNodes,
 		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
 	}
 	serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
-		NumNodes: func() int {
-			return len(serf.members)
-		},
+		NumNodes:       serf.NumNodes,
 		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
 	}
 	serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
-		NumNodes: func() int {
-			return len(serf.members)
-		},
+		NumNodes:       serf.NumNodes,
 		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
 	}
@@ -499,15 +505,16 @@ func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryRes
 
 	// Create a message
 	q := messageQuery{
-		LTime:   s.queryClock.Time(),
-		ID:      uint32(rand.Int31()),
-		Addr:    local.Addr,
-		Port:    local.Port,
-		Filters: filters,
-		Flags:   flags,
-		Timeout: params.Timeout,
-		Name:    name,
-		Payload: payload,
+		LTime:       s.queryClock.Time(),
+		ID:          uint32(rand.Int31()),
+		Addr:        local.Addr,
+		Port:        local.Port,
+		Filters:     filters,
+		Flags:       flags,
+		RelayFactor: params.RelayFactor,
+		Timeout:     params.Timeout,
+		Name:        name,
+		Payload:     payload,
 	}
 
 	// Encode the query
@@ -588,9 +595,9 @@ func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
 	// Ignore any events from a potential join. This is safe since we hold
 	// the joinLock and nobody else can be doing a Join
 	if ignoreOld {
-		s.eventJoinIgnore = true
+		s.eventJoinIgnore.Store(true)
 		defer func() {
-			s.eventJoinIgnore = false
+			s.eventJoinIgnore.Store(false)
 		}()
 	}
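eventJoinIgnore becomes an atomic.Value because Join writes the flag while memberlist's push/pull path reads it concurrently in MergeRemoteState; a plain bool was a data race. A standalone sketch of the same pattern:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var flag atomic.Value
	flag.Store(false) // must seed the value before any Load, as Create does

	done := make(chan struct{})
	go func() {
		// Reader goroutine, analogous to MergeRemoteState.
		_ = flag.Load().(bool)
		close(done)
	}()

	// Writer, analogous to Join with ignoreOld=true.
	flag.Store(true)
	<-done
	fmt.Println("ignore events:", flag.Load().(bool))
}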
@@ -791,13 +798,15 @@ func (s *Serf) Shutdown() error {
 		s.logger.Printf("[WARN] serf: Shutdown without a Leave")
 	}
 
+	// Wait to close the shutdown channel until after we've shut down the
+	// memberlist and its associated network resources, since the shutdown
+	// channel signals that we are cleaned up outside of Serf.
 	s.state = SerfShutdown
-	close(s.shutdownCh)
-
 	err := s.memberlist.Shutdown()
 	if err != nil {
 		return err
 	}
+	close(s.shutdownCh)
 
 	// Wait for the snapshoter to finish if we have one
 	if s.snapshotter != nil {
@@ -1237,19 +1246,23 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
 			if err := s.memberlist.SendTo(&addr, raw); err != nil {
 				s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
 			}
+			if err := s.relayResponse(query.RelayFactor, addr, &ack); err != nil {
+				s.logger.Printf("[ERR] serf: failed to relay ack: %v", err)
+			}
 		}
 	}
 
 	if s.config.EventCh != nil {
 		s.config.EventCh <- &Query{
-			LTime:    query.LTime,
-			Name:     query.Name,
-			Payload:  query.Payload,
-			serf:     s,
-			id:       query.ID,
-			addr:     query.Addr,
-			port:     query.Port,
-			deadline: time.Now().Add(query.Timeout),
+			LTime:       query.LTime,
+			Name:        query.Name,
+			Payload:     query.Payload,
+			serf:        s,
+			id:          query.ID,
+			addr:        query.Addr,
+			port:        query.Port,
+			deadline:    time.Now().Add(query.Timeout),
+			relayFactor: query.RelayFactor,
 		}
 	}
 	return rebroadcast
@@ -1282,18 +1295,30 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
 
 	// Process each type of response
 	if resp.Ack() {
+		// Exit early if this is a duplicate ack
+		if _, ok := query.acks[resp.From]; ok {
+			metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1)
+			return
+		}
+
 		metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
 		select {
 		case query.ackCh <- resp.From:
+			query.acks[resp.From] = struct{}{}
 		default:
-			s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping")
+			s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping")
 		}
 	} else {
+		// Exit early if this is a duplicate response
+		if _, ok := query.responses[resp.From]; ok {
+			metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1)
+			return
+		}
+
 		metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
-		select {
-		case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
-		default:
-			s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping")
+		err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload})
+		if err != nil {
+			s.logger.Printf("[WARN] %v", err)
 		}
 	}
 }
@@ -1353,7 +1378,7 @@ func (s *Serf) resolveNodeConflict() {
 
 		// Update the counters
 		responses++
-		if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port {
+		if member.Addr.Equal(local.Addr) && member.Port == local.Port {
 			matching++
 		}
 	}
@@ -1642,6 +1667,9 @@ func (s *Serf) Stats() map[string]string {
 		"query_queue":  toString(uint64(s.queryBroadcasts.NumQueued())),
 		"encrypted":    fmt.Sprintf("%v", s.EncryptionEnabled()),
 	}
+	if !s.config.DisableCoordinates {
+		stats["coordinate_resets"] = toString(uint64(s.coordClient.Stats().Resets))
+	}
 	return stats
 }
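From a caller's perspective, the reordered Shutdown above means ShutdownCh now closes only after memberlist has released its network resources, so waiting on it is a reliable signal that the node is fully torn down. A hedged teardown sketch (function name illustrative):

package example

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

// stop leaves the cluster gracefully, then blocks until Serf and memberlist
// are completely cleaned up.
func stop(s *serf.Serf) {
	if err := s.Leave(); err != nil {
		log.Printf("graceful leave failed: %v", err)
	}
	if err := s.Shutdown(); err != nil {
		log.Printf("shutdown failed: %v", err)
		return
	}
	<-s.ShutdownCh() // unblocks only after memberlist is shut down
}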
diff --git a/vendor/github.com/hashicorp/serf/serf/snapshot.go b/vendor/github.com/hashicorp/serf/serf/snapshot.go
index 44f8a5175..8e15f3f31 100644
--- a/vendor/github.com/hashicorp/serf/serf/snapshot.go
+++ b/vendor/github.com/hashicorp/serf/serf/snapshot.go
@@ -31,6 +31,7 @@
 const flushInterval = 500 * time.Millisecond
 const clockUpdateInterval = 500 * time.Millisecond
 const coordinateUpdateInterval = 60 * time.Second
 const tmpExt = ".compact"
+const snapshotErrorRecoveryInterval = 30 * time.Second
 
 // Snapshotter is responsible for ingesting events and persisting
 // them to disk, and providing a recovery mechanism at start time.
@@ -55,6 +56,7 @@ type Snapshotter struct {
 	rejoinAfterLeave bool
 	shutdownCh       <-chan struct{}
 	waitCh           chan struct{}
+	lastAttemptedCompaction time.Time
 }
 
 // PreviousNode is used to represent the previously known alive nodes
@@ -84,7 +86,7 @@ func NewSnapshotter(path string,
 	inCh := make(chan Event, 1024)
 
 	// Try to open the file
-	fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
+	fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
 	if err != nil {
 		return nil, nil, fmt.Errorf("failed to open snapshot: %v", err)
 	}
@@ -311,6 +313,17 @@ func (s *Snapshotter) processQuery(q *Query) {
 func (s *Snapshotter) tryAppend(l string) {
 	if err := s.appendLine(l); err != nil {
 		s.logger.Printf("[ERR] serf: Failed to update snapshot: %v", err)
+		now := time.Now()
+		if now.Sub(s.lastAttemptedCompaction) > snapshotErrorRecoveryInterval {
+			s.lastAttemptedCompaction = now
+			s.logger.Printf("[INFO] serf: Attempting compaction to recover from error...")
+			err = s.compact()
+			if err != nil {
+				s.logger.Printf("[ERR] serf: Compaction failed, will reattempt after %v: %v", snapshotErrorRecoveryInterval, err)
+			} else {
+				s.logger.Printf("[INFO] serf: Finished compaction, successfully recovered from error state")
+			}
+		}
 	}
 }
 
@@ -532,7 +545,10 @@ func (s *Snapshotter) replay() error {
 				s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
 				continue
 			}
-			s.coordClient.SetCoordinate(&coord)
+			if err := s.coordClient.SetCoordinate(&coord); err != nil {
+				s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err)
+				continue
+			}
 		} else if line == "leave" {
 			// Ignore a leave if we plan on re-joining
 			if s.rejoinAfterLeave {
diff --git a/vendor/vendor.json b/vendor/vendor.json
index f3423c530..fd93d1b75 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -880,6 +880,13 @@
 			"revision": "bbeddf0b3ab3072a60525afbd6b6f47d33839eee",
 			"revisionTime": "2017-07-14T18:26:01Z"
 		},
+		{
+			"checksumSHA1": "pvLOzocYsZtxuJ9pREHRTxYnoa4=",
+			"origin": "github.com/hashicorp/nomad/vendor/github.com/hashicorp/serf/serf",
+			"path": "github.com/hashicorp/serf/serf",
+			"revision": "bbeddf0b3ab3072a60525afbd6b6f47d33839eee",
+			"revisionTime": "2017-07-14T18:26:01Z"
+		},
 		{
 			"checksumSHA1": "eGzvBRMFD6ZB3A6uO750np7Om/E=",
 			"path": "github.com/hashicorp/vault",