csi: add empty CSI volume publication GC to scheduled core jobs (#7014)
This changeset adds a new core job `CoreJobCSIVolumePublicationGC` to the leader's loop for scheduling core job evals. Right now this is an empty method body without even a config file stanza. Later changesets will implement the logic of volume publication GC.
This commit is contained in:
parent
6e71baa77d
commit
8673ea5cba
|
@ -191,6 +191,10 @@ type Config struct {
|
|||
// for GC. This gives users some time to view terminal deployments.
|
||||
DeploymentGCThreshold time.Duration
|
||||
|
||||
// CSIVolumePublicationGCInterval is how often we dispatch a job to GC
|
||||
// unclaimed CSI volume publications.
|
||||
CSIVolumePublicationGCInterval time.Duration
|
||||
|
||||
// EvalNackTimeout controls how long we allow a sub-scheduler to
|
||||
// work on an evaluation before we consider it failed and Nack it.
|
||||
// This allows that evaluation to be handed to another sub-scheduler
|
||||
|
@ -377,6 +381,7 @@ func DefaultConfig() *Config {
|
|||
NodeGCThreshold: 24 * time.Hour,
|
||||
DeploymentGCInterval: 5 * time.Minute,
|
||||
DeploymentGCThreshold: 1 * time.Hour,
|
||||
CSIVolumePublicationGCInterval: 60 * time.Second,
|
||||
EvalNackTimeout: 60 * time.Second,
|
||||
EvalDeliveryLimit: 3,
|
||||
EvalNackInitialReenqueueDelay: 1 * time.Second,
|
||||
|
|
|
@ -50,6 +50,8 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
|
|||
return c.jobGC(eval)
|
||||
case structs.CoreJobDeploymentGC:
|
||||
return c.deploymentGC(eval)
|
||||
case structs.CoreJobCSIVolumePublicationGC:
|
||||
return c.csiVolumePublicationGC(eval)
|
||||
case structs.CoreJobForceGC:
|
||||
return c.forceGC(eval)
|
||||
default:
|
||||
|
@ -703,3 +705,10 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
|
|||
|
||||
return timeDiff > interval.Nanoseconds()
|
||||
}
|
||||
|
||||
// csiVolumeGC is used to garbage collect CSI volume publications
|
||||
func (c *CoreScheduler) csiVolumePublicationGC(eval *structs.Evaluation) error {
|
||||
// TODO: implement me!
|
||||
c.logger.Trace("garbage collecting unclaimed CSI volume publications")
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1836,6 +1836,37 @@ func TestCoreScheduler_DeploymentGC_Force(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: this is an empty test until CoreScheduler.csiVolumePublicationGC is implemented
|
||||
func TestCoreScheduler_CSIVolumePublicationGC(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s1, cleanupS1 := TestServer(t, nil)
|
||||
defer cleanupS1()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
assert := assert.New(t)
|
||||
|
||||
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
||||
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
||||
|
||||
// TODO: insert volumes for nodes
|
||||
state := s1.fsm.State()
|
||||
|
||||
// Update the time tables to make this work
|
||||
tt := s1.fsm.TimeTable()
|
||||
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.CSIVolumePublicationGCInterval))
|
||||
|
||||
// Create a core scheduler
|
||||
snap, err := state.Snapshot()
|
||||
assert.Nil(err, "Snapshot")
|
||||
core := NewCoreScheduler(s1, snap)
|
||||
|
||||
// Attempt the GC
|
||||
gc := s1.coreJobEval(structs.CoreJobCSIVolumePublicationGC, 2000)
|
||||
assert.Nil(core.Process(gc), "Process GC")
|
||||
|
||||
// TODO: assert state is cleaned up
|
||||
}
|
||||
|
||||
func TestCoreScheduler_PartitionEvalReap(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -519,6 +519,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
|
|||
defer jobGC.Stop()
|
||||
deploymentGC := time.NewTicker(s.config.DeploymentGCInterval)
|
||||
defer deploymentGC.Stop()
|
||||
csiVolumePublicationGC := time.NewTicker(s.config.CSIVolumePublicationGCInterval)
|
||||
defer csiVolumePublicationGC.Stop()
|
||||
|
||||
// getLatest grabs the latest index from the state store. It returns true if
|
||||
// the index was retrieved successfully.
|
||||
|
@ -551,6 +553,10 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
|
|||
if index, ok := getLatest(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobDeploymentGC, index))
|
||||
}
|
||||
case <-csiVolumePublicationGC.C:
|
||||
if index, ok := getLatest(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIVolumePublicationGC, index))
|
||||
}
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
|
|
|
@ -8694,6 +8694,11 @@ const (
|
|||
// check if they are terminal. If so, we delete these out of the system.
|
||||
CoreJobDeploymentGC = "deployment-gc"
|
||||
|
||||
// CoreJobCSIVolumePublicationGC is use for the garbage collection of CSI
|
||||
// volume publications. We periodically scan volumes to see if no allocs are
|
||||
// claiming them. If so, we unpublish the volume.
|
||||
CoreJobCSIVolumePublicationGC = "csi-volume-publication-gc"
|
||||
|
||||
// CoreJobForceGC is used to force garbage collection of all GCable objects.
|
||||
CoreJobForceGC = "force-gc"
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue