safe but slow

Alex Dadgar 2016-02-10 13:44:53 -08:00
parent 071216a730
commit 0c4c3fc4ee
8 changed files with 129 additions and 55 deletions

View File

@@ -31,16 +31,15 @@ type AllocRunner struct {
     logger        *log.Logger
     consulService *ConsulService

-    alloc     *structs.Allocation
-    allocLock sync.Mutex
-
-    // Explicit status of allocation. Set when there are failures
-    allocClientStatus string
+    alloc                  *structs.Allocation
+    allocClientStatus      string // Explicit status of allocation. Set when there are failures
     allocClientDescription string
+    allocLock              sync.Mutex

     dirtyCh chan struct{}

     ctx     *driver.ExecContext
+    ctxLock sync.Mutex
     tasks      map[string]*TaskRunner
     taskStates map[string]*structs.TaskState
     restored   map[string]struct{}
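The reordered struct keeps each mutex directly beside the fields it guards (allocLock with the alloc fields, ctxLock with ctx). A minimal sketch of that convention, not part of this commit, using a hypothetical counter type:

package main

import (
    "sync"
    "time"
)

type counter struct {
    mu   sync.Mutex // mu guards n and last
    n    int
    last time.Time
}

func (c *counter) incr() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.n++
    c.last = time.Now()
}

func main() {
    c := &counter{}
    c.incr()
}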
@@ -76,7 +75,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat
         consulService: consulService,
         dirtyCh:       make(chan struct{}, 1),
         tasks:         make(map[string]*TaskRunner),
-        taskStates:    alloc.TaskStates,
+        taskStates:    copyTaskStates(alloc.TaskStates),
         restored:      make(map[string]struct{}),
         updateCh:      make(chan *structs.Allocation, 8),
         destroyCh:     make(chan struct{}),
@@ -112,7 +111,7 @@ func (r *AllocRunner) RestoreState() error {
         r.restored[name] = struct{}{}

         task := &structs.Task{Name: name}
-        tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
+        tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
             task, r.consulService)
         r.tasks[name] = tr
@@ -153,16 +152,27 @@ func (r *AllocRunner) SaveState() error {
 }

 func (r *AllocRunner) saveAllocRunnerState() error {
     // Create the snapshot.
     r.taskStatusLock.RLock()
-    defer r.taskStatusLock.RUnlock()
+    states := copyTaskStates(r.taskStates)
+    r.taskStatusLock.RUnlock()
+
+    alloc := r.Alloc()

     r.allocLock.Lock()
-    defer r.allocLock.Unlock()
+    allocClientStatus := r.allocClientStatus
+    allocClientDescription := r.allocClientDescription
+    r.allocLock.Unlock()
+
+    r.ctxLock.Lock()
+    ctx := r.ctx
+    r.ctxLock.Unlock()

     snap := allocRunnerState{
-        Alloc:                  r.alloc,
-        Context:                r.ctx,
-        AllocClientStatus:      r.allocClientStatus,
-        AllocClientDescription: r.allocClientDescription,
-        TaskStates:             r.taskStates,
+        Alloc:                  alloc,
+        Context:                ctx,
+        AllocClientStatus:      allocClientStatus,
+        AllocClientDescription: allocClientDescription,
+        TaskStates:             states,
     }
     return persistState(r.stateFilePath(), &snap)
 }
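The rewritten saveAllocRunnerState snapshots each shared field into a local while holding the matching lock, then does the slow serialization with no locks held. A minimal sketch of that snapshot-then-persist pattern, with a hypothetical runner type and JSON standing in for Nomad's persistState:

package main

import (
    "encoding/json"
    "os"
    "sync"
)

type runner struct {
    mu     sync.Mutex
    status string
    desc   string
}

func (r *runner) save(path string) error {
    // Snapshot the shared fields under the lock.
    r.mu.Lock()
    status, desc := r.status, r.desc
    r.mu.Unlock()

    // Serialize and write with no locks held, so a slow disk
    // never stalls the goroutines that update the status.
    buf, err := json.Marshal(map[string]string{"status": status, "desc": desc})
    if err != nil {
        return err
    }
    return os.WriteFile(path, buf, 0o644)
}

func main() {
    r := &runner{status: "running", desc: "tasks healthy"}
    if err := r.save("/tmp/runner-state.json"); err != nil {
        panic(err)
    }
}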
@@ -186,16 +196,33 @@ func (r *AllocRunner) DestroyContext() error {
     return r.ctx.AllocDir.Destroy()
 }

+// copyTaskStates returns a copy of the passed task states.
+func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
+    copy := make(map[string]*structs.TaskState, len(states))
+    for task, state := range states {
+        copy[task] = state.Copy()
+    }
+    return copy
+}
+
 // Alloc returns the associated allocation
 func (r *AllocRunner) Alloc() *structs.Allocation {
+    r.allocLock.Lock()
     alloc := r.alloc.Copy()
+
+    // The status has explicitly been set.
+    if r.allocClientStatus != "" || r.allocClientDescription != "" {
+        alloc.ClientStatus = r.allocClientStatus
+        alloc.ClientDescription = r.allocClientDescription
+        r.allocLock.Unlock()
+        return alloc
+    }
+    r.allocLock.Unlock()

     // Scan the task states to determine the status of the alloc
     var pending, running, dead, failed bool
     r.taskStatusLock.RLock()
-    alloc.TaskStates = r.taskStates
+    alloc.TaskStates = copyTaskStates(r.taskStates)
     for _, state := range r.taskStates {
         switch state.State {
         case structs.TaskStateRunning:
@@ -213,13 +240,6 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
     }
     r.taskStatusLock.RUnlock()

-    // The status has explicitly been set.
-    if r.allocClientStatus != "" || r.allocClientDescription != "" {
-        alloc.ClientStatus = r.allocClientStatus
-        alloc.ClientDescription = r.allocClientDescription
-        return alloc
-    }
-
     // Determine the alloc status
     if failed {
         alloc.ClientStatus = structs.AllocClientStatusFailed
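copyTaskStates exists because plain map assignment only copies the map header: old and new maps still point at the same *TaskState values, so a task runner mutating a state would race with anyone reading the snapshot. A sketch of the aliasing problem and the fix, with a hypothetical State type:

package main

import "fmt"

type State struct{ Phase string }

func (s *State) Copy() *State {
    if s == nil {
        return nil
    }
    c := *s
    return &c
}

// copyStates clones every value, not just the map header.
func copyStates(states map[string]*State) map[string]*State {
    out := make(map[string]*State, len(states))
    for k, v := range states {
        out[k] = v.Copy()
    }
    return out
}

func main() {
    orig := map[string]*State{"web": {Phase: "running"}}
    alias := orig            // shares the *State values
    deep := copyStates(orig) // independent snapshot

    orig["web"].Phase = "dead"
    fmt.Println(alias["web"].Phase) // "dead": the mutation leaks through
    fmt.Println(deep["web"].Phase)  // "running": the snapshot is unaffected
}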
@@ -276,8 +296,10 @@ func (r *AllocRunner) syncStatus() error {
 // setStatus is used to update the allocation status
 func (r *AllocRunner) setStatus(status, desc string) {
-    r.alloc.ClientStatus = status
-    r.alloc.ClientDescription = desc
+    r.allocLock.Lock()
+    r.allocClientStatus = status
+    r.allocClientDescription = desc
+    r.allocLock.Unlock()

     select {
     case r.dirtyCh <- struct{}{}:
     default:
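setStatus now publishes through the explicit allocClientStatus fields and pokes dirtyCh, a one-slot buffered channel; the select with a default arm makes the send non-blocking, so a burst of status changes coalesces into a single pending sync. A standalone sketch of that notification idiom:

package main

import "fmt"

// markDirty never blocks: the single buffer slot holds at most one
// pending signal, and the default arm drops redundant ones.
func markDirty(dirty chan struct{}) {
    select {
    case dirty <- struct{}{}:
    default:
    }
}

func main() {
    dirty := make(chan struct{}, 1)
    markDirty(dirty)
    markDirty(dirty) // coalesced with the first signal
    <-dirty
    fmt.Println("one sync covers both updates")
}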
@@ -336,6 +358,7 @@ func (r *AllocRunner) Run() {
     }

     // Create the execution context
+    r.ctxLock.Lock()
     if r.ctx == nil {
         allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
         if err := allocDir.Build(tg.Tasks); err != nil {
@@ -345,6 +368,7 @@ func (r *AllocRunner) Run() {
         }
         r.ctx = driver.NewExecContext(allocDir, r.alloc.ID)
     }
+    r.ctxLock.Unlock()

     // Check if the allocation is in a terminal status. In this case, we don't
     // start any of the task runners and directly wait for the destroy signal to
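Run builds the ExecContext at most once, and the new ctxLock keeps a concurrent reader (saveAllocRunnerState above) from observing a half-built r.ctx. A sketch of the same mutex-guarded lazy initialization, assuming a hypothetical newContext constructor:

package main

import (
    "fmt"
    "sync"
)

type server struct {
    mu  sync.Mutex
    ctx *string // stand-in for *driver.ExecContext
}

// hypothetical expensive setup
func newContext() *string {
    s := "ready"
    return &s
}

// context initializes ctx exactly once; readers and the initializer
// serialize on the same mutex, so nobody sees a partial value.
func (s *server) context() *string {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.ctx == nil {
        s.ctx = newContext()
    }
    return s.ctx
}

func main() {
    s := &server{}
    fmt.Println(*s.context())
}

sync.Once is the more idiomatic choice when a value is set exactly once and never reset; a plain mutex fits here because the same lock also has to cover every other reader and writer of ctx.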
@@ -364,8 +388,8 @@
             continue
         }

-        tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
-            task, r.consulService)
+        tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
+            task.Copy(), r.consulService)
         r.tasks[task.Name] = tr
         go tr.Run()
     }

View File

@@ -72,8 +72,9 @@ func DefaultConfig() *config.Config {
 // are expected to register as a schedulable node to the servers, and to
 // run allocations as determined by the servers.
 type Client struct {
-    config *config.Config
-    start  time.Time
+    config     *config.Config
+    configLock sync.RWMutex
+    start      time.Time

     logger *log.Logger
@@ -409,7 +410,9 @@ func (c *Client) restoreState() error {
     for _, entry := range list {
         id := entry.Name()
         alloc := &structs.Allocation{ID: id}
-        ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
+        c.configLock.RLock()
+        ar := NewAllocRunner(c.logger, c.config.Copy(), c.updateAllocStatus, alloc, c.consulService)
+        c.configLock.RUnlock()
         c.allocs[id] = ar
         if err := ar.RestoreState(); err != nil {
             c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
@@ -524,7 +527,10 @@ func (c *Client) fingerprint() error {
         if err != nil {
             return err
         }
-        applies, err := fingerprint.FingerprintLocked(f, c.config, c.config.Node)
+
+        c.configLock.Lock()
+        applies, err := f.Fingerprint(c.config, c.config.Node)
+        c.configLock.Unlock()
         if err != nil {
             return err
         }
@@ -552,9 +558,11 @@ func (c *Client) fingerprintPeriodic(name string, f fingerprint.Fingerprint, d t
     for {
         select {
         case <-time.After(d):
-            if _, err := fingerprint.FingerprintLocked(f, c.config, c.config.Node); err != nil {
+            c.configLock.Lock()
+            if _, err := f.Fingerprint(c.config, c.config.Node); err != nil {
                 c.logger.Printf("[DEBUG] client: periodic fingerprinting for %v failed: %v", name, err)
             }
+            c.configLock.Unlock()
         case <-c.shutdownCh:
             return
         }
@@ -582,7 +590,9 @@ func (c *Client) setupDrivers() error {
         if err != nil {
             return err
         }
-        applies, err := fingerprint.FingerprintLocked(d, c.config, c.config.Node)
+        c.configLock.Lock()
+        applies, err := d.Fingerprint(c.config, c.config.Node)
+        c.configLock.Unlock()
         if err != nil {
             return err
         }
@@ -664,6 +674,8 @@ func (c *Client) run() {
 // determine if the node properties have changed. It returns the new hash values
 // in case they are different from the old hash values.
 func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) {
+    c.configLock.RLock()
+    defer c.configLock.RUnlock()
     newAttrHash, err := hashstructure.Hash(c.config.Node.Attributes, nil)
     if err != nil {
         c.logger.Printf("[DEBUG] client: unable to calculate node attributes hash: %v", err)
@@ -919,7 +931,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
     c.allocLock.RLock()
     exist := make([]*structs.Allocation, 0, len(c.allocs))
     for _, ar := range c.allocs {
-        exist = append(exist, ar.Alloc())
+        exist = append(exist, ar.alloc)
     }
     c.allocLock.RUnlock()
@@ -988,7 +1000,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
 func (c *Client) addAlloc(alloc *structs.Allocation) error {
     c.allocLock.Lock()
     defer c.allocLock.Unlock()
-    ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
+    c.configLock.RLock()
+    ar := NewAllocRunner(c.logger, c.config.Copy(), c.updateAllocStatus, alloc, c.consulService)
+    c.configLock.RUnlock()
     c.allocs[alloc.ID] = ar
     go ar.Run()
     return nil
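Both NewAllocRunner call sites now hand each runner a private deep copy of the client config, taken under configLock, so fingerprinters rewriting c.config.Node can no longer race with runners reading it. A sketch of the copy-under-read-lock handoff, with hypothetical Config and Client types:

package main

import (
    "fmt"
    "sync"
)

type Config struct{ Node map[string]string }

func (c *Config) Copy() *Config {
    out := &Config{Node: make(map[string]string, len(c.Node))}
    for k, v := range c.Node {
        out.Node[k] = v
    }
    return out
}

type Client struct {
    configLock sync.RWMutex
    config     *Config
}

func main() {
    c := &Client{config: &Config{Node: map[string]string{"arch": "amd64"}}}

    // Take the snapshot under the read lock, then hand it off.
    c.configLock.RLock()
    cfg := c.config.Copy()
    c.configLock.RUnlock()

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println(cfg.Node["arch"]) // reads the private copy, no lock needed
    }()
    wg.Wait()
}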

View File

@@ -8,6 +8,7 @@ import (
     "time"

     "github.com/hashicorp/nomad/nomad/structs"
+    "github.com/mitchellh/copystructure"
 )
// RPCHandler can be provided to the Client if there is a local server
@@ -72,6 +73,19 @@ type Config struct {
     Options map[string]string
 }

+func (c *Config) Copy() *Config {
+    log := c.LogOutput
+    c.LogOutput = nil
+    i, err := copystructure.Copy(c)
+    c.LogOutput = log
+    if err != nil {
+        return nil
+    }
+    copy := i.(*Config)
+    copy.LogOutput = log
+    return copy
+}
+
 // Read returns the specified configuration value or "".
 func (c *Config) Read(id string) string {
     val, ok := c.Options[id]
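Config.Copy has to work around copystructure: an io.Writer such as os.Stderr cannot be deep-copied reflectively, so the writer is detached, the rest of the struct is copied, and the original writer is spliced back into both values. A usage sketch of the same dodge on a hypothetical Settings type:

package main

import (
    "fmt"
    "io"
    "os"

    "github.com/mitchellh/copystructure"
)

type Settings struct {
    Name      string
    LogOutput io.Writer
}

// Copy deep-copies everything except LogOutput, which is re-attached
// (shared, not cloned) to both structs afterwards.
func (s *Settings) Copy() *Settings {
    w := s.LogOutput
    s.LogOutput = nil
    i, err := copystructure.Copy(s)
    s.LogOutput = w
    if err != nil {
        return nil
    }
    c := i.(*Settings)
    c.LogOutput = w
    return c
}

func main() {
    s := &Settings{Name: "client1", LogOutput: os.Stderr}
    c := s.Copy()
    c.Name = "client2"
    fmt.Println(s.Name, c.Name) // client1 client2, one shared writer
}

Note that Copy briefly mutates its receiver (LogOutput is nilled and restored), so concurrent callers still need external synchronization.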

View File

@@ -3,7 +3,6 @@ package fingerprint
 import (
     "fmt"
     "log"
-    "sync"
     "time"

     "github.com/hashicorp/nomad/client/config"
@@ -28,12 +27,6 @@ var BuiltinFingerprints = []string{
     "storage",
 }

-var (
-    // NodeLock ensures that only a single fingerprinter is running at a time
-    // when using the FingerprintLocked method.
-    NodeLock sync.Mutex
-)
-
 // builtinFingerprintMap contains the built in registered fingerprints
 // which are available, corresponding to a key found in BuiltinFingerprints
 var builtinFingerprintMap = map[string]Factory{
@@ -88,10 +81,3 @@ type StaticFingerprinter struct{}
 func (s *StaticFingerprinter) Periodic() (bool, time.Duration) {
     return false, EmptyDuration
 }
-
-// FingerprintLocked is used to fingerprint in a thread-safe manner.
-func FingerprintLocked(f Fingerprint, config *config.Config, node *structs.Node) (bool, error) {
-    NodeLock.Lock()
-    defer NodeLock.Unlock()
-    return f.Fingerprint(config, node)
-}
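Deleting FingerprintLocked removes the package-global NodeLock; serialization now happens at the call sites through each client's configLock (see client.go above). The underlying hazard is plain concurrent mutation of the shared node attribute map, which the race detector flags immediately. A minimal reproducer-style sketch, not from this commit; with the mutex lines deleted, go run -race reports the write-write race:

package main

import "sync"

func main() {
    attrs := map[string]string{}
    var mu sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Two fingerprinters updating node attributes concurrently.
            mu.Lock()
            attrs["cpu.frequency"] = "2600"
            mu.Unlock()
        }()
    }
    wg.Wait()
}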

View File

@@ -29,9 +29,10 @@ type TaskRunner struct {
     restartTracker *RestartTracker
     consulService  *ConsulService

-    task     *structs.Task
-    updateCh chan *structs.Allocation
-    handle   driver.DriverHandle
+    task       *structs.Task
+    updateCh   chan *structs.Allocation
+    handle     driver.DriverHandle
+    handleLock sync.Mutex

     destroy   bool
     destroyCh chan struct{}
@@ -127,7 +128,9 @@ func (r *TaskRunner) RestoreState() error {
                 r.task.Name, r.alloc.ID, err)
             return nil
         }
+        r.handleLock.Lock()
         r.handle = handle
+        r.handleLock.Unlock()
     }
     return nil
 }
@@ -139,9 +142,11 @@ func (r *TaskRunner) SaveState() error {
     snap := taskRunnerState{
         Task: r.task,
     }
+    r.handleLock.Lock()
     if r.handle != nil {
         snap.HandleID = r.handle.ID()
     }
+    r.handleLock.Unlock()
     return persistState(r.stateFilePath(), &snap)
 }
@@ -163,7 +168,10 @@ func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
 // createDriver makes a driver for the task
 func (r *TaskRunner) createDriver() (driver.Driver, error) {
-    taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task)
+    // Create a copy of the node.
+    // TODO REMOVE
+    node := r.config.Node.Copy()
+    taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, node, r.task)
     if err != nil {
         err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
             r.task.Driver, r.alloc.ID, err)
@@ -203,7 +211,9 @@ func (r *TaskRunner) startTask() error {
         r.setState(structs.TaskStateDead, e)
         return err
     }
+    r.handleLock.Lock()
     r.handle = handle
+    r.handleLock.Unlock()
     r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
     return nil
 }
@@ -222,7 +232,10 @@ func (r *TaskRunner) run() {
     var forceStart bool
     for {
         // Start the task if not yet started or it is being forced.
-        if r.handle == nil || forceStart {
+        r.handleLock.Lock()
+        handleEmpty := r.handle == nil
+        r.handleLock.Unlock()
+        if handleEmpty || forceStart {
             forceStart = false
             if err := r.startTask(); err != nil {
                 return
@@ -339,11 +352,13 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
     // Update will update resources and store the new kill timeout.
     var mErr multierror.Error
+    r.handleLock.Lock()
     if r.handle != nil {
         if err := r.handle.Update(updatedTask); err != nil {
             mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
         }
     }
+    r.handleLock.Unlock()

     // Update the restart policy.
     if r.restartTracker != nil {
View File

@@ -1,9 +1,11 @@
 # Increase log verbosity
-log_level = "DEBUG"
+log_level = "INFO"

 # Setup data dir
 data_dir = "/tmp/client1"

+enable_debug = true
+
 # Enable the client
 client {
     enabled = true
@@ -13,6 +15,9 @@ client {
     # like Consul used for service discovery.
     servers = ["127.0.0.1:4647"]
     node_class = "foo"
+    options {
+        "driver.raw_exec.enable" = "1"
+    }
 }

 # Modify our port to avoid a collision with server1
# Modify our port to avoid a collision with server1

View File

@@ -1,5 +1,5 @@
 # Increase log verbosity
-log_level = "DEBUG"
+log_level = "INFO"

 # Setup data dir
 data_dir = "/tmp/server1"

View File

@@ -542,6 +542,14 @@ type Node struct {
     ModifyIndex uint64
 }

+func (n *Node) Copy() *Node {
+    i, err := copystructure.Copy(n)
+    if err != nil {
+        return nil
+    }
+    return i.(*Node)
+}
+
 // TerminalStatus returns if the current status is terminal and
 // will no longer transition.
 func (n *Node) TerminalStatus() bool {
@@ -1478,6 +1486,14 @@ type Task struct {
     KillTimeout time.Duration `mapstructure:"kill_timeout"`
 }

+func (t *Task) Copy() *Task {
+    i, err := copystructure.Copy(t)
+    if err != nil {
+        return nil
+    }
+    return i.(*Task)
+}
+
 // InitFields initializes fields in the task.
 func (t *Task) InitFields(job *Job, tg *TaskGroup) {
     t.InitServiceFields(job.Name, tg.Name)
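Node.Copy and Task.Copy delegate to copystructure for a reflective deep copy, so a runner can mutate its copy without touching shared state; the reflection is comparatively expensive, which fits the commit message "safe but slow". A usage sketch with a hypothetical Task type:

package main

import (
    "fmt"

    "github.com/mitchellh/copystructure"
)

type Task struct {
    Name string
    Env  map[string]string
}

func (t *Task) Copy() *Task {
    i, err := copystructure.Copy(t)
    if err != nil {
        return nil
    }
    return i.(*Task)
}

func main() {
    t := &Task{Name: "web", Env: map[string]string{"PORT": "8080"}}
    c := t.Copy()
    c.Env["PORT"] = "9090" // the nested map was cloned too

    fmt.Println(t.Env["PORT"], c.Env["PORT"]) // 8080 9090
}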