diff --git a/agent/consul/server.go b/agent/consul/server.go index afcebd60d..0c177599b 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -494,7 +494,7 @@ func (s *Server) setupRaft() error { } // Create a transport layer. - trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput) + trans := raft.NewNetworkTransportWithServerAddressProvider(s.raftLayer, 3, 10*time.Second, s) s.raftTransport = trans // Make sure we set the LogOutput. @@ -1047,6 +1047,18 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) { return s.serfWAN.GetCoordinate() } +func (s *Server) ServerAddr(id raft.ServerID) raft.ServerAddress { + if string(id) == string(s.config.NodeID) { + return raft.ServerAddress(s.config.RPCAddr.String()) + } + addr, err := s.router.GetServerAddressByID(s.config.Datacenter, string(id)) + if err != nil { + s.logger.Println("[WARN] Unable to find address for raft server id %v", id) + return raft.ServerAddress("") + } + return raft.ServerAddress(addr) +} + // Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write func (s *Server) setConsistentReadReady() { atomic.StoreInt32(&s.readyForConsistentReads, 1) diff --git a/agent/router/manager.go b/agent/router/manager.go index 288a30bd0..efa747c85 100644 --- a/agent/router/manager.go +++ b/agent/router/manager.go @@ -79,6 +79,9 @@ type Manager struct { listValue atomic.Value listLock sync.Mutex + // idToAddress provides lookup of server address by id, and is maintained alongside listValue + idToAddress atomic.Value + // rebalanceTimer controls the duration of the rebalance interval rebalanceTimer *time.Timer @@ -223,10 +226,30 @@ func (m *Manager) getServerList() serverList { return m.listValue.Load().(serverList) } +// GetServerAddress by ID returns a server address based on the id +func (m *Manager) GetServerAddressByID(id string) string { + idAddrMap := m.idToAddress.Load().(map[string]string) + addr, ok := idAddrMap[id] + if !ok { + m.logger.Printf("[WARN] Unable to find address for node id %v", id) + return "" + } + return addr +} + // saveServerList is a convenience method which hides the locking semantics // of atomic.Value from the caller. func (m *Manager) saveServerList(l serverList) { m.listValue.Store(l) + m.idToAddress.Store(makeIdAddrMap(l)) +} + +func makeIdAddrMap(list serverList) map[string]string { + ret := make(map[string]string) + for _, server := range list.servers { + ret[server.ID] = server.Addr.String() + } + return ret } // New is the only way to safely create a new Manager struct. diff --git a/agent/router/router.go b/agent/router/router.go index c41a6a79c..c0a48e623 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -489,3 +489,28 @@ func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) { } return maps, nil } + +func (r *Router) GetServerAddressByID(datacenter string, id string) (string, error) { + r.RLock() + defer r.RUnlock() + + // Get the list of managers for this datacenter. This will usually just + // have one entry, but it's possible to have a user-defined area + WAN. + managers, ok := r.managers[datacenter] + if !ok { + return "", fmt.Errorf("datacenter %v not found", datacenter) + } + + // loop over all the managers till we find a matching address for the id + // there could be more than for if network areas are configured + for _, manager := range managers { + if manager.IsOffline() { + continue + } + id := manager.GetServerAddressByID(id) + if id != "" { + return id, nil + } + } + return "", fmt.Errorf("Unable to match id %v to any known servers ", id) +}