consul: Store the protocol version for a server

This commit is contained in:
Armon Dadgar 2014-05-27 15:07:31 -07:00
parent c8831db91c
commit 589105eee4
6 changed files with 56 additions and 28 deletions

View File

@ -43,7 +43,7 @@ type Client struct {
connPool *ConnPool connPool *ConnPool
// consuls tracks the locally known servers // consuls tracks the locally known servers
consuls []net.Addr consuls []*serverParts
consulLock sync.RWMutex consulLock sync.RWMutex
// eventCh is used to receive events from the // eventCh is used to receive events from the
@ -52,7 +52,7 @@ type Client struct {
// lastServer is the last server we made an RPC call to, // lastServer is the last server we made an RPC call to,
// this is used to re-use the last connection // this is used to re-use the last connection
lastServer net.Addr lastServer *serverParts
lastRPCTime time.Time lastRPCTime time.Time
// Logger uses the provided LogOutput // Logger uses the provided LogOutput
@ -230,15 +230,14 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
m.Name, parts.Datacenter) m.Name, parts.Datacenter)
continue continue
} }
c.logger.Printf("[INFO] consul: adding server %s", parts)
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
c.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr)
// Check if this server is known // Check if this server is known
found := false found := false
c.consulLock.Lock() c.consulLock.Lock()
for _, c := range c.consuls { for idx, existing := range c.consuls {
if c.String() == addr.String() { if existing.Name == parts.Name {
c.consuls[idx] = parts
found = true found = true
break break
} }
@ -246,7 +245,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
// Add to the list if not known // Add to the list if not known
if !found { if !found {
c.consuls = append(c.consuls, addr) c.consuls = append(c.consuls, parts)
} }
c.consulLock.Unlock() c.consulLock.Unlock()
@ -304,7 +303,7 @@ func (c *Client) localEvent(event serf.UserEvent) {
// RPC is used to forward an RPC call to a consul server, or fail if no servers // RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error { func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Check the last rpc time // Check the last rpc time
var server net.Addr var server *serverParts
if time.Now().Sub(c.lastRPCTime) < clientRPCCache { if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
server = c.lastServer server = c.lastServer
if server != nil { if server != nil {
@ -325,8 +324,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Forward to remote Consul // Forward to remote Consul
TRY_RPC: TRY_RPC:
// TODO: Correct version if err := c.connPool.RPC(server.Addr, server.Version, method, args, reply); err != nil {
if err := c.connPool.RPC(server, 1, method, args, reply); err != nil {
c.lastServer = nil c.lastServer = nil
c.lastRPCTime = time.Time{} c.lastRPCTime = time.Time{}
return err return err

View File

@ -186,7 +186,8 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{
if leader == nil { if leader == nil {
return structs.ErrNoLeader return structs.ErrNoLeader
} }
return s.connPool.RPC(leader, method, args, reply) // TODO: Correct version
return s.connPool.RPC(leader, 1, method, args, reply)
} }
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers // forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
@ -207,7 +208,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
// Forward to remote Consul // Forward to remote Consul
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1) metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
return s.connPool.RPC(server, method, args, reply) return s.connPool.RPC(server.Addr, server.Version, method, args, reply)
} }
// raftApply is used to encode a message, run it through raft, and return // raftApply is used to encode a message, run it through raft, and return

View File

@ -2,7 +2,6 @@ package consul
import ( import (
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"net"
"strings" "strings"
) )
@ -118,15 +117,15 @@ func (s *Server) remoteJoin(me serf.MemberEvent) {
s.logger.Printf("[WARN] consul: non-server in WAN pool: %s %s", m.Name) s.logger.Printf("[WARN] consul: non-server in WAN pool: %s %s", m.Name)
continue continue
} }
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port} s.logger.Printf("[INFO] consul: adding server %s", parts)
s.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr)
// Check if this server is known // Check if this server is known
found := false found := false
s.remoteLock.Lock() s.remoteLock.Lock()
existing := s.remoteConsuls[parts.Datacenter] existing := s.remoteConsuls[parts.Datacenter]
for _, e := range existing { for idx, e := range existing {
if e.String() == addr.String() { if e.Name == parts.Name {
existing[idx] = parts
found = true found = true
break break
} }
@ -134,7 +133,7 @@ func (s *Server) remoteJoin(me serf.MemberEvent) {
// Add ot the list if not known // Add ot the list if not known
if !found { if !found {
s.remoteConsuls[parts.Datacenter] = append(existing, addr) s.remoteConsuls[parts.Datacenter] = append(existing, parts)
} }
s.remoteLock.Unlock() s.remoteLock.Unlock()
} }
@ -147,15 +146,14 @@ func (s *Server) remoteFailed(me serf.MemberEvent) {
if !ok { if !ok {
continue continue
} }
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port} s.logger.Printf("[INFO] consul: removing server %s", parts)
s.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", parts.Datacenter, addr)
// Remove the server if known // Remove the server if known
s.remoteLock.Lock() s.remoteLock.Lock()
existing := s.remoteConsuls[parts.Datacenter] existing := s.remoteConsuls[parts.Datacenter]
n := len(existing) n := len(existing)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
if existing[i].String() == addr.String() { if existing[i].Name == parts.Name {
existing[i], existing[n-1] = existing[n-1], nil existing[i], existing[n-1] = existing[n-1], nil
existing = existing[:n-1] existing = existing[:n-1]
n-- n--

View File

@ -86,7 +86,7 @@ type Server struct {
// remoteConsuls is used to track the known consuls in // remoteConsuls is used to track the known consuls in
// remote data centers. Used to do DC forwarding. // remote data centers. Used to do DC forwarding.
remoteConsuls map[string][]net.Addr remoteConsuls map[string][]*serverParts
remoteLock sync.RWMutex remoteLock sync.RWMutex
// rpcListener is used to listen for incoming connections // rpcListener is used to listen for incoming connections
@ -164,7 +164,7 @@ func NewServer(config *Config) (*Server, error) {
eventChWAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256),
logger: logger, logger: logger,
reconcileCh: make(chan serf.Member, 32), reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]net.Addr), remoteConsuls: make(map[string][]*serverParts),
rpcServer: rpc.NewServer(), rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS, rpcTLS: incomingTLS,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
@ -522,8 +522,8 @@ func (s *Server) IsLeader() bool {
// RPC is used to make a local RPC call // RPC is used to make a local RPC call
func (s *Server) RPC(method string, args interface{}, reply interface{}) error { func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
addr := s.rpcListener.Addr() addr := s.rpcListener.Addr()
// TODO: Correct version version := int(s.config.ProtocolVersion)
return s.connPool.RPC(addr, 1, method, args, reply) return s.connPool.RPC(addr, version, method, args, reply)
} }
// Stats is used to return statistics for debugging and insight // Stats is used to return statistics for debugging and insight

View File

@ -22,12 +22,18 @@ var privateBlocks []*net.IPNet
// serverparts is used to return the parts of a server role // serverparts is used to return the parts of a server role
type serverParts struct { type serverParts struct {
Name string
Datacenter string Datacenter string
Port int Port int
Bootstrap bool Bootstrap bool
Version int
Addr net.Addr Addr net.Addr
} }
func (s *serverParts) String() string {
return fmt.Sprintf("%s (Addr: %s) (DC: %s)", s.Name, s.Addr, s.Datacenter)
}
func init() { func init() {
// Add each private block // Add each private block
privateBlocks = make([]*net.IPNet, 3) privateBlocks = make([]*net.IPNet, 3)
@ -76,14 +82,31 @@ func isConsulServer(m serf.Member) (bool, *serverParts) {
} }
datacenter := m.Tags["dc"] datacenter := m.Tags["dc"]
port_str := m.Tags["port"]
_, bootstrap := m.Tags["bootstrap"] _, bootstrap := m.Tags["bootstrap"]
port_str := m.Tags["port"]
port, err := strconv.Atoi(port_str) port, err := strconv.Atoi(port_str)
if err != nil { if err != nil {
return false, nil return false, nil
} }
vsn_str := m.Tags["vsn"]
vsn, err := strconv.Atoi(vsn_str)
if err != nil {
return false, nil
}
addr := &net.TCPAddr{IP: m.Addr, Port: port} addr := &net.TCPAddr{IP: m.Addr, Port: port}
return true, &serverParts{datacenter, port, bootstrap, addr}
parts := &serverParts{
Name: m.Name,
Datacenter: datacenter,
Port: port,
Bootstrap: bootstrap,
Addr: addr,
Version: vsn,
}
return true, parts
} }
// Returns if a member is a consul node. Returns a boo, // Returns if a member is a consul node. Returns a boo,

View File

@ -37,17 +37,22 @@ func TestIsPrivateIP(t *testing.T) {
func TestIsConsulServer(t *testing.T) { func TestIsConsulServer(t *testing.T) {
m := serf.Member{ m := serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}), Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{ Tags: map[string]string{
"role": "consul", "role": "consul",
"dc": "east-aws", "dc": "east-aws",
"port": "10000", "port": "10000",
"vsn": "1",
}, },
} }
valid, parts := isConsulServer(m) valid, parts := isConsulServer(m)
if !valid || parts.Datacenter != "east-aws" || parts.Port != 10000 { if !valid || parts.Datacenter != "east-aws" || parts.Port != 10000 {
t.Fatalf("bad: %v %v", valid, parts) t.Fatalf("bad: %v %v", valid, parts)
} }
if parts.Name != "foo" {
t.Fatalf("bad: %v", parts)
}
if parts.Bootstrap { if parts.Bootstrap {
t.Fatalf("unexpected bootstrap") t.Fatalf("unexpected bootstrap")
} }
@ -59,6 +64,9 @@ func TestIsConsulServer(t *testing.T) {
if parts.Addr.String() != "127.0.0.1:10000" { if parts.Addr.String() != "127.0.0.1:10000" {
t.Fatalf("bad addr: %v", parts.Addr) t.Fatalf("bad addr: %v", parts.Addr)
} }
if parts.Version != 1 {
t.Fatalf("bad: %v", parts)
}
} }
func TestIsConsulNode(t *testing.T) { func TestIsConsulNode(t *testing.T) {