updated memberlist dependency to latest, which is missing NMD-1173 error

This commit is contained in:
Chris Baker 2018-12-07 22:15:05 +00:00
parent 59beae35df
commit 4bbb8106c1
15 changed files with 1010 additions and 434 deletions

View file

@@ -1,3 +1,5 @@
+DEPS := $(shell go list -f '{{range .Imports}}{{.}} {{end}}' ./...)
+
 test: subnet
 	go test ./...
@@ -11,4 +13,8 @@ cov:
 	gocov test github.com/hashicorp/memberlist | gocov-html > /tmp/coverage.html
 	open /tmp/coverage.html
-.PNONY: test cov integ
+deps:
+	go get -t -d -v ./...
+	echo $(DEPS) | xargs -n1 go get -d
+
+.PHONY: test cov integ

View file

@@ -1,4 +1,4 @@
-# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist)
+# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist) [![Build Status](https://travis-ci.org/hashicorp/memberlist.svg?branch=master)](https://travis-ci.org/hashicorp/memberlist)
 memberlist is a [Go](http://www.golang.org) library that manages cluster
 membership and member failure detection using a gossip based protocol.
@@ -23,6 +23,8 @@ Please check your installation with:
 go version
 ```
+Run `make deps` to fetch dependencies before building
 ## Usage
 Memberlist is surprisingly simple to use. An example is shown below:
@@ -63,82 +65,11 @@ For complete documentation, see the associated [Godoc](http://godoc.org/github.c
 ## Protocol
-memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf),
-with a few minor adaptations, mostly to increase propagation speed and
-convergence rate.
+memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://ieeexplore.ieee.org/document/1028914/). However, we extend the protocol in a number of ways:
+* Several extensions are made to increase propagation speed and
+convergence rate.
+* Another set of extensions, that we call Lifeguard, are made to make memberlist more robust in the presence of slow message processing (due to factors such as CPU starvation, and network delay or loss).
-A high level overview of the memberlist protocol (based on SWIM) is
-described below, but for details please read the full
-[SWIM paper](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf)
-followed by the memberlist source. We welcome any questions related
+For details on all of these extensions, please read our paper "[Lifeguard : SWIM-ing with Situational Awareness](https://arxiv.org/abs/1707.00788)", along with the memberlist source. We welcome any questions related
 to the protocol on our issue tracker.
-### Protocol Description
-memberlist begins by joining an existing cluster or starting a new
-cluster. If starting a new cluster, additional nodes are expected to join
-it. New nodes in an existing cluster must be given the address of at
-least one existing member in order to join the cluster. The new member
-does a full state sync with the existing member over TCP and begins gossiping its
-existence to the cluster.
-Gossip is done over UDP with a configurable but fixed fanout and interval.
-This ensures that network usage is constant with regards to number of nodes, as opposed to
-exponential growth that can occur with traditional heartbeat mechanisms.
-Complete state exchanges with a random node are done periodically over
-TCP, but much less often than gossip messages. This increases the likelihood
-that the membership list converges properly since the full state is exchanged
-and merged. The interval between full state exchanges is configurable or can
-be disabled entirely.
-Failure detection is done by periodic random probing using a configurable interval.
-If the node fails to ack within a reasonable time (typically some multiple
-of RTT), then an indirect probe as well as a direct TCP probe are attempted. An
-indirect probe asks a configurable number of random nodes to probe the same node,
-in case there are network issues causing our own node to fail the probe. The direct
-TCP probe is used to help identify the common situation where networking is
-misconfigured to allow TCP but not UDP. Without the TCP probe, a UDP-isolated node
-would think all other nodes were suspect and could cause churn in the cluster when
-it attempts a TCP-based state exchange with another node. It is not desirable to
-operate with only TCP connectivity because convergence will be much slower, but it
-is enabled so that memberlist can detect this situation and alert operators.
-If both our probe, the indirect probes, and the direct TCP probe fail within a
-configurable time, then the node is marked "suspicious" and this knowledge is
-gossiped to the cluster. A suspicious node is still considered a member of
-the cluster. If the suspect member of the cluster does not dispute the suspicion
-within a configurable period of time, the node is finally considered dead,
-and this state is then gossiped to the cluster.
-This is a brief and incomplete description of the protocol. For a better idea,
-please read the
-[SWIM paper](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf)
-in its entirety, along with the memberlist source code.
-### Changes from SWIM
-As mentioned earlier, the memberlist protocol is based on SWIM but includes
-minor changes, mostly to increase propagation speed and convergence rates.
-The changes from SWIM are noted here:
-* memberlist does a full state sync over TCP periodically. SWIM only propagates
-changes over gossip. While both eventually reach convergence, the full state
-sync increases the likelihood that nodes are fully converged more quickly,
-at the expense of more bandwidth usage. This feature can be totally disabled
-if you wish.
-* memberlist has a dedicated gossip layer separate from the failure detection
-protocol. SWIM only piggybacks gossip messages on top of probe/ack messages.
-memberlist also piggybacks gossip messages on top of probe/ack messages, but
-also will periodically send out dedicated gossip messages on their own. This
-feature lets you have a higher gossip rate (for example once per 200ms)
-and a slower failure detection rate (such as once per second), resulting
-in overall faster convergence rates and data propagation speeds. This feature
-can be totally disabled as well, if you wish.
-* memberlist stores around the state of dead nodes for a set amount of time,
-so that when full syncs are requested, the requester also receives information
-about dead nodes. Because SWIM doesn't do full syncs, SWIM deletes dead node
-state immediately upon learning that the node is dead. This change again helps
-the cluster converge more quickly.
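The protocol description's constant-load claim can be made concrete with a little arithmetic: with a fixed gossip fanout, per-node send load is independent of cluster size, while naive all-to-all heartbeating grows linearly. The numbers below are illustrative, not from the memberlist source:

```go
package main

import "fmt"

// gossipLoad returns how many gossip messages one node sends per gossip
// interval under a fixed fanout, regardless of cluster size.
func gossipLoad(fanout, clusterSize int) int {
	_ = clusterSize // load does not depend on cluster size
	return fanout
}

// heartbeatLoad models a naive all-to-all heartbeat scheme, where every node
// pings every other node each interval.
func heartbeatLoad(clusterSize int) int {
	return clusterSize - 1
}

func main() {
	for _, n := range []int{10, 100, 1000} {
		fmt.Printf("n=%4d gossip=%d heartbeat=%d\n", n, gossipLoad(3, n), heartbeatLoad(n))
	}
}
```

The price of the constant load is probabilistic, eventual dissemination, which is why memberlist layers the periodic full TCP state sync on top.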

View file

@@ -11,10 +11,15 @@ type Config struct {
 	// The name of this node. This must be unique in the cluster.
 	Name string
+	// Transport is a hook for providing custom code to communicate with
+	// other nodes. If this is left nil, then memberlist will by default
+	// make a NetTransport using BindAddr and BindPort from this structure.
+	Transport Transport
-	// Configuration related to what address to bind to and ports to
-	// listen on. The port is used for both UDP and TCP gossip.
-	// It is assumed other nodes are running on this port, but they
-	// do not need to.
+	// Configuration related to what address to bind to and ports to
+	// listen on. The port is used for both UDP and TCP gossip. It is
+	// assumed other nodes are running on this port, but they do not need
+	// to.
 	BindAddr string
 	BindPort int
@@ -28,8 +33,11 @@ type Config struct {
 	// ProtocolVersionMax.
 	ProtocolVersion uint8
-	// TCPTimeout is the timeout for establishing a TCP connection with
-	// a remote node for a full state sync.
+	// TCPTimeout is the timeout for establishing a stream connection with
+	// a remote node for a full state sync, and for stream read and write
+	// operations. This is a legacy name for backwards compatibility, but
+	// should really be called StreamTimeout now that we have generalized
+	// the transport.
 	TCPTimeout time.Duration
 	// IndirectChecks is the number of nodes that will be asked to perform
@@ -133,6 +141,16 @@ type Config struct {
 	GossipNodes int
 	GossipToTheDeadTime time.Duration
+	// GossipVerifyIncoming controls whether to enforce encryption for incoming
+	// gossip. It is used for upshifting from unencrypted to encrypted gossip on
+	// a running cluster.
+	GossipVerifyIncoming bool
+	// GossipVerifyOutgoing controls whether to enforce encryption for outgoing
+	// gossip. It is used for upshifting from unencrypted to encrypted gossip on
+	// a running cluster.
+	GossipVerifyOutgoing bool
 	// EnableCompression is used to control message compression. This can
 	// be used to reduce bandwidth usage at the cost of slightly more CPU
 	// utilization. This is only available starting at protocol version 1.
@@ -189,10 +207,13 @@ type Config struct {
 	// while UDP messages are handled.
 	HandoffQueueDepth int
-	// Maximum number of bytes that memberlist expects UDP messages to be. A safe
-	// value for this is typically 1400 bytes (which is the default.) However,
-	// depending on your network's MTU (Maximum Transmission Unit) you may be able
-	// to increase this.
+	// Maximum number of bytes that memberlist will put in a packet (this
+	// will be for UDP packets by default with a NetTransport). A safe value
+	// for this is typically 1400 bytes (which is the default). However,
+	// depending on your network's MTU (Maximum Transmission Unit) you may
+	// be able to increase this to get more content into each gossip packet.
+	// This is a legacy name for backward compatibility but should really be
+	// called PacketBufferSize now that we have generalized the transport.
 	UDPBufferSize int
 }
@@ -214,7 +235,7 @@ func DefaultLANConfig() *Config {
 		TCPTimeout:              10 * time.Second,       // Timeout after 10 seconds
 		IndirectChecks:          3,                      // Use 3 nodes for the indirect ping
 		RetransmitMult:          4,                      // Retransmit a message 4 * log(N+1) nodes
-		SuspicionMult:           5,                      // Suspect a node for 5 * log(N+1) * Interval
+		SuspicionMult:           4,                      // Suspect a node for 4 * log(N+1) * Interval
 		SuspicionMaxTimeoutMult: 6,                      // For 10k nodes this will give a max timeout of 120 seconds
 		PushPullInterval:        30 * time.Second,       // Low frequency
 		ProbeTimeout:            500 * time.Millisecond, // Reasonable RTT time for LAN
@@ -225,6 +246,8 @@ func DefaultLANConfig() *Config {
 		GossipNodes:          3,                      // Gossip to 3 nodes
 		GossipInterval:       200 * time.Millisecond, // Gossip more rapidly
 		GossipToTheDeadTime:  30 * time.Second,       // Same as push/pull
+		GossipVerifyIncoming: true,
+		GossipVerifyOutgoing: true,
 		EnableCompression:    true,                   // Enable compression by default

View file

@@ -12,7 +12,7 @@ type Delegate interface {
 	// NotifyMsg is called when a user-data message is received.
 	// Care should be taken that this method does not block, since doing
 	// so would block the entire UDP packet receive loop. Additionally, the byte
-	// slice may be modified after the call returns, so it should be copied if needed.
+	// slice may be modified after the call returns, so it should be copied if needed
 	NotifyMsg([]byte)
 	// GetBroadcasts is called when user data messages can be broadcast.

View file

@@ -15,6 +15,7 @@ multiple routes.
 package memberlist
 import (
+	"container/list"
 	"fmt"
 	"log"
 	"net"
@@ -22,9 +23,10 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
-	"github.com/hashicorp/go-multierror"
+	multierror "github.com/hashicorp/go-multierror"
 	sockaddr "github.com/hashicorp/go-sockaddr"
 	"github.com/miekg/dns"
 )
@@ -33,16 +35,23 @@ type Memberlist struct {
 	sequenceNum uint32 // Local sequence number
 	incarnation uint32 // Local incarnation number
 	numNodes    uint32 // Number of known nodes (estimate)
+	pushPullReq uint32 // Number of push/pull requests
 	config         *Config
-	shutdown       bool
+	shutdown       int32 // Used as an atomic boolean value
 	shutdownCh     chan struct{}
-	leave          bool
+	leave          int32 // Used as an atomic boolean value
 	leaveBroadcast chan struct{}
-	udpListener *net.UDPConn
-	tcpListener *net.TCPListener
-	handoff     chan msgHandoff
+	shutdownLock sync.Mutex // Serializes calls to Shutdown
+	leaveLock    sync.Mutex // Serializes calls to Leave
+	transport            Transport
+	handoffCh            chan struct{}
+	highPriorityMsgQueue *list.List
+	lowPriorityMsgQueue  *list.List
+	msgQueueLock         sync.Mutex
 	nodeLock sync.RWMutex
 	nodes    []*nodeState // Known nodes
@@ -91,25 +100,6 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
 		}
 	}
-	tcpAddr := &net.TCPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
-	tcpLn, err := net.ListenTCP("tcp", tcpAddr)
-	if err != nil {
-		return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
-	}
-	if conf.BindPort == 0 {
-		conf.BindPort = tcpLn.Addr().(*net.TCPAddr).Port
-	}
-	udpAddr := &net.UDPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
-	udpLn, err := net.ListenUDP("udp", udpAddr)
-	if err != nil {
-		tcpLn.Close()
-		return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
-	}
-	// Set the UDP receive window size
-	setUDPRecvBuf(udpLn)
 	if conf.LogOutput != nil && conf.Logger != nil {
 		return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
 	}
@@ -124,13 +114,65 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
 		logger = log.New(logDest, "", log.LstdFlags)
 	}
+	// Set up a network transport by default if a custom one wasn't given
+	// by the config.
+	transport := conf.Transport
+	if transport == nil {
+		nc := &NetTransportConfig{
+			BindAddrs: []string{conf.BindAddr},
+			BindPort:  conf.BindPort,
+			Logger:    logger,
+		}
+		// See comment below for details about the retry in here.
+		makeNetRetry := func(limit int) (*NetTransport, error) {
+			var err error
+			for try := 0; try < limit; try++ {
+				var nt *NetTransport
+				if nt, err = NewNetTransport(nc); err == nil {
+					return nt, nil
+				}
+				if strings.Contains(err.Error(), "address already in use") {
+					logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
+					continue
+				}
+			}
+			return nil, fmt.Errorf("failed to obtain an address: %v", err)
+		}
+		// The dynamic bind port operation is inherently racy because
+		// even though we are using the kernel to find a port for us, we
+		// are attempting to bind multiple protocols (and potentially
+		// multiple addresses) with the same port number. We build in a
+		// few retries here since this often gets transient errors in
+		// busy unit tests.
+		limit := 1
+		if conf.BindPort == 0 {
+			limit = 10
+		}
+		nt, err := makeNetRetry(limit)
+		if err != nil {
+			return nil, fmt.Errorf("Could not set up network transport: %v", err)
+		}
+		if conf.BindPort == 0 {
+			port := nt.GetAutoBindPort()
+			conf.BindPort = port
+			conf.AdvertisePort = port
+			logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port)
+		}
+		transport = nt
+	}
 	m := &Memberlist{
 		config:         conf,
 		shutdownCh:     make(chan struct{}),
 		leaveBroadcast: make(chan struct{}, 1),
-		udpListener:    udpLn,
-		tcpListener:    tcpLn,
-		handoff:        make(chan msgHandoff, conf.HandoffQueueDepth),
+		transport:            transport,
+		handoffCh:            make(chan struct{}, 1),
+		highPriorityMsgQueue: list.New(),
+		lowPriorityMsgQueue:  list.New(),
 		nodeMap:    make(map[string]*nodeState),
 		nodeTimers: make(map[string]*suspicion),
 		awareness:  newAwareness(conf.AwarenessMaxMultiplier),
@@ -141,9 +183,9 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
 	m.broadcasts.NumNodes = func() int {
 		return m.estNumNodes()
 	}
-	go m.tcpListen()
-	go m.udpListen()
-	go m.udpHandler()
+	go m.streamListen()
+	go m.packetListen()
+	go m.packetHandler()
 	return m, nil
 }
@@ -187,7 +229,8 @@ func (m *Memberlist) Join(existing []string) (int, error) {
 		}
 		for _, addr := range addrs {
-			if err := m.pushPullNode(addr.ip, addr.port, true); err != nil {
+			hp := joinHostPort(addr.ip.String(), addr.port)
+			if err := m.pushPullNode(hp, true); err != nil {
 				err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
 				errs = multierror.Append(errs, err)
 				m.logger.Printf("[DEBUG] memberlist: %v", err)
@@ -273,23 +316,17 @@ func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, err
 // resolveAddr is used to resolve the address into an address,
 // port, and error. If no port is given, use the default
 func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
-	// Normalize the incoming string to host:port so we can apply Go's
-	// parser to it.
-	port := uint16(0)
-	if !hasPort(hostStr) {
-		hostStr += ":" + strconv.Itoa(m.config.BindPort)
-	}
+	// This captures the supplied port, or the default one.
+	hostStr = ensurePort(hostStr, m.config.BindPort)
 	host, sport, err := net.SplitHostPort(hostStr)
 	if err != nil {
 		return nil, err
 	}
-	// This will capture the supplied port, or the default one added above.
 	lport, err := strconv.ParseUint(sport, 10, 16)
 	if err != nil {
 		return nil, err
 	}
-	port = uint16(lport)
+	port := uint16(lport)
 	// If it looks like an IP address we are done. The SplitHostPort() above
 	// will make sure the host part is in good shape for parsing, even for
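The `ensurePort` helper the diff switches to is not itself shown here; its behavior can be sketched as a small hypothetical reimplementation, including the bracketed-IPv6 case that the old `hasPort` approach had to handle separately:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ensurePort appends the given default port to hostStr unless it already
// carries one, producing a string net.SplitHostPort can parse. Bracketed IPv6
// literals like "[::1]" are handled; a bare literal with multiple colons is
// assumed to have no port.
func ensurePort(hostStr string, defaultPort int) string {
	if strings.HasPrefix(hostStr, "[") {
		// Bracketed IPv6: a port is present only after the closing bracket.
		if strings.Contains(hostStr[strings.LastIndex(hostStr, "]"):], ":") {
			return hostStr
		}
		return hostStr + ":" + strconv.Itoa(defaultPort)
	}
	switch strings.Count(hostStr, ":") {
	case 0:
		// Hostname or IPv4 address with no port.
		return hostStr + ":" + strconv.Itoa(defaultPort)
	case 1:
		// Hostname or IPv4 address with a port already attached.
		return hostStr
	default:
		// Bare IPv6 literal: bracket it and add the default port.
		return "[" + hostStr + "]:" + strconv.Itoa(defaultPort)
	}
}

func main() {
	fmt.Println(ensurePort("10.0.0.1", 7946))      // 10.0.0.1:7946
	fmt.Println(ensurePort("10.0.0.1:8000", 7946)) // 10.0.0.1:8000
	fmt.Println(ensurePort("::1", 7946))           // [::1]:7946
}
```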
@@ -327,68 +364,30 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
 // as if we received an alive notification our own network channel for
 // ourself.
 func (m *Memberlist) setAlive() error {
-	var advertiseAddr net.IP
-	var advertisePort int
-	if m.config.AdvertiseAddr != "" {
-		// If AdvertiseAddr is not empty, then advertise
-		// the given address and port.
-		ip := net.ParseIP(m.config.AdvertiseAddr)
-		if ip == nil {
-			return fmt.Errorf("Failed to parse advertise address!")
-		}
-		// Ensure IPv4 conversion if necessary
-		if ip4 := ip.To4(); ip4 != nil {
-			ip = ip4
-		}
-		advertiseAddr = ip
-		advertisePort = m.config.AdvertisePort
-	} else {
-		if m.config.BindAddr == "0.0.0.0" {
-			// Otherwise, if we're not bound to a specific IP, let's use a suitable
-			// private IP address.
-			var err error
-			m.config.AdvertiseAddr, err = sockaddr.GetPrivateIP()
-			if err != nil {
-				return fmt.Errorf("Failed to get interface addresses: %v", err)
-			}
-			if m.config.AdvertiseAddr == "" {
-				return fmt.Errorf("No private IP address found, and explicit IP not provided")
-			}
-			advertiseAddr = net.ParseIP(m.config.AdvertiseAddr)
-			if advertiseAddr == nil {
-				return fmt.Errorf("Failed to parse advertise address: %q", m.config.AdvertiseAddr)
-			}
-		} else {
-			// Use the IP that we're bound to.
-			addr := m.tcpListener.Addr().(*net.TCPAddr)
-			advertiseAddr = addr.IP
-		}
-		// Use the port we are bound to.
-		advertisePort = m.tcpListener.Addr().(*net.TCPAddr).Port
-	}
+	// Get the final advertise address from the transport, which may need
+	// to see which address we bound to.
+	addr, port, err := m.transport.FinalAdvertiseAddr(
+		m.config.AdvertiseAddr, m.config.AdvertisePort)
+	if err != nil {
+		return fmt.Errorf("Failed to get final advertise address: %v", err)
+	}
 	// Check if this is a public address without encryption
-	ipAddr, err := sockaddr.NewIPAddr(advertiseAddr.String())
+	ipAddr, err := sockaddr.NewIPAddr(addr.String())
 	if err != nil {
 		return fmt.Errorf("Failed to parse interface addresses: %v", err)
 	}
 	ifAddrs := []sockaddr.IfAddr{
 		sockaddr.IfAddr{
 			SockAddr: ipAddr,
 		},
 	}
 	_, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs)
 	if len(publicIfs) > 0 && !m.config.EncryptionEnabled() {
 		m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
 	}
-	// Get the node meta data
+	// Set any metadata from the delegate.
 	var meta []byte
 	if m.config.Delegate != nil {
 		meta = m.config.Delegate.NodeMeta(MetaMaxSize)
@@ -400,8 +399,8 @@ func (m *Memberlist) setAlive() error {
 	a := alive{
 		Incarnation: m.nextIncarnation(),
 		Node:        m.config.Name,
-		Addr:        advertiseAddr,
-		Port:        uint16(advertisePort),
+		Addr:        addr,
+		Port:        uint16(port),
 		Meta:        meta,
 		Vsn: []uint8{
 			ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
@@ -410,7 +409,6 @@ func (m *Memberlist) setAlive() error {
 		},
 	}
 	m.aliveNode(&a, nil, true)
 	return nil
 }
@@ -473,13 +471,8 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error {
 	return nil
 }
-// SendTo is used to directly send a message to another node, without
-// the use of the gossip mechanism. This will encode the message as a
-// user-data message, which a delegate will receive through NotifyMsg
-// The actual data is transmitted over UDP, which means this is a
-// best-effort transmission mechanism, and the maximum size of the
-// message is the size of a single UDP datagram, after compression.
-// This method is DEPRECATED in favor or SendToUDP
+// SendTo is deprecated in favor of SendBestEffort, which requires a node to
+// target.
 func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
 	// Encode as a user message
 	buf := make([]byte, 1, len(msg)+1)
@@ -487,36 +480,39 @@ func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
 	buf = append(buf, msg...)
 	// Send the message
-	return m.rawSendMsgUDP(to, nil, buf)
+	return m.rawSendMsgPacket(to.String(), nil, buf)
 }
-// SendToUDP is used to directly send a message to another node, without
-// the use of the gossip mechanism. This will encode the message as a
-// user-data message, which a delegate will receive through NotifyMsg
-// The actual data is transmitted over UDP, which means this is a
-// best-effort transmission mechanism, and the maximum size of the
-// message is the size of a single UDP datagram, after compression
+// SendToUDP is deprecated in favor of SendBestEffort.
 func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
+	return m.SendBestEffort(to, msg)
+}
+// SendToTCP is deprecated in favor of SendReliable.
+func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
+	return m.SendReliable(to, msg)
+}
+// SendBestEffort uses the unreliable packet-oriented interface of the transport
+// to target a user message at the given node (this does not use the gossip
+// mechanism). The maximum size of the message depends on the configured
+// UDPBufferSize for this memberlist instance.
+func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
 	// Encode as a user message
 	buf := make([]byte, 1, len(msg)+1)
 	buf[0] = byte(userMsg)
 	buf = append(buf, msg...)
 	// Send the message
-	destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)}
-	return m.rawSendMsgUDP(destAddr, to, buf)
+	return m.rawSendMsgPacket(to.Address(), to, buf)
 }
-// SendToTCP is used to directly send a message to another node, without
-// the use of the gossip mechanism. This will encode the message as a
-// user-data message, which a delegate will receive through NotifyMsg
-// The actual data is transmitted over TCP, which means delivery
-// is guaranteed if no error is returned. There is no limit
-// to the size of the message
-func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
-	// Send the message
-	destAddr := &net.TCPAddr{IP: to.Addr, Port: int(to.Port)}
-	return m.sendTCPUserMsg(destAddr, msg)
+// SendReliable uses the reliable stream-oriented interface of the transport to
+// target a user message at the given node (this does not use the gossip
+// mechanism). Delivery is guaranteed if no error is returned, and there is no
+// limit on the size of the message.
+func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
+	return m.sendUserMsg(to.Address(), msg)
 }
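Both send paths frame user data the same way before handing it to the transport: a one-byte message-type tag prepended to the payload, which the receiver's delegate gets back via `NotifyMsg`. A minimal sketch of that framing (the tag value here is an illustrative placeholder; the real constant lives in net.go):

```go
package main

import (
	"errors"
	"fmt"
)

const userMsg byte = 66 // illustrative placeholder for the real message-type constant

// frameUserMsg prepends the message-type tag, as SendBestEffort and
// SendReliable do before handing the buffer to the transport.
func frameUserMsg(msg []byte) []byte {
	buf := make([]byte, 1, len(msg)+1)
	buf[0] = userMsg
	return append(buf, msg...)
}

// unframeUserMsg checks the tag and returns the payload, roughly what the
// receive path does before invoking Delegate.NotifyMsg.
func unframeUserMsg(buf []byte) ([]byte, error) {
	if len(buf) == 0 || buf[0] != userMsg {
		return nil, errors.New("not a user message")
	}
	return buf[1:], nil
}

func main() {
	framed := frameUserMsg([]byte("ping"))
	payload, err := unframeUserMsg(framed)
	fmt.Println(string(payload), err) // ping <nil>
}
```

The pre-sized `make([]byte, 1, len(msg)+1)` avoids a second allocation when the payload is appended.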
 // Members returns a list of all known live nodes. The node structures
@@ -564,18 +560,17 @@ func (m *Memberlist) NumMembers() (alive int) {
 // This method is safe to call multiple times, but must not be called
 // after the cluster is already shut down.
 func (m *Memberlist) Leave(timeout time.Duration) error {
-	m.nodeLock.Lock()
-	// We can't defer m.nodeLock.Unlock() because m.deadNode will also try to
-	// acquire a lock so we need to Unlock before that.
+	m.leaveLock.Lock()
+	defer m.leaveLock.Unlock()
-	if m.shutdown {
-		m.nodeLock.Unlock()
+	if m.hasShutdown() {
 		panic("leave after shutdown")
 	}
-	if !m.leave {
-		m.leave = true
+	if !m.hasLeft() {
+		atomic.StoreInt32(&m.leave, 1)
+		m.nodeLock.Lock()
 		state, ok := m.nodeMap[m.config.Name]
 		m.nodeLock.Unlock()
 		if !ok {
@@ -601,8 +596,6 @@ func (m *Memberlist) Leave(timeout time.Duration) error {
 			return fmt.Errorf("timeout waiting for leave broadcast")
 		}
 	}
-	} else {
-		m.nodeLock.Unlock()
-	}
 	return nil
@@ -644,17 +637,55 @@ func (m *Memberlist) ProtocolVersion() uint8 {
 //
 // This method is safe to call multiple times.
 func (m *Memberlist) Shutdown() error {
+	m.shutdownLock.Lock()
+	defer m.shutdownLock.Unlock()
+	if m.hasShutdown() {
+		return nil
+	}
+	// Shut down the transport first, which should block until it's
+	// completely torn down. If we kill the memberlist-side handlers
+	// those I/O handlers might get stuck.
+	if err := m.transport.Shutdown(); err != nil {
+		m.logger.Printf("[ERR] Failed to shutdown transport: %v", err)
+	}
+	// Now tear down everything else.
+	atomic.StoreInt32(&m.shutdown, 1)
+	close(m.shutdownCh)
+	m.deschedule()
+	return nil
+}
+func (m *Memberlist) hasShutdown() bool {
+	return atomic.LoadInt32(&m.shutdown) == 1
+}
+func (m *Memberlist) hasLeft() bool {
+	return atomic.LoadInt32(&m.leave) == 1
+}
+func (m *Memberlist) getNodeState(addr string) nodeStateType {
+	m.nodeLock.RLock()
+	defer m.nodeLock.RUnlock()
+	n := m.nodeMap[addr]
+	return n.State
+}
+func (m *Memberlist) getNodeStateChange(addr string) time.Time {
+	m.nodeLock.RLock()
+	defer m.nodeLock.RUnlock()
+	n := m.nodeMap[addr]
+	return n.StateChange
+}
+func (m *Memberlist) changeNode(addr string, f func(*nodeState)) {
 	m.nodeLock.Lock()
 	defer m.nodeLock.Unlock()
-	if m.shutdown {
-		return nil
-	}
-	m.shutdown = true
-	close(m.shutdownCh)
-	m.deschedule()
-	m.udpListener.Close()
-	m.tcpListener.Close()
-	return nil
+	n := m.nodeMap[addr]
+	f(n)
 }
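The diff replaces the `shutdown bool` and `leave bool` fields with `int32` values read and written via `sync/atomic`, plus a mutex that serializes the state transition itself. That idiom, in isolation (a sketch, not the memberlist code):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lifecycle mirrors the shutdown-flag idiom: an atomic int32 gives lock-free
// reads on hot paths, while a mutex ensures the transition logic runs at most
// once even under concurrent callers.
type lifecycle struct {
	shutdown     int32 // used as an atomic boolean value
	shutdownLock sync.Mutex
}

func (l *lifecycle) hasShutdown() bool {
	return atomic.LoadInt32(&l.shutdown) == 1
}

// Shutdown is safe to call multiple times; only the first call performs the
// transition and reports true.
func (l *lifecycle) Shutdown() bool {
	l.shutdownLock.Lock()
	defer l.shutdownLock.Unlock()
	if l.hasShutdown() {
		return false // already shut down
	}
	atomic.StoreInt32(&l.shutdown, 1)
	return true
}

func main() {
	var l lifecycle
	fmt.Println(l.Shutdown(), l.Shutdown(), l.hasShutdown()) // true false true
}
```

Compared with the old bool-under-`nodeLock` approach, this lets probe and gossip loops check `hasShutdown` without contending on the node lock.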

View file

@ -0,0 +1,121 @@
package memberlist
import (
"fmt"
"net"
"strconv"
"time"
)
// MockNetwork is used as a factory that produces MockTransport instances which
// are uniquely addressed and wired up to talk to each other.
type MockNetwork struct {
transports map[string]*MockTransport
port int
}
// NewTransport returns a new MockTransport with a unique address, wired up to
// talk to the other transports in the MockNetwork.
func (n *MockNetwork) NewTransport() *MockTransport {
n.port += 1
addr := fmt.Sprintf("127.0.0.1:%d", n.port)
transport := &MockTransport{
net: n,
addr: &MockAddress{addr},
packetCh: make(chan *Packet),
streamCh: make(chan net.Conn),
}
if n.transports == nil {
n.transports = make(map[string]*MockTransport)
}
n.transports[addr] = transport
return transport
}
// MockAddress is a wrapper which adds the net.Addr interface to our mock
// address scheme.
type MockAddress struct {
addr string
}
// See net.Addr.
func (a *MockAddress) Network() string {
return "mock"
}
// See net.Addr.
func (a *MockAddress) String() string {
return a.addr
}
// MockTransport directly plumbs messages to other transports its MockNetwork.
type MockTransport struct {
net *MockNetwork
addr *MockAddress
packetCh chan *Packet
streamCh chan net.Conn
}
// See Transport.
func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) {
host, portStr, err := net.SplitHostPort(t.addr.String())
if err != nil {
return nil, 0, err
}
ip := net.ParseIP(host)
if ip == nil {
return nil, 0, fmt.Errorf("Failed to parse IP %q", host)
}
port, err := strconv.ParseInt(portStr, 10, 16)
if err != nil {
return nil, 0, err
}
return ip, int(port), nil
}
// See Transport.
func (t *MockTransport) WriteTo(b []byte, addr string) (time.Time, error) {
dest, ok := t.net.transports[addr]
if !ok {
return time.Time{}, fmt.Errorf("No route to %q", addr)
}
now := time.Now()
dest.packetCh <- &Packet{
Buf: b,
From: t.addr,
Timestamp: now,
}
return now, nil
}
// See Transport.
func (t *MockTransport) PacketCh() <-chan *Packet {
return t.packetCh
}
// See Transport.
func (t *MockTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
dest, ok := t.net.transports[addr]
if !ok {
return nil, fmt.Errorf("No route to %q", addr)
}
p1, p2 := net.Pipe()
dest.streamCh <- p1
return p2, nil
}
// See Transport.
func (t *MockTransport) StreamCh() <-chan net.Conn {
return t.streamCh
}
// See Transport.
func (t *MockTransport) Shutdown() error {
return nil
}


@@ -8,9 +8,10 @@ import (
"hash/crc32" "hash/crc32"
"io" "io"
"net" "net"
"sync/atomic"
"time" "time"
"github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
) )
@@ -55,6 +56,7 @@ const (
encryptMsg encryptMsg
nackRespMsg nackRespMsg
hasCrcMsg hasCrcMsg
errMsg
) )
// compressionType is used to specify the compression algorithm // compressionType is used to specify the compression algorithm
@@ -68,11 +70,10 @@ const (
MetaMaxSize = 512 // Maximum size for node meta data MetaMaxSize = 512 // Maximum size for node meta data
compoundHeaderOverhead = 2 // Assumed header overhead compoundHeaderOverhead = 2 // Assumed header overhead
compoundOverhead = 2 // Assumed overhead per entry in compoundHeader compoundOverhead = 2 // Assumed overhead per entry in compoundHeader
udpBufSize = 65536
udpRecvBuf = 2 * 1024 * 1024
userMsgOverhead = 1 userMsgOverhead = 1
blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process
maxPushStateBytes = 10 * 1024 * 1024 maxPushStateBytes = 20 * 1024 * 1024
maxPushPullRequests = 128 // Maximum number of concurrent push/pull requests
) )
// ping request sent directly to node // ping request sent directly to node
@@ -107,6 +108,11 @@ type nackResp struct {
SeqNo uint32 SeqNo uint32
} }
// err response is sent to relay the error from the remote end
type errResp struct {
Error string
}
// suspect is broadcast when we suspect a node is dead // suspect is broadcast when we suspect a node is dead
type suspect struct { type suspect struct {
Incarnation uint32 Incarnation uint32
@@ -185,46 +191,45 @@ func (m *Memberlist) encryptionVersion() encryptionVersion {
} }
} }
// setUDPRecvBuf is used to resize the UDP receive window. The function // streamListen is a long running goroutine that pulls incoming streams from the
// attempts to set the read buffer to `udpRecvBuf` but backs off until // transport and hands them off for processing.
// the read buffer can be set. func (m *Memberlist) streamListen() {
func setUDPRecvBuf(c *net.UDPConn) {
size := udpRecvBuf
for { for {
if err := c.SetReadBuffer(size); err == nil { select {
break case conn := <-m.transport.StreamCh():
}
size = size / 2
}
}
// tcpListen listens for and handles incoming connections
func (m *Memberlist) tcpListen() {
for {
conn, err := m.tcpListener.AcceptTCP()
if err != nil {
if m.shutdown {
break
}
m.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %s", err)
continue
}
go m.handleConn(conn) go m.handleConn(conn)
case <-m.shutdownCh:
return
}
} }
} }
// handleConn handles a single incoming TCP connection // handleConn handles a single incoming stream connection from the transport.
func (m *Memberlist) handleConn(conn *net.TCPConn) { func (m *Memberlist) handleConn(conn net.Conn) {
m.logger.Printf("[DEBUG] memberlist: TCP connection %s", LogConn(conn)) m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))
defer conn.Close() defer conn.Close()
metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1) metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
msgType, bufConn, dec, err := m.readTCP(conn) msgType, bufConn, dec, err := m.readStream(conn)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn)) m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))
resp := errResp{err.Error()}
out, err := encode(errMsg, &resp)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode error response: %s", err)
return
}
err = m.rawSendMsgStream(conn, out.Bytes())
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send error: %s %s", err, LogConn(conn))
return
}
} }
return return
} }
@@ -235,6 +240,16 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn)) m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn))
} }
case pushPullMsg: case pushPullMsg:
// Increment counter of pending push/pulls
numConcurrent := atomic.AddUint32(&m.pushPullReq, 1)
defer atomic.AddUint32(&m.pushPullReq, ^uint32(0))
// Check if we have too many open push/pull requests
if numConcurrent >= maxPushPullRequests {
m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests")
return
}
join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec) join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
if err != nil { if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn)) m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
@@ -253,7 +268,7 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
case pingMsg: case pingMsg:
var p ping var p ping
if err := dec.Decode(&p); err != nil { if err := dec.Decode(&p); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to decode TCP ping: %s %s", err, LogConn(conn)) m.logger.Printf("[ERR] memberlist: Failed to decode ping: %s %s", err, LogConn(conn))
return return
} }
@@ -265,13 +280,13 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
ack := ackResp{p.SeqNo, nil} ack := ackResp{p.SeqNo, nil}
out, err := encode(ackRespMsg, &ack) out, err := encode(ackRespMsg, &ack)
if err != nil { if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode TCP ack: %s", err) m.logger.Printf("[ERR] memberlist: Failed to encode ack: %s", err)
return return
} }
err = m.rawSendMsgTCP(conn, out.Bytes()) err = m.rawSendMsgStream(conn, out.Bytes())
if err != nil { if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send TCP ack: %s %s", err, LogConn(conn)) m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn))
return return
} }
default: default:
@@ -279,49 +294,17 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
} }
} }
// udpListen listens for and handles incoming UDP packets // packetListen is a long running goroutine that pulls packets out of the
func (m *Memberlist) udpListen() { // transport and hands them off for processing.
var n int func (m *Memberlist) packetListen() {
var addr net.Addr
var err error
var lastPacket time.Time
for { for {
// Do a check for potentially blocking operations select {
if !lastPacket.IsZero() && time.Now().Sub(lastPacket) > blockingWarning { case packet := <-m.transport.PacketCh():
diff := time.Now().Sub(lastPacket) m.ingestPacket(packet.Buf, packet.From, packet.Timestamp)
m.logger.Printf(
"[DEBUG] memberlist: Potential blocking operation. Last command took %v", case <-m.shutdownCh:
diff) return
} }
// Create a new buffer
// TODO: Use Sync.Pool eventually
buf := make([]byte, udpBufSize)
// Read a packet
n, addr, err = m.udpListener.ReadFrom(buf)
if err != nil {
if m.shutdown {
break
}
m.logger.Printf("[ERR] memberlist: Error reading UDP packet: %s", err)
continue
}
// Capture the reception time of the packet as close to the
// system calls as possible.
lastPacket = time.Now()
// Check the length
if n < 1 {
m.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s",
len(buf), LogAddress(addr))
continue
}
// Ingest this packet
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
m.ingestPacket(buf[:n], addr, lastPacket)
} }
} }
@@ -331,9 +314,14 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time
// Decrypt the payload // Decrypt the payload
plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil) plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil)
if err != nil { if err != nil {
if !m.config.GossipVerifyIncoming {
// Treat the message as plaintext
plain = buf
} else {
m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from)) m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
return return
} }
}
// Continue processing the plaintext buffer // Continue processing the plaintext buffer
buf = plain buf = plain
@@ -381,24 +369,61 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
case deadMsg: case deadMsg:
fallthrough fallthrough
case userMsg: case userMsg:
// Determine the message queue, prioritize alive
queue := m.lowPriorityMsgQueue
if msgType == aliveMsg {
queue = m.highPriorityMsgQueue
}
// Check for overflow and append if not full
m.msgQueueLock.Lock()
if queue.Len() >= m.config.HandoffQueueDepth {
m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
} else {
queue.PushBack(msgHandoff{msgType, buf, from})
}
m.msgQueueLock.Unlock()
// Notify of pending message
select { select {
case m.handoff <- msgHandoff{msgType, buf, from}: case m.handoffCh <- struct{}{}:
default: default:
m.logger.Printf("[WARN] memberlist: UDP handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
} }
default: default:
m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s", msgType, LogAddress(from)) m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from))
} }
} }
// udpHandler processes messages received over UDP, but is decoupled // getNextMessage returns the next message to process in priority order, using LIFO
// from the listener to avoid blocking the listener which may cause func (m *Memberlist) getNextMessage() (msgHandoff, bool) {
// ping/ack messages to be delayed. m.msgQueueLock.Lock()
func (m *Memberlist) udpHandler() { defer m.msgQueueLock.Unlock()
if el := m.highPriorityMsgQueue.Back(); el != nil {
m.highPriorityMsgQueue.Remove(el)
msg := el.Value.(msgHandoff)
return msg, true
} else if el := m.lowPriorityMsgQueue.Back(); el != nil {
m.lowPriorityMsgQueue.Remove(el)
msg := el.Value.(msgHandoff)
return msg, true
}
return msgHandoff{}, false
}
// packetHandler is a long running goroutine that processes messages received
// over the packet interface, but is decoupled from the listener to avoid
// blocking the listener which may cause ping/ack messages to be delayed.
func (m *Memberlist) packetHandler() {
for { for {
select { select {
case msg := <-m.handoff: case <-m.handoffCh:
for {
msg, ok := m.getNextMessage()
if !ok {
break
}
msgType := msg.msgType msgType := msg.msgType
buf := msg.buf buf := msg.buf
from := msg.from from := msg.from
@@ -413,7 +438,8 @@ func (m *Memberlist) udpHandler() {
case userMsg: case userMsg:
m.handleUser(buf, from) m.handleUser(buf, from)
default: default:
m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s (handler)", msgType, LogAddress(from)) m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
}
} }
case <-m.shutdownCh: case <-m.shutdownCh:
@@ -457,7 +483,7 @@ func (m *Memberlist) handlePing(buf []byte, from net.Addr) {
if m.config.Ping != nil { if m.config.Ping != nil {
ack.Payload = m.config.Ping.AckPayload() ack.Payload = m.config.Ping.AckPayload()
} }
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil { if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from)) m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from))
} }
} }
@@ -478,7 +504,6 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
// Send a ping to the correct host. // Send a ping to the correct host.
localSeqNo := m.nextSeqNo() localSeqNo := m.nextSeqNo()
ping := ping{SeqNo: localSeqNo, Node: ind.Node} ping := ping{SeqNo: localSeqNo, Node: ind.Node}
destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)}
// Setup a response handler to relay the ack // Setup a response handler to relay the ack
cancelCh := make(chan struct{}) cancelCh := make(chan struct{})
@@ -488,14 +513,15 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
// Forward the ack back to the requestor. // Forward the ack back to the requestor.
ack := ackResp{ind.SeqNo, nil} ack := ackResp{ind.SeqNo, nil}
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil { if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from)) m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from))
} }
} }
m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout) m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
// Send the ping. // Send the ping.
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { addr := joinHostPort(net.IP(ind.Target).String(), ind.Port)
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from)) m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
} }
@@ -507,7 +533,7 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
return return
case <-time.After(m.config.ProbeTimeout): case <-time.After(m.config.ProbeTimeout):
nack := nackResp{ind.SeqNo} nack := nackResp{ind.SeqNo}
if err := m.encodeAndSendMsg(from, nackRespMsg, &nack); err != nil { if err := m.encodeAndSendMsg(from.String(), nackRespMsg, &nack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from)) m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from))
} }
} }
@@ -589,30 +615,30 @@ func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time.
} }
// encodeAndSendMsg is used to combine the encoding and sending steps // encodeAndSendMsg is used to combine the encoding and sending steps
func (m *Memberlist) encodeAndSendMsg(to net.Addr, msgType messageType, msg interface{}) error { func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg interface{}) error {
out, err := encode(msgType, msg) out, err := encode(msgType, msg)
if err != nil { if err != nil {
return err return err
} }
if err := m.sendMsg(to, out.Bytes()); err != nil { if err := m.sendMsg(addr, out.Bytes()); err != nil {
return err return err
} }
return nil return nil
} }
// sendMsg is used to send a UDP message to another host. It will opportunistically // sendMsg is used to send a message via packet to another host. It will
// create a compoundMsg and piggy back other broadcasts // opportunistically create a compoundMsg and piggy back other broadcasts.
func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error { func (m *Memberlist) sendMsg(addr string, msg []byte) error {
// Check if we can piggy back any messages // Check if we can piggy back any messages
bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead
if m.config.EncryptionEnabled() { if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
bytesAvail -= encryptOverhead(m.encryptionVersion()) bytesAvail -= encryptOverhead(m.encryptionVersion())
} }
extra := m.getBroadcasts(compoundOverhead, bytesAvail) extra := m.getBroadcasts(compoundOverhead, bytesAvail)
// Fast path if nothing to piggyback // Fast path if nothing to piggyback
if len(extra) == 0 { if len(extra) == 0 {
return m.rawSendMsgUDP(to, nil, msg) return m.rawSendMsgPacket(addr, nil, msg)
} }
// Join all the messages // Join all the messages
@@ -624,11 +650,12 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
compound := makeCompoundMessage(msgs) compound := makeCompoundMessage(msgs)
// Send the message // Send the message
return m.rawSendMsgUDP(to, nil, compound.Bytes()) return m.rawSendMsgPacket(addr, nil, compound.Bytes())
} }
// rawSendMsgUDP is used to send a UDP message to another host without modification // rawSendMsgPacket is used to send a message via packet to another host without
func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error { // modification, other than compression or encryption if enabled.
func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error {
// Check if we have compression enabled // Check if we have compression enabled
if m.config.EnableCompression { if m.config.EnableCompression {
buf, err := compressPayload(msg) buf, err := compressPayload(msg)
@@ -644,9 +671,9 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error
// Try to look up the destination node // Try to look up the destination node
if node == nil { if node == nil {
toAddr, _, err := net.SplitHostPort(addr.String()) toAddr, _, err := net.SplitHostPort(addr)
if err != nil { if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr.String(), err) m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr, err)
return err return err
} }
m.nodeLock.RLock() m.nodeLock.RLock()
@@ -668,7 +695,7 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error
} }
// Check if we have encryption enabled // Check if we have encryption enabled
if m.config.EncryptionEnabled() { if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
// Encrypt the payload // Encrypt the payload
var buf bytes.Buffer var buf bytes.Buffer
primaryKey := m.config.Keyring.GetPrimaryKey() primaryKey := m.config.Keyring.GetPrimaryKey()
@@ -681,12 +708,13 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error
} }
metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg))) metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
_, err := m.udpListener.WriteTo(msg, addr) _, err := m.transport.WriteTo(msg, addr)
return err return err
} }
// rawSendMsgTCP is used to send a TCP message to another host without modification // rawSendMsgStream is used to stream a message to another host without
func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error { // modification, other than applying compression and encryption if enabled.
func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error {
// Check if compression is enabled // Check if compression is enabled
if m.config.EnableCompression { if m.config.EnableCompression {
compBuf, err := compressPayload(sendBuf) compBuf, err := compressPayload(sendBuf)
@@ -698,7 +726,7 @@ func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error {
} }
// Check if encryption is enabled // Check if encryption is enabled
if m.config.EncryptionEnabled() { if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
crypt, err := m.encryptLocalState(sendBuf) crypt, err := m.encryptLocalState(sendBuf)
if err != nil { if err != nil {
m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err) m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err)
@@ -719,43 +747,36 @@ func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error {
return nil return nil
} }
// sendTCPUserMsg is used to send a TCP userMsg to another host // sendUserMsg is used to stream a user message to another host.
func (m *Memberlist) sendTCPUserMsg(to net.Addr, sendBuf []byte) error { func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error {
dialer := net.Dialer{Timeout: m.config.TCPTimeout} conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
conn, err := dialer.Dial("tcp", to.String())
if err != nil { if err != nil {
return err return err
} }
defer conn.Close() defer conn.Close()
bufConn := bytes.NewBuffer(nil) bufConn := bytes.NewBuffer(nil)
if err := bufConn.WriteByte(byte(userMsg)); err != nil { if err := bufConn.WriteByte(byte(userMsg)); err != nil {
return err return err
} }
// Send our node state
header := userMsgHeader{UserMsgLen: len(sendBuf)} header := userMsgHeader{UserMsgLen: len(sendBuf)}
hd := codec.MsgpackHandle{} hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(bufConn, &hd) enc := codec.NewEncoder(bufConn, &hd)
if err := enc.Encode(&header); err != nil { if err := enc.Encode(&header); err != nil {
return err return err
} }
if _, err := bufConn.Write(sendBuf); err != nil { if _, err := bufConn.Write(sendBuf); err != nil {
return err return err
} }
return m.rawSendMsgStream(conn, bufConn.Bytes())
return m.rawSendMsgTCP(conn, bufConn.Bytes())
} }
// sendAndReceiveState is used to initiate a push/pull over TCP with a remote node // sendAndReceiveState is used to initiate a push/pull over a stream with a
func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([]pushNodeState, []byte, error) { // remote host.
func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeState, []byte, error) {
// Attempt to connect // Attempt to connect
dialer := net.Dialer{Timeout: m.config.TCPTimeout} conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
dest := net.TCPAddr{IP: addr, Port: int(port)}
conn, err := dialer.Dial("tcp", dest.String())
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@@ -769,11 +790,19 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([
} }
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
msgType, bufConn, dec, err := m.readTCP(conn) msgType, bufConn, dec, err := m.readStream(conn)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if msgType == errMsg {
var resp errResp
if err := dec.Decode(&resp); err != nil {
return nil, nil, err
}
return nil, nil, fmt.Errorf("remote error: %v", resp.Error)
}
// Quit if not push/pull // Quit if not push/pull
if msgType != pushPullMsg { if msgType != pushPullMsg {
err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn)) err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn))
@@ -785,7 +814,7 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([
return remoteNodes, userState, err return remoteNodes, userState, err
} }
// sendLocalState is invoked to send our local state over a tcp connection // sendLocalState is invoked to send our local state over a stream connection.
func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error { func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
// Setup a deadline // Setup a deadline
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
@@ -843,7 +872,7 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
} }
// Get the send buffer // Get the send buffer
return m.rawSendMsgTCP(conn, bufConn.Bytes()) return m.rawSendMsgStream(conn, bufConn.Bytes())
} }
// encryptLocalState is used to help encrypt local state before sending // encryptLocalState is used to help encrypt local state before sending
@@ -901,9 +930,9 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) {
return decryptPayload(keys, cipherBytes, dataBytes) return decryptPayload(keys, cipherBytes, dataBytes)
} }
// readTCP is used to read the start of a TCP stream. // readStream is used to read from a stream connection, decrypting and
// it decrypts and decompresses the stream if necessary // decompressing the stream if necessary.
func (m *Memberlist) readTCP(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) { func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) {
// Create a buffered reader // Create a buffered reader
var bufConn io.Reader = bufio.NewReader(conn) var bufConn io.Reader = bufio.NewReader(conn)
@@ -929,7 +958,7 @@ func (m *Memberlist) readTCP(conn net.Conn) (messageType, io.Reader, *codec.Deco
// Reset message type and bufConn // Reset message type and bufConn
msgType = messageType(plain[0]) msgType = messageType(plain[0])
bufConn = bytes.NewReader(plain[1:]) bufConn = bytes.NewReader(plain[1:])
} else if m.config.EncryptionEnabled() { } else if m.config.EncryptionEnabled() && m.config.GossipVerifyIncoming {
return 0, nil, nil, return 0, nil, nil,
fmt.Errorf("Encryption is configured but remote state is not encrypted") fmt.Errorf("Encryption is configured but remote state is not encrypted")
} }
@@ -1044,7 +1073,7 @@ func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, us
return nil return nil
} }
// readUserMsg is used to decode a userMsg from a TCP stream // readUserMsg is used to decode a userMsg from a stream.
func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error { func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
// Read the user message header // Read the user message header
var header userMsgHeader var header userMsgHeader
@@ -1075,13 +1104,12 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
return nil return nil
} }
// sendPingAndWaitForAck makes a TCP connection to the given address, sends // sendPingAndWaitForAck makes a stream connection to the given address, sends
// a ping, and waits for an ack. All of this is done as a series of blocking // a ping, and waits for an ack. All of this is done as a series of blocking
// operations, given the deadline. The bool return parameter is true if we // operations, given the deadline. The bool return parameter is true if we
// were able to round trip a ping to the other node. // were able to round trip a ping to the other node.
func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadline time.Time) (bool, error) { func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) {
dialer := net.Dialer{Deadline: deadline} conn, err := m.transport.DialTimeout(addr, deadline.Sub(time.Now()))
conn, err := dialer.Dial("tcp", destAddr.String())
if err != nil { if err != nil {
// If the node is actually dead we expect this to fail, so we // If the node is actually dead we expect this to fail, so we
// shouldn't spam the logs with it. After this point, errors // shouldn't spam the logs with it. After this point, errors
@@ -1097,17 +1125,17 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin
return false, err return false, err
} }
if err = m.rawSendMsgTCP(conn, out.Bytes()); err != nil { if err = m.rawSendMsgStream(conn, out.Bytes()); err != nil {
return false, err return false, err
} }
msgType, _, dec, err := m.readTCP(conn) msgType, _, dec, err := m.readStream(conn)
if err != nil { if err != nil {
return false, err return false, err
} }
if msgType != ackRespMsg { if msgType != ackRespMsg {
return false, fmt.Errorf("Unexpected msgType (%d) from TCP ping %s", msgType, LogConn(conn)) return false, fmt.Errorf("Unexpected msgType (%d) from ping %s", msgType, LogConn(conn))
} }
var ack ackResp var ack ackResp
@@ -1116,7 +1144,7 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin
} }
if ack.SeqNo != ping.SeqNo { if ack.SeqNo != ping.SeqNo {
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d) from TCP ping %s", ack.SeqNo, ping.SeqNo, LogConn(conn)) return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo)
} }
return true, nil return true, nil

vendor/github.com/hashicorp/memberlist/net_transport.go (generated, vendored; new file)

@@ -0,0 +1,289 @@
package memberlist
import (
"fmt"
"log"
"net"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
sockaddr "github.com/hashicorp/go-sockaddr"
)
const (
// udpPacketBufSize is used to buffer incoming packets during read
// operations.
udpPacketBufSize = 65536
// udpRecvBufSize is a large buffer size that we attempt to set UDP
// sockets to in order to handle a large volume of messages.
udpRecvBufSize = 2 * 1024 * 1024
)
// NetTransportConfig is used to configure a net transport.
type NetTransportConfig struct {
// BindAddrs is a list of addresses to bind to for both TCP and UDP
// communications.
BindAddrs []string
// BindPort is the port to listen on, for each address above.
BindPort int
// Logger is a logger for operator messages.
Logger *log.Logger
}
// NetTransport is a Transport implementation that uses connectionless UDP for
// packet operations, and ad-hoc TCP connections for stream operations.
type NetTransport struct {
config *NetTransportConfig
packetCh chan *Packet
streamCh chan net.Conn
logger *log.Logger
wg sync.WaitGroup
tcpListeners []*net.TCPListener
udpListeners []*net.UDPConn
shutdown int32
}
// NewNetTransport returns a net transport with the given configuration. On
// success all the network listeners will be created and listening.
func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) {
// If we reject the empty list outright we can assume that there's at
// least one listener of each type later during operation.
if len(config.BindAddrs) == 0 {
return nil, fmt.Errorf("At least one bind address is required")
}
// Build out the new transport.
var ok bool
t := NetTransport{
config: config,
packetCh: make(chan *Packet),
streamCh: make(chan net.Conn),
logger: config.Logger,
}
// Clean up listeners if there's an error.
defer func() {
if !ok {
t.Shutdown()
}
}()
// Build all the TCP and UDP listeners.
port := config.BindPort
for _, addr := range config.BindAddrs {
ip := net.ParseIP(addr)
tcpAddr := &net.TCPAddr{IP: ip, Port: port}
tcpLn, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return nil, fmt.Errorf("Failed to start TCP listener on %q port %d: %v", addr, port, err)
}
t.tcpListeners = append(t.tcpListeners, tcpLn)
// If the config port given was zero, use the first TCP listener
// to pick an available port and then apply that to everything
// else.
if port == 0 {
port = tcpLn.Addr().(*net.TCPAddr).Port
}
udpAddr := &net.UDPAddr{IP: ip, Port: port}
udpLn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return nil, fmt.Errorf("Failed to start UDP listener on %q port %d: %v", addr, port, err)
}
if err := setUDPRecvBuf(udpLn); err != nil {
return nil, fmt.Errorf("Failed to resize UDP buffer: %v", err)
}
t.udpListeners = append(t.udpListeners, udpLn)
}
// Fire them up now that we've been able to create them all.
for i := 0; i < len(config.BindAddrs); i++ {
t.wg.Add(2)
go t.tcpListen(t.tcpListeners[i])
go t.udpListen(t.udpListeners[i])
}
ok = true
return &t, nil
}
// GetAutoBindPort returns the bind port that was automatically given by the
// kernel, if a bind port of 0 was given.
func (t *NetTransport) GetAutoBindPort() int {
// We made sure there's at least one TCP listener, and that one's
// port was applied to all the others for the dynamic bind case.
return t.tcpListeners[0].Addr().(*net.TCPAddr).Port
}
// See Transport.
func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) {
var advertiseAddr net.IP
var advertisePort int
if ip != "" {
// If they've supplied an address, use that.
advertiseAddr = net.ParseIP(ip)
if advertiseAddr == nil {
return nil, 0, fmt.Errorf("Failed to parse advertise address %q", ip)
}
// Ensure IPv4 conversion if necessary.
if ip4 := advertiseAddr.To4(); ip4 != nil {
advertiseAddr = ip4
}
advertisePort = port
} else {
if t.config.BindAddrs[0] == "0.0.0.0" {
// Otherwise, if we're not bound to a specific IP, let's
// use a suitable private IP address.
var err error
ip, err = sockaddr.GetPrivateIP()
if err != nil {
return nil, 0, fmt.Errorf("Failed to get interface addresses: %v", err)
}
if ip == "" {
return nil, 0, fmt.Errorf("No private IP address found, and explicit IP not provided")
}
advertiseAddr = net.ParseIP(ip)
if advertiseAddr == nil {
return nil, 0, fmt.Errorf("Failed to parse advertise address: %q", ip)
}
} else {
// Use the IP that we're bound to, based on the first
// TCP listener, which we already ensure is there.
advertiseAddr = t.tcpListeners[0].Addr().(*net.TCPAddr).IP
}
// Use the port we are bound to.
advertisePort = t.GetAutoBindPort()
}
return advertiseAddr, advertisePort, nil
}
// See Transport.
func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return time.Time{}, err
}
// We made sure there's at least one UDP listener, so just use the
// packet sending interface on the first one. Take the time after the
// write call comes back, which will underestimate the time a little,
// but help account for any delays before the write occurs.
_, err = t.udpListeners[0].WriteTo(b, udpAddr)
return time.Now(), err
}
// See Transport.
func (t *NetTransport) PacketCh() <-chan *Packet {
return t.packetCh
}
// See Transport.
func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
dialer := net.Dialer{Timeout: timeout}
return dialer.Dial("tcp", addr)
}
// See Transport.
func (t *NetTransport) StreamCh() <-chan net.Conn {
return t.streamCh
}
// See Transport.
func (t *NetTransport) Shutdown() error {
// This will avoid log spam about errors when we shut down.
atomic.StoreInt32(&t.shutdown, 1)
// Rip through all the connections and shut them down.
for _, conn := range t.tcpListeners {
conn.Close()
}
for _, conn := range t.udpListeners {
conn.Close()
}
// Block until all the listener threads have died.
t.wg.Wait()
return nil
}
// tcpListen is a long running goroutine that accepts incoming TCP connections
// and hands them off to the stream channel.
func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
defer t.wg.Done()
for {
conn, err := tcpLn.AcceptTCP()
if err != nil {
if s := atomic.LoadInt32(&t.shutdown); s == 1 {
break
}
t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err)
continue
}
t.streamCh <- conn
}
}
// udpListen is a long running goroutine that accepts incoming UDP packets and
// hands them off to the packet channel.
func (t *NetTransport) udpListen(udpLn *net.UDPConn) {
defer t.wg.Done()
for {
// Do a blocking read into a fresh buffer. Grab a time stamp as
// close as possible to the I/O.
buf := make([]byte, udpPacketBufSize)
n, addr, err := udpLn.ReadFrom(buf)
ts := time.Now()
if err != nil {
if s := atomic.LoadInt32(&t.shutdown); s == 1 {
break
}
t.logger.Printf("[ERR] memberlist: Error reading UDP packet: %v", err)
continue
}
// Check the length - it needs to have at least one byte to be a
// proper message.
if n < 1 {
t.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s",
n, LogAddress(addr))
continue
}
// Ingest the packet.
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
t.packetCh <- &Packet{
Buf: buf[:n],
From: addr,
Timestamp: ts,
}
}
}
// setUDPRecvBuf is used to resize the UDP receive window. The function
// attempts to set the read buffer to `udpRecvBuf` but backs off until
// the read buffer can be set.
func setUDPRecvBuf(c *net.UDPConn) error {
size := udpRecvBufSize
var err error
for size > 0 {
if err = c.SetReadBuffer(size); err == nil {
return nil
}
size = size / 2
}
return err
}


@@ -27,6 +27,26 @@ type limitedBroadcast struct {
transmits int // Number of transmissions attempted.
b Broadcast
}
// for testing; emits in transmit order if reverse=false
func (q *TransmitLimitedQueue) orderedView(reverse bool) []*limitedBroadcast {
q.Lock()
defer q.Unlock()
out := make([]*limitedBroadcast, 0, len(q.bcQueue))
if reverse {
for i := 0; i < len(q.bcQueue); i++ {
out = append(out, q.bcQueue[i])
}
} else {
for i := len(q.bcQueue) - 1; i >= 0; i-- {
out = append(out, q.bcQueue[i])
}
}
return out
}
type limitedBroadcasts []*limitedBroadcast
// Broadcast is something that can be broadcasted via gossip to


@@ -34,6 +34,17 @@ type Node struct {
DCur uint8 // Current version delegate is speaking
}
// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *Node) Address() string {
return joinHostPort(n.Addr.String(), n.Port)
}
// String returns the node name
func (n *Node) String() string {
return n.Name
}
// NodeState is used to manage our state view of another node
type nodeState struct {
Node
@@ -42,6 +53,12 @@ type nodeState struct {
StateChange time.Time // Time last state change happened
}
// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *nodeState) Address() string {
return n.Node.Address()
}
// ackHandler is used to register handlers for incoming acks and nacks.
type ackHandler struct {
ackFn func([]byte, time.Time)
@@ -216,6 +233,15 @@ START:
m.probeNode(&node)
}
// probeNodeByAddr just safely calls probeNode given only the address of the node (for tests)
func (m *Memberlist) probeNodeByAddr(addr string) {
m.nodeLock.RLock()
n := m.nodeMap[addr]
m.nodeLock.RUnlock()
m.probeNode(n)
}
// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
@@ -234,13 +260,20 @@ func (m *Memberlist) probeNode(node *nodeState) {
nackCh := make(chan struct{}, m.config.IndirectChecks+1)
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
// Mark the sent time here, which should be after any pre-processing but
// before system calls to do the actual send. This probably over-reports
// a bit, but it's the best we can do. We had originally put this right
// after the I/O, but that would sometimes give negative RTT measurements
// which was not desirable.
sent := time.Now()
// Send a ping to the node. If this node looks like it's suspect or dead,
// also tack on a suspect message so that it has a chance to refute as
// soon as possible.
-deadline := time.Now().Add(probeInterval)
+deadline := sent.Add(probeInterval)
-destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
+addr := node.Address()
if node.State == stateAlive {
-if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
+if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
return
}
@@ -261,17 +294,12 @@ func (m *Memberlist) probeNode(node *nodeState) {
}
compound := makeCompoundMessage(msgs)
-if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
+if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
-m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err)
+m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
return
}
}
-// Mark the sent time here, which should be after any pre-processing and
-// system calls to do the actual send. This probably under-reports a bit,
-// but it's the best we can do.
-sent := time.Now()
// Arrange for our self-awareness to get updated. At this point we've
// sent the ping, so any return statement means the probe succeeded
// which will improve our health until we get to the failure scenarios
@@ -305,7 +333,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
// probe interval it will give the TCP fallback more time, which
// is more active in dealing with lost packets, and it gives more
// time to wait for indirect acks/nacks.
-m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name)
+m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
}
// Get some random live nodes.
@@ -327,8 +355,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
expectedNacks++
}
-destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)}
-if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
+if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
}
}
@@ -345,12 +372,11 @@ func (m *Memberlist) probeNode(node *nodeState) {
// config option to turn this off if desired.
fallbackCh := make(chan bool, 1)
if (!m.config.DisableTcpPings) && (node.PMax >= 3) {
-destAddr := &net.TCPAddr{IP: node.Addr, Port: int(node.Port)}
go func() {
defer close(fallbackCh)
-didContact, err := m.sendPingAndWaitForAck(destAddr, ping, deadline)
+didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
if err != nil {
-m.logger.Printf("[ERR] memberlist: Failed TCP fallback ping: %s", err)
+m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
} else {
fallbackCh <- didContact
}
@@ -375,7 +401,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
// any additional time here.
for didContact := range fallbackCh {
if didContact {
-m.logger.Printf("[WARN] memberlist: Was able to reach %s via TCP but not UDP, network may be misconfigured and not allowing bidirectional UDP", node.Name)
+m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
return
}
}
@@ -390,7 +416,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
awarenessDelta = 0
if expectedNacks > 0 {
if nackCount := len(nackCh); nackCount < expectedNacks {
-awarenessDelta += 2 * (expectedNacks - nackCount)
+awarenessDelta += (expectedNacks - nackCount)
}
} else {
awarenessDelta += 1
@@ -410,7 +436,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)
// Send a ping to the node.
-if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
+if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil {
return 0, err
}
@@ -496,18 +522,17 @@ func (m *Memberlist) gossip() {
return
}
-destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
+addr := node.Address()
if len(msgs) == 1 {
// Send single message as is
-if err := m.rawSendMsgUDP(destAddr, &node.Node, msgs[0]); err != nil {
+if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
-m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
+m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
} else {
// Otherwise create and send a compound message
compound := makeCompoundMessage(msgs)
-if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
+if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
-m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
+m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
}
}
@@ -533,17 +558,17 @@ func (m *Memberlist) pushPull() {
node := nodes[0]
// Attempt a push pull
-if err := m.pushPullNode(node.Addr, node.Port, false); err != nil {
+if err := m.pushPullNode(node.Address(), false); err != nil {
m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
}
}
// pushPullNode does a complete state exchange with a specific node.
-func (m *Memberlist) pushPullNode(addr []byte, port uint16, join bool) error {
+func (m *Memberlist) pushPullNode(addr string, join bool) error {
defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())
// Attempt to send and receive with the node
-remote, userState, err := m.sendAndReceiveState(addr, port, join)
+remote, userState, err := m.sendAndReceiveState(addr, join)
if err != nil {
return err
}
@@ -821,7 +846,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
// in-queue to be processed but blocked by the locks above. If we let
// that aliveMsg process, it'll cause us to re-join the cluster. This
// ensures that we don't.
-if m.leave && a.Node == m.config.Name {
+if m.hasLeft() && a.Node == m.config.Name {
return
}
@@ -1097,7 +1122,7 @@ func (m *Memberlist) deadNode(d *dead) {
// Check if this is us
if state.Name == m.config.Name {
// If we are not leaving we need to refute
-if !m.leave {
+if !m.hasLeft() {
m.refute(state, d.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
return // Do not mark ourself dead


@@ -117,7 +117,7 @@ func (s *suspicion) Confirm(from string) bool {
// stop the timer then we will call the timeout function directly from
// here.
n := atomic.AddInt32(&s.n, 1)
-elapsed := time.Now().Sub(s.start)
+elapsed := time.Since(s.start)
remaining := remainingSuspicionTime(n, s.k, elapsed, s.min, s.max)
if s.timer.Stop() {
if remaining > 0 {

vendor/github.com/hashicorp/memberlist/tag.sh generated vendored Executable file

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -e
# The version must be supplied from the environment. Do not include the
# leading "v".
if [ -z "$VERSION" ]; then
echo "Please specify a version."
exit 1
fi
# Generate the tag.
echo "==> Tagging version $VERSION..."
git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION"
git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" master
exit 0

vendor/github.com/hashicorp/memberlist/transport.go generated vendored Normal file

@@ -0,0 +1,65 @@
package memberlist
import (
"net"
"time"
)
// Packet is used to provide some metadata about incoming packets from peers
// over a packet connection, as well as the packet payload.
type Packet struct {
// Buf has the raw contents of the packet.
Buf []byte
// From has the address of the peer. This is an actual net.Addr so we
// can expose some concrete details about incoming packets.
From net.Addr
// Timestamp is the time when the packet was received. This should be
// taken as close as possible to the actual receipt time to help make an
// accurate RTT measurement during probes.
Timestamp time.Time
}
// Transport is used to abstract over communicating with other peers. The packet
// interface is assumed to be best-effort and the stream interface is assumed to
// be reliable.
type Transport interface {
// FinalAdvertiseAddr is given the user's configured values (which
// might be empty) and returns the desired IP and port to advertise to
// the rest of the cluster.
FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)
// WriteTo is a packet-oriented interface that fires off the given
// payload to the given address in a connectionless fashion. This should
// return a time stamp that's as close as possible to when the packet
// was transmitted to help make accurate RTT measurements during probes.
//
// This is similar to net.PacketConn, though we didn't want to expose
// that full set of required methods to keep assumptions about the
// underlying plumbing to a minimum. We also treat the address here as a
// string, similar to Dial, so it's network neutral, so this usually is
// in the form of "host:port".
WriteTo(b []byte, addr string) (time.Time, error)
// PacketCh returns a channel that can be read to receive incoming
// packets from other peers. How this is set up for listening is left as
// an exercise for the concrete transport implementations.
PacketCh() <-chan *Packet
// DialTimeout is used to create a connection that allows us to perform
// two-way communication with a peer. This is generally more expensive
// than packet connections so is used for more infrequent operations
// such as anti-entropy or fallback probes if the packet-oriented probe
// failed.
DialTimeout(addr string, timeout time.Duration) (net.Conn, error)
// StreamCh returns a channel that can be read to handle incoming stream
// connections from other peers. How this is set up for listening is
// left as an exercise for the concrete transport implementations.
StreamCh() <-chan net.Conn
// Shutdown is called when memberlist is shutting down; this gives the
// transport a chance to clean up any listeners.
Shutdown() error
}


@@ -8,6 +8,8 @@ import (
"io"
"math"
"math/rand"
"net"
"strconv"
"strings"
"time"
@@ -76,10 +78,9 @@ func retransmitLimit(retransmitMult, n int) int {
// shuffleNodes randomly shuffles the input nodes using the Fisher-Yates shuffle
func shuffleNodes(nodes []*nodeState) {
n := len(nodes)
-for i := n - 1; i > 0; i-- {
+rand.Shuffle(n, func(i, j int) {
-j := rand.Intn(i + 1)
nodes[i], nodes[j] = nodes[j], nodes[i]
-}
+})
}
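The hunk above swaps a hand-rolled Fisher-Yates loop for `rand.Shuffle` (available since Go 1.10), which performs the same index selection and only asks the caller for the swap. A standalone sketch using strings in place of `*nodeState`:

```go
package main

import (
	"fmt"
	"math/rand"
)

// shuffle applies the same pattern as the new shuffleNodes: delegate the
// Fisher-Yates index selection to rand.Shuffle and provide only the swap.
func shuffle(names []string) {
	rand.Shuffle(len(names), func(i, j int) {
		names[i], names[j] = names[j], names[i]
	})
}

func main() {
	nodes := []string{"a", "b", "c", "d", "e"}
	shuffle(nodes)
	fmt.Println(nodes) // some permutation of a..e
}
```

Delegating to `rand.Shuffle` removes the easy off-by-one mistakes in the manual loop while keeping the shuffle in place.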
// pushPushScale is used to scale the time interval at which push/pull
@@ -215,20 +216,6 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
return
}
-// Given a string of the form "host", "host:port",
-// "ipv6::addr" or "[ipv6::address]:port",
-// return true if the string includes a port.
-func hasPort(s string) bool {
-last := strings.LastIndex(s, ":")
-if last == -1 {
-return false
-}
-if s[0] == '[' {
-return s[last-1] == ']'
-}
-return strings.Index(s, ":") == last
-}
// compressPayload takes an opaque input buffer, compresses it
// and wraps it in a compress{} message that is encoded.
func compressPayload(inp []byte) (*bytes.Buffer, error) {
@@ -286,3 +273,37 @@ func decompressBuffer(c *compress) ([]byte, error) {
// Return the uncompressed bytes
return b.Bytes(), nil
}
// joinHostPort returns the host:port form of an address, for use with a
// transport.
func joinHostPort(host string, port uint16) string {
return net.JoinHostPort(host, strconv.Itoa(int(port)))
}
// hasPort is given a string of the form "host", "host:port", "ipv6::address",
// or "[ipv6::address]:port", and returns true if the string includes a port.
func hasPort(s string) bool {
// IPv6 address in brackets.
if strings.LastIndex(s, "[") == 0 {
return strings.LastIndex(s, ":") > strings.LastIndex(s, "]")
}
// Otherwise the presence of a single colon determines if there's a port
// since IPv6 addresses outside of brackets (count > 1) can't have a
// port.
return strings.Count(s, ":") == 1
}
// ensurePort makes sure the given string has a port number on it, otherwise it
// appends the given port as a default.
func ensurePort(s string, port int) string {
if hasPort(s) {
return s
}
// If this is an IPv6 address, the join call will add another set of
// brackets, so we have to trim before we add the default port.
s = strings.Trim(s, "[]")
s = net.JoinHostPort(s, strconv.Itoa(port))
return s
}
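The new `hasPort`/`ensurePort` helpers behave as follows. The functions are reproduced from the code above so the example runs standalone:

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// hasPort and ensurePort are copied from the vendored util.go so the
// example is self-contained.
func hasPort(s string) bool {
	// IPv6 address in brackets.
	if strings.LastIndex(s, "[") == 0 {
		return strings.LastIndex(s, ":") > strings.LastIndex(s, "]")
	}
	// A single colon means a port; bare IPv6 addresses have more than one.
	return strings.Count(s, ":") == 1
}

func ensurePort(s string, port int) string {
	if hasPort(s) {
		return s
	}
	// Trim brackets so JoinHostPort doesn't double them for IPv6.
	s = strings.Trim(s, "[]")
	return net.JoinHostPort(s, strconv.Itoa(port))
}

func main() {
	fmt.Println(ensurePort("10.0.0.1", 7946))      // 10.0.0.1:7946
	fmt.Println(ensurePort("10.0.0.1:8301", 7946)) // 10.0.0.1:8301
	fmt.Println(ensurePort("[::1]", 7946))         // [::1]:7946
	fmt.Println(hasPort("[::1]:7946"))             // true
	fmt.Println(hasPort("::1"))                    // false (bare IPv6, no port)
}
```

Note how bracketed IPv6 input is normalized: `ensurePort` trims the brackets and lets `net.JoinHostPort` re-add them along with the default port.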

vendor/vendor.json vendored

@@ -216,7 +216,7 @@
{"path":"github.com/hashicorp/hcl2/hcldec","checksumSHA1":"wQ3hLj4s+5jN6LePSpT0XTTvdXA=","revision":"6743a2254ba3d642b7d3a0be506259a0842819ac","revisionTime":"2018-08-10T01:10:00Z"},
{"path":"github.com/hashicorp/hcl2/hclparse","checksumSHA1":"IzmftuG99BqNhbFGhxZaGwtiMtM=","revision":"6743a2254ba3d642b7d3a0be506259a0842819ac","revisionTime":"2018-08-10T01:10:00Z"},
{"path":"github.com/hashicorp/logutils","checksumSHA1":"vt+P9D2yWDO3gdvdgCzwqunlhxU=","revision":"0dc08b1671f34c4250ce212759ebd880f743d883"},
-{"path":"github.com/hashicorp/memberlist","checksumSHA1":"1zk7IeGClUqBo+Phsx89p7fQ/rQ=","revision":"23ad4b7d7b38496cd64c241dfd4c60b7794c254a","revisionTime":"2017-02-08T21:15:06Z"},
+{"path":"github.com/hashicorp/memberlist","checksumSHA1":"pd6KJd+33bGQ6oc+2X+ZvqgSAGI=","revision":"2072f3a3ff4b7b3d830be77678d5d4b978362bc4","revisionTime":"2018-10-22T22:19:44Z"},
{"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3"},
{"path":"github.com/hashicorp/raft","checksumSHA1":"zkA9uvbj1BdlveyqXpVTh1N6ers=","revision":"077966dbc90f342107eb723ec52fdb0463ec789b","revisionTime":"2018-01-17T20:29:25Z","version":"master","versionExact":"master"},
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},