Remove server address tracking logic from manager/router and maintain it as part of the LAN event listener instead. Use sync.Map to track this, and add unit tests.

This commit is contained in:
Preetha Appan 2017-08-29 19:37:48 -05:00
parent b4a9d77d49
commit ca48e7e4c2
6 changed files with 81 additions and 52 deletions

View File

@ -143,6 +143,9 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
s.maybeBootstrap()
}
// Update id to address map
s.serverAddressLookup.AddServer(parts.ID, parts.Addr.String())
// Kick the join flooders.
s.FloodNotify()
}
@ -274,5 +277,8 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
s.localLock.Lock()
delete(s.localConsuls, raft.ServerAddress(parts.Addr.String()))
s.localLock.Unlock()
// Update id to address map
s.serverAddressLookup.RemoveServer(parts.ID)
}
}

View File

@ -171,6 +171,9 @@ type Server struct {
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
// fast lookup from id to server address to provide to the raft transport layer
serverAddressLookup *ServerAddressLookup
// floodLock controls access to floodCh.
floodLock sync.RWMutex
floodCh []chan struct{}
@ -286,6 +289,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
ForceTLS: config.VerifyOutgoing,
}
//serverAddrLookup = NewServerAddressLookup()
// Create server.
s := &Server{
autopilotRemoveDeadCh: make(chan struct{}),
@ -304,6 +308,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
reassertLeaderCh: make(chan chan error),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverAddressLookup: NewServerAddressLookup(),
shutdownCh: shutdownCh,
}
@ -494,7 +499,12 @@ func (s *Server) setupRaft() error {
}
// Create a transport layer.
transConfig := &raft.NetworkTransportConfig{Stream: s.raftLayer, MaxPool: 3, Timeout: 10 * time.Second, ServerAddressProvider: s}
transConfig := &raft.NetworkTransportConfig{
Stream: s.raftLayer,
MaxPool: 3,
Timeout: 10 * time.Second,
ServerAddressProvider: s.serverAddressLookup,
}
trans := raft.NewNetworkTransportWithConfig(transConfig)
s.raftTransport = trans
@ -1049,17 +1059,6 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
return s.serfWAN.GetCoordinate()
}
// ServerAddr resolves a raft server id to the address raft should dial.
//
// When id equals this server's own configured NodeID, the configured RPC
// address is returned directly. Otherwise the router is asked for the
// address of that id within this server's datacenter, and any error from
// the router is passed back unchanged with an empty address.
func (s *Server) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
	serverID := string(id)

	// Short-circuit for ourselves: no router lookup is needed.
	if serverID == string(s.config.NodeID) {
		local := s.config.RPCAddr.String()
		return raft.ServerAddress(local), nil
	}

	resolved, lookupErr := s.router.GetServerAddressByID(s.config.Datacenter, serverID)
	if lookupErr != nil {
		return "", lookupErr
	}
	return raft.ServerAddress(resolved), nil
}
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)

View File

@ -0,0 +1,34 @@
package consul
import (
"fmt"
"sync"
"github.com/hashicorp/raft"
)
// ServerAddressLookup tracks the address of each server in the LAN pool,
// keyed by server id. It exists to give fast id-to-address lookups and is
// handed to the raft transport as its ServerAddressProvider.
type ServerAddressLookup struct {
	// serverIdToAddress holds the id -> address entries, backed by a sync.Map.
	serverIdToAddress sync.Map
}
// NewServerAddressLookup returns a pointer to a new, zero-valued
// ServerAddressLookup.
func NewServerAddressLookup() *ServerAddressLookup {
	return new(ServerAddressLookup)
}
// AddServer records address as the address of the server identified by id,
// replacing any address previously stored under the same id.
func (sa *ServerAddressLookup) AddServer(id, address string) {
	sa.serverIdToAddress.Store(id, address)
}
// RemoveServer deletes the entry stored under id from the underlying
// sync.Map.
func (sa *ServerAddressLookup) RemoveServer(id string) {
	sa.serverIdToAddress.Delete(id)
}
// ServerAddr returns the address stored for the given raft server id.
// If no entry exists for id, it returns an empty address and an error
// naming the id that could not be resolved.
func (sa *ServerAddressLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
	if stored, found := sa.serverIdToAddress.Load(string(id)); found {
		// The value comes back as an interface and is asserted to string.
		return raft.ServerAddress(stored.(string)), nil
	}
	return "", fmt.Errorf("Could not find address for server id %v", id)
}

View File

@ -0,0 +1,30 @@
package consul
import (
"fmt"
"testing"
)
// TestServerAddressLookup walks a ServerAddressLookup through its add,
// lookup and remove cycle:
//   - an added id resolves to the address it was added with,
//   - a removed id no longer resolves and yields the expected error,
//   - removing an id that was never added is tolerated.
func TestServerAddressLookup(t *testing.T) {
	lookup := NewServerAddressLookup()
	addr := "72.0.0.17:8300"
	lookup.AddServer("1", addr)

	got, err := lookup.ServerAddr("1")
	if err != nil {
		t.Fatalf("Unexpected error:%v", err)
	}
	if string(got) != addr {
		t.Fatalf("Expected %v but got %v", addr, got)
	}

	lookup.RemoveServer("1")
	got, err = lookup.ServerAddr("1")
	expectedErr := fmt.Errorf("Could not find address for server id 1")
	// Guard against a nil error first: without this, a lookup that wrongly
	// succeeds would panic on err.Error() instead of failing the test.
	if err == nil {
		t.Fatalf("Expected error %v after removal but got address %v", expectedErr, got)
	}
	if expectedErr.Error() != err.Error() {
		t.Fatalf("Unexpected error, got %v wanted %v", err, expectedErr)
	}

	// Removing an id that was never added should simply do nothing.
	lookup.RemoveServer("3")
}

View File

@ -79,9 +79,6 @@ type Manager struct {
listValue atomic.Value
listLock sync.Mutex
// idToAddress provides lookup of server address by id, and is maintained alongside listValue
idToAddress atomic.Value
// rebalanceTimer controls the duration of the rebalance interval
rebalanceTimer *time.Timer
@ -226,22 +223,10 @@ func (m *Manager) getServerList() serverList {
return m.listValue.Load().(serverList)
}
// GetServerAddressByID returns the address recorded for the server with the
// given id in the manager's current id-to-address map. When the id is not
// present, a warning is logged and the empty string is returned.
func (m *Manager) GetServerAddressByID(id string) string {
	known := m.idToAddress.Load().(map[string]string)
	if addr, found := known[id]; found {
		return addr
	}
	m.logger.Printf("[WARN] Unable to find address for node id %v", id)
	return ""
}
// saveServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller. It publishes the given server list and,
// right after it, the id-to-address map derived from that same list.
func (m *Manager) saveServerList(l serverList) {
// Publish the list itself first.
m.listValue.Store(l)
// Then publish the map built from l by makeIdAddrMap, so idToAddress is
// refreshed on every save. NOTE(review): the two stores are separate
// operations; presumably readers tolerate the brief gap between them.
m.idToAddress.Store(makeIdAddrMap(l))
}
func makeIdAddrMap(list serverList) map[string]string {

View File

@ -489,28 +489,3 @@ func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
}
return maps, nil
}
// GetServerAddressByID looks up the address of the server with the given id
// among the managers registered for datacenter. It holds the router's read
// lock for the duration of the call.
//
// An error is returned when the datacenter has no managers registered, or
// when no online manager knows an address for the id.
func (r *Router) GetServerAddressByID(datacenter string, id string) (string, error) {
	r.RLock()
	defer r.RUnlock()

	// A datacenter usually has a single manager, but a user-defined area
	// plus the WAN can yield several.
	managers, ok := r.managers[datacenter]
	if !ok {
		return "", fmt.Errorf("datacenter %v not found", datacenter)
	}

	// Ask each online manager in turn; there may be more than one when
	// network areas are configured. The first non-empty address wins.
	for _, mgr := range managers {
		if !mgr.IsOffline() {
			if addr := mgr.GetServerAddressByID(id); addr != "" {
				return addr, nil
			}
		}
	}

	return "", fmt.Errorf("Unable to match id %v to any known servers ", id)
}