Update serf deps

This commit is contained in:
Kyle Havlovitz 2017-02-01 20:02:13 -05:00
parent ee4b14a0ad
commit 21ce56e6f3
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
8 changed files with 225 additions and 65 deletions

View File

@ -15,6 +15,7 @@ var ProtocolVersionMap map[uint8]uint8
func init() { func init() {
ProtocolVersionMap = map[uint8]uint8{ ProtocolVersionMap = map[uint8]uint8{
5: 2,
4: 2, 4: 2,
3: 2, 3: 2,
2: 2, 2: 2,
@ -240,7 +241,7 @@ func DefaultConfig() *Config {
EventBuffer: 512, EventBuffer: 512,
QueryBuffer: 512, QueryBuffer: 512,
LogOutput: os.Stderr, LogOutput: os.Stderr,
ProtocolVersion: ProtocolVersionMax, ProtocolVersion: 4,
ReapInterval: 15 * time.Second, ReapInterval: 15 * time.Second,
RecentIntentTimeout: 5 * time.Minute, RecentIntentTimeout: 5 * time.Minute,
ReconnectInterval: 30 * time.Second, ReconnectInterval: 30 * time.Second,

View File

@ -1,9 +1,11 @@
package serf package serf
import ( import (
"bytes"
"fmt" "fmt"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
) )
// delegate is the memberlist.Delegate implementation that Serf uses. // delegate is the memberlist.Delegate implementation that Serf uses.
@ -83,6 +85,25 @@ func (d *delegate) NotifyMsg(buf []byte) {
d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From) d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From)
d.serf.handleQueryResponse(&resp) d.serf.handleQueryResponse(&resp)
case messageRelayType:
var header relayHeader
var handle codec.MsgpackHandle
reader := bytes.NewReader(buf[1:])
decoder := codec.NewDecoder(reader, &handle)
if err := decoder.Decode(&header); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding relay header: %s", err)
break
}
// The remaining contents are the message itself, so forward that
raw := make([]byte, reader.Len())
reader.Read(raw)
d.serf.logger.Printf("[DEBUG] serf: Relaying response to addr: %s", header.DestAddr.String())
if err := d.serf.memberlist.SendTo(&header.DestAddr, raw); err != nil {
d.serf.logger.Printf("[ERR] serf: Error forwarding message to %s: %s", header.DestAddr.String(), err)
break
}
default: default:
d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t) d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
} }

View File

@ -2,6 +2,7 @@ package serf
import ( import (
"fmt" "fmt"
"math/rand"
"net" "net"
"sync" "sync"
"time" "time"
@ -95,18 +96,19 @@ func (u UserEvent) String() string {
return fmt.Sprintf("user-event: %s", u.Name) return fmt.Sprintf("user-event: %s", u.Name)
} }
// Query is the struct used EventQuery type events // Query is the struct used by EventQuery type events
type Query struct { type Query struct {
LTime LamportTime LTime LamportTime
Name string Name string
Payload []byte Payload []byte
serf *Serf serf *Serf
id uint32 // ID is not exported, since it may change id uint32 // ID is not exported, since it may change
addr []byte // Address to respond to addr []byte // Address to respond to
port uint16 // Port to respond to port uint16 // Port to respond to
deadline time.Time // Must respond by this deadline deadline time.Time // Must respond by this deadline
respLock sync.Mutex relayFactor uint8 // Number of duplicate responses to relay back to sender
respLock sync.Mutex
} }
func (q *Query) EventType() EventType { func (q *Query) EventType() EventType {
@ -145,24 +147,84 @@ func (q *Query) Respond(buf []byte) error {
Payload: buf, Payload: buf,
} }
// Format the response // Send a direct response
raw, err := encodeMessage(messageQueryResponseType, &resp) {
if err != nil { raw, err := encodeMessage(messageQueryResponseType, &resp)
return fmt.Errorf("Failed to format response: %v", err) if err != nil {
return fmt.Errorf("Failed to format response: %v", err)
}
// Check the size limit
if len(raw) > q.serf.config.QueryResponseSizeLimit {
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
}
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
return err
}
} }
// Check the size limit // Relay the response through up to relayFactor other nodes
if len(raw) > q.serf.config.QueryResponseSizeLimit { members := q.serf.Members()
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit) if len(members) > 2 {
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
if err != nil {
return fmt.Errorf("Failed to format relayed response: %v", err)
}
// Check the size limit
if len(raw) > q.serf.config.QueryResponseSizeLimit {
return fmt.Errorf("relayed response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
}
relayMembers := kRandomMembers(int(q.relayFactor), members, func(m Member) bool {
return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == q.serf.LocalMember().Name
})
for _, m := range relayMembers {
relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
if err := q.serf.memberlist.SendTo(&relayAddr, raw); err != nil {
return err
}
}
} }
// Send the response // Clear the deadline, responses sent
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
return err
}
// Clera the deadline, response sent
q.deadline = time.Time{} q.deadline = time.Time{}
return nil return nil
} }
// kRandomMembers selects up to k members from a given list, optionally
// filtering by the given filterFunc
func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
n := len(members)
kMembers := make([]Member, 0, k)
OUTER:
// Probe up to 3*n times, with large n this is not necessary
// since k << n, but with small n we want search to be
// exhaustive
for i := 0; i < 3*n && len(kMembers) < k; i++ {
// Get random member
idx := rand.Intn(n)
member := members[idx]
// Give the filter a shot at it.
if filterFunc != nil && filterFunc(member) {
continue OUTER
}
// Check if we have this member already
for j := 0; j < len(kMembers); j++ {
if member.Name == kMembers[j].Name {
continue OUTER
}
}
// Append the member
kMembers = append(kMembers, member)
}
return kMembers
}

View File

@ -33,6 +33,13 @@ type KeyResponse struct {
Keys map[string]int Keys map[string]int
} }
// KeyRequestOptions is used to contain optional parameters for a keyring operation
type KeyRequestOptions struct {
// RelayFactor is the number of duplicate query responses to send by relaying through
// other nodes, for redundancy
RelayFactor uint8
}
// streamKeyResp takes care of reading responses from a channel and composing // streamKeyResp takes care of reading responses from a channel and composing
// them into a KeyResponse. It will update a KeyResponse *in place* and // them into a KeyResponse. It will update a KeyResponse *in place* and
// therefore has nothing to return. // therefore has nothing to return.
@ -83,7 +90,7 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
// handleKeyRequest performs query broadcasting to all members for any type of // handleKeyRequest performs query broadcasting to all members for any type of
// key operation and manages gathering responses and packing them up into a // key operation and manages gathering responses and packing them up into a
// KeyResponse for uniform response handling. // KeyResponse for uniform response handling.
func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) { func (k *KeyManager) handleKeyRequest(key, query string, opts *KeyRequestOptions) (*KeyResponse, error) {
resp := &KeyResponse{ resp := &KeyResponse{
Messages: make(map[string]string), Messages: make(map[string]string),
Keys: make(map[string]int), Keys: make(map[string]int),
@ -103,6 +110,9 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
} }
qParam := k.serf.DefaultQueryParams() qParam := k.serf.DefaultQueryParams()
if opts != nil {
qParam.RelayFactor = opts.RelayFactor
}
queryResp, err := k.serf.Query(qName, req, qParam) queryResp, err := k.serf.Query(qName, req, qParam)
if err != nil { if err != nil {
return resp, err return resp, err
@ -127,30 +137,42 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
// responses from each of them, returning a list of messages from each node // responses from each of them, returning a list of messages from each node
// and any applicable error conditions. // and any applicable error conditions.
func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) { func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) {
return k.InstallKeyWithOptions(key, nil)
}
func (k *KeyManager) InstallKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.Lock() k.l.Lock()
defer k.l.Unlock() defer k.l.Unlock()
return k.handleKeyRequest(key, installKeyQuery) return k.handleKeyRequest(key, installKeyQuery, opts)
} }
// UseKey handles broadcasting a primary key change to all members in the // UseKey handles broadcasting a primary key change to all members in the
// cluster, and gathering any response messages. If successful, there should // cluster, and gathering any response messages. If successful, there should
// be an empty KeyResponse returned. // be an empty KeyResponse returned.
func (k *KeyManager) UseKey(key string) (*KeyResponse, error) { func (k *KeyManager) UseKey(key string) (*KeyResponse, error) {
return k.UseKeyWithOptions(key, nil)
}
func (k *KeyManager) UseKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.Lock() k.l.Lock()
defer k.l.Unlock() defer k.l.Unlock()
return k.handleKeyRequest(key, useKeyQuery) return k.handleKeyRequest(key, useKeyQuery, opts)
} }
// RemoveKey handles broadcasting a key to the cluster for removal. Each member // RemoveKey handles broadcasting a key to the cluster for removal. Each member
// will receive this event, and if they have the key in their keyring, remove // will receive this event, and if they have the key in their keyring, remove
// it. If any errors are encountered, RemoveKey will collect and relay them. // it. If any errors are encountered, RemoveKey will collect and relay them.
func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) { func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
return k.RemoveKeyWithOptions(key, nil)
}
func (k *KeyManager) RemoveKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.Lock() k.l.Lock()
defer k.l.Unlock() defer k.l.Unlock()
return k.handleKeyRequest(key, removeKeyQuery) return k.handleKeyRequest(key, removeKeyQuery, opts)
} }
// ListKeys is used to collect installed keys from members in a Serf cluster // ListKeys is used to collect installed keys from members in a Serf cluster
@ -159,8 +181,12 @@ func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
// Since having multiple keys installed can cause performance penalties in some // Since having multiple keys installed can cause performance penalties in some
// cases, it's important to verify this information and remove unneeded keys. // cases, it's important to verify this information and remove unneeded keys.
func (k *KeyManager) ListKeys() (*KeyResponse, error) { func (k *KeyManager) ListKeys() (*KeyResponse, error) {
return k.ListKeysWithOptions(nil)
}
func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.RLock() k.l.RLock()
defer k.l.RUnlock() defer k.l.RUnlock()
return k.handleKeyRequest("", listKeysQuery) return k.handleKeyRequest("", listKeysQuery, opts)
} }

View File

@ -2,8 +2,10 @@ package serf
import ( import (
"bytes" "bytes"
"github.com/hashicorp/go-msgpack/codec" "net"
"time" "time"
"github.com/hashicorp/go-msgpack/codec"
) )
// messageType are the types of gossip messages Serf will send along // messageType are the types of gossip messages Serf will send along
@ -20,6 +22,7 @@ const (
messageConflictResponseType messageConflictResponseType
messageKeyRequestType messageKeyRequestType
messageKeyResponseType messageKeyResponseType
messageRelayType
) )
const ( const (
@ -75,15 +78,16 @@ type messageUserEvent struct {
// messageQuery is used for query events // messageQuery is used for query events
type messageQuery struct { type messageQuery struct {
LTime LamportTime // Event lamport time LTime LamportTime // Event lamport time
ID uint32 // Query ID, randomly generated ID uint32 // Query ID, randomly generated
Addr []byte // Source address, used for a direct reply Addr []byte // Source address, used for a direct reply
Port uint16 // Source port, used for a direct reply Port uint16 // Source port, used for a direct reply
Filters [][]byte // Potential query filters Filters [][]byte // Potential query filters
Flags uint32 // Used to provide various flags Flags uint32 // Used to provide various flags
Timeout time.Duration // Maximum time between delivery and response RelayFactor uint8 // Used to set the number of duplicate relayed responses
Name string // Query name Timeout time.Duration // Maximum time between delivery and response
Payload []byte // Query payload Name string // Query name
Payload []byte // Query payload
} }
// Ack checks if the ack flag is set // Ack checks if the ack flag is set
@ -136,6 +140,28 @@ func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
return buf.Bytes(), err return buf.Bytes(), err
} }
// relayHeader is used to store the end destination of a relayed message
type relayHeader struct {
DestAddr net.UDPAddr
}
// encodeRelayMessage wraps a message in the messageRelayType, adding the length and
// address of the end recipient to the front of the message
func encodeRelayMessage(t messageType, addr net.UDPAddr, msg interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(buf, &handle)
buf.WriteByte(uint8(messageRelayType))
if err := encoder.Encode(relayHeader{DestAddr: addr}); err != nil {
return nil, err
}
buf.WriteByte(uint8(t))
err := encoder.Encode(msg)
return buf.Bytes(), err
}
func encodeFilter(f filterType, filt interface{}) ([]byte, error) { func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(f)) buf.WriteByte(uint8(f))

View File

@ -24,6 +24,10 @@ type QueryParam struct {
// send an ack. // send an ack.
RequestAck bool RequestAck bool
// RelayFactor controls the number of duplicate responses to relay
// back to the sender through other nodes for redundancy.
RelayFactor uint8
// The timeout limits how long the query is left open. If not provided, // The timeout limits how long the query is left open. If not provided,
// then a default timeout is used based on the configuration of Serf // then a default timeout is used based on the configuration of Serf
Timeout time.Duration Timeout time.Duration
@ -93,6 +97,10 @@ type QueryResponse struct {
// respCh is used to send a response from a node // respCh is used to send a response from a node
respCh chan NodeResponse respCh chan NodeResponse
// acks/responses are used to track the nodes that have sent an ack/response
acks map[string]struct{}
responses map[string]struct{}
closed bool closed bool
closeLock sync.Mutex closeLock sync.Mutex
} }
@ -100,13 +108,15 @@ type QueryResponse struct {
// newQueryResponse is used to construct a new query response // newQueryResponse is used to construct a new query response
func newQueryResponse(n int, q *messageQuery) *QueryResponse { func newQueryResponse(n int, q *messageQuery) *QueryResponse {
resp := &QueryResponse{ resp := &QueryResponse{
deadline: time.Now().Add(q.Timeout), deadline: time.Now().Add(q.Timeout),
id: q.ID, id: q.ID,
lTime: q.LTime, lTime: q.LTime,
respCh: make(chan NodeResponse, n), respCh: make(chan NodeResponse, n),
responses: make(map[string]struct{}),
} }
if q.Ack() { if q.Ack() {
resp.ackCh = make(chan string, n) resp.ackCh = make(chan string, n)
resp.acks = make(map[string]struct{})
} }
return resp return resp
} }

View File

@ -25,7 +25,7 @@ import (
// version to memberlist below. // version to memberlist below.
const ( const (
ProtocolVersionMin uint8 = 2 ProtocolVersionMin uint8 = 2
ProtocolVersionMax = 4 ProtocolVersionMax = 5
) )
const ( const (
@ -499,15 +499,16 @@ func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryRes
// Create a message // Create a message
q := messageQuery{ q := messageQuery{
LTime: s.queryClock.Time(), LTime: s.queryClock.Time(),
ID: uint32(rand.Int31()), ID: uint32(rand.Int31()),
Addr: local.Addr, Addr: local.Addr,
Port: local.Port, Port: local.Port,
Filters: filters, Filters: filters,
Flags: flags, Flags: flags,
Timeout: params.Timeout, RelayFactor: params.RelayFactor,
Name: name, Timeout: params.Timeout,
Payload: payload, Name: name,
Payload: payload,
} }
// Encode the query // Encode the query
@ -1242,14 +1243,15 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
if s.config.EventCh != nil { if s.config.EventCh != nil {
s.config.EventCh <- &Query{ s.config.EventCh <- &Query{
LTime: query.LTime, LTime: query.LTime,
Name: query.Name, Name: query.Name,
Payload: query.Payload, Payload: query.Payload,
serf: s, serf: s,
id: query.ID, id: query.ID,
addr: query.Addr, addr: query.Addr,
port: query.Port, port: query.Port,
deadline: time.Now().Add(query.Timeout), deadline: time.Now().Add(query.Timeout),
relayFactor: query.RelayFactor,
} }
} }
return rebroadcast return rebroadcast
@ -1282,18 +1284,30 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
// Process each type of response // Process each type of response
if resp.Ack() { if resp.Ack() {
// Exit early if this is a duplicate ack
if _, ok := query.acks[resp.From]; ok {
return
}
metrics.IncrCounter([]string{"serf", "query_acks"}, 1) metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
select { select {
case query.ackCh <- resp.From: case query.ackCh <- resp.From:
query.acks[resp.From] = struct{}{}
default: default:
s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping") s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping")
} }
} else { } else {
// Exit early if this is a duplicate response
if _, ok := query.responses[resp.From]; ok {
return
}
metrics.IncrCounter([]string{"serf", "query_responses"}, 1) metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
select { select {
case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}: case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
query.responses[resp.From] = struct{}{}
default: default:
s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping") s.logger.Printf("[WARN] serf: Failed to deliver query response, dropping")
} }
} }
} }

6
vendor/vendor.json vendored
View File

@ -625,11 +625,11 @@
"revisionTime": "2016-08-09T01:42:04Z" "revisionTime": "2016-08-09T01:42:04Z"
}, },
{ {
"checksumSHA1": "vLyudzMEdik8IpRY1H2vRa2PeLU=", "checksumSHA1": "EhESUBqb9Kot4rzZu2l/oAJoYCU=",
"comment": "v0.7.0-66-g6c4672d", "comment": "v0.7.0-66-g6c4672d",
"path": "github.com/hashicorp/serf/serf", "path": "github.com/hashicorp/serf/serf",
"revision": "114430d8210835d66defdc31cdc176c58e060005", "revision": "34e94dbd8faa991710b442c22ad6ad37c8b44c3b",
"revisionTime": "2016-08-09T01:42:04Z" "revisionTime": "2017-02-02T01:56:25Z"
}, },
{ {
"checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=", "checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",