Merge pull request #2788 from hashicorp/f-autopilot-2

Autopilot server health monitoring
This commit is contained in:
Kyle Havlovitz 2017-03-10 12:29:45 -08:00 committed by GitHub
commit 4807215fa1
27 changed files with 1570 additions and 127 deletions

View File

@ -6,6 +6,7 @@ import (
"io"
"strconv"
"strings"
"time"
)
// Operator can be used to perform low-level operator tasks for Consul.
@ -79,6 +80,19 @@ type AutopilotConfiguration struct {
// peer list when a new server joins
CleanupDeadServers bool
// LastContactThreshold is the limit on the amount of time a server can go
// without leader contact before being considered unhealthy.
LastContactThreshold *ReadableDuration
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
// be behind before being considered unhealthy.
MaxTrailingLogs uint64
// ServerStabilizationTime is the minimum amount of time a server must be
// in a stable, healthy state before it can be added to the cluster. Only
// applicable with Raft protocol version 3 or higher.
ServerStabilizationTime *ReadableDuration
// CreateIndex holds the index corresponding the creation of this configuration.
// This is a read-only field.
CreateIndex uint64
@ -90,6 +104,84 @@ type AutopilotConfiguration struct {
ModifyIndex uint64
}
// ServerHealth is the health (from the leader's point of view) of a server.
type ServerHealth struct {
// ID is the raft ID of the server.
ID string
// Name is the node name of the server.
Name string
// The status of the SerfHealth check for the server.
SerfStatus string
// LastContact is the time since this node's last contact with the leader.
LastContact *ReadableDuration
// LastTerm is the highest leader term this server has a record of in its Raft log.
LastTerm uint64
// LastIndex is the last log index this server has a record of in its Raft log.
LastIndex uint64
// Healthy is whether or not the server is healthy according to the current
// Autopilot config.
Healthy bool
// StableSince is the last time this server's Healthy value changed.
StableSince time.Time
}
// OperatorHealthReply is a representation of the overall health of the cluster
type OperatorHealthReply struct {
// Healthy is true if all the servers in the cluster are healthy.
Healthy bool
// FailureTolerance is the number of healthy servers that could be lost without
// an outage occurring.
FailureTolerance int
// Servers holds the health of each server.
Servers []ServerHealth
}
// ReadableDuration is a duration type that is serialized to JSON in human readable format.
type ReadableDuration time.Duration
func NewReadableDuration(dur time.Duration) *ReadableDuration {
d := ReadableDuration(dur)
return &d
}
func (d *ReadableDuration) String() string { return d.Duration().String() }
func (d *ReadableDuration) Duration() time.Duration {
if d == nil {
return time.Duration(0)
}
return time.Duration(*d)
}
func (d *ReadableDuration) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil
}
func (d *ReadableDuration) UnmarshalJSON(raw []byte) error {
if d == nil {
return fmt.Errorf("cannot unmarshal to nil pointer")
}
str := string(raw)
if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' {
return fmt.Errorf("must be enclosed with quotes: %s", str)
}
dur, err := time.ParseDuration(str[1 : len(str)-1])
if err != nil {
return err
}
*d = ReadableDuration(dur)
return nil
}
// RaftGetConfiguration is used to query the current Raft peer set.
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
@ -203,6 +295,7 @@ func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfig
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return &out, nil
}
@ -241,3 +334,20 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W
return res, nil
}
// AutopilotServerHealth
func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) {
r := op.c.newRequest("GET", "/v1/operator/autopilot/health")
r.setQueryOptions(q)
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out OperatorHealthReply
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return &out, nil
}

View File

@ -1,6 +1,7 @@
package api
import (
"fmt"
"strings"
"testing"
@ -178,3 +179,28 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) {
}
}
}
func TestOperator_ServerHealth(t *testing.T) {
t.Parallel()
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
c.RaftProtocol = 3
})
defer s.Stop()
operator := c.Operator()
testutil.WaitForResult(func() (bool, error) {
out, err := operator.AutopilotServerHealth(nil)
if err != nil {
return false, fmt.Errorf("err: %v", err)
}
if len(out.Servers) != 1 ||
!out.Servers[0].Healthy ||
out.Servers[0].Name != s.Config.NodeName {
return false, fmt.Errorf("bad: %v", out)
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
}

View File

@ -419,6 +419,15 @@ func (a *Agent) consulConfig() *consul.Config {
if a.config.Autopilot.CleanupDeadServers != nil {
base.AutopilotConfig.CleanupDeadServers = *a.config.Autopilot.CleanupDeadServers
}
if a.config.Autopilot.LastContactThreshold != nil {
base.AutopilotConfig.LastContactThreshold = *a.config.Autopilot.LastContactThreshold
}
if a.config.Autopilot.MaxTrailingLogs != nil {
base.AutopilotConfig.MaxTrailingLogs = *a.config.Autopilot.MaxTrailingLogs
}
if a.config.Autopilot.ServerStabilizationTime != nil {
base.AutopilotConfig.ServerStabilizationTime = *a.config.Autopilot.ServerStabilizationTime
}
// Format the build string
revision := a.config.Revision

View File

@ -135,6 +135,8 @@ func (c *Command) readConfig() *Config {
f.IntVar(&cmdConfig.Protocol, "protocol", -1,
"Sets the protocol version. Defaults to latest.")
f.IntVar(&cmdConfig.RaftProtocol, "raft-protocol", -1,
"Sets the Raft protocol version. Defaults to latest.")
f.BoolVar(&cmdConfig.EnableSyslog, "syslog", false,
"Enables logging to syslog.")

View File

@ -265,6 +265,21 @@ type Autopilot struct {
// CleanupDeadServers enables the automatic cleanup of dead servers when new ones
// are added to the peer list. Defaults to true.
CleanupDeadServers *bool `mapstructure:"cleanup_dead_servers"`
// LastContactThreshold is the limit on the amount of time a server can go
// without leader contact before being considered unhealthy.
LastContactThreshold *time.Duration `mapstructure:"-" json:"-"`
LastContactThresholdRaw string `mapstructure:"last_contact_threshold"`
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
// be behind before being considered unhealthy.
MaxTrailingLogs *uint64 `mapstructure:"max_trailing_logs"`
// ServerStabilizationTime is the minimum amount of time a server must be
// in a stable, healthy state before it can be added to the cluster. Only
// applicable with Raft protocol version 3 or higher.
ServerStabilizationTime *time.Duration `mapstructure:"-" json:"-"`
ServerStabilizationTimeRaw string `mapstructure:"server_stabilization_time"`
}
// Config is the configuration that can be set for an Agent.
@ -692,6 +707,16 @@ func Bool(b bool) *bool {
return &b
}
// Uint64 is used to initialize uint64 pointers in struct literals.
func Uint64(i uint64) *uint64 {
return &i
}
// Duration is used to initialize time.Duration pointers in struct literals.
func Duration(d time.Duration) *time.Duration {
return &d
}
// UnixSocketPermissions contains information about a unix socket, and
// implements the FilePermissions interface.
type UnixSocketPermissions struct {
@ -1041,6 +1066,21 @@ func DecodeConfig(r io.Reader) (*Config, error) {
result.ReconnectTimeoutWan = dur
}
if raw := result.Autopilot.LastContactThresholdRaw; raw != "" {
dur, err := time.ParseDuration(raw)
if err != nil {
return nil, fmt.Errorf("LastContactThreshold invalid: %v", err)
}
result.Autopilot.LastContactThreshold = &dur
}
if raw := result.Autopilot.ServerStabilizationTimeRaw; raw != "" {
dur, err := time.ParseDuration(raw)
if err != nil {
return nil, fmt.Errorf("ServerStabilizationTime invalid: %v", err)
}
result.Autopilot.ServerStabilizationTime = &dur
}
// Merge the single recursor
if result.DNSRecursor != "" {
result.DNSRecursors = append(result.DNSRecursors, result.DNSRecursor)
@ -1293,7 +1333,7 @@ func MergeConfig(a, b *Config) *Config {
if b.Protocol > 0 {
result.Protocol = b.Protocol
}
if b.RaftProtocol != 0 {
if b.RaftProtocol > 0 {
result.RaftProtocol = b.RaftProtocol
}
if b.NodeID != "" {
@ -1347,6 +1387,15 @@ func MergeConfig(a, b *Config) *Config {
if b.Autopilot.CleanupDeadServers != nil {
result.Autopilot.CleanupDeadServers = b.Autopilot.CleanupDeadServers
}
if b.Autopilot.LastContactThreshold != nil {
result.Autopilot.LastContactThreshold = b.Autopilot.LastContactThreshold
}
if b.Autopilot.MaxTrailingLogs != nil {
result.Autopilot.MaxTrailingLogs = b.Autopilot.MaxTrailingLogs
}
if b.Autopilot.ServerStabilizationTime != nil {
result.Autopilot.ServerStabilizationTime = b.Autopilot.ServerStabilizationTime
}
if b.Telemetry.DisableHostname == true {
result.Telemetry.DisableHostname = true
}

View File

@ -1103,13 +1103,27 @@ func TestDecodeConfig_Performance(t *testing.T) {
}
func TestDecodeConfig_Autopilot(t *testing.T) {
input := `{"autopilot": { "cleanup_dead_servers": true }}`
input := `{"autopilot": {
"cleanup_dead_servers": true,
"last_contact_threshold": "100ms",
"max_trailing_logs": 10,
"server_stabilization_time": "10s"
}}`
config, err := DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.Autopilot.CleanupDeadServers == nil || !*config.Autopilot.CleanupDeadServers {
t.Fatalf("bad: cleanup_dead_servers isn't set: %#v", config)
t.Fatalf("bad: %#v", config)
}
if config.Autopilot.LastContactThreshold == nil || *config.Autopilot.LastContactThreshold != 100*time.Millisecond {
t.Fatalf("bad: %#v", config)
}
if config.Autopilot.MaxTrailingLogs == nil || *config.Autopilot.MaxTrailingLogs != 10 {
t.Fatalf("bad: %#v", config)
}
if config.Autopilot.ServerStabilizationTime == nil || *config.Autopilot.ServerStabilizationTime != 10*time.Second {
t.Fatalf("bad: %#v", config)
}
}
@ -1629,7 +1643,10 @@ func TestMergeConfig(t *testing.T) {
SkipLeaveOnInt: Bool(true),
RaftProtocol: 3,
Autopilot: Autopilot{
CleanupDeadServers: Bool(true),
CleanupDeadServers: Bool(true),
LastContactThreshold: Duration(time.Duration(10)),
MaxTrailingLogs: Uint64(10),
ServerStabilizationTime: Duration(time.Duration(100)),
},
EnableDebug: true,
VerifyIncoming: true,

View File

@ -298,6 +298,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint))
s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
s.handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth))
s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific))
s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))

View File

@ -4,10 +4,13 @@ import (
"fmt"
"net/http"
"strconv"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/consul/structs"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft"
"strings"
)
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
@ -183,12 +186,35 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
return nil, err
}
return reply, nil
out := api.AutopilotConfiguration{
CleanupDeadServers: reply.CleanupDeadServers,
LastContactThreshold: api.NewReadableDuration(reply.LastContactThreshold),
MaxTrailingLogs: reply.MaxTrailingLogs,
ServerStabilizationTime: api.NewReadableDuration(reply.ServerStabilizationTime),
CreateIndex: reply.CreateIndex,
ModifyIndex: reply.ModifyIndex,
}
return out, nil
case "PUT":
var args structs.AutopilotSetConfigRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
var conf api.AutopilotConfiguration
if err := decodeBody(req, &conf, FixupConfigDurations); err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err)))
return nil, nil
}
args.Config = structs.AutopilotConfig{
CleanupDeadServers: conf.CleanupDeadServers,
LastContactThreshold: conf.LastContactThreshold.Duration(),
MaxTrailingLogs: conf.MaxTrailingLogs,
ServerStabilizationTime: conf.ServerStabilizationTime.Duration(),
}
// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
@ -202,12 +228,6 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
args.CAS = true
}
if err := decodeBody(req, &args.Config, nil); err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err)))
return nil, nil
}
var reply bool
if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil {
return nil, err
@ -224,3 +244,68 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
return nil, nil
}
}
// FixupConfigDurations is used to handle parsing the duration fields in
// the Autopilot config struct
func FixupConfigDurations(raw interface{}) error {
rawMap, ok := raw.(map[string]interface{})
if !ok {
return nil
}
for key, val := range rawMap {
if strings.ToLower(key) == "lastcontactthreshold" ||
strings.ToLower(key) == "serverstabilizationtime" {
// Convert a string value into an integer
if vStr, ok := val.(string); ok {
dur, err := time.ParseDuration(vStr)
if err != nil {
return err
}
rawMap[key] = dur
}
}
}
return nil
}
// OperatorServerHealth is used to get the health of the servers in the local DC
func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
var args structs.DCSpecificRequest
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var reply structs.OperatorHealthReply
if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil {
return nil, err
}
// Reply with status 429 if something is unhealthy
if !reply.Healthy {
resp.WriteHeader(http.StatusTooManyRequests)
}
out := &api.OperatorHealthReply{
Healthy: reply.Healthy,
FailureTolerance: reply.FailureTolerance,
}
for _, server := range reply.Servers {
out.Servers = append(out.Servers, api.ServerHealth{
ID: server.ID,
Name: server.Name,
SerfStatus: server.SerfStatus.String(),
LastContact: api.NewReadableDuration(server.LastContact),
LastTerm: server.LastTerm,
LastIndex: server.LastIndex,
Healthy: server.Healthy,
StableSince: server.StableSince.Round(time.Second).UTC(),
})
}
return out, nil
}

View File

@ -7,8 +7,11 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
)
func TestOperator_OperatorRaftConfiguration(t *testing.T) {
@ -302,7 +305,7 @@ func TestOperator_AutopilotGetConfiguration(t *testing.T) {
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
out, ok := obj.(structs.AutopilotConfig)
out, ok := obj.(api.AutopilotConfiguration)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
@ -420,3 +423,83 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) {
}
})
}
func TestOperator_OperatorServerHealth(t *testing.T) {
cb := func(c *Config) {
c.RaftProtocol = 3
}
httpTestWithConfig(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/operator/autopilot/health", body)
if err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
resp := httptest.NewRecorder()
obj, err := srv.OperatorServerHealth(resp, req)
if err != nil {
return false, fmt.Errorf("err: %v", err)
}
if resp.Code != 200 {
return false, fmt.Errorf("bad code: %d", resp.Code)
}
out, ok := obj.(*api.OperatorHealthReply)
if !ok {
return false, fmt.Errorf("unexpected: %T", obj)
}
if len(out.Servers) != 1 ||
!out.Servers[0].Healthy ||
out.Servers[0].Name != srv.agent.config.NodeName ||
out.Servers[0].SerfStatus != "alive" ||
out.FailureTolerance != 0 {
return false, fmt.Errorf("bad: %v", out)
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
}, cb)
}
func TestOperator_OperatorServerHealth_Unhealthy(t *testing.T) {
threshold := time.Duration(-1)
cb := func(c *Config) {
c.RaftProtocol = 3
c.Autopilot.LastContactThreshold = &threshold
}
httpTestWithConfig(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/operator/autopilot/health", body)
if err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
resp := httptest.NewRecorder()
obj, err := srv.OperatorServerHealth(resp, req)
if err != nil {
return false, fmt.Errorf("err: %v", err)
}
if resp.Code != 429 {
return false, fmt.Errorf("bad code: %d", resp.Code)
}
out, ok := obj.(*api.OperatorHealthReply)
if !ok {
return false, fmt.Errorf("unexpected: %T", obj)
}
if len(out.Servers) != 1 ||
out.Healthy ||
out.Servers[0].Name != srv.agent.config.NodeName {
return false, fmt.Errorf("bad: %v", out)
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
}, cb)
}

View File

@ -56,6 +56,9 @@ func (c *OperatorAutopilotGetCommand) Run(args []string) int {
return 1
}
c.Ui.Output(fmt.Sprintf("CleanupDeadServers = %v", config.CleanupDeadServers))
c.Ui.Output(fmt.Sprintf("LastContactThreshold = %v", config.LastContactThreshold.String()))
c.Ui.Output(fmt.Sprintf("MaxTrailingLogs = %v", config.MaxTrailingLogs))
c.Ui.Output(fmt.Sprintf("ServerStabilizationTime = %v", config.ServerStabilizationTime.String()))
return 0
}

View File

@ -4,7 +4,9 @@ import (
"flag"
"fmt"
"strings"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/base"
)
@ -29,12 +31,27 @@ func (c *OperatorAutopilotSetCommand) Synopsis() string {
func (c *OperatorAutopilotSetCommand) Run(args []string) int {
var cleanupDeadServers base.BoolValue
var maxTrailingLogs base.UintValue
var lastContactThreshold base.DurationValue
var serverStabilizationTime base.DurationValue
f := c.Command.NewFlagSet(c)
f.Var(&cleanupDeadServers, "cleanup-dead-servers",
"Controls whether Consul will automatically remove dead servers "+
"when new ones are successfully added. Must be one of `true|false`.")
f.Var(&maxTrailingLogs, "max-trailing-logs",
"Controls the maximum number of log entries that a server can trail the "+
"leader by before being considered unhealthy.")
f.Var(&lastContactThreshold, "last-contact-threshold",
"Controls the maximum amount of time a server can go without contact "+
"from the leader before being considered unhealthy. Must be a duration value "+
"such as `200ms`.")
f.Var(&serverStabilizationTime, "server-stabilization-time",
"Controls the minimum amount of time a server must be stable in the "+
"'healthy' state before being added to the cluster. Only takes effect if all "+
"servers are running Raft protocol version 3 or higher. Must be a duration "+
"value such as `10s`.")
if err := c.Command.Parse(args); err != nil {
if err == flag.ErrHelp {
@ -59,9 +76,21 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int {
return 1
}
// Update the config values.
// Update the config values based on the set flags.
cleanupDeadServers.Merge(&conf.CleanupDeadServers)
trailing := uint(conf.MaxTrailingLogs)
maxTrailingLogs.Merge(&trailing)
conf.MaxTrailingLogs = uint64(trailing)
last := time.Duration(*conf.LastContactThreshold)
lastContactThreshold.Merge(&last)
conf.LastContactThreshold = api.NewReadableDuration(last)
stablization := time.Duration(*conf.ServerStabilizationTime)
serverStabilizationTime.Merge(&stablization)
conf.ServerStabilizationTime = api.NewReadableDuration(stablization)
// Check-and-set the new configuration.
result, err := operator.AutopilotCASConfiguration(conf, nil)
if err != nil {

View File

@ -3,6 +3,7 @@ package command
import (
"strings"
"testing"
"time"
"github.com/hashicorp/consul/command/base"
"github.com/hashicorp/consul/consul/structs"
@ -25,7 +26,13 @@ func TestOperator_Autopilot_Set(t *testing.T) {
Flags: base.FlagSetHTTP,
},
}
args := []string{"-http-addr=" + a1.httpAddr, "-cleanup-dead-servers=false"}
args := []string{
"-http-addr=" + a1.httpAddr,
"-cleanup-dead-servers=false",
"-max-trailing-logs=99",
"-last-contact-threshold=123ms",
"-server-stabilization-time=123ms",
}
code := c.Run(args)
if code != 0 {
@ -47,4 +54,13 @@ func TestOperator_Autopilot_Set(t *testing.T) {
if reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
if reply.MaxTrailingLogs != 99 {
t.Fatalf("bad: %#v", reply)
}
if reply.LastContactThreshold != 123*time.Millisecond {
t.Fatalf("bad: %#v", reply)
}
if reply.ServerStabilizationTime != 123*time.Millisecond {
t.Fatalf("bad: %#v", reply)
}
}

314
consul/autopilot.go Normal file
View File

@ -0,0 +1,314 @@
package consul
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
// AutopilotPolicy is the interface for the Autopilot mechanism
type AutopilotPolicy interface {
// PromoteNonVoters defines the handling of non-voting servers
PromoteNonVoters(*structs.AutopilotConfig) error
}
func (s *Server) startAutopilot() {
s.autopilotShutdownCh = make(chan struct{})
s.autopilotWaitGroup = sync.WaitGroup{}
s.autopilotWaitGroup.Add(1)
go s.autopilotLoop()
}
func (s *Server) stopAutopilot() {
close(s.autopilotShutdownCh)
s.autopilotWaitGroup.Wait()
}
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
func (s *Server) autopilotLoop() {
defer s.autopilotWaitGroup.Done()
// Monitor server health until shutdown
ticker := time.NewTicker(s.config.AutopilotInterval)
defer ticker.Stop()
for {
select {
case <-s.autopilotShutdownCh:
return
case <-ticker.C:
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err)
}
if err := s.autopilotPolicy.PromoteNonVoters(autopilotConf); err != nil {
s.logger.Printf("[ERR] consul: error checking for non-voters to promote: %s", err)
}
if err := s.pruneDeadServers(); err != nil {
s.logger.Printf("[ERR] consul: error checking for dead servers to remove: %s", err)
}
case <-s.autopilotRemoveDeadCh:
if err := s.pruneDeadServers(); err != nil {
s.logger.Printf("[ERR] consul: error checking for dead servers to remove: %s", err)
}
}
}
}
// pruneDeadServers removes up to numPeers/2 failed servers
func (s *Server) pruneDeadServers() error {
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
return err
}
// Find any failed servers
var failed []string
if autopilotConf.CleanupDeadServers {
for _, member := range s.serfLAN.Members() {
valid, _ := agent.IsConsulServer(member)
if valid && member.Status == serf.StatusFailed {
failed = append(failed, member.Name)
}
}
}
// Nothing to remove, return early
if len(failed) == 0 {
return nil
}
peers, err := s.numPeers()
if err != nil {
return err
}
// Only do removals if a minority of servers will be affected
if len(failed) < peers/2 {
for _, server := range failed {
s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", server)
go s.serfLAN.RemoveFailedNode(server)
}
} else {
s.logger.Printf("[ERR] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers)
}
return nil
}
// BasicAutopilot defines a policy for promoting non-voting servers in a way
// that maintains an odd-numbered voter count.
type BasicAutopilot struct {
server *Server
}
// PromoteNonVoters promotes eligible non-voting servers to voters.
func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig) error {
minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers())
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
// If we don't meet the minimum version for non-voter features, bail early
if minRaftProtocol < 3 {
return nil
}
future := b.server.raft.GetConfiguration()
if err := future.Error(); err != nil {
return fmt.Errorf("failed to get raft configuration: %v", err)
}
var promotions []raft.Server
raftServers := future.Configuration().Servers
voterCount := 0
for _, server := range raftServers {
// If this server has been stable and passing for long enough, promote it to a voter
if server.Suffrage == raft.Nonvoter {
health := b.server.getServerHealth(string(server.ID))
if health.IsStable(time.Now(), autopilotConf) {
promotions = append(promotions, server)
}
} else {
voterCount++
}
}
// Exit early if there's nothing to promote
if len(promotions) == 0 {
return nil
}
// If there's currently an even number of servers, we can promote the first server in the list
// to get to an odd-sized quorum
newServers := false
if voterCount%2 == 0 {
addFuture := b.server.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0)
if err := addFuture.Error(); err != nil {
return fmt.Errorf("failed to add raft peer: %v", err)
}
promotions = promotions[1:]
newServers = true
}
// Promote remaining servers in twos to maintain an odd quorum size
for i := 0; i < len(promotions)-1; i += 2 {
addFirst := b.server.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0)
if err := addFirst.Error(); err != nil {
return fmt.Errorf("failed to add raft peer: %v", err)
}
addSecond := b.server.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0)
if err := addSecond.Error(); err != nil {
return fmt.Errorf("failed to add raft peer: %v", err)
}
newServers = true
}
// If we added a new server, trigger a check to remove dead servers
if newServers {
select {
case b.server.autopilotRemoveDeadCh <- struct{}{}:
default:
}
}
return nil
}
// serverHealthLoop monitors the health of the servers in the cluster
func (s *Server) serverHealthLoop() {
// Monitor server health until shutdown
ticker := time.NewTicker(s.config.ServerHealthInterval)
defer ticker.Stop()
for {
select {
case <-s.shutdownCh:
return
case <-ticker.C:
serverHealths := make(map[string]*structs.ServerHealth)
// Don't do anything if the min Raft version is too low
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
if err != nil {
s.logger.Printf("[ERR] consul: error getting server raft protocol versions: %s", err)
break
}
if minRaftProtocol < 3 {
break
}
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err)
break
}
// Bail early if autopilot config hasn't been initialized yet
if autopilotConf == nil {
break
}
// Build an updated map of server healths
for _, member := range s.LANMembers() {
if member.Status == serf.StatusLeft {
continue
}
valid, parts := agent.IsConsulServer(member)
if valid {
health, err := s.queryServerHealth(member, parts, autopilotConf)
if err != nil {
s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
serverHealths[parts.ID] = &structs.ServerHealth{
ID: parts.ID,
Name: parts.Name,
Healthy: false,
}
} else {
serverHealths[parts.ID] = health
}
}
}
s.serverHealthLock.Lock()
s.serverHealths = serverHealths
s.serverHealthLock.Unlock()
}
}
}
// queryServerHealth fetches the raft stats for the given server and uses them
// to update its ServerHealth
func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) {
stats, err := s.getServerStats(server)
if err != nil {
return nil, fmt.Errorf("error getting raft stats: %s", err)
}
health := &structs.ServerHealth{
ID: server.ID,
Name: server.Name,
SerfStatus: member.Status,
LastContact: -1,
LastTerm: stats.LastTerm,
LastIndex: stats.LastIndex,
}
if stats.LastContact != "never" {
health.LastContact, err = time.ParseDuration(stats.LastContact)
if err != nil {
return nil, fmt.Errorf("error parsing last_contact duration: %s", err)
}
}
// Set LastContact to 0 for the leader
if s.config.NodeName == member.Name {
health.LastContact = 0
}
lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing last_log_term: %s", err)
}
health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf)
// If this is a new server or the health changed, reset StableSince
lastHealth := s.getServerHealth(server.ID)
if lastHealth == nil || lastHealth.Healthy != health.Healthy {
health.StableSince = time.Now()
} else {
health.StableSince = lastHealth.StableSince
}
return health, nil
}
func (s *Server) getServerHealth(id string) *structs.ServerHealth {
s.serverHealthLock.RLock()
defer s.serverHealthLock.RUnlock()
h, ok := s.serverHealths[id]
if !ok {
return nil
}
return h
}
func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) {
var args struct{}
var reply structs.ServerStats
err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
return reply, err
}

240
consul/autopilot_test.go Normal file
View File

@ -0,0 +1,240 @@
package consul
import (
"fmt"
"os"
"testing"
"time"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
func TestAutopilot_CleanupDeadServer(t *testing.T) {
dir1, s1 := testServerDCBootstrap(t, "dc1", true)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Kill a non-leader server
s2.Shutdown()
testutil.WaitForResult(func() (bool, error) {
alive := 0
for _, m := range s1.LANMembers() {
if m.Status == serf.StatusAlive {
alive++
}
}
return alive == 2, nil
}, func(err error) {
t.Fatalf("should have 2 alive members")
})
// Bring up and join a new server
dir4, s4 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir4)
defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
servers[1] = s4
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
}
func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.AutopilotInterval = 100 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
conf := func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
}
dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
dir4, s4 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir4)
defer s4.Shutdown()
servers := []*Server{s1, s2, s3, s4}
// Join the servers to s1
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
for _, s := range servers[1:] {
if _, err := s.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
}
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 4, nil
}, func(err error) {
t.Fatalf("should have 4 peers")
})
}
// Kill a non-leader server
s4.Shutdown()
// Should be removed from the peers automatically
for _, s := range []*Server{s1, s2, s3} {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
}
func TestAutopilot_PromoteNonVoter(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.RaftConfig.ProtocolVersion = 3
c.AutopilotConfig.ServerStabilizationTime = 200 * time.Millisecond
c.ServerHealthInterval = 100 * time.Millisecond
c.AutopilotInterval = 100 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.RaftConfig.ProtocolVersion = 3
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Wait for the new server to be added as a non-voter, but make sure
// it doesn't get promoted to a voter even after ServerStabilizationTime,
// because that would result in an even-numbered quorum count.
testutil.WaitForResult(func() (bool, error) {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
return false, err
}
servers := future.Configuration().Servers
if len(servers) != 2 {
return false, fmt.Errorf("bad: %v", servers)
}
if servers[1].Suffrage != raft.Nonvoter {
return false, fmt.Errorf("bad: %v", servers)
}
health := s1.getServerHealth(string(servers[1].ID))
if health == nil {
return false, fmt.Errorf("nil health")
}
if !health.Healthy {
return false, fmt.Errorf("bad: %v", health)
}
if time.Now().Sub(health.StableSince) < s1.config.AutopilotConfig.ServerStabilizationTime {
return false, fmt.Errorf("stable period not elapsed")
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
// Now add another server and make sure they both get promoted to voters after stabilization
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.RaftConfig.ProtocolVersion = 3
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
return false, err
}
servers := future.Configuration().Servers
if len(servers) != 3 {
return false, fmt.Errorf("bad: %v", servers)
}
if servers[1].Suffrage != raft.Voter {
return false, fmt.Errorf("bad: %v", servers)
}
if servers[2].Suffrage != raft.Voter {
return false, fmt.Errorf("bad: %v", servers)
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
}

View File

@ -279,6 +279,15 @@ type Config struct {
// AutopilotConfig is used to apply the initial autopilot config when
// bootstrapping.
AutopilotConfig *structs.AutopilotConfig
// ServerHealthInterval is the frequency with which the health of the
// servers in the cluster will be updated.
ServerHealthInterval time.Duration
// AutopilotInterval is the frequency with which the leader will perform
// autopilot tasks, such as promoting eligible non-voters and removing
// dead servers.
AutopilotInterval time.Duration
}
// CheckVersion is used to check if the ProtocolVersion is valid
@ -353,8 +362,13 @@ func DefaultConfig() *Config {
TLSMinVersion: "tls10",
AutopilotConfig: &structs.AutopilotConfig{
CleanupDeadServers: true,
CleanupDeadServers: true,
LastContactThreshold: 200 * time.Millisecond,
MaxTrailingLogs: 250,
ServerStabilizationTime: 10 * time.Second,
},
ServerHealthInterval: 2 * time.Second,
AutopilotInterval: 10 * time.Second,
}
// Increase our reap interval to 3 days instead of 24h.

View File

@ -159,6 +159,8 @@ func (s *Server) establishLeadership() error {
return err
}
s.startAutopilot()
return nil
}
@ -174,6 +176,9 @@ func (s *Server) revokeLeadership() error {
s.logger.Printf("[ERR] consul: Clearing session timers failed: %v", err)
return err
}
s.stopAutopilot()
return nil
}
@ -598,13 +603,20 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
return err
}
if minRaftProtocol >= 2 && parts.RaftVersion >= 3 {
switch {
case minRaftProtocol >= 3:
addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
case minRaftProtocol == 2 && parts.RaftVersion >= 3:
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
} else {
default:
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
@ -612,21 +624,10 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
}
}
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
return err
}
// Look for dead servers to clean up
if autopilotConf.CleanupDeadServers {
for _, member := range s.serfLAN.Members() {
valid, _ := agent.IsConsulServer(member)
if valid && member.Name != m.Name && member.Status == serf.StatusFailed {
s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name)
go s.serfLAN.RemoveFailedNode(member.Name)
}
}
// Trigger a check to remove dead servers
select {
case s.autopilotRemoveDeadCh <- struct{}{}:
default:
}
return nil

View File

@ -623,76 +623,6 @@ func TestLeader_ReapTombstones(t *testing.T) {
})
}
func TestLeader_CleanupDeadServers(t *testing.T) {
dir1, s1 := testServerDCBootstrap(t, "dc1", true)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Kill a non-leader server
s2.Shutdown()
testutil.WaitForResult(func() (bool, error) {
alive := 0
for _, m := range s1.LANMembers() {
if m.Status == serf.StatusAlive {
alive++
}
}
return alive == 2, nil
}, func(err error) {
t.Fatalf("should have 2 alive members")
})
// Bring up and join a new server
dir4, s4 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir4)
defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
servers[1] = s4
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
}
func TestLeader_RollRaftServer(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true

View File

@ -183,3 +183,60 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}
return nil
}
// ServerHealth is used to get the current health of the servers.
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error {
// This must be sent to the leader, so we fix the args since we are
// re-using a structure where we don't support all the options.
args.RequireConsistent = true
args.AllowStale = false
if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done {
return err
}
// This action requires operator read access.
acl, err := op.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.OperatorRead() {
return permissionDeniedErr
}
// Exit early if the min Raft version is too low
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers())
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
if minRaftProtocol < 3 {
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
}
var status structs.OperatorHealthReply
future := op.srv.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
healthyCount := 0
servers := future.Configuration().Servers
for _, s := range servers {
health := op.srv.getServerHealth(string(s.ID))
if health != nil {
if health.Healthy {
healthyCount++
}
status.Servers = append(status.Servers, *health)
}
}
status.Healthy = healthyCount == len(servers)
// If we have extra healthy servers, set FailureTolerance
if healthyCount > len(servers)/2+1 {
status.FailureTolerance = healthyCount - (len(servers)/2 + 1)
}
*reply = status
return nil
}

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/raft"
"time"
)
func TestOperator_RaftGetConfiguration(t *testing.T) {
@ -426,3 +427,95 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
t.Fatalf("bad: %#v", config)
}
}
func TestOperator_ServerHealth(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.RaftConfig.ProtocolVersion = 3
c.ServerHealthInterval = 100 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.RaftConfig.ProtocolVersion = 3
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.RaftConfig.ProtocolVersion = 3
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForResult(func() (bool, error) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.OperatorHealthReply
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
if err != nil {
return false, fmt.Errorf("err: %v", err)
}
if !reply.Healthy {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.FailureTolerance != 1 {
return false, fmt.Errorf("bad: %v", reply)
}
if len(reply.Servers) != 3 {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.Servers[0].LastContact != 0 {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.Servers[1].LastContact <= 0 {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.Servers[2].LastContact <= 0 {
return false, fmt.Errorf("bad: %v", reply)
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
}
func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.RaftConfig.ProtocolVersion = 2
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.OperatorHealthReply
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
t.Fatalf("bad: %v", err)
}
}

View File

@ -76,6 +76,18 @@ type Server struct {
// aclCache is the non-authoritative ACL cache.
aclCache *aclCache
// autopilotPolicy controls the behavior of Autopilot for certain tasks.
autopilotPolicy AutopilotPolicy
// autopilotRemoveDeadCh is used to trigger a check for dead server removals.
autopilotRemoveDeadCh chan struct{}
// autopilotShutdownCh is used to stop the Autopilot loop.
autopilotShutdownCh chan struct{}
// autopilotWaitGroup is used to block until Autopilot shuts down.
autopilotWaitGroup sync.WaitGroup
// Consul configuration
config *Config
@ -145,6 +157,10 @@ type Server struct {
sessionTimers map[string]*time.Timer
sessionTimersLock sync.Mutex
// serverHealths stores the current view of server healths.
serverHealths map[string]*structs.ServerHealth
serverHealthLock sync.RWMutex
// tombstoneGC is used to track the pending GC invocations
// for the KV tombstones
tombstoneGC *state.TombstoneGC
@ -222,19 +238,22 @@ func NewServer(config *Config) (*Server, error) {
// Create server.
s := &Server{
config: config,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
localConsuls: make(map[raft.ServerAddress]*agent.Server),
logger: logger,
reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]*agent.Server, 4),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
tombstoneGC: gc,
shutdownCh: make(chan struct{}),
autopilotRemoveDeadCh: make(chan struct{}),
autopilotShutdownCh: make(chan struct{}),
config: config,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
localConsuls: make(map[raft.ServerAddress]*agent.Server),
logger: logger,
reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]*agent.Server, 4),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
tombstoneGC: gc,
shutdownCh: make(chan struct{}),
}
s.autopilotPolicy = &BasicAutopilot{s}
// Initialize the authoritative ACL cache.
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)
@ -299,6 +318,9 @@ func NewServer(config *Config) (*Server, error) {
// Start the metrics handlers.
go s.sessionStats()
// Start the server health checking.
go s.serverHealthLoop()
return s, nil
}

View File

@ -1,5 +1,12 @@
package consul
import (
"fmt"
"strconv"
"github.com/hashicorp/consul/consul/structs"
)
// Status endpoint is used to check on server status
type Status struct {
server *Server
@ -33,3 +40,21 @@ func (s *Status) Peers(args struct{}, reply *[]string) error {
}
return nil
}
// Used by Autopilot to query the raft stats of the local server.
func (s *Status) RaftStats(args struct{}, reply *structs.ServerStats) error {
stats := s.server.raft.Stats()
var err error
reply.LastContact = stats["last_contact"]
reply.LastIndex, err = strconv.ParseUint(stats["last_log_index"], 10, 64)
if err != nil {
return fmt.Errorf("error parsing server's last_log_index value: %s", err)
}
reply.LastTerm, err = strconv.ParseUint(stats["last_log_term"], 10, 64)
if err != nil {
return fmt.Errorf("error parsing server's last_log_term value: %s", err)
}
return nil
}

View File

@ -1,15 +1,32 @@
package structs
import (
"time"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
// AutopilotConfig holds the Autopilot configuration for a cluster.
type AutopilotConfig struct {
// CleanupDeadServers controls whether to remove dead servers when a new
// server is added to the Raft peers
// server is added to the Raft peers.
CleanupDeadServers bool
// RaftIndex stores the create/modify indexes of this configuration
// LastContactThreshold is the limit on the amount of time a server can go
// without leader contact before being considered unhealthy.
LastContactThreshold time.Duration
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
// be behind before being considered unhealthy.
MaxTrailingLogs uint64
// ServerStabilizationTime is the minimum amount of time a server must be
// in a stable, healthy state before it can be added to the cluster. Only
// applicable with Raft protocol version 3 or higher.
ServerStabilizationTime time.Duration
// RaftIndex stores the create/modify indexes of this configuration.
RaftIndex
}
@ -85,3 +102,96 @@ type AutopilotSetConfigRequest struct {
func (op *AutopilotSetConfigRequest) RequestDatacenter() string {
return op.Datacenter
}
// ServerHealth is the health (from the leader's point of view) of a server.
type ServerHealth struct {
// ID is the raft ID of the server.
ID string
// Name is the node name of the server.
Name string
// The status of the SerfHealth check for the server.
SerfStatus serf.MemberStatus
// LastContact is the time since this node's last contact with the leader.
LastContact time.Duration
// LastTerm is the highest leader term this server has a record of in its Raft log.
LastTerm uint64
// LastIndex is the last log index this server has a record of in its Raft log.
LastIndex uint64
// Healthy is whether or not the server is healthy according to the current
// Autopilot config.
Healthy bool
// StableSince is the last time this server's Healthy value changed.
StableSince time.Time
}
// IsHealthy determines whether this ServerHealth is considered healthy
// based on the given Autopilot config
func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotConf *AutopilotConfig) bool {
if h.SerfStatus != serf.StatusAlive {
return false
}
if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 {
return false
}
if h.LastTerm != lastTerm {
return false
}
if lastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < lastIndex-autopilotConf.MaxTrailingLogs {
return false
}
return true
}
// IsStable returns true if the ServerHealth is in a stable, passing state
// according to the given AutopilotConfig
func (h *ServerHealth) IsStable(now time.Time, conf *AutopilotConfig) bool {
if h == nil {
return false
}
if !h.Healthy {
return false
}
if now.Sub(h.StableSince) < conf.ServerStabilizationTime {
return false
}
return true
}
// ServerStats holds miscellaneous Raft metrics for a server
type ServerStats struct {
// LastContact is the time since this node's last contact with the leader.
LastContact string
// LastTerm is the highest leader term this server has a record of in its Raft log.
LastTerm uint64
// LastIndex is the last log index this server has a record of in its Raft log.
LastIndex uint64
}
// OperatorHealthReply is a representation of the overall health of the cluster
type OperatorHealthReply struct {
// Healthy is true if all the servers in the cluster are healthy.
Healthy bool
// FailureTolerance is the number of healthy servers that could be lost without
// an outage occurring.
FailureTolerance int
// Servers holds the health of each server.
Servers []ServerHealth
}

View File

@ -0,0 +1,94 @@
package structs
import (
"testing"
"time"
"github.com/hashicorp/serf/serf"
)
func TestServerHealth_IsHealthy(t *testing.T) {
cases := []struct {
health ServerHealth
lastTerm uint64
lastIndex uint64
conf AutopilotConfig
expected bool
}{
// Healthy server, all values within allowed limits
{
health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 1, LastIndex: 0},
lastTerm: 1,
lastIndex: 10,
conf: AutopilotConfig{MaxTrailingLogs: 20},
expected: true,
},
// Serf status failed
{
health: ServerHealth{SerfStatus: serf.StatusFailed},
expected: false,
},
// Old value for lastTerm
{
health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 0},
lastTerm: 1,
expected: false,
},
// Too far behind on logs
{
health: ServerHealth{SerfStatus: serf.StatusAlive, LastIndex: 0},
lastIndex: 10,
conf: AutopilotConfig{MaxTrailingLogs: 5},
expected: false,
},
}
for index, tc := range cases {
actual := tc.health.IsHealthy(tc.lastTerm, tc.lastIndex, &tc.conf)
if actual != tc.expected {
t.Fatalf("bad value for case %d: %v", index, actual)
}
}
}
func TestServerHealth_IsStable(t *testing.T) {
start := time.Now()
cases := []struct {
health *ServerHealth
now time.Time
conf AutopilotConfig
expected bool
}{
// Healthy server, all values within allowed limits
{
health: &ServerHealth{Healthy: true, StableSince: start},
now: start.Add(15 * time.Second),
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
expected: true,
},
// Unhealthy server
{
health: &ServerHealth{Healthy: false},
expected: false,
},
// Healthy server, hasn't reached stabilization time
{
health: &ServerHealth{Healthy: true, StableSince: start},
now: start.Add(5 * time.Second),
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
expected: false,
},
// Nil struct
{
health: nil,
expected: false,
},
}
for index, tc := range cases {
actual := tc.health.IsStable(tc.now, &tc.conf)
if actual != tc.expected {
t.Fatalf("bad value for case %d: %v", index, actual)
}
}
}

View File

@ -65,6 +65,7 @@ type TestServerConfig struct {
Bind string `json:"bind_addr,omitempty"`
Addresses *TestAddressConfig `json:"addresses,omitempty"`
Ports *TestPortConfig `json:"ports,omitempty"`
RaftProtocol int `json:"raft_protocol,omitempty"`
ACLMasterToken string `json:"acl_master_token,omitempty"`
ACLDatacenter string `json:"acl_datacenter,omitempty"`
ACLDefaultPolicy string `json:"acl_default_policy,omitempty"`

View File

@ -29,6 +29,7 @@ The following endpoints are supported:
* [`/v1/operator/raft/peer`](#raft-peer): Operates on Raft peers
* [`/v1/operator/keyring`](#keyring): Operates on gossip keyring
* [`/v1/operator/autopilot/configuration`](#autopilot-configuration): Operates on the Autopilot configuration
* [`/v1/operator/autopilot/health`](#autopilot-health): Returns the health of the servers
Not all endpoints support blocking queries and all consistency modes,
see details in the sections below.
@ -288,13 +289,16 @@ A JSON body is returned that looks like this:
```javascript
{
"CleanupDeadServers": true,
"LastContactThreshold": "200ms",
"MaxTrailingLogs": 250,
"ServerStabilizationTime": "10s",
"CreateIndex": 4,
"ModifyIndex": 4
}
```
`CleanupDeadServers` is whether dead servers should be removed automatically when
a new server is added to the cluster.
For more information about the Autopilot configuration options, see the agent configuration section
[here](/docs/agent/options.html#autopilot).
#### PUT Method
@ -313,11 +317,89 @@ body must look like:
```javascript
{
"CleanupDeadServers": true
"CleanupDeadServers": true,
"LastContactThreshold": "200ms",
"MaxTrailingLogs": 250,
"ServerStabilizationTime": "10s",
"CreateIndex": 4,
"ModifyIndex": 4
}
```
`CleanupDeadServers` is whether dead servers should be removed automatically when
a new server is added to the cluster.
For more information about the Autopilot configuration options, see the agent configuration section
[here](/docs/agent/options.html#autopilot).
The return code will indicate success or failure.
### <a name="autopilot-health"></a> /v1/operator/autopilot/health
Available in Consul 0.8.0 and later, the autopilot health endpoint supports the
`GET` method.
This endpoint supports the use of ACL tokens using either the `X-CONSUL-TOKEN`
header or the `?token=` query parameter.
By default, the datacenter of the agent is queried; however, the `dc` can be
provided using the `?dc=` query parameter.
#### GET Method
When using the `GET` method, the request will be forwarded to the cluster
leader to retrieve its latest Autopilot configuration.
If ACLs are enabled, the client will need to supply an ACL Token with
[`operator`](/docs/internals/acl.html#operator) read privileges.
A JSON body is returned that looks like this:
```javascript
{
"Healthy": true,
"FailureTolerance": 0,
"Servers": [
{
"ID": "e349749b-3303-3ddf-959c-b5885a0e1f6e",
"Name": "node1",
"SerfStatus": "alive",
"LastContact": "0s",
"LastTerm": 2,
"LastIndex": 46,
"Healthy": true,
"StableSince": "2017-03-06T22:07:51Z"
},
{
"ID": "e36ee410-cc3c-0a0c-c724-63817ab30303",
"Name": "node2",
"SerfStatus": "alive",
"LastContact": "27.291304ms",
"LastTerm": 2,
"LastIndex": 46,
"Healthy": true,
"StableSince": "2017-03-06T22:18:26Z"
}
]
}
```
`Healthy` is whether all the servers are currently heathly.
`FailureTolerance` is the number of redundant healthy servers that could be fail
without causing an outage (this would be 2 in a healthy cluster of 5 servers).
The `Servers` list holds detailed health information on each server:
- `ID` is the Raft ID of the server.
- `Name` is the node name of the server.
- `SerfStatus` is the SerfHealth check status for the server.
- `LastContact` is the time elapsed since this server's last contact with the leader.
- `LastTerm` is the server's last known Raft leader term.
- `LastIndex` is the index of the server's last committed Raft log entry.
- `Healthy` is whether the server is healthy according to the current Autopilot configuration.
- `StableSince` is the time this server has been in its current `Healthy` state.

View File

@ -311,6 +311,11 @@ will exit with an error at startup.
use. This defaults to the latest version. This should be set only when [upgrading](/docs/upgrading.html).
You can view the protocol versions supported by Consul by running `consul -v`.
* <a name="_raft_protocol"></a><a href="#_raft_protocol">`-raft-protocol`</a> - This controls the internal
version of the Raft consensus protocol used for server communications. This defaults to 2 but must
be set to 3 in order to gain access to Autopilot features, with the exception of
[`cleanup_dead_servers`](#cleanup_dead_servers).
* <a name="_recursor"></a><a href="#_recursor">`-recursor`</a> - Specifies the address of an upstream DNS
server. This option may be provided multiple times, and is functionally
equivalent to the [`recursors` configuration option](#recursors).
@ -557,7 +562,21 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
The following sub-keys are available:
* <a name="cleanup_dead_servers"></a><a href="#cleanup_dead_servers">`cleanup_dead_servers`</a> - This controls
the automatic removal of dead server nodes whenever a new server is added to the cluster. Defaults to `true`.
the automatic removal of dead server nodes periodically and whenever a new server is added to the cluster.
Defaults to `true`.
* <a name="last_contact_threshold"></a><a href="#last_contact_threshold">`last_contact_threshold`</a> - Controls
the maximum amount of time a server can go without contact from the leader before being considered unhealthy.
Must be a duration value such as `10s`. Defaults to `200ms`.
* <a name="max_trailing_threshold"></a><a href="#max_trailing_threshold">`max_trailing_threshold`</a> - Controls
the maximum number of log entries that a server can trail the leader by before being considered unhealthy. Defaults
to 250.
* <a name="server_stabilization_time"></a><a href="#server_stabilization_time">`server_stabilization_time`</a> -
Controls the minimum amount of time a server must be stable in the 'healthy' state before being added to the
cluster. Only takes effect if all servers are running Raft protocol version 3 or higher. Must be a duration value
such as `30s`. Defaults to `10s`.
* <a name="bootstrap"></a><a href="#bootstrap">`bootstrap`</a> Equivalent to the
[`-bootstrap` command-line flag](#_bootstrap).
@ -764,11 +783,9 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
* <a name="protocol"></a><a href="#protocol">`protocol`</a> Equivalent to the
[`-protocol` command-line flag](#_protocol).
* <a name="raft_protocol"></a><a href="#raft_protocol">`raft_protocol`</a> - This controls the internal
version of the Raft consensus protocol used for server communications. This defaults to 2 but must
be set to 3 in order to gain access to other [Autopilot](#autopilot) features, with the exception of
[`cleanup_dead_servers`](#cleanup_dead_servers).
* <a name="raft_protocol"></a><a href="#raft_protocol">`raft_protocol`</a> Equivalent to the
[`-raft-protocol` command-line flag](#_raft_protocol).
* <a name="reap"></a><a href="#reap">`reap`</a> This controls Consul's automatic reaping of child processes,
which is useful if Consul is running as PID 1 in a Docker container. If this isn't specified, then Consul will
automatically reap child processes if it detects it is running as PID 1. If this is set to true or false, then

View File

@ -40,6 +40,9 @@ The output looks like this:
```
CleanupDeadServers = true
LastContactThreshold = 200ms
MaxTrailingLogs = 250
ServerStabilizationTime = 10s
```
## set-config
@ -58,6 +61,16 @@ Usage: `consul operator autopilot set-config [options]`
* `-cleanup-dead-servers` - Specifies whether to enable automatic removal of dead servers
upon the successful joining of new servers to the cluster. Must be one of `[true|false]`.
* `last-contact-threshold` - Controls the maximum amount of time a server can go without contact
from the leader before being considered unhealthy. Must be a duration value such as `200ms`.
* `max-trailing-logs` - Controls the maximum number of log entries that a server can trail
the leader by before being considered unhealthy.
* `server-stabilization-time` - Controls the minimum amount of time a server must be stable in
the 'healthy' state before being added to the cluster. Only takes effect if all servers are
running Raft protocol version 3 or higher. Must be a duration value such as `10s`.
The output looks like this:
```