open-nomad/nomad/volumewatcher/volumes_watcher_test.go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package volumewatcher

import (
	"testing"
	"time"

	memdb "github.com/hashicorp/go-memdb"
	"github.com/hashicorp/nomad/ci"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/shoenig/test/must"
	"github.com/stretchr/testify/require"
)

// TestVolumeWatch_EnableDisable tests the watcher registration logic that needs
// to happen during leader step-up/step-down
func TestVolumeWatch_EnableDisable(t *testing.T) {
	ci.Parallel(t)

	srv := &MockRPCServer{}
	srv.state = state.TestStateStore(t)
	index := uint64(100)

	watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 100 * time.Millisecond
	watcher.SetEnabled(true, srv.State(), "")

	plugin := mock.CSIPlugin()
	node := testNode(plugin, srv.State())
	alloc := mock.Alloc()
	alloc.ClientStatus = structs.AllocClientStatusComplete
	vol := testVolume(plugin, alloc, node.ID)

	index++
	err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
	require.NoError(t, err)

	// need to have just enough of a volume and claim in place so that
	// the watcher doesn't immediately stop and unload itself
	claim := &structs.CSIVolumeClaim{
		Mode:  structs.CSIVolumeClaimGC,
		State: structs.CSIVolumeClaimStateNodeDetached,
	}
	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)
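
	// the claim write should cause the watcher to spin up a
	// per-volume watcher for this volume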
	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 1 == len(watcher.watchers)
	}, time.Second, 10*time.Millisecond)
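
	// disabling the watcher on leader step-down unloads all the
	// per-volume watchers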
	watcher.SetEnabled(false, nil, "")
	watcher.wlock.RLock()
	defer watcher.wlock.RUnlock()
	require.Equal(t, 0, len(watcher.watchers))
}

// TestVolumeWatch_LeadershipTransition tests the correct behavior of
// claim reaping across leader step-up/step-down
func TestVolumeWatch_LeadershipTransition(t *testing.T) {
	ci.Parallel(t)

	srv := &MockRPCServer{}
	srv.state = state.TestStateStore(t)
	index := uint64(100)

	watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 100 * time.Millisecond

	plugin := mock.CSIPlugin()
	node := testNode(plugin, srv.State())
	alloc := mock.Alloc()
	alloc.ClientStatus = structs.AllocClientStatusRunning
	vol := testVolume(plugin, alloc, node.ID)

	index++
	err := srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index,
		[]*structs.Allocation{alloc})
	require.NoError(t, err)

	watcher.SetEnabled(true, srv.State(), "")

	index++
	err = srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
	require.NoError(t, err)

	// we should get or start up a watcher when we get an update for
	// the volume from the state store
	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 1 == len(watcher.watchers)
	}, time.Second, 10*time.Millisecond)
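
	// with the alloc still running and no terminal claim state, the
	// watcher should not have reaped anything yet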
	vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
	require.Len(t, vol.PastClaims, 0, "expected to have 0 PastClaims")
	require.Equal(t, 0, srv.countCSIUnpublish, "expected no CSI.Unpublish RPC calls")

	// trying to test a dropped watch is racy, so to reliably simulate
	// this condition, step down the watcher first and then perform the
	// writes to the volume before starting the new watcher. no watches
	// for that change will fire on the new watcher

	// step-down (this is sync)
	watcher.SetEnabled(false, nil, "")
	watcher.wlock.RLock()
	require.Equal(t, 0, len(watcher.watchers))
	watcher.wlock.RUnlock()

	// allocation is now invalid
	index++
	err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID}, false)
	require.NoError(t, err)

	// emit a GC so that we have a volume change that's dropped
	claim := &structs.CSIVolumeClaim{
		AllocationID: alloc.ID,
		NodeID:       node.ID,
		Mode:         structs.CSIVolumeClaimGC,
		State:        structs.CSIVolumeClaimStateUnpublishing,
	}
	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)

	// create a new watcher and enable it to simulate the leadership
	// transition
	watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 100 * time.Millisecond
	watcher.SetEnabled(true, srv.State(), "")

	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 0 == len(watcher.watchers)
	}, time.Second, 10*time.Millisecond)
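
	// the new watcher rescans the state store on step-up, picks up the
	// change dropped during the transition, and releases the claim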
	vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
	require.Len(t, vol.PastClaims, 1, "expected to have 1 PastClaim")
	require.Equal(t, 1, srv.countCSIUnpublish, "expected CSI.Unpublish RPC to be called")
}

// TestVolumeWatch_StartStop tests the start and stop of the watcher when
// it receives notifications and has completed its work
func TestVolumeWatch_StartStop(t *testing.T) {
	ci.Parallel(t)

	srv := &MockStatefulRPCServer{}
	srv.state = state.TestStateStore(t)
	index := uint64(100)

	watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 100 * time.Millisecond
	watcher.SetEnabled(true, srv.State(), "")
	require.Equal(t, 0, len(watcher.watchers))

	plugin := mock.CSIPlugin()
	node := testNode(plugin, srv.State())
	alloc1 := mock.Alloc()
	alloc1.ClientStatus = structs.AllocClientStatusRunning
	alloc2 := mock.Alloc()
	alloc2.Job = alloc1.Job
	alloc2.ClientStatus = structs.AllocClientStatusRunning
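
	// the watcher verifies claims against allocs in the state store,
	// so the job and both running allocs need to be in state before
	// the volume is claimed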
	index++
	err := srv.State().UpsertJob(structs.MsgTypeTestSetup, index, nil, alloc1.Job)
	require.NoError(t, err)
	index++
	err = srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2})
	require.NoError(t, err)

	// register a volume
	vol := testVolume(plugin, alloc1, node.ID)
	index++
	err = srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
	require.NoError(t, err)

	// assert we get a watcher; there are no claims so it should immediately stop
	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 0 == len(watcher.watchers)
	}, time.Second*2, 10*time.Millisecond)

	// claim the volume for both allocs
	claim := &structs.CSIVolumeClaim{
		AllocationID: alloc1.ID,
		NodeID:       node.ID,
		Mode:         structs.CSIVolumeClaimRead,
		AccessMode:   structs.CSIVolumeAccessModeMultiNodeReader,
	}

	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)

	claim.AllocationID = alloc2.ID
	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)

	// reap the volume and assert nothing has happened
	claim = &structs.CSIVolumeClaim{
		AllocationID: alloc1.ID,
		NodeID:       node.ID,
	}
	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)
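
	// with both allocs still running and no terminal claim state, the
	// watcher should leave both claims in place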
	ws := memdb.NewWatchSet()
	vol, _ = srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
	require.Equal(t, 2, len(vol.ReadAllocs))

	// alloc becomes terminal
	alloc1 = alloc1.Copy()
	alloc1.ClientStatus = structs.AllocClientStatusComplete
	index++
	err = srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1})
	require.NoError(t, err)

	index++
	claim.State = structs.CSIVolumeClaimStateReadyToFree
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)

	// watcher stops and 1 claim has been released
	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 0 == len(watcher.watchers)
	}, time.Second*5, 10*time.Millisecond)

	vol, _ = srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
	must.Eq(t, 1, len(vol.ReadAllocs))
	must.Eq(t, 0, len(vol.PastClaims))
}

// TestVolumeWatch_Delete tests the stop of the watcher when it receives
// notifications around a deleted volume
func TestVolumeWatch_Delete(t *testing.T) {
	ci.Parallel(t)

	srv := &MockStatefulRPCServer{}
	srv.state = state.TestStateStore(t)
	index := uint64(100)

	watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 100 * time.Millisecond
	watcher.SetEnabled(true, srv.State(), "")
	must.Eq(t, 0, len(watcher.watchers))

	// register an unused volume
	plugin := mock.CSIPlugin()
	vol := mock.CSIVolume(plugin)
	index++
	must.NoError(t, srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol}))

	// assert we get a watcher; there are no claims so it should immediately stop
	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 0 == len(watcher.watchers)
	}, time.Second*2, 10*time.Millisecond)

	// write a GC claim to the volume and then immediately delete, to
	// potentially hit the race condition between updates and deletes
	index++
	must.NoError(t, srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID,
		&structs.CSIVolumeClaim{
			Mode:  structs.CSIVolumeClaimGC,
			State: structs.CSIVolumeClaimStateReadyToFree,
		}))

	index++
	must.NoError(t, srv.State().CSIVolumeDeregister(
		index, vol.Namespace, []string{vol.ID}, false))
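
	// even if the GC claim briefly restarted a watcher, the deregister
	// should let it exit cleanly rather than get stuck on the missing
	// volume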
	// the watcher should not be running
	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 0 == len(watcher.watchers)
	}, time.Second*5, 10*time.Millisecond)
}

// TestVolumeWatch_RegisterDeregister tests the start and stop of
// watchers around registration
func TestVolumeWatch_RegisterDeregister(t *testing.T) {
	ci.Parallel(t)

	srv := &MockStatefulRPCServer{}
	srv.state = state.TestStateStore(t)
	index := uint64(100)

	watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 10 * time.Millisecond
	watcher.SetEnabled(true, srv.State(), "")
	require.Equal(t, 0, len(watcher.watchers))
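
	// note the shorter quiescent timeout above (10ms vs 100ms in the
	// other tests): a watcher with no work should unload itself almost
	// immediately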

	plugin := mock.CSIPlugin()
	alloc := mock.Alloc()
	alloc.ClientStatus = structs.AllocClientStatusComplete

	// register a volume without claims
	vol := mock.CSIVolume(plugin)
	index++
	err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
	require.NoError(t, err)

	// watcher should stop
	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 0 == len(watcher.watchers)
	}, time.Second, 10*time.Millisecond)
}