From bc0494e3968e056542071ad91674cf4d38b38e67 Mon Sep 17 00:00:00 2001
From: Kyle Havlovitz
Date: Wed, 15 Mar 2017 18:27:17 -0700
Subject: [PATCH] Reorganized cluster health check loop and logic

---
 api/operator.go                                 |   3 +
 command/agent/operator_endpoint.go              |   1 +
 consul/autopilot.go                             | 224 +++++++++---------
 consul/leader_test.go                           |   2 +-
 consul/operator_endpoint_test.go                |  17 +-
 consul/structs/operator.go                      |   3 +
 .../docs/agent/http/operator.html.markdown      |   6 +
 7 files changed, 135 insertions(+), 121 deletions(-)

diff --git a/api/operator.go b/api/operator.go
index 38a5a5c48..625afc975 100644
--- a/api/operator.go
+++ b/api/operator.go
@@ -112,6 +112,9 @@ type ServerHealth struct {
 	// Name is the node name of the server.
 	Name string
 
+	// Address is the address of the server.
+	Address string
+
 	// The status of the SerfHealth check for the server.
 	SerfStatus string
 
diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go
index fb3030e3a..c1b17d561 100644
--- a/command/agent/operator_endpoint.go
+++ b/command/agent/operator_endpoint.go
@@ -298,6 +298,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
 		out.Servers = append(out.Servers, api.ServerHealth{
 			ID:          server.ID,
 			Name:        server.Name,
+			Address:     server.Address,
 			SerfStatus:  server.SerfStatus.String(),
 			LastContact: api.NewReadableDuration(server.LastContact),
 			LastTerm:    server.LastTerm,
diff --git a/consul/autopilot.go b/consul/autopilot.go
index dafad50c3..2ee641430 100644
--- a/consul/autopilot.go
+++ b/consul/autopilot.go
@@ -199,137 +199,137 @@ func (s *Server) serverHealthLoop() {
 		case <-s.shutdownCh:
 			return
 		case <-ticker.C:
-			// Don't do anything if the min Raft version is too low
-			minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
-			if err != nil {
-				s.logger.Printf("[ERR] consul: error getting server raft protocol versions: %s", err)
-				break
-			}
-			if minRaftProtocol < 3 {
-				break
-			}
-
-			state := s.fsm.State()
-			_, autopilotConf, err := state.AutopilotConfig()
-			if err != nil {
-				s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err)
-				break
-			}
-			// Bail early if autopilot config hasn't been initialized yet
-			if autopilotConf == nil {
-				break
-			}
-
-			// Get the the serf members which are Consul servers
-			serverMap := make(map[string]serf.Member)
-			for _, member := range s.LANMembers() {
-				if member.Status == serf.StatusLeft {
-					continue
-				}
-
-				valid, parts := agent.IsConsulServer(member)
-				if valid {
-					serverMap[parts.ID] = member
-				}
-			}
-
-			future := s.raft.GetConfiguration()
-			if err := future.Error(); err != nil {
-				s.logger.Printf("[ERR] consul: error getting Raft configuration %s", err)
-				break
-			}
-
-			// Build a current list of server healths
-			var clusterHealth structs.OperatorHealthReply
-			servers := future.Configuration().Servers
-			healthyCount := 0
-			voterCount := 0
-			for _, server := range servers {
-				member, ok := serverMap[string(server.ID)]
-				if !ok {
-					s.logger.Printf("[DEBUG] consul: couldn't find serf member for server with ID %q", server.ID)
-					continue
-				}
-
-				health, err := s.queryServerHealth(member, autopilotConf)
-				if err != nil {
-					s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
-					clusterHealth.Servers = append(clusterHealth.Servers, structs.ServerHealth{
-						ID:         string(server.ID),
-						Name:       member.Name,
-						SerfStatus: serf.StatusFailed,
-					})
-					continue
-				}
-
-				if health.Healthy {
-					healthyCount++
-				}
-
-				if server.Suffrage != raft.Nonvoter {
-					health.Voter = true
-					voterCount++
-				}
-				clusterHealth.Servers = append(clusterHealth.Servers, *health)
-			}
-			clusterHealth.Healthy = healthyCount == len(servers)
-
-			// If we have extra healthy voters, update FailureTolerance
-			if voterCount > len(servers)/2+1 {
-				clusterHealth.FailureTolerance = voterCount - (len(servers)/2 + 1)
-			}
-
-			// Heartbeat a metric for monitoring if we're the leader
-			if s.IsLeader() {
-				metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
-				if clusterHealth.Healthy {
-					metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
-				} else {
-					metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
-				}
-			}
-
-			s.clusterHealthLock.Lock()
-			s.clusterHealth = clusterHealth
-			s.clusterHealthLock.Unlock()
+			s.updateClusterHealth()
 		}
 	}
 }
 
-// queryServerHealth fetches the raft stats for the given server and uses them
+// updateClusterHealth fetches the Raft stats of the other servers and updates
+// s.clusterHealth based on the configured Autopilot thresholds
+func (s *Server) updateClusterHealth() error {
+	// Don't do anything if the min Raft version is too low
+	minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
+	if err != nil {
+		return fmt.Errorf("error getting server raft protocol versions: %s", err)
+	}
+	if minRaftProtocol < 3 {
+		return nil
+	}
+
+	state := s.fsm.State()
+	_, autopilotConf, err := state.AutopilotConfig()
+	if err != nil {
+		return fmt.Errorf("error retrieving autopilot config: %s", err)
+	}
+	// Bail early if autopilot config hasn't been initialized yet
+	if autopilotConf == nil {
+		return nil
+	}
+
+	// Get the the serf members which are Consul servers
+	serverMap := make(map[string]serf.Member)
+	for _, member := range s.LANMembers() {
+		if member.Status == serf.StatusLeft {
+			continue
+		}
+
+		valid, parts := agent.IsConsulServer(member)
+		if valid {
+			serverMap[parts.ID] = member
+		}
+	}
+
+	future := s.raft.GetConfiguration()
+	if err := future.Error(); err != nil {
+		return fmt.Errorf("error getting Raft configuration %s", err)
+	}
+
+	// Build a current list of server healths
+	var clusterHealth structs.OperatorHealthReply
+	servers := future.Configuration().Servers
+	healthyCount := 0
+	voterCount := 0
+	for _, server := range servers {
+		health := structs.ServerHealth{
+			ID:          string(server.ID),
+			Address:     string(server.Address),
+			LastContact: -1,
+			Voter:       server.Suffrage != raft.Nonvoter,
+		}
+
+		// Set LastContact to 0 for the leader
+		if s.raft.Leader() == server.Address {
+			health.LastContact = 0
+		}
+
+		member, ok := serverMap[string(server.ID)]
+		if ok {
+			health.Name = member.Name
+			health.SerfStatus = member.Status
+			if err := s.updateServerHealth(&health, member, autopilotConf); err != nil {
+				s.logger.Printf("[ERR] consul: error getting server health: %s", err)
+			}
+		} else {
+			health.SerfStatus = serf.StatusNone
+		}
+
+		if health.Healthy {
+			healthyCount++
+		}
+
+		if health.Voter {
+			voterCount++
+		}
+
+		clusterHealth.Servers = append(clusterHealth.Servers, health)
+	}
+	clusterHealth.Healthy = healthyCount == len(servers)
+
+	// If we have extra healthy voters, update FailureTolerance
+	if voterCount > len(servers)/2+1 {
+		clusterHealth.FailureTolerance = voterCount - (len(servers)/2 + 1)
+	}
+
+	// Heartbeat a metric for monitoring if we're the leader
+	if s.IsLeader() {
+		metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
+		if clusterHealth.Healthy {
+			metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
+		} else {
+			metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
+		}
+	}
+
+	s.clusterHealthLock.Lock()
+	s.clusterHealth = clusterHealth
+	s.clusterHealthLock.Unlock()
+
+	return nil
+}
+
+// updateServerHealth fetches the raft stats for the given server and uses them
 // to update its ServerHealth
-func (s *Server) queryServerHealth(member serf.Member, autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) {
+func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Member, autopilotConf *structs.AutopilotConfig) error {
 	_, server := agent.IsConsulServer(member)
 	stats, err := s.getServerStats(server)
 	if err != nil {
-		return nil, fmt.Errorf("error getting raft stats: %s", err)
+		return fmt.Errorf("error getting raft stats: %s", err)
 	}
-	health := &structs.ServerHealth{
-		ID:          server.ID,
-		Name:        server.Name,
-		SerfStatus:  member.Status,
-		LastContact: -1,
-		LastTerm:    stats.LastTerm,
-		LastIndex:   stats.LastIndex,
-	}
+	health.LastTerm = stats.LastTerm
+	health.LastIndex = stats.LastIndex
 
 	if stats.LastContact != "never" {
 		health.LastContact, err = time.ParseDuration(stats.LastContact)
 		if err != nil {
-			return nil, fmt.Errorf("error parsing last_contact duration: %s", err)
+			return fmt.Errorf("error parsing last_contact duration: %s", err)
 		}
 	}
-	// Set LastContact to 0 for the leader
-	if s.config.NodeName == member.Name {
-		health.LastContact = 0
-	}
-
 	lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
 	if err != nil {
-		return nil, fmt.Errorf("error parsing last_log_term: %s", err)
+		return fmt.Errorf("error parsing last_log_term: %s", err)
 	}
 	health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf)
@@ -341,7 +341,7 @@ func (s *Server) queryServerHealth(member serf.Member, autopilotConf *structs.Au
 		health.StableSince = lastHealth.StableSince
 	}
 
-	return health, nil
+	return nil
 }
 
 func (s *Server) getClusterHealth() structs.OperatorHealthReply {
diff --git a/consul/leader_test.go b/consul/leader_test.go
index 762ed1578..7dc9e0a24 100644
--- a/consul/leader_test.go
+++ b/consul/leader_test.go
@@ -874,4 +874,4 @@ func TestLeader_ChangeServerID(t *testing.T) {
 			t.Fatalf("should have 3 peers")
 		})
 	}
-}
\ No newline at end of file
+}
diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go
index 8fde7de4a..bfe21d7da 100644
--- a/consul/operator_endpoint_test.go
+++ b/consul/operator_endpoint_test.go
@@ -479,14 +479,15 @@ func TestOperator_ServerHealth(t *testing.T) {
 		if len(reply.Servers) != 3 {
 			return false, fmt.Errorf("bad: %v", reply)
 		}
-		if reply.Servers[0].LastContact != 0 {
-			return false, fmt.Errorf("bad: %v", reply)
-		}
-		if reply.Servers[1].LastContact <= 0 {
-			return false, fmt.Errorf("bad: %v", reply)
-		}
-		if reply.Servers[2].LastContact <= 0 {
-			return false, fmt.Errorf("bad: %v", reply)
+		// Leader should have LastContact == 0, others should be positive
+		for _, s := range reply.Servers {
+			isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address)
+			if isLeader && s.LastContact != 0 {
+				return false, fmt.Errorf("bad: %v", reply)
+			}
+			if !isLeader && s.LastContact <= 0 {
+				return false, fmt.Errorf("bad: %v", reply)
+			}
 		}
 		return true, nil
 	}, func(err error) {
diff --git a/consul/structs/operator.go b/consul/structs/operator.go
index 5fc6ef36d..619accaee 100644
--- a/consul/structs/operator.go
+++ b/consul/structs/operator.go
@@ -111,6 +111,9 @@ type ServerHealth struct {
 	// Name is the node name of the server.
 	Name string
 
+	// Address is the address of the server.
+	Address string
+
 	// The status of the SerfHealth check for the server.
 	SerfStatus serf.MemberStatus
 
diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown
index 6918dd8e2..922324d12 100644
--- a/website/source/docs/agent/http/operator.html.markdown
+++ b/website/source/docs/agent/http/operator.html.markdown
@@ -360,6 +360,7 @@ A JSON body is returned that looks like this:
     {
       "ID": "e349749b-3303-3ddf-959c-b5885a0e1f6e",
       "Name": "node1",
+      "Address": "127.0.0.1:8300",
       "SerfStatus": "alive",
       "LastContact": "0s",
       "LastTerm": 2,
@@ -371,6 +372,7 @@ A JSON body is returned that looks like this:
     {
       "ID": "e36ee410-cc3c-0a0c-c724-63817ab30303",
       "Name": "node2",
+      "Address": "127.0.0.1:8205",
       "SerfStatus": "alive",
       "LastContact": "27.291304ms",
       "LastTerm": 2,
@@ -394,6 +396,8 @@ The `Servers` list holds detailed health information on each server:
 
 - `Name` is the node name of the server.
 
+- `Address` is the address of the server.
+
 - `SerfStatus` is the SerfHealth check status for the server.
 
 - `LastContact` is the time elapsed since this server's last contact with the leader.
@@ -404,4 +408,6 @@ The `Servers` list holds detailed health information on each server:
 
 - `Healthy` is whether the server is healthy according to the current Autopilot configuration.
 
+- `Voter` is whether the server is a voting member of the Raft cluster.
+
 - `StableSince` is the time this server has been in its current `Healthy` state.
\ No newline at end of file
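
For reviewers, a minimal sketch (not part of the patch) of how the new `Address` and `Voter` fields surface through the Go API client. It assumes the `api` package's `Operator().AutopilotServerHealth` helper and a local agent on the default address; the program and its output format are illustrative only.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	// Connect to the local agent using the default config (127.0.0.1:8500).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Query the autopilot health endpoint (/v1/operator/autopilot/health).
	health, err := client.Operator().AutopilotServerHealth(nil)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("healthy=%v failure_tolerance=%d\n", health.Healthy, health.FailureTolerance)
	for _, s := range health.Servers {
		// Address and Voter are the fields added by this change.
		fmt.Printf("  %s %s serf=%s healthy=%v voter=%v\n",
			s.Name, s.Address, s.SerfStatus, s.Healthy, s.Voter)
	}
}
```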