open-nomad/nomad/operator_endpoint_test.go
Seth Hoenig ba728f8f97
api: enable support for setting original job source (#16763)
* api: enable support for setting original source alongside job

This PR adds support for setting job source material along with
the registration of a job.

This includes a new HTTP endpoint and a new RPC endpoint for
making queries for the original source of a job. The
HTTP endpoint is /v1/job/<id>/submission?version=<version> and
the RPC method is Job.GetJobSubmission.

The job source (if submitted, and doing so is always optional), is
stored in the job_submission memdb table, separately from the
actual job. This way we do not incur overhead of reading the large
string field throughout normal job operations.

The server config now includes job_max_source_size for configuring
the maximum size the job source may be, before the server simply
drops the source material. This should help prevent Bad Things from
happening when huge jobs are submitted. If the value is set to 0,
all job source material will be dropped.

* api: avoid writing var content to disk for parsing

* api: move submission validation into RPC layer

* api: return an error if updating a job submission without namespace or job id

* api: be exact about the job index we associate a submission with (modify)

* api: reword api docs scheduling

* api: prune all but the last 6 job submissions

* api: protect against nil job submission in job validation

* api: set max job source size in test server

* api: fixups from pr
2023-04-11 08:45:08 -05:00

977 lines
25 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package nomad
import (
"crypto/sha256"
"encoding/base64"
"fmt"
"io"
"net"
"os"
"path"
"reflect"
"strings"
"testing"
"time"
"github.com/hashicorp/go-msgpack/codec"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestOperator_RaftGetConfiguration checks that the Operator.RaftGetConfiguration
// RPC, issued against a single test server, returns exactly the one server that
// the server's own Raft configuration reports.
func TestOperator_RaftGetConfiguration(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, nil)
	defer shutdown()
	rpcCodec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// Ask for the configuration over RPC.
	req := structs.GenericRequest{
		QueryOptions: structs.QueryOptions{Region: srv.config.Region},
	}
	var got structs.RaftConfigurationResponse
	err := msgpackrpc.CallWithCodec(rpcCodec, "Operator.RaftGetConfiguration", &req, &got)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Read the same configuration straight from Raft to build the expectation.
	cfgFuture := srv.raft.GetConfiguration()
	if err := cfgFuture.Error(); err != nil {
		t.Fatalf("err: %v", err)
	}
	servers := cfgFuture.Configuration().Servers
	if len(servers) != 1 {
		t.Fatalf("bad: %v", servers)
	}
	self := servers[0]

	want := structs.RaftConfigurationResponse{
		Servers: []*structs.RaftServer{
			{
				ID:           self.ID,
				Node:         fmt.Sprintf("%v.%v", srv.config.NodeName, srv.config.Region),
				Address:      self.Address,
				Leader:       true,
				Voter:        true,
				RaftProtocol: fmt.Sprintf("%d", srv.config.RaftConfig.ProtocolVersion),
			},
		},
		Index: cfgFuture.Index(),
	}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("bad: got %+v; want %+v", got, want)
	}
}
// TestOperator_RaftGetConfiguration_ACL verifies the ACL handling of
// Operator.RaftGetConfiguration: a request without a token, or with a token
// that only carries a node-write policy, must be rejected with the
// permission-denied error, while the management token receives the same
// single-server view that Raft itself reports.
func TestOperator_RaftGetConfiguration_ACL(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, nil)
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	assert := assert.New(t)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	arg := structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			Region: s1.config.Region,
		},
	}

	// Try with no token and expect permission denied
	{
		var reply structs.RaftConfigurationResponse
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply)
		// EqualError fails cleanly on a nil error instead of panicking on
		// err.Error() after a non-fatal NotNil assertion.
		assert.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		var reply structs.RaftConfigurationResponse
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply)
		assert.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Use management token
	{
		arg.AuthToken = root.SecretID
		var reply structs.RaftConfigurationResponse
		assert.NoError(msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply))

		future := s1.raft.GetConfiguration()
		assert.NoError(future.Error())
		if !assert.Len(future.Configuration().Servers, 1) {
			// The failure is already recorded; indexing Servers[0] below
			// could panic on an empty slice.
			return
		}
		me := future.Configuration().Servers[0]
		expected := structs.RaftConfigurationResponse{
			Servers: []*structs.RaftServer{
				{
					ID:           me.ID,
					Node:         fmt.Sprintf("%v.%v", s1.config.NodeName, s1.config.Region),
					Address:      me.Address,
					Leader:       true,
					Voter:        true,
					RaftProtocol: fmt.Sprintf("%d", s1.config.RaftConfig.ProtocolVersion),
				},
			},
			Index: future.Index(),
		}
		assert.Equal(expected, reply)
	}
}
// TestOperator_RaftRemovePeerByAddress exercises Operator.RaftRemovePeerByAddress
// on a server configured with Raft protocol version 2: removing an address that
// Raft does not know must fail, and an address added to Raft directly can then
// be removed through the RPC.
func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, func(cfg *Config) {
		cfg.RaftConfig.ProtocolVersion = raft.ProtocolVersion(2)
	})
	defer shutdown()
	rpcCodec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// requireServerCount fails the test unless Raft currently lists exactly
	// want servers.
	requireServerCount := func(want int) {
		cfgFuture := srv.raft.GetConfiguration()
		if err := cfgFuture.Error(); err != nil {
			t.Fatalf("err: %v", err)
		}
		if raftCfg := cfgFuture.Configuration(); len(raftCfg.Servers) != want {
			t.Fatalf("bad: %v", raftCfg)
		}
	}

	freePorts := ci.PortAllocator.Grab(1)
	peerAddr := raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", freePorts[0]))

	var req structs.RaftPeerByAddressRequest
	req.Address = peerAddr
	req.Region = srv.config.Region
	var resp struct{}

	// An address that is not part of the configuration cannot be removed.
	err := msgpackrpc.CallWithCodec(rpcCodec, "Operator.RaftRemovePeerByAddress", &req, &resp)
	if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
		t.Fatalf("err: %v", err)
	}

	// Add the peer to Raft directly, then confirm it shows up.
	if err := srv.raft.AddPeer(peerAddr).Error(); err != nil {
		t.Fatalf("err: %v", err)
	}
	requireServerCount(2)

	// Now that the peer exists the removal goes through and it disappears.
	if err := msgpackrpc.CallWithCodec(rpcCodec, "Operator.RaftRemovePeerByAddress", &req, &resp); err != nil {
		t.Fatalf("err: %v", err)
	}
	requireServerCount(1)
}
// TestOperator_RaftRemovePeerByAddress_ACL verifies the ACL handling of
// Operator.RaftRemovePeerByAddress: after a peer has been added to Raft
// directly, removing it without a token or with a node-write-only token must be
// rejected with the permission-denied error, and the management token must
// succeed.
func TestOperator_RaftRemovePeerByAddress_ACL(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(2)
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	assert := assert.New(t)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	ports := ci.PortAllocator.Grab(1)
	arg := structs.RaftPeerByAddressRequest{
		Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])),
	}
	arg.Region = s1.config.Region

	// Add peer manually to Raft.
	{
		future := s1.raft.AddPeer(arg.Address)
		assert.NoError(future.Error())
	}

	var reply struct{}

	// Try with no token and expect permission denied
	{
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
		// EqualError fails cleanly on a nil error instead of panicking on
		// err.Error() after a non-fatal NotNil assertion.
		assert.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
		assert.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Try with a management token
	{
		arg.AuthToken = root.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
		assert.NoError(err)
	}
}
// TestOperator_RaftRemovePeerByID exercises Operator.RaftRemovePeerByID on a
// server configured with Raft protocol version 3: removing an ID that Raft does
// not know must fail, and a voter added to Raft directly can then be removed
// through the RPC.
func TestOperator_RaftRemovePeerByID(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, func(cfg *Config) {
		cfg.RaftConfig.ProtocolVersion = 3
	})
	defer shutdown()
	rpcCodec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// requireServerCount fails the test unless Raft currently lists exactly
	// want servers.
	requireServerCount := func(want int) {
		cfgFuture := srv.raft.GetConfiguration()
		if err := cfgFuture.Error(); err != nil {
			t.Fatalf("err: %v", err)
		}
		if raftCfg := cfgFuture.Configuration(); len(raftCfg.Servers) != want {
			t.Fatalf("bad: %v", raftCfg)
		}
	}

	var req structs.RaftPeerByIDRequest
	req.ID = raft.ServerID("e35bde83-4e9c-434f-a6ef-453f44ee21ea")
	req.Region = srv.config.Region
	var resp struct{}

	// An ID that is not part of the configuration cannot be removed.
	err := msgpackrpc.CallWithCodec(rpcCodec, "Operator.RaftRemovePeerByID", &req, &resp)
	if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
		t.Fatalf("err: %v", err)
	}

	// Add the voter to Raft directly, then confirm it shows up.
	freePorts := ci.PortAllocator.Grab(1)
	voterAddr := raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", freePorts[0]))
	if err := srv.raft.AddVoter(req.ID, voterAddr, 0, 0).Error(); err != nil {
		t.Fatalf("err: %v", err)
	}
	requireServerCount(2)

	// Now that the voter exists the removal goes through and it disappears.
	if err := msgpackrpc.CallWithCodec(rpcCodec, "Operator.RaftRemovePeerByID", &req, &resp); err != nil {
		t.Fatalf("err: %v", err)
	}
	requireServerCount(1)
}
// TestOperator_RaftRemovePeerByID_ACL verifies the ACL handling of
// Operator.RaftRemovePeerByID: after a voter has been added to Raft directly,
// removing it without a token or with a node-write-only token must be rejected
// with the permission-denied error, and the management token must succeed.
func TestOperator_RaftRemovePeerByID_ACL(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	assert := assert.New(t)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	arg := structs.RaftPeerByIDRequest{
		ID: raft.ServerID("e35bde83-4e9c-434f-a6ef-453f44ee21ea"),
	}
	arg.Region = s1.config.Region
	ports := ci.PortAllocator.Grab(1)

	// Add peer manually to Raft.
	{
		future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])), 0, 0)
		assert.NoError(future.Error())
	}

	var reply struct{}

	// Try with no token and expect permission denied
	{
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
		// EqualError fails cleanly on a nil error instead of panicking on
		// err.Error() after a non-fatal NotNil assertion.
		assert.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
		assert.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Try with a management token
	{
		arg.AuthToken = root.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
		assert.NoError(err)
	}
}
// TestOperator_SchedulerGetConfiguration checks that a freshly started test
// server answers Operator.SchedulerGetConfiguration with a non-zero index and
// with system-scheduler preemption enabled.
func TestOperator_SchedulerGetConfiguration(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, func(cfg *Config) {
		cfg.Build = "0.9.0+unittest"
	})
	defer shutdown()
	rpcCodec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	var req structs.GenericRequest
	req.QueryOptions.Region = srv.config.Region

	var resp structs.SchedulerConfigurationResponse
	err := msgpackrpc.CallWithCodec(rpcCodec, "Operator.SchedulerGetConfiguration", &req, &resp)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	require.NotZero(t, resp.Index)
	require.True(t, resp.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
}
// TestOperator_SchedulerSetConfiguration writes a scheduler configuration that
// turns off system-scheduler preemption and pauses the eval broker, then reads
// the configuration back and checks that both settings stuck and that the
// server's eval broker and blocked-evals tracker report themselves disabled.
func TestOperator_SchedulerSetConfiguration(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, func(cfg *Config) {
		cfg.Build = "0.9.0+unittest"
	})
	defer shutdown()
	client := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)
	must := require.New(t)

	// Write: preemption off, eval broker paused.
	setReq := structs.SchedulerSetConfigRequest{
		Config: structs.SchedulerConfiguration{
			PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: false},
			PauseEvalBroker:  true,
		},
	}
	setReq.Region = srv.config.Region

	var setResp structs.SchedulerSetConfigurationResponse
	must.Nil(msgpackrpc.CallWithCodec(client, "Operator.SchedulerSetConfiguration", &setReq, &setResp))
	must.NotZero(setResp.Index)

	// Read back and confirm the stored configuration matches what was written.
	var getReq structs.GenericRequest
	getReq.QueryOptions.Region = srv.config.Region

	var getResp structs.SchedulerConfigurationResponse
	must.NoError(msgpackrpc.CallWithCodec(client, "Operator.SchedulerGetConfiguration", &getReq, &getResp))
	must.NotZero(getResp.Index)
	must.False(getResp.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
	must.True(getResp.SchedulerConfig.PauseEvalBroker)

	// The running server must reflect the paused state as well.
	must.False(srv.evalBroker.Enabled())
	must.False(srv.blockedEvals.Enabled())
}
// TestOperator_SchedulerGetConfiguration_ACL verifies the ACL handling of
// Operator.SchedulerGetConfiguration: a request without a token or with a
// node-write-only token must be rejected with the permission-denied error, and
// the root token must succeed.
func TestOperator_SchedulerGetConfiguration_ACL(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = 3
		c.Build = "0.9.0+unittest"
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	arg := structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			Region: s1.config.Region,
		},
	}
	require := require.New(t)
	var reply structs.SchedulerConfigurationResponse

	// Try with no token and expect permission denied
	{
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply)
		// EqualError keeps the expected message in the "expected" slot of
		// the failure output and covers the nil-error case in one call.
		require.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply)
		require.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Try with root token, should succeed
	{
		arg.AuthToken = root.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &arg, &reply)
		require.NoError(err)
	}
}
// TestOperator_SchedulerSetConfiguration_ACL verifies the ACL handling of
// Operator.SchedulerSetConfiguration: a request without a token or with a
// node-write-only token must be rejected with the permission-denied error, and
// the root token must succeed.
func TestOperator_SchedulerSetConfiguration_ACL(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.RaftConfig.ProtocolVersion = 3
		c.Build = "0.9.0+unittest"
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	state := s1.fsm.State()

	// Create ACL token
	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	arg := structs.SchedulerSetConfigRequest{
		Config: structs.SchedulerConfiguration{
			PreemptionConfig: structs.PreemptionConfig{
				SystemSchedulerEnabled: true,
			},
		},
	}
	arg.Region = s1.config.Region

	require := require.New(t)
	var reply structs.SchedulerSetConfigurationResponse

	// Try with no token and expect permission denied
	{
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &reply)
		// EqualError keeps the expected message in the "expected" slot of
		// the failure output and covers the nil-error case in one call.
		require.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Try with an invalid token and expect permission denied
	{
		arg.AuthToken = invalidToken.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &reply)
		require.EqualError(err, structs.ErrPermissionDenied.Error())
	}

	// Try with root token, should succeed
	{
		arg.AuthToken = root.SecretID
		err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &reply)
		require.NoError(err)
	}
}
// TestOperator_SnapshotSave runs the streaming Operator.SnapshotSave RPC against
// three entry points - the leader and the follower of a two-server cluster, and
// a server in a second region ("two") joined to it - always asking for a
// snapshot of region "global". For each entry point it checks the response
// header, recomputes the SHA-256 checksum over the streamed snapshot bytes, and
// verifies the saved snapshot's metadata.
func TestOperator_SnapshotSave(t *testing.T) {
	ci.Parallel(t)

	////// Nomad clusters topology - not specific to test
	dir := t.TempDir()

	server1, cleanupLS := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 2
		c.DevMode = false
		c.DataDir = path.Join(dir, "server1")
	})
	defer cleanupLS()

	server2, cleanupRS := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 2
		c.DevMode = false
		c.DataDir = path.Join(dir, "server2")
	})
	defer cleanupRS()

	remoteRegionServer, cleanupRRS := TestServer(t, func(c *Config) {
		c.Region = "two"
		c.DevMode = false
		c.DataDir = path.Join(dir, "remote_region_server")
	})
	defer cleanupRRS()

	TestJoin(t, server1, server2)
	TestJoin(t, server1, remoteRegionServer)
	testutil.WaitForLeader(t, server1.RPC)
	testutil.WaitForLeader(t, server2.RPC)
	testutil.WaitForLeader(t, remoteRegionServer.RPC)

	leader, nonLeader := server1, server2
	if server2.IsLeader() {
		leader, nonLeader = server2, server1
	}

	///////// Actually run query now
	cases := []struct {
		name   string
		server *Server
	}{
		{"leader", leader},
		{"non_leader", nonLeader},
		{"remote_region", remoteRegionServer},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			handler, err := c.server.StreamingRpcHandler("Operator.SnapshotSave")
			require.NoError(t, err)

			p1, p2 := net.Pipe()
			defer p1.Close()
			defer p2.Close()

			// start handler
			go handler(p2)

			var req structs.SnapshotSaveRequest
			var resp structs.SnapshotSaveResponse

			req.Region = "global"

			// send request
			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
			err = encoder.Encode(&req)
			require.NoError(t, err)

			// The response header arrives first; the raw snapshot bytes
			// follow on the same connection.
			decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
			err = decoder.Decode(&resp)
			require.NoError(t, err)
			require.Empty(t, resp.ErrorMsg)

			require.NotZero(t, resp.Index)
			require.NotEmpty(t, resp.SnapshotChecksum)
			require.Contains(t, resp.SnapshotChecksum, "sha-256=")

			index := resp.Index

			snap, err := os.CreateTemp("", "nomadtests-snapshot-")
			require.NoError(t, err)
			// Deferred calls run last-in-first-out: the handle is closed
			// before the file is removed.
			defer os.Remove(snap.Name())
			defer snap.Close()

			// Write the stream to disk and hash it in a single pass.
			hash := sha256.New()
			_, err = io.Copy(io.MultiWriter(snap, hash), p1)
			require.NoError(t, err)

			expectedChecksum := "sha-256=" + base64.StdEncoding.EncodeToString(hash.Sum(nil))

			require.Equal(t, expectedChecksum, resp.SnapshotChecksum)

			// Rewind so the verifier reads the snapshot from the beginning.
			_, err = snap.Seek(0, io.SeekStart)
			require.NoError(t, err)

			meta, err := snapshot.Verify(snap)
			require.NoError(t, err)

			require.NotZerof(t, meta.Term, "snapshot term")
			require.Equal(t, index, meta.Index)
		})
	}
}
// TestOperator_SnapshotSave_ACL runs the streaming Operator.SnapshotSave RPC
// against a single ACL-enabled server with four credentials: the root token, a
// token holding only a node-write policy, a random unknown token, and no token.
// Only the root token may receive a snapshot; the others must get a 403
// permission-denied response.
func TestOperator_SnapshotSave_ACL(t *testing.T) {
	ci.Parallel(t)

	// Cluster setup: one ACL-enabled server persisting under a temp dir.
	dataRoot := t.TempDir()
	srv, rootToken, shutdown := TestACLServer(t, func(cfg *Config) {
		cfg.BootstrapExpect = 1
		cfg.DevMode = false
		cfg.DataDir = path.Join(dataRoot, "server1")
	})
	defer shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	nodeOnlyToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

	type aclCase struct {
		name    string
		token   string
		errCode int
		err     error
	}
	testCases := []aclCase{
		{name: "root", token: rootToken.SecretID},
		{name: "no_permission_token", token: nodeOnlyToken.SecretID, errCode: 403, err: structs.ErrPermissionDenied},
		{name: "invalid token", token: uuid.Generate(), errCode: 403, err: structs.ErrPermissionDenied},
		{name: "unauthenticated", errCode: 403, err: structs.ErrPermissionDenied},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			handler, err := srv.StreamingRpcHandler("Operator.SnapshotSave")
			require.NoError(t, err)

			clientConn, serverConn := net.Pipe()
			defer clientConn.Close()
			defer serverConn.Close()

			// Serve the RPC on the far end of the pipe.
			go handler(serverConn)

			// Send the request header.
			req := structs.SnapshotSaveRequest{}
			req.Region = "global"
			req.AuthToken = tc.token
			require.NoError(t, codec.NewEncoder(clientConn, structs.MsgpackHandle).Encode(&req))

			// Read the response header.
			var resp structs.SnapshotSaveResponse
			require.NoError(t, codec.NewDecoder(clientConn, structs.MsgpackHandle).Decode(&resp))

			if tc.err == nil {
				require.NotZero(t, resp.Index)
				require.NotEmpty(t, resp.SnapshotChecksum)
				require.Contains(t, resp.SnapshotChecksum, "sha-256=")

				// Drain the snapshot bytes that follow the header.
				io.Copy(io.Discard, clientConn)
				return
			}

			// Streaming failures are reported inside the response rather
			// than as a returned error.
			require.Equal(t, tc.err.Error(), resp.ErrorMsg)
			require.Equal(t, tc.errCode, resp.ErrorCode)
		})
	}
}
// TestOperator_SnapshotRestore restores a freshly generated snapshot through
// each of the three entry points understood by testRestoreSnapshot ("leader",
// "non_leader", "remote_region") and checks that the job captured in the
// snapshot can afterwards be looked up by ID in server state.
func TestOperator_SnapshotRestore(t *testing.T) {
	ci.Parallel(t)

	for _, target := range []string{"leader", "non_leader", "remote_region"} {
		t.Run(target, func(t *testing.T) {
			snap, job := generateSnapshot(t)

			req := &structs.SnapshotRestoreRequest{}
			req.Region = "global"

			// jobRestored asserts the snapshot's job is present on srv.
			jobRestored := func(t *testing.T, srv *Server) {
				found, err := srv.State().JobByID(nil, job.Namespace, job.ID)
				require.NoError(t, err)
				require.Equal(t, job.ID, found.ID)
			}

			testRestoreSnapshot(t, req, snap, target, jobRestored)
		})
	}
}
// generateSnapshot starts a throwaway single-server cluster, registers a mock
// job on it (through the Job.Register RPC and again directly in the state
// store at index 1000), and takes a Raft snapshot. It returns the snapshot and
// the job. The server is shut down when this function returns; the snapshot is
// closed through t.Cleanup.
func generateSnapshot(t *testing.T) (*snapshot.Snapshot, *structs.Job) {
	dataRoot := t.TempDir()

	srv, shutdown := TestServer(t, func(cfg *Config) {
		cfg.BootstrapExpect = 1
		cfg.DevMode = false
		cfg.DataDir = path.Join(dataRoot, "server1")
	})
	defer shutdown()

	job := mock.Job()

	// Register the job over RPC.
	registerReq := structs.JobRegisterRequest{
		Job: job,
		WriteRequest: structs.WriteRequest{
			Region:    "global",
			Namespace: job.Namespace,
		},
	}
	var registerResp structs.JobRegisterResponse
	rpcCodec := rpcClient(t, srv)
	require.NoError(t, msgpackrpc.CallWithCodec(rpcCodec, "Job.Register", &registerReq, &registerResp))

	// Also write the job straight into the state store.
	require.NoError(t, srv.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job))

	snap, err := snapshot.New(srv.logger, srv.raft)
	require.NoError(t, err)
	t.Cleanup(func() { snap.Close() })

	return snap, job
}
// testRestoreSnapshot streams snapshot into the Operator.SnapshotRestore RPC
// and then runs assertionFn against the servers that should hold the restored
// state.
//
// It builds a two-server cluster plus one server in a second region ("two")
// joined to it. target selects which server receives the RPC: "leader",
// "non_leader", or "remote_region"; any other value fails the test. req is sent
// as the request header, followed by the snapshot contents in chunks of up to
// 1024 bytes, each wrapped in a StreamErrWrapper. The read error that ends the
// loop (normally io.EOF) is forwarded as the final wrapper's error message.
//
// After a response with an empty ErrorMsg is decoded, assertionFn is invoked
// once for the leader and once for the non-leader of the two-server cluster.
func testRestoreSnapshot(t *testing.T, req *structs.SnapshotRestoreRequest, snapshot io.Reader, target string,
	assertionFn func(t *testing.T, server *Server)) {

	////// Nomad clusters topology - not specific to test
	dir := t.TempDir()

	server1, cleanupLS := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 2
		c.DevMode = false
		c.DataDir = path.Join(dir, "server1")

		// increase timeouts to account for I/O operations that
		// snapshot restore performs - some of which require sync calls
		c.RaftConfig.LeaderLeaseTimeout = 1 * time.Second
		c.RaftConfig.HeartbeatTimeout = 1 * time.Second
		c.RaftConfig.ElectionTimeout = 1 * time.Second
		c.RaftTimeout = 5 * time.Second
	})
	defer cleanupLS()

	server2, cleanupRS := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 2
		c.DevMode = false
		c.DataDir = path.Join(dir, "server2")

		// increase timeouts to account for I/O operations that
		// snapshot restore performs - some of which require sync calls
		c.RaftConfig.LeaderLeaseTimeout = 1 * time.Second
		c.RaftConfig.HeartbeatTimeout = 1 * time.Second
		c.RaftConfig.ElectionTimeout = 1 * time.Second
		c.RaftTimeout = 5 * time.Second
	})
	defer cleanupRS()

	remoteRegionServer, cleanupRRS := TestServer(t, func(c *Config) {
		c.Region = "two"
		c.DevMode = false
		c.DataDir = path.Join(dir, "remote_region_server")
	})
	defer cleanupRRS()

	TestJoin(t, server1, server2)
	TestJoin(t, server1, remoteRegionServer)
	testutil.WaitForLeader(t, server1.RPC)
	testutil.WaitForLeader(t, server2.RPC)
	testutil.WaitForLeader(t, remoteRegionServer.RPC)

	leader, nonLeader := server1, server2
	if server2.IsLeader() {
		leader, nonLeader = server2, server1
	}

	///////// Actually run query now
	mapping := map[string]*Server{
		"leader":        leader,
		"non_leader":    nonLeader,
		"remote_region": remoteRegionServer,
	}

	server := mapping[target]
	require.NotNil(t, server, "target not found")

	handler, err := server.StreamingRpcHandler("Operator.SnapshotRestore")
	require.NoError(t, err)

	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// start handler
	go handler(p2)

	var resp structs.SnapshotRestoreResponse

	// send request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	err = encoder.Encode(req)
	require.NoError(t, err)

	// Stream the snapshot body; the terminating read error is sent to the
	// handler as the last frame so it knows the stream has ended.
	buf := make([]byte, 1024)
	for {
		n, err := snapshot.Read(buf)
		if n > 0 {
			require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Payload: buf[:n]}))
		}
		if err != nil {
			require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Error: &cstructs.RpcError{Message: err.Error()}}))
			break
		}
	}

	decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
	err = decoder.Decode(&resp)
	require.NoError(t, err)
	require.Empty(t, resp.ErrorMsg)

	t.Run("checking leader state", func(t *testing.T) {
		assertionFn(t, leader)
	})

	// The follower must hold the restored state too, so check nonLeader
	// rather than the leader a second time.
	t.Run("checking nonleader state", func(t *testing.T) {
		assertionFn(t, nonLeader)
	})
}
// TestOperator_SnapshotRestore_ACL runs the streaming Operator.SnapshotRestore
// RPC against a fresh ACL-enabled server for each of four credentials: the root
// token, a token holding only a node-write policy, a random unknown token, and
// no token. The snapshot body is streamed only when success is expected; the
// denied cases must get a 403 permission-denied response.
func TestOperator_SnapshotRestore_ACL(t *testing.T) {
	ci.Parallel(t)

	dataRoot := t.TempDir()

	type aclCase struct {
		name    string
		errCode int
		err     error
	}
	testCases := []aclCase{
		{name: "root"},
		{name: "no_permission_token", errCode: 403, err: structs.ErrPermissionDenied},
		{name: "invalid token", errCode: 403, err: structs.ErrPermissionDenied},
		{name: "unauthenticated", errCode: 403, err: structs.ErrPermissionDenied},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			snap, _ := generateSnapshot(t)

			srv, rootToken, shutdown := TestACLServer(t, func(cfg *Config) {
				cfg.BootstrapExpect = 1
				cfg.DevMode = false
				cfg.DataDir = path.Join(dataRoot, "server_"+tc.name)
			})
			defer shutdown()
			testutil.WaitForLeader(t, srv.RPC)

			nodeOnlyToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))

			// Tokens only exist once the server is up, so the credential
			// is resolved from the case name here.
			var authToken string
			switch tc.name {
			case "root":
				authToken = rootToken.SecretID
			case "no_permission_token":
				authToken = nodeOnlyToken.SecretID
			case "invalid token":
				authToken = uuid.Generate()
			case "unauthenticated":
				authToken = ""
			default:
				t.Fatalf("unexpected case: %v", tc.name)
			}

			handler, err := srv.StreamingRpcHandler("Operator.SnapshotRestore")
			require.NoError(t, err)

			clientConn, serverConn := net.Pipe()
			defer clientConn.Close()
			defer serverConn.Close()

			// Serve the RPC on the far end of the pipe.
			go handler(serverConn)

			// Send the request header.
			req := structs.SnapshotRestoreRequest{}
			req.Region = "global"
			req.AuthToken = authToken
			enc := codec.NewEncoder(clientConn, structs.MsgpackHandle)
			require.NoError(t, enc.Encode(&req))

			// Stream the snapshot body only when the request should be
			// accepted; the terminating read error is sent as the last frame.
			if tc.err == nil {
				chunk := make([]byte, 1024)
				for {
					n, readErr := snap.Read(chunk)
					if n > 0 {
						require.NoError(t, enc.Encode(&cstructs.StreamErrWrapper{Payload: chunk[:n]}))
					}
					if readErr != nil {
						require.NoError(t, enc.Encode(&cstructs.StreamErrWrapper{Error: &cstructs.RpcError{Message: readErr.Error()}}))
						break
					}
				}
			}

			var resp structs.SnapshotRestoreResponse
			require.NoError(t, codec.NewDecoder(clientConn, structs.MsgpackHandle).Decode(&resp))

			if tc.err == nil {
				require.NotZero(t, resp.Index)
				io.Copy(io.Discard, clientConn)
				return
			}

			// Streaming failures are reported inside the response rather
			// than as a returned error.
			require.Equal(t, tc.err.Error(), resp.ErrorMsg)
			require.Equal(t, tc.errCode, resp.ErrorCode)
		})
	}
}