Merge pull request #4284 from hashicorp/f-drain-event

Emit Node Events for draining
This commit is contained in:
Alex Dadgar 2018-05-22 21:04:18 +00:00 committed by GitHub
commit 86be50fa05
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 217 additions and 32 deletions

View file

@ -30,13 +30,21 @@ const (
// NodeDeadlineCoalesceWindow is the duration in which deadlining nodes will
// be coalesced together
NodeDeadlineCoalesceWindow = 5 * time.Second
// NodeDrainEventComplete is used to indicate that the node drain is
// finished.
NodeDrainEventComplete = "Node drain complete"
// NodeDrainEventDetailDeadlined is the key to use when the drain is
// complete because of a deadline. The acceptable values are "true" and "false".
NodeDrainEventDetailDeadlined = "deadline_reached"
)
// RaftApplier contains methods for applying the raft requests required by the
// NodeDrainer. Both methods return a uint64 (treated by callers as the
// resulting index) and an error.
type RaftApplier interface {
// AllocUpdateDesiredTransition takes a map of desired transitions and the
// evaluations that accompany them.
// NOTE(review): presumably the map is keyed by allocation ID — confirm
// against the implementation.
AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error)
NodesDrainComplete(nodes []string) (uint64, error)
// NodesDrainComplete reports that the given node IDs have finished
// draining. The optional event describes why the drain completed; callers
// in the drainer pass an event built with the NodeDrainEventComplete
// message.
NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error)
}
// NodeTracker is the interface to notify an object that is tracking draining
@ -254,10 +262,16 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
n.l.RUnlock()
n.batchDrainAllocs(forceStop)
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete).
AddDetail(NodeDrainEventDetailDeadlined, "true")
// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}
@ -324,10 +338,15 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
}
}
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete)
// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}

View file

@ -102,7 +102,12 @@ func (n *NodeDrainer) Update(node *structs.Node) {
}
}
index, err := n.raft.NodesDrainComplete([]string{node.ID})
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete)
index, err := n.raft.NodesDrainComplete([]string{node.ID}, event)
if err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)
} else {

View file

@ -97,7 +97,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) {
require.Equal(n, tracked[n.ID])
// Change the node to be not draining and wait for it to be untracked
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false))
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, nil))
testutil.WaitForResult(func() (bool, error) {
return len(m.Events) == 2, nil
}, func(err error) {
@ -175,7 +175,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
// Change the node to have a new spec
s2 := n.DrainStrategy.Copy()
s2.Deadline += time.Hour
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false))
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, nil))
// Wait for it to be updated
testutil.WaitForResult(func() (bool, error) {

View file

@ -12,6 +12,7 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/drainer"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@ -212,6 +213,12 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}
func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
@ -300,6 +307,13 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}
func TestDrainer_DrainEmptyNode(t *testing.T) {
@ -343,6 +357,12 @@ func TestDrainer_DrainEmptyNode(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}
func TestDrainer_AllTypes_Deadline(t *testing.T) {
@ -500,6 +520,13 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) {
}
}
require.True(serviceMax < batchMax)
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}
// Test that drain is unset when batch jobs naturally finish
@ -659,6 +686,12 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}
func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
@ -824,6 +857,13 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}
// Test that transitions to force drain work.
@ -962,6 +1002,13 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 4)
require.Equal(drainer.NodeDrainEventComplete, node.Events[3].Message)
require.Contains(node.Events[3].Details, drainer.NodeDrainEventDetailDeadlined)
})
}
}

View file

@ -8,15 +8,19 @@ type drainerShim struct {
s *Server
}
func (d drainerShim) NodesDrainComplete(nodes []string) (uint64, error) {
func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error) {
args := &structs.BatchNodeUpdateDrainRequest{
Updates: make(map[string]*structs.DrainUpdate, len(nodes)),
NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)),
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}
update := &structs.DrainUpdate{}
for _, node := range nodes {
args.Updates[node] = update
if event != nil {
args.NodeEvents[node] = event
}
}
resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)

View file

@ -349,7 +349,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
}
}
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible); err != nil {
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeDrain failed: %v", err)
return err
}
@ -363,7 +363,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.BatchUpdateNodeDrain(index, req.Updates); err != nil {
if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: BatchUpdateNodeDrain failed: %v", err)
return err
}

View file

@ -327,12 +327,20 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
Deadline: 10 * time.Second,
},
}
event := &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
req2 := structs.BatchNodeUpdateDrainRequest{
Updates: map[string]*structs.DrainUpdate{
node.ID: {
DrainStrategy: strategy,
},
},
NodeEvents: map[string]*structs.NodeEvent{
node.ID: event,
},
}
buf, err = structs.Encode(structs.BatchNodeUpdateDrainRequestType, req2)
require.Nil(err)
@ -346,6 +354,7 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}
func TestFSM_UpdateNodeDrain(t *testing.T) {
@ -371,6 +380,11 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
req2 := structs.NodeUpdateDrainRequest{
NodeID: node.ID,
DrainStrategy: strategy,
NodeEvent: &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
},
}
buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2)
require.Nil(err)
@ -384,6 +398,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}
func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) {

View file

@ -27,6 +27,11 @@ const (
// maxParallelRequestsPerDerive is the maximum number of parallel Vault
// create token requests that may be outstanding per derive request
maxParallelRequestsPerDerive = 16
// NodeDrainEvents are the various drain messages attached to the node event
// emitted by Node.UpdateDrain.
//
// NodeDrainEventDrainSet is used when a node without a drain strategy is
// given one.
NodeDrainEventDrainSet = "Node drain strategy set"
// NodeDrainEventDrainDisabled is used when a node's existing drain strategy
// is removed.
NodeDrainEventDrainDisabled = "Node drain disabled"
// NodeDrainEventDrainUpdated is used when a node's existing drain strategy
// is replaced by another one.
NodeDrainEventDrainUpdated = "Node drain strategy updated"
)
// Node endpoint is used for client interactions
@ -439,6 +444,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
if args.NodeID == "" {
return fmt.Errorf("missing node ID for drain update")
}
if args.NodeEvent != nil {
return fmt.Errorf("node event must not be set")
}
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
@ -468,6 +476,18 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
args.DrainStrategy.ForceDeadline = time.Now().Add(args.DrainStrategy.Deadline)
}
// Construct the node event
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemDrain)
if node.DrainStrategy == nil && args.DrainStrategy == nil {
return nil // Nothing to do
} else if node.DrainStrategy == nil && args.DrainStrategy != nil {
args.NodeEvent.SetMessage(NodeDrainEventDrainSet)
} else if node.DrainStrategy != nil && args.DrainStrategy != nil {
args.NodeEvent.SetMessage(NodeDrainEventDrainUpdated)
} else if node.DrainStrategy != nil && args.DrainStrategy == nil {
args.NodeEvent.SetMessage(NodeDrainEventDrainDisabled)
}
// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeUpdateDrainRequestType, args)
if err != nil {

View file

@ -845,6 +845,8 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) {
require.Nil(err)
require.True(out.Drain)
require.Equal(strategy.Deadline, out.DrainStrategy.Deadline)
require.Len(out.Events, 2)
require.Equal(NodeDrainEventDrainSet, out.Events[1].Message)
// before+deadline should be before the forced deadline
require.True(beforeUpdate.Add(strategy.Deadline).Before(out.DrainStrategy.ForceDeadline))
@ -2587,7 +2589,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
Deadline: 10 * time.Second,
},
}
errCh <- state.UpdateNodeDrain(3, node.ID, s, false)
errCh <- state.UpdateNodeDrain(3, node.ID, s, false, nil)
})
req.MinQueryIndex = 2

View file

@ -619,11 +619,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
}
// BatchUpdateNodeDrain is used to update the drain of a set of nodes
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate) error {
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()
for node, update := range updates {
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible); err != nil {
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, events[node]); err != nil {
return err
}
}
@ -633,11 +633,11 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*stru
// UpdateNodeDrain is used to update the drain of a node
func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
drain *structs.DrainStrategy, markEligible bool) error {
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible); err != nil {
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, event); err != nil {
return err
}
txn.Commit()
@ -645,7 +645,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
}
func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string,
drain *structs.DrainStrategy, markEligible bool) error {
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
@ -660,6 +660,11 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
// Add the event if given
if event != nil {
appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
}
// Update the drain in the copy
copyNode.Drain = drain != nil // COMPAT: Remove in Nomad 0.9
copyNode.DrainStrategy = drain
@ -754,18 +759,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*str
// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
// Add the events, updating the indexes
for _, e := range events {
e.CreateIndex = index
copyNode.Events = append(copyNode.Events, e)
}
// Keep node events pruned to not exceed the max allowed
if l := len(copyNode.Events); l > structs.MaxRetainedNodeEvents {
delta := l - structs.MaxRetainedNodeEvents
copyNode.Events = copyNode.Events[delta:]
}
appendNodeEvents(index, copyNode, events)
// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
@ -778,6 +772,22 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*str
return nil
}
// appendNodeEvents stamps every event in events with index as its CreateIndex
// and appends it to node.Events. Afterwards the node's event list is trimmed
// from the front so that at most structs.MaxRetainedNodeEvents of the most
// recent events remain. The node and the events are modified in place.
func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent) {
	// Stamp and append one event at a time.
	for i := range events {
		ev := events[i]
		ev.CreateIndex = index
		node.Events = append(node.Events, ev)
	}

	// Drop the oldest entries once the retention limit is exceeded.
	excess := len(node.Events) - structs.MaxRetainedNodeEvents
	if excess > 0 {
		node.Events = node.Events[excess:]
	}
}
// NodeByID is used to lookup a node by ID
func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) {
txn := s.db.Txn(false)

View file

@ -726,7 +726,17 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
},
}
require.Nil(state.BatchUpdateNodeDrain(1002, update))
event := &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
events := map[string]*structs.NodeEvent{
n1.ID: event,
n2.ID: event,
}
require.Nil(state.BatchUpdateNodeDrain(1002, update, events))
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
@ -736,6 +746,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
require.True(out.Drain)
require.NotNil(out.DrainStrategy)
require.Equal(out.DrainStrategy, expectedDrain)
require.Len(out.Events, 2)
require.EqualValues(1002, out.ModifyIndex)
}
@ -763,7 +774,12 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
},
}
require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false))
event := &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, event))
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
@ -772,6 +788,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
require.True(out.Drain)
require.NotNil(out.DrainStrategy)
require.Equal(out.DrainStrategy, expectedDrain)
require.Len(out.Events, 2)
require.EqualValues(1001, out.ModifyIndex)
index, err := state.Index("nodes")
@ -886,11 +903,21 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
},
}
require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false))
event1 := &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, event1))
require.True(watchFired(ws))
// Remove the drain
require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true))
event2 := &structs.NodeEvent{
Message: "Drain strategy disabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, event2))
ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
@ -898,6 +925,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
require.False(out.Drain)
require.Nil(out.DrainStrategy)
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible)
require.Len(out.Events, 3)
require.EqualValues(1002, out.ModifyIndex)
index, err := state.Index("nodes")
@ -944,7 +972,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
Deadline: -1 * time.Second,
},
}
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false))
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil))
// Try to set the node to eligible
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible)

View file

@ -344,6 +344,10 @@ type NodeUpdateDrainRequest struct {
// MarkEligible marks the node as eligible if removing the drain strategy.
MarkEligible bool
// NodeEvent is the event added to the node
NodeEvent *NodeEvent
WriteRequest
}
@ -352,6 +356,10 @@ type NodeUpdateDrainRequest struct {
type BatchNodeUpdateDrainRequest struct {
// Updates is a mapping of nodes to their updated drain strategy
Updates map[string]*DrainUpdate
// NodeEvents is a mapping of the node to the event to add to the node
NodeEvents map[string]*NodeEvent
WriteRequest
}
@ -1227,6 +1235,33 @@ func (ne *NodeEvent) Copy() *NodeEvent {
return c
}
// NewNodeEvent allocates a node event whose Timestamp is the time of the
// call. All other fields are left at their zero values so they can be filled
// in with the chained setters.
func NewNodeEvent() *NodeEvent {
	ne := new(NodeEvent)
	ne.Timestamp = time.Now()
	return ne
}
// SetMessage stores msg as the event's Message, overwriting any previous
// value, and returns the same event so that calls can be chained.
func (ne *NodeEvent) SetMessage(msg string) *NodeEvent {
ne.Message = msg
return ne
}
// SetSubsystem stores sys as the event's Subsystem, overwriting any previous
// value, and returns the same event so that calls can be chained.
func (ne *NodeEvent) SetSubsystem(sys string) *NodeEvent {
ne.Subsystem = sys
return ne
}
// AddDetail records the key/value pair k, v in the event's Details map,
// creating the map on first use and overwriting any existing value for k.
// The same event is returned so that calls can be chained.
func (ne *NodeEvent) AddDetail(k, v string) *NodeEvent {
	if ne.Details != nil {
		ne.Details[k] = v
		return ne
	}

	// First detail: build the map already holding the entry.
	ne.Details = map[string]string{k: v}
	return ne
}
const (
NodeStatusInit = "initializing"
NodeStatusReady = "ready"