open-nomad/nomad/core_sched.go

package nomad

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/scheduler"
)

// CoreScheduler is a special "scheduler" that is registered
// as "_core". It is used to run various administrative work
// across the cluster.
type CoreScheduler struct {
	srv  *Server
	snap *state.StateSnapshot
}

// NewCoreScheduler is used to return a new core scheduler instance
func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Scheduler {
	s := &CoreScheduler{
		srv:  srv,
		snap: snap,
	}
	return s
}
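
// Returning the scheduler.Scheduler interface rather than the concrete
// type presumably lets "_core" be registered alongside the real
// schedulers in a common factory table (an assumption; the registration
// site is not in this file).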

// Process is used to implement the scheduler.Scheduler interface
func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
	switch eval.JobID {
	case structs.CoreJobEvalGC:
		return c.evalGC(eval)
	case structs.CoreJobNodeGC:
		return c.nodeGC(eval)
	default:
		return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
	}
}
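
// A sketch of how this scheduler is presumably driven (illustrative only;
// the enqueueing side is an assumption, not shown in this file):
//
//	eval := &structs.Evaluation{JobID: structs.CoreJobEvalGC}
//	sched := NewCoreScheduler(srv, snap)
//	err := sched.Process(eval) // runs evalGC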

// evalGC is used to garbage collect old evaluations
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
	// Iterate over the evaluations
	iter, err := c.snap.Evals()
	if err != nil {
		return err
	}

	// Compute the old threshold limit for GC using the FSM
	// time table. This is a rough mapping of a time to the
	// Raft index it belongs to.
	tt := c.srv.fsm.TimeTable()
	cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
	oldThreshold := tt.NearestIndex(cutoff)
	c.srv.logger.Printf("[DEBUG] sched.core: eval GC: scanning before index %d (%v)",
		oldThreshold, c.srv.config.EvalGCThreshold)
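
	// For example, with an EvalGCThreshold of one hour the cutoff is one
	// hour in the past and oldThreshold is roughly the last Raft index
	// written before that time, so only entries whose ModifyIndex is at
	// or below it are old enough to collect.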

	// Collect the allocations and evaluations to GC
	var gcAlloc, gcEval []string

OUTER:
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}
		eval := raw.(*structs.Evaluation)

		// Ignore non-terminal and new evaluations
		if !eval.TerminalStatus() || eval.ModifyIndex > oldThreshold {
			continue
		}

		// Get the allocations by eval
		allocs, err := c.snap.AllocsByEval(eval.ID)
		if err != nil {
			c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
				eval.ID, err)
			continue
		}

		// Scan the allocations to ensure they are terminal and old
		for _, alloc := range allocs {
			if !alloc.TerminalStatus() || alloc.ModifyIndex > oldThreshold {
				continue OUTER
			}
		}

		// Evaluation is eligible for garbage collection
		gcEval = append(gcEval, eval.ID)
		for _, alloc := range allocs {
			gcAlloc = append(gcAlloc, alloc.ID)
		}
	}
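
	// An evaluation is collected only together with all of its
	// allocations; if any allocation is still live or too new, the eval
	// and its allocs are all retained so they age out as a unit.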

	// Fast-path the nothing case
	if len(gcEval) == 0 && len(gcAlloc) == 0 {
		return nil
	}
	c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs eligible",
		len(gcEval), len(gcAlloc))

	// Call to the leader to issue the reap
	req := structs.EvalDeleteRequest{
		Evals:  gcEval,
		Allocs: gcAlloc,
		WriteRequest: structs.WriteRequest{
			Region: c.srv.config.Region,
		},
	}
	var resp structs.GenericResponse
	if err := c.srv.RPC("Eval.Reap", &req, &resp); err != nil {
		c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
		return err
	}
	return nil
}

// nodeGC is used to garbage collect old nodes
func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
	// Iterate over the nodes
	iter, err := c.snap.Nodes()
	if err != nil {
		return err
	}

	// Compute the old threshold limit for GC using the FSM
	// time table. This is a rough mapping of a time to the
	// Raft index it belongs to.
	tt := c.srv.fsm.TimeTable()
	cutoff := time.Now().UTC().Add(-1 * c.srv.config.NodeGCThreshold)
	oldThreshold := tt.NearestIndex(cutoff)
	c.srv.logger.Printf("[DEBUG] sched.core: node GC: scanning before index %d (%v)",
		oldThreshold, c.srv.config.NodeGCThreshold)

	// Collect the nodes to GC
	var gcNode []string
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}
		node := raw.(*structs.Node)

		// Ignore non-terminal and new nodes
		if !node.TerminalStatus() || node.ModifyIndex > oldThreshold {
			continue
		}

		// Get the allocations by node
		allocs, err := c.snap.AllocsByNode(node.ID)
		if err != nil {
			c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for node %s: %v",
				node.ID, err)
			continue
		}

		// If there are any allocations, skip the node
		if len(allocs) > 0 {
			continue
		}

		// Node is eligible for garbage collection
		gcNode = append(gcNode, node.ID)
	}

	// Fast-path the nothing case
	if len(gcNode) == 0 {
		return nil
	}
	c.srv.logger.Printf("[DEBUG] sched.core: node GC: %d nodes eligible", len(gcNode))

	// Call to the leader to issue the reap
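	// Note that unlike eval GC, which reaps evaluations and allocations in
	// one batched request, nodes are deregistered with one RPC per node.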
	for _, nodeID := range gcNode {
		req := structs.NodeDeregisterRequest{
			NodeID: nodeID,
			WriteRequest: structs.WriteRequest{
				Region: c.srv.config.Region,
			},
		}
		var resp structs.NodeUpdateResponse
		if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil {
			c.srv.logger.Printf("[ERR] sched.core: node '%s' reap failed: %v", nodeID, err)
			return err
		}
	}
	return nil
}