From 404810043a581f572e719c25a96022a653ea3a6a Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 00:50:45 -0800 Subject: [PATCH 01/15] Added the implementation of consul client --- client/alloc_runner.go | 37 +++++++++++---------- client/alloc_runner_test.go | 6 ++-- client/client.go | 25 ++++++++++----- client/consul.go | 64 +++++++++++++++++++++++++++++++++++++ client/task_runner.go | 23 ++++++++++++- client/task_runner_test.go | 8 +++-- jobspec/parse.go | 4 +-- nomad/structs/structs.go | 2 +- 8 files changed, 136 insertions(+), 33 deletions(-) create mode 100644 client/consul.go diff --git a/client/alloc_runner.go b/client/alloc_runner.go index d723478db..0bee8103a 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -33,9 +33,10 @@ type AllocStateUpdater func(alloc *structs.Allocation) error // AllocRunner is used to wrap an allocation and provide the execution context. type AllocRunner struct { - config *config.Config - updater AllocStateUpdater - logger *log.Logger + config *config.Config + updater AllocStateUpdater + logger *log.Logger + consulClient *ConsulClient alloc *structs.Allocation @@ -66,18 +67,20 @@ type allocRunnerState struct { } // NewAllocRunner is used to create a new allocation context -func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, alloc *structs.Allocation) *AllocRunner { +func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, + alloc *structs.Allocation, consulClient *ConsulClient) *AllocRunner { ar := &AllocRunner{ - config: config, - updater: updater, - logger: logger, - alloc: alloc, - dirtyCh: make(chan struct{}, 1), - tasks: make(map[string]*TaskRunner), - restored: make(map[string]struct{}), - updateCh: make(chan *structs.Allocation, 8), - destroyCh: make(chan struct{}), - waitCh: make(chan struct{}), + config: config, + updater: updater, + logger: logger, + alloc: alloc, + consulClient: consulClient, + dirtyCh: make(chan struct{}, 1), + tasks: make(map[string]*TaskRunner), + restored: make(map[string]struct{}), + updateCh: make(chan *structs.Allocation, 8), + destroyCh: make(chan struct{}), + waitCh: make(chan struct{}), } return ar } @@ -109,7 +112,8 @@ func (r *AllocRunner) RestoreState() error { task := &structs.Task{Name: name} restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker) + r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker, + r.consulClient) r.tasks[name] = tr // Skip tasks in terminal states. @@ -320,7 +324,8 @@ func (r *AllocRunner) Run() { task.Resources = alloc.TaskResources[task.Name] restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker) + r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker, + r.consulClient) r.tasks[task.Name] = tr go tr.Run() } diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 9abe6b8a2..3b9828493 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -31,12 +31,13 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { conf.AllocDir = os.TempDir() upd := &MockAllocStateUpdater{} alloc := mock.Alloc() + consulClient, _ := NewConsulClient() if !restarts { alloc.Job.Type = structs.JobTypeBatch *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} } - ar := NewAllocRunner(logger, conf, upd.Update, alloc) + ar := NewAllocRunner(logger, conf, upd.Update, alloc, consulClient) return upd, ar } @@ -141,8 +142,9 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } // Create a new alloc runner + consulClient, err := NewConsulClient() ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, - &structs.Allocation{ID: ar.alloc.ID}) + &structs.Allocation{ID: ar.alloc.ID}, consulClient) err = ar2.RestoreState() if err != nil { t.Fatalf("err: %v", err) diff --git a/client/client.go b/client/client.go index 1255e7d10..527d43041 100644 --- a/client/client.go +++ b/client/client.go @@ -70,6 +70,8 @@ type Client struct { logger *log.Logger + consulClient *ConsulClient + lastServer net.Addr lastRPCTime time.Time lastServerLock sync.Mutex @@ -96,14 +98,21 @@ func NewClient(cfg *config.Config) (*Client, error) { // Create a logger logger := log.New(cfg.LogOutput, "", log.LstdFlags) + // Create the consul client + consulClient, err := NewConsulClient() + if err != nil { + return nil, fmt.Errorf("failed to create the consul client: %v", err) + } + // Create the client c := &Client{ - config: cfg, - start: time.Now(), - connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), - logger: logger, - allocs: make(map[string]*AllocRunner), - shutdownCh: make(chan struct{}), + config: cfg, + start: time.Now(), + consulClient: consulClient, + connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), + logger: logger, + allocs: make(map[string]*AllocRunner), + shutdownCh: make(chan struct{}), } // Initialize the client @@ -335,7 +344,7 @@ func (c *Client) restoreState() error { for _, entry := range list { id := entry.Name() alloc := &structs.Allocation{ID: id} - ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc) + ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient) c.allocs[id] = ar if err := ar.RestoreState(); err != nil { c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err) @@ -749,7 +758,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error { func (c *Client) addAlloc(alloc *structs.Allocation) error { c.allocLock.Lock() defer c.allocLock.Unlock() - ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc) + ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient) c.allocs[alloc.ID] = ar go ar.Run() return nil diff --git a/client/consul.go b/client/consul.go new file mode 100644 index 000000000..188816859 --- /dev/null +++ b/client/consul.go @@ -0,0 +1,64 @@ +package client + +import ( + "fmt" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + consulPort = 8080 +) + +type ConsulClient struct { + client *api.Client +} + +func NewConsulClient() (*ConsulClient, error) { + var err error + var c *api.Client + if c, err = api.NewClient(api.DefaultConfig()); err != nil { + return nil, err + } + + consulClient := ConsulClient{ + client: c, + } + + return &consulClient, nil +} + +func (c *ConsulClient) Register(task *structs.Task, allocID string, port int, host string) error { + var mErr multierror.Error + serviceDefns := make([]*api.AgentServiceRegistration, len(task.Services)) + for idx, service := range task.Services { + service.Id = fmt.Sprintf("%s-%s", allocID, task.Name) + asr := &api.AgentServiceRegistration{ + ID: service.Id, + Name: service.Name, + Tags: service.Tags, + Port: port, + Address: host, + } + serviceDefns[idx] = asr + } + + for _, serviceDefn := range serviceDefns { + if err := c.client.Agent().ServiceRegister(serviceDefn); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + + return mErr.ErrorOrNil() +} + +func (c *ConsulClient) DeRegister(task *structs.Task) error { + var mErr multierror.Error + for _, service := range task.Services { + if err := c.client.Agent().ServiceDeregister(service.Id); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() +} diff --git a/client/task_runner.go b/client/task_runner.go index 6a4e6ed46..456df9e09 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -25,6 +25,7 @@ type TaskRunner struct { ctx *driver.ExecContext allocID string restartTracker restartTracker + consulClient *ConsulClient task *structs.Task state *structs.TaskState @@ -52,13 +53,14 @@ type TaskStateUpdater func(taskName string) func NewTaskRunner(logger *log.Logger, config *config.Config, updater TaskStateUpdater, ctx *driver.ExecContext, allocID string, task *structs.Task, state *structs.TaskState, - restartTracker restartTracker) *TaskRunner { + restartTracker restartTracker, consulClient *ConsulClient) *TaskRunner { tc := &TaskRunner{ config: config, updater: updater, logger: logger, restartTracker: restartTracker, + consulClient: consulClient, ctx: ctx, allocID: allocID, task: task, @@ -231,6 +233,22 @@ func (r *TaskRunner) run() { var destroyErr error destroyed := false + // Register the services defined by the task with Consil + for _, service := range r.task.Services { + portLabel := service.PortLabel + var port int + var host string + for _, network := range r.task.Resources.Networks { + if p, ok := network.MapLabelToValues()[portLabel]; ok { + port = p + host = network.IP + break + } + } + + r.consulClient.Register(r.task, r.allocID, port, host) + } + OUTER: // Wait for updates for { @@ -303,6 +321,9 @@ func (r *TaskRunner) run() { // Set force start because we are restarting the task. forceStart = true } + + // De-Register the services belonging to the task from consul + r.consulClient.DeRegister(r.task) return } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index f38cf045a..2c3eb8928 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { upd := &MockTaskStateUpdater{} alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] - + consulClient, _ := NewConsulClient() // Initialize the port listing. This should be done by the offer process but // we have a mock so that doesn't happen. task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}} @@ -48,7 +48,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { } state := alloc.TaskStates[task.Name] - tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker) + tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker, consulClient) return upd, tr } @@ -164,8 +164,10 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { } // Create a new task runner + consulClient, _ := NewConsulClient() tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, - tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker) + tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, + consulClient) if err := tr2.RestoreState(); err != nil { t.Fatalf("err: %v", err) } diff --git a/jobspec/parse.go b/jobspec/parse.go index 92f3c5048..58dbd6c9c 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -463,7 +463,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l } func parseServices(jobName string, taskGroupName string, task *structs.Task, serviceObjs *ast.ObjectList) error { - task.Services = make([]structs.Service, len(serviceObjs.Items)) + task.Services = make([]*structs.Service, len(serviceObjs.Items)) var defaultServiceName bool for idx, o := range serviceObjs.Items { var service structs.Service @@ -503,7 +503,7 @@ func parseServices(jobName string, taskGroupName string, task *structs.Task, ser } } - task.Services[idx] = service + task.Services[idx] = &service } return nil diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 4a0f29f88..c4dbc5c45 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1064,7 +1064,7 @@ type Task struct { Env map[string]string // List of service definitions exposed by the Task - Services []Service + Services []*Service // Constraints can be specified at a task level and apply only to // the particular task. From d6da6372cdb850c06136f913b6c63243eba63020 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 01:18:29 -0800 Subject: [PATCH 02/15] Moving the logic to find port and host inside consul client --- client/consul.go | 22 ++++++++++++++++++++-- client/task_runner.go | 15 +-------------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/client/consul.go b/client/consul.go index 188816859..27ea17735 100644 --- a/client/consul.go +++ b/client/consul.go @@ -29,11 +29,15 @@ func NewConsulClient() (*ConsulClient, error) { return &consulClient, nil } -func (c *ConsulClient) Register(task *structs.Task, allocID string, port int, host string) error { +func (c *ConsulClient) Register(task *structs.Task, allocID string) error { var mErr multierror.Error - serviceDefns := make([]*api.AgentServiceRegistration, len(task.Services)) + var serviceDefns []*api.AgentServiceRegistration for idx, service := range task.Services { service.Id = fmt.Sprintf("%s-%s", allocID, task.Name) + host, port := c.findPortAndHostForLabel(service.PortLabel, task) + if host == "" || port == 0 { + continue + } asr := &api.AgentServiceRegistration{ ID: service.Id, Name: service.Name, @@ -62,3 +66,17 @@ func (c *ConsulClient) DeRegister(task *structs.Task) error { } return mErr.ErrorOrNil() } + +func (c *ConsulClient) findPortAndHostForLabel(portLabel string, task *structs.Task) (string, int) { + var host string + var port int + for _, network := range task.Resources.Networks { + if p, ok := network.MapLabelToValues()[portLabel]; ok { + host = network.IP + port = p + break + } + } + + return host, port +} diff --git a/client/task_runner.go b/client/task_runner.go index 456df9e09..bcf1f1e58 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -234,20 +234,7 @@ func (r *TaskRunner) run() { destroyed := false // Register the services defined by the task with Consil - for _, service := range r.task.Services { - portLabel := service.PortLabel - var port int - var host string - for _, network := range r.task.Resources.Networks { - if p, ok := network.MapLabelToValues()[portLabel]; ok { - port = p - host = network.IP - break - } - } - - r.consulClient.Register(r.task, r.allocID, port, host) - } + r.consulClient.Register(r.task, r.allocID) OUTER: // Wait for updates From a447b1ea3cc9f836ee28ca82fe40f449aa0ef995 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 01:20:53 -0800 Subject: [PATCH 03/15] DRYed the code --- client/consul.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/client/consul.go b/client/consul.go index 27ea17735..a7f411941 100644 --- a/client/consul.go +++ b/client/consul.go @@ -68,15 +68,10 @@ func (c *ConsulClient) DeRegister(task *structs.Task) error { } func (c *ConsulClient) findPortAndHostForLabel(portLabel string, task *structs.Task) (string, int) { - var host string - var port int for _, network := range task.Resources.Networks { if p, ok := network.MapLabelToValues()[portLabel]; ok { - host = network.IP - port = p - break + return network.IP, p } } - - return host, port + return "", 0 } From dd875f1d2bd5855e1365749d5eaa7ebb309d7994 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 02:07:07 -0800 Subject: [PATCH 04/15] Fixed the test errors --- jobspec/parse_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 6eb19af11..d452d1a70 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -94,7 +94,7 @@ func TestParse(t *testing.T) { Config: map[string]interface{}{ "image": "hashicorp/binstore", }, - Services: []structs.Service{ + Services: []*structs.Service{ { Id: "", Name: "binstore-storagelocker-binsl-binstore", From cb34f34c12ffd6457f1fdfb80bf426a6b07948bb Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 02:14:07 -0800 Subject: [PATCH 05/15] Added a logger to consul client --- client/alloc_runner_test.go | 4 ++-- client/client.go | 2 +- client/consul.go | 6 +++++- client/task_runner_test.go | 4 ++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 3b9828493..148a33a42 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { conf.AllocDir = os.TempDir() upd := &MockAllocStateUpdater{} alloc := mock.Alloc() - consulClient, _ := NewConsulClient() + consulClient, _ := NewConsulClient(logger) if !restarts { alloc.Job.Type = structs.JobTypeBatch *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} @@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } // Create a new alloc runner - consulClient, err := NewConsulClient() + consulClient, err := NewConsulClient(ar.logger) ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, consulClient) err = ar2.RestoreState() diff --git a/client/client.go b/client/client.go index 527d43041..e33872782 100644 --- a/client/client.go +++ b/client/client.go @@ -99,7 +99,7 @@ func NewClient(cfg *config.Config) (*Client, error) { logger := log.New(cfg.LogOutput, "", log.LstdFlags) // Create the consul client - consulClient, err := NewConsulClient() + consulClient, err := NewConsulClient(logger) if err != nil { return nil, fmt.Errorf("failed to create the consul client: %v", err) } diff --git a/client/consul.go b/client/consul.go index a7f411941..85fad12bc 100644 --- a/client/consul.go +++ b/client/consul.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" + "log" ) const ( @@ -13,9 +14,11 @@ const ( type ConsulClient struct { client *api.Client + + logger *log.Logger } -func NewConsulClient() (*ConsulClient, error) { +func NewConsulClient(logger *log.Logger) (*ConsulClient, error) { var err error var c *api.Client if c, err = api.NewClient(api.DefaultConfig()); err != nil { @@ -24,6 +27,7 @@ func NewConsulClient() (*ConsulClient, error) { consulClient := ConsulClient{ client: c, + logger: logger, } return &consulClient, nil diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 2c3eb8928..788b0b1d5 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { upd := &MockTaskStateUpdater{} alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] - consulClient, _ := NewConsulClient() + consulClient, _ := NewConsulClient(logger) // Initialize the port listing. This should be done by the offer process but // we have a mock so that doesn't happen. task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}} @@ -164,7 +164,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { } // Create a new task runner - consulClient, _ := NewConsulClient() + consulClient, _ := NewConsulClient(tr.logger) tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, consulClient) From b8c2cc81f0b3982b2660dd2a79865f7472b54385 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 02:37:34 -0800 Subject: [PATCH 06/15] Defering calling the de-register from consul call when a service is not running --- client/consul.go | 10 +++++++--- client/task_runner.go | 5 +++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/client/consul.go b/client/consul.go index 85fad12bc..b0ad31893 100644 --- a/client/consul.go +++ b/client/consul.go @@ -36,7 +36,7 @@ func NewConsulClient(logger *log.Logger) (*ConsulClient, error) { func (c *ConsulClient) Register(task *structs.Task, allocID string) error { var mErr multierror.Error var serviceDefns []*api.AgentServiceRegistration - for idx, service := range task.Services { + for _, service := range task.Services { service.Id = fmt.Sprintf("%s-%s", allocID, task.Name) host, port := c.findPortAndHostForLabel(service.PortLabel, task) if host == "" || port == 0 { @@ -49,11 +49,13 @@ func (c *ConsulClient) Register(task *structs.Task, allocID string) error { Port: port, Address: host, } - serviceDefns[idx] = asr + serviceDefns = append(serviceDefns, asr) } for _, serviceDefn := range serviceDefns { + c.logger.Printf("[INFO] Registering service %v with Consul", serviceDefn.Name) if err := c.client.Agent().ServiceRegister(serviceDefn); err != nil { + c.logger.Printf("[ERROR] Error while registering service %v with Consul: %v", serviceDefn.Name, err) mErr.Errors = append(mErr.Errors, err) } } @@ -61,10 +63,12 @@ func (c *ConsulClient) Register(task *structs.Task, allocID string) error { return mErr.ErrorOrNil() } -func (c *ConsulClient) DeRegister(task *structs.Task) error { +func (c *ConsulClient) Deregister(task *structs.Task) error { var mErr multierror.Error for _, service := range task.Services { + c.logger.Printf("[INFO] De-Registering service %v with Consul", service.Name) if err := c.client.Agent().ServiceDeregister(service.Id); err != nil { + c.logger.Printf("[ERROR] Error in de-registering service %v from Consul", service.Name) mErr.Errors = append(mErr.Errors, err) } } diff --git a/client/task_runner.go b/client/task_runner.go index bcf1f1e58..cc4bddab8 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -236,6 +236,9 @@ func (r *TaskRunner) run() { // Register the services defined by the task with Consil r.consulClient.Register(r.task, r.allocID) + // De-Register the services belonging to the task from consul + defer r.consulClient.Deregister(r.task) + OUTER: // Wait for updates for { @@ -309,8 +312,6 @@ func (r *TaskRunner) run() { forceStart = true } - // De-Register the services belonging to the task from consul - r.consulClient.DeRegister(r.task) return } From 962e10b1023d8244519b80d53dc516aeb5632643 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 03:08:53 -0800 Subject: [PATCH 07/15] Registering the checks with consul --- client/consul.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/client/consul.go b/client/consul.go index b0ad31893..26a9c0d7b 100644 --- a/client/consul.go +++ b/client/consul.go @@ -42,12 +42,14 @@ func (c *ConsulClient) Register(task *structs.Task, allocID string) error { if host == "" || port == 0 { continue } + checks := c.makeChecks(service, host, port) asr := &api.AgentServiceRegistration{ ID: service.Id, Name: service.Name, Tags: service.Tags, Port: port, Address: host, + Checks: checks, } serviceDefns = append(serviceDefns, asr) } @@ -83,3 +85,23 @@ func (c *ConsulClient) findPortAndHostForLabel(portLabel string, task *structs.T } return "", 0 } + +func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*api.AgentServiceCheck { + var checks []*api.AgentServiceCheck + for _, check := range service.Checks { + c := &api.AgentServiceCheck{ + Interval: check.Interval.String(), + Timeout: check.Timeout.String(), + } + switch check.Type { + case structs.ServiceCheckHTTP: + c.HTTP = fmt.Sprintf("%s://%s:%d/%s", check.Protocol, ip, port, check.Http) + case structs.ServiceCheckTCP: + c.TCP = fmt.Sprintf("%s:%d", ip, port) + case structs.ServiceCheckScript: + c.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types + } + checks = append(checks, c) + } + return checks +} From cc26cb9a7bd54a0f89432acfcec25632c5841720 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 04:34:23 -0800 Subject: [PATCH 08/15] Added the logic to retry services which needs to be tracked if consul doesn't respond --- client/consul.go | 130 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 95 insertions(+), 35 deletions(-) diff --git a/client/consul.go b/client/consul.go index 26a9c0d7b..5814450f9 100644 --- a/client/consul.go +++ b/client/consul.go @@ -2,32 +2,43 @@ package client import ( "fmt" - "github.com/hashicorp/consul/api" + consul "github.com/hashicorp/consul/api" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" "log" + "time" ) const ( - consulPort = 8080 + syncInterval = 5 * time.Second ) -type ConsulClient struct { - client *api.Client +type trackedService struct { + allocId string + task *structs.Task + service *structs.Service +} - logger *log.Logger +type ConsulClient struct { + client *consul.Client + logger *log.Logger + shutdownCh chan struct{} + + trackedServices map[string]*trackedService } func NewConsulClient(logger *log.Logger) (*ConsulClient, error) { var err error - var c *api.Client - if c, err = api.NewClient(api.DefaultConfig()); err != nil { + var c *consul.Client + ts := make(map[string]*trackedService) + if c, err = consul.NewClient(consul.DefaultConfig()); err != nil { return nil, err } consulClient := ConsulClient{ - client: c, - logger: logger, + client: c, + logger: logger, + trackedServices: ts, } return &consulClient, nil @@ -35,31 +46,16 @@ func NewConsulClient(logger *log.Logger) (*ConsulClient, error) { func (c *ConsulClient) Register(task *structs.Task, allocID string) error { var mErr multierror.Error - var serviceDefns []*api.AgentServiceRegistration for _, service := range task.Services { - service.Id = fmt.Sprintf("%s-%s", allocID, task.Name) - host, port := c.findPortAndHostForLabel(service.PortLabel, task) - if host == "" || port == 0 { - continue - } - checks := c.makeChecks(service, host, port) - asr := &api.AgentServiceRegistration{ - ID: service.Id, - Name: service.Name, - Tags: service.Tags, - Port: port, - Address: host, - Checks: checks, - } - serviceDefns = append(serviceDefns, asr) - } - - for _, serviceDefn := range serviceDefns { - c.logger.Printf("[INFO] Registering service %v with Consul", serviceDefn.Name) - if err := c.client.Agent().ServiceRegister(serviceDefn); err != nil { - c.logger.Printf("[ERROR] Error while registering service %v with Consul: %v", serviceDefn.Name, err) + if err := c.registerService(service, task, allocID); err != nil { mErr.Errors = append(mErr.Errors, err) } + ts := &trackedService{ + allocId: allocID, + task: task, + } + c.trackedServices[service.Id] = ts + } return mErr.ErrorOrNil() @@ -69,10 +65,11 @@ func (c *ConsulClient) Deregister(task *structs.Task) error { var mErr multierror.Error for _, service := range task.Services { c.logger.Printf("[INFO] De-Registering service %v with Consul", service.Name) - if err := c.client.Agent().ServiceDeregister(service.Id); err != nil { + if err := c.deregisterService(service.Id); err != nil { c.logger.Printf("[ERROR] Error in de-registering service %v from Consul", service.Name) mErr.Errors = append(mErr.Errors, err) } + delete(c.trackedServices, service.Id) } return mErr.ErrorOrNil() } @@ -86,10 +83,73 @@ func (c *ConsulClient) findPortAndHostForLabel(portLabel string, task *structs.T return "", 0 } -func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*api.AgentServiceCheck { - var checks []*api.AgentServiceCheck +func (c *ConsulClient) SyncWithConsul() { + sync := time.After(syncInterval) + agent := c.client.Agent() + + for { + select { + case <-sync: + var consulServices map[string]*consul.AgentService + var err error + if consulServices, err = agent.Services(); err != nil { + c.logger.Printf("[DEBUG] Error while syncing services with Consul: %v", err) + continue + } + for serviceId := range c.trackedServices { + if _, ok := consulServices[serviceId]; !ok { + ts := c.trackedServices[serviceId] + c.registerService(ts.service, ts.task, ts.allocId) + } + } + + for serviceId := range consulServices { + if _, ok := c.trackedServices[serviceId]; !ok { + if err := c.deregisterService(serviceId); err != nil { + c.logger.Printf("[DEBUG] Error while de-registering service with ID: %s", serviceId) + } + } + } + case <-c.shutdownCh: + return + } + } +} + +func (c *ConsulClient) registerService(service *structs.Service, task *structs.Task, allocID string) error { + var mErr multierror.Error + service.Id = fmt.Sprintf("%s-%s", allocID, task.Name) + host, port := c.findPortAndHostForLabel(service.PortLabel, task) + if host == "" || port == 0 { + return fmt.Errorf("The port:%s marked for registration of service: %s couldn't be found", service.PortLabel, service.Name) + } + checks := c.makeChecks(service, host, port) + asr := &consul.AgentServiceRegistration{ + ID: service.Id, + Name: service.Name, + Tags: service.Tags, + Port: port, + Address: host, + Checks: checks, + } + if err := c.client.Agent().ServiceRegister(asr); err != nil { + c.logger.Printf("[ERROR] Error while registering service %v with Consul: %v", service.Name, err) + mErr.Errors = append(mErr.Errors, err) + } + return mErr.ErrorOrNil() +} + +func (c *ConsulClient) deregisterService(serviceId string) error { + if err := c.client.Agent().ServiceDeregister(serviceId); err != nil { + return err + } + return nil +} + +func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentServiceCheck { + var checks []*consul.AgentServiceCheck for _, check := range service.Checks { - c := &api.AgentServiceCheck{ + c := &consul.AgentServiceCheck{ Interval: check.Interval.String(), Timeout: check.Timeout.String(), } From e83387191eb5ed1cb78f0c0b731fe83327fcffb5 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 04:59:57 -0800 Subject: [PATCH 09/15] Shutting down consul an not trying to de-register the consul service --- client/client.go | 6 ++++++ client/consul.go | 15 +++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index e33872782..8e605f4a5 100644 --- a/client/client.go +++ b/client/client.go @@ -145,6 +145,9 @@ func NewClient(cfg *config.Config) (*Client, error) { // Start the client! go c.run() + + // Start the consul client + go c.consulClient.SyncWithConsul() return c, nil } @@ -209,6 +212,9 @@ func (c *Client) Shutdown() error { } } + // Stop the consul client + c.consulClient.ShutDown() + c.shutdown = true close(c.shutdownCh) c.connPool.Shutdown() diff --git a/client/consul.go b/client/consul.go index 5814450f9..db22eea56 100644 --- a/client/consul.go +++ b/client/consul.go @@ -30,7 +30,6 @@ type ConsulClient struct { func NewConsulClient(logger *log.Logger) (*ConsulClient, error) { var err error var c *consul.Client - ts := make(map[string]*trackedService) if c, err = consul.NewClient(consul.DefaultConfig()); err != nil { return nil, err } @@ -38,7 +37,8 @@ func NewConsulClient(logger *log.Logger) (*ConsulClient, error) { consulClient := ConsulClient{ client: c, logger: logger, - trackedServices: ts, + trackedServices: make(map[string]*trackedService), + shutdownCh: make(chan struct{}), } return &consulClient, nil @@ -53,6 +53,7 @@ func (c *ConsulClient) Register(task *structs.Task, allocID string) error { ts := &trackedService{ allocId: allocID, task: task, + service: service, } c.trackedServices[service.Id] = ts @@ -74,6 +75,10 @@ func (c *ConsulClient) Deregister(task *structs.Task) error { return mErr.ErrorOrNil() } +func (c *ConsulClient) ShutDown() { + close(c.shutdownCh) +} + func (c *ConsulClient) findPortAndHostForLabel(portLabel string, task *structs.Task) (string, int) { for _, network := range task.Resources.Networks { if p, ok := network.MapLabelToValues()[portLabel]; ok { @@ -90,6 +95,8 @@ func (c *ConsulClient) SyncWithConsul() { for { select { case <-sync: + sync = time.After(syncInterval) + c.logger.Printf("[DEBUG] Syncing with consul") var consulServices map[string]*consul.AgentService var err error if consulServices, err = agent.Services(); err != nil { @@ -104,6 +111,9 @@ func (c *ConsulClient) SyncWithConsul() { } for serviceId := range consulServices { + if serviceId == "consul" { + continue + } if _, ok := c.trackedServices[serviceId]; !ok { if err := c.deregisterService(serviceId); err != nil { c.logger.Printf("[DEBUG] Error while de-registering service with ID: %s", serviceId) @@ -111,6 +121,7 @@ func (c *ConsulClient) SyncWithConsul() { } } case <-c.shutdownCh: + c.logger.Printf("[INFO] Shutting down Consul Client") return } } From 2ee71ffb5930c5e448684dafb515c222598c7b60 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 05:15:52 -0800 Subject: [PATCH 10/15] Added the option to configure consul address --- client/alloc_runner_test.go | 4 ++-- client/client.go | 3 ++- client/consul.go | 6 ++++-- client/task_runner_test.go | 4 ++-- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 148a33a42..15077c76c 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { conf.AllocDir = os.TempDir() upd := &MockAllocStateUpdater{} alloc := mock.Alloc() - consulClient, _ := NewConsulClient(logger) + consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500") if !restarts { alloc.Job.Type = structs.JobTypeBatch *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} @@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } // Create a new alloc runner - consulClient, err := NewConsulClient(ar.logger) + consulClient, err := NewConsulClient(ar.logger, "127.0.0.1:8500") ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, consulClient) err = ar2.RestoreState() diff --git a/client/client.go b/client/client.go index 8e605f4a5..040d80bbf 100644 --- a/client/client.go +++ b/client/client.go @@ -99,7 +99,8 @@ func NewClient(cfg *config.Config) (*Client, error) { logger := log.New(cfg.LogOutput, "", log.LstdFlags) // Create the consul client - consulClient, err := NewConsulClient(logger) + consulAddr := cfg.ReadDefault("consul.address", "127.0.0.1:8500") + consulClient, err := NewConsulClient(logger, consulAddr) if err != nil { return nil, fmt.Errorf("failed to create the consul client: %v", err) } diff --git a/client/consul.go b/client/consul.go index db22eea56..0e122f13a 100644 --- a/client/consul.go +++ b/client/consul.go @@ -27,10 +27,12 @@ type ConsulClient struct { trackedServices map[string]*trackedService } -func NewConsulClient(logger *log.Logger) (*ConsulClient, error) { +func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, error) { var err error var c *consul.Client - if c, err = consul.NewClient(consul.DefaultConfig()); err != nil { + cfg := consul.DefaultConfig() + cfg.Address = consulAddr + if c, err = consul.NewClient(cfg); err != nil { return nil, err } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 788b0b1d5..f8bc9e466 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { upd := &MockTaskStateUpdater{} alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] - consulClient, _ := NewConsulClient(logger) + consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500") // Initialize the port listing. This should be done by the offer process but // we have a mock so that doesn't happen. task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}} @@ -164,7 +164,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { } // Create a new task runner - consulClient, _ := NewConsulClient(tr.logger) + consulClient, _ := NewConsulClient(tr.logger, "127.0.0.1:8500") tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, consulClient) From 2f1f3d3898db3a25ab8e02e50d9ce23695a7483b Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 05:20:57 -0800 Subject: [PATCH 11/15] Removed a debug log --- client/consul.go | 1 - 1 file changed, 1 deletion(-) diff --git a/client/consul.go b/client/consul.go index 0e122f13a..ca0feedc8 100644 --- a/client/consul.go +++ b/client/consul.go @@ -98,7 +98,6 @@ func (c *ConsulClient) SyncWithConsul() { select { case <-sync: sync = time.After(syncInterval) - c.logger.Printf("[DEBUG] Syncing with consul") var consulServices map[string]*consul.AgentService var err error if consulServices, err = agent.Services(); err != nil { From 93d0dbc974319c886733bb16488921625017da62 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 05:49:40 -0800 Subject: [PATCH 12/15] Added some docs --- client/consul.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/client/consul.go b/client/consul.go index ca0feedc8..0573364cf 100644 --- a/client/consul.go +++ b/client/consul.go @@ -100,10 +100,15 @@ func (c *ConsulClient) SyncWithConsul() { sync = time.After(syncInterval) var consulServices map[string]*consul.AgentService var err error + + // Get the list of the services that Consul knows about if consulServices, err = agent.Services(); err != nil { c.logger.Printf("[DEBUG] Error while syncing services with Consul: %v", err) continue } + + // See if we have services that Consul doesn't know about yet. + // Register with Consul the services which are not registered for serviceId := range c.trackedServices { if _, ok := consulServices[serviceId]; !ok { ts := c.trackedServices[serviceId] @@ -111,6 +116,8 @@ func (c *ConsulClient) SyncWithConsul() { } } + // See if consul thinks we have some services which are not running + // anymore on the node. We de-register those services for serviceId := range consulServices { if serviceId == "consul" { continue From 41a1e6d74b33b7454086960cc3f52f80bd9ba2b3 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 09:36:37 -0800 Subject: [PATCH 13/15] Added a lock around modification of tracked services map --- client/consul.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/client/consul.go b/client/consul.go index 0573364cf..2bb194af3 100644 --- a/client/consul.go +++ b/client/consul.go @@ -6,6 +6,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" "log" + "sync" "time" ) @@ -25,6 +26,7 @@ type ConsulClient struct { shutdownCh chan struct{} trackedServices map[string]*trackedService + trackedSrvLock sync.Mutex } func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, error) { @@ -57,8 +59,9 @@ func (c *ConsulClient) Register(task *structs.Task, allocID string) error { task: task, service: service, } + c.trackedSrvLock.Lock() c.trackedServices[service.Id] = ts - + c.trackedSrvLock.Unlock() } return mErr.ErrorOrNil() @@ -72,7 +75,9 @@ func (c *ConsulClient) Deregister(task *structs.Task) error { c.logger.Printf("[ERROR] Error in de-registering service %v from Consul", service.Name) mErr.Errors = append(mErr.Errors, err) } + c.trackedSrvLock.Lock() delete(c.trackedServices, service.Id) + c.trackedSrvLock.Unlock() } return mErr.ErrorOrNil() } From 21bb78f9487c141cd96840da9047848ef05078c7 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 10:26:24 -0800 Subject: [PATCH 14/15] Removing support for scrpt checks --- nomad/structs/structs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c4dbc5c45..7fde437cc 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1024,7 +1024,7 @@ func (sc *ServiceCheck) Validate() error { if sc.Type == ServiceCheckScript && sc.Script == "" { return fmt.Errorf("Script checks need the script to invoke") } - if t != ServiceCheckTCP && t != ServiceCheckHTTP && t != ServiceCheckDocker && t != ServiceCheckScript { + if t != ServiceCheckTCP && t != ServiceCheckHTTP { return fmt.Errorf("Check with name %v has invalid check type: %s ", sc.Name, sc.Type) } return nil From 2deed3a2eb91f968a2fa7596cf0a6a9918ae27da Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 18 Nov 2015 10:32:31 -0800 Subject: [PATCH 15/15] Added a log line to indicate we are registering a service with Consul --- client/consul.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/consul.go b/client/consul.go index 2bb194af3..b3fdfe6c1 100644 --- a/client/consul.go +++ b/client/consul.go @@ -51,6 +51,7 @@ func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, erro func (c *ConsulClient) Register(task *structs.Task, allocID string) error { var mErr multierror.Error for _, service := range task.Services { + c.logger.Printf("[INFO] Registering service %s with Consul.", service.Name) if err := c.registerService(service, task, allocID); err != nil { mErr.Errors = append(mErr.Errors, err) }