set node.StatusUpdatedAt in raft
Fix a case where `node.StatusUpdatedAt` was manipulated directly in memory. This ensures that StatusUpdatedAt is set in raft layer, and ensures that the field is updated when node drain/eligibility is updated too.
This commit is contained in:
parent
dff3abb630
commit
6bdbeed319
|
@ -1,6 +1,10 @@
|
|||
package nomad
|
||||
|
||||
import "github.com/hashicorp/nomad/nomad/structs"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// drainerShim implements the drainer.RaftApplier interface required by the
|
||||
// NodeDrainer.
|
||||
|
@ -13,6 +17,7 @@ func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent
|
|||
Updates: make(map[string]*structs.DrainUpdate, len(nodes)),
|
||||
NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)),
|
||||
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
|
||||
UpdatedAt: time.Now().Unix(),
|
||||
}
|
||||
|
||||
update := &structs.DrainUpdate{}
|
||||
|
|
|
@ -310,7 +310,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
|
|||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil {
|
||||
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent); err != nil {
|
||||
n.logger.Error("UpdateNodeStatus failed", "error", err)
|
||||
return err
|
||||
}
|
||||
|
@ -352,7 +352,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
|
|||
}
|
||||
}
|
||||
|
||||
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil {
|
||||
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
|
||||
n.logger.Error("UpdateNodeDrain failed", "error", err)
|
||||
return err
|
||||
}
|
||||
|
@ -366,7 +366,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
|
|||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil {
|
||||
if err := n.state.BatchUpdateNodeDrain(index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil {
|
||||
n.logger.Error("BatchUpdateNodeDrain failed", "error", err)
|
||||
return err
|
||||
}
|
||||
|
@ -387,7 +387,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
|
|||
return err
|
||||
}
|
||||
|
||||
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil {
|
||||
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil {
|
||||
n.logger.Error("UpdateNodeEligibility failed", "error", err)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -369,7 +369,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
|
|||
// to track SecretIDs.
|
||||
|
||||
// Update the timestamp of when the node status was updated
|
||||
node.StatusUpdatedAt = time.Now().Unix()
|
||||
args.UpdatedAt = time.Now().Unix()
|
||||
|
||||
// Commit this update via Raft
|
||||
var index uint64
|
||||
|
@ -484,6 +484,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
|
|||
return fmt.Errorf("node not found")
|
||||
}
|
||||
|
||||
// Update the timestamp of when the node status was updated
|
||||
args.UpdatedAt = time.Now().Unix()
|
||||
|
||||
// COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old
|
||||
// format.
|
||||
if args.Drain && args.DrainStrategy == nil {
|
||||
|
@ -589,6 +592,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
|
|||
return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
|
||||
}
|
||||
|
||||
// Update the timestamp of when the node status was updated
|
||||
args.UpdatedAt = time.Now().Unix()
|
||||
|
||||
// Construct the node event
|
||||
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster)
|
||||
if node.SchedulingEligibility == args.Eligibility {
|
||||
|
|
|
@ -732,7 +732,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
|
|||
}
|
||||
|
||||
// UpdateNodeStatus is used to update the status of a node
|
||||
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error {
|
||||
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
|
||||
|
@ -748,6 +748,7 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
|
|||
// Copy the existing node
|
||||
existingNode := existing.(*structs.Node)
|
||||
copyNode := existingNode.Copy()
|
||||
copyNode.StatusUpdatedAt = updatedAt
|
||||
|
||||
// Add the event if given
|
||||
if event != nil {
|
||||
|
@ -771,11 +772,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
|
|||
}
|
||||
|
||||
// BatchUpdateNodeDrain is used to update the drain of a node set of nodes
|
||||
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
|
||||
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
for node, update := range updates {
|
||||
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, events[node]); err != nil {
|
||||
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -785,11 +786,11 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*stru
|
|||
|
||||
// UpdateNodeDrain is used to update the drain of a node
|
||||
func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
|
||||
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
|
||||
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {
|
||||
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, event); err != nil {
|
||||
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil {
|
||||
return err
|
||||
}
|
||||
txn.Commit()
|
||||
|
@ -797,7 +798,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
|
|||
}
|
||||
|
||||
func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string,
|
||||
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
|
||||
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {
|
||||
|
||||
// Lookup the node
|
||||
existing, err := txn.First("nodes", "id", nodeID)
|
||||
|
@ -811,6 +812,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
|
|||
// Copy the existing node
|
||||
existingNode := existing.(*structs.Node)
|
||||
copyNode := existingNode.Copy()
|
||||
copyNode.StatusUpdatedAt = updatedAt
|
||||
|
||||
// Add the event if given
|
||||
if event != nil {
|
||||
|
@ -840,7 +842,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
|
|||
}
|
||||
|
||||
// UpdateNodeEligibility is used to update the scheduling eligibility of a node
|
||||
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error {
|
||||
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
|
||||
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
|
@ -857,6 +859,7 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil
|
|||
// Copy the existing node
|
||||
existingNode := existing.(*structs.Node)
|
||||
copyNode := existingNode.Copy()
|
||||
copyNode.StatusUpdatedAt = updatedAt
|
||||
|
||||
// Add the event if given
|
||||
if event != nil {
|
||||
|
|
|
@ -857,7 +857,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
|
|||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, event))
|
||||
require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, 70, event))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
|
@ -865,6 +865,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
|
|||
require.NoError(err)
|
||||
require.Equal(structs.NodeStatusReady, out.Status)
|
||||
require.EqualValues(801, out.ModifyIndex)
|
||||
require.EqualValues(70, out.StatusUpdatedAt)
|
||||
require.Len(out.Events, 2)
|
||||
require.Equal(event.Message, out.Events[1].Message)
|
||||
|
||||
|
@ -912,7 +913,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
|
|||
n2.ID: event,
|
||||
}
|
||||
|
||||
require.Nil(state.BatchUpdateNodeDrain(1002, update, events))
|
||||
require.Nil(state.BatchUpdateNodeDrain(1002, 7, update, events))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
|
@ -924,6 +925,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
|
|||
require.Equal(out.DrainStrategy, expectedDrain)
|
||||
require.Len(out.Events, 2)
|
||||
require.EqualValues(1002, out.ModifyIndex)
|
||||
require.EqualValues(7, out.StatusUpdatedAt)
|
||||
}
|
||||
|
||||
index, err := state.Index("nodes")
|
||||
|
@ -955,7 +957,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
|
|||
Subsystem: structs.NodeEventSubsystemDrain,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, event))
|
||||
require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, 7, event))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
|
@ -966,6 +968,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
|
|||
require.Equal(out.DrainStrategy, expectedDrain)
|
||||
require.Len(out.Events, 2)
|
||||
require.EqualValues(1001, out.ModifyIndex)
|
||||
require.EqualValues(7, out.StatusUpdatedAt)
|
||||
|
||||
index, err := state.Index("nodes")
|
||||
require.Nil(err)
|
||||
|
@ -1084,7 +1087,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
|
|||
Subsystem: structs.NodeEventSubsystemDrain,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, event1))
|
||||
require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, 7, event1))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
// Remove the drain
|
||||
|
@ -1093,7 +1096,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
|
|||
Subsystem: structs.NodeEventSubsystemDrain,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, event2))
|
||||
require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, 9, event2))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
out, err := state.NodeByID(ws, node.ID)
|
||||
|
@ -1103,6 +1106,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
|
|||
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible)
|
||||
require.Len(out.Events, 3)
|
||||
require.EqualValues(1002, out.ModifyIndex)
|
||||
require.EqualValues(9, out.StatusUpdatedAt)
|
||||
|
||||
index, err := state.Index("nodes")
|
||||
require.Nil(err)
|
||||
|
@ -1133,7 +1137,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
|
|||
Subsystem: structs.NodeEventSubsystemCluster,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, event))
|
||||
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, 7, event))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
|
@ -1143,6 +1147,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
|
|||
require.Len(out.Events, 2)
|
||||
require.Equal(out.Events[1], event)
|
||||
require.EqualValues(1001, out.ModifyIndex)
|
||||
require.EqualValues(7, out.StatusUpdatedAt)
|
||||
|
||||
index, err := state.Index("nodes")
|
||||
require.Nil(err)
|
||||
|
@ -1155,10 +1160,10 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
|
|||
Deadline: -1 * time.Second,
|
||||
},
|
||||
}
|
||||
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil))
|
||||
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, 7, nil))
|
||||
|
||||
// Try to set the node to eligible
|
||||
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, nil)
|
||||
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, 9, nil)
|
||||
require.NotNil(err)
|
||||
require.Contains(err.Error(), "while it is draining")
|
||||
}
|
||||
|
|
|
@ -347,6 +347,7 @@ type NodeUpdateStatusRequest struct {
|
|||
NodeID string
|
||||
Status string
|
||||
NodeEvent *NodeEvent
|
||||
UpdatedAt int64
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
|
@ -367,6 +368,9 @@ type NodeUpdateDrainRequest struct {
|
|||
// NodeEvent is the event added to the node
|
||||
NodeEvent *NodeEvent
|
||||
|
||||
// UpdatedAt represents server time of receiving request
|
||||
UpdatedAt int64
|
||||
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
|
@ -379,6 +383,9 @@ type BatchNodeUpdateDrainRequest struct {
|
|||
// NodeEvents is a mapping of the node to the event to add to the node
|
||||
NodeEvents map[string]*NodeEvent
|
||||
|
||||
// UpdatedAt represents server time of receiving request
|
||||
UpdatedAt int64
|
||||
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
|
@ -399,6 +406,9 @@ type NodeUpdateEligibilityRequest struct {
|
|||
// NodeEvent is the event added to the node
|
||||
NodeEvent *NodeEvent
|
||||
|
||||
// UpdatedAt represents server time of receiving request
|
||||
UpdatedAt int64
|
||||
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue