open-nomad/nomad/drainer/drainer.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

433 lines
14 KiB
Go
Raw Normal View History

2018-03-08 23:08:23 +00:00
package drainer
2018-03-02 00:37:19 +00:00
import (
"context"
"sync"
2018-03-03 01:15:38 +00:00
"time"
2018-03-02 00:37:19 +00:00
2018-09-15 23:23:13 +00:00
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper/pointer"
2018-03-06 22:37:37 +00:00
"github.com/hashicorp/nomad/helper/uuid"
2018-03-02 00:37:19 +00:00
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/time/rate"
)
2018-03-03 01:15:38 +00:00
var (
// stateReadErrorDelay is the delay to apply before retrying reading state
// when there is an error
stateReadErrorDelay = 1 * time.Second
)
2018-03-02 00:37:19 +00:00
const (
// LimitStateQueriesPerSecond is the number of state queries allowed per
// second
LimitStateQueriesPerSecond = 100.0
2018-03-06 22:37:37 +00:00
// BatchUpdateInterval is how long we wait to batch updates
BatchUpdateInterval = 1 * time.Second
// NodeDeadlineCoalesceWindow is the duration in which deadlining nodes will
// be coalesced together
NodeDeadlineCoalesceWindow = 5 * time.Second
2018-05-11 00:22:06 +00:00
// NodeDrainEventComplete is used to indicate that the node drain is
// finished.
NodeDrainEventComplete = "Node drain complete"
// NodeDrainEventDetailDeadlined is the key to use when the drain is
// complete because a deadline. The acceptable values are "true" and "false"
NodeDrainEventDetailDeadlined = "deadline_reached"
2018-03-02 00:37:19 +00:00
)
// RaftApplier contains methods for applying the raft requests required by the
// NodeDrainer.
type RaftApplier interface {
2018-03-06 22:37:37 +00:00
AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error)
2018-05-11 00:22:06 +00:00
NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error)
2018-03-02 00:37:19 +00:00
}
2018-03-07 23:42:17 +00:00
// NodeTracker is the interface to notify an object that is tracking draining
// nodes of changes
2018-03-03 01:15:38 +00:00
type NodeTracker interface {
2018-03-07 23:42:17 +00:00
// TrackedNodes returns all the nodes that are currently tracked as
// draining.
2018-03-06 18:12:17 +00:00
TrackedNodes() map[string]*structs.Node
2018-03-07 23:42:17 +00:00
// Remove removes a node from the draining set.
2018-03-03 01:15:38 +00:00
Remove(nodeID string)
2018-03-07 23:42:17 +00:00
// Update either updates the specification of a draining node or tracks the
// node as draining.
2018-03-03 01:15:38 +00:00
Update(node *structs.Node)
}
2018-03-07 23:42:17 +00:00
// DrainingJobWatcherFactory returns a new DrainingJobWatcher
2018-09-15 23:23:13 +00:00
type DrainingJobWatcherFactory func(context.Context, *rate.Limiter, *state.StateStore, log.Logger) DrainingJobWatcher
2018-03-07 23:42:17 +00:00
// DrainingNodeWatcherFactory returns a new DrainingNodeWatcher
2018-09-15 23:23:13 +00:00
type DrainingNodeWatcherFactory func(context.Context, *rate.Limiter, *state.StateStore, log.Logger, NodeTracker) DrainingNodeWatcher
2018-03-07 23:42:17 +00:00
// DrainDeadlineNotifierFactory returns a new DrainDeadlineNotifier
2018-03-02 00:37:19 +00:00
type DrainDeadlineNotifierFactory func(context.Context) DrainDeadlineNotifier
2018-03-07 23:42:17 +00:00
// GetDrainingJobWatcher returns a draining job watcher
2018-09-15 23:23:13 +00:00
func GetDrainingJobWatcher(ctx context.Context, limiter *rate.Limiter, state *state.StateStore, logger log.Logger) DrainingJobWatcher {
2018-03-06 22:37:37 +00:00
return NewDrainingJobWatcher(ctx, limiter, state, logger)
}
2018-03-07 23:42:17 +00:00
// GetDeadlineNotifier returns a node deadline notifier with default coalescing.
2018-03-06 22:37:37 +00:00
func GetDeadlineNotifier(ctx context.Context) DrainDeadlineNotifier {
return NewDeadlineHeap(ctx, NodeDeadlineCoalesceWindow)
}
2018-03-07 23:42:17 +00:00
// GetNodeWatcherFactory returns a DrainingNodeWatcherFactory
2018-03-06 22:37:37 +00:00
func GetNodeWatcherFactory() DrainingNodeWatcherFactory {
2018-09-15 23:23:13 +00:00
return func(ctx context.Context, limiter *rate.Limiter, state *state.StateStore, logger log.Logger, tracker NodeTracker) DrainingNodeWatcher {
2018-03-06 22:37:37 +00:00
return NewNodeDrainWatcher(ctx, limiter, state, logger, tracker)
}
}
2018-03-07 23:42:17 +00:00
// allocMigrateBatcher is used to batch allocation updates.
2018-03-06 22:37:37 +00:00
type allocMigrateBatcher struct {
// updates holds pending client status updates for allocations
updates []*structs.Allocation
// updateFuture is used to wait for the pending batch update
// to complete. This may be nil if no batch is pending.
updateFuture *structs.BatchFuture
// updateTimer is the timer that will trigger the next batch
// update, and may be nil if there is no batch pending.
updateTimer *time.Timer
batchWindow time.Duration
// synchronizes access to the updates list, the future and the timer.
sync.Mutex
}
2018-03-07 23:42:17 +00:00
// NodeDrainerConfig is used to configure a new node drainer.
2018-03-02 00:37:19 +00:00
type NodeDrainerConfig struct {
2018-09-15 23:23:13 +00:00
Logger log.Logger
2018-03-07 23:42:17 +00:00
Raft RaftApplier
JobFactory DrainingJobWatcherFactory
NodeFactory DrainingNodeWatcherFactory
DrainDeadlineFactory DrainDeadlineNotifierFactory
// StateQueriesPerSecond configures the query limit against the state store
// that is allowed by the node drainer.
2018-03-02 00:37:19 +00:00
StateQueriesPerSecond float64
2018-03-07 23:42:17 +00:00
// BatchUpdateInterval is the interval in which allocation updates are
// batched.
BatchUpdateInterval time.Duration
2018-03-02 00:37:19 +00:00
}
2018-03-08 21:25:09 +00:00
// NodeDrainer is used to orchestrate migrating allocations off of draining
// nodes.
2018-03-02 00:37:19 +00:00
type NodeDrainer struct {
enabled bool
2018-09-15 23:23:13 +00:00
logger log.Logger
2018-03-02 00:37:19 +00:00
// nodes is the set of draining nodes
nodes map[string]*drainingNode
2018-03-08 00:51:57 +00:00
// nodeWatcher watches for nodes to transition in and out of drain state.
2018-03-02 00:37:19 +00:00
nodeWatcher DrainingNodeWatcher
nodeFactory DrainingNodeWatcherFactory
2018-03-07 23:42:17 +00:00
// jobWatcher watches draining jobs and emits desired drains and notifies
// when migrations take place.
2018-03-02 00:37:19 +00:00
jobWatcher DrainingJobWatcher
jobFactory DrainingJobWatcherFactory
2018-03-07 23:42:17 +00:00
// deadlineNotifier notifies when nodes reach their drain deadline.
2018-03-02 00:37:19 +00:00
deadlineNotifier DrainDeadlineNotifier
deadlineNotifierFactory DrainDeadlineNotifierFactory
// state is the state that is watched for state changes.
state *state.StateStore
// queryLimiter is used to limit the rate of blocking queries
queryLimiter *rate.Limiter
// raft is a shim around the raft messages necessary for draining
raft RaftApplier
2018-03-06 22:37:37 +00:00
// batcher is used to batch alloc migrations.
batcher allocMigrateBatcher
2018-03-02 00:37:19 +00:00
// ctx and exitFn are used to cancel the watcher
ctx context.Context
exitFn context.CancelFunc
l sync.RWMutex
}
2018-03-07 23:42:17 +00:00
// NewNodeDrainer returns a new new node drainer. The node drainer is
// responsible for marking allocations on draining nodes with a desired
2018-03-08 00:51:57 +00:00
// migration transition, updating the drain strategy on nodes when they are
2018-03-07 23:42:17 +00:00
// complete and creating evaluations for the system to react to these changes.
2018-03-02 00:37:19 +00:00
func NewNodeDrainer(c *NodeDrainerConfig) *NodeDrainer {
return &NodeDrainer{
raft: c.Raft,
2018-09-15 23:23:13 +00:00
logger: c.Logger.Named("drain"),
2018-03-02 00:37:19 +00:00
jobFactory: c.JobFactory,
nodeFactory: c.NodeFactory,
deadlineNotifierFactory: c.DrainDeadlineFactory,
queryLimiter: rate.NewLimiter(rate.Limit(c.StateQueriesPerSecond), 100),
2018-03-06 22:37:37 +00:00
batcher: allocMigrateBatcher{
batchWindow: c.BatchUpdateInterval,
},
2018-03-02 00:37:19 +00:00
}
}
// SetEnabled will start or stop the node draining goroutine depending on the
// enabled boolean.
func (n *NodeDrainer) SetEnabled(enabled bool, state *state.StateStore) {
n.l.Lock()
defer n.l.Unlock()
2018-03-08 21:25:09 +00:00
// If we are starting now or have a new state, init state and start the
// run loop
2018-03-02 00:37:19 +00:00
n.enabled = enabled
2018-03-08 21:25:09 +00:00
if enabled {
n.flush(state)
2018-03-07 22:57:35 +00:00
go n.run(n.ctx)
2018-03-08 21:25:09 +00:00
} else if !enabled && n.exitFn != nil {
n.exitFn()
2018-03-02 00:37:19 +00:00
}
}
// flush is used to clear the state of the watcher
2018-03-08 21:25:09 +00:00
func (n *NodeDrainer) flush(state *state.StateStore) {
// Cancel anything that may be running.
2018-03-02 00:37:19 +00:00
if n.exitFn != nil {
n.exitFn()
}
2018-03-08 21:25:09 +00:00
// Store the new state
if state != nil {
n.state = state
}
2018-03-02 00:37:19 +00:00
n.ctx, n.exitFn = context.WithCancel(context.Background())
2018-03-06 18:12:17 +00:00
n.jobWatcher = n.jobFactory(n.ctx, n.queryLimiter, n.state, n.logger)
2018-03-03 01:15:38 +00:00
n.nodeWatcher = n.nodeFactory(n.ctx, n.queryLimiter, n.state, n.logger, n)
2018-03-02 00:37:19 +00:00
n.deadlineNotifier = n.deadlineNotifierFactory(n.ctx)
n.nodes = make(map[string]*drainingNode, 32)
}
2018-03-07 23:42:17 +00:00
// run is a long lived event handler that receives changes from the relevant
// watchers and takes action based on them.
2018-03-02 00:37:19 +00:00
func (n *NodeDrainer) run(ctx context.Context) {
for {
select {
case <-n.ctx.Done():
return
case nodes := <-n.deadlineNotifier.NextBatch():
n.handleDeadlinedNodes(nodes)
2018-03-06 22:37:37 +00:00
case req := <-n.jobWatcher.Drain():
n.handleJobAllocDrain(req)
case allocs := <-n.jobWatcher.Migrated():
n.handleMigratedAllocs(allocs)
2018-03-02 00:37:19 +00:00
}
}
}
2018-03-07 23:42:17 +00:00
// handleDeadlinedNodes handles a set of nodes reaching their drain deadline.
// The handler detects the remaining allocations on the nodes and immediately
// marks them for migration.
2018-03-02 23:19:55 +00:00
func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
2018-03-06 22:37:37 +00:00
// Retrieve the set of allocations that will be force stopped.
var forceStop []*structs.Allocation
n.l.RLock()
2018-03-06 22:37:37 +00:00
for _, node := range nodes {
draining, ok := n.nodes[node]
if !ok {
2018-09-15 23:23:13 +00:00
n.logger.Debug("skipping untracked deadlined node", "node_id", node)
2018-03-06 22:37:37 +00:00
continue
}
allocs, err := draining.RemainingAllocs()
2018-03-06 22:37:37 +00:00
if err != nil {
2019-08-16 13:03:42 +00:00
n.logger.Error("failed to retrieve allocs on deadlined node", "node_id", node, "error", err)
2018-03-06 22:37:37 +00:00
continue
}
2018-09-15 23:23:13 +00:00
n.logger.Debug("node deadlined causing allocs to be force stopped", "node_id", node, "num_allocs", len(allocs))
2018-03-06 22:37:37 +00:00
forceStop = append(forceStop, allocs...)
}
n.l.RUnlock()
n.batchDrainAllocs(forceStop)
2018-05-11 00:22:06 +00:00
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete).
AddDetail(NodeDrainEventDetailDeadlined, "true")
2018-04-10 22:02:52 +00:00
// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) {
2018-05-11 00:22:06 +00:00
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
n.logger.Error("failed to unset drain for nodes", "error", err)
}
}
2018-03-06 22:37:37 +00:00
}
2018-03-07 23:42:17 +00:00
// handleJobAllocDrain handles marking a set of allocations as having a desired
2018-03-08 00:51:57 +00:00
// transition to drain. The handler blocks till the changes to the allocation
// have occurred.
2018-03-06 22:37:37 +00:00
func (n *NodeDrainer) handleJobAllocDrain(req *DrainRequest) {
index, err := n.batchDrainAllocs(req.Allocs)
req.Resp.Respond(index, err)
2018-03-02 00:37:19 +00:00
}
2018-03-07 23:42:17 +00:00
// handleMigratedAllocs checks to see if any nodes can be considered done
// draining based on the set of allocations that have migrated because of an
// ongoing drain for a job.
2018-03-06 22:37:37 +00:00
func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
// Determine the set of nodes that were effected
nodes := make(map[string]struct{})
for _, alloc := range allocs {
nodes[alloc.NodeID] = struct{}{}
}
var done []string
var remainingAllocs []*structs.Allocation
2018-03-06 22:37:37 +00:00
// For each node, check if it is now done
n.l.RLock()
for node := range nodes {
draining, ok := n.nodes[node]
if !ok {
continue
}
isDone, err := draining.IsDone()
if err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("error checking if node is done draining", "node_id", node, "error", err)
2018-03-06 22:37:37 +00:00
continue
}
if !isDone {
continue
}
2018-03-02 00:37:19 +00:00
2018-03-06 22:37:37 +00:00
done = append(done, node)
remaining, err := draining.RemainingAllocs()
if err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("node is done draining but encountered an error getting remaining allocs", "node_id", node, "error", err)
continue
}
remainingAllocs = append(remainingAllocs, remaining...)
2018-03-06 22:37:37 +00:00
}
n.l.RUnlock()
// Stop any running system jobs on otherwise done nodes
if len(remainingAllocs) > 0 {
future := structs.NewBatchFuture()
n.drainAllocs(future, remainingAllocs)
if err := future.Wait(); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("failed to drain remaining allocs from done nodes", "num_allocs", len(remainingAllocs), "error", err)
}
}
2018-05-11 00:22:06 +00:00
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete)
2018-04-10 22:02:52 +00:00
// Submit the node transitions in a sharded form to ensure a reasonable
2018-03-10 00:10:38 +00:00
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) {
2018-05-11 00:22:06 +00:00
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("failed to unset drain for nodes", "error", err)
2018-03-10 00:10:38 +00:00
}
2018-03-06 22:37:37 +00:00
}
2018-03-02 00:37:19 +00:00
}
2018-03-07 23:42:17 +00:00
// batchDrainAllocs is used to batch the draining of allocations. It will block
// until the batch is complete.
2018-03-06 22:37:37 +00:00
func (n *NodeDrainer) batchDrainAllocs(allocs []*structs.Allocation) (uint64, error) {
// Add this to the batch
n.batcher.Lock()
n.batcher.updates = append(n.batcher.updates, allocs...)
// Start a new batch if none
future := n.batcher.updateFuture
if future == nil {
future = structs.NewBatchFuture()
n.batcher.updateFuture = future
n.batcher.updateTimer = time.AfterFunc(n.batcher.batchWindow, func() {
// Get the pending updates
n.batcher.Lock()
updates := n.batcher.updates
future := n.batcher.updateFuture
n.batcher.updates = nil
n.batcher.updateFuture = nil
n.batcher.updateTimer = nil
n.batcher.Unlock()
// Perform the batch update
n.drainAllocs(future, updates)
})
}
n.batcher.Unlock()
if err := future.Wait(); err != nil {
return 0, err
}
return future.Index(), nil
2018-03-02 00:37:19 +00:00
}
2018-03-08 00:51:57 +00:00
// drainAllocs is a non batch, marking of the desired transition to migrate for
2018-03-07 23:42:17 +00:00
// the set of allocations. It will also create the necessary evaluations for the
// affected jobs.
2018-03-06 22:37:37 +00:00
func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs.Allocation) {
2018-03-08 00:51:57 +00:00
// Compute the effected jobs and make the transition map
jobs := make(map[structs.NamespacedID]*structs.Allocation, 4)
2018-04-10 22:02:52 +00:00
transitions := make(map[string]*structs.DesiredTransition, len(allocs))
2018-03-06 22:37:37 +00:00
for _, alloc := range allocs {
2018-04-10 22:02:52 +00:00
transitions[alloc.ID] = &structs.DesiredTransition{
Migrate: pointer.Of(true),
2018-03-06 22:37:37 +00:00
}
jobs[alloc.JobNamespacedID()] = alloc
2018-03-06 22:37:37 +00:00
}
evals := make([]*structs.Evaluation, 0, len(jobs))
now := time.Now().UTC().UnixNano()
for _, alloc := range jobs {
2018-03-06 22:37:37 +00:00
evals = append(evals, &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc.JobID,
2018-03-06 22:37:37 +00:00
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
2018-03-06 22:37:37 +00:00
})
}
// Commit this update via Raft
2018-03-10 00:10:38 +00:00
var finalIndex uint64
2018-04-10 22:02:52 +00:00
for _, u := range partitionAllocDrain(defaultMaxIdsPerTxn, transitions, evals) {
index, err := n.raft.AllocUpdateDesiredTransition(u.Transitions, u.Evals)
2018-03-10 00:10:38 +00:00
if err != nil {
future.Respond(0, err)
return
2018-03-10 00:10:38 +00:00
}
finalIndex = index
}
future.Respond(finalIndex, nil)
2018-03-02 00:37:19 +00:00
}