agent: add service/check token methods to reduce invasiveness
This commit is contained in:
parent
30f6f1142e
commit
75d182296f
|
@ -155,7 +155,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
|||
Port: agent.config.Ports.Server,
|
||||
Tags: []string{},
|
||||
}
|
||||
agent.state.AddService(&consulService, "")
|
||||
agent.state.AddService(&consulService)
|
||||
} else {
|
||||
err = agent.setupClient()
|
||||
agent.state.SetIface(agent.client)
|
||||
|
@ -520,11 +520,11 @@ func (a *Agent) ResumeSync() {
|
|||
}
|
||||
|
||||
// persistService saves a service definition to a JSON file in the data dir
|
||||
func (a *Agent) persistService(service *structs.NodeService, token string) error {
|
||||
func (a *Agent) persistService(service *structs.NodeService) error {
|
||||
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
|
||||
if _, err := os.Stat(svcPath); os.IsNotExist(err) {
|
||||
wrapped := &persistedService{
|
||||
Token: token,
|
||||
Token: a.state.ServiceToken(service.ID),
|
||||
Service: service,
|
||||
}
|
||||
encoded, err := json.Marshal(wrapped)
|
||||
|
@ -556,14 +556,14 @@ func (a *Agent) purgeService(serviceID string) error {
|
|||
}
|
||||
|
||||
// persistCheck saves a check definition to the local agent's state directory
|
||||
func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType, token string) error {
|
||||
func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) error {
|
||||
checkPath := filepath.Join(a.config.DataDir, checksDir, stringHash(check.CheckID))
|
||||
if _, err := os.Stat(checkPath); !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the persisted check
|
||||
p := persistedCheck{check, chkType, token}
|
||||
p := persistedCheck{check, chkType, a.state.CheckToken(check.CheckID)}
|
||||
|
||||
encoded, err := json.Marshal(p)
|
||||
if err != nil {
|
||||
|
@ -595,8 +595,7 @@ func (a *Agent) purgeCheck(checkID string) error {
|
|||
// AddService is used to add a service entry.
|
||||
// This entry is persistent and the agent will make a best effort to
|
||||
// ensure it is registered
|
||||
func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes,
|
||||
persist bool, token string) error {
|
||||
func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error {
|
||||
if service.Service == "" {
|
||||
return fmt.Errorf("Service name missing")
|
||||
}
|
||||
|
@ -626,11 +625,11 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes,
|
|||
}
|
||||
|
||||
// Add the service
|
||||
a.state.AddService(service, token)
|
||||
a.state.AddService(service)
|
||||
|
||||
// Persist the service to a file
|
||||
if persist {
|
||||
if err := a.persistService(service, token); err != nil {
|
||||
if err := a.persistService(service); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -650,7 +649,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes,
|
|||
ServiceID: service.ID,
|
||||
ServiceName: service.Service,
|
||||
}
|
||||
if err := a.AddCheck(check, chkType, persist, token); err != nil {
|
||||
if err := a.AddCheck(check, chkType, persist); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -701,8 +700,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
|
|||
// This entry is persistent and the agent will make a best effort to
|
||||
// ensure it is registered. The Check may include a CheckType which
|
||||
// is used to automatically update the check status
|
||||
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType,
|
||||
persist bool, token string) error {
|
||||
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool) error {
|
||||
if check.CheckID == "" {
|
||||
return fmt.Errorf("CheckID missing")
|
||||
}
|
||||
|
@ -781,11 +779,11 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType,
|
|||
}
|
||||
|
||||
// Add to the local state for anti-entropy
|
||||
a.state.AddCheck(check, token)
|
||||
a.state.AddCheck(check)
|
||||
|
||||
// Persist the check
|
||||
if persist {
|
||||
return a.persistCheck(check, chkType, token)
|
||||
return a.persistCheck(check, chkType)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -926,7 +924,7 @@ func (a *Agent) loadServices(conf *Config) error {
|
|||
for _, service := range conf.Services {
|
||||
ns := service.NodeService()
|
||||
chkTypes := service.CheckTypes()
|
||||
if err := a.AddService(ns, chkTypes, false, service.Token); err != nil {
|
||||
if err := a.AddService(ns, chkTypes, false); err != nil {
|
||||
return fmt.Errorf("Failed to register service '%s': %v", service.ID, err)
|
||||
}
|
||||
}
|
||||
|
@ -959,6 +957,7 @@ func (a *Agent) loadServices(conf *Config) error {
|
|||
return err
|
||||
}
|
||||
svc := wrapped.Service
|
||||
a.state.AddServiceToken(svc.ID, wrapped.Token)
|
||||
|
||||
if _, ok := a.state.services[svc.ID]; ok {
|
||||
// Purge previously persisted service. This allows config to be
|
||||
|
@ -969,7 +968,7 @@ func (a *Agent) loadServices(conf *Config) error {
|
|||
} else {
|
||||
a.logger.Printf("[DEBUG] agent: restored service definition %q from %q",
|
||||
svc.ID, filePath)
|
||||
return a.AddService(svc, nil, false, wrapped.Token)
|
||||
return a.AddService(svc, nil, false)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -998,7 +997,7 @@ func (a *Agent) loadChecks(conf *Config) error {
|
|||
for _, check := range conf.Checks {
|
||||
health := check.HealthCheck(conf.NodeName)
|
||||
chkType := &check.CheckType
|
||||
if err := a.AddCheck(health, chkType, false, ""); err != nil {
|
||||
if err := a.AddCheck(health, chkType, false); err != nil {
|
||||
return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check)
|
||||
}
|
||||
}
|
||||
|
@ -1042,7 +1041,7 @@ func (a *Agent) loadChecks(conf *Config) error {
|
|||
// services into the active pool
|
||||
p.Check.Status = structs.HealthCritical
|
||||
|
||||
if err := a.AddCheck(p.Check, p.ChkType, false, ""); err != nil {
|
||||
if err := a.AddCheck(p.Check, p.ChkType, false); err != nil {
|
||||
// Purge the check if it is unable to be restored.
|
||||
a.logger.Printf("[WARN] agent: Failed to restore check %q: %s",
|
||||
p.Check.CheckID, err)
|
||||
|
@ -1119,7 +1118,7 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error {
|
|||
ServiceName: service.Service,
|
||||
Status: structs.HealthCritical,
|
||||
}
|
||||
a.AddCheck(check, nil, true, "")
|
||||
a.AddCheck(check, nil, true)
|
||||
a.logger.Printf("[INFO] agent: Service %q entered maintenance mode", serviceID)
|
||||
|
||||
return nil
|
||||
|
@ -1165,7 +1164,7 @@ func (a *Agent) EnableNodeMaintenance(reason string) {
|
|||
Notes: reason,
|
||||
Status: structs.HealthCritical,
|
||||
}
|
||||
a.AddCheck(check, nil, true, "")
|
||||
a.AddCheck(check, nil, true)
|
||||
a.logger.Printf("[INFO] agent: Node entered maintenance mode")
|
||||
}
|
||||
|
||||
|
|
|
@ -100,9 +100,10 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ
|
|||
// Get the provided token, if any
|
||||
var token string
|
||||
s.parseToken(req, &token)
|
||||
s.agent.state.AddCheckToken(health.CheckID, token)
|
||||
|
||||
// Add the check
|
||||
if err := s.agent.AddCheck(health, chkType, true, token); err != nil {
|
||||
if err := s.agent.AddCheck(health, chkType, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.syncChanges()
|
||||
|
@ -206,9 +207,10 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
|
|||
// Get the provided token, if any
|
||||
var token string
|
||||
s.parseToken(req, &token)
|
||||
s.agent.state.AddServiceToken(ns.ID, token)
|
||||
|
||||
// Add the check
|
||||
if err := s.agent.AddService(ns, chkTypes, true, token); err != nil {
|
||||
if err := s.agent.AddService(ns, chkTypes, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.syncChanges()
|
||||
|
|
|
@ -684,7 +684,7 @@ func TestAgent_PersistCheck(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
p := persistedCheck{check, chkType}
|
||||
p := persistedCheck{check, chkType, ""}
|
||||
expected, err := json.Marshal(p)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
|
|
|
@ -36,7 +36,7 @@ type localState struct {
|
|||
// element due to a go bug.
|
||||
paused int32
|
||||
|
||||
sync.Mutex
|
||||
sync.RWMutex
|
||||
logger *log.Logger
|
||||
|
||||
// Config is the agent config
|
||||
|
@ -123,10 +123,30 @@ func (l *localState) isPaused() bool {
|
|||
return atomic.LoadInt32(&l.paused) == 1
|
||||
}
|
||||
|
||||
// AddServiceToken configures the provided token for the service ID.
|
||||
// The token will be used to perform service registration operations.
|
||||
func (l *localState) AddServiceToken(id, token string) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
l.serviceTokens[id] = token
|
||||
}
|
||||
|
||||
// ServiceToken returns the configured ACL token for the given
|
||||
// service ID. If none is present, the agent's token is returned.
|
||||
func (l *localState) ServiceToken(id string) string {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
token := l.serviceTokens[id]
|
||||
if token == "" {
|
||||
token = l.config.ACLToken
|
||||
}
|
||||
return token
|
||||
}
|
||||
|
||||
// AddService is used to add a service entry to the local state.
|
||||
// This entry is persistent and the agent will make a best effort to
|
||||
// ensure it is registered
|
||||
func (l *localState) AddService(service *structs.NodeService, token string) {
|
||||
func (l *localState) AddService(service *structs.NodeService) {
|
||||
// Assign the ID if none given
|
||||
if service.ID == "" && service.Service != "" {
|
||||
service.ID = service.Service
|
||||
|
@ -137,7 +157,6 @@ func (l *localState) AddService(service *structs.NodeService, token string) {
|
|||
|
||||
l.services[service.ID] = service
|
||||
l.serviceStatus[service.ID] = syncStatus{}
|
||||
l.serviceTokens[service.ID] = token
|
||||
l.changeMade()
|
||||
}
|
||||
|
||||
|
@ -156,8 +175,8 @@ func (l *localState) RemoveService(serviceID string) {
|
|||
// agent is aware of and are being kept in sync with the server
|
||||
func (l *localState) Services() map[string]*structs.NodeService {
|
||||
services := make(map[string]*structs.NodeService)
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
for name, serv := range l.services {
|
||||
services[name] = serv
|
||||
|
@ -165,10 +184,30 @@ func (l *localState) Services() map[string]*structs.NodeService {
|
|||
return services
|
||||
}
|
||||
|
||||
// AddCheckToken is used to configure an ACL token for a specific
|
||||
// health check. The token is used during check registration operations.
|
||||
func (l *localState) AddCheckToken(id, token string) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
l.checkTokens[id] = token
|
||||
}
|
||||
|
||||
// CheckToken is used to return the configured health check token, or
|
||||
// if none is configured, the default agent ACL token.
|
||||
func (l *localState) CheckToken(id string) string {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
token := l.checkTokens[id]
|
||||
if token == "" {
|
||||
token = l.config.ACLToken
|
||||
}
|
||||
return token
|
||||
}
|
||||
|
||||
// AddCheck is used to add a health check to the local state.
|
||||
// This entry is persistent and the agent will make a best effort to
|
||||
// ensure it is registered
|
||||
func (l *localState) AddCheck(check *structs.HealthCheck, token string) {
|
||||
func (l *localState) AddCheck(check *structs.HealthCheck) {
|
||||
// Set the node name
|
||||
check.Node = l.config.NodeName
|
||||
|
||||
|
@ -177,7 +216,6 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) {
|
|||
|
||||
l.checks[check.CheckID] = check
|
||||
l.checkStatus[check.CheckID] = syncStatus{}
|
||||
l.checkTokens[check.CheckID] = token
|
||||
l.changeMade()
|
||||
}
|
||||
|
||||
|
@ -239,8 +277,8 @@ func (l *localState) UpdateCheck(checkID, status, output string) {
|
|||
// agent is aware of and are being kept in sync with the server
|
||||
func (l *localState) Checks() map[string]*structs.HealthCheck {
|
||||
checks := make(map[string]*structs.HealthCheck)
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
for name, check := range l.checks {
|
||||
checks[name] = check
|
||||
|
@ -442,16 +480,11 @@ func (l *localState) deleteService(id string) error {
|
|||
return fmt.Errorf("ServiceID missing")
|
||||
}
|
||||
|
||||
token := l.serviceTokens[id]
|
||||
if token == "" {
|
||||
token = l.config.ACLToken
|
||||
}
|
||||
|
||||
req := structs.DeregisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
ServiceID: id,
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
WriteRequest: structs.WriteRequest{Token: l.ServiceToken(id)},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
||||
|
@ -468,16 +501,11 @@ func (l *localState) deleteCheck(id string) error {
|
|||
return fmt.Errorf("CheckID missing")
|
||||
}
|
||||
|
||||
token := l.checkTokens[id]
|
||||
if token == "" {
|
||||
token = l.config.ACLToken
|
||||
}
|
||||
|
||||
req := structs.DeregisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
CheckID: id,
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
WriteRequest: structs.WriteRequest{Token: l.CheckToken(id)},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
||||
|
@ -490,17 +518,12 @@ func (l *localState) deleteCheck(id string) error {
|
|||
|
||||
// syncService is used to sync a service to the server
|
||||
func (l *localState) syncService(id string) error {
|
||||
token := l.serviceTokens[id]
|
||||
if token == "" {
|
||||
token = l.config.ACLToken
|
||||
}
|
||||
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
Service: l.services[id],
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
WriteRequest: structs.WriteRequest{Token: l.ServiceToken(id)},
|
||||
}
|
||||
|
||||
// If the service has associated checks that are out of sync,
|
||||
|
@ -552,18 +575,13 @@ func (l *localState) syncCheck(id string) error {
|
|||
}
|
||||
}
|
||||
|
||||
token := l.checkTokens[id]
|
||||
if token == "" {
|
||||
token = l.config.ACLToken
|
||||
}
|
||||
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
Service: service,
|
||||
Check: l.checks[id],
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
WriteRequest: structs.WriteRequest{Token: l.CheckToken(id)},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||
|
|
Loading…
Reference in New Issue