From 543389ad0a80df97833caa9127011a39a930c7f1 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 14 Mar 2017 17:47:37 -0700 Subject: [PATCH] Adds offline detection. --- consul/servers/manager.go | 29 +++++++++--- consul/servers/manager_test.go | 39 +++++++++++++++++ consul/servers/router.go | 5 +++ consul/servers/router_test.go | 80 ++++++++++++++++++++++++++++++++++ 4 files changed, 146 insertions(+), 7 deletions(-) diff --git a/consul/servers/manager.go b/consul/servers/manager.go index f05c9b2c6..5b316e2f3 100644 --- a/consul/servers/manager.go +++ b/consul/servers/manager.go @@ -99,6 +99,10 @@ type Manager struct { // notifyFailedBarrier is acts as a barrier to prevent queuing behind // serverListLog and acts as a TryLock(). notifyFailedBarrier int32 + + // offline is used to indicate that there are no servers, or that all + // known servers have failed the ping test. + offline int32 } // AddServer takes out an internal write lock and adds a new server. If the @@ -136,6 +140,10 @@ func (m *Manager) AddServer(s *agent.Server) { l.servers = newServers } + // Assume we are no longer offline since we've just seen a new server. + atomic.StoreInt32(&m.offline, 0) + + // Start using this list of servers. m.saveServerList(l) } @@ -180,6 +188,13 @@ func (l *serverList) shuffleServers() { } } +// IsOffline reports whether no server has been added yet, or no known server +// passed its ping test during the last rebalance. +func (m *Manager) IsOffline() bool { + offline := atomic.LoadInt32(&m.offline) + return offline == 1 +} + // FindServer takes out an internal "read lock" and searches through the list // of servers to find a "healthy" server. 
If the server is actually // unhealthy, we rely on Serf to detect this and remove the node from the @@ -221,6 +236,7 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCl m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration) m.shutdownCh = shutdownCh + m.offline = 1 l := serverList{} l.servers = make([]*agent.Server, 0) @@ -280,11 +296,7 @@ func (m *Manager) RebalanceServers() { // Obtain a copy of the current serverList l := m.getServerList() - // Early abort if there is nothing to shuffle - if len(l.servers) < 2 { - return - } - + // Shuffle servers so we have a chance of picking a new one. l.shuffleServers() // Iterate through the shuffled server list to find an assumed @@ -307,8 +319,11 @@ func (m *Manager) RebalanceServers() { } // If no healthy servers were found, sleep and wait for Serf to make - // the world a happy place again. - if !foundHealthyServer { + // the world a happy place again. Update the offline status. 
+ if foundHealthyServer { + atomic.StoreInt32(&m.offline, 0) + } else { + atomic.StoreInt32(&m.offline, 1) m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance, aborting") return } diff --git a/consul/servers/manager_test.go b/consul/servers/manager_test.go index 654802edc..b87ea84fa 100644 --- a/consul/servers/manager_test.go +++ b/consul/servers/manager_test.go @@ -77,6 +77,45 @@ func TestServers_AddServer(t *testing.T) { } } +// func (m *Manager) IsOffline() bool { +func TestServers_IsOffline(t *testing.T) { + m := testManager() + if !m.IsOffline() { + t.Fatalf("bad") + } + + s1 := &agent.Server{Name: "s1"} + m.AddServer(s1) + if m.IsOffline() { + t.Fatalf("bad") + } + m.RebalanceServers() + if m.IsOffline() { + t.Fatalf("bad") + } + m.RemoveServer(s1) + m.RebalanceServers() + if !m.IsOffline() { + t.Fatalf("bad") + } + + const failPct = 0.5 + m = testManagerFailProb(failPct) + m.AddServer(s1) + var on, off int + for i := 0; i < 100; i++ { + m.RebalanceServers() + if m.IsOffline() { + off++ + } else { + on++ + } + } + if on == 0 || off == 0 { + t.Fatalf("bad: %d %d", on, off) + } +} + // func (m *Manager) FindServer() (server *agent.Server) { func TestServers_FindServer(t *testing.T) { m := testManager() diff --git a/consul/servers/router.go b/consul/servers/router.go index 7d595cc93..3255c66da 100644 --- a/consul/servers/router.go +++ b/consul/servers/router.go @@ -189,6 +189,7 @@ func (r *Router) addServer(area *areaInfo, s *agent.Server) error { managers := r.managers[s.Datacenter] r.managers[s.Datacenter] = append(managers, manager) + go manager.Start() } info.manager.AddServer(s) @@ -283,6 +284,10 @@ func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) { // Try each manager until we get a server. 
for _, manager := range managers { + if manager.IsOffline() { + continue + } + if s := manager.FindServer(); s != nil { return manager, s, true } diff --git a/consul/servers/router_test.go b/consul/servers/router_test.go index b94d5d960..59c2c121a 100644 --- a/consul/servers/router_test.go +++ b/consul/servers/router_test.go @@ -232,6 +232,86 @@ func TestRouter_Routing(t *testing.T) { } } +func TestRouter_Routing_Offline(t *testing.T) { + r := testRouter("dc0") + + // Create a WAN-looking area. + self := "node0.dc0" + wan := testCluster(self) + if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{1.0}); err != nil { + t.Fatalf("err: %v", err) + } + + // Adding the area should enable all the routes right away. + if _, _, ok := r.FindRoute("dc0"); !ok { + t.Fatalf("bad") + } + if _, _, ok := r.FindRoute("dc1"); !ok { + t.Fatalf("bad") + } + if _, _, ok := r.FindRoute("dc2"); !ok { + t.Fatalf("bad") + } + if _, _, ok := r.FindRoute("dcX"); !ok { + t.Fatalf("bad") + } + + // Do a rebalance for dc1, which should knock it offline. + func() { + r.Lock() + defer r.Unlock() + + area, ok := r.areas[types.AreaWAN] + if !ok { + t.Fatalf("bad") + } + + info, ok := area.managers["dc1"] + if !ok { + t.Fatalf("bad") + } + info.manager.RebalanceServers() + }() + + // Recheck all the routes. + if _, _, ok := r.FindRoute("dc0"); !ok { + t.Fatalf("bad") + } + if _, _, ok := r.FindRoute("dc1"); ok { + t.Fatalf("bad") + } + if _, _, ok := r.FindRoute("dc2"); !ok { + t.Fatalf("bad") + } + if _, _, ok := r.FindRoute("dcX"); !ok { + t.Fatalf("bad") + } + + // Add another area with a route to dc1. + otherID := types.AreaID("other") + other := newMockCluster(self) + other.AddMember("dc0", "node0", nil) + other.AddMember("dc1", "node1", nil) + if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil { + t.Fatalf("err: %v", err) + } + + // Recheck all the routes and make sure it finds the one that's + // online. 
+ if _, _, ok := r.FindRoute("dc0"); !ok { + t.Fatalf("bad") + } + if _, _, ok := r.FindRoute("dc1"); !ok { + t.Fatalf("bad") + } + if _, _, ok := r.FindRoute("dc2"); !ok { + t.Fatalf("bad") + } + if _, _, ok := r.FindRoute("dcX"); !ok { + t.Fatalf("bad") + } +} + func TestRouter_GetDatacenters(t *testing.T) { r := testRouter("dc0")