From 707bac0a7b9f4483e3ca0b14dcdc655acb292ee2 Mon Sep 17 00:00:00 2001 From: Chris Baker Date: Fri, 7 Dec 2018 20:11:46 +0000 Subject: [PATCH 1/7] rpc accept loop: added backoff on logging for failed connections, in case there is a fast fail loop (NMD-1173) --- nomad/rpc.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/nomad/rpc.go b/nomad/rpc.go index 7695217f0..e65b73725 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -84,6 +84,7 @@ type RPCContext struct { // listen is used to listen for incoming RPC connections func (r *rpcHandler) listen(ctx context.Context) { defer close(r.listenerCh) + var tempDelay time.Duration for { select { case <-ctx.Done(): @@ -105,9 +106,21 @@ func (r *rpcHandler) listen(ctx context.Context) { default: } - r.logger.Error("failed to accept RPC conn", "error", err) + if ne, ok := err.(net.Error); ok && ne.Temporary() { + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + r.logger.Error("failed to accept RPC conn", "error", err, "delay", tempDelay) + time.Sleep(tempDelay) + } continue } + tempDelay = 0 go r.handleConn(ctx, conn, &RPCContext{Conn: conn}) metrics.IncrCounter([]string{"nomad", "rpc", "accept_conn"}, 1) From 59beae35df9413feaefcf07dc9f12fbb132a7c7e Mon Sep 17 00:00:00 2001 From: Chris Baker Date: Fri, 7 Dec 2018 22:14:15 +0000 Subject: [PATCH 2/7] nomad/rpc listener: modified to throttle logging on "permanent" Accept() errors as well (with a higher delay cap) --- nomad/rpc.go | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/nomad/rpc.go b/nomad/rpc.go index e65b73725..9edf5df5c 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -100,24 +100,20 @@ func (r *rpcHandler) listen(ctx context.Context) { return } - select { - case <-ctx.Done(): - return - default: + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 } - + maxDelay := 5 * time.Second if ne, ok := err.(net.Error); ok && ne.Temporary() { - if tempDelay == 0 { - tempDelay = 5 * time.Millisecond - } else { - tempDelay *= 2 - } - if max := 1 * time.Second; tempDelay > max { - tempDelay = max - } - r.logger.Error("failed to accept RPC conn", "error", err, "delay", tempDelay) - time.Sleep(tempDelay) + maxDelay = 1 * time.Second } + if tempDelay > maxDelay { + tempDelay = maxDelay + } + r.logger.Error("failed to accept RPC conn", "error", err, "delay", tempDelay) + time.Sleep(tempDelay) continue } tempDelay = 0 From 4bbb8106c1c63690996851aacb5ecec873e18985 Mon Sep 17 00:00:00 2001 From: Chris Baker Date: Fri, 7 Dec 2018 22:15:05 +0000 Subject: [PATCH 3/7] updated memberlist dependency to latest, which is missing NMD-1173 error --- .../github.com/hashicorp/memberlist/Makefile | 8 +- .../github.com/hashicorp/memberlist/README.md | 85 +---- .../github.com/hashicorp/memberlist/config.go | 49 ++- .../hashicorp/memberlist/delegate.go | 2 +- .../hashicorp/memberlist/memberlist.go | 311 ++++++++-------- .../hashicorp/memberlist/mock_transport.go | 121 +++++++ vendor/github.com/hashicorp/memberlist/net.go | 334 ++++++++++-------- .../hashicorp/memberlist/net_transport.go | 289 +++++++++++++++ .../github.com/hashicorp/memberlist/queue.go | 20 ++ .../github.com/hashicorp/memberlist/state.go | 85 +++-- .../hashicorp/memberlist/suspicion.go | 2 +- vendor/github.com/hashicorp/memberlist/tag.sh | 16 + .../hashicorp/memberlist/transport.go | 65 ++++ .../github.com/hashicorp/memberlist/util.go | 55 ++- vendor/vendor.json | 2 +- 15 files changed, 1010 insertions(+), 434 deletions(-) create mode 100644 vendor/github.com/hashicorp/memberlist/mock_transport.go create mode 100644 vendor/github.com/hashicorp/memberlist/net_transport.go create mode 100755 vendor/github.com/hashicorp/memberlist/tag.sh create mode 100644 vendor/github.com/hashicorp/memberlist/transport.go diff --git a/vendor/github.com/hashicorp/memberlist/Makefile b/vendor/github.com/hashicorp/memberlist/Makefile index 56ef6c28c..4ee0ee4ef 100644 --- a/vendor/github.com/hashicorp/memberlist/Makefile +++ b/vendor/github.com/hashicorp/memberlist/Makefile @@ -1,3 +1,5 @@ +DEPS := $(shell go list -f '{{range .Imports}}{{.}} {{end}}' ./...) + test: subnet go test ./... @@ -11,4 +13,8 @@ cov: gocov test github.com/hashicorp/memberlist | gocov-html > /tmp/coverage.html open /tmp/coverage.html -.PNONY: test cov integ +deps: + go get -t -d -v ./... + echo $(DEPS) | xargs -n1 go get -d + +.PHONY: test cov integ diff --git a/vendor/github.com/hashicorp/memberlist/README.md b/vendor/github.com/hashicorp/memberlist/README.md index fc605a59b..f47fb81aa 100644 --- a/vendor/github.com/hashicorp/memberlist/README.md +++ b/vendor/github.com/hashicorp/memberlist/README.md @@ -1,4 +1,4 @@ -# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist) +# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist) [![Build Status](https://travis-ci.org/hashicorp/memberlist.svg?branch=master)](https://travis-ci.org/hashicorp/memberlist) memberlist is a [Go](http://www.golang.org) library that manages cluster membership and member failure detection using a gossip based protocol. @@ -23,6 +23,8 @@ Please check your installation with: go version ``` +Run `make deps` to fetch dependencies before building + ## Usage Memberlist is surprisingly simple to use. An example is shown below: @@ -63,82 +65,11 @@ For complete documentation, see the associated [Godoc](http://godoc.org/github.c ## Protocol -memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf), -with a few minor adaptations, mostly to increase propagation speed and +memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://ieeexplore.ieee.org/document/1028914/). However, we extend the protocol in a number of ways: + +* Several extensions are made to increase propagation speed and convergence rate. +* Another set of extensions, that we call Lifeguard, are made to make memberlist more robust in the presence of slow message processing (due to factors such as CPU starvation, and network delay or loss). -A high level overview of the memberlist protocol (based on SWIM) is -described below, but for details please read the full -[SWIM paper](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf) -followed by the memberlist source. We welcome any questions related +For details on all of these extensions, please read our paper "[Lifeguard : SWIM-ing with Situational Awareness](https://arxiv.org/abs/1707.00788)", along with the memberlist source. We welcome any questions related to the protocol on our issue tracker. - -### Protocol Description - -memberlist begins by joining an existing cluster or starting a new -cluster. If starting a new cluster, additional nodes are expected to join -it. New nodes in an existing cluster must be given the address of at -least one existing member in order to join the cluster. The new member -does a full state sync with the existing member over TCP and begins gossiping its -existence to the cluster. - -Gossip is done over UDP with a configurable but fixed fanout and interval. -This ensures that network usage is constant with regards to number of nodes, as opposed to -exponential growth that can occur with traditional heartbeat mechanisms. -Complete state exchanges with a random node are done periodically over -TCP, but much less often than gossip messages. This increases the likelihood -that the membership list converges properly since the full state is exchanged -and merged. The interval between full state exchanges is configurable or can -be disabled entirely. - -Failure detection is done by periodic random probing using a configurable interval. -If the node fails to ack within a reasonable time (typically some multiple -of RTT), then an indirect probe as well as a direct TCP probe are attempted. An -indirect probe asks a configurable number of random nodes to probe the same node, -in case there are network issues causing our own node to fail the probe. The direct -TCP probe is used to help identify the common situation where networking is -misconfigured to allow TCP but not UDP. Without the TCP probe, a UDP-isolated node -would think all other nodes were suspect and could cause churn in the cluster when -it attempts a TCP-based state exchange with another node. It is not desirable to -operate with only TCP connectivity because convergence will be much slower, but it -is enabled so that memberlist can detect this situation and alert operators. - -If both our probe, the indirect probes, and the direct TCP probe fail within a -configurable time, then the node is marked "suspicious" and this knowledge is -gossiped to the cluster. A suspicious node is still considered a member of -cluster. If the suspect member of the cluster does not dispute the suspicion -within a configurable period of time, the node is finally considered dead, -and this state is then gossiped to the cluster. - -This is a brief and incomplete description of the protocol. For a better idea, -please read the -[SWIM paper](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf) -in its entirety, along with the memberlist source code. - -### Changes from SWIM - -As mentioned earlier, the memberlist protocol is based on SWIM but includes -minor changes, mostly to increase propagation speed and convergence rates. - -The changes from SWIM are noted here: - -* memberlist does a full state sync over TCP periodically. SWIM only propagates - changes over gossip. While both eventually reach convergence, the full state - sync increases the likelihood that nodes are fully converged more quickly, - at the expense of more bandwidth usage. This feature can be totally disabled - if you wish. - -* memberlist has a dedicated gossip layer separate from the failure detection - protocol. SWIM only piggybacks gossip messages on top of probe/ack messages. - memberlist also piggybacks gossip messages on top of probe/ack messages, but - also will periodically send out dedicated gossip messages on their own. This - feature lets you have a higher gossip rate (for example once per 200ms) - and a slower failure detection rate (such as once per second), resulting - in overall faster convergence rates and data propagation speeds. This feature - can be totally disabed as well, if you wish. - -* memberlist stores around the state of dead nodes for a set amount of time, - so that when full syncs are requested, the requester also receives information - about dead nodes. Because SWIM doesn't do full syncs, SWIM deletes dead node - state immediately upon learning that the node is dead. This change again helps - the cluster converge more quickly. diff --git a/vendor/github.com/hashicorp/memberlist/config.go b/vendor/github.com/hashicorp/memberlist/config.go index 1c13bfcd3..c85b1657a 100644 --- a/vendor/github.com/hashicorp/memberlist/config.go +++ b/vendor/github.com/hashicorp/memberlist/config.go @@ -11,10 +11,15 @@ type Config struct { // The name of this node. This must be unique in the cluster. Name string + // Transport is a hook for providing custom code to communicate with + // other nodes. If this is left nil, then memberlist will by default + // make a NetTransport using BindAddr and BindPort from this structure. + Transport Transport + // Configuration related to what address to bind to and ports to - // listen on. The port is used for both UDP and TCP gossip. - // It is assumed other nodes are running on this port, but they - // do not need to. + // listen on. The port is used for both UDP and TCP gossip. It is + // assumed other nodes are running on this port, but they do not need + // to. BindAddr string BindPort int @@ -28,8 +33,11 @@ type Config struct { // ProtocolVersionMax. ProtocolVersion uint8 - // TCPTimeout is the timeout for establishing a TCP connection with - // a remote node for a full state sync. + // TCPTimeout is the timeout for establishing a stream connection with + // a remote node for a full state sync, and for stream read and write + // operations. This is a legacy name for backwards compatibility, but + // should really be called StreamTimeout now that we have generalized + // the transport. TCPTimeout time.Duration // IndirectChecks is the number of nodes that will be asked to perform @@ -133,6 +141,16 @@ type Config struct { GossipNodes int GossipToTheDeadTime time.Duration + // GossipVerifyIncoming controls whether to enforce encryption for incoming + // gossip. It is used for upshifting from unencrypted to encrypted gossip on + // a running cluster. + GossipVerifyIncoming bool + + // GossipVerifyOutgoing controls whether to enforce encryption for outgoing + // gossip. It is used for upshifting from unencrypted to encrypted gossip on + // a running cluster. + GossipVerifyOutgoing bool + // EnableCompression is used to control message compression. This can // be used to reduce bandwidth usage at the cost of slightly more CPU // utilization. This is only available starting at protocol version 1. @@ -189,10 +207,13 @@ type Config struct { // while UDP messages are handled. HandoffQueueDepth int - // Maximum number of bytes that memberlist expects UDP messages to be. A safe - // value for this is typically 1400 bytes (which is the default.) However, - // depending on your network's MTU (Maximum Transmission Unit) you may be able - // to increase this. + // Maximum number of bytes that memberlist will put in a packet (this + // will be for UDP packets by default with a NetTransport). A safe value + // for this is typically 1400 bytes (which is the default). However, + // depending on your network's MTU (Maximum Transmission Unit) you may + // be able to increase this to get more content into each gossip packet. + // This is a legacy name for backward compatibility but should really be + // called PacketBufferSize now that we have generalized the transport. UDPBufferSize int } @@ -214,7 +235,7 @@ func DefaultLANConfig() *Config { TCPTimeout: 10 * time.Second, // Timeout after 10 seconds IndirectChecks: 3, // Use 3 nodes for the indirect ping RetransmitMult: 4, // Retransmit a message 4 * log(N+1) nodes - SuspicionMult: 5, // Suspect a node for 5 * log(N+1) * Interval + SuspicionMult: 4, // Suspect a node for 4 * log(N+1) * Interval SuspicionMaxTimeoutMult: 6, // For 10k nodes this will give a max timeout of 120 seconds PushPullInterval: 30 * time.Second, // Low frequency ProbeTimeout: 500 * time.Millisecond, // Reasonable RTT time for LAN @@ -222,9 +243,11 @@ func DefaultLANConfig() *Config { DisableTcpPings: false, // TCP pings are safe, even with mixed versions AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds - GossipNodes: 3, // Gossip to 3 nodes - GossipInterval: 200 * time.Millisecond, // Gossip more rapidly - GossipToTheDeadTime: 30 * time.Second, // Same as push/pull + GossipNodes: 3, // Gossip to 3 nodes + GossipInterval: 200 * time.Millisecond, // Gossip more rapidly + GossipToTheDeadTime: 30 * time.Second, // Same as push/pull + GossipVerifyIncoming: true, + GossipVerifyOutgoing: true, EnableCompression: true, // Enable compression by default diff --git a/vendor/github.com/hashicorp/memberlist/delegate.go b/vendor/github.com/hashicorp/memberlist/delegate.go index 66aa2da79..551548892 100644 --- a/vendor/github.com/hashicorp/memberlist/delegate.go +++ b/vendor/github.com/hashicorp/memberlist/delegate.go @@ -12,7 +12,7 @@ type Delegate interface { // NotifyMsg is called when a user-data message is received. // Care should be taken that this method does not block, since doing // so would block the entire UDP packet receive loop. Additionally, the byte - // slice may be modified after the call returns, so it should be copied if needed. + // slice may be modified after the call returns, so it should be copied if needed NotifyMsg([]byte) // GetBroadcasts is called when user data messages can be broadcast. diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go index 371e3294b..3a4ce967b 100644 --- a/vendor/github.com/hashicorp/memberlist/memberlist.go +++ b/vendor/github.com/hashicorp/memberlist/memberlist.go @@ -15,6 +15,7 @@ multiple routes. package memberlist import ( + "container/list" "fmt" "log" "net" @@ -22,9 +23,10 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" - "github.com/hashicorp/go-multierror" + multierror "github.com/hashicorp/go-multierror" sockaddr "github.com/hashicorp/go-sockaddr" "github.com/miekg/dns" ) @@ -33,16 +35,23 @@ type Memberlist struct { sequenceNum uint32 // Local sequence number incarnation uint32 // Local incarnation number numNodes uint32 // Number of known nodes (estimate) + pushPullReq uint32 // Number of push/pull requests config *Config - shutdown bool + shutdown int32 // Used as an atomic boolean value shutdownCh chan struct{} - leave bool + leave int32 // Used as an atomic boolean value leaveBroadcast chan struct{} - udpListener *net.UDPConn - tcpListener *net.TCPListener - handoff chan msgHandoff + shutdownLock sync.Mutex // Serializes calls to Shutdown + leaveLock sync.Mutex // Serializes calls to Leave + + transport Transport + + handoffCh chan struct{} + highPriorityMsgQueue *list.List + lowPriorityMsgQueue *list.List + msgQueueLock sync.Mutex nodeLock sync.RWMutex nodes []*nodeState // Known nodes @@ -91,25 +100,6 @@ func newMemberlist(conf *Config) (*Memberlist, error) { } } - tcpAddr := &net.TCPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort} - tcpLn, err := net.ListenTCP("tcp", tcpAddr) - if err != nil { - return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err) - } - if conf.BindPort == 0 { - conf.BindPort = tcpLn.Addr().(*net.TCPAddr).Port - } - - udpAddr := &net.UDPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort} - udpLn, err := net.ListenUDP("udp", udpAddr) - if err != nil { - tcpLn.Close() - return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err) - } - - // Set the UDP receive window size - setUDPRecvBuf(udpLn) - if conf.LogOutput != nil && conf.Logger != nil { return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.") } @@ -124,26 +114,78 @@ func newMemberlist(conf *Config) (*Memberlist, error) { logger = log.New(logDest, "", log.LstdFlags) } + // Set up a network transport by default if a custom one wasn't given + // by the config. + transport := conf.Transport + if transport == nil { + nc := &NetTransportConfig{ + BindAddrs: []string{conf.BindAddr}, + BindPort: conf.BindPort, + Logger: logger, + } + + // See comment below for details about the retry in here. + makeNetRetry := func(limit int) (*NetTransport, error) { + var err error + for try := 0; try < limit; try++ { + var nt *NetTransport + if nt, err = NewNetTransport(nc); err == nil { + return nt, nil + } + if strings.Contains(err.Error(), "address already in use") { + logger.Printf("[DEBUG] memberlist: Got bind error: %v", err) + continue + } + } + + return nil, fmt.Errorf("failed to obtain an address: %v", err) + } + + // The dynamic bind port operation is inherently racy because + // even though we are using the kernel to find a port for us, we + // are attempting to bind multiple protocols (and potentially + // multiple addresses) with the same port number. We build in a + // few retries here since this often gets transient errors in + // busy unit tests. + limit := 1 + if conf.BindPort == 0 { + limit = 10 + } + + nt, err := makeNetRetry(limit) + if err != nil { + return nil, fmt.Errorf("Could not set up network transport: %v", err) + } + if conf.BindPort == 0 { + port := nt.GetAutoBindPort() + conf.BindPort = port + conf.AdvertisePort = port + logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port) + } + transport = nt + } + m := &Memberlist{ - config: conf, - shutdownCh: make(chan struct{}), - leaveBroadcast: make(chan struct{}, 1), - udpListener: udpLn, - tcpListener: tcpLn, - handoff: make(chan msgHandoff, conf.HandoffQueueDepth), - nodeMap: make(map[string]*nodeState), - nodeTimers: make(map[string]*suspicion), - awareness: newAwareness(conf.AwarenessMaxMultiplier), - ackHandlers: make(map[uint32]*ackHandler), - broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, - logger: logger, + config: conf, + shutdownCh: make(chan struct{}), + leaveBroadcast: make(chan struct{}, 1), + transport: transport, + handoffCh: make(chan struct{}, 1), + highPriorityMsgQueue: list.New(), + lowPriorityMsgQueue: list.New(), + nodeMap: make(map[string]*nodeState), + nodeTimers: make(map[string]*suspicion), + awareness: newAwareness(conf.AwarenessMaxMultiplier), + ackHandlers: make(map[uint32]*ackHandler), + broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, + logger: logger, } m.broadcasts.NumNodes = func() int { return m.estNumNodes() } - go m.tcpListen() - go m.udpListen() - go m.udpHandler() + go m.streamListen() + go m.packetListen() + go m.packetHandler() return m, nil } @@ -187,7 +229,8 @@ func (m *Memberlist) Join(existing []string) (int, error) { } for _, addr := range addrs { - if err := m.pushPullNode(addr.ip, addr.port, true); err != nil { + hp := joinHostPort(addr.ip.String(), addr.port) + if err := m.pushPullNode(hp, true); err != nil { err = fmt.Errorf("Failed to join %s: %v", addr.ip, err) errs = multierror.Append(errs, err) m.logger.Printf("[DEBUG] memberlist: %v", err) @@ -273,23 +316,17 @@ func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, err // resolveAddr is used to resolve the address into an address, // port, and error. If no port is given, use the default func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { - // Normalize the incoming string to host:port so we can apply Go's - // parser to it. - port := uint16(0) - if !hasPort(hostStr) { - hostStr += ":" + strconv.Itoa(m.config.BindPort) - } + // This captures the supplied port, or the default one. + hostStr = ensurePort(hostStr, m.config.BindPort) host, sport, err := net.SplitHostPort(hostStr) if err != nil { return nil, err } - - // This will capture the supplied port, or the default one added above. lport, err := strconv.ParseUint(sport, 10, 16) if err != nil { return nil, err } - port = uint16(lport) + port := uint16(lport) // If it looks like an IP address we are done. The SplitHostPort() above // will make sure the host part is in good shape for parsing, even for @@ -327,68 +364,30 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { // as if we received an alive notification our own network channel for // ourself. func (m *Memberlist) setAlive() error { - var advertiseAddr net.IP - var advertisePort int - if m.config.AdvertiseAddr != "" { - // If AdvertiseAddr is not empty, then advertise - // the given address and port. - ip := net.ParseIP(m.config.AdvertiseAddr) - if ip == nil { - return fmt.Errorf("Failed to parse advertise address!") - } - - // Ensure IPv4 conversion if necessary - if ip4 := ip.To4(); ip4 != nil { - ip = ip4 - } - - advertiseAddr = ip - advertisePort = m.config.AdvertisePort - } else { - if m.config.BindAddr == "0.0.0.0" { - // Otherwise, if we're not bound to a specific IP, let's use a suitable - // private IP address. - var err error - m.config.AdvertiseAddr, err = sockaddr.GetPrivateIP() - if err != nil { - return fmt.Errorf("Failed to get interface addresses: %v", err) - } - if m.config.AdvertiseAddr == "" { - return fmt.Errorf("No private IP address found, and explicit IP not provided") - } - - advertiseAddr = net.ParseIP(m.config.AdvertiseAddr) - if advertiseAddr == nil { - return fmt.Errorf("Failed to parse advertise address: %q", m.config.AdvertiseAddr) - } - } else { - // Use the IP that we're bound to. - addr := m.tcpListener.Addr().(*net.TCPAddr) - advertiseAddr = addr.IP - } - - // Use the port we are bound to. - advertisePort = m.tcpListener.Addr().(*net.TCPAddr).Port + // Get the final advertise address from the transport, which may need + // to see which address we bound to. + addr, port, err := m.transport.FinalAdvertiseAddr( + m.config.AdvertiseAddr, m.config.AdvertisePort) + if err != nil { + return fmt.Errorf("Failed to get final advertise address: %v", err) } // Check if this is a public address without encryption - ipAddr, err := sockaddr.NewIPAddr(advertiseAddr.String()) + ipAddr, err := sockaddr.NewIPAddr(addr.String()) if err != nil { return fmt.Errorf("Failed to parse interface addresses: %v", err) } - ifAddrs := []sockaddr.IfAddr{ sockaddr.IfAddr{ SockAddr: ipAddr, }, } - _, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs) if len(publicIfs) > 0 && !m.config.EncryptionEnabled() { m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!") } - // Get the node meta data + // Set any metadata from the delegate. var meta []byte if m.config.Delegate != nil { meta = m.config.Delegate.NodeMeta(MetaMaxSize) @@ -400,8 +399,8 @@ func (m *Memberlist) setAlive() error { a := alive{ Incarnation: m.nextIncarnation(), Node: m.config.Name, - Addr: advertiseAddr, - Port: uint16(advertisePort), + Addr: addr, + Port: uint16(port), Meta: meta, Vsn: []uint8{ ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion, @@ -410,7 +409,6 @@ func (m *Memberlist) setAlive() error { }, } m.aliveNode(&a, nil, true) - return nil } @@ -473,13 +471,8 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error { return nil } -// SendTo is used to directly send a message to another node, without -// the use of the gossip mechanism. This will encode the message as a -// user-data message, which a delegate will receive through NotifyMsg -// The actual data is transmitted over UDP, which means this is a -// best-effort transmission mechanism, and the maximum size of the -// message is the size of a single UDP datagram, after compression. -// This method is DEPRECATED in favor or SendToUDP +// SendTo is deprecated in favor of SendBestEffort, which requires a node to +// target. func (m *Memberlist) SendTo(to net.Addr, msg []byte) error { // Encode as a user message buf := make([]byte, 1, len(msg)+1) @@ -487,36 +480,39 @@ func (m *Memberlist) SendTo(to net.Addr, msg []byte) error { buf = append(buf, msg...) // Send the message - return m.rawSendMsgUDP(to, nil, buf) + return m.rawSendMsgPacket(to.String(), nil, buf) } -// SendToUDP is used to directly send a message to another node, without -// the use of the gossip mechanism. This will encode the message as a -// user-data message, which a delegate will receive through NotifyMsg -// The actual data is transmitted over UDP, which means this is a -// best-effort transmission mechanism, and the maximum size of the -// message is the size of a single UDP datagram, after compression +// SendToUDP is deprecated in favor of SendBestEffort. func (m *Memberlist) SendToUDP(to *Node, msg []byte) error { + return m.SendBestEffort(to, msg) +} + +// SendToTCP is deprecated in favor of SendReliable. +func (m *Memberlist) SendToTCP(to *Node, msg []byte) error { + return m.SendReliable(to, msg) +} + +// SendBestEffort uses the unreliable packet-oriented interface of the transport +// to target a user message at the given node (this does not use the gossip +// mechanism). The maximum size of the message depends on the configured +// UDPBufferSize for this memberlist instance. +func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error { // Encode as a user message buf := make([]byte, 1, len(msg)+1) buf[0] = byte(userMsg) buf = append(buf, msg...) // Send the message - destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)} - return m.rawSendMsgUDP(destAddr, to, buf) + return m.rawSendMsgPacket(to.Address(), to, buf) } -// SendToTCP is used to directly send a message to another node, without -// the use of the gossip mechanism. This will encode the message as a -// user-data message, which a delegate will receive through NotifyMsg -// The actual data is transmitted over TCP, which means delivery -// is guaranteed if no error is returned. There is no limit -// to the size of the message -func (m *Memberlist) SendToTCP(to *Node, msg []byte) error { - // Send the message - destAddr := &net.TCPAddr{IP: to.Addr, Port: int(to.Port)} - return m.sendTCPUserMsg(destAddr, msg) +// SendReliable uses the reliable stream-oriented interface of the transport to +// target a user message at the given node (this does not use the gossip +// mechanism). Delivery is guaranteed if no error is returned, and there is no +// limit on the size of the message. +func (m *Memberlist) SendReliable(to *Node, msg []byte) error { + return m.sendUserMsg(to.Address(), msg) } // Members returns a list of all known live nodes. The node structures @@ -564,18 +560,17 @@ func (m *Memberlist) NumMembers() (alive int) { // This method is safe to call multiple times, but must not be called // after the cluster is already shut down. func (m *Memberlist) Leave(timeout time.Duration) error { - m.nodeLock.Lock() - // We can't defer m.nodeLock.Unlock() because m.deadNode will also try to - // acquire a lock so we need to Unlock before that. + m.leaveLock.Lock() + defer m.leaveLock.Unlock() - if m.shutdown { - m.nodeLock.Unlock() + if m.hasShutdown() { panic("leave after shutdown") } - if !m.leave { - m.leave = true + if !m.hasLeft() { + atomic.StoreInt32(&m.leave, 1) + m.nodeLock.Lock() state, ok := m.nodeMap[m.config.Name] m.nodeLock.Unlock() if !ok { @@ -601,8 +596,6 @@ func (m *Memberlist) Leave(timeout time.Duration) error { return fmt.Errorf("timeout waiting for leave broadcast") } } - } else { - m.nodeLock.Unlock() } return nil @@ -644,17 +637,55 @@ func (m *Memberlist) ProtocolVersion() uint8 { // // This method is safe to call multiple times. func (m *Memberlist) Shutdown() error { - m.nodeLock.Lock() - defer m.nodeLock.Unlock() + m.shutdownLock.Lock() + defer m.shutdownLock.Unlock() - if m.shutdown { + if m.hasShutdown() { return nil } - m.shutdown = true + // Shut down the transport first, which should block until it's + // completely torn down. If we kill the memberlist-side handlers + // those I/O handlers might get stuck. + if err := m.transport.Shutdown(); err != nil { + m.logger.Printf("[ERR] Failed to shutdown transport: %v", err) + } + + // Now tear down everything else. + atomic.StoreInt32(&m.shutdown, 1) close(m.shutdownCh) m.deschedule() - m.udpListener.Close() - m.tcpListener.Close() return nil } + +func (m *Memberlist) hasShutdown() bool { + return atomic.LoadInt32(&m.shutdown) == 1 +} + +func (m *Memberlist) hasLeft() bool { + return atomic.LoadInt32(&m.leave) == 1 +} + +func (m *Memberlist) getNodeState(addr string) nodeStateType { + m.nodeLock.RLock() + defer m.nodeLock.RUnlock() + + n := m.nodeMap[addr] + return n.State +} + +func (m *Memberlist) getNodeStateChange(addr string) time.Time { + m.nodeLock.RLock() + defer m.nodeLock.RUnlock() + + n := m.nodeMap[addr] + return n.StateChange +} + +func (m *Memberlist) changeNode(addr string, f func(*nodeState)) { + m.nodeLock.Lock() + defer m.nodeLock.Unlock() + + n := m.nodeMap[addr] + f(n) +} diff --git a/vendor/github.com/hashicorp/memberlist/mock_transport.go b/vendor/github.com/hashicorp/memberlist/mock_transport.go new file mode 100644 index 000000000..b8bafa802 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/mock_transport.go @@ -0,0 +1,121 @@ +package memberlist + +import ( + "fmt" + "net" + "strconv" + "time" +) + +// MockNetwork is used as a factory that produces MockTransport instances which +// are uniquely addressed and wired up to talk to each other. +type MockNetwork struct { + transports map[string]*MockTransport + port int +} + +// NewTransport returns a new MockTransport with a unique address, wired up to +// talk to the other transports in the MockNetwork. +func (n *MockNetwork) NewTransport() *MockTransport { + n.port += 1 + addr := fmt.Sprintf("127.0.0.1:%d", n.port) + transport := &MockTransport{ + net: n, + addr: &MockAddress{addr}, + packetCh: make(chan *Packet), + streamCh: make(chan net.Conn), + } + + if n.transports == nil { + n.transports = make(map[string]*MockTransport) + } + n.transports[addr] = transport + return transport +} + +// MockAddress is a wrapper which adds the net.Addr interface to our mock +// address scheme. +type MockAddress struct { + addr string +} + +// See net.Addr. +func (a *MockAddress) Network() string { + return "mock" +} + +// See net.Addr. +func (a *MockAddress) String() string { + return a.addr +} + +// MockTransport directly plumbs messages to other transports its MockNetwork. +type MockTransport struct { + net *MockNetwork + addr *MockAddress + packetCh chan *Packet + streamCh chan net.Conn +} + +// See Transport. +func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) { + host, portStr, err := net.SplitHostPort(t.addr.String()) + if err != nil { + return nil, 0, err + } + + ip := net.ParseIP(host) + if ip == nil { + return nil, 0, fmt.Errorf("Failed to parse IP %q", host) + } + + port, err := strconv.ParseInt(portStr, 10, 16) + if err != nil { + return nil, 0, err + } + + return ip, int(port), nil +} + +// See Transport. +func (t *MockTransport) WriteTo(b []byte, addr string) (time.Time, error) { + dest, ok := t.net.transports[addr] + if !ok { + return time.Time{}, fmt.Errorf("No route to %q", addr) + } + + now := time.Now() + dest.packetCh <- &Packet{ + Buf: b, + From: t.addr, + Timestamp: now, + } + return now, nil +} + +// See Transport. +func (t *MockTransport) PacketCh() <-chan *Packet { + return t.packetCh +} + +// See Transport. +func (t *MockTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) { + dest, ok := t.net.transports[addr] + if !ok { + return nil, fmt.Errorf("No route to %q", addr) + } + + p1, p2 := net.Pipe() + dest.streamCh <- p1 + return p2, nil +} + +// See Transport. +func (t *MockTransport) StreamCh() <-chan net.Conn { + return t.streamCh +} + +// See Transport. +func (t *MockTransport) Shutdown() error { + return nil +} diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index e47da411e..f6a0d45fe 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -8,9 +8,10 @@ import ( "hash/crc32" "io" "net" + "sync/atomic" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" ) @@ -55,6 +56,7 @@ const ( encryptMsg nackRespMsg hasCrcMsg + errMsg ) // compressionType is used to specify the compression algorithm @@ -68,11 +70,10 @@ const ( MetaMaxSize = 512 // Maximum size for node meta data compoundHeaderOverhead = 2 // Assumed header overhead compoundOverhead = 2 // Assumed overhead per entry in compoundHeader - udpBufSize = 65536 - udpRecvBuf = 2 * 1024 * 1024 userMsgOverhead = 1 blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process - maxPushStateBytes = 10 * 1024 * 1024 + maxPushStateBytes = 20 * 1024 * 1024 + maxPushPullRequests = 128 // Maximum number of concurrent push/pull requests ) // ping request sent directly to node @@ -107,6 +108,11 @@ type nackResp struct { SeqNo uint32 } +// err response is sent to relay the error from the remote end +type errResp struct { + Error string +} + // suspect is broadcast when we suspect a node is dead type suspect struct { Incarnation uint32 @@ -185,46 +191,45 @@ func (m *Memberlist) encryptionVersion() encryptionVersion { } } -// setUDPRecvBuf is used to resize the UDP receive window. The function -// attempts to set the read buffer to `udpRecvBuf` but backs off until -// the read buffer can be set. -func setUDPRecvBuf(c *net.UDPConn) { - size := udpRecvBuf +// streamListen is a long running goroutine that pulls incoming streams from the +// transport and hands them off for processing. +func (m *Memberlist) streamListen() { for { - if err := c.SetReadBuffer(size); err == nil { - break + select { + case conn := <-m.transport.StreamCh(): + go m.handleConn(conn) + + case <-m.shutdownCh: + return } - size = size / 2 } } -// tcpListen listens for and handles incoming connections -func (m *Memberlist) tcpListen() { - for { - conn, err := m.tcpListener.AcceptTCP() - if err != nil { - if m.shutdown { - break - } - m.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %s", err) - continue - } - go m.handleConn(conn) - } -} - -// handleConn handles a single incoming TCP connection -func (m *Memberlist) handleConn(conn *net.TCPConn) { - m.logger.Printf("[DEBUG] memberlist: TCP connection %s", LogConn(conn)) +// handleConn handles a single incoming stream connection from the transport. +func (m *Memberlist) handleConn(conn net.Conn) { + m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn)) defer conn.Close() metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1) conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) - msgType, bufConn, dec, err := m.readTCP(conn) + msgType, bufConn, dec, err := m.readStream(conn) if err != nil { if err != io.EOF { m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn)) + + resp := errResp{err.Error()} + out, err := encode(errMsg, &resp) + if err != nil { + m.logger.Printf("[ERR] memberlist: Failed to encode error response: %s", err) + return + } + + err = m.rawSendMsgStream(conn, out.Bytes()) + if err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send error: %s %s", err, LogConn(conn)) + return + } } return } @@ -235,6 +240,16 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) { m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn)) } case pushPullMsg: + // Increment counter of pending push/pulls + numConcurrent := atomic.AddUint32(&m.pushPullReq, 1) + defer atomic.AddUint32(&m.pushPullReq, ^uint32(0)) + + // Check if we have too many open push/pull requests + if numConcurrent >= maxPushPullRequests { + m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests") + return + } + join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec) if err != nil { m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn)) @@ -253,7 +268,7 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) { case pingMsg: var p ping if err := dec.Decode(&p); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to decode TCP ping: %s %s", err, LogConn(conn)) + m.logger.Printf("[ERR] memberlist: Failed to decode ping: %s %s", err, LogConn(conn)) return } @@ -265,13 +280,13 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) { ack := ackResp{p.SeqNo, nil} out, err := encode(ackRespMsg, &ack) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed to encode TCP ack: %s", err) + m.logger.Printf("[ERR] memberlist: Failed to encode ack: %s", err) return } - err = m.rawSendMsgTCP(conn, out.Bytes()) + err = m.rawSendMsgStream(conn, out.Bytes()) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send TCP ack: %s %s", err, LogConn(conn)) + m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn)) return } default: @@ -279,49 +294,17 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) { } } -// udpListen listens for and handles incoming UDP packets -func (m *Memberlist) udpListen() { - var n int - var addr net.Addr - var err error - var lastPacket time.Time +// packetListen is a long running goroutine that pulls packets out of the +// transport and hands them off for processing. +func (m *Memberlist) packetListen() { for { - // Do a check for potentially blocking operations - if !lastPacket.IsZero() && time.Now().Sub(lastPacket) > blockingWarning { - diff := time.Now().Sub(lastPacket) - m.logger.Printf( - "[DEBUG] memberlist: Potential blocking operation. Last command took %v", - diff) + select { + case packet := <-m.transport.PacketCh(): + m.ingestPacket(packet.Buf, packet.From, packet.Timestamp) + + case <-m.shutdownCh: + return } - - // Create a new buffer - // TODO: Use Sync.Pool eventually - buf := make([]byte, udpBufSize) - - // Read a packet - n, addr, err = m.udpListener.ReadFrom(buf) - if err != nil { - if m.shutdown { - break - } - m.logger.Printf("[ERR] memberlist: Error reading UDP packet: %s", err) - continue - } - - // Capture the reception time of the packet as close to the - // system calls as possible. - lastPacket = time.Now() - - // Check the length - if n < 1 { - m.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s", - len(buf), LogAddress(addr)) - continue - } - - // Ingest this packet - metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n)) - m.ingestPacket(buf[:n], addr, lastPacket) } } @@ -331,8 +314,13 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time // Decrypt the payload plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil) if err != nil { - m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from)) - return + if !m.config.GossipVerifyIncoming { + // Treat the message as plaintext + plain = buf + } else { + m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from)) + return + } } // Continue processing the plaintext buffer @@ -381,39 +369,77 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim case deadMsg: fallthrough case userMsg: + // Determine the message queue, prioritize alive + queue := m.lowPriorityMsgQueue + if msgType == aliveMsg { + queue = m.highPriorityMsgQueue + } + + // Check for overflow and append if not full + m.msgQueueLock.Lock() + if queue.Len() >= m.config.HandoffQueueDepth { + m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) + } else { + queue.PushBack(msgHandoff{msgType, buf, from}) + } + m.msgQueueLock.Unlock() + + // Notify of pending message select { - case m.handoff <- msgHandoff{msgType, buf, from}: + case m.handoffCh <- struct{}{}: default: - m.logger.Printf("[WARN] memberlist: UDP handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) } default: - m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s", msgType, LogAddress(from)) + m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from)) } } -// udpHandler processes messages received over UDP, but is decoupled -// from the listener to avoid blocking the listener which may cause -// ping/ack messages to be delayed. -func (m *Memberlist) udpHandler() { +// getNextMessage returns the next message to process in priority order, using LIFO +func (m *Memberlist) getNextMessage() (msgHandoff, bool) { + m.msgQueueLock.Lock() + defer m.msgQueueLock.Unlock() + + if el := m.highPriorityMsgQueue.Back(); el != nil { + m.highPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } else if el := m.lowPriorityMsgQueue.Back(); el != nil { + m.lowPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } + return msgHandoff{}, false +} + +// packetHandler is a long running goroutine that processes messages received +// over the packet interface, but is decoupled from the listener to avoid +// blocking the listener which may cause ping/ack messages to be delayed. +func (m *Memberlist) packetHandler() { for { select { - case msg := <-m.handoff: - msgType := msg.msgType - buf := msg.buf - from := msg.from + case <-m.handoffCh: + for { + msg, ok := m.getNextMessage() + if !ok { + break + } + msgType := msg.msgType + buf := msg.buf + from := msg.from - switch msgType { - case suspectMsg: - m.handleSuspect(buf, from) - case aliveMsg: - m.handleAlive(buf, from) - case deadMsg: - m.handleDead(buf, from) - case userMsg: - m.handleUser(buf, from) - default: - m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s (handler)", msgType, LogAddress(from)) + switch msgType { + case suspectMsg: + m.handleSuspect(buf, from) + case aliveMsg: + m.handleAlive(buf, from) + case deadMsg: + m.handleDead(buf, from) + case userMsg: + m.handleUser(buf, from) + default: + m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) + } } case <-m.shutdownCh: @@ -457,7 +483,7 @@ func (m *Memberlist) handlePing(buf []byte, from net.Addr) { if m.config.Ping != nil { ack.Payload = m.config.Ping.AckPayload() } - if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil { + if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from)) } } @@ -478,7 +504,6 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { // Send a ping to the correct host. localSeqNo := m.nextSeqNo() ping := ping{SeqNo: localSeqNo, Node: ind.Node} - destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)} // Setup a response handler to relay the ack cancelCh := make(chan struct{}) @@ -488,14 +513,15 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { // Forward the ack back to the requestor. ack := ackResp{ind.SeqNo, nil} - if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil { + if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil { m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from)) } } m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout) // Send the ping. - if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { + addr := joinHostPort(net.IP(ind.Target).String(), ind.Port) + if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from)) } @@ -507,7 +533,7 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { return case <-time.After(m.config.ProbeTimeout): nack := nackResp{ind.SeqNo} - if err := m.encodeAndSendMsg(from, nackRespMsg, &nack); err != nil { + if err := m.encodeAndSendMsg(from.String(), nackRespMsg, &nack); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from)) } } @@ -589,30 +615,30 @@ func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time. } // encodeAndSendMsg is used to combine the encoding and sending steps -func (m *Memberlist) encodeAndSendMsg(to net.Addr, msgType messageType, msg interface{}) error { +func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg interface{}) error { out, err := encode(msgType, msg) if err != nil { return err } - if err := m.sendMsg(to, out.Bytes()); err != nil { + if err := m.sendMsg(addr, out.Bytes()); err != nil { return err } return nil } -// sendMsg is used to send a UDP message to another host. It will opportunistically -// create a compoundMsg and piggy back other broadcasts -func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error { +// sendMsg is used to send a message via packet to another host. It will +// opportunistically create a compoundMsg and piggy back other broadcasts. +func (m *Memberlist) sendMsg(addr string, msg []byte) error { // Check if we can piggy back any messages bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead - if m.config.EncryptionEnabled() { + if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { bytesAvail -= encryptOverhead(m.encryptionVersion()) } extra := m.getBroadcasts(compoundOverhead, bytesAvail) // Fast path if nothing to piggypack if len(extra) == 0 { - return m.rawSendMsgUDP(to, nil, msg) + return m.rawSendMsgPacket(addr, nil, msg) } // Join all the messages @@ -624,11 +650,12 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error { compound := makeCompoundMessage(msgs) // Send the message - return m.rawSendMsgUDP(to, nil, compound.Bytes()) + return m.rawSendMsgPacket(addr, nil, compound.Bytes()) } -// rawSendMsgUDP is used to send a UDP message to another host without modification -func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error { +// rawSendMsgPacket is used to send message via packet to another host without +// modification, other than compression or encryption if enabled. +func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error { // Check if we have compression enabled if m.config.EnableCompression { buf, err := compressPayload(msg) @@ -644,9 +671,9 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error // Try to look up the destination node if node == nil { - toAddr, _, err := net.SplitHostPort(addr.String()) + toAddr, _, err := net.SplitHostPort(addr) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr.String(), err) + m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr, err) return err } m.nodeLock.RLock() @@ -668,7 +695,7 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error } // Check if we have encryption enabled - if m.config.EncryptionEnabled() { + if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { // Encrypt the payload var buf bytes.Buffer primaryKey := m.config.Keyring.GetPrimaryKey() @@ -681,12 +708,13 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error } metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg))) - _, err := m.udpListener.WriteTo(msg, addr) + _, err := m.transport.WriteTo(msg, addr) return err } -// rawSendMsgTCP is used to send a TCP message to another host without modification -func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error { +// rawSendMsgStream is used to stream a message to another host without +// modification, other than applying compression and encryption if enabled. +func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error { // Check if compresion is enabled if m.config.EnableCompression { compBuf, err := compressPayload(sendBuf) @@ -698,7 +726,7 @@ func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error { } // Check if encryption is enabled - if m.config.EncryptionEnabled() { + if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { crypt, err := m.encryptLocalState(sendBuf) if err != nil { m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err) @@ -719,43 +747,36 @@ func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error { return nil } -// sendTCPUserMsg is used to send a TCP userMsg to another host -func (m *Memberlist) sendTCPUserMsg(to net.Addr, sendBuf []byte) error { - dialer := net.Dialer{Timeout: m.config.TCPTimeout} - conn, err := dialer.Dial("tcp", to.String()) +// sendUserMsg is used to stream a user message to another host. +func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error { + conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout) if err != nil { return err } defer conn.Close() bufConn := bytes.NewBuffer(nil) - if err := bufConn.WriteByte(byte(userMsg)); err != nil { return err } - // Send our node state header := userMsgHeader{UserMsgLen: len(sendBuf)} hd := codec.MsgpackHandle{} enc := codec.NewEncoder(bufConn, &hd) - if err := enc.Encode(&header); err != nil { return err } - if _, err := bufConn.Write(sendBuf); err != nil { return err } - - return m.rawSendMsgTCP(conn, bufConn.Bytes()) + return m.rawSendMsgStream(conn, bufConn.Bytes()) } -// sendAndReceiveState is used to initiate a push/pull over TCP with a remote node -func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([]pushNodeState, []byte, error) { +// sendAndReceiveState is used to initiate a push/pull over a stream with a +// remote host. +func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeState, []byte, error) { // Attempt to connect - dialer := net.Dialer{Timeout: m.config.TCPTimeout} - dest := net.TCPAddr{IP: addr, Port: int(port)} - conn, err := dialer.Dial("tcp", dest.String()) + conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout) if err != nil { return nil, nil, err } @@ -769,11 +790,19 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([ } conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) - msgType, bufConn, dec, err := m.readTCP(conn) + msgType, bufConn, dec, err := m.readStream(conn) if err != nil { return nil, nil, err } + if msgType == errMsg { + var resp errResp + if err := dec.Decode(&resp); err != nil { + return nil, nil, err + } + return nil, nil, fmt.Errorf("remote error: %v", resp.Error) + } + // Quit if not push/pull if msgType != pushPullMsg { err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn)) @@ -785,7 +814,7 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([ return remoteNodes, userState, err } -// sendLocalState is invoked to send our local state over a tcp connection +// sendLocalState is invoked to send our local state over a stream connection. func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error { // Setup a deadline conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) @@ -843,7 +872,7 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error { } // Get the send buffer - return m.rawSendMsgTCP(conn, bufConn.Bytes()) + return m.rawSendMsgStream(conn, bufConn.Bytes()) } // encryptLocalState is used to help encrypt local state before sending @@ -901,9 +930,9 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) { return decryptPayload(keys, cipherBytes, dataBytes) } -// readTCP is used to read the start of a TCP stream. -// it decrypts and decompresses the stream if necessary -func (m *Memberlist) readTCP(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) { +// readStream is used to read from a stream connection, decrypting and +// decompressing the stream if necessary. +func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) { // Created a buffered reader var bufConn io.Reader = bufio.NewReader(conn) @@ -929,7 +958,7 @@ func (m *Memberlist) readTCP(conn net.Conn) (messageType, io.Reader, *codec.Deco // Reset message type and bufConn msgType = messageType(plain[0]) bufConn = bytes.NewReader(plain[1:]) - } else if m.config.EncryptionEnabled() { + } else if m.config.EncryptionEnabled() && m.config.GossipVerifyIncoming { return 0, nil, nil, fmt.Errorf("Encryption is configured but remote state is not encrypted") } @@ -1044,7 +1073,7 @@ func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, us return nil } -// readUserMsg is used to decode a userMsg from a TCP stream +// readUserMsg is used to decode a userMsg from a stream. func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error { // Read the user message header var header userMsgHeader @@ -1075,13 +1104,12 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error { return nil } -// sendPingAndWaitForAck makes a TCP connection to the given address, sends +// sendPingAndWaitForAck makes a stream connection to the given address, sends // a ping, and waits for an ack. All of this is done as a series of blocking // operations, given the deadline. The bool return parameter is true if we // we able to round trip a ping to the other node. -func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadline time.Time) (bool, error) { - dialer := net.Dialer{Deadline: deadline} - conn, err := dialer.Dial("tcp", destAddr.String()) +func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) { + conn, err := m.transport.DialTimeout(addr, deadline.Sub(time.Now())) if err != nil { // If the node is actually dead we expect this to fail, so we // shouldn't spam the logs with it. After this point, errors @@ -1097,17 +1125,17 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin return false, err } - if err = m.rawSendMsgTCP(conn, out.Bytes()); err != nil { + if err = m.rawSendMsgStream(conn, out.Bytes()); err != nil { return false, err } - msgType, _, dec, err := m.readTCP(conn) + msgType, _, dec, err := m.readStream(conn) if err != nil { return false, err } if msgType != ackRespMsg { - return false, fmt.Errorf("Unexpected msgType (%d) from TCP ping %s", msgType, LogConn(conn)) + return false, fmt.Errorf("Unexpected msgType (%d) from ping %s", msgType, LogConn(conn)) } var ack ackResp @@ -1116,7 +1144,7 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin } if ack.SeqNo != ping.SeqNo { - return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d) from TCP ping %s", ack.SeqNo, ping.SeqNo, LogConn(conn)) + return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo) } return true, nil diff --git a/vendor/github.com/hashicorp/memberlist/net_transport.go b/vendor/github.com/hashicorp/memberlist/net_transport.go new file mode 100644 index 000000000..e7b88b01f --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/net_transport.go @@ -0,0 +1,289 @@ +package memberlist + +import ( + "fmt" + "log" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/armon/go-metrics" + sockaddr "github.com/hashicorp/go-sockaddr" +) + +const ( + // udpPacketBufSize is used to buffer incoming packets during read + // operations. + udpPacketBufSize = 65536 + + // udpRecvBufSize is a large buffer size that we attempt to set UDP + // sockets to in order to handle a large volume of messages. + udpRecvBufSize = 2 * 1024 * 1024 +) + +// NetTransportConfig is used to configure a net transport. +type NetTransportConfig struct { + // BindAddrs is a list of addresses to bind to for both TCP and UDP + // communications. + BindAddrs []string + + // BindPort is the port to listen on, for each address above. + BindPort int + + // Logger is a logger for operator messages. + Logger *log.Logger +} + +// NetTransport is a Transport implementation that uses connectionless UDP for +// packet operations, and ad-hoc TCP connections for stream operations. +type NetTransport struct { + config *NetTransportConfig + packetCh chan *Packet + streamCh chan net.Conn + logger *log.Logger + wg sync.WaitGroup + tcpListeners []*net.TCPListener + udpListeners []*net.UDPConn + shutdown int32 +} + +// NewNetTransport returns a net transport with the given configuration. On +// success all the network listeners will be created and listening. +func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) { + // If we reject the empty list outright we can assume that there's at + // least one listener of each type later during operation. + if len(config.BindAddrs) == 0 { + return nil, fmt.Errorf("At least one bind address is required") + } + + // Build out the new transport. + var ok bool + t := NetTransport{ + config: config, + packetCh: make(chan *Packet), + streamCh: make(chan net.Conn), + logger: config.Logger, + } + + // Clean up listeners if there's an error. + defer func() { + if !ok { + t.Shutdown() + } + }() + + // Build all the TCP and UDP listeners. + port := config.BindPort + for _, addr := range config.BindAddrs { + ip := net.ParseIP(addr) + + tcpAddr := &net.TCPAddr{IP: ip, Port: port} + tcpLn, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + return nil, fmt.Errorf("Failed to start TCP listener on %q port %d: %v", addr, port, err) + } + t.tcpListeners = append(t.tcpListeners, tcpLn) + + // If the config port given was zero, use the first TCP listener + // to pick an available port and then apply that to everything + // else. + if port == 0 { + port = tcpLn.Addr().(*net.TCPAddr).Port + } + + udpAddr := &net.UDPAddr{IP: ip, Port: port} + udpLn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return nil, fmt.Errorf("Failed to start UDP listener on %q port %d: %v", addr, port, err) + } + if err := setUDPRecvBuf(udpLn); err != nil { + return nil, fmt.Errorf("Failed to resize UDP buffer: %v", err) + } + t.udpListeners = append(t.udpListeners, udpLn) + } + + // Fire them up now that we've been able to create them all. + for i := 0; i < len(config.BindAddrs); i++ { + t.wg.Add(2) + go t.tcpListen(t.tcpListeners[i]) + go t.udpListen(t.udpListeners[i]) + } + + ok = true + return &t, nil +} + +// GetAutoBindPort returns the bind port that was automatically given by the +// kernel, if a bind port of 0 was given. +func (t *NetTransport) GetAutoBindPort() int { + // We made sure there's at least one TCP listener, and that one's + // port was applied to all the others for the dynamic bind case. + return t.tcpListeners[0].Addr().(*net.TCPAddr).Port +} + +// See Transport. +func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) { + var advertiseAddr net.IP + var advertisePort int + if ip != "" { + // If they've supplied an address, use that. + advertiseAddr = net.ParseIP(ip) + if advertiseAddr == nil { + return nil, 0, fmt.Errorf("Failed to parse advertise address %q", ip) + } + + // Ensure IPv4 conversion if necessary. + if ip4 := advertiseAddr.To4(); ip4 != nil { + advertiseAddr = ip4 + } + advertisePort = port + } else { + if t.config.BindAddrs[0] == "0.0.0.0" { + // Otherwise, if we're not bound to a specific IP, let's + // use a suitable private IP address. + var err error + ip, err = sockaddr.GetPrivateIP() + if err != nil { + return nil, 0, fmt.Errorf("Failed to get interface addresses: %v", err) + } + if ip == "" { + return nil, 0, fmt.Errorf("No private IP address found, and explicit IP not provided") + } + + advertiseAddr = net.ParseIP(ip) + if advertiseAddr == nil { + return nil, 0, fmt.Errorf("Failed to parse advertise address: %q", ip) + } + } else { + // Use the IP that we're bound to, based on the first + // TCP listener, which we already ensure is there. + advertiseAddr = t.tcpListeners[0].Addr().(*net.TCPAddr).IP + } + + // Use the port we are bound to. + advertisePort = t.GetAutoBindPort() + } + + return advertiseAddr, advertisePort, nil +} + +// See Transport. +func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return time.Time{}, err + } + + // We made sure there's at least one UDP listener, so just use the + // packet sending interface on the first one. Take the time after the + // write call comes back, which will underestimate the time a little, + // but help account for any delays before the write occurs. + _, err = t.udpListeners[0].WriteTo(b, udpAddr) + return time.Now(), err +} + +// See Transport. +func (t *NetTransport) PacketCh() <-chan *Packet { + return t.packetCh +} + +// See Transport. +func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) { + dialer := net.Dialer{Timeout: timeout} + return dialer.Dial("tcp", addr) +} + +// See Transport. +func (t *NetTransport) StreamCh() <-chan net.Conn { + return t.streamCh +} + +// See Transport. +func (t *NetTransport) Shutdown() error { + // This will avoid log spam about errors when we shut down. + atomic.StoreInt32(&t.shutdown, 1) + + // Rip through all the connections and shut them down. + for _, conn := range t.tcpListeners { + conn.Close() + } + for _, conn := range t.udpListeners { + conn.Close() + } + + // Block until all the listener threads have died. + t.wg.Wait() + return nil +} + +// tcpListen is a long running goroutine that accepts incoming TCP connections +// and hands them off to the stream channel. +func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) { + defer t.wg.Done() + for { + conn, err := tcpLn.AcceptTCP() + if err != nil { + if s := atomic.LoadInt32(&t.shutdown); s == 1 { + break + } + + t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err) + continue + } + + t.streamCh <- conn + } +} + +// udpListen is a long running goroutine that accepts incoming UDP packets and +// hands them off to the packet channel. +func (t *NetTransport) udpListen(udpLn *net.UDPConn) { + defer t.wg.Done() + for { + // Do a blocking read into a fresh buffer. Grab a time stamp as + // close as possible to the I/O. + buf := make([]byte, udpPacketBufSize) + n, addr, err := udpLn.ReadFrom(buf) + ts := time.Now() + if err != nil { + if s := atomic.LoadInt32(&t.shutdown); s == 1 { + break + } + + t.logger.Printf("[ERR] memberlist: Error reading UDP packet: %v", err) + continue + } + + // Check the length - it needs to have at least one byte to be a + // proper message. + if n < 1 { + t.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s", + len(buf), LogAddress(addr)) + continue + } + + // Ingest the packet. + metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n)) + t.packetCh <- &Packet{ + Buf: buf[:n], + From: addr, + Timestamp: ts, + } + } +} + +// setUDPRecvBuf is used to resize the UDP receive window. The function +// attempts to set the read buffer to `udpRecvBuf` but backs off until +// the read buffer can be set. +func setUDPRecvBuf(c *net.UDPConn) error { + size := udpRecvBufSize + var err error + for size > 0 { + if err = c.SetReadBuffer(size); err == nil { + return nil + } + size = size / 2 + } + return err +} diff --git a/vendor/github.com/hashicorp/memberlist/queue.go b/vendor/github.com/hashicorp/memberlist/queue.go index 994b90ff1..1185c9eb0 100644 --- a/vendor/github.com/hashicorp/memberlist/queue.go +++ b/vendor/github.com/hashicorp/memberlist/queue.go @@ -27,6 +27,26 @@ type limitedBroadcast struct { transmits int // Number of transmissions attempted. b Broadcast } + +// for testing; emits in transmit order if reverse=false +func (q *TransmitLimitedQueue) orderedView(reverse bool) []*limitedBroadcast { + q.Lock() + defer q.Unlock() + + out := make([]*limitedBroadcast, 0, len(q.bcQueue)) + if reverse { + for i := 0; i < len(q.bcQueue); i++ { + out = append(out, q.bcQueue[i]) + } + } else { + for i := len(q.bcQueue) - 1; i >= 0; i-- { + out = append(out, q.bcQueue[i]) + } + } + + return out +} + type limitedBroadcasts []*limitedBroadcast // Broadcast is something that can be broadcasted via gossip to diff --git a/vendor/github.com/hashicorp/memberlist/state.go b/vendor/github.com/hashicorp/memberlist/state.go index 6b9122f08..6caded313 100644 --- a/vendor/github.com/hashicorp/memberlist/state.go +++ b/vendor/github.com/hashicorp/memberlist/state.go @@ -34,6 +34,17 @@ type Node struct { DCur uint8 // Current version delegate is speaking } +// Address returns the host:port form of a node's address, suitable for use +// with a transport. +func (n *Node) Address() string { + return joinHostPort(n.Addr.String(), n.Port) +} + +// String returns the node name +func (n *Node) String() string { + return n.Name +} + // NodeState is used to manage our state view of another node type nodeState struct { Node @@ -42,6 +53,12 @@ type nodeState struct { StateChange time.Time // Time last state change happened } +// Address returns the host:port form of a node's address, suitable for use +// with a transport. +func (n *nodeState) Address() string { + return n.Node.Address() +} + // ackHandler is used to register handlers for incoming acks and nacks. type ackHandler struct { ackFn func([]byte, time.Time) @@ -216,6 +233,15 @@ START: m.probeNode(&node) } +// probeNodeByAddr just safely calls probeNode given only the address of the node (for tests) +func (m *Memberlist) probeNodeByAddr(addr string) { + m.nodeLock.RLock() + n := m.nodeMap[addr] + m.nodeLock.RUnlock() + + m.probeNode(n) +} + // probeNode handles a single round of failure checking on a node. func (m *Memberlist) probeNode(node *nodeState) { defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now()) @@ -234,13 +260,20 @@ func (m *Memberlist) probeNode(node *nodeState) { nackCh := make(chan struct{}, m.config.IndirectChecks+1) m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval) + // Mark the sent time here, which should be after any pre-processing but + // before system calls to do the actual send. This probably over-reports + // a bit, but it's the best we can do. We had originally put this right + // after the I/O, but that would sometimes give negative RTT measurements + // which was not desirable. + sent := time.Now() + // Send a ping to the node. If this node looks like it's suspect or dead, // also tack on a suspect message so that it has a chance to refute as // soon as possible. - deadline := time.Now().Add(probeInterval) - destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)} + deadline := sent.Add(probeInterval) + addr := node.Address() if node.State == stateAlive { - if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { + if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err) return } @@ -261,17 +294,12 @@ func (m *Memberlist) probeNode(node *nodeState) { } compound := makeCompoundMessage(msgs) - if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err) + if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err) return } } - // Mark the sent time here, which should be after any pre-processing and - // system calls to do the actual send. This probably under-reports a bit, - // but it's the best we can do. - sent := time.Now() - // Arrange for our self-awareness to get updated. At this point we've // sent the ping, so any return statement means the probe succeeded // which will improve our health until we get to the failure scenarios @@ -305,7 +333,7 @@ func (m *Memberlist) probeNode(node *nodeState) { // probe interval it will give the TCP fallback more time, which // is more active in dealing with lost packets, and it gives more // time to wait for indirect acks/nacks. - m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name) + m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name) } // Get some random live nodes. @@ -327,8 +355,7 @@ func (m *Memberlist) probeNode(node *nodeState) { expectedNacks++ } - destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)} - if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil { + if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err) } } @@ -345,12 +372,11 @@ func (m *Memberlist) probeNode(node *nodeState) { // config option to turn this off if desired. fallbackCh := make(chan bool, 1) if (!m.config.DisableTcpPings) && (node.PMax >= 3) { - destAddr := &net.TCPAddr{IP: node.Addr, Port: int(node.Port)} go func() { defer close(fallbackCh) - didContact, err := m.sendPingAndWaitForAck(destAddr, ping, deadline) + didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed TCP fallback ping: %s", err) + m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err) } else { fallbackCh <- didContact } @@ -375,7 +401,7 @@ func (m *Memberlist) probeNode(node *nodeState) { // any additional time here. for didContact := range fallbackCh { if didContact { - m.logger.Printf("[WARN] memberlist: Was able to reach %s via TCP but not UDP, network may be misconfigured and not allowing bidirectional UDP", node.Name) + m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name) return } } @@ -390,7 +416,7 @@ func (m *Memberlist) probeNode(node *nodeState) { awarenessDelta = 0 if expectedNacks > 0 { if nackCount := len(nackCh); nackCount < expectedNacks { - awarenessDelta += 2 * (expectedNacks - nackCount) + awarenessDelta += (expectedNacks - nackCount) } } else { awarenessDelta += 1 @@ -410,7 +436,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) { m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval) // Send a ping to the node. - if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { + if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil { return 0, err } @@ -496,18 +522,17 @@ func (m *Memberlist) gossip() { return } - destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)} - + addr := node.Address() if len(msgs) == 1 { // Send single message as is - if err := m.rawSendMsgUDP(destAddr, &node.Node, msgs[0]); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err) + if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) } } else { // Otherwise create and send a compound message compound := makeCompoundMessage(msgs) - if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err) + if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) } } } @@ -533,17 +558,17 @@ func (m *Memberlist) pushPull() { node := nodes[0] // Attempt a push pull - if err := m.pushPullNode(node.Addr, node.Port, false); err != nil { + if err := m.pushPullNode(node.Address(), false); err != nil { m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err) } } // pushPullNode does a complete state exchange with a specific node. -func (m *Memberlist) pushPullNode(addr []byte, port uint16, join bool) error { +func (m *Memberlist) pushPullNode(addr string, join bool) error { defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now()) // Attempt to send and receive with the node - remote, userState, err := m.sendAndReceiveState(addr, port, join) + remote, userState, err := m.sendAndReceiveState(addr, join) if err != nil { return err } @@ -821,7 +846,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { // in-queue to be processed but blocked by the locks above. If we let // that aliveMsg process, it'll cause us to re-join the cluster. This // ensures that we don't. - if m.leave && a.Node == m.config.Name { + if m.hasLeft() && a.Node == m.config.Name { return } @@ -1097,7 +1122,7 @@ func (m *Memberlist) deadNode(d *dead) { // Check if this is us if state.Name == m.config.Name { // If we are not leaving we need to refute - if !m.leave { + if !m.hasLeft() { m.refute(state, d.Incarnation) m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From) return // Do not mark ourself dead diff --git a/vendor/github.com/hashicorp/memberlist/suspicion.go b/vendor/github.com/hashicorp/memberlist/suspicion.go index 5f573e1fc..f8aa9e20a 100644 --- a/vendor/github.com/hashicorp/memberlist/suspicion.go +++ b/vendor/github.com/hashicorp/memberlist/suspicion.go @@ -117,7 +117,7 @@ func (s *suspicion) Confirm(from string) bool { // stop the timer then we will call the timeout function directly from // here. n := atomic.AddInt32(&s.n, 1) - elapsed := time.Now().Sub(s.start) + elapsed := time.Since(s.start) remaining := remainingSuspicionTime(n, s.k, elapsed, s.min, s.max) if s.timer.Stop() { if remaining > 0 { diff --git a/vendor/github.com/hashicorp/memberlist/tag.sh b/vendor/github.com/hashicorp/memberlist/tag.sh new file mode 100755 index 000000000..cd16623a7 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/tag.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -e + +# The version must be supplied from the environment. Do not include the +# leading "v". +if [ -z $VERSION ]; then + echo "Please specify a version." + exit 1 +fi + +# Generate the tag. +echo "==> Tagging version $VERSION..." +git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION" +git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" master + +exit 0 diff --git a/vendor/github.com/hashicorp/memberlist/transport.go b/vendor/github.com/hashicorp/memberlist/transport.go new file mode 100644 index 000000000..6ce55ea47 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/transport.go @@ -0,0 +1,65 @@ +package memberlist + +import ( + "net" + "time" +) + +// Packet is used to provide some metadata about incoming packets from peers +// over a packet connection, as well as the packet payload. +type Packet struct { + // Buf has the raw contents of the packet. + Buf []byte + + // From has the address of the peer. This is an actual net.Addr so we + // can expose some concrete details about incoming packets. + From net.Addr + + // Timestamp is the time when the packet was received. This should be + // taken as close as possible to the actual receipt time to help make an + // accurate RTT measurement during probes. + Timestamp time.Time +} + +// Transport is used to abstract over communicating with other peers. The packet +// interface is assumed to be best-effort and the stream interface is assumed to +// be reliable. +type Transport interface { + // FinalAdvertiseAddr is given the user's configured values (which + // might be empty) and returns the desired IP and port to advertise to + // the rest of the cluster. + FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) + + // WriteTo is a packet-oriented interface that fires off the given + // payload to the given address in a connectionless fashion. This should + // return a time stamp that's as close as possible to when the packet + // was transmitted to help make accurate RTT measurements during probes. + // + // This is similar to net.PacketConn, though we didn't want to expose + // that full set of required methods to keep assumptions about the + // underlying plumbing to a minimum. We also treat the address here as a + // string, similar to Dial, so it's network neutral, so this usually is + // in the form of "host:port". + WriteTo(b []byte, addr string) (time.Time, error) + + // PacketCh returns a channel that can be read to receive incoming + // packets from other peers. How this is set up for listening is left as + // an exercise for the concrete transport implementations. + PacketCh() <-chan *Packet + + // DialTimeout is used to create a connection that allows us to perform + // two-way communication with a peer. This is generally more expensive + // than packet connections so is used for more infrequent operations + // such as anti-entropy or fallback probes if the packet-oriented probe + // failed. + DialTimeout(addr string, timeout time.Duration) (net.Conn, error) + + // StreamCh returns a channel that can be read to handle incoming stream + // connections from other peers. How this is set up for listening is + // left as an exercise for the concrete transport implementations. + StreamCh() <-chan net.Conn + + // Shutdown is called when memberlist is shutting down; this gives the + // transport a chance to clean up any listeners. + Shutdown() error +} diff --git a/vendor/github.com/hashicorp/memberlist/util.go b/vendor/github.com/hashicorp/memberlist/util.go index 2ee58ba10..1e582a8a1 100644 --- a/vendor/github.com/hashicorp/memberlist/util.go +++ b/vendor/github.com/hashicorp/memberlist/util.go @@ -8,6 +8,8 @@ import ( "io" "math" "math/rand" + "net" + "strconv" "strings" "time" @@ -76,10 +78,9 @@ func retransmitLimit(retransmitMult, n int) int { // shuffleNodes randomly shuffles the input nodes using the Fisher-Yates shuffle func shuffleNodes(nodes []*nodeState) { n := len(nodes) - for i := n - 1; i > 0; i-- { - j := rand.Intn(i + 1) + rand.Shuffle(n, func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] - } + }) } // pushPushScale is used to scale the time interval at which push/pull @@ -215,20 +216,6 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) { return } -// Given a string of the form "host", "host:port", -// "ipv6::addr" or "[ipv6::address]:port", -// return true if the string includes a port. -func hasPort(s string) bool { - last := strings.LastIndex(s, ":") - if last == -1 { - return false - } - if s[0] == '[' { - return s[last-1] == ']' - } - return strings.Index(s, ":") == last -} - // compressPayload takes an opaque input buffer, compresses it // and wraps it in a compress{} message that is encoded. func compressPayload(inp []byte) (*bytes.Buffer, error) { @@ -286,3 +273,37 @@ func decompressBuffer(c *compress) ([]byte, error) { // Return the uncompressed bytes return b.Bytes(), nil } + +// joinHostPort returns the host:port form of an address, for use with a +// transport. +func joinHostPort(host string, port uint16) string { + return net.JoinHostPort(host, strconv.Itoa(int(port))) +} + +// hasPort is given a string of the form "host", "host:port", "ipv6::address", +// or "[ipv6::address]:port", and returns true if the string includes a port. +func hasPort(s string) bool { + // IPv6 address in brackets. + if strings.LastIndex(s, "[") == 0 { + return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") + } + + // Otherwise the presence of a single colon determines if there's a port + // since IPv6 addresses outside of brackets (count > 1) can't have a + // port. + return strings.Count(s, ":") == 1 +} + +// ensurePort makes sure the given string has a port number on it, otherwise it +// appends the given port as a default. +func ensurePort(s string, port int) string { + if hasPort(s) { + return s + } + + // If this is an IPv6 address, the join call will add another set of + // brackets, so we have to trim before we add the default port. + s = strings.Trim(s, "[]") + s = net.JoinHostPort(s, strconv.Itoa(port)) + return s +} diff --git a/vendor/vendor.json b/vendor/vendor.json index f724f07d8..0b07d500f 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -216,7 +216,7 @@ {"path":"github.com/hashicorp/hcl2/hcldec","checksumSHA1":"wQ3hLj4s+5jN6LePSpT0XTTvdXA=","revision":"6743a2254ba3d642b7d3a0be506259a0842819ac","revisionTime":"2018-08-10T01:10:00Z"}, {"path":"github.com/hashicorp/hcl2/hclparse","checksumSHA1":"IzmftuG99BqNhbFGhxZaGwtiMtM=","revision":"6743a2254ba3d642b7d3a0be506259a0842819ac","revisionTime":"2018-08-10T01:10:00Z"}, {"path":"github.com/hashicorp/logutils","checksumSHA1":"vt+P9D2yWDO3gdvdgCzwqunlhxU=","revision":"0dc08b1671f34c4250ce212759ebd880f743d883"}, - {"path":"github.com/hashicorp/memberlist","checksumSHA1":"1zk7IeGClUqBo+Phsx89p7fQ/rQ=","revision":"23ad4b7d7b38496cd64c241dfd4c60b7794c254a","revisionTime":"2017-02-08T21:15:06Z"}, + {"path":"github.com/hashicorp/memberlist","checksumSHA1":"pd6KJd+33bGQ6oc+2X+ZvqgSAGI=","revision":"2072f3a3ff4b7b3d830be77678d5d4b978362bc4","revisionTime":"2018-10-22T22:19:44Z"}, {"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3"}, {"path":"github.com/hashicorp/raft","checksumSHA1":"zkA9uvbj1BdlveyqXpVTh1N6ers=","revision":"077966dbc90f342107eb723ec52fdb0463ec789b","revisionTime":"2018-01-17T20:29:25Z","version":"master","versionExact":"master"}, {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, From 22c11d8799638fa73d2dcf35166ca5ee87d3b66e Mon Sep 17 00:00:00 2001 From: Chris Baker Date: Wed, 12 Dec 2018 18:52:06 +0000 Subject: [PATCH 4/7] improved code for readability --- nomad/rpc.go | 48 +++++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/nomad/rpc.go b/nomad/rpc.go index 9edf5df5c..aa681d5f8 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -51,6 +51,7 @@ type rpcHandler struct { *Server logger log.Logger gologger *golog.Logger + acceptLoopDelay time.Duration } func newRpcHandler(s *Server) *rpcHandler { @@ -84,7 +85,6 @@ type RPCContext struct { // listen is used to listen for incoming RPC connections func (r *rpcHandler) listen(ctx context.Context) { defer close(r.listenerCh) - var tempDelay time.Duration for { select { case <-ctx.Done(): @@ -99,30 +99,44 @@ func (r *rpcHandler) listen(ctx context.Context) { if r.shutdown { return } - - if tempDelay == 0 { - tempDelay = 5 * time.Millisecond - } else { - tempDelay *= 2 - } - maxDelay := 5 * time.Second - if ne, ok := err.(net.Error); ok && ne.Temporary() { - maxDelay = 1 * time.Second - } - if tempDelay > maxDelay { - tempDelay = maxDelay - } - r.logger.Error("failed to accept RPC conn", "error", err, "delay", tempDelay) - time.Sleep(tempDelay) + r.handleAcceptErr(err, ctx) continue } - tempDelay = 0 + // No error, reset delay loop + r.acceptLoopDelay = 0 go r.handleConn(ctx, conn, &RPCContext{Conn: conn}) metrics.IncrCounter([]string{"nomad", "rpc", "accept_conn"}, 1) } } +// Sleep to avoid spamming the log, with a maximum delay according to whether or not the error is temporary +func (r *rpcHandler) handleAcceptErr(err error, ctx context.Context) { + const baseAcceptLoopDelay = 5 * time.Millisecond + const maxAcceptLoopDelay = 5 * time.Second + const maxAcceptLoopDelayTemporaryError = 1 * time.Second + + if r.acceptLoopDelay == 0 { + r.acceptLoopDelay = baseAcceptLoopDelay + } else { + r.acceptLoopDelay *= 2 + } + temporaryError := false + if ne, ok := err.(net.Error); ok && ne.Temporary() { + temporaryError = true + } + if temporaryError && r.acceptLoopDelay > maxAcceptLoopDelayTemporaryError { + r.acceptLoopDelay = maxAcceptLoopDelayTemporaryError + } else if r.acceptLoopDelay > maxAcceptLoopDelay { + r.acceptLoopDelay = maxAcceptLoopDelay + } + r.logger.Error("failed to accept RPC conn", "error", err, "delay", r.acceptLoopDelay) + select { + case <-ctx.Done(): + case <-time.After(maxAcceptLoopDelay): + } +} + // handleConn is used to determine if this is a Raft or // Nomad type RPC connection and invoke the correct handler func (r *rpcHandler) handleConn(ctx context.Context, conn net.Conn, rpcCtx *RPCContext) { From 89c64932c1d86d7570aaa8d9cdf8809bc1b24750 Mon Sep 17 00:00:00 2001 From: Chris Baker Date: Wed, 12 Dec 2018 19:09:06 +0000 Subject: [PATCH 5/7] gofmt --- nomad/rpc.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nomad/rpc.go b/nomad/rpc.go index aa681d5f8..f39c52c76 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -49,8 +49,8 @@ const ( type rpcHandler struct { *Server - logger log.Logger - gologger *golog.Logger + logger log.Logger + gologger *golog.Logger acceptLoopDelay time.Duration } @@ -102,7 +102,7 @@ func (r *rpcHandler) listen(ctx context.Context) { r.handleAcceptErr(err, ctx) continue } - // No error, reset delay loop + // No error, reset loop delay r.acceptLoopDelay = 0 go r.handleConn(ctx, conn, &RPCContext{Conn: conn}) @@ -113,7 +113,7 @@ func (r *rpcHandler) listen(ctx context.Context) { // Sleep to avoid spamming the log, with a maximum delay according to whether or not the error is temporary func (r *rpcHandler) handleAcceptErr(err error, ctx context.Context) { const baseAcceptLoopDelay = 5 * time.Millisecond - const maxAcceptLoopDelay = 5 * time.Second + const maxAcceptLoopDelay = 5 * time.Second const maxAcceptLoopDelayTemporaryError = 1 * time.Second if r.acceptLoopDelay == 0 { From 34600f8b7580c8d8dd8f801bc7e5fb6905eaf78c Mon Sep 17 00:00:00 2001 From: Chris Baker Date: Wed, 12 Dec 2018 19:16:41 +0000 Subject: [PATCH 6/7] fixed bug in loop delay --- nomad/rpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/rpc.go b/nomad/rpc.go index f39c52c76..58e09303b 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -133,7 +133,7 @@ func (r *rpcHandler) handleAcceptErr(err error, ctx context.Context) { r.logger.Error("failed to accept RPC conn", "error", err, "delay", r.acceptLoopDelay) select { case <-ctx.Done(): - case <-time.After(maxAcceptLoopDelay): + case <-time.After(r.acceptLoopDelay): } } From 121a9eb8cbc0c967c6cf07b6daf3fffd605c6c46 Mon Sep 17 00:00:00 2001 From: Chris Baker Date: Wed, 12 Dec 2018 23:10:24 +0000 Subject: [PATCH 7/7] some changes for more idiomatic code --- nomad/rpc.go | 44 +++++++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/nomad/rpc.go b/nomad/rpc.go index 58e09303b..ed662762a 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -49,9 +49,8 @@ const ( type rpcHandler struct { *Server - logger log.Logger - gologger *golog.Logger - acceptLoopDelay time.Duration + logger log.Logger + gologger *golog.Logger } func newRpcHandler(s *Server) *rpcHandler { @@ -85,6 +84,8 @@ type RPCContext struct { // listen is used to listen for incoming RPC connections func (r *rpcHandler) listen(ctx context.Context) { defer close(r.listenerCh) + + var acceptLoopDelay time.Duration for { select { case <-ctx.Done(): @@ -99,41 +100,46 @@ func (r *rpcHandler) listen(ctx context.Context) { if r.shutdown { return } - r.handleAcceptErr(err, ctx) + r.handleAcceptErr(ctx, err, &acceptLoopDelay) continue } // No error, reset loop delay - r.acceptLoopDelay = 0 + acceptLoopDelay = 0 go r.handleConn(ctx, conn, &RPCContext{Conn: conn}) metrics.IncrCounter([]string{"nomad", "rpc", "accept_conn"}, 1) } } -// Sleep to avoid spamming the log, with a maximum delay according to whether or not the error is temporary -func (r *rpcHandler) handleAcceptErr(err error, ctx context.Context) { - const baseAcceptLoopDelay = 5 * time.Millisecond - const maxAcceptLoopDelay = 5 * time.Second - const maxAcceptLoopDelayTemporaryError = 1 * time.Second +// handleAcceptErr sleeps to avoid spamming the log, +// with a maximum delay according to whether or not the error is temporary +func (r *rpcHandler) handleAcceptErr(ctx context.Context, err error, loopDelay *time.Duration) { + const baseDelay = 5 * time.Millisecond + const maxDelayPerm = 5 * time.Second + const maxDelayTemp = 1 * time.Second - if r.acceptLoopDelay == 0 { - r.acceptLoopDelay = baseAcceptLoopDelay + if *loopDelay == 0 { + *loopDelay = baseDelay } else { - r.acceptLoopDelay *= 2 + *loopDelay *= 2 } + temporaryError := false if ne, ok := err.(net.Error); ok && ne.Temporary() { temporaryError = true } - if temporaryError && r.acceptLoopDelay > maxAcceptLoopDelayTemporaryError { - r.acceptLoopDelay = maxAcceptLoopDelayTemporaryError - } else if r.acceptLoopDelay > maxAcceptLoopDelay { - r.acceptLoopDelay = maxAcceptLoopDelay + + if temporaryError && *loopDelay > maxDelayTemp { + *loopDelay = maxDelayTemp + } else if *loopDelay > maxDelayPerm { + *loopDelay = maxDelayPerm } - r.logger.Error("failed to accept RPC conn", "error", err, "delay", r.acceptLoopDelay) + + r.logger.Error("failed to accept RPC conn", "error", err, "delay", *loopDelay) + select { case <-ctx.Done(): - case <-time.After(r.acceptLoopDelay): + case <-time.After(*loopDelay): } }