Add UpdateTask method instead of Remove/Add

This commit is contained in:
Michael Schurter 2017-04-03 17:08:08 -07:00
parent 8118c41931
commit 244251490a
5 changed files with 264 additions and 212 deletions

View File

@ -10,4 +10,5 @@ import (
type ConsulServiceAPI interface { type ConsulServiceAPI interface {
RegisterTask(allocID string, task *structs.Task, exec consul.ScriptExecutor) error RegisterTask(allocID string, task *structs.Task, exec consul.ScriptExecutor) error
RemoveTask(allocID string, task *structs.Task) RemoveTask(allocID string, task *structs.Task)
UpdateTask(allocID string, existing, newTask *structs.Task, exec consul.ScriptExecutor) error
} }

View File

@ -1386,15 +1386,27 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
updatedTask.Resources = update.TaskResources[updatedTask.Name] updatedTask.Resources = update.TaskResources[updatedTask.Name]
var mErr multierror.Error var mErr multierror.Error
var scriptExec consul.ScriptExecutor
r.handleLock.Lock() r.handleLock.Lock()
if r.handle != nil { if r.handle != nil {
// Update will update resources and store the new kill timeout. // Update will update resources and store the new kill timeout.
if err := r.handle.Update(updatedTask); err != nil { if err := r.handle.Update(updatedTask); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err)) mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
} }
// Not all drivers support Exec (eg QEMU)
scriptExec, _ = r.handle.(consul.ScriptExecutor) //FIXME is there a better place to do this? used to be in executor
// Prepare services
interpolateServices(r.getTaskEnv(), updatedTask)
// Not all drivers support Exec (eg QEMU), but RegisterTask
// handles nil ScriptExecutors
scriptExec, _ := r.handle.(consul.ScriptExecutor)
// Since the handle exists, the task is running, so we need to
// update it in Consul (if the handle doesn't exist
// registration in Consul will happen when it's created)
if err := r.consul.UpdateTask(r.alloc.ID, r.task, updatedTask, scriptExec); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err))
}
} }
r.handleLock.Unlock() r.handleLock.Unlock()
@ -1403,21 +1415,9 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
r.restartTracker.SetPolicy(tg.RestartPolicy) r.restartTracker.SetPolicy(tg.RestartPolicy)
} }
// Deregister the old service+checks
r.consul.RemoveTask(r.alloc.ID, r.task)
// Store the updated alloc. // Store the updated alloc.
r.alloc = update r.alloc = update
r.task = updatedTask r.task = updatedTask
//FIXME is there a better place to do this? used to be in executor
// Prepare services
interpolateServices(r.getTaskEnv(), r.task)
// Register the new service+checks
if err := r.consul.RegisterTask(r.alloc.ID, r.task, scriptExec); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("error registering updated task with consul: %v", err))
}
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
} }

View File

@ -88,17 +88,9 @@ type ServiceClient struct {
// syncCh triggers a sync in the main Run loop // syncCh triggers a sync in the main Run loop
syncCh chan struct{} syncCh chan struct{}
// services and checks to be registered // pending service and check operations
regServices map[string]*api.AgentServiceRegistration pending *consulOps
regChecks map[string]*api.AgentCheckRegistration opsLock sync.Mutex
// services and checks to be unregisterd
deregServices map[string]struct{}
deregChecks map[string]struct{}
// script checks to be run() after their corresponding check is
// registered
regScripts map[string]*scriptCheck
// script check cancel funcs to be called before their corresponding // script check cancel funcs to be called before their corresponding
// check is removed. Only accessed in sync() so not covered by regLock // check is removed. Only accessed in sync() so not covered by regLock
@ -126,11 +118,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait, shutdownWait: defaultShutdownWait,
syncCh: make(chan struct{}, 1), syncCh: make(chan struct{}, 1),
regServices: make(map[string]*api.AgentServiceRegistration), pending: newConsulOps(),
regChecks: make(map[string]*api.AgentCheckRegistration),
deregServices: make(map[string]struct{}),
deregChecks: make(map[string]struct{}),
regScripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle), runningScripts: make(map[string]*scriptHandle),
agentServices: make(map[string]struct{}, 8), agentServices: make(map[string]struct{}, 8),
agentChecks: make(map[string]struct{}, 8), agentChecks: make(map[string]struct{}, 8),
@ -182,67 +170,39 @@ func (c *ServiceClient) forceSync() {
// sync enqueued operations. // sync enqueued operations.
func (c *ServiceClient) sync() error { func (c *ServiceClient) sync() error {
// Shallow copy and reset the pending operations fields c.opsLock.Lock()
c.regLock.Lock() ops := c.pending
regServices := make(map[string]*api.AgentServiceRegistration, len(c.regServices)) c.pending = newConsulOps()
for k, v := range c.regServices { c.opsLock.Unlock()
regServices[k] = v
}
c.regServices = map[string]*api.AgentServiceRegistration{}
regChecks := make(map[string]*api.AgentCheckRegistration, len(c.regChecks))
for k, v := range c.regChecks {
regChecks[k] = v
}
c.regChecks = map[string]*api.AgentCheckRegistration{}
regScripts := make(map[string]*scriptCheck, len(c.regScripts))
for k, v := range c.regScripts {
regScripts[k] = v
}
c.regScripts = map[string]*scriptCheck{}
deregServices := make(map[string]struct{}, len(c.deregServices))
for k := range c.deregServices {
deregServices[k] = mark
}
c.deregServices = map[string]struct{}{}
deregChecks := make(map[string]struct{}, len(c.deregChecks))
for k := range c.deregChecks {
deregChecks[k] = mark
}
c.deregChecks = map[string]struct{}{}
c.regLock.Unlock()
var err error var err error
regServiceN, regCheckN, deregServiceN, deregCheckN := len(regServices), len(regChecks), len(deregServices), len(deregChecks) msg := ops.String()
// Register Services // Register Services
for id, service := range regServices { for id, service := range ops.regServices {
if err = c.client.ServiceRegister(service); err != nil { if err = c.client.ServiceRegister(service); err != nil {
goto ERROR goto ERROR
} }
delete(regServices, id) delete(ops.regServices, id)
} }
// Register Checks // Register Checks
for id, check := range regChecks { for id, check := range ops.regChecks {
if err = c.client.CheckRegister(check); err != nil { if err = c.client.CheckRegister(check); err != nil {
goto ERROR goto ERROR
} }
delete(regChecks, id) delete(ops.regChecks, id)
// Run the script for this check if one exists // Run the script for this check if one exists
if script, ok := regScripts[id]; ok { if script, ok := ops.regScripts[id]; ok {
// This check is a script check; run it // This check is a script check; run it
c.runningScripts[id] = script.run() c.runningScripts[id] = script.run()
} }
} }
// Deregister Checks // Deregister Checks
for id := range deregChecks { for id := range ops.deregChecks {
if h, ok := c.runningScripts[id]; ok { if h, ok := c.runningScripts[id]; ok {
// This check is a script check; stop it // This check is a script check; stop it
h.cancel() h.cancel()
@ -252,63 +212,28 @@ func (c *ServiceClient) sync() error {
if err = c.client.CheckDeregister(id); err != nil { if err = c.client.CheckDeregister(id); err != nil {
goto ERROR goto ERROR
} }
delete(deregChecks, id) delete(ops.deregChecks, id)
} }
// Deregister Services // Deregister Services
for id := range deregServices { for id := range ops.deregServices {
if err = c.client.ServiceDeregister(id); err != nil { if err = c.client.ServiceDeregister(id); err != nil {
goto ERROR goto ERROR
} }
delete(deregServices, id) delete(ops.deregServices, id)
} }
c.logger.Printf("[DEBUG] consul: registered %d services / %d checks; deregisterd %d services / %d checks", regServiceN, regCheckN, deregServiceN, deregCheckN) c.logger.Printf("[DEBUG] consul: %s", msg)
return nil return nil
//TODO Labels and gotos are nasty; move to a function? //TODO Labels and gotos are nasty; move to a function?
ERROR: ERROR:
// An error occurred, repopulate the operation maps omitting any keys // An error occurred, repopulate the operation maps but give
// that have been updated while sync() ran. // precendence to new ops
c.regLock.Lock() c.opsLock.Lock()
for id, service := range regServices { ops.merge(c.pending)
if _, ok := c.regServices[id]; ok { c.pending = ops
continue c.opsLock.Unlock()
}
if _, ok := c.deregServices[id]; ok {
continue
}
c.regServices[id] = service
}
for id, check := range regChecks {
if _, ok := c.regChecks[id]; ok {
continue
}
if _, ok := c.deregChecks[id]; ok {
continue
}
c.regChecks[id] = check
}
for id, script := range regScripts {
if _, ok := c.regScripts[id]; ok {
// a new version of this script was added, drop this one
continue
}
c.regScripts[id] = script
}
for id, _ := range deregServices {
if _, ok := c.regServices[id]; ok {
continue
}
c.deregServices[id] = mark
}
for id, _ := range deregChecks {
if _, ok := c.regChecks[id]; ok {
continue
}
c.deregChecks[id] = mark
}
c.regLock.Unlock()
return err return err
} }
@ -317,10 +242,9 @@ ERROR:
// //
// Agents will be deregistered when Shutdown is called. // Agents will be deregistered when Shutdown is called.
func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) error { func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) error {
regs := make([]*api.AgentServiceRegistration, len(services)) ops := newConsulOps()
checks := make([]*api.AgentCheckRegistration, 0, len(services))
for i, service := range services { for _, service := range services {
id := makeAgentServiceID(role, service) id := makeAgentServiceID(role, service)
host, rawport, err := net.SplitHostPort(service.PortLabel) host, rawport, err := net.SplitHostPort(service.PortLabel)
if err != nil { if err != nil {
@ -337,7 +261,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
Address: host, Address: host,
Port: port, Port: port,
} }
regs[i] = serviceReg ops.regServices[id] = serviceReg
for _, check := range service.Checks { for _, check := range service.Checks {
checkID := createCheckID(id, check) checkID := createCheckID(id, check)
@ -360,22 +284,79 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
if err != nil { if err != nil {
return fmt.Errorf("failed to add check %q: %v", check.Name, err) return fmt.Errorf("failed to add check %q: %v", check.Name, err)
} }
checks = append(checks, checkReg) ops.regChecks[checkID] = checkReg
} }
} }
// Now add them to the registration queue // Now add them to the registration queue
c.enqueueRegs(regs, checks, nil) c.opsLock.Lock()
c.pending.merge(ops)
c.opsLock.Unlock()
// Record IDs for deregistering on shutdown // Record IDs for deregistering on shutdown
c.agentLock.Lock() c.agentLock.Lock()
for _, s := range regs { for id := range ops.regServices {
c.agentServices[s.ID] = mark c.agentServices[id] = mark
} }
for _, ch := range checks { for id := range ops.regChecks {
c.agentChecks[ch.ID] = mark c.agentChecks[id] = mark
} }
c.agentLock.Unlock() c.agentLock.Unlock()
c.forceSync()
return nil
}
type addrParser func(portLabel string) (string, int)
// makeCheckReg adds a check reg to operations.
func (c *ServiceClient) makeCheckReg(ops *consulOps, check *structs.ServiceCheck,
service *api.AgentServiceRegistration, exec ScriptExecutor, parseAddr addrParser) error {
checkID := createCheckID(service.ID, check)
if check.Type == structs.ServiceCheckScript {
if exec == nil {
return fmt.Errorf("driver doesn't support script checks")
}
ops.regScripts[checkID] = newScriptCheck(
checkID, check, exec, c.client, c.logger, c.shutdownCh)
}
host, port := service.Address, service.Port
if check.PortLabel != "" {
host, port = parseAddr(check.PortLabel)
}
checkReg, err := createCheckReg(service.ID, checkID, check, host, port)
if err != nil {
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
ops.regChecks[checkID] = checkReg
return nil
}
// serviceRegs creates service registrations, check registrations, and script
// checks from a service.
func (c *ServiceClient) serviceRegs(ops *consulOps, allocID string, service *structs.Service,
exec ScriptExecutor, task *structs.Task) error {
id := makeTaskServiceID(allocID, task.Name, service)
host, port := task.FindHostAndPortFor(service.PortLabel)
serviceReg := &api.AgentServiceRegistration{
ID: id,
Name: service.Name,
Tags: make([]string, len(service.Tags)),
Address: host,
Port: port,
}
// copy isn't strictly necessary but can avoid bugs especially
// with tests that may reuse Tasks
copy(serviceReg.Tags, service.Tags)
ops.regServices[id] = serviceReg
for _, check := range service.Checks {
err := c.makeCheckReg(ops, check, serviceReg, exec, task.FindHostAndPortFor)
if err != nil {
return err
}
}
return nil return nil
} }
@ -384,48 +365,99 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
// //
// Actual communication with Consul is done asynchrously (see Run). // Actual communication with Consul is done asynchrously (see Run).
func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec ScriptExecutor) error { func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec ScriptExecutor) error {
regs := make([]*api.AgentServiceRegistration, len(task.Services)) ops := newConsulOps()
checks := make([]*api.AgentCheckRegistration, 0, len(task.Services)*2) // just guess at size for _, service := range task.Services {
var scriptChecks []*scriptCheck if err := c.serviceRegs(ops, allocID, service, exec, task); err != nil {
return err
for i, service := range task.Services {
id := makeTaskServiceID(allocID, task.Name, service)
host, port := task.FindHostAndPortFor(service.PortLabel)
serviceReg := &api.AgentServiceRegistration{
ID: id,
Name: service.Name,
Tags: make([]string, len(service.Tags)),
Address: host,
Port: port,
} }
// copy isn't strictly necessary but can avoid bugs especially
// with tests that may reuse Tasks
copy(serviceReg.Tags, service.Tags)
regs[i] = serviceReg
for _, check := range service.Checks {
checkID := createCheckID(id, check)
if check.Type == structs.ServiceCheckScript {
if exec == nil {
return fmt.Errorf("driver %q doesn't support script checks", task.Driver)
}
scriptChecks = append(scriptChecks, newScriptCheck(checkID, check, exec, c.client, c.logger, c.shutdownCh))
}
host, port := serviceReg.Address, serviceReg.Port
if check.PortLabel != "" {
host, port = task.FindHostAndPortFor(check.PortLabel)
}
checkReg, err := createCheckReg(id, checkID, check, host, port)
if err != nil {
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
checks = append(checks, checkReg)
}
} }
// Now add them to the registration queue // Now add them to the registration queue
c.enqueueRegs(regs, checks, scriptChecks) c.opsLock.Lock()
c.pending.merge(ops)
c.opsLock.Unlock()
c.forceSync()
return nil
}
// UpdateTask in Consul. Does not alter the service if only checks have
// changed.
func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec ScriptExecutor) error {
ops := newConsulOps()
existingIDs := make(map[string]*structs.Service, len(existing.Services))
for _, s := range existing.Services {
existingIDs[makeTaskServiceID(allocID, existing.Name, s)] = s
c.logger.Printf("[XXX] EXISTING: %s", makeTaskServiceID(allocID, existing.Name, s))
}
newIDs := make(map[string]*structs.Service, len(newTask.Services))
for _, s := range newTask.Services {
newIDs[makeTaskServiceID(allocID, newTask.Name, s)] = s
c.logger.Printf("[XXX] UPDATED : %s", makeTaskServiceID(allocID, newTask.Name, s))
}
parseAddr := newTask.FindHostAndPortFor
// Loop over existing Service IDs to see if they have been removed or
// updated.
for existingID, existingSvc := range existingIDs {
newSvc, ok := newIDs[existingID]
if !ok {
// Existing sevice entry removed
ops.deregServices[existingID] = mark
for _, check := range existingSvc.Checks {
ops.deregChecks[createCheckID(existingID, check)] = mark
}
continue
}
// Manipulating checks is cheap and easy, so just remove old and add new
for _, check := range existingSvc.Checks {
ops.deregChecks[createCheckID(existingID, check)] = mark
}
// Register new checks
for _, check := range newSvc.Checks {
checkID := createCheckID(existingID, check)
// Don't deregister this check if it hasn't changed
delete(ops.deregChecks, checkID)
if check.Type == structs.ServiceCheckScript {
if exec == nil {
return fmt.Errorf("driver doesn't support script checks")
}
ops.regScripts[checkID] = newScriptCheck(
checkID, check, exec, c.client, c.logger, c.shutdownCh)
}
host, port := parseAddr(existingSvc.PortLabel)
if check.PortLabel != "" {
host, port = parseAddr(check.PortLabel)
}
checkReg, err := createCheckReg(existingID, checkID, check, host, port)
if err != nil {
return err
}
ops.regChecks[checkID] = checkReg
}
// Service hasn't changed and checks are updated so don't
// process this service again later
delete(newIDs, existingID)
}
// Any remaining services should just be enqueued directly
for _, newSvc := range newIDs {
err := c.serviceRegs(ops, allocID, newSvc, exec, newTask)
if err != nil {
return err
}
}
// Finally enqueue the updates and force sync
c.opsLock.Lock()
c.pending.merge(ops)
c.opsLock.Unlock()
c.forceSync()
return nil return nil
} }
@ -433,62 +465,20 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec Sc
// //
// Actual communication with Consul is done asynchrously (see Run). // Actual communication with Consul is done asynchrously (see Run).
func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) { func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) {
deregs := make([]string, len(task.Services)) ops := newConsulOps()
checks := make([]string, 0, len(task.Services)*2) // just guess at size
for i, service := range task.Services { for _, service := range task.Services {
id := makeTaskServiceID(allocID, task.Name, service) id := makeTaskServiceID(allocID, task.Name, service)
deregs[i] = id ops.deregServices[id] = mark
for _, check := range service.Checks { for _, check := range service.Checks {
checks = append(checks, createCheckID(id, check)) ops.deregChecks[createCheckID(id, check)] = mark
} }
} }
// Now add them to the deregistration fields; main Run loop will update // Now add them to the deregistration fields; main Run loop will update
c.enqueueDeregs(deregs, checks)
}
// enqueueRegs enqueues service and check registrations for the next time
// operations are sync'd to Consul.
func (c *ServiceClient) enqueueRegs(regs []*api.AgentServiceRegistration, checks []*api.AgentCheckRegistration, scriptChecks []*scriptCheck) {
c.regLock.Lock() c.regLock.Lock()
for _, reg := range regs { c.pending.merge(ops)
// Add reg
c.regServices[reg.ID] = reg
// Make sure it's not being removed
delete(c.deregServices, reg.ID)
}
for _, check := range checks {
// Add check
c.regChecks[check.ID] = check
// Make sure it's not being removed
delete(c.deregChecks, check.ID)
}
for _, script := range scriptChecks {
c.regScripts[script.id] = script
}
c.regLock.Unlock()
c.forceSync()
}
// enqueueDeregs enqueues service and check removals for the next time
// operations are sync'd to Consul.
func (c *ServiceClient) enqueueDeregs(deregs []string, checks []string) {
c.regLock.Lock()
for _, dereg := range deregs {
// Add dereg
c.deregServices[dereg] = mark
// Make sure it's not being added
delete(c.regServices, dereg)
}
for _, check := range checks {
// Add check for removal
c.deregChecks[check] = mark
// Make sure it's not being added
delete(c.regChecks, check)
}
c.regLock.Unlock() c.regLock.Unlock()
c.forceSync() c.forceSync()

View File

@ -0,0 +1,61 @@
package consul
import (
"fmt"
"github.com/hashicorp/consul/api"
)
type consulOps struct {
// services and checks to be registered
regServices map[string]*api.AgentServiceRegistration
regChecks map[string]*api.AgentCheckRegistration
// services and checks to be unregisterd
deregServices map[string]struct{}
deregChecks map[string]struct{}
// script checks to be run() after their corresponding check is
// registered
regScripts map[string]*scriptCheck
}
func newConsulOps() *consulOps {
return &consulOps{
regServices: make(map[string]*api.AgentServiceRegistration),
regChecks: make(map[string]*api.AgentCheckRegistration),
deregServices: make(map[string]struct{}),
deregChecks: make(map[string]struct{}),
regScripts: make(map[string]*scriptCheck),
}
}
// merge newer operations. New operations registrations override existing
// deregistrations.
func (c *consulOps) merge(newer *consulOps) {
for id, service := range newer.regServices {
delete(c.deregServices, id)
c.regServices[id] = service
}
for id, check := range newer.regChecks {
delete(c.deregChecks, id)
c.regChecks[id] = check
}
for id, script := range newer.regScripts {
c.regScripts[id] = script
}
for id, _ := range newer.deregServices {
delete(c.regServices, id)
c.deregServices[id] = mark
}
for id, _ := range newer.deregChecks {
delete(c.regChecks, id)
delete(c.regScripts, id)
c.deregChecks[id] = mark
}
}
func (c *consulOps) String() string {
return fmt.Sprintf("registered %d services / %d checks; deregisterd %d services / %d checks",
len(c.regServices), len(c.regChecks), len(c.deregServices), len(c.deregChecks))
}

View File

@ -323,8 +323,8 @@ func TestConsul_ShutdownOK(t *testing.T) {
} }
// Nothing should be enqueued anymore // Nothing should be enqueued anymore
enqueued := (len(ctx.ServiceClient.regServices) + len(ctx.ServiceClient.deregServices) + enqueued := (len(ctx.ServiceClient.pending.regServices) + len(ctx.ServiceClient.pending.deregServices) +
len(ctx.ServiceClient.regChecks) + len(ctx.ServiceClient.deregChecks)) len(ctx.ServiceClient.pending.regChecks) + len(ctx.ServiceClient.pending.deregChecks))
if enqueued > 0 { if enqueued > 0 {
t.Errorf("%d operations still enqueued", enqueued) t.Errorf("%d operations still enqueued", enqueued)
} }