reimplement success state for tr hooks and state persistence

splits apart local and remote persistence

removes some locking *for now*
This commit is contained in:
Michael Schurter 2018-07-10 21:22:04 -07:00
parent 4f43ff5c51
commit ae89b7da95
6 changed files with 174 additions and 54 deletions

View file

@ -43,8 +43,8 @@ type TaskPrerunResponse struct {
// run // run
HookData map[string]string HookData map[string]string
// DoOnce lets the hook indicate that it should only be run once // Done lets the hook indicate that it should only be run once
DoOnce bool Done bool
} }
type TaskPrerunHook interface { type TaskPrerunHook interface {

View file

@ -3,7 +3,6 @@ package state
import ( import (
"sync" "sync"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -18,9 +17,6 @@ type State struct {
// allocations client state // allocations client state
ClientDesc string ClientDesc string
// TaskStates captures the state of individual tasks.
TaskStates map[string]*state.State
// DeploymentStatus captures the status of the deployment // DeploymentStatus captures the status of the deployment
DeploymentStatus *structs.AllocDeploymentStatus DeploymentStatus *structs.AllocDeploymentStatus
} }

View file

@ -1,28 +1,36 @@
package state package state
import ( import (
"sync" "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
) )
type State struct { // LocalState is Task state which is persisted for use when restarting Nomad
sync.RWMutex // agents.
Task *structs.TaskState type LocalState struct {
Hooks map[string]*HookState Hooks map[string]*HookState
// VaultToken is the current Vault token for the task // VaultToken is the current Vault token for the task
VaultToken string VaultToken string
// DriverNetwork is the network information returned by the task
// driver's Start method
DriverNetwork *structs.DriverNetwork
}
func NewLocalState() *LocalState {
return &LocalState{
Hooks: make(map[string]*HookState),
}
} }
// Copy should be called with the lock held // Copy should be called with the lock held
func (s *State) Copy() *State { func (s *LocalState) Copy() *LocalState {
// Create a copy // Create a copy
c := &State{ c := &LocalState{
Task: s.Task.Copy(), Hooks: make(map[string]*HookState, len(s.Hooks)),
Hooks: make(map[string]*HookState, len(s.Hooks)), VaultToken: s.VaultToken,
VaultToken: s.VaultToken, DriverNetwork: s.DriverNetwork,
} }
// Copy the hooks // Copy the hooks
@ -34,7 +42,10 @@ func (s *State) Copy() *State {
} }
type HookState struct { type HookState struct {
Data map[string]string // PrerunDone is true if the hook has run Prerun successfully and does
// not need to run again
PrerunDone bool
Data map[string]string
} }
func (h *HookState) Copy() *HookState { func (h *HookState) Copy() *HookState {
@ -43,3 +54,15 @@ func (h *HookState) Copy() *HookState {
c.Data = helper.CopyMapStringString(c.Data) c.Data = helper.CopyMapStringString(c.Data)
return c return c
} }
func (h *HookState) Equal(o *HookState) bool {
if h == nil || o == nil {
return h == o
}
if h.PrerunDone != o.PrerunDone {
return false
}
return helper.CompareMapStringString(h.Data, o.Data)
}

View file

@ -1,12 +1,15 @@
package taskrunner package taskrunner
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io"
"sync" "sync"
"time" "time"
metrics "github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
"github.com/boltdb/bolt"
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
@ -15,7 +18,16 @@ import (
"github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/env" "github.com/hashicorp/nomad/client/driver/env"
oldstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
"golang.org/x/crypto/blake2b"
)
var (
// taskRunnerStateAllKey holds all the task runners state. At the moment
// there is no need to split it
taskRunnerStateAllKey = []byte("simple-all")
) )
type TaskRunner struct { type TaskRunner struct {
@ -29,8 +41,19 @@ type TaskRunner struct {
clientConfig *config.Config clientConfig *config.Config
// state captures the state of the task runner // state captures the state of the task for updating the allocation
state *state.State state *structs.TaskState
// localState captures the node-local state of the task for when the
// Nomad agent restarts
localState *state.LocalState
// stateDB is for persisting localState
stateDB *bolt.DB
// persistedHash is the hash of the last persisted state for skipping
// unnecessary writes
persistedHash []byte
// ctx is the task runner's context and is done whe the task runner // ctx is the task runner's context and is done whe the task runner
// should exit. Shutdown hooks are run. // should exit. Shutdown hooks are run.
@ -59,7 +82,7 @@ type TaskRunner struct {
// and Run loops, so there's never concurrent access. // and Run loops, so there's never concurrent access.
//handleLock sync.Mutex //handleLock sync.Mutex
// task is the task beign run // task is the task being run
task *structs.Task task *structs.Task
taskLock sync.RWMutex taskLock sync.RWMutex
@ -88,8 +111,11 @@ type Config struct {
TaskDir *allocdir.TaskDir TaskDir *allocdir.TaskDir
Logger log.Logger Logger log.Logger
// State is optionally restored task state // LocalState is optionally restored task state
State *state.State LocalState *state.LocalState
// StateDB is used to store and restore state.
StateDB *bolt.DB
} }
func NewTaskRunner(config *Config) (*TaskRunner, error) { func NewTaskRunner(config *Config) (*TaskRunner, error) {
@ -112,11 +138,14 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
taskDir: config.TaskDir, taskDir: config.TaskDir,
taskName: config.Task.Name, taskName: config.Task.Name,
envBuilder: envBuilder, envBuilder: envBuilder,
state: config.State, //XXX Make a Copy to avoid races?
ctx: trCtx, state: config.Alloc.TaskStates[config.Task.Name],
ctxCancel: trCancel, localState: config.LocalState,
updateCh: make(chan *structs.Allocation), stateDB: config.StateDB,
waitCh: make(chan struct{}), ctx: trCtx,
ctxCancel: trCancel,
updateCh: make(chan *structs.Allocation),
waitCh: make(chan struct{}),
} }
// Create the logger based on the allocation ID // Create the logger based on the allocation ID
@ -150,13 +179,13 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
func (tr *TaskRunner) initState() { func (tr *TaskRunner) initState() {
if tr.state == nil { if tr.state == nil {
tr.state = &state.State{ tr.state = &structs.TaskState{
Task: &structs.TaskState{ State: structs.TaskStatePending,
State: structs.TaskStatePending,
},
Hooks: make(map[string]*state.HookState),
} }
} }
if tr.localState == nil {
tr.localState = state.NewLocalState()
}
} }
func (tr *TaskRunner) initLabels() { func (tr *TaskRunner) initLabels() {
@ -329,15 +358,60 @@ func (tr *TaskRunner) initDriver() error {
return nil return nil
} }
// SetState sets the task runners state. // persistLocalState persists local state to disk synchronously.
func (tr *TaskRunner) persistLocalState() error {
// buffer for writing to boltdb
var buf bytes.Buffer
// Hash for skipping unnecessary writes
h, err := blake2b.New256(nil)
if err != nil {
// Programming error that should never happen!
return err
}
// Multiplex writes to both
w := io.MultiWriter(h, &buf)
// Encode as msgpack value
if err := codec.NewEncoder(w, structs.MsgpackHandle).Encode(&tr.localState); err != nil {
return fmt.Errorf("failed to serialize snapshot: %v", err)
}
// If the hashes are equal, skip the write
hashVal := h.Sum(nil)
if bytes.Equal(hashVal, tr.persistedHash) {
return nil
}
// Start the transaction.
return tr.stateDB.Batch(func(tx *bolt.Tx) error {
// Grab the task bucket
//XXX move into new state pkg
taskBkt, err := oldstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
if err := oldstate.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil {
return fmt.Errorf("failed to write task_runner state: %v", err)
}
// Store the hash that was persisted
tx.OnCommit(func() {
tr.persistedHash = hashVal
})
return nil
})
}
// SetState sets the task runners allocation state.
func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) { func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
// Ensure the event is populated with human readable strings // Ensure the event is populated with human readable strings
event.PopulateEventDisplayMessage() event.PopulateEventDisplayMessage()
// Lock our state task := tr.state
tr.state.Lock()
defer tr.state.Unlock()
task := tr.state.Task
// Update the state of the task // Update the state of the task
if state != "" { if state != "" {

View file

@ -61,16 +61,13 @@ func (tr *TaskRunner) prerun() error {
TaskEnv: tr.envBuilder.Build(), TaskEnv: tr.envBuilder.Build(),
} }
tr.state.RLock() origHookState := tr.localState.Hooks[name]
hookState := tr.state.Hooks[name] if origHookState != nil && origHookState.PrerunDone {
if hookState != nil {
// Hook already ran, skip // Hook already ran, skip
tr.state.RUnlock()
continue continue
} }
req.VaultToken = tr.state.VaultToken req.VaultToken = tr.localState.VaultToken
tr.state.RUnlock()
// Time the prerun hook // Time the prerun hook
var start time.Time var start time.Time
@ -87,25 +84,23 @@ func (tr *TaskRunner) prerun() error {
// Store the hook state // Store the hook state
{ {
tr.state.Lock() hookState, ok := tr.localState.Hooks[name]
hookState, ok := tr.state.Hooks[name]
if !ok { if !ok {
hookState = &state.HookState{} hookState = &state.HookState{}
tr.state.Hooks[name] = hookState tr.localState.Hooks[name] = hookState
} }
if resp.HookData != nil { if resp.HookData != nil {
hookState.Data = resp.HookData hookState.Data = resp.HookData
hookState.PrerunDone = resp.Done
} }
// XXX Detect if state has changed so that we can signal to the // Persist local state if the hook state has changed
// alloc runner precisly if !hookState.Equal(origHookState) {
/* if err := tr.persistLocalState(); err != nil {
if err := tr.allocRunner.StateUpdated(tr.state.Copy()); err != nil { return err
tr.logger.Error("failed to save state", "error", err)
} }
*/ }
tr.state.Unlock()
} }
// Store the environment variables returned by the hook // Store the environment variables returned by the hook

View file

@ -172,6 +172,38 @@ func SliceSetDisjoint(first, second []string) (bool, []string) {
return false, flattened return false, flattened
} }
// CompareMapStringString returns true if the maps are equivalent. A nil and
// empty map are considered not equal.
func CompareMapStringString(a, b map[string]string) bool {
if a == nil || b == nil {
return a == nil && b == nil
}
if len(a) != len(b) {
return false
}
for k, v := range a {
v2, ok := b[k]
if !ok {
return false
}
if v != v2 {
return false
}
}
// Already compared all known values in a so only test that keys from b
// exist in a
for k := range b {
if _, ok := a[k]; !ok {
return false
}
}
return true
}
// Helpers for copying generic structures. // Helpers for copying generic structures.
func CopyMapStringString(m map[string]string) map[string]string { func CopyMapStringString(m map[string]string) map[string]string {
l := len(m) l := len(m)