diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index d723478db..0bee8103a 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -33,9 +33,10 @@ type AllocStateUpdater func(alloc *structs.Allocation) error
 
 // AllocRunner is used to wrap an allocation and provide the execution context.
 type AllocRunner struct {
-	config  *config.Config
-	updater AllocStateUpdater
-	logger  *log.Logger
+	config       *config.Config
+	updater      AllocStateUpdater
+	logger       *log.Logger
+	consulClient *ConsulClient
 
 	alloc *structs.Allocation
 
@@ -66,18 +67,20 @@ type allocRunnerState struct {
 }
 
 // NewAllocRunner is used to create a new allocation context
-func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, alloc *structs.Allocation) *AllocRunner {
+func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
+	alloc *structs.Allocation, consulClient *ConsulClient) *AllocRunner {
 	ar := &AllocRunner{
-		config:    config,
-		updater:   updater,
-		logger:    logger,
-		alloc:     alloc,
-		dirtyCh:   make(chan struct{}, 1),
-		tasks:     make(map[string]*TaskRunner),
-		restored:  make(map[string]struct{}),
-		updateCh:  make(chan *structs.Allocation, 8),
-		destroyCh: make(chan struct{}),
-		waitCh:    make(chan struct{}),
+		config:       config,
+		updater:      updater,
+		logger:       logger,
+		alloc:        alloc,
+		consulClient: consulClient,
+		dirtyCh:      make(chan struct{}, 1),
+		tasks:        make(map[string]*TaskRunner),
+		restored:     make(map[string]struct{}),
+		updateCh:     make(chan *structs.Allocation, 8),
+		destroyCh:    make(chan struct{}),
+		waitCh:       make(chan struct{}),
 	}
 	return ar
 }
@@ -109,7 +112,8 @@ func (r *AllocRunner) RestoreState() error {
 		task := &structs.Task{Name: name}
 		restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
 		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
-			r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker)
+			r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
+			r.consulClient)
 		r.tasks[name] = tr
 
 		// Skip tasks in terminal states.
@@ -320,7 +324,8 @@ func (r *AllocRunner) Run() {
 		task.Resources = alloc.TaskResources[task.Name]
 		restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
 		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
-			r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker)
+			r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
+			r.consulClient)
 		r.tasks[task.Name] = tr
 		go tr.Run()
 	}
diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go
index 9abe6b8a2..15077c76c 100644
--- a/client/alloc_runner_test.go
+++ b/client/alloc_runner_test.go
@@ -31,12 +31,13 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
 	conf.AllocDir = os.TempDir()
 	upd := &MockAllocStateUpdater{}
 	alloc := mock.Alloc()
+	consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500")
 	if !restarts {
 		alloc.Job.Type = structs.JobTypeBatch
 		*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
 	}
 
-	ar := NewAllocRunner(logger, conf, upd.Update, alloc)
+	ar := NewAllocRunner(logger, conf, upd.Update, alloc, consulClient)
 	return upd, ar
 }
 
@@ -141,8 +142,9 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
 	}
 
 	// Create a new alloc runner
+	consulClient, err := NewConsulClient(ar.logger, "127.0.0.1:8500")
 	ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
-		&structs.Allocation{ID: ar.alloc.ID})
+		&structs.Allocation{ID: ar.alloc.ID}, consulClient)
 	err = ar2.RestoreState()
 	if err != nil {
 		t.Fatalf("err: %v", err)
diff --git a/client/client.go b/client/client.go
index 1255e7d10..040d80bbf 100644
--- a/client/client.go
+++ b/client/client.go
@@ -70,6 +70,8 @@ type Client struct {
 
 	logger *log.Logger
 
+	consulClient *ConsulClient
+
 	lastServer     net.Addr
 	lastRPCTime    time.Time
 	lastServerLock sync.Mutex
@@ -96,14 +98,22 @@ func NewClient(cfg *config.Config) (*Client, error) {
 	// Create a logger
 	logger := log.New(cfg.LogOutput, "", log.LstdFlags)
 
+	// Create the consul client
+	consulAddr := cfg.ReadDefault("consul.address", "127.0.0.1:8500")
+	consulClient, err := NewConsulClient(logger, consulAddr)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the consul client: %v", err)
+	}
+
 	// Create the client
 	c := &Client{
-		config:     cfg,
-		start:      time.Now(),
-		connPool:   nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
-		logger:     logger,
-		allocs:     make(map[string]*AllocRunner),
-		shutdownCh: make(chan struct{}),
+		config:       cfg,
+		start:        time.Now(),
+		consulClient: consulClient,
+		connPool:     nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
+		logger:       logger,
+		allocs:       make(map[string]*AllocRunner),
+		shutdownCh:   make(chan struct{}),
 	}
 
 	// Initialize the client
@@ -136,6 +146,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
 
 	// Start the client!
 	go c.run()
+
+	// Start the consul client
+	go c.consulClient.SyncWithConsul()
 	return c, nil
 }
 
@@ -200,6 +213,9 @@ func (c *Client) Shutdown() error {
 		}
 	}
 
+	// Stop the consul client
+	c.consulClient.ShutDown()
+
 	c.shutdown = true
 	close(c.shutdownCh)
 	c.connPool.Shutdown()
@@ -335,7 +351,7 @@ func (c *Client) restoreState() error {
 	for _, entry := range list {
 		id := entry.Name()
 		alloc := &structs.Allocation{ID: id}
-		ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc)
+		ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient)
 		c.allocs[id] = ar
 		if err := ar.RestoreState(); err != nil {
 			c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
@@ -749,7 +765,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
 func (c *Client) addAlloc(alloc *structs.Allocation) error {
 	c.allocLock.Lock()
 	defer c.allocLock.Unlock()
-	ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc)
+	ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient)
 	c.allocs[alloc.ID] = ar
 	go ar.Run()
 	return nil
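The client wiring above amounts to: build a consul/api client from a configurable agent address, hand it to every AllocRunner, and run one sync goroutine per node. Below is a minimal standalone sketch of that consul/api usage; the address is hard-coded here instead of coming from Nomad's `cfg.ReadDefault`, and the program is illustrative, not part of the patch.

```go
package main

import (
	"fmt"
	"log"

	consul "github.com/hashicorp/consul/api"
)

func main() {
	// Mirror of what NewClient does: default config plus an override of
	// the local agent address.
	cfg := consul.DefaultConfig()
	cfg.Address = "127.0.0.1:8500"
	c, err := consul.NewClient(cfg)
	if err != nil {
		log.Fatalf("failed to create the consul client: %v", err)
	}

	// The sync loop polls this same endpoint; it only ever talks to the
	// agent running on the node.
	services, err := c.Agent().Services()
	if err != nil {
		log.Fatalf("failed to list services: %v", err)
	}
	for id, svc := range services {
		fmt.Printf("%s -> %s\n", id, svc.Service)
	}
}
```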
diff --git a/client/consul.go b/client/consul.go
new file mode 100644
index 000000000..b3fdfe6c1
--- /dev/null
+++ b/client/consul.go
@@ -0,0 +1,192 @@
+package client
+
+import (
+	"fmt"
+	consul "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"log"
+	"sync"
+	"time"
+)
+
+const (
+	syncInterval = 5 * time.Second
+)
+
+type trackedService struct {
+	allocId string
+	task    *structs.Task
+	service *structs.Service
+}
+
+type ConsulClient struct {
+	client     *consul.Client
+	logger     *log.Logger
+	shutdownCh chan struct{}
+
+	trackedServices map[string]*trackedService
+	trackedSrvLock  sync.Mutex
+}
+
+func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, error) {
+	var err error
+	var c *consul.Client
+	cfg := consul.DefaultConfig()
+	cfg.Address = consulAddr
+	if c, err = consul.NewClient(cfg); err != nil {
+		return nil, err
+	}
+
+	consulClient := ConsulClient{
+		client:          c,
+		logger:          logger,
+		trackedServices: make(map[string]*trackedService),
+		shutdownCh:      make(chan struct{}),
+	}
+
+	return &consulClient, nil
+}
+
+func (c *ConsulClient) Register(task *structs.Task, allocID string) error {
+	var mErr multierror.Error
+	for _, service := range task.Services {
+		c.logger.Printf("[INFO] Registering service %s with Consul", service.Name)
+		if err := c.registerService(service, task, allocID); err != nil {
+			mErr.Errors = append(mErr.Errors, err)
+		}
+		// Track the service even if registration failed; the sync loop
+		// will retry it on the next pass.
+		ts := &trackedService{
+			allocId: allocID,
+			task:    task,
+			service: service,
+		}
+		c.trackedSrvLock.Lock()
+		c.trackedServices[service.Id] = ts
+		c.trackedSrvLock.Unlock()
+	}
+
+	return mErr.ErrorOrNil()
+}
+
+func (c *ConsulClient) Deregister(task *structs.Task) error {
+	var mErr multierror.Error
+	for _, service := range task.Services {
+		c.logger.Printf("[INFO] Deregistering service %v with Consul", service.Name)
+		if err := c.deregisterService(service.Id); err != nil {
+			c.logger.Printf("[ERR] Error while deregistering service %v from Consul", service.Name)
+			mErr.Errors = append(mErr.Errors, err)
+		}
+		c.trackedSrvLock.Lock()
+		delete(c.trackedServices, service.Id)
+		c.trackedSrvLock.Unlock()
+	}
+	return mErr.ErrorOrNil()
+}
+
+func (c *ConsulClient) ShutDown() {
+	close(c.shutdownCh)
+}
+
+func (c *ConsulClient) findPortAndHostForLabel(portLabel string, task *structs.Task) (string, int) {
+	for _, network := range task.Resources.Networks {
+		if p, ok := network.MapLabelToValues()[portLabel]; ok {
+			return network.IP, p
+		}
+	}
+	return "", 0
+}
+
+func (c *ConsulClient) SyncWithConsul() {
+	sync := time.After(syncInterval)
+	agent := c.client.Agent()
+
+	for {
+		select {
+		case <-sync:
+			sync = time.After(syncInterval)
+			var consulServices map[string]*consul.AgentService
+			var err error
+
+			// Get the list of the services that Consul knows about
+			if consulServices, err = agent.Services(); err != nil {
+				c.logger.Printf("[DEBUG] Error while syncing services with Consul: %v", err)
+				continue
+			}
+
+			// See if we have services that Consul doesn't know about yet
+			// and register them
+			for serviceId := range c.trackedServices {
+				if _, ok := consulServices[serviceId]; !ok {
+					ts := c.trackedServices[serviceId]
+					c.registerService(ts.service, ts.task, ts.allocId)
+				}
+			}
+
+			// See if Consul thinks we have services which are no longer
+			// running on this node and deregister them
+			for serviceId := range consulServices {
+				if serviceId == "consul" {
+					continue
+				}
+				if _, ok := c.trackedServices[serviceId]; !ok {
+					if err := c.deregisterService(serviceId); err != nil {
+						c.logger.Printf("[DEBUG] Error while deregistering service with ID: %s", serviceId)
+					}
+				}
+			}
+		case <-c.shutdownCh:
+			c.logger.Printf("[INFO] Shutting down Consul client")
+			return
+		}
+	}
+}
+
+func (c *ConsulClient) registerService(service *structs.Service, task *structs.Task, allocID string) error {
+	// Include the service name so that multiple services defined on the
+	// same task get unique IDs
+	service.Id = fmt.Sprintf("%s-%s-%s", allocID, task.Name, service.Name)
+	host, port := c.findPortAndHostForLabel(service.PortLabel, task)
+	if host == "" || port == 0 {
+		return fmt.Errorf("port %q marked for registration of service %q couldn't be found", service.PortLabel, service.Name)
+	}
+	checks := c.makeChecks(service, host, port)
+	asr := &consul.AgentServiceRegistration{
+		ID:      service.Id,
+		Name:    service.Name,
+		Tags:    service.Tags,
+		Port:    port,
+		Address: host,
+		Checks:  checks,
+	}
+	if err := c.client.Agent().ServiceRegister(asr); err != nil {
+		c.logger.Printf("[ERR] Error while registering service %v with Consul: %v", service.Name, err)
+		return err
+	}
+	return nil
+}
+
+func (c *ConsulClient) deregisterService(serviceId string) error {
+	return c.client.Agent().ServiceDeregister(serviceId)
+}
+
+func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentServiceCheck {
+	var checks []*consul.AgentServiceCheck
+	for _, check := range service.Checks {
+		c := &consul.AgentServiceCheck{
+			Interval: check.Interval.String(),
+			Timeout:  check.Timeout.String(),
+		}
+		switch check.Type {
+		case structs.ServiceCheckHTTP:
+			c.HTTP = fmt.Sprintf("%s://%s:%d/%s", check.Protocol, ip, port, check.Http)
+		case structs.ServiceCheckTCP:
+			c.TCP = fmt.Sprintf("%s:%d", ip, port)
+		case structs.ServiceCheckScript:
+			c.Script = check.Script // TODO This needs to include the path of the alloc dir and be based on driver types
+		}
+		checks = append(checks, c)
+	}
+	return checks
+}
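The heart of `SyncWithConsul` is a two-way set difference between the services the client tracks and the services the local agent reports. A self-contained sketch of just that reconcile step follows; plain string IDs stand in for Consul, and `reconcile` and its result names are illustrative, not from the patch.

```go
package main

import "fmt"

// reconcile returns the IDs to register (tracked but unknown to the agent)
// and the IDs to deregister (known to the agent but no longer tracked).
func reconcile(tracked, agent map[string]bool) (register, deregister []string) {
	for id := range tracked {
		if !agent[id] {
			register = append(register, id)
		}
	}
	for id := range agent {
		if id == "consul" {
			// The agent's own service is never managed by the client.
			continue
		}
		if !tracked[id] {
			deregister = append(deregister, id)
		}
	}
	return register, deregister
}

func main() {
	tracked := map[string]bool{"alloc1-web": true, "alloc2-db": true}
	agent := map[string]bool{"alloc2-db": true, "alloc9-stale": true, "consul": true}
	reg, dereg := reconcile(tracked, agent)
	fmt.Println(reg, dereg) // [alloc1-web] [alloc9-stale]
}
```

Because the loop re-registers anything missing every `syncInterval`, a restarted Consul agent converges back to the tracked state without any extra bookkeeping.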
diff --git a/client/task_runner.go b/client/task_runner.go
index 6a4e6ed46..cc4bddab8 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -25,6 +25,7 @@ type TaskRunner struct {
 	ctx            *driver.ExecContext
 	allocID        string
 	restartTracker restartTracker
+	consulClient   *ConsulClient
 
 	task  *structs.Task
 	state *structs.TaskState
@@ -52,13 +53,14 @@ type TaskStateUpdater func(taskName string)
 func NewTaskRunner(logger *log.Logger, config *config.Config,
 	updater TaskStateUpdater, ctx *driver.ExecContext,
 	allocID string, task *structs.Task, state *structs.TaskState,
-	restartTracker restartTracker) *TaskRunner {
+	restartTracker restartTracker, consulClient *ConsulClient) *TaskRunner {
 
 	tc := &TaskRunner{
 		config:         config,
 		updater:        updater,
 		logger:         logger,
 		restartTracker: restartTracker,
+		consulClient:   consulClient,
 		ctx:            ctx,
 		allocID:        allocID,
 		task:           task,
@@ -231,6 +233,12 @@ func (r *TaskRunner) run() {
 	var destroyErr error
 	destroyed := false
 
+	// Register the services defined by the task with Consul
+	r.consulClient.Register(r.task, r.allocID)
+
+	// Deregister the services belonging to the task from Consul
+	defer r.consulClient.Deregister(r.task)
+
 OUTER:
 	// Wait for updates
 	for {
@@ -303,6 +311,7 @@ func (r *TaskRunner) run() {
 			// Set force start because we are restarting the task.
 			forceStart = true
 		}
+		return
 	}
 }
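The TaskRunner change pairs `Register` with a deferred `Deregister`, so the task's services are removed on every exit path of `run()`. A stripped-down sketch of that lifecycle, with a `registry` type standing in for `*ConsulClient`:

```go
package main

import "fmt"

type registry struct{}

func (registry) Register(task string)   { fmt.Println("register services for", task) }
func (registry) Deregister(task string) { fmt.Println("deregister services for", task) }

// run mimics TaskRunner.run: register up front, deregister on any return,
// whether the task finished, failed, or was destroyed.
func run(r registry, task string) {
	r.Register(task)
	defer r.Deregister(task)

	fmt.Println(task, "running")
}

func main() {
	run(registry{}, "web")
}
```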
diff --git a/client/task_runner_test.go b/client/task_runner_test.go
index f38cf045a..f8bc9e466 100644
--- a/client/task_runner_test.go
+++ b/client/task_runner_test.go
@@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
 	upd := &MockTaskStateUpdater{}
 	alloc := mock.Alloc()
 	task := alloc.Job.TaskGroups[0].Tasks[0]
-
+	consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500")
 	// Initialize the port listing. This should be done by the offer process but
 	// we have a mock so that doesn't happen.
 	task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}}
@@ -48,7 +48,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
 	}
 
 	state := alloc.TaskStates[task.Name]
-	tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker)
+	tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker, consulClient)
 	return upd, tr
 }
 
@@ -164,8 +164,10 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
 	}
 
 	// Create a new task runner
+	consulClient, _ := NewConsulClient(tr.logger, "127.0.0.1:8500")
 	tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update,
-		tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker)
+		tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker,
+		consulClient)
 	if err := tr2.RestoreState(); err != nil {
 		t.Fatalf("err: %v", err)
 	}
diff --git a/jobspec/parse.go b/jobspec/parse.go
index 92f3c5048..58dbd6c9c 100644
--- a/jobspec/parse.go
+++ b/jobspec/parse.go
@@ -463,7 +463,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
 }
 
 func parseServices(jobName string, taskGroupName string, task *structs.Task, serviceObjs *ast.ObjectList) error {
-	task.Services = make([]structs.Service, len(serviceObjs.Items))
+	task.Services = make([]*structs.Service, len(serviceObjs.Items))
 	var defaultServiceName bool
 	for idx, o := range serviceObjs.Items {
 		var service structs.Service
@@ -503,7 +503,7 @@ func parseServices(jobName string, taskGroupName string, task *structs.Task, ser
 			}
 		}
 
-		task.Services[idx] = service
+		task.Services[idx] = &service
 	}
 
 	return nil
diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go
index 6eb19af11..d452d1a70 100644
--- a/jobspec/parse_test.go
+++ b/jobspec/parse_test.go
@@ -94,7 +94,7 @@ func TestParse(t *testing.T) {
 						Config: map[string]interface{}{
 							"image": "hashicorp/binstore",
 						},
-						Services: []structs.Service{
+						Services: []*structs.Service{
 							{
 								Id:   "",
 								Name: "binstore-storagelocker-binsl-binstore",
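The jobspec change from `[]structs.Service` to `[]*structs.Service` matters because `registerService` writes the generated `Id` back onto the service it receives; with value semantics that write would land on a copy. A small sketch of the difference, using a local `service` type for illustration only:

```go
package main

import "fmt"

type service struct{ Id, Name string }

func main() {
	// With a slice of values, pulling an element out copies it, so the
	// Id assignment never reaches the task's service list.
	byValue := []service{{Name: "web"}}
	v := byValue[0]
	v.Id = "alloc1-task1"
	fmt.Printf("%q\n", byValue[0].Id) // ""

	// With a slice of pointers, every holder of the pointer observes the
	// generated Id, which is what the consul sync loop relies on.
	byPointer := []*service{{Name: "web"}}
	p := byPointer[0]
	p.Id = "alloc1-task1"
	fmt.Printf("%q\n", byPointer[0].Id) // "alloc1-task1"
}
```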
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 4a0f29f88..7fde437cc 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1024,7 +1024,7 @@ func (sc *ServiceCheck) Validate() error {
 	if sc.Type == ServiceCheckScript && sc.Script == "" {
 		return fmt.Errorf("Script checks need the script to invoke")
 	}
-	if t != ServiceCheckTCP && t != ServiceCheckHTTP && t != ServiceCheckDocker && t != ServiceCheckScript {
+	if t != ServiceCheckTCP && t != ServiceCheckHTTP {
 		return fmt.Errorf("Check with name %v has invalid check type: %s ", sc.Name, sc.Type)
 	}
 	return nil
@@ -1064,7 +1064,7 @@ type Task struct {
 	Env map[string]string
 
 	// List of service definitions exposed by the Task
-	Services []Service
+	Services []*Service
 
 	// Constraints can be specified at a task level and apply only to
 	// the particular task.
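The `Validate` change narrows the accepted check types to tcp and http. Note that, as written, a script check now trips the type test even when a script body is present, since the earlier branch only rejects empty scripts. A self-contained sketch of the resulting behavior; `validateCheck` is illustrative, not the structs API:

```go
package main

import "fmt"

func validateCheck(typ, script string) error {
	if typ == "script" && script == "" {
		return fmt.Errorf("Script checks need the script to invoke")
	}
	if typ != "tcp" && typ != "http" {
		return fmt.Errorf("invalid check type: %s", typ)
	}
	return nil
}

func main() {
	fmt.Println(validateCheck("http", ""))         // <nil>
	fmt.Println(validateCheck("tcp", ""))          // <nil>
	fmt.Println(validateCheck("docker", ""))       // invalid check type: docker
	fmt.Println(validateCheck("script", "exit 0")) // invalid check type: script
}
```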