Consolidate server lookup into one place and replace usages of localConsuls.
This commit is contained in:
parent
cac1c29ec5
commit
5a29eb7486
|
@ -238,9 +238,7 @@ func (s *Server) getLeader() (bool, *metadata.Server) {
|
|||
}
|
||||
|
||||
// Lookup the server
|
||||
s.localLock.RLock()
|
||||
server := s.localConsuls[leader]
|
||||
s.localLock.RUnlock()
|
||||
server, _ := s.serverLookup.GetServer(leader)
|
||||
|
||||
// Server could be nil
|
||||
return false, server
|
||||
|
|
|
@ -125,18 +125,14 @@ func (s *Server) localEvent(event serf.UserEvent) {
|
|||
// lanNodeJoin is used to handle join events on the LAN pool.
|
||||
func (s *Server) lanNodeJoin(me serf.MemberEvent) {
|
||||
for _, m := range me.Members {
|
||||
ok, parts := metadata.IsConsulServer(m)
|
||||
ok, serverMeta := metadata.IsConsulServer(m)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
s.logger.Printf("[INFO] consul: Adding LAN server %s", parts)
|
||||
s.logger.Printf("[INFO] consul: Adding LAN server %s", serverMeta)
|
||||
|
||||
// See if it's configured as part of our DC.
|
||||
if parts.Datacenter == s.config.Datacenter {
|
||||
s.localLock.Lock()
|
||||
s.localConsuls[raft.ServerAddress(parts.Addr.String())] = parts
|
||||
s.localLock.Unlock()
|
||||
}
|
||||
// Update server lookup
|
||||
s.serverLookup.AddServer(serverMeta)
|
||||
|
||||
// If we're still expecting to bootstrap, may need to handle this.
|
||||
if s.config.BootstrapExpect != 0 {
|
||||
|
@ -144,7 +140,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
|
|||
}
|
||||
|
||||
// Update id to address map
|
||||
s.serverAddressLookup.AddServer(parts.ID, parts.Addr.String())
|
||||
s.serverLookup.AddServer(serverMeta)
|
||||
|
||||
// Kick the join flooders.
|
||||
s.FloodNotify()
|
||||
|
@ -274,11 +270,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
|
|||
}
|
||||
s.logger.Printf("[INFO] consul: Removing LAN server %s", parts)
|
||||
|
||||
s.localLock.Lock()
|
||||
delete(s.localConsuls, raft.ServerAddress(parts.Addr.String()))
|
||||
s.localLock.Unlock()
|
||||
|
||||
// Update id to address map
|
||||
s.serverAddressLookup.RemoveServer(parts.ID)
|
||||
s.serverLookup.RemoveServer(parts)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,11 +123,6 @@ type Server struct {
|
|||
// strong consistency.
|
||||
fsm *consulFSM
|
||||
|
||||
// localConsuls is used to track the known consuls
|
||||
// in the local datacenter. Used to do leader forwarding.
|
||||
localConsuls map[raft.ServerAddress]*metadata.Server
|
||||
localLock sync.RWMutex
|
||||
|
||||
// Logger uses the provided LogOutput
|
||||
logger *log.Logger
|
||||
|
||||
|
@ -171,8 +166,8 @@ type Server struct {
|
|||
// which SHOULD only consist of Consul servers
|
||||
serfWAN *serf.Serf
|
||||
|
||||
// fast lookup from id to server address to provide to the raft transport layer
|
||||
serverAddressLookup *ServerAddressLookup
|
||||
// serverLookup provides fast and thread-safe lookup by id and address
|
||||
serverLookup *ServerLookup
|
||||
|
||||
// floodLock controls access to floodCh.
|
||||
floodLock sync.RWMutex
|
||||
|
@ -298,7 +293,6 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
connPool: connPool,
|
||||
eventChLAN: make(chan serf.Event, 256),
|
||||
eventChWAN: make(chan serf.Event, 256),
|
||||
localConsuls: make(map[raft.ServerAddress]*metadata.Server),
|
||||
logger: logger,
|
||||
reconcileCh: make(chan serf.Member, 32),
|
||||
router: router.NewRouter(logger, config.Datacenter),
|
||||
|
@ -307,7 +301,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
reassertLeaderCh: make(chan chan error),
|
||||
sessionTimers: NewSessionTimers(),
|
||||
tombstoneGC: gc,
|
||||
serverAddressLookup: NewServerAddressLookup(),
|
||||
serverLookup: NewServerLookup(),
|
||||
shutdownCh: shutdownCh,
|
||||
}
|
||||
|
||||
|
@ -502,7 +496,7 @@ func (s *Server) setupRaft() error {
|
|||
Stream: s.raftLayer,
|
||||
MaxPool: 3,
|
||||
Timeout: 10 * time.Second,
|
||||
ServerAddressProvider: s.serverAddressLookup,
|
||||
ServerAddressProvider: s.serverLookup,
|
||||
}
|
||||
|
||||
trans := raft.NewNetworkTransportWithConfig(transConfig)
|
||||
|
@ -705,9 +699,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
|
|||
return true
|
||||
}
|
||||
|
||||
s.localLock.RLock()
|
||||
server, ok := s.localConsuls[address]
|
||||
s.localLock.RUnlock()
|
||||
server, ok := s.serverLookup.GetServer(address)
|
||||
|
||||
if !ok {
|
||||
return false
|
||||
|
|
|
@ -4,31 +4,53 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
// serverIdToAddress is a map from id to address for servers in the LAN pool.
|
||||
// used for fast lookup to satisfy the ServerAddressProvider interface
|
||||
type ServerAddressLookup struct {
|
||||
serverIdToAddress sync.Map
|
||||
// ServerLookup encapsulates looking up servers by id and address
|
||||
type ServerLookup struct {
|
||||
lock sync.RWMutex
|
||||
addressToServer map[raft.ServerAddress]*metadata.Server
|
||||
IdToServer map[raft.ServerID]*metadata.Server
|
||||
}
|
||||
|
||||
func NewServerAddressLookup() *ServerAddressLookup {
|
||||
return &ServerAddressLookup{}
|
||||
func NewServerLookup() *ServerLookup {
|
||||
return &ServerLookup{addressToServer: make(map[raft.ServerAddress]*metadata.Server), IdToServer: make(map[raft.ServerID]*metadata.Server)}
|
||||
}
|
||||
|
||||
func (sa *ServerAddressLookup) AddServer(id string, address string) {
|
||||
sa.serverIdToAddress.Store(id, address)
|
||||
func (sa *ServerLookup) AddServer(server *metadata.Server) {
|
||||
sa.lock.Lock()
|
||||
defer sa.lock.Unlock()
|
||||
sa.addressToServer[raft.ServerAddress(server.Addr.String())] = server
|
||||
sa.IdToServer[raft.ServerID(server.ID)] = server
|
||||
}
|
||||
|
||||
func (sa *ServerAddressLookup) RemoveServer(id string) {
|
||||
sa.serverIdToAddress.Delete(id)
|
||||
func (sa *ServerLookup) RemoveServer(server *metadata.Server) {
|
||||
sa.lock.Lock()
|
||||
defer sa.lock.Unlock()
|
||||
delete(sa.addressToServer, raft.ServerAddress(server.Addr.String()))
|
||||
delete(sa.IdToServer, raft.ServerID(server.ID))
|
||||
}
|
||||
|
||||
func (sa *ServerAddressLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
|
||||
val, ok := sa.serverIdToAddress.Load(string(id))
|
||||
// Implements the ServerAddressProvider interface
|
||||
func (sa *ServerLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
|
||||
sa.lock.RLock()
|
||||
defer sa.lock.RUnlock()
|
||||
svr, ok := sa.IdToServer[id]
|
||||
if !ok {
|
||||
return "", fmt.Errorf("Could not find address for server id %v", id)
|
||||
}
|
||||
return raft.ServerAddress(val.(string)), nil
|
||||
return raft.ServerAddress(svr.Addr.String()), nil
|
||||
}
|
||||
|
||||
// GetServer looks up the server by address, returns a boolean if not found
|
||||
func (sa *ServerLookup) GetServer(addr raft.ServerAddress) (*metadata.Server, bool) {
|
||||
sa.lock.RLock()
|
||||
defer sa.lock.RUnlock()
|
||||
svr, ok := sa.addressToServer[addr]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return svr, true
|
||||
}
|
||||
|
|
|
@ -3,14 +3,32 @@ package consul
|
|||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
func TestServerAddressLookup(t *testing.T) {
|
||||
lookup := NewServerAddressLookup()
|
||||
addr := "72.0.0.17:8300"
|
||||
lookup.AddServer("1", addr)
|
||||
type testAddr struct {
|
||||
addr string
|
||||
}
|
||||
|
||||
got, err := lookup.ServerAddr("1")
|
||||
func (ta *testAddr) Network() string {
|
||||
return "tcp"
|
||||
}
|
||||
|
||||
func (ta *testAddr) String() string {
|
||||
return ta.addr
|
||||
}
|
||||
|
||||
func TestServerLookup(t *testing.T) {
|
||||
lookup := NewServerLookup()
|
||||
addr := "72.0.0.17:8300"
|
||||
id := "1"
|
||||
|
||||
svr := &metadata.Server{ID: id, Addr: &testAddr{addr}}
|
||||
lookup.AddServer(svr)
|
||||
|
||||
got, err := lookup.ServerAddr(raft.ServerID(id))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error:%v", err)
|
||||
}
|
||||
|
@ -18,7 +36,15 @@ func TestServerAddressLookup(t *testing.T) {
|
|||
t.Fatalf("Expected %v but got %v", addr, got)
|
||||
}
|
||||
|
||||
lookup.RemoveServer("1")
|
||||
server, ok := lookup.GetServer(raft.ServerAddress(addr))
|
||||
if !ok {
|
||||
t.Fatalf("Expected lookup to return true")
|
||||
}
|
||||
if server.Addr.String() != addr {
|
||||
t.Fatalf("Expected lookup to return address %v but got %v", addr, server.Addr)
|
||||
}
|
||||
|
||||
lookup.RemoveServer(svr)
|
||||
|
||||
got, err = lookup.ServerAddr("1")
|
||||
expectedErr := fmt.Errorf("Could not find address for server id 1")
|
||||
|
@ -26,5 +52,7 @@ func TestServerAddressLookup(t *testing.T) {
|
|||
t.Fatalf("Unexpected error, got %v wanted %v", err, expectedErr)
|
||||
}
|
||||
|
||||
lookup.RemoveServer("3")
|
||||
svr2 := &metadata.Server{ID: "2", Addr: &testAddr{"123.4.5.6"}}
|
||||
lookup.RemoveServer(svr2)
|
||||
|
||||
}
|
||||
|
|
|
@ -342,7 +342,7 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
|
|||
if len(s2.router.GetDatacenters()) != 2 {
|
||||
r.Fatalf("remote consul missing")
|
||||
}
|
||||
if len(s2.localConsuls) != 2 {
|
||||
if len(s2.serverLookup.addressToServer) != 2 {
|
||||
r.Fatalf("local consul fellow s3 for s2 missing")
|
||||
}
|
||||
})
|
||||
|
@ -666,14 +666,12 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
|
|||
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s2, 2)) })
|
||||
|
||||
// Have s2 make an RPC call to s1
|
||||
s2.localLock.RLock()
|
||||
var leader *metadata.Server
|
||||
for _, server := range s2.localConsuls {
|
||||
for _, server := range s2.serverLookup.addressToServer {
|
||||
if server.Name == s1.config.NodeName {
|
||||
leader = server
|
||||
}
|
||||
}
|
||||
s2.localLock.RUnlock()
|
||||
if leader == nil {
|
||||
t.Fatal("no leader")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue