Registering Checks independently

This commit is contained in:
Diptanu Choudhury 2015-11-22 23:27:59 -08:00
parent f9c3d16b9f
commit a3d5b266a0
3 changed files with 37 additions and 19 deletions

View File

@ -38,7 +38,9 @@ type ConsulClient struct {
shutdownCh chan struct{}
trackedServices map[string]*trackedService // Service ID to Tracked Service Map
trackedChecks map[string]bool // List of check ids that is being tracked
trackedSrvLock sync.Mutex
trackedChkLock sync.Mutex
}
func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, error) {
@ -61,9 +63,6 @@ func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, erro
}
func (c *ConsulClient) Register(task *structs.Task, allocID string) error {
// Removing the service first so that we can re-sync everything cleanly
c.Deregister(task)
var mErr multierror.Error
for _, service := range task.Services {
c.logger.Printf("[INFO] consul: Registering service %s with Consul.", service.Name)
@ -156,19 +155,17 @@ func (c *ConsulClient) SyncWithConsul() {
func (c *ConsulClient) registerService(service *structs.Service, task *structs.Task, allocID string) error {
var mErr multierror.Error
service.Id = fmt.Sprintf("%s-%s", allocID, service.Name)
service.Id = service.Hash()
host, port := c.findPortAndHostForLabel(service.PortLabel, task)
if host == "" || port == 0 {
return fmt.Errorf("consul: The port:%s marked for registration of service: %s couldn't be found", service.PortLabel, service.Name)
}
checks := c.makeChecks(service, host, port)
asr := &consul.AgentServiceRegistration{
ID: service.Id,
Name: service.Name,
Tags: service.Tags,
Port: port,
Address: host,
Checks: checks,
}
ts := &trackedService{
allocId: allocID,
@ -183,6 +180,16 @@ func (c *ConsulClient) registerService(service *structs.Service, task *structs.T
c.logger.Printf("[ERROR] consul: Error while registering service %v with Consul: %v", service.Name, err)
mErr.Errors = append(mErr.Errors, err)
}
checks := c.makeChecks(service, host, port)
for _, check := range checks {
if err := c.client.Agent().CheckRegister(check); err != nil {
c.logger.Printf("[ERROR] consul: Error while registerting check %v with Consul: %v", check.Name, err)
mErr.Errors = append(mErr.Errors, err)
}
c.trackedChkLock.Lock()
c.trackedChecks[check.ID] = true
c.trackedChkLock.Unlock()
}
return mErr.ErrorOrNil()
}
@ -197,13 +204,19 @@ func (c *ConsulClient) deregisterService(serviceId string) error {
return nil
}
func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentServiceCheck {
var checks []*consul.AgentServiceCheck
func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentCheckRegistration {
var checks []*consul.AgentCheckRegistration
for _, check := range service.Checks {
c := &consul.AgentServiceCheck{
Interval: check.Interval.String(),
Timeout: check.Timeout.String(),
if check.Name == "" {
check.Name = fmt.Sprintf("service: '%s' check", service.Name)
}
cr := &consul.AgentCheckRegistration{
ID: check.Hash(),
Name: check.Name,
ServiceID: service.Id,
}
cr.Interval = check.Interval.String()
cr.Timeout = check.Timeout.String()
switch check.Type {
case structs.ServiceCheckHTTP:
if check.Protocol == "" {
@ -214,13 +227,14 @@ func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int)
Host: fmt.Sprintf("%s:%d", ip, port),
Path: check.Path,
}
c.HTTP = url.String()
cr.HTTP = url.String()
case structs.ServiceCheckTCP:
c.TCP = fmt.Sprintf("%s:%d", ip, port)
cr.TCP = fmt.Sprintf("%s:%d", ip, port)
case structs.ServiceCheckScript:
c.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types
cr.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types
}
checks = append(checks, c)
checks = append(checks, cr)
}
return checks
}

View File

@ -251,10 +251,6 @@ func (r *TaskRunner) run() {
if err := r.handle.Update(update); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err)
}
if err := r.consulClient.Register(update, r.allocID); err != nil {
r.logger.Printf("[ERR] client: failed to update service definition: %v", err)
}
case <-r.destroyCh:
// Avoid destroying twice
if destroyed {

View File

@ -1072,6 +1072,14 @@ func (s *Service) Validate() error {
return mErr.ErrorOrNil()
}
func (s *Service) Hash() string {
h := sha1.New()
io.WriteString(h, s.Name)
io.WriteString(h, strings.Join(s.Tags, ""))
io.WriteString(h, s.PortLabel)
return fmt.Sprintf("%x", h.Sum(nil))
}
// Task is a single process typically that is executed as part of a task group.
type Task struct {
// Name of the task