nomad: adding FSM support for node drain update
This commit is contained in:
parent
82cba83eb6
commit
21000d41ec
16
nomad/fsm.go
16
nomad/fsm.go
|
@ -120,6 +120,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
|
|||
return n.applyDeregisterNode(buf[1:], log.Index)
|
||||
case structs.NodeUpdateStatusRequestType:
|
||||
return n.applyStatusUpdate(buf[1:], log.Index)
|
||||
case structs.NodeUpdateDrainRequestType:
|
||||
return n.applyDrainUpdate(buf[1:], log.Index)
|
||||
case structs.JobRegisterRequestType:
|
||||
return n.applyRegisterJob(buf[1:], log.Index)
|
||||
case structs.JobDeregisterRequestType:
|
||||
|
@ -184,6 +186,20 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now())
|
||||
var req structs.NodeUpdateDrainRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.Drain); err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeDrain failed: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) applyRegisterJob(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_job"}, time.Now())
|
||||
var req structs.JobRegisterRequest
|
||||
|
|
|
@ -178,6 +178,47 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFSM_UpdateNodeDrain(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
|
||||
node := mock.Node()
|
||||
req := structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
}
|
||||
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
req2 := structs.NodeUpdateDrainRequest{
|
||||
NodeID: node.ID,
|
||||
Drain: true,
|
||||
}
|
||||
buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are NOT registered
|
||||
node, err = fsm.State().GetNodeByID(req.Node.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !node.Drain {
|
||||
t.Fatalf("bad node: %#v", node)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_RegisterJob(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ const (
|
|||
NodeRegisterRequestType MessageType = iota
|
||||
NodeDeregisterRequestType
|
||||
NodeUpdateStatusRequestType
|
||||
NodeUpdateDrainRequestType
|
||||
JobRegisterRequestType
|
||||
JobDeregisterRequestType
|
||||
EvalUpdateRequestType
|
||||
|
@ -128,7 +129,7 @@ type NodeDeregisterRequest struct {
|
|||
WriteRequest
|
||||
}
|
||||
|
||||
// UpdateStatusRequest is used for Client.UpdateStatus endpoint
|
||||
// NodeUpdateStatusRequest is used for Client.UpdateStatus endpoint
|
||||
// to update the status of a node.
|
||||
type NodeUpdateStatusRequest struct {
|
||||
NodeID string
|
||||
|
@ -136,6 +137,13 @@ type NodeUpdateStatusRequest struct {
|
|||
WriteRequest
|
||||
}
|
||||
|
||||
// NodeUpdateDrainRequest is used for updatin the drain status
|
||||
type NodeUpdateDrainRequest struct {
|
||||
NodeID string
|
||||
Drain bool
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// NodeEvaluateRequest is used to re-evaluate the ndoe
|
||||
type NodeEvaluateRequest struct {
|
||||
NodeID string
|
||||
|
|
Loading…
Reference in New Issue