From 545f3db3fecf92fbd507a0811e2508114322506b Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Mon, 27 Apr 2015 18:26:23 -0700 Subject: [PATCH 01/11] agent: initial pass threading through tokens for services/checks --- command/agent/agent.go | 26 ++++++++++++---------- command/agent/agent_endpoint.go | 12 ++++++++-- command/agent/local.go | 39 ++++++++++++++++++++++++++++----- command/agent/structs.go | 1 + 4 files changed, 58 insertions(+), 20 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index c741240ac..f24e90f58 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -155,7 +155,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { Port: agent.config.Ports.Server, Tags: []string{}, } - agent.state.AddService(&consulService) + agent.state.AddService(&consulService, "") } else { err = agent.setupClient() agent.state.SetIface(agent.client) @@ -591,7 +591,8 @@ func (a *Agent) purgeCheck(checkID string) error { // AddService is used to add a service entry. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error { +func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, + persist bool, token string) error { if service.Service == "" { return fmt.Errorf("Service name missing") } @@ -621,7 +622,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe } // Add the service - a.state.AddService(service) + a.state.AddService(service, token) // Persist the service to a file if persist { @@ -645,7 +646,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe ServiceID: service.ID, ServiceName: service.Service, } - if err := a.AddCheck(check, chkType, persist); err != nil { + if err := a.AddCheck(check, chkType, persist, token); err != nil { return err } } @@ -696,7 +697,8 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { // This entry is persistent and the agent will make a best effort to // ensure it is registered. The Check may include a CheckType which // is used to automatically update the check status -func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool) error { +func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, + persist bool, token string) error { if check.CheckID == "" { return fmt.Errorf("CheckID missing") } @@ -775,7 +777,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist } // Add to the local state for anti-entropy - a.state.AddCheck(check) + a.state.AddCheck(check, token) // Persist the check if persist { @@ -920,7 +922,7 @@ func (a *Agent) loadServices(conf *Config) error { for _, service := range conf.Services { ns := service.NodeService() chkTypes := service.CheckTypes() - if err := a.AddService(ns, chkTypes, false); err != nil { + if err := a.AddService(ns, chkTypes, false, service.Token); err != nil { return fmt.Errorf("Failed to register service '%s': %v", service.ID, err) } } @@ -962,7 +964,7 @@ func (a *Agent) loadServices(conf *Config) error { } else { a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", svc.ID, filePath) - return a.AddService(svc, nil, false) + return a.AddService(svc, nil, false, "") } }) @@ -991,7 +993,7 @@ func (a *Agent) loadChecks(conf *Config) error { for _, check := range conf.Checks { health := check.HealthCheck(conf.NodeName) chkType := &check.CheckType - if err := a.AddCheck(health, chkType, false); err != nil { + if err := a.AddCheck(health, chkType, false, ""); err != nil { return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) } } @@ -1035,7 +1037,7 @@ func (a *Agent) loadChecks(conf *Config) error { // services into the active pool p.Check.Status = structs.HealthCritical - if err := a.AddCheck(p.Check, p.ChkType, false); err != nil { + if err := a.AddCheck(p.Check, p.ChkType, false, ""); err != nil { // Purge the check if it is unable to be restored. a.logger.Printf("[WARN] agent: Failed to restore check %q: %s", p.Check.CheckID, err) @@ -1112,7 +1114,7 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error { ServiceName: service.Service, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true) + a.AddCheck(check, nil, true, "") a.logger.Printf("[INFO] agent: Service %q entered maintenance mode", serviceID) return nil @@ -1158,7 +1160,7 @@ func (a *Agent) EnableNodeMaintenance(reason string) { Notes: reason, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true) + a.AddCheck(check, nil, true, "") a.logger.Printf("[INFO] agent: Node entered maintenance mode") } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 78299c78a..18a855620 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -97,8 +97,12 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ return nil, nil } + // Get the provided token, if any + var token string + s.parseToken(req, &token) + // Add the check - if err := s.agent.AddCheck(health, chkType, true); err != nil { + if err := s.agent.AddCheck(health, chkType, true, token); err != nil { return nil, err } s.syncChanges() @@ -199,8 +203,12 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re } } + // Get the provided token, if any + var token string + s.parseToken(req, &token) + // Add the check - if err := s.agent.AddService(ns, chkTypes, true); err != nil { + if err := s.agent.AddService(ns, chkTypes, true, token); err != nil { return nil, err } s.syncChanges() diff --git a/command/agent/local.go b/command/agent/local.go index f85d6671a..d8790b30b 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -48,10 +48,12 @@ type localState struct { // Services tracks the local services services map[string]*structs.NodeService serviceStatus map[string]syncStatus + serviceTokens map[string]string // Checks tracks the local checks checks map[string]*structs.HealthCheck checkStatus map[string]syncStatus + checkTokens map[string]string // Used to track checks that are being deferred deferCheck map[string]*time.Timer @@ -71,8 +73,10 @@ func (l *localState) Init(config *Config, logger *log.Logger) { l.logger = logger l.services = make(map[string]*structs.NodeService) l.serviceStatus = make(map[string]syncStatus) + l.serviceTokens = make(map[string]string) l.checks = make(map[string]*structs.HealthCheck) l.checkStatus = make(map[string]syncStatus) + l.checkTokens = make(map[string]string) l.deferCheck = make(map[string]*time.Timer) l.consulCh = make(chan struct{}, 1) l.triggerCh = make(chan struct{}, 1) @@ -122,7 +126,7 @@ func (l *localState) isPaused() bool { // AddService is used to add a service entry to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddService(service *structs.NodeService) { +func (l *localState) AddService(service *structs.NodeService, token string) { // Assign the ID if none given if service.ID == "" && service.Service != "" { service.ID = service.Service @@ -133,6 +137,7 @@ func (l *localState) AddService(service *structs.NodeService) { l.services[service.ID] = service l.serviceStatus[service.ID] = syncStatus{} + l.serviceTokens[service.ID] = token l.changeMade() } @@ -163,7 +168,7 @@ func (l *localState) Services() map[string]*structs.NodeService { // AddCheck is used to add a health check to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddCheck(check *structs.HealthCheck) { +func (l *localState) AddCheck(check *structs.HealthCheck, token string) { // Set the node name check.Node = l.config.NodeName @@ -172,6 +177,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck) { l.checks[check.CheckID] = check l.checkStatus[check.CheckID] = syncStatus{} + l.checkTokens[check.CheckID] = token l.changeMade() } @@ -436,11 +442,16 @@ func (l *localState) deleteService(id string) error { return fmt.Errorf("ServiceID missing") } + token := l.serviceTokens[id] + if token == "" { + token = l.config.ACLToken + } + req := structs.DeregisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, ServiceID: id, - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + WriteRequest: structs.WriteRequest{Token: token}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -457,11 +468,16 @@ func (l *localState) deleteCheck(id string) error { return fmt.Errorf("CheckID missing") } + token := l.checkTokens[id] + if token == "" { + token = l.config.ACLToken + } + req := structs.DeregisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, CheckID: id, - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + WriteRequest: structs.WriteRequest{Token: token}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -474,12 +490,17 @@ func (l *localState) deleteCheck(id string) error { // syncService is used to sync a service to the server func (l *localState) syncService(id string) error { + token := l.serviceTokens[id] + if token == "" { + token = l.config.ACLToken + } + req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, Address: l.config.AdvertiseAddr, Service: l.services[id], - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + WriteRequest: structs.WriteRequest{Token: token}, } // If the service has associated checks that are out of sync, @@ -530,13 +551,19 @@ func (l *localState) syncCheck(id string) error { service = serv } } + + token := l.checkTokens[id] + if token == "" { + token = l.config.ACLToken + } + req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, Address: l.config.AdvertiseAddr, Service: service, Check: l.checks[id], - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + WriteRequest: structs.WriteRequest{Token: token}, } var out struct{} err := l.iface.RPC("Catalog.Register", &req, &out) diff --git a/command/agent/structs.go b/command/agent/structs.go index 84f606e46..a30996504 100644 --- a/command/agent/structs.go +++ b/command/agent/structs.go @@ -13,6 +13,7 @@ type ServiceDefinition struct { Port int Check CheckType Checks CheckTypes + Token string } func (s *ServiceDefinition) NodeService() *structs.NodeService { From 30f6f1142e79e2e033ef97c8eadf25a7784b7392 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Mon, 27 Apr 2015 19:01:02 -0700 Subject: [PATCH 02/11] agent: persist tokens from API registrations --- command/agent/agent.go | 23 ++++++++++++++--------- command/agent/check.go | 1 + command/agent/structs.go | 7 +++++++ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index f24e90f58..9fcee037e 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -520,10 +520,14 @@ func (a *Agent) ResumeSync() { } // persistService saves a service definition to a JSON file in the data dir -func (a *Agent) persistService(service *structs.NodeService) error { +func (a *Agent) persistService(service *structs.NodeService, token string) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) if _, err := os.Stat(svcPath); os.IsNotExist(err) { - encoded, err := json.Marshal(service) + wrapped := &persistedService{ + Token: token, + Service: service, + } + encoded, err := json.Marshal(wrapped) if err != nil { return nil } @@ -552,14 +556,14 @@ func (a *Agent) purgeService(serviceID string) error { } // persistCheck saves a check definition to the local agent's state directory -func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) error { +func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType, token string) error { checkPath := filepath.Join(a.config.DataDir, checksDir, stringHash(check.CheckID)) if _, err := os.Stat(checkPath); !os.IsNotExist(err) { return err } // Create the persisted check - p := persistedCheck{check, chkType} + p := persistedCheck{check, chkType, token} encoded, err := json.Marshal(p) if err != nil { @@ -626,7 +630,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, // Persist the service to a file if persist { - if err := a.persistService(service); err != nil { + if err := a.persistService(service, token); err != nil { return err } } @@ -781,7 +785,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, // Persist the check if persist { - return a.persistCheck(check, chkType) + return a.persistCheck(check, chkType, token) } return nil @@ -950,10 +954,11 @@ func (a *Agent) loadServices(conf *Config) error { return err } - var svc *structs.NodeService - if err := json.Unmarshal(content, &svc); err != nil { + var wrapped *persistedService + if err := json.Unmarshal(content, &wrapped); err != nil { return err } + svc := wrapped.Service if _, ok := a.state.services[svc.ID]; ok { // Purge previously persisted service. This allows config to be @@ -964,7 +969,7 @@ func (a *Agent) loadServices(conf *Config) error { } else { a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", svc.ID, filePath) - return a.AddService(svc, nil, false, "") + return a.AddService(svc, nil, false, wrapped.Token) } }) diff --git a/command/agent/check.go b/command/agent/check.go index bae76ca5d..67fae6233 100644 --- a/command/agent/check.go +++ b/command/agent/check.go @@ -257,6 +257,7 @@ func (c *CheckTTL) SetStatus(status, output string) { type persistedCheck struct { Check *structs.HealthCheck ChkType *CheckType + Token string } // CheckHTTP is used to periodically make an HTTP request to diff --git a/command/agent/structs.go b/command/agent/structs.go index a30996504..165555317 100644 --- a/command/agent/structs.go +++ b/command/agent/structs.go @@ -63,3 +63,10 @@ func (c *CheckDefinition) HealthCheck(node string) *structs.HealthCheck { } return health } + +// persistedService is used to wrap a service definition and bundle it +// with an ACL token so we can restore both at a later agent start. +type persistedService struct { + Token string + Service *structs.NodeService +} From 75d182296f1e6d9d6c4ae83762c88fcb96b9818d Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Mon, 27 Apr 2015 22:01:01 -0700 Subject: [PATCH 03/11] agent: add service/check token methods to reduce invasiveness --- command/agent/agent.go | 39 ++++++++------- command/agent/agent_endpoint.go | 6 ++- command/agent/agent_test.go | 2 +- command/agent/local.go | 84 ++++++++++++++++++++------------- 4 files changed, 75 insertions(+), 56 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 9fcee037e..47979f364 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -155,7 +155,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { Port: agent.config.Ports.Server, Tags: []string{}, } - agent.state.AddService(&consulService, "") + agent.state.AddService(&consulService) } else { err = agent.setupClient() agent.state.SetIface(agent.client) @@ -520,11 +520,11 @@ func (a *Agent) ResumeSync() { } // persistService saves a service definition to a JSON file in the data dir -func (a *Agent) persistService(service *structs.NodeService, token string) error { +func (a *Agent) persistService(service *structs.NodeService) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) if _, err := os.Stat(svcPath); os.IsNotExist(err) { wrapped := &persistedService{ - Token: token, + Token: a.state.ServiceToken(service.ID), Service: service, } encoded, err := json.Marshal(wrapped) @@ -556,14 +556,14 @@ func (a *Agent) purgeService(serviceID string) error { } // persistCheck saves a check definition to the local agent's state directory -func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType, token string) error { +func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) error { checkPath := filepath.Join(a.config.DataDir, checksDir, stringHash(check.CheckID)) if _, err := os.Stat(checkPath); !os.IsNotExist(err) { return err } // Create the persisted check - p := persistedCheck{check, chkType, token} + p := persistedCheck{check, chkType, a.state.CheckToken(check.CheckID)} encoded, err := json.Marshal(p) if err != nil { @@ -595,8 +595,7 @@ func (a *Agent) purgeCheck(checkID string) error { // AddService is used to add a service entry. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, - persist bool, token string) error { +func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error { if service.Service == "" { return fmt.Errorf("Service name missing") } @@ -626,11 +625,11 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, } // Add the service - a.state.AddService(service, token) + a.state.AddService(service) // Persist the service to a file if persist { - if err := a.persistService(service, token); err != nil { + if err := a.persistService(service); err != nil { return err } } @@ -650,7 +649,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, ServiceID: service.ID, ServiceName: service.Service, } - if err := a.AddCheck(check, chkType, persist, token); err != nil { + if err := a.AddCheck(check, chkType, persist); err != nil { return err } } @@ -701,8 +700,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { // This entry is persistent and the agent will make a best effort to // ensure it is registered. The Check may include a CheckType which // is used to automatically update the check status -func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, - persist bool, token string) error { +func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool) error { if check.CheckID == "" { return fmt.Errorf("CheckID missing") } @@ -781,11 +779,11 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, } // Add to the local state for anti-entropy - a.state.AddCheck(check, token) + a.state.AddCheck(check) // Persist the check if persist { - return a.persistCheck(check, chkType, token) + return a.persistCheck(check, chkType) } return nil @@ -926,7 +924,7 @@ func (a *Agent) loadServices(conf *Config) error { for _, service := range conf.Services { ns := service.NodeService() chkTypes := service.CheckTypes() - if err := a.AddService(ns, chkTypes, false, service.Token); err != nil { + if err := a.AddService(ns, chkTypes, false); err != nil { return fmt.Errorf("Failed to register service '%s': %v", service.ID, err) } } @@ -959,6 +957,7 @@ func (a *Agent) loadServices(conf *Config) error { return err } svc := wrapped.Service + a.state.AddServiceToken(svc.ID, wrapped.Token) if _, ok := a.state.services[svc.ID]; ok { // Purge previously persisted service. This allows config to be @@ -969,7 +968,7 @@ func (a *Agent) loadServices(conf *Config) error { } else { a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", svc.ID, filePath) - return a.AddService(svc, nil, false, wrapped.Token) + return a.AddService(svc, nil, false) } }) @@ -998,7 +997,7 @@ func (a *Agent) loadChecks(conf *Config) error { for _, check := range conf.Checks { health := check.HealthCheck(conf.NodeName) chkType := &check.CheckType - if err := a.AddCheck(health, chkType, false, ""); err != nil { + if err := a.AddCheck(health, chkType, false); err != nil { return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) } } @@ -1042,7 +1041,7 @@ func (a *Agent) loadChecks(conf *Config) error { // services into the active pool p.Check.Status = structs.HealthCritical - if err := a.AddCheck(p.Check, p.ChkType, false, ""); err != nil { + if err := a.AddCheck(p.Check, p.ChkType, false); err != nil { // Purge the check if it is unable to be restored. a.logger.Printf("[WARN] agent: Failed to restore check %q: %s", p.Check.CheckID, err) @@ -1119,7 +1118,7 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error { ServiceName: service.Service, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true, "") + a.AddCheck(check, nil, true) a.logger.Printf("[INFO] agent: Service %q entered maintenance mode", serviceID) return nil @@ -1165,7 +1164,7 @@ func (a *Agent) EnableNodeMaintenance(reason string) { Notes: reason, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true, "") + a.AddCheck(check, nil, true) a.logger.Printf("[INFO] agent: Node entered maintenance mode") } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 18a855620..d314e7d4c 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -100,9 +100,10 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ // Get the provided token, if any var token string s.parseToken(req, &token) + s.agent.state.AddCheckToken(health.CheckID, token) // Add the check - if err := s.agent.AddCheck(health, chkType, true, token); err != nil { + if err := s.agent.AddCheck(health, chkType, true); err != nil { return nil, err } s.syncChanges() @@ -206,9 +207,10 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re // Get the provided token, if any var token string s.parseToken(req, &token) + s.agent.state.AddServiceToken(ns.ID, token) // Add the check - if err := s.agent.AddService(ns, chkTypes, true, token); err != nil { + if err := s.agent.AddService(ns, chkTypes, true); err != nil { return nil, err } s.syncChanges() diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 33db4f6e2..af138bb7c 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -684,7 +684,7 @@ func TestAgent_PersistCheck(t *testing.T) { t.Fatalf("err: %s", err) } - p := persistedCheck{check, chkType} + p := persistedCheck{check, chkType, ""} expected, err := json.Marshal(p) if err != nil { t.Fatalf("err: %s", err) diff --git a/command/agent/local.go b/command/agent/local.go index d8790b30b..4211d72c9 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -36,7 +36,7 @@ type localState struct { // element due to a go bug. paused int32 - sync.Mutex + sync.RWMutex logger *log.Logger // Config is the agent config @@ -123,10 +123,30 @@ func (l *localState) isPaused() bool { return atomic.LoadInt32(&l.paused) == 1 } +// AddServiceToken configures the provided token for the service ID. +// The token will be used to perform service registration operations. +func (l *localState) AddServiceToken(id, token string) { + l.Lock() + defer l.Unlock() + l.serviceTokens[id] = token +} + +// ServiceToken returns the configured ACL token for the given +// service ID. If none is present, the agent's token is returned. +func (l *localState) ServiceToken(id string) string { + l.RLock() + defer l.RUnlock() + token := l.serviceTokens[id] + if token == "" { + token = l.config.ACLToken + } + return token +} + // AddService is used to add a service entry to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddService(service *structs.NodeService, token string) { +func (l *localState) AddService(service *structs.NodeService) { // Assign the ID if none given if service.ID == "" && service.Service != "" { service.ID = service.Service @@ -137,7 +157,6 @@ func (l *localState) AddService(service *structs.NodeService, token string) { l.services[service.ID] = service l.serviceStatus[service.ID] = syncStatus{} - l.serviceTokens[service.ID] = token l.changeMade() } @@ -156,8 +175,8 @@ func (l *localState) RemoveService(serviceID string) { // agent is aware of and are being kept in sync with the server func (l *localState) Services() map[string]*structs.NodeService { services := make(map[string]*structs.NodeService) - l.Lock() - defer l.Unlock() + l.RLock() + defer l.RUnlock() for name, serv := range l.services { services[name] = serv @@ -165,10 +184,30 @@ func (l *localState) Services() map[string]*structs.NodeService { return services } +// AddCheckToken is used to configure an ACL token for a specific +// health check. The token is used during check registration operations. +func (l *localState) AddCheckToken(id, token string) { + l.Lock() + defer l.Unlock() + l.checkTokens[id] = token +} + +// CheckToken is used to return the configured health check token, or +// if none is configured, the default agent ACL token. +func (l *localState) CheckToken(id string) string { + l.RLock() + defer l.RUnlock() + token := l.checkTokens[id] + if token == "" { + token = l.config.ACLToken + } + return token +} + // AddCheck is used to add a health check to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddCheck(check *structs.HealthCheck, token string) { +func (l *localState) AddCheck(check *structs.HealthCheck) { // Set the node name check.Node = l.config.NodeName @@ -177,7 +216,6 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) { l.checks[check.CheckID] = check l.checkStatus[check.CheckID] = syncStatus{} - l.checkTokens[check.CheckID] = token l.changeMade() } @@ -239,8 +277,8 @@ func (l *localState) UpdateCheck(checkID, status, output string) { // agent is aware of and are being kept in sync with the server func (l *localState) Checks() map[string]*structs.HealthCheck { checks := make(map[string]*structs.HealthCheck) - l.Lock() - defer l.Unlock() + l.RLock() + defer l.RUnlock() for name, check := range l.checks { checks[name] = check @@ -442,16 +480,11 @@ func (l *localState) deleteService(id string) error { return fmt.Errorf("ServiceID missing") } - token := l.serviceTokens[id] - if token == "" { - token = l.config.ACLToken - } - req := structs.DeregisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, ServiceID: id, - WriteRequest: structs.WriteRequest{Token: token}, + WriteRequest: structs.WriteRequest{Token: l.ServiceToken(id)}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -468,16 +501,11 @@ func (l *localState) deleteCheck(id string) error { return fmt.Errorf("CheckID missing") } - token := l.checkTokens[id] - if token == "" { - token = l.config.ACLToken - } - req := structs.DeregisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, CheckID: id, - WriteRequest: structs.WriteRequest{Token: token}, + WriteRequest: structs.WriteRequest{Token: l.CheckToken(id)}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -490,17 +518,12 @@ func (l *localState) deleteCheck(id string) error { // syncService is used to sync a service to the server func (l *localState) syncService(id string) error { - token := l.serviceTokens[id] - if token == "" { - token = l.config.ACLToken - } - req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, Address: l.config.AdvertiseAddr, Service: l.services[id], - WriteRequest: structs.WriteRequest{Token: token}, + WriteRequest: structs.WriteRequest{Token: l.ServiceToken(id)}, } // If the service has associated checks that are out of sync, @@ -552,18 +575,13 @@ func (l *localState) syncCheck(id string) error { } } - token := l.checkTokens[id] - if token == "" { - token = l.config.ACLToken - } - req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, Address: l.config.AdvertiseAddr, Service: service, Check: l.checks[id], - WriteRequest: structs.WriteRequest{Token: token}, + WriteRequest: structs.WriteRequest{Token: l.CheckToken(id)}, } var out struct{} err := l.iface.RPC("Catalog.Register", &req, &out) From 23eab5ebccf72369963e16edb084438ae655e0f1 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Mon, 27 Apr 2015 22:26:03 -0700 Subject: [PATCH 04/11] agent: fix deadlock reading tokens from state --- command/agent/agent_test.go | 2 +- command/agent/local.go | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index af138bb7c..24fcbb98e 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -538,7 +538,7 @@ func TestAgent_PersistService(t *testing.T) { t.Fatalf("err: %s", err) } - expected, err := json.Marshal(svc) + expected, err := json.Marshal(&persistedService{Service: svc}) if err != nil { t.Fatalf("err: %s", err) } diff --git a/command/agent/local.go b/command/agent/local.go index 4211d72c9..2457f0c4f 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -133,9 +133,8 @@ func (l *localState) AddServiceToken(id, token string) { // ServiceToken returns the configured ACL token for the given // service ID. If none is present, the agent's token is returned. +// Assumes a lock is already established on the state. func (l *localState) ServiceToken(id string) string { - l.RLock() - defer l.RUnlock() token := l.serviceTokens[id] if token == "" { token = l.config.ACLToken @@ -193,10 +192,9 @@ func (l *localState) AddCheckToken(id, token string) { } // CheckToken is used to return the configured health check token, or -// if none is configured, the default agent ACL token. +// if none is configured, the default agent ACL token. Assumes a lock +// has already been taken on the state. func (l *localState) CheckToken(id string) string { - l.RLock() - defer l.RUnlock() token := l.checkTokens[id] if token == "" { token = l.config.ACLToken From 54b5f17629810621f755e46704a590a880a79141 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Mon, 27 Apr 2015 22:46:01 -0700 Subject: [PATCH 05/11] agent: test coverage loading service/check tokens from persisted files --- command/agent/agent.go | 1 + command/agent/agent_test.go | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 47979f364..8488af41d 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1029,6 +1029,7 @@ func (a *Agent) loadChecks(conf *Config) error { if err := json.Unmarshal(content, &p); err != nil { return err } + a.state.AddCheckToken(p.Check.CheckID, p.Token) if _, ok := a.state.checks[p.Check.CheckID]; ok { // Purge previously persisted check. This allows config to be diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 24fcbb98e..2f189e196 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -521,6 +521,9 @@ func TestAgent_PersistService(t *testing.T) { file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc.ID)) + // Configure a service token + agent.state.AddServiceToken(svc.ID, "hello") + // Check is not persisted unless requested if err := agent.AddService(svc, nil, false); err != nil { t.Fatalf("err: %v", err) @@ -538,7 +541,10 @@ func TestAgent_PersistService(t *testing.T) { t.Fatalf("err: %s", err) } - expected, err := json.Marshal(&persistedService{Service: svc}) + expected, err := json.Marshal(&persistedService{ + Token: "hello", + Service: svc, + }) if err != nil { t.Fatalf("err: %s", err) } @@ -561,6 +567,9 @@ func TestAgent_PersistService(t *testing.T) { if _, ok := agent2.state.services[svc.ID]; !ok { t.Fatalf("bad: %#v", agent2.state.services) } + if agent2.state.serviceTokens[svc.ID] != "hello" { + t.Fatalf("bad: %#v", agent2.state.services[svc.ID]) + } } func TestAgent_PurgeService(t *testing.T) { @@ -667,6 +676,9 @@ func TestAgent_PersistCheck(t *testing.T) { file := filepath.Join(agent.config.DataDir, checksDir, stringHash(check.CheckID)) + // Configure a service registration token + agent.state.AddCheckToken(check.CheckID, "hello") + // Not persisted if not requested if err := agent.AddCheck(check, chkType, false); err != nil { t.Fatalf("err: %v", err) @@ -684,7 +696,7 @@ func TestAgent_PersistCheck(t *testing.T) { t.Fatalf("err: %s", err) } - p := persistedCheck{check, chkType, ""} + p := persistedCheck{check, chkType, "hello"} expected, err := json.Marshal(p) if err != nil { t.Fatalf("err: %s", err) @@ -717,6 +729,9 @@ func TestAgent_PersistCheck(t *testing.T) { if _, ok := agent2.checkMonitors[p.Check.CheckID]; !ok { t.Fatalf("bad: %#v", agent2.checkMonitors) } + if agent2.state.checkTokens[p.Check.CheckID] != "hello" { + t.Fatalf("bad: %s", agent2.state.checkTokens[p.Check.CheckID]) + } } func TestAgent_PurgeCheck(t *testing.T) { From f069db21e3a0461912d86cf50f934c495b8846eb Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 28 Apr 2015 11:53:53 -0700 Subject: [PATCH 06/11] agent: safer read methods for tokens --- command/agent/agent.go | 4 ++-- command/agent/agent_endpoint.go | 4 ++-- command/agent/agent_test.go | 4 ++-- command/agent/local.go | 34 ++++++++++++++++++++++----------- command/agent/local_test.go | 18 +++++++++++++++++ 5 files changed, 47 insertions(+), 17 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 8488af41d..8b2fbb427 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -957,7 +957,7 @@ func (a *Agent) loadServices(conf *Config) error { return err } svc := wrapped.Service - a.state.AddServiceToken(svc.ID, wrapped.Token) + a.state.SetServiceToken(svc.ID, wrapped.Token) if _, ok := a.state.services[svc.ID]; ok { // Purge previously persisted service. This allows config to be @@ -1029,7 +1029,7 @@ func (a *Agent) loadChecks(conf *Config) error { if err := json.Unmarshal(content, &p); err != nil { return err } - a.state.AddCheckToken(p.Check.CheckID, p.Token) + a.state.SetCheckToken(p.Check.CheckID, p.Token) if _, ok := a.state.checks[p.Check.CheckID]; ok { // Purge previously persisted check. This allows config to be diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index d314e7d4c..201b684c6 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -100,7 +100,7 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ // Get the provided token, if any var token string s.parseToken(req, &token) - s.agent.state.AddCheckToken(health.CheckID, token) + s.agent.state.SetCheckToken(health.CheckID, token) // Add the check if err := s.agent.AddCheck(health, chkType, true); err != nil { @@ -207,7 +207,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re // Get the provided token, if any var token string s.parseToken(req, &token) - s.agent.state.AddServiceToken(ns.ID, token) + s.agent.state.SetServiceToken(ns.ID, token) // Add the check if err := s.agent.AddService(ns, chkTypes, true); err != nil { diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 2f189e196..69039aff9 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -522,7 +522,7 @@ func TestAgent_PersistService(t *testing.T) { file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc.ID)) // Configure a service token - agent.state.AddServiceToken(svc.ID, "hello") + agent.state.SetServiceToken(svc.ID, "hello") // Check is not persisted unless requested if err := agent.AddService(svc, nil, false); err != nil { @@ -677,7 +677,7 @@ func TestAgent_PersistCheck(t *testing.T) { file := filepath.Join(agent.config.DataDir, checksDir, stringHash(check.CheckID)) // Configure a service registration token - agent.state.AddCheckToken(check.CheckID, "hello") + agent.state.SetCheckToken(check.CheckID, "hello") // Not persisted if not requested if err := agent.AddCheck(check, chkType, false); err != nil { diff --git a/command/agent/local.go b/command/agent/local.go index 2457f0c4f..a0c200967 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -123,9 +123,9 @@ func (l *localState) isPaused() bool { return atomic.LoadInt32(&l.paused) == 1 } -// AddServiceToken configures the provided token for the service ID. +// SetServiceToken configures the provided token for the service ID. // The token will be used to perform service registration operations. -func (l *localState) AddServiceToken(id, token string) { +func (l *localState) SetServiceToken(id, token string) { l.Lock() defer l.Unlock() l.serviceTokens[id] = token @@ -133,8 +133,14 @@ func (l *localState) AddServiceToken(id, token string) { // ServiceToken returns the configured ACL token for the given // service ID. If none is present, the agent's token is returned. -// Assumes a lock is already established on the state. func (l *localState) ServiceToken(id string) string { + l.RLock() + defer l.RUnlock() + return l.serviceToken(id) +} + +// serviceToken returns an ACL token associated with a service. +func (l *localState) serviceToken(id string) string { token := l.serviceTokens[id] if token == "" { token = l.config.ACLToken @@ -183,18 +189,24 @@ func (l *localState) Services() map[string]*structs.NodeService { return services } -// AddCheckToken is used to configure an ACL token for a specific +// SetCheckToken is used to configure an ACL token for a specific // health check. The token is used during check registration operations. -func (l *localState) AddCheckToken(id, token string) { +func (l *localState) SetCheckToken(id, token string) { l.Lock() defer l.Unlock() l.checkTokens[id] = token } // CheckToken is used to return the configured health check token, or -// if none is configured, the default agent ACL token. Assumes a lock -// has already been taken on the state. +// if none is configured, the default agent ACL token. func (l *localState) CheckToken(id string) string { + l.RLock() + defer l.RUnlock() + return l.checkToken(id) +} + +// checkToken returns an ACL token associated with a check. +func (l *localState) checkToken(id string) string { token := l.checkTokens[id] if token == "" { token = l.config.ACLToken @@ -482,7 +494,7 @@ func (l *localState) deleteService(id string) error { Datacenter: l.config.Datacenter, Node: l.config.NodeName, ServiceID: id, - WriteRequest: structs.WriteRequest{Token: l.ServiceToken(id)}, + WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -503,7 +515,7 @@ func (l *localState) deleteCheck(id string) error { Datacenter: l.config.Datacenter, Node: l.config.NodeName, CheckID: id, - WriteRequest: structs.WriteRequest{Token: l.CheckToken(id)}, + WriteRequest: structs.WriteRequest{Token: l.checkToken(id)}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -521,7 +533,7 @@ func (l *localState) syncService(id string) error { Node: l.config.NodeName, Address: l.config.AdvertiseAddr, Service: l.services[id], - WriteRequest: structs.WriteRequest{Token: l.ServiceToken(id)}, + WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)}, } // If the service has associated checks that are out of sync, @@ -579,7 +591,7 @@ func (l *localState) syncCheck(id string) error { Address: l.config.AdvertiseAddr, Service: service, Check: l.checks[id], - WriteRequest: structs.WriteRequest{Token: l.CheckToken(id)}, + WriteRequest: structs.WriteRequest{Token: l.checkToken(id)}, } var out struct{} err := l.iface.RPC("Catalog.Register", &req, &out) diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 480da00cd..673fd8d4b 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -615,6 +615,24 @@ func TestAgentAntiEntropy_deleteCheck_fails(t *testing.T) { } } +func TestAgent_serviceTokens(t *testing.T) { + l := new(localState) + l.Init() + l.SetServiceToken("redis", "abc123") + if token := l.ServiceToken("redis"); token != "abc123" { + t.Fatalf("bad: %s", token) + } +} + +func TestAgent_checkTokens(t *testing.T) { + l := new(localState) + l.Init() + l.SetCheckToken("mem", "abc123") + if token := l.CheckToken("mem"); token != "abc123" { + t.Fatalf("bad: %s", token) + } +} + var testRegisterRules = ` service "api" { policy = "write" From 802b4793df5e0d1cf5330099da6266865cedaf0c Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 28 Apr 2015 12:18:41 -0700 Subject: [PATCH 07/11] agent: backwards compat for persisted services from pre-0.5.1 --- command/agent/agent.go | 11 ++++++--- command/agent/agent_test.go | 46 +++++++++++++++++++++++++++++++++++++ command/agent/local_test.go | 4 ++-- 3 files changed, 56 insertions(+), 5 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 8b2fbb427..b8942beee 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -953,11 +953,16 @@ func (a *Agent) loadServices(conf *Config) error { } var wrapped *persistedService + var svc *structs.NodeService if err := json.Unmarshal(content, &wrapped); err != nil { - return err + // Backwards-compatibility for pre-0.5.1 persisted services + if err := json.Unmarshal(content, &svc); err != nil { + return fmt.Errorf("failed decoding service from %s: %s", filePath, err) + } + } else { + svc = wrapped.Service + a.state.SetServiceToken(svc.ID, wrapped.Token) } - svc := wrapped.Service - a.state.SetServiceToken(svc.ID, wrapped.Token) if _, ok := a.state.services[svc.ID]; ok { // Purge previously persisted service. This allows config to be diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 69039aff9..2bd082631 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -572,6 +572,52 @@ func TestAgent_PersistService(t *testing.T) { } } +func TestAgent_persistedCheck_compat(t *testing.T) { + // Tests backwards compatibility of persisted services from pre-0.5.1 + config := nextConfig() + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + + // Encode the NodeService directly. This is what previous versions + // would serialize to the file (without the wrapper) + encoded, err := json.Marshal(svc) + if err != nil { + t.Fatalf("err: %s", err) + } + + // Write the content to the file + file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc.ID)) + if err := os.MkdirAll(filepath.Dir(file), 0700); err != nil { + t.Fatalf("err: %s", err) + } + if err := ioutil.WriteFile(file, encoded, 0600); err != nil { + t.Fatalf("err: %s", err) + } + + // Load the services + if err := agent.loadServices(config); err != nil { + t.Fatalf("err: %s", err) + } + + // Ensure the service was restored + services := agent.state.Services() + result, ok := services["redis"] + if !ok { + t.Fatalf("missing service") + } + if !reflect.DeepEqual(result, svc) { + t.Fatalf("bad: %#v", result) + } +} + func TestAgent_PurgeService(t *testing.T) { config := nextConfig() dir, agent := makeAgent(t, config) diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 673fd8d4b..e66977e5a 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -617,7 +617,7 @@ func TestAgentAntiEntropy_deleteCheck_fails(t *testing.T) { func TestAgent_serviceTokens(t *testing.T) { l := new(localState) - l.Init() + l.Init(nil, nil) l.SetServiceToken("redis", "abc123") if token := l.ServiceToken("redis"); token != "abc123" { t.Fatalf("bad: %s", token) @@ -626,7 +626,7 @@ func TestAgent_serviceTokens(t *testing.T) { func TestAgent_checkTokens(t *testing.T) { l := new(localState) - l.Init() + l.Init(nil, nil) l.SetCheckToken("mem", "abc123") if token := l.CheckToken("mem"); token != "abc123" { t.Fatalf("bad: %s", token) From 67e9a04f480f48a0ea7d883862964410dc119361 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 28 Apr 2015 12:44:46 -0700 Subject: [PATCH 08/11] agent: restore tokens for services and checks in config --- command/agent/agent.go | 12 +++++-- command/agent/agent_test.go | 63 ++++++++++++++++++++++++++++++++----- command/agent/structs.go | 1 + 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index b8942beee..ea646d588 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -523,7 +523,7 @@ func (a *Agent) ResumeSync() { func (a *Agent) persistService(service *structs.NodeService) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) if _, err := os.Stat(svcPath); os.IsNotExist(err) { - wrapped := &persistedService{ + wrapped := persistedService{ Token: a.state.ServiceToken(service.ID), Service: service, } @@ -563,9 +563,13 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) err } // Create the persisted check - p := persistedCheck{check, chkType, a.state.CheckToken(check.CheckID)} + wrapped := persistedCheck{ + Check: check, + ChkType: chkType, + Token: a.state.CheckToken(check.CheckID), + } - encoded, err := json.Marshal(p) + encoded, err := json.Marshal(wrapped) if err != nil { return nil } @@ -924,6 +928,7 @@ func (a *Agent) loadServices(conf *Config) error { for _, service := range conf.Services { ns := service.NodeService() chkTypes := service.CheckTypes() + a.state.SetServiceToken(service.ID, service.Token) if err := a.AddService(ns, chkTypes, false); err != nil { return fmt.Errorf("Failed to register service '%s': %v", service.ID, err) } @@ -1002,6 +1007,7 @@ func (a *Agent) loadChecks(conf *Config) error { for _, check := range conf.Checks { health := check.HealthCheck(conf.NodeName) chkType := &check.CheckType + a.state.SetCheckToken(check.ID, check.Token) if err := a.AddCheck(health, chkType, false); err != nil { return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 2bd082631..8caea85ca 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -541,7 +541,7 @@ func TestAgent_PersistService(t *testing.T) { t.Fatalf("err: %s", err) } - expected, err := json.Marshal(&persistedService{ + expected, err := json.Marshal(persistedService{ Token: "hello", Service: svc, }) @@ -572,7 +572,7 @@ func TestAgent_PersistService(t *testing.T) { } } -func TestAgent_persistedCheck_compat(t *testing.T) { +func TestAgent_persistedService_compat(t *testing.T) { // Tests backwards compatibility of persisted services from pre-0.5.1 config := nextConfig() dir, agent := makeAgent(t, config) @@ -742,8 +742,11 @@ func TestAgent_PersistCheck(t *testing.T) { t.Fatalf("err: %s", err) } - p := persistedCheck{check, chkType, "hello"} - expected, err := json.Marshal(p) + expected, err := json.Marshal(persistedCheck{ + Check: check, + ChkType: chkType, + Token: "hello", + }) if err != nil { t.Fatalf("err: %s", err) } @@ -763,7 +766,7 @@ func TestAgent_PersistCheck(t *testing.T) { } defer agent2.Shutdown() - result, ok := agent2.state.checks[p.Check.CheckID] + result, ok := agent2.state.checks[check.CheckID] if !ok { t.Fatalf("bad: %#v", agent2.state.checks) } @@ -772,11 +775,11 @@ func TestAgent_PersistCheck(t *testing.T) { } // Should have restored the monitor - if _, ok := agent2.checkMonitors[p.Check.CheckID]; !ok { + if _, ok := agent2.checkMonitors[check.CheckID]; !ok { t.Fatalf("bad: %#v", agent2.checkMonitors) } - if agent2.state.checkTokens[p.Check.CheckID] != "hello" { - t.Fatalf("bad: %s", agent2.state.checkTokens[p.Check.CheckID]) + if agent2.state.checkTokens[check.CheckID] != "hello" { + t.Fatalf("bad: %s", agent2.state.checkTokens[check.CheckID]) } } @@ -867,6 +870,29 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { } } +func TestAgent_loadChecks_token(t *testing.T) { + config := nextConfig() + config.Checks = append(config.Checks, &CheckDefinition{ + ID: "rabbitmq", + Name: "rabbitmq", + Token: "abc123", + CheckType: CheckType{ + TTL: 10 * time.Second, + }, + }) + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + checks := agent.state.Checks() + if _, ok := checks["rabbitmq"]; !ok { + t.Fatalf("missing check") + } + if token := agent.state.CheckToken("rabbitmq"); token != "abc123" { + t.Fatalf("bad: %s", token) + } +} + func TestAgent_unloadChecks(t *testing.T) { config := nextConfig() dir, agent := makeAgent(t, config) @@ -920,6 +946,27 @@ func TestAgent_unloadChecks(t *testing.T) { } } +func TestAgent_loadServices_token(t *testing.T) { + config := nextConfig() + config.Services = append(config.Services, &ServiceDefinition{ + ID: "rabbitmq", + Name: "rabbitmq", + Port: 5672, + Token: "abc123", + }) + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + services := agent.state.Services() + if _, ok := services["rabbitmq"]; !ok { + t.Fatalf("missing service") + } + if token := agent.state.ServiceToken("rabbitmq"); token != "abc123" { + t.Fatalf("bad: %s", token) + } +} + func TestAgent_unloadServices(t *testing.T) { config := nextConfig() dir, agent := makeAgent(t, config) diff --git a/command/agent/structs.go b/command/agent/structs.go index 165555317..06b969833 100644 --- a/command/agent/structs.go +++ b/command/agent/structs.go @@ -46,6 +46,7 @@ type CheckDefinition struct { Name string Notes string ServiceID string + Token string CheckType `mapstructure:",squash"` } From 2496a704526b2bbc899f1b348992224af688018b Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 28 Apr 2015 13:06:02 -0700 Subject: [PATCH 09/11] agent: more tests --- command/agent/agent_endpoint_test.go | 14 ++++++++++-- command/agent/local.go | 22 ++++++++++++++++-- command/agent/local_test.go | 34 ++++++++++++++++++++++++++-- 3 files changed, 64 insertions(+), 6 deletions(-) diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 0387f3890..5c95c1a3e 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -252,7 +252,7 @@ func TestHTTPAgentRegisterCheck(t *testing.T) { defer srv.agent.Shutdown() // Register node - req, err := http.NewRequest("GET", "/v1/agent/check/register", nil) + req, err := http.NewRequest("GET", "/v1/agent/check/register?token=abc123", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -280,6 +280,11 @@ func TestHTTPAgentRegisterCheck(t *testing.T) { if _, ok := srv.agent.checkTTLs["test"]; !ok { t.Fatalf("missing test check ttl") } + + // Ensure the token was configured + if token := srv.agent.state.CheckToken("test"); token == "" { + t.Fatalf("missing token") + } } func TestHTTPAgentDeregisterCheck(t *testing.T) { @@ -419,7 +424,7 @@ func TestHTTPAgentRegisterService(t *testing.T) { defer srv.agent.Shutdown() // Register node - req, err := http.NewRequest("GET", "/v1/agent/service/register", nil) + req, err := http.NewRequest("GET", "/v1/agent/service/register?token=abc123", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -463,6 +468,11 @@ func TestHTTPAgentRegisterService(t *testing.T) { if len(srv.agent.checkTTLs) != 3 { t.Fatalf("missing test check ttls: %v", srv.agent.checkTTLs) } + + // Ensure the token was configured + if token := srv.agent.state.ServiceToken("test"); token == "" { + t.Fatalf("missing token") + } } func TestHTTPAgentDeregisterService(t *testing.T) { diff --git a/command/agent/local.go b/command/agent/local.go index a0c200967..83fefbf88 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -126,9 +126,18 @@ func (l *localState) isPaused() bool { // SetServiceToken configures the provided token for the service ID. // The token will be used to perform service registration operations. func (l *localState) SetServiceToken(id, token string) { + if token != "" { + l.Lock() + defer l.Unlock() + l.serviceTokens[id] = token + } +} + +// RemoveServiceToken is used to remove a configured service token. +func (l *localState) RemoveServiceToken(id string) { l.Lock() defer l.Unlock() - l.serviceTokens[id] = token + delete(l.serviceTokens, id) } // ServiceToken returns the configured ACL token for the given @@ -192,9 +201,18 @@ func (l *localState) Services() map[string]*structs.NodeService { // SetCheckToken is used to configure an ACL token for a specific // health check. The token is used during check registration operations. func (l *localState) SetCheckToken(id, token string) { + if token != "" { + l.Lock() + defer l.Unlock() + l.checkTokens[id] = token + } +} + +// RemoveCheckToken is used to remove a configured check token. +func (l *localState) RemoveCheckToken(id string) { l.Lock() defer l.Unlock() - l.checkTokens[id] = token + delete(l.checkTokens, id) } // CheckToken is used to return the configured health check token, or diff --git a/command/agent/local_test.go b/command/agent/local_test.go index e66977e5a..84c7c2d26 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -616,21 +616,51 @@ func TestAgentAntiEntropy_deleteCheck_fails(t *testing.T) { } func TestAgent_serviceTokens(t *testing.T) { + config := nextConfig() + config.ACLToken = "default" l := new(localState) - l.Init(nil, nil) + l.Init(config, nil) + + // Returns default when no token is set + if token := l.ServiceToken("redis"); token != "default" { + t.Fatalf("bad: %s", token) + } + + // Returns configured token l.SetServiceToken("redis", "abc123") if token := l.ServiceToken("redis"); token != "abc123" { t.Fatalf("bad: %s", token) } + + // Removes token + l.RemoveServiceToken("redis") + if token := l.ServiceToken("redis"); token != "default" { + t.Fatalf("bad: %s", token) + } } func TestAgent_checkTokens(t *testing.T) { + config := nextConfig() + config.ACLToken = "default" l := new(localState) - l.Init(nil, nil) + l.Init(config, nil) + + // Returns default when no token is set + if token := l.CheckToken("mem"); token != "default" { + t.Fatalf("bad: %s", token) + } + + // Returns configured token l.SetCheckToken("mem", "abc123") if token := l.CheckToken("mem"); token != "abc123" { t.Fatalf("bad: %s", token) } + + // Removes token + l.RemoveCheckToken("mem") + if token := l.CheckToken("mem"); token != "default" { + t.Fatalf("bad: %s", token) + } } var testRegisterRules = ` From af7409558464e79661d266928f70e921274e5a2e Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 28 Apr 2015 14:26:22 -0700 Subject: [PATCH 10/11] website: document service and check acl options --- .../source/docs/agent/checks.html.markdown | 4 ++++ .../docs/agent/http/agent.html.markdown | 14 ++++++++++++++ .../source/docs/agent/services.html.markdown | 4 ++++ .../source/docs/internals/acl.html.markdown | 19 +++++++++++++++++++ 4 files changed, 41 insertions(+) diff --git a/website/source/docs/agent/checks.html.markdown b/website/source/docs/agent/checks.html.markdown index 8f8e2342e..13f4d06f9 100644 --- a/website/source/docs/agent/checks.html.markdown +++ b/website/source/docs/agent/checks.html.markdown @@ -91,6 +91,10 @@ description of the current state of the check. With a script check, the field is set to any output generated by the script. Similarly, an external process updating a TTL check via the HTTP interface can set the `notes` value. +Checks may also contain a `token` field to provide an ACL token. This token is +used for any interaction with the catalog for the check, including +[anti-entropy syncs](/docs/internals/anti-entropy.html) and deregistration. + To configure a check, either provide it as a `-config-file` option to the agent or place it inside the `-config-dir` of the agent. The file must end in the ".json" extension to be loaded by Consul. Check definitions can diff --git a/website/source/docs/agent/http/agent.html.markdown b/website/source/docs/agent/http/agent.html.markdown index d23c50ab9..4c13e85ba 100644 --- a/website/source/docs/agent/http/agent.html.markdown +++ b/website/source/docs/agent/http/agent.html.markdown @@ -263,6 +263,13 @@ the state of the check. Optionally, a `ServiceID` can be provided to associate the registered check with an existing service provided by the agent. +This endpoint supports [ACL tokens](/docs/internals/acl.html). If the query +string includes a `?token=`, the registration will use the provided +token to authorize the request. The token is also persisted in the agent's +local configuration to enable periodic +[anti-entropy](/docs/internal/anti-entropy.html) syncs and seamless agent +restarts. + The return code is 200 on success. ### /v1/agent/check/deregister/\ @@ -346,6 +353,13 @@ If `Check` is provided, only one of `Script`, `HTTP`, or `TTL` should be specifi `Script` and `HTTP` also require `Interval`. The created check will be named "service:\". There is more information about checks [here](/docs/agent/checks.html). +This endpoint supports [ACL tokens](/docs/internals/acl.html). If the query +string includes a `?token=`, the registration will use the provided +token to authorize the request. The token is also persisted in the agent's +local configuration to enable periodic +[anti-entropy](/docs/internal/anti-entropy.html) syncs and seamless agent +restarts. + The return code is 200 on success. ### /v1/agent/service/deregister/\ diff --git a/website/source/docs/agent/services.html.markdown b/website/source/docs/agent/services.html.markdown index ab9c18dac..241ef10c2 100644 --- a/website/source/docs/agent/services.html.markdown +++ b/website/source/docs/agent/services.html.markdown @@ -51,6 +51,10 @@ The `port` field can be used as well to make a service-oriented architecture simpler to configure; this way, the address and port of a service can be discovered. +Services may also contain a `token` field to provide an ACL token. This token is +used for any interaction with the catalog for the service, including +[anti-entropy syncs](/docs/internals/anti-entropy.html) and deregistration. + A service can have an associated health check. This is a powerful feature as it allows a web balancer to gracefully remove failing nodes, a database to replace a failed slave, etc. The health check is strongly integrated in diff --git a/website/source/docs/internals/acl.html.markdown b/website/source/docs/internals/acl.html.markdown index 53b385720..775090544 100644 --- a/website/source/docs/internals/acl.html.markdown +++ b/website/source/docs/internals/acl.html.markdown @@ -189,3 +189,22 @@ This is equivalent to the following JSON input: } ``` +## Services and Checks with ACLs + +Consul allows configuring ACL policies which may control access to service and +check registration. In order to successfully register a service or check with +these types of policies in place, a token with sufficient privileges must be +provided to perform the registration into the global catalog. Consul also +performs periodic [anti-entropy](/docs/internals/anti-entropy.html) syncs, which +may require an ACL token to complete. To accommodate this, Consul provides two +methods of configuring ACL tokens to use for registration events: + +1. Using the [acl_token](/docs/agent/options.html#acl_token) configuration + directive. This allows a single token to be configured globally and used + during all service and check registration operations. +2. Providing an ACL token with service and check definitions at + registration time. This allows for greater flexibility and enables the use + of multiple tokens on the same agent. Examples of what this looks like are + available for both [services](/docs/agent/services.html) and + [checks](/docs/agent/checks.html). Tokens may also be passed to the + [HTTP API](/docs/agent/http.html) for operations that require them. From 3c577a006958e98fec9f6a1dd27c427d3d4ad9f0 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Mon, 4 May 2015 17:36:17 -0700 Subject: [PATCH 11/11] agent: use an additional parameter for passing tokens --- command/agent/agent.go | 30 +++++++------- command/agent/agent_endpoint.go | 6 +-- command/agent/agent_endpoint_test.go | 18 ++++----- command/agent/agent_test.go | 60 +++++++++++++--------------- command/agent/local.go | 42 +++---------------- command/agent/local_test.go | 42 +++++++++---------- command/agent/user_event_test.go | 4 +- command/maint_test.go | 6 +-- 8 files changed, 84 insertions(+), 124 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index ea646d588..e59b1edd4 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -155,7 +155,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { Port: agent.config.Ports.Server, Tags: []string{}, } - agent.state.AddService(&consulService) + agent.state.AddService(&consulService, "") } else { err = agent.setupClient() agent.state.SetIface(agent.client) @@ -599,7 +599,7 @@ func (a *Agent) purgeCheck(checkID string) error { // AddService is used to add a service entry. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error { +func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool, token string) error { if service.Service == "" { return fmt.Errorf("Service name missing") } @@ -629,7 +629,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe } // Add the service - a.state.AddService(service) + a.state.AddService(service, token) // Persist the service to a file if persist { @@ -653,7 +653,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe ServiceID: service.ID, ServiceName: service.Service, } - if err := a.AddCheck(check, chkType, persist); err != nil { + if err := a.AddCheck(check, chkType, persist, token); err != nil { return err } } @@ -704,7 +704,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { // This entry is persistent and the agent will make a best effort to // ensure it is registered. The Check may include a CheckType which // is used to automatically update the check status -func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool) error { +func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool, token string) error { if check.CheckID == "" { return fmt.Errorf("CheckID missing") } @@ -783,7 +783,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist } // Add to the local state for anti-entropy - a.state.AddCheck(check) + a.state.AddCheck(check, token) // Persist the check if persist { @@ -928,8 +928,7 @@ func (a *Agent) loadServices(conf *Config) error { for _, service := range conf.Services { ns := service.NodeService() chkTypes := service.CheckTypes() - a.state.SetServiceToken(service.ID, service.Token) - if err := a.AddService(ns, chkTypes, false); err != nil { + if err := a.AddService(ns, chkTypes, false, service.Token); err != nil { return fmt.Errorf("Failed to register service '%s': %v", service.ID, err) } } @@ -958,6 +957,7 @@ func (a *Agent) loadServices(conf *Config) error { } var wrapped *persistedService + var token string var svc *structs.NodeService if err := json.Unmarshal(content, &wrapped); err != nil { // Backwards-compatibility for pre-0.5.1 persisted services @@ -966,7 +966,7 @@ func (a *Agent) loadServices(conf *Config) error { } } else { svc = wrapped.Service - a.state.SetServiceToken(svc.ID, wrapped.Token) + token = wrapped.Token } if _, ok := a.state.services[svc.ID]; ok { @@ -978,7 +978,7 @@ func (a *Agent) loadServices(conf *Config) error { } else { a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", svc.ID, filePath) - return a.AddService(svc, nil, false) + return a.AddService(svc, nil, false, token) } }) @@ -1007,8 +1007,7 @@ func (a *Agent) loadChecks(conf *Config) error { for _, check := range conf.Checks { health := check.HealthCheck(conf.NodeName) chkType := &check.CheckType - a.state.SetCheckToken(check.ID, check.Token) - if err := a.AddCheck(health, chkType, false); err != nil { + if err := a.AddCheck(health, chkType, false, check.Token); err != nil { return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) } } @@ -1040,7 +1039,6 @@ func (a *Agent) loadChecks(conf *Config) error { if err := json.Unmarshal(content, &p); err != nil { return err } - a.state.SetCheckToken(p.Check.CheckID, p.Token) if _, ok := a.state.checks[p.Check.CheckID]; ok { // Purge previously persisted check. This allows config to be @@ -1053,7 +1051,7 @@ func (a *Agent) loadChecks(conf *Config) error { // services into the active pool p.Check.Status = structs.HealthCritical - if err := a.AddCheck(p.Check, p.ChkType, false); err != nil { + if err := a.AddCheck(p.Check, p.ChkType, false, p.Token); err != nil { // Purge the check if it is unable to be restored. a.logger.Printf("[WARN] agent: Failed to restore check %q: %s", p.Check.CheckID, err) @@ -1130,7 +1128,7 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error { ServiceName: service.Service, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true) + a.AddCheck(check, nil, true, "") a.logger.Printf("[INFO] agent: Service %q entered maintenance mode", serviceID) return nil @@ -1176,7 +1174,7 @@ func (a *Agent) EnableNodeMaintenance(reason string) { Notes: reason, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true) + a.AddCheck(check, nil, true, "") a.logger.Printf("[INFO] agent: Node entered maintenance mode") } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 201b684c6..18a855620 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -100,10 +100,9 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ // Get the provided token, if any var token string s.parseToken(req, &token) - s.agent.state.SetCheckToken(health.CheckID, token) // Add the check - if err := s.agent.AddCheck(health, chkType, true); err != nil { + if err := s.agent.AddCheck(health, chkType, true, token); err != nil { return nil, err } s.syncChanges() @@ -207,10 +206,9 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re // Get the provided token, if any var token string s.parseToken(req, &token) - s.agent.state.SetServiceToken(ns.ID, token) // Add the check - if err := s.agent.AddService(ns, chkTypes, true); err != nil { + if err := s.agent.AddService(ns, chkTypes, true, token); err != nil { return nil, err } s.syncChanges() diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 5c95c1a3e..78469f774 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -25,7 +25,7 @@ func TestHTTPAgentServices(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - srv.agent.state.AddService(srv1) + srv.agent.state.AddService(srv1, "") obj, err := srv.AgentServices(nil, nil) if err != nil { @@ -52,7 +52,7 @@ func TestHTTPAgentChecks(t *testing.T) { Name: "mysql", Status: structs.HealthPassing, } - srv.agent.state.AddCheck(chk1) + srv.agent.state.AddCheck(chk1, "") obj, err := srv.AgentChecks(nil, nil) if err != nil { @@ -294,7 +294,7 @@ func TestHTTPAgentDeregisterCheck(t *testing.T) { defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} - if err := srv.agent.AddCheck(chk, nil, false); err != nil { + if err := srv.agent.AddCheck(chk, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -326,7 +326,7 @@ func TestHTTPAgentPassCheck(t *testing.T) { chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &CheckType{TTL: 15 * time.Second} - if err := srv.agent.AddCheck(chk, chkType, false); err != nil { + if err := srv.agent.AddCheck(chk, chkType, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -359,7 +359,7 @@ func TestHTTPAgentWarnCheck(t *testing.T) { chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &CheckType{TTL: 15 * time.Second} - if err := srv.agent.AddCheck(chk, chkType, false); err != nil { + if err := srv.agent.AddCheck(chk, chkType, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -392,7 +392,7 @@ func TestHTTPAgentFailCheck(t *testing.T) { chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &CheckType{TTL: 15 * time.Second} - if err := srv.agent.AddCheck(chk, chkType, false); err != nil { + if err := srv.agent.AddCheck(chk, chkType, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -485,7 +485,7 @@ func TestHTTPAgentDeregisterService(t *testing.T) { ID: "test", Service: "test", } - if err := srv.agent.AddService(service, nil, false); err != nil { + if err := srv.agent.AddService(service, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -571,7 +571,7 @@ func TestHTTPAgent_EnableServiceMaintenance(t *testing.T) { ID: "test", Service: "test", } - if err := srv.agent.AddService(service, nil, false); err != nil { + if err := srv.agent.AddService(service, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -609,7 +609,7 @@ func TestHTTPAgent_DisableServiceMaintenance(t *testing.T) { ID: "test", Service: "test", } - if err := srv.agent.AddService(service, nil, false); err != nil { + if err := srv.agent.AddService(service, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 8caea85ca..1a2aed3a1 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -153,7 +153,7 @@ func TestAgent_AddService(t *testing.T) { Notes: "redis heath check 2", }, } - err := agent.AddService(srv, chkTypes, false) + err := agent.AddService(srv, chkTypes, false, "") if err != nil { t.Fatalf("err: %v", err) } @@ -197,7 +197,7 @@ func TestAgent_AddService(t *testing.T) { Notes: "memcache heath check 2", }, } - if err := agent.AddService(srv, chkTypes, false); err != nil { + if err := agent.AddService(srv, chkTypes, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -261,7 +261,7 @@ func TestAgent_RemoveService(t *testing.T) { } chkTypes := CheckTypes{&CheckType{TTL: time.Minute}} - if err := agent.AddService(srv, chkTypes, false); err != nil { + if err := agent.AddService(srv, chkTypes, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -284,7 +284,7 @@ func TestAgent_RemoveService(t *testing.T) { &CheckType{TTL: time.Minute}, &CheckType{TTL: 30 * time.Second}, } - if err := agent.AddService(srv, chkTypes, false); err != nil { + if err := agent.AddService(srv, chkTypes, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -331,7 +331,7 @@ func TestAgent_AddCheck(t *testing.T) { Script: "exit 0", Interval: 15 * time.Second, } - err := agent.AddCheck(health, chk, false) + err := agent.AddCheck(health, chk, false, "") if err != nil { t.Fatalf("err: %v", err) } @@ -362,7 +362,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) { Script: "exit 0", Interval: time.Microsecond, } - err := agent.AddCheck(health, chk, false) + err := agent.AddCheck(health, chk, false, "") if err != nil { t.Fatalf("err: %v", err) } @@ -395,7 +395,7 @@ func TestAgent_AddCheck_MissingService(t *testing.T) { Script: "exit 0", Interval: time.Microsecond, } - err := agent.AddCheck(health, chk, false) + err := agent.AddCheck(health, chk, false, "") if err == nil || err.Error() != `ServiceID "baz" does not exist` { t.Fatalf("expected service id error, got: %v", err) } @@ -426,7 +426,7 @@ func TestAgent_RemoveCheck(t *testing.T) { Script: "exit 0", Interval: 15 * time.Second, } - err := agent.AddCheck(health, chk, false) + err := agent.AddCheck(health, chk, false, "") if err != nil { t.Fatalf("err: %v", err) } @@ -461,7 +461,7 @@ func TestAgent_UpdateCheck(t *testing.T) { chk := &CheckType{ TTL: 15 * time.Second, } - err := agent.AddCheck(health, chk, false) + err := agent.AddCheck(health, chk, false, "") if err != nil { t.Fatalf("err: %v", err) } @@ -521,11 +521,8 @@ func TestAgent_PersistService(t *testing.T) { file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc.ID)) - // Configure a service token - agent.state.SetServiceToken(svc.ID, "hello") - // Check is not persisted unless requested - if err := agent.AddService(svc, nil, false); err != nil { + if err := agent.AddService(svc, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } if _, err := os.Stat(file); err == nil { @@ -533,7 +530,7 @@ func TestAgent_PersistService(t *testing.T) { } // Persists to file if requested - if err := agent.AddService(svc, nil, true); err != nil { + if err := agent.AddService(svc, nil, true, "mytoken"); err != nil { t.Fatalf("err: %v", err) } @@ -542,7 +539,7 @@ func TestAgent_PersistService(t *testing.T) { } expected, err := json.Marshal(persistedService{ - Token: "hello", + Token: "mytoken", Service: svc, }) if err != nil { @@ -567,7 +564,7 @@ func TestAgent_PersistService(t *testing.T) { if _, ok := agent2.state.services[svc.ID]; !ok { t.Fatalf("bad: %#v", agent2.state.services) } - if agent2.state.serviceTokens[svc.ID] != "hello" { + if agent2.state.serviceTokens[svc.ID] != "mytoken" { t.Fatalf("bad: %#v", agent2.state.services[svc.ID]) } } @@ -632,7 +629,7 @@ func TestAgent_PurgeService(t *testing.T) { } file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc.ID)) - if err := agent.AddService(svc, nil, true); err != nil { + if err := agent.AddService(svc, nil, true, ""); err != nil { t.Fatalf("err: %v", err) } @@ -668,7 +665,7 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { } // First persist the service - if err := agent.AddService(svc1, nil, true); err != nil { + if err := agent.AddService(svc1, nil, true, ""); err != nil { t.Fatalf("err: %v", err) } agent.Shutdown() @@ -722,11 +719,8 @@ func TestAgent_PersistCheck(t *testing.T) { file := filepath.Join(agent.config.DataDir, checksDir, stringHash(check.CheckID)) - // Configure a service registration token - agent.state.SetCheckToken(check.CheckID, "hello") - // Not persisted if not requested - if err := agent.AddCheck(check, chkType, false); err != nil { + if err := agent.AddCheck(check, chkType, false, ""); err != nil { t.Fatalf("err: %v", err) } if _, err := os.Stat(file); err == nil { @@ -734,7 +728,7 @@ func TestAgent_PersistCheck(t *testing.T) { } // Should persist if requested - if err := agent.AddCheck(check, chkType, true); err != nil { + if err := agent.AddCheck(check, chkType, true, "mytoken"); err != nil { t.Fatalf("err: %v", err) } @@ -745,7 +739,7 @@ func TestAgent_PersistCheck(t *testing.T) { expected, err := json.Marshal(persistedCheck{ Check: check, ChkType: chkType, - Token: "hello", + Token: "mytoken", }) if err != nil { t.Fatalf("err: %s", err) @@ -778,7 +772,7 @@ func TestAgent_PersistCheck(t *testing.T) { if _, ok := agent2.checkMonitors[check.CheckID]; !ok { t.Fatalf("bad: %#v", agent2.checkMonitors) } - if agent2.state.checkTokens[check.CheckID] != "hello" { + if agent2.state.checkTokens[check.CheckID] != "mytoken" { t.Fatalf("bad: %s", agent2.state.checkTokens[check.CheckID]) } } @@ -797,7 +791,7 @@ func TestAgent_PurgeCheck(t *testing.T) { } file := filepath.Join(agent.config.DataDir, checksDir, stringHash(check.CheckID)) - if err := agent.AddCheck(check, nil, true); err != nil { + if err := agent.AddCheck(check, nil, true, ""); err != nil { t.Fatalf("err: %v", err) } @@ -833,7 +827,7 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { } // First persist the check - if err := agent.AddCheck(check1, nil, true); err != nil { + if err := agent.AddCheck(check1, nil, true, ""); err != nil { t.Fatalf("err: %v", err) } agent.Shutdown() @@ -906,7 +900,7 @@ func TestAgent_unloadChecks(t *testing.T) { Tags: []string{"foo"}, Port: 8000, } - if err := agent.AddService(svc, nil, false); err != nil { + if err := agent.AddService(svc, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -919,7 +913,7 @@ func TestAgent_unloadChecks(t *testing.T) { ServiceID: "redis", ServiceName: "redis", } - if err := agent.AddCheck(check1, nil, false); err != nil { + if err := agent.AddCheck(check1, nil, false, ""); err != nil { t.Fatalf("err: %s", err) } found := false @@ -981,7 +975,7 @@ func TestAgent_unloadServices(t *testing.T) { } // Register the service - if err := agent.AddService(svc, nil, false); err != nil { + if err := agent.AddService(svc, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } found := false @@ -1029,7 +1023,7 @@ func TestAgent_ServiceMaintenanceMode(t *testing.T) { } // Register the service - if err := agent.AddService(svc, nil, false); err != nil { + if err := agent.AddService(svc, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -1129,7 +1123,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) { Tags: []string{"foo"}, Port: 8000, } - if err := agent.AddService(svc, nil, false); err != nil { + if err := agent.AddService(svc, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -1142,7 +1136,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) { ServiceID: "redis", ServiceName: "redis", } - if err := agent.AddCheck(check1, nil, true); err != nil { + if err := agent.AddCheck(check1, nil, true, ""); err != nil { t.Fatalf("err: %s", err) } diff --git a/command/agent/local.go b/command/agent/local.go index 83fefbf88..f86445770 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -123,23 +123,6 @@ func (l *localState) isPaused() bool { return atomic.LoadInt32(&l.paused) == 1 } -// SetServiceToken configures the provided token for the service ID. -// The token will be used to perform service registration operations. -func (l *localState) SetServiceToken(id, token string) { - if token != "" { - l.Lock() - defer l.Unlock() - l.serviceTokens[id] = token - } -} - -// RemoveServiceToken is used to remove a configured service token. -func (l *localState) RemoveServiceToken(id string) { - l.Lock() - defer l.Unlock() - delete(l.serviceTokens, id) -} - // ServiceToken returns the configured ACL token for the given // service ID. If none is present, the agent's token is returned. func (l *localState) ServiceToken(id string) string { @@ -160,7 +143,7 @@ func (l *localState) serviceToken(id string) string { // AddService is used to add a service entry to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddService(service *structs.NodeService) { +func (l *localState) AddService(service *structs.NodeService, token string) { // Assign the ID if none given if service.ID == "" && service.Service != "" { service.ID = service.Service @@ -171,6 +154,7 @@ func (l *localState) AddService(service *structs.NodeService) { l.services[service.ID] = service l.serviceStatus[service.ID] = syncStatus{} + l.serviceTokens[service.ID] = token l.changeMade() } @@ -181,6 +165,7 @@ func (l *localState) RemoveService(serviceID string) { defer l.Unlock() delete(l.services, serviceID) + delete(l.serviceTokens, serviceID) l.serviceStatus[serviceID] = syncStatus{remoteDelete: true} l.changeMade() } @@ -198,23 +183,6 @@ func (l *localState) Services() map[string]*structs.NodeService { return services } -// SetCheckToken is used to configure an ACL token for a specific -// health check. The token is used during check registration operations. -func (l *localState) SetCheckToken(id, token string) { - if token != "" { - l.Lock() - defer l.Unlock() - l.checkTokens[id] = token - } -} - -// RemoveCheckToken is used to remove a configured check token. -func (l *localState) RemoveCheckToken(id string) { - l.Lock() - defer l.Unlock() - delete(l.checkTokens, id) -} - // CheckToken is used to return the configured health check token, or // if none is configured, the default agent ACL token. func (l *localState) CheckToken(id string) string { @@ -235,7 +203,7 @@ func (l *localState) checkToken(id string) string { // AddCheck is used to add a health check to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddCheck(check *structs.HealthCheck) { +func (l *localState) AddCheck(check *structs.HealthCheck, token string) { // Set the node name check.Node = l.config.NodeName @@ -244,6 +212,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck) { l.checks[check.CheckID] = check l.checkStatus[check.CheckID] = syncStatus{} + l.checkTokens[check.CheckID] = token l.changeMade() } @@ -254,6 +223,7 @@ func (l *localState) RemoveCheck(checkID string) { defer l.Unlock() delete(l.checks, checkID) + delete(l.checkTokens, checkID) l.checkStatus[checkID] = syncStatus{remoteDelete: true} l.changeMade() } diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 84c7c2d26..113bbe36e 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -34,7 +34,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - agent.state.AddService(srv1) + agent.state.AddService(srv1, "") args.Service = srv1 if err := agent.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) @@ -47,7 +47,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Port: 8000, } - agent.state.AddService(srv2) + agent.state.AddService(srv2, "") srv2_mod := new(structs.NodeService) *srv2_mod = *srv2 @@ -64,7 +64,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Port: 80, } - agent.state.AddService(srv3) + agent.state.AddService(srv3, "") // Exists remote (delete) srv4 := &structs.NodeService{ @@ -86,7 +86,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Address: "127.0.0.10", Port: 8000, } - agent.state.AddService(srv5) + agent.state.AddService(srv5, "") // Exists local, in sync, remote missing (create) srv6 := &structs.NodeService{ @@ -95,7 +95,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Port: 11211, } - agent.state.AddService(srv6) + agent.state.AddService(srv6, "") agent.state.serviceStatus["cache"] = syncStatus{inSync: true} srv5_mod := new(structs.NodeService) @@ -185,7 +185,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - agent.state.AddService(srv) + agent.state.AddService(srv, "") chk := &structs.HealthCheck{ Node: agent.config.NodeName, @@ -194,7 +194,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { ServiceID: "mysql", Status: structs.HealthPassing, } - agent.state.AddCheck(chk) + agent.state.AddCheck(chk, "") // Sync the service once if err := agent.state.syncService("mysql"); err != nil { @@ -236,7 +236,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - agent.state.AddService(srv) + agent.state.AddService(srv, "") chk1 := &structs.HealthCheck{ Node: agent.config.NodeName, @@ -245,7 +245,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { ServiceID: "redis", Status: structs.HealthPassing, } - agent.state.AddCheck(chk1) + agent.state.AddCheck(chk1, "") chk2 := &structs.HealthCheck{ Node: agent.config.NodeName, @@ -254,7 +254,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { ServiceID: "redis", Status: structs.HealthPassing, } - agent.state.AddCheck(chk2) + agent.state.AddCheck(chk2, "") // Sync the service once if err := agent.state.syncService("redis"); err != nil { @@ -326,7 +326,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - agent.state.AddService(srv1) + agent.state.AddService(srv1, "") // Create service (Disallowed) srv2 := &structs.NodeService{ @@ -335,7 +335,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { Tags: []string{"foo"}, Port: 5001, } - agent.state.AddService(srv2) + agent.state.AddService(srv2, "") // Trigger anti-entropy run and wait agent.StartSync() @@ -409,7 +409,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "mysql", Status: structs.HealthPassing, } - agent.state.AddCheck(chk1) + agent.state.AddCheck(chk1, "") args.Check = chk1 if err := agent.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) @@ -422,7 +422,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "redis", Status: structs.HealthPassing, } - agent.state.AddCheck(chk2) + agent.state.AddCheck(chk2, "") chk2_mod := new(structs.HealthCheck) *chk2_mod = *chk2 @@ -439,7 +439,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "web", Status: structs.HealthPassing, } - agent.state.AddCheck(chk3) + agent.state.AddCheck(chk3, "") // Exists remote (delete) chk4 := &structs.HealthCheck{ @@ -460,7 +460,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "cache", Status: structs.HealthPassing, } - agent.state.AddCheck(chk5) + agent.state.AddCheck(chk5, "") agent.state.checkStatus["cache"] = syncStatus{inSync: true} // Trigger anti-entropy run and wait @@ -539,7 +539,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { Status: structs.HealthPassing, Output: "", } - agent.state.AddCheck(check) + agent.state.AddCheck(check, "") // Trigger anti-entropy run and wait agent.StartSync() @@ -627,13 +627,13 @@ func TestAgent_serviceTokens(t *testing.T) { } // Returns configured token - l.SetServiceToken("redis", "abc123") + l.serviceTokens["redis"] = "abc123" if token := l.ServiceToken("redis"); token != "abc123" { t.Fatalf("bad: %s", token) } // Removes token - l.RemoveServiceToken("redis") + l.RemoveService("redis") if token := l.ServiceToken("redis"); token != "default" { t.Fatalf("bad: %s", token) } @@ -651,13 +651,13 @@ func TestAgent_checkTokens(t *testing.T) { } // Returns configured token - l.SetCheckToken("mem", "abc123") + l.checkTokens["mem"] = "abc123" if token := l.CheckToken("mem"); token != "abc123" { t.Fatalf("bad: %s", token) } // Removes token - l.RemoveCheckToken("mem") + l.RemoveCheck("mem") if token := l.CheckToken("mem"); token != "default" { t.Fatalf("bad: %s", token) } diff --git a/command/agent/user_event_test.go b/command/agent/user_event_test.go index e3aa8f381..336bf0494 100644 --- a/command/agent/user_event_test.go +++ b/command/agent/user_event_test.go @@ -57,7 +57,7 @@ func TestShouldProcessUserEvent(t *testing.T) { Tags: []string{"test", "foo", "bar", "master"}, Port: 5000, } - agent.state.AddService(srv1) + agent.state.AddService(srv1, "") p := &UserEvent{} if !agent.shouldProcessUserEvent(p) { @@ -159,7 +159,7 @@ func TestFireReceiveEvent(t *testing.T) { Tags: []string{"test", "foo", "bar", "master"}, Port: 5000, } - agent.state.AddService(srv1) + agent.state.AddService(srv1, "") p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"} err := agent.UserEvent("", p1) diff --git a/command/maint_test.go b/command/maint_test.go index 92bf02b76..279305130 100644 --- a/command/maint_test.go +++ b/command/maint_test.go @@ -42,7 +42,7 @@ func TestMaintCommandRun_NoArgs(t *testing.T) { ID: "test", Service: "test", } - if err := a1.agent.AddService(service, nil, false); err != nil { + if err := a1.agent.AddService(service, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } if err := a1.agent.EnableServiceMaintenance("test", "broken 1"); err != nil { @@ -132,7 +132,7 @@ func TestMaintCommandRun_EnableServiceMaintenance(t *testing.T) { ID: "test", Service: "test", } - if err := a1.agent.AddService(service, nil, false); err != nil { + if err := a1.agent.AddService(service, nil, false, ""); err != nil { t.Fatalf("err: %v", err) } @@ -164,7 +164,7 @@ func TestMaintCommandRun_DisableServiceMaintenance(t *testing.T) { ID: "test", Service: "test", } - if err := a1.agent.AddService(service, nil, false); err != nil { + if err := a1.agent.AddService(service, nil, false, ""); err != nil { t.Fatalf("err: %v", err) }