From a53e035b490a310613a168d3edf3baca21a38504 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 11 Dec 2015 10:08:57 -0800 Subject: [PATCH 1/7] Deregister services and checks which are managed by Nomad --- client/consul.go | 38 +++++++++++++++++++++++++++++++++++--- client/consul_test.go | 3 +++ nomad/structs/structs.go | 2 +- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/client/consul.go b/client/consul.go index 3440064c7..abe915fa8 100644 --- a/client/consul.go +++ b/client/consul.go @@ -195,15 +195,18 @@ func (c *ConsulService) SyncWithConsul() { // services which are no longer present in tasks func (c *ConsulService) performSync() { // Get the list of the services and that Consul knows about - consulServices, err := c.client.Services() + srvcs, err := c.client.Services() if err != nil { return } - consulChecks, err := c.client.Checks() + chks, err := c.client.Checks() if err != nil { return } - delete(consulServices, "consul") + + // Filter the services and checks that isn't managed by consul + consulServices := c.filterConsulServices(srvcs) + consulChecks := c.filterConsulChecks(chks) knownChecks := make(map[string]struct{}) knownServices := make(map[string]struct{}) @@ -345,6 +348,35 @@ func (c *ConsulService) makeCheck(service *structs.Service, check *structs.Servi return cr } +// filterConsulServices prunes out all the service whose ids are not prefixed +// with nomad- +func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService { + nomadServices := make(map[string]*consul.AgentService) + delete(srvcs, "consul") + for _, srv := range srvcs { + if strings.HasPrefix(srv.ID, "nomad-") { + nomadServices[srv.ID] = srv + } + } + return nomadServices + +} + +// filterConsulChecks prunes out all the consul checks which do not have +// services with id prefixed with noamd- +func (c *ConsulService) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck { + nomadChecks := make(map[string]*consul.AgentCheck) + for _, chk := range chks { + if strings.HasPrefix(chk.ServiceID, "nomad-") { + nomadChecks[chk.CheckID] = chk + } + } + return nomadChecks + +} + +// printLogMessage prints log messages only when the node attributes have consul +// related information func (c *ConsulService) printLogMessage(message string, v ...interface{}) { if _, ok := c.node.Attributes["consul.version"]; ok { c.logger.Printf(message, v) diff --git a/client/consul_test.go b/client/consul_test.go index 9cb38ede7..87c8143bb 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -278,3 +278,6 @@ func TestConsul_ModifyCheck(t *testing.T) { t.Fatalf("Expected number of check registrations: %v, Actual: %v", 2, apiClient.checkRegisterCallCount) } } + +func TestConsul_FilterNomadServicesAndChecks(t *testing.T) { +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c817b6672..bfa4012d1 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1157,7 +1157,7 @@ type Service struct { // InitFields interpolates values of Job, Task Group and Task in the Service // Name. This also generates check names, service id and check ids. func (s *Service) InitFields(job string, taskGroup string, task string) { - s.Id = GenerateUUID() + s.Id = fmt.Sprintf("nomad-%s", GenerateUUID()) s.Name = args.ReplaceEnv(s.Name, map[string]string{ "JOB": job, "TASKGROUP": taskGroup, From d9fc07fb083fc2ddf0562b73ff86727a6d4f70f7 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 11 Dec 2015 10:47:08 -0800 Subject: [PATCH 2/7] Added a test for the filtering logic of service and clients --- client/consul_test.go | 53 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/client/consul_test.go b/client/consul_test.go index 87c8143bb..ca56263ef 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -280,4 +280,57 @@ func TestConsul_ModifyCheck(t *testing.T) { } func TestConsul_FilterNomadServicesAndChecks(t *testing.T) { + c := newConsulService() + srvs := map[string]*consul.AgentService{ + "foo-bar": { + ID: "foo-bar", + Service: "http-frontend", + Tags: []string{"global"}, + Port: 8080, + Address: "10.10.1.11", + }, + "nomad-2121212": { + ID: "nomad-2121212", + Service: "identity-service", + Tags: []string{"global"}, + Port: 8080, + Address: "10.10.1.11", + }, + } + + nomadServices := c.filterConsulServices(srvs) + + if len(nomadServices) != 1 { + t.Fatalf("Expected number of services: %v, Actual: %v", 1, len(nomadServices)) + } + + nomadServices = c.filterConsulServices(nil) + if len(nomadServices) != 0 { + t.Fatalf("Expected number of services: %v, Actual: %v", 0, len(nomadServices)) + } + + chks := map[string]*consul.AgentCheck{ + "foo-bar-chk": { + CheckID: "foo-bar-chk", + ServiceID: "foo-bar", + Name: "alive", + }, + "212121212": { + CheckID: "212121212", + ServiceID: "nomad-2121212", + Name: "ping", + }, + } + + nomadChecks := c.filterConsulChecks(chks) + + if len(nomadChecks) != 1 { + t.Fatalf("Expected number of checks: %v, Actual: %v", 1, len(nomadChecks)) + } + + nomadChecks = c.filterConsulChecks(nil) + if len(nomadChecks) != 0 { + t.Fatalf("Expected number of checks: %v, Actual: %v", 0, len(nomadChecks)) + } + } From c7c0748cea22e3658ae23c417d2e89bc763c60e5 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 11 Dec 2015 11:02:23 -0800 Subject: [PATCH 3/7] Making a struct to hold consul service config --- client/alloc_runner_test.go | 4 ++-- client/client.go | 11 ++++++++++- client/consul.go | 35 ++++++++++++++++++++++------------- client/consul_test.go | 2 +- client/task_runner_test.go | 4 ++-- 5 files changed, 37 insertions(+), 19 deletions(-) diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 8e0c2c33c..c0bb369a1 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { conf.AllocDir = os.TempDir() upd := &MockAllocStateUpdater{} alloc := mock.Alloc() - consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) + consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) if !restarts { alloc.Job.Type = structs.JobTypeBatch *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} @@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } // Create a new alloc runner - consulClient, err := NewConsulService(ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) + consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, consulClient) err = ar2.RestoreState() diff --git a/client/client.go b/client/client.go index 6f88f7144..b371b5df9 100644 --- a/client/client.go +++ b/client/client.go @@ -157,7 +157,16 @@ func (c *Client) setupConsulService() error { auth := c.config.Read("consul.auth") enableSSL := c.config.ReadBoolDefault("consul.ssl", false) verifySSL := c.config.ReadBoolDefault("consul.verifyssl", true) - if consulService, err = NewConsulService(c.logger, addr, token, auth, enableSSL, verifySSL, c.config.Node); err != nil { + consulServiceCfg := &consulServiceConfig{ + logger: c.logger, + consulAddr: addr, + token: token, + auth: auth, + enableSSL: enableSSL, + verifySSL: verifySSL, + node: c.config.Node, + } + if consulService, err = NewConsulService(consulServiceCfg); err != nil { return err } c.consulService = consulService diff --git a/client/consul.go b/client/consul.go index abe915fa8..6abc9469f 100644 --- a/client/consul.go +++ b/client/consul.go @@ -79,25 +79,34 @@ type ConsulService struct { trackedTskLock sync.Mutex } +type consulServiceConfig struct { + logger *log.Logger + consulAddr string + token string + auth string + enableSSL bool + verifySSL bool + node *structs.Node +} + // A factory method to create new consul service -func NewConsulService(logger *log.Logger, consulAddr string, token string, - auth string, enableSSL bool, verifySSL bool, node *structs.Node) (*ConsulService, error) { +func NewConsulService(config *consulServiceConfig) (*ConsulService, error) { var err error var c *consul.Client cfg := consul.DefaultConfig() - cfg.Address = consulAddr - if token != "" { - cfg.Token = token + cfg.Address = config.consulAddr + if config.token != "" { + cfg.Token = config.token } - if auth != "" { + if config.auth != "" { var username, password string - if strings.Contains(auth, ":") { - split := strings.SplitN(auth, ":", 2) + if strings.Contains(config.auth, ":") { + split := strings.SplitN(config.auth, ":", 2) username = split[0] password = split[1] } else { - username = auth + username = config.auth } cfg.HttpAuth = &consul.HttpBasicAuth{ @@ -105,10 +114,10 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string, Password: password, } } - if enableSSL { + if config.enableSSL { cfg.Scheme = "https" } - if enableSSL && !verifySSL { + if config.enableSSL && !config.verifySSL { cfg.HttpClient.Transport = &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, @@ -122,8 +131,8 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string, consulService := ConsulService{ client: &consulApiClient{client: c}, - logger: logger, - node: node, + logger: config.logger, + node: config.node, trackedTasks: make(map[string]*trackedTask), serviceStates: make(map[string]string), shutdownCh: make(chan struct{}), diff --git a/client/consul_test.go b/client/consul_test.go index ca56263ef..efb7c0c98 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -46,7 +46,7 @@ func (a *mockConsulApiClient) Checks() (map[string]*consul.AgentCheck, error) { func newConsulService() *ConsulService { logger := log.New(os.Stdout, "logger: ", log.Lshortfile) - c, _ := NewConsulService(logger, "", "", "", false, false, &structs.Node{}) + c, _ := NewConsulService(&consulServiceConfig{logger, "", "", "", false, false, &structs.Node{}}) c.client = &mockConsulApiClient{} return c } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 96da69d4e..dcdc84577 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { upd := &MockTaskStateUpdater{} alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] - consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) + consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) // Initialize the port listing. This should be done by the offer process but // we have a mock so that doesn't happen. task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}} @@ -164,7 +164,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { } // Create a new task runner - consulClient, _ := NewConsulService(tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) + consulClient, _ := NewConsulService(&consulServiceConfig{tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, consulClient) From 1b942c158d3eaf4a5657a452704bbe09853693a5 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 11 Dec 2015 13:07:37 -0800 Subject: [PATCH 4/7] Removing extra lines --- client/consul_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/client/consul_test.go b/client/consul_test.go index efb7c0c98..e75aee74e 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -299,7 +299,6 @@ func TestConsul_FilterNomadServicesAndChecks(t *testing.T) { } nomadServices := c.filterConsulServices(srvs) - if len(nomadServices) != 1 { t.Fatalf("Expected number of services: %v, Actual: %v", 1, len(nomadServices)) } @@ -323,7 +322,6 @@ func TestConsul_FilterNomadServicesAndChecks(t *testing.T) { } nomadChecks := c.filterConsulChecks(chks) - if len(nomadChecks) != 1 { t.Fatalf("Expected number of checks: %v, Actual: %v", 1, len(nomadChecks)) } From e389674d3467e3f35f6ad5a3c63d5340c324d80c Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 11 Dec 2015 14:02:09 -0800 Subject: [PATCH 5/7] Exctracted nomad- to a constant --- client/consul.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/consul.go b/client/consul.go index 6abc9469f..9ac123ebc 100644 --- a/client/consul.go +++ b/client/consul.go @@ -363,7 +363,7 @@ func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentServi nomadServices := make(map[string]*consul.AgentService) delete(srvcs, "consul") for _, srv := range srvcs { - if strings.HasPrefix(srv.ID, "nomad-") { + if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) { nomadServices[srv.ID] = srv } } @@ -376,7 +376,7 @@ func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentServi func (c *ConsulService) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck { nomadChecks := make(map[string]*consul.AgentCheck) for _, chk := range chks { - if strings.HasPrefix(chk.ServiceID, "nomad-") { + if strings.HasPrefix(chk.ServiceID, structs.NomadConsulPrefix) { nomadChecks[chk.CheckID] = chk } } From 6665efdc36769e6f982f86085bc70a2f0ef840de Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 11 Dec 2015 14:03:52 -0800 Subject: [PATCH 6/7] Adding the changes to structs --- nomad/structs/structs.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index bfa4012d1..2c91d5f93 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1145,6 +1145,10 @@ func (sc *ServiceCheck) Hash(serviceId string) string { return fmt.Sprintf("%x", h.Sum(nil)) } +const ( + NomadConsulPrefix = "nomad" +) + // The Service model represents a Consul service defintion type Service struct { Id string // Id of the service, this needs to be unique on a local machine @@ -1157,7 +1161,7 @@ type Service struct { // InitFields interpolates values of Job, Task Group and Task in the Service // Name. This also generates check names, service id and check ids. func (s *Service) InitFields(job string, taskGroup string, task string) { - s.Id = fmt.Sprintf("nomad-%s", GenerateUUID()) + s.Id = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID()) s.Name = args.ReplaceEnv(s.Name, map[string]string{ "JOB": job, "TASKGROUP": taskGroup, From f932c5f9df629214e09fbe4fb46a1c8550a00b68 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 11 Dec 2015 14:14:04 -0800 Subject: [PATCH 7/7] Refactored test and added some comments --- client/consul_test.go | 27 +++++++++++++++++++++++++-- nomad/structs/structs.go | 3 +++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/client/consul_test.go b/client/consul_test.go index e75aee74e..e7bb8012e 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "log" "os" + "reflect" "testing" "time" ) @@ -298,9 +299,19 @@ func TestConsul_FilterNomadServicesAndChecks(t *testing.T) { }, } + expSrvcs := map[string]*consul.AgentService{ + "nomad-2121212": { + ID: "nomad-2121212", + Service: "identity-service", + Tags: []string{"global"}, + Port: 8080, + Address: "10.10.1.11", + }, + } + nomadServices := c.filterConsulServices(srvs) - if len(nomadServices) != 1 { - t.Fatalf("Expected number of services: %v, Actual: %v", 1, len(nomadServices)) + if !reflect.DeepEqual(expSrvcs, nomadServices) { + t.Fatalf("Expected: %v, Actual: %v", expSrvcs, nomadServices) } nomadServices = c.filterConsulServices(nil) @@ -321,7 +332,19 @@ func TestConsul_FilterNomadServicesAndChecks(t *testing.T) { }, } + expChks := map[string]*consul.AgentCheck{ + "212121212": { + CheckID: "212121212", + ServiceID: "nomad-2121212", + Name: "ping", + }, + } + nomadChecks := c.filterConsulChecks(chks) + if !reflect.DeepEqual(expChks, nomadChecks) { + t.Fatalf("Expected: %v, Actual: %v", expChks, nomadChecks) + } + if len(nomadChecks) != 1 { t.Fatalf("Expected number of checks: %v, Actual: %v", 1, len(nomadChecks)) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 2c91d5f93..84a171e67 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1161,6 +1161,9 @@ type Service struct { // InitFields interpolates values of Job, Task Group and Task in the Service // Name. This also generates check names, service id and check ids. func (s *Service) InitFields(job string, taskGroup string, task string) { + // We add a prefix to the Service ID so that we can know that this service + // is managed by Consul since Consul can also have service which are not + // managed by Nomad s.Id = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID()) s.Name = args.ReplaceEnv(s.Name, map[string]string{ "JOB": job,