Batch drain update

This commit is contained in:
Alex Dadgar 2018-03-09 14:15:21 -08:00 committed by Michael Schurter
parent 92b636dd32
commit 2d91b9dfba
8 changed files with 158 additions and 16 deletions

View File

@ -36,7 +36,7 @@ const (
// NodeDrainer.
type RaftApplier interface {
	// AllocUpdateDesiredTransition submits desired-transition updates for a
	// set of allocations along with the given evaluations. It returns a
	// uint64 and an error.
	// NOTE(review): the map is presumably keyed by allocation ID and the
	// uint64 is presumably the Raft index of the write — confirm against the
	// implementation.
	AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error)
	// NodeDrainComplete reports that the single node identified by nodeID has
	// finished draining so that its drain can be unset. Callers log the
	// returned uint64 as the index at which the node completed draining.
	NodeDrainComplete(nodeID string) (uint64, error)
	// NodesDrainComplete is the batch form of NodeDrainComplete: it takes the
	// IDs of every node that has finished draining and unsets their drains in
	// one call. It returns a uint64 (callers treat it as an index) and any
	// error encountered.
	NodesDrainComplete(nodes []string) (uint64, error)
}
// NodeTracker is the interface to notify an object that is tracking draining
@ -295,14 +295,9 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
}
n.l.RUnlock()
// TODO(alex) This should probably be a single Raft transaction
for _, doneNode := range done {
index, err := n.raft.NodeDrainComplete(doneNode)
if err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", doneNode, err)
} else {
n.logger.Printf("[INFO] nomad.drain: node %q completed draining at index %d", doneNode, index)
}
// TODO(alex) Shard
if _, err := n.raft.NodesDrainComplete(done); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}

View File

@ -89,7 +89,7 @@ func (n *NodeDrainer) Update(node *structs.Node) {
}
if done {
index, err := n.raft.NodeDrainComplete(node.ID)
index, err := n.raft.NodesDrainComplete([]string{node.ID})
if err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)
} else {

View File

@ -8,14 +8,18 @@ type drainerShim struct {
s *Server
}
func (d drainerShim) NodeDrainComplete(nodeID string) (uint64, error) {
args := &structs.NodeUpdateDrainRequest{
NodeID: nodeID,
Drain: false,
func (d drainerShim) NodesDrainComplete(nodes []string) (uint64, error) {
args := &structs.BatchNodeUpdateDrainRequest{
Updates: make(map[string]*structs.DrainUpdate, len(nodes)),
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}
resp, index, err := d.s.raftApply(structs.NodeUpdateDrainRequestType, args)
update := &structs.DrainUpdate{}
for _, node := range nodes {
args.Updates[node] = update
}
resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)
return d.convertApplyErrors(resp, index, err)
}

View File

@ -244,6 +244,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index)
case structs.NodeUpdateEligibilityRequestType:
return n.applyNodeEligibilityUpdate(buf[1:], log.Index)
case structs.BatchNodeUpdateDrainRequestType:
return n.applyBatchDrainUpdate(buf[1:], log.Index)
}
// Check enterprise only message types.
@ -337,6 +339,20 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
return nil
}
// applyBatchDrainUpdate handles a BatchNodeUpdateDrainRequest log entry.
//
// buf is the encoded request (without the leading message-type byte, which the
// caller strips) and index is the index of the log entry being applied. The
// decoded per-node updates are handed to the state store in one call.
//
// A request that cannot be decoded causes a panic. If the state store rejects
// the update the error is logged and returned; otherwise nil is returned. The
// time spent in this method is reported as a metric.
func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
	// The start time is captured before any work so the metric covers the
	// whole apply.
	started := time.Now()
	defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_node_drain_update"}, started)

	request := structs.BatchNodeUpdateDrainRequest{}
	decodeErr := structs.Decode(buf, &request)
	if decodeErr != nil {
		panic(fmt.Errorf("failed to decode request: %v", decodeErr))
	}

	applyErr := n.state.BatchUpdateNodeDrain(index, request.Updates)
	if applyErr == nil {
		return nil
	}

	n.logger.Printf("[ERR] nomad.fsm: BatchUpdateNodeDrain failed: %v", applyErr)
	return applyErr
}
func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_eligibility_update"}, time.Now())
var req structs.NodeUpdateEligibilityRequest

View File

@ -278,6 +278,47 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
})
}
// TestFSM_BatchUpdateNodeDrain registers a node through the FSM, applies a
// BatchNodeUpdateDrainRequest that sets a drain strategy on that node, and
// verifies that the node read back from the FSM's state is draining with the
// expected strategy.
func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	fsm := testFSM(t)

	// Register the node so the batch drain update has something to act on.
	node := mock.Node()
	req := structs.NodeRegisterRequest{
		Node: node,
	}
	buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
	require.Nil(err)

	resp := fsm.Apply(makeLog(buf))
	require.Nil(resp)

	// Apply a batch drain update containing a single entry for the node.
	strategy := &structs.DrainStrategy{
		DrainSpec: structs.DrainSpec{
			Deadline: 10 * time.Second,
		},
	}
	req2 := structs.BatchNodeUpdateDrainRequest{
		Updates: map[string]*structs.DrainUpdate{
			node.ID: &structs.DrainUpdate{
				DrainStrategy: strategy,
			},
		},
	}
	buf, err = structs.Encode(structs.BatchNodeUpdateDrainRequestType, req2)
	require.Nil(err)

	resp = fsm.Apply(makeLog(buf))
	require.Nil(resp)

	// Verify the drain strategy was applied to the node
	ws := memdb.NewWatchSet()
	node, err = fsm.State().NodeByID(ws, req.Node.ID)
	require.Nil(err)
	// Fail cleanly instead of panicking on a nil dereference if the node is
	// missing.
	require.NotNil(node)
	require.True(node.Drain)
	// Expected value first, actual second, so failure output is labelled
	// correctly.
	require.Equal(strategy, node.DrainStrategy)
}
func TestFSM_UpdateNodeDrain(t *testing.T) {
t.Parallel()
require := require.New(t)

View File

@ -617,12 +617,34 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
return nil
}
// BatchUpdateNodeDrain is used to update the drain of a set of nodes.
//
// updates maps a node ID to the drain update to apply to that node, and index
// is passed through to each per-node update. Every update is applied inside a
// single write transaction: if any update fails, or an entry in updates is
// nil, the error is returned before the transaction is committed, so none of
// the updates take effect.
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate) error {
	txn := s.db.Txn(true)
	// Abort is deferred unconditionally so every early return rolls back; the
	// same pattern is used by UpdateNodeDrain, where Abort also runs after a
	// successful Commit.
	defer txn.Abort()

	for node, update := range updates {
		// Guard against a nil entry: dereferencing it below would panic
		// instead of surfacing an error to the caller.
		if update == nil {
			return fmt.Errorf("nil drain update for node %q", node)
		}
		if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible); err != nil {
			return err
		}
	}

	txn.Commit()
	return nil
}
// UpdateNodeDrain applies a drain update to the single node identified by
// nodeID.
//
// It opens a write transaction, delegates the actual change to
// updateNodeDrainImpl (forwarding index, drain and markEligible unchanged),
// and commits only when that call succeeds. On failure the error is returned
// and the transaction is left to the deferred Abort.
func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
	drain *structs.DrainStrategy, markEligible bool) error {

	tx := s.db.Txn(true)
	defer tx.Abort()

	err := s.updateNodeDrainImpl(tx, index, nodeID, drain, markEligible)
	if err == nil {
		tx.Commit()
	}
	return err
}
func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string,
drain *structs.DrainStrategy, markEligible bool) error {
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
@ -656,7 +678,6 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}

View File

@ -698,6 +698,53 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
}
}
// TestStateStore_BatchUpdateNodeDrain upserts two nodes, applies one batch
// drain update covering both, and verifies that a watch on the first node
// fires, that both nodes end up draining with the expected strategy and modify
// index, and that the "nodes" table index is advanced.
func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
	require := require.New(t)
	state := testStateStore(t)

	n1, n2 := mock.Node(), mock.Node()
	require.Nil(state.UpsertNode(1000, n1))
	require.Nil(state.UpsertNode(1001, n2))

	// Create a watchset so we can test that update node drain fires the watch
	ws := memdb.NewWatchSet()
	_, err := state.NodeByID(ws, n1.ID)
	require.Nil(err)

	expectedDrain := &structs.DrainStrategy{
		DrainSpec: structs.DrainSpec{
			Deadline: -1 * time.Second,
		},
	}

	update := map[string]*structs.DrainUpdate{
		n1.ID: &structs.DrainUpdate{
			DrainStrategy: expectedDrain,
		},
		n2.ID: &structs.DrainUpdate{
			DrainStrategy: expectedDrain,
		},
	}

	require.Nil(state.BatchUpdateNodeDrain(1002, update))
	require.True(watchFired(ws))

	// Read both nodes back with a fresh watchset and check each was updated.
	ws = memdb.NewWatchSet()
	for _, id := range []string{n1.ID, n2.ID} {
		out, err := state.NodeByID(ws, id)
		require.Nil(err)
		// Fail cleanly instead of panicking on a nil dereference if the
		// node is missing.
		require.NotNil(out)
		require.True(out.Drain)
		require.NotNil(out.DrainStrategy)
		// Expected value first, actual second, so failure output is
		// labelled correctly.
		require.Equal(expectedDrain, out.DrainStrategy)
		require.EqualValues(1002, out.ModifyIndex)
	}

	index, err := state.Index("nodes")
	require.Nil(err)
	require.EqualValues(1002, index)
	// Nothing was written after the fresh watchset was created, so it must
	// not have fired.
	require.False(watchFired(ws))
}
func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
require := require.New(t)
state := testStateStore(t)

View File

@ -80,6 +80,7 @@ const (
JobBatchDeregisterRequestType
AllocUpdateDesiredTransitionRequestType
NodeUpdateEligibilityRequestType
BatchNodeUpdateDrainRequestType
)
const (
@ -314,6 +315,23 @@ type NodeUpdateDrainRequest struct {
WriteRequest
}
// BatchNodeUpdateDrainRequest is used for updating the drain strategy for a
// batch of nodes. It is the request type applied by the FSM for
// BatchNodeUpdateDrainRequestType log entries, which passes Updates to
// StateStore.BatchUpdateNodeDrain.
type BatchNodeUpdateDrainRequest struct {
	// Updates is a mapping of nodes to their updated drain strategy. The map
	// key is the node ID.
	Updates map[string]*DrainUpdate

	// WriteRequest is embedded; the drainer sets its Region field to the
	// server's configured region when building a request.
	WriteRequest
}
// DrainUpdate is used to update the drain of a node. It is the per-node value
// carried in BatchNodeUpdateDrainRequest.Updates. The drainer sends a
// zero-value DrainUpdate (nil DrainStrategy, MarkEligible false) for nodes
// whose drain has completed.
type DrainUpdate struct {
	// DrainStrategy is the new strategy for the node
	DrainStrategy *DrainStrategy

	// MarkEligible marks the node as eligible if removing the drain strategy.
	MarkEligible bool
}
// NodeUpdateEligibilityRequest is used for updating the scheduling eligibility
type NodeUpdateEligibilityRequest struct {
NodeID string