Merge pull request #891 from hashicorp/f-token

ACL tokens for service/check registration
This commit is contained in:
Ryan Uber 2015-05-05 22:17:31 -07:00
commit 9acc42b86e
14 changed files with 369 additions and 91 deletions

View file

@ -164,7 +164,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
Port: agent.config.Ports.Server,
Tags: []string{},
}
agent.state.AddService(&consulService)
agent.state.AddService(&consulService, "")
} else {
err = agent.setupClient()
agent.state.SetIface(agent.client)
@ -536,7 +536,11 @@ func (a *Agent) ResumeSync() {
func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
if _, err := os.Stat(svcPath); os.IsNotExist(err) {
encoded, err := json.Marshal(service)
wrapped := persistedService{
Token: a.state.ServiceToken(service.ID),
Service: service,
}
encoded, err := json.Marshal(wrapped)
if err != nil {
return nil
}
@ -572,9 +576,13 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) err
}
// Create the persisted check
p := persistedCheck{check, chkType}
wrapped := persistedCheck{
Check: check,
ChkType: chkType,
Token: a.state.CheckToken(check.CheckID),
}
encoded, err := json.Marshal(p)
encoded, err := json.Marshal(wrapped)
if err != nil {
return nil
}
@ -604,7 +612,7 @@ func (a *Agent) purgeCheck(checkID string) error {
// AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error {
func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool, token string) error {
if service.Service == "" {
return fmt.Errorf("Service name missing")
}
@ -634,7 +642,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe
}
// Add the service
a.state.AddService(service)
a.state.AddService(service, token)
// Persist the service to a file
if persist {
@ -658,7 +666,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe
ServiceID: service.ID,
ServiceName: service.Service,
}
if err := a.AddCheck(check, chkType, persist); err != nil {
if err := a.AddCheck(check, chkType, persist, token); err != nil {
return err
}
}
@ -709,7 +717,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
// This entry is persistent and the agent will make a best effort to
// ensure it is registered. The Check may include a CheckType which
// is used to automatically update the check status
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool) error {
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool, token string) error {
if check.CheckID == "" {
return fmt.Errorf("CheckID missing")
}
@ -788,7 +796,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
}
// Add to the local state for anti-entropy
a.state.AddCheck(check)
a.state.AddCheck(check, token)
// Persist the check
if persist {
@ -933,7 +941,7 @@ func (a *Agent) loadServices(conf *Config) error {
for _, service := range conf.Services {
ns := service.NodeService()
chkTypes := service.CheckTypes()
if err := a.AddService(ns, chkTypes, false); err != nil {
if err := a.AddService(ns, chkTypes, false, service.Token); err != nil {
return fmt.Errorf("Failed to register service '%s': %v", service.ID, err)
}
}
@ -961,9 +969,17 @@ func (a *Agent) loadServices(conf *Config) error {
return err
}
var wrapped *persistedService
var token string
var svc *structs.NodeService
if err := json.Unmarshal(content, &wrapped); err != nil {
// Backwards-compatibility for pre-0.5.1 persisted services
if err := json.Unmarshal(content, &svc); err != nil {
return err
return fmt.Errorf("failed decoding service from %s: %s", filePath, err)
}
} else {
svc = wrapped.Service
token = wrapped.Token
}
if _, ok := a.state.services[svc.ID]; ok {
@ -975,7 +991,7 @@ func (a *Agent) loadServices(conf *Config) error {
} else {
a.logger.Printf("[DEBUG] agent: restored service definition %q from %q",
svc.ID, filePath)
return a.AddService(svc, nil, false)
return a.AddService(svc, nil, false, token)
}
})
@ -1004,7 +1020,7 @@ func (a *Agent) loadChecks(conf *Config) error {
for _, check := range conf.Checks {
health := check.HealthCheck(conf.NodeName)
chkType := &check.CheckType
if err := a.AddCheck(health, chkType, false); err != nil {
if err := a.AddCheck(health, chkType, false, check.Token); err != nil {
return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check)
}
}
@ -1048,7 +1064,7 @@ func (a *Agent) loadChecks(conf *Config) error {
// services into the active pool
p.Check.Status = structs.HealthCritical
if err := a.AddCheck(p.Check, p.ChkType, false); err != nil {
if err := a.AddCheck(p.Check, p.ChkType, false, p.Token); err != nil {
// Purge the check if it is unable to be restored.
a.logger.Printf("[WARN] agent: Failed to restore check %q: %s",
p.Check.CheckID, err)
@ -1125,7 +1141,7 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error {
ServiceName: service.Service,
Status: structs.HealthCritical,
}
a.AddCheck(check, nil, true)
a.AddCheck(check, nil, true, "")
a.logger.Printf("[INFO] agent: Service %q entered maintenance mode", serviceID)
return nil
@ -1171,7 +1187,7 @@ func (a *Agent) EnableNodeMaintenance(reason string) {
Notes: reason,
Status: structs.HealthCritical,
}
a.AddCheck(check, nil, true)
a.AddCheck(check, nil, true, "")
a.logger.Printf("[INFO] agent: Node entered maintenance mode")
}

View file

@ -97,8 +97,12 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ
return nil, nil
}
// Get the provided token, if any
var token string
s.parseToken(req, &token)
// Add the check
if err := s.agent.AddCheck(health, chkType, true); err != nil {
if err := s.agent.AddCheck(health, chkType, true, token); err != nil {
return nil, err
}
s.syncChanges()
@ -199,8 +203,12 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
}
}
// Get the provided token, if any
var token string
s.parseToken(req, &token)
// Add the check
if err := s.agent.AddService(ns, chkTypes, true); err != nil {
if err := s.agent.AddService(ns, chkTypes, true, token); err != nil {
return nil, err
}
s.syncChanges()

View file

@ -25,7 +25,7 @@ func TestHTTPAgentServices(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
srv.agent.state.AddService(srv1)
srv.agent.state.AddService(srv1, "")
obj, err := srv.AgentServices(nil, nil)
if err != nil {
@ -52,7 +52,7 @@ func TestHTTPAgentChecks(t *testing.T) {
Name: "mysql",
Status: structs.HealthPassing,
}
srv.agent.state.AddCheck(chk1)
srv.agent.state.AddCheck(chk1, "")
obj, err := srv.AgentChecks(nil, nil)
if err != nil {
@ -252,7 +252,7 @@ func TestHTTPAgentRegisterCheck(t *testing.T) {
defer srv.agent.Shutdown()
// Register node
req, err := http.NewRequest("GET", "/v1/agent/check/register", nil)
req, err := http.NewRequest("GET", "/v1/agent/check/register?token=abc123", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -280,6 +280,11 @@ func TestHTTPAgentRegisterCheck(t *testing.T) {
if _, ok := srv.agent.checkTTLs["test"]; !ok {
t.Fatalf("missing test check ttl")
}
// Ensure the token was configured
if token := srv.agent.state.CheckToken("test"); token == "" {
t.Fatalf("missing token")
}
}
func TestHTTPAgentDeregisterCheck(t *testing.T) {
@ -289,7 +294,7 @@ func TestHTTPAgentDeregisterCheck(t *testing.T) {
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
if err := srv.agent.AddCheck(chk, nil, false); err != nil {
if err := srv.agent.AddCheck(chk, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -321,7 +326,7 @@ func TestHTTPAgentPassCheck(t *testing.T) {
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
chkType := &CheckType{TTL: 15 * time.Second}
if err := srv.agent.AddCheck(chk, chkType, false); err != nil {
if err := srv.agent.AddCheck(chk, chkType, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -354,7 +359,7 @@ func TestHTTPAgentWarnCheck(t *testing.T) {
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
chkType := &CheckType{TTL: 15 * time.Second}
if err := srv.agent.AddCheck(chk, chkType, false); err != nil {
if err := srv.agent.AddCheck(chk, chkType, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -387,7 +392,7 @@ func TestHTTPAgentFailCheck(t *testing.T) {
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
chkType := &CheckType{TTL: 15 * time.Second}
if err := srv.agent.AddCheck(chk, chkType, false); err != nil {
if err := srv.agent.AddCheck(chk, chkType, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -419,7 +424,7 @@ func TestHTTPAgentRegisterService(t *testing.T) {
defer srv.agent.Shutdown()
// Register node
req, err := http.NewRequest("GET", "/v1/agent/service/register", nil)
req, err := http.NewRequest("GET", "/v1/agent/service/register?token=abc123", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -463,6 +468,11 @@ func TestHTTPAgentRegisterService(t *testing.T) {
if len(srv.agent.checkTTLs) != 3 {
t.Fatalf("missing test check ttls: %v", srv.agent.checkTTLs)
}
// Ensure the token was configured
if token := srv.agent.state.ServiceToken("test"); token == "" {
t.Fatalf("missing token")
}
}
func TestHTTPAgentDeregisterService(t *testing.T) {
@ -475,7 +485,7 @@ func TestHTTPAgentDeregisterService(t *testing.T) {
ID: "test",
Service: "test",
}
if err := srv.agent.AddService(service, nil, false); err != nil {
if err := srv.agent.AddService(service, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -561,7 +571,7 @@ func TestHTTPAgent_EnableServiceMaintenance(t *testing.T) {
ID: "test",
Service: "test",
}
if err := srv.agent.AddService(service, nil, false); err != nil {
if err := srv.agent.AddService(service, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -599,7 +609,7 @@ func TestHTTPAgent_DisableServiceMaintenance(t *testing.T) {
ID: "test",
Service: "test",
}
if err := srv.agent.AddService(service, nil, false); err != nil {
if err := srv.agent.AddService(service, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}

View file

@ -153,7 +153,7 @@ func TestAgent_AddService(t *testing.T) {
Notes: "redis heath check 2",
},
}
err := agent.AddService(srv, chkTypes, false)
err := agent.AddService(srv, chkTypes, false, "")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -197,7 +197,7 @@ func TestAgent_AddService(t *testing.T) {
Notes: "memcache heath check 2",
},
}
if err := agent.AddService(srv, chkTypes, false); err != nil {
if err := agent.AddService(srv, chkTypes, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -261,7 +261,7 @@ func TestAgent_RemoveService(t *testing.T) {
}
chkTypes := CheckTypes{&CheckType{TTL: time.Minute}}
if err := agent.AddService(srv, chkTypes, false); err != nil {
if err := agent.AddService(srv, chkTypes, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -284,7 +284,7 @@ func TestAgent_RemoveService(t *testing.T) {
&CheckType{TTL: time.Minute},
&CheckType{TTL: 30 * time.Second},
}
if err := agent.AddService(srv, chkTypes, false); err != nil {
if err := agent.AddService(srv, chkTypes, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -331,7 +331,7 @@ func TestAgent_AddCheck(t *testing.T) {
Script: "exit 0",
Interval: 15 * time.Second,
}
err := agent.AddCheck(health, chk, false)
err := agent.AddCheck(health, chk, false, "")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -362,7 +362,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) {
Script: "exit 0",
Interval: time.Microsecond,
}
err := agent.AddCheck(health, chk, false)
err := agent.AddCheck(health, chk, false, "")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -395,7 +395,7 @@ func TestAgent_AddCheck_MissingService(t *testing.T) {
Script: "exit 0",
Interval: time.Microsecond,
}
err := agent.AddCheck(health, chk, false)
err := agent.AddCheck(health, chk, false, "")
if err == nil || err.Error() != `ServiceID "baz" does not exist` {
t.Fatalf("expected service id error, got: %v", err)
}
@ -426,7 +426,7 @@ func TestAgent_RemoveCheck(t *testing.T) {
Script: "exit 0",
Interval: 15 * time.Second,
}
err := agent.AddCheck(health, chk, false)
err := agent.AddCheck(health, chk, false, "")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -461,7 +461,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
chk := &CheckType{
TTL: 15 * time.Second,
}
err := agent.AddCheck(health, chk, false)
err := agent.AddCheck(health, chk, false, "")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -522,7 +522,7 @@ func TestAgent_PersistService(t *testing.T) {
file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc.ID))
// Check is not persisted unless requested
if err := agent.AddService(svc, nil, false); err != nil {
if err := agent.AddService(svc, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := os.Stat(file); err == nil {
@ -530,7 +530,7 @@ func TestAgent_PersistService(t *testing.T) {
}
// Persists to file if requested
if err := agent.AddService(svc, nil, true); err != nil {
if err := agent.AddService(svc, nil, true, "mytoken"); err != nil {
t.Fatalf("err: %v", err)
}
@ -538,7 +538,10 @@ func TestAgent_PersistService(t *testing.T) {
t.Fatalf("err: %s", err)
}
expected, err := json.Marshal(svc)
expected, err := json.Marshal(persistedService{
Token: "mytoken",
Service: svc,
})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -561,6 +564,55 @@ func TestAgent_PersistService(t *testing.T) {
if _, ok := agent2.state.services[svc.ID]; !ok {
t.Fatalf("bad: %#v", agent2.state.services)
}
if agent2.state.serviceTokens[svc.ID] != "mytoken" {
t.Fatalf("bad: %#v", agent2.state.services[svc.ID])
}
}
func TestAgent_persistedService_compat(t *testing.T) {
// Tests backwards compatibility of persisted services from pre-0.5.1
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
defer agent.Shutdown()
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
// Encode the NodeService directly. This is what previous versions
// would serialize to the file (without the wrapper)
encoded, err := json.Marshal(svc)
if err != nil {
t.Fatalf("err: %s", err)
}
// Write the content to the file
file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc.ID))
if err := os.MkdirAll(filepath.Dir(file), 0700); err != nil {
t.Fatalf("err: %s", err)
}
if err := ioutil.WriteFile(file, encoded, 0600); err != nil {
t.Fatalf("err: %s", err)
}
// Load the services
if err := agent.loadServices(config); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the service was restored
services := agent.state.Services()
result, ok := services["redis"]
if !ok {
t.Fatalf("missing service")
}
if !reflect.DeepEqual(result, svc) {
t.Fatalf("bad: %#v", result)
}
}
func TestAgent_PurgeService(t *testing.T) {
@ -577,7 +629,7 @@ func TestAgent_PurgeService(t *testing.T) {
}
file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc.ID))
if err := agent.AddService(svc, nil, true); err != nil {
if err := agent.AddService(svc, nil, true, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -613,7 +665,7 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
}
// First persist the service
if err := agent.AddService(svc1, nil, true); err != nil {
if err := agent.AddService(svc1, nil, true, ""); err != nil {
t.Fatalf("err: %v", err)
}
agent.Shutdown()
@ -668,7 +720,7 @@ func TestAgent_PersistCheck(t *testing.T) {
file := filepath.Join(agent.config.DataDir, checksDir, stringHash(check.CheckID))
// Not persisted if not requested
if err := agent.AddCheck(check, chkType, false); err != nil {
if err := agent.AddCheck(check, chkType, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := os.Stat(file); err == nil {
@ -676,7 +728,7 @@ func TestAgent_PersistCheck(t *testing.T) {
}
// Should persist if requested
if err := agent.AddCheck(check, chkType, true); err != nil {
if err := agent.AddCheck(check, chkType, true, "mytoken"); err != nil {
t.Fatalf("err: %v", err)
}
@ -684,8 +736,11 @@ func TestAgent_PersistCheck(t *testing.T) {
t.Fatalf("err: %s", err)
}
p := persistedCheck{check, chkType}
expected, err := json.Marshal(p)
expected, err := json.Marshal(persistedCheck{
Check: check,
ChkType: chkType,
Token: "mytoken",
})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -705,7 +760,7 @@ func TestAgent_PersistCheck(t *testing.T) {
}
defer agent2.Shutdown()
result, ok := agent2.state.checks[p.Check.CheckID]
result, ok := agent2.state.checks[check.CheckID]
if !ok {
t.Fatalf("bad: %#v", agent2.state.checks)
}
@ -714,9 +769,12 @@ func TestAgent_PersistCheck(t *testing.T) {
}
// Should have restored the monitor
if _, ok := agent2.checkMonitors[p.Check.CheckID]; !ok {
if _, ok := agent2.checkMonitors[check.CheckID]; !ok {
t.Fatalf("bad: %#v", agent2.checkMonitors)
}
if agent2.state.checkTokens[check.CheckID] != "mytoken" {
t.Fatalf("bad: %s", agent2.state.checkTokens[check.CheckID])
}
}
func TestAgent_PurgeCheck(t *testing.T) {
@ -733,7 +791,7 @@ func TestAgent_PurgeCheck(t *testing.T) {
}
file := filepath.Join(agent.config.DataDir, checksDir, stringHash(check.CheckID))
if err := agent.AddCheck(check, nil, true); err != nil {
if err := agent.AddCheck(check, nil, true, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -769,7 +827,7 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
}
// First persist the check
if err := agent.AddCheck(check1, nil, true); err != nil {
if err := agent.AddCheck(check1, nil, true, ""); err != nil {
t.Fatalf("err: %v", err)
}
agent.Shutdown()
@ -806,6 +864,29 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
}
}
func TestAgent_loadChecks_token(t *testing.T) {
config := nextConfig()
config.Checks = append(config.Checks, &CheckDefinition{
ID: "rabbitmq",
Name: "rabbitmq",
Token: "abc123",
CheckType: CheckType{
TTL: 10 * time.Second,
},
})
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
defer agent.Shutdown()
checks := agent.state.Checks()
if _, ok := checks["rabbitmq"]; !ok {
t.Fatalf("missing check")
}
if token := agent.state.CheckToken("rabbitmq"); token != "abc123" {
t.Fatalf("bad: %s", token)
}
}
func TestAgent_unloadChecks(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
@ -819,7 +900,7 @@ func TestAgent_unloadChecks(t *testing.T) {
Tags: []string{"foo"},
Port: 8000,
}
if err := agent.AddService(svc, nil, false); err != nil {
if err := agent.AddService(svc, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -832,7 +913,7 @@ func TestAgent_unloadChecks(t *testing.T) {
ServiceID: "redis",
ServiceName: "redis",
}
if err := agent.AddCheck(check1, nil, false); err != nil {
if err := agent.AddCheck(check1, nil, false, ""); err != nil {
t.Fatalf("err: %s", err)
}
found := false
@ -859,6 +940,27 @@ func TestAgent_unloadChecks(t *testing.T) {
}
}
func TestAgent_loadServices_token(t *testing.T) {
config := nextConfig()
config.Services = append(config.Services, &ServiceDefinition{
ID: "rabbitmq",
Name: "rabbitmq",
Port: 5672,
Token: "abc123",
})
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
defer agent.Shutdown()
services := agent.state.Services()
if _, ok := services["rabbitmq"]; !ok {
t.Fatalf("missing service")
}
if token := agent.state.ServiceToken("rabbitmq"); token != "abc123" {
t.Fatalf("bad: %s", token)
}
}
func TestAgent_unloadServices(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
@ -873,7 +975,7 @@ func TestAgent_unloadServices(t *testing.T) {
}
// Register the service
if err := agent.AddService(svc, nil, false); err != nil {
if err := agent.AddService(svc, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
found := false
@ -921,7 +1023,7 @@ func TestAgent_ServiceMaintenanceMode(t *testing.T) {
}
// Register the service
if err := agent.AddService(svc, nil, false); err != nil {
if err := agent.AddService(svc, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -1021,7 +1123,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) {
Tags: []string{"foo"},
Port: 8000,
}
if err := agent.AddService(svc, nil, false); err != nil {
if err := agent.AddService(svc, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -1034,7 +1136,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) {
ServiceID: "redis",
ServiceName: "redis",
}
if err := agent.AddCheck(check1, nil, true); err != nil {
if err := agent.AddCheck(check1, nil, true, ""); err != nil {
t.Fatalf("err: %s", err)
}

View file

@ -257,6 +257,7 @@ func (c *CheckTTL) SetStatus(status, output string) {
type persistedCheck struct {
Check *structs.HealthCheck
ChkType *CheckType
Token string
}
// CheckHTTP is used to periodically make an HTTP request to

View file

@ -36,7 +36,7 @@ type localState struct {
// element due to a go bug.
paused int32
sync.Mutex
sync.RWMutex
logger *log.Logger
// Config is the agent config
@ -48,10 +48,12 @@ type localState struct {
// Services tracks the local services
services map[string]*structs.NodeService
serviceStatus map[string]syncStatus
serviceTokens map[string]string
// Checks tracks the local checks
checks map[string]*structs.HealthCheck
checkStatus map[string]syncStatus
checkTokens map[string]string
// Used to track checks that are being deferred
deferCheck map[string]*time.Timer
@ -71,8 +73,10 @@ func (l *localState) Init(config *Config, logger *log.Logger) {
l.logger = logger
l.services = make(map[string]*structs.NodeService)
l.serviceStatus = make(map[string]syncStatus)
l.serviceTokens = make(map[string]string)
l.checks = make(map[string]*structs.HealthCheck)
l.checkStatus = make(map[string]syncStatus)
l.checkTokens = make(map[string]string)
l.deferCheck = make(map[string]*time.Timer)
l.consulCh = make(chan struct{}, 1)
l.triggerCh = make(chan struct{}, 1)
@ -119,10 +123,27 @@ func (l *localState) isPaused() bool {
return atomic.LoadInt32(&l.paused) == 1
}
// ServiceToken returns the configured ACL token for the given
// service ID. If none is present, the agent's token is returned.
func (l *localState) ServiceToken(id string) string {
l.RLock()
defer l.RUnlock()
return l.serviceToken(id)
}
// serviceToken returns an ACL token associated with a service.
func (l *localState) serviceToken(id string) string {
token := l.serviceTokens[id]
if token == "" {
token = l.config.ACLToken
}
return token
}
// AddService is used to add a service entry to the local state.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *localState) AddService(service *structs.NodeService) {
func (l *localState) AddService(service *structs.NodeService, token string) {
// Assign the ID if none given
if service.ID == "" && service.Service != "" {
service.ID = service.Service
@ -133,6 +154,7 @@ func (l *localState) AddService(service *structs.NodeService) {
l.services[service.ID] = service
l.serviceStatus[service.ID] = syncStatus{}
l.serviceTokens[service.ID] = token
l.changeMade()
}
@ -143,6 +165,7 @@ func (l *localState) RemoveService(serviceID string) {
defer l.Unlock()
delete(l.services, serviceID)
delete(l.serviceTokens, serviceID)
l.serviceStatus[serviceID] = syncStatus{remoteDelete: true}
l.changeMade()
}
@ -151,8 +174,8 @@ func (l *localState) RemoveService(serviceID string) {
// agent is aware of and are being kept in sync with the server
func (l *localState) Services() map[string]*structs.NodeService {
services := make(map[string]*structs.NodeService)
l.Lock()
defer l.Unlock()
l.RLock()
defer l.RUnlock()
for name, serv := range l.services {
services[name] = serv
@ -160,10 +183,27 @@ func (l *localState) Services() map[string]*structs.NodeService {
return services
}
// CheckToken is used to return the configured health check token, or
// if none is configured, the default agent ACL token.
func (l *localState) CheckToken(id string) string {
l.RLock()
defer l.RUnlock()
return l.checkToken(id)
}
// checkToken returns an ACL token associated with a check.
func (l *localState) checkToken(id string) string {
token := l.checkTokens[id]
if token == "" {
token = l.config.ACLToken
}
return token
}
// AddCheck is used to add a health check to the local state.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *localState) AddCheck(check *structs.HealthCheck) {
func (l *localState) AddCheck(check *structs.HealthCheck, token string) {
// Set the node name
check.Node = l.config.NodeName
@ -172,6 +212,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck) {
l.checks[check.CheckID] = check
l.checkStatus[check.CheckID] = syncStatus{}
l.checkTokens[check.CheckID] = token
l.changeMade()
}
@ -182,6 +223,7 @@ func (l *localState) RemoveCheck(checkID string) {
defer l.Unlock()
delete(l.checks, checkID)
delete(l.checkTokens, checkID)
l.checkStatus[checkID] = syncStatus{remoteDelete: true}
l.changeMade()
}
@ -234,8 +276,8 @@ func (l *localState) UpdateCheck(checkID, status, output string) {
// agent is aware of and are being kept in sync with the server
func (l *localState) Checks() map[string]*structs.HealthCheck {
checks := make(map[string]*structs.HealthCheck)
l.Lock()
defer l.Unlock()
l.RLock()
defer l.RUnlock()
for name, check := range l.checks {
checks[name] = check
@ -441,7 +483,7 @@ func (l *localState) deleteService(id string) error {
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
ServiceID: id,
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
}
var out struct{}
err := l.iface.RPC("Catalog.Deregister", &req, &out)
@ -462,7 +504,7 @@ func (l *localState) deleteCheck(id string) error {
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
CheckID: id,
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
}
var out struct{}
err := l.iface.RPC("Catalog.Deregister", &req, &out)
@ -480,7 +522,7 @@ func (l *localState) syncService(id string) error {
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
Service: l.services[id],
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
}
// If the service has associated checks that are out of sync,
@ -531,13 +573,14 @@ func (l *localState) syncCheck(id string) error {
service = serv
}
}
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
Service: service,
Check: l.checks[id],
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
}
var out struct{}
err := l.iface.RPC("Catalog.Register", &req, &out)

View file

@ -34,7 +34,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
agent.state.AddService(srv1)
agent.state.AddService(srv1, "")
args.Service = srv1
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -47,7 +47,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Tags: []string{},
Port: 8000,
}
agent.state.AddService(srv2)
agent.state.AddService(srv2, "")
srv2_mod := new(structs.NodeService)
*srv2_mod = *srv2
@ -64,7 +64,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Tags: []string{},
Port: 80,
}
agent.state.AddService(srv3)
agent.state.AddService(srv3, "")
// Exists remote (delete)
srv4 := &structs.NodeService{
@ -86,7 +86,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Address: "127.0.0.10",
Port: 8000,
}
agent.state.AddService(srv5)
agent.state.AddService(srv5, "")
// Exists local, in sync, remote missing (create)
srv6 := &structs.NodeService{
@ -95,7 +95,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Tags: []string{},
Port: 11211,
}
agent.state.AddService(srv6)
agent.state.AddService(srv6, "")
agent.state.serviceStatus["cache"] = syncStatus{inSync: true}
srv5_mod := new(structs.NodeService)
@ -185,7 +185,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
agent.state.AddService(srv)
agent.state.AddService(srv, "")
chk := &structs.HealthCheck{
Node: agent.config.NodeName,
@ -194,7 +194,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
ServiceID: "mysql",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk)
agent.state.AddCheck(chk, "")
// Sync the service once
if err := agent.state.syncService("mysql"); err != nil {
@ -236,7 +236,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
agent.state.AddService(srv)
agent.state.AddService(srv, "")
chk1 := &structs.HealthCheck{
Node: agent.config.NodeName,
@ -245,7 +245,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
ServiceID: "redis",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk1)
agent.state.AddCheck(chk1, "")
chk2 := &structs.HealthCheck{
Node: agent.config.NodeName,
@ -254,7 +254,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
ServiceID: "redis",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk2)
agent.state.AddCheck(chk2, "")
// Sync the service once
if err := agent.state.syncService("redis"); err != nil {
@ -326,7 +326,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
agent.state.AddService(srv1)
agent.state.AddService(srv1, "")
// Create service (Disallowed)
srv2 := &structs.NodeService{
@ -335,7 +335,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
Tags: []string{"foo"},
Port: 5001,
}
agent.state.AddService(srv2)
agent.state.AddService(srv2, "")
// Trigger anti-entropy run and wait
agent.StartSync()
@ -409,7 +409,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Name: "mysql",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk1)
agent.state.AddCheck(chk1, "")
args.Check = chk1
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -422,7 +422,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Name: "redis",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk2)
agent.state.AddCheck(chk2, "")
chk2_mod := new(structs.HealthCheck)
*chk2_mod = *chk2
@ -439,7 +439,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Name: "web",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk3)
agent.state.AddCheck(chk3, "")
// Exists remote (delete)
chk4 := &structs.HealthCheck{
@ -460,7 +460,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Name: "cache",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk5)
agent.state.AddCheck(chk5, "")
agent.state.checkStatus["cache"] = syncStatus{inSync: true}
// Trigger anti-entropy run and wait
@ -539,7 +539,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
Status: structs.HealthPassing,
Output: "",
}
agent.state.AddCheck(check)
agent.state.AddCheck(check, "")
// Trigger anti-entropy run and wait
agent.StartSync()
@ -615,6 +615,54 @@ func TestAgentAntiEntropy_deleteCheck_fails(t *testing.T) {
}
}
func TestAgent_serviceTokens(t *testing.T) {
config := nextConfig()
config.ACLToken = "default"
l := new(localState)
l.Init(config, nil)
// Returns default when no token is set
if token := l.ServiceToken("redis"); token != "default" {
t.Fatalf("bad: %s", token)
}
// Returns configured token
l.serviceTokens["redis"] = "abc123"
if token := l.ServiceToken("redis"); token != "abc123" {
t.Fatalf("bad: %s", token)
}
// Removes token
l.RemoveService("redis")
if token := l.ServiceToken("redis"); token != "default" {
t.Fatalf("bad: %s", token)
}
}
func TestAgent_checkTokens(t *testing.T) {
config := nextConfig()
config.ACLToken = "default"
l := new(localState)
l.Init(config, nil)
// Returns default when no token is set
if token := l.CheckToken("mem"); token != "default" {
t.Fatalf("bad: %s", token)
}
// Returns configured token
l.checkTokens["mem"] = "abc123"
if token := l.CheckToken("mem"); token != "abc123" {
t.Fatalf("bad: %s", token)
}
// Removes token
l.RemoveCheck("mem")
if token := l.CheckToken("mem"); token != "default" {
t.Fatalf("bad: %s", token)
}
}
var testRegisterRules = `
service "api" {
policy = "write"

View file

@ -13,6 +13,7 @@ type ServiceDefinition struct {
Port int
Check CheckType
Checks CheckTypes
Token string
}
func (s *ServiceDefinition) NodeService() *structs.NodeService {
@ -45,6 +46,7 @@ type CheckDefinition struct {
Name string
Notes string
ServiceID string
Token string
CheckType `mapstructure:",squash"`
}
@ -62,3 +64,10 @@ func (c *CheckDefinition) HealthCheck(node string) *structs.HealthCheck {
}
return health
}
// persistedService is used to wrap a service definition and bundle it
// with an ACL token so we can restore both at a later agent start.
type persistedService struct {
Token string
Service *structs.NodeService
}

View file

@ -57,7 +57,7 @@ func TestShouldProcessUserEvent(t *testing.T) {
Tags: []string{"test", "foo", "bar", "master"},
Port: 5000,
}
agent.state.AddService(srv1)
agent.state.AddService(srv1, "")
p := &UserEvent{}
if !agent.shouldProcessUserEvent(p) {
@ -159,7 +159,7 @@ func TestFireReceiveEvent(t *testing.T) {
Tags: []string{"test", "foo", "bar", "master"},
Port: 5000,
}
agent.state.AddService(srv1)
agent.state.AddService(srv1, "")
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
err := agent.UserEvent("", p1)

View file

@ -42,7 +42,7 @@ func TestMaintCommandRun_NoArgs(t *testing.T) {
ID: "test",
Service: "test",
}
if err := a1.agent.AddService(service, nil, false); err != nil {
if err := a1.agent.AddService(service, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
if err := a1.agent.EnableServiceMaintenance("test", "broken 1"); err != nil {
@ -132,7 +132,7 @@ func TestMaintCommandRun_EnableServiceMaintenance(t *testing.T) {
ID: "test",
Service: "test",
}
if err := a1.agent.AddService(service, nil, false); err != nil {
if err := a1.agent.AddService(service, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
@ -164,7 +164,7 @@ func TestMaintCommandRun_DisableServiceMaintenance(t *testing.T) {
ID: "test",
Service: "test",
}
if err := a1.agent.AddService(service, nil, false); err != nil {
if err := a1.agent.AddService(service, nil, false, ""); err != nil {
t.Fatalf("err: %v", err)
}

View file

@ -91,6 +91,10 @@ description of the current state of the check. With a script check, the field is
set to any output generated by the script. Similarly, an external process updating
a TTL check via the HTTP interface can set the `notes` value.
Checks may also contain a `token` field to provide an ACL token. This token is
used for any interaction with the catalog for the check, including
[anti-entropy syncs](/docs/internals/anti-entropy.html) and deregistration.
To configure a check, either provide it as a `-config-file` option to the
agent or place it inside the `-config-dir` of the agent. The file must
end in the ".json" extension to be loaded by Consul. Check definitions can

View file

@ -263,6 +263,13 @@ the state of the check.
Optionally, a `ServiceID` can be provided to associate the registered check with an existing service provided by the agent.
This endpoint supports [ACL tokens](/docs/internals/acl.html). If the query
string includes a `?token=<token-id>`, the registration will use the provided
token to authorize the request. The token is also persisted in the agent's
local configuration to enable periodic
[anti-entropy](/docs/internal/anti-entropy.html) syncs and seamless agent
restarts.
The return code is 200 on success.
### <a name="agent_check_deregister"></a> /v1/agent/check/deregister/\<checkId\>
@ -346,6 +353,13 @@ If `Check` is provided, only one of `Script`, `HTTP`, or `TTL` should be specifi
`Script` and `HTTP` also require `Interval`. The created check will be named "service:\<ServiceId\>".
There is more information about checks [here](/docs/agent/checks.html).
This endpoint supports [ACL tokens](/docs/internals/acl.html). If the query
string includes a `?token=<token-id>`, the registration will use the provided
token to authorize the request. The token is also persisted in the agent's
local configuration to enable periodic
[anti-entropy](/docs/internal/anti-entropy.html) syncs and seamless agent
restarts.
The return code is 200 on success.
### <a name="agent_service_deregister"></a> /v1/agent/service/deregister/\<serviceId\>

View file

@ -51,6 +51,10 @@ The `port` field can be used as well to make a service-oriented architecture
simpler to configure; this way, the address and port of a service can
be discovered.
Services may also contain a `token` field to provide an ACL token. This token is
used for any interaction with the catalog for the service, including
[anti-entropy syncs](/docs/internals/anti-entropy.html) and deregistration.
A service can have an associated health check. This is a powerful feature as
it allows a web balancer to gracefully remove failing nodes, a database
to replace a failed slave, etc. The health check is strongly integrated in

View file

@ -189,3 +189,22 @@ This is equivalent to the following JSON input:
}
```
## Services and Checks with ACLs
Consul allows configuring ACL policies which may control access to service and
check registration. In order to successfully register a service or check with
these types of policies in place, a token with sufficient privileges must be
provided to perform the registration into the global catalog. Consul also
performs periodic [anti-entropy](/docs/internals/anti-entropy.html) syncs, which
may require an ACL token to complete. To accommodate this, Consul provides two
methods of configuring ACL tokens to use for registration events:
1. Using the [acl_token](/docs/agent/options.html#acl_token) configuration
directive. This allows a single token to be configured globally and used
during all service and check registration operations.
2. Providing an ACL token with service and check definitions at
registration time. This allows for greater flexibility and enables the use
of multiple tokens on the same agent. Examples of what this looks like are
available for both [services](/docs/agent/services.html) and
[checks](/docs/agent/checks.html). Tokens may also be passed to the
[HTTP API](/docs/agent/http.html) for operations that require them.