diff --git a/vendor/github.com/hashicorp/serf/serf/config.go b/vendor/github.com/hashicorp/serf/serf/config.go index dfe878bbc..2403cea58 100644 --- a/vendor/github.com/hashicorp/serf/serf/config.go +++ b/vendor/github.com/hashicorp/serf/serf/config.go @@ -15,6 +15,7 @@ var ProtocolVersionMap map[uint8]uint8 func init() { ProtocolVersionMap = map[uint8]uint8{ + 5: 2, 4: 2, 3: 2, 2: 2, @@ -240,7 +241,7 @@ func DefaultConfig() *Config { EventBuffer: 512, QueryBuffer: 512, LogOutput: os.Stderr, - ProtocolVersion: ProtocolVersionMax, + ProtocolVersion: 4, ReapInterval: 15 * time.Second, RecentIntentTimeout: 5 * time.Minute, ReconnectInterval: 30 * time.Second, diff --git a/vendor/github.com/hashicorp/serf/serf/delegate.go b/vendor/github.com/hashicorp/serf/serf/delegate.go index d19ca3090..8f51cb7d0 100644 --- a/vendor/github.com/hashicorp/serf/serf/delegate.go +++ b/vendor/github.com/hashicorp/serf/serf/delegate.go @@ -1,9 +1,11 @@ package serf import ( + "bytes" "fmt" "github.com/armon/go-metrics" + "github.com/hashicorp/go-msgpack/codec" ) // delegate is the memberlist.Delegate implementation that Serf uses. @@ -83,6 +85,25 @@ func (d *delegate) NotifyMsg(buf []byte) { d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From) d.serf.handleQueryResponse(&resp) + case messageRelayType: + var header relayHeader + var handle codec.MsgpackHandle + reader := bytes.NewReader(buf[1:]) + decoder := codec.NewDecoder(reader, &handle) + if err := decoder.Decode(&header); err != nil { + d.serf.logger.Printf("[ERR] serf: Error decoding relay header: %s", err) + break + } + + // The remaining contents are the message itself, so forward that + raw := make([]byte, reader.Len()) + reader.Read(raw) + d.serf.logger.Printf("[DEBUG] serf: Relaying response to addr: %s", header.DestAddr.String()) + if err := d.serf.memberlist.SendTo(&header.DestAddr, raw); err != nil { + d.serf.logger.Printf("[ERR] serf: Error forwarding message to %s: %s", header.DestAddr.String(), err) + break + } + default: d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t) } diff --git a/vendor/github.com/hashicorp/serf/serf/event.go b/vendor/github.com/hashicorp/serf/serf/event.go index 8337e95ea..c6be9242e 100644 --- a/vendor/github.com/hashicorp/serf/serf/event.go +++ b/vendor/github.com/hashicorp/serf/serf/event.go @@ -2,6 +2,7 @@ package serf import ( "fmt" + "math/rand" "net" "sync" "time" @@ -95,18 +96,19 @@ func (u UserEvent) String() string { return fmt.Sprintf("user-event: %s", u.Name) } -// Query is the struct used EventQuery type events +// Query is the struct used by EventQuery type events type Query struct { LTime LamportTime Name string Payload []byte - serf *Serf - id uint32 // ID is not exported, since it may change - addr []byte // Address to respond to - port uint16 // Port to respond to - deadline time.Time // Must respond by this deadline - respLock sync.Mutex + serf *Serf + id uint32 // ID is not exported, since it may change + addr []byte // Address to respond to + port uint16 // Port to respond to + deadline time.Time // Must respond by this deadline + relayFactor uint8 // Number of duplicate responses to relay back to sender + respLock sync.Mutex } func (q *Query) EventType() EventType { @@ -145,24 +147,84 @@ func (q *Query) Respond(buf []byte) error { Payload: buf, } - // Format the response - raw, err := encodeMessage(messageQueryResponseType, &resp) - if err != nil { - return fmt.Errorf("Failed to format response: %v", err) + // Send a direct response + { + raw, err := encodeMessage(messageQueryResponseType, &resp) + if err != nil { + return fmt.Errorf("Failed to format response: %v", err) + } + + // Check the size limit + if len(raw) > q.serf.config.QueryResponseSizeLimit { + return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit) + } + + addr := net.UDPAddr{IP: q.addr, Port: int(q.port)} + if err := q.serf.memberlist.SendTo(&addr, raw); err != nil { + return err + } } - // Check the size limit - if len(raw) > q.serf.config.QueryResponseSizeLimit { - return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit) + // Relay the response through up to relayFactor other nodes + members := q.serf.Members() + if len(members) > 2 { + addr := net.UDPAddr{IP: q.addr, Port: int(q.port)} + raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp) + if err != nil { + return fmt.Errorf("Failed to format relayed response: %v", err) + } + + // Check the size limit + if len(raw) > q.serf.config.QueryResponseSizeLimit { + return fmt.Errorf("relayed response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit) + } + + relayMembers := kRandomMembers(int(q.relayFactor), members, func(m Member) bool { + return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == q.serf.LocalMember().Name + }) + + for _, m := range relayMembers { + relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)} + if err := q.serf.memberlist.SendTo(&relayAddr, raw); err != nil { + return err + } + } } - // Send the response - addr := net.UDPAddr{IP: q.addr, Port: int(q.port)} - if err := q.serf.memberlist.SendTo(&addr, raw); err != nil { - return err - } - - // Clera the deadline, response sent + // Clear the deadline, responses sent q.deadline = time.Time{} return nil } + +// kRandomMembers selects up to k members from a given list, optionally +// filtering by the given filterFunc +func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member { + n := len(members) + kMembers := make([]Member, 0, k) +OUTER: + // Probe up to 3*n times, with large n this is not necessary + // since k << n, but with small n we want search to be + // exhaustive + for i := 0; i < 3*n && len(kMembers) < k; i++ { + // Get random member + idx := rand.Intn(n) + member := members[idx] + + // Give the filter a shot at it. + if filterFunc != nil && filterFunc(member) { + continue OUTER + } + + // Check if we have this member already + for j := 0; j < len(kMembers); j++ { + if member.Name == kMembers[j].Name { + continue OUTER + } + } + + // Append the member + kMembers = append(kMembers, member) + } + + return kMembers +} diff --git a/vendor/github.com/hashicorp/serf/serf/keymanager.go b/vendor/github.com/hashicorp/serf/serf/keymanager.go index 72a319449..fd53182fc 100644 --- a/vendor/github.com/hashicorp/serf/serf/keymanager.go +++ b/vendor/github.com/hashicorp/serf/serf/keymanager.go @@ -33,6 +33,13 @@ type KeyResponse struct { Keys map[string]int } +// KeyRequestOptions is used to contain optional parameters for a keyring operation +type KeyRequestOptions struct { + // RelayFactor is the number of duplicate query responses to send by relaying through + // other nodes, for redundancy + RelayFactor uint8 +} + // streamKeyResp takes care of reading responses from a channel and composing // them into a KeyResponse. It will update a KeyResponse *in place* and // therefore has nothing to return. @@ -83,7 +90,7 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) { // handleKeyRequest performs query broadcasting to all members for any type of // key operation and manages gathering responses and packing them up into a // KeyResponse for uniform response handling. -func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) { +func (k *KeyManager) handleKeyRequest(key, query string, opts *KeyRequestOptions) (*KeyResponse, error) { resp := &KeyResponse{ Messages: make(map[string]string), Keys: make(map[string]int), @@ -103,6 +110,9 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) { } qParam := k.serf.DefaultQueryParams() + if opts != nil { + qParam.RelayFactor = opts.RelayFactor + } queryResp, err := k.serf.Query(qName, req, qParam) if err != nil { return resp, err @@ -127,30 +137,42 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) { // responses from each of them, returning a list of messages from each node // and any applicable error conditions. func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) { + return k.InstallKeyWithOptions(key, nil) +} + +func (k *KeyManager) InstallKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) { k.l.Lock() defer k.l.Unlock() - return k.handleKeyRequest(key, installKeyQuery) + return k.handleKeyRequest(key, installKeyQuery, opts) } // UseKey handles broadcasting a primary key change to all members in the // cluster, and gathering any response messages. If successful, there should // be an empty KeyResponse returned. func (k *KeyManager) UseKey(key string) (*KeyResponse, error) { + return k.UseKeyWithOptions(key, nil) +} + +func (k *KeyManager) UseKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) { k.l.Lock() defer k.l.Unlock() - return k.handleKeyRequest(key, useKeyQuery) + return k.handleKeyRequest(key, useKeyQuery, opts) } // RemoveKey handles broadcasting a key to the cluster for removal. Each member // will receive this event, and if they have the key in their keyring, remove // it. If any errors are encountered, RemoveKey will collect and relay them. func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) { + return k.RemoveKeyWithOptions(key, nil) +} + +func (k *KeyManager) RemoveKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) { k.l.Lock() defer k.l.Unlock() - return k.handleKeyRequest(key, removeKeyQuery) + return k.handleKeyRequest(key, removeKeyQuery, opts) } // ListKeys is used to collect installed keys from members in a Serf cluster @@ -159,8 +181,12 @@ func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) { // Since having multiple keys installed can cause performance penalties in some // cases, it's important to verify this information and remove unneeded keys. func (k *KeyManager) ListKeys() (*KeyResponse, error) { + return k.ListKeysWithOptions(nil) +} + +func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, error) { k.l.RLock() defer k.l.RUnlock() - return k.handleKeyRequest("", listKeysQuery) -} + return k.handleKeyRequest("", listKeysQuery, opts) +} \ No newline at end of file diff --git a/vendor/github.com/hashicorp/serf/serf/messages.go b/vendor/github.com/hashicorp/serf/serf/messages.go index c90c96450..20df5b8e8 100644 --- a/vendor/github.com/hashicorp/serf/serf/messages.go +++ b/vendor/github.com/hashicorp/serf/serf/messages.go @@ -2,8 +2,10 @@ package serf import ( "bytes" - "github.com/hashicorp/go-msgpack/codec" + "net" "time" + + "github.com/hashicorp/go-msgpack/codec" ) // messageType are the types of gossip messages Serf will send along @@ -20,6 +22,7 @@ const ( messageConflictResponseType messageKeyRequestType messageKeyResponseType + messageRelayType ) const ( @@ -75,15 +78,16 @@ type messageUserEvent struct { // messageQuery is used for query events type messageQuery struct { - LTime LamportTime // Event lamport time - ID uint32 // Query ID, randomly generated - Addr []byte // Source address, used for a direct reply - Port uint16 // Source port, used for a direct reply - Filters [][]byte // Potential query filters - Flags uint32 // Used to provide various flags - Timeout time.Duration // Maximum time between delivery and response - Name string // Query name - Payload []byte // Query payload + LTime LamportTime // Event lamport time + ID uint32 // Query ID, randomly generated + Addr []byte // Source address, used for a direct reply + Port uint16 // Source port, used for a direct reply + Filters [][]byte // Potential query filters + Flags uint32 // Used to provide various flags + RelayFactor uint8 // Used to set the number of duplicate relayed responses + Timeout time.Duration // Maximum time between delivery and response + Name string // Query name + Payload []byte // Query payload } // Ack checks if the ack flag is set @@ -136,6 +140,28 @@ func encodeMessage(t messageType, msg interface{}) ([]byte, error) { return buf.Bytes(), err } +// relayHeader is used to store the end destination of a relayed message +type relayHeader struct { + DestAddr net.UDPAddr +} + +// encodeRelayMessage wraps a message in the messageRelayType, adding the length and +// address of the end recipient to the front of the message +func encodeRelayMessage(t messageType, addr net.UDPAddr, msg interface{}) ([]byte, error) { + buf := bytes.NewBuffer(nil) + handle := codec.MsgpackHandle{} + encoder := codec.NewEncoder(buf, &handle) + + buf.WriteByte(uint8(messageRelayType)) + if err := encoder.Encode(relayHeader{DestAddr: addr}); err != nil { + return nil, err + } + + buf.WriteByte(uint8(t)) + err := encoder.Encode(msg) + return buf.Bytes(), err +} + func encodeFilter(f filterType, filt interface{}) ([]byte, error) { buf := bytes.NewBuffer(nil) buf.WriteByte(uint8(f)) diff --git a/vendor/github.com/hashicorp/serf/serf/query.go b/vendor/github.com/hashicorp/serf/serf/query.go index f29a3b3c5..d758945c1 100644 --- a/vendor/github.com/hashicorp/serf/serf/query.go +++ b/vendor/github.com/hashicorp/serf/serf/query.go @@ -24,6 +24,10 @@ type QueryParam struct { // send an ack. RequestAck bool + // RelayFactor controls the number of duplicate responses to relay + // back to the sender through other nodes for redundancy. + RelayFactor uint8 + // The timeout limits how long the query is left open. If not provided, // then a default timeout is used based on the configuration of Serf Timeout time.Duration @@ -93,6 +97,10 @@ type QueryResponse struct { // respCh is used to send a response from a node respCh chan NodeResponse + // acks/responses are used to track the nodes that have sent an ack/response + acks map[string]struct{} + responses map[string]struct{} + closed bool closeLock sync.Mutex } @@ -100,13 +108,15 @@ type QueryResponse struct { // newQueryResponse is used to construct a new query response func newQueryResponse(n int, q *messageQuery) *QueryResponse { resp := &QueryResponse{ - deadline: time.Now().Add(q.Timeout), - id: q.ID, - lTime: q.LTime, - respCh: make(chan NodeResponse, n), + deadline: time.Now().Add(q.Timeout), + id: q.ID, + lTime: q.LTime, + respCh: make(chan NodeResponse, n), + responses: make(map[string]struct{}), } if q.Ack() { resp.ackCh = make(chan string, n) + resp.acks = make(map[string]struct{}) } return resp } diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index 424da0195..cb21f882c 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -25,7 +25,7 @@ import ( // version to memberlist below. const ( ProtocolVersionMin uint8 = 2 - ProtocolVersionMax = 4 + ProtocolVersionMax = 5 ) const ( @@ -499,15 +499,16 @@ func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryRes // Create a message q := messageQuery{ - LTime: s.queryClock.Time(), - ID: uint32(rand.Int31()), - Addr: local.Addr, - Port: local.Port, - Filters: filters, - Flags: flags, - Timeout: params.Timeout, - Name: name, - Payload: payload, + LTime: s.queryClock.Time(), + ID: uint32(rand.Int31()), + Addr: local.Addr, + Port: local.Port, + Filters: filters, + Flags: flags, + RelayFactor: params.RelayFactor, + Timeout: params.Timeout, + Name: name, + Payload: payload, } // Encode the query @@ -1242,14 +1243,15 @@ func (s *Serf) handleQuery(query *messageQuery) bool { if s.config.EventCh != nil { s.config.EventCh <- &Query{ - LTime: query.LTime, - Name: query.Name, - Payload: query.Payload, - serf: s, - id: query.ID, - addr: query.Addr, - port: query.Port, - deadline: time.Now().Add(query.Timeout), + LTime: query.LTime, + Name: query.Name, + Payload: query.Payload, + serf: s, + id: query.ID, + addr: query.Addr, + port: query.Port, + deadline: time.Now().Add(query.Timeout), + relayFactor: query.RelayFactor, } } return rebroadcast @@ -1282,18 +1284,30 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) { // Process each type of response if resp.Ack() { + // Exit early if this is a duplicate ack + if _, ok := query.acks[resp.From]; ok { + return + } + metrics.IncrCounter([]string{"serf", "query_acks"}, 1) select { case query.ackCh <- resp.From: + query.acks[resp.From] = struct{}{} default: - s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping") + s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping") } } else { + // Exit early if this is a duplicate response + if _, ok := query.responses[resp.From]; ok { + return + } + metrics.IncrCounter([]string{"serf", "query_responses"}, 1) select { case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}: + query.responses[resp.From] = struct{}{} default: - s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping") + s.logger.Printf("[WARN] serf: Failed to deliver query response, dropping") } } } diff --git a/vendor/vendor.json b/vendor/vendor.json index e1f9a145f..55d0b0bcb 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -625,11 +625,11 @@ "revisionTime": "2016-08-09T01:42:04Z" }, { - "checksumSHA1": "vLyudzMEdik8IpRY1H2vRa2PeLU=", + "checksumSHA1": "EhESUBqb9Kot4rzZu2l/oAJoYCU=", "comment": "v0.7.0-66-g6c4672d", "path": "github.com/hashicorp/serf/serf", - "revision": "114430d8210835d66defdc31cdc176c58e060005", - "revisionTime": "2016-08-09T01:42:04Z" + "revision": "34e94dbd8faa991710b442c22ad6ad37c8b44c3b", + "revisionTime": "2017-02-02T01:56:25Z" }, { "checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",