Merge pull request #766 from hashicorp/f-consul-update
Update the consul service when the task/alloc changes
This commit is contained in:
commit
a59cec5fb3
|
@ -364,8 +364,6 @@ func (r *AllocRunner) Run() {
|
|||
continue
|
||||
}
|
||||
|
||||
// Merge in the task resources
|
||||
task.Resources = alloc.TaskResources[task.Name]
|
||||
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
|
||||
task, r.consulService)
|
||||
r.tasks[task.Name] = tr
|
||||
|
@ -392,22 +390,6 @@ OUTER:
|
|||
r.taskLock.RLock()
|
||||
for _, task := range tg.Tasks {
|
||||
tr := r.tasks[task.Name]
|
||||
|
||||
// Merge in the task resources
|
||||
task.Resources = update.TaskResources[task.Name]
|
||||
FOUND:
|
||||
for _, updateGroup := range update.Job.TaskGroups {
|
||||
if tg.Name != updateGroup.Name {
|
||||
continue
|
||||
}
|
||||
for _, updateTask := range updateGroup.Tasks {
|
||||
if updateTask.Name != task.Name {
|
||||
continue
|
||||
}
|
||||
task.Services = updateTask.Services
|
||||
break FOUND
|
||||
}
|
||||
}
|
||||
tr.Update(update)
|
||||
}
|
||||
r.taskLock.RUnlock()
|
||||
|
|
|
@ -76,6 +76,7 @@ type ConsulService struct {
|
|||
|
||||
trackedTasks map[string]*trackedTask
|
||||
serviceStates map[string]string
|
||||
allocToService map[string][]string
|
||||
trackedTskLock sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -130,12 +131,13 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {
|
|||
}
|
||||
|
||||
consulService := ConsulService{
|
||||
client: &consulApiClient{client: c},
|
||||
logger: config.logger,
|
||||
node: config.node,
|
||||
trackedTasks: make(map[string]*trackedTask),
|
||||
serviceStates: make(map[string]string),
|
||||
shutdownCh: make(chan struct{}),
|
||||
client: &consulApiClient{client: c},
|
||||
logger: config.logger,
|
||||
node: config.node,
|
||||
trackedTasks: make(map[string]*trackedTask),
|
||||
serviceStates: make(map[string]string),
|
||||
allocToService: make(map[string][]string),
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
return &consulService, nil
|
||||
|
@ -148,8 +150,18 @@ func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation)
|
|||
c.trackedTskLock.Lock()
|
||||
tt := &trackedTask{task: task, alloc: alloc}
|
||||
c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt
|
||||
|
||||
// Delete any previously registered service as the same alloc is being
|
||||
// re-registered.
|
||||
for _, service := range c.allocToService[alloc.ID] {
|
||||
delete(c.serviceStates, service)
|
||||
}
|
||||
c.trackedTskLock.Unlock()
|
||||
|
||||
for _, service := range task.Services {
|
||||
// Track the services this alloc is registering.
|
||||
c.allocToService[alloc.ID] = append(c.allocToService[alloc.ID], service.Name)
|
||||
|
||||
c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
|
||||
if err := c.registerService(service, task, alloc); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
|
@ -165,6 +177,7 @@ func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation
|
|||
var mErr multierror.Error
|
||||
c.trackedTskLock.Lock()
|
||||
delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
|
||||
delete(c.allocToService, alloc.ID)
|
||||
c.trackedTskLock.Unlock()
|
||||
for _, service := range task.Services {
|
||||
serviceID := alloc.Services[service.Name]
|
||||
|
@ -229,14 +242,14 @@ func (c *ConsulService) performSync() {
|
|||
// Add new services which Consul agent isn't aware of
|
||||
knownServices[serviceID] = struct{}{}
|
||||
if _, ok := consulServices[serviceID]; !ok {
|
||||
c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
|
||||
c.printLogMessage("[INFO] consul: perform sync, registering service %s with consul.", service.Name)
|
||||
c.registerService(service, trackedTask.task, trackedTask.alloc)
|
||||
continue
|
||||
}
|
||||
|
||||
// If a service has changed, re-register it with Consul agent
|
||||
if service.Hash() != c.serviceStates[serviceID] {
|
||||
c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
|
||||
c.printLogMessage("[INFO] consul: perform sync hash change, reregistering service %s with consul.", service.Name)
|
||||
c.registerService(service, trackedTask.task, trackedTask.alloc)
|
||||
continue
|
||||
}
|
||||
|
@ -268,7 +281,7 @@ func (c *ConsulService) performSync() {
|
|||
for _, consulService := range consulServices {
|
||||
if _, ok := knownServices[consulService.ID]; !ok {
|
||||
delete(c.serviceStates, consulService.ID)
|
||||
c.printLogMessage("[INFO] consul: deregistering service %v with consul", consulService.Service)
|
||||
c.printLogMessage("[INFO] consul: perform sync, deregistering service %v with consul", consulService.Service)
|
||||
c.deregisterService(consulService.ID)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,9 +10,11 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/mitchellh/hashstructure"
|
||||
|
||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
)
|
||||
|
@ -54,6 +56,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
|
|||
alloc *structs.Allocation, task *structs.Task,
|
||||
consulService *ConsulService) *TaskRunner {
|
||||
|
||||
// Merge in the task resources
|
||||
task.Resources = alloc.TaskResources[task.Name]
|
||||
|
||||
// Build the restart tracker.
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
if tg == nil {
|
||||
|
@ -319,21 +324,24 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
|
|||
}
|
||||
|
||||
// Extract the task.
|
||||
var task *structs.Task
|
||||
var updatedTask *structs.Task
|
||||
for _, t := range tg.Tasks {
|
||||
if t.Name == r.task.Name {
|
||||
task = t
|
||||
updatedTask = t
|
||||
}
|
||||
}
|
||||
if task == nil {
|
||||
if updatedTask == nil {
|
||||
return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name)
|
||||
}
|
||||
r.task = task
|
||||
|
||||
// Merge in the task resources
|
||||
updatedTask.Resources = update.TaskResources[updatedTask.Name]
|
||||
|
||||
// Update will update resources and store the new kill timeout.
|
||||
var mErr multierror.Error
|
||||
if r.handle != nil {
|
||||
if err := r.handle.Update(task); err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
|
||||
if err := r.handle.Update(updatedTask); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -342,14 +350,26 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
|
|||
r.restartTracker.SetPolicy(tg.RestartPolicy)
|
||||
}
|
||||
|
||||
/* TODO
|
||||
// Re-register the task to consul and store the updated alloc.
|
||||
r.consulService.Deregister(r.task, r.alloc)
|
||||
r.alloc = update
|
||||
r.consulService.Register(r.task, r.alloc)
|
||||
*/
|
||||
// hashServices returns the hash of the task's services.
|
||||
hashServices := func(task *structs.Task) uint64 {
|
||||
h, err := hashstructure.Hash(task.Services, nil)
|
||||
if err != nil {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("hashing services failed %#v: %v", task.Services, err))
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
return nil
|
||||
// Re-register the task to consul if any of the services have changed.
|
||||
if hashServices(updatedTask) != hashServices(r.task) {
|
||||
if err := r.consulService.Register(updatedTask, update); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating services with consul failed: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
// Store the updated alloc.
|
||||
r.alloc = update
|
||||
r.task = updatedTask
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// Helper function for converting a WaitResult into a TaskTerminated event.
|
||||
|
|
Loading…
Reference in a new issue