Pipeline Vault token creation
This commit is contained in:
parent
123a26ffea
commit
19752edfaf
|
@ -1,21 +1,29 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/nomad/watch"
|
||||
vapi "github.com/hashicorp/vault/api"
|
||||
)
|
||||
|
||||
const (
|
||||
// batchUpdateInterval is how long we wait to batch updates
|
||||
batchUpdateInterval = 50 * time.Millisecond
|
||||
|
||||
// maxParallelRequestsPerDerive is the maximum number of parallel Vault
|
||||
// create token requests that may be outstanding per derive request
|
||||
maxParallelRequestsPerDerive = 16
|
||||
)
|
||||
|
||||
// Node endpoint is used for client interactions
|
||||
|
@ -952,7 +960,53 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
|
|||
strings.Join(unneeded, ", "))
|
||||
}
|
||||
|
||||
// At this point the request is valid and we should contact Vault for tokens
|
||||
// At this point the request is valid and we should contact Vault for
|
||||
// tokens.
|
||||
|
||||
// Create an error group where we will spin up a fixed set of goroutines to
|
||||
// handle deriving tokens but where if any fails the whole group is
|
||||
// canceled.
|
||||
g, ctx := errgroup.WithContext(context.Background())
|
||||
|
||||
// Cap the handlers
|
||||
handlers := len(args.Tasks)
|
||||
if handlers > maxParallelRequestsPerDerive {
|
||||
handlers = maxParallelRequestsPerDerive
|
||||
}
|
||||
|
||||
// Create the Vault Tokens
|
||||
input := make(chan string, handlers)
|
||||
results := make(map[string]*vapi.Secret, len(args.Tasks))
|
||||
for i := 0; i < handlers; i++ {
|
||||
g.Go(func() error {
|
||||
task, ok := <-input
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
secret, err := n.srv.vault.CreateToken(ctx, alloc, task)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create token for task %q: %v", task, err)
|
||||
}
|
||||
|
||||
results[task] = secret
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Send the input
|
||||
go func() {
|
||||
for _, task := range args.Tasks {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case input <- task:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for everything to complete or for an error
|
||||
err = g.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -354,6 +354,8 @@ type PeriodicForceRequest struct {
|
|||
WriteRequest
|
||||
}
|
||||
|
||||
// DeriveVaultTokenRequest is used to request wrapped Vault tokens for the
|
||||
// following tasks in the given allocation
|
||||
type DeriveVaultTokenRequest struct {
|
||||
NodeID string
|
||||
SecretID string
|
||||
|
@ -362,11 +364,9 @@ type DeriveVaultTokenRequest struct {
|
|||
QueryOptions
|
||||
}
|
||||
|
||||
// DeriveVaultTokenResponse returns the wrapped tokens for each requested task
|
||||
type DeriveVaultTokenResponse struct {
|
||||
NodeID string
|
||||
SecretID string
|
||||
AllocID string
|
||||
Tasks []string
|
||||
Tasks map[string]string
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue