diff --git a/consul/client.go b/consul/client.go index 9aebb35fc..4d67074d0 100644 --- a/consul/client.go +++ b/consul/client.go @@ -43,7 +43,7 @@ type Client struct { connPool *ConnPool // consuls tracks the locally known servers - consuls []net.Addr + consuls []*serverParts consulLock sync.RWMutex // eventCh is used to receive events from the @@ -52,7 +52,7 @@ type Client struct { // lastServer is the last server we made an RPC call to, // this is used to re-use the last connection - lastServer net.Addr + lastServer *serverParts lastRPCTime time.Time // Logger uses the provided LogOutput @@ -230,15 +230,14 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { m.Name, parts.Datacenter) continue } - - var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port} - c.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr) + c.logger.Printf("[INFO] consul: adding server %s", parts) // Check if this server is known found := false c.consulLock.Lock() - for _, c := range c.consuls { - if c.String() == addr.String() { + for idx, existing := range c.consuls { + if existing.Name == parts.Name { + c.consuls[idx] = parts found = true break } @@ -246,7 +245,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { // Add to the list if not known if !found { - c.consuls = append(c.consuls, addr) + c.consuls = append(c.consuls, parts) } c.consulLock.Unlock() @@ -304,7 +303,7 @@ func (c *Client) localEvent(event serf.UserEvent) { // RPC is used to forward an RPC call to a consul server, or fail if no servers func (c *Client) RPC(method string, args interface{}, reply interface{}) error { // Check the last rpc time - var server net.Addr + var server *serverParts if time.Now().Sub(c.lastRPCTime) < clientRPCCache { server = c.lastServer if server != nil { @@ -325,8 +324,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { // Forward to remote Consul TRY_RPC: - // TODO: Correct version - if err := c.connPool.RPC(server, 1, method, args, reply); err != nil { + if err := c.connPool.RPC(server.Addr, server.Version, method, args, reply); err != nil { c.lastServer = nil c.lastRPCTime = time.Time{} return err diff --git a/consul/rpc.go b/consul/rpc.go index c6ea007a3..478bbd8ef 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -186,7 +186,8 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{ if leader == nil { return structs.ErrNoLeader } - return s.connPool.RPC(leader, method, args, reply) + // TODO: Correct version + return s.connPool.RPC(leader, 1, method, args, reply) } // forwardDC is used to forward an RPC call to a remote DC, or fail if no servers @@ -207,7 +208,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ // Forward to remote Consul metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1) - return s.connPool.RPC(server, method, args, reply) + return s.connPool.RPC(server.Addr, server.Version, method, args, reply) } // raftApply is used to encode a message, run it through raft, and return diff --git a/consul/serf.go b/consul/serf.go index df10a5ba5..74d29211e 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -2,7 +2,6 @@ package consul import ( "github.com/hashicorp/serf/serf" - "net" "strings" ) @@ -118,15 +117,15 @@ func (s *Server) remoteJoin(me serf.MemberEvent) { s.logger.Printf("[WARN] consul: non-server in WAN pool: %s %s", m.Name) continue } - var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port} - s.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr) + s.logger.Printf("[INFO] consul: adding server %s", parts) // Check if this server is known found := false s.remoteLock.Lock() existing := s.remoteConsuls[parts.Datacenter] - for _, e := range existing { - if e.String() == addr.String() { + for idx, e := range existing { + if e.Name == parts.Name { + existing[idx] = parts found = true break } @@ -134,7 +133,7 @@ func (s *Server) remoteJoin(me serf.MemberEvent) { // Add ot the list if not known if !found { - s.remoteConsuls[parts.Datacenter] = append(existing, addr) + s.remoteConsuls[parts.Datacenter] = append(existing, parts) } s.remoteLock.Unlock() } @@ -147,15 +146,14 @@ func (s *Server) remoteFailed(me serf.MemberEvent) { if !ok { continue } - var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port} - s.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", parts.Datacenter, addr) + s.logger.Printf("[INFO] consul: removing server %s", parts) // Remove the server if known s.remoteLock.Lock() existing := s.remoteConsuls[parts.Datacenter] n := len(existing) for i := 0; i < n; i++ { - if existing[i].String() == addr.String() { + if existing[i].Name == parts.Name { existing[i], existing[n-1] = existing[n-1], nil existing = existing[:n-1] n-- diff --git a/consul/server.go b/consul/server.go index e6db4fa55..9cc808f05 100644 --- a/consul/server.go +++ b/consul/server.go @@ -86,7 +86,7 @@ type Server struct { // remoteConsuls is used to track the known consuls in // remote data centers. Used to do DC forwarding. - remoteConsuls map[string][]net.Addr + remoteConsuls map[string][]*serverParts remoteLock sync.RWMutex // rpcListener is used to listen for incoming connections @@ -164,7 +164,7 @@ func NewServer(config *Config) (*Server, error) { eventChWAN: make(chan serf.Event, 256), logger: logger, reconcileCh: make(chan serf.Member, 32), - remoteConsuls: make(map[string][]net.Addr), + remoteConsuls: make(map[string][]*serverParts), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, shutdownCh: make(chan struct{}), @@ -522,8 +522,8 @@ func (s *Server) IsLeader() bool { // RPC is used to make a local RPC call func (s *Server) RPC(method string, args interface{}, reply interface{}) error { addr := s.rpcListener.Addr() - // TODO: Correct version - return s.connPool.RPC(addr, 1, method, args, reply) + version := int(s.config.ProtocolVersion) + return s.connPool.RPC(addr, version, method, args, reply) } // Stats is used to return statistics for debugging and insight diff --git a/consul/util.go b/consul/util.go index 7c62aa52d..402ecee73 100644 --- a/consul/util.go +++ b/consul/util.go @@ -22,12 +22,18 @@ var privateBlocks []*net.IPNet // serverparts is used to return the parts of a server role type serverParts struct { + Name string Datacenter string Port int Bootstrap bool + Version int Addr net.Addr } +func (s *serverParts) String() string { + return fmt.Sprintf("%s (Addr: %s) (DC: %s)", s.Name, s.Addr, s.Datacenter) +} + func init() { // Add each private block privateBlocks = make([]*net.IPNet, 3) @@ -76,14 +82,31 @@ func isConsulServer(m serf.Member) (bool, *serverParts) { } datacenter := m.Tags["dc"] - port_str := m.Tags["port"] _, bootstrap := m.Tags["bootstrap"] + + port_str := m.Tags["port"] port, err := strconv.Atoi(port_str) if err != nil { return false, nil } + + vsn_str := m.Tags["vsn"] + vsn, err := strconv.Atoi(vsn_str) + if err != nil { + return false, nil + } + addr := &net.TCPAddr{IP: m.Addr, Port: port} - return true, &serverParts{datacenter, port, bootstrap, addr} + + parts := &serverParts{ + Name: m.Name, + Datacenter: datacenter, + Port: port, + Bootstrap: bootstrap, + Addr: addr, + Version: vsn, + } + return true, parts } // Returns if a member is a consul node. Returns a boo, diff --git a/consul/util_test.go b/consul/util_test.go index daf8d0e4c..65e5e99ed 100644 --- a/consul/util_test.go +++ b/consul/util_test.go @@ -37,17 +37,22 @@ func TestIsPrivateIP(t *testing.T) { func TestIsConsulServer(t *testing.T) { m := serf.Member{ + Name: "foo", Addr: net.IP([]byte{127, 0, 0, 1}), Tags: map[string]string{ "role": "consul", "dc": "east-aws", "port": "10000", + "vsn": "1", }, } valid, parts := isConsulServer(m) if !valid || parts.Datacenter != "east-aws" || parts.Port != 10000 { t.Fatalf("bad: %v %v", valid, parts) } + if parts.Name != "foo" { + t.Fatalf("bad: %v", parts) + } if parts.Bootstrap { t.Fatalf("unexpected bootstrap") } @@ -59,6 +64,9 @@ func TestIsConsulServer(t *testing.T) { if parts.Addr.String() != "127.0.0.1:10000" { t.Fatalf("bad addr: %v", parts.Addr) } + if parts.Version != 1 { + t.Fatalf("bad: %v", parts) + } } func TestIsConsulNode(t *testing.T) {