f0c3dca49c
Copy the updated version of freeport (sdk/freeport) and tweak it for use in Nomad tests. This means staying below port 10000 to avoid conflicts with the lib/freeport that is still transitively used by the old version of Consul that we vendor. Also provide implementations for finding ephemeral ports on macOS and Windows. Ports acquired through freeport are supposed to be returned to freeport, which this change now also does. Many tests are modified to call a cleanup function for Server objects. This should help quite a bit with some flaky tests, but not all of them; our port problems will not go away completely until we upgrade our vendored version of Consul. With Go modules, we'll probably do a 'replace' to swap out other copies of freeport with the one now in 'nomad/helper/freeport'.
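A minimal sketch of the pattern these tests now follow (TestExample_PortCleanup is a hypothetical test name; TestServer, freeport.MustTake, and freeport.Return are used exactly as they appear in the file below):

func TestExample_PortCleanup(t *testing.T) {
	t.Parallel()

	// The test-server constructor returns a cleanup function that shuts the
	// server down and releases its resources; deferring it keeps ports from
	// leaking between tests.
	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()

	// Ports taken from freeport must be handed back when the test finishes.
	ports := freeport.MustTake(1)
	defer freeport.Return(ports)

	// Use the reserved port, e.g. as an address that is known to be free.
	addr := fmt.Sprintf("127.0.0.1:%d", ports[0])
	_ = addr
}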
524 lines · 14 KiB · Go
package nomad

import (
	"fmt"
	"reflect"
	"strings"
	"testing"

	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
	"github.com/hashicorp/nomad/acl"
	"github.com/hashicorp/nomad/helper/freeport"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
	"github.com/hashicorp/raft"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	arg := structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			Region: s1.config.Region,
		},
	}
	var reply structs.RaftConfigurationResponse
	if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply); err != nil {
		t.Fatalf("err: %v", err)
	}

	future := s1.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		t.Fatalf("err: %v", err)
	}
	if len(future.Configuration().Servers) != 1 {
		t.Fatalf("bad: %v", future.Configuration().Servers)
	}
	me := future.Configuration().Servers[0]
	expected := structs.RaftConfigurationResponse{
		Servers: []*structs.RaftServer{
			{
				ID:           me.ID,
				Node:         fmt.Sprintf("%v.%v", s1.config.NodeName, s1.config.Region),
				Address:      me.Address,
				Leader:       true,
				Voter:        true,
				RaftProtocol: fmt.Sprintf("%d", s1.config.RaftConfig.ProtocolVersion),
			},
		},
		Index: future.Index(),
	}
	if !reflect.DeepEqual(reply, expected) {
		t.Fatalf("bad: got %+v; want %+v", reply, expected)
	}
}

func TestOperator_RaftGetConfiguration_ACL(t *testing.T) {
	t.Parallel()

	s1, root, cleanupS1 := TestACLServer(t, nil)
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	assert := assert.New(t)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	arg := structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			Region: s1.config.Region,
		},
	}

	// Try with no token and expect permission denied
	{
		var reply structs.RaftConfigurationResponse
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply)
		assert.NotNil(err)
		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		var reply structs.RaftConfigurationResponse
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply)
		assert.NotNil(err)
		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Use management token
	{
		arg.AuthToken = root.SecretID
		var reply structs.RaftConfigurationResponse
		assert.Nil(msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply))

		future := s1.raft.GetConfiguration()
		assert.Nil(future.Error())
		assert.Len(future.Configuration().Servers, 1)

		me := future.Configuration().Servers[0]
		expected := structs.RaftConfigurationResponse{
			Servers: []*structs.RaftServer{
				{
					ID:           me.ID,
					Node:         fmt.Sprintf("%v.%v", s1.config.NodeName, s1.config.Region),
					Address:      me.Address,
					Leader:       true,
					Voter:        true,
					RaftProtocol: fmt.Sprintf("%d", s1.config.RaftConfig.ProtocolVersion),
				},
			},
			Index: future.Index(),
		}
		assert.Equal(expected, reply)
	}
}

func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(2)
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	ports := freeport.MustTake(1)
	defer freeport.Return(ports)

	// Try to remove a peer that's not there.
	arg := structs.RaftPeerByAddressRequest{
		Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])),
	}
	arg.Region = s1.config.Region
	var reply struct{}
	err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
	if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
		t.Fatalf("err: %v", err)
	}

	// Add it manually to Raft.
	{
		future := s1.raft.AddPeer(arg.Address)
		if err := future.Error(); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Make sure it's there.
	{
		future := s1.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			t.Fatalf("err: %v", err)
		}
		configuration := future.Configuration()
		if len(configuration.Servers) != 2 {
			t.Fatalf("bad: %v", configuration)
		}
	}

	// Remove it, now it should go through.
	if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Make sure it's not there.
	{
		future := s1.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			t.Fatalf("err: %v", err)
		}
		configuration := future.Configuration()
		if len(configuration.Servers) != 1 {
			t.Fatalf("bad: %v", configuration)
		}
	}
}

func TestOperator_RaftRemovePeerByAddress_ACL(t *testing.T) {
	t.Parallel()

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(2)
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	assert := assert.New(t)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	ports := freeport.MustTake(1)
	defer freeport.Return(ports)

	arg := structs.RaftPeerByAddressRequest{
		Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])),
	}
	arg.Region = s1.config.Region

	// Add peer manually to Raft.
	{
		future := s1.raft.AddPeer(arg.Address)
		assert.Nil(future.Error())
	}

	var reply struct{}

	// Try with no token and expect permission denied
	{
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
		assert.NotNil(err)
		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
		assert.NotNil(err)
		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with a management token
	{
		arg.AuthToken = root.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
		assert.Nil(err)
	}
}

func TestOperator_RaftRemovePeerByID(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Try to remove a peer that's not there.
	arg := structs.RaftPeerByIDRequest{
		ID: raft.ServerID("e35bde83-4e9c-434f-a6ef-453f44ee21ea"),
	}
	arg.Region = s1.config.Region
	var reply struct{}
	err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
	if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
		t.Fatalf("err: %v", err)
	}

	ports := freeport.MustTake(1)
	defer freeport.Return(ports)

	// Add it manually to Raft.
	{
		future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])), 0, 0)
		if err := future.Error(); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Make sure it's there.
	{
		future := s1.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			t.Fatalf("err: %v", err)
		}
		configuration := future.Configuration()
		if len(configuration.Servers) != 2 {
			t.Fatalf("bad: %v", configuration)
		}
	}

	// Remove it, now it should go through.
	if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Make sure it's not there.
	{
		future := s1.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			t.Fatalf("err: %v", err)
		}
		configuration := future.Configuration()
		if len(configuration.Servers) != 1 {
			t.Fatalf("bad: %v", configuration)
		}
	}
}

func TestOperator_RaftRemovePeerByID_ACL(t *testing.T) {
	t.Parallel()

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	assert := assert.New(t)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	arg := structs.RaftPeerByIDRequest{
		ID: raft.ServerID("e35bde83-4e9c-434f-a6ef-453f44ee21ea"),
	}
	arg.Region = s1.config.Region

	ports := freeport.MustTake(1)
	defer freeport.Return(ports)

	// Add peer manually to Raft.
	{
		future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])), 0, 0)
		assert.Nil(future.Error())
	}

	var reply struct{}

	// Try with no token and expect permission denied
	{
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
		assert.NotNil(err)
		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
		assert.NotNil(err)
		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with a management token
	{
		arg.AuthToken = root.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
		assert.Nil(err)
	}
}

func TestOperator_SchedulerGetConfiguration(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.Build = "0.9.0+unittest"
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	arg := structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			Region: s1.config.Region,
		},
	}
	var reply structs.SchedulerConfigurationResponse
	if err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply); err != nil {
		t.Fatalf("err: %v", err)
	}
	require := require.New(t)
	require.NotZero(reply.Index)
	require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
}

func TestOperator_SchedulerSetConfiguration(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.Build = "0.9.0+unittest"
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	require := require.New(t)

	// Disable preemption
	arg := structs.SchedulerSetConfigRequest{
		Config: structs.SchedulerConfiguration{
			PreemptionConfig: structs.PreemptionConfig{
				SystemSchedulerEnabled: false,
			},
		},
	}
	arg.Region = s1.config.Region

	var setResponse structs.SchedulerSetConfigurationResponse
	err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &setResponse)
	require.Nil(err)
	require.NotZero(setResponse.Index)

	// Read and verify that preemption is disabled
	readConfig := structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			Region: s1.config.Region,
		},
	}
	var reply structs.SchedulerConfigurationResponse
	if err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &readConfig, &reply); err != nil {
		t.Fatalf("err: %v", err)
	}

	require.NotZero(reply.Index)
	require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
}

func TestOperator_SchedulerGetConfiguration_ACL(t *testing.T) {
	t.Parallel()

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = 3
		c.Build = "0.9.0+unittest"
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	arg := structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			Region: s1.config.Region,
		},
	}
	require := require.New(t)
	var reply structs.SchedulerConfigurationResponse

	// Try with no token and expect permission denied
	{
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply)
		require.NotNil(err)
		require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply)
		require.NotNil(err)
		require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with root token, should succeed
	{
		arg.AuthToken = root.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply)
		require.Nil(err)
	}
}

func TestOperator_SchedulerSetConfiguration_ACL(t *testing.T) {
	t.Parallel()

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = 3
		c.Build = "0.9.0+unittest"
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	arg := structs.SchedulerSetConfigRequest{
		Config: structs.SchedulerConfiguration{
			PreemptionConfig: structs.PreemptionConfig{
				SystemSchedulerEnabled: true,
			},
		},
	}
	arg.Region = s1.config.Region

	require := require.New(t)
	var reply structs.SchedulerSetConfigurationResponse

	// Try with no token and expect permission denied
	{
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &reply)
		require.NotNil(err)
		require.Equal(structs.ErrPermissionDenied.Error(), err.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &reply)
		require.NotNil(err)
		require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with root token, should succeed
	{
		arg.AuthToken = root.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &reply)
		require.Nil(err)
	}
}