Updates to latest Serf/memberlist to get lifeguard and TCP joins over DNS.
This commit is contained in:
parent
c3d86c07bd
commit
1776316053
|
@ -206,7 +206,7 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/hashicorp/memberlist",
|
||||
"Rev": "cef12ad58224d55cf26caa9e3d239c2fcb3432a2"
|
||||
"Rev": "215aec831f03c9b7c61ac183d3e28fff3c7d3a37"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/hashicorp/net-rpc-msgpackrpc",
|
||||
|
@ -226,13 +226,13 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/hashicorp/serf/coordinate",
|
||||
"Comment": "v0.7.0-62-gb60a6d9",
|
||||
"Rev": "b60a6d928fe726a588f79a1d500582507f9d79de"
|
||||
"Comment": "v0.7.0-64-gdce30f1",
|
||||
"Rev": "dce30f1c7806bf2d96478abb983c53af0e4c8fb2"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/hashicorp/serf/serf",
|
||||
"Comment": "v0.7.0-62-gb60a6d9",
|
||||
"Rev": "b60a6d928fe726a588f79a1d500582507f9d79de"
|
||||
"Comment": "v0.7.0-64-gdce30f1",
|
||||
"Rev": "dce30f1c7806bf2d96478abb983c53af0e4c8fb2"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/hashicorp/yamux",
|
||||
|
|
|
@ -82,7 +82,7 @@ least one existing member in order to join the cluster. The new member
|
|||
does a full state sync with the existing member over TCP and begins gossiping its
|
||||
existence to the cluster.
|
||||
|
||||
Gossip is done over UDP to a with a configurable but fixed fanout and interval.
|
||||
Gossip is done over UDP with a configurable but fixed fanout and interval.
|
||||
This ensures that network usage is constant with regards to number of nodes, as opposed to
|
||||
exponential growth that can occur with traditional heartbeat mechanisms.
|
||||
Complete state exchanges with a random node are done periodically over
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
package memberlist
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
)
|
||||
|
||||
// awareness manages a simple metric for tracking the estimated health of the
|
||||
// local node. Health is primary the node's ability to respond in the soft
|
||||
// real-time manner required for correct health checking of other nodes in the
|
||||
// cluster.
|
||||
type awareness struct {
|
||||
sync.RWMutex
|
||||
|
||||
// max is the upper threshold for the timeout scale (the score will be
|
||||
// constrained to be from 0 <= score < max).
|
||||
max int
|
||||
|
||||
// score is the current awareness score. Lower values are healthier and
|
||||
// zero is the minimum value.
|
||||
score int
|
||||
}
|
||||
|
||||
// newAwareness returns a new awareness object.
|
||||
func newAwareness(max int) *awareness {
|
||||
return &awareness{
|
||||
max: max,
|
||||
score: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// ApplyDelta takes the given delta and applies it to the score in a thread-safe
|
||||
// manner. It also enforces a floor of zero and a max of max, so deltas may not
|
||||
// change the overall score if it's railed at one of the extremes.
|
||||
func (a *awareness) ApplyDelta(delta int) {
|
||||
a.Lock()
|
||||
initial := a.score
|
||||
a.score += delta
|
||||
if a.score < 0 {
|
||||
a.score = 0
|
||||
} else if a.score > (a.max - 1) {
|
||||
a.score = (a.max - 1)
|
||||
}
|
||||
final := a.score
|
||||
a.Unlock()
|
||||
|
||||
if initial != final {
|
||||
metrics.SetGauge([]string{"memberlist", "health", "score"}, float32(final))
|
||||
}
|
||||
}
|
||||
|
||||
// GetHealthScore returns the raw health score.
|
||||
func (a *awareness) GetHealthScore() int {
|
||||
a.RLock()
|
||||
score := a.score
|
||||
a.RUnlock()
|
||||
return score
|
||||
}
|
||||
|
||||
// ScaleTimeout takes the given duration and scales it based on the current
|
||||
// score. Less healthyness will lead to longer timeouts.
|
||||
func (a *awareness) ScaleTimeout(timeout time.Duration) time.Duration {
|
||||
a.RLock()
|
||||
score := a.score
|
||||
a.RUnlock()
|
||||
return timeout * (time.Duration(score) + 1)
|
||||
}
|
|
@ -63,6 +63,23 @@ type Config struct {
|
|||
// still alive.
|
||||
SuspicionMult int
|
||||
|
||||
// SuspicionMaxTimeoutMult is the multiplier applied to the
|
||||
// SuspicionTimeout used as an upper bound on detection time. This max
|
||||
// timeout is calculated using the formula:
|
||||
//
|
||||
// SuspicionMaxTimeout = SuspicionMaxTimeoutMult * SuspicionTimeout
|
||||
//
|
||||
// If everything is working properly, confirmations from other nodes will
|
||||
// accelerate suspicion timers in a manner which will cause the timeout
|
||||
// to reach the base SuspicionTimeout before that elapses, so this value
|
||||
// will typically only come into play if a node is experiencing issues
|
||||
// communicating with other nodes. It should be set to a something fairly
|
||||
// large so that a node having problems will have a lot of chances to
|
||||
// recover before falsely declaring other nodes as failed, but short
|
||||
// enough for a legitimately isolated node to still make progress marking
|
||||
// nodes failed in a reasonable amount of time.
|
||||
SuspicionMaxTimeoutMult int
|
||||
|
||||
// PushPullInterval is the interval between complete state syncs.
|
||||
// Complete state syncs are done with a single node over TCP and are
|
||||
// quite expensive relative to standard gossiped messages. Setting this
|
||||
|
@ -91,6 +108,11 @@ type Config struct {
|
|||
// indirect UDP pings.
|
||||
DisableTcpPings bool
|
||||
|
||||
// AwarenessMaxMultiplier will increase the probe interval if the node
|
||||
// becomes aware that it might be degraded and not meeting the soft real
|
||||
// time requirements to reliably probe other nodes.
|
||||
AwarenessMaxMultiplier int
|
||||
|
||||
// GossipInterval and GossipNodes are used to configure the gossip
|
||||
// behavior of memberlist.
|
||||
//
|
||||
|
@ -143,6 +165,10 @@ type Config struct {
|
|||
Ping PingDelegate
|
||||
Alive AliveDelegate
|
||||
|
||||
// DNSConfigPath points to the system's DNS config file, usually located
|
||||
// at /etc/resolv.conf. It can be overridden via config for easier testing.
|
||||
DNSConfigPath string
|
||||
|
||||
// LogOutput is the writer where logs should be sent. If this is not
|
||||
// set, logging will go to stderr by default. You cannot specify both LogOutput
|
||||
// and Logger at the same time.
|
||||
|
@ -174,10 +200,12 @@ func DefaultLANConfig() *Config {
|
|||
IndirectChecks: 3, // Use 3 nodes for the indirect ping
|
||||
RetransmitMult: 4, // Retransmit a message 4 * log(N+1) nodes
|
||||
SuspicionMult: 5, // Suspect a node for 5 * log(N+1) * Interval
|
||||
SuspicionMaxTimeoutMult: 6, // For 10k nodes this will give a max timeout of 120 seconds
|
||||
PushPullInterval: 30 * time.Second, // Low frequency
|
||||
ProbeTimeout: 500 * time.Millisecond, // Reasonable RTT time for LAN
|
||||
ProbeInterval: 1 * time.Second, // Failure check every second
|
||||
DisableTcpPings: false, // TCP pings are safe, even with mixed versions
|
||||
AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds
|
||||
|
||||
GossipNodes: 3, // Gossip to 3 nodes
|
||||
GossipInterval: 200 * time.Millisecond, // Gossip more rapidly
|
||||
|
@ -185,8 +213,9 @@ func DefaultLANConfig() *Config {
|
|||
EnableCompression: true, // Enable compression by default
|
||||
|
||||
SecretKey: nil,
|
||||
|
||||
Keyring: nil,
|
||||
|
||||
DNSConfigPath: "/etc/resolv.conf",
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,8 @@ type Delegate interface {
|
|||
// It can return a list of buffers to send. Each buffer should assume an
|
||||
// overhead as provided with a limit on the total byte size allowed.
|
||||
// The total byte size of the resulting data to send must not exceed
|
||||
// the limit.
|
||||
// the limit. Care should be taken that this method does not block,
|
||||
// since doing so would block the entire UDP packet receive loop.
|
||||
GetBroadcasts(overhead, limit int) [][]byte
|
||||
|
||||
// LocalState is used for a TCP Push/Pull. This is sent to
|
||||
|
|
|
@ -20,8 +20,12 @@ import (
|
|||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/miekg/dns"
|
||||
)
|
||||
|
||||
type Memberlist struct {
|
||||
|
@ -42,6 +46,8 @@ type Memberlist struct {
|
|||
nodeLock sync.RWMutex
|
||||
nodes []*nodeState // Known nodes
|
||||
nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState
|
||||
nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer
|
||||
awareness *awareness
|
||||
|
||||
tickerLock sync.Mutex
|
||||
tickers []*time.Ticker
|
||||
|
@ -57,7 +63,7 @@ type Memberlist struct {
|
|||
}
|
||||
|
||||
// newMemberlist creates the network listeners.
|
||||
// Does not schedule execution of background maintenence.
|
||||
// Does not schedule execution of background maintenance.
|
||||
func newMemberlist(conf *Config) (*Memberlist, error) {
|
||||
if conf.ProtocolVersion < ProtocolVersionMin {
|
||||
return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
|
||||
|
@ -125,6 +131,8 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
|
|||
tcpListener: tcpLn,
|
||||
handoff: make(chan msgHandoff, 1024),
|
||||
nodeMap: make(map[string]*nodeState),
|
||||
nodeTimers: make(map[string]*suspicion),
|
||||
awareness: newAwareness(conf.AwarenessMaxMultiplier),
|
||||
ackHandlers: make(map[uint32]*ackHandler),
|
||||
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
|
||||
logger: logger,
|
||||
|
@ -166,72 +174,152 @@ func Create(conf *Config) (*Memberlist, error) {
|
|||
// none could be reached. If an error is returned, the node did not successfully
|
||||
// join the cluster.
|
||||
func (m *Memberlist) Join(existing []string) (int, error) {
|
||||
// Attempt to join any of them
|
||||
numSuccess := 0
|
||||
var retErr error
|
||||
var errs error
|
||||
for _, exist := range existing {
|
||||
addrs, port, err := m.resolveAddr(exist)
|
||||
addrs, err := m.resolveAddr(exist)
|
||||
if err != nil {
|
||||
m.logger.Printf("[WARN] memberlist: Failed to resolve %s: %v", exist, err)
|
||||
retErr = err
|
||||
err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
|
||||
errs = multierror.Append(errs, err)
|
||||
m.logger.Printf("[WARN] memberlist: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, addr := range addrs {
|
||||
if err := m.pushPullNode(addr, port, true); err != nil {
|
||||
retErr = err
|
||||
if err := m.pushPullNode(addr.ip, addr.port, true); err != nil {
|
||||
err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
|
||||
errs = multierror.Append(errs, err)
|
||||
m.logger.Printf("[DEBUG] memberlist: %v", err)
|
||||
continue
|
||||
}
|
||||
numSuccess++
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if numSuccess > 0 {
|
||||
retErr = nil
|
||||
errs = nil
|
||||
}
|
||||
return numSuccess, errs
|
||||
}
|
||||
|
||||
// ipPort holds information about a node we want to try to join.
|
||||
type ipPort struct {
|
||||
ip net.IP
|
||||
port uint16
|
||||
}
|
||||
|
||||
// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host.
|
||||
// The built-in Go resolver will do a UDP lookup first, and will only use TCP if
|
||||
// the response has the truncate bit set, which isn't common on DNS servers like
|
||||
// Consul's. By doing the TCP lookup directly, we get the best chance for the
|
||||
// largest list of hosts to join. Since joins are relatively rare events, it's ok
|
||||
// to do this rather expensive operation.
|
||||
func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, error) {
|
||||
// Don't attempt any TCP lookups against non-fully qualified domain
|
||||
// names, since those will likely come from the resolv.conf file.
|
||||
if !strings.Contains(host, ".") {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return numSuccess, retErr
|
||||
// Make sure the domain name is terminated with a dot (we know there's
|
||||
// at least one character at this point).
|
||||
dn := host
|
||||
if dn[len(dn)-1] != '.' {
|
||||
dn = dn + "."
|
||||
}
|
||||
|
||||
// See if we can find a server to try.
|
||||
cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(cc.Servers) > 0 {
|
||||
// We support host:port in the DNS config, but need to add the
|
||||
// default port if one is not supplied.
|
||||
server := cc.Servers[0]
|
||||
if !hasPort(server) {
|
||||
server = net.JoinHostPort(server, cc.Port)
|
||||
}
|
||||
|
||||
// Do the lookup.
|
||||
c := new(dns.Client)
|
||||
c.Net = "tcp"
|
||||
msg := new(dns.Msg)
|
||||
msg.SetQuestion(dn, dns.TypeANY)
|
||||
in, _, err := c.Exchange(msg, server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Handle any IPs we get back that we can attempt to join.
|
||||
var ips []ipPort
|
||||
for _, r := range in.Answer {
|
||||
switch rr := r.(type) {
|
||||
case (*dns.A):
|
||||
ips = append(ips, ipPort{rr.A, defaultPort})
|
||||
case (*dns.AAAA):
|
||||
ips = append(ips, ipPort{rr.AAAA, defaultPort})
|
||||
case (*dns.CNAME):
|
||||
m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host)
|
||||
}
|
||||
}
|
||||
return ips, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// resolveAddr is used to resolve the address into an address,
|
||||
// port, and error. If no port is given, use the default
|
||||
func (m *Memberlist) resolveAddr(hostStr string) ([][]byte, uint16, error) {
|
||||
ips := make([][]byte, 0)
|
||||
func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
|
||||
// Normalize the incoming string to host:port so we can apply Go's
|
||||
// parser to it.
|
||||
port := uint16(0)
|
||||
if !hasPort(hostStr) {
|
||||
hostStr += ":" + strconv.Itoa(m.config.BindPort)
|
||||
}
|
||||
host, sport, err := net.SplitHostPort(hostStr)
|
||||
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
||||
// error, port missing - we can solve this
|
||||
port = uint16(m.config.BindPort)
|
||||
host = hostStr
|
||||
} else if err != nil {
|
||||
// error, but not missing port
|
||||
return ips, port, err
|
||||
} else if lport, err := strconv.ParseUint(sport, 10, 16); err != nil {
|
||||
// error, when parsing port
|
||||
return ips, port, err
|
||||
} else {
|
||||
// no error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// This will capture the supplied port, or the default one added above.
|
||||
lport, err := strconv.ParseUint(sport, 10, 16)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
port = uint16(lport)
|
||||
|
||||
// If it looks like an IP address we are done. The SplitHostPort() above
|
||||
// will make sure the host part is in good shape for parsing, even for
|
||||
// IPv6 addresses.
|
||||
if ip := net.ParseIP(host); ip != nil {
|
||||
return []ipPort{ipPort{ip, port}}, nil
|
||||
}
|
||||
|
||||
// Get the addresses that hostPort might resolve to
|
||||
// ResolveTcpAddr requres ipv6 brackets to separate
|
||||
// port numbers whereas ParseIP doesn't, but luckily
|
||||
// SplitHostPort takes care of the brackets
|
||||
if ip := net.ParseIP(host); ip == nil {
|
||||
if pre, err := net.LookupIP(host); err == nil {
|
||||
for _, ip := range pre {
|
||||
ips = append(ips, ip)
|
||||
// First try TCP so we have the best chance for the largest list of
|
||||
// hosts to join. If this fails it's not fatal since this isn't a standard
|
||||
// way to query DNS, and we have a fallback below.
|
||||
ips, err := m.tcpLookupIP(host, port)
|
||||
if err != nil {
|
||||
m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err)
|
||||
}
|
||||
} else {
|
||||
return ips, port, err
|
||||
}
|
||||
} else {
|
||||
ips = append(ips, ip)
|
||||
if len(ips) > 0 {
|
||||
return ips, nil
|
||||
}
|
||||
|
||||
return ips, port, nil
|
||||
// If TCP didn't yield anything then use the normal Go resolver which
|
||||
// will try UDP, then might possibly try TCP again if the UDP response
|
||||
// indicates it was truncated.
|
||||
ans, err := net.LookupIP(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ips = make([]ipPort, 0, len(ans))
|
||||
for _, ip := range ans {
|
||||
ips = append(ips, ipPort{ip, port})
|
||||
}
|
||||
return ips, nil
|
||||
}
|
||||
|
||||
// setAlive is used to mark this node as being alive. This is the same
|
||||
|
@ -541,6 +629,13 @@ func (m *Memberlist) anyAlive() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// GetHealthScore gives this instance's idea of how well it is meeting the soft
|
||||
// real-time requirements of the protocol. Lower numbers are better, and zero
|
||||
// means "totally healthy".
|
||||
func (m *Memberlist) GetHealthScore() int {
|
||||
return m.awareness.GetHealthScore()
|
||||
}
|
||||
|
||||
// ProtocolVersion returns the protocol version currently in use by
|
||||
// this memberlist.
|
||||
func (m *Memberlist) ProtocolVersion() uint8 {
|
||||
|
|
|
@ -24,9 +24,15 @@ const (
|
|||
// A memberlist speaking version 2 of the protocol will attempt
|
||||
// to TCP ping another memberlist who understands version 3 or
|
||||
// greater.
|
||||
//
|
||||
// Version 4 added support for nacks as part of indirect probes.
|
||||
// A memberlist speaking version 2 of the protocol will expect
|
||||
// nacks from another memberlist who understands version 4 or
|
||||
// greater, and likewise nacks will be sent to memberlists who
|
||||
// understand version 4 or greater.
|
||||
ProtocolVersion2Compatible = 2
|
||||
|
||||
ProtocolVersionMax = 3
|
||||
ProtocolVersionMax = 4
|
||||
)
|
||||
|
||||
// messageType is an integer ID of a type of message that can be received
|
||||
|
@ -46,6 +52,7 @@ const (
|
|||
userMsg // User mesg, not handled by us
|
||||
compressMsg
|
||||
encryptMsg
|
||||
nackRespMsg
|
||||
)
|
||||
|
||||
// compressionType is used to specify the compression algorithm
|
||||
|
@ -83,6 +90,7 @@ type indirectPingReq struct {
|
|||
Target []byte
|
||||
Port uint16
|
||||
Node string
|
||||
Nack bool // true if we'd like a nack back
|
||||
}
|
||||
|
||||
// ack response is sent for a ping
|
||||
|
@ -91,6 +99,13 @@ type ackResp struct {
|
|||
Payload []byte
|
||||
}
|
||||
|
||||
// nack response is sent for an indirect ping when the pinger doesn't hear from
|
||||
// the ping-ee within the configured timeout. This lets the original node know
|
||||
// that the indirect ping attempt happened but didn't succeed.
|
||||
type nackResp struct {
|
||||
SeqNo uint32
|
||||
}
|
||||
|
||||
// suspect is broadcast when we suspect a node is dead
|
||||
type suspect struct {
|
||||
Incarnation uint32
|
||||
|
@ -121,7 +136,7 @@ type dead struct {
|
|||
}
|
||||
|
||||
// pushPullHeader is used to inform the
|
||||
// otherside how many states we are transfering
|
||||
// otherside how many states we are transferring
|
||||
type pushPullHeader struct {
|
||||
Nodes int
|
||||
UserStateLen int // Encodes the byte lengh of user state
|
||||
|
@ -134,7 +149,7 @@ type userMsgHeader struct {
|
|||
}
|
||||
|
||||
// pushNodeState is used for pushPullReq when we are
|
||||
// transfering out node states
|
||||
// transferring out node states
|
||||
type pushNodeState struct {
|
||||
Name string
|
||||
Addr []byte
|
||||
|
@ -343,6 +358,8 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
|
|||
m.handleIndirectPing(buf, from)
|
||||
case ackRespMsg:
|
||||
m.handleAck(buf, from, timestamp)
|
||||
case nackRespMsg:
|
||||
m.handleNack(buf, from)
|
||||
|
||||
case suspectMsg:
|
||||
fallthrough
|
||||
|
@ -440,18 +457,23 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
|
|||
}
|
||||
|
||||
// For proto versions < 2, there is no port provided. Mask old
|
||||
// behavior by using the configured port
|
||||
// behavior by using the configured port.
|
||||
if m.ProtocolVersion() < 2 || ind.Port == 0 {
|
||||
ind.Port = uint16(m.config.BindPort)
|
||||
}
|
||||
|
||||
// Send a ping to the correct host
|
||||
// Send a ping to the correct host.
|
||||
localSeqNo := m.nextSeqNo()
|
||||
ping := ping{SeqNo: localSeqNo, Node: ind.Node}
|
||||
destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)}
|
||||
|
||||
// Setup a response handler to relay the ack
|
||||
cancelCh := make(chan struct{})
|
||||
respHandler := func(payload []byte, timestamp time.Time) {
|
||||
// Try to prevent the nack if we've caught it in time.
|
||||
close(cancelCh)
|
||||
|
||||
// Forward the ack back to the requestor.
|
||||
ack := ackResp{ind.SeqNo, nil}
|
||||
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from))
|
||||
|
@ -459,10 +481,25 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
|
|||
}
|
||||
m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
|
||||
|
||||
// Send the ping
|
||||
// Send the ping.
|
||||
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
|
||||
}
|
||||
|
||||
// Setup a timer to fire off a nack if no ack is seen in time.
|
||||
if ind.Nack {
|
||||
go func() {
|
||||
select {
|
||||
case <-cancelCh:
|
||||
return
|
||||
case <-time.After(m.config.ProbeTimeout):
|
||||
nack := nackResp{ind.SeqNo}
|
||||
if err := m.encodeAndSendMsg(from, nackRespMsg, &nack); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
|
||||
|
@ -474,6 +511,15 @@ func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
|
|||
m.invokeAckHandler(ack, timestamp)
|
||||
}
|
||||
|
||||
func (m *Memberlist) handleNack(buf []byte, from net.Addr) {
|
||||
var nack nackResp
|
||||
if err := decode(buf, &nack); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to decode nack response: %s %s", err, LogAddress(from))
|
||||
return
|
||||
}
|
||||
m.invokeNackHandler(nack)
|
||||
}
|
||||
|
||||
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
|
||||
var sus suspect
|
||||
if err := decode(buf, &sus); err != nil {
|
||||
|
|
|
@ -42,9 +42,10 @@ type nodeState struct {
|
|||
StateChange time.Time // Time last state change happened
|
||||
}
|
||||
|
||||
// ackHandler is used to register handlers for incoming acks
|
||||
// ackHandler is used to register handlers for incoming acks and nacks.
|
||||
type ackHandler struct {
|
||||
handler func([]byte, time.Time)
|
||||
ackFn func([]byte, time.Time)
|
||||
nackFn func()
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
|
@ -148,7 +149,7 @@ func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
|
|||
}
|
||||
}
|
||||
|
||||
// Deschedule is used to stop the background maintenence. This is safe
|
||||
// Deschedule is used to stop the background maintenance. This is safe
|
||||
// to call multiple times.
|
||||
func (m *Memberlist) deschedule() {
|
||||
m.tickerLock.Lock()
|
||||
|
@ -219,24 +220,68 @@ START:
|
|||
func (m *Memberlist) probeNode(node *nodeState) {
|
||||
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
|
||||
|
||||
// We use our health awareness to scale the overall probe interval, so we
|
||||
// slow down if we detect problems. The ticker that calls us can handle
|
||||
// us running over the base interval, and will skip missed ticks.
|
||||
probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
|
||||
if probeInterval > m.config.ProbeInterval {
|
||||
metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
|
||||
}
|
||||
|
||||
// Prepare a ping message and setup an ack handler.
|
||||
ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name}
|
||||
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
|
||||
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
|
||||
nackCh := make(chan struct{}, m.config.IndirectChecks+1)
|
||||
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
|
||||
|
||||
// Send a ping to the node.
|
||||
deadline := time.Now().Add(m.config.ProbeInterval)
|
||||
// Send a ping to the node. If this node looks like it's suspect or dead,
|
||||
// also tack on a suspect message so that it has a chance to refute as
|
||||
// soon as possible.
|
||||
deadline := time.Now().Add(probeInterval)
|
||||
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
|
||||
if node.State == stateAlive {
|
||||
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
var msgs [][]byte
|
||||
if buf, err := encode(pingMsg, &ping); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
|
||||
return
|
||||
} else {
|
||||
msgs = append(msgs, buf.Bytes())
|
||||
}
|
||||
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
|
||||
if buf, err := encode(suspectMsg, &s); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
|
||||
return
|
||||
} else {
|
||||
msgs = append(msgs, buf.Bytes())
|
||||
}
|
||||
|
||||
compound := makeCompoundMessage(msgs)
|
||||
if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Mark the sent time here, which should be after any pre-processing and
|
||||
// system calls to do the actual send. This probably under-reports a bit,
|
||||
// but it's the best we can do.
|
||||
sent := time.Now()
|
||||
|
||||
// Arrange for our self-awareness to get updated. At this point we've
|
||||
// sent the ping, so any return statement means the probe succeeded
|
||||
// which will improve our health until we get to the failure scenarios
|
||||
// at the end of this function, which will alter this delta variable
|
||||
// accordingly.
|
||||
awarenessDelta := -1
|
||||
defer func() {
|
||||
m.awareness.ApplyDelta(awarenessDelta)
|
||||
}()
|
||||
|
||||
// Wait for response or round-trip-time.
|
||||
select {
|
||||
case v := <-ackCh:
|
||||
|
@ -254,6 +299,12 @@ func (m *Memberlist) probeNode(node *nodeState) {
|
|||
ackCh <- v
|
||||
}
|
||||
case <-time.After(m.config.ProbeTimeout):
|
||||
// Note that we don't scale this timeout based on awareness and
|
||||
// the health score. That's because we don't really expect waiting
|
||||
// longer to help get UDP through. Since health does extend the
|
||||
// probe interval it will give the TCP fallback more time, which
|
||||
// is more active in dealing with lost packets, and it gives more
|
||||
// time to wait for indirect acks/nacks.
|
||||
m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name)
|
||||
}
|
||||
|
||||
|
@ -264,8 +315,15 @@ func (m *Memberlist) probeNode(node *nodeState) {
|
|||
m.nodeLock.RUnlock()
|
||||
|
||||
// Attempt an indirect ping.
|
||||
expectedNacks := 0
|
||||
ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
|
||||
for _, peer := range kNodes {
|
||||
// We only expect nack to be sent from peers who understand
|
||||
// version 4 of the protocol.
|
||||
if ind.Nack = peer.PMax >= 4; ind.Nack {
|
||||
expectedNacks++
|
||||
}
|
||||
|
||||
destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)}
|
||||
if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
|
||||
|
@ -319,7 +377,23 @@ func (m *Memberlist) probeNode(node *nodeState) {
|
|||
}
|
||||
}
|
||||
|
||||
// No acks received from target, suspect
|
||||
// Update our self-awareness based on the results of this failed probe.
|
||||
// If we don't have peers who will send nacks then we penalize for any
|
||||
// failed probe as a simple health metric. If we do have peers to nack
|
||||
// verify, then we can use that as a more sophisticated measure of self-
|
||||
// health because we assume them to be working, and they can help us
|
||||
// decide if the probed node was really dead or if it was something wrong
|
||||
// with ourselves.
|
||||
awarenessDelta = 0
|
||||
if expectedNacks > 0 {
|
||||
if nackCount := len(nackCh); nackCount < expectedNacks {
|
||||
awarenessDelta += 2 * (expectedNacks - nackCount)
|
||||
}
|
||||
} else {
|
||||
awarenessDelta += 1
|
||||
}
|
||||
|
||||
// No acks received from target, suspect it as failed.
|
||||
m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
|
||||
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
|
||||
m.suspectNode(&s)
|
||||
|
@ -330,7 +404,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
|
|||
// Prepare a ping message and setup an ack handler.
|
||||
ping := ping{SeqNo: m.nextSeqNo(), Node: node}
|
||||
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
|
||||
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
|
||||
m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)
|
||||
|
||||
// Send a ping to the node.
|
||||
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
|
||||
|
@ -584,6 +658,11 @@ func (m *Memberlist) nextIncarnation() uint32 {
|
|||
return atomic.AddUint32(&m.incarnation, 1)
|
||||
}
|
||||
|
||||
// skipIncarnation adds the positive offset to the incarnation number.
|
||||
func (m *Memberlist) skipIncarnation(offset uint32) uint32 {
|
||||
return atomic.AddUint32(&m.incarnation, offset)
|
||||
}
|
||||
|
||||
// estNumNodes is used to get the current estimate of the number of nodes
|
||||
func (m *Memberlist) estNumNodes() int {
|
||||
return int(atomic.LoadUint32(&m.numNodes))
|
||||
|
@ -595,19 +674,27 @@ type ackMessage struct {
|
|||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// setAckChannel is used to attach a channel to receive a message when an ack with a given
|
||||
// sequence number is received. The `complete` field of the message will be false on timeout
|
||||
func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout time.Duration) {
|
||||
// Create a handler function
|
||||
handler := func(payload []byte, timestamp time.Time) {
|
||||
// setProbeChannels is used to attach the ackCh to receive a message when an ack
|
||||
// with a given sequence number is received. The `complete` field of the message
|
||||
// will be false on timeout. Any nack messages will cause an empty struct to be
|
||||
// passed to the nackCh, which can be nil if not needed.
|
||||
func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) {
|
||||
// Create handler functions for acks and nacks
|
||||
ackFn := func(payload []byte, timestamp time.Time) {
|
||||
select {
|
||||
case ch <- ackMessage{true, payload, timestamp}:
|
||||
case ackCh <- ackMessage{true, payload, timestamp}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
nackFn := func() {
|
||||
select {
|
||||
case nackCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Add the handler
|
||||
ah := &ackHandler{handler, nil}
|
||||
// Add the handlers
|
||||
ah := &ackHandler{ackFn, nackFn, nil}
|
||||
m.ackLock.Lock()
|
||||
m.ackHandlers[seqNo] = ah
|
||||
m.ackLock.Unlock()
|
||||
|
@ -618,18 +705,19 @@ func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout tim
|
|||
delete(m.ackHandlers, seqNo)
|
||||
m.ackLock.Unlock()
|
||||
select {
|
||||
case ch <- ackMessage{false, nil, time.Now()}:
|
||||
case ackCh <- ackMessage{false, nil, time.Now()}:
|
||||
default:
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// setAckHandler is used to attach a handler to be invoked when an
|
||||
// ack with a given sequence number is received. If a timeout is reached,
|
||||
// the handler is deleted
|
||||
func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time), timeout time.Duration) {
|
||||
// setAckHandler is used to attach a handler to be invoked when an ack with a
|
||||
// given sequence number is received. If a timeout is reached, the handler is
|
||||
// deleted. This is used for indirect pings so does not configure a function
|
||||
// for nacks.
|
||||
func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) {
|
||||
// Add the handler
|
||||
ah := &ackHandler{handler, nil}
|
||||
ah := &ackHandler{ackFn, nil, nil}
|
||||
m.ackLock.Lock()
|
||||
m.ackHandlers[seqNo] = ah
|
||||
m.ackLock.Unlock()
|
||||
|
@ -642,7 +730,7 @@ func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time)
|
|||
})
|
||||
}
|
||||
|
||||
// Invokes an Ack handler if any is associated, and reaps the handler immediately
|
||||
// Invokes an ack handler if any is associated, and reaps the handler immediately
|
||||
func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
|
||||
m.ackLock.Lock()
|
||||
ah, ok := m.ackHandlers[ack.SeqNo]
|
||||
|
@ -652,7 +740,49 @@ func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
|
|||
return
|
||||
}
|
||||
ah.timer.Stop()
|
||||
ah.handler(ack.Payload, timestamp)
|
||||
ah.ackFn(ack.Payload, timestamp)
|
||||
}
|
||||
|
||||
// Invokes nack handler if any is associated.
|
||||
func (m *Memberlist) invokeNackHandler(nack nackResp) {
|
||||
m.ackLock.Lock()
|
||||
ah, ok := m.ackHandlers[nack.SeqNo]
|
||||
m.ackLock.Unlock()
|
||||
if !ok || ah.nackFn == nil {
|
||||
return
|
||||
}
|
||||
ah.nackFn()
|
||||
}
|
||||
|
||||
// refute gossips an alive message in response to incoming information that we
|
||||
// are suspect or dead. It will make sure the incarnation number beats the given
|
||||
// accusedInc value, or you can supply 0 to just get the next incarnation number.
|
||||
// This alters the node state that's passed in so this MUST be called while the
|
||||
// nodeLock is held.
|
||||
func (m *Memberlist) refute(me *nodeState, accusedInc uint32) {
|
||||
// Make sure the incarnation number beats the accusation.
|
||||
inc := m.nextIncarnation()
|
||||
if accusedInc >= inc {
|
||||
inc = m.skipIncarnation(accusedInc - inc + 1)
|
||||
}
|
||||
me.Incarnation = inc
|
||||
|
||||
// Decrease our health because we are being asked to refute a problem.
|
||||
m.awareness.ApplyDelta(1)
|
||||
|
||||
// Format and broadcast an alive message.
|
||||
a := alive{
|
||||
Incarnation: inc,
|
||||
Node: me.Name,
|
||||
Addr: me.Addr,
|
||||
Port: me.Port,
|
||||
Meta: me.Meta,
|
||||
Vsn: []uint8{
|
||||
me.PMin, me.PMax, me.PCur,
|
||||
me.DMin, me.DMax, me.DCur,
|
||||
},
|
||||
}
|
||||
m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a)
|
||||
}
|
||||
|
||||
// aliveNode is invoked by the network layer when we get a message about a
|
||||
|
@ -754,6 +884,9 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
|
|||
return
|
||||
}
|
||||
|
||||
// Clear out any suspicion timer that may be in effect.
|
||||
delete(m.nodeTimers, a.Node)
|
||||
|
||||
// Store the old state and meta data
|
||||
oldState := state.State
|
||||
oldMeta := state.Meta
|
||||
|
@ -783,21 +916,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
|
|||
return
|
||||
}
|
||||
|
||||
inc := m.nextIncarnation()
|
||||
for a.Incarnation >= inc {
|
||||
inc = m.nextIncarnation()
|
||||
}
|
||||
state.Incarnation = inc
|
||||
|
||||
a := alive{
|
||||
Incarnation: inc,
|
||||
Node: state.Name,
|
||||
Addr: state.Addr,
|
||||
Port: state.Port,
|
||||
Meta: state.Meta,
|
||||
Vsn: versions,
|
||||
}
|
||||
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
|
||||
m.refute(state, a.Incarnation)
|
||||
m.logger.Printf("[WARN] memberlist: Refuting an alive message")
|
||||
} else {
|
||||
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
|
||||
|
@ -854,6 +973,17 @@ func (m *Memberlist) suspectNode(s *suspect) {
|
|||
return
|
||||
}
|
||||
|
||||
// See if there's a suspicion timer we can confirm. If the info is new
|
||||
// to us we will go ahead and re-gossip it. This allows for multiple
|
||||
// independent confirmations to flow even when a node probes a node
|
||||
// that's already suspect.
|
||||
if timer, ok := m.nodeTimers[s.Node]; ok {
|
||||
if timer.Confirm(s.From) {
|
||||
m.encodeAndBroadcast(s.Node, suspectMsg, s)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Ignore non-alive nodes
|
||||
if state.State != stateAlive {
|
||||
return
|
||||
|
@ -861,24 +991,7 @@ func (m *Memberlist) suspectNode(s *suspect) {
|
|||
|
||||
// If this is us we need to refute, otherwise re-broadcast
|
||||
if state.Name == m.config.Name {
|
||||
inc := m.nextIncarnation()
|
||||
for s.Incarnation >= inc {
|
||||
inc = m.nextIncarnation()
|
||||
}
|
||||
state.Incarnation = inc
|
||||
|
||||
a := alive{
|
||||
Incarnation: inc,
|
||||
Node: state.Name,
|
||||
Addr: state.Addr,
|
||||
Port: state.Port,
|
||||
Meta: state.Meta,
|
||||
Vsn: []uint8{
|
||||
state.PMin, state.PMax, state.PCur,
|
||||
state.DMin, state.DMax, state.DCur,
|
||||
},
|
||||
}
|
||||
m.encodeAndBroadcast(s.Node, aliveMsg, a)
|
||||
m.refute(state, s.Incarnation)
|
||||
m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
|
||||
return // Do not mark ourself suspect
|
||||
} else {
|
||||
|
@ -894,26 +1007,41 @@ func (m *Memberlist) suspectNode(s *suspect) {
|
|||
changeTime := time.Now()
|
||||
state.StateChange = changeTime
|
||||
|
||||
// Setup a timeout for this
|
||||
timeout := suspicionTimeout(m.config.SuspicionMult, m.estNumNodes(), m.config.ProbeInterval)
|
||||
time.AfterFunc(timeout, func() {
|
||||
// Setup a suspicion timer. Given that we don't have any known phase
|
||||
// relationship with our peers, we set up k such that we hit the nominal
|
||||
// timeout two probe intervals short of what we expect given the suspicion
|
||||
// multiplier.
|
||||
k := m.config.SuspicionMult - 2
|
||||
|
||||
// If there aren't enough nodes to give the expected confirmations, just
|
||||
// set k to 0 to say that we don't expect any. Note we subtract 2 from n
|
||||
// here to take out ourselves and the node being probed.
|
||||
n := m.estNumNodes()
|
||||
if n-2 < k {
|
||||
k = 0
|
||||
}
|
||||
|
||||
// Compute the timeouts based on the size of the cluster.
|
||||
min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
|
||||
max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
|
||||
fn := func(numConfirmations int) {
|
||||
m.nodeLock.Lock()
|
||||
state, ok := m.nodeMap[s.Node]
|
||||
timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
|
||||
m.nodeLock.Unlock()
|
||||
|
||||
if timeout {
|
||||
m.suspectTimeout(state)
|
||||
if k > 0 && numConfirmations < k {
|
||||
metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// suspectTimeout is invoked when a suspect timeout has occurred
|
||||
func (m *Memberlist) suspectTimeout(n *nodeState) {
|
||||
// Construct a dead message
|
||||
m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached", n.Name)
|
||||
d := dead{Incarnation: n.Incarnation, Node: n.Name, From: m.config.Name}
|
||||
m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
|
||||
state.Name, numConfirmations)
|
||||
d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
|
||||
m.deadNode(&d)
|
||||
}
|
||||
}
|
||||
m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
|
||||
}
|
||||
|
||||
// deadNode is invoked by the network layer when we get a message
|
||||
|
@ -933,6 +1061,9 @@ func (m *Memberlist) deadNode(d *dead) {
|
|||
return
|
||||
}
|
||||
|
||||
// Clear out any suspicion timer that may be in effect.
|
||||
delete(m.nodeTimers, d.Node)
|
||||
|
||||
// Ignore if node is already dead
|
||||
if state.State == stateDead {
|
||||
return
|
||||
|
@ -942,24 +1073,7 @@ func (m *Memberlist) deadNode(d *dead) {
|
|||
if state.Name == m.config.Name {
|
||||
// If we are not leaving we need to refute
|
||||
if !m.leave {
|
||||
inc := m.nextIncarnation()
|
||||
for d.Incarnation >= inc {
|
||||
inc = m.nextIncarnation()
|
||||
}
|
||||
state.Incarnation = inc
|
||||
|
||||
a := alive{
|
||||
Incarnation: inc,
|
||||
Node: state.Name,
|
||||
Addr: state.Addr,
|
||||
Port: state.Port,
|
||||
Meta: state.Meta,
|
||||
Vsn: []uint8{
|
||||
state.PMin, state.PMax, state.PCur,
|
||||
state.DMin, state.DMax, state.DCur,
|
||||
},
|
||||
}
|
||||
m.encodeAndBroadcast(d.Node, aliveMsg, a)
|
||||
m.refute(state, d.Incarnation)
|
||||
m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
|
||||
return // Do not mark ourself dead
|
||||
}
|
||||
|
@ -1001,7 +1115,7 @@ func (m *Memberlist) mergeState(remote []pushNodeState) {
|
|||
m.aliveNode(&a, nil, false)
|
||||
|
||||
case stateDead:
|
||||
// If the remote node belives a node is dead, we prefer to
|
||||
// If the remote node believes a node is dead, we prefer to
|
||||
// suspect that node instead of declaring it dead instantly
|
||||
fallthrough
|
||||
case stateSuspect:
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
package memberlist
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// suspicion manages the suspect timer for a node and provides an interface
|
||||
// to accelerate the timeout as we get more independent confirmations that
|
||||
// a node is suspect.
|
||||
type suspicion struct {
|
||||
// n is the number of independent confirmations we've seen. This must
|
||||
// be updated using atomic instructions to prevent contention with the
|
||||
// timer callback.
|
||||
n int32
|
||||
|
||||
// k is the number of independent confirmations we'd like to see in
|
||||
// order to drive the timer to its minimum value.
|
||||
k int32
|
||||
|
||||
// min is the minimum timer value.
|
||||
min time.Duration
|
||||
|
||||
// max is the maximum timer value.
|
||||
max time.Duration
|
||||
|
||||
// start captures the timestamp when we began the timer. This is used
|
||||
// so we can calculate durations to feed the timer during updates in
|
||||
// a way the achieves the overall time we'd like.
|
||||
start time.Time
|
||||
|
||||
// timer is the underlying timer that implements the timeout.
|
||||
timer *time.Timer
|
||||
|
||||
// f is the function to call when the timer expires. We hold on to this
|
||||
// because there are cases where we call it directly.
|
||||
timeoutFn func()
|
||||
|
||||
// confirmations is a map of "from" nodes that have confirmed a given
|
||||
// node is suspect. This prevents double counting.
|
||||
confirmations map[string]struct{}
|
||||
}
|
||||
|
||||
// newSuspicion returns a timer started with the max time, and that will drive
|
||||
// to the min time after seeing k or more confirmations. The from node will be
|
||||
// excluded from confirmations since we might get our own suspicion message
|
||||
// gossiped back to us. The minimum time will be used if no confirmations are
|
||||
// called for (k <= 0).
|
||||
func newSuspicion(from string, k int, min time.Duration, max time.Duration, fn func(int)) *suspicion {
|
||||
s := &suspicion{
|
||||
k: int32(k),
|
||||
min: min,
|
||||
max: max,
|
||||
confirmations: make(map[string]struct{}),
|
||||
}
|
||||
|
||||
// Exclude the from node from any confirmations.
|
||||
s.confirmations[from] = struct{}{}
|
||||
|
||||
// Pass the number of confirmations into the timeout function for
|
||||
// easy telemetry.
|
||||
s.timeoutFn = func() {
|
||||
fn(int(atomic.LoadInt32(&s.n)))
|
||||
}
|
||||
|
||||
// If there aren't any confirmations to be made then take the min
|
||||
// time from the start.
|
||||
timeout := max
|
||||
if k < 1 {
|
||||
timeout = min
|
||||
}
|
||||
s.timer = time.AfterFunc(timeout, s.timeoutFn)
|
||||
|
||||
// Capture the start time right after starting the timer above so
|
||||
// we should always err on the side of a little longer timeout if
|
||||
// there's any preemption that separates this and the step above.
|
||||
s.start = time.Now()
|
||||
return s
|
||||
}
|
||||
|
||||
// remainingSuspicionTime takes the state variables of the suspicion timer and
|
||||
// calculates the remaining time to wait before considering a node dead. The
|
||||
// return value can be negative, so be prepared to fire the timer immediately in
|
||||
// that case.
|
||||
func remainingSuspicionTime(n, k int32, elapsed time.Duration, min, max time.Duration) time.Duration {
|
||||
frac := math.Log(float64(n)+1.0) / math.Log(float64(k)+1.0)
|
||||
raw := max.Seconds() - frac*(max.Seconds()-min.Seconds())
|
||||
timeout := time.Duration(math.Floor(1000.0*raw)) * time.Millisecond
|
||||
if timeout < min {
|
||||
timeout = min
|
||||
}
|
||||
|
||||
// We have to take into account the amount of time that has passed so
|
||||
// far, so we get the right overall timeout.
|
||||
return timeout - elapsed
|
||||
}
|
||||
|
||||
// Confirm registers that a possibly new peer has also determined the given
|
||||
// node is suspect. This returns true if this was new information, and false
|
||||
// if it was a duplicate confirmation, or if we've got enough confirmations to
|
||||
// hit the minimum.
|
||||
func (s *suspicion) Confirm(from string) bool {
|
||||
// If we've got enough confirmations then stop accepting them.
|
||||
if atomic.LoadInt32(&s.n) >= s.k {
|
||||
return false
|
||||
}
|
||||
|
||||
// Only allow one confirmation from each possible peer.
|
||||
if _, ok := s.confirmations[from]; ok {
|
||||
return false
|
||||
}
|
||||
s.confirmations[from] = struct{}{}
|
||||
|
||||
// Compute the new timeout given the current number of confirmations and
|
||||
// adjust the timer. If the timeout becomes negative *and* we can cleanly
|
||||
// stop the timer then we will call the timeout function directly from
|
||||
// here.
|
||||
n := atomic.AddInt32(&s.n, 1)
|
||||
elapsed := time.Now().Sub(s.start)
|
||||
remaining := remainingSuspicionTime(n, s.k, elapsed, s.min, s.max)
|
||||
if s.timer.Stop() {
|
||||
if remaining > 0 {
|
||||
s.timer.Reset(remaining)
|
||||
} else {
|
||||
go s.timeoutFn()
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
|
@ -9,6 +9,7 @@ import (
|
|||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
|
@ -326,6 +327,12 @@ func isLoopbackIP(ip_str string) bool {
|
|||
return loopbackBlock.Contains(ip)
|
||||
}
|
||||
|
||||
// Given a string of the form "host", "host:port", or "[ipv6::address]:port",
|
||||
// return true if the string includes a port.
|
||||
func hasPort(s string) bool {
|
||||
return strings.LastIndex(s, ":") > strings.LastIndex(s, "]")
|
||||
}
|
||||
|
||||
// compressPayload takes an opaque input buffer, compresses it
|
||||
// and wraps it in a compress{} message that is encoded.
|
||||
func compressPayload(inp []byte) (*bytes.Buffer, error) {
|
||||
|
|
Loading…
Reference in New Issue