open-nomad/client/alloc_runner.go

102 lines
2.2 KiB
Go
Raw Normal View History

2015-08-23 22:06:47 +00:00
package client
2015-08-23 22:15:48 +00:00
import (
"log"
"sync"
2015-08-23 23:49:48 +00:00
"github.com/hashicorp/nomad/client/driver"
2015-08-23 22:15:48 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
2015-08-23 22:06:47 +00:00
// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
2015-08-23 22:06:47 +00:00
client *Client
2015-08-23 22:15:48 +00:00
logger *log.Logger
alloc *structs.Allocation
2015-08-23 23:49:48 +00:00
ctx *driver.ExecContext
tasks map[string]*TaskRunner
2015-08-23 22:15:48 +00:00
updateCh chan *structs.Allocation
destroy bool
destroyCh chan struct{}
destroyLock sync.Mutex
2015-08-23 22:06:47 +00:00
}
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(client *Client, alloc *structs.Allocation) *AllocRunner {
ctx := &AllocRunner{
2015-08-23 22:30:16 +00:00
client: client,
logger: client.logger,
alloc: alloc,
2015-08-23 23:49:48 +00:00
tasks: make(map[string]*TaskRunner),
2015-08-23 22:30:16 +00:00
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
2015-08-23 22:06:47 +00:00
}
return ctx
}
// Alloc returns the associated allocation
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Alloc() *structs.Allocation {
return r.alloc
2015-08-23 22:06:47 +00:00
}
// Run is a long running goroutine used to manage an allocation
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Run() {
r.logger.Printf("[DEBUG] client: starting context for alloc '%s'", r.alloc.ID)
2015-08-23 22:15:48 +00:00
2015-08-23 23:49:48 +00:00
// Find our task group in the allocation
alloc := r.alloc
tg := alloc.Job.LookingTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
// TODO: Err out
return
}
// Create the execution context
r.ctx = driver.NewExecContext()
// Start the task runners
for _, task := range tg.Tasks {
tr := NewTaskRunner(r, r.ctx, task)
r.tasks[task.Name] = tr
go tr.Run()
}
// Wait for updates
2015-08-23 22:06:47 +00:00
for {
2015-08-23 22:15:48 +00:00
select {
2015-08-23 22:36:06 +00:00
case update := <-r.updateCh:
2015-08-23 22:15:48 +00:00
// TODO: Update
2015-08-23 22:36:06 +00:00
r.alloc = update
case <-r.destroyCh:
2015-08-23 22:15:48 +00:00
// TODO: Destroy
return
}
2015-08-23 22:06:47 +00:00
}
}
// Update is used to update the allocation of the context
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Update(update *structs.Allocation) {
2015-08-23 22:15:48 +00:00
select {
2015-08-23 22:36:06 +00:00
case r.updateCh <- update:
2015-08-23 22:15:48 +00:00
default:
2015-08-23 22:36:06 +00:00
r.logger.Printf("[ERR] client: dropping update to alloc '%s'", update.ID)
2015-08-23 22:15:48 +00:00
}
2015-08-23 22:06:47 +00:00
}
// Destroy is used to indicate that the allocation context should be destroyed
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Destroy() {
r.destroyLock.Lock()
defer r.destroyLock.Unlock()
2015-08-23 22:15:48 +00:00
2015-08-23 22:36:06 +00:00
if r.destroy {
2015-08-23 22:15:48 +00:00
return
}
2015-08-23 22:36:06 +00:00
r.destroy = true
close(r.destroyCh)
2015-08-23 22:06:47 +00:00
}