diff --git a/vendor/github.com/hashicorp/memberlist/config.go b/vendor/github.com/hashicorp/memberlist/config.go index 1c13bfcd3..2f43d14cb 100644 --- a/vendor/github.com/hashicorp/memberlist/config.go +++ b/vendor/github.com/hashicorp/memberlist/config.go @@ -11,10 +11,15 @@ type Config struct { // The name of this node. This must be unique in the cluster. Name string + // Transport is a hook for providing custom code to communicate with + // other nodes. If this is left nil, then memberlist will by default + // make a NetTransport using BindAddr and BindPort from this structure. + Transport Transport + // Configuration related to what address to bind to and ports to - // listen on. The port is used for both UDP and TCP gossip. - // It is assumed other nodes are running on this port, but they - // do not need to. + // listen on. The port is used for both UDP and TCP gossip. It is + // assumed other nodes are running on this port, but they do not need + // to. BindAddr string BindPort int @@ -28,8 +33,11 @@ type Config struct { // ProtocolVersionMax. ProtocolVersion uint8 - // TCPTimeout is the timeout for establishing a TCP connection with - // a remote node for a full state sync. + // TCPTimeout is the timeout for establishing a stream connection with + // a remote node for a full state sync, and for stream read and write + // operations. This is a legacy name for backwards compatibility, but + // should really be called StreamTimeout now that we have generalized + // the transport. TCPTimeout time.Duration // IndirectChecks is the number of nodes that will be asked to perform @@ -189,10 +197,13 @@ type Config struct { // while UDP messages are handled. HandoffQueueDepth int - // Maximum number of bytes that memberlist expects UDP messages to be. A safe - // value for this is typically 1400 bytes (which is the default.) However, - // depending on your network's MTU (Maximum Transmission Unit) you may be able - // to increase this. + // Maximum number of bytes that memberlist will put in a packet (this + // will be for UDP packets by default with a NetTransport). A safe value + // for this is typically 1400 bytes (which is the default). However, + // depending on your network's MTU (Maximum Transmission Unit) you may + // be able to increase this to get more content into each gossip packet. + // This is a legacy name for backward compatibility but should really be + // called PacketBufferSize now that we have generalized the transport. UDPBufferSize int } diff --git a/vendor/github.com/hashicorp/memberlist/delegate.go b/vendor/github.com/hashicorp/memberlist/delegate.go index 66aa2da79..551548892 100644 --- a/vendor/github.com/hashicorp/memberlist/delegate.go +++ b/vendor/github.com/hashicorp/memberlist/delegate.go @@ -12,7 +12,7 @@ type Delegate interface { // NotifyMsg is called when a user-data message is received. // Care should be taken that this method does not block, since doing // so would block the entire UDP packet receive loop. Additionally, the byte - // slice may be modified after the call returns, so it should be copied if needed. + // slice may be modified after the call returns, so it should be copied if needed NotifyMsg([]byte) // GetBroadcasts is called when user data messages can be broadcast. diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go index 371e3294b..2aba22322 100644 --- a/vendor/github.com/hashicorp/memberlist/memberlist.go +++ b/vendor/github.com/hashicorp/memberlist/memberlist.go @@ -40,9 +40,8 @@ type Memberlist struct { leave bool leaveBroadcast chan struct{} - udpListener *net.UDPConn - tcpListener *net.TCPListener - handoff chan msgHandoff + transport Transport + handoff chan msgHandoff nodeLock sync.RWMutex nodes []*nodeState // Known nodes @@ -91,25 +90,6 @@ func newMemberlist(conf *Config) (*Memberlist, error) { } } - tcpAddr := &net.TCPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort} - tcpLn, err := net.ListenTCP("tcp", tcpAddr) - if err != nil { - return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err) - } - if conf.BindPort == 0 { - conf.BindPort = tcpLn.Addr().(*net.TCPAddr).Port - } - - udpAddr := &net.UDPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort} - udpLn, err := net.ListenUDP("udp", udpAddr) - if err != nil { - tcpLn.Close() - return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err) - } - - // Set the UDP receive window size - setUDPRecvBuf(udpLn) - if conf.LogOutput != nil && conf.Logger != nil { return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.") } @@ -124,12 +104,33 @@ func newMemberlist(conf *Config) (*Memberlist, error) { logger = log.New(logDest, "", log.LstdFlags) } + // Set up a network transport by default if a custom one wasn't given + // by the config. + transport := conf.Transport + if transport == nil { + nc := &NetTransportConfig{ + BindAddrs: []string{conf.BindAddr}, + BindPort: conf.BindPort, + Logger: logger, + } + nt, err := NewNetTransport(nc) + if err != nil { + return nil, fmt.Errorf("Could not set up network transport: %v", err) + } + + if conf.BindPort == 0 { + port := nt.GetAutoBindPort() + conf.BindPort = port + logger.Printf("[DEBUG] Using dynamic bind port %d", port) + } + transport = nt + } + m := &Memberlist{ config: conf, shutdownCh: make(chan struct{}), leaveBroadcast: make(chan struct{}, 1), - udpListener: udpLn, - tcpListener: tcpLn, + transport: transport, handoff: make(chan msgHandoff, conf.HandoffQueueDepth), nodeMap: make(map[string]*nodeState), nodeTimers: make(map[string]*suspicion), @@ -141,9 +142,9 @@ func newMemberlist(conf *Config) (*Memberlist, error) { m.broadcasts.NumNodes = func() int { return m.estNumNodes() } - go m.tcpListen() - go m.udpListen() - go m.udpHandler() + go m.streamListen() + go m.packetListen() + go m.packetHandler() return m, nil } @@ -187,7 +188,8 @@ func (m *Memberlist) Join(existing []string) (int, error) { } for _, addr := range addrs { - if err := m.pushPullNode(addr.ip, addr.port, true); err != nil { + hp := joinHostPort(addr.ip.String(), addr.port) + if err := m.pushPullNode(hp, true); err != nil { err = fmt.Errorf("Failed to join %s: %v", addr.ip, err) errs = multierror.Append(errs, err) m.logger.Printf("[DEBUG] memberlist: %v", err) @@ -327,68 +329,30 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { // as if we received an alive notification our own network channel for // ourself. func (m *Memberlist) setAlive() error { - var advertiseAddr net.IP - var advertisePort int - if m.config.AdvertiseAddr != "" { - // If AdvertiseAddr is not empty, then advertise - // the given address and port. - ip := net.ParseIP(m.config.AdvertiseAddr) - if ip == nil { - return fmt.Errorf("Failed to parse advertise address!") - } - - // Ensure IPv4 conversion if necessary - if ip4 := ip.To4(); ip4 != nil { - ip = ip4 - } - - advertiseAddr = ip - advertisePort = m.config.AdvertisePort - } else { - if m.config.BindAddr == "0.0.0.0" { - // Otherwise, if we're not bound to a specific IP, let's use a suitable - // private IP address. - var err error - m.config.AdvertiseAddr, err = sockaddr.GetPrivateIP() - if err != nil { - return fmt.Errorf("Failed to get interface addresses: %v", err) - } - if m.config.AdvertiseAddr == "" { - return fmt.Errorf("No private IP address found, and explicit IP not provided") - } - - advertiseAddr = net.ParseIP(m.config.AdvertiseAddr) - if advertiseAddr == nil { - return fmt.Errorf("Failed to parse advertise address: %q", m.config.AdvertiseAddr) - } - } else { - // Use the IP that we're bound to. - addr := m.tcpListener.Addr().(*net.TCPAddr) - advertiseAddr = addr.IP - } - - // Use the port we are bound to. - advertisePort = m.tcpListener.Addr().(*net.TCPAddr).Port + // Get the final advertise address from the transport, which may need + // to see which address we bound to. + addr, port, err := m.transport.FinalAdvertiseAddr( + m.config.AdvertiseAddr, m.config.AdvertisePort) + if err != nil { + return fmt.Errorf("Failed to get final advertise address: %v") } // Check if this is a public address without encryption - ipAddr, err := sockaddr.NewIPAddr(advertiseAddr.String()) + ipAddr, err := sockaddr.NewIPAddr(addr.String()) if err != nil { return fmt.Errorf("Failed to parse interface addresses: %v", err) } - ifAddrs := []sockaddr.IfAddr{ sockaddr.IfAddr{ SockAddr: ipAddr, }, } - _, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs) if len(publicIfs) > 0 && !m.config.EncryptionEnabled() { m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!") } - // Get the node meta data + // Set any metadata from the delegate. var meta []byte if m.config.Delegate != nil { meta = m.config.Delegate.NodeMeta(MetaMaxSize) @@ -400,8 +364,8 @@ func (m *Memberlist) setAlive() error { a := alive{ Incarnation: m.nextIncarnation(), Node: m.config.Name, - Addr: advertiseAddr, - Port: uint16(advertisePort), + Addr: addr, + Port: uint16(port), Meta: meta, Vsn: []uint8{ ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion, @@ -410,7 +374,6 @@ func (m *Memberlist) setAlive() error { }, } m.aliveNode(&a, nil, true) - return nil } @@ -473,13 +436,8 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error { return nil } -// SendTo is used to directly send a message to another node, without -// the use of the gossip mechanism. This will encode the message as a -// user-data message, which a delegate will receive through NotifyMsg -// The actual data is transmitted over UDP, which means this is a -// best-effort transmission mechanism, and the maximum size of the -// message is the size of a single UDP datagram, after compression. -// This method is DEPRECATED in favor or SendToUDP +// SendTo is deprecated in favor of SendBestEffort, which requires a node to +// target. func (m *Memberlist) SendTo(to net.Addr, msg []byte) error { // Encode as a user message buf := make([]byte, 1, len(msg)+1) @@ -487,36 +445,39 @@ func (m *Memberlist) SendTo(to net.Addr, msg []byte) error { buf = append(buf, msg...) // Send the message - return m.rawSendMsgUDP(to, nil, buf) + return m.rawSendMsgPacket(to.String(), nil, buf) } -// SendToUDP is used to directly send a message to another node, without -// the use of the gossip mechanism. This will encode the message as a -// user-data message, which a delegate will receive through NotifyMsg -// The actual data is transmitted over UDP, which means this is a -// best-effort transmission mechanism, and the maximum size of the -// message is the size of a single UDP datagram, after compression +// SendToUDP is deprecated in favor of SendBestEffort. func (m *Memberlist) SendToUDP(to *Node, msg []byte) error { + return m.SendBestEffort(to, msg) +} + +// SendToTCP is deprecated in favor of SendReliable. +func (m *Memberlist) SendToTCP(to *Node, msg []byte) error { + return m.SendReliable(to, msg) +} + +// SendBestEffort uses the unreliable packet-oriented interface of the transport +// to target a user message at the given node (this does not use the gossip +// mechanism). The maximum size of the message depends on the configured +// UDPBufferSize for this memberlist instance. +func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error { // Encode as a user message buf := make([]byte, 1, len(msg)+1) buf[0] = byte(userMsg) buf = append(buf, msg...) // Send the message - destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)} - return m.rawSendMsgUDP(destAddr, to, buf) + return m.rawSendMsgPacket(to.Address(), to, buf) } -// SendToTCP is used to directly send a message to another node, without -// the use of the gossip mechanism. This will encode the message as a -// user-data message, which a delegate will receive through NotifyMsg -// The actual data is transmitted over TCP, which means delivery -// is guaranteed if no error is returned. There is no limit -// to the size of the message -func (m *Memberlist) SendToTCP(to *Node, msg []byte) error { - // Send the message - destAddr := &net.TCPAddr{IP: to.Addr, Port: int(to.Port)} - return m.sendTCPUserMsg(destAddr, msg) +// SendReliable uses the reliable stream-oriented interface of the transport to +// target a user message at the given node (this does not use the gossip +// mechanism). Delivery is guaranteed if no error is returned, and there is no +// limit on the size of the message. +func (m *Memberlist) SendReliable(to *Node, msg []byte) error { + return m.sendUserMsg(to.Address(), msg) } // Members returns a list of all known live nodes. The node structures @@ -651,10 +612,14 @@ func (m *Memberlist) Shutdown() error { return nil } + // Shut down the transport first, which should block until it's + // completely torn down. If we kill the memberlist-side handlers + // those I/O handlers might get stuck. + m.transport.Shutdown() + + // Now tear down everything else. m.shutdown = true close(m.shutdownCh) m.deschedule() - m.udpListener.Close() - m.tcpListener.Close() return nil } diff --git a/vendor/github.com/hashicorp/memberlist/mock_transport.go b/vendor/github.com/hashicorp/memberlist/mock_transport.go new file mode 100644 index 000000000..b8bafa802 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/mock_transport.go @@ -0,0 +1,121 @@ +package memberlist + +import ( + "fmt" + "net" + "strconv" + "time" +) + +// MockNetwork is used as a factory that produces MockTransport instances which +// are uniquely addressed and wired up to talk to each other. +type MockNetwork struct { + transports map[string]*MockTransport + port int +} + +// NewTransport returns a new MockTransport with a unique address, wired up to +// talk to the other transports in the MockNetwork. +func (n *MockNetwork) NewTransport() *MockTransport { + n.port += 1 + addr := fmt.Sprintf("127.0.0.1:%d", n.port) + transport := &MockTransport{ + net: n, + addr: &MockAddress{addr}, + packetCh: make(chan *Packet), + streamCh: make(chan net.Conn), + } + + if n.transports == nil { + n.transports = make(map[string]*MockTransport) + } + n.transports[addr] = transport + return transport +} + +// MockAddress is a wrapper which adds the net.Addr interface to our mock +// address scheme. +type MockAddress struct { + addr string +} + +// See net.Addr. +func (a *MockAddress) Network() string { + return "mock" +} + +// See net.Addr. +func (a *MockAddress) String() string { + return a.addr +} + +// MockTransport directly plumbs messages to other transports its MockNetwork. +type MockTransport struct { + net *MockNetwork + addr *MockAddress + packetCh chan *Packet + streamCh chan net.Conn +} + +// See Transport. +func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) { + host, portStr, err := net.SplitHostPort(t.addr.String()) + if err != nil { + return nil, 0, err + } + + ip := net.ParseIP(host) + if ip == nil { + return nil, 0, fmt.Errorf("Failed to parse IP %q", host) + } + + port, err := strconv.ParseInt(portStr, 10, 16) + if err != nil { + return nil, 0, err + } + + return ip, int(port), nil +} + +// See Transport. +func (t *MockTransport) WriteTo(b []byte, addr string) (time.Time, error) { + dest, ok := t.net.transports[addr] + if !ok { + return time.Time{}, fmt.Errorf("No route to %q", addr) + } + + now := time.Now() + dest.packetCh <- &Packet{ + Buf: b, + From: t.addr, + Timestamp: now, + } + return now, nil +} + +// See Transport. +func (t *MockTransport) PacketCh() <-chan *Packet { + return t.packetCh +} + +// See Transport. +func (t *MockTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) { + dest, ok := t.net.transports[addr] + if !ok { + return nil, fmt.Errorf("No route to %q", addr) + } + + p1, p2 := net.Pipe() + dest.streamCh <- p1 + return p2, nil +} + +// See Transport. +func (t *MockTransport) StreamCh() <-chan net.Conn { + return t.streamCh +} + +// See Transport. +func (t *MockTransport) Shutdown() error { + return nil +} diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index e47da411e..e0036d01d 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -68,8 +68,6 @@ const ( MetaMaxSize = 512 // Maximum size for node meta data compoundHeaderOverhead = 2 // Assumed header overhead compoundOverhead = 2 // Assumed overhead per entry in compoundHeader - udpBufSize = 65536 - udpRecvBuf = 2 * 1024 * 1024 userMsgOverhead = 1 blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process maxPushStateBytes = 10 * 1024 * 1024 @@ -185,43 +183,29 @@ func (m *Memberlist) encryptionVersion() encryptionVersion { } } -// setUDPRecvBuf is used to resize the UDP receive window. The function -// attempts to set the read buffer to `udpRecvBuf` but backs off until -// the read buffer can be set. -func setUDPRecvBuf(c *net.UDPConn) { - size := udpRecvBuf +// streamListen is a long running goroutine that pulls incoming streams from the +// transport and hands them off for processing. +func (m *Memberlist) streamListen() { for { - if err := c.SetReadBuffer(size); err == nil { - break + select { + case conn := <-m.transport.StreamCh(): + go m.handleConn(conn) + + case <-m.shutdownCh: + return } - size = size / 2 } } -// tcpListen listens for and handles incoming connections -func (m *Memberlist) tcpListen() { - for { - conn, err := m.tcpListener.AcceptTCP() - if err != nil { - if m.shutdown { - break - } - m.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %s", err) - continue - } - go m.handleConn(conn) - } -} - -// handleConn handles a single incoming TCP connection -func (m *Memberlist) handleConn(conn *net.TCPConn) { - m.logger.Printf("[DEBUG] memberlist: TCP connection %s", LogConn(conn)) +// handleConn handles a single incoming stream connection from the transport. +func (m *Memberlist) handleConn(conn net.Conn) { + m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn)) defer conn.Close() metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1) conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) - msgType, bufConn, dec, err := m.readTCP(conn) + msgType, bufConn, dec, err := m.readStream(conn) if err != nil { if err != io.EOF { m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn)) @@ -253,7 +237,7 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) { case pingMsg: var p ping if err := dec.Decode(&p); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to decode TCP ping: %s %s", err, LogConn(conn)) + m.logger.Printf("[ERR] memberlist: Failed to decode ping: %s %s", err, LogConn(conn)) return } @@ -265,13 +249,13 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) { ack := ackResp{p.SeqNo, nil} out, err := encode(ackRespMsg, &ack) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed to encode TCP ack: %s", err) + m.logger.Printf("[ERR] memberlist: Failed to encode ack: %s", err) return } - err = m.rawSendMsgTCP(conn, out.Bytes()) + err = m.rawSendMsgStream(conn, out.Bytes()) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send TCP ack: %s %s", err, LogConn(conn)) + m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn)) return } default: @@ -279,49 +263,17 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) { } } -// udpListen listens for and handles incoming UDP packets -func (m *Memberlist) udpListen() { - var n int - var addr net.Addr - var err error - var lastPacket time.Time +// packetListen is a long running goroutine that pulls packets out of the +// transport and hands them off for processing. +func (m *Memberlist) packetListen() { for { - // Do a check for potentially blocking operations - if !lastPacket.IsZero() && time.Now().Sub(lastPacket) > blockingWarning { - diff := time.Now().Sub(lastPacket) - m.logger.Printf( - "[DEBUG] memberlist: Potential blocking operation. Last command took %v", - diff) + select { + case packet := <-m.transport.PacketCh(): + m.ingestPacket(packet.Buf, packet.From, packet.Timestamp) + + case <-m.shutdownCh: + return } - - // Create a new buffer - // TODO: Use Sync.Pool eventually - buf := make([]byte, udpBufSize) - - // Read a packet - n, addr, err = m.udpListener.ReadFrom(buf) - if err != nil { - if m.shutdown { - break - } - m.logger.Printf("[ERR] memberlist: Error reading UDP packet: %s", err) - continue - } - - // Capture the reception time of the packet as close to the - // system calls as possible. - lastPacket = time.Now() - - // Check the length - if n < 1 { - m.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s", - len(buf), LogAddress(addr)) - continue - } - - // Ingest this packet - metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n)) - m.ingestPacket(buf[:n], addr, lastPacket) } } @@ -384,18 +336,18 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim select { case m.handoff <- msgHandoff{msgType, buf, from}: default: - m.logger.Printf("[WARN] memberlist: UDP handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) + m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) } default: - m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s", msgType, LogAddress(from)) + m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from)) } } -// udpHandler processes messages received over UDP, but is decoupled -// from the listener to avoid blocking the listener which may cause -// ping/ack messages to be delayed. -func (m *Memberlist) udpHandler() { +// packetHandler is a long running goroutine that processes messages received +// over the packet interface, but is decoupled from the listener to avoid +// blocking the listener which may cause ping/ack messages to be delayed. +func (m *Memberlist) packetHandler() { for { select { case msg := <-m.handoff: @@ -413,7 +365,7 @@ func (m *Memberlist) udpHandler() { case userMsg: m.handleUser(buf, from) default: - m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s (handler)", msgType, LogAddress(from)) + m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) } case <-m.shutdownCh: @@ -457,7 +409,7 @@ func (m *Memberlist) handlePing(buf []byte, from net.Addr) { if m.config.Ping != nil { ack.Payload = m.config.Ping.AckPayload() } - if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil { + if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from)) } } @@ -478,7 +430,6 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { // Send a ping to the correct host. localSeqNo := m.nextSeqNo() ping := ping{SeqNo: localSeqNo, Node: ind.Node} - destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)} // Setup a response handler to relay the ack cancelCh := make(chan struct{}) @@ -488,14 +439,15 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { // Forward the ack back to the requestor. ack := ackResp{ind.SeqNo, nil} - if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil { + if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil { m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from)) } } m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout) // Send the ping. - if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { + addr := joinHostPort(net.IP(ind.Target).String(), ind.Port) + if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from)) } @@ -507,7 +459,7 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { return case <-time.After(m.config.ProbeTimeout): nack := nackResp{ind.SeqNo} - if err := m.encodeAndSendMsg(from, nackRespMsg, &nack); err != nil { + if err := m.encodeAndSendMsg(from.String(), nackRespMsg, &nack); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from)) } } @@ -589,20 +541,20 @@ func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time. } // encodeAndSendMsg is used to combine the encoding and sending steps -func (m *Memberlist) encodeAndSendMsg(to net.Addr, msgType messageType, msg interface{}) error { +func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg interface{}) error { out, err := encode(msgType, msg) if err != nil { return err } - if err := m.sendMsg(to, out.Bytes()); err != nil { + if err := m.sendMsg(addr, out.Bytes()); err != nil { return err } return nil } -// sendMsg is used to send a UDP message to another host. It will opportunistically -// create a compoundMsg and piggy back other broadcasts -func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error { +// sendMsg is used to send a message via packet to another host. It will +// opportunistically create a compoundMsg and piggy back other broadcasts. +func (m *Memberlist) sendMsg(addr string, msg []byte) error { // Check if we can piggy back any messages bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead if m.config.EncryptionEnabled() { @@ -612,7 +564,7 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error { // Fast path if nothing to piggypack if len(extra) == 0 { - return m.rawSendMsgUDP(to, nil, msg) + return m.rawSendMsgPacket(addr, nil, msg) } // Join all the messages @@ -624,11 +576,12 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error { compound := makeCompoundMessage(msgs) // Send the message - return m.rawSendMsgUDP(to, nil, compound.Bytes()) + return m.rawSendMsgPacket(addr, nil, compound.Bytes()) } -// rawSendMsgUDP is used to send a UDP message to another host without modification -func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error { +// rawSendMsgPacket is used to send message via packet to another host without +// modification, other than compression or encryption if enabled. +func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error { // Check if we have compression enabled if m.config.EnableCompression { buf, err := compressPayload(msg) @@ -644,9 +597,9 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error // Try to look up the destination node if node == nil { - toAddr, _, err := net.SplitHostPort(addr.String()) + toAddr, _, err := net.SplitHostPort(addr) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr.String(), err) + m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr, err) return err } m.nodeLock.RLock() @@ -681,12 +634,13 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error } metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg))) - _, err := m.udpListener.WriteTo(msg, addr) + _, err := m.transport.WriteTo(msg, addr) return err } -// rawSendMsgTCP is used to send a TCP message to another host without modification -func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error { +// rawSendMsgStream is used to stream a message to another host without +// modification, other than applying compression and encryption if enabled. +func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error { // Check if compresion is enabled if m.config.EnableCompression { compBuf, err := compressPayload(sendBuf) @@ -719,43 +673,36 @@ func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error { return nil } -// sendTCPUserMsg is used to send a TCP userMsg to another host -func (m *Memberlist) sendTCPUserMsg(to net.Addr, sendBuf []byte) error { - dialer := net.Dialer{Timeout: m.config.TCPTimeout} - conn, err := dialer.Dial("tcp", to.String()) +// sendUserMsg is used to stream a user message to another host. +func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error { + conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout) if err != nil { return err } defer conn.Close() bufConn := bytes.NewBuffer(nil) - if err := bufConn.WriteByte(byte(userMsg)); err != nil { return err } - // Send our node state header := userMsgHeader{UserMsgLen: len(sendBuf)} hd := codec.MsgpackHandle{} enc := codec.NewEncoder(bufConn, &hd) - if err := enc.Encode(&header); err != nil { return err } - if _, err := bufConn.Write(sendBuf); err != nil { return err } - - return m.rawSendMsgTCP(conn, bufConn.Bytes()) + return m.rawSendMsgStream(conn, bufConn.Bytes()) } -// sendAndReceiveState is used to initiate a push/pull over TCP with a remote node -func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([]pushNodeState, []byte, error) { +// sendAndReceiveState is used to initiate a push/pull over a stream with a +// remote host. +func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeState, []byte, error) { // Attempt to connect - dialer := net.Dialer{Timeout: m.config.TCPTimeout} - dest := net.TCPAddr{IP: addr, Port: int(port)} - conn, err := dialer.Dial("tcp", dest.String()) + conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout) if err != nil { return nil, nil, err } @@ -769,7 +716,7 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([ } conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) - msgType, bufConn, dec, err := m.readTCP(conn) + msgType, bufConn, dec, err := m.readStream(conn) if err != nil { return nil, nil, err } @@ -785,7 +732,7 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([ return remoteNodes, userState, err } -// sendLocalState is invoked to send our local state over a tcp connection +// sendLocalState is invoked to send our local state over a stream connection. func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error { // Setup a deadline conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) @@ -843,7 +790,7 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error { } // Get the send buffer - return m.rawSendMsgTCP(conn, bufConn.Bytes()) + return m.rawSendMsgStream(conn, bufConn.Bytes()) } // encryptLocalState is used to help encrypt local state before sending @@ -901,9 +848,9 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) { return decryptPayload(keys, cipherBytes, dataBytes) } -// readTCP is used to read the start of a TCP stream. -// it decrypts and decompresses the stream if necessary -func (m *Memberlist) readTCP(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) { +// readStream is used to read from a stream connection, decrypting and +// decompressing the stream if necessary. +func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) { // Created a buffered reader var bufConn io.Reader = bufio.NewReader(conn) @@ -1044,7 +991,7 @@ func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, us return nil } -// readUserMsg is used to decode a userMsg from a TCP stream +// readUserMsg is used to decode a userMsg from a stream. func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error { // Read the user message header var header userMsgHeader @@ -1075,13 +1022,12 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error { return nil } -// sendPingAndWaitForAck makes a TCP connection to the given address, sends +// sendPingAndWaitForAck makes a stream connection to the given address, sends // a ping, and waits for an ack. All of this is done as a series of blocking // operations, given the deadline. The bool return parameter is true if we // we able to round trip a ping to the other node. -func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadline time.Time) (bool, error) { - dialer := net.Dialer{Deadline: deadline} - conn, err := dialer.Dial("tcp", destAddr.String()) +func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) { + conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout) if err != nil { // If the node is actually dead we expect this to fail, so we // shouldn't spam the logs with it. After this point, errors @@ -1097,17 +1043,17 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin return false, err } - if err = m.rawSendMsgTCP(conn, out.Bytes()); err != nil { + if err = m.rawSendMsgStream(conn, out.Bytes()); err != nil { return false, err } - msgType, _, dec, err := m.readTCP(conn) + msgType, _, dec, err := m.readStream(conn) if err != nil { return false, err } if msgType != ackRespMsg { - return false, fmt.Errorf("Unexpected msgType (%d) from TCP ping %s", msgType, LogConn(conn)) + return false, fmt.Errorf("Unexpected msgType (%d) from ping %s", msgType, LogConn(conn)) } var ack ackResp @@ -1116,7 +1062,7 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin } if ack.SeqNo != ping.SeqNo { - return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d) from TCP ping %s", ack.SeqNo, ping.SeqNo, LogConn(conn)) + return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn)) } return true, nil diff --git a/vendor/github.com/hashicorp/memberlist/net_transport.go b/vendor/github.com/hashicorp/memberlist/net_transport.go new file mode 100644 index 000000000..e7b88b01f --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/net_transport.go @@ -0,0 +1,289 @@ +package memberlist + +import ( + "fmt" + "log" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/armon/go-metrics" + sockaddr "github.com/hashicorp/go-sockaddr" +) + +const ( + // udpPacketBufSize is used to buffer incoming packets during read + // operations. + udpPacketBufSize = 65536 + + // udpRecvBufSize is a large buffer size that we attempt to set UDP + // sockets to in order to handle a large volume of messages. + udpRecvBufSize = 2 * 1024 * 1024 +) + +// NetTransportConfig is used to configure a net transport. +type NetTransportConfig struct { + // BindAddrs is a list of addresses to bind to for both TCP and UDP + // communications. + BindAddrs []string + + // BindPort is the port to listen on, for each address above. + BindPort int + + // Logger is a logger for operator messages. + Logger *log.Logger +} + +// NetTransport is a Transport implementation that uses connectionless UDP for +// packet operations, and ad-hoc TCP connections for stream operations. +type NetTransport struct { + config *NetTransportConfig + packetCh chan *Packet + streamCh chan net.Conn + logger *log.Logger + wg sync.WaitGroup + tcpListeners []*net.TCPListener + udpListeners []*net.UDPConn + shutdown int32 +} + +// NewNetTransport returns a net transport with the given configuration. On +// success all the network listeners will be created and listening. +func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) { + // If we reject the empty list outright we can assume that there's at + // least one listener of each type later during operation. + if len(config.BindAddrs) == 0 { + return nil, fmt.Errorf("At least one bind address is required") + } + + // Build out the new transport. + var ok bool + t := NetTransport{ + config: config, + packetCh: make(chan *Packet), + streamCh: make(chan net.Conn), + logger: config.Logger, + } + + // Clean up listeners if there's an error. + defer func() { + if !ok { + t.Shutdown() + } + }() + + // Build all the TCP and UDP listeners. + port := config.BindPort + for _, addr := range config.BindAddrs { + ip := net.ParseIP(addr) + + tcpAddr := &net.TCPAddr{IP: ip, Port: port} + tcpLn, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + return nil, fmt.Errorf("Failed to start TCP listener on %q port %d: %v", addr, port, err) + } + t.tcpListeners = append(t.tcpListeners, tcpLn) + + // If the config port given was zero, use the first TCP listener + // to pick an available port and then apply that to everything + // else. + if port == 0 { + port = tcpLn.Addr().(*net.TCPAddr).Port + } + + udpAddr := &net.UDPAddr{IP: ip, Port: port} + udpLn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return nil, fmt.Errorf("Failed to start UDP listener on %q port %d: %v", addr, port, err) + } + if err := setUDPRecvBuf(udpLn); err != nil { + return nil, fmt.Errorf("Failed to resize UDP buffer: %v", err) + } + t.udpListeners = append(t.udpListeners, udpLn) + } + + // Fire them up now that we've been able to create them all. + for i := 0; i < len(config.BindAddrs); i++ { + t.wg.Add(2) + go t.tcpListen(t.tcpListeners[i]) + go t.udpListen(t.udpListeners[i]) + } + + ok = true + return &t, nil +} + +// GetAutoBindPort returns the bind port that was automatically given by the +// kernel, if a bind port of 0 was given. +func (t *NetTransport) GetAutoBindPort() int { + // We made sure there's at least one TCP listener, and that one's + // port was applied to all the others for the dynamic bind case. + return t.tcpListeners[0].Addr().(*net.TCPAddr).Port +} + +// See Transport. +func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) { + var advertiseAddr net.IP + var advertisePort int + if ip != "" { + // If they've supplied an address, use that. + advertiseAddr = net.ParseIP(ip) + if advertiseAddr == nil { + return nil, 0, fmt.Errorf("Failed to parse advertise address %q", ip) + } + + // Ensure IPv4 conversion if necessary. + if ip4 := advertiseAddr.To4(); ip4 != nil { + advertiseAddr = ip4 + } + advertisePort = port + } else { + if t.config.BindAddrs[0] == "0.0.0.0" { + // Otherwise, if we're not bound to a specific IP, let's + // use a suitable private IP address. + var err error + ip, err = sockaddr.GetPrivateIP() + if err != nil { + return nil, 0, fmt.Errorf("Failed to get interface addresses: %v", err) + } + if ip == "" { + return nil, 0, fmt.Errorf("No private IP address found, and explicit IP not provided") + } + + advertiseAddr = net.ParseIP(ip) + if advertiseAddr == nil { + return nil, 0, fmt.Errorf("Failed to parse advertise address: %q", ip) + } + } else { + // Use the IP that we're bound to, based on the first + // TCP listener, which we already ensure is there. + advertiseAddr = t.tcpListeners[0].Addr().(*net.TCPAddr).IP + } + + // Use the port we are bound to. + advertisePort = t.GetAutoBindPort() + } + + return advertiseAddr, advertisePort, nil +} + +// See Transport. +func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return time.Time{}, err + } + + // We made sure there's at least one UDP listener, so just use the + // packet sending interface on the first one. Take the time after the + // write call comes back, which will underestimate the time a little, + // but help account for any delays before the write occurs. + _, err = t.udpListeners[0].WriteTo(b, udpAddr) + return time.Now(), err +} + +// See Transport. +func (t *NetTransport) PacketCh() <-chan *Packet { + return t.packetCh +} + +// See Transport. +func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) { + dialer := net.Dialer{Timeout: timeout} + return dialer.Dial("tcp", addr) +} + +// See Transport. +func (t *NetTransport) StreamCh() <-chan net.Conn { + return t.streamCh +} + +// See Transport. +func (t *NetTransport) Shutdown() error { + // This will avoid log spam about errors when we shut down. + atomic.StoreInt32(&t.shutdown, 1) + + // Rip through all the connections and shut them down. + for _, conn := range t.tcpListeners { + conn.Close() + } + for _, conn := range t.udpListeners { + conn.Close() + } + + // Block until all the listener threads have died. + t.wg.Wait() + return nil +} + +// tcpListen is a long running goroutine that accepts incoming TCP connections +// and hands them off to the stream channel. +func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) { + defer t.wg.Done() + for { + conn, err := tcpLn.AcceptTCP() + if err != nil { + if s := atomic.LoadInt32(&t.shutdown); s == 1 { + break + } + + t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err) + continue + } + + t.streamCh <- conn + } +} + +// udpListen is a long running goroutine that accepts incoming UDP packets and +// hands them off to the packet channel. +func (t *NetTransport) udpListen(udpLn *net.UDPConn) { + defer t.wg.Done() + for { + // Do a blocking read into a fresh buffer. Grab a time stamp as + // close as possible to the I/O. + buf := make([]byte, udpPacketBufSize) + n, addr, err := udpLn.ReadFrom(buf) + ts := time.Now() + if err != nil { + if s := atomic.LoadInt32(&t.shutdown); s == 1 { + break + } + + t.logger.Printf("[ERR] memberlist: Error reading UDP packet: %v", err) + continue + } + + // Check the length - it needs to have at least one byte to be a + // proper message. + if n < 1 { + t.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s", + len(buf), LogAddress(addr)) + continue + } + + // Ingest the packet. + metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n)) + t.packetCh <- &Packet{ + Buf: buf[:n], + From: addr, + Timestamp: ts, + } + } +} + +// setUDPRecvBuf is used to resize the UDP receive window. The function +// attempts to set the read buffer to `udpRecvBuf` but backs off until +// the read buffer can be set. +func setUDPRecvBuf(c *net.UDPConn) error { + size := udpRecvBufSize + var err error + for size > 0 { + if err = c.SetReadBuffer(size); err == nil { + return nil + } + size = size / 2 + } + return err +} diff --git a/vendor/github.com/hashicorp/memberlist/state.go b/vendor/github.com/hashicorp/memberlist/state.go index 6b9122f08..3ed149ff5 100644 --- a/vendor/github.com/hashicorp/memberlist/state.go +++ b/vendor/github.com/hashicorp/memberlist/state.go @@ -34,6 +34,12 @@ type Node struct { DCur uint8 // Current version delegate is speaking } +// Address returns the host:port form of a node's address, suitable for use +// with a transport. +func (n *Node) Address() string { + return joinHostPort(n.Addr.String(), n.Port) +} + // NodeState is used to manage our state view of another node type nodeState struct { Node @@ -42,6 +48,12 @@ type nodeState struct { StateChange time.Time // Time last state change happened } +// Address returns the host:port form of a node's address, suitable for use +// with a transport. +func (n *nodeState) Address() string { + return n.Node.Address() +} + // ackHandler is used to register handlers for incoming acks and nacks. type ackHandler struct { ackFn func([]byte, time.Time) @@ -238,9 +250,9 @@ func (m *Memberlist) probeNode(node *nodeState) { // also tack on a suspect message so that it has a chance to refute as // soon as possible. deadline := time.Now().Add(probeInterval) - destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)} + addr := node.Address() if node.State == stateAlive { - if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { + if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err) return } @@ -261,8 +273,8 @@ func (m *Memberlist) probeNode(node *nodeState) { } compound := makeCompoundMessage(msgs) - if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err) + if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err) return } } @@ -305,7 +317,7 @@ func (m *Memberlist) probeNode(node *nodeState) { // probe interval it will give the TCP fallback more time, which // is more active in dealing with lost packets, and it gives more // time to wait for indirect acks/nacks. - m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name) + m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name) } // Get some random live nodes. @@ -327,8 +339,7 @@ func (m *Memberlist) probeNode(node *nodeState) { expectedNacks++ } - destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)} - if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil { + if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err) } } @@ -345,12 +356,11 @@ func (m *Memberlist) probeNode(node *nodeState) { // config option to turn this off if desired. fallbackCh := make(chan bool, 1) if (!m.config.DisableTcpPings) && (node.PMax >= 3) { - destAddr := &net.TCPAddr{IP: node.Addr, Port: int(node.Port)} go func() { defer close(fallbackCh) - didContact, err := m.sendPingAndWaitForAck(destAddr, ping, deadline) + didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed TCP fallback ping: %s", err) + m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err) } else { fallbackCh <- didContact } @@ -375,7 +385,7 @@ func (m *Memberlist) probeNode(node *nodeState) { // any additional time here. for didContact := range fallbackCh { if didContact { - m.logger.Printf("[WARN] memberlist: Was able to reach %s via TCP but not UDP, network may be misconfigured and not allowing bidirectional UDP", node.Name) + m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name) return } } @@ -410,7 +420,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) { m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval) // Send a ping to the node. - if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { + if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil { return 0, err } @@ -496,18 +506,17 @@ func (m *Memberlist) gossip() { return } - destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)} - + addr := node.Address() if len(msgs) == 1 { // Send single message as is - if err := m.rawSendMsgUDP(destAddr, &node.Node, msgs[0]); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err) + if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) } } else { // Otherwise create and send a compound message compound := makeCompoundMessage(msgs) - if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err) + if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) } } } @@ -533,17 +542,17 @@ func (m *Memberlist) pushPull() { node := nodes[0] // Attempt a push pull - if err := m.pushPullNode(node.Addr, node.Port, false); err != nil { + if err := m.pushPullNode(node.Address(), false); err != nil { m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err) } } // pushPullNode does a complete state exchange with a specific node. -func (m *Memberlist) pushPullNode(addr []byte, port uint16, join bool) error { +func (m *Memberlist) pushPullNode(addr string, join bool) error { defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now()) // Attempt to send and receive with the node - remote, userState, err := m.sendAndReceiveState(addr, port, join) + remote, userState, err := m.sendAndReceiveState(addr, join) if err != nil { return err } diff --git a/vendor/github.com/hashicorp/memberlist/transport.go b/vendor/github.com/hashicorp/memberlist/transport.go new file mode 100644 index 000000000..ca0a66083 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/transport.go @@ -0,0 +1,65 @@ +package memberlist + +import ( + "net" + "time" +) + +// Packet is used to provide some metadata about incoming packets from peers +// over a packet connection, as well as the packet payload. +type Packet struct { + // Buf has the raw contents of the packet. + Buf []byte + + // From has the address of the peer. This is an actual net.Addr so we + // can expose some concrete details about incoming packets. + From net.Addr + + // Timestamp is the time when the packet was received. This should be + // taken as close as possible to the actual receipt time to help make an + // accurate RTT measurements during probes. + Timestamp time.Time +} + +// Transport is used to abstract over communicating with other peers. The packet +// interface is assumed to be best-effort and the stream interface is assumed to +// be reliable. +type Transport interface { + // FinalAdvertiseAddr is given the user's configured values (which + // might be empty) and returns the desired IP and port to advertise to + // the rest of the cluster. + FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) + + // WriteTo is a packet-oriented interface that fires off the given + // payload to the given address in a connectionless fashion. This should + // return a time stamp that's as close as possible to when the packet + // was transmitted to help make accurate RTT measurements during probes. + // + // This is similar to net.PacketConn, though we didn't want to expose + // that full set of required methods to keep assumptions about the + // underlying plumbing to a minimum. We also treat the address here as a + // string, similar to Dial, so it's network neutral, so this usually is + // in the form of "host:port". + WriteTo(b []byte, addr string) (time.Time, error) + + // PacketCh returns a channel that can be read to receive incoming + // packets from other peers. How this is set up for listening is left as + // an exercise for the concrete transport implementations. + PacketCh() <-chan *Packet + + // DialTimeout is used to create a connection that allows us to perform + // two-way communication with a peer. This is generally more expensive + // than packet connections so is used for more infrequent operations + // such as anti-entropy or fallback probes if the packet-oriented probe + // failed. + DialTimeout(addr string, timeout time.Duration) (net.Conn, error) + + // StreamCh returns a channel that can be read to handle incoming stream + // connections from other peers. How this is set up for listening is + // left as an exercise for the concrete transport implementations. + StreamCh() <-chan net.Conn + + // Shutdown is called when memberlist is shutting down; this gives the + // transport a chance to clean up any listeners. + Shutdown() error +} diff --git a/vendor/github.com/hashicorp/memberlist/util.go b/vendor/github.com/hashicorp/memberlist/util.go index 2ee58ba10..a4f926e3a 100644 --- a/vendor/github.com/hashicorp/memberlist/util.go +++ b/vendor/github.com/hashicorp/memberlist/util.go @@ -8,6 +8,8 @@ import ( "io" "math" "math/rand" + "net" + "strconv" "strings" "time" @@ -286,3 +288,9 @@ func decompressBuffer(c *compress) ([]byte, error) { // Return the uncompressed bytes return b.Bytes(), nil } + +// joinHostPort returns the host:port form of an address, for use with a +// transport. +func joinHostPort(host string, port uint16) string { + return net.JoinHostPort(host, strconv.Itoa(int(port))) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index d31f89ea3..a24c0891b 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -588,10 +588,10 @@ "revisionTime": "2015-06-09T07:04:31Z" }, { - "checksumSHA1": "1zk7IeGClUqBo+Phsx89p7fQ/rQ=", + "checksumSHA1": "lhcYybZqKjS43O2DBjvnGn0yPPo=", "path": "github.com/hashicorp/memberlist", - "revision": "23ad4b7d7b38496cd64c241dfd4c60b7794c254a", - "revisionTime": "2017-02-08T21:15:06Z" + "revision": "bbad45074a20fa539bc52e62d1e5f9f8cb467e7c", + "revisionTime": "2017-03-16T23:38:26Z" }, { "checksumSHA1": "qnlqWJYV81ENr61SZk9c65R1mDo=",