3825f7cf1f
This PR fixes a bug in which allocations from stopped batch jobs could not be garbage collected.
424 lines
12 KiB
Go
424 lines
12 KiB
Go
package nomad
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"time"
|
|
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/scheduler"
|
|
)
|
|
|
|
// maxIdsPerReap caps how many evaluation and allocation IDs are placed into
// one Raft transaction when reaping, so that the Raft message does not become
// too large.
// NOTE(review): the divisor 36 is presumably the byte length of one ID
// string — confirm.
var maxIdsPerReap = (1024 * 256) / 36 // 0.25 MB of ids.
|
|
|
|
// CoreScheduler is a special "scheduler" that is registered
|
|
// as "_core". It is used to run various administrative work
|
|
// across the cluster.
|
|
type CoreScheduler struct {
|
|
srv *Server
|
|
snap *state.StateSnapshot
|
|
}
|
|
|
|
// NewCoreScheduler is used to return a new system scheduler instance
|
|
func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Scheduler {
|
|
s := &CoreScheduler{
|
|
srv: srv,
|
|
snap: snap,
|
|
}
|
|
return s
|
|
}
|
|
|
|
// Process is used to implement the scheduler.Scheduler interface
|
|
func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
|
|
switch eval.JobID {
|
|
case structs.CoreJobEvalGC:
|
|
return c.evalGC(eval)
|
|
case structs.CoreJobNodeGC:
|
|
return c.nodeGC(eval)
|
|
case structs.CoreJobJobGC:
|
|
return c.jobGC(eval)
|
|
case structs.CoreJobForceGC:
|
|
return c.forceGC(eval)
|
|
default:
|
|
return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
|
|
}
|
|
}
|
|
|
|
// forceGC is used to garbage collect all eligible objects.
|
|
func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
|
|
if err := c.jobGC(eval); err != nil {
|
|
return err
|
|
}
|
|
if err := c.evalGC(eval); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Node GC must occur after the others to ensure the allocations are
|
|
// cleared.
|
|
return c.nodeGC(eval)
|
|
}
|
|
|
|
// jobGC is used to garbage collect eligible jobs.
|
|
func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
|
|
// Get all the jobs eligible for garbage collection.
|
|
ws := memdb.NewWatchSet()
|
|
iter, err := c.snap.JobsByGC(ws, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var oldThreshold uint64
|
|
if eval.JobID == structs.CoreJobForceGC {
|
|
// The GC was forced, so set the threshold to its maximum so everything
|
|
// will GC.
|
|
oldThreshold = math.MaxUint64
|
|
c.srv.logger.Println("[DEBUG] sched.core: forced job GC")
|
|
} else {
|
|
// Get the time table to calculate GC cutoffs.
|
|
tt := c.srv.fsm.TimeTable()
|
|
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
|
|
oldThreshold = tt.NearestIndex(cutoff)
|
|
c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
|
|
oldThreshold, c.srv.config.JobGCThreshold)
|
|
}
|
|
|
|
// Collect the allocations, evaluations and jobs to GC
|
|
var gcAlloc, gcEval, gcJob []string
|
|
|
|
OUTER:
|
|
for i := iter.Next(); i != nil; i = iter.Next() {
|
|
job := i.(*structs.Job)
|
|
|
|
// Ignore new jobs.
|
|
if job.CreateIndex > oldThreshold {
|
|
continue
|
|
}
|
|
|
|
ws := memdb.NewWatchSet()
|
|
evals, err := c.snap.EvalsByJob(ws, job.ID)
|
|
if err != nil {
|
|
c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err)
|
|
continue
|
|
}
|
|
|
|
allEvalsGC := true
|
|
var jobAlloc, jobEval []string
|
|
for _, eval := range evals {
|
|
gc, allocs, err := c.gcEval(eval, oldThreshold, true)
|
|
if err != nil {
|
|
continue OUTER
|
|
}
|
|
|
|
if gc {
|
|
jobEval = append(jobEval, eval.ID)
|
|
jobAlloc = append(jobAlloc, allocs...)
|
|
} else {
|
|
allEvalsGC = false
|
|
break
|
|
}
|
|
}
|
|
|
|
// Job is eligible for garbage collection
|
|
if allEvalsGC {
|
|
gcJob = append(gcJob, job.ID)
|
|
gcAlloc = append(gcAlloc, jobAlloc...)
|
|
gcEval = append(gcEval, jobEval...)
|
|
}
|
|
}
|
|
|
|
// Fast-path the nothing case
|
|
if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
|
|
return nil
|
|
}
|
|
c.srv.logger.Printf("[DEBUG] sched.core: job GC: %d jobs, %d evaluations, %d allocs eligible",
|
|
len(gcJob), len(gcEval), len(gcAlloc))
|
|
|
|
// Reap the evals and allocs
|
|
if err := c.evalReap(gcEval, gcAlloc); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Call to the leader to deregister the jobs.
|
|
for _, job := range gcJob {
|
|
req := structs.JobDeregisterRequest{
|
|
JobID: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: c.srv.config.Region,
|
|
},
|
|
}
|
|
var resp structs.JobDeregisterResponse
|
|
if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
|
|
c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// evalGC is used to garbage collect old evaluations
|
|
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
|
|
// Iterate over the evaluations
|
|
ws := memdb.NewWatchSet()
|
|
iter, err := c.snap.Evals(ws)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var oldThreshold uint64
|
|
if eval.JobID == structs.CoreJobForceGC {
|
|
// The GC was forced, so set the threshold to its maximum so everything
|
|
// will GC.
|
|
oldThreshold = math.MaxUint64
|
|
c.srv.logger.Println("[DEBUG] sched.core: forced eval GC")
|
|
} else {
|
|
// Compute the old threshold limit for GC using the FSM
|
|
// time table. This is a rough mapping of a time to the
|
|
// Raft index it belongs to.
|
|
tt := c.srv.fsm.TimeTable()
|
|
cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
|
|
oldThreshold = tt.NearestIndex(cutoff)
|
|
c.srv.logger.Printf("[DEBUG] sched.core: eval GC: scanning before index %d (%v)",
|
|
oldThreshold, c.srv.config.EvalGCThreshold)
|
|
}
|
|
|
|
// Collect the allocations and evaluations to GC
|
|
var gcAlloc, gcEval []string
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
eval := raw.(*structs.Evaluation)
|
|
|
|
// The Evaluation GC should not handle batch jobs since those need to be
|
|
// garbage collected in one shot
|
|
gc, allocs, err := c.gcEval(eval, oldThreshold, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if gc {
|
|
gcEval = append(gcEval, eval.ID)
|
|
}
|
|
gcAlloc = append(gcAlloc, allocs...)
|
|
}
|
|
|
|
// Fast-path the nothing case
|
|
if len(gcEval) == 0 && len(gcAlloc) == 0 {
|
|
return nil
|
|
}
|
|
c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs eligible",
|
|
len(gcEval), len(gcAlloc))
|
|
|
|
return c.evalReap(gcEval, gcAlloc)
|
|
}
|
|
|
|
// gcEval returns whether the eval should be garbage collected given a raft
|
|
// threshold index. The eval disqualifies for garbage collection if it or its
|
|
// allocs are not older than the threshold. If the eval should be garbage
|
|
// collected, the associated alloc ids that should also be removed are also
|
|
// returned
|
|
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, allowBatch bool) (
|
|
bool, []string, error) {
|
|
// Ignore non-terminal and new evaluations
|
|
if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
|
|
return false, nil, nil
|
|
}
|
|
|
|
// Create a watchset
|
|
ws := memdb.NewWatchSet()
|
|
|
|
// If the eval is from a running "batch" job we don't want to garbage
|
|
// collect its allocations. If there is a long running batch job and its
|
|
// terminal allocations get GC'd the scheduler would re-run the
|
|
// allocations.
|
|
if eval.Type == structs.JobTypeBatch {
|
|
// Check if the job is running
|
|
job, err := c.snap.JobByID(ws, eval.JobID)
|
|
if err != nil {
|
|
return false, nil, err
|
|
}
|
|
|
|
// We don't want to gc anything related to a job which is not dead
|
|
// If the batch job doesn't exist we can GC it regardless of allowBatch
|
|
if job != nil && (!allowBatch || job.Status != structs.JobStatusDead) {
|
|
return false, nil, nil
|
|
}
|
|
}
|
|
|
|
// Get the allocations by eval
|
|
allocs, err := c.snap.AllocsByEval(ws, eval.ID)
|
|
if err != nil {
|
|
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
|
|
eval.ID, err)
|
|
return false, nil, err
|
|
}
|
|
|
|
// Scan the allocations to ensure they are terminal and old
|
|
gcEval := true
|
|
var gcAllocIDs []string
|
|
for _, alloc := range allocs {
|
|
if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
|
|
// Can't GC the evaluation since not all of the allocations are
|
|
// terminal
|
|
gcEval = false
|
|
} else {
|
|
// The allocation is eligible to be GC'd
|
|
gcAllocIDs = append(gcAllocIDs, alloc.ID)
|
|
}
|
|
}
|
|
|
|
return gcEval, gcAllocIDs, nil
|
|
}
|
|
|
|
// evalReap contacts the leader and issues a reap on the passed evals and
|
|
// allocs.
|
|
func (c *CoreScheduler) evalReap(evals, allocs []string) error {
|
|
// Call to the leader to issue the reap
|
|
for _, req := range c.partitionReap(evals, allocs) {
|
|
var resp structs.GenericResponse
|
|
if err := c.srv.RPC("Eval.Reap", req, &resp); err != nil {
|
|
c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// partitionReap returns a list of EvalDeleteRequest to make, ensuring a single
|
|
// request does not contain too many allocations and evaluations. This is
|
|
// necessary to ensure that the Raft transaction does not become too large.
|
|
func (c *CoreScheduler) partitionReap(evals, allocs []string) []*structs.EvalDeleteRequest {
|
|
var requests []*structs.EvalDeleteRequest
|
|
submittedEvals, submittedAllocs := 0, 0
|
|
for submittedEvals != len(evals) || submittedAllocs != len(allocs) {
|
|
req := &structs.EvalDeleteRequest{
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: c.srv.config.Region,
|
|
},
|
|
}
|
|
requests = append(requests, req)
|
|
available := maxIdsPerReap
|
|
|
|
// Add the allocs first
|
|
if remaining := len(allocs) - submittedAllocs; remaining > 0 {
|
|
if remaining <= available {
|
|
req.Allocs = allocs[submittedAllocs:]
|
|
available -= remaining
|
|
submittedAllocs += remaining
|
|
} else {
|
|
req.Allocs = allocs[submittedAllocs : submittedAllocs+available]
|
|
submittedAllocs += available
|
|
|
|
// Exhausted space so skip adding evals
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Add the evals
|
|
if remaining := len(evals) - submittedEvals; remaining > 0 {
|
|
if remaining <= available {
|
|
req.Evals = evals[submittedEvals:]
|
|
submittedEvals += remaining
|
|
} else {
|
|
req.Evals = evals[submittedEvals : submittedEvals+available]
|
|
submittedEvals += available
|
|
}
|
|
}
|
|
}
|
|
|
|
return requests
|
|
}
|
|
|
|
// nodeGC is used to garbage collect old nodes
|
|
func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
|
|
// Iterate over the evaluations
|
|
ws := memdb.NewWatchSet()
|
|
iter, err := c.snap.Nodes(ws)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var oldThreshold uint64
|
|
if eval.JobID == structs.CoreJobForceGC {
|
|
// The GC was forced, so set the threshold to its maximum so everything
|
|
// will GC.
|
|
oldThreshold = math.MaxUint64
|
|
c.srv.logger.Println("[DEBUG] sched.core: forced node GC")
|
|
} else {
|
|
// Compute the old threshold limit for GC using the FSM
|
|
// time table. This is a rough mapping of a time to the
|
|
// Raft index it belongs to.
|
|
tt := c.srv.fsm.TimeTable()
|
|
cutoff := time.Now().UTC().Add(-1 * c.srv.config.NodeGCThreshold)
|
|
oldThreshold = tt.NearestIndex(cutoff)
|
|
c.srv.logger.Printf("[DEBUG] sched.core: node GC: scanning before index %d (%v)",
|
|
oldThreshold, c.srv.config.NodeGCThreshold)
|
|
}
|
|
|
|
// Collect the nodes to GC
|
|
var gcNode []string
|
|
OUTER:
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
node := raw.(*structs.Node)
|
|
|
|
// Ignore non-terminal and new nodes
|
|
if !node.TerminalStatus() || node.ModifyIndex > oldThreshold {
|
|
continue
|
|
}
|
|
|
|
// Get the allocations by node
|
|
ws := memdb.NewWatchSet()
|
|
allocs, err := c.snap.AllocsByNode(ws, node.ID)
|
|
if err != nil {
|
|
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for node %s: %v",
|
|
eval.ID, err)
|
|
continue
|
|
}
|
|
|
|
// If there are any non-terminal allocations, skip the node. If the node
|
|
// is terminal and the allocations are not, the scheduler may not have
|
|
// run yet to transition the allocs on the node to terminal. We delay
|
|
// GC'ing until this happens.
|
|
for _, alloc := range allocs {
|
|
if !alloc.TerminalStatus() {
|
|
continue OUTER
|
|
}
|
|
}
|
|
|
|
// Node is eligible for garbage collection
|
|
gcNode = append(gcNode, node.ID)
|
|
}
|
|
|
|
// Fast-path the nothing case
|
|
if len(gcNode) == 0 {
|
|
return nil
|
|
}
|
|
c.srv.logger.Printf("[DEBUG] sched.core: node GC: %d nodes eligible", len(gcNode))
|
|
|
|
// Call to the leader to issue the reap
|
|
for _, nodeID := range gcNode {
|
|
req := structs.NodeDeregisterRequest{
|
|
NodeID: nodeID,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: c.srv.config.Region,
|
|
},
|
|
}
|
|
var resp structs.NodeUpdateResponse
|
|
if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil {
|
|
c.srv.logger.Printf("[ERR] sched.core: node '%s' reap failed: %v", nodeID, err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|