Merge pull request #982 from hashicorp/b-perform-sync

Fixing check registration in perform sync
This commit is contained in:
Diptanu Choudhury 2016-03-24 14:54:10 -07:00
commit 0df7e92c68
8 changed files with 23 additions and 19 deletions

View File

@ -1205,7 +1205,7 @@ func (c *Client) syncConsul() {
}
}
if err := c.consulService.KeepServices(runningTasks); err != nil {
c.logger.Printf("[DEBUG] error removing services from non-running tasks: %v", err)
c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
}
case <-c.shutdownCh:
c.logger.Printf("[INFO] client: shutting down consul sync")

View File

@ -25,7 +25,7 @@ type ConsulService struct {
allocID string
trackedServices map[string]*consul.AgentService
trackedChecks map[string]*structs.ServiceCheck
trackedChecks map[string]*consul.AgentCheckRegistration
logger *log.Logger
@ -92,7 +92,7 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string)
allocID: allocID,
logger: logger,
trackedServices: make(map[string]*consul.AgentService),
trackedChecks: make(map[string]*structs.ServiceCheck),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
shutdownCh: make(chan struct{}),
}
@ -104,7 +104,7 @@ func (c *ConsulService) SyncTask(task *structs.Task) error {
var mErr multierror.Error
c.task = task
taskServices := make(map[string]*consul.AgentService)
taskChecks := make(map[string]*structs.ServiceCheck)
taskChecks := make(map[string]*consul.AgentCheckRegistration)
// Register Services and Checks that we don't know about or has changed
for _, service := range task.Services {
@ -123,14 +123,14 @@ func (c *ConsulService) SyncTask(task *structs.Task) error {
taskServices[srv.ID] = srv
for _, chk := range service.Checks {
checkID := chk.Hash(srv.ID)
if _, ok := c.trackedChecks[checkID]; !ok {
if err := c.registerCheck(chk, srv); err != nil {
chkReg := c.createCheck(chk, srv)
if _, ok := c.trackedChecks[chkReg.ID]; !ok {
if err := c.registerCheck(chkReg); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
c.trackedChecks[checkID] = chk
taskChecks[checkID] = chk
c.trackedChecks[chkReg.ID] = chkReg
taskChecks[chkReg.ID] = chkReg
}
}
@ -206,7 +206,11 @@ func (c *ConsulService) KeepServices(tasks []*structs.Task) error {
}
// registerCheck registers a check definition with Consul
func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *consul.AgentService) error {
func (c *ConsulService) registerCheck(chkReg *consul.AgentCheckRegistration) error {
return c.client.Agent().CheckRegister(chkReg)
}
func (c *ConsulService) createCheck(check *structs.ServiceCheck, service *consul.AgentService) *consul.AgentCheckRegistration {
chkReg := consul.AgentCheckRegistration{
ID: check.Hash(service.ID),
Name: check.Name,
@ -230,7 +234,7 @@ func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *cons
case structs.ServiceCheckScript:
chkReg.TTL = check.Interval.String()
}
return c.client.Agent().CheckRegister(&chkReg)
return &chkReg
}
// createService creates a Consul AgentService from a Nomad Service
@ -315,8 +319,8 @@ func (c *ConsulService) performSync() error {
}
}
for checkID, check := range c.trackedChecks {
if chk, ok := cChecks[checkID]; !ok {
if err := c.registerCheck(check, c.trackedServices[chk.ServiceID]); err != nil {
if _, ok := cChecks[checkID]; !ok {
if err := c.registerCheck(check); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}

View File

@ -645,7 +645,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
waitCh: make(chan *cstructs.WaitResult, 1),
}
if err := exec.RegisterServices(); err != nil {
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %v", task)
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil

View File

@ -142,7 +142,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
waitCh: make(chan *cstructs.WaitResult, 1),
}
if err := exec.RegisterServices(); err != nil {
d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %v", task)
d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil

View File

@ -201,7 +201,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
waitCh: make(chan *cstructs.WaitResult, 1),
}
if err := h.executor.RegisterServices(); err != nil {
d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %v", task)
d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil

View File

@ -224,7 +224,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
if err := h.executor.RegisterServices(); err != nil {
h.logger.Printf("[ERR] driver.qemu: error registering services: %v", err)
h.logger.Printf("[ERR] driver.qemu: error registering services for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil

View File

@ -134,7 +134,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
waitCh: make(chan *cstructs.WaitResult, 1),
}
if err := h.executor.RegisterServices(); err != nil {
h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul: %v", err)
h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil

View File

@ -269,7 +269,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
waitCh: make(chan *cstructs.WaitResult, 1),
}
if h.executor.RegisterServices(); err != nil {
h.logger.Printf("[ERR] driver.rkt: error registering services: %v", err)
h.logger.Printf("[ERR] driver.rkt: error registering services for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil