diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 4170d4fbe..d4fe1e7db 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1,21 +1,29 @@ package nomad import ( + "context" "fmt" "strings" "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/armon/go-metrics" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/watch" + vapi "github.com/hashicorp/vault/api" ) const ( // batchUpdateInterval is how long we wait to batch updates batchUpdateInterval = 50 * time.Millisecond + + // maxParallelRequestsPerDerive is the maximum number of parallel Vault + // create token requests that may be outstanding per derive request + maxParallelRequestsPerDerive = 16 ) // Node endpoint is used for client interactions @@ -952,7 +960,53 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, strings.Join(unneeded, ", ")) } - // At this point the request is valid and we should contact Vault for tokens + // At this point the request is valid and we should contact Vault for + // tokens. + + // Create an error group where we will spin up a fixed set of goroutines to + // handle deriving tokens but where if any fails the whole group is + // canceled. + g, ctx := errgroup.WithContext(context.Background()) + + // Cap the handlers + handlers := len(args.Tasks) + if handlers > maxParallelRequestsPerDerive { + handlers = maxParallelRequestsPerDerive + } + + // Create the Vault Tokens + input := make(chan string, handlers) + results := make(map[string]*vapi.Secret, len(args.Tasks)) + for i := 0; i < handlers; i++ { + g.Go(func() error { + task, ok := <-input + if !ok { + return nil + } + + secret, err := n.srv.vault.CreateToken(ctx, alloc, task) + if err != nil { + return fmt.Errorf("failed to create token for task %q: %v", task, err) + } + + results[task] = secret + return nil + }) + } + + // Send the input + go func() { + for _, task := range args.Tasks { + select { + case <-ctx.Done(): + return + case input <- task: + } + } + }() + + // Wait for everything to complete or for an error + err = g.Wait() return nil } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 962e59b9b..3071beb79 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -354,6 +354,8 @@ type PeriodicForceRequest struct { WriteRequest } +// DeriveVaultTokenRequest is used to request wrapped Vault tokens for the +// following tasks in the given allocation type DeriveVaultTokenRequest struct { NodeID string SecretID string @@ -362,11 +364,9 @@ type DeriveVaultTokenRequest struct { QueryOptions } +// DeriveVaultTokenResponse returns the wrapped tokens for each requested task type DeriveVaultTokenResponse struct { - NodeID string - SecretID string - AllocID string - Tasks []string + Tasks map[string]string QueryMeta }