diff --git a/client/consul.go b/client/consul.go index 8250d0a1f..4b6d1a939 100644 --- a/client/consul.go +++ b/client/consul.go @@ -38,7 +38,9 @@ type ConsulClient struct { shutdownCh chan struct{} trackedServices map[string]*trackedService // Service ID to Tracked Service Map + trackedChecks map[string]bool // List of check ids that is being tracked trackedSrvLock sync.Mutex + trackedChkLock sync.Mutex } func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, error) { @@ -61,9 +63,6 @@ func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, erro } func (c *ConsulClient) Register(task *structs.Task, allocID string) error { - // Removing the service first so that we can re-sync everything cleanly - c.Deregister(task) - var mErr multierror.Error for _, service := range task.Services { c.logger.Printf("[INFO] consul: Registering service %s with Consul.", service.Name) @@ -156,19 +155,17 @@ func (c *ConsulClient) SyncWithConsul() { func (c *ConsulClient) registerService(service *structs.Service, task *structs.Task, allocID string) error { var mErr multierror.Error - service.Id = fmt.Sprintf("%s-%s", allocID, service.Name) + service.Id = service.Hash() host, port := c.findPortAndHostForLabel(service.PortLabel, task) if host == "" || port == 0 { return fmt.Errorf("consul: The port:%s marked for registration of service: %s couldn't be found", service.PortLabel, service.Name) } - checks := c.makeChecks(service, host, port) asr := &consul.AgentServiceRegistration{ ID: service.Id, Name: service.Name, Tags: service.Tags, Port: port, Address: host, - Checks: checks, } ts := &trackedService{ allocId: allocID, @@ -183,6 +180,16 @@ func (c *ConsulClient) registerService(service *structs.Service, task *structs.T c.logger.Printf("[ERROR] consul: Error while registering service %v with Consul: %v", service.Name, err) mErr.Errors = append(mErr.Errors, err) } + checks := c.makeChecks(service, host, port) + for _, check := range checks { + if err := c.client.Agent().CheckRegister(check); err != nil { + c.logger.Printf("[ERROR] consul: Error while registerting check %v with Consul: %v", check.Name, err) + mErr.Errors = append(mErr.Errors, err) + } + c.trackedChkLock.Lock() + c.trackedChecks[check.ID] = true + c.trackedChkLock.Unlock() + } return mErr.ErrorOrNil() } @@ -197,13 +204,19 @@ func (c *ConsulClient) deregisterService(serviceId string) error { return nil } -func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentServiceCheck { - var checks []*consul.AgentServiceCheck +func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentCheckRegistration { + var checks []*consul.AgentCheckRegistration for _, check := range service.Checks { - c := &consul.AgentServiceCheck{ - Interval: check.Interval.String(), - Timeout: check.Timeout.String(), + if check.Name == "" { + check.Name = fmt.Sprintf("service: '%s' check", service.Name) } + cr := &consul.AgentCheckRegistration{ + ID: check.Hash(), + Name: check.Name, + ServiceID: service.Id, + } + cr.Interval = check.Interval.String() + cr.Timeout = check.Timeout.String() switch check.Type { case structs.ServiceCheckHTTP: if check.Protocol == "" { @@ -214,13 +227,14 @@ func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) Host: fmt.Sprintf("%s:%d", ip, port), Path: check.Path, } - c.HTTP = url.String() + cr.HTTP = url.String() case structs.ServiceCheckTCP: - c.TCP = fmt.Sprintf("%s:%d", ip, port) + cr.TCP = fmt.Sprintf("%s:%d", ip, port) case structs.ServiceCheckScript: - c.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types + cr.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types } - checks = append(checks, c) + + checks = append(checks, cr) } return checks } diff --git a/client/task_runner.go b/client/task_runner.go index fd01b1f96..749b4d6d4 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -251,10 +251,6 @@ func (r *TaskRunner) run() { if err := r.handle.Update(update); err != nil { r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) } - - if err := r.consulClient.Register(update, r.allocID); err != nil { - r.logger.Printf("[ERR] client: failed to update service definition: %v", err) - } case <-r.destroyCh: // Avoid destroying twice if destroyed { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 721740404..f7a3a1b04 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1072,6 +1072,14 @@ func (s *Service) Validate() error { return mErr.ErrorOrNil() } +func (s *Service) Hash() string { + h := sha1.New() + io.WriteString(h, s.Name) + io.WriteString(h, strings.Join(s.Tags, "")) + io.WriteString(h, s.PortLabel) + return fmt.Sprintf("%x", h.Sum(nil)) +} + // Task is a single process typically that is executed as part of a task group. type Task struct { // Name of the task