Implement AddressProvider and wire that up to raft transport layer to support server nodes changing their IP addresses in containerized environments

This commit is contained in:
Preetha Appan 2017-08-23 11:06:26 -05:00
parent 81782e871d
commit 01f8e469aa
3 changed files with 61 additions and 1 deletions

View File

@ -494,7 +494,7 @@ func (s *Server) setupRaft() error {
}
// Create a transport layer.
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
trans := raft.NewNetworkTransportWithServerAddressProvider(s.raftLayer, 3, 10*time.Second, s)
s.raftTransport = trans
// Make sure we set the LogOutput.
@ -1047,6 +1047,18 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
return s.serfWAN.GetCoordinate()
}
func (s *Server) ServerAddr(id raft.ServerID) raft.ServerAddress {
if string(id) == string(s.config.NodeID) {
return raft.ServerAddress(s.config.RPCAddr.String())
}
addr, err := s.router.GetServerAddressByID(s.config.Datacenter, string(id))
if err != nil {
s.logger.Println("[WARN] Unable to find address for raft server id %v", id)
return raft.ServerAddress("")
}
return raft.ServerAddress(addr)
}
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)

View File

@ -79,6 +79,9 @@ type Manager struct {
listValue atomic.Value
listLock sync.Mutex
// idToAddress provides lookup of server address by id, and is maintained alongside listValue
idToAddress atomic.Value
// rebalanceTimer controls the duration of the rebalance interval
rebalanceTimer *time.Timer
@ -223,10 +226,30 @@ func (m *Manager) getServerList() serverList {
return m.listValue.Load().(serverList)
}
// GetServerAddress by ID returns a server address based on the id
func (m *Manager) GetServerAddressByID(id string) string {
idAddrMap := m.idToAddress.Load().(map[string]string)
addr, ok := idAddrMap[id]
if !ok {
m.logger.Printf("[WARN] Unable to find address for node id %v", id)
return ""
}
return addr
}
// saveServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (m *Manager) saveServerList(l serverList) {
m.listValue.Store(l)
m.idToAddress.Store(makeIdAddrMap(l))
}
func makeIdAddrMap(list serverList) map[string]string {
ret := make(map[string]string)
for _, server := range list.servers {
ret[server.ID] = server.Addr.String()
}
return ret
}
// New is the only way to safely create a new Manager struct.

View File

@ -489,3 +489,28 @@ func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
}
return maps, nil
}
func (r *Router) GetServerAddressByID(datacenter string, id string) (string, error) {
r.RLock()
defer r.RUnlock()
// Get the list of managers for this datacenter. This will usually just
// have one entry, but it's possible to have a user-defined area + WAN.
managers, ok := r.managers[datacenter]
if !ok {
return "", fmt.Errorf("datacenter %v not found", datacenter)
}
// loop over all the managers till we find a matching address for the id
// there could be more than for if network areas are configured
for _, manager := range managers {
if manager.IsOffline() {
continue
}
id := manager.GetServerAddressByID(id)
if id != "" {
return id, nil
}
}
return "", fmt.Errorf("Unable to match id %v to any known servers ", id)
}