Add autopilot functionality based on Consul's autopilot

Kyle Havlovitz 2017-12-18 13:16:23 -08:00
parent 2455a205de
commit 1c07066064
45 changed files with 2717 additions and 145 deletions

api/operator_autopilot.go (new file, 228 additions)

@ -0,0 +1,228 @@
package api
import (
"bytes"
"fmt"
"io"
"strconv"
"strings"
"time"
)
// AutopilotConfiguration is used for querying/setting the Autopilot configuration.
// Autopilot helps manage operator tasks related to Nomad servers, like removing
// failed servers from the Raft quorum.
type AutopilotConfiguration struct {
// CleanupDeadServers controls whether to remove dead servers from the Raft
// peer list when a new server joins
CleanupDeadServers bool
// LastContactThreshold is the limit on the amount of time a server can go
// without leader contact before being considered unhealthy.
LastContactThreshold *ReadableDuration
// MaxTrailingLogs is the number of entries in the Raft log that a server can
// be behind before being considered unhealthy.
MaxTrailingLogs uint64
// ServerStabilizationTime is the minimum amount of time a server must be
// in a stable, healthy state before it can be added to the cluster. Only
// applicable with Raft protocol version 3 or higher.
ServerStabilizationTime *ReadableDuration
// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
// servers into zones for redundancy. If left blank, this feature will be disabled.
RedundancyZoneTag string
// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
// strategy of waiting until enough newer-versioned servers have been added to the
// cluster before promoting them to voters.
DisableUpgradeMigration bool
// (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when
// performing upgrade migrations. If left blank, the Nomad version will be used.
UpgradeVersionTag string
// CreateIndex holds the index corresponding to the creation of this configuration.
// This is a read-only field.
CreateIndex uint64
// ModifyIndex will be set to the index of the last update when retrieving the
// Autopilot configuration. Resubmitting a configuration with
// AutopilotCASConfiguration will perform a check-and-set operation which ensures
// there hasn't been a subsequent update since the configuration was retrieved.
ModifyIndex uint64
}
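
The ModifyIndex field above is what makes check-and-set updates possible: read the configuration, change it, and resubmit it along with the index you read. A minimal usage sketch with this api package (assumes a Nomad agent on the default address; error handling condensed):

client, err := api.NewClient(api.DefaultConfig())
if err != nil {
	log.Fatal(err)
}
operator := client.Operator()

// Read the current config; this captures ModifyIndex.
conf, err := operator.AutopilotGetConfiguration(nil)
if err != nil {
	log.Fatal(err)
}

// Change a field and resubmit. The write succeeds only if nobody
// else updated the config since we read it.
conf.CleanupDeadServers = false
ok, err := operator.AutopilotCASConfiguration(conf, nil)
if err != nil {
	log.Fatal(err)
}
if !ok {
	log.Println("configuration changed concurrently; retry")
}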
// ServerHealth is the health (from the leader's point of view) of a server.
type ServerHealth struct {
// ID is the raft ID of the server.
ID string
// Name is the node name of the server.
Name string
// Address is the address of the server.
Address string
// The status of the SerfHealth check for the server.
SerfStatus string
// Version is the Nomad version of the server.
Version string
// Leader is whether this server is currently the leader.
Leader bool
// LastContact is the time since this node's last contact with the leader.
LastContact *ReadableDuration
// LastTerm is the highest leader term this server has a record of in its Raft log.
LastTerm uint64
// LastIndex is the last log index this server has a record of in its Raft log.
LastIndex uint64
// Healthy is whether or not the server is healthy according to the current
// Autopilot config.
Healthy bool
// Voter is whether this is a voting server.
Voter bool
// StableSince is the last time this server's Healthy value changed.
StableSince time.Time
}
// OperatorHealthReply is a representation of the overall health of the cluster
type OperatorHealthReply struct {
// Healthy is true if all the servers in the cluster are healthy.
Healthy bool
// FailureTolerance is the number of healthy servers that could be lost without
// an outage occurring.
FailureTolerance int
// Servers holds the health of each server.
Servers []ServerHealth
}
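
FailureTolerance is simply the count of healthy voters beyond quorum. A small illustrative sketch of the arithmetic (the authoritative value is computed on the leader, not by this client code):

// failureTolerance: how many healthy voters can fail before quorum is lost.
// quorum = voters/2 + 1 (integer division).
func failureTolerance(voters, healthyVoters int) int {
	quorum := voters/2 + 1
	if t := healthyVoters - quorum; t > 0 {
		return t
	}
	return 0
}

// Example: 5 voters, all healthy: quorum is 3, so tolerance is 2.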
// ReadableDuration is a duration type that is serialized to JSON in human readable format.
type ReadableDuration time.Duration
func NewReadableDuration(dur time.Duration) *ReadableDuration {
d := ReadableDuration(dur)
return &d
}
func (d *ReadableDuration) String() string {
return d.Duration().String()
}
func (d *ReadableDuration) Duration() time.Duration {
if d == nil {
return time.Duration(0)
}
return time.Duration(*d)
}
func (d *ReadableDuration) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil
}
func (d *ReadableDuration) UnmarshalJSON(raw []byte) error {
if d == nil {
return fmt.Errorf("cannot unmarshal to nil pointer")
}
str := string(raw)
if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' {
return fmt.Errorf("must be enclosed with quotes: %s", str)
}
dur, err := time.ParseDuration(str[1 : len(str)-1])
if err != nil {
return err
}
*d = ReadableDuration(dur)
return nil
}
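
A quick round trip shows the wire format these methods produce, a quoted Go duration string such as "200ms". The probe struct here is hypothetical, and encoding/json is assumed imported:

type probe struct {
	Threshold *ReadableDuration
}

func exampleRoundTrip() {
	in := probe{Threshold: NewReadableDuration(200 * time.Millisecond)}
	b, _ := json.Marshal(in) // {"Threshold":"200ms"}

	var out probe
	_ = json.Unmarshal(b, &out)
	fmt.Println(out.Threshold.Duration()) // 200ms
}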
// AutopilotGetConfiguration is used to query the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) {
r, err := op.c.newRequest("GET", "/v1/operator/autopilot/configuration")
if err != nil {
return nil, err
}
r.setQueryOptions(q)
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out AutopilotConfiguration
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return &out, nil
}
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error {
r, err := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
if err != nil {
return err
}
r.setWriteOptions(q)
r.obj = conf
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// AutopilotCASConfiguration is used to perform a Check-And-Set update on the
// Autopilot configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failure.
func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) {
r, err := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
if err != nil {
return false, err
}
r.setWriteOptions(q)
r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10))
r.obj = conf
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return false, err
}
defer resp.Body.Close()
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, fmt.Errorf("Failed to read response: %v", err)
}
res := strings.Contains(buf.String(), "true")
return res, nil
}
// AutopilotServerHealth is used to query the health of the servers in the cluster.
func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) {
r, err := op.c.newRequest("GET", "/v1/operator/autopilot/health")
if err != nil {
return nil, err
}
r.setQueryOptions(q)
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out OperatorHealthReply
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return &out, nil
}


@ -0,0 +1,107 @@
package api
import (
"testing"
"fmt"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/nomad/testutil"
)
func TestAPI_OperatorAutopilotGetSetConfiguration(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()
operator := c.Operator()
config, err := operator.AutopilotGetConfiguration(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %v", config)
}
// Change a config setting
newConf := &AutopilotConfiguration{CleanupDeadServers: false}
if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil {
t.Fatalf("err: %v", err)
}
config, err = operator.AutopilotGetConfiguration(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if config.CleanupDeadServers {
t.Fatalf("bad: %v", config)
}
}
func TestAPI_OperatorAutopilotCASConfiguration(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()
operator := c.Operator()
config, err := operator.AutopilotGetConfiguration(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %v", config)
}
// Pass an invalid ModifyIndex
{
newConf := &AutopilotConfiguration{
CleanupDeadServers: false,
ModifyIndex: config.ModifyIndex - 1,
}
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp {
t.Fatalf("bad: %v", resp)
}
}
// Pass a valid ModifyIndex
{
newConf := &AutopilotConfiguration{
CleanupDeadServers: false,
ModifyIndex: config.ModifyIndex,
}
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !resp {
t.Fatalf("bad: %v", resp)
}
}
}
func TestAPI_OperatorAutopilotServerHealth(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
c.AdvertiseAddrs.RPC = "127.0.0.1"
c.Server.RaftProtocol = 3
})
defer s.Stop()
operator := c.Operator()
retry.Run(t, func(r *retry.R) {
out, err := operator.AutopilotServerHealth(nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if len(out.Servers) != 1 ||
!out.Servers[0].Healthy ||
out.Servers[0].Name != fmt.Sprintf("%s.global", s.Config.NodeName) {
r.Fatalf("bad: %v", out)
}
})
}


@ -160,6 +160,32 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
if agentConfig.Sentinel != nil {
conf.SentinelConfig = agentConfig.Sentinel
}
if agentConfig.Server.NonVotingServer {
conf.NonVoter = true
}
if agentConfig.Autopilot != nil {
if agentConfig.Autopilot.CleanupDeadServers != nil {
conf.AutopilotConfig.CleanupDeadServers = *agentConfig.Autopilot.CleanupDeadServers
}
if agentConfig.Autopilot.ServerStabilizationTime != 0 {
conf.AutopilotConfig.ServerStabilizationTime = agentConfig.Autopilot.ServerStabilizationTime
}
if agentConfig.Autopilot.LastContactThreshold != 0 {
conf.AutopilotConfig.LastContactThreshold = agentConfig.Autopilot.LastContactThreshold
}
if agentConfig.Autopilot.MaxTrailingLogs != 0 {
conf.AutopilotConfig.MaxTrailingLogs = uint64(agentConfig.Autopilot.MaxTrailingLogs)
}
if agentConfig.Autopilot.RedundancyZoneTag != "" {
conf.AutopilotConfig.RedundancyZoneTag = agentConfig.Autopilot.RedundancyZoneTag
}
if agentConfig.Autopilot.DisableUpgradeMigration != nil {
conf.AutopilotConfig.DisableUpgradeMigration = *agentConfig.Autopilot.DisableUpgradeMigration
}
if agentConfig.Autopilot.UpgradeVersionTag != "" {
conf.AutopilotConfig.UpgradeVersionTag = agentConfig.Autopilot.UpgradeVersionTag
}
}
// Set up the bind addresses
rpcAddr, err := net.ResolveTCPAddr("tcp", agentConfig.normalizedAddrs.RPC)


@ -67,6 +67,7 @@ server {
bootstrap_expect = 5
data_dir = "/tmp/data"
protocol_version = 3
raft_protocol = 3
num_schedulers = 2
enabled_schedulers = ["test"]
node_gc_threshold = "12h"
@ -81,6 +82,7 @@ server {
retry_max = 3
retry_interval = "15s"
rejoin_after_leave = true
non_voting_server = true
encrypt = "abc"
}
acl {
@ -159,3 +161,12 @@ sentinel {
args = ["x", "y", "z"]
}
}
autopilot {
cleanup_dead_servers = true
disable_upgrade_migration = true
last_contact_threshold = "12705s"
max_trailing_logs = 17849
redundancy_zone_tag = "foo"
server_stabilization_time = "23057s"
upgrade_version_tag = "bar"
}


@ -130,6 +130,9 @@ type Config struct {
// Sentinel holds sentinel related settings
Sentinel *config.SentinelConfig `mapstructure:"sentinel"`
// Autopilot contains the configuration for Autopilot behavior.
Autopilot *config.AutopilotConfig `mapstructure:"autopilot"`
}
// ClientConfig is configuration specific to the client mode
@ -327,6 +330,10 @@ type ServerConfig struct {
// true, we ignore the leave, and rejoin the cluster on start.
RejoinAfterLeave bool `mapstructure:"rejoin_after_leave"`
// NonVotingServer is whether this server will act as a non-voting member
// of the cluster to help provide read scalability. (Enterprise-only)
NonVotingServer bool `mapstructure:"non_voting_server"`
// Encryption key to use for the Serf communication
EncryptKey string `mapstructure:"encrypt" json:"-"`
}
@ -604,6 +611,7 @@ func DefaultConfig() *Config {
TLSConfig: &config.TLSConfig{},
Sentinel: &config.SentinelConfig{},
Version: version.GetVersion(),
Autopilot: config.DefaultAutopilotConfig(),
}
}
@ -762,6 +770,13 @@ func (c *Config) Merge(b *Config) *Config {
result.Sentinel = result.Sentinel.Merge(b.Sentinel)
}
if result.Autopilot == nil && b.Autopilot != nil {
autopilot := *b.Autopilot
result.Autopilot = &autopilot
} else if b.Autopilot != nil {
result.Autopilot = result.Autopilot.Merge(b.Autopilot)
}
// Merge config files lists
result.Files = append(result.Files, b.Files...)
@ -1016,6 +1031,9 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.RejoinAfterLeave {
result.RejoinAfterLeave = true
}
if b.NonVotingServer {
result.NonVotingServer = true
}
if b.EncryptKey != "" {
result.EncryptKey = b.EncryptKey
}


@ -98,6 +98,7 @@ func parseConfig(result *Config, list *ast.ObjectList) error {
"http_api_response_headers",
"acl",
"sentinel",
"autopilot",
}
if err := helper.CheckHCLKeys(list, valid); err != nil {
return multierror.Prefix(err, "config:")
@ -121,6 +122,7 @@ func parseConfig(result *Config, list *ast.ObjectList) error {
delete(m, "http_api_response_headers")
delete(m, "acl")
delete(m, "sentinel")
delete(m, "autopilot")
// Decode the rest
if err := mapstructure.WeakDecode(m, result); err != nil {
@ -204,6 +206,13 @@ func parseConfig(result *Config, list *ast.ObjectList) error {
}
}
// Parse Autopilot config
if o := list.Filter("autopilot"); len(o.Items) > 0 {
if err := parseAutopilot(&result.Autopilot, o); err != nil {
return multierror.Prefix(err, "autopilot->")
}
}
// Parse out http_api_response_headers fields. These are in HCL as a list so
// we need to iterate over them and merge them.
if headersO := list.Filter("http_api_response_headers"); len(headersO.Items) > 0 {
@ -509,6 +518,7 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
"bootstrap_expect",
"data_dir",
"protocol_version",
"raft_protocol",
"num_schedulers",
"enabled_schedulers",
"node_gc_threshold",
@ -525,6 +535,7 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
"rejoin_after_leave",
"encrypt",
"authoritative_region",
"non_voting_server",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
@ -838,3 +849,49 @@ func parseSentinel(result **config.SentinelConfig, list *ast.ObjectList) error {
*result = &config
return nil
}
func parseAutopilot(result **config.AutopilotConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'autopilot' block allowed")
}
// Get our Autopilot object
listVal := list.Items[0].Val
// Check for invalid keys
valid := []string{
"cleanup_dead_servers",
"server_stabilization_time",
"last_contact_threshold",
"max_trailing_logs",
"redundancy_zone_tag",
"disable_upgrade_migration",
"upgrade_version_tag",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
autopilotConfig := config.DefaultAutopilotConfig()
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &autopilotConfig,
})
if err != nil {
return err
}
if err := dec.Decode(m); err != nil {
return err
}
*result = autopilotConfig
return nil
}
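
The StringToTimeDurationHookFunc hook above is what lets HCL strings such as "23057s" land in time.Duration fields. A self-contained sketch of just that conversion; the timing struct here is hypothetical:

package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

type timing struct {
	LastContactThreshold time.Duration `mapstructure:"last_contact_threshold"`
}

func main() {
	raw := map[string]interface{}{"last_contact_threshold": "200ms"}
	var out timing
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		// Converts "200ms" (a string) into a time.Duration during decode.
		DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
		WeaklyTypedInput: true,
		Result:           &out,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(raw); err != nil {
		panic(err)
	}
	fmt.Println(out.LastContactThreshold) // 200ms
}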


@ -88,6 +88,7 @@ func TestConfig_Parse(t *testing.T) {
BootstrapExpect: 5,
DataDir: "/tmp/data",
ProtocolVersion: 3,
RaftProtocol: 3,
NumSchedulers: 2,
EnabledSchedulers: []string{"test"},
NodeGCThreshold: "12h",
@ -102,6 +103,7 @@ func TestConfig_Parse(t *testing.T) {
RetryInterval: "15s",
RejoinAfterLeave: true,
RetryMaxAttempts: 3,
NonVotingServer: true,
EncryptKey: "abc",
},
ACL: &ACLConfig{
@ -186,6 +188,15 @@ func TestConfig_Parse(t *testing.T) {
},
},
},
Autopilot: &config.AutopilotConfig{
CleanupDeadServers: &trueValue,
ServerStabilizationTime: 23057 * time.Second,
LastContactThreshold: 12705 * time.Second,
MaxTrailingLogs: 17849,
RedundancyZoneTag: "foo",
DisableUpgradeMigration: &trueValue,
UpgradeVersionTag: "bar",
},
},
false,
},


@ -33,6 +33,7 @@ func TestConfig_Merge(t *testing.T) {
Vault: &config.VaultConfig{},
Consul: &config.ConsulConfig{},
Sentinel: &config.SentinelConfig{},
Autopilot: &config.AutopilotConfig{},
}
c2 := &Config{
@ -98,6 +99,7 @@ func TestConfig_Merge(t *testing.T) {
BootstrapExpect: 1,
DataDir: "/tmp/data1",
ProtocolVersion: 1,
RaftProtocol: 1,
NumSchedulers: 1,
NodeGCThreshold: "1h",
HeartbeatGrace: 30 * time.Second,
@ -156,6 +158,15 @@ func TestConfig_Merge(t *testing.T) {
ClientAutoJoin: &falseValue,
ChecksUseAdvertise: &falseValue,
},
Autopilot: &config.AutopilotConfig{
CleanupDeadServers: &falseValue,
ServerStabilizationTime: 1 * time.Second,
LastContactThreshold: 1 * time.Second,
MaxTrailingLogs: 1,
RedundancyZoneTag: "1",
DisableUpgradeMigration: &falseValue,
UpgradeVersionTag: "1",
},
}
c3 := &Config{
@ -246,6 +257,7 @@ func TestConfig_Merge(t *testing.T) {
RetryJoin: []string{"1.1.1.1"},
RetryInterval: "10s",
retryInterval: time.Second * 10,
NonVotingServer: true,
},
ACL: &ACLConfig{
Enabled: true,
@ -309,6 +321,15 @@ func TestConfig_Merge(t *testing.T) {
},
},
},
Autopilot: &config.AutopilotConfig{
CleanupDeadServers: &trueValue,
ServerStabilizationTime: 2 * time.Second,
LastContactThreshold: 2 * time.Second,
MaxTrailingLogs: 2,
RedundancyZoneTag: "2",
DisableUpgradeMigration: &trueValue,
UpgradeVersionTag: "2",
},
}
result := c0.Merge(c1)


@ -17,6 +17,7 @@ import (
assetfs "github.com/elazarl/go-bindata-assetfs"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
"github.com/rs/cors"
"github.com/ugorji/go/codec"
)
@ -182,7 +183,9 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/search", s.wrap(s.SearchRequest))
s.mux.HandleFunc("/v1/operator/", s.wrap(s.OperatorRequest))
s.mux.HandleFunc("/v1/operator/raft/", s.wrap(s.OperatorRequest))
s.mux.HandleFunc("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
s.mux.HandleFunc("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth))
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))
@ -331,6 +334,24 @@ func decodeBody(req *http.Request, out interface{}) error {
return dec.Decode(&out)
}
// decodeBodyFunc is used to decode a JSON request body, invoking
// a given callback function on the raw value before decoding
func decodeBodyFunc(req *http.Request, out interface{}, cb func(interface{}) error) error {
var raw interface{}
dec := json.NewDecoder(req.Body)
if err := dec.Decode(&raw); err != nil {
return err
}
// Invoke the callback prior to decode
if cb != nil {
if err := cb(raw); err != nil {
return err
}
}
return mapstructure.Decode(raw, out)
}
// setIndex is used to set the index response header
func setIndex(resp http.ResponseWriter, index uint64) {
resp.Header().Set("X-Nomad-Index", strconv.FormatUint(index, 10))


@ -4,6 +4,12 @@ import (
"net/http"
"strings"
"fmt"
"strconv"
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
)
@ -67,3 +73,185 @@ func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Reques
}
return nil, nil
}
// OperatorAutopilotConfiguration is used to inspect the current Autopilot configuration.
// This supports the stale query mode in case the cluster doesn't have a leader.
func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Switch on the method
switch req.Method {
case "GET":
var args structs.GenericRequest
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
return nil, nil
}
var reply autopilot.Config
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
return nil, err
}
out := api.AutopilotConfiguration{
CleanupDeadServers: reply.CleanupDeadServers,
LastContactThreshold: api.NewReadableDuration(reply.LastContactThreshold),
MaxTrailingLogs: reply.MaxTrailingLogs,
ServerStabilizationTime: api.NewReadableDuration(reply.ServerStabilizationTime),
RedundancyZoneTag: reply.RedundancyZoneTag,
DisableUpgradeMigration: reply.DisableUpgradeMigration,
UpgradeVersionTag: reply.UpgradeVersionTag,
CreateIndex: reply.CreateIndex,
ModifyIndex: reply.ModifyIndex,
}
return out, nil
case "PUT":
var args structs.AutopilotSetConfigRequest
s.parseRegion(req, &args.Region)
s.parseToken(req, &args.AuthToken)
var conf api.AutopilotConfiguration
durations := NewDurationFixer("lastcontactthreshold", "serverstabilizationtime")
if err := decodeBodyFunc(req, &conf, durations.FixupDurations); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Error parsing autopilot config: %v", err)
return nil, nil
}
args.Config = autopilot.Config{
CleanupDeadServers: conf.CleanupDeadServers,
LastContactThreshold: conf.LastContactThreshold.Duration(),
MaxTrailingLogs: conf.MaxTrailingLogs,
ServerStabilizationTime: conf.ServerStabilizationTime.Duration(),
RedundancyZoneTag: conf.RedundancyZoneTag,
DisableUpgradeMigration: conf.DisableUpgradeMigration,
UpgradeVersionTag: conf.UpgradeVersionTag,
}
// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Error parsing cas value: %v", err)
return nil, nil
}
args.Config.ModifyIndex = casVal
args.CAS = true
}
var reply bool
if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil {
return nil, err
}
// Only use the out value if this was a CAS
if !args.CAS {
return true, nil
}
return reply, nil
default:
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
}
// OperatorServerHealth is used to get the health of the servers in the local DC
func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
var args structs.GenericRequest
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
return nil, nil
}
var reply autopilot.OperatorHealthReply
if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil {
return nil, err
}
// Reply with status 429 if something is unhealthy
if !reply.Healthy {
resp.WriteHeader(http.StatusTooManyRequests)
}
out := &api.OperatorHealthReply{
Healthy: reply.Healthy,
FailureTolerance: reply.FailureTolerance,
}
for _, server := range reply.Servers {
out.Servers = append(out.Servers, api.ServerHealth{
ID: server.ID,
Name: server.Name,
Address: server.Address,
Version: server.Version,
Leader: server.Leader,
SerfStatus: server.SerfStatus.String(),
LastContact: api.NewReadableDuration(server.LastContact),
LastTerm: server.LastTerm,
LastIndex: server.LastIndex,
Healthy: server.Healthy,
Voter: server.Voter,
StableSince: server.StableSince.Round(time.Second).UTC(),
})
}
return out, nil
}
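
Since this handler replies 429 whenever anything is unhealthy, and the Go client above wraps responses in requireOK, an unhealthy cluster surfaces as an error from AutopilotServerHealth. A caller that wants the body regardless can hit the endpoint directly; a sketch with net/http, assuming an agent on Nomad's default HTTP port:

resp, err := http.Get("http://127.0.0.1:4646/v1/operator/autopilot/health")
if err != nil {
	log.Fatal(err)
}
defer resp.Body.Close()

// 429 signals "unhealthy" here, not rate limiting.
if resp.StatusCode == http.StatusTooManyRequests {
	log.Println("cluster reports one or more unhealthy servers")
}
var health api.OperatorHealthReply
if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
	log.Fatal(err)
}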
type durationFixer map[string]bool
func NewDurationFixer(fields ...string) durationFixer {
d := make(map[string]bool)
for _, field := range fields {
d[field] = true
}
return d
}
// FixupDurations walks the raw map and parses the values of any registered field names into time.Duration
func (d durationFixer) FixupDurations(raw interface{}) error {
rawMap, ok := raw.(map[string]interface{})
if !ok {
return nil
}
for key, val := range rawMap {
switch val.(type) {
case map[string]interface{}:
if err := d.FixupDurations(val); err != nil {
return err
}
case []interface{}:
for _, v := range val.([]interface{}) {
if err := d.FixupDurations(v); err != nil {
return err
}
}
case []map[string]interface{}:
for _, v := range val.([]map[string]interface{}) {
if err := d.FixupDurations(v); err != nil {
return err
}
}
default:
if d[strings.ToLower(key)] {
// Convert a string value into an integer
if vStr, ok := val.(string); ok {
dur, err := time.ParseDuration(vStr)
if err != nil {
return err
}
rawMap[key] = dur
}
}
}
}
return nil
}
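
In the PUT handler above, the fixer runs over the raw JSON map (via decodeBodyFunc) so the registered keys hold time.Duration values before mapstructure fills the target struct. A condensed sketch of that sequence; the output struct is hypothetical:

raw := map[string]interface{}{
	"LastContactThreshold": "250ms",
	"MaxTrailingLogs":      float64(250),
}
fixer := NewDurationFixer("lastcontactthreshold")
if err := fixer.FixupDurations(raw); err != nil {
	log.Fatal(err)
}

// raw["LastContactThreshold"] is now a time.Duration, so a plain
// mapstructure.Decode into a duration-typed field succeeds.
var out struct {
	LastContactThreshold time.Duration
	MaxTrailingLogs      uint64
}
if err := mapstructure.Decode(raw, &out); err != nil {
	log.Fatal(err)
}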


@ -7,7 +7,15 @@ import (
"strings"
"testing"
"time"
"fmt"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/pascaldekloe/goe/verify"
)
func TestHTTP_OperatorRaftConfiguration(t *testing.T) {
@ -58,3 +66,227 @@ func TestHTTP_OperatorRaftPeer(t *testing.T) {
}
})
}
func TestOperator_AutopilotGetConfiguration(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/operator/autopilot/configuration", body)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorAutopilotConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
out, ok := obj.(api.AutopilotConfiguration)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if !out.CleanupDeadServers {
t.Fatalf("bad: %#v", out)
}
})
}
func TestOperator_AutopilotSetConfiguration(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
body := bytes.NewBuffer([]byte(`{"CleanupDeadServers": false}`))
req, _ := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body)
resp := httptest.NewRecorder()
if _, err := s.Server.OperatorAutopilotConfiguration(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: s.Config.Region,
},
}
var reply autopilot.Config
if err := s.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
})
}
func TestOperator_AutopilotCASConfiguration(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
body := bytes.NewBuffer([]byte(`{"CleanupDeadServers": false}`))
req, _ := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body)
resp := httptest.NewRecorder()
if _, err := s.Server.OperatorAutopilotConfiguration(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: s.Config.Region,
},
}
var reply autopilot.Config
if err := s.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
// Create a CAS request, bad index
{
buf := bytes.NewBuffer([]byte(`{"CleanupDeadServers": true}`))
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex-1), buf)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorAutopilotConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); res {
t.Fatalf("should NOT work")
}
}
// Create a CAS request, good index
{
buf := bytes.NewBuffer([]byte(`{"CleanupDeadServers": true}`))
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex), buf)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorAutopilotConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
}
// Verify the update
if err := s.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if !reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
})
}
func TestOperator_ServerHealth(t *testing.T) {
t.Parallel()
httpTest(t, func(c *Config) {
c.Server.RaftProtocol = 3
}, func(s *TestAgent) {
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/operator/autopilot/health", body)
retry.Run(t, func(r *retry.R) {
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorServerHealth(resp, req)
if err != nil {
r.Fatalf("err: %v", err)
}
if resp.Code != 200 {
r.Fatalf("bad code: %d", resp.Code)
}
out, ok := obj.(*api.OperatorHealthReply)
if !ok {
r.Fatalf("unexpected: %T", obj)
}
if len(out.Servers) != 1 ||
!out.Servers[0].Healthy ||
out.Servers[0].Name != s.server.LocalMember().Name ||
out.Servers[0].SerfStatus != "alive" ||
out.FailureTolerance != 0 {
r.Fatalf("bad: %v, %q", out, s.server.LocalMember().Name)
}
})
})
}
func TestOperator_ServerHealth_Unhealthy(t *testing.T) {
t.Parallel()
httpTest(t, func(c *Config) {
c.Server.RaftProtocol = 3
c.Autopilot.LastContactThreshold = -1 * time.Second
}, func(s *TestAgent) {
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/operator/autopilot/health", body)
retry.Run(t, func(r *retry.R) {
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorServerHealth(resp, req)
if err != nil {
r.Fatalf("err: %v", err)
}
if resp.Code != 429 {
r.Fatalf("bad code: %d, %v", resp.Code, obj.(*api.OperatorHealthReply))
}
out, ok := obj.(*api.OperatorHealthReply)
if !ok {
r.Fatalf("unexpected: %T", obj)
}
if len(out.Servers) != 1 ||
out.Healthy ||
out.Servers[0].Name != s.server.LocalMember().Name {
r.Fatalf("bad: %#v", out.Servers)
}
})
})
}
func TestDurationFixer(t *testing.T) {
obj := map[string]interface{}{
"key1": []map[string]interface{}{
{
"subkey1": "10s",
},
{
"subkey2": "5d",
},
},
"key2": map[string]interface{}{
"subkey3": "30s",
"subkey4": "20m",
},
"key3": "11s",
"key4": "49h",
}
expected := map[string]interface{}{
"key1": []map[string]interface{}{
{
"subkey1": 10 * time.Second,
},
{
"subkey2": "5d",
},
},
"key2": map[string]interface{}{
"subkey3": "30s",
"subkey4": 20 * time.Minute,
},
"key3": "11s",
"key4": 49 * time.Hour,
}
fixer := NewDurationFixer("key4", "subkey1", "subkey4")
if err := fixer.FixupDurations(obj); err != nil {
t.Fatal(err)
}
// Ensure we only processed the intended fieldnames
verify.Values(t, "", obj, expected)
}


@ -301,6 +301,11 @@ func (a *TestAgent) config() *Config {
config.RaftConfig.StartAsLeader = true
config.RaftTimeout = 500 * time.Millisecond
// Tighten the autopilot timing
config.AutopilotConfig.ServerStabilizationTime = 100 * time.Millisecond
config.ServerHealthInterval = 50 * time.Millisecond
config.AutopilotInterval = 100 * time.Millisecond
// Bootstrap ourselves
config.Bootstrap = true
config.BootstrapExpect = 1


@ -0,0 +1,29 @@
package command
import (
"strings"
"github.com/mitchellh/cli"
)
type OperatorAutopilotCommand struct {
Meta
}
func (c *OperatorAutopilotCommand) Run(args []string) int {
return cli.RunResultHelp
}
func (c *OperatorAutopilotCommand) Synopsis() string {
return "Provides tools for modifying Autopilot configuration"
}
func (c *OperatorAutopilotCommand) Help() string {
helpText := `
Usage: nomad operator autopilot <subcommand> [options]
The Autopilot operator command is used to interact with Nomad's Autopilot
subsystem. The command can be used to view or modify the current configuration.
`
return strings.TrimSpace(helpText)
}


@ -0,0 +1,70 @@
package command
import (
"fmt"
"strings"
"github.com/posener/complete"
)
type OperatorAutopilotGetCommand struct {
Meta
}
func (c *OperatorAutopilotGetCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient))
}
func (c *OperatorAutopilotGetCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}
func (c *OperatorAutopilotGetCommand) Run(args []string) int {
flags := c.Meta.FlagSet("autopilot", FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
return 1
}
// Set up a client.
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Fetch the current configuration.
config, err := client.Operator().AutopilotGetConfiguration(nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying Autopilot configuration: %s", err))
return 1
}
c.Ui.Output(fmt.Sprintf("CleanupDeadServers = %v", config.CleanupDeadServers))
c.Ui.Output(fmt.Sprintf("LastContactThreshold = %v", config.LastContactThreshold.String()))
c.Ui.Output(fmt.Sprintf("MaxTrailingLogs = %v", config.MaxTrailingLogs))
c.Ui.Output(fmt.Sprintf("ServerStabilizationTime = %v", config.ServerStabilizationTime.String()))
c.Ui.Output(fmt.Sprintf("RedundancyZoneTag = %q", config.RedundancyZoneTag))
c.Ui.Output(fmt.Sprintf("DisableUpgradeMigration = %v", config.DisableUpgradeMigration))
c.Ui.Output(fmt.Sprintf("UpgradeVersionTag = %q", config.UpgradeVersionTag))
return 0
}
func (c *OperatorAutopilotGetCommand) Synopsis() string {
return "Display the current Autopilot configuration"
}
func (c *OperatorAutopilotGetCommand) Help() string {
helpText := `
Usage: nomad operator autopilot get-config [options]
Displays the current Autopilot configuration.
General Options:
` + generalOptionsUsage()
return strings.TrimSpace(helpText)
}


@ -0,0 +1,32 @@
package command
import (
"strings"
"testing"
"github.com/mitchellh/cli"
)
func TestOperator_Autopilot_GetConfig_Implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &OperatorAutopilotGetCommand{}
}
func TestOperatorAutopilotGetConfigCommand(t *testing.T) {
t.Parallel()
s, _, addr := testServer(t, false, nil)
defer s.Shutdown()
ui := new(cli.MockUi)
c := &OperatorAutopilotGetCommand{Meta: Meta{Ui: ui}}
args := []string{"-address=" + addr}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
output := strings.TrimSpace(ui.OutputWriter.String())
if !strings.Contains(output, "CleanupDeadServers = true") {
t.Fatalf("bad: %s", output)
}
}


@ -0,0 +1,156 @@
package command
import (
"fmt"
"strings"
"time"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/nomad/api"
"github.com/posener/complete"
)
type OperatorAutopilotSetCommand struct {
Meta
}
func (c *OperatorAutopilotSetCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-cleanup-dead-servers": complete.PredictAnything,
"-max-trailing-logs": complete.PredictAnything,
"-last-contact-threshold": complete.PredictAnything,
"-server-stabilization-time": complete.PredictAnything,
"-redundancy-zone-tag": complete.PredictAnything,
"-disable-upgrade-migration": complete.PredictAnything,
"-upgrade-version-tag": complete.PredictAnything,
})
}
func (c *OperatorAutopilotSetCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}
func (c *OperatorAutopilotSetCommand) Run(args []string) int {
var cleanupDeadServers flags.BoolValue
var maxTrailingLogs flags.UintValue
var lastContactThreshold flags.DurationValue
var serverStabilizationTime flags.DurationValue
var redundancyZoneTag flags.StringValue
var disableUpgradeMigration flags.BoolValue
var upgradeVersionTag flags.StringValue
f := c.Meta.FlagSet("autopilot", FlagSetClient)
f.Usage = func() { c.Ui.Output(c.Help()) }
f.Var(&cleanupDeadServers, "cleanup-dead-servers", "")
f.Var(&maxTrailingLogs, "max-trailing-logs", "")
f.Var(&lastContactThreshold, "last-contact-threshold", "")
f.Var(&serverStabilizationTime, "server-stabilization-time", "")
f.Var(&redundancyZoneTag, "redundancy-zone-tag", "")
f.Var(&disableUpgradeMigration, "disable-upgrade-migration", "")
f.Var(&upgradeVersionTag, "upgrade-version-tag", "")
if err := f.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
return 1
}
// Set up a client.
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Fetch the current configuration.
operator := client.Operator()
conf, err := operator.AutopilotGetConfiguration(nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying for Autopilot configuration: %s", err))
return 1
}
// Update the config values based on the set flags.
cleanupDeadServers.Merge(&conf.CleanupDeadServers)
redundancyZoneTag.Merge(&conf.RedundancyZoneTag)
disableUpgradeMigration.Merge(&conf.DisableUpgradeMigration)
upgradeVersionTag.Merge(&conf.UpgradeVersionTag)
trailing := uint(conf.MaxTrailingLogs)
maxTrailingLogs.Merge(&trailing)
conf.MaxTrailingLogs = uint64(trailing)
last := time.Duration(*conf.LastContactThreshold)
lastContactThreshold.Merge(&last)
conf.LastContactThreshold = api.NewReadableDuration(last)
stabilization := time.Duration(*conf.ServerStabilizationTime)
serverStabilizationTime.Merge(&stabilization)
conf.ServerStabilizationTime = api.NewReadableDuration(stabilization)
// Check-and-set the new configuration.
result, err := operator.AutopilotCASConfiguration(conf, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error setting Autopilot configuration: %s", err))
return 1
}
if result {
c.Ui.Output("Configuration updated!")
return 0
}
c.Ui.Output("Configuration could not be atomically updated, please try again")
return 1
}
func (c *OperatorAutopilotSetCommand) Synopsis() string {
return "Modify the current Autopilot configuration"
}
func (c *OperatorAutopilotSetCommand) Help() string {
helpText := `
Usage: nomad operator autopilot set-config [options]
Modifies the current Autopilot configuration.
General Options:
` + generalOptionsUsage() + `
Set Config Options:
-cleanup-dead-servers=[true|false]
Controls whether Nomad will automatically remove dead servers when
new ones are successfully added. Must be one of [true|false].
-disable-upgrade-migration=[true|false]
(Enterprise-only) Controls whether Nomad will avoid promoting
new servers until it can perform a migration. Must be one of
[true|false].
-last-contact-threshold=200ms
Controls the maximum amount of time a server can go without contact
from the leader before being considered unhealthy. Must be a
duration value such as "200ms".
-max-trailing-logs=<value>
Controls the maximum number of log entries that a server can trail
the leader by before being considered unhealthy.
-redundancy-zone-tag=<value>
(Enterprise-only) Controls the node_meta tag name used for
separating servers into different redundancy zones.
-server-stabilization-time=<10s>
Controls the minimum amount of time a server must be stable in
the 'healthy' state before being added to the cluster. Only takes
effect if all servers are running Raft protocol version 3 or
higher. Must be a duration value such as "10s".
-upgrade-version-tag=<value>
(Enterprise-only) The node_meta tag to use for version info when
performing upgrade migrations. If left blank, the Nomad version
will be used.
`
return strings.TrimSpace(helpText)
}


@ -0,0 +1,62 @@
package command
import (
"strings"
"testing"
"time"
"github.com/mitchellh/cli"
)
func TestOperator_Autopilot_SetConfig_Implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &OperatorAutopilotSetCommand{}
}
func TestOperatorAutopilotSetConfigCommand(t *testing.T) {
t.Parallel()
s, _, addr := testServer(t, false, nil)
defer s.Shutdown()
ui := new(cli.MockUi)
c := &OperatorAutopilotSetCommand{Meta: Meta{Ui: ui}}
args := []string{
"-address=" + addr,
"-cleanup-dead-servers=false",
"-max-trailing-logs=99",
"-last-contact-threshold=123ms",
"-server-stabilization-time=123ms",
}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
output := strings.TrimSpace(ui.OutputWriter.String())
if !strings.Contains(output, "Configuration updated") {
t.Fatalf("bad: %s", output)
}
client, err := c.Client()
if err != nil {
t.Fatal(err)
}
conf, err := client.Operator().AutopilotGetConfiguration(nil)
if err != nil {
t.Fatal(err)
}
if conf.CleanupDeadServers {
t.Fatalf("bad: %#v", conf)
}
if conf.MaxTrailingLogs != 99 {
t.Fatalf("bad: %#v", conf)
}
if conf.LastContactThreshold.Duration() != 123*time.Millisecond {
t.Fatalf("bad: %#v", conf)
}
if conf.ServerStabilizationTime.Duration() != 123*time.Millisecond {
t.Fatalf("bad: %#v", conf)
}
}


@ -0,0 +1,12 @@
package command
import (
"testing"
"github.com/mitchellh/cli"
)
func TestOperator_Autopilot_Implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &OperatorAutopilotCommand{}
}


@ -275,6 +275,24 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
}, nil
},
"operator autopilot": func() (cli.Command, error) {
return &command.OperatorAutopilotCommand{
Meta: meta,
}, nil
},
"operator autopilot get-config": func() (cli.Command, error) {
return &command.OperatorAutopilotGetCommand{
Meta: meta,
}, nil
},
"operator autopilot set-config": func() (cli.Command, error) {
return &command.OperatorAutopilotSetCommand{
Meta: meta,
}, nil
},
"operator raft": func() (cli.Command, error) {
return &command.OperatorRaftCommand{
Meta: meta,

nomad/autopilot.go (new file, 69 additions)

@ -0,0 +1,69 @@
package nomad
import (
"context"
"fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
// AutopilotDelegate is a Consul delegate for autopilot operations.
type AutopilotDelegate struct {
server *Server
}
func (d *AutopilotDelegate) AutopilotConfig() *autopilot.Config {
return d.server.getOrCreateAutopilotConfig()
}
func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Member) map[string]*autopilot.ServerStats {
return d.server.statsFetcher.Fetch(ctx, servers)
}
func (d *AutopilotDelegate) IsServer(m serf.Member) (*autopilot.ServerInfo, error) {
ok, parts := isNomadServer(m)
if !ok || parts.Region != d.server.Region() {
return nil, nil
}
server := &autopilot.ServerInfo{
Name: m.Name,
ID: parts.ID,
Addr: parts.Addr,
Build: parts.Build,
Status: m.Status,
}
return server, nil
}
// NotifyHealth heartbeats a metric for monitoring the cluster's health if we're the leader
func (d *AutopilotDelegate) NotifyHealth(health autopilot.OperatorHealthReply) {
if d.server.raft.State() == raft.Leader {
metrics.SetGauge([]string{"nomad", "autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
if health.Healthy {
metrics.SetGauge([]string{"nomad", "autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"nomad", "autopilot", "healthy"}, 0)
}
}
}
func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
future := d.server.raft.GetConfiguration()
if err := future.Error(); err != nil {
return nil, fmt.Errorf("failed to get raft configuration: %v", err)
}
return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
}
func (d *AutopilotDelegate) Raft() *raft.Raft {
return d.server.raft
}
func (d *AutopilotDelegate) Serf() *serf.Serf {
return d.server.serf
}
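
For context, this delegate is handed to the autopilot routine when the server is constructed. A sketch of the wiring, assuming the NewAutopilot constructor exposed by the vendored Consul package takes (logger, delegate, apply interval, health interval):

// Sketch of server setup; see leader.go in this commit for Start/Stop.
delegate := &AutopilotDelegate{server: s}
s.autopilot = autopilot.NewAutopilot(s.logger, delegate,
	s.config.AutopilotInterval, s.config.ServerHealthInterval)

// establishLeadership later calls s.autopilot.Start(), and
// revokeLeadership calls s.autopilot.Stop().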

nomad/autopilot_test.go (new file, 350 additions)

@ -0,0 +1,350 @@
package nomad
import (
"testing"
"time"
"fmt"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
// wantPeers determines whether the server has the given
// number of voting raft peers.
func wantPeers(s *Server, peers int) error {
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
n := autopilot.NumPeers(future.Configuration())
if got, want := n, peers; got != want {
return fmt.Errorf("got %d peers want %d", got, want)
}
return nil
}
// wantRaft determines if the servers have all of each other in their
// Raft configurations.
func wantRaft(servers []*Server) error {
// Make sure all the servers are represented in the Raft config,
// and that there are no extras.
verifyRaft := func(c raft.Configuration) error {
want := make(map[raft.ServerID]bool)
for _, s := range servers {
want[s.config.RaftConfig.LocalID] = true
}
for _, s := range c.Servers {
if !want[s.ID] {
return fmt.Errorf("don't want %q", s.ID)
}
delete(want, s.ID)
}
if len(want) > 0 {
return fmt.Errorf("didn't find %v", want)
}
return nil
}
for _, s := range servers {
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
if err := verifyRaft(future.Configuration()); err != nil {
return err
}
}
return nil
}
func TestAutopilot_CleanupDeadServer(t *testing.T) {
t.Parallel()
for i := 1; i <= 3; i++ {
testCleanupDeadServer(t, i)
}
}
func testCleanupDeadServer(t *testing.T, raftVersion int) {
conf := func(c *Config) {
c.DevDisableBootstrap = true
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(raftVersion)
}
s1 := testServer(t, conf)
defer s1.Shutdown()
s2 := testServer(t, conf)
defer s2.Shutdown()
s3 := testServer(t, conf)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
testJoin(t, s1, s2, s3)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Bring up a new server
s4 := testServer(t, conf)
defer s4.Shutdown()
// Kill a non-leader server
s3.Shutdown()
retry.Run(t, func(r *retry.R) {
alive := 0
for _, m := range s1.Members() {
if m.Status == serf.StatusAlive {
alive++
}
}
if alive != 2 {
r.Fatalf("expected 2 alive members, got %d", alive)
}
})
// Join the new server
testJoin(t, s1, s4)
servers[2] = s4
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
}
func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
conf := func(c *Config) {
c.DevDisableBootstrap = true
}
s2 := testServer(t, conf)
defer s2.Shutdown()
s3 := testServer(t, conf)
defer s3.Shutdown()
s4 := testServer(t, conf)
defer s4.Shutdown()
s5 := testServer(t, conf)
defer s5.Shutdown()
servers := []*Server{s1, s2, s3, s4, s5}
// Join the servers to s1, and wait until they are all promoted to
// voters.
testJoin(t, s1, servers[1:]...)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 5))
}
})
// Kill a non-leader server
s4.Shutdown()
// Should be removed from the peers automatically
servers = []*Server{s1, s2, s3, s5}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 4))
}
})
}
func TestAutopilot_RollingUpdate(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.RaftConfig.ProtocolVersion = 3
})
defer s1.Shutdown()
conf := func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 3
}
s2 := testServer(t, conf)
defer s2.Shutdown()
s3 := testServer(t, conf)
defer s3.Shutdown()
// Join the servers to s1, and wait until they are all promoted to
// voters.
servers := []*Server{s1, s2, s3}
testJoin(t, s1, s2, s3)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})
// Add one more server like we are doing a rolling update.
s4 := testServer(t, conf)
defer s4.Shutdown()
testJoin(t, s1, s4)
servers = append(servers, s4)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})
// Now kill one of the "old" nodes like we are doing a rolling update.
s3.Shutdown()
isVoter := func() bool {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range future.Configuration().Servers {
if string(s.ID) == string(s4.config.NodeID) {
return s.Suffrage == raft.Voter
}
}
t.Fatalf("didn't find s4")
return false
}
// Wait for s4 to stabilize, get promoted to a voter, and for s3 to be
// removed.
servers = []*Server{s1, s2, s4}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
if !isVoter() {
r.Fatalf("should be a voter")
}
})
}
func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
conf := func(c *Config) {
c.DevDisableBootstrap = true
}
s2 := testServer(t, conf)
defer s2.Shutdown()
s3 := testServer(t, conf)
defer s3.Shutdown()
s4 := testServer(t, conf)
defer s4.Shutdown()
servers := []*Server{s1, s2, s3}
// Join the servers to s1
testJoin(t, s1, s2, s3)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
testutil.WaitForLeader(t, s1.RPC)
// Add s4 to peers directly
addr := fmt.Sprintf("127.0.0.1:%d", s4.config.SerfConfig.MemberlistConfig.BindPort)
s1.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(addr), 0, 0)
// Verify we have 4 peers
peers, err := s1.numPeers()
if err != nil {
t.Fatal(err)
}
if peers != 4 {
t.Fatalf("bad: %v", peers)
}
// Wait for s4 to be removed
for _, s := range []*Server{s1, s2, s3} {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
}
func TestAutopilot_PromoteNonVoter(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.RaftConfig.ProtocolVersion = 3
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC)
s2 := testServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 3
})
defer s2.Shutdown()
testJoin(t, s1, s2)
// Make sure we see it as a nonvoter initially. We wait until half
// the stabilization period has passed.
retry.Run(t, func(r *retry.R) {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
r.Fatal(err)
}
servers := future.Configuration().Servers
if len(servers) != 2 {
r.Fatalf("bad: %v", servers)
}
if servers[1].Suffrage != raft.Nonvoter {
r.Fatalf("bad: %v", servers)
}
health := s1.autopilot.GetServerHealth(string(servers[1].ID))
if health == nil {
r.Fatalf("nil health, %v", s1.autopilot.GetClusterHealth())
}
if !health.Healthy {
r.Fatalf("bad: %v", health)
}
if time.Since(health.StableSince) < s1.config.AutopilotConfig.ServerStabilizationTime/2 {
r.Fatal("stable period not elapsed")
}
})
// Make sure it ends up as a voter.
retry.Run(t, func(r *retry.R) {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
r.Fatal(err)
}
servers := future.Configuration().Servers
if len(servers) != 2 {
r.Fatalf("bad: %v", servers)
}
if servers[1].Suffrage != raft.Voter {
r.Fatalf("bad: %v", servers)
}
})
}


@ -8,6 +8,7 @@ import (
"runtime"
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/helper/uuid"
@ -93,6 +94,10 @@ type Config struct {
// RaftTimeout is applied to any network traffic for raft. Defaults to 10s.
RaftTimeout time.Duration
// (Enterprise-only) NonVoter is used to prevent this server from being added
// as a voting member of the Raft cluster.
NonVoter bool
// SerfConfig is the configuration for the serf cluster
SerfConfig *serf.Config
@ -261,6 +266,19 @@ type Config struct {
// BackwardsCompatibleMetrics determines whether to show methods of
// displaying metrics for older versions, or to only show the new format
BackwardsCompatibleMetrics bool
// AutopilotConfig is used to apply the initial autopilot config when
// bootstrapping.
AutopilotConfig *autopilot.Config
// ServerHealthInterval is the frequency with which the health of the
// servers in the cluster will be updated.
ServerHealthInterval time.Duration
// AutopilotInterval is the frequency with which the leader will perform
// autopilot tasks, such as promoting eligible non-voters and removing
// dead servers.
AutopilotInterval time.Duration
}
// CheckVersion is used to check if the ProtocolVersion is valid
@ -321,6 +339,14 @@ func DefaultConfig() *Config {
TLSConfig: &config.TLSConfig{},
ReplicationBackoff: 30 * time.Second,
SentinelGCInterval: 30 * time.Second,
AutopilotConfig: &autopilot.Config{
CleanupDeadServers: true,
LastContactThreshold: 200 * time.Millisecond,
MaxTrailingLogs: 250,
ServerStabilizationTime: 10 * time.Second,
},
ServerHealthInterval: 2 * time.Second,
AutopilotInterval: 10 * time.Second,
}
// Enable all known schedulers by default
@ -344,8 +370,8 @@ func DefaultConfig() *Config {
// Disable shutdown on removal
c.RaftConfig.ShutdownOnRemove = false
// Enable interoperability with raft protocol version 1, and don't
// start using new ID-based features yet.
// Enable interoperability with new raft APIs, requires all servers
// to be on raft v1 or higher.
c.RaftConfig.ProtocolVersion = 2
return c


@ -234,6 +234,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyACLTokenDelete(buf[1:], log.Index)
case structs.ACLTokenBootstrapRequestType:
return n.applyACLTokenBootstrap(buf[1:], log.Index)
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
}
// Check enterprise only message types.
@ -833,6 +835,23 @@ func (n *nomadFSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{}
return nil
}
func (n *nomadFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
var req structs.AutopilotSetConfigRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "autopilot"}, time.Now())
if req.CAS {
act, err := n.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
if err != nil {
return err
}
return act
}
return n.state.AutopilotSetConfig(index, &req.Config)
}
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()


@ -10,6 +10,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/consul/agent/consul/autopilot"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
@ -2300,3 +2301,62 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
t.Fatalf("Diff % #v", pretty.Diff(&expected, out2))
}
}
func TestFSM_Autopilot(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
// Set the autopilot config using a request.
req := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: autopilot.Config{
CleanupDeadServers: true,
LastContactThreshold: 10 * time.Second,
MaxTrailingLogs: 300,
},
}
buf, err := structs.Encode(structs.AutopilotRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
// Verify key is set directly in the state store.
_, config, err := fsm.state.AutopilotConfig()
if err != nil {
t.Fatalf("err: %v", err)
}
if config.CleanupDeadServers != req.Config.CleanupDeadServers {
t.Fatalf("bad: %v", config.CleanupDeadServers)
}
if config.LastContactThreshold != req.Config.LastContactThreshold {
t.Fatalf("bad: %v", config.LastContactThreshold)
}
if config.MaxTrailingLogs != req.Config.MaxTrailingLogs {
t.Fatalf("bad: %v", config.MaxTrailingLogs)
}
// Now use CAS and provide an old index
req.CAS = true
req.Config.CleanupDeadServers = false
req.Config.ModifyIndex = config.ModifyIndex - 1
buf, err = structs.Encode(structs.AutopilotRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
_, config, err = fsm.state.AutopilotConfig()
if err != nil {
t.Fatalf("err: %v", err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %v", config.CleanupDeadServers)
}
}


@ -13,7 +13,9 @@ import (
"golang.org/x/time/rate"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/autopilot"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@ -37,6 +39,8 @@ const (
barrierWriteTimeout = 2 * time.Minute
)
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
@ -168,6 +172,10 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
}
}
// Initialize and start the autopilot routine
s.getOrCreateAutopilotConfig()
s.autopilot.Start()
// Enable the plan queue, since we are now the leader
s.planQueue.SetEnabled(true)
@ -635,6 +643,9 @@ func (s *Server) revokeLeadership() error {
// Clear the leader token since we are no longer the leader.
s.setLeaderAcl("")
// Disable autopilot
s.autopilot.Stop()
// Disable the plan queue, since we are no longer leader
s.planQueue.SetEnabled(false)
@ -776,7 +787,7 @@ func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error {
// but we want to avoid doing that if possible to prevent useless Raft
// log entries. If the address is the same but the ID changed, remove the
// old server before adding the new one.
minRaftProtocol, err := MinRaftProtocol(s.config.Region, members)
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
if err != nil {
return err
}
@ -810,8 +821,7 @@ func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error {
// Attempt to add as a peer
switch {
case minRaftProtocol >= 3:
// todo(kyhavlov): change this to AddNonVoter when adding autopilot
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to add raft peer: %v", err)
return err
@ -836,7 +846,6 @@ func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error {
// removeRaftPeer is used to remove a Raft peer when a Nomad server leaves
// or is reaped
func (s *Server) removeRaftPeer(m serf.Member, parts *serverParts) error {
// TODO (alexdadgar) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
// See if it's already in the configuration. It's harmless to re-remove it
@ -848,7 +857,7 @@ func (s *Server) removeRaftPeer(m serf.Member, parts *serverParts) error {
return err
}
minRaftProtocol, err := MinRaftProtocol(s.config.Region, s.serf.Members())
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
if err != nil {
return err
}
@ -1163,3 +1172,31 @@ func diffACLTokens(state *state.StateStore, minIndex uint64, remoteList []*struc
}
return
}
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config {
state := s.fsm.State()
_, config, err := state.AutopilotConfig()
if err != nil {
s.logger.Printf("[ERR] autopilot: failed to get config: %v", err)
return nil
}
if config != nil {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion) {
s.logger.Printf("[INFO] autopilot: version %v", s.Members()[0].Tags)
s.logger.Printf("[WARN] autopilot: can't initialize until all servers are >= %s", minAutopilotVersion.String())
return nil
}
config = s.config.AutopilotConfig
req := structs.AutopilotSetConfigRequest{Config: *config}
if _, _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
s.logger.Printf("[ERR] autopilot: failed to initialize config: %v", err)
return nil
}
return config
}

View File

@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/testutil/retry"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
@ -815,21 +816,18 @@ func TestLeader_DiffACLTokens(t *testing.T) {
func TestLeader_UpgradeRaftVersion(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
})
defer s1.Shutdown()
s2 := testServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 1
})
defer s2.Shutdown()
s3 := testServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
})
defer s3.Shutdown()
@ -854,7 +852,7 @@ func TestLeader_UpgradeRaftVersion(t *testing.T) {
}
for _, s := range []*Server{s1, s3} {
minVer, err := MinRaftProtocol(s1.config.Region, s.Members())
minVer, err := s.autopilot.MinRaftProtocol()
if err != nil {
t.Fatal(err)
}
@ -902,3 +900,81 @@ func TestLeader_UpgradeRaftVersion(t *testing.T) {
})
}
}
func TestLeader_RollRaftServer(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.RaftConfig.ProtocolVersion = 2
})
defer s1.Shutdown()
s2 := testServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 1
})
defer s2.Shutdown()
s3 := testServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 2
})
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
testJoin(t, s1, s2, s3)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill the v1 server
s2.Shutdown()
for _, s := range []*Server{s1, s3} {
retry.Run(t, func(r *retry.R) {
minVer, err := s.autopilot.MinRaftProtocol()
if err != nil {
r.Fatal(err)
}
if got, want := minVer, 2; got != want {
r.Fatalf("got min raft version %d want %d", got, want)
}
})
}
// Replace the dead server with one running raft protocol v3
s4 := testServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 3
})
defer s4.Shutdown()
testJoin(t, s4, s1)
servers[1] = s4
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
retry.Run(t, func(r *retry.R) {
addrs := 0
ids := 0
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
r.Fatal(err)
}
for _, server := range future.Configuration().Servers {
if string(server.ID) == string(server.Address) {
addrs++
} else {
ids++
}
}
if got, want := addrs, 2; got != want {
r.Fatalf("got %d server addresses want %d", got, want)
}
if got, want := ids, 1; got != want {
r.Fatalf("got %d server ids want %d", got, want)
}
})
}
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"net"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
@ -124,3 +125,96 @@ REMOVE:
op.srv.logger.Printf("[WARN] nomad.operator: Removed Raft peer %q", args.Address)
return nil
}
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(args *structs.GenericRequest, reply *autopilot.Config) error {
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
return err
}
// This action requires operator read access.
rule, err := op.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
if rule != nil && !rule.AllowOperatorRead() {
return structs.ErrPermissionDenied
}
state := op.srv.fsm.State()
_, config, err := state.AutopilotConfig()
if err != nil {
return err
}
if config == nil {
return fmt.Errorf("autopilot config not initialized yet")
}
*reply = *config
return nil
}
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
return err
}
// This action requires operator write access.
rule, err := op.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
if rule != nil && !rule.AllowOperatorWrite() {
return structs.ErrPermissionDenied
}
// Apply the update
resp, _, err := op.srv.raftApply(structs.AutopilotRequestType, args)
if err != nil {
op.srv.logger.Printf("[ERR] nomad.operator: Apply failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}
// ServerHealth is used to get the current health of the servers.
func (op *Operator) ServerHealth(args *structs.GenericRequest, reply *autopilot.OperatorHealthReply) error {
// This must be sent to the leader, so we fix the args since we are
// re-using a structure where we don't support all the options.
args.AllowStale = false
if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done {
return err
}
// This action requires operator read access.
rule, err := op.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
if rule != nil && !rule.AllowOperatorRead() {
return structs.ErrPermissionDenied
}
// Exit early if the min Raft version is too low
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
if minRaftProtocol < 3 {
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
}
*reply = op.srv.autopilot.GetClusterHealth()
return nil
}
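
For orientation, a hedged usage sketch (not part of the commit) exercising the two configuration endpoints end to end, in the style of the existing server tests; the testServer/WaitForLeader helpers and the region plumbing are assumed from the test files elsewhere in this commit.

func TestOperator_Autopilot_Sketch(t *testing.T) {
	s1 := testServer(t, func(c *Config) {})
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// Read the current configuration (initialized by the leader).
	var config autopilot.Config
	get := &structs.GenericRequest{
		QueryOptions: structs.QueryOptions{Region: s1.config.Region},
	}
	if err := s1.RPC("Operator.AutopilotGetConfiguration", get, &config); err != nil {
		t.Fatal(err)
	}

	// Write it back with CAS; the bool reply reports whether the write
	// was applied and is only meaningful for CAS requests.
	config.CleanupDeadServers = false
	var applied bool
	set := &structs.AutopilotSetConfigRequest{
		Config:       config,
		CAS:          true,
		WriteRequest: structs.WriteRequest{Region: s1.config.Region},
	}
	if err := s1.RPC("Operator.AutopilotSetConfiguration", set, &applied); err != nil {
		t.Fatal(err)
	}
	if !applied {
		t.Fatal("CAS write was not applied")
	}
}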

View File

@ -184,7 +184,7 @@ func (s *Server) maybeBootstrap() {
// Attempt a live bootstrap!
var configuration raft.Configuration
var addrs []string
minRaftVersion, err := MinRaftProtocol(s.config.Region, members)
minRaftVersion, err := s.autopilot.MinRaftProtocol()
if err != nil {
s.logger.Printf("[ERR] nomad: Failed to read server raft versions: %v", err)
}

View File

@ -17,6 +17,7 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
multierror "github.com/hashicorp/go-multierror"
@ -100,6 +101,9 @@ type Server struct {
raftInmem *raft.InmemStore
raftTransport *raft.NetworkTransport
// autopilot is the Autopilot instance for this server.
autopilot *autopilot.Autopilot
// fsm is the state machine used with Raft
fsm *nomadFSM
@ -171,6 +175,10 @@ type Server struct {
leaderAcl string
leaderAclLock sync.Mutex
// statsFetcher is used by autopilot to check the status of the other
// Nomad servers.
statsFetcher *StatsFetcher
// EnterpriseState is used to fill in state for Pro/Ent builds
EnterpriseState
@ -271,6 +279,9 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
// Create the periodic dispatcher for launching periodic jobs.
s.periodicDispatcher = NewPeriodicDispatch(s.logger, s)
// Initialize the stats fetcher that autopilot will use.
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Region)
// Setup Vault
if err := s.setupVaultClient(); err != nil {
s.Shutdown()
@ -346,6 +357,9 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
// Emit metrics
go s.heartbeatStats()
// Start the server health checking.
go s.autopilot.ServerHealthLoop(s.shutdownCh)
// Start enterprise background workers
s.startEnterpriseBackground()
@ -777,6 +791,8 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
}
s.rpcListener = list
s.logger.Printf("[INFO] nomad: RPC listening on %q", s.rpcListener.Addr().String())
if s.config.RPCAdvertise != nil {
s.rpcAdvertise = s.config.RPCAdvertise
} else {
@ -977,6 +993,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.Tags["build"] = s.config.Build
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["id"] = s.config.NodeID
conf.Tags["rpc_addr"] = s.rpcAdvertise.(*net.TCPAddr).IP.String()
conf.Tags["port"] = fmt.Sprintf("%d", s.rpcAdvertise.(*net.TCPAddr).Port)
if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
conf.Tags["bootstrap"] = "1"
@ -985,6 +1002,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
if bootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", bootstrapExpect)
}
if s.config.NonVoter {
conf.Tags["nonvoter"] = "1"
}
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.EventCh = ch

View File

@ -2,9 +2,15 @@
package nomad
import "github.com/hashicorp/consul/agent/consul/autopilot"
type EnterpriseState struct{}
func (s *Server) setupEnterprise(config *Config) error {
// Set up the OSS version of autopilot
apDelegate := &AutopilotDelegate{s}
s.autopilot = autopilot.NewAutopilot(s.logger, apDelegate, config.AutopilotInterval, config.ServerHealthInterval)
return nil
}

View File

@ -55,7 +55,7 @@ func testACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken)
func testServer(t *testing.T, cb func(*Config)) *Server {
// Setup the default settings
config := DefaultConfig()
config.Build = "0.7.0+unittest"
config.Build = "0.8.0+unittest"
config.DevMode = true
nodeNum := atomic.AddUint32(&nodeNumber, 1)
config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum)
@ -74,6 +74,11 @@ func testServer(t *testing.T, cb func(*Config)) *Server {
config.RaftConfig.ElectionTimeout = 50 * time.Millisecond
config.RaftTimeout = 500 * time.Millisecond
// Tighten the autopilot timing
config.AutopilotConfig.ServerStabilizationTime = 100 * time.Millisecond
config.ServerHealthInterval = 50 * time.Millisecond
config.AutopilotInterval = 100 * time.Millisecond
// Disable Vault
f := false
config.VaultConfig.Enabled = &f

104
nomad/state/autopilot.go Normal file
View File

@ -0,0 +1,104 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/go-memdb"
)
// autopilotConfigTableSchema returns a new table schema used for storing
// the autopilot configuration
func autopilotConfigTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "autopilot-config",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: true,
Unique: true,
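// The conditional indexer below matches every row, so the table acts
// as a singleton: "id" lookups always return the one stored config.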
Indexer: &memdb.ConditionalIndex{
Conditional: func(obj interface{}) (bool, error) { return true, nil },
},
},
},
}
}
// AutopilotConfig is used to get the current Autopilot configuration.
func (s *StateStore) AutopilotConfig() (uint64, *autopilot.Config, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the autopilot config
c, err := tx.First("autopilot-config", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err)
}
config, ok := c.(*autopilot.Config)
if !ok {
return 0, nil, nil
}
return config.ModifyIndex, config, nil
}
// AutopilotSetConfig is used to set the current Autopilot configuration.
func (s *StateStore) AutopilotSetConfig(idx uint64, config *autopilot.Config) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := s.autopilotSetConfigTxn(idx, tx, config); err != nil {
return err
}
tx.Commit()
return nil
}
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a no-op.
func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {
return false, fmt.Errorf("failed autopilot config lookup: %s", err)
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := existing.(*autopilot.Config)
if !ok || e.ModifyIndex != cidx {
return false, nil
}
if err := s.autopilotSetConfigTxn(idx, tx, config); err != nil {
return false, err
}
tx.Commit()
return true, nil
}
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *autopilot.Config) error {
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {
return fmt.Errorf("failed autopilot config lookup: %s", err)
}
// Set the indexes.
if existing != nil {
config.CreateIndex = existing.(*autopilot.Config).CreateIndex
} else {
config.CreateIndex = idx
}
config.ModifyIndex = idx
if err := tx.Insert("autopilot-config", config); err != nil {
return fmt.Errorf("failed updating autopilot config: %s", err)
}
return nil
}
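
A hedged sketch (not part of the commit) of the optimistic-update flow these primitives enable; the store and raftIndex inputs are assumptions here.

func casUpdate(store *StateStore, raftIndex uint64) (bool, error) {
	_, cur, err := store.AutopilotConfig()
	if err != nil {
		return false, err
	}
	if cur == nil {
		return false, fmt.Errorf("autopilot config not initialized yet")
	}
	updated := *cur
	updated.CleanupDeadServers = false
	// Applies only if nobody has modified the config since we read it.
	return store.AutopilotCASConfig(raftIndex, cur.ModifyIndex, &updated)
}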

View File

@ -0,0 +1,94 @@
package state
import (
"reflect"
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
)
func TestStateStore_Autopilot(t *testing.T) {
s := testStateStore(t)
expected := &autopilot.Config{
CleanupDeadServers: true,
LastContactThreshold: 5 * time.Second,
MaxTrailingLogs: 500,
ServerStabilizationTime: 100 * time.Second,
RedundancyZoneTag: "az",
DisableUpgradeMigration: true,
UpgradeVersionTag: "build",
}
if err := s.AutopilotSetConfig(0, expected); err != nil {
t.Fatal(err)
}
idx, config, err := s.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if idx != 0 {
t.Fatalf("bad: %d", idx)
}
if !reflect.DeepEqual(expected, config) {
t.Fatalf("bad: %#v, %#v", expected, config)
}
}
func TestStateStore_AutopilotCAS(t *testing.T) {
s := testStateStore(t)
expected := &autopilot.Config{
CleanupDeadServers: true,
}
if err := s.AutopilotSetConfig(0, expected); err != nil {
t.Fatal(err)
}
if err := s.AutopilotSetConfig(1, expected); err != nil {
t.Fatal(err)
}
// Do a CAS with an index lower than the entry
ok, err := s.AutopilotCASConfig(2, 0, &autopilot.Config{
CleanupDeadServers: false,
})
if ok || err != nil {
t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err)
}
// Check that the index is untouched and the entry
// has not been updated.
idx, config, err := s.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if idx != 1 {
t.Fatalf("bad: %d", idx)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %#v", config)
}
// Do another CAS, this time with the correct index
ok, err = s.AutopilotCASConfig(2, 1, &autopilot.Config{
CleanupDeadServers: false,
})
if !ok || err != nil {
t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err)
}
// Make sure the config was updated
idx, config, err = s.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if idx != 2 {
t.Fatalf("bad: %d", idx)
}
if config.CleanupDeadServers {
t.Fatalf("bad: %#v", config)
}
}

View File

@ -43,6 +43,7 @@ func init() {
vaultAccessorTableSchema,
aclPolicyTableSchema,
aclTokenTableSchema,
autopilotConfigTableSchema,
}...)
}

103
nomad/stats_fetcher.go Normal file
View File

@ -0,0 +1,103 @@
package nomad
import (
"context"
"log"
"sync"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/serf/serf"
)
// StatsFetcher has two functions for autopilot. First, it lets us fetch all the
// stats in parallel so we are taking a sample as close to the same time as
// possible, since we are comparing time-sensitive info for the health check.
// Second, it bounds the time so that one slow RPC can't hold up the health
// check loop; as a side effect of how it implements this, it also limits to
// a single in-flight RPC to any given server, so goroutines don't accumulate
// as we run the health check fairly frequently.
type StatsFetcher struct {
logger *log.Logger
pool *ConnPool
region string
inflight map[string]struct{}
inflightLock sync.Mutex
}
// NewStatsFetcher returns a stats fetcher.
func NewStatsFetcher(logger *log.Logger, pool *ConnPool, region string) *StatsFetcher {
return &StatsFetcher{
logger: logger,
pool: pool,
region: region,
inflight: make(map[string]struct{}),
}
}
// fetch does the RPC to retrieve the server stats from a single server. We don't
// cancel this when the context is canceled because we only want one in-flight
// RPC to each server, so we let it finish and then clean up the in-flight
// tracking.
func (f *StatsFetcher) fetch(server *serverParts, replyCh chan *autopilot.ServerStats) {
var args struct{}
var reply autopilot.ServerStats
err := f.pool.RPC(f.region, server.RPCAddr, server.MajorVersion, "Status.RaftStats", &args, &reply)
if err != nil {
f.logger.Printf("[WARN] nomad: error getting server health from %q: %v",
server.Name, err)
} else {
replyCh <- &reply
}
f.inflightLock.Lock()
delete(f.inflight, server.ID)
f.inflightLock.Unlock()
}
// Fetch will attempt to query all the servers in parallel.
func (f *StatsFetcher) Fetch(ctx context.Context, members []serf.Member) map[string]*autopilot.ServerStats {
type workItem struct {
server *serverParts
replyCh chan *autopilot.ServerStats
}
var servers []*serverParts
for _, s := range members {
if ok, parts := isNomadServer(s); ok {
servers = append(servers, parts)
}
}
// Skip any servers that have inflight requests.
var work []*workItem
f.inflightLock.Lock()
for _, server := range servers {
if _, ok := f.inflight[server.ID]; ok {
f.logger.Printf("[WARN] nomad: error getting server health from %q: last request still outstanding",
server.Name)
} else {
workItem := &workItem{
server: server,
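// Buffered so the fetch goroutine can always deliver its reply and
// exit, even if Fetch already returned due to a canceled context.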
replyCh: make(chan *autopilot.ServerStats, 1),
}
work = append(work, workItem)
f.inflight[server.ID] = struct{}{}
go f.fetch(workItem.server, workItem.replyCh)
}
}
f.inflightLock.Unlock()
// Now wait for the results to come in, or for the context to be
// canceled.
replies := make(map[string]*autopilot.ServerStats)
for _, workItem := range work {
select {
case reply := <-workItem.replyCh:
replies[workItem.server.ID] = reply
case <-ctx.Done():
f.logger.Printf("[WARN] nomad: error getting server health from %q: %v",
workItem.server.Name, ctx.Err())
}
}
return replies
}
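
A hedged sketch (not part of the commit) of taking one bounded sample, mirroring the tests in the next file: the context deadline keeps a single slow server from stalling the caller.

func sampleStats(s *Server) map[string]*autopilot.ServerStats {
	// Bound the sample so the health-check loop keeps its cadence.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return s.statsFetcher.Fetch(ctx, s.Members())
}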

View File

@ -0,0 +1,95 @@
package nomad
import (
"context"
"testing"
"time"
"github.com/hashicorp/nomad/testutil"
)
func TestStatsFetcher(t *testing.T) {
t.Parallel()
conf := func(c *Config) {
c.Region = "region-a"
c.DevDisableBootstrap = true
c.BootstrapExpect = 3
}
s1 := testServer(t, conf)
defer s1.Shutdown()
s2 := testServer(t, conf)
defer s2.Shutdown()
s3 := testServer(t, conf)
defer s3.Shutdown()
testJoin(t, s1, s2, s3)
testutil.WaitForLeader(t, s1.RPC)
members := s1.serf.Members()
if len(members) != 3 {
t.Fatalf("bad len: %d", len(members))
}
var servers []*serverParts
for _, member := range members {
ok, server := isNomadServer(member)
if !ok {
t.Fatalf("bad: %#v", member)
}
servers = append(servers, server)
}
// Do a normal fetch and make sure we get three responses.
func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stats := s1.statsFetcher.Fetch(ctx, s1.Members())
if len(stats) != 3 {
t.Fatalf("bad: %#v", stats)
}
for id, stat := range stats {
switch id {
case s1.config.NodeID, s2.config.NodeID, s3.config.NodeID:
// OK
default:
t.Fatalf("bad: %s", id)
}
if stat == nil || stat.LastTerm == 0 {
t.Fatalf("bad: %#v", stat)
}
}
}()
// Fake an in-flight request to server 3 and make sure we don't fetch
// from it.
func() {
s1.statsFetcher.inflight[string(s3.config.NodeID)] = struct{}{}
defer delete(s1.statsFetcher.inflight, string(s3.config.NodeID))
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stats := s1.statsFetcher.Fetch(ctx, s1.Members())
if len(stats) != 2 {
t.Fatalf("bad: %#v", stats)
}
for id, stat := range stats {
switch id {
case s1.config.NodeID, s2.config.NodeID:
// OK
case s3.config.NodeID:
t.Fatalf("bad")
default:
t.Fatalf("bad: %s", id)
}
if stat == nil || stat.LastTerm == 0 {
t.Fatalf("bad: %#v", stat)
}
}
}()
}

View File

@ -1,6 +1,10 @@
package nomad
import (
"fmt"
"strconv"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -104,3 +108,21 @@ func (s *Status) Members(args *structs.GenericRequest, reply *structs.ServerMemb
}
return nil
}
// RaftStats is used by Autopilot to query the Raft stats of the local server.
func (s *Status) RaftStats(args struct{}, reply *autopilot.ServerStats) error {
stats := s.srv.raft.Stats()
var err error
reply.LastContact = stats["last_contact"]
reply.LastIndex, err = strconv.ParseUint(stats["last_log_index"], 10, 64)
if err != nil {
return fmt.Errorf("error parsing server's last_log_index value: %s", err)
}
reply.LastTerm, err = strconv.ParseUint(stats["last_log_term"], 10, 64)
if err != nil {
return fmt.Errorf("error parsing server's last_log_term value: %s", err)
}
return nil
}
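
A hedged sketch (not part of the commit) of calling this endpoint directly through the server's RPC method, much as the stats fetcher does via the connection pool.

func localRaftStats(s *Server) (*autopilot.ServerStats, error) {
	var args struct{}
	var reply autopilot.ServerStats
	if err := s.RPC("Status.RaftStats", &args, &reply); err != nil {
		return nil, err
	}
	// reply carries the parsed last_contact, last_log_index and
	// last_log_term values from raft.Stats().
	return &reply, nil
}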

View File

@ -0,0 +1,98 @@
package config
import (
"time"
"github.com/hashicorp/nomad/helper"
)
type AutopilotConfig struct {
// CleanupDeadServers controls whether to remove dead servers when a new
// server is added to the Raft peers.
CleanupDeadServers *bool `mapstructure:"cleanup_dead_servers"`
// ServerStabilizationTime is the minimum amount of time a server must be
// in a stable, healthy state before it can be added to the cluster. Only
// applicable with Raft protocol version 3 or higher.
ServerStabilizationTime time.Duration `mapstructure:"server_stabilization_time"`
// LastContactThreshold is the limit on the amount of time a server can go
// without leader contact before being considered unhealthy.
LastContactThreshold time.Duration `mapstructure:"last_contact_threshold"`
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
// be behind before being considered unhealthy.
MaxTrailingLogs int `mapstructure:"max_trailing_logs"`
// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
// servers into zones for redundancy. If left blank, this feature will be disabled.
RedundancyZoneTag string `mapstructure:"redundancy_zone_tag"`
// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
// strategy of waiting until enough newer-versioned servers have been added to the
// cluster before promoting them to voters.
DisableUpgradeMigration *bool `mapstructure:"disable_upgrade_migration"`
// (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when
// performing upgrade migrations. If left blank, the Consul version will be used.
UpgradeVersionTag string `mapstructure:"upgrade_version_tag"`
}
// DefaultAutopilotConfig returns the canonical defaults for the Nomad
// `autopilot` configuration.
func DefaultAutopilotConfig() *AutopilotConfig {
return &AutopilotConfig{
CleanupDeadServers: helper.BoolToPtr(true),
LastContactThreshold: 200 * time.Millisecond,
MaxTrailingLogs: 250,
ServerStabilizationTime: 10 * time.Second,
}
}
func (a *AutopilotConfig) Merge(b *AutopilotConfig) *AutopilotConfig {
result := a.Copy()
if b.CleanupDeadServers != nil {
result.CleanupDeadServers = helper.BoolToPtr(*b.CleanupDeadServers)
}
if b.ServerStabilizationTime != 0 {
result.ServerStabilizationTime = b.ServerStabilizationTime
}
if b.LastContactThreshold != 0 {
result.LastContactThreshold = b.LastContactThreshold
}
if b.MaxTrailingLogs != 0 {
result.MaxTrailingLogs = b.MaxTrailingLogs
}
if b.RedundancyZoneTag != "" {
result.RedundancyZoneTag = b.RedundancyZoneTag
}
if b.DisableUpgradeMigration != nil {
result.DisableUpgradeMigration = helper.BoolToPtr(*b.DisableUpgradeMigration)
}
if b.UpgradeVersionTag != "" {
result.UpgradeVersionTag = b.UpgradeVersionTag
}
return result
}
// Copy returns a copy of this Autopilot config.
func (a *AutopilotConfig) Copy() *AutopilotConfig {
if a == nil {
return nil
}
nc := new(AutopilotConfig)
*nc = *a
// Copy the bools
if a.CleanupDeadServers != nil {
nc.CleanupDeadServers = helper.BoolToPtr(*a.CleanupDeadServers)
}
if a.DisableUpgradeMigration != nil {
nc.DisableUpgradeMigration = helper.BoolToPtr(*a.DisableUpgradeMigration)
}
return nc
}
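
A hedged fragment (not part of the commit) showing how Merge layers an operator-supplied block over the defaults; zero values in the overlay leave the corresponding defaults in place.

merged := DefaultAutopilotConfig().Merge(&AutopilotConfig{
	MaxTrailingLogs:    500,
	CleanupDeadServers: helper.BoolToPtr(false),
})
// merged.MaxTrailingLogs == 500 and *merged.CleanupDeadServers == false,
// while LastContactThreshold and ServerStabilizationTime keep their
// default values.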

View File

@ -0,0 +1,46 @@
package config
import (
"reflect"
"testing"
"time"
)
func TestAutopilotConfig_Merge(t *testing.T) {
trueValue, falseValue := true, false
c1 := &AutopilotConfig{
CleanupDeadServers: &falseValue,
ServerStabilizationTime: 1 * time.Second,
LastContactThreshold: 1 * time.Second,
MaxTrailingLogs: 1,
RedundancyZoneTag: "1",
DisableUpgradeMigration: &falseValue,
UpgradeVersionTag: "1",
}
c2 := &AutopilotConfig{
CleanupDeadServers: &trueValue,
ServerStabilizationTime: 2 * time.Second,
LastContactThreshold: 2 * time.Second,
MaxTrailingLogs: 2,
RedundancyZoneTag: "2",
DisableUpgradeMigration: nil,
UpgradeVersionTag: "2",
}
e := &AutopilotConfig{
CleanupDeadServers: &trueValue,
ServerStabilizationTime: 2 * time.Second,
LastContactThreshold: 2 * time.Second,
MaxTrailingLogs: 2,
RedundancyZoneTag: "2",
DisableUpgradeMigration: &falseValue,
UpgradeVersionTag: "2",
}
result := c1.Merge(c2)
if !reflect.DeepEqual(result, e) {
t.Fatalf("bad:\n%#v\n%#v", result, e)
}
}

View File

@ -1,6 +1,7 @@
package structs
import (
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/raft"
)
@ -50,3 +51,24 @@ type RaftPeerByAddressRequest struct {
// WriteRequest holds the Region for this request.
WriteRequest
}
// AutopilotSetConfigRequest is used by the Operator endpoint to update the
// current Autopilot configuration of the cluster.
type AutopilotSetConfigRequest struct {
// Datacenter is the target this request is intended for.
Datacenter string
// Config is the new Autopilot configuration to use.
Config autopilot.Config
// CAS controls whether to use check-and-set semantics for this request.
CAS bool
// WriteRequest holds the ACL token to go along with this request.
WriteRequest
}
// RequestDatacenter returns the datacenter for a given request.
func (op *AutopilotSetConfigRequest) RequestDatacenter() string {
return op.Datacenter
}

View File

@ -77,6 +77,7 @@ const (
ACLTokenUpsertRequestType
ACLTokenDeleteRequestType
ACLTokenBootstrapRequestType
AutopilotRequestType
)
const (

View File

@ -46,7 +46,9 @@ type serverParts struct {
MinorVersion int
Build version.Version
RaftVersion int
NonVoter bool
Addr net.Addr
RPCAddr net.Addr
Status serf.MemberStatus
}
@ -69,6 +71,7 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
region := m.Tags["region"]
datacenter := m.Tags["dc"]
_, bootstrap := m.Tags["bootstrap"]
_, nonVoter := m.Tags["nonvoter"]
expect := 0
expect_str, ok := m.Tags["expect"]
@ -80,6 +83,12 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
}
}
// If the server is missing the rpc_addr tag, default to the serf advertise addr
rpc_ip := net.ParseIP(m.Tags["rpc_addr"])
if rpc_ip == nil {
rpc_ip = m.Addr
}
port_str := m.Tags["port"]
port, err := strconv.Atoi(port_str)
if err != nil {
@ -116,6 +125,7 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
}
addr := &net.TCPAddr{IP: m.Addr, Port: port}
rpcAddr := &net.TCPAddr{IP: rpc_ip, Port: port}
parts := &serverParts{
Name: m.Name,
ID: id,
@ -125,10 +135,12 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
RPCAddr: rpcAddr,
MajorVersion: majorVersion,
MinorVersion: minorVersion,
Build: *build_version,
RaftVersion: raft_vsn,
NonVoter: nonVoter,
Status: m.Status,
}
return true, parts
@ -139,7 +151,10 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool {
for _, member := range members {
if valid, parts := isNomadServer(member); valid && parts.Status == serf.StatusAlive {
if parts.Build.LessThan(minVersion) {
// Check if the core versions match: version.LessThan returns true for
// 0.8.0-rc1 < 0.8.0, so we compare only the numeric segments and ignore
// any pre-release metadata.
versionsMatch := slicesMatch(minVersion.Segments(), parts.Build.Segments())
if parts.Build.LessThan(minVersion) && !versionsMatch {
return false
}
}
@ -148,34 +163,26 @@ func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Versio
return true
}
// MinRaftProtocol returns the lowest supported Raft protocol among alive servers
// in the given region.
func MinRaftProtocol(region string, members []serf.Member) (int, error) {
minVersion := -1
for _, m := range members {
if m.Tags["role"] != "nomad" || m.Tags["region"] != region || m.Status != serf.StatusAlive {
continue
}
vsn, ok := m.Tags["raft_vsn"]
if !ok {
vsn = "1"
}
raftVsn, err := strconv.Atoi(vsn)
if err != nil {
return -1, err
}
if minVersion == -1 || raftVsn < minVersion {
minVersion = raftVsn
}
}
if minVersion == -1 {
return minVersion, fmt.Errorf("no servers found")
}
return minVersion, nil
}
func slicesMatch(a, b []int) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
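
To make the pre-release edge case concrete, a hedged fragment (not part of the commit) showing the two comparisons side by side:

min := version.Must(version.NewVersion("0.8.0"))
build := version.Must(version.NewVersion("0.8.0-rc1"))

build.LessThan(min)                           // true: rc1 sorts before the final release
slicesMatch(min.Segments(), build.Segments()) // true: both reduce to [0, 8, 0]
// ServersMeetMinimumVersion therefore treats 0.8.0-rc1 as meeting a
// 0.8.0 minimum instead of rejecting it.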
// shuffleStrings randomly shuffles the list of strings

View File

@ -1,7 +1,6 @@
package nomad
import (
"errors"
"net"
"reflect"
"testing"
@ -18,12 +17,15 @@ func TestIsNomadServer(t *testing.T) {
Addr: net.IP([]byte{127, 0, 0, 1}),
Status: serf.StatusAlive,
Tags: map[string]string{
"role": "nomad",
"region": "aws",
"dc": "east-aws",
"port": "10000",
"vsn": "1",
"build": "0.7.0+ent",
"role": "nomad",
"region": "aws",
"dc": "east-aws",
"rpc_addr": "1.1.1.1",
"port": "10000",
"vsn": "1",
"raft_vsn": "2",
"nonvoter": "1",
"build": "0.7.0+ent",
},
}
valid, parts := isNomadServer(m)
@ -43,6 +45,15 @@ func TestIsNomadServer(t *testing.T) {
if parts.Status != serf.StatusAlive {
t.Fatalf("bad: %v", parts.Status)
}
if parts.RaftVersion != 2 {
t.Fatalf("bad: %v", parts.RaftVersion)
}
if parts.RPCAddr.String() != "1.1.1.1:10000" {
t.Fatalf("bad: %v", parts.RPCAddr.String())
}
if !parts.NonVoter {
t.Fatalf("bad: %v", parts.NonVoter)
}
if seg := parts.Build.Segments(); len(seg) != 3 {
t.Fatalf("bad: %v", parts.Build)
} else if seg[0] != 0 && seg[1] != 7 && seg[2] != 0 {
@ -152,105 +163,6 @@ func TestServersMeetMinimumVersion(t *testing.T) {
}
}
func TestMinRaftProtocol(t *testing.T) {
t.Parallel()
makeMember := func(version, region string) serf.Member {
return serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "nomad",
"region": region,
"dc": "dc1",
"port": "10000",
"vsn": "1",
"raft_vsn": version,
},
Status: serf.StatusAlive,
}
}
cases := []struct {
members []serf.Member
region string
expected int
err error
}{
// No servers, error
{
members: []serf.Member{},
expected: -1,
err: errors.New("no servers found"),
},
// One server
{
members: []serf.Member{
makeMember("1", "global"),
},
region: "global",
expected: 1,
},
// One server, bad version formatting
{
members: []serf.Member{
makeMember("asdf", "global"),
},
region: "global",
expected: -1,
err: errors.New(`strconv.Atoi: parsing "asdf": invalid syntax`),
},
// One server, wrong datacenter
{
members: []serf.Member{
makeMember("1", "global"),
},
region: "nope",
expected: -1,
err: errors.New("no servers found"),
},
// Multiple servers, different versions
{
members: []serf.Member{
makeMember("1", "global"),
makeMember("2", "global"),
},
region: "global",
expected: 1,
},
// Multiple servers, same version
{
members: []serf.Member{
makeMember("2", "global"),
makeMember("2", "global"),
},
region: "global",
expected: 2,
},
// Multiple servers, multiple datacenters
{
members: []serf.Member{
makeMember("3", "r1"),
makeMember("2", "r1"),
makeMember("1", "r2"),
},
region: "r1",
expected: 2,
},
}
for _, tc := range cases {
result, err := MinRaftProtocol(tc.region, tc.members)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.expected, tc)
}
if tc.err != nil {
if err == nil || tc.err.Error() != err.Error() {
t.Fatalf("bad: %v, %v, %v", err, tc.err, tc)
}
}
}
}
func TestShuffleStrings(t *testing.T) {
t.Parallel()
// Generate input

View File

@ -62,6 +62,7 @@ type PortsConfig struct {
type ServerConfig struct {
Enabled bool `json:"enabled"`
BootstrapExpect int `json:"bootstrap_expect"`
RaftProtocol int `json:"raft_protocol,omitempty"`
}
// ClientConfig is used to configure the client

View File

@ -11,7 +11,7 @@ var (
GitDescribe string
// The main version number that is being run at the moment.
Version = "0.7.1"
Version = "0.8.0"
// A pre-release marker for the version. If this is "" (empty string)
// then it means that it is a final release. Otherwise, this is a pre-release