diff --git a/client/task_runner.go b/client/task_runner.go index 62c5fbd51..f2b98edc1 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -423,6 +423,8 @@ func (r *TaskRunner) validateTask() error { return mErr.ErrorOrNil() } +// tokenFuture stores the Vault token and allows consumers to block till a valid +// token exists type tokenFuture struct { waiting []chan struct{} token string @@ -430,10 +432,13 @@ type tokenFuture struct { m sync.Mutex } +// NewTokenFuture returns a new token future without any token set func NewTokenFuture() *tokenFuture { return &tokenFuture{} } +// Wait returns a channel that can be waited on. When this channel unblocks, a +// valid token will be available via the Get method func (f *tokenFuture) Wait() <-chan struct{} { f.m.Lock() defer f.m.Unlock() @@ -448,6 +453,7 @@ func (f *tokenFuture) Wait() <-chan struct{} { return c } +// Set sets the token value and unblocks any caller of Wait func (f *tokenFuture) Set(token string) *tokenFuture { f.m.Lock() defer f.m.Unlock() @@ -461,6 +467,7 @@ func (f *tokenFuture) Set(token string) *tokenFuture { return f } +// Clear clears the set vault token. func (f *tokenFuture) Clear() *tokenFuture { f.m.Lock() defer f.m.Unlock() @@ -470,12 +477,17 @@ func (f *tokenFuture) Clear() *tokenFuture { return f } +// Get returns the set Vault token func (f *tokenFuture) Get() string { f.m.Lock() defer f.m.Unlock() return f.token } +// vaultManager should be called in a go-routine and manages the derivation, +// renewal and handling of errors with the Vault token. The optional parameter +// allows setting the initial Vault token. This is useful when the Vault token +// is recovered off disk. func (r *TaskRunner) vaultManager(token string) { // updatedToken lets us store state between loops. If true, a new token // has been retrieved and we need to apply the Vault change mode @@ -574,6 +586,9 @@ OUTER: } } +// deriveVaultToken derives the Vault token using exponential backoffs. It +// returns the Vault token and whether the token is valid. If it is not valid we +// are shutting down func (r *TaskRunner) deriveVaultToken() (string, bool) { attempts := 0 for { @@ -599,6 +614,7 @@ func (r *TaskRunner) deriveVaultToken() (string, bool) { } } +// writeToken writes the given token to disk func (r *TaskRunner) writeToken(token string) error { // Write the token to disk secretDir, err := r.ctx.AllocDir.GetSecretDir(r.task.Name) @@ -615,6 +631,8 @@ func (r *TaskRunner) writeToken(token string) error { return nil } +// updatedTokenHandler is called when a new Vault token is retrieved. Things +// that rely on the token should be updated here. func (r *TaskRunner) updatedTokenHandler() { // Update the tasks environment diff --git a/client/vaultclient/vaultclient_testing.go b/client/vaultclient/vaultclient_testing.go index fc935b184..ba2feb08c 100644 --- a/client/vaultclient/vaultclient_testing.go +++ b/client/vaultclient/vaultclient_testing.go @@ -22,6 +22,9 @@ type MockVaultClient struct { // token is derived DeriveTokenErrors map[string]map[string]error + // DeriveTokenFn allows the caller to control the DeriveToken function. If + // not set an error is returned if found in DeriveTokenErrors and otherwise + // a token is generated and returned DeriveTokenFn func(a *structs.Allocation, tasks []string) (map[string]string, error) } diff --git a/jobspec/parse.go b/jobspec/parse.go index 269fafe37..637050959 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -1189,7 +1189,6 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error { return err } - // TODO use default if err := mapstructure.WeakDecode(m, result); err != nil { return err }