Add new config_file_service_registration token (#15828)

This commit is contained in:
Paul Glass 2023-01-10 10:24:02 -06:00 committed by GitHub
parent e0b7694a90
commit 1bf1686ebc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 787 additions and 294 deletions

4
.changelog/15828.txt Normal file
View File

@ -0,0 +1,4 @@
```release-note:feature
acl: Add new `acl.tokens.config_file_registration` config field which specifies the token used
to register services and checks that are defined in config files.
```

View File

@ -280,7 +280,7 @@ func TestACL_vetServiceRegister(t *testing.T) {
a.State.AddServiceWithChecks(&structs.NodeService{
ID: "my-service",
Service: "other",
}, nil, "")
}, nil, "", false)
err = a.vetServiceRegister(serviceRWSecret, &structs.NodeService{
ID: "my-service",
Service: "service",
@ -310,7 +310,7 @@ func TestACL_vetServiceUpdateWithAuthorizer(t *testing.T) {
a.State.AddServiceWithChecks(&structs.NodeService{
ID: "my-service",
Service: "service",
}, nil, "")
}, nil, "", false)
err = vetServiceUpdate(serviceRWSecret, structs.NewServiceID("my-service", nil))
require.NoError(t, err)
@ -367,12 +367,12 @@ func TestACL_vetCheckRegisterWithAuthorizer(t *testing.T) {
a.State.AddServiceWithChecks(&structs.NodeService{
ID: "my-service",
Service: "service",
}, nil, "")
}, nil, "", false)
a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-check"),
ServiceID: "my-service",
ServiceName: "other",
}, "")
}, "", false)
err = vetCheckRegister(serviceRWSecret, &structs.HealthCheck{
CheckID: types.CheckID("my-check"),
ServiceID: "my-service",
@ -384,7 +384,7 @@ func TestACL_vetCheckRegisterWithAuthorizer(t *testing.T) {
// Try to register over a node check without write privs to the node.
a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-node-check"),
}, "")
}, "", false)
err = vetCheckRegister(serviceRWSecret, &structs.HealthCheck{
CheckID: types.CheckID("my-node-check"),
ServiceID: "my-service",
@ -416,12 +416,12 @@ func TestACL_vetCheckUpdateWithAuthorizer(t *testing.T) {
a.State.AddServiceWithChecks(&structs.NodeService{
ID: "my-service",
Service: "service",
}, nil, "")
}, nil, "", false)
a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-service-check"),
ServiceID: "my-service",
ServiceName: "service",
}, "")
}, "", false)
err = vetCheckUpdate(serviceRWSecret, structs.NewCheckID("my-service-check", nil))
require.NoError(t, err)
@ -433,7 +433,7 @@ func TestACL_vetCheckUpdateWithAuthorizer(t *testing.T) {
// Update node check with write privs.
a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-node-check"),
}, "")
}, "", false)
err = vetCheckUpdate(nodeRWSecret, structs.NewCheckID("my-node-check", nil))
require.NoError(t, err)

View File

@ -2435,7 +2435,7 @@ func (a *Agent) addServiceInternal(req addServiceInternalRequest) error {
}
}
err := a.State.AddServiceWithChecks(service, checks, req.token)
err := a.State.AddServiceWithChecks(service, checks, req.token, req.Source == ConfigSourceLocal)
if err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
@ -2771,7 +2771,7 @@ func (a *Agent) addCheckLocked(check *structs.HealthCheck, chkType *structs.Chec
}
// Add to the local state for anti-entropy
err = a.State.AddCheck(check, token)
err = a.State.AddCheck(check, token, source == ConfigSourceLocal)
if err != nil {
return err
}

View File

@ -1496,6 +1496,9 @@ func (s *HTTPHandlers) AgentToken(resp http.ResponseWriter, req *http.Request) (
case "acl_replication_token", "replication":
s.agent.tokens.UpdateReplicationToken(args.Token, token_store.TokenSourceAPI)
case "config_file_service_registration":
s.agent.tokens.UpdateConfigFileRegistrationToken(args.Token, token_store.TokenSourceAPI)
default:
return HTTPError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("Token %q is unknown", target)}
}

View File

@ -92,7 +92,7 @@ func TestAgent_Services(t *testing.T) {
},
Port: 5000,
}
require.NoError(t, a.State.AddServiceWithChecks(srv1, nil, ""))
require.NoError(t, a.State.AddServiceWithChecks(srv1, nil, "", false))
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
resp := httptest.NewRecorder()
@ -127,7 +127,7 @@ func TestAgent_ServicesFiltered(t *testing.T) {
},
Port: 5000,
}
require.NoError(t, a.State.AddServiceWithChecks(srv1, nil, ""))
require.NoError(t, a.State.AddServiceWithChecks(srv1, nil, "", false))
// Add another service
srv2 := &structs.NodeService{
@ -139,7 +139,7 @@ func TestAgent_ServicesFiltered(t *testing.T) {
},
Port: 1234,
}
require.NoError(t, a.State.AddServiceWithChecks(srv2, nil, ""))
require.NoError(t, a.State.AddServiceWithChecks(srv2, nil, "", false))
req, _ := http.NewRequest("GET", "/v1/agent/services?filter="+url.QueryEscape("foo in Meta"), nil)
resp := httptest.NewRecorder()
@ -187,7 +187,7 @@ func TestAgent_Services_ExternalConnectProxy(t *testing.T) {
Upstreams: structs.TestUpstreams(t),
},
}
a.State.AddServiceWithChecks(srv1, nil, "")
a.State.AddServiceWithChecks(srv1, nil, "", false)
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
resp := httptest.NewRecorder()
@ -231,7 +231,7 @@ func TestAgent_Services_Sidecar(t *testing.T) {
},
},
}
a.State.AddServiceWithChecks(srv1, nil, "")
a.State.AddServiceWithChecks(srv1, nil, "", false)
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
resp := httptest.NewRecorder()
@ -280,7 +280,7 @@ func TestAgent_Services_MeshGateway(t *testing.T) {
},
},
}
a.State.AddServiceWithChecks(srv1, nil, "")
a.State.AddServiceWithChecks(srv1, nil, "", false)
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
resp := httptest.NewRecorder()
@ -324,7 +324,7 @@ func TestAgent_Services_TerminatingGateway(t *testing.T) {
},
},
}
require.NoError(t, a.State.AddServiceWithChecks(srv1, nil, ""))
require.NoError(t, a.State.AddServiceWithChecks(srv1, nil, "", false))
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
resp := httptest.NewRecorder()
@ -369,7 +369,7 @@ func TestAgent_Services_ACLFilter(t *testing.T) {
},
}
for _, s := range services {
a.State.AddServiceWithChecks(s, nil, "")
a.State.AddServiceWithChecks(s, nil, "", false)
}
t.Run("no token", func(t *testing.T) {
@ -762,7 +762,7 @@ func TestAgent_Checks(t *testing.T) {
Timeout: "5s",
Status: api.HealthPassing,
}
a.State.AddCheck(chk1, "")
a.State.AddCheck(chk1, "", false)
req, _ := http.NewRequest("GET", "/v1/agent/checks", nil)
resp := httptest.NewRecorder()
@ -807,7 +807,7 @@ func TestAgent_ChecksWithFilter(t *testing.T) {
Name: "mysql",
Status: api.HealthPassing,
}
a.State.AddCheck(chk1, "")
a.State.AddCheck(chk1, "", false)
chk2 := &structs.HealthCheck{
Node: a.Config.NodeName,
@ -815,7 +815,7 @@ func TestAgent_ChecksWithFilter(t *testing.T) {
Name: "redis",
Status: api.HealthPassing,
}
a.State.AddCheck(chk2, "")
a.State.AddCheck(chk2, "", false)
req, _ := http.NewRequest("GET", "/v1/agent/checks?filter="+url.QueryEscape("Name == `redis`"), nil)
resp := httptest.NewRecorder()
@ -877,7 +877,7 @@ func TestAgent_HealthServiceByID(t *testing.T) {
ServiceID: "mysql",
Status: api.HealthPassing,
}
err := a.State.AddCheck(chk1, "")
err := a.State.AddCheck(chk1, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -889,7 +889,7 @@ func TestAgent_HealthServiceByID(t *testing.T) {
ServiceID: "mysql",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk2, "")
err = a.State.AddCheck(chk2, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -901,7 +901,7 @@ func TestAgent_HealthServiceByID(t *testing.T) {
ServiceID: "mysql2",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk3, "")
err = a.State.AddCheck(chk3, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -913,7 +913,7 @@ func TestAgent_HealthServiceByID(t *testing.T) {
ServiceID: "mysql2",
Status: api.HealthWarning,
}
err = a.State.AddCheck(chk4, "")
err = a.State.AddCheck(chk4, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -925,7 +925,7 @@ func TestAgent_HealthServiceByID(t *testing.T) {
ServiceID: "mysql3",
Status: api.HealthMaint,
}
err = a.State.AddCheck(chk5, "")
err = a.State.AddCheck(chk5, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -937,7 +937,7 @@ func TestAgent_HealthServiceByID(t *testing.T) {
ServiceID: "mysql3",
Status: api.HealthCritical,
}
err = a.State.AddCheck(chk6, "")
err = a.State.AddCheck(chk6, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -996,7 +996,7 @@ func TestAgent_HealthServiceByID(t *testing.T) {
Name: "diskCheck",
Status: api.HealthCritical,
}
err = a.State.AddCheck(nodeCheck, "")
err = a.State.AddCheck(nodeCheck, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
@ -1015,7 +1015,7 @@ func TestAgent_HealthServiceByID(t *testing.T) {
Name: "_node_maintenance",
Status: api.HealthMaint,
}
err = a.State.AddCheck(nodeCheck, "")
err = a.State.AddCheck(nodeCheck, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1091,7 +1091,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
ServiceName: "mysql-pool-r",
Status: api.HealthPassing,
}
err := a.State.AddCheck(chk1, "")
err := a.State.AddCheck(chk1, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1104,7 +1104,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
ServiceName: "mysql-pool-r",
Status: api.HealthWarning,
}
err = a.State.AddCheck(chk2, "")
err = a.State.AddCheck(chk2, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1117,7 +1117,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
ServiceName: "mysql-pool-r",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk3, "")
err = a.State.AddCheck(chk3, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1130,7 +1130,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
ServiceName: "mysql-pool-r",
Status: api.HealthCritical,
}
err = a.State.AddCheck(chk4, "")
err = a.State.AddCheck(chk4, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1143,7 +1143,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
ServiceName: "mysql-pool-rw",
Status: api.HealthWarning,
}
err = a.State.AddCheck(chk5, "")
err = a.State.AddCheck(chk5, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1156,7 +1156,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
ServiceName: "mysql-pool-rw",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk6, "")
err = a.State.AddCheck(chk6, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1169,7 +1169,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
ServiceName: "httpd",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk7, "")
err = a.State.AddCheck(chk7, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1182,7 +1182,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
ServiceName: "httpd",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk8, "")
err = a.State.AddCheck(chk8, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1248,7 +1248,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
Name: "diskCheck",
Status: api.HealthCritical,
}
err = a.State.AddCheck(nodeCheck, "")
err = a.State.AddCheck(nodeCheck, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
@ -1267,7 +1267,7 @@ func TestAgent_HealthServiceByName(t *testing.T) {
Name: "_node_maintenance",
Status: api.HealthMaint,
}
err = a.State.AddCheck(nodeCheck, "")
err = a.State.AddCheck(nodeCheck, "", false)
if err != nil {
t.Fatalf("Err: %v", err)
}
@ -1366,7 +1366,7 @@ func TestAgent_Checks_ACLFilter(t *testing.T) {
},
}
for _, c := range checks {
a.State.AddCheck(c, "")
a.State.AddCheck(c, "", false)
}
t.Run("no token", func(t *testing.T) {
@ -6221,6 +6221,7 @@ func TestAgent_Token(t *testing.T) {
agent = ""
agent_recovery = ""
replication = ""
config_file_service_registration = ""
}
}
`)
@ -6236,6 +6237,8 @@ func TestAgent_Token(t *testing.T) {
agentRecoverySource tokenStore.TokenSource
repl string
replSource tokenStore.TokenSource
registration string
registrationSource tokenStore.TokenSource
}
resetTokens := func(init tokens) {
@ -6243,6 +6246,7 @@ func TestAgent_Token(t *testing.T) {
a.tokens.UpdateAgentToken(init.agent, init.agentSource)
a.tokens.UpdateAgentRecoveryToken(init.agentRecovery, init.agentRecoverySource)
a.tokens.UpdateReplicationToken(init.repl, init.replSource)
a.tokens.UpdateConfigFileRegistrationToken(init.registration, init.registrationSource)
}
body := func(token string) io.Reader {
@ -6362,6 +6366,15 @@ func TestAgent_Token(t *testing.T) {
raw: tokens{repl: "R", replSource: tokenStore.TokenSourceAPI},
effective: tokens{repl: "R"},
},
{
name: "set registration",
method: "PUT",
url: "config_file_service_registration?token=root",
body: body("G"),
code: http.StatusOK,
raw: tokens{registration: "G", registrationSource: tokenStore.TokenSourceAPI},
effective: tokens{registration: "G"},
},
{
name: "clear user legacy",
method: "PUT",
@ -6443,6 +6456,15 @@ func TestAgent_Token(t *testing.T) {
init: tokens{repl: "R"},
raw: tokens{replSource: tokenStore.TokenSourceAPI},
},
{
name: "clear registration",
method: "PUT",
url: "config_file_service_registration?token=root",
body: body(""),
code: http.StatusOK,
init: tokens{registration: "G"},
raw: tokens{registrationSource: tokenStore.TokenSourceAPI},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -6461,6 +6483,7 @@ func TestAgent_Token(t *testing.T) {
require.Equal(t, tt.effective.agent, a.tokens.AgentToken())
require.Equal(t, tt.effective.agentRecovery, a.tokens.AgentRecoveryToken())
require.Equal(t, tt.effective.repl, a.tokens.ReplicationToken())
require.Equal(t, tt.effective.registration, a.tokens.ConfigFileRegistrationToken())
tok, src := a.tokens.UserTokenAndSource()
require.Equal(t, tt.raw.user, tok)
@ -6477,6 +6500,10 @@ func TestAgent_Token(t *testing.T) {
tok, src = a.tokens.ReplicationTokenAndSource()
require.Equal(t, tt.raw.repl, tok)
require.Equal(t, tt.raw.replSource, src)
tok, src = a.tokens.ConfigFileRegistrationTokenAndSource()
require.Equal(t, tt.raw.registration, tok)
require.Equal(t, tt.raw.registrationSource, src)
})
}
@ -8009,7 +8036,7 @@ func TestAgent_Services_ExposeConfig(t *testing.T) {
},
},
}
a.State.AddServiceWithChecks(srv1, nil, "")
a.State.AddServiceWithChecks(srv1, nil, "", false)
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
resp := httptest.NewRecorder()

View File

@ -870,6 +870,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
ACLAgentToken: stringVal(c.ACL.Tokens.Agent),
ACLAgentRecoveryToken: stringVal(c.ACL.Tokens.AgentRecovery),
ACLReplicationToken: stringVal(c.ACL.Tokens.Replication),
ACLConfigFileRegistrationToken: stringVal(c.ACL.Tokens.ConfigFileRegistration),
},
// Autopilot

View File

@ -759,6 +759,7 @@ type Tokens struct {
AgentRecovery *string `mapstructure:"agent_recovery"`
Default *string `mapstructure:"default"`
Agent *string `mapstructure:"agent"`
ConfigFileRegistration *string `mapstructure:"config_file_service_registration"`
// Enterprise Only
ManagedServiceProvider []ServiceProviderToken `mapstructure:"managed_service_provider"`

View File

@ -18,6 +18,7 @@
"ACLAgentToken": "hidden",
"ACLDefaultToken": "hidden",
"ACLReplicationToken": "hidden",
"ACLConfigFileRegistrationToken": "hidden",
"DataDir": "",
"EnablePersistence": false,
"EnterpriseConfig": {}

View File

@ -80,6 +80,10 @@ type ServiceState struct {
// but has not been removed on the server yet.
Deleted bool
// IsLocallyDefined indicates whether the service was defined locally in config
// as opposed to being registered through the Agent API.
IsLocallyDefined bool
// WatchCh is closed when the service state changes. Suitable for use in a
// memdb.WatchSet when watching agent local changes with hash-based blocking.
WatchCh chan struct{}
@ -124,6 +128,10 @@ type CheckState struct {
// Deleted is true when the health check record has been marked as
// deleted but has not been removed on the server yet.
Deleted bool
// IsLocallyDefined indicates whether the check was defined locally in config
// as opposed to being registered through the Agent API.
IsLocallyDefined bool
}
// Clone returns a shallow copy of the object.
@ -246,14 +254,19 @@ func (l *State) ServiceToken(id structs.ServiceID) string {
// aclTokenForServiceSync returns an ACL token associated with a service. If there is
// no ACL token associated with the service, fallback is used to return a value.
// This method is not synchronized and the lock must already be held.
func (l *State) aclTokenForServiceSync(id structs.ServiceID, fallback func() string) string {
func (l *State) aclTokenForServiceSync(id structs.ServiceID, fallbacks ...func() string) string {
if s := l.services[id]; s != nil && s.Token != "" {
return s.Token
}
return fallback()
for _, fb := range fallbacks {
if tok := fb(); tok != "" {
return tok
}
}
return ""
}
func (l *State) addServiceLocked(service *structs.NodeService, token string) error {
func (l *State) addServiceLocked(service *structs.NodeService, token string, isLocal bool) error {
if service == nil {
return fmt.Errorf("no service")
}
@ -277,23 +290,25 @@ func (l *State) addServiceLocked(service *structs.NodeService, token string) err
l.setServiceStateLocked(&ServiceState{
Service: service,
Token: token,
IsLocallyDefined: isLocal,
})
return nil
}
// AddServiceWithChecks adds a service entry and its checks to the local state atomically
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*structs.HealthCheck, token string) error {
// AddServiceWithChecks adds a service entry and its checks to the local state
// atomically This entry is persistent and the agent will make a best effort to
// ensure it is registered. The isLocallyDefined parameter indicates whether
// the service and checks are sourced from local agent configuration files.
func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*structs.HealthCheck, token string, isLocallyDefined bool) error {
l.Lock()
defer l.Unlock()
if err := l.addServiceLocked(service, token); err != nil {
if err := l.addServiceLocked(service, token, isLocallyDefined); err != nil {
return err
}
for _, check := range checks {
if err := l.addCheckLocked(check, token); err != nil {
if err := l.addCheckLocked(check, token, isLocallyDefined); err != nil {
return err
}
}
@ -508,24 +523,30 @@ func (l *State) CheckToken(id structs.CheckID) string {
// aclTokenForCheckSync returns an ACL token associated with a check. If there is
// no ACL token associated with the check, the callback is used to return a value.
// This method is not synchronized and the lock must already be held.
func (l *State) aclTokenForCheckSync(id structs.CheckID, fallback func() string) string {
func (l *State) aclTokenForCheckSync(id structs.CheckID, fallbacks ...func() string) string {
if c := l.checks[id]; c != nil && c.Token != "" {
return c.Token
}
return fallback()
for _, fb := range fallbacks {
if tok := fb(); tok != "" {
return tok
}
}
return ""
}
// AddCheck is used to add a health check to the local state.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
// AddCheck is used to add a health check to the local state. This entry is
// persistent and the agent will make a best effort to ensure it is registered.
// The isLocallyDefined parameter indicates whether the checks are sourced from
// local agent configuration files.
func (l *State) AddCheck(check *structs.HealthCheck, token string, isLocallyDefined bool) error {
l.Lock()
defer l.Unlock()
return l.addCheckLocked(check, token)
return l.addCheckLocked(check, token, isLocallyDefined)
}
func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
func (l *State) addCheckLocked(check *structs.HealthCheck, token string, isLocal bool) error {
if check == nil {
return fmt.Errorf("no check")
}
@ -557,6 +578,7 @@ func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
l.setCheckStateLocked(&CheckState{
Check: check,
Token: token,
IsLocallyDefined: isLocal,
})
return nil
}
@ -1366,9 +1388,32 @@ func (l *State) pruneCheck(id structs.CheckID) {
delete(l.checks, id)
}
// serviceRegistrationTokenFallback returns a fallback function to be used when
// determining the token to use for service sync.
//
// The fallback function will return the config file registration token if the
// given service was sourced from a service definition in a config file.
func (l *State) serviceRegistrationTokenFallback(key structs.ServiceID) func() string {
return func() string {
if s := l.services[key]; s != nil && s.IsLocallyDefined {
return l.tokens.ConfigFileRegistrationToken()
}
return ""
}
}
func (l *State) checkRegistrationTokenFallback(key structs.CheckID) func() string {
return func() string {
if s := l.checks[key]; s != nil && s.IsLocallyDefined {
return l.tokens.ConfigFileRegistrationToken()
}
return ""
}
}
// syncService is used to sync a service to the server
func (l *State) syncService(key structs.ServiceID) error {
st := l.aclTokenForServiceSync(key, l.tokens.UserToken)
st := l.aclTokenForServiceSync(key, l.serviceRegistrationTokenFallback(key), l.tokens.UserToken)
// If the service has associated checks that are out of sync,
// piggyback them on the service sync so they are part of the
@ -1384,7 +1429,7 @@ func (l *State) syncService(key structs.ServiceID) error {
if !key.Matches(c.Check.CompoundServiceID()) {
continue
}
if st != l.aclTokenForCheckSync(checkKey, l.tokens.UserToken) {
if st != l.aclTokenForCheckSync(checkKey, l.checkRegistrationTokenFallback(checkKey), l.tokens.UserToken) {
continue
}
checks = append(checks, c.Check)
@ -1452,7 +1497,7 @@ func (l *State) syncService(key structs.ServiceID) error {
// syncCheck is used to sync a check to the server
func (l *State) syncCheck(key structs.CheckID) error {
c := l.checks[key]
ct := l.aclTokenForCheckSync(key, l.tokens.UserToken)
ct := l.aclTokenForCheckSync(key, l.checkRegistrationTokenFallback(key), l.tokens.UserToken)
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
ID: l.config.NodeID,

View File

@ -0,0 +1,79 @@
package local
import (
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/stretchr/testify/require"
)
func TestRegistrationTokenFallback(t *testing.T) {
svcId := structs.NewServiceID("redis", nil)
addServiceFn := func(l *State, isLocal bool) error {
return l.AddServiceWithChecks(&structs.NodeService{ID: svcId.ID}, nil, "", isLocal)
}
svcTokenFallback := func(l *State) func() string {
return l.serviceRegistrationTokenFallback(svcId)
}
testRegistrationTokenFallback(t, "service", addServiceFn, svcTokenFallback)
checkId := structs.NewCheckID("redis-check", nil)
addCheckFn := func(l *State, isLocal bool) error {
return l.AddCheck(&structs.HealthCheck{CheckID: checkId.ID}, "", isLocal)
}
checkTokenFallback := func(l *State) func() string {
return l.checkRegistrationTokenFallback(checkId)
}
testRegistrationTokenFallback(t, "check", addCheckFn, checkTokenFallback)
}
func testRegistrationTokenFallback(
t *testing.T,
prefix string,
addResourceFn func(*State, bool) error,
tokenFallback func(*State) func() string,
) {
cases := map[string]struct {
registrationToken string
isLocal bool
addResource func(*State, bool) error
expToken string
}{
"defaults to empty token": {},
"empty token when registration token not configured": {
addResource: addResourceFn,
},
"empty token when resource not found": {
registrationToken: "token123",
},
"registration token is used when resource is locally-defined": {
registrationToken: "token123",
addResource: addResourceFn,
isLocal: true,
expToken: "token123",
},
"empty token when resource is not locally-defined": {
registrationToken: "token123",
addResource: addResourceFn,
},
}
for name, c := range cases {
t.Run(prefix+" "+name, func(t *testing.T) {
tokens := new(token.Store)
tokens.Load(token.Config{
ACLConfigFileRegistrationToken: c.registrationToken,
}, nil)
l := NewState(Config{}, nil, tokens)
l.TriggerSyncChanges = func() {}
if c.addResource != nil {
require.NoError(t, c.addResource(l, c.isLocal))
}
fn := tokenFallback(l)
require.Equal(t, c.expToken, fn())
})
}
}

View File

@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"testing"
"time"
@ -22,6 +24,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
@ -65,7 +68,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
assert.False(t, a.State.ServiceExists(structs.ServiceID{ID: srv1.ID}))
a.State.AddServiceWithChecks(srv1, nil, "")
a.State.AddServiceWithChecks(srv1, nil, "", false)
assert.True(t, a.State.ServiceExists(structs.ServiceID{ID: srv1.ID}))
args.Service = srv1
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
@ -84,7 +87,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv2, nil, "")
a.State.AddServiceWithChecks(srv2, nil, "", false)
srv2_mod := new(structs.NodeService)
*srv2_mod = *srv2
@ -106,7 +109,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv3, nil, "")
a.State.AddServiceWithChecks(srv3, nil, "", false)
// Exists remote (delete)
srv4 := &structs.NodeService{
@ -138,7 +141,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv5, nil, "")
a.State.AddServiceWithChecks(srv5, nil, "", false)
srv5_mod := new(structs.NodeService)
*srv5_mod = *srv5
@ -291,7 +294,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv1, nil, "")
a.State.AddServiceWithChecks(srv1, nil, "", false)
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", &structs.RegisterRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
@ -312,7 +315,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv2, nil, "")
a.State.AddServiceWithChecks(srv2, nil, "", false)
srv2_mod := clone(srv2)
srv2_mod.Port = 9000
@ -336,7 +339,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv3, nil, "")
a.State.AddServiceWithChecks(srv3, nil, "", false)
// Exists remote (delete)
srv4 := &structs.NodeService{
@ -497,7 +500,7 @@ func TestAgent_ServiceWatchCh(t *testing.T) {
Tags: []string{"tag1"},
Port: 6100,
}
require.NoError(t, a.State.AddServiceWithChecks(srv1, nil, ""))
require.NoError(t, a.State.AddServiceWithChecks(srv1, nil, "", false))
verifyState := func(ss *local.ServiceState) {
require.NotNil(t, ss)
@ -519,7 +522,7 @@ func TestAgent_ServiceWatchCh(t *testing.T) {
go func() {
srv2 := srv1
srv2.Port = 6200
require.NoError(t, a.State.AddServiceWithChecks(srv2, nil, ""))
require.NoError(t, a.State.AddServiceWithChecks(srv2, nil, "", false))
}()
// We should observe WatchCh close
@ -596,7 +599,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv1, nil, "")
a.State.AddServiceWithChecks(srv1, nil, "", false)
// register a local service with tag override disabled
srv2 := &structs.NodeService{
@ -611,7 +614,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv2, nil, "")
a.State.AddServiceWithChecks(srv2, nil, "", false)
// make sure they are both in the catalog
if err := a.State.SyncChanges(); err != nil {
@ -723,7 +726,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
Tags: []string{"primary"},
Port: 5000,
}
a.State.AddServiceWithChecks(srv, nil, "")
a.State.AddServiceWithChecks(srv, nil, "", false)
chk := &structs.HealthCheck{
Node: a.Config.NodeName,
@ -732,7 +735,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
ServiceID: "mysql",
Status: api.HealthPassing,
}
a.State.AddCheck(chk, "")
a.State.AddCheck(chk, "", false)
if err := a.State.SyncFull(); err != nil {
t.Fatal("sync failed: ", err)
@ -773,7 +776,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
Tags: []string{"primary"},
Port: 5000,
}
a.State.AddServiceWithChecks(srv, nil, "")
a.State.AddServiceWithChecks(srv, nil, "", false)
chk1 := &structs.HealthCheck{
Node: a.Config.NodeName,
@ -782,7 +785,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
ServiceID: "redis",
Status: api.HealthPassing,
}
a.State.AddCheck(chk1, "")
a.State.AddCheck(chk1, "", false)
chk2 := &structs.HealthCheck{
Node: a.Config.NodeName,
@ -791,7 +794,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
ServiceID: "redis",
Status: api.HealthPassing,
}
a.State.AddCheck(chk2, "")
a.State.AddCheck(chk2, "", false)
if err := a.State.SyncFull(); err != nil {
t.Fatal("sync failed: ", err)
@ -874,7 +877,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv1, nil, token)
a.State.AddServiceWithChecks(srv1, nil, token, false)
// Create service (allowed)
srv2 := &structs.NodeService{
@ -888,7 +891,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv2, nil, token)
a.State.AddServiceWithChecks(srv2, nil, token, false)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
@ -984,6 +987,165 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
}
}
func TestAgentAntiEntropy_ConfigFileRegistrationToken(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
tokens := map[string]string{
"api": "5ece2854-989a-4e7a-8145-4801c13350d5",
"web": "b85e99b7-8d97-45a3-a175-5f33e167177b",
}
// Configure the agent with the config_file_service_registration token.
agentConfig := fmt.Sprintf(`
primary_datacenter = "dc1"
acl {
enabled = true
default_policy = "deny"
tokens {
initial_management = "root"
config_file_service_registration = "%s"
}
}
`, tokens["api"])
// We need separate files because we can't put multiple 'service' stanzas in one config string/file.
dir := testutil.TempDir(t, "config")
apiFile := filepath.Join(dir, "api.hcl")
dbFile := filepath.Join(dir, "db.hcl")
webFile := filepath.Join(dir, "web.hcl")
// The "api" service and checks are able to register because the config_file_service_registration token
// has service:write for the "api" service.
require.NoError(t, os.WriteFile(apiFile, []byte(`
service {
name = "api"
id = "api"
check {
id = "api inline check"
status = "passing"
ttl = "99999h"
}
}
check {
id = "api standalone check"
status = "passing"
service_id = "api"
ttl = "99999h"
}
`), 0600))
// The "db" service and check is unable to register because the config_file_service_registration token
// does not have service:write for "db" and there are no inline tokens.
require.NoError(t, os.WriteFile(dbFile, []byte(`
service {
name = "db"
id = "db"
}
check {
id = "db standalone check"
service_id = "db"
status = "passing"
ttl = "99999h"
}
`), 0600))
// The "web" service is able to register because the inline tokens have service:write for "web".
// This tests that inline tokens take precedence over the config_file_service_registration token.
require.NoError(t, os.WriteFile(webFile, []byte(fmt.Sprintf(`
service {
name = "web"
id = "web"
token = "%[1]s"
}
check {
id = "web standalone check"
service_id = "web"
status = "passing"
ttl = "99999h"
token = "%[1]s"
}
`, tokens["web"])), 0600))
a := agent.NewTestAgentWithConfigFile(t, agentConfig, []string{apiFile, dbFile, webFile})
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Create the tokens referenced in the config files.
for svc, secret := range tokens {
req := structs.ACLTokenSetRequest{
ACLToken: structs.ACLToken{
SecretID: secret,
ServiceIdentities: []*structs.ACLServiceIdentity{{ServiceName: svc}},
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := a.RPC(context.Background(), "ACL.TokenSet", &req, &structs.ACLToken{}); err != nil {
t.Fatalf("err: %v", err)
}
}
// All services are added from files into local state.
assert.True(t, a.State.ServiceExists(structs.ServiceID{ID: "api"}))
assert.True(t, a.State.ServiceExists(structs.ServiceID{ID: "db"}))
assert.True(t, a.State.ServiceExists(structs.ServiceID{ID: "web"}))
// Sync services with the remote.
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Validate which services were able to register.
var services structs.IndexedNodeServices
require.NoError(t, a.RPC(
context.Background(),
"Catalog.NodeServices",
&structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
QueryOptions: structs.QueryOptions{Token: "root"},
},
&services,
))
assert.Len(t, services.NodeServices.Services, 3)
assert.Contains(t, services.NodeServices.Services, "api")
assert.Contains(t, services.NodeServices.Services, "consul")
assert.Contains(t, services.NodeServices.Services, "web")
// No token with permission to register the "db" service.
assert.NotContains(t, services.NodeServices.Services, "db")
// Validate which checks were able to register.
var checks structs.IndexedHealthChecks
require.NoError(t, a.RPC(
context.Background(),
"Health.NodeChecks",
&structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
QueryOptions: structs.QueryOptions{Token: "root"},
},
&checks,
))
sort.Slice(checks.HealthChecks, func(i, j int) bool {
return checks.HealthChecks[i].CheckID < checks.HealthChecks[j].CheckID
})
assert.Len(t, checks.HealthChecks, 4)
assert.Equal(t, checks.HealthChecks[0].CheckID, types.CheckID("api inline check"))
assert.Equal(t, checks.HealthChecks[1].CheckID, types.CheckID("api standalone check"))
assert.Equal(t, checks.HealthChecks[2].CheckID, types.CheckID("serfHealth"))
assert.Equal(t, checks.HealthChecks[3].CheckID, types.CheckID("web standalone check"))
}
type RPC interface {
RPC(ctx context.Context, method string, args interface{}, reply interface{}) error
}
@ -1044,7 +1206,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Status: api.HealthPassing,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddCheck(chk1, "")
a.State.AddCheck(chk1, "", false)
args.Check = chk1
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -1058,7 +1220,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Status: api.HealthPassing,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddCheck(chk2, "")
a.State.AddCheck(chk2, "", false)
chk2_mod := new(structs.HealthCheck)
*chk2_mod = *chk2
@ -1076,7 +1238,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Status: api.HealthPassing,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddCheck(chk3, "")
a.State.AddCheck(chk3, "", false)
// Exists remote (delete)
chk4 := &structs.HealthCheck{
@ -1333,7 +1495,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv1, nil, "root")
a.State.AddServiceWithChecks(srv1, nil, "root", false)
srv2 := &structs.NodeService{
ID: "api",
Service: "api",
@ -1345,7 +1507,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddServiceWithChecks(srv2, nil, "root")
a.State.AddServiceWithChecks(srv2, nil, "root", false)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
@ -1401,7 +1563,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
Status: api.HealthPassing,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddCheck(chk1, token)
a.State.AddCheck(chk1, token, false)
// This one will be allowed.
chk2 := &structs.HealthCheck{
@ -1414,7 +1576,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
Status: api.HealthPassing,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddCheck(chk2, token)
a.State.AddCheck(chk2, token, false)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
@ -1537,7 +1699,7 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) {
Status: api.HealthPassing,
Output: "first output",
}
if err := a.State.AddCheck(check, ""); err != nil {
if err := a.State.AddCheck(check, "", false); err != nil {
t.Fatalf("bad: %s", err)
}
if err := a.State.SyncFull(); err != nil {
@ -1586,7 +1748,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
Status: api.HealthPassing,
Output: "",
}
a.State.AddCheck(check, "")
a.State.AddCheck(check, "", false)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
@ -1862,14 +2024,14 @@ func TestState_ServiceTokens(t *testing.T) {
})
t.Run("empty string when there is no token", func(t *testing.T) {
err := l.AddServiceWithChecks(&structs.NodeService{ID: "redis"}, nil, "")
err := l.AddServiceWithChecks(&structs.NodeService{ID: "redis"}, nil, "", false)
require.NoError(t, err)
require.Equal(t, "", l.ServiceToken(id))
})
t.Run("returns configured token", func(t *testing.T) {
err := l.AddServiceWithChecks(&structs.NodeService{ID: "redis"}, nil, "abc123")
err := l.AddServiceWithChecks(&structs.NodeService{ID: "redis"}, nil, "abc123", false)
require.NoError(t, err)
require.Equal(t, "abc123", l.ServiceToken(id))
@ -1904,14 +2066,14 @@ func TestState_CheckTokens(t *testing.T) {
})
t.Run("empty string when there is no token", func(t *testing.T) {
err := l.AddCheck(&structs.HealthCheck{CheckID: "mem"}, "")
err := l.AddCheck(&structs.HealthCheck{CheckID: "mem"}, "", false)
require.NoError(t, err)
require.Equal(t, "", l.CheckToken(id))
})
t.Run("returns configured token", func(t *testing.T) {
err := l.AddCheck(&structs.HealthCheck{CheckID: "mem"}, "abc123")
err := l.AddCheck(&structs.HealthCheck{CheckID: "mem"}, "abc123", false)
require.NoError(t, err)
require.Equal(t, "abc123", l.CheckToken(id))
@ -1932,7 +2094,7 @@ func TestAgent_CheckCriticalTime(t *testing.T) {
l.TriggerSyncChanges = func() {}
svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000}
l.AddServiceWithChecks(svc, nil, "")
l.AddServiceWithChecks(svc, nil, "", false)
// Add a passing check and make sure it's not critical.
checkID := types.CheckID("redis:1")
@ -1943,7 +2105,7 @@ func TestAgent_CheckCriticalTime(t *testing.T) {
ServiceID: "redis",
Status: api.HealthPassing,
}
l.AddCheck(chk, "")
l.AddCheck(chk, "", false)
if checks := l.CriticalCheckStates(structs.DefaultEnterpriseMetaInDefaultPartition()); len(checks) > 0 {
t.Fatalf("should not have any critical checks")
}
@ -2006,7 +2168,7 @@ func TestAgent_AddCheckFailure(t *testing.T) {
}
wantErr := errors.New(`Check ID "redis:1" refers to non-existent service ID "redis"`)
got := l.AddCheck(chk, "")
got := l.AddCheck(chk, "", false)
require.Equal(t, wantErr, got)
}
@ -2018,10 +2180,10 @@ func TestAgent_AliasCheck(t *testing.T) {
l.TriggerSyncChanges = func() {}
// Add checks
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s1"}, nil, ""))
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s2"}, nil, ""))
require.NoError(t, l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c1"), ServiceID: "s1"}, ""))
require.NoError(t, l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c2"), ServiceID: "s2"}, ""))
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s1"}, nil, "", false))
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s2"}, nil, "", false))
require.NoError(t, l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c1"), ServiceID: "s1"}, "", false))
require.NoError(t, l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c2"), ServiceID: "s2"}, "", false))
// Add an alias
notifyCh := make(chan struct{}, 1)
@ -2072,7 +2234,7 @@ func TestAgent_AliasCheck_ServiceNotification(t *testing.T) {
require.NoError(t, l.AddAliasCheck(structs.NewCheckID(types.CheckID("a1"), nil), structs.NewServiceID("s1", nil), notifyCh))
// Add aliased service, s1, and verify we get notified
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s1"}, nil, ""))
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s1"}, nil, "", false))
select {
case <-notifyCh:
default:
@ -2080,7 +2242,7 @@ func TestAgent_AliasCheck_ServiceNotification(t *testing.T) {
}
// Re-adding same service should not lead to a notification
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s1"}, nil, ""))
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s1"}, nil, "", false))
select {
case <-notifyCh:
t.Fatal("notify received")
@ -2088,7 +2250,7 @@ func TestAgent_AliasCheck_ServiceNotification(t *testing.T) {
}
// Add different service and verify we do not get notified
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s2"}, nil, ""))
require.NoError(t, l.AddServiceWithChecks(&structs.NodeService{Service: "s2"}, nil, "", false))
select {
case <-notifyCh:
t.Fatal("notify received")
@ -2193,7 +2355,7 @@ func TestState_RemoveServiceErrorMessages(t *testing.T) {
err := state.AddServiceWithChecks(&structs.NodeService{
ID: "web-id",
Service: "web-name",
}, nil, "")
}, nil, "", false)
require.NoError(t, err)
// Attempt to remove service that doesn't exist
@ -2233,7 +2395,7 @@ func TestState_Notify(t *testing.T) {
// Add a service
err := state.AddServiceWithChecks(&structs.NodeService{
Service: "web",
}, nil, "fake-token-web")
}, nil, "fake-token-web", false)
require.NoError(t, err)
// Should have a notification
@ -2244,7 +2406,7 @@ func TestState_Notify(t *testing.T) {
err = state.AddServiceWithChecks(&structs.NodeService{
Service: "web",
Port: 4444,
}, nil, "fake-token-web")
}, nil, "fake-token-web", false)
require.NoError(t, err)
// Should have a notification
@ -2264,7 +2426,7 @@ func TestState_Notify(t *testing.T) {
// Add a service
err = state.AddServiceWithChecks(&structs.NodeService{
Service: "web",
}, nil, "fake-token-web")
}, nil, "fake-token-web", false)
require.NoError(t, err)
// Should NOT have a notification
@ -2294,7 +2456,7 @@ func TestAliasNotifications_local(t *testing.T) {
Address: "127.0.0.10",
Port: 8080,
}
a.State.AddServiceWithChecks(srv, nil, "")
a.State.AddServiceWithChecks(srv, nil, "", false)
scID := "socat-sidecar-proxy"
sc := &structs.NodeService{
@ -2304,7 +2466,7 @@ func TestAliasNotifications_local(t *testing.T) {
Address: "127.0.0.10",
Port: 9090,
}
a.State.AddServiceWithChecks(sc, nil, "")
a.State.AddServiceWithChecks(sc, nil, "", false)
tcpID := types.CheckID("service:socat-tcp")
chk0 := &structs.HealthCheck{
@ -2314,7 +2476,7 @@ func TestAliasNotifications_local(t *testing.T) {
Status: api.HealthPassing,
ServiceID: svcID,
}
a.State.AddCheck(chk0, "")
a.State.AddCheck(chk0, "", false)
// Register an alias for the service
proxyID := types.CheckID("service:socat-sidecar-proxy:2")
@ -2328,7 +2490,7 @@ func TestAliasNotifications_local(t *testing.T) {
chkt := &structs.CheckType{
AliasService: svcID,
}
require.NoError(t, a.AddCheck(chk1, chkt, true, "", agent.ConfigSourceLocal))
require.NoError(t, a.AddCheck(chk1, chkt, true, "", agent.ConfigSourceLocal), false)
// Add a failing check to the same service ID, alias should also fail
maintID := types.CheckID("service:socat-maintenance")
@ -2339,7 +2501,7 @@ func TestAliasNotifications_local(t *testing.T) {
Status: api.HealthCritical,
ServiceID: svcID,
}
a.State.AddCheck(chk2, "")
a.State.AddCheck(chk2, "", false)
retry.Run(t, func(r *retry.R) {
check := a.State.Check(structs.NewCheckID(proxyID, nil))
@ -2394,14 +2556,14 @@ func TestState_SyncChanges_DuplicateAddServiceOnlySyncsOnce(t *testing.T) {
{Node: "this-node", CheckID: "the-id-2", Name: "check-healthy-2"},
}
tok := "the-token"
err := state.AddServiceWithChecks(srv, checks, tok)
err := state.AddServiceWithChecks(srv, checks, tok, false)
require.NoError(t, err)
require.NoError(t, state.SyncChanges())
// 4 rpc calls, one node register, one service register, two checks
require.Len(t, rpc.calls, 4)
// adding the service again should not catalog register
err = state.AddServiceWithChecks(srv, checks, tok)
err = state.AddServiceWithChecks(srv, checks, tok, false)
require.NoError(t, err)
require.NoError(t, state.SyncChanges())
require.Len(t, rpc.calls, 4)

View File

@ -38,7 +38,7 @@ func TestServerHTTPChecks(t *testing.T) {
localState := testLocalState(t)
mockCacheSource := newMockServiceHTTPChecks(t)
if tc.serviceInLocalState {
require.NoError(t, localState.AddServiceWithChecks(&structs.NodeService{ID: serviceID.ID}, nil, ""))
require.NoError(t, localState.AddServiceWithChecks(&structs.NodeService{ID: serviceID.ID}, nil, "", false))
}
if tc.req.NodeName == nodeName && tc.serviceInLocalState {
mockCacheSource.On("Notify", ctx, tc.req, correlationID, ch).Return(cacheResult)

View File

@ -145,7 +145,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) {
token := "token"
localState := testLocalState(t)
localState.AddServiceWithChecks(&structs.NodeService{ID: serviceID.ID}, nil, "")
localState.AddServiceWithChecks(&structs.NodeService{ID: serviceID.ID}, nil, "", false)
localWatcher := NewMockWatcher(t)
localWatcher.On("Watch", serviceID, nodeName, token).

View File

@ -32,7 +32,7 @@ func TestSync(t *testing.T) {
state.AddServiceWithChecks(&structs.NodeService{
ID: serviceID,
Kind: structs.ServiceKindConnectProxy,
}, nil, serviceToken)
}, nil, serviceToken, false)
cfgMgr := NewMockConfigManager(t)
@ -99,7 +99,7 @@ func TestSync(t *testing.T) {
state.AddServiceWithChecks(&structs.NodeService{
ID: serviceID,
Kind: structs.ServiceKindConnectProxy,
}, nil, "")
}, nil, "", false)
select {
case reg := <-registerCh:

View File

@ -22,6 +22,7 @@ type Config struct {
ACLAgentToken string
ACLAgentRecoveryToken string
ACLReplicationToken string
ACLConfigFileRegistrationToken string
EnterpriseConfig
}
@ -72,6 +73,7 @@ type persistedTokens struct {
AgentRecovery string `json:"agent_recovery,omitempty"`
Default string `json:"default,omitempty"`
Agent string `json:"agent,omitempty"`
ConfigFileRegistration string `json:"config_file_service_registration,omitempty"`
}
type fileStore struct {
@ -129,6 +131,16 @@ func loadTokens(s *Store, cfg Config, tokens persistedTokens, logger Logger) {
s.UpdateReplicationToken(cfg.ACLReplicationToken, TokenSourceConfig)
}
if tokens.ConfigFileRegistration != "" {
s.UpdateConfigFileRegistrationToken(tokens.ConfigFileRegistration, TokenSourceAPI)
if cfg.ACLConfigFileRegistrationToken != "" {
logger.Warn("\"config_file_service_registration\" token present in both the configuration and persisted token store, using the persisted token")
}
} else {
s.UpdateConfigFileRegistrationToken(cfg.ACLConfigFileRegistrationToken, TokenSourceConfig)
}
loadEnterpriseTokens(s, cfg)
}
@ -187,6 +199,10 @@ func (p *fileStore) saveToFile(s *Store) error {
tokens.Replication = tok
}
if tok, source := s.ConfigFileRegistrationTokenAndSource(); tok != "" && source == TokenSourceAPI {
tokens.ConfigFileRegistration = tok
}
data, err := json.Marshal(tokens)
if err != nil {
p.logger.Warn("failed to persist tokens", "error", err)

View File

@ -23,12 +23,14 @@ func TestStore_Load(t *testing.T) {
ACLAgentRecoveryToken: "bravo",
ACLDefaultToken: "charlie",
ACLReplicationToken: "delta",
ACLConfigFileRegistrationToken: "echo",
}
require.NoError(t, store.Load(cfg, logger))
require.Equal(t, "alfa", store.AgentToken())
require.Equal(t, "bravo", store.AgentRecoveryToken())
require.Equal(t, "charlie", store.UserToken())
require.Equal(t, "delta", store.ReplicationToken())
require.Equal(t, "echo", store.ConfigFileRegistrationToken())
})
t.Run("updated from Config", func(t *testing.T) {
@ -38,6 +40,7 @@ func TestStore_Load(t *testing.T) {
ACLAgentToken: "foxtrot",
ACLAgentRecoveryToken: "golf",
ACLReplicationToken: "hotel",
ACLConfigFileRegistrationToken: "india",
}
// ensures no error for missing persisted tokens file
require.NoError(t, store.Load(cfg, logger))
@ -45,6 +48,7 @@ func TestStore_Load(t *testing.T) {
require.Equal(t, "foxtrot", store.AgentToken())
require.Equal(t, "golf", store.AgentRecoveryToken())
require.Equal(t, "hotel", store.ReplicationToken())
require.Equal(t, "india", store.ConfigFileRegistrationToken())
})
t.Run("with persisted tokens", func(t *testing.T) {
@ -54,13 +58,15 @@ func TestStore_Load(t *testing.T) {
ACLAgentToken: "foxtrot",
ACLAgentRecoveryToken: "golf",
ACLReplicationToken: "hotel",
ACLConfigFileRegistrationToken: "delta",
}
tokens := `{
"agent" : "india",
"agent_recovery" : "juliett",
"default": "kilo",
"replication" : "lima"
"replication": "lima",
"config_file_service_registration": "mike"
}`
require.NoError(t, os.WriteFile(tokenFile, []byte(tokens), 0600))
@ -71,6 +77,7 @@ func TestStore_Load(t *testing.T) {
require.Equal(t, "foxtrot", store.AgentToken())
require.Equal(t, "golf", store.AgentRecoveryToken())
require.Equal(t, "hotel", store.ReplicationToken())
require.Equal(t, "delta", store.ConfigFileRegistrationToken())
cfg.EnablePersistence = true
require.NoError(t, store.Load(cfg, logger))
@ -79,6 +86,7 @@ func TestStore_Load(t *testing.T) {
require.Equal(t, "juliett", store.AgentRecoveryToken())
require.Equal(t, "kilo", store.UserToken())
require.Equal(t, "lima", store.ReplicationToken())
require.Equal(t, "mike", store.ConfigFileRegistrationToken())
// check store persistence was enabled
require.NotNil(t, store.persistence)
@ -103,7 +111,8 @@ func TestStore_Load(t *testing.T) {
"agent" : "mike",
"agent_recovery" : "november",
"default": "oscar",
"replication" : "papa"
"replication" : "papa",
"config_file_service_registration" : "lima"
}`
cfg := Config{
@ -113,6 +122,7 @@ func TestStore_Load(t *testing.T) {
ACLAgentToken: "romeo",
ACLAgentRecoveryToken: "sierra",
ACLReplicationToken: "tango",
ACLConfigFileRegistrationToken: "uniform",
}
require.NoError(t, os.WriteFile(tokenFile, []byte(tokens), 0600))
@ -122,6 +132,7 @@ func TestStore_Load(t *testing.T) {
require.Equal(t, "november", store.AgentRecoveryToken())
require.Equal(t, "oscar", store.UserToken())
require.Equal(t, "papa", store.ReplicationToken())
require.Equal(t, "lima", store.ConfigFileRegistrationToken())
})
t.Run("with some persisted tokens", func(t *testing.T) {
@ -137,6 +148,7 @@ func TestStore_Load(t *testing.T) {
ACLAgentToken: "xray",
ACLAgentRecoveryToken: "yankee",
ACLReplicationToken: "zulu",
ACLConfigFileRegistrationToken: "victor",
}
require.NoError(t, os.WriteFile(tokenFile, []byte(tokens), 0600))
@ -146,6 +158,7 @@ func TestStore_Load(t *testing.T) {
require.Equal(t, "victor", store.AgentRecoveryToken())
require.Equal(t, "whiskey", store.UserToken())
require.Equal(t, "zulu", store.ReplicationToken())
require.Equal(t, "victor", store.ConfigFileRegistrationToken())
})
t.Run("persisted file contains invalid data", func(t *testing.T) {
@ -156,6 +169,7 @@ func TestStore_Load(t *testing.T) {
ACLAgentToken: "two",
ACLAgentRecoveryToken: "three",
ACLReplicationToken: "four",
ACLConfigFileRegistrationToken: "five",
}
require.NoError(t, os.WriteFile(tokenFile, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, 0600))
@ -167,6 +181,7 @@ func TestStore_Load(t *testing.T) {
require.Equal(t, "two", store.AgentToken())
require.Equal(t, "three", store.AgentRecoveryToken())
require.Equal(t, "four", store.ReplicationToken())
require.Equal(t, "five", store.ConfigFileRegistrationToken())
})
t.Run("persisted file contains invalid json", func(t *testing.T) {
@ -177,6 +192,7 @@ func TestStore_Load(t *testing.T) {
ACLAgentToken: "bravo",
ACLAgentRecoveryToken: "charlie",
ACLReplicationToken: "foxtrot",
ACLConfigFileRegistrationToken: "golf",
}
require.NoError(t, os.WriteFile(tokenFile, []byte("[1,2,3]"), 0600))
@ -188,10 +204,12 @@ func TestStore_Load(t *testing.T) {
require.Equal(t, "bravo", store.AgentToken())
require.Equal(t, "charlie", store.AgentRecoveryToken())
require.Equal(t, "foxtrot", store.ReplicationToken())
require.Equal(t, "golf", store.ConfigFileRegistrationToken())
})
}
func TestStore_WithPersistenceLock(t *testing.T) {
setupStore := func() (string, *Store) {
dataDir := testutil.TempDir(t, "datadir")
store := new(Store)
cfg := Config{
@ -201,27 +219,56 @@ func TestStore_WithPersistenceLock(t *testing.T) {
ACLAgentToken: "agent-token",
ACLAgentRecoveryToken: "recovery-token",
ACLReplicationToken: "replication-token",
ACLConfigFileRegistrationToken: "registration-token",
}
err := store.Load(cfg, hclog.New(nil))
require.NoError(t, err)
f := func() error {
updated := store.UpdateUserToken("the-new-token", TokenSourceAPI)
require.True(t, updated)
updated = store.UpdateAgentRecoveryToken("the-new-recovery-token", TokenSourceAPI)
require.True(t, updated)
return nil
return dataDir, store
}
err = store.WithPersistenceLock(f)
require.NoError(t, err)
requirePersistedTokens := func(t *testing.T, dataDir string, expected persistedTokens) {
t.Helper()
tokens, err := readPersistedFromFile(filepath.Join(dataDir, tokensPath))
require.NoError(t, err)
expected := persistedTokens{
Default: "the-new-token",
AgentRecovery: "the-new-recovery-token",
}
require.Equal(t, expected, tokens)
}
t.Run("persist some tokens", func(t *testing.T) {
dataDir, store := setupStore()
err := store.WithPersistenceLock(func() error {
require.True(t, store.UpdateUserToken("the-new-default-token", TokenSourceAPI))
require.True(t, store.UpdateAgentRecoveryToken("the-new-recovery-token", TokenSourceAPI))
return nil
})
require.NoError(t, err)
// Only API-sourced tokens are persisted.
requirePersistedTokens(t, dataDir, persistedTokens{
Default: "the-new-default-token",
AgentRecovery: "the-new-recovery-token",
})
})
t.Run("persist all tokens", func(t *testing.T) {
dataDir, store := setupStore()
err := store.WithPersistenceLock(func() error {
require.True(t, store.UpdateUserToken("the-new-default-token", TokenSourceAPI))
require.True(t, store.UpdateAgentToken("the-new-agent-token", TokenSourceAPI))
require.True(t, store.UpdateAgentRecoveryToken("the-new-recovery-token", TokenSourceAPI))
require.True(t, store.UpdateReplicationToken("the-new-replication-token", TokenSourceAPI))
require.True(t, store.UpdateConfigFileRegistrationToken("the-new-registration-token", TokenSourceAPI))
return nil
})
require.NoError(t, err)
requirePersistedTokens(t, dataDir, persistedTokens{
Default: "the-new-default-token",
Agent: "the-new-agent-token",
AgentRecovery: "the-new-recovery-token",
Replication: "the-new-replication-token",
ConfigFileRegistration: "the-new-registration-token",
})
})
}

View File

@ -20,6 +20,7 @@ const (
TokenKindAgentRecovery
TokenKindUser
TokenKindReplication
TokenKindConfigFileRegistration
)
type watcher struct {
@ -74,6 +75,13 @@ type Store struct {
// replicationTokenSource indicates where this token originated from
replicationTokenSource TokenSource
// configFileRegistrationToken is used to register services and checks
// that are defined in configuration files.
configFileRegistrationToken string
// configFileRegistrationTokenSource indicates where this token originated from
configFileRegistrationTokenSource TokenSource
watchers map[int]watcher
watcherIndex int
@ -163,54 +171,43 @@ func (t *Store) sendNotificationLocked(kinds ...TokenKind) {
// UpdateUserToken replaces the current user token in the store.
// Returns true if it was changed.
func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
t.l.Lock()
changed := t.userToken != token || t.userTokenSource != source
t.userToken = token
t.userTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindUser)
}
t.l.Unlock()
return changed
return t.updateToken(token, source, &t.userToken, &t.userTokenSource, TokenKindUser)
}
// UpdateAgentToken replaces the current agent token in the store.
// Returns true if it was changed.
func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
t.l.Lock()
changed := t.agentToken != token || t.agentTokenSource != source
t.agentToken = token
t.agentTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindAgent)
}
t.l.Unlock()
return changed
return t.updateToken(token, source, &t.agentToken, &t.agentTokenSource, TokenKindAgent)
}
// UpdateAgentRecoveryToken replaces the current agent recovery token in the store.
// Returns true if it was changed.
func (t *Store) UpdateAgentRecoveryToken(token string, source TokenSource) bool {
t.l.Lock()
changed := t.agentRecoveryToken != token || t.agentRecoveryTokenSource != source
t.agentRecoveryToken = token
t.agentRecoveryTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindAgentRecovery)
}
t.l.Unlock()
return changed
return t.updateToken(token, source, &t.agentRecoveryToken,
&t.agentRecoveryTokenSource, TokenKindAgentRecovery)
}
// UpdateReplicationToken replaces the current replication token in the store.
// Returns true if it was changed.
func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool {
return t.updateToken(token, source, &t.replicationToken,
&t.replicationTokenSource, TokenKindReplication)
}
// UpdateConfigFileRegistrationToken replaces the current config file registration token
// in the store. Returns true if it was changed.
func (t *Store) UpdateConfigFileRegistrationToken(token string, source TokenSource) bool {
return t.updateToken(token, source, &t.configFileRegistrationToken,
&t.configFileRegistrationTokenSource, TokenKindConfigFileRegistration)
}
func (t *Store) updateToken(token string, source TokenSource, dstToken *string, dstSource *TokenSource, kind TokenKind) bool {
t.l.Lock()
changed := t.replicationToken != token || t.replicationTokenSource != source
t.replicationToken = token
t.replicationTokenSource = source
changed := *dstToken != token || *dstSource != source
*dstToken = token
*dstSource = source
if changed {
t.sendNotificationLocked(TokenKindReplication)
t.sendNotificationLocked(kind)
}
t.l.Unlock()
return changed
@ -254,6 +251,13 @@ func (t *Store) ReplicationToken() string {
return t.replicationToken
}
func (t *Store) ConfigFileRegistrationToken() string {
t.l.RLock()
defer t.l.RUnlock()
return t.configFileRegistrationToken
}
// UserToken returns the best token to use for user operations.
func (t *Store) UserTokenAndSource() (string, TokenSource) {
t.l.RLock()
@ -277,7 +281,7 @@ func (t *Store) AgentRecoveryTokenAndSource() (string, TokenSource) {
return t.agentRecoveryToken, t.agentRecoveryTokenSource
}
// ReplicationToken returns the replication token.
// ReplicationTokenAndSource returns the replication token and its source.
func (t *Store) ReplicationTokenAndSource() (string, TokenSource) {
t.l.RLock()
defer t.l.RUnlock()
@ -285,6 +289,13 @@ func (t *Store) ReplicationTokenAndSource() (string, TokenSource) {
return t.replicationToken, t.replicationTokenSource
}
func (t *Store) ConfigFileRegistrationTokenAndSource() (string, TokenSource) {
t.l.RLock()
defer t.l.RUnlock()
return t.configFileRegistrationToken, t.configFileRegistrationTokenSource
}
// IsAgentRecoveryToken checks to see if a given token is the agent recovery token.
// This will never match an empty token for safety.
func (t *Store) IsAgentRecoveryToken(token string) bool {

View File

@ -16,6 +16,8 @@ func TestStore_RegularTokens(t *testing.T) {
recoverySource TokenSource
repl string
replSource TokenSource
registration string
registrationSource TokenSource
}
tests := []struct {
@ -78,11 +80,23 @@ func TestStore_RegularTokens(t *testing.T) {
raw: tokens{recovery: "M", recoverySource: TokenSourceAPI},
effective: tokens{recovery: "M"},
},
{
name: "set registration - config",
set: tokens{registration: "G", registrationSource: TokenSourceConfig},
raw: tokens{registration: "G", registrationSource: TokenSourceConfig},
effective: tokens{registration: "G"},
},
{
name: "set registration - api",
set: tokens{registration: "G", registrationSource: TokenSourceAPI},
raw: tokens{registration: "G", registrationSource: TokenSourceAPI},
effective: tokens{registration: "G"},
},
{
name: "set all",
set: tokens{user: "U", agent: "A", repl: "R", recovery: "M"},
raw: tokens{user: "U", agent: "A", repl: "R", recovery: "M"},
effective: tokens{user: "U", agent: "A", repl: "R", recovery: "M"},
set: tokens{user: "U", agent: "A", repl: "R", recovery: "M", registration: "G"},
raw: tokens{user: "U", agent: "A", repl: "R", recovery: "M", registration: "G"},
effective: tokens{user: "U", agent: "A", repl: "R", recovery: "M", registration: "G"},
},
}
for _, tt := range tests {
@ -104,16 +118,22 @@ func TestStore_RegularTokens(t *testing.T) {
require.True(t, s.UpdateAgentRecoveryToken(tt.set.recovery, tt.set.recoverySource))
}
if tt.set.registration != "" {
require.True(t, s.UpdateConfigFileRegistrationToken(tt.set.registration, tt.set.registrationSource))
}
// If they don't change then they return false.
require.False(t, s.UpdateUserToken(tt.set.user, tt.set.userSource))
require.False(t, s.UpdateAgentToken(tt.set.agent, tt.set.agentSource))
require.False(t, s.UpdateReplicationToken(tt.set.repl, tt.set.replSource))
require.False(t, s.UpdateAgentRecoveryToken(tt.set.recovery, tt.set.recoverySource))
require.False(t, s.UpdateConfigFileRegistrationToken(tt.set.registration, tt.set.registrationSource))
require.Equal(t, tt.effective.user, s.UserToken())
require.Equal(t, tt.effective.agent, s.AgentToken())
require.Equal(t, tt.effective.recovery, s.AgentRecoveryToken())
require.Equal(t, tt.effective.repl, s.ReplicationToken())
require.Equal(t, tt.effective.registration, s.ConfigFileRegistrationToken())
tok, src := s.UserTokenAndSource()
require.Equal(t, tt.raw.user, tok)
@ -130,6 +150,10 @@ func TestStore_RegularTokens(t *testing.T) {
tok, src = s.ReplicationTokenAndSource()
require.Equal(t, tt.raw.repl, tok)
require.Equal(t, tt.raw.replSource, src)
tok, src = s.ConfigFileRegistrationTokenAndSource()
require.Equal(t, tt.raw.registration, tok)
require.Equal(t, tt.raw.registrationSource, src)
})
}
}
@ -183,20 +207,22 @@ func TestStore_Notify(t *testing.T) {
agentRecoveryNotifier := newNotification(t, s, TokenKindAgentRecovery)
replicationNotifier := newNotification(t, s, TokenKindReplication)
replicationNotifier2 := newNotification(t, s, TokenKindReplication)
registrationNotifier := newNotification(t, s, TokenKindConfigFileRegistration)
// perform an update of the user token
require.True(t, s.UpdateUserToken("edcae2a2-3b51-4864-b412-c7a568f49cb1", TokenSourceConfig))
// do it again to ensure it doesn't block even though nothing has read from the 1 buffered chan yet
require.True(t, s.UpdateUserToken("47788919-f944-476a-bda5-446d64be1df8", TokenSourceAPI))
// ensure notifications were sent to the user and all notifiers
// ensure notifications were sent to the user notifier and all other notifiers were not notified.
requireNotNotified(t, agentNotifier.Ch)
requireNotifiedOnce(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentRecoveryNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)
requireNotNotified(t, registrationNotifier.Ch)
// now update the agent token which should send notificaitons to the agent and all notifier
// update the agent token which should send a notification to the agent notifier.
require.True(t, s.UpdateAgentToken("5d748ec2-d536-461f-8e2a-1f7eae98d559", TokenSourceAPI))
requireNotifiedOnce(t, agentNotifier.Ch)
@ -204,8 +230,9 @@ func TestStore_Notify(t *testing.T) {
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentRecoveryNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)
requireNotNotified(t, registrationNotifier.Ch)
// now update the agent recovery token which should send notificaitons to the agent recovery and all notifier
// update the agent recovery token which should send a notification to the agent recovery notifier.
require.True(t, s.UpdateAgentRecoveryToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI))
requireNotNotified(t, agentNotifier.Ch)
@ -213,8 +240,9 @@ func TestStore_Notify(t *testing.T) {
requireNotNotified(t, replicationNotifier.Ch)
requireNotifiedOnce(t, agentRecoveryNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)
requireNotNotified(t, registrationNotifier.Ch)
// now update the replication token which should send notificaitons to the replication and all notifier
// update the replication token which should send a notification to the replication notifier.
require.True(t, s.UpdateReplicationToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI))
requireNotNotified(t, agentNotifier.Ch)
@ -222,10 +250,11 @@ func TestStore_Notify(t *testing.T) {
requireNotifiedOnce(t, replicationNotifier.Ch)
requireNotNotified(t, agentRecoveryNotifier.Ch)
requireNotifiedOnce(t, replicationNotifier2.Ch)
requireNotNotified(t, registrationNotifier.Ch)
s.StopNotify(replicationNotifier2)
// now update the replication token which should send notificaitons to the replication and all notifier
// update the replication token which should send a notification to the replication notifier.
require.True(t, s.UpdateReplicationToken("eb0b56b9-fa65-4ae1-902a-c64457c62ac6", TokenSourceAPI))
requireNotNotified(t, agentNotifier.Ch)
@ -233,16 +262,29 @@ func TestStore_Notify(t *testing.T) {
requireNotifiedOnce(t, replicationNotifier.Ch)
requireNotNotified(t, agentRecoveryNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)
requireNotNotified(t, registrationNotifier.Ch)
// request updates but that are not changes
// update the config file registration token which should send a notification to the replication notifier.
require.True(t, s.UpdateConfigFileRegistrationToken("82fe7362-7d83-4f43-bb27-c35f1f15083c", TokenSourceAPI))
requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentRecoveryNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)
requireNotifiedOnce(t, registrationNotifier.Ch)
// request updates that are not changes
require.False(t, s.UpdateAgentToken("5d748ec2-d536-461f-8e2a-1f7eae98d559", TokenSourceAPI))
require.False(t, s.UpdateAgentRecoveryToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI))
require.False(t, s.UpdateUserToken("47788919-f944-476a-bda5-446d64be1df8", TokenSourceAPI))
require.False(t, s.UpdateReplicationToken("eb0b56b9-fa65-4ae1-902a-c64457c62ac6", TokenSourceAPI))
require.False(t, s.UpdateConfigFileRegistrationToken("82fe7362-7d83-4f43-bb27-c35f1f15083c", TokenSourceAPI))
// ensure that notifications were not sent
requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentRecoveryNotifier.Ch)
requireNotNotified(t, registrationNotifier.Ch)
}

View File

@ -65,7 +65,7 @@ func TestShouldProcessUserEvent(t *testing.T) {
Tags: []string{"test", "foo", "bar", "primary"},
Port: 5000,
}
a.State.AddServiceWithChecks(srv1, nil, "")
a.State.AddServiceWithChecks(srv1, nil, "", false)
p := &UserEvent{}
if !a.shouldProcessUserEvent(p) {
@ -173,7 +173,7 @@ func TestFireReceiveEvent(t *testing.T) {
Tags: []string{"test", "foo", "bar", "primary"},
Port: 5000,
}
a.State.AddServiceWithChecks(srv1, nil, "")
a.State.AddServiceWithChecks(srv1, nil, "", false)
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
err := a.UserEvent("dc1", "root", p1)

View File

@ -1331,6 +1331,12 @@ func (a *Agent) UpdateReplicationACLToken(token string, q *WriteOptions) (*Write
return a.updateTokenFallback(token, q, "replication", "acl_replication_token")
}
// UpdateConfigFileRegistrationToken updates the agent's "replication" token. See updateToken
// for more details
func (a *Agent) UpdateConfigFileRegistrationToken(token string, q *WriteOptions) (*WriteMeta, error) {
return a.updateToken("config_file_service_registration", token, q)
}
// updateToken can be used to update one of an agent's ACL tokens after the agent has
// started. The tokens are may not be persisted, so will need to be updated again if
// the agent is restarted unless the agent is configured to persist them.

View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"net/http/httputil"
@ -16,6 +15,7 @@ import (
"time"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/sdk/testutil"
@ -1580,6 +1580,11 @@ func TestAPI_AgentUpdateToken(t *testing.T) {
if _, err := agent.UpdateReplicationACLToken("root", nil); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := agent.UpdateConfigFileRegistrationToken("root", nil); err != nil {
t.Fatalf("err: %v", err)
}
})
t.Run("new with fallback", func(t *testing.T) {
@ -1665,6 +1670,9 @@ func TestAPI_AgentUpdateToken(t *testing.T) {
_, err = agent.UpdateReplicationACLToken("root", nil)
require.Error(t, err)
_, err = agent.UpdateConfigFileRegistrationToken("root", nil)
require.Error(t, err)
})
}

View File

@ -58,6 +58,8 @@ func (c *cmd) Run(args []string) int {
_, err = client.Agent().UpdateAgentRecoveryACLToken(token, nil)
case "replication":
_, err = client.Agent().UpdateReplicationACLToken(token, nil)
case "config_file_service_registration":
_, err = client.Agent().UpdateConfigFileRegistrationToken(token, nil)
default:
c.UI.Error(fmt.Sprintf("Unknown token type"))
return 1
@ -107,9 +109,9 @@ const synopsis = "Assign tokens for the Consul Agent's usage"
const help = `
Usage: consul acl set-agent-token [options] TYPE TOKEN
This command will set the corresponding token for the agent to use.
Note that the tokens uploaded this way are not persisted and if
the agent reloads then the tokens will need to be set again.
This command will set the corresponding token for the agent to use. If token
persistence is not enabled, then tokens uploaded this way are not persisted
and if the agent reloads then the tokens will need to be set again.
Token Types:
@ -128,6 +130,13 @@ Usage: consul acl set-agent-token [options] TYPE TOKEN
operations. This token will need to be configured with read access
to whatever data is being replicated.
config_file_service_registration This is the token that the agent uses to register services
and checks defined in config files. This token needs to
be configured with permission for the service or checks
being registered. If not set, the default token is used.
If a service or check definition contains a 'token'
field, then that token is used instead.
Example:
$ consul acl set-agent-token default c4d0f8df-3aba-4ab6-a7a0-35b760dc29a1

View File

@ -754,12 +754,14 @@ is restarted.
| `PUT` | `/agent/token/default` | `application/json` |
| `PUT` | `/agent/token/agent` | `application/json` |
| `PUT` | `/agent/token/agent_recovery` | `application/json` |
| `PUT` | `/agent/token/config_file_service_registration` | `application/json` |
| `PUT` | `/agent/token/replication` | `application/json` |
The paths above correspond to the token names as found in the agent configuration:
[`default`](/docs/agent/config/config-files#acl_tokens_default), [`agent`](/docs/agent/config/config-files#acl_tokens_agent),
[`agent_recovery`](/docs/agent/config/config-files#acl_tokens_agent_recovery), and
[`replication`](/docs/agent/config/config-files#acl_tokens_replication).
[`agent_recovery`](/docs/agent/config/config-files#acl_tokens_agent_recovery),
[`config_file_service_registration`](/docs/agent/config/config-files#acl_tokens_config_file_service_registration),
and [`replication`](/docs/agent/config/config-files#acl_tokens_replication).
-> **Deprecation Note:** The following paths were deprecated in version 1.11

View File

@ -46,6 +46,13 @@ The token types are:
operations. This token will need to be configured with read access to
whatever data is being replicated.
- `config_file_service_registration` - This is the token that the agent uses to
register services and checks defined in config files. This token needs to be
configured with write permissions for the services or checks being registered.
If not set, the `default` token is used. If a service or check definition
contains a `token` field, then that token is used to register that service or
check instead of the `config_file_service_registration` token.
### API Options
@include 'http_api_options_client.mdx'

View File

@ -914,6 +914,28 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
- `agent_master` ((#acl_tokens_agent_master)) **Renamed in Consul 1.11 to
[`acl.tokens.agent_recovery`](#acl_tokens_agent_recovery).**
- `config_file_service_registration` ((#acl_tokens_config_file_service_registration)) - The ACL
token this agent uses to register services and checks from [service
definitions](/docs/discovery/services) and [check definitions](/docs/discovery/checks) found
in configuration files or in configuration fragments passed to the agent using the `-hcl`
flag.
If the `token` field is defined in the service or check definition, then that token is used to
register the service or check instead. If the `config_file_service_registration` token is not
defined and if the `token` field is not defined in the service or check definition, then the
agent uses the [`default`](#acl_tokens_default) token to register the service or check.
This token needs write permission to register all services and checks defined in this agent's
configuration. For example, if there are two service definitions in the agent's configuration
files for services "A" and "B", then the token needs `service:write` permissions for both
services "A" and "B" in order to successfully register both services. If the token is missing
`service:write` permissions for service "B", the agent will successfully register service "A"
and fail to register service "B". Failed registration requests are eventually retried as part
of [anti-entropy enforcement](/docs/architecture/anti-entropy). If a registration request is
failing due to missing permissions, the the token for this agent can be updated with
additional policy rules or the `config_file_service_registration` token can be replaced using
the [Set Agent Token](/commands/acl/set-agent-token) CLI command.
- `replication` ((#acl_tokens_replication)) - The ACL token used to
authorize secondary datacenters with the primary datacenter for replication
operations. This token is required for servers outside the [`primary_datacenter`](#primary_datacenter) when ACLs are enabled. This token may be provided later using the [agent token API](/api-docs/agent#update-acl-tokens) on each server. This token must have at least "read" permissions on ACL data but if ACL token replication is enabled then it must have "write" permissions. This also enables Connect replication, for which the token will require both operator "write" and intention "read" permissions for replicating CA and Intention data.