From 8bcab6c6d76baae4b86e2c501efa4848dc761a20 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 1 Mar 2017 14:04:40 -0800 Subject: [PATCH 1/5] Add autopilot server health tracking This adds two goroutines to perform autopilot tasks on the leader - one to monitor the health of servers and another to periodically clean up dead servers with a limit on removal count. Also adds a new http endpoint, `/v1/operator/autopilot/health`, for querying this information through an operator RPC endpoint. --- api/operator.go | 14 + command/agent/agent.go | 9 + command/agent/command.go | 2 + command/agent/config.go | 51 +++- command/agent/config_test.go | 23 +- command/agent/http.go | 1 + command/agent/operator_endpoint.go | 20 ++ command/agent/operator_endpoint_test.go | 39 +++ command/operator_autopilot_get.go | 3 + command/operator_autopilot_set.go | 37 ++- consul/autopilot.go | 286 ++++++++++++++++++ consul/autopilot_test.go | 235 ++++++++++++++ consul/config.go | 15 +- consul/leader.go | 36 +-- consul/leader_test.go | 70 ----- consul/operator_endpoint.go | 61 ++++ consul/operator_endpoint_test.go | 76 +++++ consul/server.go | 32 +- consul/structs/operator.go | 63 +++- .../docs/agent/http/operator.html.markdown | 92 +++++- .../source/docs/agent/options.html.markdown | 26 +- .../operator/autopilot.html.markdown.erb | 13 + 22 files changed, 1086 insertions(+), 118 deletions(-) create mode 100644 consul/autopilot.go create mode 100644 consul/autopilot_test.go diff --git a/api/operator.go b/api/operator.go index b7129cdde..cb5516a63 100644 --- a/api/operator.go +++ b/api/operator.go @@ -6,6 +6,7 @@ import ( "io" "strconv" "strings" + "time" ) // Operator can be used to perform low-level operator tasks for Consul. @@ -79,6 +80,19 @@ type AutopilotConfiguration struct { // peer list when a new server joins CleanupDeadServers bool + // LastContactThreshold is the limit on the amount of time a server can go + // without leader contact before being considered unhealthy. + LastContactThreshold time.Duration + + // MaxTrailingLogs is the amount of entries in the Raft Log that a server can + // be behind before being considered unhealthy. + MaxTrailingLogs uint64 + + // ServerStabilizationTime is the minimum amount of time a server must be + // in a stable, healthy state before it can be added to the cluster. Only + // applicable with Raft protocol version 3 or higher. + ServerStabilizationTime time.Duration + // CreateIndex holds the index corresponding the creation of this configuration. // This is a read-only field. 
CreateIndex uint64 diff --git a/command/agent/agent.go b/command/agent/agent.go index bbec4bd8f..2b3e85c08 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -419,6 +419,15 @@ func (a *Agent) consulConfig() *consul.Config { if a.config.Autopilot.CleanupDeadServers != nil { base.AutopilotConfig.CleanupDeadServers = *a.config.Autopilot.CleanupDeadServers } + if a.config.Autopilot.LastContactThreshold != nil { + base.AutopilotConfig.LastContactThreshold = *a.config.Autopilot.LastContactThreshold + } + if a.config.Autopilot.MaxTrailingLogs != nil { + base.AutopilotConfig.MaxTrailingLogs = *a.config.Autopilot.MaxTrailingLogs + } + if a.config.Autopilot.ServerStabilizationTime != nil { + base.AutopilotConfig.ServerStabilizationTime = *a.config.Autopilot.ServerStabilizationTime + } // Format the build string revision := a.config.Revision diff --git a/command/agent/command.go b/command/agent/command.go index 1ee73527c..65fe3435d 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -135,6 +135,8 @@ func (c *Command) readConfig() *Config { f.IntVar(&cmdConfig.Protocol, "protocol", -1, "Sets the protocol version. Defaults to latest.") + f.IntVar(&cmdConfig.RaftProtocol, "raft-protocol", -1, + "Sets the Raft protocol version. Defaults to latest.") f.BoolVar(&cmdConfig.EnableSyslog, "syslog", false, "Enables logging to syslog.") diff --git a/command/agent/config.go b/command/agent/config.go index bd507628c..56ccc8317 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -265,6 +265,21 @@ type Autopilot struct { // CleanupDeadServers enables the automatic cleanup of dead servers when new ones // are added to the peer list. Defaults to true. CleanupDeadServers *bool `mapstructure:"cleanup_dead_servers"` + + // LastContactThreshold is the limit on the amount of time a server can go + // without leader contact before being considered unhealthy. + LastContactThreshold *time.Duration `mapstructure:"-" json:"-"` + LastContactThresholdRaw string `mapstructure:"last_contact_threshold"` + + // MaxTrailingLogs is the amount of entries in the Raft Log that a server can + // be behind before being considered unhealthy. + MaxTrailingLogs *uint64 `mapstructure:"max_trailing_logs"` + + // ServerStabilizationTime is the minimum amount of time a server must be + // in a stable, healthy state before it can be added to the cluster. Only + // applicable with Raft protocol version 3 or higher. + ServerStabilizationTime *time.Duration `mapstructure:"-" json:"-"` + ServerStabilizationTimeRaw string `mapstructure:"server_stabilization_time"` } // Config is the configuration that can be set for an Agent. @@ -692,6 +707,16 @@ func Bool(b bool) *bool { return &b } +// Uint64 is used to initialize uint64 pointers in struct literals. +func Uint64(i uint64) *uint64 { + return &i +} + +// Duration is used to initialize time.Duration pointers in struct literals. +func Duration(d time.Duration) *time.Duration { + return &d +} + // UnixSocketPermissions contains information about a unix socket, and // implements the FilePermissions interface. 
type UnixSocketPermissions struct { @@ -1041,6 +1066,21 @@ func DecodeConfig(r io.Reader) (*Config, error) { result.ReconnectTimeoutWan = dur } + if raw := result.Autopilot.LastContactThresholdRaw; raw != "" { + dur, err := time.ParseDuration(raw) + if err != nil { + return nil, fmt.Errorf("LastContactThreshold invalid: %v", err) + } + result.Autopilot.LastContactThreshold = &dur + } + if raw := result.Autopilot.ServerStabilizationTimeRaw; raw != "" { + dur, err := time.ParseDuration(raw) + if err != nil { + return nil, fmt.Errorf("ServerStabilizationTime invalid: %v", err) + } + result.Autopilot.ServerStabilizationTime = &dur + } + // Merge the single recursor if result.DNSRecursor != "" { result.DNSRecursors = append(result.DNSRecursors, result.DNSRecursor) @@ -1293,7 +1333,7 @@ func MergeConfig(a, b *Config) *Config { if b.Protocol > 0 { result.Protocol = b.Protocol } - if b.RaftProtocol != 0 { + if b.RaftProtocol > 0 { result.RaftProtocol = b.RaftProtocol } if b.NodeID != "" { @@ -1347,6 +1387,15 @@ func MergeConfig(a, b *Config) *Config { if b.Autopilot.CleanupDeadServers != nil { result.Autopilot.CleanupDeadServers = b.Autopilot.CleanupDeadServers } + if b.Autopilot.LastContactThreshold != nil { + result.Autopilot.LastContactThreshold = b.Autopilot.LastContactThreshold + } + if b.Autopilot.MaxTrailingLogs != nil { + result.Autopilot.MaxTrailingLogs = b.Autopilot.MaxTrailingLogs + } + if b.Autopilot.ServerStabilizationTime != nil { + result.Autopilot.ServerStabilizationTime = b.Autopilot.ServerStabilizationTime + } if b.Telemetry.DisableHostname == true { result.Telemetry.DisableHostname = true } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index e5814dda8..6ca7143a3 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -1103,13 +1103,27 @@ func TestDecodeConfig_Performance(t *testing.T) { } func TestDecodeConfig_Autopilot(t *testing.T) { - input := `{"autopilot": { "cleanup_dead_servers": true }}` + input := `{"autopilot": { + "cleanup_dead_servers": true, + "last_contact_threshold": "100ms", + "max_trailing_logs": 10, + "server_stabilization_time": "10s" + }}` config, err := DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) } if config.Autopilot.CleanupDeadServers == nil || !*config.Autopilot.CleanupDeadServers { - t.Fatalf("bad: cleanup_dead_servers isn't set: %#v", config) + t.Fatalf("bad: %#v", config) + } + if config.Autopilot.LastContactThreshold == nil || *config.Autopilot.LastContactThreshold != 100*time.Millisecond { + t.Fatalf("bad: %#v", config) + } + if config.Autopilot.MaxTrailingLogs == nil || *config.Autopilot.MaxTrailingLogs != 10 { + t.Fatalf("bad: %#v", config) + } + if config.Autopilot.ServerStabilizationTime == nil || *config.Autopilot.ServerStabilizationTime != 10*time.Second { + t.Fatalf("bad: %#v", config) } } @@ -1629,7 +1643,10 @@ func TestMergeConfig(t *testing.T) { SkipLeaveOnInt: Bool(true), RaftProtocol: 3, Autopilot: Autopilot{ - CleanupDeadServers: Bool(true), + CleanupDeadServers: Bool(true), + LastContactThreshold: Duration(time.Duration(10)), + MaxTrailingLogs: Uint64(10), + ServerStabilizationTime: Duration(time.Duration(100)), }, EnableDebug: true, VerifyIncoming: true, diff --git a/command/agent/http.go b/command/agent/http.go index 439a2daaa..317df3a10 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -298,6 +298,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.handleFuncMetrics("/v1/operator/raft/peer", 
s.wrap(s.OperatorRaftPeer)) s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint)) s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration)) + s.handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth)) s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral)) s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific)) s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate)) diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index d9cb4de63..9701109f4 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -224,3 +224,23 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re return nil, nil } } + +// OperatorServerHealth is used to get the health of the servers in the local DC +func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "GET" { + resp.WriteHeader(http.StatusMethodNotAllowed) + return nil, nil + } + + var args structs.DCSpecificRequest + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + var reply structs.OperatorHealthReply + if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil { + return nil, err + } + + return reply, nil +} diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 0915eba45..172551b70 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -8,7 +8,9 @@ import ( "strings" "testing" + "github.com/hashicorp/consul-enterprise/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/serf/serf" ) func TestOperator_OperatorRaftConfiguration(t *testing.T) { @@ -420,3 +422,40 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) { } }) } + +func TestOperator_OperatorServerHealth(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("GET", "/v1/operator/autopilot/health", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + resp := httptest.NewRecorder() + obj, err := srv.OperatorServerHealth(resp, req) + if err != nil { + return false, fmt.Errorf("err: %v", err) + } + if resp.Code != 200 { + return false, fmt.Errorf("bad code: %d", resp.Code) + } + out, ok := obj.(structs.OperatorHealthReply) + if !ok { + return false, fmt.Errorf("unexpected: %T", obj) + } + if len(out.Servers) != 1 || + !out.Servers[0].Healthy || + out.Servers[0].Name != srv.agent.config.NodeName || + out.Servers[0].SerfStatusRaw != serf.StatusAlive || + out.FailureTolerance != 0 { + return false, fmt.Errorf("bad: %v", out) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) + + }) +} diff --git a/command/operator_autopilot_get.go b/command/operator_autopilot_get.go index d5b195d9f..68314fe3f 100644 --- a/command/operator_autopilot_get.go +++ b/command/operator_autopilot_get.go @@ -56,6 +56,9 @@ func (c *OperatorAutopilotGetCommand) Run(args []string) int { return 1 } c.Ui.Output(fmt.Sprintf("CleanupDeadServers = %v", config.CleanupDeadServers)) + c.Ui.Output(fmt.Sprintf("LastContactThreshold = %v", config.LastContactThreshold.String())) + c.Ui.Output(fmt.Sprintf("MaxTrailingLogs = %v", config.MaxTrailingLogs)) + c.Ui.Output(fmt.Sprintf("ServerStabilizationTime = %v", 
config.ServerStabilizationTime.String())) return 0 } diff --git a/command/operator_autopilot_set.go b/command/operator_autopilot_set.go index d87687e03..b4f4fe73d 100644 --- a/command/operator_autopilot_set.go +++ b/command/operator_autopilot_set.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "strings" + "time" "github.com/hashicorp/consul/command/base" ) @@ -29,12 +30,27 @@ func (c *OperatorAutopilotSetCommand) Synopsis() string { func (c *OperatorAutopilotSetCommand) Run(args []string) int { var cleanupDeadServers base.BoolValue + var lastContactThresholdRaw string + var maxTrailingLogs base.UintValue + var serverStabilizationTimeRaw string f := c.Command.NewFlagSet(c) f.Var(&cleanupDeadServers, "cleanup-dead-servers", "Controls whether Consul will automatically remove dead servers "+ "when new ones are successfully added. Must be one of `true|false`.") + f.Var(&maxTrailingLogs, "max-trailing-logs", + "Controls the maximum number of log entries that a server can trail the "+ + "leader by before being considered unhealthy.") + f.StringVar(&lastContactThresholdRaw, "last-contact-threshold", "", + "Controls the maximum amount of time a server can go without contact "+ + "from the leader before being considered unhealthy. Must be a duration value "+ + "such as `10s`.") + f.StringVar(&serverStabilizationTimeRaw, "server-stabilization-time", "", + "Controls the minimum amount of time a server must be stable in the "+ + "'healthy' state before being added to the cluster. Only takes effect if all "+ + "servers are running Raft protocol version 3 or higher. Must be a duration "+ + "value such as `10s`.") if err := c.Command.Parse(args); err != nil { if err == flag.ErrHelp { @@ -59,8 +75,27 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int { return 1 } - // Update the config values. + // Update the config values based on the set flags. cleanupDeadServers.Merge(&conf.CleanupDeadServers) + trailing := uint(conf.MaxTrailingLogs) + maxTrailingLogs.Merge(&trailing) + conf.MaxTrailingLogs = uint64(trailing) + + if lastContactThresholdRaw != "" { + dur, err := time.ParseDuration(lastContactThresholdRaw) + if err != nil { + c.Ui.Error(fmt.Sprintf("invalid value for last-contact-threshold: %v", err)) + return 1 + } + conf.LastContactThreshold = dur + } + if serverStabilizationTimeRaw != "" { + dur, err := time.ParseDuration(serverStabilizationTimeRaw) + if err != nil { + c.Ui.Error(fmt.Sprintf("invalid value for server-stabilization-time: %v", err)) + return 1 + } + conf.ServerStabilizationTime = dur + } // Check-and-set the new configuration.
result, err := operator.AutopilotCASConfiguration(conf, nil) diff --git a/consul/autopilot.go b/consul/autopilot.go new file mode 100644 index 000000000..ed09d32f5 --- /dev/null +++ b/consul/autopilot.go @@ -0,0 +1,286 @@ +package consul + +import ( + "fmt" + "strconv" + "time" + + "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" +) + +func (s *Server) startAutopilot() { + s.autopilotShutdownCh = make(chan struct{}) + + go s.serverHealthLoop() + go s.removeDeadLoop() +} + +func (s *Server) stopAutopilot() { + close(s.autopilotShutdownCh) +} + +// serverHealthLoop monitors the health of the servers in the cluster +func (s *Server) serverHealthLoop() { + // Monitor server health until shutdown + ticker := time.NewTicker(s.config.ServerHealthInterval) + for { + select { + case <-s.autopilotShutdownCh: + ticker.Stop() + return + case <-ticker.C: + serverHealths := make(map[string]*structs.ServerHealth) + + state := s.fsm.State() + _, autopilotConf, err := state.AutopilotConfig() + if err != nil { + s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err) + } + + // Build an updated map of server healths + for _, member := range s.LANMembers() { + if member.Status == serf.StatusLeft { + continue + } + + valid, parts := agent.IsConsulServer(member) + if valid { + health := s.queryServerHealth(member, parts, autopilotConf) + serverHealths[parts.Addr.String()] = health + } + } + + s.autopilotLock.Lock() + s.autopilotHealth = serverHealths + s.autopilotLock.Unlock() + + if err := s.promoteNonVoters(autopilotConf); err != nil { + s.logger.Printf("[ERR] consul: error checking for non-voters to promote: %s", err) + } + } + } +} + +// removeDeadLoop checks for dead servers periodically, or when receiving on autopilotRemoveDeadCh +func (s *Server) removeDeadLoop() { + ticker := time.NewTicker(s.config.RemoveDeadInterval) + for { + select { + case <-s.autopilotShutdownCh: + ticker.Stop() + return + case <-ticker.C: + if err := s.pruneDeadServers(); err != nil { + s.logger.Printf("[ERR] consul: error checking for dead servers to remove: %s", err) + } + case <-s.autopilotRemoveDeadCh: + if err := s.pruneDeadServers(); err != nil { + s.logger.Printf("[ERR] consul: error checking for dead servers to remove: %s", err) + } + } + } +} + +// pruneDeadServers removes up to numPeers/2 failed servers +func (s *Server) pruneDeadServers() error { + state := s.fsm.State() + _, autopilotConf, err := state.AutopilotConfig() + if err != nil { + return err + } + + // Look for dead servers to clean up + peers, err := s.numPeers() + if err != nil { + return err + } + + removed := 0 + if autopilotConf.CleanupDeadServers { + for _, member := range s.serfLAN.Members() { + // Exit early if we already removed the max amount of servers + if removed == peers/2 { + break + } + + valid, _ := agent.IsConsulServer(member) + if valid && member.Status == serf.StatusFailed { + removed++ + s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name) + go s.serfLAN.RemoveFailedNode(member.Name) + } + } + } + + return nil +} + +// promoteNonVoters promotes eligible non-voting servers to voters. 
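// In outline: promotion only runs once every server in the cluster speaks
// Raft protocol version 3 or higher, a non-voter is only eligible after it has
// been reported Healthy for at least ServerStabilizationTime, and voters are
// added so the voter count stays odd (a single promotion to break an even
// count, then promotions in pairs), with a dead-server sweep triggered
// whenever any voters were added.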
+func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error { + minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers()) + if err != nil { + return fmt.Errorf("error getting server raft protocol versions: %s", err) + } + + if minRaftProtocol >= 3 { + future := s.raft.GetConfiguration() + if err := future.Error(); err != nil { + return fmt.Errorf("failed to get raft configuration: %v", err) + } + + var promotions []raft.Server + raftServers := future.Configuration().Servers + voterCount := 0 + for _, server := range raftServers { + // If this server has been stable and passing for long enough, promote it to a voter + if server.Suffrage == raft.Nonvoter { + health := s.getServerHealth(string(server.Address)) + if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime { + promotions = append(promotions, server) + } + } else { + voterCount++ + } + } + + // Exit early if there's nothing to promote + if len(promotions) == 0 { + return nil + } + + // If there's currently an even number of servers, we can promote the first server in the list + // to get to an odd-sized quorum + newServers := false + if voterCount%2 == 0 { + addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0) + if err := addFuture.Error(); err != nil { + return fmt.Errorf("failed to add raft peer: %v", err) + } + promotions = promotions[1:] + newServers = true + } + + // Promote remaining servers in twos to maintain an odd quorum size + for i := 0; i < len(promotions)-1; i += 2 { + addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0) + if err := addFirst.Error(); err != nil { + return fmt.Errorf("failed to add raft peer: %v", err) + } + addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0) + if err := addSecond.Error(); err != nil { + return fmt.Errorf("failed to add raft peer: %v", err) + } + newServers = true + } + + // If we added a new server, trigger a check to remove dead servers + if newServers { + go func() { + s.autopilotRemoveDeadCh <- struct{}{} + }() + } + } + + return nil +} + +// queryServerHealth fetches the raft stats for the given server and uses them +// to update its ServerHealth +func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, autopilotConf *structs.AutopilotConfig) *structs.ServerHealth { + health := &structs.ServerHealth{ + ID: server.ID, + Name: server.Name, + SerfStatusRaw: member.Status, + SerfStatus: member.Status.String(), + LastContactRaw: -1, + LastContact: "never", + } + + stats, err := s.getServerStats(server) + if err != nil { + s.logger.Printf("[DEBUG] consul: error getting server's raft stats: %s", err) + } + + if v, ok := stats["last_contact"]; ok && v != "never" { + health.LastContactRaw, err = time.ParseDuration(v) + if err != nil { + s.logger.Printf("[DEBUG] consul: error parsing server's last_contact value: %s", err) + } + health.LastContact = health.LastContactRaw.String() + } + // Set LastContact to 0 if we're the leader + if s.config.NodeName == member.Name { + health.LastContactRaw = 0 + health.LastContact = "leader" + } + + if v, ok := stats["last_log_index"]; ok { + health.LastIndex, err = strconv.ParseUint(v, 10, 64) + if err != nil { + s.logger.Printf("[DEBUG] consul: error parsing server's last_log_index value: %s", err) + } + } + + if v, ok := stats["last_log_term"]; ok { + health.LastTerm, err = strconv.ParseUint(v, 10, 64) + if err != nil { + s.logger.Printf("[DEBUG] consul: error parsing server's 
last_log_term value: %s", err) + } + } + + health.Healthy = s.isServerHealthy(health, autopilotConf) + + // If this is a new server or the health changed, reset StableSince + lastHealth := s.getServerHealth(server.Addr.String()) + if lastHealth == nil || lastHealth.Healthy != health.Healthy { + health.StableSince = time.Now() + } else { + health.StableSince = lastHealth.StableSince + } + + return health +} + +func (s *Server) getServerHealth(addr string) *structs.ServerHealth { + s.autopilotLock.RLock() + defer s.autopilotLock.RUnlock() + h, ok := s.autopilotHealth[addr] + if !ok { + return nil + } + return h +} + +func (s *Server) getServerStats(server *agent.Server) (map[string]string, error) { + var args struct{} + var reply map[string]string + err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Operator.RaftStats", &args, &reply) + return reply, err +} + +// isServerHealthy determines whether the given ServerHealth is healthy +// based on the current Autopilot config +func (s *Server) isServerHealthy(health *structs.ServerHealth, autopilotConf *structs.AutopilotConfig) bool { + if health.SerfStatusRaw != serf.StatusAlive { + return false + } + + if health.LastContactRaw > autopilotConf.LastContactThreshold || health.LastContactRaw < 0 { + return false + } + + lastTerm, _ := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64) + if health.LastTerm != lastTerm { + return false + } + + if s.raft.LastIndex() > autopilotConf.MaxTrailingLogs && + health.LastIndex < s.raft.LastIndex()-autopilotConf.MaxTrailingLogs { + return false + } + + return true +} diff --git a/consul/autopilot_test.go b/consul/autopilot_test.go new file mode 100644 index 000000000..ae38f3f3e --- /dev/null +++ b/consul/autopilot_test.go @@ -0,0 +1,235 @@ +package consul + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" +) + +func TestAutopilot_CleanupDeadServer(t *testing.T) { + dir1, s1 := testServerDCBootstrap(t, "dc1", true) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + servers := []*Server{s1, s2, s3} + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.numPeers() + return peers == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } + + // Kill a non-leader server + s2.Shutdown() + + testutil.WaitForResult(func() (bool, error) { + alive := 0 + for _, m := range s1.LANMembers() { + if m.Status == serf.StatusAlive { + alive++ + } + } + return alive == 2, nil + }, func(err error) { + t.Fatalf("should have 2 alive members") + }) + + // Bring up and join a new server + dir4, s4 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir4) + defer s4.Shutdown() + + if _, err := s4.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + servers[1] = s4 + + // Make sure the dead server is removed and we're back to 3 total peers + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := 
s.numPeers() + return peers == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } +} + +func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = true + c.RemoveDeadInterval = 100 * time.Millisecond + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + conf := func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = false + } + dir2, s2 := testServerWithConfig(t, conf) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerWithConfig(t, conf) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + servers := []*Server{s1, s2, s3} + + // Join the servers to s1 + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.numPeers() + return peers == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } + + // Kill a non-leader server + s3.Shutdown() + + // Should be removed from the peers automatically + for _, s := range []*Server{s1, s2} { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.numPeers() + return peers == 2, nil + }, func(err error) { + t.Fatalf("should have 2 peers") + }) + } +} + +func TestAutopilot_PromoteNonVoter(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = true + c.RaftConfig.ProtocolVersion = 3 + c.AutopilotConfig.ServerStabilizationTime = 200 * time.Millisecond + c.ServerHealthInterval = 100 * time.Millisecond + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = false + c.RaftConfig.ProtocolVersion = 3 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Wait for the new server to be added as a non-voter, but make sure + // it doesn't get promoted to a voter even after ServerStabilizationTime, + // because that would result in an even-numbered quorum count. 
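	// Quorum arithmetic behind that expectation: with 1 voter the quorum size
	// is 1 and the failure tolerance is 0; promoting the lone non-voter would
	// give 2 voters, a quorum of 2, and still a tolerance of 0, so autopilot
	// waits until a second stabilized non-voter lets it jump straight to 3 voters.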
+ testutil.WaitForResult(func() (bool, error) { + future := s1.raft.GetConfiguration() + if err := future.Error(); err != nil { + return false, err + } + + servers := future.Configuration().Servers + + if len(servers) != 2 { + return false, fmt.Errorf("bad: %v", servers) + } + if servers[1].Suffrage != raft.Nonvoter { + return false, fmt.Errorf("bad: %v", servers) + } + health := s1.getServerHealth(string(servers[1].Address)) + if health == nil { + return false, fmt.Errorf("nil health") + } + if !health.Healthy { + return false, fmt.Errorf("bad: %v", health) + } + if time.Now().Sub(health.StableSince) < s1.config.AutopilotConfig.ServerStabilizationTime { + return false, fmt.Errorf("stable period not elapsed") + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) + + // Now add another server and make sure they both get promoted to voters after stabilization + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = false + c.RaftConfig.ProtocolVersion = 3 + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + future := s1.raft.GetConfiguration() + if err := future.Error(); err != nil { + return false, err + } + + servers := future.Configuration().Servers + + if len(servers) != 3 { + return false, fmt.Errorf("bad: %v", servers) + } + if servers[1].Suffrage != raft.Voter { + return false, fmt.Errorf("bad: %v", servers) + } + if servers[2].Suffrage != raft.Voter { + return false, fmt.Errorf("bad: %v", servers) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) +} diff --git a/consul/config.go b/consul/config.go index 7502cbdb8..0d758a322 100644 --- a/consul/config.go +++ b/consul/config.go @@ -279,6 +279,14 @@ type Config struct { // AutopilotConfig is used to apply the initial autopilot config when // bootstrapping. AutopilotConfig *structs.AutopilotConfig + + // ServerHealthInterval is the frequency with which the leader will check + // the health of the servers in the cluster + ServerHealthInterval time.Duration + + // RemoveDeadInterval is the frequency with which the leader will look for + // dead servers to remove from the cluster + RemoveDeadInterval time.Duration } // CheckVersion is used to check if the ProtocolVersion is valid @@ -353,8 +361,13 @@ func DefaultConfig() *Config { TLSMinVersion: "tls10", AutopilotConfig: &structs.AutopilotConfig{ - CleanupDeadServers: true, + CleanupDeadServers: true, + LastContactThreshold: 200 * time.Millisecond, + MaxTrailingLogs: 250, + ServerStabilizationTime: 10 * time.Second, }, + ServerHealthInterval: 1 * time.Second, + RemoveDeadInterval: 30 * time.Second, } // Increase our reap interval to 3 days instead of 24h. 
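The two intervals added to Config above control how often the leader's autopilot goroutines wake up, and AutopilotConfig carries the thresholds they apply. A minimal sketch of overriding them, in the spirit of the testServerWithConfig tweaks used elsewhere in this patch; the aggressive values are purely illustrative:

```go
package example

import (
	"time"

	"github.com/hashicorp/consul/consul"
)

// fastAutopilotConfig returns a server config whose autopilot loops run much
// more often than the 1s / 30s defaults above, e.g. for integration tests.
func fastAutopilotConfig() *consul.Config {
	cfg := consul.DefaultConfig()

	// How often the leader re-queries raft stats and looks for dead servers.
	cfg.ServerHealthInterval = 100 * time.Millisecond
	cfg.RemoveDeadInterval = 250 * time.Millisecond

	// Thresholds the health check applies to each server.
	cfg.AutopilotConfig.LastContactThreshold = 100 * time.Millisecond
	cfg.AutopilotConfig.MaxTrailingLogs = 100
	cfg.AutopilotConfig.ServerStabilizationTime = 500 * time.Millisecond

	return cfg
}
```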
diff --git a/consul/leader.go b/consul/leader.go index 64e1671aa..cd09dcba4 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -159,6 +159,8 @@ func (s *Server) establishLeadership() error { return err } + s.startAutopilot() + return nil } @@ -174,6 +176,9 @@ func (s *Server) revokeLeadership() error { s.logger.Printf("[ERR] consul: Clearing session timers failed: %v", err) return err } + + s.stopAutopilot() + return nil } @@ -597,13 +602,20 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { return err } - if minRaftProtocol >= 2 && parts.RaftVersion >= 3 { + switch { + case minRaftProtocol >= 3: + addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0) + if err := addFuture.Error(); err != nil { + s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) + return err + } + case minRaftProtocol == 2 && parts.RaftVersion >= 3: addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0) if err := addFuture.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) return err } - } else { + default: addFuture := s.raft.AddPeer(raft.ServerAddress(addr)) if err := addFuture.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) @@ -611,22 +623,10 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { } } - state := s.fsm.State() - _, autopilotConf, err := state.AutopilotConfig() - if err != nil { - return err - } - - // Look for dead servers to clean up - if autopilotConf.CleanupDeadServers { - for _, member := range s.serfLAN.Members() { - valid, _ := agent.IsConsulServer(member) - if valid && member.Name != m.Name && member.Status == serf.StatusFailed { - s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name) - go s.serfLAN.RemoveFailedNode(member.Name) - } - } - } + // Trigger a check to remove dead servers + go func() { + s.autopilotRemoveDeadCh <- struct{}{} + }() return nil } diff --git a/consul/leader_test.go b/consul/leader_test.go index 7549e19ad..b48e462de 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -623,76 +623,6 @@ func TestLeader_ReapTombstones(t *testing.T) { }) } -func TestLeader_CleanupDeadServers(t *testing.T) { - dir1, s1 := testServerDCBootstrap(t, "dc1", true) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, s2 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - - dir3, s3 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - - servers := []*Server{s1, s2, s3} - - // Try to join - addr := fmt.Sprintf("127.0.0.1:%d", - s1.config.SerfLANConfig.MemberlistConfig.BindPort) - if _, err := s2.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - if _, err := s3.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - - for _, s := range servers { - testutil.WaitForResult(func() (bool, error) { - peers, _ := s.numPeers() - return peers == 3, nil - }, func(err error) { - t.Fatalf("should have 3 peers") - }) - } - - // Kill a non-leader server - s2.Shutdown() - - testutil.WaitForResult(func() (bool, error) { - alive := 0 - for _, m := range s1.LANMembers() { - if m.Status == serf.StatusAlive { - alive++ - } - } - return alive == 2, nil - }, func(err error) { - t.Fatalf("should have 2 alive members") - }) - - // Bring up and join a new server - dir4, s4 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir4) - 
defer s4.Shutdown() - - if _, err := s4.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - servers[1] = s4 - - // Make sure the dead server is removed and we're back to 3 total peers - for _, s := range servers { - testutil.WaitForResult(func() (bool, error) { - peers, _ := s.numPeers() - return peers == 3, nil - }, func(err error) { - t.Fatalf("should have 3 peers") - }) - } -} - func TestLeader_RollRaftServer(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.Bootstrap = true diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go index 61d6d25f3..e9f6272ed 100644 --- a/consul/operator_endpoint.go +++ b/consul/operator_endpoint.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" + "time" ) // Operator endpoint is used to perform low-level operator tasks for Consul. @@ -183,3 +184,63 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe } return nil } + +// Used by Autopilot to query the raft stats of the local server. +func (op *Operator) RaftStats(args struct{}, reply *map[string]string) error { + *reply = op.srv.raft.Stats() + return nil +} + +// ServerHealth is used to get the current health of the servers. +func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error { + // This must be sent to the leader, so we fix the args since we are + // re-using a structure where we don't support all the options. + args.RequireConsistent = true + args.AllowStale = false + if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done { + return err + } + + // This action requires operator read access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorRead() { + return permissionDeniedErr + } + + status := structs.OperatorHealthReply{ + Healthy: true, + } + future := op.srv.raft.GetConfiguration() + if err := future.Error(); err != nil { + return err + } + + healthyCount := 0 + servers := future.Configuration().Servers + for _, s := range servers { + health := op.srv.getServerHealth(string(s.Address)) + if health != nil { + // Fix up StableSince to be more readable + health.StableSince = health.StableSince.Round(time.Second).UTC() + + if !health.Healthy { + status.Healthy = false + } else { + healthyCount++ + } + status.Servers = append(status.Servers, *health) + } + } + + // If we have extra healthy servers, set FailureTolerance + if healthyCount > len(servers)/2+1 { + status.FailureTolerance = healthyCount - (len(servers)/2 + 1) + } + + *reply = status + + return nil +} diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go index 173bd9299..a762ba6b8 100644 --- a/consul/operator_endpoint_test.go +++ b/consul/operator_endpoint_test.go @@ -426,3 +426,79 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { t.Fatalf("bad: %#v", config) } } + +func TestOperator_ServerHealth(t *testing.T) { + for i := 1; i <= 3; i++ { + testServerHealth(t, i) + } +} + +func testServerHealth(t *testing.T, protocol int) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = true + c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol) + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = 
false + c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol) + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = false + c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol) + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + testutil.WaitForResult(func() (bool, error) { + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.OperatorHealthReply + err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply) + if err != nil { + return false, fmt.Errorf("err: %v", err) + } + if !reply.Healthy { + return false, fmt.Errorf("bad: %v", reply) + } + if reply.FailureTolerance != 1 { + return false, fmt.Errorf("bad: %v", reply) + } + if len(reply.Servers) != 3 { + return false, fmt.Errorf("bad: %v", reply) + } + if reply.Servers[0].LastContact != "leader" { + return false, fmt.Errorf("bad: %v", reply) + } + if reply.Servers[1].LastContactRaw <= 0 { + return false, fmt.Errorf("bad: %v", reply) + } + if reply.Servers[2].LastContactRaw <= 0 { + return false, fmt.Errorf("bad: %v", reply) + } + return true, nil + }, func(err error) { + t.Fatal(err) + }) +} diff --git a/consul/server.go b/consul/server.go index 410910bd5..f14d30053 100644 --- a/consul/server.go +++ b/consul/server.go @@ -76,6 +76,12 @@ type Server struct { // aclCache is the non-authoritative ACL cache. aclCache *aclCache + // autopilot + autopilotHealth map[string]*structs.ServerHealth + autopilotLock sync.RWMutex + autopilotShutdownCh chan struct{} + autopilotRemoveDeadCh chan struct{} + // Consul configuration config *Config @@ -222,18 +228,20 @@ func NewServer(config *Config) (*Server, error) { // Create server. s := &Server{ - config: config, - connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap), - eventChLAN: make(chan serf.Event, 256), - eventChWAN: make(chan serf.Event, 256), - localConsuls: make(map[raft.ServerAddress]*agent.Server), - logger: logger, - reconcileCh: make(chan serf.Member, 32), - remoteConsuls: make(map[string][]*agent.Server, 4), - rpcServer: rpc.NewServer(), - rpcTLS: incomingTLS, - tombstoneGC: gc, - shutdownCh: make(chan struct{}), + autopilotRemoveDeadCh: make(chan struct{}), + autopilotShutdownCh: make(chan struct{}), + config: config, + connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap), + eventChLAN: make(chan serf.Event, 256), + eventChWAN: make(chan serf.Event, 256), + localConsuls: make(map[raft.ServerAddress]*agent.Server), + logger: logger, + reconcileCh: make(chan serf.Member, 32), + remoteConsuls: make(map[string][]*agent.Server, 4), + rpcServer: rpc.NewServer(), + rpcTLS: incomingTLS, + tombstoneGC: gc, + shutdownCh: make(chan struct{}), } // Initialize the authoritative ACL cache. 
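The autopilotHealth map added to Server above is only ever replaced wholesale while autopilotLock is held for writing (the health loop builds a fresh map and swaps it in), so readers never see a half-updated view. A stripped-down sketch of that publish/read pattern, using hypothetical names rather than the Server methods themselves:

```go
package example

import "sync"

// healthStore stands in for the autopilotHealth / autopilotLock pair.
type healthStore struct {
	mu      sync.RWMutex
	current map[string]string // addr -> status, in place of *structs.ServerHealth
}

// publish installs a freshly built snapshot, mirroring how serverHealthLoop
// swaps in a new map under the write lock instead of mutating the old one.
func (h *healthStore) publish(snapshot map[string]string) {
	h.mu.Lock()
	h.current = snapshot
	h.mu.Unlock()
}

// get mirrors Server.getServerHealth: a read lock and a miss for unknown addresses.
func (h *healthStore) get(addr string) (string, bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	v, ok := h.current[addr]
	return v, ok
}
```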
diff --git a/consul/structs/operator.go b/consul/structs/operator.go index b6a961bb3..d13c5d904 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -1,15 +1,31 @@ package structs import ( + "time" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" ) type AutopilotConfig struct { // CleanupDeadServers controls whether to remove dead servers when a new - // server is added to the Raft peers + // server is added to the Raft peers. CleanupDeadServers bool - // RaftIndex stores the create/modify indexes of this configuration + // LastContactThreshold is the limit on the amount of time a server can go + // without leader contact before being considered unhealthy. + LastContactThreshold time.Duration + + // MaxTrailingLogs is the amount of entries in the Raft Log that a server can + // be behind before being considered unhealthy. + MaxTrailingLogs uint64 + + // ServerStabilizationTime is the minimum amount of time a server must be + // in a stable, healthy state before it can be added to the cluster. Only + // applicable with Raft protocol version 3 or higher. + ServerStabilizationTime time.Duration + + // RaftIndex stores the create/modify indexes of this configuration. RaftIndex } @@ -85,3 +101,46 @@ type AutopilotSetConfigRequest struct { func (op *AutopilotSetConfigRequest) RequestDatacenter() string { return op.Datacenter } + +// ServerHealth is the health (from the leader's point of view) of a server. +type ServerHealth struct { + // ID is the raft ID of the server. + ID string + + // Name is the node name of the server. + Name string + + // The status of the SerfHealth check for the server. + SerfStatusRaw serf.MemberStatus `json:"-"` + SerfStatus string + + // LastContact is the time since this node's last contact with the leader. + LastContactRaw time.Duration `json:"-"` + LastContact string + + // LastTerm is the highest leader term this server has a record of in its Raft log. + LastTerm uint64 + + // LastIndex is the last log index this server has a record of in its Raft log. + LastIndex uint64 + + // Healthy is whether or not the server is healthy according to the current + // Autopilot config. + Healthy bool + + // StableSince is the amount of time since this server's Healthy value last changed. + StableSince time.Time +} + +// OperatorHealthReply is a representation of the overall health of the cluster +type OperatorHealthReply struct { + // Healthy is true if all the servers in the cluster are healthy. + Healthy bool + + // FailureTolerance is the number of healthy servers that could be lost without + // an outage occurring. + FailureTolerance int + + // Servers holds the health of each server. + Servers []ServerHealth +} diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown index 3d28aa3ee..d9aac210f 100644 --- a/website/source/docs/agent/http/operator.html.markdown +++ b/website/source/docs/agent/http/operator.html.markdown @@ -29,6 +29,7 @@ The following endpoints are supported: * [`/v1/operator/raft/peer`](#raft-peer): Operates on Raft peers * [`/v1/operator/keyring`](#keyring): Operates on gossip keyring * [`/v1/operator/autopilot/configuration`](#autopilot-configuration): Operates on the Autopilot configuration +* [`/v1/operator/autopilot/health`](#autopilot-health): Returns the health of the servers Not all endpoints support blocking queries and all consistency modes, see details in the sections below. 
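FailureTolerance in OperatorHealthReply is plain quorum arithmetic: with n Raft servers the cluster needs n/2+1 of them healthy, and anything beyond that is slack. A small sketch of the calculation the leader performs in Operator.ServerHealth (shown earlier in consul/operator_endpoint.go):

```go
// failureTolerance reproduces the arithmetic used when building
// OperatorHealthReply: healthy servers beyond the quorum size are the number
// of additional failures the cluster can absorb.
func failureTolerance(healthy, total int) int {
	quorum := total/2 + 1
	if healthy > quorum {
		return healthy - quorum
	}
	return 0
}

// failureTolerance(3, 3) == 1, failureTolerance(5, 5) == 2, failureTolerance(2, 3) == 0.
```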
@@ -288,13 +289,16 @@ A JSON body is returned that looks like this: ```javascript { "CleanupDeadServers": true, + "LastContactThreshold": 200000000, + "MaxTrailingLogs": 250, + "ServerStabilizationTime": 10000000000, "CreateIndex": 4, "ModifyIndex": 4 } ``` -`CleanupDeadServers` is whether dead servers should be removed automatically when -a new server is added to the cluster. +For more information about the Autopilot configuration options, see the agent configuration section +[here](/docs/agent/options.html#autopilot). #### PUT Method @@ -313,11 +317,89 @@ body must look like: ```javascript { - "CleanupDeadServers": true + "CleanupDeadServers": true, + "LastContactThreshold": 200000000, + "MaxTrailingLogs": 250, + "ServerStabilizationTime": 10000000000, + "CreateIndex": 4, + "ModifyIndex": 4 } ``` -`CleanupDeadServers` is whether dead servers should be removed automatically when -a new server is added to the cluster. +For more information about the Autopilot configuration options, see the agent configuration section +[here](/docs/agent/options.html#autopilot). The return code will indicate success or failure. + +### /v1/operator/autopilot/health + +Available in Consul 0.8.0 and later, the autopilot health endpoint supports the +`GET` method. + +This endpoint supports the use of ACL tokens using either the `X-CONSUL-TOKEN` +header or the `?token=` query parameter. + +By default, the datacenter of the agent is queried; however, the `dc` can be +provided using the `?dc=` query parameter. + +#### GET Method + +When using the `GET` method, the request will be forwarded to the cluster +leader to retrieve its latest view of server health. + +If ACLs are enabled, the client will need to supply an ACL Token with +[`operator`](/docs/internals/acl.html#operator) read privileges. + +A JSON body is returned that looks like this: + +```javascript +{ + "Healthy": true, + "FailureTolerance": 0, + "Servers": [ + { + "ID": "e349749b-3303-3ddf-959c-b5885a0e1f6e", + "Name": "node1", + "SerfStatus": "alive", + "LastContact": "leader", + "LastTerm": 2, + "LastIndex": 46, + "Healthy": true, + "StableSince": "2017-03-06T22:07:51Z" + }, + { + "ID": "e36ee410-cc3c-0a0c-c724-63817ab30303", + "Name": "node2", + "SerfStatus": "alive", + "LastContact": "27.291304ms", + "LastTerm": 2, + "LastIndex": 46, + "Healthy": true, + "StableSince": "2017-03-06T22:18:26Z" + } + ] +} +``` + +`Healthy` is whether all the servers are currently healthy. + +`FailureTolerance` is the number of redundant healthy servers that could fail +without causing an outage (this would be 2 in a healthy cluster of 5 servers). + +The `Servers` list holds detailed health information on each server: + +- `ID` is the Raft ID of the server. + +- `Name` is the node name of the server. + +- `SerfStatus` is the SerfHealth check status for the server. + +- `LastContact` is the time elapsed since this server's last contact with the leader. + +- `LastTerm` is the server's last known Raft leader term. + +- `LastIndex` is the index of the server's last committed Raft log entry. + +- `Healthy` is whether the server is healthy according to the current Autopilot configuration. + +- `StableSince` is the time at which this server's `Healthy` state last changed.
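Tying the endpoint together, here is a minimal Go client that issues the GET described above and decodes the documented fields. It uses only the standard library, assumes a local agent on the default HTTP port (8500), and declares its own structs mirroring the JSON rather than any types from the api package:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// serverHealth and healthReply mirror the JSON fields documented above.
type serverHealth struct {
	ID, Name, SerfStatus, LastContact string
	LastTerm, LastIndex               uint64
	Healthy                           bool
	StableSince                       string
}

type healthReply struct {
	Healthy          bool
	FailureTolerance int
	Servers          []serverHealth
}

func main() {
	resp, err := http.Get("http://127.0.0.1:8500/v1/operator/autopilot/health")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var reply healthReply
	if err := json.NewDecoder(resp.Body).Decode(&reply); err != nil {
		panic(err)
	}
	fmt.Printf("healthy=%v tolerance=%d servers=%d\n",
		reply.Healthy, reply.FailureTolerance, len(reply.Servers))
}
```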
\ No newline at end of file diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 0979fae26..a751cf3c1 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -311,6 +311,11 @@ will exit with an error at startup. use. This defaults to the latest version. This should be set only when [upgrading](/docs/upgrading.html). You can view the protocol versions supported by Consul by running `consul -v`. +* `-raft-protocol` - This controls the internal + version of the Raft consensus protocol used for server communications. This defaults to 2 but must + be set to 3 in order to gain access to Autopilot features, with the exception of + [`cleanup_dead_servers`](#cleanup_dead_servers). + * `-recursor` - Specifies the address of an upstream DNS server. This option may be provided multiple times, and is functionally equivalent to the [`recursors` configuration option](#recursors). @@ -556,14 +561,22 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass

The following sub-keys are available: - * `raft_protocol` - This controls the internal - version of the Raft consensus protocol used for server communications. This defaults to 2 but must - be set to 3 in order to gain access to other Autopilot features, with the exception of - [`cleanup_dead_servers`](#cleanup_dead_servers). - * `cleanup_dead_servers` - This controls the automatic removal of dead server nodes whenever a new server is added to the cluster. Defaults to `true`. + * `last_contact_threshold` - Controls + the maximum amount of time a server can go without contact from the leader before being considered unhealthy. + Must be a duration value such as `10s`. Defaults to `200ms`. + + * `max_trailing_logs` - Controls + the maximum number of log entries that a server can trail the leader by before being considered unhealthy. Defaults + to 250. + + * `server_stabilization_time` - + Controls the minimum amount of time a server must be stable in the 'healthy' state before being added to the + cluster. Only takes effect if all servers are running Raft protocol version 3 or higher. Must be a duration value + such as `10s`. Defaults to `10s`. + * `bootstrap` Equivalent to the [`-bootstrap` command-line flag](#_bootstrap). @@ -769,6 +782,9 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `protocol` Equivalent to the [`-protocol` command-line flag](#_protocol). +* `raft_protocol` Equivalent to the + [`-raft-protocol` command-line flag](#_raft_protocol). + * `reap` This controls Consul's automatic reaping of child processes, which is useful if Consul is running as PID 1 in a Docker container. If this isn't specified, then Consul will automatically reap child processes if it detects it is running as PID 1. If this is set to true or false, then diff --git a/website/source/docs/commands/operator/autopilot.html.markdown.erb b/website/source/docs/commands/operator/autopilot.html.markdown.erb index 5968a84b6..a2e00c267 100644 --- a/website/source/docs/commands/operator/autopilot.html.markdown.erb +++ b/website/source/docs/commands/operator/autopilot.html.markdown.erb @@ -40,6 +40,9 @@ The output looks like this: ``` CleanupDeadServers = true +LastContactThreshold = 200ms +MaxTrailingLogs = 250 +ServerStabilizationTime = 10s ``` ## set-config @@ -58,6 +61,16 @@ Usage: `consul operator autopilot set-config [options]` * `-cleanup-dead-servers` - Specifies whether to enable automatic removal of dead servers upon the successful joining of new servers to the cluster. Must be one of `[true|false]`. +* `-last-contact-threshold` - Controls the maximum amount of time a server can go without contact +from the leader before being considered unhealthy. Must be a duration value such as `10s`. + +* `-max-trailing-logs` - Controls the maximum number of log entries that a server can trail +the leader by before being considered unhealthy. + +* `-server-stabilization-time` - Controls the minimum amount of time a server must be stable in +the 'healthy' state before being added to the cluster. Only takes effect if all servers are +running Raft protocol version 3 or higher. Must be a duration value such as `10s`.
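The flags above map one-to-one onto the api package's AutopilotConfiguration fields added earlier in this patch. A rough sketch of performing the same update programmatically with the Go client, assuming the Operator client exposes a getter named AutopilotGetConfiguration alongside the AutopilotCASConfiguration call the set-config command uses; the threshold values are illustrative:

```go
package main

import (
	"log"
	"time"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	operator := client.Operator()

	// Read-modify-write: fetch the current config, adjust the new thresholds,
	// then apply it with the same check-and-set call used by `set-config`.
	conf, err := operator.AutopilotGetConfiguration(nil) // getter name assumed
	if err != nil {
		log.Fatal(err)
	}
	conf.LastContactThreshold = 500 * time.Millisecond
	conf.MaxTrailingLogs = 500
	conf.ServerStabilizationTime = 30 * time.Second

	applied, err := operator.AutopilotCASConfiguration(conf, nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("configuration applied: %v", applied)
}
```

Back to the `set-config` command itself: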
+ The output looks like this: ``` From a7de1e2a3bfab6c337f4e14f1bf125d6ad0c50dc Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 7 Mar 2017 13:58:06 -0800 Subject: [PATCH 2/5] Move RaftStats to Status endpoint --- command/agent/operator_endpoint.go | 5 + command/agent/operator_endpoint_test.go | 2 +- consul/autopilot.go | 173 ++++++++++++------------ consul/leader.go | 7 +- consul/operator_endpoint.go | 6 - consul/status_endpoint.go | 25 ++++ consul/structs/operator.go | 12 ++ 7 files changed, 131 insertions(+), 99 deletions(-) diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 9701109f4..cb1ccd93a 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -242,5 +242,10 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re return nil, err } + // Reply with status 429 if something is unhealthy + if !reply.Healthy { + resp.WriteHeader(http.StatusTooManyRequests) + } + return reply, nil } diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 172551b70..20b6614f5 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -8,8 +8,8 @@ import ( "strings" "testing" - "github.com/hashicorp/consul-enterprise/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "github.com/hashicorp/serf/serf" ) diff --git a/consul/autopilot.go b/consul/autopilot.go index ed09d32f5..1df9c9571 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -92,27 +92,30 @@ func (s *Server) pruneDeadServers() error { return err } - // Look for dead servers to clean up + // Find any failed servers + var failed []string + if autopilotConf.CleanupDeadServers { + for _, member := range s.serfLAN.Members() { + valid, _ := agent.IsConsulServer(member) + if valid && member.Status == serf.StatusFailed { + failed = append(failed, member.Name) + } + } + } + peers, err := s.numPeers() if err != nil { return err } - removed := 0 - if autopilotConf.CleanupDeadServers { - for _, member := range s.serfLAN.Members() { - // Exit early if we already removed the max amount of servers - if removed == peers/2 { - break - } - - valid, _ := agent.IsConsulServer(member) - if valid && member.Status == serf.StatusFailed { - removed++ - s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name) - go s.serfLAN.RemoveFailedNode(member.Name) - } + // Only do removals if a minority of servers will be affected + if len(failed) <= peers/2 { + for _, server := range failed { + s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", server) + go s.serfLAN.RemoveFailedNode(server) } + } else { + s.logger.Printf("[ERR] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers) } return nil @@ -125,62 +128,66 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error return fmt.Errorf("error getting server raft protocol versions: %s", err) } - if minRaftProtocol >= 3 { - future := s.raft.GetConfiguration() - if err := future.Error(); err != nil { - return fmt.Errorf("failed to get raft configuration: %v", err) - } + // If we don't meet the minimum version for non-voter features, bail early + if minRaftProtocol < 3 { + return nil + } - var promotions []raft.Server - raftServers := future.Configuration().Servers - voterCount := 0 - for _, server := range raftServers { - // If this server has been stable and passing for 
long enough, promote it to a voter - if server.Suffrage == raft.Nonvoter { - health := s.getServerHealth(string(server.Address)) - if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime { - promotions = append(promotions, server) - } - } else { - voterCount++ + future := s.raft.GetConfiguration() + if err := future.Error(); err != nil { + return fmt.Errorf("failed to get raft configuration: %v", err) + } + + var promotions []raft.Server + raftServers := future.Configuration().Servers + voterCount := 0 + for _, server := range raftServers { + // If this server has been stable and passing for long enough, promote it to a voter + if server.Suffrage == raft.Nonvoter { + health := s.getServerHealth(string(server.Address)) + if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime { + promotions = append(promotions, server) } + } else { + voterCount++ } + } - // Exit early if there's nothing to promote - if len(promotions) == 0 { - return nil + // Exit early if there's nothing to promote + if len(promotions) == 0 { + return nil + } + + // If there's currently an even number of servers, we can promote the first server in the list + // to get to an odd-sized quorum + newServers := false + if voterCount%2 == 0 { + addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0) + if err := addFuture.Error(); err != nil { + return fmt.Errorf("failed to add raft peer: %v", err) } + promotions = promotions[1:] + newServers = true + } - // If there's currently an even number of servers, we can promote the first server in the list - // to get to an odd-sized quorum - newServers := false - if voterCount%2 == 0 { - addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0) - if err := addFuture.Error(); err != nil { - return fmt.Errorf("failed to add raft peer: %v", err) - } - promotions = promotions[1:] - newServers = true + // Promote remaining servers in twos to maintain an odd quorum size + for i := 0; i < len(promotions)-1; i += 2 { + addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0) + if err := addFirst.Error(); err != nil { + return fmt.Errorf("failed to add raft peer: %v", err) } - - // Promote remaining servers in twos to maintain an odd quorum size - for i := 0; i < len(promotions)-1; i += 2 { - addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0) - if err := addFirst.Error(); err != nil { - return fmt.Errorf("failed to add raft peer: %v", err) - } - addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0) - if err := addSecond.Error(); err != nil { - return fmt.Errorf("failed to add raft peer: %v", err) - } - newServers = true + addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0) + if err := addSecond.Error(); err != nil { + return fmt.Errorf("failed to add raft peer: %v", err) } + newServers = true + } - // If we added a new server, trigger a check to remove dead servers - if newServers { - go func() { - s.autopilotRemoveDeadCh <- struct{}{} - }() + // If we added a new server, trigger a check to remove dead servers + if newServers { + select { + case s.autopilotRemoveDeadCh <- struct{}{}: + default: } } @@ -190,47 +197,35 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error // queryServerHealth fetches the raft stats for the given server and uses them // to update its ServerHealth func (s *Server) queryServerHealth(member 
serf.Member, server *agent.Server, autopilotConf *structs.AutopilotConfig) *structs.ServerHealth { + stats, err := s.getServerStats(server) + if err != nil { + s.logger.Printf("[DEBUG] consul: error getting server's raft stats: %s", err) + } + health := &structs.ServerHealth{ ID: server.ID, Name: server.Name, SerfStatusRaw: member.Status, SerfStatus: member.Status.String(), LastContactRaw: -1, - LastContact: "never", + LastContact: stats.LastContact, + LastTerm: stats.LastTerm, + LastIndex: stats.LastIndex, } - stats, err := s.getServerStats(server) - if err != nil { - s.logger.Printf("[DEBUG] consul: error getting server's raft stats: %s", err) - } - - if v, ok := stats["last_contact"]; ok && v != "never" { - health.LastContactRaw, err = time.ParseDuration(v) + if health.LastContact != "never" { + health.LastContactRaw, err = time.ParseDuration(health.LastContact) if err != nil { s.logger.Printf("[DEBUG] consul: error parsing server's last_contact value: %s", err) } - health.LastContact = health.LastContactRaw.String() } - // Set LastContact to 0 if we're the leader + + // Set LastContact to 0 for the leader if s.config.NodeName == member.Name { health.LastContactRaw = 0 health.LastContact = "leader" } - if v, ok := stats["last_log_index"]; ok { - health.LastIndex, err = strconv.ParseUint(v, 10, 64) - if err != nil { - s.logger.Printf("[DEBUG] consul: error parsing server's last_log_index value: %s", err) - } - } - - if v, ok := stats["last_log_term"]; ok { - health.LastTerm, err = strconv.ParseUint(v, 10, 64) - if err != nil { - s.logger.Printf("[DEBUG] consul: error parsing server's last_log_term value: %s", err) - } - } - health.Healthy = s.isServerHealthy(health, autopilotConf) // If this is a new server or the health changed, reset StableSince @@ -254,10 +249,10 @@ func (s *Server) getServerHealth(addr string) *structs.ServerHealth { return h } -func (s *Server) getServerStats(server *agent.Server) (map[string]string, error) { +func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) { var args struct{} - var reply map[string]string - err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Operator.RaftStats", &args, &reply) + var reply structs.ServerStats + err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) return reply, err } diff --git a/consul/leader.go b/consul/leader.go index cd09dcba4..e2d519b16 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -624,9 +624,10 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { } // Trigger a check to remove dead servers - go func() { - s.autopilotRemoveDeadCh <- struct{}{} - }() + select { + case s.autopilotRemoveDeadCh <- struct{}{}: + default: + } return nil } diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go index e9f6272ed..e30a1f434 100644 --- a/consul/operator_endpoint.go +++ b/consul/operator_endpoint.go @@ -185,12 +185,6 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe return nil } -// Used by Autopilot to query the raft stats of the local server. -func (op *Operator) RaftStats(args struct{}, reply *map[string]string) error { - *reply = op.srv.raft.Stats() - return nil -} - // ServerHealth is used to get the current health of the servers. 
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error { // This must be sent to the leader, so we fix the args since we are diff --git a/consul/status_endpoint.go b/consul/status_endpoint.go index 2cac03a7f..8c6e3fcfb 100644 --- a/consul/status_endpoint.go +++ b/consul/status_endpoint.go @@ -1,5 +1,12 @@ package consul +import ( + "fmt" + "strconv" + + "github.com/hashicorp/consul/consul/structs" +) + // Status endpoint is used to check on server status type Status struct { server *Server @@ -33,3 +40,21 @@ func (s *Status) Peers(args struct{}, reply *[]string) error { } return nil } + +// Used by Autopilot to query the raft stats of the local server. +func (s *Status) RaftStats(args struct{}, reply *structs.ServerStats) error { + stats := s.server.raft.Stats() + + var err error + reply.LastContact = stats["last_contact"] + reply.LastIndex, err = strconv.ParseUint(stats["last_log_index"], 10, 64) + if err != nil { + return fmt.Errorf("error parsing server's last_log_index value: %s", err) + } + reply.LastTerm, err = strconv.ParseUint(stats["last_log_term"], 10, 64) + if err != nil { + return fmt.Errorf("error parsing server's last_log_term value: %s", err) + } + + return nil +} diff --git a/consul/structs/operator.go b/consul/structs/operator.go index d13c5d904..1babb4972 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -132,6 +132,18 @@ type ServerHealth struct { StableSince time.Time } +// ServerStats holds miscellaneous Raft metrics for a server +type ServerStats struct { + // LastContact is the time since this node's last contact with the leader. + LastContact string + + // LastTerm is the highest leader term this server has a record of in its Raft log. + LastTerm uint64 + + // LastIndex is the last log index this server has a record of in its Raft log. + LastIndex uint64 +} + // OperatorHealthReply is a representation of the overall health of the cluster type OperatorHealthReply struct { // Healthy is true if all the servers in the cluster are healthy. 
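Patch 2 also replaces the fire-and-forget `go func() { s.autopilotRemoveDeadCh <- struct{}{} }()` sends with a non-blocking `select`/`default` send. The standalone sketch below (not part of the patch; all names are illustrative) shows that pattern in isolation: the trigger is delivered if a receiver is ready and silently dropped otherwise, so the caller never blocks and no goroutine is leaked.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Stands in for autopilotRemoveDeadCh: an unbuffered trigger channel.
	removeDeadCh := make(chan struct{})

	// Consumer loop, standing in for the leader's dead-server cleanup loop.
	go func() {
		for range removeDeadCh {
			fmt.Println("pruning dead servers")
		}
	}()

	time.Sleep(10 * time.Millisecond) // demo only: give the consumer time to start

	// Non-blocking send: deliver the trigger if a receiver is waiting,
	// otherwise drop it rather than blocking or spawning a goroutine.
	select {
	case removeDeadCh <- struct{}{}:
		fmt.Println("trigger delivered")
	default:
		fmt.Println("no receiver ready, trigger dropped")
	}

	time.Sleep(10 * time.Millisecond) // demo only: let the consumer print
}
```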
From a5cbee0e99e7da78804028cace47d449d6cf5735 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 8 Mar 2017 11:31:32 -0800 Subject: [PATCH 3/5] Add AutopilotPolicy interface and BasicAutopilot --- consul/autopilot.go | 49 +++++++++++++++++++++++++------------- consul/server.go | 16 +++++++++---- consul/structs/operator.go | 1 + 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/consul/autopilot.go b/consul/autopilot.go index 1df9c9571..f775448bd 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -11,6 +11,12 @@ import ( "github.com/hashicorp/serf/serf" ) +// AutopilotPolicy is the interface for the Autopilot mechanism +type AutopilotPolicy interface { + // PromoteNonVoters defines the handling of non-voting servers + PromoteNonVoters(*structs.AutopilotConfig) error +} + func (s *Server) startAutopilot() { s.autopilotShutdownCh = make(chan struct{}) @@ -48,8 +54,12 @@ func (s *Server) serverHealthLoop() { valid, parts := agent.IsConsulServer(member) if valid { - health := s.queryServerHealth(member, parts, autopilotConf) - serverHealths[parts.Addr.String()] = health + health, err := s.queryServerHealth(member, parts, autopilotConf) + if err != nil { + s.logger.Printf("[ERR] consul: error fetching server health: %s", err) + } else { + serverHealths[parts.Addr.String()] = health + } } } @@ -57,7 +67,7 @@ func (s *Server) serverHealthLoop() { s.autopilotHealth = serverHealths s.autopilotLock.Unlock() - if err := s.promoteNonVoters(autopilotConf); err != nil { + if err := s.autopilotPolicy.PromoteNonVoters(autopilotConf); err != nil { s.logger.Printf("[ERR] consul: error checking for non-voters to promote: %s", err) } } @@ -121,9 +131,15 @@ func (s *Server) pruneDeadServers() error { return nil } -// promoteNonVoters promotes eligible non-voting servers to voters. -func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error { - minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers()) +// BasicAutopilot defines a policy for promoting non-voting servers in a way +// that maintains an odd-numbered voter count. +type BasicAutopilot struct { + server *Server +} + +// PromoteNonVoters promotes eligible non-voting servers to voters. 
+func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig) error { + minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers()) if err != nil { return fmt.Errorf("error getting server raft protocol versions: %s", err) } @@ -133,7 +149,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error return nil } - future := s.raft.GetConfiguration() + future := b.server.raft.GetConfiguration() if err := future.Error(); err != nil { return fmt.Errorf("failed to get raft configuration: %v", err) } @@ -144,7 +160,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error for _, server := range raftServers { // If this server has been stable and passing for long enough, promote it to a voter if server.Suffrage == raft.Nonvoter { - health := s.getServerHealth(string(server.Address)) + health := b.server.getServerHealth(string(server.Address)) if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime { promotions = append(promotions, server) } @@ -162,7 +178,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error // to get to an odd-sized quorum newServers := false if voterCount%2 == 0 { - addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0) + addFuture := b.server.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0) if err := addFuture.Error(); err != nil { return fmt.Errorf("failed to add raft peer: %v", err) } @@ -172,11 +188,11 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error // Promote remaining servers in twos to maintain an odd quorum size for i := 0; i < len(promotions)-1; i += 2 { - addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0) + addFirst := b.server.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0) if err := addFirst.Error(); err != nil { return fmt.Errorf("failed to add raft peer: %v", err) } - addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0) + addSecond := b.server.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0) if err := addSecond.Error(); err != nil { return fmt.Errorf("failed to add raft peer: %v", err) } @@ -186,7 +202,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error // If we added a new server, trigger a check to remove dead servers if newServers { select { - case s.autopilotRemoveDeadCh <- struct{}{}: + case b.server.autopilotRemoveDeadCh <- struct{}{}: default: } } @@ -196,10 +212,11 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error // queryServerHealth fetches the raft stats for the given server and uses them // to update its ServerHealth -func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, autopilotConf *structs.AutopilotConfig) *structs.ServerHealth { +func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, + autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) { stats, err := s.getServerStats(server) if err != nil { - s.logger.Printf("[DEBUG] consul: error getting server's raft stats: %s", err) + return nil, fmt.Errorf("error getting raft stats: %s", err) } health := &structs.ServerHealth{ @@ -216,7 +233,7 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, aut if health.LastContact != "never" { health.LastContactRaw, err = time.ParseDuration(health.LastContact) if err != nil { - s.logger.Printf("[DEBUG] 
consul: error parsing server's last_contact value: %s", err) + return nil, fmt.Errorf("error parsing last_contact duration: %s", err) } } @@ -236,7 +253,7 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, aut health.StableSince = lastHealth.StableSince } - return health + return health, nil } func (s *Server) getServerHealth(addr string) *structs.ServerHealth { diff --git a/consul/server.go b/consul/server.go index f14d30053..e46fedd9f 100644 --- a/consul/server.go +++ b/consul/server.go @@ -76,12 +76,19 @@ type Server struct { // aclCache is the non-authoritative ACL cache. aclCache *aclCache - // autopilot - autopilotHealth map[string]*structs.ServerHealth - autopilotLock sync.RWMutex - autopilotShutdownCh chan struct{} + // autopilotHealth stores the current view of server healths. + autopilotHealth map[string]*structs.ServerHealth + autopilotLock sync.RWMutex + + // autopilotPolicy controls the behavior of Autopilot for certain tasks. + autopilotPolicy AutopilotPolicy + + // autopilotRemoveDeadCh is used to trigger a check for dead server removals. autopilotRemoveDeadCh chan struct{} + // autopilotShutdownCh is used to stop the Autopilot loop. + autopilotShutdownCh chan struct{} + // Consul configuration config *Config @@ -243,6 +250,7 @@ func NewServer(config *Config) (*Server, error) { tombstoneGC: gc, shutdownCh: make(chan struct{}), } + s.autopilotPolicy = &BasicAutopilot{s} // Initialize the authoritative ACL cache. s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault) diff --git a/consul/structs/operator.go b/consul/structs/operator.go index 1babb4972..0ebeac8d3 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/serf/serf" ) +// AutopilotConfig holds the Autopilot configuration for a cluster. type AutopilotConfig struct { // CleanupDeadServers controls whether to remove dead servers when a new // server is added to the Raft peers. From 8130f9b1c12a9a561669bcccf109b813299b4107 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 9 Mar 2017 16:43:07 -0800 Subject: [PATCH 4/5] Cleaned up and reorganized some autopilot-related code --- api/operator.go | 100 +++++++++- api/operator_test.go | 26 +++ command/agent/operator_endpoint.go | 76 +++++++- command/agent/operator_endpoint_test.go | 56 +++++- command/operator_autopilot_set.go | 37 ++-- command/operator_autopilot_set_test.go | 18 +- consul/autopilot.go | 183 ++++++++++-------- consul/autopilot_test.go | 5 +- consul/config.go | 15 +- consul/operator_endpoint.go | 22 ++- consul/operator_endpoint_test.go | 41 ++-- consul/server.go | 14 +- consul/structs/operator.go | 48 ++++- consul/structs/operator_test.go | 94 +++++++++ testutil/server.go | 1 + .../docs/agent/http/operator.html.markdown | 10 +- .../source/docs/agent/options.html.markdown | 5 +- .../operator/autopilot.html.markdown.erb | 2 +- 18 files changed, 581 insertions(+), 172 deletions(-) create mode 100644 consul/structs/operator_test.go diff --git a/api/operator.go b/api/operator.go index cb5516a63..2c9961c9d 100644 --- a/api/operator.go +++ b/api/operator.go @@ -82,7 +82,7 @@ type AutopilotConfiguration struct { // LastContactThreshold is the limit on the amount of time a server can go // without leader contact before being considered unhealthy. - LastContactThreshold time.Duration + LastContactThreshold *ReadableDuration // MaxTrailingLogs is the amount of entries in the Raft Log that a server can // be behind before being considered unhealthy. 
@@ -91,7 +91,7 @@ type AutopilotConfiguration struct { // ServerStabilizationTime is the minimum amount of time a server must be // in a stable, healthy state before it can be added to the cluster. Only // applicable with Raft protocol version 3 or higher. - ServerStabilizationTime time.Duration + ServerStabilizationTime *ReadableDuration // CreateIndex holds the index corresponding the creation of this configuration. // This is a read-only field. @@ -104,6 +104,84 @@ type AutopilotConfiguration struct { ModifyIndex uint64 } +// ServerHealth is the health (from the leader's point of view) of a server. +type ServerHealth struct { + // ID is the raft ID of the server. + ID string + + // Name is the node name of the server. + Name string + + // The status of the SerfHealth check for the server. + SerfStatus string + + // LastContact is the time since this node's last contact with the leader. + LastContact *ReadableDuration + + // LastTerm is the highest leader term this server has a record of in its Raft log. + LastTerm uint64 + + // LastIndex is the last log index this server has a record of in its Raft log. + LastIndex uint64 + + // Healthy is whether or not the server is healthy according to the current + // Autopilot config. + Healthy bool + + // StableSince is the last time this server's Healthy value changed. + StableSince time.Time +} + +// OperatorHealthReply is a representation of the overall health of the cluster +type OperatorHealthReply struct { + // Healthy is true if all the servers in the cluster are healthy. + Healthy bool + + // FailureTolerance is the number of healthy servers that could be lost without + // an outage occurring. + FailureTolerance int + + // Servers holds the health of each server. + Servers []ServerHealth +} + +// ReadableDuration is a duration type that is serialized to JSON in human readable format. +type ReadableDuration time.Duration + +func NewReadableDuration(dur time.Duration) *ReadableDuration { + d := ReadableDuration(dur) + return &d +} + +func (d *ReadableDuration) String() string { return d.Duration().String() } +func (d *ReadableDuration) Duration() time.Duration { + if d == nil { + return time.Duration(0) + } + return time.Duration(*d) +} + +func (d *ReadableDuration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil +} + +func (d *ReadableDuration) UnmarshalJSON(raw []byte) error { + if d == nil { + return fmt.Errorf("cannot unmarshal to nil pointer") + } + + str := string(raw) + if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' { + return fmt.Errorf("must be enclosed with quotes: %s", str) + } + dur, err := time.ParseDuration(str[1 : len(str)-1]) + if err != nil { + return err + } + *d = ReadableDuration(dur) + return nil +} + // RaftGetConfiguration is used to query the current Raft peer set. 
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { r := op.c.newRequest("GET", "/v1/operator/raft/configuration") @@ -217,6 +295,7 @@ func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfig if err := decodeBody(resp, &out); err != nil { return nil, err } + return &out, nil } @@ -255,3 +334,20 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W return res, nil } + +// AutopilotServerHealth +func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) { + r := op.c.newRequest("GET", "/v1/operator/autopilot/health") + r.setQueryOptions(q) + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out OperatorHealthReply + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return &out, nil +} diff --git a/api/operator_test.go b/api/operator_test.go index 13ce53e61..38768c688 100644 --- a/api/operator_test.go +++ b/api/operator_test.go @@ -1,6 +1,7 @@ package api import ( + "fmt" "strings" "testing" @@ -178,3 +179,28 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) { } } } + +func TestOperator_ServerHealth(t *testing.T) { + t.Parallel() + c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { + c.RaftProtocol = 3 + }) + defer s.Stop() + + operator := c.Operator() + testutil.WaitForResult(func() (bool, error) { + out, err := operator.AutopilotServerHealth(nil) + if err != nil { + return false, fmt.Errorf("err: %v", err) + } + if len(out.Servers) != 1 || + !out.Servers[0].Healthy || + out.Servers[0].Name != s.Config.NodeName { + return false, fmt.Errorf("bad: %v", out) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) +} diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index cb1ccd93a..a04e801fc 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -4,10 +4,13 @@ import ( "fmt" "net/http" "strconv" + "time" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/consul/structs" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/raft" + "strings" ) // OperatorRaftConfiguration is used to inspect the current Raft configuration. 
@@ -183,12 +186,35 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re return nil, err } - return reply, nil + out := api.AutopilotConfiguration{ + CleanupDeadServers: reply.CleanupDeadServers, + LastContactThreshold: api.NewReadableDuration(reply.LastContactThreshold), + MaxTrailingLogs: reply.MaxTrailingLogs, + ServerStabilizationTime: api.NewReadableDuration(reply.ServerStabilizationTime), + CreateIndex: reply.CreateIndex, + ModifyIndex: reply.ModifyIndex, + } + + return out, nil case "PUT": var args structs.AutopilotSetConfigRequest s.parseDC(req, &args.Datacenter) s.parseToken(req, &args.Token) + var conf api.AutopilotConfiguration + if err := decodeBody(req, &conf, FixupConfigDurations); err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err))) + return nil, nil + } + + args.Config = structs.AutopilotConfig{ + CleanupDeadServers: conf.CleanupDeadServers, + LastContactThreshold: conf.LastContactThreshold.Duration(), + MaxTrailingLogs: conf.MaxTrailingLogs, + ServerStabilizationTime: conf.ServerStabilizationTime.Duration(), + } + // Check for cas value params := req.URL.Query() if _, ok := params["cas"]; ok { @@ -202,12 +228,6 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re args.CAS = true } - if err := decodeBody(req, &args.Config, nil); err != nil { - resp.WriteHeader(400) - resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err))) - return nil, nil - } - var reply bool if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil { return nil, err @@ -225,6 +245,29 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re } } +// FixupConfigDurations is used to handle parsing the duration fields in +// the Autopilot config struct +func FixupConfigDurations(raw interface{}) error { + rawMap, ok := raw.(map[string]interface{}) + if !ok { + return nil + } + for key, val := range rawMap { + if strings.ToLower(key) == "lastcontactthreshold" || + strings.ToLower(key) == "serverstabilizationtime" { + // Convert a string value into an integer + if vStr, ok := val.(string); ok { + dur, err := time.ParseDuration(vStr) + if err != nil { + return err + } + rawMap[key] = dur + } + } + } + return nil +} + // OperatorServerHealth is used to get the health of the servers in the local DC func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Request) (interface{}, error) { if req.Method != "GET" { @@ -247,5 +290,22 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re resp.WriteHeader(http.StatusTooManyRequests) } - return reply, nil + out := &api.OperatorHealthReply{ + Healthy: reply.Healthy, + FailureTolerance: reply.FailureTolerance, + } + for _, server := range reply.Servers { + out.Servers = append(out.Servers, api.ServerHealth{ + ID: server.ID, + Name: server.Name, + SerfStatus: server.SerfStatus.String(), + LastContact: api.NewReadableDuration(server.LastContact), + LastTerm: server.LastTerm, + LastIndex: server.LastIndex, + Healthy: server.Healthy, + StableSince: server.StableSince.Round(time.Second).UTC(), + }) + } + + return out, nil } diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 20b6614f5..c0ad9f9ad 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -7,10 +7,11 @@ import ( "net/http/httptest" "strings" "testing" + "time" + 
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" - "github.com/hashicorp/serf/serf" ) func TestOperator_OperatorRaftConfiguration(t *testing.T) { @@ -304,7 +305,7 @@ func TestOperator_AutopilotGetConfiguration(t *testing.T) { if resp.Code != 200 { t.Fatalf("bad code: %d", resp.Code) } - out, ok := obj.(structs.AutopilotConfig) + out, ok := obj.(api.AutopilotConfiguration) if !ok { t.Fatalf("unexpected: %T", obj) } @@ -424,7 +425,10 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) { } func TestOperator_OperatorServerHealth(t *testing.T) { - httpTest(t, func(srv *HTTPServer) { + cb := func(c *Config) { + c.RaftProtocol = 3 + } + httpTestWithConfig(t, func(srv *HTTPServer) { body := bytes.NewBuffer(nil) req, err := http.NewRequest("GET", "/v1/operator/autopilot/health", body) if err != nil { @@ -440,14 +444,14 @@ func TestOperator_OperatorServerHealth(t *testing.T) { if resp.Code != 200 { return false, fmt.Errorf("bad code: %d", resp.Code) } - out, ok := obj.(structs.OperatorHealthReply) + out, ok := obj.(*api.OperatorHealthReply) if !ok { return false, fmt.Errorf("unexpected: %T", obj) } if len(out.Servers) != 1 || !out.Servers[0].Healthy || out.Servers[0].Name != srv.agent.config.NodeName || - out.Servers[0].SerfStatusRaw != serf.StatusAlive || + out.Servers[0].SerfStatus != "alive" || out.FailureTolerance != 0 { return false, fmt.Errorf("bad: %v", out) } @@ -457,5 +461,45 @@ func TestOperator_OperatorServerHealth(t *testing.T) { t.Fatal(err) }) - }) + }, cb) +} + +func TestOperator_OperatorServerHealth_Unhealthy(t *testing.T) { + threshold := time.Duration(-1) + cb := func(c *Config) { + c.RaftProtocol = 3 + c.Autopilot.LastContactThreshold = &threshold + } + httpTestWithConfig(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("GET", "/v1/operator/autopilot/health", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + resp := httptest.NewRecorder() + obj, err := srv.OperatorServerHealth(resp, req) + if err != nil { + return false, fmt.Errorf("err: %v", err) + } + if resp.Code != 429 { + return false, fmt.Errorf("bad code: %d", resp.Code) + } + out, ok := obj.(*api.OperatorHealthReply) + if !ok { + return false, fmt.Errorf("unexpected: %T", obj) + } + if len(out.Servers) != 1 || + out.Healthy || + out.Servers[0].Name != srv.agent.config.NodeName { + return false, fmt.Errorf("bad: %v", out) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) + + }, cb) } diff --git a/command/operator_autopilot_set.go b/command/operator_autopilot_set.go index b4f4fe73d..3bd8dc859 100644 --- a/command/operator_autopilot_set.go +++ b/command/operator_autopilot_set.go @@ -3,10 +3,10 @@ package command import ( "flag" "fmt" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/command/base" "strings" "time" - - "github.com/hashicorp/consul/command/base" ) type OperatorAutopilotSetCommand struct { @@ -30,9 +30,9 @@ func (c *OperatorAutopilotSetCommand) Synopsis() string { func (c *OperatorAutopilotSetCommand) Run(args []string) int { var cleanupDeadServers base.BoolValue - var lastContactThresholdRaw string var maxTrailingLogs base.UintValue - var serverStabilizationTimeRaw string + var lastContactThreshold base.DurationValue + var serverStabilizationTime base.DurationValue f := c.Command.NewFlagSet(c) @@ -42,11 +42,11 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int { f.Var(&maxTrailingLogs, 
"max-trailing-logs", "Controls the maximum number of log entries that a server can trail the "+ "leader by before being considered unhealthy.") - f.StringVar(&lastContactThresholdRaw, "last-contact-threshold", "", + f.Var(&lastContactThreshold, "last-contact-threshold", "Controls the maximum amount of time a server can go without contact "+ "from the leader before being considered unhealthy. Must be a duration value "+ - "such as `10s`.") - f.StringVar(&serverStabilizationTimeRaw, "server-stabilization-time", "", + "such as `200ms`.") + f.Var(&serverStabilizationTime, "server-stabilization-time", "Controls the minimum amount of time a server must be stable in the "+ "'healthy' state before being added to the cluster. Only takes effect if all "+ "servers are running Raft protocol version 3 or higher. Must be a duration "+ @@ -77,25 +77,18 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int { // Update the config values based on the set flags. cleanupDeadServers.Merge(&conf.CleanupDeadServers) + trailing := uint(conf.MaxTrailingLogs) maxTrailingLogs.Merge(&trailing) conf.MaxTrailingLogs = uint64(trailing) - if lastContactThresholdRaw != "" { - dur, err := time.ParseDuration(lastContactThresholdRaw) - if err != nil { - c.Ui.Error(fmt.Sprintf("invalid value for last-contact-threshold: %v", err)) - return 1 - } - conf.LastContactThreshold = dur - } - if serverStabilizationTimeRaw != "" { - dur, err := time.ParseDuration(serverStabilizationTimeRaw) - if err != nil { - c.Ui.Error(fmt.Sprintf("invalid value for server-stabilization-time: %v", err)) - } - conf.ServerStabilizationTime = dur - } + last := time.Duration(*conf.LastContactThreshold) + lastContactThreshold.Merge(&last) + conf.LastContactThreshold = api.NewReadableDuration(last) + + stablization := time.Duration(*conf.ServerStabilizationTime) + serverStabilizationTime.Merge(&stablization) + conf.ServerStabilizationTime = api.NewReadableDuration(stablization) // Check-and-set the new configuration. 
result, err := operator.AutopilotCASConfiguration(conf, nil) diff --git a/command/operator_autopilot_set_test.go b/command/operator_autopilot_set_test.go index f4cab0b58..4b859aa0e 100644 --- a/command/operator_autopilot_set_test.go +++ b/command/operator_autopilot_set_test.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/command/base" "github.com/hashicorp/consul/consul/structs" "github.com/mitchellh/cli" + "time" ) func TestOperator_Autopilot_Set_Implements(t *testing.T) { @@ -25,7 +26,13 @@ func TestOperator_Autopilot_Set(t *testing.T) { Flags: base.FlagSetHTTP, }, } - args := []string{"-http-addr=" + a1.httpAddr, "-cleanup-dead-servers=false"} + args := []string{ + "-http-addr=" + a1.httpAddr, + "-cleanup-dead-servers=false", + "-max-trailing-logs=99", + "-last-contact-threshold=123ms", + "-server-stabilization-time=123ms", + } code := c.Run(args) if code != 0 { @@ -47,4 +54,13 @@ func TestOperator_Autopilot_Set(t *testing.T) { if reply.CleanupDeadServers { t.Fatalf("bad: %#v", reply) } + if reply.MaxTrailingLogs != 99 { + t.Fatalf("bad: %#v", reply) + } + if reply.LastContactThreshold != 123*time.Millisecond { + t.Fatalf("bad: %#v", reply) + } + if reply.ServerStabilizationTime != 123*time.Millisecond { + t.Fatalf("bad: %#v", reply) + } } diff --git a/consul/autopilot.go b/consul/autopilot.go index f775448bd..1815985fc 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -3,6 +3,7 @@ package consul import ( "fmt" "strconv" + "sync" "time" "github.com/hashicorp/consul/consul/agent" @@ -19,70 +20,38 @@ type AutopilotPolicy interface { func (s *Server) startAutopilot() { s.autopilotShutdownCh = make(chan struct{}) + s.autopilotWaitGroup = sync.WaitGroup{} + s.autopilotWaitGroup.Add(1) - go s.serverHealthLoop() - go s.removeDeadLoop() + go s.autopilotLoop() } func (s *Server) stopAutopilot() { close(s.autopilotShutdownCh) + s.autopilotWaitGroup.Wait() } -// serverHealthLoop monitors the health of the servers in the cluster -func (s *Server) serverHealthLoop() { +// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove. 
+func (s *Server) autopilotLoop() { // Monitor server health until shutdown - ticker := time.NewTicker(s.config.ServerHealthInterval) + ticker := time.NewTicker(s.config.AutopilotInterval) for { select { case <-s.autopilotShutdownCh: ticker.Stop() + s.autopilotWaitGroup.Done() return case <-ticker.C: - serverHealths := make(map[string]*structs.ServerHealth) - state := s.fsm.State() _, autopilotConf, err := state.AutopilotConfig() if err != nil { s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err) } - // Build an updated map of server healths - for _, member := range s.LANMembers() { - if member.Status == serf.StatusLeft { - continue - } - - valid, parts := agent.IsConsulServer(member) - if valid { - health, err := s.queryServerHealth(member, parts, autopilotConf) - if err != nil { - s.logger.Printf("[ERR] consul: error fetching server health: %s", err) - } else { - serverHealths[parts.Addr.String()] = health - } - } - } - - s.autopilotLock.Lock() - s.autopilotHealth = serverHealths - s.autopilotLock.Unlock() - if err := s.autopilotPolicy.PromoteNonVoters(autopilotConf); err != nil { s.logger.Printf("[ERR] consul: error checking for non-voters to promote: %s", err) } - } - } -} -// removeDeadLoop checks for dead servers periodically, or when receiving on autopilotRemoveDeadCh -func (s *Server) removeDeadLoop() { - ticker := time.NewTicker(s.config.RemoveDeadInterval) - for { - select { - case <-s.autopilotShutdownCh: - ticker.Stop() - return - case <-ticker.C: if err := s.pruneDeadServers(); err != nil { s.logger.Printf("[ERR] consul: error checking for dead servers to remove: %s", err) } @@ -113,13 +82,18 @@ func (s *Server) pruneDeadServers() error { } } + // Nothing to remove, return early + if len(failed) == 0 { + return nil + } + peers, err := s.numPeers() if err != nil { return err } // Only do removals if a minority of servers will be affected - if len(failed) <= peers/2 { + if len(failed) < peers/2 || (len(failed) == 1 && peers >= 3) { for _, server := range failed { s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", server) go s.serfLAN.RemoveFailedNode(server) @@ -160,8 +134,8 @@ func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig for _, server := range raftServers { // If this server has been stable and passing for long enough, promote it to a voter if server.Suffrage == raft.Nonvoter { - health := b.server.getServerHealth(string(server.Address)) - if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime { + health := b.server.getServerHealth(string(server.ID)) + if health.IsStable(time.Now(), autopilotConf) { promotions = append(promotions, server) } } else { @@ -210,6 +184,68 @@ func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig return nil } +// serverHealthLoop monitors the health of the servers in the cluster +func (s *Server) serverHealthLoop() { + // Monitor server health until shutdown + ticker := time.NewTicker(s.config.ServerHealthInterval) + for { + select { + case <-s.shutdownCh: + ticker.Stop() + return + case <-ticker.C: + serverHealths := make(map[string]*structs.ServerHealth) + + // Don't do anything if the min Raft version is too low + minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers()) + if err != nil { + s.logger.Printf("[ERR] consul: error getting server raft protocol versions: %s", err) + break + } + if minRaftProtocol < 3 { + break + } + + state := s.fsm.State() + _, autopilotConf, err := 
state.AutopilotConfig() + if err != nil { + s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err) + break + } + // Bail early if autopilot config hasn't been initialized yet + if autopilotConf == nil { + break + } + + // Build an updated map of server healths + for _, member := range s.LANMembers() { + if member.Status == serf.StatusLeft { + continue + } + + valid, parts := agent.IsConsulServer(member) + if valid { + health, err := s.queryServerHealth(member, parts, autopilotConf) + if err != nil { + s.logger.Printf("[ERR] consul: error fetching server health: %s", err) + serverHealths[parts.ID] = &structs.ServerHealth{ + ID: parts.ID, + Name: parts.Name, + Healthy: false, + } + } else { + serverHealths[parts.ID] = health + } + } + } + + s.serverHealthLock.Lock() + s.serverHealths = serverHealths + s.serverHealthLock.Unlock() + } + } +} + // queryServerHealth fetches the raft stats for the given server and uses them // to update its ServerHealth func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, @@ -220,18 +256,16 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, } health := &structs.ServerHealth{ - ID: server.ID, - Name: server.Name, - SerfStatusRaw: member.Status, - SerfStatus: member.Status.String(), - LastContactRaw: -1, - LastContact: stats.LastContact, - LastTerm: stats.LastTerm, - LastIndex: stats.LastIndex, + ID: server.ID, + Name: server.Name, + SerfStatus: member.Status, + LastContact: -1, + LastTerm: stats.LastTerm, + LastIndex: stats.LastIndex, } - if health.LastContact != "never" { - health.LastContactRaw, err = time.ParseDuration(health.LastContact) + if stats.LastContact != "never" { + health.LastContact, err = time.ParseDuration(stats.LastContact) if err != nil { return nil, fmt.Errorf("error parsing last_contact duration: %s", err) } @@ -239,14 +273,17 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, // Set LastContact to 0 for the leader if s.config.NodeName == member.Name { - health.LastContactRaw = 0 - health.LastContact = "leader" + health.LastContact = 0 } - health.Healthy = s.isServerHealthy(health, autopilotConf) + lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing last_log_term: %s", err) + } + health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf) // If this is a new server or the health changed, reset StableSince - lastHealth := s.getServerHealth(server.Addr.String()) + lastHealth := s.getServerHealth(server.ID) if lastHealth == nil || lastHealth.Healthy != health.Healthy { health.StableSince = time.Now() } else { @@ -256,10 +293,10 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, return health, nil } -func (s *Server) getServerHealth(addr string) *structs.ServerHealth { - s.autopilotLock.RLock() - defer s.autopilotLock.RUnlock() - h, ok := s.autopilotHealth[addr] +func (s *Server) getServerHealth(id string) *structs.ServerHealth { + s.serverHealthLock.RLock() + defer s.serverHealthLock.RUnlock() + h, ok := s.serverHealths[id] if !ok { return nil } @@ -272,27 +309,3 @@ func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, erro err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) return reply, err } - -// isServerHealthy determines whether the given ServerHealth is healthy -// based on the current Autopilot config -func (s *Server) 
isServerHealthy(health *structs.ServerHealth, autopilotConf *structs.AutopilotConfig) bool { - if health.SerfStatusRaw != serf.StatusAlive { - return false - } - - if health.LastContactRaw > autopilotConf.LastContactThreshold || health.LastContactRaw < 0 { - return false - } - - lastTerm, _ := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64) - if health.LastTerm != lastTerm { - return false - } - - if s.raft.LastIndex() > autopilotConf.MaxTrailingLogs && - health.LastIndex < s.raft.LastIndex()-autopilotConf.MaxTrailingLogs { - return false - } - - return true -} diff --git a/consul/autopilot_test.go b/consul/autopilot_test.go index ae38f3f3e..034197208 100644 --- a/consul/autopilot_test.go +++ b/consul/autopilot_test.go @@ -85,7 +85,7 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc1" c.Bootstrap = true - c.RemoveDeadInterval = 100 * time.Millisecond + c.AutopilotInterval = 100 * time.Millisecond }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -144,6 +144,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) { c.RaftConfig.ProtocolVersion = 3 c.AutopilotConfig.ServerStabilizationTime = 200 * time.Millisecond c.ServerHealthInterval = 100 * time.Millisecond + c.AutopilotInterval = 100 * time.Millisecond }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -182,7 +183,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) { if servers[1].Suffrage != raft.Nonvoter { return false, fmt.Errorf("bad: %v", servers) } - health := s1.getServerHealth(string(servers[1].Address)) + health := s1.getServerHealth(string(servers[1].ID)) if health == nil { return false, fmt.Errorf("nil health") } diff --git a/consul/config.go b/consul/config.go index 0d758a322..f64567a03 100644 --- a/consul/config.go +++ b/consul/config.go @@ -280,13 +280,14 @@ type Config struct { // bootstrapping. AutopilotConfig *structs.AutopilotConfig - // ServerHealthInterval is the frequency with which the leader will check - // the health of the servers in the cluster + // ServerHealthInterval is the frequency with which the health of the + // servers in the cluster will be updated. ServerHealthInterval time.Duration - // RemoveDeadInterval is the frequency with which the leader will look for - // dead servers to remove from the cluster - RemoveDeadInterval time.Duration + // AutopilotInterval is the frequency with which the leader will perform + // autopilot tasks, such as promoting eligible non-voters and removing + // dead servers. + AutopilotInterval time.Duration } // CheckVersion is used to check if the ProtocolVersion is valid @@ -366,8 +367,8 @@ func DefaultConfig() *Config { MaxTrailingLogs: 250, ServerStabilizationTime: 10 * time.Second, }, - ServerHealthInterval: 1 * time.Second, - RemoveDeadInterval: 30 * time.Second, + ServerHealthInterval: 2 * time.Second, + AutopilotInterval: 10 * time.Second, } // Increase our reap interval to 3 days instead of 24h. diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go index e30a1f434..16d8b75dc 100644 --- a/consul/operator_endpoint.go +++ b/consul/operator_endpoint.go @@ -8,7 +8,6 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" - "time" ) // Operator endpoint is used to perform low-level operator tasks for Consul. 
@@ -204,9 +203,16 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs return permissionDeniedErr } - status := structs.OperatorHealthReply{ - Healthy: true, + // Exit early if the min Raft version is too low + minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers()) + if err != nil { + return fmt.Errorf("error getting server raft protocol versions: %s", err) } + if minRaftProtocol < 3 { + return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint") + } + + var status structs.OperatorHealthReply future := op.srv.raft.GetConfiguration() if err := future.Error(); err != nil { return err @@ -215,19 +221,15 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs healthyCount := 0 servers := future.Configuration().Servers for _, s := range servers { - health := op.srv.getServerHealth(string(s.Address)) + health := op.srv.getServerHealth(string(s.ID)) if health != nil { - // Fix up StableSince to be more readable - health.StableSince = health.StableSince.Round(time.Second).UTC() - - if !health.Healthy { - status.Healthy = false - } else { + if health.Healthy { healthyCount++ } status.Servers = append(status.Servers, *health) } } + status.Healthy = healthyCount == len(servers) // If we have extra healthy servers, set FailureTolerance if healthyCount > len(servers)/2+1 { diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go index a762ba6b8..d430bd28f 100644 --- a/consul/operator_endpoint_test.go +++ b/consul/operator_endpoint_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/testutil" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/raft" + "time" ) func TestOperator_RaftGetConfiguration(t *testing.T) { @@ -428,16 +429,11 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { } func TestOperator_ServerHealth(t *testing.T) { - for i := 1; i <= 3; i++ { - testServerHealth(t, i) - } -} - -func testServerHealth(t *testing.T, protocol int) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc1" c.Bootstrap = true - c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol) + c.RaftConfig.ProtocolVersion = 3 + c.ServerHealthInterval = 100 * time.Millisecond }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -447,7 +443,7 @@ func testServerHealth(t *testing.T, protocol int) { dir2, s2 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc1" c.Bootstrap = false - c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol) + c.RaftConfig.ProtocolVersion = 3 }) defer os.RemoveAll(dir2) defer s2.Shutdown() @@ -460,7 +456,7 @@ func testServerHealth(t *testing.T, protocol int) { dir3, s3 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc1" c.Bootstrap = false - c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol) + c.RaftConfig.ProtocolVersion = 3 }) defer os.RemoveAll(dir3) defer s3.Shutdown() @@ -488,13 +484,13 @@ func testServerHealth(t *testing.T, protocol int) { if len(reply.Servers) != 3 { return false, fmt.Errorf("bad: %v", reply) } - if reply.Servers[0].LastContact != "leader" { + if reply.Servers[0].LastContact != 0 { return false, fmt.Errorf("bad: %v", reply) } - if reply.Servers[1].LastContactRaw <= 0 { + if reply.Servers[1].LastContact <= 0 { return false, fmt.Errorf("bad: %v", reply) } - if reply.Servers[2].LastContactRaw <= 0 { + if reply.Servers[2].LastContact <= 0 { return false, fmt.Errorf("bad: %v", reply) } return true, nil @@ -502,3 +498,24 @@ func 
testServerHealth(t *testing.T, protocol int) { t.Fatal(err) }) } + +func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = true + c.RaftConfig.ProtocolVersion = 2 + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.OperatorHealthReply + err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") { + t.Fatalf("bad: %v", err) + } +} diff --git a/consul/server.go b/consul/server.go index e46fedd9f..112b9aff4 100644 --- a/consul/server.go +++ b/consul/server.go @@ -76,10 +76,6 @@ type Server struct { // aclCache is the non-authoritative ACL cache. aclCache *aclCache - // autopilotHealth stores the current view of server healths. - autopilotHealth map[string]*structs.ServerHealth - autopilotLock sync.RWMutex - // autopilotPolicy controls the behavior of Autopilot for certain tasks. autopilotPolicy AutopilotPolicy @@ -89,6 +85,9 @@ type Server struct { // autopilotShutdownCh is used to stop the Autopilot loop. autopilotShutdownCh chan struct{} + // autopilotWaitGroup is used to block until Autopilot shuts down. + autopilotWaitGroup sync.WaitGroup + // Consul configuration config *Config @@ -158,6 +157,10 @@ type Server struct { sessionTimers map[string]*time.Timer sessionTimersLock sync.Mutex + // serverHealths stores the current view of server healths. + serverHealths map[string]*structs.ServerHealth + serverHealthLock sync.RWMutex + // tombstoneGC is used to track the pending GC invocations // for the KV tombstones tombstoneGC *state.TombstoneGC @@ -315,6 +318,9 @@ func NewServer(config *Config) (*Server, error) { // Start the metrics handlers. go s.sessionStats() + // Start the server health checking. + go s.serverHealthLoop() + return s, nil } diff --git a/consul/structs/operator.go b/consul/structs/operator.go index 0ebeac8d3..f17cd8f41 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -112,12 +112,10 @@ type ServerHealth struct { Name string // The status of the SerfHealth check for the server. - SerfStatusRaw serf.MemberStatus `json:"-"` - SerfStatus string + SerfStatus serf.MemberStatus // LastContact is the time since this node's last contact with the leader. - LastContactRaw time.Duration `json:"-"` - LastContact string + LastContact time.Duration // LastTerm is the highest leader term this server has a record of in its Raft log. LastTerm uint64 @@ -129,10 +127,50 @@ type ServerHealth struct { // Autopilot config. Healthy bool - // StableSince is the amount of time since this server's Healthy value last changed. + // StableSince is the last time this server's Healthy value changed. 
StableSince time.Time } +// IsHealthy determines whether this ServerHealth is considered healthy +// based on the given Autopilot config +func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotConf *AutopilotConfig) bool { + if h.SerfStatus != serf.StatusAlive { + return false + } + + if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 { + return false + } + + if h.LastTerm != lastTerm { + return false + } + + if lastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < lastIndex-autopilotConf.MaxTrailingLogs { + return false + } + + return true +} + +// IsStable returns true if the ServerHealth is in a stable, passing state +// according to the given AutopilotConfig +func (h *ServerHealth) IsStable(now time.Time, conf *AutopilotConfig) bool { + if h == nil { + return false + } + + if !h.Healthy { + return false + } + + if now.Sub(h.StableSince) < conf.ServerStabilizationTime { + return false + } + + return true +} + // ServerStats holds miscellaneous Raft metrics for a server type ServerStats struct { // LastContact is the time since this node's last contact with the leader. diff --git a/consul/structs/operator_test.go b/consul/structs/operator_test.go new file mode 100644 index 000000000..8a0916f11 --- /dev/null +++ b/consul/structs/operator_test.go @@ -0,0 +1,94 @@ +package structs + +import ( + "testing" + "time" + + "github.com/hashicorp/serf/serf" +) + +func TestServerHealth_IsHealthy(t *testing.T) { + cases := []struct { + health ServerHealth + lastTerm uint64 + lastIndex uint64 + conf AutopilotConfig + expected bool + }{ + // Healthy server, all values within allowed limits + { + health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 1, LastIndex: 0}, + lastTerm: 1, + lastIndex: 10, + conf: AutopilotConfig{MaxTrailingLogs: 20}, + expected: true, + }, + // Serf status failed + { + health: ServerHealth{SerfStatus: serf.StatusFailed}, + expected: false, + }, + // Old value for lastTerm + { + health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 0}, + lastTerm: 1, + expected: false, + }, + // Too far behind on logs + { + health: ServerHealth{SerfStatus: serf.StatusAlive, LastIndex: 0}, + lastIndex: 10, + conf: AutopilotConfig{MaxTrailingLogs: 5}, + expected: false, + }, + } + + for index, tc := range cases { + actual := tc.health.IsHealthy(tc.lastTerm, tc.lastIndex, &tc.conf) + if actual != tc.expected { + t.Fatalf("bad value for case %d: %v", index, actual) + } + } +} + +func TestServerHealth_IsStable(t *testing.T) { + start := time.Now() + cases := []struct { + health *ServerHealth + now time.Time + conf AutopilotConfig + expected bool + }{ + // Healthy server, all values within allowed limits + { + health: &ServerHealth{Healthy: true, StableSince: start}, + now: start.Add(15 * time.Second), + conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second}, + expected: true, + }, + // Unhealthy server + { + health: &ServerHealth{Healthy: false}, + expected: false, + }, + // Healthy server, hasn't reached stabilization time + { + health: &ServerHealth{Healthy: true, StableSince: start}, + now: start.Add(5 * time.Second), + conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second}, + expected: false, + }, + // Nil struct + { + health: nil, + expected: false, + }, + } + + for index, tc := range cases { + actual := tc.health.IsStable(tc.now, &tc.conf) + if actual != tc.expected { + t.Fatalf("bad value for case %d: %v", index, actual) + } + } +} diff --git a/testutil/server.go b/testutil/server.go index 
bdbf61178..ad350c01f 100644 --- a/testutil/server.go +++ b/testutil/server.go @@ -65,6 +65,7 @@ type TestServerConfig struct { Bind string `json:"bind_addr,omitempty"` Addresses *TestAddressConfig `json:"addresses,omitempty"` Ports *TestPortConfig `json:"ports,omitempty"` + RaftProtocol int `json:"raft_protocol,omitempty"` ACLMasterToken string `json:"acl_master_token,omitempty"` ACLDatacenter string `json:"acl_datacenter,omitempty"` ACLDefaultPolicy string `json:"acl_default_policy,omitempty"` diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown index d9aac210f..dabc9980a 100644 --- a/website/source/docs/agent/http/operator.html.markdown +++ b/website/source/docs/agent/http/operator.html.markdown @@ -289,9 +289,9 @@ A JSON body is returned that looks like this: ```javascript { "CleanupDeadServers": true, - "LastContactThreshold": 200000000, + "LastContactThreshold": "200ms", "MaxTrailingLogs": 250, - "ServerStabilizationTime": 10000000000, + "ServerStabilizationTime": "10s", "CreateIndex": 4, "ModifyIndex": 4 } @@ -318,9 +318,9 @@ body must look like: ```javascript { "CleanupDeadServers": true, - "LastContactThreshold": 200000000, + "LastContactThreshold": "200ms", "MaxTrailingLogs": 250, - "ServerStabilizationTime": 10000000000, + "ServerStabilizationTime": "10s", "CreateIndex": 4, "ModifyIndex": 4 } @@ -361,7 +361,7 @@ A JSON body is returned that looks like this: "ID": "e349749b-3303-3ddf-959c-b5885a0e1f6e", "Name": "node1", "SerfStatus": "alive", - "LastContact": "leader", + "LastContact": "0s", "LastTerm": 2, "LastIndex": 46, "Healthy": true, diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 7c53eb411..4b85c47be 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -562,7 +562,8 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass The following sub-keys are available: * `cleanup_dead_servers` - This controls - the automatic removal of dead server nodes whenever a new server is added to the cluster. Defaults to `true`. + the automatic removal of dead server nodes periodically and whenever a new server is added to the cluster. + Defaults to `true`. * `last_contact_threshold` - Controls the maximum amount of time a server can go without contact from the leader before being considered unhealthy. @@ -575,7 +576,7 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `server_stabilization_time` - Controls the minimum amount of time a server must be stable in the 'healthy' state before being added to the cluster. Only takes effect if all servers are running Raft protocol version 3 or higher. Must be a duration value - such as `10s`. Defaults to `30s`. + such as `30s`. Defaults to `10s`. * `bootstrap` Equivalent to the [`-bootstrap` command-line flag](#_bootstrap). diff --git a/website/source/docs/commands/operator/autopilot.html.markdown.erb b/website/source/docs/commands/operator/autopilot.html.markdown.erb index a2e00c267..f61e872b1 100644 --- a/website/source/docs/commands/operator/autopilot.html.markdown.erb +++ b/website/source/docs/commands/operator/autopilot.html.markdown.erb @@ -62,7 +62,7 @@ Usage: `consul operator autopilot set-config [options]` upon the successful joining of new servers to the cluster. Must be one of `[true|false]`. 
* `last-contact-threshold` - Controls the maximum amount of time a server can go without contact -from the leader before being considered unhealthy. Must be a duration value such as `10s`. +from the leader before being considered unhealthy. Must be a duration value such as `200ms`. * `max-trailing-logs` - Controls the maximum number of log entries that a server can trail the leader by before being considered unhealthy. From b15d67bfac03f868e936d660f481e3335b9b1afb Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 10 Mar 2017 11:41:17 -0800 Subject: [PATCH 5/5] Use defers for WaitGroup and Ticker stop --- command/operator_autopilot_set.go | 5 +++-- command/operator_autopilot_set_test.go | 2 +- consul/autopilot.go | 11 ++++++---- consul/autopilot_test.go | 28 +++++++++++++++----------- 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/command/operator_autopilot_set.go b/command/operator_autopilot_set.go index 3bd8dc859..6152c0dec 100644 --- a/command/operator_autopilot_set.go +++ b/command/operator_autopilot_set.go @@ -3,10 +3,11 @@ package command import ( "flag" "fmt" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/command/base" "strings" "time" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/command/base" ) type OperatorAutopilotSetCommand struct { diff --git a/command/operator_autopilot_set_test.go b/command/operator_autopilot_set_test.go index 4b859aa0e..3b6b8b69a 100644 --- a/command/operator_autopilot_set_test.go +++ b/command/operator_autopilot_set_test.go @@ -3,11 +3,11 @@ package command import ( "strings" "testing" + "time" "github.com/hashicorp/consul/command/base" "github.com/hashicorp/consul/consul/structs" "github.com/mitchellh/cli" - "time" ) func TestOperator_Autopilot_Set_Implements(t *testing.T) { diff --git a/consul/autopilot.go b/consul/autopilot.go index 1815985fc..b1736b3b8 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -33,13 +33,15 @@ func (s *Server) stopAutopilot() { // autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove. 
func (s *Server) autopilotLoop() { + defer s.autopilotWaitGroup.Done() + // Monitor server health until shutdown ticker := time.NewTicker(s.config.AutopilotInterval) + defer ticker.Stop() + for { select { case <-s.autopilotShutdownCh: - ticker.Stop() - s.autopilotWaitGroup.Done() return case <-ticker.C: state := s.fsm.State() @@ -93,7 +95,7 @@ func (s *Server) pruneDeadServers() error { } // Only do removals if a minority of servers will be affected - if len(failed) < peers/2 || (len(failed) == 1 && peers >= 3) { + if len(failed) < peers/2 { for _, server := range failed { s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", server) go s.serfLAN.RemoveFailedNode(server) @@ -188,10 +190,11 @@ func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig func (s *Server) serverHealthLoop() { // Monitor server health until shutdown ticker := time.NewTicker(s.config.ServerHealthInterval) + defer ticker.Stop() + for { select { case <-s.shutdownCh: - ticker.Stop() return case <-ticker.C: serverHealths := make(map[string]*structs.ServerHealth) diff --git a/consul/autopilot_test.go b/consul/autopilot_test.go index 034197208..6d38d6a4d 100644 --- a/consul/autopilot_test.go +++ b/consul/autopilot_test.go @@ -102,37 +102,41 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) { defer os.RemoveAll(dir3) defer s3.Shutdown() - servers := []*Server{s1, s2, s3} + dir4, s4 := testServerWithConfig(t, conf) + defer os.RemoveAll(dir4) + defer s4.Shutdown() + + servers := []*Server{s1, s2, s3, s4} // Join the servers to s1 addr := fmt.Sprintf("127.0.0.1:%d", s1.config.SerfLANConfig.MemberlistConfig.BindPort) - if _, err := s2.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - if _, err := s3.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) + + for _, s := range servers[1:] { + if _, err := s.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } } for _, s := range servers { testutil.WaitForResult(func() (bool, error) { peers, _ := s.numPeers() - return peers == 3, nil + return peers == 4, nil }, func(err error) { - t.Fatalf("should have 3 peers") + t.Fatalf("should have 4 peers") }) } // Kill a non-leader server - s3.Shutdown() + s4.Shutdown() // Should be removed from the peers automatically - for _, s := range []*Server{s1, s2} { + for _, s := range []*Server{s1, s2, s3} { testutil.WaitForResult(func() (bool, error) { peers, _ := s.numPeers() - return peers == 2, nil + return peers == 3, nil }, func(err error) { - t.Fatalf("should have 2 peers") + t.Fatalf("should have 3 peers") }) } }
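Taken together, the series exposes server health both over HTTP (`/v1/operator/autopilot/health`) and through the Go API client via the `AutopilotServerHealth` method added in patch 4. Below is a minimal consumer sketch, assuming a local agent whose servers all run Raft protocol version 3 or higher; the `main` wrapper and output formatting are illustrative only. Note that because the HTTP handler replies with status 429 when any server is unhealthy, the client surfaces an unhealthy cluster as an error rather than a decoded reply.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	// Connect to the local agent using the default client configuration.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Query the leader's view of server health; requires Raft protocol >= 3
	// on all servers, per the Operator.ServerHealth RPC added in this series.
	health, err := client.Operator().AutopilotServerHealth(nil)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("healthy=%v failure_tolerance=%d\n", health.Healthy, health.FailureTolerance)
	for _, s := range health.Servers {
		fmt.Printf("  %s serf=%s last_contact=%s healthy=%v\n",
			s.Name, s.SerfStatus, s.LastContact, s.Healthy)
	}
}
```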