package nomad import ( "fmt" "time" "github.com/armon/go-metrics" "github.com/hashicorp/nomad/nomad/structs" ) // ClientEndpoint endpoint is used for client interactions type ClientEndpoint struct { srv *Server } // Register is used to upsert a client that is available for scheduling func (c *ClientEndpoint) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := c.srv.forward("Client.Register", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "register"}, time.Now()) // Validate the arguments if args.Node == nil { return fmt.Errorf("missing node for client registration") } if args.Node.ID == "" { return fmt.Errorf("missing node ID for client registration") } if args.Node.Datacenter == "" { return fmt.Errorf("missing datacenter for client registration") } if args.Node.Name == "" { return fmt.Errorf("missing node name for client registration") } // Default the status if none is given if args.Node.Status == "" { args.Node.Status = structs.NodeStatusInit } if !structs.ValidNodeStatus(args.Node.Status) { return fmt.Errorf("invalid status for node") } // Commit this update via Raft _, index, err := c.srv.raftApply(structs.NodeRegisterRequestType, args) if err != nil { c.srv.logger.Printf("[ERR] nomad.client: Register failed: %v", err) return err } reply.NodeModifyIndex = index // Check if we should trigger evaluations if structs.ShouldDrainNode(args.Node.Status) { evalIDs, evalIndex, err := c.createNodeEvals(args.Node.ID, index) if err != nil { c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) return err } reply.EvalIDs = evalIDs reply.EvalCreateIndex = evalIndex } // Set the reply index reply.Index = index return nil } // Deregister is used to remove a client from the client. If a client should // just be made unavailable for scheduling, a status update is prefered. func (c *ClientEndpoint) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := c.srv.forward("Client.Deregister", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID for client deregistration") } // Commit this update via Raft _, index, err := c.srv.raftApply(structs.NodeDeregisterRequestType, args) if err != nil { c.srv.logger.Printf("[ERR] nomad.client: Deregister failed: %v", err) return err } // Create the evaluations for this node evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, index) if err != nil { c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) return err } // Setup the reply reply.EvalIDs = evalIDs reply.EvalCreateIndex = evalIndex reply.NodeModifyIndex = index reply.Index = index return nil } // UpdateStatus is used to update the status of a client node func (c *ClientEndpoint) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error { if done, err := c.srv.forward("Client.UpdateStatus", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "update_status"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID for client deregistration") } if !structs.ValidNodeStatus(args.Status) { return fmt.Errorf("invalid status for node") } // Commit this update via Raft _, index, err := c.srv.raftApply(structs.NodeUpdateStatusRequestType, args) if err != nil { c.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err) return err } reply.NodeModifyIndex = index // Check if we should trigger evaluations if structs.ShouldDrainNode(args.Status) { evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, index) if err != nil { c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) return err } reply.EvalIDs = evalIDs reply.EvalCreateIndex = evalIndex } // Set the reply index reply.Index = index return nil } // Evaluate is used to force a re-evaluation of the node func (c *ClientEndpoint) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUpdateResponse) error { if done, err := c.srv.forward("Client.Evaluate", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "evaluate"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID for evaluation") } // Look for the node snap, err := c.srv.fsm.State().Snapshot() if err != nil { return err } node, err := snap.GetNodeByID(args.NodeID) if err != nil { return err } if node == nil { return fmt.Errorf("node not found") } // Create the evaluation evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, node.ModifyIndex) if err != nil { c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) return err } reply.EvalIDs = evalIDs reply.EvalCreateIndex = evalIndex // Set the reply index reply.Index = evalIndex return nil } // GetNode is used to request information about a specific ndoe func (c *ClientEndpoint) GetNode(args *structs.NodeSpecificRequest, reply *structs.SingleNodeResponse) error { if done, err := c.srv.forward("Client.GetNode", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "get_node"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID") } // Look for the node snap, err := c.srv.fsm.State().Snapshot() if err != nil { return err } out, err := snap.GetNodeByID(args.NodeID) if err != nil { return err } // Setup the output if out != nil { reply.Node = out reply.Index = out.ModifyIndex } else { // Use the last index that affected the nodes table index, err := snap.GetIndex("nodes") if err != nil { return err } reply.Index = index } // Set the query response c.srv.setQueryMeta(&reply.QueryMeta) return nil } // createNodeEvals is used to create evaluations for each alloc on a node. // Each Eval is scoped to a job, so we need to potentially trigger many evals. func (c *ClientEndpoint) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) { // Snapshot the state snap, err := c.srv.fsm.State().Snapshot() if err != nil { return nil, 0, fmt.Errorf("failed to snapshot state: %v", err) } // Find all the allocations for this node allocs, err := snap.AllocsByNode(nodeID) if err != nil { return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err) } // Fast-path if nothing to do if len(allocs) == 0 { return nil, 0, nil } // Create an eval for each JobID affected var evals []*structs.Evaluation var evalIDs []string jobIDs := make(map[string]struct{}) for _, alloc := range allocs { // Deduplicate on JobID if _, ok := jobIDs[alloc.JobID]; ok { continue } jobIDs[alloc.JobID] = struct{}{} // Create a new eval eval := &structs.Evaluation{ ID: generateUUID(), Priority: alloc.Job.Priority, Type: alloc.Job.Type, TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: alloc.JobID, NodeID: nodeID, NodeModifyIndex: nodeIndex, Status: structs.EvalStatusPending, } evals = append(evals, eval) evalIDs = append(evalIDs, eval.ID) } // Create the Raft transaction update := &structs.EvalUpdateRequest{ Evals: evals, WriteRequest: structs.WriteRequest{Region: c.srv.config.Region}, } // Commit this evaluation via Raft // XXX: There is a risk of partial failure where the node update succeeds // but that the EvalUpdate does not. _, evalIndex, err := c.srv.raftApply(structs.EvalUpdateRequestType, update) if err != nil { return nil, 0, err } return evalIDs, evalIndex, nil }