Smaller methods, and added tests for RPC layer
This commit is contained in:
parent
75662b50d1
commit
7ef126a027
|
@ -215,51 +215,59 @@ func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, re
|
|||
// Switch on the method
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
var args structs.GenericRequest
|
||||
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
return s.schedulerGetConfig(resp, req)
|
||||
|
||||
var reply structs.SchedulerConfigurationResponse
|
||||
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setMeta(resp, &reply.QueryMeta)
|
||||
|
||||
return reply, nil
|
||||
|
||||
case "PUT":
|
||||
var args structs.SchedulerSetConfigRequest
|
||||
s.parseWriteRequest(req, &args.WriteRequest)
|
||||
|
||||
var conf api.SchedulerConfiguration
|
||||
if err := decodeBody(req, &conf); err != nil {
|
||||
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
|
||||
}
|
||||
|
||||
args.Config = structs.SchedulerConfiguration{
|
||||
PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled},
|
||||
}
|
||||
|
||||
// Check for cas value
|
||||
params := req.URL.Query()
|
||||
if _, ok := params["cas"]; ok {
|
||||
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
|
||||
if err != nil {
|
||||
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
|
||||
}
|
||||
args.Config.ModifyIndex = casVal
|
||||
args.CAS = true
|
||||
}
|
||||
|
||||
var reply structs.SchedulerSetConfigurationResponse
|
||||
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setIndex(resp, reply.Index)
|
||||
return reply, nil
|
||||
case "PUT", "POST":
|
||||
return s.schedulerUpdateConfig(resp, req)
|
||||
|
||||
default:
|
||||
return nil, CodedError(404, ErrInvalidMethod)
|
||||
return nil, CodedError(405, ErrInvalidMethod)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *HTTPServer) schedulerGetConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
var args structs.GenericRequest
|
||||
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var reply structs.SchedulerConfigurationResponse
|
||||
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setMeta(resp, &reply.QueryMeta)
|
||||
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
var args structs.SchedulerSetConfigRequest
|
||||
s.parseWriteRequest(req, &args.WriteRequest)
|
||||
|
||||
var conf api.SchedulerConfiguration
|
||||
if err := decodeBody(req, &conf); err != nil {
|
||||
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
|
||||
}
|
||||
|
||||
args.Config = structs.SchedulerConfiguration{
|
||||
PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled},
|
||||
}
|
||||
|
||||
// Check for cas value
|
||||
params := req.URL.Query()
|
||||
if _, ok := params["cas"]; ok {
|
||||
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
|
||||
if err != nil {
|
||||
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
|
||||
}
|
||||
args.Config.ModifyIndex = casVal
|
||||
args.CAS = true
|
||||
}
|
||||
|
||||
var reply structs.SchedulerSetConfigurationResponse
|
||||
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setIndex(resp, reply.Index)
|
||||
return reply, nil
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestOperator_RaftGetConfiguration(t *testing.T) {
|
||||
|
@ -333,3 +334,157 @@ func TestOperator_RaftRemovePeerByID_ACL(t *testing.T) {
|
|||
assert.Nil(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_SchedulerGetConfiguration(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
arg := structs.GenericRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: s1.config.Region,
|
||||
},
|
||||
}
|
||||
var reply structs.SchedulerConfigurationResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
require := require.New(t)
|
||||
require.NotZero(reply.Index)
|
||||
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
|
||||
}
|
||||
|
||||
func TestOperator_SchedulerSetConfiguration(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
// Disable preemption
|
||||
arg := structs.SchedulerSetConfigRequest{
|
||||
Config: structs.SchedulerConfiguration{
|
||||
PreemptionConfig: structs.PreemptionConfig{
|
||||
SystemSchedulerEnabled: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
arg.Region = s1.config.Region
|
||||
|
||||
var setResponse structs.SchedulerSetConfigurationResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &setResponse)
|
||||
require.Nil(err)
|
||||
require.NotZero(setResponse.Index)
|
||||
|
||||
// Read and verify that preemption is disabled
|
||||
readConfig := structs.GenericRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: s1.config.Region,
|
||||
},
|
||||
}
|
||||
var reply structs.SchedulerConfigurationResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &readConfig, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
require.NotZero(reply.Index)
|
||||
require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
|
||||
}
|
||||
|
||||
func TestOperator_SchedulerGetConfiguration_ACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1, root := TestACLServer(t, func(c *Config) {
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
state := s1.fsm.State()
|
||||
|
||||
// Create ACL token
|
||||
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))
|
||||
|
||||
arg := structs.GenericRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: s1.config.Region,
|
||||
},
|
||||
}
|
||||
require := require.New(t)
|
||||
var reply structs.SchedulerConfigurationResponse
|
||||
|
||||
// Try with no token and expect permission denied
|
||||
{
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply)
|
||||
require.NotNil(err)
|
||||
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
|
||||
}
|
||||
|
||||
// Try with an invalid token and expect permission denied
|
||||
{
|
||||
arg.AuthToken = invalidToken.SecretID
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply)
|
||||
require.NotNil(err)
|
||||
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
|
||||
}
|
||||
|
||||
// Try with root token, should succeed
|
||||
{
|
||||
arg.AuthToken = root.SecretID
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply)
|
||||
require.Nil(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestOperator_SchedulerSetConfiguration_ACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1, root := TestACLServer(t, func(c *Config) {
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
state := s1.fsm.State()
|
||||
|
||||
// Create ACL token
|
||||
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))
|
||||
|
||||
arg := structs.SchedulerSetConfigRequest{
|
||||
Config: structs.SchedulerConfiguration{
|
||||
PreemptionConfig: structs.PreemptionConfig{
|
||||
SystemSchedulerEnabled: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
arg.Region = s1.config.Region
|
||||
|
||||
require := require.New(t)
|
||||
var reply structs.SchedulerSetConfigurationResponse
|
||||
|
||||
// Try with no token and expect permission denied
|
||||
{
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &reply)
|
||||
require.NotNil(err)
|
||||
require.Equal(structs.ErrPermissionDenied.Error(), err.Error())
|
||||
}
|
||||
|
||||
// Try with an invalid token and expect permission denied
|
||||
{
|
||||
arg.AuthToken = invalidToken.SecretID
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &reply)
|
||||
require.NotNil(err)
|
||||
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
|
||||
}
|
||||
|
||||
// Try with root token, should succeed
|
||||
{
|
||||
arg.AuthToken = root.SecretID
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &reply)
|
||||
require.Nil(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3918,7 +3918,7 @@ func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.Schedu
|
|||
// index arg, then we shouldn't update anything and can safely
|
||||
// return early here.
|
||||
e, ok := existing.(*structs.SchedulerConfiguration)
|
||||
if !ok || e.ModifyIndex != cidx {
|
||||
if !ok || (e != nil && e.ModifyIndex != cidx) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue