Add state store table and endpoints for autopilot

This commit is contained in:
Kyle Havlovitz 2017-02-23 20:32:13 -08:00
parent ae9fce0ae0
commit 56e22a719f
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
19 changed files with 519 additions and 37 deletions

View File

@ -25,10 +25,10 @@ import (
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-sockaddr/template"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"github.com/shirou/gopsutil/host"
"github.com/hashicorp/raft"
)
const (
@ -383,6 +383,9 @@ func (a *Agent) consulConfig() *consul.Config {
if a.config.Protocol > 0 {
base.ProtocolVersion = uint8(a.config.Protocol)
}
if a.config.RaftProtocol != 0 {
base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.RaftProtocol)
}
if a.config.ACLToken != "" {
base.ACLToken = a.config.ACLToken
}
@ -413,11 +416,8 @@ func (a *Agent) consulConfig() *consul.Config {
if a.config.SessionTTLMinRaw != "" {
base.SessionTTLMin = a.config.SessionTTLMin
}
if a.config.Autopilot.RaftProtocolVersion != 0 {
base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.Autopilot.RaftProtocolVersion)
}
if a.config.Autopilot.DeadServerCleanup != nil {
base.DeadServerCleanup = *a.config.Autopilot.DeadServerCleanup
base.AutopilotConfig.DeadServerCleanup = *a.config.Autopilot.DeadServerCleanup
}
// Format the build string

View File

@ -772,9 +772,6 @@ func DefaultConfig() *Config {
CheckReapInterval: 30 * time.Second,
AEInterval: time.Minute,
DisableCoordinates: false,
Autopilot: Autopilot{
DeadServerCleanup: Bool(true),
},
// SyncCoordinateRateTarget is set based on the rate that we want
// the server to handle as an aggregate across the entire cluster.

View File

@ -297,6 +297,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint))
s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific))
s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))

View File

@ -3,11 +3,11 @@ package agent
import (
"fmt"
"net/http"
"strconv"
"github.com/hashicorp/consul/consul/structs"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft"
"strconv"
)
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
@ -105,7 +105,7 @@ func (s *HTTPServer) OperatorKeyringEndpoint(resp http.ResponseWriter, req *http
case "DELETE":
return s.KeyringRemove(resp, req, &args)
default:
resp.WriteHeader(405)
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
}
@ -166,3 +166,43 @@ func keyringErrorsOrNil(responses []*structs.KeyringResponse) error {
}
return errs
}
// OperatorAutopilotConfiguration is used to inspect the current Autopilot configuration.
// This supports the stale query mode in case the cluster doesn't have a leader.
func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Switch on the method
switch req.Method {
case "GET":
var args structs.DCSpecificRequest
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var reply structs.AutopilotConfig
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
return nil, err
}
return reply, nil
case "PUT":
var args structs.AutopilotSetConfigRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
if err := decodeBody(req, &args.Config, nil); err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Error parsing relay factor: %v", err)))
return nil, nil
}
var reply struct{}
if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil {
return nil, err
}
return nil, nil
default:
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
}

View File

@ -285,3 +285,60 @@ func TestOperator_Keyring_InvalidRelayFactor(t *testing.T) {
}
}, configFunc)
}
func TestOperator_AutopilotGetConfiguration(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/operator/autopilot/configuration", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
out, ok := obj.(structs.AutopilotConfig)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if !out.DeadServerCleanup {
t.Fatalf("bad: %#v", out)
}
})
}
func TestOperator_AutopilotSetConfiguration(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer([]byte(`{"DeadServerCleanup": false}`))
req, err := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err = srv.OperatorAutopilotConfiguration(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reply.DeadServerCleanup {
t.Fatalf("bad: %#v", reply)
}
})
}

View File

@ -7,6 +7,7 @@ import (
"os"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/memberlist"
@ -275,9 +276,9 @@ type Config struct {
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration
// DeadServerCleanup controls whether to remove dead servers when a new
// server is added to the Raft peers
DeadServerCleanup bool
// AutopilotConfig is used to apply the initial autopilot config when
// bootstrapping.
AutopilotConfig *structs.AutopilotConfig
}
// CheckVersion is used to check if the ProtocolVersion is valid
@ -351,7 +352,9 @@ func DefaultConfig() *Config {
TLSMinVersion: "tls10",
AutopilotConfig: &structs.AutopilotConfig{
DeadServerCleanup: true,
},
}
// Increase our reap interval to 3 days instead of 24h.

View File

@ -152,6 +152,13 @@ func (s *Server) establishLeadership() error {
err)
return err
}
// Setup autopilot config if we are the leader and need to
if err := s.initializeAutopilot(); err != nil {
s.logger.Printf("[ERR] consul: Autopilot initialization failed: %v", err)
return err
}
return nil
}
@ -237,6 +244,26 @@ func (s *Server) initializeACL() error {
return nil
}
// initializeAutopilot is used to setup the autopilot config if we are
// the leader and need to do this
func (s *Server) initializeAutopilot() error {
// Bail if the config has already been initialized
state := s.fsm.State()
config, err := state.AutopilotConfig()
if err != nil {
return fmt.Errorf("failed to get autopilot config: %v", err)
}
if config != nil {
return nil
}
if err := state.UpdateAutopilotConfig(s.config.AutopilotConfig); err != nil {
return err
}
return nil
}
// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
// Mainly we need to ensure all live nodes are registered, all failed
@ -581,8 +608,14 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
}
}
state := s.fsm.State()
autopilotConf, err := state.AutopilotConfig()
if err != nil {
return err
}
// Look for dead servers to clean up
if s.config.DeadServerCleanup {
if autopilotConf.DeadServerCleanup {
for _, member := range s.serfLAN.Members() {
valid, _ := agent.IsConsulServer(member)
if valid && member.Name != m.Name && member.Status == serf.StatusFailed {

View File

@ -125,3 +125,55 @@ REMOVE:
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address)
return nil
}
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
return err
}
// This action requires operator read access.
acl, err := op.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.OperatorRead() {
return permissionDeniedErr
}
// We can't fetch the leader and the configuration atomically with
// the current Raft API.
state := op.srv.fsm.State()
config, err := state.AutopilotConfig()
if err != nil {
return err
}
*reply = *config
return nil
}
// AutopilotGetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *struct{}) error {
if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
return err
}
// This action requires operator read access.
acl, err := op.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.OperatorWrite() {
return permissionDeniedErr
}
// Update the autopilot config
state := op.srv.fsm.State()
if err := state.UpdateAutopilotConfig(&args.Config); err != nil {
return err
}
return nil
}

View File

@ -243,3 +243,189 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.AutopilotConfig.DeadServerCleanup = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Change the autopilot config from the default
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
if reply.DeadServerCleanup {
t.Fatalf("bad: %#v", reply)
}
}
func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.AutopilotConfig.DeadServerCleanup = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Change the autopilot config from the default
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Create an ACL with operator read permissions.
var token string
{
var rules = `
operator = "read"
`
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: rules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
}
// Now it should kick back for being an invalid config, which means it
// tried to do the operation.
arg.Token = token
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
if reply.DeadServerCleanup {
t.Fatalf("bad: %#v", reply)
}
}
func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.AutopilotConfig.DeadServerCleanup = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Change the autopilot config from the default
arg := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
DeadServerCleanup: true,
},
}
var reply struct{}
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's changed
state := s1.fsm.State()
config, err := state.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if !config.DeadServerCleanup {
t.Fatalf("bad: %#v", config)
}
}
func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.AutopilotConfig.DeadServerCleanup = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Change the autopilot config from the default
arg := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
DeadServerCleanup: true,
},
}
var reply struct{}
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Create an ACL with operator write permissions.
var token string
{
var rules = `
operator = "write"
`
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: rules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
}
// Now it should kick back for being an invalid config, which means it
// tried to do the operation.
arg.Token = token
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's changed
state := s1.fsm.State()
config, err := state.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if !config.DeadServerCleanup {
t.Fatalf("bad: %#v", config)
}
}

View File

@ -289,7 +289,7 @@ func (s *Server) maybeBootstrap() {
addr := server.Addr.String()
addrs = append(addrs, addr)
var id raft.ServerID
if server.ID != "" && minRaftVersion >= 3 {
if minRaftVersion >= 3 {
id = raft.ServerID(server.ID)
} else {
id = raft.ServerID(addr)

View File

@ -12,8 +12,8 @@ import (
"time"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"
)
var nextPort int32 = 15000

39
consul/state/autopilot.go Normal file
View File

@ -0,0 +1,39 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
)
// AutopilotConfig is used to get the current Autopilot configuration.
func (s *StateStore) AutopilotConfig() (*structs.AutopilotConfig, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the autopilot config
c, err := tx.First("autopilot-config", "id")
if err != nil {
return nil, fmt.Errorf("failed autopilot config lookup: %s", err)
}
config, ok := c.(*structs.AutopilotConfig)
if !ok {
return nil, nil
}
return config, nil
}
// AutopilotConfig is used to set the current Autopilot configuration.
func (s *StateStore) UpdateAutopilotConfig(config *structs.AutopilotConfig) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := tx.Insert("autopilot-config", config); err != nil {
return fmt.Errorf("failed updating autopilot config: %s", err)
}
tx.Commit()
return nil
}

View File

@ -0,0 +1,31 @@
package state
import (
"reflect"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestStateStore_Autopilot(t *testing.T) {
s := testStateStore(t)
expected := &structs.AutopilotConfig{
DeadServerCleanup: true,
}
if err := s.UpdateAutopilotConfig(expected); err != nil {
t.Fatal(err)
}
idx, config, err := s.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if idx != 0 {
t.Fatalf("bad: %d", idx)
}
if !reflect.DeepEqual(expected, config) {
t.Fatalf("bad: %#v, %#v", expected, config)
}
}

View File

@ -31,6 +31,7 @@ func stateStoreSchema() *memdb.DBSchema {
aclsTableSchema,
coordinatesTableSchema,
preparedQueriesTableSchema,
autopilotConfigTableSchema,
}
// Add the tables to the root schema
@ -440,3 +441,21 @@ func preparedQueriesTableSchema() *memdb.TableSchema {
},
}
}
// autopilotConfigTableSchema returns a new table schema used for storing
// the autopilot configuration
func autopilotConfigTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "autopilot-config",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: true,
Unique: true,
Indexer: &memdb.ConditionalIndex{
Conditional: func(obj interface{}) (bool, error) { return true, nil },
},
},
},
}
}

View File

@ -4,6 +4,12 @@ import (
"github.com/hashicorp/raft"
)
type AutopilotConfig struct {
// DeadServerCleanup controls whether to remove dead servers when a new
// server is added to the Raft peers
DeadServerCleanup bool
}
// RaftServer has information about a server in the Raft configuration.
type RaftServer struct {
// ID is the unique ID for the server. These are currently the same
@ -55,3 +61,21 @@ type RaftPeerByAddressRequest struct {
func (op *RaftPeerByAddressRequest) RequestDatacenter() string {
return op.Datacenter
}
// AutopilotSetConfigRequest is used by the Operator endpoint to update the
// current Autopilot configuration of the cluster.
type AutopilotSetConfigRequest struct {
// Datacenter is the target this request is intended for.
Datacenter string
// Config is the new Autopilot configuration to use.
Config AutopilotConfig
// WriteRequest holds the ACL token to go along with this request.
WriteRequest
}
// RequestDatacenter returns the datacenter for a given request.
func (op *AutopilotSetConfigRequest) RequestDatacenter() string {
return op.Datacenter
}