Updates memberlist to get new transport interface.
This commit is contained in:
parent
2d659a2c4d
commit
8b39dc9dd5
|
@ -11,10 +11,15 @@ type Config struct {
|
|||
// The name of this node. This must be unique in the cluster.
|
||||
Name string
|
||||
|
||||
// Transport is a hook for providing custom code to communicate with
|
||||
// other nodes. If this is left nil, then memberlist will by default
|
||||
// make a NetTransport using BindAddr and BindPort from this structure.
|
||||
Transport Transport
|
||||
|
||||
// Configuration related to what address to bind to and ports to
|
||||
// listen on. The port is used for both UDP and TCP gossip.
|
||||
// It is assumed other nodes are running on this port, but they
|
||||
// do not need to.
|
||||
// listen on. The port is used for both UDP and TCP gossip. It is
|
||||
// assumed other nodes are running on this port, but they do not need
|
||||
// to.
|
||||
BindAddr string
|
||||
BindPort int
|
||||
|
||||
|
@ -28,8 +33,11 @@ type Config struct {
|
|||
// ProtocolVersionMax.
|
||||
ProtocolVersion uint8
|
||||
|
||||
// TCPTimeout is the timeout for establishing a TCP connection with
|
||||
// a remote node for a full state sync.
|
||||
// TCPTimeout is the timeout for establishing a stream connection with
|
||||
// a remote node for a full state sync, and for stream read and write
|
||||
// operations. This is a legacy name for backwards compatibility, but
|
||||
// should really be called StreamTimeout now that we have generalized
|
||||
// the transport.
|
||||
TCPTimeout time.Duration
|
||||
|
||||
// IndirectChecks is the number of nodes that will be asked to perform
|
||||
|
@ -189,10 +197,13 @@ type Config struct {
|
|||
// while UDP messages are handled.
|
||||
HandoffQueueDepth int
|
||||
|
||||
// Maximum number of bytes that memberlist expects UDP messages to be. A safe
|
||||
// value for this is typically 1400 bytes (which is the default.) However,
|
||||
// depending on your network's MTU (Maximum Transmission Unit) you may be able
|
||||
// to increase this.
|
||||
// Maximum number of bytes that memberlist will put in a packet (this
|
||||
// will be for UDP packets by default with a NetTransport). A safe value
|
||||
// for this is typically 1400 bytes (which is the default). However,
|
||||
// depending on your network's MTU (Maximum Transmission Unit) you may
|
||||
// be able to increase this to get more content into each gossip packet.
|
||||
// This is a legacy name for backward compatibility but should really be
|
||||
// called PacketBufferSize now that we have generalized the transport.
|
||||
UDPBufferSize int
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ type Delegate interface {
|
|||
// NotifyMsg is called when a user-data message is received.
|
||||
// Care should be taken that this method does not block, since doing
|
||||
// so would block the entire UDP packet receive loop. Additionally, the byte
|
||||
// slice may be modified after the call returns, so it should be copied if needed.
|
||||
// slice may be modified after the call returns, so it should be copied if needed
|
||||
NotifyMsg([]byte)
|
||||
|
||||
// GetBroadcasts is called when user data messages can be broadcast.
|
||||
|
|
|
@ -40,8 +40,7 @@ type Memberlist struct {
|
|||
leave bool
|
||||
leaveBroadcast chan struct{}
|
||||
|
||||
udpListener *net.UDPConn
|
||||
tcpListener *net.TCPListener
|
||||
transport Transport
|
||||
handoff chan msgHandoff
|
||||
|
||||
nodeLock sync.RWMutex
|
||||
|
@ -91,25 +90,6 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
|
|||
}
|
||||
}
|
||||
|
||||
tcpAddr := &net.TCPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
|
||||
tcpLn, err := net.ListenTCP("tcp", tcpAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
|
||||
}
|
||||
if conf.BindPort == 0 {
|
||||
conf.BindPort = tcpLn.Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
|
||||
udpAddr := &net.UDPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
|
||||
udpLn, err := net.ListenUDP("udp", udpAddr)
|
||||
if err != nil {
|
||||
tcpLn.Close()
|
||||
return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
|
||||
}
|
||||
|
||||
// Set the UDP receive window size
|
||||
setUDPRecvBuf(udpLn)
|
||||
|
||||
if conf.LogOutput != nil && conf.Logger != nil {
|
||||
return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
|
||||
}
|
||||
|
@ -124,12 +104,33 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
|
|||
logger = log.New(logDest, "", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Set up a network transport by default if a custom one wasn't given
|
||||
// by the config.
|
||||
transport := conf.Transport
|
||||
if transport == nil {
|
||||
nc := &NetTransportConfig{
|
||||
BindAddrs: []string{conf.BindAddr},
|
||||
BindPort: conf.BindPort,
|
||||
Logger: logger,
|
||||
}
|
||||
nt, err := NewNetTransport(nc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Could not set up network transport: %v", err)
|
||||
}
|
||||
|
||||
if conf.BindPort == 0 {
|
||||
port := nt.GetAutoBindPort()
|
||||
conf.BindPort = port
|
||||
logger.Printf("[DEBUG] Using dynamic bind port %d", port)
|
||||
}
|
||||
transport = nt
|
||||
}
|
||||
|
||||
m := &Memberlist{
|
||||
config: conf,
|
||||
shutdownCh: make(chan struct{}),
|
||||
leaveBroadcast: make(chan struct{}, 1),
|
||||
udpListener: udpLn,
|
||||
tcpListener: tcpLn,
|
||||
transport: transport,
|
||||
handoff: make(chan msgHandoff, conf.HandoffQueueDepth),
|
||||
nodeMap: make(map[string]*nodeState),
|
||||
nodeTimers: make(map[string]*suspicion),
|
||||
|
@ -141,9 +142,9 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
|
|||
m.broadcasts.NumNodes = func() int {
|
||||
return m.estNumNodes()
|
||||
}
|
||||
go m.tcpListen()
|
||||
go m.udpListen()
|
||||
go m.udpHandler()
|
||||
go m.streamListen()
|
||||
go m.packetListen()
|
||||
go m.packetHandler()
|
||||
return m, nil
|
||||
}
|
||||
|
||||
|
@ -187,7 +188,8 @@ func (m *Memberlist) Join(existing []string) (int, error) {
|
|||
}
|
||||
|
||||
for _, addr := range addrs {
|
||||
if err := m.pushPullNode(addr.ip, addr.port, true); err != nil {
|
||||
hp := joinHostPort(addr.ip.String(), addr.port)
|
||||
if err := m.pushPullNode(hp, true); err != nil {
|
||||
err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
|
||||
errs = multierror.Append(errs, err)
|
||||
m.logger.Printf("[DEBUG] memberlist: %v", err)
|
||||
|
@ -327,68 +329,30 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
|
|||
// as if we received an alive notification our own network channel for
|
||||
// ourself.
|
||||
func (m *Memberlist) setAlive() error {
|
||||
var advertiseAddr net.IP
|
||||
var advertisePort int
|
||||
if m.config.AdvertiseAddr != "" {
|
||||
// If AdvertiseAddr is not empty, then advertise
|
||||
// the given address and port.
|
||||
ip := net.ParseIP(m.config.AdvertiseAddr)
|
||||
if ip == nil {
|
||||
return fmt.Errorf("Failed to parse advertise address!")
|
||||
}
|
||||
|
||||
// Ensure IPv4 conversion if necessary
|
||||
if ip4 := ip.To4(); ip4 != nil {
|
||||
ip = ip4
|
||||
}
|
||||
|
||||
advertiseAddr = ip
|
||||
advertisePort = m.config.AdvertisePort
|
||||
} else {
|
||||
if m.config.BindAddr == "0.0.0.0" {
|
||||
// Otherwise, if we're not bound to a specific IP, let's use a suitable
|
||||
// private IP address.
|
||||
var err error
|
||||
m.config.AdvertiseAddr, err = sockaddr.GetPrivateIP()
|
||||
// Get the final advertise address from the transport, which may need
|
||||
// to see which address we bound to.
|
||||
addr, port, err := m.transport.FinalAdvertiseAddr(
|
||||
m.config.AdvertiseAddr, m.config.AdvertisePort)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get interface addresses: %v", err)
|
||||
}
|
||||
if m.config.AdvertiseAddr == "" {
|
||||
return fmt.Errorf("No private IP address found, and explicit IP not provided")
|
||||
}
|
||||
|
||||
advertiseAddr = net.ParseIP(m.config.AdvertiseAddr)
|
||||
if advertiseAddr == nil {
|
||||
return fmt.Errorf("Failed to parse advertise address: %q", m.config.AdvertiseAddr)
|
||||
}
|
||||
} else {
|
||||
// Use the IP that we're bound to.
|
||||
addr := m.tcpListener.Addr().(*net.TCPAddr)
|
||||
advertiseAddr = addr.IP
|
||||
}
|
||||
|
||||
// Use the port we are bound to.
|
||||
advertisePort = m.tcpListener.Addr().(*net.TCPAddr).Port
|
||||
return fmt.Errorf("Failed to get final advertise address: %v")
|
||||
}
|
||||
|
||||
// Check if this is a public address without encryption
|
||||
ipAddr, err := sockaddr.NewIPAddr(advertiseAddr.String())
|
||||
ipAddr, err := sockaddr.NewIPAddr(addr.String())
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse interface addresses: %v", err)
|
||||
}
|
||||
|
||||
ifAddrs := []sockaddr.IfAddr{
|
||||
sockaddr.IfAddr{
|
||||
SockAddr: ipAddr,
|
||||
},
|
||||
}
|
||||
|
||||
_, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs)
|
||||
if len(publicIfs) > 0 && !m.config.EncryptionEnabled() {
|
||||
m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
|
||||
}
|
||||
|
||||
// Get the node meta data
|
||||
// Set any metadata from the delegate.
|
||||
var meta []byte
|
||||
if m.config.Delegate != nil {
|
||||
meta = m.config.Delegate.NodeMeta(MetaMaxSize)
|
||||
|
@ -400,8 +364,8 @@ func (m *Memberlist) setAlive() error {
|
|||
a := alive{
|
||||
Incarnation: m.nextIncarnation(),
|
||||
Node: m.config.Name,
|
||||
Addr: advertiseAddr,
|
||||
Port: uint16(advertisePort),
|
||||
Addr: addr,
|
||||
Port: uint16(port),
|
||||
Meta: meta,
|
||||
Vsn: []uint8{
|
||||
ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
|
||||
|
@ -410,7 +374,6 @@ func (m *Memberlist) setAlive() error {
|
|||
},
|
||||
}
|
||||
m.aliveNode(&a, nil, true)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -473,13 +436,8 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SendTo is used to directly send a message to another node, without
|
||||
// the use of the gossip mechanism. This will encode the message as a
|
||||
// user-data message, which a delegate will receive through NotifyMsg
|
||||
// The actual data is transmitted over UDP, which means this is a
|
||||
// best-effort transmission mechanism, and the maximum size of the
|
||||
// message is the size of a single UDP datagram, after compression.
|
||||
// This method is DEPRECATED in favor or SendToUDP
|
||||
// SendTo is deprecated in favor of SendBestEffort, which requires a node to
|
||||
// target.
|
||||
func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
|
||||
// Encode as a user message
|
||||
buf := make([]byte, 1, len(msg)+1)
|
||||
|
@ -487,36 +445,39 @@ func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
|
|||
buf = append(buf, msg...)
|
||||
|
||||
// Send the message
|
||||
return m.rawSendMsgUDP(to, nil, buf)
|
||||
return m.rawSendMsgPacket(to.String(), nil, buf)
|
||||
}
|
||||
|
||||
// SendToUDP is used to directly send a message to another node, without
|
||||
// the use of the gossip mechanism. This will encode the message as a
|
||||
// user-data message, which a delegate will receive through NotifyMsg
|
||||
// The actual data is transmitted over UDP, which means this is a
|
||||
// best-effort transmission mechanism, and the maximum size of the
|
||||
// message is the size of a single UDP datagram, after compression
|
||||
// SendToUDP is deprecated in favor of SendBestEffort.
|
||||
func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
|
||||
return m.SendBestEffort(to, msg)
|
||||
}
|
||||
|
||||
// SendToTCP is deprecated in favor of SendReliable.
|
||||
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
|
||||
return m.SendReliable(to, msg)
|
||||
}
|
||||
|
||||
// SendBestEffort uses the unreliable packet-oriented interface of the transport
|
||||
// to target a user message at the given node (this does not use the gossip
|
||||
// mechanism). The maximum size of the message depends on the configured
|
||||
// UDPBufferSize for this memberlist instance.
|
||||
func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
|
||||
// Encode as a user message
|
||||
buf := make([]byte, 1, len(msg)+1)
|
||||
buf[0] = byte(userMsg)
|
||||
buf = append(buf, msg...)
|
||||
|
||||
// Send the message
|
||||
destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)}
|
||||
return m.rawSendMsgUDP(destAddr, to, buf)
|
||||
return m.rawSendMsgPacket(to.Address(), to, buf)
|
||||
}
|
||||
|
||||
// SendToTCP is used to directly send a message to another node, without
|
||||
// the use of the gossip mechanism. This will encode the message as a
|
||||
// user-data message, which a delegate will receive through NotifyMsg
|
||||
// The actual data is transmitted over TCP, which means delivery
|
||||
// is guaranteed if no error is returned. There is no limit
|
||||
// to the size of the message
|
||||
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
|
||||
// Send the message
|
||||
destAddr := &net.TCPAddr{IP: to.Addr, Port: int(to.Port)}
|
||||
return m.sendTCPUserMsg(destAddr, msg)
|
||||
// SendReliable uses the reliable stream-oriented interface of the transport to
|
||||
// target a user message at the given node (this does not use the gossip
|
||||
// mechanism). Delivery is guaranteed if no error is returned, and there is no
|
||||
// limit on the size of the message.
|
||||
func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
|
||||
return m.sendUserMsg(to.Address(), msg)
|
||||
}
|
||||
|
||||
// Members returns a list of all known live nodes. The node structures
|
||||
|
@ -651,10 +612,14 @@ func (m *Memberlist) Shutdown() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Shut down the transport first, which should block until it's
|
||||
// completely torn down. If we kill the memberlist-side handlers
|
||||
// those I/O handlers might get stuck.
|
||||
m.transport.Shutdown()
|
||||
|
||||
// Now tear down everything else.
|
||||
m.shutdown = true
|
||||
close(m.shutdownCh)
|
||||
m.deschedule()
|
||||
m.udpListener.Close()
|
||||
m.tcpListener.Close()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
package memberlist
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// MockNetwork is used as a factory that produces MockTransport instances which
|
||||
// are uniquely addressed and wired up to talk to each other.
|
||||
type MockNetwork struct {
|
||||
transports map[string]*MockTransport
|
||||
port int
|
||||
}
|
||||
|
||||
// NewTransport returns a new MockTransport with a unique address, wired up to
|
||||
// talk to the other transports in the MockNetwork.
|
||||
func (n *MockNetwork) NewTransport() *MockTransport {
|
||||
n.port += 1
|
||||
addr := fmt.Sprintf("127.0.0.1:%d", n.port)
|
||||
transport := &MockTransport{
|
||||
net: n,
|
||||
addr: &MockAddress{addr},
|
||||
packetCh: make(chan *Packet),
|
||||
streamCh: make(chan net.Conn),
|
||||
}
|
||||
|
||||
if n.transports == nil {
|
||||
n.transports = make(map[string]*MockTransport)
|
||||
}
|
||||
n.transports[addr] = transport
|
||||
return transport
|
||||
}
|
||||
|
||||
// MockAddress is a wrapper which adds the net.Addr interface to our mock
|
||||
// address scheme.
|
||||
type MockAddress struct {
|
||||
addr string
|
||||
}
|
||||
|
||||
// See net.Addr.
|
||||
func (a *MockAddress) Network() string {
|
||||
return "mock"
|
||||
}
|
||||
|
||||
// See net.Addr.
|
||||
func (a *MockAddress) String() string {
|
||||
return a.addr
|
||||
}
|
||||
|
||||
// MockTransport directly plumbs messages to other transports its MockNetwork.
|
||||
type MockTransport struct {
|
||||
net *MockNetwork
|
||||
addr *MockAddress
|
||||
packetCh chan *Packet
|
||||
streamCh chan net.Conn
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) {
|
||||
host, portStr, err := net.SplitHostPort(t.addr.String())
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
ip := net.ParseIP(host)
|
||||
if ip == nil {
|
||||
return nil, 0, fmt.Errorf("Failed to parse IP %q", host)
|
||||
}
|
||||
|
||||
port, err := strconv.ParseInt(portStr, 10, 16)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return ip, int(port), nil
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *MockTransport) WriteTo(b []byte, addr string) (time.Time, error) {
|
||||
dest, ok := t.net.transports[addr]
|
||||
if !ok {
|
||||
return time.Time{}, fmt.Errorf("No route to %q", addr)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
dest.packetCh <- &Packet{
|
||||
Buf: b,
|
||||
From: t.addr,
|
||||
Timestamp: now,
|
||||
}
|
||||
return now, nil
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *MockTransport) PacketCh() <-chan *Packet {
|
||||
return t.packetCh
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *MockTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
|
||||
dest, ok := t.net.transports[addr]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("No route to %q", addr)
|
||||
}
|
||||
|
||||
p1, p2 := net.Pipe()
|
||||
dest.streamCh <- p1
|
||||
return p2, nil
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *MockTransport) StreamCh() <-chan net.Conn {
|
||||
return t.streamCh
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *MockTransport) Shutdown() error {
|
||||
return nil
|
||||
}
|
|
@ -68,8 +68,6 @@ const (
|
|||
MetaMaxSize = 512 // Maximum size for node meta data
|
||||
compoundHeaderOverhead = 2 // Assumed header overhead
|
||||
compoundOverhead = 2 // Assumed overhead per entry in compoundHeader
|
||||
udpBufSize = 65536
|
||||
udpRecvBuf = 2 * 1024 * 1024
|
||||
userMsgOverhead = 1
|
||||
blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process
|
||||
maxPushStateBytes = 10 * 1024 * 1024
|
||||
|
@ -185,43 +183,29 @@ func (m *Memberlist) encryptionVersion() encryptionVersion {
|
|||
}
|
||||
}
|
||||
|
||||
// setUDPRecvBuf is used to resize the UDP receive window. The function
|
||||
// attempts to set the read buffer to `udpRecvBuf` but backs off until
|
||||
// the read buffer can be set.
|
||||
func setUDPRecvBuf(c *net.UDPConn) {
|
||||
size := udpRecvBuf
|
||||
// streamListen is a long running goroutine that pulls incoming streams from the
|
||||
// transport and hands them off for processing.
|
||||
func (m *Memberlist) streamListen() {
|
||||
for {
|
||||
if err := c.SetReadBuffer(size); err == nil {
|
||||
break
|
||||
}
|
||||
size = size / 2
|
||||
}
|
||||
}
|
||||
|
||||
// tcpListen listens for and handles incoming connections
|
||||
func (m *Memberlist) tcpListen() {
|
||||
for {
|
||||
conn, err := m.tcpListener.AcceptTCP()
|
||||
if err != nil {
|
||||
if m.shutdown {
|
||||
break
|
||||
}
|
||||
m.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %s", err)
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case conn := <-m.transport.StreamCh():
|
||||
go m.handleConn(conn)
|
||||
|
||||
case <-m.shutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleConn handles a single incoming TCP connection
|
||||
func (m *Memberlist) handleConn(conn *net.TCPConn) {
|
||||
m.logger.Printf("[DEBUG] memberlist: TCP connection %s", LogConn(conn))
|
||||
// handleConn handles a single incoming stream connection from the transport.
|
||||
func (m *Memberlist) handleConn(conn net.Conn) {
|
||||
m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))
|
||||
|
||||
defer conn.Close()
|
||||
metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
|
||||
|
||||
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
|
||||
msgType, bufConn, dec, err := m.readTCP(conn)
|
||||
msgType, bufConn, dec, err := m.readStream(conn)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))
|
||||
|
@ -253,7 +237,7 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
|
|||
case pingMsg:
|
||||
var p ping
|
||||
if err := dec.Decode(&p); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to decode TCP ping: %s %s", err, LogConn(conn))
|
||||
m.logger.Printf("[ERR] memberlist: Failed to decode ping: %s %s", err, LogConn(conn))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -265,13 +249,13 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
|
|||
ack := ackResp{p.SeqNo, nil}
|
||||
out, err := encode(ackRespMsg, &ack)
|
||||
if err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to encode TCP ack: %s", err)
|
||||
m.logger.Printf("[ERR] memberlist: Failed to encode ack: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = m.rawSendMsgTCP(conn, out.Bytes())
|
||||
err = m.rawSendMsgStream(conn, out.Bytes())
|
||||
if err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send TCP ack: %s %s", err, LogConn(conn))
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn))
|
||||
return
|
||||
}
|
||||
default:
|
||||
|
@ -279,49 +263,17 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
|
|||
}
|
||||
}
|
||||
|
||||
// udpListen listens for and handles incoming UDP packets
|
||||
func (m *Memberlist) udpListen() {
|
||||
var n int
|
||||
var addr net.Addr
|
||||
var err error
|
||||
var lastPacket time.Time
|
||||
// packetListen is a long running goroutine that pulls packets out of the
|
||||
// transport and hands them off for processing.
|
||||
func (m *Memberlist) packetListen() {
|
||||
for {
|
||||
// Do a check for potentially blocking operations
|
||||
if !lastPacket.IsZero() && time.Now().Sub(lastPacket) > blockingWarning {
|
||||
diff := time.Now().Sub(lastPacket)
|
||||
m.logger.Printf(
|
||||
"[DEBUG] memberlist: Potential blocking operation. Last command took %v",
|
||||
diff)
|
||||
select {
|
||||
case packet := <-m.transport.PacketCh():
|
||||
m.ingestPacket(packet.Buf, packet.From, packet.Timestamp)
|
||||
|
||||
case <-m.shutdownCh:
|
||||
return
|
||||
}
|
||||
|
||||
// Create a new buffer
|
||||
// TODO: Use Sync.Pool eventually
|
||||
buf := make([]byte, udpBufSize)
|
||||
|
||||
// Read a packet
|
||||
n, addr, err = m.udpListener.ReadFrom(buf)
|
||||
if err != nil {
|
||||
if m.shutdown {
|
||||
break
|
||||
}
|
||||
m.logger.Printf("[ERR] memberlist: Error reading UDP packet: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Capture the reception time of the packet as close to the
|
||||
// system calls as possible.
|
||||
lastPacket = time.Now()
|
||||
|
||||
// Check the length
|
||||
if n < 1 {
|
||||
m.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s",
|
||||
len(buf), LogAddress(addr))
|
||||
continue
|
||||
}
|
||||
|
||||
// Ingest this packet
|
||||
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
|
||||
m.ingestPacket(buf[:n], addr, lastPacket)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -384,18 +336,18 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
|
|||
select {
|
||||
case m.handoff <- msgHandoff{msgType, buf, from}:
|
||||
default:
|
||||
m.logger.Printf("[WARN] memberlist: UDP handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
|
||||
m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
|
||||
}
|
||||
|
||||
default:
|
||||
m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s", msgType, LogAddress(from))
|
||||
m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from))
|
||||
}
|
||||
}
|
||||
|
||||
// udpHandler processes messages received over UDP, but is decoupled
|
||||
// from the listener to avoid blocking the listener which may cause
|
||||
// ping/ack messages to be delayed.
|
||||
func (m *Memberlist) udpHandler() {
|
||||
// packetHandler is a long running goroutine that processes messages received
|
||||
// over the packet interface, but is decoupled from the listener to avoid
|
||||
// blocking the listener which may cause ping/ack messages to be delayed.
|
||||
func (m *Memberlist) packetHandler() {
|
||||
for {
|
||||
select {
|
||||
case msg := <-m.handoff:
|
||||
|
@ -413,7 +365,7 @@ func (m *Memberlist) udpHandler() {
|
|||
case userMsg:
|
||||
m.handleUser(buf, from)
|
||||
default:
|
||||
m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s (handler)", msgType, LogAddress(from))
|
||||
m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
|
||||
}
|
||||
|
||||
case <-m.shutdownCh:
|
||||
|
@ -457,7 +409,7 @@ func (m *Memberlist) handlePing(buf []byte, from net.Addr) {
|
|||
if m.config.Ping != nil {
|
||||
ack.Payload = m.config.Ping.AckPayload()
|
||||
}
|
||||
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
|
||||
if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from))
|
||||
}
|
||||
}
|
||||
|
@ -478,7 +430,6 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
|
|||
// Send a ping to the correct host.
|
||||
localSeqNo := m.nextSeqNo()
|
||||
ping := ping{SeqNo: localSeqNo, Node: ind.Node}
|
||||
destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)}
|
||||
|
||||
// Setup a response handler to relay the ack
|
||||
cancelCh := make(chan struct{})
|
||||
|
@ -488,14 +439,15 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
|
|||
|
||||
// Forward the ack back to the requestor.
|
||||
ack := ackResp{ind.SeqNo, nil}
|
||||
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
|
||||
if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from))
|
||||
}
|
||||
}
|
||||
m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
|
||||
|
||||
// Send the ping.
|
||||
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
|
||||
addr := joinHostPort(net.IP(ind.Target).String(), ind.Port)
|
||||
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
|
||||
}
|
||||
|
||||
|
@ -507,7 +459,7 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
|
|||
return
|
||||
case <-time.After(m.config.ProbeTimeout):
|
||||
nack := nackResp{ind.SeqNo}
|
||||
if err := m.encodeAndSendMsg(from, nackRespMsg, &nack); err != nil {
|
||||
if err := m.encodeAndSendMsg(from.String(), nackRespMsg, &nack); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from))
|
||||
}
|
||||
}
|
||||
|
@ -589,20 +541,20 @@ func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time.
|
|||
}
|
||||
|
||||
// encodeAndSendMsg is used to combine the encoding and sending steps
|
||||
func (m *Memberlist) encodeAndSendMsg(to net.Addr, msgType messageType, msg interface{}) error {
|
||||
func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg interface{}) error {
|
||||
out, err := encode(msgType, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.sendMsg(to, out.Bytes()); err != nil {
|
||||
if err := m.sendMsg(addr, out.Bytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendMsg is used to send a UDP message to another host. It will opportunistically
|
||||
// create a compoundMsg and piggy back other broadcasts
|
||||
func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
|
||||
// sendMsg is used to send a message via packet to another host. It will
|
||||
// opportunistically create a compoundMsg and piggy back other broadcasts.
|
||||
func (m *Memberlist) sendMsg(addr string, msg []byte) error {
|
||||
// Check if we can piggy back any messages
|
||||
bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead
|
||||
if m.config.EncryptionEnabled() {
|
||||
|
@ -612,7 +564,7 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
|
|||
|
||||
// Fast path if nothing to piggypack
|
||||
if len(extra) == 0 {
|
||||
return m.rawSendMsgUDP(to, nil, msg)
|
||||
return m.rawSendMsgPacket(addr, nil, msg)
|
||||
}
|
||||
|
||||
// Join all the messages
|
||||
|
@ -624,11 +576,12 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
|
|||
compound := makeCompoundMessage(msgs)
|
||||
|
||||
// Send the message
|
||||
return m.rawSendMsgUDP(to, nil, compound.Bytes())
|
||||
return m.rawSendMsgPacket(addr, nil, compound.Bytes())
|
||||
}
|
||||
|
||||
// rawSendMsgUDP is used to send a UDP message to another host without modification
|
||||
func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error {
|
||||
// rawSendMsgPacket is used to send message via packet to another host without
|
||||
// modification, other than compression or encryption if enabled.
|
||||
func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error {
|
||||
// Check if we have compression enabled
|
||||
if m.config.EnableCompression {
|
||||
buf, err := compressPayload(msg)
|
||||
|
@ -644,9 +597,9 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error
|
|||
|
||||
// Try to look up the destination node
|
||||
if node == nil {
|
||||
toAddr, _, err := net.SplitHostPort(addr.String())
|
||||
toAddr, _, err := net.SplitHostPort(addr)
|
||||
if err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr.String(), err)
|
||||
m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr, err)
|
||||
return err
|
||||
}
|
||||
m.nodeLock.RLock()
|
||||
|
@ -681,12 +634,13 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error
|
|||
}
|
||||
|
||||
metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
|
||||
_, err := m.udpListener.WriteTo(msg, addr)
|
||||
_, err := m.transport.WriteTo(msg, addr)
|
||||
return err
|
||||
}
|
||||
|
||||
// rawSendMsgTCP is used to send a TCP message to another host without modification
|
||||
func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error {
|
||||
// rawSendMsgStream is used to stream a message to another host without
|
||||
// modification, other than applying compression and encryption if enabled.
|
||||
func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error {
|
||||
// Check if compresion is enabled
|
||||
if m.config.EnableCompression {
|
||||
compBuf, err := compressPayload(sendBuf)
|
||||
|
@ -719,43 +673,36 @@ func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// sendTCPUserMsg is used to send a TCP userMsg to another host
|
||||
func (m *Memberlist) sendTCPUserMsg(to net.Addr, sendBuf []byte) error {
|
||||
dialer := net.Dialer{Timeout: m.config.TCPTimeout}
|
||||
conn, err := dialer.Dial("tcp", to.String())
|
||||
// sendUserMsg is used to stream a user message to another host.
|
||||
func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error {
|
||||
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
bufConn := bytes.NewBuffer(nil)
|
||||
|
||||
if err := bufConn.WriteByte(byte(userMsg)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send our node state
|
||||
header := userMsgHeader{UserMsgLen: len(sendBuf)}
|
||||
hd := codec.MsgpackHandle{}
|
||||
enc := codec.NewEncoder(bufConn, &hd)
|
||||
|
||||
if err := enc.Encode(&header); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := bufConn.Write(sendBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return m.rawSendMsgTCP(conn, bufConn.Bytes())
|
||||
return m.rawSendMsgStream(conn, bufConn.Bytes())
|
||||
}
|
||||
|
||||
// sendAndReceiveState is used to initiate a push/pull over TCP with a remote node
|
||||
func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([]pushNodeState, []byte, error) {
|
||||
// sendAndReceiveState is used to initiate a push/pull over a stream with a
|
||||
// remote host.
|
||||
func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeState, []byte, error) {
|
||||
// Attempt to connect
|
||||
dialer := net.Dialer{Timeout: m.config.TCPTimeout}
|
||||
dest := net.TCPAddr{IP: addr, Port: int(port)}
|
||||
conn, err := dialer.Dial("tcp", dest.String())
|
||||
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -769,7 +716,7 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([
|
|||
}
|
||||
|
||||
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
|
||||
msgType, bufConn, dec, err := m.readTCP(conn)
|
||||
msgType, bufConn, dec, err := m.readStream(conn)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -785,7 +732,7 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([
|
|||
return remoteNodes, userState, err
|
||||
}
|
||||
|
||||
// sendLocalState is invoked to send our local state over a tcp connection
|
||||
// sendLocalState is invoked to send our local state over a stream connection.
|
||||
func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
|
||||
// Setup a deadline
|
||||
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
|
||||
|
@ -843,7 +790,7 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
|
|||
}
|
||||
|
||||
// Get the send buffer
|
||||
return m.rawSendMsgTCP(conn, bufConn.Bytes())
|
||||
return m.rawSendMsgStream(conn, bufConn.Bytes())
|
||||
}
|
||||
|
||||
// encryptLocalState is used to help encrypt local state before sending
|
||||
|
@ -901,9 +848,9 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) {
|
|||
return decryptPayload(keys, cipherBytes, dataBytes)
|
||||
}
|
||||
|
||||
// readTCP is used to read the start of a TCP stream.
|
||||
// it decrypts and decompresses the stream if necessary
|
||||
func (m *Memberlist) readTCP(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) {
|
||||
// readStream is used to read from a stream connection, decrypting and
|
||||
// decompressing the stream if necessary.
|
||||
func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) {
|
||||
// Created a buffered reader
|
||||
var bufConn io.Reader = bufio.NewReader(conn)
|
||||
|
||||
|
@ -1044,7 +991,7 @@ func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, us
|
|||
return nil
|
||||
}
|
||||
|
||||
// readUserMsg is used to decode a userMsg from a TCP stream
|
||||
// readUserMsg is used to decode a userMsg from a stream.
|
||||
func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
|
||||
// Read the user message header
|
||||
var header userMsgHeader
|
||||
|
@ -1075,13 +1022,12 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// sendPingAndWaitForAck makes a TCP connection to the given address, sends
|
||||
// sendPingAndWaitForAck makes a stream connection to the given address, sends
|
||||
// a ping, and waits for an ack. All of this is done as a series of blocking
|
||||
// operations, given the deadline. The bool return parameter is true if we
|
||||
// we able to round trip a ping to the other node.
|
||||
func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadline time.Time) (bool, error) {
|
||||
dialer := net.Dialer{Deadline: deadline}
|
||||
conn, err := dialer.Dial("tcp", destAddr.String())
|
||||
func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) {
|
||||
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
|
||||
if err != nil {
|
||||
// If the node is actually dead we expect this to fail, so we
|
||||
// shouldn't spam the logs with it. After this point, errors
|
||||
|
@ -1097,17 +1043,17 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin
|
|||
return false, err
|
||||
}
|
||||
|
||||
if err = m.rawSendMsgTCP(conn, out.Bytes()); err != nil {
|
||||
if err = m.rawSendMsgStream(conn, out.Bytes()); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
msgType, _, dec, err := m.readTCP(conn)
|
||||
msgType, _, dec, err := m.readStream(conn)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if msgType != ackRespMsg {
|
||||
return false, fmt.Errorf("Unexpected msgType (%d) from TCP ping %s", msgType, LogConn(conn))
|
||||
return false, fmt.Errorf("Unexpected msgType (%d) from ping %s", msgType, LogConn(conn))
|
||||
}
|
||||
|
||||
var ack ackResp
|
||||
|
@ -1116,7 +1062,7 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin
|
|||
}
|
||||
|
||||
if ack.SeqNo != ping.SeqNo {
|
||||
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d) from TCP ping %s", ack.SeqNo, ping.SeqNo, LogConn(conn))
|
||||
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn))
|
||||
}
|
||||
|
||||
return true, nil
|
||||
|
|
|
@ -0,0 +1,289 @@
|
|||
package memberlist
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
sockaddr "github.com/hashicorp/go-sockaddr"
|
||||
)
|
||||
|
||||
const (
|
||||
// udpPacketBufSize is used to buffer incoming packets during read
|
||||
// operations.
|
||||
udpPacketBufSize = 65536
|
||||
|
||||
// udpRecvBufSize is a large buffer size that we attempt to set UDP
|
||||
// sockets to in order to handle a large volume of messages.
|
||||
udpRecvBufSize = 2 * 1024 * 1024
|
||||
)
|
||||
|
||||
// NetTransportConfig is used to configure a net transport.
|
||||
type NetTransportConfig struct {
|
||||
// BindAddrs is a list of addresses to bind to for both TCP and UDP
|
||||
// communications.
|
||||
BindAddrs []string
|
||||
|
||||
// BindPort is the port to listen on, for each address above.
|
||||
BindPort int
|
||||
|
||||
// Logger is a logger for operator messages.
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// NetTransport is a Transport implementation that uses connectionless UDP for
|
||||
// packet operations, and ad-hoc TCP connections for stream operations.
|
||||
type NetTransport struct {
|
||||
config *NetTransportConfig
|
||||
packetCh chan *Packet
|
||||
streamCh chan net.Conn
|
||||
logger *log.Logger
|
||||
wg sync.WaitGroup
|
||||
tcpListeners []*net.TCPListener
|
||||
udpListeners []*net.UDPConn
|
||||
shutdown int32
|
||||
}
|
||||
|
||||
// NewNetTransport returns a net transport with the given configuration. On
|
||||
// success all the network listeners will be created and listening.
|
||||
func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) {
|
||||
// If we reject the empty list outright we can assume that there's at
|
||||
// least one listener of each type later during operation.
|
||||
if len(config.BindAddrs) == 0 {
|
||||
return nil, fmt.Errorf("At least one bind address is required")
|
||||
}
|
||||
|
||||
// Build out the new transport.
|
||||
var ok bool
|
||||
t := NetTransport{
|
||||
config: config,
|
||||
packetCh: make(chan *Packet),
|
||||
streamCh: make(chan net.Conn),
|
||||
logger: config.Logger,
|
||||
}
|
||||
|
||||
// Clean up listeners if there's an error.
|
||||
defer func() {
|
||||
if !ok {
|
||||
t.Shutdown()
|
||||
}
|
||||
}()
|
||||
|
||||
// Build all the TCP and UDP listeners.
|
||||
port := config.BindPort
|
||||
for _, addr := range config.BindAddrs {
|
||||
ip := net.ParseIP(addr)
|
||||
|
||||
tcpAddr := &net.TCPAddr{IP: ip, Port: port}
|
||||
tcpLn, err := net.ListenTCP("tcp", tcpAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to start TCP listener on %q port %d: %v", addr, port, err)
|
||||
}
|
||||
t.tcpListeners = append(t.tcpListeners, tcpLn)
|
||||
|
||||
// If the config port given was zero, use the first TCP listener
|
||||
// to pick an available port and then apply that to everything
|
||||
// else.
|
||||
if port == 0 {
|
||||
port = tcpLn.Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
|
||||
udpAddr := &net.UDPAddr{IP: ip, Port: port}
|
||||
udpLn, err := net.ListenUDP("udp", udpAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to start UDP listener on %q port %d: %v", addr, port, err)
|
||||
}
|
||||
if err := setUDPRecvBuf(udpLn); err != nil {
|
||||
return nil, fmt.Errorf("Failed to resize UDP buffer: %v", err)
|
||||
}
|
||||
t.udpListeners = append(t.udpListeners, udpLn)
|
||||
}
|
||||
|
||||
// Fire them up now that we've been able to create them all.
|
||||
for i := 0; i < len(config.BindAddrs); i++ {
|
||||
t.wg.Add(2)
|
||||
go t.tcpListen(t.tcpListeners[i])
|
||||
go t.udpListen(t.udpListeners[i])
|
||||
}
|
||||
|
||||
ok = true
|
||||
return &t, nil
|
||||
}
|
||||
|
||||
// GetAutoBindPort returns the bind port that was automatically given by the
|
||||
// kernel, if a bind port of 0 was given.
|
||||
func (t *NetTransport) GetAutoBindPort() int {
|
||||
// We made sure there's at least one TCP listener, and that one's
|
||||
// port was applied to all the others for the dynamic bind case.
|
||||
return t.tcpListeners[0].Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) {
|
||||
var advertiseAddr net.IP
|
||||
var advertisePort int
|
||||
if ip != "" {
|
||||
// If they've supplied an address, use that.
|
||||
advertiseAddr = net.ParseIP(ip)
|
||||
if advertiseAddr == nil {
|
||||
return nil, 0, fmt.Errorf("Failed to parse advertise address %q", ip)
|
||||
}
|
||||
|
||||
// Ensure IPv4 conversion if necessary.
|
||||
if ip4 := advertiseAddr.To4(); ip4 != nil {
|
||||
advertiseAddr = ip4
|
||||
}
|
||||
advertisePort = port
|
||||
} else {
|
||||
if t.config.BindAddrs[0] == "0.0.0.0" {
|
||||
// Otherwise, if we're not bound to a specific IP, let's
|
||||
// use a suitable private IP address.
|
||||
var err error
|
||||
ip, err = sockaddr.GetPrivateIP()
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("Failed to get interface addresses: %v", err)
|
||||
}
|
||||
if ip == "" {
|
||||
return nil, 0, fmt.Errorf("No private IP address found, and explicit IP not provided")
|
||||
}
|
||||
|
||||
advertiseAddr = net.ParseIP(ip)
|
||||
if advertiseAddr == nil {
|
||||
return nil, 0, fmt.Errorf("Failed to parse advertise address: %q", ip)
|
||||
}
|
||||
} else {
|
||||
// Use the IP that we're bound to, based on the first
|
||||
// TCP listener, which we already ensure is there.
|
||||
advertiseAddr = t.tcpListeners[0].Addr().(*net.TCPAddr).IP
|
||||
}
|
||||
|
||||
// Use the port we are bound to.
|
||||
advertisePort = t.GetAutoBindPort()
|
||||
}
|
||||
|
||||
return advertiseAddr, advertisePort, nil
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) {
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
|
||||
// We made sure there's at least one UDP listener, so just use the
|
||||
// packet sending interface on the first one. Take the time after the
|
||||
// write call comes back, which will underestimate the time a little,
|
||||
// but help account for any delays before the write occurs.
|
||||
_, err = t.udpListeners[0].WriteTo(b, udpAddr)
|
||||
return time.Now(), err
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *NetTransport) PacketCh() <-chan *Packet {
|
||||
return t.packetCh
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
|
||||
dialer := net.Dialer{Timeout: timeout}
|
||||
return dialer.Dial("tcp", addr)
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *NetTransport) StreamCh() <-chan net.Conn {
|
||||
return t.streamCh
|
||||
}
|
||||
|
||||
// See Transport.
|
||||
func (t *NetTransport) Shutdown() error {
|
||||
// This will avoid log spam about errors when we shut down.
|
||||
atomic.StoreInt32(&t.shutdown, 1)
|
||||
|
||||
// Rip through all the connections and shut them down.
|
||||
for _, conn := range t.tcpListeners {
|
||||
conn.Close()
|
||||
}
|
||||
for _, conn := range t.udpListeners {
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// Block until all the listener threads have died.
|
||||
t.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// tcpListen is a long running goroutine that accepts incoming TCP connections
|
||||
// and hands them off to the stream channel.
|
||||
func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
|
||||
defer t.wg.Done()
|
||||
for {
|
||||
conn, err := tcpLn.AcceptTCP()
|
||||
if err != nil {
|
||||
if s := atomic.LoadInt32(&t.shutdown); s == 1 {
|
||||
break
|
||||
}
|
||||
|
||||
t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
t.streamCh <- conn
|
||||
}
|
||||
}
|
||||
|
||||
// udpListen is a long running goroutine that accepts incoming UDP packets and
|
||||
// hands them off to the packet channel.
|
||||
func (t *NetTransport) udpListen(udpLn *net.UDPConn) {
|
||||
defer t.wg.Done()
|
||||
for {
|
||||
// Do a blocking read into a fresh buffer. Grab a time stamp as
|
||||
// close as possible to the I/O.
|
||||
buf := make([]byte, udpPacketBufSize)
|
||||
n, addr, err := udpLn.ReadFrom(buf)
|
||||
ts := time.Now()
|
||||
if err != nil {
|
||||
if s := atomic.LoadInt32(&t.shutdown); s == 1 {
|
||||
break
|
||||
}
|
||||
|
||||
t.logger.Printf("[ERR] memberlist: Error reading UDP packet: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check the length - it needs to have at least one byte to be a
|
||||
// proper message.
|
||||
if n < 1 {
|
||||
t.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s",
|
||||
len(buf), LogAddress(addr))
|
||||
continue
|
||||
}
|
||||
|
||||
// Ingest the packet.
|
||||
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
|
||||
t.packetCh <- &Packet{
|
||||
Buf: buf[:n],
|
||||
From: addr,
|
||||
Timestamp: ts,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// setUDPRecvBuf is used to resize the UDP receive window. The function
|
||||
// attempts to set the read buffer to `udpRecvBuf` but backs off until
|
||||
// the read buffer can be set.
|
||||
func setUDPRecvBuf(c *net.UDPConn) error {
|
||||
size := udpRecvBufSize
|
||||
var err error
|
||||
for size > 0 {
|
||||
if err = c.SetReadBuffer(size); err == nil {
|
||||
return nil
|
||||
}
|
||||
size = size / 2
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -34,6 +34,12 @@ type Node struct {
|
|||
DCur uint8 // Current version delegate is speaking
|
||||
}
|
||||
|
||||
// Address returns the host:port form of a node's address, suitable for use
|
||||
// with a transport.
|
||||
func (n *Node) Address() string {
|
||||
return joinHostPort(n.Addr.String(), n.Port)
|
||||
}
|
||||
|
||||
// NodeState is used to manage our state view of another node
|
||||
type nodeState struct {
|
||||
Node
|
||||
|
@ -42,6 +48,12 @@ type nodeState struct {
|
|||
StateChange time.Time // Time last state change happened
|
||||
}
|
||||
|
||||
// Address returns the host:port form of a node's address, suitable for use
|
||||
// with a transport.
|
||||
func (n *nodeState) Address() string {
|
||||
return n.Node.Address()
|
||||
}
|
||||
|
||||
// ackHandler is used to register handlers for incoming acks and nacks.
|
||||
type ackHandler struct {
|
||||
ackFn func([]byte, time.Time)
|
||||
|
@ -238,9 +250,9 @@ func (m *Memberlist) probeNode(node *nodeState) {
|
|||
// also tack on a suspect message so that it has a chance to refute as
|
||||
// soon as possible.
|
||||
deadline := time.Now().Add(probeInterval)
|
||||
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
|
||||
addr := node.Address()
|
||||
if node.State == stateAlive {
|
||||
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
|
||||
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
|
||||
return
|
||||
}
|
||||
|
@ -261,8 +273,8 @@ func (m *Memberlist) probeNode(node *nodeState) {
|
|||
}
|
||||
|
||||
compound := makeCompoundMessage(msgs)
|
||||
if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err)
|
||||
if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -305,7 +317,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
|
|||
// probe interval it will give the TCP fallback more time, which
|
||||
// is more active in dealing with lost packets, and it gives more
|
||||
// time to wait for indirect acks/nacks.
|
||||
m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name)
|
||||
m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
|
||||
}
|
||||
|
||||
// Get some random live nodes.
|
||||
|
@ -327,8 +339,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
|
|||
expectedNacks++
|
||||
}
|
||||
|
||||
destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)}
|
||||
if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
|
||||
if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -345,12 +356,11 @@ func (m *Memberlist) probeNode(node *nodeState) {
|
|||
// config option to turn this off if desired.
|
||||
fallbackCh := make(chan bool, 1)
|
||||
if (!m.config.DisableTcpPings) && (node.PMax >= 3) {
|
||||
destAddr := &net.TCPAddr{IP: node.Addr, Port: int(node.Port)}
|
||||
go func() {
|
||||
defer close(fallbackCh)
|
||||
didContact, err := m.sendPingAndWaitForAck(destAddr, ping, deadline)
|
||||
didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
|
||||
if err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed TCP fallback ping: %s", err)
|
||||
m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
|
||||
} else {
|
||||
fallbackCh <- didContact
|
||||
}
|
||||
|
@ -375,7 +385,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
|
|||
// any additional time here.
|
||||
for didContact := range fallbackCh {
|
||||
if didContact {
|
||||
m.logger.Printf("[WARN] memberlist: Was able to reach %s via TCP but not UDP, network may be misconfigured and not allowing bidirectional UDP", node.Name)
|
||||
m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -410,7 +420,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
|
|||
m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)
|
||||
|
||||
// Send a ping to the node.
|
||||
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
|
||||
if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
|
@ -496,18 +506,17 @@ func (m *Memberlist) gossip() {
|
|||
return
|
||||
}
|
||||
|
||||
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
|
||||
|
||||
addr := node.Address()
|
||||
if len(msgs) == 1 {
|
||||
// Send single message as is
|
||||
if err := m.rawSendMsgUDP(destAddr, &node.Node, msgs[0]); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
|
||||
if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
|
||||
}
|
||||
} else {
|
||||
// Otherwise create and send a compound message
|
||||
compound := makeCompoundMessage(msgs)
|
||||
if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
|
||||
if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -533,17 +542,17 @@ func (m *Memberlist) pushPull() {
|
|||
node := nodes[0]
|
||||
|
||||
// Attempt a push pull
|
||||
if err := m.pushPullNode(node.Addr, node.Port, false); err != nil {
|
||||
if err := m.pushPullNode(node.Address(), false); err != nil {
|
||||
m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// pushPullNode does a complete state exchange with a specific node.
|
||||
func (m *Memberlist) pushPullNode(addr []byte, port uint16, join bool) error {
|
||||
func (m *Memberlist) pushPullNode(addr string, join bool) error {
|
||||
defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())
|
||||
|
||||
// Attempt to send and receive with the node
|
||||
remote, userState, err := m.sendAndReceiveState(addr, port, join)
|
||||
remote, userState, err := m.sendAndReceiveState(addr, join)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
package memberlist
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Packet is used to provide some metadata about incoming packets from peers
|
||||
// over a packet connection, as well as the packet payload.
|
||||
type Packet struct {
|
||||
// Buf has the raw contents of the packet.
|
||||
Buf []byte
|
||||
|
||||
// From has the address of the peer. This is an actual net.Addr so we
|
||||
// can expose some concrete details about incoming packets.
|
||||
From net.Addr
|
||||
|
||||
// Timestamp is the time when the packet was received. This should be
|
||||
// taken as close as possible to the actual receipt time to help make an
|
||||
// accurate RTT measurements during probes.
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// Transport is used to abstract over communicating with other peers. The packet
|
||||
// interface is assumed to be best-effort and the stream interface is assumed to
|
||||
// be reliable.
|
||||
type Transport interface {
|
||||
// FinalAdvertiseAddr is given the user's configured values (which
|
||||
// might be empty) and returns the desired IP and port to advertise to
|
||||
// the rest of the cluster.
|
||||
FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)
|
||||
|
||||
// WriteTo is a packet-oriented interface that fires off the given
|
||||
// payload to the given address in a connectionless fashion. This should
|
||||
// return a time stamp that's as close as possible to when the packet
|
||||
// was transmitted to help make accurate RTT measurements during probes.
|
||||
//
|
||||
// This is similar to net.PacketConn, though we didn't want to expose
|
||||
// that full set of required methods to keep assumptions about the
|
||||
// underlying plumbing to a minimum. We also treat the address here as a
|
||||
// string, similar to Dial, so it's network neutral, so this usually is
|
||||
// in the form of "host:port".
|
||||
WriteTo(b []byte, addr string) (time.Time, error)
|
||||
|
||||
// PacketCh returns a channel that can be read to receive incoming
|
||||
// packets from other peers. How this is set up for listening is left as
|
||||
// an exercise for the concrete transport implementations.
|
||||
PacketCh() <-chan *Packet
|
||||
|
||||
// DialTimeout is used to create a connection that allows us to perform
|
||||
// two-way communication with a peer. This is generally more expensive
|
||||
// than packet connections so is used for more infrequent operations
|
||||
// such as anti-entropy or fallback probes if the packet-oriented probe
|
||||
// failed.
|
||||
DialTimeout(addr string, timeout time.Duration) (net.Conn, error)
|
||||
|
||||
// StreamCh returns a channel that can be read to handle incoming stream
|
||||
// connections from other peers. How this is set up for listening is
|
||||
// left as an exercise for the concrete transport implementations.
|
||||
StreamCh() <-chan net.Conn
|
||||
|
||||
// Shutdown is called when memberlist is shutting down; this gives the
|
||||
// transport a chance to clean up any listeners.
|
||||
Shutdown() error
|
||||
}
|
|
@ -8,6 +8,8 @@ import (
|
|||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -286,3 +288,9 @@ func decompressBuffer(c *compress) ([]byte, error) {
|
|||
// Return the uncompressed bytes
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
// joinHostPort returns the host:port form of an address, for use with a
|
||||
// transport.
|
||||
func joinHostPort(host string, port uint16) string {
|
||||
return net.JoinHostPort(host, strconv.Itoa(int(port)))
|
||||
}
|
||||
|
|
|
@ -588,10 +588,10 @@
|
|||
"revisionTime": "2015-06-09T07:04:31Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "1zk7IeGClUqBo+Phsx89p7fQ/rQ=",
|
||||
"checksumSHA1": "lhcYybZqKjS43O2DBjvnGn0yPPo=",
|
||||
"path": "github.com/hashicorp/memberlist",
|
||||
"revision": "23ad4b7d7b38496cd64c241dfd4c60b7794c254a",
|
||||
"revisionTime": "2017-02-08T21:15:06Z"
|
||||
"revision": "bbad45074a20fa539bc52e62d1e5f9f8cb467e7c",
|
||||
"revisionTime": "2017-03-16T23:38:26Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "qnlqWJYV81ENr61SZk9c65R1mDo=",
|
||||
|
|
Loading…
Reference in New Issue