Merge pull request #1456 from hashicorp/b-system-job

Node Register handles transitioning to ready and creating evals
This commit is contained in:
Alex Dadgar 2016-07-25 12:46:35 -07:00 committed by GitHub
commit 42df093939
3 changed files with 121 additions and 6 deletions

View file

@ -1100,7 +1100,9 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
// watchNodeUpdates periodically checks for changes to the node attributes or meta map
func (c *Client) watchNodeUpdates() {
c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv)
var attrHash, metaHash uint64
// Initialize the hashes
_, attrHash, metaHash := c.hasNodeChanged(0, 0)
var changed bool
for {
select {

View file

@ -74,6 +74,16 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return fmt.Errorf("failed to computed node class: %v", err)
}
// Look for the node so we can detect a state transition
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
originalNode, err := snap.NodeByID(args.Node.ID)
if err != nil {
return err
}
// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args)
if err != nil {
@ -83,7 +93,12 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
reply.NodeModifyIndex = index
// Check if we should trigger evaluations
if structs.ShouldDrainNode(args.Node.Status) {
originalStatus := structs.NodeStatusInit
if originalNode != nil {
originalStatus = originalNode.Status
}
transitionToReady := transitionedToReady(args.Node.Status, originalStatus)
if structs.ShouldDrainNode(args.Node.Status) || transitionToReady {
evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
@ -105,7 +120,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
// Set the reply index
reply.Index = index
snap, err := n.srv.fsm.State().Snapshot()
snap, err = n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
@ -236,9 +251,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
}
// Check if we should trigger evaluations
initToReady := node.Status == structs.NodeStatusInit && args.Status == structs.NodeStatusReady
terminalToReady := node.Status == structs.NodeStatusDown && args.Status == structs.NodeStatusReady
transitionToReady := initToReady || terminalToReady
transitionToReady := transitionedToReady(args.Status, node.Status)
if structs.ShouldDrainNode(args.Status) || transitionToReady {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
if err != nil {
@ -271,6 +284,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
return nil
}
// transitionedToReady reports whether a node whose status changed from
// oldStatus to newStatus has just become ready. That is the case when the new
// status is ready and the previous status was either init or down.
func transitionedToReady(newStatus, oldStatus string) bool {
	// Anything that does not end in the ready state is not a transition to ready.
	if newStatus != structs.NodeStatusReady {
		return false
	}

	switch oldStatus {
	case structs.NodeStatusInit, structs.NodeStatusDown:
		return true
	default:
		return false
	}
}
// UpdateDrain is used to update the drain mode of a client node
func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
reply *structs.NodeDrainUpdateResponse) error {

View file

@ -153,6 +153,98 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) {
}
}
// TestClientEndpoint_Register_GetEvals checks that Node.Register returns an
// evaluation when a system job exists and the node is registered as ready,
// then re-registered as down, and finally re-registered as ready again.
func TestClientEndpoint_Register_GetEvals(t *testing.T) {
	s1 := testServer(t, nil)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Register a system job.
	job := mock.SystemJob()
	state := s1.fsm.State()
	if err := state.UpsertJob(1, job); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Create the register request going directly to ready
	node := mock.Node()
	node.Status = structs.NodeStatusReady
	reg := &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}

	// Fetch the response
	var resp structs.NodeUpdateResponse
	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check for heartbeat interval
	ttl := resp.HeartbeatTTL
	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
		t.Fatalf("bad: %#v", ttl)
	}

	// Check for an eval caused by the system job.
	if len(resp.EvalIDs) != 1 {
		t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
	}

	evalID := resp.EvalIDs[0]
	eval, err := state.EvalByID(evalID)
	if err != nil {
		t.Fatalf("could not get eval %v: %v", evalID, err)
	}
	// Fail cleanly instead of dereferencing a nil eval below.
	if eval == nil {
		t.Fatalf("eval %v not found in state", evalID)
	}

	if eval.Type != "system" {
		t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system")
	}

	// Check for the node in the FSM
	out, err := state.NodeByID(node.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out == nil {
		t.Fatalf("expected node")
	}
	if out.ModifyIndex != resp.Index {
		t.Fatalf("index mis-match")
	}

	// Transition it to down and then ready
	node.Status = structs.NodeStatusDown
	reg = &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}

	// Fetch the response into a zeroed struct so the check below cannot be
	// satisfied by EvalIDs left over from the previous call.
	resp = structs.NodeUpdateResponse{}
	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
		t.Fatalf("err: %v", err)
	}

	if len(resp.EvalIDs) != 1 {
		t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
	}

	node.Status = structs.NodeStatusReady
	reg = &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}

	// Fetch the response, again starting from a zeroed struct.
	resp = structs.NodeUpdateResponse{}
	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
		t.Fatalf("err: %v", err)
	}

	if len(resp.EvalIDs) != 1 {
		t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
	}
}
func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()