diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go
index 9701109f4..cb1ccd93a 100644
--- a/command/agent/operator_endpoint.go
+++ b/command/agent/operator_endpoint.go
@@ -242,5 +242,10 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
 		return nil, err
 	}
 
+	// Reply with status 429 if something is unhealthy
+	if !reply.Healthy {
+		resp.WriteHeader(http.StatusTooManyRequests)
+	}
+
 	return reply, nil
 }
diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go
index 172551b70..20b6614f5 100644
--- a/command/agent/operator_endpoint_test.go
+++ b/command/agent/operator_endpoint_test.go
@@ -8,8 +8,8 @@ import (
 	"strings"
 	"testing"
 
-	"github.com/hashicorp/consul-enterprise/testutil"
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/testutil"
 	"github.com/hashicorp/serf/serf"
 )
 
diff --git a/consul/autopilot.go b/consul/autopilot.go
index ed09d32f5..1df9c9571 100644
--- a/consul/autopilot.go
+++ b/consul/autopilot.go
@@ -92,27 +92,30 @@ func (s *Server) pruneDeadServers() error {
 		return err
 	}
 
-	// Look for dead servers to clean up
+	// Find any failed servers
+	var failed []string
+	if autopilotConf.CleanupDeadServers {
+		for _, member := range s.serfLAN.Members() {
+			valid, _ := agent.IsConsulServer(member)
+			if valid && member.Status == serf.StatusFailed {
+				failed = append(failed, member.Name)
+			}
+		}
+	}
+
 	peers, err := s.numPeers()
 	if err != nil {
 		return err
 	}
 
-	removed := 0
-	if autopilotConf.CleanupDeadServers {
-		for _, member := range s.serfLAN.Members() {
-			// Exit early if we already removed the max amount of servers
-			if removed == peers/2 {
-				break
-			}
-
-			valid, _ := agent.IsConsulServer(member)
-			if valid && member.Status == serf.StatusFailed {
-				removed++
-				s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name)
-				go s.serfLAN.RemoveFailedNode(member.Name)
-			}
+	// Only do removals if a minority of servers will be affected
+	if len(failed) <= peers/2 {
+		for _, server := range failed {
+			s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", server)
+			go s.serfLAN.RemoveFailedNode(server)
 		}
+	} else {
+		s.logger.Printf("[ERR] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers)
 	}
 
 	return nil
@@ -125,62 +128,66 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error
 		return fmt.Errorf("error getting server raft protocol versions: %s", err)
 	}
 
-	if minRaftProtocol >= 3 {
-		future := s.raft.GetConfiguration()
-		if err := future.Error(); err != nil {
-			return fmt.Errorf("failed to get raft configuration: %v", err)
-		}
+	// If we don't meet the minimum version for non-voter features, bail early
+	if minRaftProtocol < 3 {
+		return nil
+	}
 
-		var promotions []raft.Server
-		raftServers := future.Configuration().Servers
-		voterCount := 0
-		for _, server := range raftServers {
-			// If this server has been stable and passing for long enough, promote it to a voter
-			if server.Suffrage == raft.Nonvoter {
-				health := s.getServerHealth(string(server.Address))
-				if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime {
-					promotions = append(promotions, server)
-				}
-			} else {
-				voterCount++
+	future := s.raft.GetConfiguration()
+	if err := future.Error(); err != nil {
+		return fmt.Errorf("failed to get raft configuration: %v", err)
+	}
+
+	var promotions []raft.Server
+	raftServers := future.Configuration().Servers
+	voterCount := 0
+	for _, server := range raftServers {
+		// If this server has been stable and passing for long enough, promote it to a voter
+		if server.Suffrage == raft.Nonvoter {
+			health := s.getServerHealth(string(server.Address))
+			if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime {
+				promotions = append(promotions, server)
 			}
+		} else {
+			voterCount++
 		}
+	}
 
-		// Exit early if there's nothing to promote
-		if len(promotions) == 0 {
-			return nil
+	// Exit early if there's nothing to promote
+	if len(promotions) == 0 {
+		return nil
+	}
+
+	// If there's currently an even number of servers, we can promote the first server in the list
+	// to get to an odd-sized quorum
+	newServers := false
+	if voterCount%2 == 0 {
+		addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0)
+		if err := addFuture.Error(); err != nil {
+			return fmt.Errorf("failed to add raft peer: %v", err)
 		}
+		promotions = promotions[1:]
+		newServers = true
+	}
 
-		// If there's currently an even number of servers, we can promote the first server in the list
-		// to get to an odd-sized quorum
-		newServers := false
-		if voterCount%2 == 0 {
-			addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0)
-			if err := addFuture.Error(); err != nil {
-				return fmt.Errorf("failed to add raft peer: %v", err)
-			}
-			promotions = promotions[1:]
-			newServers = true
+	// Promote remaining servers in twos to maintain an odd quorum size
+	for i := 0; i < len(promotions)-1; i += 2 {
+		addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0)
+		if err := addFirst.Error(); err != nil {
+			return fmt.Errorf("failed to add raft peer: %v", err)
 		}
-
-		// Promote remaining servers in twos to maintain an odd quorum size
-		for i := 0; i < len(promotions)-1; i += 2 {
-			addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0)
-			if err := addFirst.Error(); err != nil {
-				return fmt.Errorf("failed to add raft peer: %v", err)
-			}
-			addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0)
-			if err := addSecond.Error(); err != nil {
-				return fmt.Errorf("failed to add raft peer: %v", err)
-			}
-			newServers = true
+		addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0)
+		if err := addSecond.Error(); err != nil {
+			return fmt.Errorf("failed to add raft peer: %v", err)
 		}
+		newServers = true
+	}
 
-		// If we added a new server, trigger a check to remove dead servers
-		if newServers {
-			go func() {
-				s.autopilotRemoveDeadCh <- struct{}{}
-			}()
+	// If we added a new server, trigger a check to remove dead servers
+	if newServers {
+		select {
+		case s.autopilotRemoveDeadCh <- struct{}{}:
+		default:
 		}
 	}
 
@@ -190,47 +197,35 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error
 // queryServerHealth fetches the raft stats for the given server and uses them
 // to update its ServerHealth
 func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, autopilotConf *structs.AutopilotConfig) *structs.ServerHealth {
+	stats, err := s.getServerStats(server)
+	if err != nil {
+		s.logger.Printf("[DEBUG] consul: error getting server's raft stats: %s", err)
+	}
+
 	health := &structs.ServerHealth{
 		ID:             server.ID,
 		Name:           server.Name,
 		SerfStatusRaw:  member.Status,
 		SerfStatus:     member.Status.String(),
 		LastContactRaw: -1,
-		LastContact:    "never",
+		LastContact:    stats.LastContact,
+		LastTerm:       stats.LastTerm,
+		LastIndex:      stats.LastIndex,
 	}
 
-	stats, err := s.getServerStats(server)
-	if err != nil {
-		s.logger.Printf("[DEBUG] consul: error getting server's raft stats: %s", err)
-	}
-
-	if v, ok := stats["last_contact"]; ok && v != "never" {
-		health.LastContactRaw, err = time.ParseDuration(v)
+	if health.LastContact != "never" {
+		health.LastContactRaw, err = time.ParseDuration(health.LastContact)
 		if err != nil {
 			s.logger.Printf("[DEBUG] consul: error parsing server's last_contact value: %s", err)
 		}
-		health.LastContact = health.LastContactRaw.String()
 	}
-	// Set LastContact to 0 if we're the leader
+
+	// Set LastContact to 0 for the leader
 	if s.config.NodeName == member.Name {
 		health.LastContactRaw = 0
 		health.LastContact = "leader"
 	}
 
-	if v, ok := stats["last_log_index"]; ok {
-		health.LastIndex, err = strconv.ParseUint(v, 10, 64)
-		if err != nil {
-			s.logger.Printf("[DEBUG] consul: error parsing server's last_log_index value: %s", err)
-		}
-	}
-
-	if v, ok := stats["last_log_term"]; ok {
-		health.LastTerm, err = strconv.ParseUint(v, 10, 64)
-		if err != nil {
-			s.logger.Printf("[DEBUG] consul: error parsing server's last_log_term value: %s", err)
-		}
-	}
-
 	health.Healthy = s.isServerHealthy(health, autopilotConf)
 
 	// If this is a new server or the health changed, reset StableSince
@@ -254,10 +249,10 @@ func (s *Server) getServerHealth(addr string) *structs.ServerHealth {
 	return h
 }
 
-func (s *Server) getServerStats(server *agent.Server) (map[string]string, error) {
+func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) {
 	var args struct{}
-	var reply map[string]string
-	err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Operator.RaftStats", &args, &reply)
+	var reply structs.ServerStats
+	err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
 	return reply, err
 }
 
diff --git a/consul/leader.go b/consul/leader.go
index cd09dcba4..e2d519b16 100644
--- a/consul/leader.go
+++ b/consul/leader.go
@@ -624,9 +624,10 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
 	}
 
 	// Trigger a check to remove dead servers
-	go func() {
-		s.autopilotRemoveDeadCh <- struct{}{}
-	}()
+	select {
+	case s.autopilotRemoveDeadCh <- struct{}{}:
+	default:
+	}
 
 	return nil
 }
diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go
index e9f6272ed..e30a1f434 100644
--- a/consul/operator_endpoint.go
+++ b/consul/operator_endpoint.go
@@ -185,12 +185,6 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
 	return nil
 }
 
-// Used by Autopilot to query the raft stats of the local server.
-func (op *Operator) RaftStats(args struct{}, reply *map[string]string) error {
-	*reply = op.srv.raft.Stats()
-	return nil
-}
-
 // ServerHealth is used to get the current health of the servers.
 func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error {
 	// This must be sent to the leader, so we fix the args since we are
diff --git a/consul/status_endpoint.go b/consul/status_endpoint.go
index 2cac03a7f..8c6e3fcfb 100644
--- a/consul/status_endpoint.go
+++ b/consul/status_endpoint.go
@@ -1,5 +1,12 @@
 package consul
 
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/hashicorp/consul/consul/structs"
+)
+
 // Status endpoint is used to check on server status
 type Status struct {
 	server *Server
@@ -33,3 +40,21 @@ func (s *Status) Peers(args struct{}, reply *[]string) error {
 	}
 	return nil
 }
+
+// Used by Autopilot to query the raft stats of the local server.
+func (s *Status) RaftStats(args struct{}, reply *structs.ServerStats) error {
+	stats := s.server.raft.Stats()
+
+	var err error
+	reply.LastContact = stats["last_contact"]
+	reply.LastIndex, err = strconv.ParseUint(stats["last_log_index"], 10, 64)
+	if err != nil {
+		return fmt.Errorf("error parsing server's last_log_index value: %s", err)
+	}
+	reply.LastTerm, err = strconv.ParseUint(stats["last_log_term"], 10, 64)
+	if err != nil {
+		return fmt.Errorf("error parsing server's last_log_term value: %s", err)
+	}
+
+	return nil
+}
diff --git a/consul/structs/operator.go b/consul/structs/operator.go
index d13c5d904..1babb4972 100644
--- a/consul/structs/operator.go
+++ b/consul/structs/operator.go
@@ -132,6 +132,18 @@ type ServerHealth struct {
 	StableSince time.Time
 }
 
+// ServerStats holds miscellaneous Raft metrics for a server
+type ServerStats struct {
+	// LastContact is the time since this node's last contact with the leader.
+	LastContact string
+
+	// LastTerm is the highest leader term this server has a record of in its Raft log.
+	LastTerm uint64
+
+	// LastIndex is the last log index this server has a record of in its Raft log.
+	LastIndex uint64
+}
+
 // OperatorHealthReply is a representation of the overall health of the cluster
 type OperatorHealthReply struct {
 	// Healthy is true if all the servers in the cluster are healthy.
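
Reviewer note (not part of the patch): the command/agent/operator_endpoint.go hunk above makes the autopilot health handler answer with HTTP 429 whenever the cluster is reported unhealthy, so an external monitor can key off the status code alone. Below is a minimal, hypothetical Go sketch of such a check; the agent address and the /v1/operator/autopilot/health path are assumptions about a default local agent, not something introduced by this diff.

package main

import (
	"fmt"
	"net/http"
)

// checkAutopilotHealth reports true on HTTP 200 and false on HTTP 429, which
// is what the handler above now writes when reply.Healthy is false.
// The endpoint path is an assumption based on the default agent HTTP API.
func checkAutopilotHealth(agentAddr string) (bool, error) {
	resp, err := http.Get(agentAddr + "/v1/operator/autopilot/health")
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		return true, nil
	case http.StatusTooManyRequests:
		return false, nil
	default:
		return false, fmt.Errorf("unexpected status: %s", resp.Status)
	}
}

func main() {
	healthy, err := checkAutopilotHealth("http://127.0.0.1:8500")
	if err != nil {
		fmt.Println("error querying autopilot health:", err)
		return
	}
	fmt.Println("cluster healthy:", healthy)
}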