Merge pull request #2861 from hashicorp/f-update-raft-serf

Update vendored raft and serf
Commit 09b887133d by Michael Schurter, 2017-07-19 12:33:00 -07:00 (committed by GitHub)
14 changed files with 430 additions and 123 deletions


@ -384,6 +384,10 @@ func (s *FileSnapshotSink) Close() error {
// Close the open handles
if err := s.finalize(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err)
if delErr := os.RemoveAll(s.dir); delErr != nil {
s.logger.Printf("[ERR] snapshot: Failed to delete temporary snapshot directory at path %v: %v", s.dir, delErr)
return delErr
}
return err
}
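The raft hunk above hardens FileSnapshotSink.Close: if finalizing fails, the partial snapshot directory is deleted so it can't be mistaken for a complete snapshot, and the deletion error takes precedence when cleanup also fails. A minimal standalone sketch of that cleanup-on-failure pattern (closeArtifact and its arguments are illustrative, not raft's API):

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// closeArtifact mirrors the error handling added above: on a failed
// finalize, best-effort delete the partial output and surface the more
// actionable of the two errors.
func closeArtifact(dir string, finalize func() error) error {
	if err := finalize(); err != nil {
		log.Printf("[ERR] failed to finalize: %v", err)
		if delErr := os.RemoveAll(dir); delErr != nil {
			log.Printf("[ERR] failed to delete partial artifact at %v: %v", dir, delErr)
			return delErr
		}
		return err
	}
	return nil
}

func main() {
	dir := filepath.Join(os.TempDir(), "snap-partial")
	_ = os.MkdirAll(dir, 0755)
	err := closeArtifact(dir, func() error { return fmt.Errorf("disk full") })
	fmt.Println(err) // "disk full"; the partial directory is gone
}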


@ -34,10 +34,20 @@ type Client struct {
// value to determine how many samples we keep, per node.
latencyFilterSamples map[string][]float64
// stats is used to record events that occur when updating coordinates.
stats ClientStats
// mutex enables safe concurrent access to the client.
mutex sync.RWMutex
}
// ClientStats is used to record events that occur when updating coordinates.
type ClientStats struct {
// Resets is incremented any time we reset our local coordinate because
// our calculations have resulted in an invalid state.
Resets int
}
// NewClient creates a new Client and verifies the configuration is valid.
func NewClient(config *Config) (*Client, error) {
if !(config.Dimensionality > 0) {
@ -63,11 +73,16 @@ func (c *Client) GetCoordinate() *Coordinate {
}
// SetCoordinate forces the client's coordinate to a known state.
func (c *Client) SetCoordinate(coord *Coordinate) {
func (c *Client) SetCoordinate(coord *Coordinate) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if err := c.checkCoordinate(coord); err != nil {
return err
}
c.coord = coord.Clone()
return nil
}
// ForgetNode removes any client state for the given node.
@ -78,6 +93,29 @@ func (c *Client) ForgetNode(node string) {
delete(c.latencyFilterSamples, node)
}
// Stats returns a copy of stats for the client.
func (c *Client) Stats() ClientStats {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.stats
}
// checkCoordinate returns an error if the coordinate isn't compatible with
// this client, or if the coordinate itself isn't valid. This assumes the mutex
// has been locked already.
func (c *Client) checkCoordinate(coord *Coordinate) error {
if !c.coord.IsCompatibleWith(coord) {
return fmt.Errorf("dimensions aren't compatible")
}
if !coord.IsValid() {
return fmt.Errorf("coordinate is invalid")
}
return nil
}
// latencyFilter applies a simple moving median filter with a new sample for
// a node. This assumes that the mutex has been locked already.
func (c *Client) latencyFilter(node string, rttSeconds float64) float64 {
@ -159,15 +197,24 @@ func (c *Client) updateGravity() {
// Update takes other, a coordinate for another node, and rtt, a round trip
// time observation for a ping to that node, and updates the estimated position of
// the client's coordinate. Returns the updated coordinate.
func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) *Coordinate {
func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coordinate, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
if err := c.checkCoordinate(other); err != nil {
return nil, err
}
rttSeconds := c.latencyFilter(node, rtt.Seconds())
c.updateVivaldi(other, rttSeconds)
c.updateAdjustment(other, rttSeconds)
c.updateGravity()
return c.coord.Clone()
if !c.coord.IsValid() {
c.stats.Resets++
c.coord = NewCoordinate(c.config)
}
return c.coord.Clone(), nil
}
// DistanceTo returns the estimated RTT from the client's coordinate to other, the
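A hedged usage sketch of the hardened coordinate client API in this diff: SetCoordinate and Update now validate their inputs and return errors, and the new Stats().Resets counter tracks how often the client recovered from an invalid internal state. Assumes the vendored github.com/hashicorp/serf/coordinate package at this revision:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/serf/coordinate"
)

func main() {
	client, err := coordinate.NewClient(coordinate.DefaultConfig())
	if err != nil {
		panic(err)
	}

	// Simulate receiving a peer's coordinate alongside a 10ms RTT sample.
	other := coordinate.NewCoordinate(coordinate.DefaultConfig())
	if _, err := client.Update("node-a", other, 10*time.Millisecond); err != nil {
		// e.g. a dimensionality mismatch or NaN/Inf components in `other`.
		fmt.Println("rejected coordinate:", err)
		return
	}

	fmt.Println("estimated RTT:", client.DistanceTo(other))
	fmt.Println("coordinate resets so far:", client.Stats().Resets)
}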


@ -72,6 +72,26 @@ func (c *Coordinate) Clone() *Coordinate {
}
}
// componentIsValid returns false if a floating point value is a NaN or an
// infinity.
func componentIsValid(f float64) bool {
return !math.IsInf(f, 0) && !math.IsNaN(f)
}
// IsValid returns false if any component of a coordinate isn't valid, per the
// componentIsValid() helper above.
func (c *Coordinate) IsValid() bool {
for i := range c.Vec {
if !componentIsValid(c.Vec[i]) {
return false
}
}
return componentIsValid(c.Error) &&
componentIsValid(c.Adjustment) &&
componentIsValid(c.Height)
}
// IsCompatibleWith checks to see if the two coordinates are compatible
// dimensionally. If this returns true then you are guaranteed to not get
// any runtime errors operating on them.
@ -122,7 +142,7 @@ func (c *Coordinate) rawDistanceTo(other *Coordinate) float64 {
// already been checked to be compatible.
func add(vec1 []float64, vec2 []float64) []float64 {
ret := make([]float64, len(vec1))
for i, _ := range ret {
for i := range ret {
ret[i] = vec1[i] + vec2[i]
}
return ret
@ -132,7 +152,7 @@ func add(vec1 []float64, vec2 []float64) []float64 {
// dimensions have already been checked to be compatible.
func diff(vec1 []float64, vec2 []float64) []float64 {
ret := make([]float64, len(vec1))
for i, _ := range ret {
for i := range ret {
ret[i] = vec1[i] - vec2[i]
}
return ret
@ -141,7 +161,7 @@ func diff(vec1 []float64, vec2 []float64) []float64 {
// mul returns vec multiplied by a scalar factor.
func mul(vec []float64, factor float64) []float64 {
ret := make([]float64, len(vec))
for i, _ := range vec {
for i := range vec {
ret[i] = vec[i] * factor
}
return ret
@ -150,7 +170,7 @@ func mul(vec []float64, factor float64) []float64 {
// magnitude computes the magnitude of the vec.
func magnitude(vec []float64) float64 {
sum := 0.0
for i, _ := range vec {
for i := range vec {
sum += vec[i] * vec[i]
}
return math.Sqrt(sum)
@ -168,7 +188,7 @@ func unitVectorAt(vec1 []float64, vec2 []float64) ([]float64, float64) {
}
// Otherwise, just return a random unit vector.
for i, _ := range ret {
for i := range ret {
ret[i] = rand.Float64() - 0.5
}
if mag := magnitude(ret); mag > zeroThreshold {
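The validity check above is the core of the hardening: any NaN or ±Inf component poisons every later distance computation, so coordinates are screened on the way in. A tiny standard-library-only sketch of the same predicate:

package main

import (
	"fmt"
	"math"
)

// componentIsValid mirrors the helper added in this diff: a component is
// usable only if it is a finite float64.
func componentIsValid(f float64) bool {
	return !math.IsInf(f, 0) && !math.IsNaN(f)
}

func main() {
	for _, f := range []float64{0.25, math.NaN(), math.Inf(1)} {
		fmt.Printf("%v valid: %v\n", f, componentIsValid(f))
	}
}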


@ -2,6 +2,7 @@ package serf
import (
"io"
"log"
"os"
"time"
@ -15,6 +16,7 @@ var ProtocolVersionMap map[uint8]uint8
func init() {
ProtocolVersionMap = map[uint8]uint8{
5: 2,
4: 2,
3: 2,
2: 2,
@ -182,6 +184,12 @@ type Config struct {
// logs will go to stderr.
LogOutput io.Writer
// Logger is a custom logger which you provide. If Logger is set, it will use
// this for the internal logger. If Logger is not set, it will fall back to the
// behavior for using LogOutput. You cannot specify both LogOutput and Logger
// at the same time.
Logger *log.Logger
// SnapshotPath if provided is used to snapshot live nodes as well
// as lamport clock values. When Serf is started with a snapshot,
// it will attempt to join all the previously known nodes until one
@ -240,7 +248,7 @@ func DefaultConfig() *Config {
EventBuffer: 512,
QueryBuffer: 512,
LogOutput: os.Stderr,
ProtocolVersion: ProtocolVersionMax,
ProtocolVersion: 4,
ReapInterval: 15 * time.Second,
RecentIntentTimeout: 5 * time.Minute,
ReconnectInterval: 30 * time.Second,
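A hedged sketch of the new Config.Logger option documented above: pass a prepared *log.Logger instead of a raw LogOutput writer; per the doc comment, set one or the other, never both:

package main

import (
	"log"
	"os"

	"github.com/hashicorp/serf/serf"
)

func main() {
	logger := log.New(os.Stdout, "serf: ", log.LstdFlags|log.Lmicroseconds)

	conf := serf.DefaultConfig()
	conf.Logger = logger // takes precedence over conf.LogOutput

	s, err := serf.Create(conf)
	if err != nil {
		logger.Fatalf("failed to start serf: %v", err)
	}
	defer s.Shutdown()
}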


@ -1,9 +1,11 @@
package serf
import (
"bytes"
"fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
)
// delegate is the memberlist.Delegate implementation that Serf uses.
@ -83,6 +85,25 @@ func (d *delegate) NotifyMsg(buf []byte) {
d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From)
d.serf.handleQueryResponse(&resp)
case messageRelayType:
var header relayHeader
var handle codec.MsgpackHandle
reader := bytes.NewReader(buf[1:])
decoder := codec.NewDecoder(reader, &handle)
if err := decoder.Decode(&header); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding relay header: %s", err)
break
}
// The remaining contents are the message itself, so forward that
raw := make([]byte, reader.Len())
reader.Read(raw)
d.serf.logger.Printf("[DEBUG] serf: Relaying response to addr: %s", header.DestAddr.String())
if err := d.serf.memberlist.SendTo(&header.DestAddr, raw); err != nil {
d.serf.logger.Printf("[ERR] serf: Error forwarding message to %s: %s", header.DestAddr.String(), err)
break
}
default:
d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
}
@ -230,7 +251,8 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
// If we are doing a join, and eventJoinIgnore is set
// then we set the eventMinTime to the EventLTime. This
// prevents any of the incoming events from being processed
if isJoin && d.serf.eventJoinIgnore {
eventJoinIgnore := d.serf.eventJoinIgnore.Load().(bool)
if isJoin && eventJoinIgnore {
d.serf.eventLock.Lock()
if pp.EventLTime > d.serf.eventMinTime {
d.serf.eventMinTime = pp.EventLTime


@ -95,18 +95,19 @@ func (u UserEvent) String() string {
return fmt.Sprintf("user-event: %s", u.Name)
}
// Query is the struct used EventQuery type events
// Query is the struct used by EventQuery type events
type Query struct {
LTime LamportTime
Name string
Payload []byte
serf *Serf
id uint32 // ID is not exported, since it may change
addr []byte // Address to respond to
port uint16 // Port to respond to
deadline time.Time // Must respond by this deadline
respLock sync.Mutex
serf *Serf
id uint32 // ID is not exported, since it may change
addr []byte // Address to respond to
port uint16 // Port to respond to
deadline time.Time // Must respond by this deadline
relayFactor uint8 // Number of duplicate responses to relay back to sender
respLock sync.Mutex
}
func (q *Query) EventType() EventType {
@ -129,12 +130,12 @@ func (q *Query) Respond(buf []byte) error {
// Check if we've already responded
if q.deadline.IsZero() {
return fmt.Errorf("Response already sent")
return fmt.Errorf("response already sent")
}
// Ensure we aren't past our response deadline
if time.Now().After(q.deadline) {
return fmt.Errorf("Response is past the deadline")
return fmt.Errorf("response is past the deadline")
}
// Create response
@ -145,10 +146,10 @@ func (q *Query) Respond(buf []byte) error {
Payload: buf,
}
// Format the response
// Send a direct response
raw, err := encodeMessage(messageQueryResponseType, &resp)
if err != nil {
return fmt.Errorf("Failed to format response: %v", err)
return fmt.Errorf("failed to format response: %v", err)
}
// Check the size limit
@ -156,13 +157,18 @@ func (q *Query) Respond(buf []byte) error {
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
}
// Send the response
// Send the response directly to the originator
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
return err
}
// Clera the deadline, response sent
// Relay the response through up to relayFactor other nodes
if err := q.serf.relayResponse(q.relayFactor, addr, &resp); err != nil {
return err
}
// Clear the deadline, responses sent
q.deadline = time.Time{}
return nil
}
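From the consumer's point of view, Respond behaves as before but now also relays the response through up to relayFactor peers. A hedged sketch of an event loop answering queries, assuming a Serf instance wired to EventCh:

package main

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

func handleEvents(eventCh <-chan serf.Event) {
	for e := range eventCh {
		q, ok := e.(*serf.Query)
		if !ok {
			continue
		}
		if err := q.Respond([]byte("pong")); err != nil {
			// e.g. "response already sent" or "response is past the deadline"
			log.Printf("[WARN] could not respond to %q: %v", q.Name, err)
		}
	}
}

func main() {
	eventCh := make(chan serf.Event, 64)
	conf := serf.DefaultConfig()
	conf.EventCh = eventCh

	s, err := serf.Create(conf)
	if err != nil {
		log.Fatalf("serf: %v", err)
	}
	defer s.Shutdown()

	handleEvents(eventCh) // blocks, answering queries as they arrive
}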


@ -192,10 +192,12 @@ func (s *serfQueries) handleInstallKey(q *Query) {
goto SEND
}
if err := s.serf.writeKeyringFile(); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
goto SEND
if s.serf.config.KeyringFile != "" {
if err := s.serf.writeKeyringFile(); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
goto SEND
}
}
response.Result = true


@ -33,6 +33,13 @@ type KeyResponse struct {
Keys map[string]int
}
// KeyRequestOptions is used to contain optional parameters for a keyring operation
type KeyRequestOptions struct {
// RelayFactor is the number of duplicate query responses to send by relaying through
// other nodes, for redundancy
RelayFactor uint8
}
// streamKeyResp takes care of reading responses from a channel and composing
// them into a KeyResponse. It will update a KeyResponse *in place* and
// therefore has nothing to return.
@ -83,7 +90,7 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
// handleKeyRequest performs query broadcasting to all members for any type of
// key operation and manages gathering responses and packing them up into a
// KeyResponse for uniform response handling.
func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
func (k *KeyManager) handleKeyRequest(key, query string, opts *KeyRequestOptions) (*KeyResponse, error) {
resp := &KeyResponse{
Messages: make(map[string]string),
Keys: make(map[string]int),
@ -103,6 +110,9 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
}
qParam := k.serf.DefaultQueryParams()
if opts != nil {
qParam.RelayFactor = opts.RelayFactor
}
queryResp, err := k.serf.Query(qName, req, qParam)
if err != nil {
return resp, err
@ -127,30 +137,42 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
// responses from each of them, returning a list of messages from each node
// and any applicable error conditions.
func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) {
return k.InstallKeyWithOptions(key, nil)
}
func (k *KeyManager) InstallKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.Lock()
defer k.l.Unlock()
return k.handleKeyRequest(key, installKeyQuery)
return k.handleKeyRequest(key, installKeyQuery, opts)
}
// UseKey handles broadcasting a primary key change to all members in the
// cluster, and gathering any response messages. If successful, there should
// be an empty KeyResponse returned.
func (k *KeyManager) UseKey(key string) (*KeyResponse, error) {
return k.UseKeyWithOptions(key, nil)
}
func (k *KeyManager) UseKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.Lock()
defer k.l.Unlock()
return k.handleKeyRequest(key, useKeyQuery)
return k.handleKeyRequest(key, useKeyQuery, opts)
}
// RemoveKey handles broadcasting a key to the cluster for removal. Each member
// will receive this event, and if they have the key in their keyring, remove
// it. If any errors are encountered, RemoveKey will collect and relay them.
func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
return k.RemoveKeyWithOptions(key, nil)
}
func (k *KeyManager) RemoveKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.Lock()
defer k.l.Unlock()
return k.handleKeyRequest(key, removeKeyQuery)
return k.handleKeyRequest(key, removeKeyQuery, opts)
}
// ListKeys is used to collect installed keys from members in a Serf cluster
@ -159,8 +181,12 @@ func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
// Since having multiple keys installed can cause performance penalties in some
// cases, it's important to verify this information and remove unneeded keys.
func (k *KeyManager) ListKeys() (*KeyResponse, error) {
return k.ListKeysWithOptions(nil)
}
func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.RLock()
defer k.l.RUnlock()
return k.handleKeyRequest("", listKeysQuery)
}
return k.handleKeyRequest("", listKeysQuery, opts)
}
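A hedged usage sketch of the new *WithOptions entry points: identical key operations, with RelayFactor extra response copies relayed through other members for redundancy. The key material below is a placeholder; assumes an encryption-enabled cluster:

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/serf/serf"
)

func rotateKey(s *serf.Serf, newKey string) error {
	km := s.KeyManager()
	opts := &serf.KeyRequestOptions{RelayFactor: 2} // 2 relayed duplicates per response

	if _, err := km.InstallKeyWithOptions(newKey, opts); err != nil {
		return fmt.Errorf("install: %v", err)
	}
	if _, err := km.UseKeyWithOptions(newKey, opts); err != nil {
		return fmt.Errorf("use: %v", err)
	}

	resp, err := km.ListKeysWithOptions(opts)
	if err != nil {
		return fmt.Errorf("list: %v", err)
	}
	for key, count := range resp.Keys {
		log.Printf("key %s installed on %d nodes", key, count)
	}
	return nil
}

func main() {
	conf := serf.DefaultConfig()
	conf.MemberlistConfig.SecretKey = make([]byte, 32) // demo key; use real key material
	s, err := serf.Create(conf)
	if err != nil {
		log.Fatalf("serf: %v", err)
	}
	defer s.Shutdown()

	if err := rotateKey(s, "T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s="); err != nil {
		log.Printf("[ERR] %v", err)
	}
}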


@ -2,8 +2,10 @@ package serf
import (
"bytes"
"github.com/hashicorp/go-msgpack/codec"
"net"
"time"
"github.com/hashicorp/go-msgpack/codec"
)
// messageType are the types of gossip messages Serf will send along
@ -20,6 +22,7 @@ const (
messageConflictResponseType
messageKeyRequestType
messageKeyResponseType
messageRelayType
)
const (
@ -75,15 +78,16 @@ type messageUserEvent struct {
// messageQuery is used for query events
type messageQuery struct {
LTime LamportTime // Event lamport time
ID uint32 // Query ID, randomly generated
Addr []byte // Source address, used for a direct reply
Port uint16 // Source port, used for a direct reply
Filters [][]byte // Potential query filters
Flags uint32 // Used to provide various flags
Timeout time.Duration // Maximum time between delivery and response
Name string // Query name
Payload []byte // Query payload
LTime LamportTime // Event lamport time
ID uint32 // Query ID, randomly generated
Addr []byte // Source address, used for a direct reply
Port uint16 // Source port, used for a direct reply
Filters [][]byte // Potential query filters
Flags uint32 // Used to provide various flags
RelayFactor uint8 // Used to set the number of duplicate relayed responses
Timeout time.Duration // Maximum time between delivery and response
Name string // Query name
Payload []byte // Query payload
}
// Ack checks if the ack flag is set
@ -136,6 +140,28 @@ func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
return buf.Bytes(), err
}
// relayHeader is used to store the end destination of a relayed message
type relayHeader struct {
DestAddr net.UDPAddr
}
// encodeRelayMessage wraps a message in the messageRelayType, adding the length and
// address of the end recipient to the front of the message
func encodeRelayMessage(t messageType, addr net.UDPAddr, msg interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(buf, &handle)
buf.WriteByte(uint8(messageRelayType))
if err := encoder.Encode(relayHeader{DestAddr: addr}); err != nil {
return nil, err
}
buf.WriteByte(uint8(t))
err := encoder.Encode(msg)
return buf.Bytes(), err
}
func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(f))
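The relay envelope produced by encodeRelayMessage is a plain framing: one byte for messageRelayType, a msgpack-encoded relayHeader carrying the final destination, then the wrapped message's own type byte and msgpack body. A standalone sketch of that framing (the numeric type values here are illustrative; the real constants are unexported):

package main

import (
	"bytes"
	"fmt"
	"net"

	"github.com/hashicorp/go-msgpack/codec"
)

type relayHeader struct {
	DestAddr net.UDPAddr
}

// encodeRelay frames msg for relaying:
// [outer type][relayHeader][inner type][msg]
func encodeRelay(relayType, innerType byte, addr net.UDPAddr, msg interface{}) ([]byte, error) {
	buf := bytes.NewBuffer(nil)
	handle := codec.MsgpackHandle{}
	encoder := codec.NewEncoder(buf, &handle)

	buf.WriteByte(relayType) // envelope marker, read first by the receiver
	if err := encoder.Encode(relayHeader{DestAddr: addr}); err != nil {
		return nil, err
	}
	buf.WriteByte(innerType) // type of the wrapped message
	err := encoder.Encode(msg)
	return buf.Bytes(), err
}

func main() {
	dest := net.UDPAddr{IP: net.ParseIP("10.0.0.5"), Port: 7946}
	raw, err := encodeRelay(9, 6, dest, map[string]string{"From": "node-a"})
	fmt.Println(len(raw), err)
}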


@ -2,7 +2,6 @@ package serf
import (
"bytes"
"log"
"time"
"github.com/armon/go-metrics"
@ -37,7 +36,7 @@ func (p *pingDelegate) AckPayload() []byte {
// The rest of the message is the serialized coordinate.
enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := enc.Encode(p.serf.coordClient.GetCoordinate()); err != nil {
log.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
p.serf.logger.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
}
return buf.Bytes()
}
@ -52,7 +51,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
// Verify ping version in the header.
version := payload[0]
if version != PingVersion {
log.Printf("[ERR] serf: Unsupported ping version: %v", version)
p.serf.logger.Printf("[ERR] serf: Unsupported ping version: %v", version)
return
}
@ -61,29 +60,30 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
var coord coordinate.Coordinate
if err := dec.Decode(&coord); err != nil {
log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
p.serf.logger.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
return
}
// Apply the update. Since this is a coordinate coming from some place
// else we harden this and look for dimensionality problems proactively.
// Apply the update.
before := p.serf.coordClient.GetCoordinate()
if before.IsCompatibleWith(&coord) {
after := p.serf.coordClient.Update(other.Name, &coord, rtt)
// Publish some metrics to give us an idea of how much we are
// adjusting each time we update.
d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
// Cache the coordinate for the other node, and add our own
// to the cache as well since it just got updated. This lets
// users call GetCachedCoordinate with our node name, which is
// more friendly.
p.serf.coordCacheLock.Lock()
p.serf.coordCache[other.Name] = &coord
p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
p.serf.coordCacheLock.Unlock()
} else {
log.Printf("[ERR] serf: Rejected bad coordinate: %v\n", coord)
after, err := p.serf.coordClient.Update(other.Name, &coord, rtt)
if err != nil {
p.serf.logger.Printf("[ERR] serf: Rejected coordinate from %s: %v\n",
other.Name, err)
return
}
// Publish some metrics to give us an idea of how much we are
// adjusting each time we update.
d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
// Cache the coordinate for the other node, and add our own
// to the cache as well since it just got updated. This lets
// users call GetCachedCoordinate with our node name, which is
// more friendly.
p.serf.coordCacheLock.Lock()
p.serf.coordCache[other.Name] = &coord
p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
p.serf.coordCacheLock.Unlock()
}


@ -1,7 +1,11 @@
package serf
import (
"errors"
"fmt"
"math"
"math/rand"
"net"
"regexp"
"sync"
"time"
@ -24,6 +28,10 @@ type QueryParam struct {
// send an ack.
RequestAck bool
// RelayFactor controls the number of duplicate responses to relay
// back to the sender through other nodes for redundancy.
RelayFactor uint8
// The timeout limits how long the query is left open. If not provided,
// then a default timeout is used based on the configuration of Serf
Timeout time.Duration
@ -93,6 +101,10 @@ type QueryResponse struct {
// respCh is used to send a response from a node
respCh chan NodeResponse
// acks/responses are used to track the nodes that have sent an ack/response
acks map[string]struct{}
responses map[string]struct{}
closed bool
closeLock sync.Mutex
}
@ -100,13 +112,15 @@ type QueryResponse struct {
// newQueryResponse is used to construct a new query response
func newQueryResponse(n int, q *messageQuery) *QueryResponse {
resp := &QueryResponse{
deadline: time.Now().Add(q.Timeout),
id: q.ID,
lTime: q.LTime,
respCh: make(chan NodeResponse, n),
deadline: time.Now().Add(q.Timeout),
id: q.ID,
lTime: q.LTime,
respCh: make(chan NodeResponse, n),
responses: make(map[string]struct{}),
}
if q.Ack() {
resp.ackCh = make(chan string, n)
resp.acks = make(map[string]struct{})
}
return resp
}
@ -135,6 +149,8 @@ func (r *QueryResponse) Deadline() time.Time {
// Finished returns if the query is finished running
func (r *QueryResponse) Finished() bool {
r.closeLock.Lock()
defer r.closeLock.Unlock()
return r.closed || time.Now().After(r.deadline)
}
@ -151,6 +167,22 @@ func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
return r.respCh
}
// sendResponse sends a response on the response channel ensuring the channel is not closed.
func (r *QueryResponse) sendResponse(nr NodeResponse) error {
r.closeLock.Lock()
defer r.closeLock.Unlock()
if r.closed {
return nil
}
select {
case r.respCh <- nr:
r.responses[nr.From] = struct{}{}
default:
return errors.New("serf: Failed to deliver query response, dropping")
}
return nil
}
// NodeResponse is used to represent a single response from a node
type NodeResponse struct {
From string
@ -208,3 +240,74 @@ func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
}
return true
}
// relayResponse will relay a copy of the given response to up to relayFactor
// other members.
func (s *Serf) relayResponse(relayFactor uint8, addr net.UDPAddr, resp *messageQueryResponse) error {
if relayFactor == 0 {
return nil
}
// Needs to be worth it; we need to have at least relayFactor *other*
// nodes. If you have a tiny cluster then the relayFactor shouldn't
// be needed.
members := s.Members()
if len(members) < int(relayFactor)+1 {
return nil
}
// Prep the relay message, which is a wrapped version of the original.
raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
if err != nil {
return fmt.Errorf("failed to format relayed response: %v", err)
}
if len(raw) > s.config.QueryResponseSizeLimit {
return fmt.Errorf("relayed response exceeds limit of %d bytes", s.config.QueryResponseSizeLimit)
}
// Relay to a random set of peers.
localName := s.LocalMember().Name
relayMembers := kRandomMembers(int(relayFactor), members, func(m Member) bool {
return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == localName
})
for _, m := range relayMembers {
relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
if err := s.memberlist.SendTo(&relayAddr, raw); err != nil {
return fmt.Errorf("failed to send relay response: %v", err)
}
}
return nil
}
// kRandomMembers selects up to k members from a given list, optionally
// filtering by the given filterFunc
func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
n := len(members)
kMembers := make([]Member, 0, k)
OUTER:
// Probe up to 3*n times, with large n this is not necessary
// since k << n, but with small n we want search to be
// exhaustive
for i := 0; i < 3*n && len(kMembers) < k; i++ {
// Get random member
idx := rand.Intn(n)
member := members[idx]
// Give the filter a shot at it.
if filterFunc != nil && filterFunc(member) {
continue OUTER
}
// Check if we have this member already
for j := 0; j < len(kMembers); j++ {
if member.Name == kMembers[j].Name {
continue OUTER
}
}
// Append the member
kMembers = append(kMembers, member)
}
return kMembers
}
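Putting the sender side together, a hedged sketch of issuing a query with the new RelayFactor parameter: each responder still replies directly, and additionally relays up to RelayFactor duplicate copies through other members, trading bandwidth for delivery redundancy on lossy networks:

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/serf/serf"
)

func main() {
	s, err := serf.Create(serf.DefaultConfig())
	if err != nil {
		log.Fatalf("serf: %v", err)
	}
	defer s.Shutdown()

	params := s.DefaultQueryParams()
	params.RelayFactor = 3 // up to 3 relayed duplicates per response
	params.RequestAck = true

	resp, err := s.Query("deploy-status", []byte("v1.2.3"), params)
	if err != nil {
		log.Fatalf("query: %v", err)
	}
	// The channel closes when the query deadline passes; the new
	// responses/acks maps upstream make duplicate deliveries invisible here.
	for r := range resp.ResponseCh() {
		fmt.Printf("%s: %s\n", r.From, r.Payload)
	}
}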


@ -10,8 +10,10 @@ import (
"log"
"math/rand"
"net"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
@ -25,7 +27,7 @@ import (
// version to memberlist below.
const (
ProtocolVersionMin uint8 = 2
ProtocolVersionMax = 4
ProtocolVersionMax = 5
)
const (
@ -73,7 +75,7 @@ type Serf struct {
eventBroadcasts *memberlist.TransmitLimitedQueue
eventBuffer []*userEvents
eventJoinIgnore bool
eventJoinIgnore atomic.Value
eventMinTime LamportTime
eventLock sync.RWMutex
@ -240,14 +242,24 @@ func Create(conf *Config) (*Serf, error) {
conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
}
logger := conf.Logger
if logger == nil {
logOutput := conf.LogOutput
if logOutput == nil {
logOutput = os.Stderr
}
logger = log.New(logOutput, "", log.LstdFlags)
}
serf := &Serf{
config: conf,
logger: log.New(conf.LogOutput, "", log.LstdFlags),
logger: logger,
members: make(map[string]*memberState),
queryResponse: make(map[LamportTime]*QueryResponse),
shutdownCh: make(chan struct{}),
state: SerfAlive,
}
serf.eventJoinIgnore.Store(false)
// Check that the meta data length is okay
if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize {
@ -328,21 +340,15 @@ func Create(conf *Config) (*Serf, error) {
// Setup the various broadcast queues, which we use to send our own
// custom broadcasts along the gossip channel.
serf.broadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(serf.members)
},
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(serf.members)
},
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(serf.members)
},
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
@ -499,15 +505,16 @@ func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryRes
// Create a message
q := messageQuery{
LTime: s.queryClock.Time(),
ID: uint32(rand.Int31()),
Addr: local.Addr,
Port: local.Port,
Filters: filters,
Flags: flags,
Timeout: params.Timeout,
Name: name,
Payload: payload,
LTime: s.queryClock.Time(),
ID: uint32(rand.Int31()),
Addr: local.Addr,
Port: local.Port,
Filters: filters,
Flags: flags,
RelayFactor: params.RelayFactor,
Timeout: params.Timeout,
Name: name,
Payload: payload,
}
// Encode the query
@ -588,9 +595,9 @@ func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
// Ignore any events from a potential join. This is safe since we hold
// the joinLock and nobody else can be doing a Join
if ignoreOld {
s.eventJoinIgnore = true
s.eventJoinIgnore.Store(true)
defer func() {
s.eventJoinIgnore = false
s.eventJoinIgnore.Store(false)
}()
}
@ -791,13 +798,15 @@ func (s *Serf) Shutdown() error {
s.logger.Printf("[WARN] serf: Shutdown without a Leave")
}
// Wait to close the shutdown channel until after we've shut down the
// memberlist and its associated network resources, since the shutdown
// channel signals that we are cleaned up outside of Serf.
s.state = SerfShutdown
close(s.shutdownCh)
err := s.memberlist.Shutdown()
if err != nil {
return err
}
close(s.shutdownCh)
// Wait for the snapshoter to finish if we have one
if s.snapshotter != nil {
@ -1237,19 +1246,23 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
if err := s.memberlist.SendTo(&addr, raw); err != nil {
s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
}
if err := s.relayResponse(query.RelayFactor, addr, &ack); err != nil {
s.logger.Printf("[ERR] serf: failed to relay ack: %v", err)
}
}
}
if s.config.EventCh != nil {
s.config.EventCh <- &Query{
LTime: query.LTime,
Name: query.Name,
Payload: query.Payload,
serf: s,
id: query.ID,
addr: query.Addr,
port: query.Port,
deadline: time.Now().Add(query.Timeout),
LTime: query.LTime,
Name: query.Name,
Payload: query.Payload,
serf: s,
id: query.ID,
addr: query.Addr,
port: query.Port,
deadline: time.Now().Add(query.Timeout),
relayFactor: query.RelayFactor,
}
}
return rebroadcast
@ -1282,18 +1295,30 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
// Process each type of response
if resp.Ack() {
// Exit early if this is a duplicate ack
if _, ok := query.acks[resp.From]; ok {
metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1)
return
}
metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
select {
case query.ackCh <- resp.From:
query.acks[resp.From] = struct{}{}
default:
s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping")
s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping")
}
} else {
// Exit early if this is a duplicate response
if _, ok := query.responses[resp.From]; ok {
metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1)
return
}
metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
select {
case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
default:
s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping")
err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload})
if err != nil {
s.logger.Printf("[WARN] %v", err)
}
}
}
@ -1353,7 +1378,7 @@ func (s *Serf) resolveNodeConflict() {
// Update the counters
responses++
if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port {
if member.Addr.Equal(local.Addr) && member.Port == local.Port {
matching++
}
}
@ -1642,6 +1667,9 @@ func (s *Serf) Stats() map[string]string {
"query_queue": toString(uint64(s.queryBroadcasts.NumQueued())),
"encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()),
}
if !s.config.DisableCoordinates {
stats["coordinate_resets"] = toString(uint64(s.coordClient.Stats().Resets))
}
return stats
}
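The eventJoinIgnore change above swaps a racy bool for sync/atomic.Value, which is why Create now seeds it with Store(false): the Load().(bool) assertion in the delegate would panic on a nil value. A minimal sketch of the pattern:

package main

import (
	"fmt"
	"sync/atomic"
)

type node struct {
	eventJoinIgnore atomic.Value // always holds a bool
}

func newNode() *node {
	n := &node{}
	n.eventJoinIgnore.Store(false) // seed so Load never returns nil
	return n
}

func (n *node) join() {
	n.eventJoinIgnore.Store(true)
	defer n.eventJoinIgnore.Store(false)
	// ...join work; other goroutines may Load concurrently without a lock...
	fmt.Println("ignoring events during join:", n.eventJoinIgnore.Load().(bool))
}

func main() {
	n := newNode()
	n.join()
	fmt.Println("after join:", n.eventJoinIgnore.Load().(bool))
}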


@ -31,6 +31,7 @@ const flushInterval = 500 * time.Millisecond
const clockUpdateInterval = 500 * time.Millisecond
const coordinateUpdateInterval = 60 * time.Second
const tmpExt = ".compact"
const snapshotErrorRecoveryInterval = 30 * time.Second
// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
@ -55,6 +56,7 @@ type Snapshotter struct {
rejoinAfterLeave bool
shutdownCh <-chan struct{}
waitCh chan struct{}
lastAttemptedCompaction time.Time
}
// PreviousNode is used to represent the previously known alive nodes
@ -84,7 +86,7 @@ func NewSnapshotter(path string,
inCh := make(chan Event, 1024)
// Try to open the file
fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
return nil, nil, fmt.Errorf("failed to open snapshot: %v", err)
}
@ -311,6 +313,17 @@ func (s *Snapshotter) processQuery(q *Query) {
func (s *Snapshotter) tryAppend(l string) {
if err := s.appendLine(l); err != nil {
s.logger.Printf("[ERR] serf: Failed to update snapshot: %v", err)
now := time.Now()
if now.Sub(s.lastAttemptedCompaction) > snapshotErrorRecoveryInterval {
s.lastAttemptedCompaction = now
s.logger.Printf("[INFO] serf: Attempting compaction to recover from error...")
err = s.compact()
if err != nil {
s.logger.Printf("[ERR] serf: Compaction failed, will reattempt after %v: %v", snapshotErrorRecoveryInterval, err)
} else {
s.logger.Printf("[INFO] serf: Finished compaction, successfully recovered from error state")
}
}
}
}
@ -532,7 +545,10 @@ func (s *Snapshotter) replay() error {
s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
continue
}
s.coordClient.SetCoordinate(&coord)
if err := s.coordClient.SetCoordinate(&coord); err != nil {
s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err)
continue
}
} else if line == "leave" {
// Ignore a leave if we plan on re-joining
if s.rejoinAfterLeave {
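A standalone sketch of the error-recovery throttle tryAppend gains above: after a failed append, attempt a compaction at most once per snapshotErrorRecoveryInterval so a persistently failing disk can't trigger a compaction storm. Names mirror the diff; the failing append here is a stand-in:

package main

import (
	"fmt"
	"log"
	"time"
)

const snapshotErrorRecoveryInterval = 30 * time.Second

type snapshotter struct {
	lastAttemptedCompaction time.Time
}

func (s *snapshotter) compact() error { return nil } // stand-in for real compaction

func (s *snapshotter) tryAppend(appendLine func() error) {
	if err := appendLine(); err != nil {
		log.Printf("[ERR] failed to update snapshot: %v", err)
		now := time.Now()
		if now.Sub(s.lastAttemptedCompaction) > snapshotErrorRecoveryInterval {
			s.lastAttemptedCompaction = now
			log.Printf("[INFO] attempting compaction to recover from error...")
			if err := s.compact(); err != nil {
				log.Printf("[ERR] compaction failed, will reattempt after %v: %v",
					snapshotErrorRecoveryInterval, err)
			} else {
				log.Printf("[INFO] recovered from error state")
			}
		}
	}
}

func main() {
	s := &snapshotter{}
	fail := func() error { return fmt.Errorf("disk full") }
	s.tryAppend(fail) // first failure triggers a compaction attempt
	s.tryAppend(fail) // second failure is throttled within the interval
}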

vendor/vendor.json

@ -851,10 +851,10 @@
"revision": "a14192a58a694c123d8fe5481d4a4727d6ae82f3"
},
{
"checksumSHA1": "bYn+HDmt7YLFvEV6DagMup8mkZE=",
"checksumSHA1": "OCPP4JxnuSSmweEL9khCd6OdIts=",
"path": "github.com/hashicorp/raft",
"revision": "e5e581e04af7c46974b99195347cc0c380c0d841",
"revisionTime": "2017-06-09T23:09:26Z"
"revision": "e45173826775c4b782961c7b5758ba484b91464b",
"revisionTime": "2017-07-10T17:20:01Z"
},
{
"checksumSHA1": "QAxukkv54/iIvLfsUP6IK4R0m/A=",
@ -875,18 +875,17 @@
"revisionTime": "2016-06-01T22:40:23Z"
},
{
"checksumSHA1": "E3Xcanc9ouQwL+CZGOUyA/+giLg=",
"comment": "v0.7.0-18-gc4c55f1",
"checksumSHA1": "/oss17GO4hXGM7QnUdI3VzcAHzA=",
"path": "github.com/hashicorp/serf/coordinate",
"revision": "b9642a47e6139e50548b6f14588a1a3c0839660a",
"revisionTime": "2016-09-14T16:26:25Z"
"revision": "bbeddf0b3ab3072a60525afbd6b6f47d33839eee",
"revisionTime": "2017-07-14T18:26:01Z"
},
{
"checksumSHA1": "vLyudzMEdik8IpRY1H2vRa2PeLU=",
"comment": "v0.7.0-18-gc4c55f1",
"checksumSHA1": "pvLOzocYsZtxuJ9pREHRTxYnoa4=",
"origin": "github.com/hashicorp/nomad/vendor/github.com/hashicorp/serf/serf",
"path": "github.com/hashicorp/serf/serf",
"revision": "b9642a47e6139e50548b6f14588a1a3c0839660a",
"revisionTime": "2016-09-14T16:26:25Z"
"revision": "bbeddf0b3ab3072a60525afbd6b6f47d33839eee",
"revisionTime": "2017-07-14T18:26:01Z"
},
{
"checksumSHA1": "eGzvBRMFD6ZB3A6uO750np7Om/E=",