example redis job "runs" on arv2! see below

Tons left to do and lots of churn:
1. No state saving
2. No shutdown or gc
3. Removed AR factory *for now*
4. Made all "Config" structs local to the package they configure
5. Added allocID to GC to avoid a lookup

Really hating how many things use *structs.Allocation. It's not bad
without state saving, but if AllocRunner starts updating its copy things
get racy fast.
This commit is contained in:
Michael Schurter 2018-06-28 17:01:05 -07:00
parent 9a6aa38b0f
commit 0f7dcfdc9a
10 changed files with 248 additions and 184 deletions

View file

@ -2,14 +2,17 @@ package allocrunnerv2
import (
"fmt"
"path/filepath"
"sync"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunnerv2/config"
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/allocrunnerv2/state"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -18,8 +21,7 @@ type allocRunner struct {
// Logger is the logger for the alloc runner.
logger log.Logger
// Config is the configuration for the alloc runner.
config *config.Config
clientConfig *config.Config
// waitCh is closed when the alloc runner has transitioned to a terminal
// state
@ -27,7 +29,7 @@ type allocRunner struct {
// Alloc captures the allocation being run.
alloc *structs.Allocation
allocLock sync.Mutex
allocLock sync.RWMutex
// state captures the state of the alloc runner
state *state.State
@ -42,26 +44,31 @@ type allocRunner struct {
// tasks are the set of task runners
tasks map[string]*taskrunner.TaskRunner
// updateCh receives allocation updates
// updateCh receives allocation updates via the Update method
updateCh chan *structs.Allocation
}
// NewAllocRunner returns a new allocation runner.
func NewAllocRunner(config *config.Config) (*allocRunner, error) {
func NewAllocRunner(config *Config) *allocRunner {
ar := &allocRunner{
config: config,
alloc: config.Allocation,
alloc: config.Alloc,
clientConfig: config.ClientConfig,
tasks: make(map[string]*taskrunner.TaskRunner),
waitCh: make(chan struct{}),
updateCh: make(chan *structs.Allocation),
}
// Create alloc dir
//XXX update AllocDir to hc log
ar.allocDir = allocdir.NewAllocDir(nil, filepath.Join(config.ClientConfig.AllocDir, config.Alloc.ID))
// Create the logger based on the allocation ID
ar.logger = config.Logger.With("alloc_id", ar.ID())
ar.logger = config.Logger.With("alloc_id", config.Alloc.ID)
// Initialize the runners hooks.
ar.initRunnerHooks()
return ar, nil
return ar
}
func (ar *allocRunner) WaitCh() <-chan struct{} {
@ -115,15 +122,16 @@ POST:
// runImpl is used to run the runners.
func (ar *allocRunner) runImpl() (<-chan struct{}, error) {
// Grab the task group
tg := ar.alloc.Job.LookupTaskGroup(ar.alloc.TaskGroup)
alloc := ar.Alloc()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
// XXX Fail and exit
ar.logger.Error("failed to lookup task group", "task_group", ar.alloc.TaskGroup)
return nil, fmt.Errorf("failed to lookup task group %q", ar.alloc.TaskGroup)
ar.logger.Error("failed to lookup task group", "task_group", alloc.TaskGroup)
return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup)
}
for _, task := range tg.Tasks {
if err := ar.runTask(task); err != nil {
if err := ar.runTask(alloc, task); err != nil {
return nil, err
}
}
@ -142,11 +150,13 @@ func (ar *allocRunner) runImpl() (<-chan struct{}, error) {
}
// runTask is used to run a task.
func (ar *allocRunner) runTask(task *structs.Task) error {
func (ar *allocRunner) runTask(alloc *structs.Allocation, task *structs.Task) error {
// Create the runner
config := &taskrunner.Config{
Parent: &allocRunnerShim{ar},
Alloc: alloc,
ClientConfig: ar.clientConfig,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
}
tr, err := taskrunner.NewTaskRunner(config)
@ -162,6 +172,20 @@ func (ar *allocRunner) runTask(task *structs.Task) error {
return nil
}
// Alloc returns the current allocation being run by this runner.
//XXX how do we handle mutate the state saving stuff
func (ar *allocRunner) Alloc() *structs.Allocation {
ar.allocLock.RLock()
defer ar.allocLock.RUnlock()
return ar.alloc
}
// SaveState does all the state related stuff. Who knows. FIXME
//XXX
func (ar *allocRunner) SaveState() error {
return nil
}
// Update the running allocation with a new version received from the server.
//
// This method is safe for calling concurrently with Run() and does not modify
@ -169,3 +193,56 @@ func (ar *allocRunner) runTask(task *structs.Task) error {
func (ar *allocRunner) Update(update *structs.Allocation) {
ar.updateCh <- update
}
// Destroy the alloc runner by stopping it if it is still running and cleaning
// up all of its resources.
//
// This method is safe for calling concurrently with Run(). Callers must
// receive on WaitCh() to block until alloc runner has stopped and been
// destroyed.
//XXX TODO
func (ar *allocRunner) Destroy() {
//TODO
}
// IsDestroyed returns true if the alloc runner has been destroyed (stopped and
// garbage collected).
//
// This method is safe for calling concurrently with Run(). Callers must
// receive on WaitCh() to block until alloc runner has stopped and been
// destroyed.
//XXX TODO
func (ar *allocRunner) IsDestroyed() bool {
return false
}
// IsWaiting returns true if the alloc runner is waiting for its previous
// allocation to terminate.
//
// This method is safe for calling concurrently with Run().
//XXX TODO
func (ar *allocRunner) IsWaiting() bool {
return false
}
// IsMigrating returns true if the alloc runner is migrating data from its
// previous allocation.
//
// This method is safe for calling concurrently with Run().
//XXX TODO
func (ar *allocRunner) IsMigrating() bool {
return false
}
// StatsReporter needs implementing
//XXX
func (ar *allocRunner) StatsReporter() allocrunner.AllocStatsReporter {
return noopStatsReporter{}
}
//FIXME implement
type noopStatsReporter struct{}
func (noopStatsReporter) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
return nil, fmt.Errorf("not implemented")
}

View file

@ -1,15 +0,0 @@
package allocrunnerv2
import "github.com/hashicorp/nomad/nomad/structs"
func (ar *allocRunner) ID() string {
ar.allocLock.Lock()
defer ar.allocLock.Unlock()
return ar.alloc.ID
}
func (ar *allocRunner) Alloc() *structs.Allocation {
ar.allocLock.Lock()
defer ar.allocLock.Unlock()
return ar.alloc
}

View file

@ -1,4 +1,4 @@
package config
package allocrunnerv2
import (
"github.com/boltdb/bolt"
@ -15,8 +15,8 @@ type Config struct {
// ClientConfig is the clients configuration.
ClientConfig *clientconfig.Config
// Allocation captures the allocation that should be run.
Allocation *structs.Allocation
// Alloc captures the allocation that should be run.
Alloc *structs.Allocation
// StateDB is used to store and restore state.
StateDB *bolt.DB

View file

@ -1,10 +0,0 @@
package interfaces
import (
"context"
"github.com/hashicorp/nomad/client/allocrunnerv2/config"
)
// AllocRunnerFactory is the factory method for retrieving an allocation runner.
type AllocRunnerFactory func(ctx context.Context, config *config.Config) (AllocRunner, error)

View file

@ -10,10 +10,10 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
"github.com/hashicorp/nomad/client/allocrunnerv2/config"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
arstate "github.com/hashicorp/nomad/client/allocrunnerv2/state"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/env"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
@ -46,9 +46,6 @@ type TaskRunner struct {
// Logger is the logger for the task runner.
logger log.Logger
// allocRunner is the parent allocRunner
allocRunner allocRunner
// waitCh is closed when the task runner has transitioned to a terminal
// state
waitCh chan struct{}
@ -83,8 +80,10 @@ type TaskRunner struct {
}
type Config struct {
Parent allocRunner
Alloc *structs.Allocation
ClientConfig *config.Config
Task *structs.Task
TaskDir *allocdir.TaskDir
Logger log.Logger
// State is optionally restored task state
@ -97,19 +96,18 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
// Initialize the environment builder
envBuilder := env.NewBuilder(
config.Parent.Config().ClientConfig.Node,
config.Parent.Alloc(),
config.ClientConfig.Node,
config.Alloc,
config.Task,
config.Parent.Config().ClientConfig.Region)
config.ClientConfig.Region)
tr := &TaskRunner{
config: config,
allocRunner: config.Parent,
state: config.State,
ctx: trCtx,
ctxCancel: trCancel,
task: config.Task,
taskDir: config.Parent.GetAllocDir().NewTaskDir(config.Task.Name),
taskDir: config.TaskDir,
envBuilder: envBuilder,
waitCh: make(chan struct{}),
}
@ -118,7 +116,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
tr.logger = config.Logger.Named("task_runner").With("task", config.Task.Name)
// Build the restart tracker.
alloc := tr.allocRunner.Alloc()
alloc := config.Alloc
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
tr.logger.Error("alloc missing task group")
@ -156,7 +154,7 @@ func (tr *TaskRunner) initState() {
}
func (tr *TaskRunner) initLabels() {
alloc := tr.allocRunner.Alloc()
alloc := tr.config.Alloc
tr.baseLabels = []metrics.Label{
{
Name: "job",
@ -316,14 +314,14 @@ func (tr *TaskRunner) initDriver() error {
tr.SetState("", structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
}
alloc := tr.allocRunner.Alloc()
alloc := tr.config.Alloc
driverCtx := driver.NewDriverContext(
alloc.Job.Name,
alloc.TaskGroup,
tr.Name(),
tr.allocRunner.ID(),
tr.allocRunner.Config().ClientConfig, // XXX Why does it need this
tr.allocRunner.Config().ClientConfig.Node, // XXX THIS I NEED TO FIX
alloc.ID,
tr.config.ClientConfig, // XXX Why does it need this
tr.config.ClientConfig.Node, // XXX THIS I NEED TO FIX
tr.logger.StandardLogger(nil), // XXX Should pass this through
eventEmitter)
@ -358,7 +356,7 @@ func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
}
if event.Type == structs.TaskRestarting {
if !tr.allocRunner.Config().ClientConfig.DisableTaggedMetrics {
if !tr.config.ClientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "restart"}, 1, tr.baseLabels)
}
//if r.config.BackwardsCompatibleMetrics {
@ -376,7 +374,7 @@ func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
// Capture the start time if it is just starting
if task.State != structs.TaskStateRunning {
task.StartedAt = time.Now().UTC()
if !tr.allocRunner.Config().ClientConfig.DisableTaggedMetrics {
if !tr.config.ClientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "running"}, 1, tr.baseLabels)
}
//if r.config.BackwardsCompatibleMetrics {
@ -391,14 +389,14 @@ func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
// Emitting metrics to indicate task complete and failures
if task.Failed {
if !tr.allocRunner.Config().ClientConfig.DisableTaggedMetrics {
if !tr.config.ClientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "failed"}, 1, tr.baseLabels)
}
//if r.config.BackwardsCompatibleMetrics {
//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "failed"}, 1)
//}
} else {
if !tr.allocRunner.Config().ClientConfig.DisableTaggedMetrics {
if !tr.config.ClientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "complete"}, 1, tr.baseLabels)
}
//if r.config.BackwardsCompatibleMetrics {
@ -408,9 +406,10 @@ func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
}
// Create a copy and notify the alloc runner of the transition
if err := tr.allocRunner.StateUpdated(tr.state.Copy()); err != nil {
tr.logger.Error("failed to save state", "error", err)
}
//FIXME
//if err := tr.allocRunner.StateUpdated(tr.state.Copy()); err != nil {
//tr.logger.Error("failed to save state", "error", err)
//}
}
// appendTaskEvent updates the task status by appending the new event.

View file

@ -30,7 +30,7 @@ func (tr *TaskRunner) initHooks() {
func (tr *TaskRunner) prerun() error {
// Determine if the allocation is terminaland we should avoid running
// pre-run hooks.
alloc := tr.allocRunner.Alloc()
alloc := tr.config.Alloc
if alloc.TerminalStatus() {
tr.logger.Trace("skipping pre-run hooks since allocation is terminal")
return nil
@ -77,7 +77,7 @@ func (tr *TaskRunner) prerun() error {
//XXX Can we assume everything only wants to be run until
//successful and simply keep track of which hooks have yet to
//run on failures+retries?
if hookState.SuccessfulOnce {
if hookState != nil && hookState.SuccessfulOnce {
tr.logger.Trace("skipping hook since it was successfully run once", "name", name)
continue
}
@ -106,9 +106,11 @@ func (tr *TaskRunner) prerun() error {
// XXX Detect if state has changed so that we can signal to the
// alloc runner precisly
/*
if err := tr.allocRunner.StateUpdated(tr.state.Copy()); err != nil {
tr.logger.Error("failed to save state", "error", err)
}
*/
tr.state.Unlock()
}
@ -225,7 +227,7 @@ func (h *taskDirHook) Name() string {
}
func (h *taskDirHook) Prerun(req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error {
cc := h.runner.allocRunner.Config().ClientConfig
cc := h.runner.config.ClientConfig
chroot := cconfig.DefaultChrootEnv
if len(cc.ChrootEnv) > 0 {
chroot = cc.ChrootEnv
@ -242,7 +244,7 @@ func (h *taskDirHook) Prerun(req *interfaces.TaskPrerunRequest, resp *interfaces
}
// Update the environment variables based on the built task directory
driver.SetEnvvars(h.runner.envBuilder, fsi, h.runner.taskDir, h.runner.allocRunner.Config().ClientConfig)
driver.SetEnvvars(h.runner.envBuilder, fsi, h.runner.taskDir, h.runner.config.ClientConfig)
return nil
}

View file

@ -2,7 +2,6 @@ package allocrunnerv2
import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunnerv2/config"
"github.com/hashicorp/nomad/client/allocrunnerv2/state"
trstate "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state"
)
@ -17,10 +16,6 @@ func (a *allocRunnerShim) State() *state.State {
return a.state
}
func (a *allocRunnerShim) Config() *config.Config {
return a.config
}
func (a *allocRunnerShim) GetAllocDir() *allocdir.AllocDir {
return a.allocDir
}

View file

@ -29,6 +29,7 @@ import (
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/allocrunnerv2"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/client/state"
@ -99,6 +100,18 @@ type ClientStatsReporter interface {
LatestHostStats() *stats.HostStats
}
type AllocRunner interface {
StatsReporter() allocrunner.AllocStatsReporter
Destroy()
IsDestroyed() bool
IsWaiting() bool
IsMigrating() bool
WaitCh() <-chan struct{}
SaveState() error
Update(*structs.Allocation)
Alloc() *structs.Allocation
}
// Client is used to implement the client interaction with Nomad. Clients
// are expected to register as a schedulable node to the servers, and to
// run allocations as determined by the servers.
@ -150,7 +163,7 @@ type Client struct {
// allocs maps alloc IDs to their AllocRunner. This map includes all
// AllocRunners - running and GC'd - until the server GCs them.
allocs map[string]*allocrunner.AllocRunner
allocs map[string]AllocRunner
allocLock sync.RWMutex
// allocUpdates stores allocations that need to be synced to the server.
@ -225,7 +238,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
tlsWrap: tlsWrap,
streamingRpcs: structs.NewStreamingRpcRegistry(),
logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
allocs: make(map[string]*allocrunner.AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
@ -522,16 +534,12 @@ func (c *Client) Shutdown() error {
// Destroy all the running allocations.
if c.config.DevMode {
var wg sync.WaitGroup
for _, ar := range c.getAllocRunners() {
wg.Add(1)
go func(ar *allocrunner.AllocRunner) {
ar.Destroy()
<-ar.WaitCh()
wg.Done()
}(ar)
}
wg.Wait()
for _, ar := range c.getAllocRunners() {
<-ar.WaitCh()
}
}
c.shutdown = true
@ -614,11 +622,15 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
c.allocLock.RLock()
defer c.allocLock.RUnlock()
ar, ok := c.allocs[allocID]
_, ok := c.allocs[allocID]
if !ok {
return nil, structs.NewErrUnknownAllocation(allocID)
}
return ar.GetAllocDir(), nil
//XXX Experiment in splitting AllocDir and TaskDir structs as the FS
//API only needs the immutable AllocDir struct
//FIXME needs a better logger though -- good reason to still get it from alloc runner?
return allocdir.NewAllocDir(c.logger, filepath.Join(c.config.AllocDir, allocID)), nil
}
// GetClientAlloc returns the allocation from the client
@ -803,7 +815,7 @@ func (c *Client) saveState() error {
wg.Add(len(runners))
for id, ar := range runners {
go func(id string, ar *allocrunner.AllocRunner) {
go func(id string, ar AllocRunner) {
err := ar.SaveState()
if err != nil {
c.logger.Printf("[ERR] client: failed to save state for alloc %q: %v", id, err)
@ -820,10 +832,10 @@ func (c *Client) saveState() error {
}
// getAllocRunners returns a snapshot of the current set of alloc runners.
func (c *Client) getAllocRunners() map[string]*allocrunner.AllocRunner {
func (c *Client) getAllocRunners() map[string]AllocRunner {
c.allocLock.RLock()
defer c.allocLock.RUnlock()
runners := make(map[string]*allocrunner.AllocRunner, len(c.allocs))
runners := make(map[string]AllocRunner, len(c.allocs))
for id, ar := range c.allocs {
runners[id] = ar
}
@ -1498,7 +1510,7 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
c.allocLock.RUnlock()
if ok {
c.garbageCollector.MarkForCollection(ar)
c.garbageCollector.MarkForCollection(alloc.ID, ar)
// Trigger a GC in case we're over thresholds and just
// waiting for eligible allocs.
@ -1660,14 +1672,16 @@ OUTER:
// need to pull all the allocations.
var pull []string
filtered := make(map[string]struct{})
runners := c.getAllocRunners()
var pullIndex uint64
for allocID, modifyIndex := range resp.Allocs {
// Pull the allocation if we don't have an alloc runner for the
// allocation or if the alloc runner requires an updated allocation.
runner, ok := runners[allocID]
//XXX Part of Client alloc index tracking exp
c.allocLock.RLock()
currentAR, ok := c.allocs[allocID]
c.allocLock.RUnlock()
if !ok || runner.ShouldUpdate(modifyIndex) {
if !ok || modifyIndex > currentAR.Alloc().AllocModifyIndex {
// Only pull allocs that are required. Filtered
// allocs might be at a higher index, so ignore
// it.
@ -1798,14 +1812,14 @@ func (c *Client) watchNodeUpdates() {
func (c *Client) runAllocs(update *allocUpdates) {
// Get the existing allocs
c.allocLock.RLock()
exist := make([]*structs.Allocation, 0, len(c.allocs))
for _, ar := range c.allocs {
exist = append(exist, ar.Alloc())
existing := make(map[string]uint64, len(c.allocs))
for id, ar := range c.allocs {
existing[id] = ar.Alloc().AllocModifyIndex
}
c.allocLock.RUnlock()
// Diff the existing and updated allocations
diff := diffAllocs(exist, update)
diff := diffAllocs(existing, update)
c.logger.Printf("[DEBUG] client: %#v", diff)
// Remove the old allocations
@ -1815,9 +1829,9 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Update the existing allocations
for _, update := range diff.updated {
if err := c.updateAlloc(update.exist, update.updated); err != nil {
if err := c.updateAlloc(update); err != nil {
c.logger.Printf("[ERR] client: failed to update alloc %q: %v",
update.exist.ID, err)
update.ID, err)
}
}
@ -1842,34 +1856,33 @@ func (c *Client) runAllocs(update *allocUpdates) {
// removeAlloc is invoked when we should remove an allocation because it has
// been removed by the server.
func (c *Client) removeAlloc(alloc *structs.Allocation) {
func (c *Client) removeAlloc(allocID string) {
c.allocLock.Lock()
ar, ok := c.allocs[alloc.ID]
defer c.allocLock.Unlock()
ar, ok := c.allocs[allocID]
if !ok {
c.allocLock.Unlock()
c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID)
c.logger.Printf("[WARN] client: missing context for alloc '%s'", allocID)
return
}
// Stop tracking alloc runner as it's been GC'd by the server
delete(c.allocs, alloc.ID)
c.allocLock.Unlock()
delete(c.allocs, allocID)
// Ensure the GC has a reference and then collect. Collecting through the GC
// applies rate limiting
c.garbageCollector.MarkForCollection(ar)
c.garbageCollector.MarkForCollection(allocID, ar)
// GC immediately since the server has GC'd it
go c.garbageCollector.Collect(alloc.ID)
go c.garbageCollector.Collect(allocID)
}
// updateAlloc is invoked when we should update an allocation
func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
c.allocLock.RLock()
ar, ok := c.allocs[exist.ID]
c.allocLock.RUnlock()
func (c *Client) updateAlloc(update *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ar, ok := c.allocs[update.ID]
if !ok {
c.logger.Printf("[WARN] client: missing context for alloc '%s'", exist.ID)
c.logger.Printf("[WARN] client: missing context for alloc %q", update.ID)
return nil
}
@ -1879,16 +1892,19 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
// addAlloc is invoked when we should add an allocation
func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error {
// Check if we already have an alloc runner
c.allocLock.Lock()
defer c.allocLock.Unlock()
// Check if we already have an alloc runner
if _, ok := c.allocs[alloc.ID]; ok {
c.logger.Printf("[DEBUG]: client: dropping duplicate add allocation request: %q", alloc.ID)
return nil
}
//FIXME disabled previous alloc waiting/migrating
// get the previous alloc runner - if one exists - for the
// blocking/migrating watcher
/*
var prevAR *allocrunner.AllocRunner
if alloc.PreviousAllocation != "" {
prevAR = c.allocs[alloc.PreviousAllocation]
@ -1896,16 +1912,33 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
c.configLock.RLock()
prevAlloc := allocrunner.NewAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken)
*/
// Copy the config since the node can be swapped out as it is being updated.
// The long term fix is to pass in the config and node separately and then
// we don't have to do a copy.
ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
//ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
//XXX FIXME
logger := hclog.New(&hclog.LoggerOptions{
Name: "nomad",
Level: hclog.LevelFromString("DEBUG"),
})
c.configLock.RLock()
arConf := &allocrunnerv2.Config{
Logger: logger,
ClientConfig: c.config,
Alloc: alloc,
StateDB: nil, //FIXME ?
}
c.configLock.RUnlock()
ar := allocrunnerv2.NewAllocRunner(arConf)
// Store the alloc runner.
c.allocs[alloc.ID] = ar
//XXX(schmichael) Why do we do this?
if err := ar.SaveState(); err != nil {
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
}

View file

@ -7,7 +7,6 @@ import (
"sync"
"time"
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -162,7 +161,7 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
}
// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(gcAlloc.allocRunner, reason)
a.destroyAllocRunner(gcAlloc.allocID, gcAlloc.allocRunner, reason)
}
return nil
}
@ -170,12 +169,8 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
// destroyAllocRunner is used to destroy an allocation runner. It will acquire a
// lock to restrict parallelism and then destroy the alloc runner, returning
// once the allocation has been destroyed.
func (a *AllocGarbageCollector) destroyAllocRunner(ar *allocrunner.AllocRunner, reason string) {
id := "<nil>"
if alloc := ar.Alloc(); alloc != nil {
id = alloc.ID
}
a.logger.Printf("[INFO] client.gc: garbage collecting allocation %s due to %s", id, reason)
func (a *AllocGarbageCollector) destroyAllocRunner(allocID string, ar AllocRunner, reason string) {
a.logger.Printf("[INFO] client.gc: garbage collecting allocation %s due to %s", allocID, reason)
// Acquire the destroy lock
select {
@ -191,7 +186,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *allocrunner.AllocRunner,
case <-a.shutdownCh:
}
a.logger.Printf("[DEBUG] client.gc: garbage collected %q", ar.Alloc().ID)
a.logger.Printf("[DEBUG] client.gc: garbage collected %s", allocID)
// Release the lock
<-a.destroyCh
@ -205,7 +200,7 @@ func (a *AllocGarbageCollector) Stop() {
// alloc was found and garbage collected; otherwise false.
func (a *AllocGarbageCollector) Collect(allocID string) bool {
if gcAlloc := a.allocRunners.Remove(allocID); gcAlloc != nil {
a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection")
a.destroyAllocRunner(allocID, gcAlloc.allocRunner, "forced collection")
return true
}
@ -227,7 +222,7 @@ func (a *AllocGarbageCollector) CollectAll() {
return
}
go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full node collection")
go a.destroyAllocRunner(gcAlloc.allocID, gcAlloc.allocRunner, "forced full node collection")
}
}
@ -257,7 +252,7 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
}
// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(gcAlloc.allocRunner, fmt.Sprintf("new allocations and over max (%d)", a.config.MaxAllocs))
a.destroyAllocRunner(gcAlloc.allocID, gcAlloc.allocRunner, fmt.Sprintf("new allocations and over max (%d)", a.config.MaxAllocs))
}
totalResource := &structs.AllocatedSharedResources{}
@ -330,7 +325,7 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
}
// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(ar, fmt.Sprintf("freeing %d MB for new allocations", allocDiskMB))
a.destroyAllocRunner(gcAlloc.allocID, ar, fmt.Sprintf("freeing %d MB for new allocations", allocDiskMB))
diskCleared += allocDiskMB
}
@ -338,13 +333,8 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
}
// MarkForCollection starts tracking an allocation for Garbage Collection
func (a *AllocGarbageCollector) MarkForCollection(ar *allocrunner.AllocRunner) {
if ar.Alloc() == nil {
a.destroyAllocRunner(ar, "alloc is nil")
return
}
if a.allocRunners.Push(ar) {
func (a *AllocGarbageCollector) MarkForCollection(allocID string, ar AllocRunner) {
if a.allocRunners.Push(allocID, ar) {
a.logger.Printf("[INFO] client.gc: marking allocation %v for GC", ar.Alloc().ID)
}
}
@ -353,7 +343,8 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *allocrunner.AllocRunner) {
// a PQ
type GCAlloc struct {
timeStamp time.Time
allocRunner *allocrunner.AllocRunner
allocID string
allocRunner AllocRunner
index int
}
@ -407,20 +398,20 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ {
// Push an alloc runner into the GC queue. Returns true if alloc was added,
// false if the alloc already existed.
func (i *IndexedGCAllocPQ) Push(ar *allocrunner.AllocRunner) bool {
func (i *IndexedGCAllocPQ) Push(allocID string, ar AllocRunner) bool {
i.pqLock.Lock()
defer i.pqLock.Unlock()
alloc := ar.Alloc()
if _, ok := i.index[alloc.ID]; ok {
if _, ok := i.index[allocID]; ok {
// No work to do
return false
}
gcAlloc := &GCAlloc{
timeStamp: time.Now(),
allocID: allocID,
allocRunner: ar,
}
i.index[alloc.ID] = gcAlloc
i.index[allocID] = gcAlloc
heap.Push(&i.heap, gcAlloc)
return true
}

View file

@ -7,16 +7,12 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
type allocTuple struct {
exist, updated *structs.Allocation
}
// diffResult is used to return the sets that result from a diff
type diffResult struct {
added []*structs.Allocation
removed []*structs.Allocation
updated []allocTuple
ignore []*structs.Allocation
removed []string
updated []*structs.Allocation
ignore []string
}
func (d *diffResult) GoString() string {
@ -26,38 +22,34 @@ func (d *diffResult) GoString() string {
// diffAllocs is used to diff the existing and updated allocations
// to see what has happened.
func diffAllocs(existing []*structs.Allocation, allocs *allocUpdates) *diffResult {
func diffAllocs(existing map[string]uint64, allocs *allocUpdates) *diffResult {
// Scan the existing allocations
result := &diffResult{}
existIdx := make(map[string]struct{})
for _, exist := range existing {
// Mark this as existing
existIdx[exist.ID] = struct{}{}
for existID, existIndex := range existing {
// Check if the alloc was updated or filtered because an update wasn't
// needed.
alloc, pulled := allocs.pulled[exist.ID]
_, filtered := allocs.filtered[exist.ID]
alloc, pulled := allocs.pulled[existID]
_, filtered := allocs.filtered[existID]
// If not updated or filtered, removed
if !pulled && !filtered {
result.removed = append(result.removed, exist)
result.removed = append(result.removed, existID)
continue
}
// Check for an update
if pulled && alloc.AllocModifyIndex > exist.AllocModifyIndex {
result.updated = append(result.updated, allocTuple{exist, alloc})
if pulled && alloc.AllocModifyIndex > existIndex {
result.updated = append(result.updated, alloc)
continue
}
// Ignore this
result.ignore = append(result.ignore, exist)
result.ignore = append(result.ignore, existID)
}
// Scan the updated allocations for any that are new
for id, pulled := range allocs.pulled {
if _, ok := existIdx[id]; !ok {
if _, ok := existing[id]; !ok {
result.added = append(result.added, pulled)
}
}