Handling errors during service and check registration
This commit is contained in:
parent
54a5e795a5
commit
bf748a522b
|
@ -104,14 +104,18 @@ func (c *ConsulService) SyncTask(task *structs.Task) error {
|
|||
}
|
||||
trackedService, ok := c.services[srv.ID]
|
||||
if (ok && !reflect.DeepEqual(trackedService, srv)) || !ok {
|
||||
c.registerService(srv)
|
||||
if err := c.registerService(srv); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
c.services[srv.ID] = srv
|
||||
services[srv.ID] = srv
|
||||
|
||||
for _, chk := range service.Checks {
|
||||
if _, ok := c.checks[chk.ID]; !ok {
|
||||
c.registerCheck(chk, srv)
|
||||
if err := c.registerCheck(chk, srv); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
c.checks[chk.ID] = chk
|
||||
checks[chk.ID] = chk
|
||||
|
@ -121,7 +125,9 @@ func (c *ConsulService) SyncTask(task *structs.Task) error {
|
|||
// Remove services that are not present anymore
|
||||
for _, service := range c.services {
|
||||
if _, ok := services[service.ID]; !ok {
|
||||
c.deregisterService(service.ID)
|
||||
if err := c.deregisterService(service.ID); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
delete(c.services, service.ID)
|
||||
}
|
||||
}
|
||||
|
@ -129,7 +135,9 @@ func (c *ConsulService) SyncTask(task *structs.Task) error {
|
|||
// Remove the checks that are not present anymore
|
||||
for _, check := range c.checks {
|
||||
if _, ok := checks[check.ID]; !ok {
|
||||
c.deregisterCheck(check.ID)
|
||||
if err := c.deregisterCheck(check.ID); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
delete(c.checks, check.ID)
|
||||
}
|
||||
}
|
||||
|
@ -138,7 +146,10 @@ func (c *ConsulService) SyncTask(task *structs.Task) error {
|
|||
|
||||
func (c *ConsulService) Shutdown() error {
|
||||
var mErr multierror.Error
|
||||
c.shutdownCh <- struct{}{}
|
||||
select {
|
||||
case c.shutdownCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
for _, service := range c.services {
|
||||
if err := c.client.Agent().ServiceDeregister(service.ID); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
|
@ -154,6 +165,7 @@ func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *cons
|
|||
ServiceID: service.ID,
|
||||
}
|
||||
chkReg.Timeout = check.Timeout.String()
|
||||
chkReg.Interval = check.Interval.String()
|
||||
switch check.Type {
|
||||
case structs.ServiceCheckHTTP:
|
||||
if check.Protocol == "" {
|
||||
|
|
|
@ -351,6 +351,10 @@ func (e *UniversalExecutor) RegisterServices() error {
|
|||
e.consulService = cs
|
||||
}
|
||||
err := e.consulService.SyncTask(e.ctx.Task)
|
||||
if err != nil {
|
||||
e.logger.Printf("executor: error registering services: %v", err)
|
||||
}
|
||||
//go e.consulService.SyncWithConsul()
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue