package nomad

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/go-multierror"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/nomad/watch"
	"github.com/hashicorp/raft"
	vapi "github.com/hashicorp/vault/api"
)

const (
	// batchUpdateInterval is how long we wait to batch updates
	batchUpdateInterval = 50 * time.Millisecond

	// maxParallelRequestsPerDerive is the maximum number of parallel Vault
	// create token requests that may be outstanding per derive request
	maxParallelRequestsPerDerive = 16
)

// Node endpoint is used for client interactions
type Node struct {
	srv *Server

	// updates holds pending client status updates for allocations
	updates []*structs.Allocation

	// updateFuture is used to wait for the pending batch update
	// to complete. This may be nil if no batch is pending.
	updateFuture *batchFuture

	// updateTimer is the timer that will trigger the next batch
	// update, and may be nil if there is no batch pending.
	updateTimer *time.Timer

	// updatesLock synchronizes access to the updates list,
	// the future and the timer.
	updatesLock sync.Mutex
}

// Register is used to upsert a client that is available for scheduling.
//
// The request is first offered to n.srv.forward; if that reports the request
// as done, its error is returned unchanged. Otherwise the node in args is
// validated, defaulted, applied through Raft, and reply is populated with the
// resulting indexes, any evaluation IDs that were created, the heartbeat TTL
// (for non-terminal nodes) and the server information filled in by
// constructNodeServerInfoResponse.
//
// Note that args.Node is modified in place: Status is defaulted,
// StatusUpdatedAt is set and ComputeClass is invoked on it.
func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error {
	if done, err := n.srv.forward("Node.Register", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"nomad", "client", "register"}, time.Now())

	// Validate the arguments
	if args.Node == nil {
		return fmt.Errorf("missing node for client registration")
	}
	if args.Node.ID == "" {
		return fmt.Errorf("missing node ID for client registration")
	}
	if args.Node.Datacenter == "" {
		return fmt.Errorf("missing datacenter for client registration")
	}
	if args.Node.Name == "" {
		return fmt.Errorf("missing node name for client registration")
	}
	if len(args.Node.Attributes) == 0 {
		return fmt.Errorf("missing attributes for client registration")
	}

	// COMPAT: Remove after 0.6
	// Need to check if this node is <0.4.x since SecretID is new in 0.5
	pre, err := nodePreSecretID(args.Node)
	if err != nil {
		return err
	}
	if args.Node.SecretID == "" && !pre {
		return fmt.Errorf("missing node secret ID for client registration")
	}

	// Default the status if none is given
	if args.Node.Status == "" {
		args.Node.Status = structs.NodeStatusInit
	}
	if !structs.ValidNodeStatus(args.Node.Status) {
		return fmt.Errorf("invalid status for node")
	}

	// Set the timestamp when the node is registered
	args.Node.StatusUpdatedAt = time.Now().Unix()

	// Compute the node class
	if err := args.Node.ComputeClass(); err != nil {
		return fmt.Errorf("failed to computed node class: %v", err)
	}

	// Look for the node so we can detect a state transition. This snapshot is
	// taken before the Raft apply below, so originalNode reflects the state
	// prior to this registration.
	snap, err := n.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}
	originalNode, err := snap.NodeByID(args.Node.ID)
	if err != nil {
		return err
	}

	// Check if the SecretID has been tampered with. A previously stored empty
	// SecretID is accepted so such a node can start supplying one.
	if !pre && originalNode != nil {
		if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
			return fmt.Errorf("node secret ID does not match. Not registering node.")
		}
	}

	// Commit this update via Raft
	_, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args)
	if err != nil {
		n.srv.logger.Printf("[ERR] nomad.client: Register failed: %v", err)
		return err
	}
	reply.NodeModifyIndex = index

	// Check if we should trigger evaluations. A node that was not previously
	// known is treated as if it had been in the init status.
	originalStatus := structs.NodeStatusInit
	if originalNode != nil {
		originalStatus = originalNode.Status
	}
	transitionToReady := transitionedToReady(args.Node.Status, originalStatus)
	if structs.ShouldDrainNode(args.Node.Status) || transitionToReady {
		evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index)
		if err != nil {
			n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
			return err
		}
		reply.EvalIDs = evalIDs
		reply.EvalCreateIndex = evalIndex
	}

	// Check if we need to setup a heartbeat
	if !args.Node.TerminalStatus() {
		ttl, err := n.srv.resetHeartbeatTimer(args.Node.ID)
		if err != nil {
			n.srv.logger.Printf("[ERR] nomad.client: heartbeat reset failed: %v", err)
			return err
		}
		reply.HeartbeatTTL = ttl
	}

	// Set the reply index. A fresh snapshot is taken here so the server
	// information below is built from state read after the apply above.
	reply.Index = index
	snap, err = n.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}

	// constructNodeServerInfoResponse requires peerLock to be held for reading.
	n.srv.peerLock.RLock()
	defer n.srv.peerLock.RUnlock()
	if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
		n.srv.logger.Printf("[ERR] nomad.client: failed to populate NodeUpdateResponse: %v", err)
		return err
	}

	return nil
}

// nodePreSecretID is a helper that returns whether the node is on a version
// that is before SecretIDs were introduced.
//
// It returns an error when the node has no attributes or no "nomad.version"
// attribute. The check itself is a prefix test: the result is true for any
// version string that does not start with "0.5", so versions other than 0.5.x
// (including later ones) are also reported as pre-SecretID.
func nodePreSecretID(node *structs.Node) (bool, error) {
	a := node.Attributes
	if a == nil {
		return false, fmt.Errorf("node doesn't have attributes set")
	}

	v, ok := a["nomad.version"]
	if !ok {
		return false, fmt.Errorf("missing Nomad version in attributes")
	}

	return !strings.HasPrefix(v, "0.5"), nil
}

// constructNodeServerInfoResponse assumes the n.srv.peerLock is held for reading.
func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error { reply.LeaderRPCAddr = n.srv.raft.Leader() // Reply with config information required for future RPC requests reply.Servers = make([]*structs.NodeServerInfo, 0, len(n.srv.localPeers)) for k, v := range n.srv.localPeers { reply.Servers = append(reply.Servers, &structs.NodeServerInfo{ RPCAdvertiseAddr: k, RPCMajorVersion: int32(v.MajorVersion), RPCMinorVersion: int32(v.MinorVersion), Datacenter: v.Datacenter, }) } // TODO(sean@): Use an indexed node count instead // // Snapshot is used only to iterate over all nodes to create a node // count to send back to Nomad Clients in their heartbeat so Clients // can estimate the size of the cluster. iter, err := snap.Nodes() if err == nil { for { raw := iter.Next() if raw == nil { break } reply.NumNodes++ } } return nil } // Deregister is used to remove a client from the cluster. If a client should // just be made unavailable for scheduling, a status update is preferred. 
func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := n.srv.forward("Node.Deregister", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID for client deregistration") } // Commit this update via Raft _, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: Deregister failed: %v", err) return err } // Clear the heartbeat timer if any n.srv.clearHeartbeatTimer(args.NodeID) // Create the evaluations for this node evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) return err } // Determine if there are any Vault accessors on the node accessors, err := n.srv.State().VaultAccessorsByNode(args.NodeID) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: looking up accessors for node %q failed: %v", args.NodeID, err) return err } if len(accessors) != 0 { if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil { n.srv.logger.Printf("[ERR] nomad.client: revoking accessors for node %q failed: %v", args.NodeID, err) return err } } // Setup the reply reply.EvalIDs = evalIDs reply.EvalCreateIndex = evalIndex reply.NodeModifyIndex = index reply.Index = index return nil } // UpdateStatus is used to update the status of a client node func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error { if done, err := n.srv.forward("Node.UpdateStatus", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "update_status"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID for client status update") } if 
!structs.ValidNodeStatus(args.Status) { return fmt.Errorf("invalid status for node") } // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } node, err := snap.NodeByID(args.NodeID) if err != nil { return err } if node == nil { return fmt.Errorf("node not found") } // XXX: Could use the SecretID here but have to update the heartbeat system // to track SecretIDs. // Update the timestamp of when the node status was updated node.StatusUpdatedAt = time.Now().Unix() // Commit this update via Raft var index uint64 if node.Status != args.Status { _, index, err = n.srv.raftApply(structs.NodeUpdateStatusRequestType, args) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err) return err } reply.NodeModifyIndex = index } // Check if we should trigger evaluations transitionToReady := transitionedToReady(args.Status, node.Status) if structs.ShouldDrainNode(args.Status) || transitionToReady { evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) return err } reply.EvalIDs = evalIDs reply.EvalCreateIndex = evalIndex } // Check if we need to setup a heartbeat switch args.Status { case structs.NodeStatusDown: // Determine if there are any Vault accessors on the node accessors, err := n.srv.State().VaultAccessorsByNode(args.NodeID) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: looking up accessors for node %q failed: %v", args.NodeID, err) return err } if len(accessors) != 0 { if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil { n.srv.logger.Printf("[ERR] nomad.client: revoking accessors for node %q failed: %v", args.NodeID, err) return err } } default: ttl, err := n.srv.resetHeartbeatTimer(args.NodeID) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: heartbeat reset failed: %v", err) return err } reply.HeartbeatTTL = ttl } // Set the reply index and leader 
reply.Index = index n.srv.peerLock.RLock() defer n.srv.peerLock.RUnlock() if err := n.constructNodeServerInfoResponse(snap, reply); err != nil { n.srv.logger.Printf("[ERR] nomad.client: failed to populate NodeUpdateResponse: %v", err) return err } return nil } // transitionedToReady is a helper that takes a nodes new and old status and // returns whether it has transistioned to ready. func transitionedToReady(newStatus, oldStatus string) bool { initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady return initToReady || terminalToReady } // UpdateDrain is used to update the drain mode of a client node func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, reply *structs.NodeDrainUpdateResponse) error { if done, err := n.srv.forward("Node.UpdateDrain", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "update_drain"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID for drain update") } // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } node, err := snap.NodeByID(args.NodeID) if err != nil { return err } if node == nil { return fmt.Errorf("node not found") } // Update the timestamp to node.StatusUpdatedAt = time.Now().Unix() // Commit this update via Raft var index uint64 if node.Drain != args.Drain { _, index, err = n.srv.raftApply(structs.NodeUpdateDrainRequestType, args) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: drain update failed: %v", err) return err } reply.NodeModifyIndex = index } // Always attempt to create Node evaluations because there may be a System // job registered that should be evaluated. 
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) return err } reply.EvalIDs = evalIDs reply.EvalCreateIndex = evalIndex // Set the reply index reply.Index = index return nil } // Evaluate is used to force a re-evaluation of the node func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUpdateResponse) error { if done, err := n.srv.forward("Node.Evaluate", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "evaluate"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID for evaluation") } // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } node, err := snap.NodeByID(args.NodeID) if err != nil { return err } if node == nil { return fmt.Errorf("node not found") } // Create the evaluation evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, node.ModifyIndex) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) return err } reply.EvalIDs = evalIDs reply.EvalCreateIndex = evalIndex // Set the reply index reply.Index = evalIndex n.srv.peerLock.RLock() defer n.srv.peerLock.RUnlock() if err := n.constructNodeServerInfoResponse(snap, reply); err != nil { n.srv.logger.Printf("[ERR] nomad.client: failed to populate NodeUpdateResponse: %v", err) return err } return nil } // GetNode is used to request information about a specific node func (n *Node) GetNode(args *structs.NodeSpecificRequest, reply *structs.SingleNodeResponse) error { if done, err := n.srv.forward("Node.GetNode", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "get_node"}, time.Now()) // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, watch: watch.NewItems(watch.Item{Node: args.NodeID}), run: func() error { // 
Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID") } // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } out, err := snap.NodeByID(args.NodeID) if err != nil { return err } // Setup the output if out != nil { // Clear the secret ID reply.Node = out.Copy() reply.Node.SecretID = "" reply.Index = out.ModifyIndex } else { // Use the last index that affected the nodes table index, err := snap.Index("nodes") if err != nil { return err } reply.Node = nil reply.Index = index } // Set the query response n.srv.setQueryMeta(&reply.QueryMeta) return nil }} return n.srv.blockingRPC(&opts) } // GetAllocs is used to request allocations for a specific node func (n *Node) GetAllocs(args *structs.NodeSpecificRequest, reply *structs.NodeAllocsResponse) error { if done, err := n.srv.forward("Node.GetAllocs", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "get_allocs"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID") } // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, watch: watch.NewItems(watch.Item{AllocNode: args.NodeID}), run: func() error { // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } allocs, err := snap.AllocsByNode(args.NodeID) if err != nil { return err } // Setup the output if len(allocs) != 0 { reply.Allocs = allocs for _, alloc := range allocs { reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) } } else { reply.Allocs = nil // Use the last index that affected the nodes table index, err := snap.Index("allocs") if err != nil { return err } // Must provide non-zero index to prevent blocking // Index 1 is impossible anyways (due to Raft internals) if index == 0 { reply.Index = 1 } else { reply.Index = index } } return nil }} return n.srv.blockingRPC(&opts) } // GetClientAllocs is used to request a 
lightweight list of alloc modify indexes // per allocation. func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply *structs.NodeClientAllocsResponse) error { if done, err := n.srv.forward("Node.GetClientAllocs", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "get_client_allocs"}, time.Now()) // Verify the arguments if args.NodeID == "" { return fmt.Errorf("missing node ID") } // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, watch: watch.NewItems(watch.Item{AllocNode: args.NodeID}), run: func() error { // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } // Look for the node node, err := snap.NodeByID(args.NodeID) if err != nil { return err } var allocs []*structs.Allocation if node != nil { // COMPAT: Remove in 0.6 // Check if the node should have a SecretID set if args.SecretID == "" { if pre, err := nodePreSecretID(node); err != nil { return err } else if !pre { return fmt.Errorf("missing node secret ID for client status update") } } else if args.SecretID != node.SecretID { return fmt.Errorf("node secret ID does not match") } var err error allocs, err = snap.AllocsByNode(args.NodeID) if err != nil { return err } } reply.Allocs = make(map[string]uint64) // Setup the output if len(allocs) != 0 { for _, alloc := range allocs { reply.Allocs[alloc.ID] = alloc.AllocModifyIndex reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) } } else { // Use the last index that affected the nodes table index, err := snap.Index("allocs") if err != nil { return err } // Must provide non-zero index to prevent blocking // Index 1 is impossible anyways (due to Raft internals) if index == 0 { reply.Index = 1 } else { reply.Index = index } } return nil }} return n.srv.blockingRPC(&opts) } // UpdateAlloc is used to update the client status of an allocation func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, 
reply *structs.GenericResponse) error { if done, err := n.srv.forward("Node.UpdateAlloc", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "update_alloc"}, time.Now()) // Ensure at least a single alloc if len(args.Alloc) == 0 { return fmt.Errorf("must update at least one allocation") } // Add this to the batch n.updatesLock.Lock() n.updates = append(n.updates, args.Alloc...) // Start a new batch if none future := n.updateFuture if future == nil { future = NewBatchFuture() n.updateFuture = future n.updateTimer = time.AfterFunc(batchUpdateInterval, func() { // Get the pending updates n.updatesLock.Lock() updates := n.updates future := n.updateFuture n.updates = nil n.updateFuture = nil n.updateTimer = nil n.updatesLock.Unlock() // Perform the batch update n.batchUpdate(future, updates) }) } n.updatesLock.Unlock() // Wait for the future if err := future.Wait(); err != nil { return err } // Setup the response reply.Index = future.Index() return nil } // batchUpdate is used to update all the allocations func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation) { // Prepare the batch update batch := &structs.AllocUpdateRequest{ Alloc: updates, WriteRequest: structs.WriteRequest{Region: n.srv.config.Region}, } // Commit this update via Raft var mErr multierror.Error _, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: alloc update failed: %v", err) mErr.Errors = append(mErr.Errors, err) } // For each allocation we are updating check if we should revoke any // Vault Accessors var revoke []*structs.VaultAccessor for _, alloc := range updates { // Skip any allocation that isn't dead on the client if !alloc.Terminated() { continue } // Determine if there are any Vault accessors for the allocation accessors, err := n.srv.State().VaultAccessorsByAlloc(alloc.ID) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: looking up 
accessors for alloc %q failed: %v", alloc.ID, err) mErr.Errors = append(mErr.Errors, err) } revoke = append(revoke, accessors...) } if len(revoke) != 0 { if err := n.srv.vault.RevokeTokens(context.Background(), revoke, true); err != nil { n.srv.logger.Printf("[ERR] nomad.client: batched accessor revocation failed: %v", err) mErr.Errors = append(mErr.Errors, err) } } // Respond to the future future.Respond(index, mErr.ErrorOrNil()) } // List is used to list the available nodes func (n *Node) List(args *structs.NodeListRequest, reply *structs.NodeListResponse) error { if done, err := n.srv.forward("Node.List", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "list"}, time.Now()) // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, watch: watch.NewItems(watch.Item{Table: "nodes"}), run: func() error { // Capture all the nodes snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } var iter memdb.ResultIterator if prefix := args.QueryOptions.Prefix; prefix != "" { iter, err = snap.NodesByIDPrefix(prefix) } else { iter, err = snap.Nodes() } if err != nil { return err } var nodes []*structs.NodeListStub for { raw := iter.Next() if raw == nil { break } node := raw.(*structs.Node) nodes = append(nodes, node.Stub()) } reply.Nodes = nodes // Use the last index that affected the jobs table index, err := snap.Index("nodes") if err != nil { return err } reply.Index = index // Set the query response n.srv.setQueryMeta(&reply.QueryMeta) return nil }} return n.srv.blockingRPC(&opts) } // createNodeEvals is used to create evaluations for each alloc on a node. // Each Eval is scoped to a job, so we need to potentially trigger many evals. 
func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) { // Snapshot the state snap, err := n.srv.fsm.State().Snapshot() if err != nil { return nil, 0, fmt.Errorf("failed to snapshot state: %v", err) } // Find all the allocations for this node allocs, err := snap.AllocsByNode(nodeID) if err != nil { return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err) } sysJobsIter, err := snap.JobsByScheduler("system") if err != nil { return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err) } var sysJobs []*structs.Job for job := sysJobsIter.Next(); job != nil; job = sysJobsIter.Next() { sysJobs = append(sysJobs, job.(*structs.Job)) } // Fast-path if nothing to do if len(allocs) == 0 && len(sysJobs) == 0 { return nil, 0, nil } // Create an eval for each JobID affected var evals []*structs.Evaluation var evalIDs []string jobIDs := make(map[string]struct{}) for _, alloc := range allocs { // Deduplicate on JobID if _, ok := jobIDs[alloc.JobID]; ok { continue } jobIDs[alloc.JobID] = struct{}{} // Create a new eval eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: alloc.Job.Priority, Type: alloc.Job.Type, TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: alloc.JobID, NodeID: nodeID, NodeModifyIndex: nodeIndex, Status: structs.EvalStatusPending, } evals = append(evals, eval) evalIDs = append(evalIDs, eval.ID) } // Create an evaluation for each system job. for _, job := range sysJobs { // Still dedup on JobID as the node may already have the system job. 
if _, ok := jobIDs[job.ID]; ok { continue } jobIDs[job.ID] = struct{}{} // Create a new eval eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, Type: job.Type, TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: nodeID, NodeModifyIndex: nodeIndex, Status: structs.EvalStatusPending, } evals = append(evals, eval) evalIDs = append(evalIDs, eval.ID) } // Create the Raft transaction update := &structs.EvalUpdateRequest{ Evals: evals, WriteRequest: structs.WriteRequest{Region: n.srv.config.Region}, } // Commit this evaluation via Raft // XXX: There is a risk of partial failure where the node update succeeds // but that the EvalUpdate does not. _, evalIndex, err := n.srv.raftApply(structs.EvalUpdateRequestType, update) if err != nil { return nil, 0, err } return evalIDs, evalIndex, nil } // batchFuture is used to wait on a batch update to complete type batchFuture struct { doneCh chan struct{} err error index uint64 } // NewBatchFuture creates a new batch future func NewBatchFuture() *batchFuture { return &batchFuture{ doneCh: make(chan struct{}), } } // Wait is used to block for the future to complete and returns the error func (b *batchFuture) Wait() error { <-b.doneCh return b.err } // Index is used to return the index of the batch, only after Wait() func (b *batchFuture) Index() uint64 { return b.index } // Respond is used to unblock the future func (b *batchFuture) Respond(index uint64, err error) { b.index = index b.err = err close(b.doneCh) } // DeriveVaultToken is used by the clients to request wrapped Vault tokens for // tasks func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, reply *structs.DeriveVaultTokenResponse) error { // setErr is a helper for setting the recoverable error on the reply and // logging it setErr := func(e error, recoverable bool) { reply.Error = structs.NewRecoverableError(e, recoverable) n.srv.logger.Printf("[ERR] nomad.client: DeriveVaultToken failed (recoverable %v): %v", 
recoverable, e) } if done, err := n.srv.forward("Node.DeriveVaultToken", args, args, reply); done { setErr(err, err == structs.ErrNoLeader) return nil } defer metrics.MeasureSince([]string{"nomad", "client", "derive_vault_token"}, time.Now()) // Verify the arguments if args.NodeID == "" { setErr(fmt.Errorf("missing node ID"), false) return nil } if args.SecretID == "" { setErr(fmt.Errorf("missing node SecretID"), false) return nil } if args.AllocID == "" { setErr(fmt.Errorf("missing allocation ID"), false) return nil } if len(args.Tasks) == 0 { setErr(fmt.Errorf("no tasks specified"), false) return nil } // Verify the following: // * The Node exists and has the correct SecretID // * The Allocation exists on the specified node // * The allocation contains the given tasks and they each require Vault // tokens snap, err := n.srv.fsm.State().Snapshot() if err != nil { setErr(err, false) return nil } node, err := snap.NodeByID(args.NodeID) if err != nil { setErr(err, false) return nil } if node == nil { setErr(fmt.Errorf("Node %q does not exist", args.NodeID), false) return nil } if node.SecretID != args.SecretID { setErr(fmt.Errorf("SecretID mismatch"), false) return nil } alloc, err := snap.AllocByID(args.AllocID) if err != nil { setErr(err, false) return nil } if alloc == nil { setErr(fmt.Errorf("Allocation %q does not exist", args.AllocID), false) return nil } if alloc.NodeID != args.NodeID { setErr(fmt.Errorf("Allocation %q not running on Node %q", args.AllocID, args.NodeID), false) return nil } if alloc.TerminalStatus() { setErr(fmt.Errorf("Can't request Vault token for terminal allocation"), false) return nil } // Check the policies policies := alloc.Job.VaultPolicies() if policies == nil { setErr(fmt.Errorf("Job doesn't require Vault policies"), false) return nil } tg, ok := policies[alloc.TaskGroup] if !ok { setErr(fmt.Errorf("Task group does not require Vault policies"), false) return nil } var unneeded []string for _, task := range args.Tasks { taskVault := 
tg[task] if taskVault == nil || len(taskVault.Policies) == 0 { unneeded = append(unneeded, task) } } if len(unneeded) != 0 { e := fmt.Errorf("Requested Vault tokens for tasks without defined Vault policies: %s", strings.Join(unneeded, ", ")) setErr(e, false) return nil } // At this point the request is valid and we should contact Vault for // tokens. // Create an error group where we will spin up a fixed set of goroutines to // handle deriving tokens but where if any fails the whole group is // canceled. g, ctx := errgroup.WithContext(context.Background()) // Cap the handlers handlers := len(args.Tasks) if handlers > maxParallelRequestsPerDerive { handlers = maxParallelRequestsPerDerive } // Create the Vault Tokens input := make(chan string, handlers) results := make(map[string]*vapi.Secret, len(args.Tasks)) for i := 0; i < handlers; i++ { g.Go(func() error { for { select { case task, ok := <-input: if !ok { return nil } secret, err := n.srv.vault.CreateToken(ctx, alloc, task) if err != nil { wrapped := fmt.Errorf("failed to create token for task %q: %v", task, err) if rerr, ok := err.(*structs.RecoverableError); ok && rerr.Recoverable { // If the error is recoverable, propogate it return structs.NewRecoverableError(wrapped, true) } return wrapped } results[task] = secret case <-ctx.Done(): return nil } } }) } // Send the input go func() { defer close(input) for _, task := range args.Tasks { select { case <-ctx.Done(): return case input <- task: } } }() // Wait for everything to complete or for an error createErr := g.Wait() // Retrieve the results accessors := make([]*structs.VaultAccessor, 0, len(results)) tokens := make(map[string]string, len(results)) for task, secret := range results { w := secret.WrapInfo if w == nil { return fmt.Errorf("Vault returned Secret without WrapInfo") } tokens[task] = w.Token accessor := &structs.VaultAccessor{ Accessor: w.WrappedAccessor, Task: task, NodeID: alloc.NodeID, AllocID: alloc.ID, CreationTTL: w.TTL, } accessors = 
append(accessors, accessor) } // If there was an error revoke the created tokens if createErr != nil { n.srv.logger.Printf("[ERR] nomad.node: Vault token creation failed: %v", createErr) if revokeErr := n.srv.vault.RevokeTokens(context.Background(), accessors, false); revokeErr != nil { n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation failed: %v", revokeErr) } if rerr, ok := createErr.(*structs.RecoverableError); ok { reply.Error = rerr } else { reply.Error = structs.NewRecoverableError(createErr, false) } return nil } // Commit to Raft before returning any of the tokens req := structs.VaultAccessorsRequest{Accessors: accessors} _, index, err := n.srv.raftApply(structs.VaultAccessorRegisterRequestType, &req) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: Register Vault accessors failed: %v", err) // Determine if we can recover from the error retry := false switch err { case raft.ErrNotLeader, raft.ErrLeadershipLost, raft.ErrRaftShutdown, raft.ErrEnqueueTimeout: retry = true } setErr(err, retry) return nil } reply.Index = index reply.Tasks = tokens n.srv.setQueryMeta(&reply.QueryMeta) return nil }