Merge pull request #2771 from hashicorp/f-autopilot
Autopilot dead server cleanup, config, and raft version compatibility
This commit is contained in:
commit
44f0b08db7
|
@ -1,5 +1,13 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Operator can be used to perform low-level operator tasks for Consul.
|
||||
type Operator struct {
|
||||
c *Client
|
||||
|
@ -63,6 +71,25 @@ type KeyringResponse struct {
|
|||
NumNodes int
|
||||
}
|
||||
|
||||
// AutopilotConfiguration is used for querying/setting the Autopilot configuration.
// Autopilot helps manage operator tasks related to Consul servers like removing
// failed servers from the Raft quorum.
type AutopilotConfiguration struct {
	// CleanupDeadServers controls whether to remove dead servers from the Raft
	// peer list when a new server joins.
	CleanupDeadServers bool

	// CreateIndex holds the index corresponding to the creation of this
	// configuration. This is a read-only field.
	CreateIndex uint64

	// ModifyIndex will be set to the index of the last update when retrieving the
	// Autopilot configuration. Resubmitting a configuration with
	// AutopilotCASConfiguration will perform a check-and-set operation which ensures
	// there hasn't been a subsequent update since the configuration was retrieved.
	ModifyIndex uint64
}
|
||||
|
||||
// RaftGetConfiguration is used to query the current Raft peer set.
|
||||
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
|
||||
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
|
||||
|
@ -161,3 +188,56 @@ func (op *Operator) KeyringUse(key string, q *WriteOptions) error {
|
|||
resp.Body.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// AutopilotGetConfiguration is used to query the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) {
|
||||
r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration")
|
||||
r.setQueryOptions(q)
|
||||
_, resp, err := requireOK(op.c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var out AutopilotConfiguration
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error {
|
||||
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
|
||||
r.setWriteOptions(q)
|
||||
r.obj = conf
|
||||
_, resp, err := requireOK(op.c.doRequest(r))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp.Body.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// AutopilotCASConfiguration is used to perform a Check-And-Set update on the
|
||||
// Autopilot configuration. The ModifyIndex value will be respected. Returns
|
||||
// true on success or false on failures.
|
||||
func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) {
|
||||
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
|
||||
r.setWriteOptions(q)
|
||||
r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10))
|
||||
r.obj = conf
|
||||
_, resp, err := requireOK(op.c.doRequest(r))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var buf bytes.Buffer
|
||||
if _, err := io.Copy(&buf, resp.Body); err != nil {
|
||||
return false, fmt.Errorf("Failed to read response: %v", err)
|
||||
}
|
||||
res := strings.Contains(string(buf.Bytes()), "true")
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
@ -104,3 +104,77 @@ func TestOperator_KeyringInstallListPutRemove(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_AutopilotGetSetConfiguration(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
||||
operator := c.Operator()
|
||||
config, err := operator.AutopilotGetConfiguration(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !config.CleanupDeadServers {
|
||||
t.Fatalf("bad: %v", config)
|
||||
}
|
||||
|
||||
// Change a config setting
|
||||
newConf := &AutopilotConfiguration{CleanupDeadServers: false}
|
||||
if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
config, err = operator.AutopilotGetConfiguration(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if config.CleanupDeadServers {
|
||||
t.Fatalf("bad: %v", config)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_AutopilotCASConfiguration(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
||||
operator := c.Operator()
|
||||
config, err := operator.AutopilotGetConfiguration(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !config.CleanupDeadServers {
|
||||
t.Fatalf("bad: %v", config)
|
||||
}
|
||||
|
||||
// Pass an invalid ModifyIndex
|
||||
{
|
||||
newConf := &AutopilotConfiguration{
|
||||
CleanupDeadServers: false,
|
||||
ModifyIndex: config.ModifyIndex - 1,
|
||||
}
|
||||
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp {
|
||||
t.Fatalf("bad: %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
// Pass a valid ModifyIndex
|
||||
{
|
||||
newConf := &AutopilotConfiguration{
|
||||
CleanupDeadServers: false,
|
||||
ModifyIndex: config.ModifyIndex,
|
||||
}
|
||||
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !resp {
|
||||
t.Fatalf("bad: %v", resp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-sockaddr/template"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/shirou/gopsutil/host"
|
||||
|
@ -382,6 +383,9 @@ func (a *Agent) consulConfig() *consul.Config {
|
|||
if a.config.Protocol > 0 {
|
||||
base.ProtocolVersion = uint8(a.config.Protocol)
|
||||
}
|
||||
if a.config.RaftProtocol != 0 {
|
||||
base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.RaftProtocol)
|
||||
}
|
||||
if a.config.ACLToken != "" {
|
||||
base.ACLToken = a.config.ACLToken
|
||||
}
|
||||
|
@ -412,6 +416,9 @@ func (a *Agent) consulConfig() *consul.Config {
|
|||
if a.config.SessionTTLMinRaw != "" {
|
||||
base.SessionTTLMin = a.config.SessionTTLMin
|
||||
}
|
||||
if a.config.Autopilot.CleanupDeadServers != nil {
|
||||
base.AutopilotConfig.CleanupDeadServers = *a.config.Autopilot.CleanupDeadServers
|
||||
}
|
||||
|
||||
// Format the build string
|
||||
revision := a.config.Revision
|
||||
|
|
|
@ -260,6 +260,13 @@ type Telemetry struct {
|
|||
CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"`
|
||||
}
|
||||
|
||||
// Autopilot is used to configure helpful features for operating Consul servers.
type Autopilot struct {
	// CleanupDeadServers enables the automatic cleanup of dead servers when new ones
	// are added to the peer list. Defaults to true.
	//
	// This is a pointer so that "not set" (nil) can be told apart from an
	// explicit false when configurations are merged or applied.
	CleanupDeadServers *bool `mapstructure:"cleanup_dead_servers"`
}
|
||||
|
||||
// Config is the configuration that can be set for an Agent.
|
||||
// Some of this is configurable as CLI flags, but most must
|
||||
// be set using a configuration file.
|
||||
|
@ -385,11 +392,17 @@ type Config struct {
|
|||
// servers. This can be changed on reload.
|
||||
SkipLeaveOnInt *bool `mapstructure:"skip_leave_on_interrupt"`
|
||||
|
||||
// Autopilot is used to configure helpful features for operating Consul servers.
|
||||
Autopilot Autopilot `mapstructure:"autopilot"`
|
||||
|
||||
Telemetry Telemetry `mapstructure:"telemetry"`
|
||||
|
||||
// Protocol is the Consul protocol version to use.
|
||||
Protocol int `mapstructure:"protocol"`
|
||||
|
||||
// RaftProtocol sets the Raft protocol version to use on this server.
|
||||
RaftProtocol int `mapstructure:"raft_protocol"`
|
||||
|
||||
// EnableDebug is used to enable various debugging features
|
||||
EnableDebug bool `mapstructure:"enable_debug"`
|
||||
|
||||
|
@ -1280,6 +1293,9 @@ func MergeConfig(a, b *Config) *Config {
|
|||
if b.Protocol > 0 {
|
||||
result.Protocol = b.Protocol
|
||||
}
|
||||
if b.RaftProtocol != 0 {
|
||||
result.RaftProtocol = b.RaftProtocol
|
||||
}
|
||||
if b.NodeID != "" {
|
||||
result.NodeID = b.NodeID
|
||||
}
|
||||
|
@ -1328,6 +1344,9 @@ func MergeConfig(a, b *Config) *Config {
|
|||
if b.SkipLeaveOnInt != nil {
|
||||
result.SkipLeaveOnInt = b.SkipLeaveOnInt
|
||||
}
|
||||
if b.Autopilot.CleanupDeadServers != nil {
|
||||
result.Autopilot.CleanupDeadServers = b.Autopilot.CleanupDeadServers
|
||||
}
|
||||
if b.Telemetry.DisableHostname == true {
|
||||
result.Telemetry.DisableHostname = true
|
||||
}
|
||||
|
|
|
@ -281,6 +281,17 @@ func TestDecodeConfig(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// raft protocol
|
||||
input = `{"raft_protocol": 3}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if config.RaftProtocol != 3 {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// Node metadata fields
|
||||
input = `{"node_meta": {"thing1": "1", "thing2": "2"}}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
|
@ -1091,6 +1102,17 @@ func TestDecodeConfig_Performance(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDecodeConfig_Autopilot(t *testing.T) {
|
||||
input := `{"autopilot": { "cleanup_dead_servers": true }}`
|
||||
config, err := DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if config.Autopilot.CleanupDeadServers == nil || !*config.Autopilot.CleanupDeadServers {
|
||||
t.Fatalf("bad: cleanup_dead_servers isn't set: %#v", config)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeConfig_Services(t *testing.T) {
|
||||
input := `{
|
||||
"services": [
|
||||
|
@ -1602,9 +1624,13 @@ func TestMergeConfig(t *testing.T) {
|
|||
HTTP: "127.0.0.2",
|
||||
HTTPS: "127.0.0.4",
|
||||
},
|
||||
Server: true,
|
||||
LeaveOnTerm: Bool(true),
|
||||
SkipLeaveOnInt: Bool(true),
|
||||
Server: true,
|
||||
LeaveOnTerm: Bool(true),
|
||||
SkipLeaveOnInt: Bool(true),
|
||||
RaftProtocol: 3,
|
||||
Autopilot: Autopilot{
|
||||
CleanupDeadServers: Bool(true),
|
||||
},
|
||||
EnableDebug: true,
|
||||
VerifyIncoming: true,
|
||||
VerifyOutgoing: true,
|
||||
|
|
|
@ -297,6 +297,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
|||
s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
|
||||
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
|
||||
s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint))
|
||||
s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
|
||||
s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral))
|
||||
s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific))
|
||||
s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))
|
||||
|
|
|
@ -3,11 +3,11 @@ package agent
|
|||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/raft"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
|
||||
|
@ -105,7 +105,7 @@ func (s *HTTPServer) OperatorKeyringEndpoint(resp http.ResponseWriter, req *http
|
|||
case "DELETE":
|
||||
return s.KeyringRemove(resp, req, &args)
|
||||
default:
|
||||
resp.WriteHeader(405)
|
||||
resp.WriteHeader(http.StatusMethodNotAllowed)
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
@ -166,3 +166,61 @@ func keyringErrorsOrNil(responses []*structs.KeyringResponse) error {
|
|||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
// OperatorAutopilotConfiguration is used to inspect the current Autopilot configuration.
|
||||
// This supports the stale query mode in case the cluster doesn't have a leader.
|
||||
func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
// Switch on the method
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
var args structs.DCSpecificRequest
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var reply structs.AutopilotConfig
|
||||
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return reply, nil
|
||||
case "PUT":
|
||||
var args structs.AutopilotSetConfigRequest
|
||||
s.parseDC(req, &args.Datacenter)
|
||||
s.parseToken(req, &args.Token)
|
||||
|
||||
// Check for cas value
|
||||
params := req.URL.Query()
|
||||
if _, ok := params["cas"]; ok {
|
||||
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
|
||||
if err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte(fmt.Sprintf("Error parsing cas value: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
args.Config.ModifyIndex = casVal
|
||||
args.CAS = true
|
||||
}
|
||||
|
||||
if err := decodeBody(req, &args.Config, nil); err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var reply bool
|
||||
if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Only use the out value if this was a CAS
|
||||
if !args.CAS {
|
||||
return true, nil
|
||||
} else {
|
||||
return reply, nil
|
||||
}
|
||||
default:
|
||||
resp.WriteHeader(http.StatusMethodNotAllowed)
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -285,3 +285,138 @@ func TestOperator_Keyring_InvalidRelayFactor(t *testing.T) {
|
|||
}
|
||||
}, configFunc)
|
||||
}
|
||||
|
||||
func TestOperator_AutopilotGetConfiguration(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, err := http.NewRequest("GET", "/v1/operator/autopilot/configuration", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad code: %d", resp.Code)
|
||||
}
|
||||
out, ok := obj.(structs.AutopilotConfig)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected: %T", obj)
|
||||
}
|
||||
if !out.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestOperator_AutopilotSetConfiguration(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer([]byte(`{"CleanupDeadServers": false}`))
|
||||
req, err := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
if _, err = srv.OperatorAutopilotConfiguration(resp, req); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad code: %d", resp.Code)
|
||||
}
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
|
||||
var reply structs.AutopilotConfig
|
||||
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if reply.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestOperator_AutopilotCASConfiguration(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer([]byte(`{"CleanupDeadServers": false}`))
|
||||
req, err := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
if _, err = srv.OperatorAutopilotConfiguration(resp, req); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad code: %d", resp.Code)
|
||||
}
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
|
||||
var reply structs.AutopilotConfig
|
||||
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if reply.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
|
||||
// Create a CAS request, bad index
|
||||
{
|
||||
buf := bytes.NewBuffer([]byte(`{"CleanupDeadServers": true}`))
|
||||
req, err := http.NewRequest("PUT",
|
||||
fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex-1), buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); res {
|
||||
t.Fatalf("should NOT work")
|
||||
}
|
||||
}
|
||||
|
||||
// Create a CAS request, good index
|
||||
{
|
||||
buf := bytes.NewBuffer([]byte(`{"CleanupDeadServers": true}`))
|
||||
req, err := http.NewRequest("PUT",
|
||||
fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex), buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); !res {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
}
|
||||
|
||||
// Verify the update
|
||||
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !reply.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
32
command/operator_autopilot.go
Normal file
32
command/operator_autopilot.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/command/base"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
type OperatorAutopilotCommand struct {
|
||||
base.Command
|
||||
}
|
||||
|
||||
func (c *OperatorAutopilotCommand) Help() string {
|
||||
helpText := `
|
||||
Usage: consul operator autopilot <subcommand> [options]
|
||||
|
||||
The Autopilot operator command is used to interact with Consul's Autopilot
|
||||
subsystem. The command can be used to view or modify the current configuration.
|
||||
|
||||
`
|
||||
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
func (c *OperatorAutopilotCommand) Synopsis() string {
|
||||
return "Provides tools for modifying Autopilot configuration"
|
||||
}
|
||||
|
||||
func (c *OperatorAutopilotCommand) Run(args []string) int {
|
||||
return cli.RunResultHelp
|
||||
}
|
61
command/operator_autopilot_get.go
Normal file
61
command/operator_autopilot_get.go
Normal file
|
@ -0,0 +1,61 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/command/base"
|
||||
)
|
||||
|
||||
type OperatorAutopilotGetCommand struct {
|
||||
base.Command
|
||||
}
|
||||
|
||||
func (c *OperatorAutopilotGetCommand) Help() string {
|
||||
helpText := `
|
||||
Usage: consul operator autopilot get-config [options]
|
||||
|
||||
Displays the current Autopilot configuration.
|
||||
|
||||
` + c.Command.Help()
|
||||
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
func (c *OperatorAutopilotGetCommand) Synopsis() string {
|
||||
return "Display the current Autopilot configuration"
|
||||
}
|
||||
|
||||
func (c *OperatorAutopilotGetCommand) Run(args []string) int {
|
||||
c.Command.NewFlagSet(c)
|
||||
|
||||
if err := c.Command.Parse(args); err != nil {
|
||||
if err == flag.ErrHelp {
|
||||
return 0
|
||||
}
|
||||
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// Set up a client.
|
||||
client, err := c.Command.HTTPClient()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// Fetch the current configuration.
|
||||
opts := &api.QueryOptions{
|
||||
AllowStale: c.Command.HTTPStale(),
|
||||
}
|
||||
config, err := client.Operator().AutopilotGetConfiguration(opts)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error querying Autopilot configuration: %s", err))
|
||||
return 1
|
||||
}
|
||||
c.Ui.Output(fmt.Sprintf("CleanupDeadServers = %v", config.CleanupDeadServers))
|
||||
|
||||
return 0
|
||||
}
|
37
command/operator_autopilot_get_test.go
Normal file
37
command/operator_autopilot_get_test.go
Normal file
|
@ -0,0 +1,37 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/command/base"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
func TestOperator_Autopilot_Get_Implements(t *testing.T) {
|
||||
var _ cli.Command = &OperatorAutopilotGetCommand{}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_Get(t *testing.T) {
|
||||
a1 := testAgent(t)
|
||||
defer a1.Shutdown()
|
||||
waitForLeader(t, a1.httpAddr)
|
||||
|
||||
ui := new(cli.MockUi)
|
||||
c := OperatorAutopilotGetCommand{
|
||||
Command: base.Command{
|
||||
Ui: ui,
|
||||
Flags: base.FlagSetHTTP,
|
||||
},
|
||||
}
|
||||
args := []string{"-http-addr=" + a1.httpAddr}
|
||||
|
||||
code := c.Run(args)
|
||||
if code != 0 {
|
||||
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
|
||||
}
|
||||
output := strings.TrimSpace(ui.OutputWriter.String())
|
||||
if !strings.Contains(output, "CleanupDeadServers = true") {
|
||||
t.Fatalf("bad: %s", output)
|
||||
}
|
||||
}
|
78
command/operator_autopilot_set.go
Normal file
78
command/operator_autopilot_set.go
Normal file
|
@ -0,0 +1,78 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/command/base"
|
||||
)
|
||||
|
||||
type OperatorAutopilotSetCommand struct {
|
||||
base.Command
|
||||
}
|
||||
|
||||
func (c *OperatorAutopilotSetCommand) Help() string {
|
||||
helpText := `
|
||||
Usage: consul operator autopilot set-config [options]
|
||||
|
||||
Modifies the current Autopilot configuration.
|
||||
|
||||
` + c.Command.Help()
|
||||
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
func (c *OperatorAutopilotSetCommand) Synopsis() string {
|
||||
return "Modify the current Autopilot configuration"
|
||||
}
|
||||
|
||||
func (c *OperatorAutopilotSetCommand) Run(args []string) int {
|
||||
var cleanupDeadServers base.BoolValue
|
||||
|
||||
f := c.Command.NewFlagSet(c)
|
||||
|
||||
f.Var(&cleanupDeadServers, "cleanup-dead-servers",
|
||||
"Controls whether Consul will automatically remove dead servers "+
|
||||
"when new ones are successfully added. Must be one of `true|false`.")
|
||||
|
||||
if err := c.Command.Parse(args); err != nil {
|
||||
if err == flag.ErrHelp {
|
||||
return 0
|
||||
}
|
||||
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// Set up a client.
|
||||
client, err := c.Command.HTTPClient()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// Fetch the current configuration.
|
||||
operator := client.Operator()
|
||||
conf, err := operator.AutopilotGetConfiguration(nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error querying Autopilot configuration: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// Update the config values.
|
||||
cleanupDeadServers.Merge(&conf.CleanupDeadServers)
|
||||
|
||||
// Check-and-set the new configuration.
|
||||
result, err := operator.AutopilotCASConfiguration(conf, nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error setting Autopilot configuration: %s", err))
|
||||
return 1
|
||||
}
|
||||
if result {
|
||||
c.Ui.Output("Configuration updated!")
|
||||
return 0
|
||||
} else {
|
||||
c.Ui.Output("Configuration could not be atomically updated, please try again")
|
||||
return 1
|
||||
}
|
||||
}
|
50
command/operator_autopilot_set_test.go
Normal file
50
command/operator_autopilot_set_test.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/command/base"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
func TestOperator_Autopilot_Set_Implements(t *testing.T) {
|
||||
var _ cli.Command = &OperatorAutopilotSetCommand{}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_Set(t *testing.T) {
|
||||
a1 := testAgent(t)
|
||||
defer a1.Shutdown()
|
||||
waitForLeader(t, a1.httpAddr)
|
||||
|
||||
ui := new(cli.MockUi)
|
||||
c := OperatorAutopilotSetCommand{
|
||||
Command: base.Command{
|
||||
Ui: ui,
|
||||
Flags: base.FlagSetHTTP,
|
||||
},
|
||||
}
|
||||
args := []string{"-http-addr=" + a1.httpAddr, "-cleanup-dead-servers=false"}
|
||||
|
||||
code := c.Run(args)
|
||||
if code != 0 {
|
||||
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
|
||||
}
|
||||
output := strings.TrimSpace(ui.OutputWriter.String())
|
||||
if !strings.Contains(output, "Configuration updated") {
|
||||
t.Fatalf("bad: %s", output)
|
||||
}
|
||||
|
||||
req := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.AutopilotConfig
|
||||
if err := a1.agent.RPC("Operator.AutopilotGetConfiguration", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if reply.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
}
|
11
command/operator_autopilot_test.go
Normal file
11
command/operator_autopilot_test.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
func TestOperator_Autopilot_Implements(t *testing.T) {
|
||||
var _ cli.Command = &OperatorAutopilotCommand{}
|
||||
}
|
27
commands.go
27
commands.go
|
@ -216,6 +216,33 @@ func init() {
|
|||
}, nil
|
||||
},
|
||||
|
||||
"operator autopilot": func() (cli.Command, error) {
|
||||
return &command.OperatorAutopilotCommand{
|
||||
Command: base.Command{
|
||||
Flags: base.FlagSetNone,
|
||||
Ui: ui,
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
|
||||
"operator autopilot get-config": func() (cli.Command, error) {
|
||||
return &command.OperatorAutopilotGetCommand{
|
||||
Command: base.Command{
|
||||
Flags: base.FlagSetHTTP,
|
||||
Ui: ui,
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
|
||||
"operator autopilot set-config": func() (cli.Command, error) {
|
||||
return &command.OperatorAutopilotSetCommand{
|
||||
Command: base.Command{
|
||||
Flags: base.FlagSetHTTP,
|
||||
Ui: ui,
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
|
||||
"operator raft": func() (cli.Command, error) {
|
||||
return &command.OperatorRaftCommand{
|
||||
Command: base.Command{
|
||||
|
|
|
@ -25,13 +25,15 @@ func (k *Key) Equal(x *Key) bool {
|
|||
|
||||
// Server is used to return details of a consul server
type Server struct {
	Name        string
	ID          string // taken from the member's "id" tag
	Datacenter  string
	Port        int
	Bootstrap   bool
	Expect      int
	Version     int
	RaftVersion int // parsed from the member's "raft_vsn" tag
	Addr        net.Addr
}
|
||||
|
||||
// Key returns the corresponding Key
|
||||
|
@ -84,16 +86,24 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
raft_vsn_str := m.Tags["raft_vsn"]
|
||||
raft_vsn, err := strconv.Atoi(raft_vsn_str)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
addr := &net.TCPAddr{IP: m.Addr, Port: port}
|
||||
|
||||
parts := &Server{
|
||||
Name: m.Name,
|
||||
Datacenter: datacenter,
|
||||
Port: port,
|
||||
Bootstrap: bootstrap,
|
||||
Expect: expect,
|
||||
Addr: addr,
|
||||
Version: vsn,
|
||||
Name: m.Name,
|
||||
ID: m.Tags["id"],
|
||||
Datacenter: datacenter,
|
||||
Port: port,
|
||||
Bootstrap: bootstrap,
|
||||
Expect: expect,
|
||||
Addr: addr,
|
||||
Version: vsn,
|
||||
RaftVersion: raft_vsn,
|
||||
}
|
||||
return true, parts
|
||||
}
|
||||
|
|
|
@ -55,10 +55,12 @@ func TestIsConsulServer(t *testing.T) {
|
|||
Name: "foo",
|
||||
Addr: net.IP([]byte{127, 0, 0, 1}),
|
||||
Tags: map[string]string{
|
||||
"role": "consul",
|
||||
"dc": "east-aws",
|
||||
"port": "10000",
|
||||
"vsn": "1",
|
||||
"role": "consul",
|
||||
"id": "asdf",
|
||||
"dc": "east-aws",
|
||||
"port": "10000",
|
||||
"vsn": "1",
|
||||
"raft_vsn": "3",
|
||||
},
|
||||
}
|
||||
ok, parts := agent.IsConsulServer(m)
|
||||
|
@ -68,12 +70,18 @@ func TestIsConsulServer(t *testing.T) {
|
|||
if parts.Name != "foo" {
|
||||
t.Fatalf("bad: %v", parts)
|
||||
}
|
||||
if parts.ID != "asdf" {
|
||||
t.Fatalf("bad: %v", parts.ID)
|
||||
}
|
||||
if parts.Bootstrap {
|
||||
t.Fatalf("unexpected bootstrap")
|
||||
}
|
||||
if parts.Expect != 0 {
|
||||
t.Fatalf("bad: %v", parts.Expect)
|
||||
}
|
||||
if parts.RaftVersion != 3 {
|
||||
t.Fatalf("bad: %v", parts.RaftVersion)
|
||||
}
|
||||
m.Tags["bootstrap"] = "1"
|
||||
m.Tags["disabled"] = "1"
|
||||
ok, parts = agent.IsConsulServer(m)
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/memberlist"
|
||||
|
@ -274,6 +275,10 @@ type Config struct {
|
|||
// This period is meant to be long enough for a leader election to take
|
||||
// place, and a small jitter is applied to avoid a thundering herd.
|
||||
RPCHoldTimeout time.Duration
|
||||
|
||||
// AutopilotConfig is used to apply the initial autopilot config when
|
||||
// bootstrapping.
|
||||
AutopilotConfig *structs.AutopilotConfig
|
||||
}
|
||||
|
||||
// CheckVersion is used to check if the ProtocolVersion is valid
|
||||
|
@ -346,6 +351,10 @@ func DefaultConfig() *Config {
|
|||
RPCHoldTimeout: 7 * time.Second,
|
||||
|
||||
TLSMinVersion: "tls10",
|
||||
|
||||
AutopilotConfig: &structs.AutopilotConfig{
|
||||
CleanupDeadServers: true,
|
||||
},
|
||||
}
|
||||
|
||||
// Increase our reap interval to 3 days instead of 24h.
|
||||
|
@ -360,9 +369,10 @@ func DefaultConfig() *Config {
|
|||
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
|
||||
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
|
||||
|
||||
// Enable interoperability with unversioned Raft library, and don't
|
||||
// start using new ID-based features yet.
|
||||
conf.RaftConfig.ProtocolVersion = 1
|
||||
// TODO: default to 3 in Consul 0.9
|
||||
// Use a transitional version of the raft protocol to interoperate with
|
||||
// versions 1 and 3
|
||||
conf.RaftConfig.ProtocolVersion = 2
|
||||
conf.ScaleRaft(DefaultRaftMultiplier)
|
||||
|
||||
// Disable shutdown on removal
|
||||
|
|
|
@ -105,6 +105,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
|||
return c.applyPreparedQueryOperation(buf[1:], log.Index)
|
||||
case structs.TxnRequestType:
|
||||
return c.applyTxn(buf[1:], log.Index)
|
||||
case structs.AutopilotRequestType:
|
||||
return c.applyAutopilotUpdate(buf[1:], log.Index)
|
||||
default:
|
||||
if ignoreUnknown {
|
||||
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
|
||||
|
@ -310,6 +312,25 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
|
|||
return structs.TxnResponse{results, errors}
|
||||
}
|
||||
|
||||
func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
|
||||
var req structs.AutopilotSetConfigRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now())
|
||||
|
||||
if req.CAS {
|
||||
act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
return act
|
||||
}
|
||||
} else {
|
||||
return c.state.AutopilotSetConfig(index, &req.Config)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||
defer func(start time.Time) {
|
||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||
|
|
109
consul/leader.go
109
consul/leader.go
|
@ -152,6 +152,13 @@ func (s *Server) establishLeadership() error {
|
|||
err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Setup autopilot config if we are the leader and need to
|
||||
if err := s.initializeAutopilot(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: Autopilot initialization failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -237,6 +244,29 @@ func (s *Server) initializeACL() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// initializeAutopilot is used to setup the autopilot config if we are
|
||||
// the leader and need to do this
|
||||
func (s *Server) initializeAutopilot() error {
|
||||
// Bail if the config has already been initialized
|
||||
state := s.fsm.State()
|
||||
_, config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get autopilot config: %v", err)
|
||||
}
|
||||
if config != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
req := structs.AutopilotSetConfigRequest{
|
||||
Config: *s.config.AutopilotConfig,
|
||||
}
|
||||
if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
|
||||
return fmt.Errorf("failed to initialize autopilot config")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// reconcile is used to reconcile the differences between Serf
|
||||
// membership and what is reflected in our strongly consistent store.
|
||||
// Mainly we need to ensure all live nodes are registered, all failed
|
||||
|
@ -562,11 +592,42 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
|
|||
}
|
||||
|
||||
// Attempt to add as a peer
|
||||
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
|
||||
if err := addFuture.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if minRaftProtocol >= 2 && parts.RaftVersion >= 3 {
|
||||
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
|
||||
if err := addFuture.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
|
||||
if err := addFuture.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
state := s.fsm.State()
|
||||
_, autopilotConf, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Look for dead servers to clean up
|
||||
if autopilotConf.CleanupDeadServers {
|
||||
for _, member := range s.serfLAN.Members() {
|
||||
valid, _ := agent.IsConsulServer(member)
|
||||
if valid && member.Name != m.Name && member.Status == serf.StatusFailed {
|
||||
s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name)
|
||||
go s.serfLAN.RemoveFailedNode(member.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -583,21 +644,39 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
|
|||
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
|
||||
return err
|
||||
}
|
||||
for _, server := range configFuture.Configuration().Servers {
|
||||
if server.Address == raft.ServerAddress(addr) {
|
||||
goto REMOVE
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
REMOVE:
|
||||
// Attempt to remove as a peer.
|
||||
future := s.raft.RemovePeer(raft.ServerAddress(addr))
|
||||
if err := future.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
|
||||
addr, err)
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, parts := agent.IsConsulServer(m)
|
||||
|
||||
// Pick which remove API to use based on how the server was added.
|
||||
for _, server := range configFuture.Configuration().Servers {
|
||||
// If we understand the new add/remove APIs and the server was added by ID, use the new remove API
|
||||
if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) {
|
||||
s.logger.Printf("[INFO] consul: removing server by ID: %q", server.ID)
|
||||
future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0)
|
||||
if err := future.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
|
||||
server.ID, err)
|
||||
return err
|
||||
}
|
||||
break
|
||||
} else if server.Address == raft.ServerAddress(addr) {
|
||||
// If not, use the old remove API
|
||||
s.logger.Printf("[INFO] consul: removing server by address: %q", server.Address)
|
||||
future := s.raft.RemovePeer(raft.ServerAddress(addr))
|
||||
if err := future.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
|
||||
addr, err)
|
||||
return err
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -622,3 +622,162 @@ func TestLeader_ReapTombstones(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLeader_CleanupDeadServers(t *testing.T) {
|
||||
dir1, s1 := testServerDCBootstrap(t, "dc1", true)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
servers := []*Server{s1, s2, s3}
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||
if _, err := s2.JoinLAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if _, err := s3.JoinLAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
for _, s := range servers {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
peers, _ := s.numPeers()
|
||||
return peers == 3, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 3 peers")
|
||||
})
|
||||
}
|
||||
|
||||
// Kill a non-leader server
|
||||
s2.Shutdown()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
alive := 0
|
||||
for _, m := range s1.LANMembers() {
|
||||
if m.Status == serf.StatusAlive {
|
||||
alive++
|
||||
}
|
||||
}
|
||||
return alive == 2, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 2 alive members")
|
||||
})
|
||||
|
||||
// Bring up and join a new server
|
||||
dir4, s4 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir4)
|
||||
defer s4.Shutdown()
|
||||
|
||||
if _, err := s4.JoinLAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
servers[1] = s4
|
||||
|
||||
// Make sure the dead server is removed and we're back to 3 total peers
|
||||
for _, s := range servers {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
peers, _ := s.numPeers()
|
||||
return peers == 3, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 3 peers")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeader_RollRaftServer(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Bootstrap = true
|
||||
c.Datacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Bootstrap = false
|
||||
c.Datacenter = "dc1"
|
||||
c.RaftConfig.ProtocolVersion = 1
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
servers := []*Server{s1, s2, s3}
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||
if _, err := s2.JoinLAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if _, err := s3.JoinLAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
for _, s := range servers {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
peers, _ := s.numPeers()
|
||||
return peers == 3, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 3 peers")
|
||||
})
|
||||
}
|
||||
|
||||
// Kill the v1 server
|
||||
s2.Shutdown()
|
||||
|
||||
for _, s := range []*Server{s1, s3} {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
minVer, err := ServerMinRaftProtocol(s.LANMembers())
|
||||
return minVer == 2, err
|
||||
}, func(err error) {
|
||||
t.Fatalf("minimum protocol version among servers should be 2")
|
||||
})
|
||||
}
|
||||
|
||||
// Replace the dead server with one running raft protocol v3
|
||||
dir4, s4 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Bootstrap = false
|
||||
c.Datacenter = "dc1"
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer os.RemoveAll(dir4)
|
||||
defer s4.Shutdown()
|
||||
if _, err := s4.JoinLAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
servers[1] = s4
|
||||
|
||||
// Make sure the dead server is removed and we're back to 3 total peers
|
||||
for _, s := range servers {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
addrs := 0
|
||||
ids := 0
|
||||
future := s.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, server := range future.Configuration().Servers {
|
||||
if string(server.ID) == string(server.Address) {
|
||||
addrs++
|
||||
} else {
|
||||
ids++
|
||||
}
|
||||
}
|
||||
return addrs == 2 && ids == 1, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should see 2 legacy IDs and 1 GUID")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -125,3 +125,61 @@ REMOVE:
|
|||
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address)
|
||||
return nil
|
||||
}
|
||||
|
||||
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
|
||||
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// This action requires operator read access.
|
||||
acl, err := op.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if acl != nil && !acl.OperatorRead() {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
state := op.srv.fsm.State()
|
||||
_, config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*reply = *config
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
|
||||
if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// This action requires operator write access.
|
||||
acl, err := op.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if acl != nil && !acl.OperatorWrite() {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
// Apply the update
|
||||
resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
|
||||
if err != nil {
|
||||
op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
// Check if the return type is a bool.
|
||||
if respBool, ok := resp.(bool); ok {
|
||||
*reply = respBool
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -243,3 +243,186 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.AutopilotConfig.CleanupDeadServers = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.AutopilotConfig
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if reply.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
c.AutopilotConfig.CleanupDeadServers = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Try to get config without permissions
|
||||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.AutopilotConfig
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create an ACL with operator read permissions.
|
||||
var token string
|
||||
{
|
||||
var rules = `
|
||||
operator = "read"
|
||||
`
|
||||
|
||||
req := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: rules,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Now we can read and verify the config
|
||||
arg.Token = token
|
||||
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if reply.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.AutopilotConfig.CleanupDeadServers = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Change the autopilot config from the default
|
||||
arg := structs.AutopilotSetConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Config: structs.AutopilotConfig{
|
||||
CleanupDeadServers: true,
|
||||
},
|
||||
}
|
||||
var reply *bool
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure it's changed
|
||||
state := s1.fsm.State()
|
||||
_, config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !config.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
c.AutopilotConfig.CleanupDeadServers = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Try to set config without permissions
|
||||
arg := structs.AutopilotSetConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Config: structs.AutopilotConfig{
|
||||
CleanupDeadServers: true,
|
||||
},
|
||||
}
|
||||
var reply *bool
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create an ACL with operator write permissions.
|
||||
var token string
|
||||
{
|
||||
var rules = `
|
||||
operator = "write"
|
||||
`
|
||||
|
||||
req := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: rules,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Now we can update the config
|
||||
arg.Token = token
|
||||
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure it's changed
|
||||
state := s1.fsm.State()
|
||||
_, config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !config.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -280,11 +280,22 @@ func (s *Server) maybeBootstrap() {
|
|||
// Attempt a live bootstrap!
|
||||
var configuration raft.Configuration
|
||||
var addrs []string
|
||||
minRaftVersion, err := ServerMinRaftProtocol(members)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err)
|
||||
}
|
||||
|
||||
for _, server := range servers {
|
||||
addr := server.Addr.String()
|
||||
addrs = append(addrs, addr)
|
||||
var id raft.ServerID
|
||||
if minRaftVersion >= 3 {
|
||||
id = raft.ServerID(server.ID)
|
||||
} else {
|
||||
id = raft.ServerID(addr)
|
||||
}
|
||||
peer := raft.Server{
|
||||
ID: raft.ServerID(addr),
|
||||
ID: id,
|
||||
Address: raft.ServerAddress(addr),
|
||||
}
|
||||
configuration.Servers = append(configuration.Servers, peer)
|
||||
|
|
|
@ -317,6 +317,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
|
|||
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
|
||||
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
|
||||
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
|
||||
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
|
||||
conf.Tags["build"] = s.config.Build
|
||||
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
|
||||
if s.config.Bootstrap {
|
||||
|
@ -378,9 +379,12 @@ func (s *Server) setupRaft() error {
|
|||
// Make sure we set the LogOutput.
|
||||
s.config.RaftConfig.LogOutput = s.config.LogOutput
|
||||
|
||||
// Our version of Raft protocol requires the LocalID to match the network
|
||||
// Versions of the Raft protocol below 3 require the LocalID to match the network
|
||||
// address of the transport.
|
||||
s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr())
|
||||
if s.config.RaftConfig.ProtocolVersion >= 3 {
|
||||
s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID)
|
||||
}
|
||||
|
||||
// Build an all in-memory setup for dev mode, otherwise prepare a full
|
||||
// disk-based setup.
|
||||
|
@ -478,7 +482,7 @@ func (s *Server) setupRaft() error {
|
|||
configuration := raft.Configuration{
|
||||
Servers: []raft.Server{
|
||||
raft.Server{
|
||||
ID: raft.ServerID(trans.LocalAddr()),
|
||||
ID: s.config.RaftConfig.LocalID,
|
||||
Address: trans.LocalAddr(),
|
||||
},
|
||||
},
|
||||
|
|
|
@ -12,6 +12,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
var nextPort int32 = 15000
|
||||
|
@ -46,6 +48,11 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) {
|
|||
IP: []byte{127, 0, 0, 1},
|
||||
Port: getPort(),
|
||||
}
|
||||
nodeID, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
config.NodeID = types.NodeID(nodeID)
|
||||
config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
|
||||
config.SerfLANConfig.MemberlistConfig.BindPort = getPort()
|
||||
config.SerfLANConfig.MemberlistConfig.SuspicionMult = 2
|
||||
|
|
86
consul/state/autopilot.go
Normal file
86
consul/state/autopilot.go
Normal file
|
@ -0,0 +1,86 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
// AutopilotConfig is used to get the current Autopilot configuration.
|
||||
func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the autopilot config
|
||||
c, err := tx.First("autopilot-config", "id")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err)
|
||||
}
|
||||
|
||||
config, ok := c.(*structs.AutopilotConfig)
|
||||
if !ok {
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
return config.ModifyIndex, config, nil
|
||||
}
|
||||
|
||||
// AutopilotSetConfig is used to set the current Autopilot configuration.
|
||||
func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
s.autopilotSetConfigTxn(idx, tx, config)
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
|
||||
// given Raft index. If the CAS index specified is not equal to the last observed index
|
||||
// for the config, then the call is a noop,
|
||||
func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Check for an existing config
|
||||
existing, err := tx.First("autopilot-config", "id")
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed autopilot config lookup: %s", err)
|
||||
}
|
||||
|
||||
// If the existing index does not match the provided CAS
|
||||
// index arg, then we shouldn't update anything and can safely
|
||||
// return early here.
|
||||
e, ok := existing.(*structs.AutopilotConfig)
|
||||
if !ok || e.ModifyIndex != cidx {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
s.autopilotSetConfigTxn(idx, tx, config)
|
||||
|
||||
tx.Commit()
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
|
||||
// Check for an existing config
|
||||
existing, err := tx.First("autopilot-config", "id")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed autopilot config lookup: %s", err)
|
||||
}
|
||||
|
||||
// Set the indexes.
|
||||
if existing != nil {
|
||||
config.CreateIndex = existing.(*structs.AutopilotConfig).CreateIndex
|
||||
} else {
|
||||
config.CreateIndex = idx
|
||||
}
|
||||
config.ModifyIndex = idx
|
||||
|
||||
if err := tx.Insert("autopilot-config", config); err != nil {
|
||||
return fmt.Errorf("failed updating autopilot config: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
87
consul/state/autopilot_test.go
Normal file
87
consul/state/autopilot_test.go
Normal file
|
@ -0,0 +1,87 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
func TestStateStore_Autopilot(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
expected := &structs.AutopilotConfig{
|
||||
CleanupDeadServers: true,
|
||||
}
|
||||
|
||||
if err := s.AutopilotSetConfig(0, expected); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
idx, config, err := s.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if idx != 0 {
|
||||
t.Fatalf("bad: %d", idx)
|
||||
}
|
||||
if !reflect.DeepEqual(expected, config) {
|
||||
t.Fatalf("bad: %#v, %#v", expected, config)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_AutopilotCAS(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
expected := &structs.AutopilotConfig{
|
||||
CleanupDeadServers: true,
|
||||
}
|
||||
|
||||
if err := s.AutopilotSetConfig(0, expected); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.AutopilotSetConfig(1, expected); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Do a CAS with an index lower than the entry
|
||||
ok, err := s.AutopilotCASConfig(2, 0, &structs.AutopilotConfig{
|
||||
CleanupDeadServers: false,
|
||||
})
|
||||
if ok || err != nil {
|
||||
t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err)
|
||||
}
|
||||
|
||||
// Check that the index is untouched and the entry
|
||||
// has not been updated.
|
||||
idx, config, err := s.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if idx != 1 {
|
||||
t.Fatalf("bad: %d", idx)
|
||||
}
|
||||
if !config.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// Do another CAS, this time with the correct index
|
||||
ok, err = s.AutopilotCASConfig(2, 1, &structs.AutopilotConfig{
|
||||
CleanupDeadServers: false,
|
||||
})
|
||||
if !ok || err != nil {
|
||||
t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err)
|
||||
}
|
||||
|
||||
// Make sure the config was updated
|
||||
idx, config, err = s.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if idx != 2 {
|
||||
t.Fatalf("bad: %d", idx)
|
||||
}
|
||||
if config.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
}
|
|
@ -31,6 +31,7 @@ func stateStoreSchema() *memdb.DBSchema {
|
|||
aclsTableSchema,
|
||||
coordinatesTableSchema,
|
||||
preparedQueriesTableSchema,
|
||||
autopilotConfigTableSchema,
|
||||
}
|
||||
|
||||
// Add the tables to the root schema
|
||||
|
@ -440,3 +441,21 @@ func preparedQueriesTableSchema() *memdb.TableSchema {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
// autopilotConfigTableSchema returns a new table schema used for storing
|
||||
// the autopilot configuration
|
||||
func autopilotConfigTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: "autopilot-config",
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": &memdb.IndexSchema{
|
||||
Name: "id",
|
||||
AllowMissing: true,
|
||||
Unique: true,
|
||||
Indexer: &memdb.ConditionalIndex{
|
||||
Conditional: func(obj interface{}) (bool, error) { return true, nil },
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,15 @@ import (
|
|||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
// AutopilotConfig holds the Autopilot configuration for a cluster, along with
// the Raft indexes at which it was created and last modified.
type AutopilotConfig struct {
|
||||
// CleanupDeadServers controls whether to remove dead servers when a new
|
||||
// server is added to the Raft peers
|
||||
CleanupDeadServers bool
|
||||
|
||||
// RaftIndex stores the create/modify indexes of this configuration
|
||||
RaftIndex
|
||||
}
|
||||
|
||||
// RaftServer has information about a server in the Raft configuration.
|
||||
type RaftServer struct {
|
||||
// ID is the unique ID for the server. These are currently the same
|
||||
|
@ -55,3 +64,24 @@ type RaftPeerByAddressRequest struct {
|
|||
func (op *RaftPeerByAddressRequest) RequestDatacenter() string {
|
||||
return op.Datacenter
|
||||
}
|
||||
|
||||
// AutopilotSetConfigRequest is used by the Operator endpoint to update the
|
||||
// current Autopilot configuration of the cluster.
|
||||
type AutopilotSetConfigRequest struct {
|
||||
// Datacenter is the target this request is intended for.
|
||||
Datacenter string
|
||||
|
||||
// Config is the new Autopilot configuration to use.
// For a check-and-set request, Config.ModifyIndex is the index the stored
// configuration is expected to have.
|
||||
Config AutopilotConfig
|
||||
|
||||
// CAS controls whether to use check-and-set semantics for this request.
|
||||
CAS bool
|
||||
|
||||
// WriteRequest holds the ACL token to go along with this request.
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// RequestDatacenter returns the datacenter for a given request, i.e. the
// value of the request's Datacenter field.
|
||||
func (op *AutopilotSetConfigRequest) RequestDatacenter() string {
|
||||
return op.Datacenter
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ const (
|
|||
CoordinateBatchUpdateType
|
||||
PreparedQueryRequestType
|
||||
TxnRequestType
|
||||
AutopilotRequestType
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
@ -91,6 +91,35 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e
|
|||
return (numServers > 0) && (numWhoGrok == numServers), nil
|
||||
}
|
||||
|
||||
// ServerMinRaftProtocol returns the lowest supported Raft protocol among alive servers
|
||||
func ServerMinRaftProtocol(members []serf.Member) (int, error) {
|
||||
minVersion := -1
|
||||
for _, m := range members {
|
||||
if m.Tags["role"] != "consul" || m.Status != serf.StatusAlive {
|
||||
continue
|
||||
}
|
||||
|
||||
vsn, ok := m.Tags["raft_vsn"]
|
||||
if !ok {
|
||||
vsn = "1"
|
||||
}
|
||||
raftVsn, err := strconv.Atoi(vsn)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
if minVersion == -1 || raftVsn < minVersion {
|
||||
minVersion = raftVsn
|
||||
}
|
||||
}
|
||||
|
||||
if minVersion == -1 {
|
||||
return minVersion, fmt.Errorf("No servers found")
|
||||
}
|
||||
|
||||
return minVersion, nil
|
||||
}
|
||||
|
||||
// Returns if a member is a consul node. Returns a bool,
|
||||
// and the datacenter.
|
||||
func isConsulNode(m serf.Member) (bool, string) {
|
||||
|
|
|
@ -28,6 +28,7 @@ The following endpoints are supported:
|
|||
* [`/v1/operator/raft/configuration`](#raft-configuration): Inspects the Raft configuration
|
||||
* [`/v1/operator/raft/peer`](#raft-peer): Operates on Raft peers
|
||||
* [`/v1/operator/keyring`](#keyring): Operates on gossip keyring
|
||||
* [`/v1/operator/autopilot/configuration`](#autopilot-configuration): Operates on the Autopilot configuration
|
||||
|
||||
Not all endpoints support blocking queries and all consistency modes,
|
||||
see details in the sections below.
|
||||
|
@ -258,3 +259,65 @@ If ACLs are enabled, the client will need to supply an ACL Token with
|
|||
[`keyring`](/docs/internals/acl.html#keyring) write privileges.
|
||||
|
||||
The return code will indicate success or failure.
|
||||
|
||||
### <a name="autopilot-configuration"></a> /v1/operator/autopilot/configuration
|
||||
|
||||
Available in Consul 0.8.0 and later, the autopilot configuration endpoint supports the
|
||||
`GET` and `PUT` methods.
|
||||
|
||||
This endpoint supports the use of ACL tokens using either the `X-CONSUL-TOKEN`
|
||||
header or the `?token=` query parameter.
|
||||
|
||||
By default, the datacenter of the agent is queried; however, the `dc` can be
|
||||
provided using the `?dc=` query parameter.
|
||||
|
||||
#### GET Method
|
||||
|
||||
When using the `GET` method, the request will be forwarded to the cluster
|
||||
leader to retrieve its latest Autopilot configuration.
|
||||
|
||||
If the cluster doesn't currently have a leader, an error will be returned. You
|
||||
can use the `?stale` query parameter to read the Autopilot configuration from any
|
||||
of the Consul servers.
|
||||
|
||||
If ACLs are enabled, the client will need to supply an ACL Token with
|
||||
[`operator`](/docs/internals/acl.html#operator) read privileges.
|
||||
|
||||
A JSON body is returned that looks like this:
|
||||
|
||||
```javascript
|
||||
{
|
||||
"CleanupDeadServers": true,
|
||||
"CreateIndex": 4,
|
||||
"ModifyIndex": 4
|
||||
}
|
||||
```
|
||||
|
||||
`CleanupDeadServers` specifies whether dead servers should be removed automatically when
|
||||
a new server is added to the cluster.
|
||||
|
||||
#### PUT Method
|
||||
|
||||
Using the `PUT` method, this endpoint will update the Autopilot configuration
|
||||
of the cluster.
|
||||
|
||||
The `?cas=<index>` query parameter can optionally be specified to update the configuration as a
|
||||
Check-And-Set operation. The update will only happen if the given index matches
|
||||
the `ModifyIndex` of the configuration at the time of writing.
|
||||
|
||||
If ACLs are enabled, the client will need to supply an ACL Token with
|
||||
[`operator`](/docs/internals/acl.html#operator) write privileges.
|
||||
|
||||
The `PUT` method expects a JSON request body to be submitted. The request
|
||||
body must look like:
|
||||
|
||||
```javascript
|
||||
{
|
||||
"CleanupDeadServers": true
|
||||
}
|
||||
```
|
||||
|
||||
`CleanupDeadServers` specifies whether dead servers should be removed automatically when
|
||||
a new server is added to the cluster.
|
||||
|
||||
The return code will indicate success or failure.
|
||||
|
|
|
@ -551,6 +551,19 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
|
|||
* <a name="atlas_endpoint"></a><a href="#atlas_endpoint">`atlas_endpoint`</a> Equivalent to the
|
||||
[`-atlas-endpoint` command-line flag](#_atlas_endpoint).
|
||||
|
||||
* <a name="autopilot"></a><a href="#autopilot">`autopilot`</a> Added in Consul 0.8, this object
|
||||
allows a number of sub-keys to be set which can configure operator-friendly settings for Consul servers.
|
||||
<br><br>
|
||||
The following sub-keys are available:
|
||||
|
||||
* <a name="raft_protocol"></a><a href="#raft_protocol">`raft_protocol`</a> - This controls the internal
|
||||
version of the Raft consensus protocol used for server communications. This defaults to 2 but must
|
||||
be set to 3 in order to gain access to other Autopilot features, with the exception of
|
||||
[`cleanup_dead_servers`](#cleanup_dead_servers).
|
||||
|
||||
* <a name="cleanup_dead_servers"></a><a href="#cleanup_dead_servers">`cleanup_dead_servers`</a> - This controls
|
||||
the automatic removal of dead server nodes whenever a new server is added to the cluster. Defaults to `true`.
|
||||
|
||||
* <a name="bootstrap"></a><a href="#bootstrap">`bootstrap`</a> Equivalent to the
|
||||
[`-bootstrap` command-line flag](#_bootstrap).
|
||||
|
||||
|
|
|
@ -35,10 +35,12 @@ Usage: consul operator <subcommand> [options]
|
|||
|
||||
Subcommands:
|
||||
|
||||
raft Provides cluster-level tools for Consul operators
|
||||
autopilot Provides tools for modifying Autopilot configuration
|
||||
raft Provides cluster-level tools for Consul operators
|
||||
```
|
||||
|
||||
For more information, examples, and usage about a subcommand, click on the name
|
||||
of the subcommand in the sidebar or one of the links below:
|
||||
|
||||
- [autopilot](/docs/commands/operator/autopilot.html)
|
||||
- [raft](/docs/commands/operator/raft.html)
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
---
|
||||
layout: "docs"
|
||||
page_title: "Commands: Operator Autopilot"
|
||||
sidebar_current: "docs-commands-operator-autopilot"
|
||||
description: >
|
||||
The operator autopilot subcommand is used to view and modify Consul's Autopilot configuration.
|
||||
---
|
||||
|
||||
# Consul Operator Autopilot
|
||||
|
||||
Command: `consul operator autopilot`
|
||||
|
||||
The Autopilot operator command is used to interact with Consul's Autopilot subsystem. The
|
||||
command can be used to view or modify the current Autopilot configuration.
|
||||
|
||||
```text
|
||||
Usage: consul operator autopilot <subcommand> [options]
|
||||
|
||||
The Autopilot operator command is used to interact with Consul's Autopilot
|
||||
subsystem. The command can be used to view or modify the current configuration.
|
||||
|
||||
Subcommands:
|
||||
|
||||
get-config Display the current Autopilot configuration
|
||||
set-config Modify the current Autopilot configuration
|
||||
```
|
||||
|
||||
## get-config
|
||||
|
||||
This command displays the current Autopilot configuration.
|
||||
|
||||
Usage: `consul operator autopilot get-config [options]`
|
||||
|
||||
#### API Options
|
||||
|
||||
<%= partial "docs/commands/http_api_options_client" %>
|
||||
<%= partial "docs/commands/http_api_options_server" %>
|
||||
|
||||
The output looks like this:
|
||||
|
||||
```
|
||||
CleanupDeadServers = true
|
||||
```
|
||||
|
||||
## set-config
|
||||
|
||||
Modifies the current Autopilot configuration.
|
||||
|
||||
Usage: `consul operator autopilot set-config [options]`
|
||||
|
||||
#### API Options
|
||||
|
||||
<%= partial "docs/commands/http_api_options_client" %>
|
||||
<%= partial "docs/commands/http_api_options_server" %>
|
||||
|
||||
#### Command Options
|
||||
|
||||
* `-cleanup-dead-servers` - Specifies whether to enable automatic removal of dead servers
|
||||
upon the successful joining of new servers to the cluster. Must be one of `[true|false]`.
|
||||
|
||||
The output looks like this:
|
||||
|
||||
```
|
||||
Configuration updated!
|
||||
```
|
||||
|
||||
The return code will indicate success or failure.
|
|
@ -142,6 +142,9 @@
|
|||
<li<%= sidebar_current("docs-commands-operator") %>>
|
||||
<a href="/docs/commands/operator.html">operator</a>
|
||||
<ul class="subnav">
|
||||
<li<%= sidebar_current("docs-commands-operator-autopilot") %>>
|
||||
<a href="/docs/commands/operator/autopilot.html">autopilot</a>
|
||||
</li>
|
||||
<li<%= sidebar_current("docs-commands-operator-raft") %>>
|
||||
<a href="/docs/commands/operator/raft.html">raft</a>
|
||||
</li>
|
||||
|
|
Loading…
Reference in a new issue