Implement alloc updates in arv2

Updates are applied asynchronously but sequentially
This commit is contained in:
Michael Schurter 2018-08-01 11:03:52 -07:00
parent 39b3f3a85b
commit a5d3e3fb0a
6 changed files with 114 additions and 38 deletions

View File

@ -24,6 +24,14 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// updateChCap is the capacity of AllocRunner's updateCh. It must be 1
// as we only want to process the latest update, so if there's already
// a pending update it will be removed from the chan before adding the
// newer update.
updateChCap = 1
)
// allocRunner is used to run all the tasks in a given allocation
type allocRunner struct {
// id is the ID of the allocation. Can be accessed without a lock
@ -70,7 +78,9 @@ type allocRunner struct {
tasks map[string]*taskrunner.TaskRunner
tasksLock sync.RWMutex
// updateCh receives allocation updates via the Update method
// updateCh receives allocation updates via the Update method. Must
// have buffer size 1 in order to support dropping pending updates when
// a newer allocation is received.
updateCh chan *structs.Allocation
}
@ -90,7 +100,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
updateCh: make(chan *structs.Allocation),
updateCh: make(chan *structs.Allocation, updateChCap),
state: &state.State{},
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
@ -169,9 +179,12 @@ MAIN:
// TaskRunners have all exited
break MAIN
case updated := <-ar.updateCh:
// Updated alloc received
//XXX Update hooks
//XXX Update ar.alloc
// Update ar.alloc
ar.setAlloc(updated)
//TODO Run AR Update hooks
// Update task runners
for _, tr := range ar.tasks {
tr.Update(updated)
}
@ -206,13 +219,18 @@ func (ar *allocRunner) runImpl() <-chan struct{} {
}
// Alloc returns the current allocation being run by this runner.
//XXX how do we handle mutate the state saving stuff
func (ar *allocRunner) Alloc() *structs.Allocation {
ar.allocLock.RLock()
defer ar.allocLock.RUnlock()
return ar.alloc
}
func (ar *allocRunner) setAlloc(updated *structs.Allocation) {
ar.allocLock.Lock()
ar.alloc = updated
ar.allocLock.Unlock()
}
// SaveState does all the state related stuff. Who knows. FIXME
//XXX do we need to do periodic syncing? if Saving is only called *before* Run
// *and* within Run -- *and* Updates are applid within Run -- we may be able to
@ -389,10 +407,19 @@ func getClientStatus(taskStates map[string]*structs.TaskState) (status, descript
// Update the running allocation with a new version received from the server.
//
// This method is safe for calling concurrently with Run() and does not modify
// the passed in allocation.
// This method sends the updated alloc to Run for serially processing updates.
// If there is already a pending update it will be discarded and replaced by
// the latest update.
func (ar *allocRunner) Update(update *structs.Allocation) {
ar.updateCh <- update
select {
case ar.updateCh <- update:
// Updated alloc sent
case <-ar.updateCh:
// There was a pending update; replace it with the new update.
// This also prevents Update() from blocking if its called
// concurrently with Run() exiting.
ar.updateCh <- update
}
}
// Destroy the alloc runner by stopping it if it is still running and cleaning

View File

@ -38,6 +38,12 @@ const (
// killFailureLimit is how many times we will attempt to kill a task before
// giving up and potentially leaking resources.
killFailureLimit = 5
// triggerUpdatechCap is the capacity for the triggerUpdateCh used for
// triggering updates. It should be exactly 1 as even if multiple
// updates have come in since the last one was handled, we only need to
// handle the last one.
triggerUpdateChCap = 1
)
var (
@ -91,8 +97,10 @@ type TaskRunner struct {
// Logger is the logger for the task runner.
logger log.Logger
// updateCh receives Alloc updates
updateCh chan *structs.Allocation
// triggerUpdateCh is ticked whenever update hooks need to be run and
// must be created with cap=1 to signal a pending update and prevent
// callers from deadlocking if the receiver has exited.
triggerUpdateCh chan struct{}
// waitCh is closed when the task runner has transitioned to a terminal
// state
@ -182,14 +190,14 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
consulClient: config.Consul,
vaultClient: config.VaultClient,
//XXX Make a Copy to avoid races?
state: config.Alloc.TaskStates[config.Task.Name],
localState: config.LocalState,
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
ctx: trCtx,
ctxCancel: trCancel,
updateCh: make(chan *structs.Allocation),
waitCh: make(chan struct{}),
state: config.Alloc.TaskStates[config.Task.Name],
localState: config.LocalState,
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
ctx: trCtx,
ctxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
waitCh: make(chan struct{}),
}
// Create the logger based on the allocation ID
@ -258,6 +266,11 @@ func (tr *TaskRunner) Run() {
defer close(tr.waitCh)
var handle driver.DriverHandle
// Updates are handled asynchronously with the other hooks but each
// triggered update - whether due to alloc updates or a new vault token
// - should be handled serially.
go tr.handleUpdates()
MAIN:
for tr.ctx.Err() == nil {
// Run the prestart hooks
@ -329,6 +342,20 @@ MAIN:
tr.logger.Debug("task run loop exiting")
}
// handleUpdates runs update hooks when triggerUpdateCh is ticked and exits
// when Run has returned. Should only be run in a goroutine from Run.
func (tr *TaskRunner) handleUpdates() {
for {
select {
case <-tr.triggerUpdateCh:
// Update triggered; run hooks
tr.updateHooks()
case <-tr.waitCh:
return
}
}
}
func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
// Determine if we should restart
state, when := tr.restartTracker.GetState()
@ -665,16 +692,28 @@ func (tr *TaskRunner) WaitCh() <-chan struct{} {
}
// Update the running allocation with a new version received from the server.
// Calls Update hooks asynchronously with Run().
//
// This method is safe for calling concurrently with Run() and does not modify
// the passed in allocation.
func (tr *TaskRunner) Update(update *structs.Allocation) {
// Update tr.alloc
tr.setAlloc(update)
// Trigger update hooks
tr.triggerUpdateHooks()
}
// triggerUpdate if there isn't already an update pending. Should be called
// instead of calling updateHooks directly to serialize runs of update hooks.
// TaskRunner state should be updated prior to triggering update hooks.
//
// Does not block.
func (tr *TaskRunner) triggerUpdateHooks() {
select {
case tr.updateCh <- update:
case <-tr.WaitCh():
//XXX Do we log here like we used to? If we're just
//shutting down it's not an error to drop the update as
//it will be applied on startup
case tr.triggerUpdateCh <- struct{}{}:
default:
// already an update hook pending
}
}

View File

@ -11,6 +11,12 @@ func (tr *TaskRunner) Alloc() *structs.Allocation {
return tr.alloc
}
func (tr *TaskRunner) setAlloc(updated *structs.Allocation) {
tr.allocLock.Lock()
tr.alloc = updated
tr.allocLock.Unlock()
}
func (tr *TaskRunner) Task() *structs.Task {
tr.taskLock.RLock()
defer tr.taskLock.RUnlock()
@ -29,10 +35,18 @@ func (tr *TaskRunner) getVaultToken() string {
return tr.vaultToken
}
// setVaultToken updates the vault token on the task runner as well as in the
// task's environment. These two places must be set atomically to avoid a task
// seeing a different token on the task runner and in its environment.
func (tr *TaskRunner) setVaultToken(token string) {
tr.vaultTokenLock.Lock()
defer tr.vaultTokenLock.Unlock()
// Update the Vault token on the runner
tr.vaultToken = token
// Update the task's environment
tr.envBuilder.SetVaultToken(token, tr.task.Vault.Env)
}
func (tr *TaskRunner) getDriverHandle() driver.DriverHandle {

View File

@ -288,7 +288,9 @@ func (tr *TaskRunner) stop() error {
return merr.ErrorOrNil()
}
// update is used to run the runners update hooks.
// update is used to run the runners update hooks. Should only be called from
// Run(). To trigger an update, update state on the TaskRunner and call
// triggerUpdateHooks.
func (tr *TaskRunner) updateHooks() {
if tr.logger.IsTrace() {
start := time.Now()

View File

@ -38,14 +38,11 @@ type vaultTokenUpdateHandler interface {
}
func (tr *TaskRunner) updatedVaultToken(token string) {
// Update the Vault token on the runner
// Update the task runner and environment
tr.setVaultToken(token)
// Update the tasks environment
tr.envBuilder.SetVaultToken(token, tr.task.Vault.Env)
// Update the hooks with the new Vault token
tr.updateHooks()
// Trigger update hooks with the new Vault token
tr.triggerUpdateHooks()
}
type vaultHookConfig struct {

View File

@ -1845,10 +1845,8 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Update the existing allocations
for _, update := range diff.updated {
if err := c.updateAlloc(update); err != nil {
c.logger.Printf("[ERR] client: failed to update alloc %q: %v",
update.ID, err)
}
c.logger.Printf("[TRACE] client: updating alloc %q to index %d", update.ID, update.AllocModifyIndex)
c.updateAlloc(update)
}
// Make room for new allocations before running
@ -1893,17 +1891,16 @@ func (c *Client) removeAlloc(allocID string) {
}
// updateAlloc is invoked when we should update an allocation
func (c *Client) updateAlloc(update *structs.Allocation) error {
func (c *Client) updateAlloc(update *structs.Allocation) {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ar, ok := c.allocs[update.ID]
if !ok {
c.logger.Printf("[WARN] client: missing context for alloc %q", update.ID)
return nil
return
}
ar.Update(update)
return nil
}
// addAlloc is invoked when we should add an allocation