From 460416e78743626016a549566f1b10aa754def8b Mon Sep 17 00:00:00 2001 From: Derek Strickland <1111455+DerekStrickland@users.noreply.github.com> Date: Wed, 26 Jan 2022 11:31:37 -0500 Subject: [PATCH 01/19] Update IsEmpty to check for pre-1.2.4 fields (#11930) --- .changelog/11902.txt | 4 +++ client/config/config.go | 5 ++- command/agent/config_test.go | 35 +++++++++++++++++++ .../client_with_basic_template.hcl | 9 +++++ .../client_with_basic_template.json | 9 +++++ 5 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 .changelog/11902.txt create mode 100644 command/agent/test-resources/client_with_basic_template.hcl create mode 100644 command/agent/test-resources/client_with_basic_template.json diff --git a/.changelog/11902.txt b/.changelog/11902.txt new file mode 100644 index 000000000..5ab055281 --- /dev/null +++ b/.changelog/11902.txt @@ -0,0 +1,4 @@ +```release-note:bug +template: Fixed a bug where client template configuration that did not include any +of the new 1.2.4 configuration options could result in none of the configuration getting set. +``` diff --git a/client/config/config.go b/client/config/config.go index 45400fcaa..9ec256eab 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -450,7 +450,10 @@ func (c *ClientTemplateConfig) IsEmpty() bool { return true } - return c.BlockQueryWaitTime == nil && + return !c.DisableSandbox && + len(c.FunctionDenylist) == 0 && + len(c.FunctionBlacklist) == 0 && + c.BlockQueryWaitTime == nil && c.BlockQueryWaitTimeHCL == "" && c.MaxStale == nil && c.MaxStaleHCL == "" && diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 72aa864b3..b795f8fad 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -1413,6 +1413,41 @@ func TestConfig_LoadConsulTemplateConfig(t *testing.T) { require.Equal(t, 20*time.Second, *templateConfig.VaultRetry.MaxBackoff) } +func TestConfig_LoadConsulTemplateBasic(t *testing.T) { + defaultConfig := DefaultConfig() + + // hcl + agentConfig, err := LoadConfig("test-resources/client_with_basic_template.hcl") + require.NoError(t, err) + require.NotNil(t, agentConfig.Client.TemplateConfig) + + agentConfig = defaultConfig.Merge(agentConfig) + + clientAgent := Agent{config: agentConfig} + clientConfig, err := clientAgent.clientConfig() + require.NoError(t, err) + + templateConfig := clientConfig.TemplateConfig + require.NotNil(t, templateConfig) + require.True(t, templateConfig.DisableSandbox) + require.Len(t, templateConfig.FunctionDenylist, 1) + + // json + agentConfig, err = LoadConfig("test-resources/client_with_basic_template.json") + require.NoError(t, err) + + agentConfig = defaultConfig.Merge(agentConfig) + + clientAgent = Agent{config: agentConfig} + clientConfig, err = clientAgent.clientConfig() + require.NoError(t, err) + + templateConfig = clientConfig.TemplateConfig + require.NotNil(t, templateConfig) + require.True(t, templateConfig.DisableSandbox) + require.Len(t, templateConfig.FunctionDenylist, 1) +} + func TestParseMultipleIPTemplates(t *testing.T) { testCases := []struct { name string diff --git a/command/agent/test-resources/client_with_basic_template.hcl b/command/agent/test-resources/client_with_basic_template.hcl new file mode 100644 index 000000000..fc1fcb493 --- /dev/null +++ b/command/agent/test-resources/client_with_basic_template.hcl @@ -0,0 +1,9 @@ + +client { + enabled = true + + template { + disable_file_sandbox = true + function_denylist = [] + } +} diff --git 
a/command/agent/test-resources/client_with_basic_template.json b/command/agent/test-resources/client_with_basic_template.json new file mode 100644 index 000000000..d468d357b --- /dev/null +++ b/command/agent/test-resources/client_with_basic_template.json @@ -0,0 +1,9 @@ +{ + "client": { + "enabled": true, + "template": { + "disable_file_sandbox": true, + "function_denylist": [] + } + } +} From 4e559c62559e31da72134bc9ba4b9e0c4ae4c660 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 24 Jan 2022 11:49:50 -0500 Subject: [PATCH 02/19] csi: update leader's ACL in volumewatcher (#11891) The volumewatcher that runs on the leader needs to make RPC calls rather than writing to raft (as we do in the deploymentwatcher) because the unpublish workflow needs to make RPC calls to the clients. This requires that the volumewatcher has access to the leader's ACL token. But when leadership transitions, the new leader creates a new leader ACL token. This ACL token needs to be passed into the volumewatcher when we enable it, otherwise the volumewatcher can find itself with a stale token. --- .changelog/11891.txt | 3 +++ nomad/leader.go | 4 ++-- nomad/volumewatcher/volumes_watcher.go | 7 ++++--- nomad/volumewatcher/volumes_watcher_test.go | 14 +++++++------- 4 files changed, 16 insertions(+), 12 deletions(-) create mode 100644 .changelog/11891.txt diff --git a/.changelog/11891.txt b/.changelog/11891.txt new file mode 100644 index 000000000..f09bf8e13 --- /dev/null +++ b/.changelog/11891.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed a bug where releasing volume claims would fail with ACL errors after leadership transitions. +``` diff --git a/nomad/leader.go b/nomad/leader.go index c6dca9dc6..ecffabc85 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -263,7 +263,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { s.nodeDrainer.SetEnabled(true, s.State()) // Enable the volume watcher, since we are now the leader - s.volumeWatcher.SetEnabled(true, s.State()) + s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl()) // Restore the eval broker state if err := s.restoreEvals(); err != nil { @@ -1074,7 +1074,7 @@ func (s *Server) revokeLeadership() error { s.nodeDrainer.SetEnabled(false, nil) // Disable the volume watcher - s.volumeWatcher.SetEnabled(false, nil) + s.volumeWatcher.SetEnabled(false, nil, "") // Disable any enterprise systems required. if err := s.revokeEnterpriseLeadership(); err != nil { diff --git a/nomad/volumewatcher/volumes_watcher.go b/nomad/volumewatcher/volumes_watcher.go index 061df0613..c8c687add 100644 --- a/nomad/volumewatcher/volumes_watcher.go +++ b/nomad/volumewatcher/volumes_watcher.go @@ -57,14 +57,15 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W // SetEnabled is used to control if the watcher is enabled. The // watcher should only be enabled on the active leader. When being -// enabled the state is passed in as it is no longer valid once a -// leader election has taken place. -func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) { +// enabled the state and leader's ACL is passed in as it is no longer +// valid once a leader election has taken place. 
+func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore, leaderAcl string) { w.wlock.Lock() defer w.wlock.Unlock() wasEnabled := w.enabled w.enabled = enabled + w.leaderAcl = leaderAcl if state != nil { w.state = state diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 185a7225e..7f0365be3 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -23,7 +23,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") plugin := mock.CSIPlugin() node := testNode(plugin, srv.State()) @@ -48,7 +48,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { return 1 == len(watcher.watchers) }, time.Second, 10*time.Millisecond) - watcher.SetEnabled(false, nil) + watcher.SetEnabled(false, nil, "") require.Equal(0, len(watcher.watchers)) } @@ -70,7 +70,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusComplete vol := testVolume(plugin, alloc, node.ID) - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") index++ err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) @@ -94,7 +94,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // watches for that change will fire on the new watcher // step-down (this is sync) - watcher.SetEnabled(false, nil) + watcher.SetEnabled(false, nil, "") require.Equal(0, len(watcher.watchers)) // allocation is now invalid @@ -116,7 +116,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // create a new watcher and enable it to simulate the leadership // transition watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") require.Eventually(func() bool { watcher.wlock.RLock() @@ -142,7 +142,7 @@ func TestVolumeWatch_StartStop(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") require.Equal(0, len(watcher.watchers)) plugin := mock.CSIPlugin() @@ -237,7 +237,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) { watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") require.Equal(0, len(watcher.watchers)) plugin := mock.CSIPlugin() From 951661db048b6d6d2b8dec7798ca0fdc18c92ca0 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 27 Jan 2022 09:30:03 -0500 Subject: [PATCH 03/19] CSI: resolve invalid claim states (#11890) * csi: resolve invalid claim states on read It's currently possible for CSI volumes to be claimed by allocations that no longer exist. This changeset asserts a reasonable state at the state store level by registering these nil allocations as "past claims" on any read. This will cause any pass through the periodic GC or volumewatcher to trigger the unpublishing workflow for those claims. * csi: make feasibility check errors more understandable When the feasibility checker finds we have no free write claims, it checks to see if any of those claims are for the job we're currently scheduling (so that earlier versions of a job can't block claims for new versions) and reports a conflict if the volume can't be scheduled so that the user can fix their claims. 
But when the checker hits a claim that has a GCd allocation, the state is recoverable by the server once claim reaping completes and no user intervention is required; the blocked eval should complete. Differentiate the scheduler error produced by these two conditions. --- .changelog/11890.txt | 3 + nomad/core_sched.go | 4 + nomad/core_sched_test.go | 55 +++++- nomad/state/state_store.go | 26 +++ nomad/state/testing.go | 187 ++++++++++++++++++++ nomad/volumewatcher/volume_watcher_test.go | 29 +++ nomad/volumewatcher/volumes_watcher_test.go | 7 +- scheduler/feasible.go | 38 ++-- 8 files changed, 329 insertions(+), 20 deletions(-) create mode 100644 .changelog/11890.txt diff --git a/.changelog/11890.txt b/.changelog/11890.txt new file mode 100644 index 000000000..1074aa29c --- /dev/null +++ b/.changelog/11890.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed a bug where garbage collected allocations could block new claims on a volume +``` diff --git a/nomad/core_sched.go b/nomad/core_sched.go index cffb6114b..4306783ef 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -782,6 +782,10 @@ NEXT_VOLUME: continue } + // TODO(tgross): consider moving the TerminalStatus check into + // the denormalize volume logic so that we can just check the + // volume for past claims + // we only call the claim release RPC if the volume has claims // that no longer have valid allocations. otherwise we'd send // out a lot of do-nothing RPCs. diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index a19d4395b..095975a31 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2383,19 +2383,64 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { c := core.(*CoreScheduler) require.NoError(c.csiVolumeClaimGC(gc)) - // the volumewatcher will hit an error here because there's no - // path to the node. but we can't update the claim to bypass the - // client RPCs without triggering the volumewatcher's normal code - // path. + // TODO(tgross): the condition below means this test doesn't tell + // us much; ideally we should be intercepting the claim request + // and verifying that we send the expected claims but we don't + // have test infra in place to do that for server RPCs + + // sending the GC claim will trigger the volumewatcher's normal + // code path. 
but the volumewatcher will hit an error here + // because there's no path to the node, so we shouldn't see + // the WriteClaims removed require.Eventually(func() bool { vol, _ := state.CSIVolumeByID(ws, ns, volID) return len(vol.WriteClaims) == 1 && len(vol.WriteAllocs) == 1 && - len(vol.PastClaims) == 0 + len(vol.PastClaims) == 1 }, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly") } +func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) { + t.Parallel() + require := require.New(t) + + srv, shutdown := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + + defer shutdown() + testutil.WaitForLeader(t, srv.RPC) + + err := state.TestBadCSIState(t, srv.State()) + require.NoError(err) + + snap, err := srv.State().Snapshot() + require.NoError(err) + core := NewCoreScheduler(srv, snap) + + index, _ := srv.State().LatestIndex() + index++ + gc := srv.coreJobEval(structs.CoreJobForceGC, index) + c := core.(*CoreScheduler) + require.NoError(c.csiVolumeClaimGC(gc)) + + require.Eventually(func() bool { + vol, _ := srv.State().CSIVolumeByID(nil, + structs.DefaultNamespace, "csi-volume-nfs0") + if len(vol.PastClaims) != 2 { + return false + } + for _, claim := range vol.PastClaims { + if claim.State != structs.CSIVolumeClaimStateUnpublishing { + return false + } + } + return true + }, time.Second*1, 10*time.Millisecond, "invalid claims should be marked for GC") + +} + func TestCoreScheduler_FailLoop(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 484091b38..9b8cfa4f2 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2513,6 +2513,18 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st State: structs.CSIVolumeClaimStateTaken, } } + } else if _, ok := vol.PastClaims[id]; !ok { + // ensure that any allocs that have been GC'd since + // our last read are marked as past claims + vol.PastClaims[id] = &structs.CSIVolumeClaim{ + AllocationID: id, + Mode: structs.CSIVolumeClaimRead, + State: structs.CSIVolumeClaimStateUnpublishing, + } + readClaim := vol.ReadClaims[id] + if readClaim != nil { + vol.PastClaims[id].NodeID = readClaim.NodeID + } } } @@ -2531,6 +2543,20 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st State: structs.CSIVolumeClaimStateTaken, } } + } else if _, ok := vol.PastClaims[id]; !ok { + // ensure that any allocs that have been GC'd since + // our last read are marked as past claims + + vol.PastClaims[id] = &structs.CSIVolumeClaim{ + AllocationID: id, + Mode: structs.CSIVolumeClaimWrite, + State: structs.CSIVolumeClaimStateUnpublishing, + } + writeClaim := vol.WriteClaims[id] + if writeClaim != nil { + vol.PastClaims[id].NodeID = writeClaim.NodeID + } + } } diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 460df6097..0d570a95a 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -1,7 +1,9 @@ package state import ( + "math" "testing" + "time" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" @@ -124,3 +126,188 @@ func createTestCSIPlugin(s *StateStore, id string, requiresController bool) func s.DeleteNode(structs.MsgTypeTestSetup, index, ids) } } + +func TestBadCSIState(t testing.TB, store *StateStore) error { + + pluginID := "org.democratic-csi.nfs" + + controllerInfo := func(isHealthy bool) map[string]*structs.CSIInfo { + desc := "healthy" + if !isHealthy { + desc = "failed fingerprinting with 
error" + } + return map[string]*structs.CSIInfo{ + pluginID: { + PluginID: pluginID, + AllocID: uuid.Generate(), + Healthy: isHealthy, + HealthDescription: desc, + RequiresControllerPlugin: true, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsAttachDetach: true, + }, + }, + } + } + + nodeInfo := func(nodeName string, isHealthy bool) map[string]*structs.CSIInfo { + desc := "healthy" + if !isHealthy { + desc = "failed fingerprinting with error" + } + return map[string]*structs.CSIInfo{ + pluginID: { + PluginID: pluginID, + AllocID: uuid.Generate(), + Healthy: isHealthy, + HealthDescription: desc, + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{ + ID: nodeName, + MaxVolumes: math.MaxInt64, + RequiresNodeStageVolume: true, + }, + }, + } + } + + nodes := make([]*structs.Node, 3) + for i := range nodes { + n := mock.Node() + n.Attributes["nomad.version"] = "1.2.4" + nodes[i] = n + } + + nodes[0].CSIControllerPlugins = controllerInfo(true) + nodes[0].CSINodePlugins = nodeInfo("nomad-client0", true) + + drainID := uuid.Generate() + + // drained node + nodes[1].CSIControllerPlugins = controllerInfo(false) + nodes[1].CSINodePlugins = nodeInfo("nomad-client1", false) + + nodes[1].LastDrain = &structs.DrainMetadata{ + StartedAt: time.Now().Add(-10 * time.Minute), + UpdatedAt: time.Now().Add(-30 * time.Second), + Status: structs.DrainStatusComplete, + AccessorID: drainID, + } + nodes[1].SchedulingEligibility = structs.NodeSchedulingIneligible + + // previously drained but now eligible + nodes[2].CSIControllerPlugins = controllerInfo(true) + nodes[2].CSINodePlugins = nodeInfo("nomad-client2", true) + nodes[2].LastDrain = &structs.DrainMetadata{ + StartedAt: time.Now().Add(-15 * time.Minute), + UpdatedAt: time.Now().Add(-5 * time.Minute), + Status: structs.DrainStatusComplete, + AccessorID: drainID, + } + nodes[2].SchedulingEligibility = structs.NodeSchedulingEligible + + // Insert nodes into the state store + index := uint64(999) + for _, n := range nodes { + index++ + err := store.UpsertNode(structs.MsgTypeTestSetup, index, n) + if err != nil { + return err + } + } + + allocID0 := uuid.Generate() // nil alloc + allocID2 := uuid.Generate() // nil alloc + + alloc1 := mock.Alloc() + alloc1.ClientStatus = "complete" + alloc1.DesiredStatus = "stop" + + // Insert allocs into the state store + err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1}) + if err != nil { + return err + } + + vol := &structs.CSIVolume{ + ID: "csi-volume-nfs0", + Name: "csi-volume-nfs0", + ExternalID: "csi-volume-nfs0", + Namespace: "default", + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + MountOptions: &structs.CSIMountOptions{ + MountFlags: []string{"noatime"}, + }, + Context: map[string]string{ + "node_attach_driver": "nfs", + "provisioner_driver": "nfs-client", + "server": "192.168.56.69", + }, + Capacity: 0, + RequestedCapacityMin: 107374182, + RequestedCapacityMax: 107374182, + RequestedCapabilities: []*structs.CSIVolumeCapability{ + { + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + }, + { + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + }, + { + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, + }, + }, + WriteAllocs: 
map[string]*structs.Allocation{ + allocID0: nil, + alloc1.ID: nil, + allocID2: nil, + }, + WriteClaims: map[string]*structs.CSIVolumeClaim{ + allocID0: { + AllocationID: allocID0, + NodeID: nodes[0].ID, + Mode: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + }, + alloc1.ID: { + AllocationID: alloc1.ID, + NodeID: nodes[1].ID, + Mode: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + }, + allocID2: { + AllocationID: allocID2, + NodeID: nodes[2].ID, + Mode: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + }, + }, + Schedulable: true, + PluginID: pluginID, + Provider: pluginID, + ProviderVersion: "1.4.3", + ControllerRequired: true, + ControllersHealthy: 2, + ControllersExpected: 2, + NodesHealthy: 2, + NodesExpected: 0, + } + + err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + if err != nil { + return err + } + + return nil +} diff --git a/nomad/volumewatcher/volume_watcher_test.go b/nomad/volumewatcher/volume_watcher_test.go index 848ca58b9..4e8a556a4 100644 --- a/nomad/volumewatcher/volume_watcher_test.go +++ b/nomad/volumewatcher/volume_watcher_test.go @@ -75,3 +75,32 @@ func TestVolumeWatch_Reap(t *testing.T) { require.NoError(err) require.Len(vol.PastClaims, 2) // alloc claim + GC claim } + +func TestVolumeReapBadState(t *testing.T) { + + store := state.TestStateStore(t) + err := state.TestBadCSIState(t, store) + require.NoError(t, err) + srv := &MockRPCServer{ + state: store, + } + + vol, err := srv.state.CSIVolumeByID(nil, + structs.DefaultNamespace, "csi-volume-nfs0") + require.NoError(t, err) + srv.state.CSIVolumeDenormalize(nil, vol) + + ctx, exitFn := context.WithCancel(context.Background()) + w := &volumeWatcher{ + v: vol, + rpc: srv, + state: srv.State(), + ctx: ctx, + exitFn: exitFn, + logger: testlog.HCLogger(t), + } + + err = w.volumeReapImpl(vol) + require.NoError(t, err) + require.Equal(t, 2, srv.countCSIUnpublish) +} diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 7f0365be3..2271c2f20 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -70,10 +70,15 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusComplete vol := testVolume(plugin, alloc, node.ID) + index++ + err := srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, + []*structs.Allocation{alloc}) + require.NoError(err) + watcher.SetEnabled(true, srv.State(), "") index++ - err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) require.NoError(err) // we should get or start up a watcher when we get an update for diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 206544249..3b10331c5 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -15,17 +15,18 @@ import ( ) const ( - FilterConstraintHostVolumes = "missing compatible host volumes" - FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s" - FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is 
unhealthy on client %s" - FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s" - FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed" - FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s" - FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims" - FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only" - FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims" // - FilterConstraintDrivers = "missing drivers" - FilterConstraintDevices = "missing devices" + FilterConstraintHostVolumes = "missing compatible host volumes" + FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s" + FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s" + FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s" + FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed" + FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s" + FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims" + FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only" + FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims" + FilterConstraintCSIVolumeGCdAllocationTemplate = "CSI volume %s has exhausted its available writer claims and is claimed by a garbage collected allocation %s; waiting for claim to be released" + FilterConstraintDrivers = "missing drivers" + FilterConstraintDevices = "missing devices" ) var ( @@ -320,11 +321,20 @@ func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) { return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID) } if !vol.WriteFreeClaims() { - // Check the blocking allocations to see if they belong to this job for id := range vol.WriteAllocs { a, err := c.ctx.State().AllocByID(ws, id) - if err != nil || a == nil || - a.Namespace != c.namespace || a.JobID != c.jobID { + // the alloc for this blocking claim has been + // garbage collected but the volumewatcher hasn't + // finished releasing the claim (and possibly + // detaching the volume), so we need to block + // until it can be scheduled + if err != nil || a == nil { + return false, fmt.Sprintf( + FilterConstraintCSIVolumeGCdAllocationTemplate, vol.ID, id) + } else if a.Namespace != c.namespace || a.JobID != c.jobID { + // the blocking claim is for another live job + // so it's legitimately blocking more write + // claims return false, fmt.Sprintf( FilterConstraintCSIVolumeInUseTemplate, vol.ID) } From c67c31e54370fd49387bf4f63b31e7284b301a4d Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 27 Jan 2022 10:05:41 -0500 Subject: [PATCH 04/19] csi: ensure that PastClaims are populated with correct mode (#11932) In the client's `(*csiHook) Postrun()` method, we make an unpublish RPC that includes a claim in the `CSIVolumeClaimStateUnpublishing` state and using the mode from the client. But then in the `(*CSIVolume) Unpublish` RPC handler, we query the volume from the state store (because we only get an ID from the client). And when we make the client RPC for the node unpublish step, we use the _current volume's_ view of the mode. 
If the volume's mode has been changed before the old allocations can have their claims released, then we end up making a CSI RPC that will never succeed. Why does this code path get the mode from the volume and not the claim? Because the claim written by the GC job in `(*CoreScheduler) csiVolumeClaimGC` doesn't have a mode. Instead it just writes a claim in the unpublishing state to ensure the volumewatcher detects a "past claim" change and reaps all the claims on the volumes. Fix this by ensuring that the `CSIVolumeDenormalize` creates past claims for all nil allocations with a correct access mode set. --- nomad/state/state_store.go | 138 ++++++++++++++++++------------------- nomad/state/testing.go | 5 +- 2 files changed, 72 insertions(+), 71 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9b8cfa4f2..78f98bcb1 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2194,7 +2194,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st // we return the volume with the plugins denormalized by default, // because the scheduler needs them for feasibility checking - return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy()) + return s.csiVolumeDenormalizePluginsTxn(txn, vol.Copy()) } // CSIVolumesByPluginID looks up csi_volumes by pluginID. Caller should @@ -2326,11 +2326,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s } } - volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy()) + volume, err := s.csiVolumeDenormalizePluginsTxn(txn, orig.Copy()) if err != nil { return err } - volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume) + volume, err = s.csiVolumeDenormalizeTxn(txn, nil, volume) if err != nil { return err } @@ -2414,7 +2414,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s // volSafeToForce checks if the any of the remaining allocations // are in a non-terminal state. func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool { - vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v) + vol, err := s.csiVolumeDenormalizeTxn(txn, nil, v) if err != nil { return false } @@ -2443,15 +2443,12 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs } txn := s.db.ReadTxn() defer txn.Abort() - return s.CSIVolumeDenormalizePluginsTxn(txn, vol) + return s.csiVolumeDenormalizePluginsTxn(txn, vol) } -// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and -// plugins, but without allocations. -// Use this for current volume metadata, handling lists of volumes. -// Use CSIVolumeDenormalize for volumes containing both health and current -// allocations. -func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) { +// csiVolumeDenormalizePluginsTxn implements +// CSIVolumeDenormalizePlugins, inside a transaction. +func (s *StateStore) csiVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) { if vol == nil { return nil, nil } @@ -2484,80 +2481,83 @@ func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVol return vol, nil } -// CSIVolumeDenormalize returns a CSIVolume with allocations +// CSIVolumeDenormalize returns a CSIVolume with its current +// Allocations and Claims, including creating new PastClaims for +// terminal or garbage collected allocations. This ensures we have a +// consistent state. 
Note that it mutates the original volume and so +// should always be called on a Copy after reading from the state +// store. func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { txn := s.db.ReadTxn() - return s.CSIVolumeDenormalizeTxn(txn, ws, vol) + return s.csiVolumeDenormalizeTxn(txn, ws, vol) } -// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations -func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { +// csiVolumeDenormalizeTxn implements CSIVolumeDenormalize inside a transaction +func (s *StateStore) csiVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { if vol == nil { return nil, nil } - for id := range vol.ReadAllocs { - a, err := s.allocByIDImpl(txn, ws, id) - if err != nil { - return nil, err - } - if a != nil { - vol.ReadAllocs[id] = a - // COMPAT(1.0): the CSIVolumeClaim fields were added - // after 0.11.1, so claims made before that may be - // missing this value. (same for WriteAlloc below) - if _, ok := vol.ReadClaims[id]; !ok { - vol.ReadClaims[id] = &structs.CSIVolumeClaim{ + + // note: denormalize mutates the maps we pass in! + denormalize := func( + currentAllocs map[string]*structs.Allocation, + currentClaims, pastClaims map[string]*structs.CSIVolumeClaim, + fallbackMode structs.CSIVolumeClaimMode) error { + + for id := range currentAllocs { + a, err := s.allocByIDImpl(txn, ws, id) + if err != nil { + return err + } + pastClaim := pastClaims[id] + currentClaim := currentClaims[id] + if currentClaim == nil { + // COMPAT(1.4.0): the CSIVolumeClaim fields were added + // after 0.11.1, so claims made before that may be + // missing this value. No clusters should see this + // anymore, so warn nosily in the logs so that + // operators ask us about it. 
Remove this block and + // the now-unused fallbackMode parameter, and return + // an error if currentClaim is nil in 1.4.0 + s.logger.Warn("volume was missing claim for allocation", + "volume_id", vol.ID, "alloc", id) + currentClaim = &structs.CSIVolumeClaim{ AllocationID: a.ID, NodeID: a.NodeID, - Mode: structs.CSIVolumeClaimRead, + Mode: fallbackMode, State: structs.CSIVolumeClaimStateTaken, } + currentClaims[id] = currentClaim } - } else if _, ok := vol.PastClaims[id]; !ok { - // ensure that any allocs that have been GC'd since - // our last read are marked as past claims - vol.PastClaims[id] = &structs.CSIVolumeClaim{ - AllocationID: id, - Mode: structs.CSIVolumeClaimRead, - State: structs.CSIVolumeClaimStateUnpublishing, - } - readClaim := vol.ReadClaims[id] - if readClaim != nil { - vol.PastClaims[id].NodeID = readClaim.NodeID + + currentAllocs[id] = a + if a == nil && pastClaim == nil { + // the alloc is garbage collected but nothing has written a PastClaim, + // so create one now + pastClaim = &structs.CSIVolumeClaim{ + AllocationID: id, + NodeID: currentClaim.NodeID, + Mode: currentClaim.Mode, + State: structs.CSIVolumeClaimStateUnpublishing, + AccessMode: currentClaim.AccessMode, + AttachmentMode: currentClaim.AttachmentMode, + } + pastClaims[id] = pastClaim } + } + return nil } - for id := range vol.WriteAllocs { - a, err := s.allocByIDImpl(txn, ws, id) - if err != nil { - return nil, err - } - if a != nil { - vol.WriteAllocs[id] = a - if _, ok := vol.WriteClaims[id]; !ok { - vol.WriteClaims[id] = &structs.CSIVolumeClaim{ - AllocationID: a.ID, - NodeID: a.NodeID, - Mode: structs.CSIVolumeClaimWrite, - State: structs.CSIVolumeClaimStateTaken, - } - } - } else if _, ok := vol.PastClaims[id]; !ok { - // ensure that any allocs that have been GC'd since - // our last read are marked as past claims - - vol.PastClaims[id] = &structs.CSIVolumeClaim{ - AllocationID: id, - Mode: structs.CSIVolumeClaimWrite, - State: structs.CSIVolumeClaimStateUnpublishing, - } - writeClaim := vol.WriteClaims[id] - if writeClaim != nil { - vol.PastClaims[id].NodeID = writeClaim.NodeID - } - - } + err := denormalize(vol.ReadAllocs, vol.ReadClaims, vol.PastClaims, + structs.CSIVolumeClaimRead) + if err != nil { + return nil, err + } + err = denormalize(vol.WriteAllocs, vol.WriteClaims, vol.PastClaims, + structs.CSIVolumeClaimWrite) + if err != nil { + return nil, err } // COMPAT: the AccessMode and AttachmentMode fields were added to claims diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 0d570a95a..c7a2f3e8e 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -221,8 +221,8 @@ func TestBadCSIState(t testing.TB, store *StateStore) error { allocID2 := uuid.Generate() // nil alloc alloc1 := mock.Alloc() - alloc1.ClientStatus = "complete" - alloc1.DesiredStatus = "stop" + alloc1.ClientStatus = structs.AllocClientStatusRunning + alloc1.DesiredStatus = structs.AllocDesiredStatusRun // Insert allocs into the state store err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1}) @@ -303,6 +303,7 @@ func TestBadCSIState(t testing.TB, store *StateStore) error { NodesHealthy: 2, NodesExpected: 0, } + vol = vol.Copy() // canonicalize err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) if err != nil { From 5773fc93a2af4aa969fe7a784cc7c226d98bbfc1 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 27 Jan 2022 10:39:08 -0500 Subject: [PATCH 05/19] CSI: move terminal alloc handling into denormalization (#11931) * The volume claim GC method and 
volumewatcher both have logic collecting terminal allocations that duplicates most of the logic that's now in the state store's `CSIVolumeDenormalize` method. Copy this logic into the state store so that all code paths have the same view of the past claims. * Remove logic in the volume claim GC that now lives in the state store's `CSIVolumeDenormalize` method. * Remove logic in the volumewatcher that now lives in the state store's `CSIVolumeDenormalize` method. * Remove logic in the node unpublish RPC that now lives in the state store's `CSIVolumeDenormalize` method. --- nomad/core_sched.go | 33 ++---------------- nomad/csi_endpoint.go | 38 +++++++-------------- nomad/state/state_store.go | 2 +- nomad/volumewatcher/volume_watcher.go | 15 ++------ nomad/volumewatcher/volume_watcher_test.go | 4 +++ nomad/volumewatcher/volumes_watcher_test.go | 2 +- 6 files changed, 24 insertions(+), 70 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 4306783ef..f6aa3c112 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -773,7 +773,6 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { "index", oldThreshold, "csi_volume_claim_gc_threshold", c.srv.config.CSIVolumeClaimGCThreshold) -NEXT_VOLUME: for i := iter.Next(); i != nil; i = iter.Next() { vol := i.(*structs.CSIVolume) @@ -782,38 +781,12 @@ NEXT_VOLUME: continue } - // TODO(tgross): consider moving the TerminalStatus check into - // the denormalize volume logic so that we can just check the - // volume for past claims - // we only call the claim release RPC if the volume has claims // that no longer have valid allocations. otherwise we'd send // out a lot of do-nothing RPCs. - for id := range vol.ReadClaims { - alloc, err := c.snap.AllocByID(ws, id) - if err != nil { - return err - } - if alloc == nil || alloc.TerminalStatus() { - err = gcClaims(vol.Namespace, vol.ID) - if err != nil { - return err - } - goto NEXT_VOLUME - } - } - for id := range vol.WriteClaims { - alloc, err := c.snap.AllocByID(ws, id) - if err != nil { - return err - } - if alloc == nil || alloc.TerminalStatus() { - err = gcClaims(vol.Namespace, vol.ID) - if err != nil { - return err - } - goto NEXT_VOLUME - } + vol, err := c.snap.CSIVolumeDenormalize(ws, vol) + if err != nil { + return err } if len(vol.PastClaims) > 0 { err = gcClaims(vol.Namespace, vol.ID) diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index fea730dfc..ac63b8fa5 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -615,39 +615,25 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C return v.checkpointClaim(vol, claim) } - // The RPC sent from the 'nomad node detach' command won't have an + // The RPC sent from the 'nomad node detach' command or GC won't have an // allocation ID set so we try to unpublish every terminal or invalid - // alloc on the node - allocIDs := []string{} + // alloc on the node, all of which will be in PastClaims after denormalizing state := v.srv.fsm.State() vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol) if err != nil { return err } - for allocID, alloc := range vol.ReadAllocs { - if alloc == nil { - rclaim, ok := vol.ReadClaims[allocID] - if ok && rclaim.NodeID == claim.NodeID { - allocIDs = append(allocIDs, allocID) - } - } else if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() { - allocIDs = append(allocIDs, allocID) - } - } - for allocID, alloc := range vol.WriteAllocs { - if alloc == nil { - wclaim, ok := vol.WriteClaims[allocID] - if ok && 
wclaim.NodeID == claim.NodeID { - allocIDs = append(allocIDs, allocID) - } - } else if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() { - allocIDs = append(allocIDs, allocID) + + claimsToUnpublish := []*structs.CSIVolumeClaim{} + for _, pastClaim := range vol.PastClaims { + if claim.NodeID == pastClaim.NodeID { + claimsToUnpublish = append(claimsToUnpublish, pastClaim) } } + var merr multierror.Error - for _, allocID := range allocIDs { - claim.AllocationID = allocID - err := v.nodeUnpublishVolumeImpl(vol, claim) + for _, pastClaim := range claimsToUnpublish { + err := v.nodeUnpublishVolumeImpl(vol, pastClaim) if err != nil { merr.Errors = append(merr.Errors, err) } @@ -668,8 +654,8 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc ExternalID: vol.RemoteID(), AllocID: claim.AllocationID, NodeID: claim.NodeID, - AttachmentMode: vol.AttachmentMode, - AccessMode: vol.AccessMode, + AttachmentMode: claim.AttachmentMode, + AccessMode: claim.AccessMode, ReadOnly: claim.Mode == structs.CSIVolumeClaimRead, } err := v.srv.RPC("ClientCSI.NodeDetachVolume", diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 78f98bcb1..9b451f101 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2531,7 +2531,7 @@ func (s *StateStore) csiVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st } currentAllocs[id] = a - if a == nil && pastClaim == nil { + if (a == nil || a.TerminalStatus()) && pastClaim == nil { // the alloc is garbage collected but nothing has written a PastClaim, // so create one now pastClaim = &structs.CSIVolumeClaim{ diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go index 28fc94f35..fe69bca41 100644 --- a/nomad/volumewatcher/volume_watcher.go +++ b/nomad/volumewatcher/volume_watcher.go @@ -177,17 +177,10 @@ func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool { return len(vol.ReadClaims) == 0 && len(vol.WriteClaims) == 0 && len(vol.PastClaims) == 0 } +// volumeReapImpl unpublished all the volume's PastClaims. PastClaims +// will be populated from nil or terminal allocs when we call +// CSIVolumeDenormalize(), so this assumes we've done so in the caller func (vw *volumeWatcher) volumeReapImpl(vol *structs.CSIVolume) error { - - // PastClaims written by a volume GC core job will have no allocation, - // so we need to find out which allocs are eligible for cleanup. 
- for _, claim := range vol.PastClaims { - if claim.AllocationID == "" { - vol = vw.collectPastClaims(vol) - break // only need to collect once - } - } - var result *multierror.Error for _, claim := range vol.PastClaims { err := vw.unpublish(vol, claim) @@ -195,9 +188,7 @@ func (vw *volumeWatcher) volumeReapImpl(vol *structs.CSIVolume) error { result = multierror.Append(result, err) } } - return result.ErrorOrNil() - } func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIVolume { diff --git a/nomad/volumewatcher/volume_watcher_test.go b/nomad/volumewatcher/volume_watcher_test.go index 4e8a556a4..4bb4ddae4 100644 --- a/nomad/volumewatcher/volume_watcher_test.go +++ b/nomad/volumewatcher/volume_watcher_test.go @@ -37,6 +37,7 @@ func TestVolumeWatch_Reap(t *testing.T) { logger: testlog.HCLogger(t), } + vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy()) err := w.volumeReapImpl(vol) require.NoError(err) @@ -48,6 +49,7 @@ func TestVolumeWatch_Reap(t *testing.T) { State: structs.CSIVolumeClaimStateNodeDetached, }, } + vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy()) err = w.volumeReapImpl(vol) require.NoError(err) require.Len(vol.PastClaims, 1) @@ -59,6 +61,7 @@ func TestVolumeWatch_Reap(t *testing.T) { Mode: structs.CSIVolumeClaimGC, }, } + vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy()) err = w.volumeReapImpl(vol) require.NoError(err) require.Len(vol.PastClaims, 2) // alloc claim + GC claim @@ -71,6 +74,7 @@ func TestVolumeWatch_Reap(t *testing.T) { Mode: structs.CSIVolumeClaimRead, }, } + vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy()) err = w.volumeReapImpl(vol) require.NoError(err) require.Len(vol.PastClaims, 2) // alloc claim + GC claim diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 2271c2f20..c66411631 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -67,7 +67,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { plugin := mock.CSIPlugin() node := testNode(plugin, srv.State()) alloc := mock.Alloc() - alloc.ClientStatus = structs.AllocClientStatusComplete + alloc.ClientStatus = structs.AllocClientStatusRunning vol := testVolume(plugin, alloc, node.ID) index++ From 622ed093aedfebf1f0c6e3ae47eee1e9310a89ec Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 28 Jan 2022 08:30:31 -0500 Subject: [PATCH 06/19] CSI: node unmount from the client before unpublish RPC (#11892) When an allocation stops, the `csi_hook` makes an unpublish RPC to the servers to unpublish via the CSI RPCs: first to the node plugins and then the controller plugins. The controller RPCs must happen after the node RPCs so that the node has had a chance to unmount the volume before the controller tries to detach the associated device. But the client has local access to the node plugins and can independently determine if it's safe to send unpublish RPC to those plugins. This will allow the server to treat the node plugin as abandoned if a client is disconnected and `stop_on_client_disconnect` is set. This will let the server try to send unpublish RPCs to the controller plugins, under the assumption that the client will be trying to unmount the volume on its end first. Note that the CSI `NodeUnpublishVolume`/`NodeUnstageVolume` RPCs can return ignorable errors in the case where the volume has already been unmounted from the node. 
Handle all other errors by retrying until we get success so as to give operators the opportunity to reschedule a failed node plugin (ex. in the case where they accidentally drained a node without `-ignore-system`). Fan-out the work for each volume into its own goroutine so that we can release a subset of volumes if only one is stuck. --- client/allocrunner/csi_hook.go | 160 ++++++++++++++++++---- client/allocrunner/csi_hook_test.go | 10 +- client/pluginmanager/csimanager/volume.go | 7 +- 3 files changed, 143 insertions(+), 34 deletions(-) diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index e7e7385a3..6fb1b2866 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -3,6 +3,8 @@ package allocrunner import ( "context" "fmt" + "sync" + "time" hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" @@ -24,7 +26,9 @@ type csiHook struct { updater hookResourceSetter nodeSecret string - volumeRequests map[string]*volumeAndRequest + volumeRequests map[string]*volumeAndRequest + maxBackoffInterval time.Duration + maxBackoffDuration time.Duration } // implemented by allocrunner @@ -42,6 +46,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M updater: updater, nodeSecret: nodeSecret, volumeRequests: map[string]*volumeAndRequest{}, + maxBackoffInterval: time.Minute, + maxBackoffDuration: time.Hour * 24, } } @@ -103,41 +109,43 @@ func (c *csiHook) Postrun() error { return nil } - var mErr *multierror.Error + var wg sync.WaitGroup + errs := make(chan error, len(c.volumeRequests)) for _, pair := range c.volumeRequests { + wg.Add(1) - mode := structs.CSIVolumeClaimRead - if !pair.request.ReadOnly { - mode = structs.CSIVolumeClaimWrite - } + // CSI RPCs can potentially fail for a very long time if a + // node plugin has failed. 
split the work into goroutines so + // that operators could potentially reuse one of a set of + // volumes even if this hook is stuck waiting on the others + go func(pair *volumeAndRequest) { + defer wg.Done() - source := pair.request.Source - if pair.request.PerAlloc { - // NOTE: PerAlloc can't be set if we have canaries - source = source + structs.AllocSuffix(c.alloc.Name) - } + // we can recover an unmount failure if the operator + // brings the plugin back up, so retry every few minutes + // but eventually give up + err := c.unmountWithRetry(pair) + if err != nil { + errs <- err + return + } - req := &structs.CSIVolumeUnpublishRequest{ - VolumeID: source, - Claim: &structs.CSIVolumeClaim{ - AllocationID: c.alloc.ID, - NodeID: c.alloc.NodeID, - Mode: mode, - State: structs.CSIVolumeClaimStateUnpublishing, - }, - WriteRequest: structs.WriteRequest{ - Region: c.alloc.Job.Region, - Namespace: c.alloc.Job.Namespace, - AuthToken: c.nodeSecret, - }, - } - err := c.rpcClient.RPC("CSIVolume.Unpublish", - req, &structs.CSIVolumeUnpublishResponse{}) - if err != nil { - mErr = multierror.Append(mErr, err) - } + // we can't recover from this RPC error client-side; the + // volume claim GC job will have to clean up for us once + // the allocation is marked terminal + errs <- c.unpublish(pair) + }(pair) } + + wg.Wait() + close(errs) // so we don't block waiting if there were no errors + + var mErr *multierror.Error + for err := range errs { + mErr = multierror.Append(mErr, err) + } + return mErr.ErrorOrNil() } @@ -231,3 +239,95 @@ func (c *csiHook) shouldRun() bool { return false } + +func (c *csiHook) unpublish(pair *volumeAndRequest) error { + + mode := structs.CSIVolumeClaimRead + if !pair.request.ReadOnly { + mode = structs.CSIVolumeClaimWrite + } + + source := pair.request.Source + if pair.request.PerAlloc { + // NOTE: PerAlloc can't be set if we have canaries + source = source + structs.AllocSuffix(c.alloc.Name) + } + + req := &structs.CSIVolumeUnpublishRequest{ + VolumeID: source, + Claim: &structs.CSIVolumeClaim{ + AllocationID: c.alloc.ID, + NodeID: c.alloc.NodeID, + Mode: mode, + State: structs.CSIVolumeClaimStateUnpublishing, + }, + WriteRequest: structs.WriteRequest{ + Region: c.alloc.Job.Region, + Namespace: c.alloc.Job.Namespace, + AuthToken: c.nodeSecret, + }, + } + + return c.rpcClient.RPC("CSIVolume.Unpublish", + req, &structs.CSIVolumeUnpublishResponse{}) + +} + +// unmountWithRetry tries to unmount/unstage the volume, retrying with +// exponential backoff capped to a maximum interval +func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error { + + // note: allocrunner hooks don't have access to the client's + // shutdown context, just the allocrunner's shutdown; if we make + // it available in the future we should thread it through here so + // that retry can exit gracefully instead of dropping the + // in-flight goroutine + ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration) + defer cancel() + var err error + backoff := time.Second + ticker := time.NewTicker(backoff) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return err + case <-ticker.C: + } + + err = c.unmountImpl(pair) + if err == nil { + break + } + + if backoff < c.maxBackoffInterval { + backoff = backoff * 2 + if backoff > c.maxBackoffInterval { + backoff = c.maxBackoffInterval + } + } + ticker.Reset(backoff) + } + return nil +} + +// unmountImpl implements the call to the CSI plugin manager to +// unmount the volume. 
Each retry will write an "Unmount volume" +// NodeEvent +func (c *csiHook) unmountImpl(pair *volumeAndRequest) error { + + mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID) + if err != nil { + return err + } + + usageOpts := &csimanager.UsageOptions{ + ReadOnly: pair.request.ReadOnly, + AttachmentMode: pair.request.AttachmentMode, + AccessMode: pair.request.AccessMode, + MountOptions: pair.request.MountOptions, + } + + return mounter.UnmountVolume(context.TODO(), + pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts) +} diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index 045ef3e0a..d05d07385 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -5,6 +5,7 @@ import ( "fmt" "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" @@ -59,7 +60,7 @@ func TestCSIHook(t *testing.T) { "test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)}, }, expectedMountCalls: 1, - expectedUnmountCalls: 0, // not until this is done client-side + expectedUnmountCalls: 1, expectedClaimCalls: 1, expectedUnpublishCalls: 1, }, @@ -83,7 +84,7 @@ func TestCSIHook(t *testing.T) { "test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)}, }, expectedMountCalls: 1, - expectedUnmountCalls: 0, // not until this is done client-side + expectedUnmountCalls: 1, expectedClaimCalls: 1, expectedUnpublishCalls: 1, }, @@ -122,7 +123,7 @@ func TestCSIHook(t *testing.T) { // "test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)}, // }, // expectedMountCalls: 1, - // expectedUnmountCalls: 0, // not until this is done client-side + // expectedUnmountCalls: 1, // expectedClaimCalls: 1, // expectedUnpublishCalls: 1, // }, @@ -144,6 +145,9 @@ func TestCSIHook(t *testing.T) { }, } hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret") + hook.maxBackoffInterval = 100 * time.Millisecond + hook.maxBackoffDuration = 2 * time.Second + require.NotNil(t, hook) require.NoError(t, hook.Prerun()) diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index c359f32e3..599c3fe06 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -348,11 +348,16 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo } } + if errors.Is(err, structs.ErrCSIClientRPCIgnorable) { + logger.Trace("unmounting volume failed with ignorable error", "error", err) + err = nil + } + event := structs.NewNodeEvent(). SetSubsystem(structs.NodeEventSubsystemStorage). SetMessage("Unmount volume"). AddDetail("volume_id", volID) - if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) { + if err == nil { event.AddDetail("success", "true") } else { event.AddDetail("success", "false") From d8a74efb07ec339d01df5fdcac114265f0003a8d Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 28 Jan 2022 14:50:54 -0500 Subject: [PATCH 07/19] set LAST_RELEASE to 1.2.4 for the 1.2.5 release branch --- GNUmakefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/GNUmakefile b/GNUmakefile index 56adc8949..84a02085e 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -32,7 +32,7 @@ PROTO_COMPARE_TAG ?= v1.0.3$(if $(findstring ent,$(GO_TAGS)),+ent,) # LAST_RELEASE is the git sha of the latest release corresponding to this branch. main should have the latest # published release, but backport branches should point to the parent tag (e.g. 
1.0.8 in release-1.0.9 after 1.1.0 is cut). -LAST_RELEASE ?= v1.2.3 +LAST_RELEASE ?= v1.2.4 default: help From 6af1b359ed934d26880e918bbeb7ca607be37d20 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 28 Jan 2022 15:04:32 -0500 Subject: [PATCH 08/19] docs: missing changelog for #11892 (#11959) --- .changelog/11892.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/11892.txt diff --git a/.changelog/11892.txt b/.changelog/11892.txt new file mode 100644 index 000000000..4a6dc2cb7 --- /dev/null +++ b/.changelog/11892.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Unmount volumes from the client before sending unpublish RPC +``` From 18c528313c2fb3897dc414bb5f6e6cca6f32e297 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 28 Jan 2022 14:55:24 -0500 Subject: [PATCH 09/19] docs: add 1.2.5 to changelog --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00b629e4e..8d97ef019 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## 1.2.5 (February 1, 2022) + +BUG FIXES: + +* csi: Fixed a bug where garbage collected allocations could block new claims on a volume [[GH-11890](https://github.com/hashicorp/nomad/issues/11890)] +* csi: Fixed a bug where releasing volume claims would fail with ACL errors after leadership transitions. [[GH-11891](https://github.com/hashicorp/nomad/issues/11891)] +* csi: Unmount volumes from the client before sending unpublish RPC [[GH-11892](https://github.com/hashicorp/nomad/issues/11892)] +* template: Fixed a bug where client template configuration that did not include any of the new 1.2.4 configuration options could result in none of the configuration getting set. [[GH-11902](https://github.com/hashicorp/nomad/issues/11902)] + ## 1.2.4 (January 18, 2022) FEATURES: From 8af121bfbece1106148873fabb8b7f3b8956e0b0 Mon Sep 17 00:00:00 2001 From: Nomad Release bot Date: Mon, 31 Jan 2022 14:54:26 +0000 Subject: [PATCH 10/19] Generate files for 1.2.5 release --- version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/version.go b/version/version.go index 8d6c07b13..00097a920 100644 --- a/version/version.go +++ b/version/version.go @@ -11,7 +11,7 @@ var ( GitDescribe string // The main version number that is being run at the moment. - Version = "1.2.4" + Version = "1.2.5" // A pre-release marker for the version. If this is "" (empty string) // then it means that it is a final release. Otherwise, this is a pre-release From de078e7ac641b2a149a1b0152f3ce3394ce8bd21 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Mon, 31 Jan 2022 11:54:51 -0600 Subject: [PATCH 12/19] client: fix race condition in use of go-getter go-getter creates a circular dependency between a Client and Getter, which means each is inherently thread-unsafe if you try to re-use on or the other. This PR fixes Nomad to no longer make use of the default Getter objects provided by the go-getter package. Nomad must create a new Client object on every artifact download, as the Client object controls the Src and Dst among other things. When Caling Client.Get, the Getter modifies its own Client reference, creating the circular reference and race condition. We can still achieve most of the desired connection caching behavior by re-using a shared HTTP client with transport pooling enabled. 
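The shape of the fix, condensed from the diff below into one sketch (all names come from the patch itself; only the pooled transport is shared across downloads, never a go-getter Client or Getter):

    package getter

    import (
        "net/http"

        "github.com/hashicorp/go-cleanhttp"
        gg "github.com/hashicorp/go-getter"
    )

    // One pooled, thread-safe HTTP client shared by every http/https getter.
    // Only this transport is reused; each download gets its own Client/Getters.
    var httpClient = &http.Client{
        Transport: cleanhttp.DefaultPooledTransport(),
    }

    // getClient builds a brand-new go-getter Client (and fresh Getters) per
    // artifact download, so no mutable go-getter state is shared between
    // concurrent downloads.
    func getClient(src string, headers http.Header, mode gg.ClientMode, dst string) *gg.Client {
        httpGetter := &gg.HttpGetter{Netrc: true, Client: httpClient, Header: headers}
        return &gg.Client{
            Src:  src,
            Dst:  dst,
            Mode: mode,
            Getters: map[string]gg.Getter{
                "http":  httpGetter,
                "https": httpGetter,
                // git, gcs, hg, s3 are likewise constructed fresh per call
                // (see createGetters in the diff).
            },
        }
    }

This keeps connection reuse for http/https artifacts while removing the shared mutable Client/Getter state that caused the race.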
--- .changelog/12036.txt | 3 + .../allocrunner/taskrunner/getter/getter.go | 85 +++++++------------ 2 files changed, 36 insertions(+), 52 deletions(-) create mode 100644 .changelog/12036.txt diff --git a/.changelog/12036.txt b/.changelog/12036.txt new file mode 100644 index 000000000..6820782bf --- /dev/null +++ b/.changelog/12036.txt @@ -0,0 +1,3 @@ +```release-note:security +Fix race condition in use of go-getter that could cause a client agent to download the wrong artifact into the wrong destination. [CVE-2022-24686](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24686) +``` diff --git a/client/allocrunner/taskrunner/getter/getter.go b/client/allocrunner/taskrunner/getter/getter.go index f198a7aae..4bbf7674c 100644 --- a/client/allocrunner/taskrunner/getter/getter.go +++ b/client/allocrunner/taskrunner/getter/getter.go @@ -6,22 +6,20 @@ import ( "net/http" "net/url" "strings" - "sync" + "github.com/hashicorp/go-cleanhttp" gg "github.com/hashicorp/go-getter" "github.com/hashicorp/nomad/nomad/structs" ) -var ( - // getters is the map of getters suitable for Nomad. It is initialized once - // and the lock is used to guard access to it. - getters map[string]gg.Getter - lock sync.Mutex - - // supported is the set of download schemes supported by Nomad - supported = []string{"http", "https", "s3", "hg", "git", "gcs"} -) +// httpClient is a shared HTTP client for use across all http/https Getter +// instantiations. The HTTP client is designed to be thread-safe, and using a pooled +// transport will help reduce excessive connections when clients are downloading lots +// of artifacts. +var httpClient = &http.Client{ + Transport: cleanhttp.DefaultPooledTransport(), +} const ( // gitSSHPrefix is the prefix for downloading via git using ssh @@ -35,53 +33,36 @@ type EnvReplacer interface { ClientPath(string, bool) (string, bool) } -func makeGetters(headers http.Header) map[string]gg.Getter { - getters := make(map[string]gg.Getter, len(supported)) - for _, getter := range supported { - switch { - case getter == "http" && len(headers) > 0: - fallthrough - case getter == "https" && len(headers) > 0: - getters[getter] = &gg.HttpGetter{ - Netrc: true, - Header: headers, - } - default: - if defaultGetter, ok := gg.Getters[getter]; ok { - getters[getter] = defaultGetter - } - } - } - return getters -} - // getClient returns a client that is suitable for Nomad downloading artifacts. func getClient(src string, headers http.Header, mode gg.ClientMode, dst string) *gg.Client { - client := &gg.Client{ - Src: src, - Dst: dst, - Mode: mode, - Umask: 060000000, + return &gg.Client{ + Src: src, + Dst: dst, + Mode: mode, + Umask: 060000000, + Getters: createGetters(headers), } +} - switch len(headers) { - case 0: - // When no headers are present use the memoized getters, creating them - // on demand if they do not exist yet. - lock.Lock() - if getters == nil { - getters = makeGetters(nil) - } - lock.Unlock() - client.Getters = getters - default: - // When there are headers present, we must create fresh gg.HttpGetter - // objects, because that is where gg stores the headers to use in its - // artifact HTTP GET requests. - client.Getters = makeGetters(headers) +func createGetters(header http.Header) map[string]gg.Getter { + httpGetter := &gg.HttpGetter{ + Netrc: true, + Client: httpClient, + Header: header, + } + // Explicitly create fresh set of supported Getter for each Client, because + // go-getter is not thread-safe. 
Use a shared HTTP client for http/https Getter, + // with pooled transport which is thread-safe. + // + // If a getter type is not listed here, it is not supported (e.g. file). + return map[string]gg.Getter{ + "git": new(gg.GitGetter), + "gcs": new(gg.GCSGetter), + "hg": new(gg.HgGetter), + "s3": new(gg.S3Getter), + "http": httpGetter, + "https": httpGetter, } - - return client } // getGetterUrl returns the go-getter URL to download the artifact. From 437bb4b86d8f5e5e9aebabe06d308cf8c4d29dfb Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Thu, 27 Jan 2022 12:46:56 -0600 Subject: [PATCH 13/19] client: check escaping of alloc dir using symlinks This PR adds symlink resolution when doing validation of paths to ensure they do not escape client allocation directories. --- .changelog/12037.txt | 3 + client/allocdir/alloc_dir.go | 16 +-- client/allocdir/alloc_dir_test.go | 36 +++---- helper/escapingfs/escapes.go | 99 ++++++++++++++++++ helper/escapingfs/escapes_test.go | 162 ++++++++++++++++++++++++++++++ nomad/structs/structs.go | 33 +----- 6 files changed, 295 insertions(+), 54 deletions(-) create mode 100644 .changelog/12037.txt create mode 100644 helper/escapingfs/escapes.go create mode 100644 helper/escapingfs/escapes_test.go diff --git a/.changelog/12037.txt b/.changelog/12037.txt new file mode 100644 index 000000000..7b6a06bf3 --- /dev/null +++ b/.changelog/12037.txt @@ -0,0 +1,3 @@ +```release-note:security +Resolve symlinks to prevent unauthorized access to files outside the allocation directory. [CVE-2022-24683](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24683) +``` diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 0dc52029c..da05aacb3 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -6,17 +6,17 @@ import ( "fmt" "io" "io/ioutil" + "net/http" "os" "path/filepath" + "strings" "sync" "time" - "net/http" - "strings" - hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/escapingfs" "github.com/hashicorp/nomad/nomad/structs" "github.com/hpcloud/tail/watch" tomb "gopkg.in/tomb.v1" @@ -350,7 +350,7 @@ func (d *AllocDir) Build() error { // List returns the list of files at a path relative to the alloc dir func (d *AllocDir) List(path string) ([]*cstructs.AllocFileInfo, error) { - if escapes, err := structs.PathEscapesAllocDir("", path); err != nil { + if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil { return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err) } else if escapes { return nil, fmt.Errorf("Path escapes the alloc directory") @@ -376,7 +376,7 @@ func (d *AllocDir) List(path string) ([]*cstructs.AllocFileInfo, error) { // Stat returns information about the file at a path relative to the alloc dir func (d *AllocDir) Stat(path string) (*cstructs.AllocFileInfo, error) { - if escapes, err := structs.PathEscapesAllocDir("", path); err != nil { + if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil { return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err) } else if escapes { return nil, fmt.Errorf("Path escapes the alloc directory") @@ -426,7 +426,7 @@ func detectContentType(fileInfo os.FileInfo, path string) string { // ReadAt returns a reader for a file at the path relative to the alloc dir func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) { - if 
escapes, err := structs.PathEscapesAllocDir("", path); err != nil { + if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil { return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err) } else if escapes { return nil, fmt.Errorf("Path escapes the alloc directory") @@ -457,7 +457,7 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) { // BlockUntilExists blocks until the passed file relative the allocation // directory exists. The block can be cancelled with the passed context. func (d *AllocDir) BlockUntilExists(ctx context.Context, path string) (chan error, error) { - if escapes, err := structs.PathEscapesAllocDir("", path); err != nil { + if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil { return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err) } else if escapes { return nil, fmt.Errorf("Path escapes the alloc directory") @@ -483,7 +483,7 @@ func (d *AllocDir) BlockUntilExists(ctx context.Context, path string) (chan erro // allocation directory. The offset should be the last read offset. The context is // used to clean up the watch. func (d *AllocDir) ChangeEvents(ctx context.Context, path string, curOffset int64) (*watch.FileChanges, error) { - if escapes, err := structs.PathEscapesAllocDir("", path); err != nil { + if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil { return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err) } else if escapes { return nil, fmt.Errorf("Path escapes the alloc directory") diff --git a/client/allocdir/alloc_dir_test.go b/client/allocdir/alloc_dir_test.go index 86cd8d917..4a876c57b 100644 --- a/client/allocdir/alloc_dir_test.go +++ b/client/allocdir/alloc_dir_test.go @@ -332,28 +332,30 @@ func TestAllocDir_EscapeChecking(t *testing.T) { // Test that `nomad fs` can't read secrets func TestAllocDir_ReadAt_SecretDir(t *testing.T) { - tmp, err := ioutil.TempDir("", "AllocDir") - if err != nil { - t.Fatalf("Couldn't create temp dir: %v", err) - } - defer os.RemoveAll(tmp) + tmp := t.TempDir() d := NewAllocDir(testlog.HCLogger(t), tmp, "test") - if err := d.Build(); err != nil { - t.Fatalf("Build() failed: %v", err) - } - defer d.Destroy() + err := d.Build() + require.NoError(t, err) + defer func() { + _ = d.Destroy() + }() td := d.NewTaskDir(t1.Name) - if err := td.Build(false, nil); err != nil { - t.Fatalf("TaskDir.Build() failed: %v", err) - } + err = td.Build(false, nil) + require.NoError(t, err) - // ReadAt of secret dir should fail - secret := filepath.Join(t1.Name, TaskSecrets, "test_file") - if _, err := d.ReadAt(secret, 0); err == nil || !strings.Contains(err.Error(), "secret file prohibited") { - t.Fatalf("ReadAt of secret file didn't error: %v", err) - } + // something to write and test reading + target := filepath.Join(t1.Name, TaskSecrets, "test_file") + + // create target file in the task secrets dir + full := filepath.Join(d.AllocDir, target) + err = ioutil.WriteFile(full, []byte("hi"), 0600) + require.NoError(t, err) + + // ReadAt of a file in the task secrets dir should fail + _, err = d.ReadAt(target, 0) + require.EqualError(t, err, "Reading secret file prohibited: web/secrets/test_file") } func TestAllocDir_SplitPath(t *testing.T) { diff --git a/helper/escapingfs/escapes.go b/helper/escapingfs/escapes.go new file mode 100644 index 000000000..9296e7743 --- /dev/null +++ b/helper/escapingfs/escapes.go @@ -0,0 +1,99 @@ +package escapingfs + +import ( + 
"errors" + "os" + "path/filepath" + "strings" +) + +// PathEscapesAllocViaRelative returns if the given path escapes the allocation +// directory using relative paths. +// +// Only for use in server-side validation, where the real filesystem is not available. +// For client-side validation use PathEscapesAllocDir, which includes symlink validation +// as well. +// +// The prefix is joined to the path (e.g. "task/local"), and this function +// checks if path escapes the alloc dir, NOT the prefix directory within the alloc dir. +// With prefix="task/local", it will return false for "../secret", but +// true for "../../../../../../root" path; only the latter escapes the alloc dir. +func PathEscapesAllocViaRelative(prefix, path string) (bool, error) { + // Verify the destination does not escape the task's directory. The "alloc-dir" + // and "alloc-id" here are just placeholders; on a real filesystem they will + // have different names. The names are not important, but rather the number of levels + // in the path they represent. + alloc, err := filepath.Abs(filepath.Join("/", "alloc-dir/", "alloc-id/")) + if err != nil { + return false, err + } + abs, err := filepath.Abs(filepath.Join(alloc, prefix, path)) + if err != nil { + return false, err + } + rel, err := filepath.Rel(alloc, abs) + if err != nil { + return false, err + } + + return strings.HasPrefix(rel, ".."), nil +} + +// pathEscapesBaseViaSymlink returns if path escapes dir, taking into account evaluation +// of symlinks. +// +// The base directory must be an absolute path. +func pathEscapesBaseViaSymlink(base, full string) (bool, error) { + resolveSym, err := filepath.EvalSymlinks(full) + if err != nil { + return false, err + } + + rel, err := filepath.Rel(resolveSym, base) + if err != nil { + return true, nil + } + + // note: this is not the same as !filesystem.IsAbs; we are asking if the relative + // path is descendent of the base path, indicating it does not escape. + isRelative := strings.HasPrefix(rel, "..") || rel == "." + escapes := !isRelative + return escapes, nil +} + +// PathEscapesAllocDir returns true if base/prefix/path escapes the given base directory. +// +// Escaping a directory can be done with relative paths (e.g. ../../ etc.) or by +// using symlinks. This checks both methods. +// +// The base directory must be an absolute path. +func PathEscapesAllocDir(base, prefix, path string) (bool, error) { + full := filepath.Join(base, prefix, path) + + // If base is not an absolute path, the caller passed in the wrong thing. + if !filepath.IsAbs(base) { + return false, errors.New("alloc dir must be absolute") + } + + // Check path does not escape the alloc dir using relative paths. + if escapes, err := PathEscapesAllocViaRelative(prefix, path); err != nil { + return false, err + } else if escapes { + return true, nil + } + + // Check path does not escape the alloc dir using symlinks. + if escapes, err := pathEscapesBaseViaSymlink(base, full); err != nil { + if os.IsNotExist(err) { + // Treat non-existent files as non-errors; perhaps not ideal but we + // have existing features (log-follow) that depend on this. Still safe, + // because we do the symlink check on every ReadAt call also. 
+ return false, nil + } + return false, err + } else if escapes { + return true, nil + } + + return false, nil +} diff --git a/helper/escapingfs/escapes_test.go b/helper/escapingfs/escapes_test.go new file mode 100644 index 000000000..59e74ccb3 --- /dev/null +++ b/helper/escapingfs/escapes_test.go @@ -0,0 +1,162 @@ +package escapingfs + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func setup(t *testing.T) string { + p, err := ioutil.TempDir("", "escapist") + require.NoError(t, err) + return p +} + +func cleanup(t *testing.T, root string) { + err := os.RemoveAll(root) + require.NoError(t, err) +} + +func write(t *testing.T, file, data string) { + err := ioutil.WriteFile(file, []byte(data), 0600) + require.NoError(t, err) +} + +func Test_PathEscapesAllocViaRelative(t *testing.T) { + for _, test := range []struct { + prefix string + path string + exp bool + }{ + // directly under alloc-dir/alloc-id/ + {prefix: "", path: "", exp: false}, + {prefix: "", path: "/foo", exp: false}, + {prefix: "", path: "./", exp: false}, + {prefix: "", path: "../", exp: true}, // at alloc-id/ + + // under alloc-dir/alloc-id// + {prefix: "foo", path: "", exp: false}, + {prefix: "foo", path: "/foo", exp: false}, + {prefix: "foo", path: "../", exp: false}, // at foo/ + {prefix: "foo", path: "../../", exp: true}, // at alloc-id/ + + // under alloc-dir/alloc-id/foo/bar/ + {prefix: "foo/bar", path: "", exp: false}, + {prefix: "foo/bar", path: "/foo", exp: false}, + {prefix: "foo/bar", path: "../", exp: false}, // at bar/ + {prefix: "foo/bar", path: "../../", exp: false}, // at foo/ + {prefix: "foo/bar", path: "../../../", exp: true}, // at alloc-id/ + } { + result, err := PathEscapesAllocViaRelative(test.prefix, test.path) + require.NoError(t, err) + require.Equal(t, test.exp, result) + } +} + +func Test_pathEscapesBaseViaSymlink(t *testing.T) { + t.Run("symlink-escape", func(t *testing.T) { + dir := setup(t) + defer cleanup(t, dir) + + // link from dir/link + link := filepath.Join(dir, "link") + + // link to /tmp + target := filepath.Clean("/tmp") + err := os.Symlink(target, link) + require.NoError(t, err) + + escape, err := pathEscapesBaseViaSymlink(dir, link) + require.NoError(t, err) + require.True(t, escape) + }) + + t.Run("symlink-noescape", func(t *testing.T) { + dir := setup(t) + defer cleanup(t, dir) + + // create a file within dir + target := filepath.Join(dir, "foo") + write(t, target, "hi") + + // link to file within dir + link := filepath.Join(dir, "link") + err := os.Symlink(target, link) + require.NoError(t, err) + + // link to file within dir does not escape dir + escape, err := pathEscapesBaseViaSymlink(dir, link) + require.NoError(t, err) + require.False(t, escape) + }) +} + +func Test_PathEscapesAllocDir(t *testing.T) { + + t.Run("no-escape-root", func(t *testing.T) { + dir := setup(t) + defer cleanup(t, dir) + + escape, err := PathEscapesAllocDir(dir, "", "/") + require.NoError(t, err) + require.False(t, escape) + }) + + t.Run("no-escape", func(t *testing.T) { + dir := setup(t) + defer cleanup(t, dir) + + write(t, filepath.Join(dir, "foo"), "hi") + + escape, err := PathEscapesAllocDir(dir, "", "/foo") + require.NoError(t, err) + require.False(t, escape) + }) + + t.Run("no-escape-no-exist", func(t *testing.T) { + dir := setup(t) + defer cleanup(t, dir) + + escape, err := PathEscapesAllocDir(dir, "", "/no-exist") + require.NoError(t, err) + require.False(t, escape) + }) + + t.Run("symlink-escape", func(t *testing.T) { + dir := setup(t) + 
defer cleanup(t, dir) + + // link from dir/link + link := filepath.Join(dir, "link") + + // link to /tmp + target := filepath.Clean("/tmp") + err := os.Symlink(target, link) + require.NoError(t, err) + + escape, err := PathEscapesAllocDir(dir, "", "/link") + require.NoError(t, err) + require.True(t, escape) + }) + + t.Run("relative-escape", func(t *testing.T) { + dir := setup(t) + defer cleanup(t, dir) + + escape, err := PathEscapesAllocDir(dir, "", "../../foo") + require.NoError(t, err) + require.True(t, escape) + }) + + t.Run("relative-escape-prefix", func(t *testing.T) { + dir := setup(t) + defer cleanup(t, dir) + + escape, err := PathEscapesAllocDir(dir, "/foo/bar", "../../../foo") + require.NoError(t, err) + require.True(t, escape) + }) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 2c20932a2..2aa0fd883 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -17,7 +17,6 @@ import ( "math" "net" "os" - "path/filepath" "reflect" "regexp" "sort" @@ -25,6 +24,7 @@ import ( "strings" "time" + "github.com/hashicorp/nomad/helper/escapingfs" "golang.org/x/crypto/blake2b" "github.com/hashicorp/cronexpr" @@ -5316,7 +5316,7 @@ func (d *DispatchPayloadConfig) Copy() *DispatchPayloadConfig { func (d *DispatchPayloadConfig) Validate() error { // Verify the destination doesn't escape - escaped, err := PathEscapesAllocDir("task/local/", d.File) + escaped, err := escapingfs.PathEscapesAllocViaRelative("task/local/", d.File) if err != nil { return fmt.Errorf("invalid destination path: %v", err) } else if escaped { @@ -7535,7 +7535,7 @@ func (t *Template) Validate() error { } // Verify the destination doesn't escape - escaped, err := PathEscapesAllocDir("task", t.DestPath) + escaped, err := escapingfs.PathEscapesAllocViaRelative("task", t.DestPath) if err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err)) } else if escaped { @@ -8333,31 +8333,6 @@ func (ta *TaskArtifact) Hash() string { return base64.RawStdEncoding.EncodeToString(h.Sum(nil)) } -// PathEscapesAllocDir returns if the given path escapes the allocation -// directory. -// -// The prefix is to joined to the path (e.g. "task/local"), and this function -// checks if path escapes the alloc dir, NOT the prefix directory within the alloc dir. 
-// With prefix="task/local", it will return false for "../secret", but -// true for "../../../../../../root" path; only the latter escapes the alloc dir -func PathEscapesAllocDir(prefix, path string) (bool, error) { - // Verify the destination doesn't escape the tasks directory - alloc, err := filepath.Abs(filepath.Join("/", "alloc-dir/", "alloc-id/")) - if err != nil { - return false, err - } - abs, err := filepath.Abs(filepath.Join(alloc, prefix, path)) - if err != nil { - return false, err - } - rel, err := filepath.Rel(alloc, abs) - if err != nil { - return false, err - } - - return strings.HasPrefix(rel, ".."), nil -} - func (ta *TaskArtifact) Validate() error { // Verify the source var mErr multierror.Error @@ -8376,7 +8351,7 @@ func (ta *TaskArtifact) Validate() error { ta.GetterMode, GetterModeAny, GetterModeFile, GetterModeDir)) } - escaped, err := PathEscapesAllocDir("task", ta.RelativeDest) + escaped, err := escapingfs.PathEscapesAllocViaRelative("task", ta.RelativeDest) if err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err)) } else if escaped { From 15f9d54deaa78c42106f79d618d46533114cf928 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 1 Feb 2022 18:54:53 -0500 Subject: [PATCH 14/19] api: prevent excessive CPU load on job parse Add new namespace ACL requirement for the /v1/jobs/parse endpoint and return early if HCLv2 parsing fails. The endpoint now requires the new `parse-job` ACL capability or `submit-job`. --- .changelog/12038.txt | 3 + acl/policy.go | 4 +- acl/policy_test.go | 3 + command/agent/agent_endpoint.go | 52 ++++-------- command/agent/http.go | 26 ++++++ command/agent/http_test.go | 58 ++++++++++++++ command/agent/job_endpoint.go | 21 ++++- command/agent/job_endpoint_test.go | 123 +++++++++++++++++++++++++++++ jobspec2/parse.go | 6 ++ jobspec2/parse_test.go | 43 ++++++++++ 10 files changed, 301 insertions(+), 38 deletions(-) create mode 100644 .changelog/12038.txt diff --git a/.changelog/12038.txt b/.changelog/12038.txt new file mode 100644 index 000000000..d9fee4620 --- /dev/null +++ b/.changelog/12038.txt @@ -0,0 +1,3 @@ +```release-note:security +Add ACL requirement and HCL validation to the job parse API endpoint to prevent excessive CPU usage. 
[CVE-2022-24685](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24685) +``` diff --git a/acl/policy.go b/acl/policy.go index 07fa36457..95df4a280 100644 --- a/acl/policy.go +++ b/acl/policy.go @@ -26,6 +26,7 @@ const ( NamespaceCapabilityDeny = "deny" NamespaceCapabilityListJobs = "list-jobs" + NamespaceCapabilityParseJob = "parse-job" NamespaceCapabilityReadJob = "read-job" NamespaceCapabilitySubmitJob = "submit-job" NamespaceCapabilityDispatchJob = "dispatch-job" @@ -146,7 +147,7 @@ func (p *PluginPolicy) isValid() bool { // isNamespaceCapabilityValid ensures the given capability is valid for a namespace policy func isNamespaceCapabilityValid(cap string) bool { switch cap { - case NamespaceCapabilityDeny, NamespaceCapabilityListJobs, NamespaceCapabilityReadJob, + case NamespaceCapabilityDeny, NamespaceCapabilityParseJob, NamespaceCapabilityListJobs, NamespaceCapabilityReadJob, NamespaceCapabilitySubmitJob, NamespaceCapabilityDispatchJob, NamespaceCapabilityReadLogs, NamespaceCapabilityReadFS, NamespaceCapabilityAllocLifecycle, NamespaceCapabilityAllocExec, NamespaceCapabilityAllocNodeExec, @@ -166,6 +167,7 @@ func isNamespaceCapabilityValid(cap string) bool { func expandNamespacePolicy(policy string) []string { read := []string{ NamespaceCapabilityListJobs, + NamespaceCapabilityParseJob, NamespaceCapabilityReadJob, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIReadVolume, diff --git a/acl/policy_test.go b/acl/policy_test.go index 60e4615ea..9060147d0 100644 --- a/acl/policy_test.go +++ b/acl/policy_test.go @@ -29,6 +29,7 @@ func TestParse(t *testing.T) { Policy: PolicyRead, Capabilities: []string{ NamespaceCapabilityListJobs, + NamespaceCapabilityParseJob, NamespaceCapabilityReadJob, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIReadVolume, @@ -78,6 +79,7 @@ func TestParse(t *testing.T) { Policy: PolicyRead, Capabilities: []string{ NamespaceCapabilityListJobs, + NamespaceCapabilityParseJob, NamespaceCapabilityReadJob, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIReadVolume, @@ -91,6 +93,7 @@ func TestParse(t *testing.T) { Policy: PolicyWrite, Capabilities: []string{ NamespaceCapabilityListJobs, + NamespaceCapabilityParseJob, NamespaceCapabilityReadJob, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIReadVolume, diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 448b158f7..4c4072802 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -16,7 +16,6 @@ import ( "github.com/docker/docker/pkg/ioutils" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" - "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/api" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/host" @@ -62,24 +61,7 @@ func (s *HTTPServer) AgentSelfRequest(resp http.ResponseWriter, req *http.Reques return nil, CodedError(405, ErrInvalidMethod) } - var secret string - s.parseToken(req, &secret) - - var aclObj *acl.ACL - var err error - - // Get the member as a server - var member serf.Member - if srv := s.agent.Server(); srv != nil { - member = srv.LocalMember() - aclObj, err = srv.ResolveToken(secret) - } else { - // Not a Server, so use the Client for token resolution. 
Note - // this gets forwarded to a server with AllowStale = true if - // the local ACL cache TTL has expired (30s by default) - aclObj, err = s.agent.Client().ResolveToken(secret) - } - + aclObj, err := s.ResolveToken(req) if err != nil { return nil, err } @@ -89,6 +71,12 @@ func (s *HTTPServer) AgentSelfRequest(resp http.ResponseWriter, req *http.Reques return nil, structs.ErrPermissionDenied } + // Get the member as a server + var member serf.Member + if srv := s.agent.Server(); srv != nil { + member = srv.LocalMember() + } + self := agentSelf{ Member: nomadMember(member), Stats: s.agent.Stats(), @@ -671,27 +659,19 @@ func (s *HTTPServer) AgentHostRequest(resp http.ResponseWriter, req *http.Reques return nil, CodedError(405, ErrInvalidMethod) } - var secret string - s.parseToken(req, &secret) - - // Check agent read permissions - var aclObj *acl.ACL - var enableDebug bool - var err error - if srv := s.agent.Server(); srv != nil { - aclObj, err = srv.ResolveToken(secret) - enableDebug = srv.GetConfig().EnableDebug - } else { - // Not a Server, so use the Client for token resolution. Note - // this gets forwarded to a server with AllowStale = true if - // the local ACL cache TTL has expired (30s by default) - aclObj, err = s.agent.Client().ResolveToken(secret) - enableDebug = s.agent.Client().GetConfig().EnableDebug - } + aclObj, err := s.ResolveToken(req) if err != nil { return nil, err } + // Check agent read permissions + var enableDebug bool + if srv := s.agent.Server(); srv != nil { + enableDebug = srv.GetConfig().EnableDebug + } else { + enableDebug = s.agent.Client().GetConfig().EnableDebug + } + if (aclObj != nil && !aclObj.AllowAgentRead()) || (aclObj == nil && !enableDebug) { return nil, structs.ErrPermissionDenied diff --git a/command/agent/http.go b/command/agent/http.go index a3c082661..c054d5096 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -23,6 +23,7 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/rs/cors" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/noxssrw" "github.com/hashicorp/nomad/helper/tlsutil" "github.com/hashicorp/nomad/nomad/structs" @@ -270,6 +271,31 @@ func (s *HTTPServer) Shutdown() { } } +// ResolveToken extracts the ACL token secret ID from the request and +// translates it into an ACL object. Returns nil if ACLs are disabled. +func (s *HTTPServer) ResolveToken(req *http.Request) (*acl.ACL, error) { + var secret string + s.parseToken(req, &secret) + + var aclObj *acl.ACL + var err error + + if srv := s.agent.Server(); srv != nil { + aclObj, err = srv.ResolveToken(secret) + } else { + // Not a Server, so use the Client for token resolution. 
Note + // this gets forwarded to a server with AllowStale = true if + // the local ACL cache TTL has expired (30s by default) + aclObj, err = s.agent.Client().ResolveToken(secret) + } + + if err != nil { + return nil, fmt.Errorf("failed to resolve ACL token: %v", err) + } + + return aclObj, nil +} + // registerHandlers is used to attach our handlers to the mux func (s HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/jobs", s.wrap(s.JobsRequest)) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 596c3a58c..dd1521387 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" @@ -1315,6 +1316,57 @@ func TestHTTPServer_Limits_OK(t *testing.T) { } } +func TestHTTPServer_ResolveToken(t *testing.T) { + t.Parallel() + + // Setup two servers, one with ACL enabled and another with ACL disabled. + noACLServer := makeHTTPServer(t, func(c *Config) { + c.ACL = &ACLConfig{Enabled: false} + }) + defer noACLServer.Shutdown() + + ACLServer := makeHTTPServer(t, func(c *Config) { + c.ACL = &ACLConfig{Enabled: true} + }) + defer ACLServer.Shutdown() + + // Register sample token. + state := ACLServer.Agent.server.State() + token := mock.CreatePolicyAndToken(t, state, 1000, "node", mock.NodePolicy(acl.PolicyWrite)) + + // Tests cases. + t.Run("acl disabled", func(t *testing.T) { + req := &http.Request{Body: http.NoBody} + got, err := noACLServer.Server.ResolveToken(req) + require.NoError(t, err) + require.Nil(t, got) + }) + + t.Run("token not found", func(t *testing.T) { + req := &http.Request{ + Body: http.NoBody, + Header: make(map[string][]string), + } + setToken(req, mock.ACLToken()) + got, err := ACLServer.Server.ResolveToken(req) + require.Nil(t, got) + require.Error(t, err) + require.Contains(t, err.Error(), "ACL token not found") + }) + + t.Run("set token", func(t *testing.T) { + req := &http.Request{ + Body: http.NoBody, + Header: make(map[string][]string), + } + setToken(req, token) + got, err := ACLServer.Server.ResolveToken(req) + require.NoError(t, err) + require.NotNil(t, got) + require.True(t, got.AllowNodeWrite()) + }) +} + func Test_IsAPIClientError(t *testing.T) { trueCases := []int{400, 403, 404, 499} for _, c := range trueCases { @@ -1410,6 +1462,12 @@ func setToken(req *http.Request, token *structs.ACLToken) { req.Header.Set("X-Nomad-Token", token.SecretID) } +func setNamespace(req *http.Request, ns string) { + q := req.URL.Query() + q.Add("namespace", ns) + req.URL.RawQuery = q.Encode() +} + func encodeReq(obj interface{}) io.ReadCloser { buf := bytes.NewBuffer(nil) enc := json.NewEncoder(buf) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index b3a4b6bd2..6576cefbe 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/golang/snappy" + "github.com/hashicorp/nomad/acl" api "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/jobspec" @@ -703,6 +704,25 @@ func (s *HTTPServer) JobsParseRequest(resp http.ResponseWriter, req *http.Reques return nil, CodedError(405, ErrInvalidMethod) } + var namespace string + parseNamespace(req, &namespace) + + aclObj, err := s.ResolveToken(req) + if err != nil { + return nil, err + } + + // Check job parse 
permissions + if aclObj != nil { + hasParseJob := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityParseJob) + hasSubmitJob := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilitySubmitJob) + + allowed := hasParseJob || hasSubmitJob + if !allowed { + return nil, structs.ErrPermissionDenied + } + } + args := &api.JobsParseRequest{} if err := decodeBody(req, &args); err != nil { return nil, CodedError(400, err.Error()) @@ -712,7 +732,6 @@ func (s *HTTPServer) JobsParseRequest(resp http.ResponseWriter, req *http.Reques } var jobStruct *api.Job - var err error if args.HCLv1 { jobStruct, err = jobspec.Parse(strings.NewReader(args.JobHCL)) } else { diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index d0c8a8690..ff4d615a6 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/acl" api "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/mock" @@ -407,6 +408,128 @@ func TestHTTP_JobsParse(t *testing.T) { } }) } + +func TestHTTP_JobsParse_ACL(t *testing.T) { + t.Parallel() + + httpACLTest(t, nil, func(s *TestAgent) { + state := s.Agent.server.State() + + // ACL tokens used in tests. + nodeToken := mock.CreatePolicyAndToken( + t, state, 1000, "node", + mock.NodePolicy(acl.PolicyWrite), + ) + parseJobDevToken := mock.CreatePolicyAndToken( + t, state, 1002, "parse-job-dev", + mock.NamespacePolicy("dev", "", []string{"parse-job"}), + ) + readNsDevToken := mock.CreatePolicyAndToken( + t, state, 1004, "read-dev", + mock.NamespacePolicy("dev", "read", nil), + ) + parseJobDefaultToken := mock.CreatePolicyAndToken( + t, state, 1006, "parse-job-default", + mock.NamespacePolicy("default", "", []string{"parse-job"}), + ) + submitJobDefaultToken := mock.CreatePolicyAndToken( + t, state, 1008, "submit-job-default", + mock.NamespacePolicy("default", "", []string{"submit-job"}), + ) + readNsDefaultToken := mock.CreatePolicyAndToken( + t, state, 1010, "read-default", + mock.NamespacePolicy("default", "read", nil), + ) + + testCases := []struct { + name string + token *structs.ACLToken + namespace string + expectError bool + }{ + { + name: "missing ACL token", + token: nil, + expectError: true, + }, + { + name: "wrong permissions", + token: nodeToken, + expectError: true, + }, + { + name: "wrong namespace", + token: readNsDevToken, + expectError: true, + }, + { + name: "wrong namespace capability", + token: parseJobDevToken, + expectError: true, + }, + { + name: "default namespace read", + token: readNsDefaultToken, + expectError: false, + }, + { + name: "non-default namespace read", + token: readNsDevToken, + namespace: "dev", + expectError: false, + }, + { + name: "default namespace parse-job capability", + token: parseJobDefaultToken, + expectError: false, + }, + { + name: "default namespace submit-job capability", + token: submitJobDefaultToken, + expectError: false, + }, + { + name: "non-default namespace capability", + token: parseJobDevToken, + namespace: "dev", + expectError: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + buf := encodeReq(api.JobsParseRequest{JobHCL: mock.HCL()}) + req, err := http.NewRequest("POST", "/v1/jobs/parse", buf) + require.NoError(t, err) + + if tc.namespace != "" { + setNamespace(req, tc.namespace) + } + + if tc.token != nil { + setToken(req, tc.token) + } + + respW := 
httptest.NewRecorder() + obj, err := s.Server.JobsParseRequest(respW, req) + + if tc.expectError { + require.Error(t, err) + require.Equal(t, structs.ErrPermissionDenied.Error(), err.Error()) + } else { + require.NoError(t, err) + require.NotNil(t, obj) + + job := obj.(*api.Job) + expected := mock.Job() + require.Equal(t, expected.Name, *job.Name) + require.ElementsMatch(t, expected.Datacenters, job.Datacenters) + } + }) + } + }) +} + func TestHTTP_JobQuery(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { diff --git a/jobspec2/parse.go b/jobspec2/parse.go index 1a5599fac..1febab617 100644 --- a/jobspec2/parse.go +++ b/jobspec2/parse.go @@ -96,6 +96,12 @@ func decode(c *jobConfig) error { diags = append(diags, ds...) } + // Return early if the input job or variable files are not valid. + // Decoding and evaluating invalid files may result in unexpected results. + if diags.HasErrors() { + return diags + } + diags = append(diags, c.decodeBody(file.Body)...) if diags.HasErrors() { diff --git a/jobspec2/parse_test.go b/jobspec2/parse_test.go index 7457b3b02..2cb0496a9 100644 --- a/jobspec2/parse_test.go +++ b/jobspec2/parse_test.go @@ -374,6 +374,49 @@ job "example" { require.Equal(t, "3", out.TaskGroups[2].Tasks[0].Meta["VERSION"]) } +func TestParse_InvalidHCL(t *testing.T) { + t.Run("invalid body", func(t *testing.T) { + hcl := `invalid{hcl` + + _, err := ParseWithConfig(&ParseConfig{ + Path: "input.hcl", + Body: []byte(hcl), + ArgVars: []string{}, + AllowFS: true, + }) + require.Error(t, err) + }) + + t.Run("invalid vars file", func(t *testing.T) { + tmp, err := ioutil.TempFile("", "nomad-jobspec2-") + require.NoError(t, err) + defer os.Remove(tmp.Name()) + + vars := `invalid{hcl` + _, err = tmp.Write([]byte(vars)) + require.NoError(t, err) + + hcl := ` +variables { + region_var = "default" +} +job "example" { + datacenters = [for s in ["dc1", "dc2"] : upper(s)] + region = var.region_var +} +` + + _, err = ParseWithConfig(&ParseConfig{ + Path: "input.hcl", + Body: []byte(hcl), + VarFiles: []string{tmp.Name()}, + ArgVars: []string{}, + AllowFS: true, + }) + require.Error(t, err) + }) +} + func TestParse_InvalidScalingSyntax(t *testing.T) { cases := []struct { name string From 74486d86fbaa2357a71b998352714cf41ea7ee52 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 2 Feb 2022 13:26:05 -0500 Subject: [PATCH 15/19] scheduler: prevent panic in spread iterator during alloc stop The spread iterator can panic when processing an evaluation, resulting in an unrecoverable state in the cluster. Whenever a panicked server restarts and quorum is restored, the next server to dequeue the evaluation will panic. To trigger this state: * The job must have `max_parallel = 0` and a `canary >= 1`. * The job must not have a `spread` block. * The job must have a previous version. * The previous version must have a `spread` block and at least one failed allocation. In this scenario, the desired changes include `(place 1+) (stop 1+), (ignore n) (canary 1)`. Before the scheduler can place the canary allocation, it tries to find out which allocations can be stopped. This passes back through the stack so that we can determine previous-node penalties, etc. We call `SetJob` on the stack with the previous version of the job, which will include assessing the `spread` block (even though the results are unused). The task group spread info state from that pass through the spread iterator is not reset when we call `SetJob` again. 
When the new job version iterates over the `groupPropertySets`, it will get an empty `spreadAttributeMap`, resulting in an unexpected nil pointer dereference. This changeset resets the spread iterator's internal state when setting the job, adds logging with a bypass around the bug in case we hit similar cases, and adds a test that panics the scheduler without the patch. --- .changelog/12039.txt | 3 ++ scheduler/spread.go | 15 +++++++ scheduler/spread_test.go | 95 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+) create mode 100644 .changelog/12039.txt diff --git a/.changelog/12039.txt b/.changelog/12039.txt new file mode 100644 index 000000000..d1c12a485 --- /dev/null +++ b/.changelog/12039.txt @@ -0,0 +1,3 @@ +```release-note:security +Prevent panic in spread iterator during allocation stop. [CVE-2022-24684](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24684) +``` diff --git a/scheduler/spread.go b/scheduler/spread.go index 363701fa4..842251c28 100644 --- a/scheduler/spread.go +++ b/scheduler/spread.go @@ -71,6 +71,12 @@ func (iter *SpreadIterator) SetJob(job *structs.Job) { if job.Spreads != nil { iter.jobSpreads = job.Spreads } + + // reset group spread/property so that when we temporarily SetJob + // to an older version to calculate stops we don't leak old + // versions of spread/properties to the new job version + iter.tgSpreadInfo = make(map[string]spreadAttributeMap) + iter.groupPropertySets = make(map[string][]*propertySet) } func (iter *SpreadIterator) SetTaskGroup(tg *structs.TaskGroup) { @@ -134,6 +140,15 @@ func (iter *SpreadIterator) Next() *RankedNode { spreadAttributeMap := iter.tgSpreadInfo[tgName] spreadDetails := spreadAttributeMap[pset.targetAttribute] + if spreadDetails == nil { + iter.ctx.Logger().Named("spread").Error( + "error reading spread attribute map for task group", + "task_group", tgName, + "target", pset.targetAttribute, + ) + continue + } + if len(spreadDetails.desiredCounts) == 0 { // When desired counts map is empty the user didn't specify any targets // Use even spreading scoring algorithm for this scenario diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go index 04040acb6..75de56699 100644 --- a/scheduler/spread_test.go +++ b/scheduler/spread_test.go @@ -9,6 +9,7 @@ import ( "fmt" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -811,3 +812,97 @@ func validateEqualSpread(h *Harness) error { } return fmt.Errorf("expected even distributon of allocs to racks, but got:\n%+v", countSet) } + +func TestSpreadPanicDowngrade(t *testing.T) { + + h := NewHarness(t) + + nodes := []*structs.Node{} + for i := 0; i < 5; i++ { + node := mock.Node() + nodes = append(nodes, node) + err := h.State.UpsertNode(structs.MsgTypeTestSetup, + h.NextIndex(), node) + require.NoError(t, err) + } + + // job version 1 + // max_parallel = 0, canary = 1, spread != nil, 1 failed alloc + + job1 := mock.Job() + job1.Spreads = []*structs.Spread{ + { + Attribute: "${node.unique.name}", + Weight: 50, + SpreadTarget: []*structs.SpreadTarget{}, + }, + } + job1.Update = structs.UpdateStrategy{ + Stagger: time.Duration(30 * time.Second), + MaxParallel: 0, + } + job1.Status = structs.JobStatusRunning + job1.TaskGroups[0].Count = 4 + job1.TaskGroups[0].Update = &structs.UpdateStrategy{ + Stagger: time.Duration(30 * time.Second), + MaxParallel: 1, + HealthCheck: "checks", + MinHealthyTime: time.Duration(30 * time.Second), + HealthyDeadline: 
time.Duration(9 * time.Minute), + ProgressDeadline: time.Duration(10 * time.Minute), + AutoRevert: true, + Canary: 1, + } + + job1.Version = 1 + job1.TaskGroups[0].Count = 5 + err := h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job1) + require.NoError(t, err) + + allocs := []*structs.Allocation{} + for i := 0; i < 4; i++ { + alloc := mock.Alloc() + alloc.Job = job1 + alloc.JobID = job1.ID + alloc.NodeID = nodes[i].ID + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + Timestamp: time.Now(), + Canary: false, + ModifyIndex: h.NextIndex(), + } + if i == 0 { + alloc.DeploymentStatus.Canary = true + } + if i == 1 { + alloc.ClientStatus = structs.AllocClientStatusFailed + } + allocs = append(allocs, alloc) + } + err = h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs) + + // job version 2 + // max_parallel = 0, canary = 1, spread == nil + + job2 := job1.Copy() + job2.Version = 2 + job2.Spreads = nil + err = h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2) + require.NoError(t, err) + + eval := &structs.Evaluation{ + Namespace: job2.Namespace, + ID: uuid.Generate(), + Priority: job2.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job2.ID, + Status: structs.EvalStatusPending, + } + err = h.State.UpsertEvals(structs.MsgTypeTestSetup, + h.NextIndex(), []*structs.Evaluation{eval}) + require.NoError(t, err) + + processErr := h.Process(NewServiceScheduler, eval) + require.NoError(t, processErr, "failed to process eval") + require.Len(t, h.Plans, 1) +} From 9476c75c579a56a40dbeab341dd19fcfa7c24fa2 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 9 Feb 2022 19:59:37 -0500 Subject: [PATCH 16/19] docs: add 1.2.6 to changelog --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d97ef019..8ddee2c05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## 1.2.6 (February 9, 2022) + +SECURITY: + +* Add ACL requirement and HCL validation to the job parse API endpoint to prevent excessive CPU usage. [CVE-2022-24685](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24685) [[GH-12038](https://github.com/hashicorp/nomad/issues/12038)] +* Fix race condition in use of go-getter that could cause a client agent to download the wrong artifact into the wrong destination. [CVE-2022-24686](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24686) [[GH-12036](https://github.com/hashicorp/nomad/issues/12036)] +* Prevent panic in spread iterator during allocation stop. [CVE-2022-24684](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24684) [[GH-12039](https://github.com/hashicorp/nomad/issues/12039)] +* Resolve symlinks to prevent unauthorized access to files outside the allocation directory. [CVE-2022-24683](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24683) [[GH-12037](https://github.com/hashicorp/nomad/issues/12037)] + ## 1.2.5 (February 1, 2022) BUG FIXES: From 3b0e6ae029a17a9edfab28fc242d9dcd1ee11ec4 Mon Sep 17 00:00:00 2001 From: Nomad Release bot Date: Thu, 10 Feb 2022 02:47:03 +0000 Subject: [PATCH 17/19] Generate files for 1.2.6 release --- version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/version.go b/version/version.go index 00097a920..c30060303 100644 --- a/version/version.go +++ b/version/version.go @@ -11,7 +11,7 @@ var ( GitDescribe string // The main version number that is being run at the moment. 
- Version = "1.2.5" + Version = "1.2.6" // A pre-release marker for the version. If this is "" (empty string) // then it means that it is a final release. Otherwise, this is a pre-release From e83ef0a0081c079db46d7a4de3ebac550edcfb5b Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 10 Feb 2022 14:56:11 -0500 Subject: [PATCH 19/19] prepare for next release --- GNUmakefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/GNUmakefile b/GNUmakefile index 6b0197982..1a0c34638 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -39,7 +39,7 @@ PROTO_COMPARE_TAG ?= v1.0.3$(if $(findstring ent,$(GO_TAGS)),+ent,) # LAST_RELEASE is the git sha of the latest release corresponding to this branch. main should have the latest # published release, but backport branches should point to the parent tag (e.g. 1.0.8 in release-1.0.9 after 1.1.0 is cut). -LAST_RELEASE ?= v1.2.5 +LAST_RELEASE ?= v1.2.6 default: help