Add CAS capability to autopilot config endpoint

This commit is contained in:
Kyle Havlovitz 2017-02-24 13:08:49 -08:00
parent 7d514a7ef6
commit c9ddee1a79
No known key found for this signature in database
GPG key ID: 8A5E6B173056AD6C
12 changed files with 354 additions and 31 deletions

View file

@ -1,5 +1,13 @@
package api
import (
"bytes"
"fmt"
"io"
"strconv"
"strings"
)
// Operator can be used to perform low-level operator tasks for Consul.
type Operator struct {
c *Client
@ -65,9 +73,16 @@ type KeyringResponse struct {
// AutopilotConfiguration is used for querying/setting the Autopilot configuration
type AutopilotConfiguration struct {
// DeadServerCleanup controls whether to remove dead servers from the Raft peer list
// when a new server joins
// DeadServerCleanup controls whether to remove dead servers from the Raft
// peer list when a new server joins
DeadServerCleanup bool
// CreateIndex holds the index corresponding the creation of this configuration.
// This is a read-only field.
CreateIndex uint64
// ModifyIndex is used for doing a Check-And-Set update operation.
ModifyIndex uint64
}
// RaftGetConfiguration is used to query the current Raft peer set.
@ -169,7 +184,7 @@ func (op *Operator) KeyringUse(key string, q *WriteOptions) error {
return nil
}
// RaftGetConfiguration is used to query the current Raft peer set.
// AutopilotGetConfiguration is used to query the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) {
r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration")
r.setQueryOptions(q)
@ -186,7 +201,7 @@ func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfig
return &out, nil
}
// RaftGetConfiguration is used to query the current Raft peer set.
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error {
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
r.setWriteOptions(q)
@ -197,4 +212,27 @@ func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *W
}
resp.Body.Close()
return nil
}
// AutopilotCASConfiguration is used to perform a Check-And-Set update on the
// Autopilot configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failures.
func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) {
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
r.setWriteOptions(q)
r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10))
r.obj = conf
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return false, err
}
defer resp.Body.Close()
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, fmt.Errorf("Failed to read response: %v", err)
}
res := strings.Contains(string(buf.Bytes()), "true")
return res, nil
}

View file

@ -120,7 +120,8 @@ func TestOperator_AutopilotGetSetConfiguration(t *testing.T) {
}
// Change a config setting
if err := operator.AutopilotSetConfiguration(&AutopilotConfiguration{false}, nil); err != nil {
newConf := &AutopilotConfiguration{DeadServerCleanup: false}
if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil {
t.Fatalf("err: %v", err)
}
@ -131,4 +132,49 @@ func TestOperator_AutopilotGetSetConfiguration(t *testing.T) {
if config.DeadServerCleanup {
t.Fatalf("bad: %v", config)
}
}
func TestOperator_AutopilotCASConfiguration(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
operator := c.Operator()
config, err := operator.AutopilotGetConfiguration(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !config.DeadServerCleanup {
t.Fatalf("bad: %v", config)
}
// Pass an invalid ModifyIndex
{
newConf := &AutopilotConfiguration{
DeadServerCleanup: false,
ModifyIndex: config.ModifyIndex - 1,
}
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp {
t.Fatalf("bad: %v", resp)
}
}
// Pass a valid ModifyIndex
{
newConf := &AutopilotConfiguration{
DeadServerCleanup: false,
ModifyIndex: config.ModifyIndex,
}
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !resp {
t.Fatalf("bad: %v", resp)
}
}
}

View file

@ -189,18 +189,36 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Error parsing cas value: %v", err)))
return nil, nil
}
args.Config.ModifyIndex = casVal
args.CAS = true
}
if err := decodeBody(req, &args.Config, nil); err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Error parsing relay factor: %v", err)))
resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err)))
return nil, nil
}
var reply struct{}
var reply bool
if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil {
return nil, err
}
return nil, nil
// Only use the out value if this was a CAS
if !args.CAS {
return true, nil
} else {
return reply, nil
}
default:
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil

View file

@ -336,9 +336,87 @@ func TestOperator_AutopilotSetConfiguration(t *testing.T) {
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reply.DeadServerCleanup {
t.Fatalf("bad: %#v", reply)
}
})
}
}
func TestOperator_AutopilotCASConfiguration(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer([]byte(`{"DeadServerCleanup": false}`))
req, err := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err = srv.OperatorAutopilotConfiguration(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reply.DeadServerCleanup {
t.Fatalf("bad: %#v", reply)
}
// Create a CAS request, bad index
{
buf := bytes.NewBuffer([]byte(`{"DeadServerCleanup": true}`))
req, err := http.NewRequest("PUT",
fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex-1), buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); res {
t.Fatalf("should NOT work")
}
}
// Create a CAS request, good index
{
buf := bytes.NewBuffer([]byte(`{"DeadServerCleanup": true}`))
req, err := http.NewRequest("PUT",
fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex), buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
}
// Verify the update
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if !reply.DeadServerCleanup {
t.Fatalf("bad: %#v", reply)
}
})
}

View file

@ -105,6 +105,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyPreparedQueryOperation(buf[1:], log.Index)
case structs.TxnRequestType:
return c.applyTxn(buf[1:], log.Index)
case structs.AutopilotRequestType:
return c.applyAutopilotUpdate(buf[1:], log.Index)
default:
if ignoreUnknown {
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@ -310,6 +312,25 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
return structs.TxnResponse{results, errors}
}
func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
var req structs.AutopilotSetConfigRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now())
if req.CAS {
act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
if err != nil {
return err
} else {
return act
}
} else {
return c.state.AutopilotSetConfig(index, &req.Config)
}
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))

View file

@ -249,7 +249,7 @@ func (s *Server) initializeACL() error {
func (s *Server) initializeAutopilot() error {
// Bail if the config has already been initialized
state := s.fsm.State()
config, err := state.AutopilotConfig()
_, config, err := state.AutopilotConfig()
if err != nil {
return fmt.Errorf("failed to get autopilot config: %v", err)
}
@ -257,8 +257,11 @@ func (s *Server) initializeAutopilot() error {
return nil
}
if err := state.UpdateAutopilotConfig(s.config.AutopilotConfig); err != nil {
return err
req := structs.AutopilotSetConfigRequest{
Config: *s.config.AutopilotConfig,
}
if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
return fmt.Errorf("failed to initialize autopilot config")
}
return nil
@ -609,7 +612,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
}
state := s.fsm.State()
autopilotConf, err := state.AutopilotConfig()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
return err
}

View file

@ -144,7 +144,7 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r
// We can't fetch the leader and the configuration atomically with
// the current Raft API.
state := op.srv.fsm.State()
config, err := state.AutopilotConfig()
_, config, err := state.AutopilotConfig()
if err != nil {
return err
}
@ -154,8 +154,8 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r
return nil
}
// AutopilotGetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *struct{}) error {
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
return err
}
@ -169,11 +169,19 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
return permissionDeniedErr
}
// Update the autopilot config
state := op.srv.fsm.State()
if err := state.UpdateAutopilotConfig(&args.Config); err != nil {
// Apply the update
resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
if err != nil {
op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}

View file

@ -428,4 +428,4 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
if !config.DeadServerCleanup {
t.Fatalf("bad: %#v", config)
}
}
}

View file

@ -4,36 +4,83 @@ import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// AutopilotConfig is used to get the current Autopilot configuration.
func (s *StateStore) AutopilotConfig() (*structs.AutopilotConfig, error) {
func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the autopilot config
c, err := tx.First("autopilot-config", "id")
if err != nil {
return nil, fmt.Errorf("failed autopilot config lookup: %s", err)
return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err)
}
config, ok := c.(*structs.AutopilotConfig)
if !ok {
return nil, nil
return 0, nil, nil
}
return config, nil
return config.ModifyIndex, config, nil
}
// AutopilotConfig is used to set the current Autopilot configuration.
func (s *StateStore) UpdateAutopilotConfig(config *structs.AutopilotConfig) error {
// AutopilotSetConfig is used to set the current Autopilot configuration.
func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := tx.Insert("autopilot-config", config); err != nil {
return fmt.Errorf("failed updating autopilot config: %s", err)
}
s.autopilotSetConfigTxn(idx, tx, config)
tx.Commit()
return nil
}
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {
return false, fmt.Errorf("failed autopilot config lookup: %s", err)
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := existing.(*structs.AutopilotConfig)
if !ok || e.ModifyIndex != cidx {
return false, nil
}
s.autopilotSetConfigTxn(idx, tx, config)
tx.Commit()
return true, nil
}
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {
return fmt.Errorf("failed autopilot config lookup: %s", err)
}
// Set the indexes.
if existing != nil {
config.CreateIndex = existing.(*structs.AutopilotConfig).CreateIndex
} else {
config.CreateIndex = idx
}
config.ModifyIndex = idx
if err := tx.Insert("autopilot-config", config); err != nil {
return fmt.Errorf("failed updating autopilot config: %s", err)
}
return nil
}

View file

@ -14,7 +14,7 @@ func TestStateStore_Autopilot(t *testing.T) {
DeadServerCleanup: true,
}
if err := s.UpdateAutopilotConfig(expected); err != nil {
if err := s.AutopilotSetConfig(0, expected); err != nil {
t.Fatal(err)
}
@ -29,3 +29,60 @@ func TestStateStore_Autopilot(t *testing.T) {
t.Fatalf("bad: %#v, %#v", expected, config)
}
}
func TestStateStore_AutopilotCAS(t *testing.T) {
s := testStateStore(t)
expected := &structs.AutopilotConfig{
DeadServerCleanup: true,
}
if err := s.AutopilotSetConfig(0, expected); err != nil {
t.Fatal(err)
}
if err := s.AutopilotSetConfig(1, expected); err != nil {
t.Fatal(err)
}
// Do a CAS with an index lower than the entry
ok, err := s.AutopilotCASConfig(2, 0, &structs.AutopilotConfig{
DeadServerCleanup: false,
})
if ok || err != nil {
t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err)
}
// Check that the index is untouched and the entry
// has not been deleted.
idx, config, err := s.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if idx != 1 {
t.Fatalf("bad: %d", idx)
}
if !config.DeadServerCleanup {
t.Fatalf("bad: %#v", config)
}
// Do another CAS, this time with the correct index
ok, err = s.AutopilotCASConfig(2, 1, &structs.AutopilotConfig{
DeadServerCleanup: false,
})
if !ok || err != nil {
t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err)
}
// Check that the index is untouched and the entry
// has not been deleted.
idx, config, err = s.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if idx != 2 {
t.Fatalf("bad: %d", idx)
}
if config.DeadServerCleanup {
t.Fatalf("bad: %#v", config)
}
}

View file

@ -8,6 +8,9 @@ type AutopilotConfig struct {
// DeadServerCleanup controls whether to remove dead servers when a new
// server is added to the Raft peers
DeadServerCleanup bool
// RaftIndex stores the create/modify indexes of this configuration
RaftIndex
}
// RaftServer has information about a server in the Raft configuration.
@ -71,6 +74,9 @@ type AutopilotSetConfigRequest struct {
// Config is the new Autopilot configuration to use.
Config AutopilotConfig
// CAS controls whether to use check-and-set semantics for this request.
CAS bool
// WriteRequest holds the ACL token to go along with this request.
WriteRequest
}

View file

@ -40,6 +40,7 @@ const (
CoordinateBatchUpdateType
PreparedQueryRequestType
TxnRequestType
AutopilotRequestType
)
const (