Adds offline detection.

This commit is contained in:
James Phillips 2017-03-14 17:47:37 -07:00
parent 8cc06ec10d
commit 543389ad0a
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
4 changed files with 146 additions and 7 deletions

View File

@ -99,6 +99,10 @@ type Manager struct {
// notifyFailedBarrier is acts as a barrier to prevent queuing behind
// serverListLog and acts as a TryLock().
notifyFailedBarrier int32
// offline is used to indicate that there are no servers, or that all
// known servers have failed the ping test.
offline int32
}
// AddServer takes out an internal write lock and adds a new server. If the
@ -136,6 +140,10 @@ func (m *Manager) AddServer(s *agent.Server) {
l.servers = newServers
}
// Assume we are no longer offline since we've just seen a new server.
atomic.StoreInt32(&m.offline, 0)
// Start using this list of servers.
m.saveServerList(l)
}
@ -180,6 +188,13 @@ func (l *serverList) shuffleServers() {
}
}
// IsOffline checks to see if all the known servers have failed their ping
// test during the last rebalance.
func (m *Manager) IsOffline() bool {
offline := atomic.LoadInt32(&m.offline)
return offline == 1
}
// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
@ -221,6 +236,7 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCl
m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
m.shutdownCh = shutdownCh
m.offline = 1
l := serverList{}
l.servers = make([]*agent.Server, 0)
@ -280,11 +296,7 @@ func (m *Manager) RebalanceServers() {
// Obtain a copy of the current serverList
l := m.getServerList()
// Early abort if there is nothing to shuffle
if len(l.servers) < 2 {
return
}
// Shuffle servers so we have a chance of picking a new one.
l.shuffleServers()
// Iterate through the shuffled server list to find an assumed
@ -307,8 +319,11 @@ func (m *Manager) RebalanceServers() {
}
// If no healthy servers were found, sleep and wait for Serf to make
// the world a happy place again.
if !foundHealthyServer {
// the world a happy place again. Update the offline status.
if foundHealthyServer {
atomic.StoreInt32(&m.offline, 0)
} else {
atomic.StoreInt32(&m.offline, 1)
m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance, aborting")
return
}

View File

@ -77,6 +77,45 @@ func TestServers_AddServer(t *testing.T) {
}
}
// func (m *Manager) IsOffline() bool {
func TestServers_IsOffline(t *testing.T) {
m := testManager()
if !m.IsOffline() {
t.Fatalf("bad")
}
s1 := &agent.Server{Name: "s1"}
m.AddServer(s1)
if m.IsOffline() {
t.Fatalf("bad")
}
m.RebalanceServers()
if m.IsOffline() {
t.Fatalf("bad")
}
m.RemoveServer(s1)
m.RebalanceServers()
if !m.IsOffline() {
t.Fatalf("bad")
}
const failPct = 0.5
m = testManagerFailProb(failPct)
m.AddServer(s1)
var on, off int
for i := 0; i < 100; i++ {
m.RebalanceServers()
if m.IsOffline() {
off++
} else {
on++
}
}
if on == 0 || off == 0 {
t.Fatalf("bad: %d %d", on, off)
}
}
// func (m *Manager) FindServer() (server *agent.Server) {
func TestServers_FindServer(t *testing.T) {
m := testManager()

View File

@ -189,6 +189,7 @@ func (r *Router) addServer(area *areaInfo, s *agent.Server) error {
managers := r.managers[s.Datacenter]
r.managers[s.Datacenter] = append(managers, manager)
go manager.Start()
}
info.manager.AddServer(s)
@ -283,6 +284,10 @@ func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) {
// Try each manager until we get a server.
for _, manager := range managers {
if manager.IsOffline() {
continue
}
if s := manager.FindServer(); s != nil {
return manager, s, true
}

View File

@ -232,6 +232,86 @@ func TestRouter_Routing(t *testing.T) {
}
}
func TestRouter_Routing_Offline(t *testing.T) {
r := testRouter("dc0")
// Create a WAN-looking area.
self := "node0.dc0"
wan := testCluster(self)
if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{1.0}); err != nil {
t.Fatalf("err: %v", err)
}
// Adding the area should enable all the routes right away.
if _, _, ok := r.FindRoute("dc0"); !ok {
t.Fatalf("bad")
}
if _, _, ok := r.FindRoute("dc1"); !ok {
t.Fatalf("bad")
}
if _, _, ok := r.FindRoute("dc2"); !ok {
t.Fatalf("bad")
}
if _, _, ok := r.FindRoute("dcX"); !ok {
t.Fatalf("bad")
}
// Do a rebalance for dc1, which should knock it offline.
func() {
r.Lock()
defer r.Unlock()
area, ok := r.areas[types.AreaWAN]
if !ok {
t.Fatalf("bad")
}
info, ok := area.managers["dc1"]
if !ok {
t.Fatalf("bad")
}
info.manager.RebalanceServers()
}()
// Recheck all the routes.
if _, _, ok := r.FindRoute("dc0"); !ok {
t.Fatalf("bad")
}
if _, _, ok := r.FindRoute("dc1"); ok {
t.Fatalf("bad")
}
if _, _, ok := r.FindRoute("dc2"); !ok {
t.Fatalf("bad")
}
if _, _, ok := r.FindRoute("dcX"); !ok {
t.Fatalf("bad")
}
// Add another area with a route to dc1.
otherID := types.AreaID("other")
other := newMockCluster(self)
other.AddMember("dc0", "node0", nil)
other.AddMember("dc1", "node1", nil)
if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil {
t.Fatalf("err: %v", err)
}
// Recheck all the routes and make sure it finds the one that's
// online.
if _, _, ok := r.FindRoute("dc0"); !ok {
t.Fatalf("bad")
}
if _, _, ok := r.FindRoute("dc1"); !ok {
t.Fatalf("bad")
}
if _, _, ok := r.FindRoute("dc2"); !ok {
t.Fatalf("bad")
}
if _, _, ok := r.FindRoute("dcX"); !ok {
t.Fatalf("bad")
}
}
func TestRouter_GetDatacenters(t *testing.T) {
r := testRouter("dc0")