wip split event emitting and state transitions

Michael Schurter 2018-07-16 16:32:37 -07:00
parent 516d641db0
commit 6ebdf532ea


@@ -43,12 +43,15 @@ var (
// taskRunnerStateAllKey holds all the task runners state. At the moment
// there is no need to split it
//XXX refactor out of clientstate and new state
//XXX Old key - going to need to migrate
//taskRunnerStateAllKey = []byte("simple-all")
taskLocalStateKey = []byte("local_state")
taskStateKey = []byte("task_state")
)
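The XXX comments above leave the migration question open: existing clients still have data under the legacy "simple-all" key. A hypothetical one-time migration in this package could look roughly like the sketch below; migrateTaskLocalState and clientstate.PutObject are assumptions (PutObject mirroring the GetObject/PutData helpers used elsewhere in this diff), not part of this commit.

// migrateTaskLocalState is a sketch only: copy a task's legacy "simple-all"
// value to the new local_state key so it is not lost on upgrade.
func migrateTaskLocalState(tx *bolt.Tx, allocID, taskName string) error {
	bkt, err := clientstate.GetTaskBucket(tx, allocID, taskName)
	if err != nil {
		return fmt.Errorf("failed to get task %q bucket: %v", taskName, err)
	}

	var ls state.LocalState
	if err := clientstate.GetObject(bkt, []byte("simple-all"), &ls); err != nil {
		// Nothing readable under the legacy key; nothing to migrate.
		return nil
	}
	return clientstate.PutObject(bkt, taskLocalStateKey, &ls)
}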
type TaskRunner struct {
// allocID and taskName are immutable so store a copy to access without
// locks
// allocID and taskName are immutable so these fields may be accessed
// without locks
allocID string
taskName string
@@ -58,7 +61,8 @@ type TaskRunner struct {
clientConfig *config.Config
// state captures the state of the task for updating the allocation
state *structs.TaskState
stateLock sync.Mutex
// localState captures the node-local state of the task for when the
// Nomad agent restarts
@@ -368,10 +372,11 @@ func (tr *TaskRunner) runDriver() error {
func (tr *TaskRunner) initDriver() error {
// Create a task-specific event emitter callback to expose minimal
// state to drivers
//XXX Replace with EmitEvent -- no need for a shim
eventEmitter := func(m string, args ...interface{}) {
msg := fmt.Sprintf(m, args...)
tr.logger.Debug("driver event", "event", msg)
tr.SetState("", structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
}
alloc := tr.Alloc()
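For illustration of the XXX above, a driver-side call through the shim (the call site and message text are assumed, not shown in this diff) reduces to a plain EmitEvent:

// Sketch only: what a driver call through the shim amounts to.
eventEmitter("Downloaded image %q", "redis:3.2")

// ...is equivalent to the direct call the XXX wants drivers to make:
tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverMessage).
	SetDriverMessage(fmt.Sprintf("Downloaded image %q", "redis:3.2")))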
@@ -455,7 +460,7 @@ func (tr *TaskRunner) persistLocalState() error {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
if err := clientstate.PutData(taskBkt, taskLocalStateKey, buf.Bytes()); err != nil {
return fmt.Errorf("failed to write task_runner state: %v", err)
}
@@ -469,60 +474,52 @@ func (tr *TaskRunner) persistLocalState() error {
}
// Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner
// but before Run so no locks need to be acquired.
func (tr *TaskRunner) Restore(tx *bolt.Tx) error {
bkt, err := clientstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
if err != nil {
return fmt.Errorf("failed to get task %q bucket: %v", tr.taskName, err)
}
// Restore Local State
//XXX set persisted hash to avoid immediate write on first use?
var ls state.LocalState
if err := clientstate.GetObject(bkt, taskLocalStateKey, &ls); err != nil {
return fmt.Errorf("failed to read local task runner state: %v", err)
}
tr.localState = &ls
// Restore Task State
var ts structs.TaskState
if err := clientstate.GetObject(bkt, taskStateKey, &ts); err != nil {
return fmt.Errorf("failed to read task state: %v", err)
}
tr.state = &ts
return nil
}
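A rough caller-side sketch of the ordering the comment above relies on; restoreAndRun and the surrounding AllocRunner plumbing are assumptions, not part of this commit:

// Sketch only: restore task runner state from the client's bolt DB before Run
// starts, so Restore never races with the run loop and needs no locks.
func restoreAndRun(db *bolt.DB, tr *TaskRunner) error {
	if err := db.View(func(tx *bolt.Tx) error {
		return tr.Restore(tx)
	}); err != nil {
		return err
	}
	go tr.Run() // only started after Restore has completed
	return nil
}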
// SetState sets the task runner's allocation state.
func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
taskState := tr.state
// Update the state of the task
//XXX REMOVE ME AFTER TESTING
if state != "" {
task.State = state
panic("SetState must not be called with an empty state")
}
// Append the event
tr.emitEventImpl(event)
// Handle the state transition.
switch state {
case structs.TaskStateRunning:
// Capture the start time if it is just starting
if taskState.State != structs.TaskStateRunning {
taskState.StartedAt = time.Now().UTC()
if !tr.clientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "running"}, 1, tr.baseLabels)
}
@@ -532,12 +529,12 @@ func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
}
case structs.TaskStateDead:
// Capture the finished time if not already set
if taskState.FinishedAt.IsZero() {
taskState.FinishedAt = time.Now().UTC()
}
// Emitting metrics to indicate task complete and failures
if taskState.Failed {
if !tr.clientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "failed"}, 1, tr.baseLabels)
}
@@ -554,15 +551,80 @@ func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
}
}
// Persist the state and event
err := tr.stateDB.Update(func(tx *bolt.Tx) error {
bkt, err := clientstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
if err != nil {
return err
}
return clientstate.PutObject(bkt, taskStateKey, tr.state)
})
if err != nil {
// Only a warning because the next event/state-transition will
// try to persist it again.
tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
}
// Create a copy and notify the alloc runner of the transition
//FIXME
//FIXME <-------START HERE
//if err := tr.allocRunner.StateUpdated(tr.state.Copy()); err != nil {
//tr.logger.Error("failed to save state", "error", err)
//}
}
// EmitEvent appends a new TaskEvent to this task's TaskState. The actual
// TaskState.State (pending, running, dead) is *not* updated. Use SetState to
// transition states.
// Events are persisted locally but errors are simply logged.
func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
tr.SetState("", event)
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
tr.emitEventImpl(event)
// Events that do *not* change task state can be batched.
//XXX Seems like this clamps the maximum transaction latency to 10ms.
err := tr.stateDB.Batch(func(tx *bolt.Tx) error {
bkt, err := clientstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
if err != nil {
return err
}
return clientstate.PutObject(bkt, taskStateKey, tr.state)
})
if err != nil {
// Only a warning because the next event/state-transition will
// try to persist it again.
tr.logger.Warn("error persisting event", "error", err, "event", event)
}
}
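A hedged usage sketch of the split (call sites and event text assumed, not part of this diff): EmitEvent only records an event, while SetState records an event and performs the transition.

// Record an informational event; TaskState.State is left untouched.
tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverMessage).
	SetDriverMessage("restoring task handle"))

// Transition to running and record the corresponding event.
tr.SetState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))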
// emitEventImpl is the implementation of EmitEvent without the locking so it
// can be used from SetState.
func (tr *TaskRunner) emitEventImpl(event *structs.TaskEvent) error {
// Ensure the event is populated with human readable strings
event.PopulateEventDisplayMessage()
// Propagate failure from event to task state
if event.FailsTask {
tr.state.Failed = true
}
// Update restart metrics
if event.Type == structs.TaskRestarting {
if !tr.clientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "restart"}, 1, tr.baseLabels)
}
//if r.config.BackwardsCompatibleMetrics {
//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "restart"}, 1)
//}
tr.state.Restarts++
tr.state.LastRestart = time.Unix(0, event.Time)
}
// Append event to slice
appendTaskEvent(tr.state, event)

return nil
}
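On the 10ms XXX inside EmitEvent: bolt's DB.Batch coalesces concurrent calls into a single transaction and commits after at most MaxBatchDelay (10ms by default) or MaxBatchSize calls, so informational events trade a little latency for fewer commits. If that latency mattered, the window could be tuned where the state DB is opened; openStateDB below is an assumption, the open call site is not part of this diff.

// Sketch only: tighten bolt's batching window when opening the client state DB.
func openStateDB(path string) (*bolt.DB, error) {
	db, err := bolt.Open(path, 0600, nil)
	if err != nil {
		return nil, err
	}
	db.MaxBatchDelay = 5 * time.Millisecond // default: bolt.DefaultMaxBatchDelay (10ms)
	db.MaxBatchSize = 500                   // default: bolt.DefaultMaxBatchSize (1000)
	return db, nil
}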
// WaitCh is closed when TaskRunner.Run exits.
@@ -586,9 +648,11 @@ func (tr *TaskRunner) Update(update *structs.Allocation) {
// appendTaskEvent updates the task status by appending the new event.
func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
const capacity = 10
if state.Events == nil {
state.Events = make([]*structs.TaskEvent, 1, capacity)
state.Events[0] = event
return
}
// If we hit capacity, then shift it.
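The nil branch above seeds Events with the first event and capacity 10; the shift that follows (cut off by this hunk) drops the oldest entry once that capacity is reached. Assuming that shift logic, the behavior looks like this sketch:

// Sketch only: after 15 appends only the 10 newest events remain.
ts := &structs.TaskState{}
for i := 0; i < 15; i++ {
	appendTaskEvent(ts, structs.NewTaskEvent(structs.TaskDriverMessage).
		SetDriverMessage(fmt.Sprintf("event %d", i)))
}
fmt.Println(len(ts.Events))             // 10
fmt.Println(ts.Events[0].DriverMessage) // "event 5", the five oldest were shifted out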