refactor drainer into a subpkg

Michael Schurter 2018-02-26 16:28:10 -08:00
parent 78c7c36e65
commit 0a17076ad2
7 changed files with 176 additions and 78 deletions

View File

@ -102,7 +102,7 @@ func NewDeploymentsWatcher(logger *log.Logger,
// SetEnabled is used to control if the watcher is enabled. The watcher
// should only be enabled on the active leader. When being enabled the state is
// passed in as it is no longer valid once a leader election has taken place.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) error {
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) {
w.l.Lock()
defer w.l.Unlock()
@ -120,8 +120,6 @@ func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) error {
if enabled && !wasEnabled {
go w.watchDeployments(w.ctx)
}
return nil
}
// flush is used to clear the state of the watcher

View File

@ -1,4 +1,4 @@
package nomad
package drainer
import (
"context"
@ -71,21 +71,67 @@ func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
s.jobBatch[jobKey{a.Namespace, a.JobID}] = j
}
// startNodeDrainer should be called in establishLeadership by the leader.
func (s *Server) startNodeDrainer(stopCh chan struct{}) {
state := s.fsm.State()
// RaftApplier contains methods for applying the raft requests required by the
// NodeDrainer.
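// The nomad package satisfies this interface with a small shim (drainerShim)
// around Server.raftApply, which keeps this package decoupled from *Server.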
type RaftApplier interface {
AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) error
NodeDrainComplete(nodeID string) error
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-stopCh:
type nodeDrainerState struct {
enabled bool
state *state.StateStore
}
type NodeDrainer struct {
enabledCh chan nodeDrainerState
raft RaftApplier
logger *log.Logger
}
func NewNodeDrainer(logger *log.Logger, raft RaftApplier) *NodeDrainer {
return &NodeDrainer{
enabledCh: make(chan nodeDrainerState),
raft: raft,
logger: logger,
}
}
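// SetEnabled toggles the node drainer; it should only be enabled on the
// active leader. Note that enabledCh is unbuffered, so this call blocks
// until the Run loop receives the update.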
func (n *NodeDrainer) SetEnabled(enabled bool, state *state.StateStore) {
n.enabledCh <- nodeDrainerState{enabled, state}
}
//FIXME never exits
func (n *NodeDrainer) Run() {
running := false
var ctx context.Context
cancel := func() {}
for s := range n.enabledCh {
switch {
case s.enabled && running:
// Already running
continue
case !s.enabled && !running:
// Already stopped
continue
case !s.enabled && running:
// Stop running node drainer
cancel()
case <-ctx.Done():
running = false
case s.enabled && !running:
// Start running node drainer
ctx, cancel = context.WithCancel(context.Background())
go n.nodeDrainer(ctx, s.state)
running = true
}
}()
}
}
nodes, nodesIndex, drainingJobs, allocsIndex := initDrainer(s.logger, state)
// nodeDrainer is the core drain loop. It is started by Run when draining is
// enabled and runs until ctx is canceled.
func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore) {
nodes, nodesIndex, drainingJobs, allocsIndex := initDrainer(n.logger, state)
// Wait for a node's drain deadline to expire
var nextDeadline time.Time
@ -102,12 +148,12 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
deadlineTimer := time.NewTimer(time.Until(nextDeadline))
// Watch for nodes to start or stop draining
nodeWatcher := newNodeWatcher(s.logger, nodes, nodesIndex, state)
nodeWatcher := newNodeWatcher(n.logger, nodes, nodesIndex, state)
go nodeWatcher.run(ctx)
// Watch for drained allocations to be replaced
// Watch for changes in allocs for jobs with allocs on draining nodes
jobWatcher := newJobWatcher(s.logger, drainingJobs, allocsIndex, state)
jobWatcher := newJobWatcher(n.logger, drainingJobs, allocsIndex, state)
go jobWatcher.run(ctx)
for {
@ -116,11 +162,11 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
//possible outcome of this is that an allocation could be
//stopped on a node that recently had its drain cancelled which
//doesn't seem like that bad of a pathological case
s.logger.Printf("[TRACE] nomad.drain: LOOP next deadline: %s (%s)", nextDeadline, time.Until(nextDeadline))
n.logger.Printf("[TRACE] nomad.drain: LOOP next deadline: %s (%s)", nextDeadline, time.Until(nextDeadline))
select {
case nodes = <-nodeWatcher.nodesCh:
// update draining nodes
s.logger.Printf("[TRACE] nomad.drain: running due to node change (%d nodes draining)", len(nodes))
n.logger.Printf("[TRACE] nomad.drain: running due to node change (%d nodes draining)", len(nodes))
// update deadline timer
changed := false
@ -139,7 +185,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// if changed reset the timer
if changed {
s.logger.Printf("[TRACE] nomad.drain: new node deadline: %s", nextDeadline)
n.logger.Printf("[TRACE] nomad.drain: new node deadline: %s", nextDeadline)
if !deadlineTimer.Stop() {
// timer may have been recv'd in a
// previous loop, so don't block
@ -152,10 +198,10 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
}
case jobs := <-jobWatcher.WaitCh():
s.logger.Printf("[TRACE] nomad.drain: running due to alloc change (%d jobs updated)", len(jobs))
n.logger.Printf("[TRACE] nomad.drain: running due to alloc change (%d jobs updated)", len(jobs))
case when := <-deadlineTimer.C:
// deadline for a node was reached
s.logger.Printf("[TRACE] nomad.drain: running due to deadline reached (at %s)", when)
n.logger.Printf("[TRACE] nomad.drain: running due to deadline reached (at %s)", when)
case <-ctx.Done():
// exit
return
@ -164,15 +210,13 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// Tracks nodes that are done draining
doneNodes := map[string]*structs.Node{}
//TODO work from a state snapshot? perhaps from a last update
//index? I can't think of why this would be beneficial as this
//entire process runs asynchronously with the fsm/scheduler/etc
// Capture state (statestore and time) to do consistent comparisons
snapshot, err := state.Snapshot()
if err != nil {
//FIXME
panic(err)
}
now := time.Now() // for determining deadlines in a consistent way
now := time.Now()
// job key -> {job, allocs}
// Collect all allocs for all jobs with at least one
@ -227,14 +271,14 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// allocs left to be drained
if !alloc.TerminalStatus() {
if !allocsLeft {
s.logger.Printf("[TRACE] nomad.drain: node %s has allocs left to drain", nodeID[:6])
n.logger.Printf("[TRACE] nomad.drain: node %s has allocs left to drain", nodeID[:6])
allocsLeft = true
}
}
// Don't bother collecting system/batch jobs for nodes that haven't hit their deadline
if job.Type != structs.JobTypeService && !deadlineReached {
s.logger.Printf("[TRACE] nomad.drain: not draining %s job %s because deadline isn't for %s",
n.logger.Printf("[TRACE] nomad.drain: not draining %s job %s because deadline isn't for %s",
job.Type, job.Name, node.DrainStrategy.DeadlineTime().Sub(now))
skipJob[jobkey] = struct{}{}
continue
@ -248,14 +292,14 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// Count the number of allocs still up (non-terminal with a deployment status) per task group
if job.Type == structs.JobTypeService {
n := 0
num := 0
for _, a := range jobAllocs {
if !a.TerminalStatus() && a.DeploymentStatus != nil {
upPerTG[makeTaskGroupKey(a)]++
n++
num++
}
}
s.logger.Printf("[TRACE] nomad.drain: job %s has %d task groups running", job.Name, n)
n.logger.Printf("[TRACE] nomad.drain: job %s has %d task groups running", job.Name, num)
}
drainable[jobkey] = &drainingJob{
@ -268,7 +312,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// if node has no allocs or has hit its deadline, it's done draining!
if !allocsLeft || deadlineReached {
s.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain or has reached deadline", nodeID)
n.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain or has reached deadline", nodeID)
jobWatcher.nodeDone(nodeID)
doneNodes[nodeID] = node
}
@ -298,7 +342,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
tgKey := makeTaskGroupKey(alloc)
if node.DrainStrategy.DeadlineTime().Before(now) {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// Alloc's Node has reached its deadline
stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]--
@ -319,19 +363,19 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// Only 1, drain
if tg.Count == 1 {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to count=1", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to count=1", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
// No migrate strategy or a max parallel of 0 mean force draining
if tg.Migrate == nil || tg.Migrate.MaxParallel == 0 {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to force drain", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to force drain", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
s.logger.Printf("[TRACE] nomad.drain: considering job %s alloc %s count %d maxp %d up %d",
n.logger.Printf("[TRACE] nomad.drain: considering job %s alloc %s count %d maxp %d up %d",
drainingJob.job.Name, alloc.ID[:6], tg.Count, tg.Migrate.MaxParallel, upPerTG[tgKey])
// Count - MaxParallel = minimum number of allocations that must be "up"
@ -339,7 +383,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// If minimum is < the current number up it is safe to stop one.
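// For example (hypothetical numbers): Count=3 with MaxParallel=1 gives
// minUp=2, so an alloc is only stopped while 3 are still up; once just 2
// remain up, no more are stopped until a replacement counts as up again.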
if minUp < upPerTG[tgKey] {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to max parallel", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to max parallel", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// More migrations are allowed, add to stoplist
stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]--
@ -348,7 +392,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
}
if len(stoplist.allocBatch) > 0 {
s.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch))
n.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch))
// Reevaluate affected jobs
evals := make([]*structs.Evaluation, 0, len(stoplist.jobBatch))
@ -365,40 +409,21 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
})
}
// Send raft request
batch := &structs.AllocUpdateDesiredTransitionRequest{
Allocs: stoplist.allocBatch,
Evals: evals,
WriteRequest: structs.WriteRequest{Region: s.config.Region},
}
// Commit this update via Raft
//TODO Not the right request
_, index, err := s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, batch)
if err != nil {
if err := n.raft.AllocUpdateDesiredTransition(stoplist.allocBatch, evals); err != nil {
//FIXME
panic(err)
}
//TODO i bet there's something useful to do with this index
_ = index
}
// Unset drain for nodes done draining
for nodeID, node := range doneNodes {
args := structs.NodeUpdateDrainRequest{
NodeID: nodeID,
Drain: false,
WriteRequest: structs.WriteRequest{Region: s.config.Region},
}
_, _, err := s.raftApply(structs.NodeUpdateDrainRequestType, &args)
if err != nil {
s.logger.Printf("[ERR] nomad.drain: failed to unset drain for: %v", err)
if err := n.raft.NodeDrainComplete(nodeID); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for: %v", err)
//FIXME
panic(err)
}
s.logger.Printf("[INFO] nomad.drain: node %s (%s) completed draining", nodeID, node.Name)
n.logger.Printf("[INFO] nomad.drain: node %s (%s) completed draining", nodeID, node.Name)
delete(nodes, nodeID)
}
}

View File

@ -1,7 +1,9 @@
package nomad
package drainer_test
import (
"fmt"
"net"
"net/rpc"
"sort"
"strings"
"testing"
@ -10,7 +12,9 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@ -20,11 +24,29 @@ import (
"github.com/stretchr/testify/require"
)
// rpcClient is a test helper that returns a ClientCodec for making RPC
// calls to the server listening at the passed config's RPC address.
func rpcClient(t *testing.T, conf *nomad.Config) rpc.ClientCodec {
addr := conf.RPCAddr
conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
// Write the Nomad RPC byte to set the mode
conn.Write([]byte{byte(pool.RpcNomad)})
return pool.NewClientCodec(conn)
}
// TestNodeDrainer_SimpleDrain asserts that, given two nodes with one of them
// draining, allocs are moved from the draining node to the other node.
func TestNodeDrainer_SimpleDrain(t *testing.T) {
require := require.New(t)
server := TestServer(t, nil)
// Capture test servers config
var serverConfig *nomad.Config
server := nomad.TestServer(t, func(c *nomad.Config) {
serverConfig = c
})
defer server.Shutdown()
testutil.WaitForLeader(t, server.RPC)
@ -32,7 +54,7 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
// Setup 2 Nodes: A & B; A has allocs and is draining
// Create mock jobs
state := server.fsm.State()
state := server.State()
serviceJob := mock.Job()
serviceJob.Name = "service-job"
@ -83,12 +105,12 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
// Start node 1
c1 := client.TestClient(t, func(conf *config.Config) {
conf.LogOutput = testlog.NewWriter(t)
conf.Servers = []string{server.config.RPCAddr.String()}
conf.Servers = []string{serverConfig.RPCAddr.String()}
})
defer c1.Shutdown()
// Start jobs so they all get placed on node 1
codec := rpcClient(t, server)
codec := rpcClient(t, serverConfig)
for _, job := range []*structs.Job{systemJob, serviceJob, batchJob} {
req := &structs.JobRegisterRequest{
Job: job.Copy(),
@ -137,7 +159,6 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
t.Logf("%d alloc %s job %s status %s", i, alloc.ID, alloc.Job.Name, alloc.ClientStatus)
}
}
server.logger.Println("----------------------------------------------------------------------quitting--------------------------------------------------------")
t.Fatalf("failed waiting for all allocs to start: %v", err)
})
@ -155,7 +176,7 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
// Start node 2
c2 := client.TestClient(t, func(conf *config.Config) {
conf.NetworkSpeed = 10000
conf.Servers = []string{server.config.RPCAddr.String()}
conf.Servers = []string{serverConfig.RPCAddr.String()}
})
defer c2.Shutdown()
@ -191,7 +212,6 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
t.Logf("%d alloc %s job %s status %s prev %s", i, alloc.ID, alloc.Job.Name, alloc.ClientStatus, alloc.PreviousAllocation)
}
}
server.logger.Println("----------------------------------------------------------------------quitting--------------------------------------------------------")
t.Errorf("failed waiting for all allocs to migrate: %v", err)
})

nomad/drainer_shims.go Normal file (30 additions)
View File

@ -0,0 +1,30 @@
package nomad
import "github.com/hashicorp/nomad/nomad/structs"
// drainerShim implements the drainer.RaftApplier interface required by the
// NodeDrainer.
type drainerShim struct {
s *Server
}
func (d drainerShim) NodeDrainComplete(nodeID string) error {
args := &structs.NodeUpdateDrainRequest{
NodeID: nodeID,
Drain: false,
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}
_, _, err := d.s.raftApply(structs.NodeUpdateDrainRequestType, args)
return err
}
func (d drainerShim) AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) error {
args := &structs.AllocUpdateDesiredTransitionRequest{
Allocs: allocs,
Evals: evals,
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}
_, _, err := d.s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
return err
}
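The RaftApplier indirection also makes the drainer exercisable without Raft. A minimal sketch of a stub implementation for drainer-package tests follows; mockRaft and its fields are hypothetical names, not part of this commit, and the snippet assumes the usual structs import:

// mockRaft is a hypothetical in-memory RaftApplier for driving the
// NodeDrainer in unit tests without a running *Server or Raft log.
type mockRaft struct {
    completedNodes []string
    transitions    map[string]*structs.DesiredTransition
}

func (m *mockRaft) NodeDrainComplete(nodeID string) error {
    // Record the node instead of applying a NodeUpdateDrainRequest.
    m.completedNodes = append(m.completedNodes, nodeID)
    return nil
}

func (m *mockRaft) AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) error {
    // Capture the desired transitions instead of committing them via Raft.
    if m.transitions == nil {
        m.transitions = make(map[string]*structs.DesiredTransition)
    }
    for id, t := range allocs {
        m.transitions[id] = t
    }
    return nil
}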

View File

@ -199,9 +199,10 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
s.blockedEvals.SetTimetable(s.fsm.TimeTable())
// Enable the deployment watcher, since we are now the leader
if err := s.deploymentWatcher.SetEnabled(true, s.State()); err != nil {
return err
}
s.deploymentWatcher.SetEnabled(true, s.State())
// Enable the NodeDrainer
s.nodeDrainer.SetEnabled(true, s.State())
// Restore the eval broker state
if err := s.restoreEvals(); err != nil {
@ -267,8 +268,15 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
go s.replicateACLTokens(stopCh)
}
// Start Node Drainer
go s.startNodeDrainer(stopCh)
// Convert stopCh into a Context
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
select {
case <-stopCh:
case <-ctx.Done():
}
}()
// Setup any enterprise systems required.
if err := s.establishEnterpriseLeadership(stopCh); err != nil {
@ -676,9 +684,10 @@ func (s *Server) revokeLeadership() error {
s.vault.SetActive(false)
// Disable the deployment watcher as it is only useful as a leader.
if err := s.deploymentWatcher.SetEnabled(false, nil); err != nil {
return err
}
s.deploymentWatcher.SetEnabled(false, nil)
// Disable the node drainer
s.nodeDrainer.SetEnabled(false, nil)
// Disable any enterprise systems required.
if err := s.revokeEnterpriseLeadership(); err != nil {

View File

@ -30,7 +30,7 @@ func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
if err != nil {
t.Fatalf("err: %v", err)
}
// Write the Consul RPC byte to set the mode
// Write the Nomad RPC byte to set the mode
conn.Write([]byte{byte(pool.RpcNomad)})
return pool.NewClientCodec(conn)
}

View File

@ -27,6 +27,7 @@ import (
"github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad/deploymentwatcher"
"github.com/hashicorp/nomad/nomad/drainer"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
@ -172,6 +173,9 @@ type Server struct {
// make the required calls to continue to transition the deployment.
deploymentWatcher *deploymentwatcher.Watcher
// nodeDrainer is used to drain allocations from nodes.
nodeDrainer *drainer.NodeDrainer
// evalBroker is used to manage the in-progress evaluations
// that are waiting to be brokered to a sub-scheduler
evalBroker *EvalBroker
@ -355,6 +359,9 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
return nil, fmt.Errorf("failed to create deployment watcher: %v", err)
}
// Setup the node drainer.
s.setupNodeDrainer()
// Setup the enterprise state
if err := s.setupEnterprise(config); err != nil {
return nil, err
@ -880,6 +887,15 @@ func (s *Server) setupDeploymentWatcher() error {
return nil
}
// setupNodeDrainer creates a node drainer which will be enabled when a server
// becomes a leader.
func (s *Server) setupNodeDrainer() {
// create a shim around raft requests
shim := drainerShim{s}
s.nodeDrainer = drainer.NewNodeDrainer(s.logger, shim)
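// Run consumes enable/disable events for the lifetime of the server; it is
// started once here and toggled via SetEnabled on leadership changes.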
go s.nodeDrainer.Run()
}
// setupVaultClient is used to set up the Vault API client.
func (s *Server) setupVaultClient() error {
v, err := NewVaultClient(s.config.VaultConfig, s.logger, s.purgeVaultAccessors)