Comments
This commit is contained in:
parent
4f8bfd7b18
commit
ba0b3963ef
|
@ -423,6 +423,8 @@ func (r *TaskRunner) validateTask() error {
|
||||||
return mErr.ErrorOrNil()
|
return mErr.ErrorOrNil()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tokenFuture stores the Vault token and allows consumers to block till a valid
|
||||||
|
// token exists
|
||||||
type tokenFuture struct {
|
type tokenFuture struct {
|
||||||
waiting []chan struct{}
|
waiting []chan struct{}
|
||||||
token string
|
token string
|
||||||
|
@ -430,10 +432,13 @@ type tokenFuture struct {
|
||||||
m sync.Mutex
|
m sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewTokenFuture returns a new token future without any token set
|
||||||
func NewTokenFuture() *tokenFuture {
|
func NewTokenFuture() *tokenFuture {
|
||||||
return &tokenFuture{}
|
return &tokenFuture{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait returns a channel that can be waited on. When this channel unblocks, a
|
||||||
|
// valid token will be available via the Get method
|
||||||
func (f *tokenFuture) Wait() <-chan struct{} {
|
func (f *tokenFuture) Wait() <-chan struct{} {
|
||||||
f.m.Lock()
|
f.m.Lock()
|
||||||
defer f.m.Unlock()
|
defer f.m.Unlock()
|
||||||
|
@ -448,6 +453,7 @@ func (f *tokenFuture) Wait() <-chan struct{} {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set sets the token value and unblocks any caller of Wait
|
||||||
func (f *tokenFuture) Set(token string) *tokenFuture {
|
func (f *tokenFuture) Set(token string) *tokenFuture {
|
||||||
f.m.Lock()
|
f.m.Lock()
|
||||||
defer f.m.Unlock()
|
defer f.m.Unlock()
|
||||||
|
@ -461,6 +467,7 @@ func (f *tokenFuture) Set(token string) *tokenFuture {
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clear clears the set vault token.
|
||||||
func (f *tokenFuture) Clear() *tokenFuture {
|
func (f *tokenFuture) Clear() *tokenFuture {
|
||||||
f.m.Lock()
|
f.m.Lock()
|
||||||
defer f.m.Unlock()
|
defer f.m.Unlock()
|
||||||
|
@ -470,12 +477,17 @@ func (f *tokenFuture) Clear() *tokenFuture {
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get returns the set Vault token
|
||||||
func (f *tokenFuture) Get() string {
|
func (f *tokenFuture) Get() string {
|
||||||
f.m.Lock()
|
f.m.Lock()
|
||||||
defer f.m.Unlock()
|
defer f.m.Unlock()
|
||||||
return f.token
|
return f.token
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// vaultManager should be called in a go-routine and manages the derivation,
|
||||||
|
// renewal and handling of errors with the Vault token. The optional parameter
|
||||||
|
// allows setting the initial Vault token. This is useful when the Vault token
|
||||||
|
// is recovered off disk.
|
||||||
func (r *TaskRunner) vaultManager(token string) {
|
func (r *TaskRunner) vaultManager(token string) {
|
||||||
// updatedToken lets us store state between loops. If true, a new token
|
// updatedToken lets us store state between loops. If true, a new token
|
||||||
// has been retrieved and we need to apply the Vault change mode
|
// has been retrieved and we need to apply the Vault change mode
|
||||||
|
@ -574,6 +586,9 @@ OUTER:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deriveVaultToken derives the Vault token using exponential backoffs. It
|
||||||
|
// returns the Vault token and whether the token is valid. If it is not valid we
|
||||||
|
// are shutting down
|
||||||
func (r *TaskRunner) deriveVaultToken() (string, bool) {
|
func (r *TaskRunner) deriveVaultToken() (string, bool) {
|
||||||
attempts := 0
|
attempts := 0
|
||||||
for {
|
for {
|
||||||
|
@ -599,6 +614,7 @@ func (r *TaskRunner) deriveVaultToken() (string, bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// writeToken writes the given token to disk
|
||||||
func (r *TaskRunner) writeToken(token string) error {
|
func (r *TaskRunner) writeToken(token string) error {
|
||||||
// Write the token to disk
|
// Write the token to disk
|
||||||
secretDir, err := r.ctx.AllocDir.GetSecretDir(r.task.Name)
|
secretDir, err := r.ctx.AllocDir.GetSecretDir(r.task.Name)
|
||||||
|
@ -615,6 +631,8 @@ func (r *TaskRunner) writeToken(token string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updatedTokenHandler is called when a new Vault token is retrieved. Things
|
||||||
|
// that rely on the token should be updated here.
|
||||||
func (r *TaskRunner) updatedTokenHandler() {
|
func (r *TaskRunner) updatedTokenHandler() {
|
||||||
|
|
||||||
// Update the tasks environment
|
// Update the tasks environment
|
||||||
|
|
|
@ -22,6 +22,9 @@ type MockVaultClient struct {
|
||||||
// token is derived
|
// token is derived
|
||||||
DeriveTokenErrors map[string]map[string]error
|
DeriveTokenErrors map[string]map[string]error
|
||||||
|
|
||||||
|
// DeriveTokenFn allows the caller to control the DeriveToken function. If
|
||||||
|
// not set an error is returned if found in DeriveTokenErrors and otherwise
|
||||||
|
// a token is generated and returned
|
||||||
DeriveTokenFn func(a *structs.Allocation, tasks []string) (map[string]string, error)
|
DeriveTokenFn func(a *structs.Allocation, tasks []string) (map[string]string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1189,7 +1189,6 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO use default
|
|
||||||
if err := mapstructure.WeakDecode(m, result); err != nil {
|
if err := mapstructure.WeakDecode(m, result); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue