Remaining ACL Unit Tests (#4852)

* Add leader token upgrade test and fix various ACL enablement bugs

* Update the leader ACL initialization tests.

* Add a StateStore ACL tests for ACLTokenSet and ACLTokenGetBy* functions

* Advertise the agents acl support status with the agent/self endpoint.

* Make batch token upsert CAS’able to prevent consistency issues with token auto-upgrade

* Finish up the ACL state store token tests

* Finish the ACL state store unit tests

Also rename some things to make them more consistent.

* Do as much ACL replication testing as I can.
This commit is contained in:
Matt Keeler 2018-10-31 16:00:46 -04:00 committed by Jack Pearkes
parent d24a65eb8c
commit ec9934b6f8
23 changed files with 2299 additions and 880 deletions

View File

@ -126,7 +126,7 @@ func (s *HTTPServer) ACLRulesTranslateLegacyToken(resp http.ResponseWriter, req
return nil, BadRequestError{Reason: "Missing token ID"}
}
args := structs.ACLTokenReadRequest{
args := structs.ACLTokenGetRequest{
Datacenter: s.agent.config.Datacenter,
TokenID: tokenID,
TokenIDType: structs.ACLTokenAccessor,
@ -223,7 +223,7 @@ func (s *HTTPServer) ACLPolicyCRUD(resp http.ResponseWriter, req *http.Request)
}
func (s *HTTPServer) ACLPolicyRead(resp http.ResponseWriter, req *http.Request, policyID string) (interface{}, error) {
args := structs.ACLPolicyReadRequest{
args := structs.ACLPolicyGetRequest{
Datacenter: s.agent.config.Datacenter,
PolicyID: policyID,
}
@ -284,7 +284,7 @@ func fixCreateTimeAndHash(raw interface{}) error {
}
func (s *HTTPServer) ACLPolicyWrite(resp http.ResponseWriter, req *http.Request, policyID string) (interface{}, error) {
args := structs.ACLPolicyUpsertRequest{
args := structs.ACLPolicySetRequest{
Datacenter: s.agent.config.Datacenter,
}
s.parseToken(req, &args.Token)
@ -302,7 +302,7 @@ func (s *HTTPServer) ACLPolicyWrite(resp http.ResponseWriter, req *http.Request,
}
var out structs.ACLPolicy
if err := s.agent.RPC("ACL.PolicyUpsert", args, &out); err != nil {
if err := s.agent.RPC("ACL.PolicySet", args, &out); err != nil {
return nil, err
}
@ -361,10 +361,10 @@ func (s *HTTPServer) ACLTokenCRUD(resp http.ResponseWriter, req *http.Request) (
switch req.Method {
case "GET":
fn = s.ACLTokenRead
fn = s.ACLTokenGet
case "PUT":
fn = s.ACLTokenWrite
fn = s.ACLTokenSet
case "DELETE":
fn = s.ACLTokenDelete
@ -390,7 +390,7 @@ func (s *HTTPServer) ACLTokenSelf(resp http.ResponseWriter, req *http.Request) (
return nil, nil
}
args := structs.ACLTokenReadRequest{
args := structs.ACLTokenGetRequest{
TokenIDType: structs.ACLTokenSecret,
}
@ -423,11 +423,11 @@ func (s *HTTPServer) ACLTokenCreate(resp http.ResponseWriter, req *http.Request)
return nil, nil
}
return s.ACLTokenWrite(resp, req, "")
return s.ACLTokenSet(resp, req, "")
}
func (s *HTTPServer) ACLTokenRead(resp http.ResponseWriter, req *http.Request, tokenID string) (interface{}, error) {
args := structs.ACLTokenReadRequest{
func (s *HTTPServer) ACLTokenGet(resp http.ResponseWriter, req *http.Request, tokenID string) (interface{}, error) {
args := structs.ACLTokenGetRequest{
Datacenter: s.agent.config.Datacenter,
TokenID: tokenID,
TokenIDType: structs.ACLTokenAccessor,
@ -454,8 +454,8 @@ func (s *HTTPServer) ACLTokenRead(resp http.ResponseWriter, req *http.Request, t
return out.Token, nil
}
func (s *HTTPServer) ACLTokenWrite(resp http.ResponseWriter, req *http.Request, tokenID string) (interface{}, error) {
args := structs.ACLTokenUpsertRequest{
func (s *HTTPServer) ACLTokenSet(resp http.ResponseWriter, req *http.Request, tokenID string) (interface{}, error) {
args := structs.ACLTokenSetRequest{
Datacenter: s.agent.config.Datacenter,
}
s.parseToken(req, &args.Token)
@ -471,7 +471,7 @@ func (s *HTTPServer) ACLTokenWrite(resp http.ResponseWriter, req *http.Request,
}
var out structs.ACLToken
if err := s.agent.RPC("ACL.TokenUpsert", args, &out); err != nil {
if err := s.agent.RPC("ACL.TokenSet", args, &out); err != nil {
return nil, err
}
@ -497,7 +497,7 @@ func (s *HTTPServer) ACLTokenClone(resp http.ResponseWriter, req *http.Request,
return nil, nil
}
args := structs.ACLTokenUpsertRequest{
args := structs.ACLTokenSetRequest{
Datacenter: s.agent.config.Datacenter,
}

View File

@ -378,7 +378,7 @@ func (r *ACLResolver) fireAsyncTokenResult(token string, identity structs.ACLIde
}
func (r *ACLResolver) resolveIdentityFromTokenAsync(token string, cached *structs.IdentityCacheEntry) {
req := structs.ACLTokenReadRequest{
req := structs.ACLTokenGetRequest{
Datacenter: r.delegate.ACLDatacenter(false),
TokenID: token,
TokenIDType: structs.ACLTokenSecret,
@ -491,7 +491,7 @@ func (r *ACLResolver) fireAsyncPolicyResult(policyID string, policy *structs.ACL
}
func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdentity, policyIDs []string, cached map[string]*structs.PolicyCacheEntry) {
req := structs.ACLPolicyBatchReadRequest{
req := structs.ACLPolicyBatchGetRequest{
Datacenter: r.delegate.ACLDatacenter(false),
PolicyIDs: policyIDs,
QueryOptions: structs.QueryOptions{
@ -501,7 +501,7 @@ func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdenti
}
found := make(map[string]struct{})
var resp structs.ACLPoliciesResponse
var resp structs.ACLPolicyBatchResponse
err := r.delegate.RPC("ACL.PolicyResolve", &req, &resp)
if err == nil {
for _, policy := range resp.Policies {

View File

@ -175,7 +175,7 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC
return nil
}
func (a *ACL) TokenRead(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
if err := a.aclPreCheck(); err != nil {
return err
}
@ -227,7 +227,7 @@ func (a *ACL) TokenRead(args *structs.ACLTokenReadRequest, reply *structs.ACLTok
})
}
func (a *ACL) TokenClone(args *structs.ACLTokenUpsertRequest, reply *structs.ACLToken) error {
func (a *ACL) TokenClone(args *structs.ACLTokenSetRequest, reply *structs.ACLToken) error {
if err := a.aclPreCheck(); err != nil {
return err
}
@ -265,7 +265,7 @@ func (a *ACL) TokenClone(args *structs.ACLTokenUpsertRequest, reply *structs.ACL
return fmt.Errorf("Cannot clone a legacy ACL with this endpoint")
}
cloneReq := structs.ACLTokenUpsertRequest{
cloneReq := structs.ACLTokenSetRequest{
Datacenter: args.Datacenter,
ACLToken: structs.ACLToken{
Policies: token.Policies,
@ -279,10 +279,10 @@ func (a *ACL) TokenClone(args *structs.ACLTokenUpsertRequest, reply *structs.ACL
cloneReq.ACLToken.Description = args.ACLToken.Description
}
return a.tokenUpsertInternal(&cloneReq, reply, false)
return a.tokenSetInternal(&cloneReq, reply, false)
}
func (a *ACL) TokenUpsert(args *structs.ACLTokenUpsertRequest, reply *structs.ACLToken) error {
func (a *ACL) TokenSet(args *structs.ACLTokenSetRequest, reply *structs.ACLToken) error {
if err := a.aclPreCheck(); err != nil {
return err
}
@ -294,7 +294,7 @@ func (a *ACL) TokenUpsert(args *structs.ACLTokenUpsertRequest, reply *structs.AC
return fmt.Errorf("Local tokens are disabled")
}
if done, err := a.srv.forward("ACL.TokenUpsert", args, args, reply); done {
if done, err := a.srv.forward("ACL.TokenSet", args, args, reply); done {
return err
}
@ -307,10 +307,10 @@ func (a *ACL) TokenUpsert(args *structs.ACLTokenUpsertRequest, reply *structs.AC
return acl.ErrPermissionDenied
}
return a.tokenUpsertInternal(args, reply, false)
return a.tokenSetInternal(args, reply, false)
}
func (a *ACL) tokenUpsertInternal(args *structs.ACLTokenUpsertRequest, reply *structs.ACLToken, upgrade bool) error {
func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs.ACLToken, upgrade bool) error {
token := &args.ACLToken
if !a.srv.LocalTokensEnabled() {
@ -420,12 +420,12 @@ func (a *ACL) tokenUpsertInternal(args *structs.ACLTokenUpsertRequest, reply *st
token.SetHash(true)
req := &structs.ACLTokenBatchUpsertRequest{
Tokens: structs.ACLTokens{token},
AllowCreate: true,
req := &structs.ACLTokenBatchSetRequest{
Tokens: structs.ACLTokens{token},
CAS: false,
}
resp, err := a.srv.raftApply(structs.ACLTokenUpsertRequestType, req)
resp, err := a.srv.raftApply(structs.ACLTokenSetRequestType, req)
if err != nil {
return fmt.Errorf("Failed to apply token write request: %v", err)
}
@ -553,7 +553,7 @@ func (a *ACL) TokenList(args *structs.ACLTokenListRequest, reply *structs.ACLTok
})
}
func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchReadRequest, reply *structs.ACLTokensResponse) error {
func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchGetRequest, reply *structs.ACLTokenBatchResponse) error {
if err := a.aclPreCheck(); err != nil {
return err
}
@ -575,7 +575,7 @@ func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchReadRequest, reply *stru
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, tokens, err := state.ACLTokenBatchRead(ws, args.AccessorIDs)
index, tokens, err := state.ACLTokenBatchGet(ws, args.AccessorIDs)
if err != nil {
return err
}
@ -587,7 +587,7 @@ func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchReadRequest, reply *stru
})
}
func (a *ACL) PolicyRead(args *structs.ACLPolicyReadRequest, reply *structs.ACLPolicyResponse) error {
func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPolicyResponse) error {
if err := a.aclPreCheck(); err != nil {
return err
}
@ -615,7 +615,7 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyReadRequest, reply *structs.ACLP
})
}
func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
if err := a.aclPreCheck(); err != nil {
return err
}
@ -632,7 +632,7 @@ func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchReadRequest, reply *st
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, policies, err := state.ACLPolicyBatchRead(ws, args.PolicyIDs)
index, policies, err := state.ACLPolicyBatchGet(ws, args.PolicyIDs)
if err != nil {
return err
}
@ -642,7 +642,7 @@ func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchReadRequest, reply *st
})
}
func (a *ACL) PolicyUpsert(args *structs.ACLPolicyUpsertRequest, reply *structs.ACLPolicy) error {
func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPolicy) error {
if err := a.aclPreCheck(); err != nil {
return err
}
@ -651,7 +651,7 @@ func (a *ACL) PolicyUpsert(args *structs.ACLPolicyUpsertRequest, reply *structs.
args.Datacenter = a.srv.config.ACLDatacenter
}
if done, err := a.srv.forward("ACL.PolicyUpsert", args, args, reply); done {
if done, err := a.srv.forward("ACL.PolicySet", args, args, reply); done {
return err
}
@ -736,11 +736,11 @@ func (a *ACL) PolicyUpsert(args *structs.ACLPolicyUpsertRequest, reply *structs.
// calcualte the hash for this policy
policy.SetHash(true)
req := &structs.ACLPolicyBatchUpsertRequest{
req := &structs.ACLPolicyBatchSetRequest{
Policies: structs.ACLPolicies{policy},
}
resp, err := a.srv.raftApply(structs.ACLPolicyUpsertRequestType, req)
resp, err := a.srv.raftApply(structs.ACLPolicySetRequestType, req)
if err != nil {
return fmt.Errorf("Failed to apply policy upsert request: %v", err)
}
@ -837,7 +837,7 @@ func (a *ACL) PolicyList(args *structs.ACLPolicyListRequest, reply *structs.ACLP
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, policies, err := state.ACLPolicyList(ws, args.DCScope)
index, policies, err := state.ACLPolicyList(ws)
if err != nil {
return err
}
@ -854,7 +854,7 @@ func (a *ACL) PolicyList(args *structs.ACLPolicyListRequest, reply *structs.ACLP
// PolicyResolve is used to retrieve a subset of the policies associated with a given token
// The policy ids in the args simply act as a filter on the policy set assigned to the token
func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
if err := a.aclPreCheck(); err != nil {
return err
}

View File

@ -145,7 +145,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
defer metrics.MeasureSince([]string{"acl", "apply"}, time.Now())
// Verify we are allowed to serve this request
if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
if !a.srv.ACLsEnabled() {
return acl.ErrDisabled
}
@ -189,7 +189,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
}
// Verify we are allowed to serve this request
if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
if !a.srv.ACLsEnabled() {
return acl.ErrDisabled
}
@ -226,7 +226,7 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
}
// Verify we are allowed to serve this request
if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
if !a.srv.ACLsEnabled() {
return acl.ErrDisabled
}

View File

@ -648,7 +648,7 @@ func TestACLEndpoint_TokenRead(t *testing.T) {
// exists and matches what we created
{
req := structs.ACLTokenReadRequest{
req := structs.ACLTokenGetRequest{
Datacenter: "dc1",
TokenID: token.AccessorID,
TokenIDType: structs.ACLTokenAccessor,
@ -670,7 +670,7 @@ func TestACLEndpoint_TokenRead(t *testing.T) {
fakeID, err := uuid.GenerateUUID()
assert.NoError(err)
req := structs.ACLTokenReadRequest{
req := structs.ACLTokenGetRequest{
Datacenter: "dc1",
TokenID: fakeID,
TokenIDType: structs.ACLTokenAccessor,
@ -686,7 +686,7 @@ func TestACLEndpoint_TokenRead(t *testing.T) {
// validates ID format
{
req := structs.ACLTokenReadRequest{
req := structs.ACLTokenGetRequest{
Datacenter: "dc1",
TokenID: "definitely-really-certainly-not-a-uuid",
TokenIDType: structs.ACLTokenAccessor,
@ -722,7 +722,7 @@ func TestACLEndpoint_TokenClone(t *testing.T) {
acl := ACL{srv: s1}
req := structs.ACLTokenUpsertRequest{
req := structs.ACLTokenSetRequest{
Datacenter: "dc1",
ACLToken: structs.ACLToken{AccessorID: t1.AccessorID},
WriteRequest: structs.WriteRequest{Token: "root"},
@ -741,7 +741,7 @@ func TestACLEndpoint_TokenClone(t *testing.T) {
assert.NotEqual(t1.SecretID, t2.SecretID)
}
func TestACLEndpoint_TokenUpsert(t *testing.T) {
func TestACLEndpoint_TokenSet(t *testing.T) {
t.Parallel()
assert := assert.New(t)
@ -762,7 +762,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
// Create it
{
req := structs.ACLTokenUpsertRequest{
req := structs.ACLTokenSetRequest{
Datacenter: "dc1",
ACLToken: structs.ACLToken{
Description: "foobar",
@ -774,7 +774,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
resp := structs.ACLToken{}
err := acl.TokenUpsert(&req, &resp)
err := acl.TokenSet(&req, &resp)
assert.NoError(err)
// Get the token directly to validate that it exists
@ -790,7 +790,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
}
// Update it
{
req := structs.ACLTokenUpsertRequest{
req := structs.ACLTokenSetRequest{
Datacenter: "dc1",
ACLToken: structs.ACLToken{
Description: "new-description",
@ -801,7 +801,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
resp := structs.ACLToken{}
err := acl.TokenUpsert(&req, &resp)
err := acl.TokenSet(&req, &resp)
assert.NoError(err)
// Get the token directly to validate that it exists
@ -814,7 +814,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
assert.Equal(token.AccessorID, resp.AccessorID)
}
}
func TestACLEndpoint_TokenUpsert_anon(t *testing.T) {
func TestACLEndpoint_TokenSet_anon(t *testing.T) {
t.Parallel()
assert := assert.New(t)
@ -835,7 +835,7 @@ func TestACLEndpoint_TokenUpsert_anon(t *testing.T) {
acl := ACL{srv: s1}
// Assign the policies to a token
tokenUpsertReq := structs.ACLTokenUpsertRequest{
tokenUpsertReq := structs.ACLTokenSetRequest{
Datacenter: "dc1",
ACLToken: structs.ACLToken{
AccessorID: structs.ACLTokenAnonymousID,
@ -848,7 +848,7 @@ func TestACLEndpoint_TokenUpsert_anon(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
token := structs.ACLToken{}
err = acl.TokenUpsert(&tokenUpsertReq, &token)
err = acl.TokenSet(&tokenUpsertReq, &token)
assert.NoError(err)
assert.NotEmpty(token.SecretID)
@ -1021,13 +1021,13 @@ func TestACLEndpoint_TokenBatchRead(t *testing.T) {
acl := ACL{srv: s1}
tokens := []string{t1.AccessorID, t2.AccessorID}
req := structs.ACLTokenBatchReadRequest{
req := structs.ACLTokenBatchGetRequest{
Datacenter: "dc1",
AccessorIDs: tokens,
QueryOptions: structs.QueryOptions{Token: "root"},
}
resp := structs.ACLTokensResponse{}
resp := structs.ACLTokenBatchResponse{}
err = acl.TokenBatchRead(&req, &resp)
assert.NoError(err)
@ -1061,7 +1061,7 @@ func TestACLEndpoint_PolicyRead(t *testing.T) {
acl := ACL{srv: s1}
req := structs.ACLPolicyReadRequest{
req := structs.ACLPolicyGetRequest{
Datacenter: "dc1",
PolicyID: policy.ID,
QueryOptions: structs.QueryOptions{Token: "root"},
@ -1104,13 +1104,13 @@ func TestACLEndpoint_PolicyBatchRead(t *testing.T) {
acl := ACL{srv: s1}
tokens := []string{t1.AccessorID, t2.AccessorID}
req := structs.ACLTokenBatchReadRequest{
req := structs.ACLTokenBatchGetRequest{
Datacenter: "dc1",
AccessorIDs: tokens,
QueryOptions: structs.QueryOptions{Token: "root"},
}
resp := structs.ACLTokensResponse{}
resp := structs.ACLTokenBatchResponse{}
err = acl.TokenBatchRead(&req, &resp)
assert.NoError(err)
@ -1123,7 +1123,7 @@ func TestACLEndpoint_PolicyBatchRead(t *testing.T) {
assert.EqualValues(retrievedTokens, tokens)
}
func TestACLEndpoint_PolicyUpsert(t *testing.T) {
func TestACLEndpoint_PolicySet(t *testing.T) {
t.Parallel()
assert := assert.New(t)
@ -1144,7 +1144,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
// Create it
{
req := structs.ACLPolicyUpsertRequest{
req := structs.ACLPolicySetRequest{
Datacenter: "dc1",
Policy: structs.ACLPolicy{
Description: "foobar",
@ -1155,7 +1155,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
}
resp := structs.ACLPolicy{}
err := acl.PolicyUpsert(&req, &resp)
err := acl.PolicySet(&req, &resp)
assert.NoError(err)
assert.NotNil(resp.ID)
@ -1174,7 +1174,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
// Update it
{
req := structs.ACLPolicyUpsertRequest{
req := structs.ACLPolicySetRequest{
Datacenter: "dc1",
Policy: structs.ACLPolicy{
ID: policyID,
@ -1186,7 +1186,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
}
resp := structs.ACLPolicy{}
err := acl.PolicyUpsert(&req, &resp)
err := acl.PolicySet(&req, &resp)
assert.NoError(err)
assert.NotNil(resp.ID)
@ -1202,7 +1202,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
}
}
func TestACLEndpoint_PolicyUpsert_globalManagement(t *testing.T) {
func TestACLEndpoint_PolicySet_globalManagement(t *testing.T) {
t.Parallel()
assert := assert.New(t)
@ -1223,7 +1223,7 @@ func TestACLEndpoint_PolicyUpsert_globalManagement(t *testing.T) {
// Can't change the rules
{
req := structs.ACLPolicyUpsertRequest{
req := structs.ACLPolicySetRequest{
Datacenter: "dc1",
Policy: structs.ACLPolicy{
ID: structs.ACLPolicyGlobalManagementID,
@ -1234,13 +1234,13 @@ func TestACLEndpoint_PolicyUpsert_globalManagement(t *testing.T) {
}
resp := structs.ACLPolicy{}
err := acl.PolicyUpsert(&req, &resp)
err := acl.PolicySet(&req, &resp)
assert.EqualError(err, "Changing the Rules for the builtin global-management policy is not permitted")
}
// Can rename it
{
req := structs.ACLPolicyUpsertRequest{
req := structs.ACLPolicySetRequest{
Datacenter: "dc1",
Policy: structs.ACLPolicy{
ID: structs.ACLPolicyGlobalManagementID,
@ -1251,7 +1251,7 @@ func TestACLEndpoint_PolicyUpsert_globalManagement(t *testing.T) {
}
resp := structs.ACLPolicy{}
err := acl.PolicyUpsert(&req, &resp)
err := acl.PolicySet(&req, &resp)
assert.NoError(err)
// Get the policy again
@ -1404,7 +1404,7 @@ func TestACLEndpoint_PolicyResolve(t *testing.T) {
policies := []string{p1.ID, p2.ID}
// Assign the policies to a token
tokenUpsertReq := structs.ACLTokenUpsertRequest{
tokenUpsertReq := structs.ACLTokenSetRequest{
Datacenter: "dc1",
ACLToken: structs.ACLToken{
Policies: []structs.ACLTokenPolicyLink{
@ -1419,12 +1419,12 @@ func TestACLEndpoint_PolicyResolve(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
token := structs.ACLToken{}
err = acl.TokenUpsert(&tokenUpsertReq, &token)
err = acl.TokenSet(&tokenUpsertReq, &token)
assert.NoError(err)
assert.NotEmpty(token.SecretID)
resp := structs.ACLPoliciesResponse{}
req := structs.ACLPolicyBatchReadRequest{
resp := structs.ACLPolicyBatchResponse{}
req := structs.ACLPolicyBatchGetRequest{
Datacenter: "dc1",
PolicyIDs: []string{p1.ID, p2.ID},
QueryOptions: structs.QueryOptions{Token: token.SecretID},
@ -1442,7 +1442,7 @@ func TestACLEndpoint_PolicyResolve(t *testing.T) {
// upsertTestToken creates a token for testing purposes
func upsertTestToken(codec rpc.ClientCodec, masterToken string, datacenter string) (*structs.ACLToken, error) {
arg := structs.ACLTokenUpsertRequest{
arg := structs.ACLTokenSetRequest{
Datacenter: datacenter,
ACLToken: structs.ACLToken{
Description: "User token",
@ -1454,7 +1454,7 @@ func upsertTestToken(codec rpc.ClientCodec, masterToken string, datacenter strin
var out structs.ACLToken
err := msgpackrpc.CallWithCodec(codec, "ACL.TokenUpsert", &arg, &out)
err := msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &arg, &out)
if err != nil {
return nil, err
@ -1469,7 +1469,7 @@ func upsertTestToken(codec rpc.ClientCodec, masterToken string, datacenter strin
// retrieveTestToken returns a policy for testing purposes
func retrieveTestToken(codec rpc.ClientCodec, masterToken string, datacenter string, id string) (*structs.ACLTokenResponse, error) {
arg := structs.ACLTokenReadRequest{
arg := structs.ACLTokenGetRequest{
Datacenter: datacenter,
TokenID: id,
TokenIDType: structs.ACLTokenAccessor,
@ -1495,7 +1495,7 @@ func upsertTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter stri
return nil, err
}
arg := structs.ACLPolicyUpsertRequest{
arg := structs.ACLPolicySetRequest{
Datacenter: datacenter,
Policy: structs.ACLPolicy{
Name: fmt.Sprintf("test-policy-%s", policyUnq),
@ -1505,7 +1505,7 @@ func upsertTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter stri
var out structs.ACLPolicy
err = msgpackrpc.CallWithCodec(codec, "ACL.PolicyUpsert", &arg, &out)
err = msgpackrpc.CallWithCodec(codec, "ACL.PolicySet", &arg, &out)
if err != nil {
return nil, err
@ -1520,7 +1520,7 @@ func upsertTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter stri
// retrieveTestPolicy returns a policy for testing purposes
func retrieveTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter string, id string) (*structs.ACLPolicyResponse, error) {
arg := structs.ACLPolicyReadRequest{
arg := structs.ACLPolicyGetRequest{
Datacenter: datacenter,
PolicyID: id,
QueryOptions: structs.QueryOptions{Token: masterToken},

View File

@ -105,11 +105,11 @@ func (s *Server) updateLocalACLPolicies(policies structs.ACLPolicies, ctx contex
batchSize += policies[batchEnd].EstimateSize()
}
req := structs.ACLPolicyBatchUpsertRequest{
req := structs.ACLPolicyBatchSetRequest{
Policies: policies[batchStart:batchEnd],
}
resp, err := s.raftApply(structs.ACLPolicyUpsertRequestType, &req)
resp, err := s.raftApply(structs.ACLPolicySetRequestType, &req)
if err != nil {
return false, fmt.Errorf("Failed to apply policy upserts: %v", err)
}
@ -134,8 +134,8 @@ func (s *Server) updateLocalACLPolicies(policies structs.ACLPolicies, ctx contex
return false, nil
}
func (s *Server) fetchACLPoliciesBatch(policyIDs []string) (*structs.ACLPoliciesResponse, error) {
req := structs.ACLPolicyBatchReadRequest{
func (s *Server) fetchACLPoliciesBatch(policyIDs []string) (*structs.ACLPolicyBatchResponse, error) {
req := structs.ACLPolicyBatchGetRequest{
Datacenter: s.config.ACLDatacenter,
PolicyIDs: policyIDs,
QueryOptions: structs.QueryOptions{
@ -144,7 +144,7 @@ func (s *Server) fetchACLPoliciesBatch(policyIDs []string) (*structs.ACLPolicies
},
}
var response structs.ACLPoliciesResponse
var response structs.ACLPolicyBatchResponse
if err := s.RPC("ACL.PolicyBatchRead", &req, &response); err != nil {
return nil, err
}
@ -261,12 +261,12 @@ func (s *Server) updateLocalACLTokens(tokens structs.ACLTokens, ctx context.Cont
batchSize += tokens[batchEnd].EstimateSize()
}
req := structs.ACLTokenBatchUpsertRequest{
Tokens: tokens[batchStart:batchEnd],
AllowCreate: true,
req := structs.ACLTokenBatchSetRequest{
Tokens: tokens[batchStart:batchEnd],
CAS: false,
}
resp, err := s.raftApply(structs.ACLTokenUpsertRequestType, &req)
resp, err := s.raftApply(structs.ACLTokenSetRequestType, &req)
if err != nil {
return false, fmt.Errorf("Failed to apply token upserts: %v", err)
}
@ -292,8 +292,8 @@ func (s *Server) updateLocalACLTokens(tokens structs.ACLTokens, ctx context.Cont
return false, nil
}
func (s *Server) fetchACLTokensBatch(tokenIDs []string) (*structs.ACLTokensResponse, error) {
req := structs.ACLTokenBatchReadRequest{
func (s *Server) fetchACLTokensBatch(tokenIDs []string) (*structs.ACLTokenBatchResponse, error) {
req := structs.ACLTokenBatchGetRequest{
Datacenter: s.config.ACLDatacenter,
AccessorIDs: tokenIDs,
QueryOptions: structs.QueryOptions{
@ -302,7 +302,7 @@ func (s *Server) fetchACLTokensBatch(tokenIDs []string) (*structs.ACLTokensRespo
},
}
var response structs.ACLTokensResponse
var response structs.ACLTokenBatchResponse
if err := s.RPC("ACL.TokenBatchRead", &req, &response); err != nil {
return nil, err
}
@ -354,7 +354,7 @@ func (s *Server) replicateACLPolicies(lastRemoteIndex uint64, ctx context.Contex
// replication process is.
defer metrics.MeasureSince([]string{"leader", "replication", "acl", "policy", "apply"}, time.Now())
_, local, err := s.fsm.State().ACLPolicyList(nil, "")
_, local, err := s.fsm.State().ACLPolicyList(nil)
if err != nil {
return 0, false, fmt.Errorf("failed to retrieve local ACL policies: %v", err)
}
@ -374,7 +374,7 @@ func (s *Server) replicateACLPolicies(lastRemoteIndex uint64, ctx context.Contex
s.logger.Printf("[DEBUG] acl: policy replication - deletions: %d, updates: %d", len(deletions), len(updates))
var policies *structs.ACLPoliciesResponse
var policies *structs.ACLPolicyBatchResponse
if len(updates) > 0 {
policies, err = s.fetchACLPoliciesBatch(updates)
if err != nil {
@ -456,7 +456,7 @@ func (s *Server) replicateACLTokens(lastRemoteIndex uint64, ctx context.Context)
deletions, updates := diffACLTokens(local, remote.Tokens, lastRemoteIndex)
s.logger.Printf("[DEBUG] acl: token replication - deletions: %d, updates: %d", len(deletions), len(updates))
var tokens *structs.ACLTokensResponse
var tokens *structs.ACLTokenBatchResponse
if len(updates) > 0 {
tokens, err = s.fetchACLTokensBatch(updates)
if err != nil {

View File

@ -0,0 +1,467 @@
package consul
import (
"bytes"
"context"
"fmt"
"os"
"reflect"
"sort"
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/testutil/retry"
)
func TestACLReplication_Sorter(t *testing.T) {
t.Parallel()
acls := structs.ACLs{
&structs.ACL{ID: "a"},
&structs.ACL{ID: "b"},
&structs.ACL{ID: "c"},
}
sorter := &aclIterator{acls, 0}
if len := sorter.Len(); len != 3 {
t.Fatalf("bad: %d", len)
}
if !sorter.Less(0, 1) {
t.Fatalf("should be less")
}
if sorter.Less(1, 0) {
t.Fatalf("should not be less")
}
if !sort.IsSorted(sorter) {
t.Fatalf("should be sorted")
}
expected := structs.ACLs{
&structs.ACL{ID: "b"},
&structs.ACL{ID: "a"},
&structs.ACL{ID: "c"},
}
sorter.Swap(0, 1)
if !reflect.DeepEqual(acls, expected) {
t.Fatalf("bad: %v", acls)
}
if sort.IsSorted(sorter) {
t.Fatalf("should not be sorted")
}
sort.Sort(sorter)
if !sort.IsSorted(sorter) {
t.Fatalf("should be sorted")
}
}
func TestACLReplication_Iterator(t *testing.T) {
t.Parallel()
acls := structs.ACLs{}
iter := newACLIterator(acls)
if front := iter.Front(); front != nil {
t.Fatalf("bad: %v", front)
}
iter.Next()
if front := iter.Front(); front != nil {
t.Fatalf("bad: %v", front)
}
acls = structs.ACLs{
&structs.ACL{ID: "a"},
&structs.ACL{ID: "b"},
&structs.ACL{ID: "c"},
}
iter = newACLIterator(acls)
if front := iter.Front(); front != acls[0] {
t.Fatalf("bad: %v", front)
}
iter.Next()
if front := iter.Front(); front != acls[1] {
t.Fatalf("bad: %v", front)
}
iter.Next()
if front := iter.Front(); front != acls[2] {
t.Fatalf("bad: %v", front)
}
iter.Next()
if front := iter.Front(); front != nil {
t.Fatalf("bad: %v", front)
}
}
func TestACLReplication_reconcileACLs(t *testing.T) {
t.Parallel()
parseACLs := func(raw string) structs.ACLs {
var acls structs.ACLs
for _, key := range strings.Split(raw, "|") {
if len(key) == 0 {
continue
}
tuple := strings.Split(key, ":")
index, err := strconv.Atoi(tuple[1])
if err != nil {
t.Fatalf("err: %v", err)
}
acl := &structs.ACL{
ID: tuple[0],
Rules: tuple[2],
RaftIndex: structs.RaftIndex{
ModifyIndex: uint64(index),
},
}
acls = append(acls, acl)
}
return acls
}
parseChanges := func(changes structs.ACLRequests) string {
var ret string
for i, change := range changes {
if i > 0 {
ret += "|"
}
ret += fmt.Sprintf("%s:%s:%s", change.Op, change.ACL.ID, change.ACL.Rules)
}
return ret
}
tests := []struct {
local string
remote string
lastRemoteIndex uint64
expected string
}{
// Everything empty.
{
local: "",
remote: "",
lastRemoteIndex: 0,
expected: "",
},
// First time with empty local.
{
local: "",
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
lastRemoteIndex: 0,
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
},
// Remote not sorted.
{
local: "",
remote: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
lastRemoteIndex: 0,
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
},
// Neither side sorted.
{
local: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
remote: "ccc:9:X|bbb:3:X|ddd:2:X|eee:11:X",
lastRemoteIndex: 0,
expected: "",
},
// Fully replicated, nothing to do.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
lastRemoteIndex: 0,
expected: "",
},
// Change an ACL.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
lastRemoteIndex: 0,
expected: "set:ccc:Y",
},
// Change an ACL, but mask the change by the last replicated
// index. This isn't how things work normally, but it proves
// we are skipping the full compare based on the index.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
lastRemoteIndex: 33,
expected: "",
},
// Empty everything out.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "",
lastRemoteIndex: 0,
expected: "delete:bbb:X|delete:ccc:X|delete:ddd:X|delete:eee:X",
},
// Adds on the ends and in the middle.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "aaa:99:X|bbb:3:X|ccc:9:X|ccx:101:X|ddd:2:X|eee:11:X|fff:102:X",
lastRemoteIndex: 0,
expected: "set:aaa:X|set:ccx:X|set:fff:X",
},
// Deletes on the ends and in the middle.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "ccc:9:X",
lastRemoteIndex: 0,
expected: "delete:bbb:X|delete:ddd:X|delete:eee:X",
},
// Everything.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X",
lastRemoteIndex: 11,
expected: "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X",
},
}
for i, test := range tests {
local, remote := parseACLs(test.local), parseACLs(test.remote)
changes := reconcileLegacyACLs(local, remote, test.lastRemoteIndex)
if actual := parseChanges(changes); actual != test.expected {
t.Errorf("test case %d failed: %s", i, actual)
}
}
}
func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLReplicationApplyLimit = 1
})
s1.tokens.UpdateACLReplicationToken("secret")
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc2")
changes := structs.ACLRequests{
&structs.ACLRequest{
Op: structs.ACLSet,
ACL: structs.ACL{
ID: "secret",
Type: "client",
},
},
}
// Should be throttled to 1 Hz.
start := time.Now()
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
t.Fatalf("err: %v", err)
}
if dur := time.Since(start); dur < time.Second {
t.Fatalf("too slow: %9.6f", dur.Seconds())
}
changes = append(changes,
&structs.ACLRequest{
Op: structs.ACLSet,
ACL: structs.ACL{
ID: "secret",
Type: "client",
},
})
// Should be throttled to 1 Hz.
start = time.Now()
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
t.Fatalf("err: %v", err)
}
if dur := time.Since(start); dur < 2*time.Second {
t.Fatalf("too fast: %9.6f", dur.Seconds())
}
}
func TestACLReplication_IsACLReplicationEnabled(t *testing.T) {
t.Parallel()
// ACLs not enabled.
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = ""
c.ACLsEnabled = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
if s1.IsACLReplicationEnabled() {
t.Fatalf("should not be enabled")
}
// ACLs enabled but not replication.
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc2")
if s2.IsACLReplicationEnabled() {
t.Fatalf("should not be enabled")
}
// ACLs enabled with replication.
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = true
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
testrpc.WaitForLeader(t, s3.RPC, "dc2")
if !s3.IsACLReplicationEnabled() {
t.Fatalf("should be enabled")
}
// ACLs enabled with replication, but inside the ACL datacenter
// so replication should be disabled.
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = true
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
testrpc.WaitForLeader(t, s4.RPC, "dc1")
if s4.IsACLReplicationEnabled() {
t.Fatalf("should not be enabled")
}
}
func TestACLReplication_LegacyTokens(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
client := rpcClient(t, s1)
defer client.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = true
c.ACLReplicationRate = 100
c.ACLReplicationBurst = 100
c.ACLReplicationApplyLimit = 1000000
})
s2.tokens.UpdateACLReplicationToken("root")
testrpc.WaitForLeader(t, s2.RPC, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join.
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
// Create a bunch of new tokens.
var id string
for i := 0; i < 50; i++ {
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: testACLPolicy,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
checkSame := func() error {
index, remote, err := s1.fsm.State().ACLTokenList(nil, true, true, "")
if err != nil {
return err
}
_, local, err := s2.fsm.State().ACLTokenList(nil, true, true, "")
if err != nil {
return err
}
if got, want := len(remote), len(local); got != want {
return fmt.Errorf("got %d remote ACLs want %d", got, want)
}
for i, token := range remote {
if !bytes.Equal(token.Hash, local[i].Hash) {
return fmt.Errorf("ACLs differ")
}
}
var status structs.ACLReplicationStatus
s2.aclReplicationStatusLock.RLock()
status = s2.aclReplicationStatus
s2.aclReplicationStatusLock.RUnlock()
if !status.Enabled || !status.Running ||
status.ReplicatedTokenIndex != index ||
status.SourceDatacenter != "dc1" {
return fmt.Errorf("ACL replication status differs")
}
return nil
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
if err := checkSame(); err != nil {
r.Fatal(err)
}
})
// Create more new tokens.
for i := 0; i < 50; i++ {
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: testACLPolicy,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var dontCare string
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
t.Fatalf("err: %v", err)
}
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
if err := checkSame(); err != nil {
r.Fatal(err)
}
})
// Delete a token.
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLDelete,
ACL: structs.ACL{
ID: id,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var dontCare string
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
if err := checkSame(); err != nil {
r.Fatal(err)
}
})
}

View File

@ -1,16 +1,9 @@
package consul
import (
"bytes"
"context"
"fmt"
"os"
"reflect"
"sort"
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
@ -19,455 +12,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestACLReplication_Sorter(t *testing.T) {
t.Parallel()
acls := structs.ACLs{
&structs.ACL{ID: "a"},
&structs.ACL{ID: "b"},
&structs.ACL{ID: "c"},
}
sorter := &aclIterator{acls, 0}
if len := sorter.Len(); len != 3 {
t.Fatalf("bad: %d", len)
}
if !sorter.Less(0, 1) {
t.Fatalf("should be less")
}
if sorter.Less(1, 0) {
t.Fatalf("should not be less")
}
if !sort.IsSorted(sorter) {
t.Fatalf("should be sorted")
}
expected := structs.ACLs{
&structs.ACL{ID: "b"},
&structs.ACL{ID: "a"},
&structs.ACL{ID: "c"},
}
sorter.Swap(0, 1)
if !reflect.DeepEqual(acls, expected) {
t.Fatalf("bad: %v", acls)
}
if sort.IsSorted(sorter) {
t.Fatalf("should not be sorted")
}
sort.Sort(sorter)
if !sort.IsSorted(sorter) {
t.Fatalf("should be sorted")
}
}
func TestACLReplication_Iterator(t *testing.T) {
t.Parallel()
acls := structs.ACLs{}
iter := newACLIterator(acls)
if front := iter.Front(); front != nil {
t.Fatalf("bad: %v", front)
}
iter.Next()
if front := iter.Front(); front != nil {
t.Fatalf("bad: %v", front)
}
acls = structs.ACLs{
&structs.ACL{ID: "a"},
&structs.ACL{ID: "b"},
&structs.ACL{ID: "c"},
}
iter = newACLIterator(acls)
if front := iter.Front(); front != acls[0] {
t.Fatalf("bad: %v", front)
}
iter.Next()
if front := iter.Front(); front != acls[1] {
t.Fatalf("bad: %v", front)
}
iter.Next()
if front := iter.Front(); front != acls[2] {
t.Fatalf("bad: %v", front)
}
iter.Next()
if front := iter.Front(); front != nil {
t.Fatalf("bad: %v", front)
}
}
func TestACLReplication_reconcileACLs(t *testing.T) {
t.Parallel()
parseACLs := func(raw string) structs.ACLs {
var acls structs.ACLs
for _, key := range strings.Split(raw, "|") {
if len(key) == 0 {
continue
}
tuple := strings.Split(key, ":")
index, err := strconv.Atoi(tuple[1])
if err != nil {
t.Fatalf("err: %v", err)
}
acl := &structs.ACL{
ID: tuple[0],
Rules: tuple[2],
RaftIndex: structs.RaftIndex{
ModifyIndex: uint64(index),
},
}
acls = append(acls, acl)
}
return acls
}
parseChanges := func(changes structs.ACLRequests) string {
var ret string
for i, change := range changes {
if i > 0 {
ret += "|"
}
ret += fmt.Sprintf("%s:%s:%s", change.Op, change.ACL.ID, change.ACL.Rules)
}
return ret
}
tests := []struct {
local string
remote string
lastRemoteIndex uint64
expected string
}{
// Everything empty.
{
local: "",
remote: "",
lastRemoteIndex: 0,
expected: "",
},
// First time with empty local.
{
local: "",
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
lastRemoteIndex: 0,
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
},
// Remote not sorted.
{
local: "",
remote: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
lastRemoteIndex: 0,
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
},
// Neither side sorted.
{
local: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
remote: "ccc:9:X|bbb:3:X|ddd:2:X|eee:11:X",
lastRemoteIndex: 0,
expected: "",
},
// Fully replicated, nothing to do.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
lastRemoteIndex: 0,
expected: "",
},
// Change an ACL.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
lastRemoteIndex: 0,
expected: "set:ccc:Y",
},
// Change an ACL, but mask the change by the last replicated
// index. This isn't how things work normally, but it proves
// we are skipping the full compare based on the index.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
lastRemoteIndex: 33,
expected: "",
},
// Empty everything out.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "",
lastRemoteIndex: 0,
expected: "delete:bbb:X|delete:ccc:X|delete:ddd:X|delete:eee:X",
},
// Adds on the ends and in the middle.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "aaa:99:X|bbb:3:X|ccc:9:X|ccx:101:X|ddd:2:X|eee:11:X|fff:102:X",
lastRemoteIndex: 0,
expected: "set:aaa:X|set:ccx:X|set:fff:X",
},
// Deletes on the ends and in the middle.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "ccc:9:X",
lastRemoteIndex: 0,
expected: "delete:bbb:X|delete:ddd:X|delete:eee:X",
},
// Everything.
{
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
remote: "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X",
lastRemoteIndex: 11,
expected: "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X",
},
}
for i, test := range tests {
local, remote := parseACLs(test.local), parseACLs(test.remote)
changes := reconcileLegacyACLs(local, remote, test.lastRemoteIndex)
if actual := parseChanges(changes); actual != test.expected {
t.Errorf("test case %d failed: %s", i, actual)
}
}
}
func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLReplicationApplyLimit = 1
})
s1.tokens.UpdateACLReplicationToken("secret")
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc2")
changes := structs.ACLRequests{
&structs.ACLRequest{
Op: structs.ACLSet,
ACL: structs.ACL{
ID: "secret",
Type: "client",
},
},
}
// Should be throttled to 1 Hz.
start := time.Now()
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
t.Fatalf("err: %v", err)
}
if dur := time.Since(start); dur < time.Second {
t.Fatalf("too slow: %9.6f", dur.Seconds())
}
changes = append(changes,
&structs.ACLRequest{
Op: structs.ACLSet,
ACL: structs.ACL{
ID: "secret",
Type: "client",
},
})
// Should be throttled to 1 Hz.
start = time.Now()
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
t.Fatalf("err: %v", err)
}
if dur := time.Since(start); dur < 2*time.Second {
t.Fatalf("too fast: %9.6f", dur.Seconds())
}
}
func TestACLReplication_IsACLReplicationEnabled(t *testing.T) {
t.Parallel()
// ACLs not enabled.
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = ""
c.ACLsEnabled = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
if s1.IsACLReplicationEnabled() {
t.Fatalf("should not be enabled")
}
// ACLs enabled but not replication.
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc2")
if s2.IsACLReplicationEnabled() {
t.Fatalf("should not be enabled")
}
// ACLs enabled with replication.
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = true
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
testrpc.WaitForLeader(t, s3.RPC, "dc2")
if !s3.IsACLReplicationEnabled() {
t.Fatalf("should be enabled")
}
// ACLs enabled with replication, but inside the ACL datacenter
// so replication should be disabled.
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = true
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
testrpc.WaitForLeader(t, s4.RPC, "dc1")
if s4.IsACLReplicationEnabled() {
t.Fatalf("should not be enabled")
}
}
func TestACLReplication(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
client := rpcClient(t, s1)
defer client.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = true
c.ACLReplicationRate = 100
c.ACLReplicationBurst = 100
c.ACLReplicationApplyLimit = 1000000
})
s2.tokens.UpdateACLReplicationToken("root")
testrpc.WaitForLeader(t, s2.RPC, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join.
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
// Create a bunch of new tokens.
var id string
for i := 0; i < 50; i++ {
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: testACLPolicy,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
checkSame := func() error {
index, remote, err := s1.fsm.State().ACLTokenList(nil, true, true, "")
if err != nil {
return err
}
_, local, err := s2.fsm.State().ACLTokenList(nil, true, true, "")
if err != nil {
return err
}
if got, want := len(remote), len(local); got != want {
return fmt.Errorf("got %d remote ACLs want %d", got, want)
}
for i, token := range remote {
if !bytes.Equal(token.Hash, local[i].Hash) {
return fmt.Errorf("ACLs differ")
}
}
var status structs.ACLReplicationStatus
s2.aclReplicationStatusLock.RLock()
status = s2.aclReplicationStatus
s2.aclReplicationStatusLock.RUnlock()
if !status.Enabled || !status.Running ||
status.ReplicatedTokenIndex != index ||
status.SourceDatacenter != "dc1" {
return fmt.Errorf("ACL replication status differs")
}
return nil
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
if err := checkSame(); err != nil {
r.Fatal(err)
}
})
// Create more new tokens.
for i := 0; i < 50; i++ {
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: testACLPolicy,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var dontCare string
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
t.Fatalf("err: %v", err)
}
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
if err := checkSame(); err != nil {
r.Fatal(err)
}
})
// Delete a token.
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLDelete,
ACL: structs.ACL{
ID: id,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var dontCare string
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
if err := checkSame(); err != nil {
r.Fatal(err)
}
})
}
func TestACLReplication_diffACLPolicies(t *testing.T) {
local := structs.ACLPolicies{
&structs.ACLPolicy{
@ -671,3 +215,293 @@ func TestACLReplication_diffACLTokens(t *testing.T) {
"539f1cb6-40aa-464f-ae66-a900d26bc1b2",
"c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926"})
}
func TestACLReplication_Tokens(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
client := rpcClient(t, s1)
defer client.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = true
c.ACLReplicationRate = 100
c.ACLReplicationBurst = 100
c.ACLReplicationApplyLimit = 1000000
})
s2.tokens.UpdateACLReplicationToken("root")
testrpc.WaitForLeader(t, s2.RPC, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join.
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
// Create a bunch of new tokens and policies
var tokens structs.ACLTokens
for i := 0; i < 50; i++ {
arg := structs.ACLTokenSetRequest{
Datacenter: "dc1",
ACLToken: structs.ACLToken{
Description: fmt.Sprintf("token-%d", i),
Policies: []structs.ACLTokenPolicyLink{
structs.ACLTokenPolicyLink{
ID: structs.ACLPolicyGlobalManagementID,
},
},
Local: false,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var token structs.ACLToken
require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token))
tokens = append(tokens, &token)
}
checkSame := func(t *retry.R) error {
// only account for global tokens - local tokens shouldn't be replicated
index, remote, err := s1.fsm.State().ACLTokenList(nil, false, true, "")
require.NoError(t, err)
_, local, err := s2.fsm.State().ACLTokenList(nil, false, true, "")
require.NoError(t, err)
require.Len(t, local, len(remote))
for i, token := range remote {
require.Equal(t, token.Hash, local[i].Hash)
}
var status structs.ACLReplicationStatus
s2.aclReplicationStatusLock.RLock()
status = s2.aclReplicationStatus
s2.aclReplicationStatusLock.RUnlock()
if !status.Enabled || !status.Running ||
status.ReplicationType != structs.ACLReplicateTokens ||
status.ReplicatedTokenIndex != index ||
status.SourceDatacenter != "dc1" {
return fmt.Errorf("ACL replication status differs")
}
return nil
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
// add some local tokens to the secondary DC
// these shouldn't be deleted by replication
for i := 0; i < 50; i++ {
arg := structs.ACLTokenSetRequest{
Datacenter: "dc2",
ACLToken: structs.ACLToken{
Description: fmt.Sprintf("token-%d", i),
Policies: []structs.ACLTokenPolicyLink{
structs.ACLTokenPolicyLink{
ID: structs.ACLPolicyGlobalManagementID,
},
},
Local: true,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var token structs.ACLToken
require.NoError(t, s2.RPC("ACL.TokenSet", &arg, &token))
}
// add some local tokens to the primary DC
// these shouldn't be replicated to the secondary DC
for i := 0; i < 50; i++ {
arg := structs.ACLTokenSetRequest{
Datacenter: "dc1",
ACLToken: structs.ACLToken{
Description: fmt.Sprintf("token-%d", i),
Policies: []structs.ACLTokenPolicyLink{
structs.ACLTokenPolicyLink{
ID: structs.ACLPolicyGlobalManagementID,
},
},
Local: true,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var token structs.ACLToken
require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token))
}
// Update those other tokens
for i := 0; i < 50; i++ {
arg := structs.ACLTokenSetRequest{
Datacenter: "dc1",
ACLToken: structs.ACLToken{
AccessorID: tokens[i].AccessorID,
SecretID: tokens[i].SecretID,
Description: fmt.Sprintf("token-%d-modified", i),
Policies: []structs.ACLTokenPolicyLink{
structs.ACLTokenPolicyLink{
ID: structs.ACLPolicyGlobalManagementID,
},
},
Local: false,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var token structs.ACLToken
require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token))
}
// Wait for the replica to converge.
// this time it also verifies the local tokens from the primary were not replicated.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
// verify dc2 local tokens didn't get blown away
_, local, err := s2.fsm.State().ACLTokenList(nil, true, false, "")
require.NoError(t, err)
require.Len(t, local, 50)
for _, token := range tokens {
arg := structs.ACLTokenDeleteRequest{
Datacenter: "dc1",
TokenID: token.AccessorID,
WriteRequest: structs.WriteRequest{Token: "root"},
}
var dontCare string
require.NoError(t, s1.RPC("ACL.TokenDelete", &arg, &dontCare))
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
}
func TestACLReplication_Policies(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
client := rpcClient(t, s1)
defer client.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = false
c.ACLReplicationRate = 100
c.ACLReplicationBurst = 100
c.ACLReplicationApplyLimit = 1000000
})
s2.tokens.UpdateACLReplicationToken("root")
testrpc.WaitForLeader(t, s2.RPC, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join.
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
// Create a bunch of new policies
var policies structs.ACLPolicies
for i := 0; i < 50; i++ {
arg := structs.ACLPolicySetRequest{
Datacenter: "dc1",
Policy: structs.ACLPolicy{
Name: fmt.Sprintf("token-%d", i),
Description: fmt.Sprintf("token-%d", i),
Rules: fmt.Sprintf(`service "app-%d" { policy = "read" }`, i),
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var policy structs.ACLPolicy
require.NoError(t, s1.RPC("ACL.PolicySet", &arg, &policy))
policies = append(policies, &policy)
}
checkSame := func(t *retry.R) error {
// only account for global tokens - local tokens shouldn't be replicated
index, remote, err := s1.fsm.State().ACLPolicyList(nil)
require.NoError(t, err)
_, local, err := s2.fsm.State().ACLPolicyList(nil)
require.NoError(t, err)
require.Len(t, local, len(remote))
for i, policy := range remote {
require.Equal(t, policy.Hash, local[i].Hash)
}
var status structs.ACLReplicationStatus
s2.aclReplicationStatusLock.RLock()
status = s2.aclReplicationStatus
s2.aclReplicationStatusLock.RUnlock()
if !status.Enabled || !status.Running ||
status.ReplicationType != structs.ACLReplicatePolicies ||
status.ReplicatedIndex != index ||
status.SourceDatacenter != "dc1" {
return fmt.Errorf("ACL replication status differs")
}
return nil
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
// Update those policies
for i := 0; i < 50; i++ {
arg := structs.ACLPolicySetRequest{
Datacenter: "dc1",
Policy: structs.ACLPolicy{
ID: policies[i].ID,
Name: fmt.Sprintf("token-%d-modified", i),
Description: fmt.Sprintf("token-%d-modified", i),
Rules: policies[i].Rules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var policy structs.ACLPolicy
require.NoError(t, s1.RPC("ACL.PolicySet", &arg, &policy))
}
// Wait for the replica to converge.
// this time it also verifies the local tokens from the primary were not replicated.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
for _, policy := range policies {
arg := structs.ACLPolicyDeleteRequest{
Datacenter: "dc1",
PolicyID: policy.ID,
WriteRequest: structs.WriteRequest{Token: "root"},
}
var dontCare string
require.NoError(t, s1.RPC("ACL.PolicyDelete", &arg, &dontCare))
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
}

View File

@ -76,19 +76,19 @@ func (s *Server) canUpgradeToNewACLs(isLeader bool) bool {
}
if !s.InACLDatacenter() {
mode, _ := ServersGetACLMode(s.WANMembers(), "", s.config.ACLDatacenter)
if mode != structs.ACLModeEnabled {
numServers, mode, _ := ServersGetACLMode(s.WANMembers(), "", s.config.ACLDatacenter)
if mode != structs.ACLModeEnabled || numServers == 0 {
return false
}
}
if isLeader {
if mode, _ := ServersGetACLMode(s.LANMembers(), "", ""); mode == structs.ACLModeLegacy {
if _, mode, _ := ServersGetACLMode(s.LANMembers(), "", ""); mode == structs.ACLModeLegacy {
return true
}
} else {
leader := string(s.raft.Leader())
if _, leaderMode := ServersGetACLMode(s.LANMembers(), leader, ""); leaderMode == structs.ACLModeEnabled {
if _, _, leaderMode := ServersGetACLMode(s.LANMembers(), leader, ""); leaderMode == structs.ACLModeEnabled {
return true
}
}
@ -97,7 +97,7 @@ func (s *Server) canUpgradeToNewACLs(isLeader bool) bool {
}
func (s *Server) InACLDatacenter() bool {
return s.config.Datacenter == s.config.ACLDatacenter
return s.config.ACLDatacenter == "" || s.config.Datacenter == s.config.ACLDatacenter
}
func (s *Server) UseLegacyACLs() bool {

View File

@ -163,8 +163,8 @@ type ACLResolverTestDelegate struct {
localTokens bool
localPolicies bool
getPolicyFn func(*structs.ACLPolicyResolveLegacyRequest, *structs.ACLPolicyResolveLegacyResponse) error
tokenReadFn func(*structs.ACLTokenReadRequest, *structs.ACLTokenResponse) error
policyResolveFn func(*structs.ACLPolicyBatchReadRequest, *structs.ACLPoliciesResponse) error
tokenReadFn func(*structs.ACLTokenGetRequest, *structs.ACLTokenResponse) error
policyResolveFn func(*structs.ACLPolicyBatchGetRequest, *structs.ACLPolicyBatchResponse) error
}
func (d *ACLResolverTestDelegate) ACLsEnabled() bool {
@ -204,12 +204,12 @@ func (d *ACLResolverTestDelegate) RPC(method string, args interface{}, reply int
panic("Bad Test Implmentation: should provide a getPolicyFn to the ACLResolverTestDelegate")
case "ACL.TokenRead":
if d.tokenReadFn != nil {
return d.tokenReadFn(args.(*structs.ACLTokenReadRequest), reply.(*structs.ACLTokenResponse))
return d.tokenReadFn(args.(*structs.ACLTokenGetRequest), reply.(*structs.ACLTokenResponse))
}
panic("Bad Test Implmentation: should provide a tokenReadFn to the ACLResolverTestDelegate")
case "ACL.PolicyResolve":
if d.policyResolveFn != nil {
return d.policyResolveFn(args.(*structs.ACLPolicyBatchReadRequest), reply.(*structs.ACLPoliciesResponse))
return d.policyResolveFn(args.(*structs.ACLPolicyBatchGetRequest), reply.(*structs.ACLPolicyBatchResponse))
}
panic("Bad Test Implmentation: should provide a policyResolveFn to the ACLResolverTestDelegate")
}
@ -300,7 +300,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
legacy: false,
localTokens: false,
localPolicies: true,
tokenReadFn: func(*structs.ACLTokenReadRequest, *structs.ACLTokenResponse) error {
tokenReadFn: func(*structs.ACLTokenGetRequest, *structs.ACLTokenResponse) error {
return fmt.Errorf("Induced RPC Error")
},
}
@ -322,7 +322,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
legacy: false,
localTokens: false,
localPolicies: true,
tokenReadFn: func(*structs.ACLTokenReadRequest, *structs.ACLTokenResponse) error {
tokenReadFn: func(*structs.ACLTokenGetRequest, *structs.ACLTokenResponse) error {
return fmt.Errorf("Induced RPC Error")
},
}
@ -345,7 +345,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
legacy: false,
localTokens: true,
localPolicies: false,
policyResolveFn: func(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
if !policyCached {
for _, policyID := range args.PolicyIDs {
_, policy, _ := testPolicyForID(policyID)
@ -388,7 +388,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
legacy: false,
localTokens: false,
localPolicies: true,
tokenReadFn: func(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
if !cached {
_, token, _ := testIdentityForToken("found")
reply.Token = token.(*structs.ACLToken)
@ -425,7 +425,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
legacy: false,
localTokens: true,
localPolicies: false,
policyResolveFn: func(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
if !policyCached {
for _, policyID := range args.PolicyIDs {
_, policy, _ := testPolicyForID(policyID)
@ -468,7 +468,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
legacy: false,
localTokens: true,
localPolicies: false,
policyResolveFn: func(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
if !policyCached {
for _, policyID := range args.PolicyIDs {
_, policy, _ := testPolicyForID(policyID)
@ -523,7 +523,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
legacy: false,
localTokens: false,
localPolicies: false,
tokenReadFn: func(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
if !tokenCached {
_, token, _ := testIdentityForToken("found")
reply.Token = token.(*structs.ACLToken)
@ -532,7 +532,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
}
return fmt.Errorf("Induced RPC Error")
},
policyResolveFn: func(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
if !policyCached {
for _, policyID := range args.PolicyIDs {
_, policy, _ := testPolicyForID(policyID)
@ -576,7 +576,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
legacy: false,
localTokens: false,
localPolicies: true,
tokenReadFn: func(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
if !cached {
_, token, _ := testIdentityForToken("found")
reply.Token = token.(*structs.ACLToken)
@ -725,7 +725,7 @@ func TestACLResolver_LocalPolicies(t *testing.T) {
legacy: false,
localTokens: false,
localPolicies: true,
tokenReadFn: func(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
_, token, err := testIdentityForToken(args.TokenID)
if token != nil {

View File

@ -384,6 +384,16 @@ func (c *Client) Stats() map[string]map[string]string {
"runtime": runtimeStats(),
}
if c.ACLsEnabled() {
if c.UseLegacyACLs() {
stats["consul"]["acl"] = "legacy"
} else {
stats["consul"]["acl"] = "enabled"
}
} else {
stats["consul"]["acl"] = "disabled"
}
for outerKey, outerValue := range c.enterpriseStats() {
if _, ok := stats[outerKey]; ok {
for innerKey, innerValue := range outerValue {

View File

@ -23,10 +23,10 @@ func init() {
registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate)
registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation)
registerCommand(structs.ConnectCARequestType, (*FSM).applyConnectCAOperation)
registerCommand(structs.ACLTokenUpsertRequestType, (*FSM).applyACLTokenUpsertOperation)
registerCommand(structs.ACLTokenSetRequestType, (*FSM).applyACLTokenSetOperation)
registerCommand(structs.ACLTokenDeleteRequestType, (*FSM).applyACLTokenDeleteOperation)
registerCommand(structs.ACLBootstrapRequestType, (*FSM).applyACLTokenBootstrap)
registerCommand(structs.ACLPolicyUpsertRequestType, (*FSM).applyACLPolicyUpsertOperation)
registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation)
registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation)
}
@ -179,7 +179,7 @@ func (c *FSM) applyACLOperation(buf []byte, index uint64) interface{} {
}
return req.ACL.ID
case structs.ACLDelete:
return c.state.ACLTokenDeleteSecret(index, req.ACL.ID)
return c.state.ACLTokenDeleteBySecret(index, req.ACL.ID)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid ACL operation '%s'", req.Op)
return fmt.Errorf("Invalid ACL operation '%s'", req.Op)
@ -351,15 +351,15 @@ func (c *FSM) applyConnectCAOperation(buf []byte, index uint64) interface{} {
}
}
func (c *FSM) applyACLTokenUpsertOperation(buf []byte, index uint64) interface{} {
var req structs.ACLTokenBatchUpsertRequest
func (c *FSM) applyACLTokenSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLTokenBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(),
[]metrics.Label{{Name: "op", Value: "upsert"}})
return c.state.ACLTokensUpsert(index, req.Tokens, req.AllowCreate)
return c.state.ACLTokenBatchSet(index, req.Tokens, req.CAS)
}
func (c *FSM) applyACLTokenDeleteOperation(buf []byte, index uint64) interface{} {
@ -370,7 +370,7 @@ func (c *FSM) applyACLTokenDeleteOperation(buf []byte, index uint64) interface{}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(),
[]metrics.Label{{Name: "op", Value: "delete"}})
return c.state.ACLTokensDelete(index, req.TokenIDs)
return c.state.ACLTokenBatchDelete(index, req.TokenIDs)
}
func (c *FSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{} {
@ -383,15 +383,15 @@ func (c *FSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{} {
return c.state.ACLBootstrap(index, req.ResetIndex, &req.Token, false)
}
func (c *FSM) applyACLPolicyUpsertOperation(buf []byte, index uint64) interface{} {
var req structs.ACLPolicyBatchUpsertRequest
func (c *FSM) applyACLPolicySetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLPolicyBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "policy"}, time.Now(),
[]metrics.Label{{Name: "op", Value: "upsert"}})
return c.state.ACLPoliciesUpsert(index, req.Policies)
return c.state.ACLPolicyBatchSet(index, req.Policies)
}
func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{} {
@ -402,5 +402,5 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "policy"}, time.Now(),
[]metrics.Label{{Name: "op", Value: "delete"}})
return c.state.ACLPoliciesDelete(index, req.PolicyIDs)
return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs)
}

View File

@ -25,8 +25,8 @@ func init() {
registerRestorer(structs.ConnectCAProviderStateType, restoreConnectCAProviderState)
registerRestorer(structs.ConnectCAConfigType, restoreConnectCAConfig)
registerRestorer(structs.IndexRequestType, restoreIndex)
registerRestorer(structs.ACLTokenUpsertRequestType, restoreToken)
registerRestorer(structs.ACLPolicyUpsertRequestType, restorePolicy)
registerRestorer(structs.ACLTokenSetRequestType, restoreToken)
registerRestorer(structs.ACLPolicySetRequestType, restorePolicy)
}
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
@ -173,7 +173,7 @@ func (s *snapshot) persistACLs(sink raft.SnapshotSink,
}
for token := tokens.Next(); token != nil; token = tokens.Next() {
if _, err := sink.Write([]byte{byte(structs.ACLTokenUpsertRequestType)}); err != nil {
if _, err := sink.Write([]byte{byte(structs.ACLTokenSetRequestType)}); err != nil {
return err
}
if err := encoder.Encode(token.(*structs.ACLToken)); err != nil {
@ -187,7 +187,7 @@ func (s *snapshot) persistACLs(sink raft.SnapshotSink,
}
for policy := policies.Next(); policy != nil; policy = policies.Next() {
if _, err := sink.Write([]byte{byte(structs.ACLPolicyUpsertRequestType)}); err != nil {
if _, err := sink.Write([]byte{byte(structs.ACLPolicySetRequestType)}); err != nil {
return err
}
if err := encoder.Encode(policy.(*structs.ACLPolicy)); err != nil {

View File

@ -433,10 +433,10 @@ func (s *Server) initializeACLs(upgrade bool) error {
}
policy.SetHash(true)
req := structs.ACLPolicyBatchUpsertRequest{
req := structs.ACLPolicyBatchSetRequest{
Policies: structs.ACLPolicies{&policy},
}
_, err := s.raftApply(structs.ACLPolicyUpsertRequestType, &req)
_, err := s.raftApply(structs.ACLPolicySetRequestType, &req)
if err != nil {
return fmt.Errorf("failed to create global-management policy: %v", err)
}
@ -497,10 +497,11 @@ func (s *Server) initializeACLs(upgrade bool) error {
if !done {
// either we didn't attempt to or setting the token with a bootstrap request failed.
req := structs.ACLTokenBatchUpsertRequest{
req := structs.ACLTokenBatchSetRequest{
Tokens: structs.ACLTokens{&token},
CAS: false,
}
if _, err := s.raftApply(structs.ACLTokenUpsertRequestType, &req); err != nil {
if _, err := s.raftApply(structs.ACLTokenSetRequestType, &req); err != nil {
return fmt.Errorf("failed to create master token: %v", err)
}
@ -531,11 +532,11 @@ func (s *Server) initializeACLs(upgrade bool) error {
}
token.SetHash(true)
req := structs.ACLTokenBatchUpsertRequest{
Tokens: structs.ACLTokens{token},
AllowCreate: true,
req := structs.ACLTokenBatchSetRequest{
Tokens: structs.ACLTokens{token},
CAS: false,
}
_, err := s.raftApply(structs.ACLTokenUpsertRequestType, &req)
_, err := s.raftApply(structs.ACLTokenSetRequestType, &req)
if err != nil {
return fmt.Errorf("failed to create anonymous token: %v", err)
}
@ -623,12 +624,16 @@ func (s *Server) startACLUpgrade() {
newToken.Policies = append(newToken.Policies, structs.ACLTokenPolicyLink{ID: structs.ACLPolicyGlobalManagementID})
}
// need to copy these as we are going to do a CAS operation.
newToken.CreateIndex = token.CreateIndex
newToken.ModifyIndex = token.ModifyIndex
newTokens = append(newTokens, &newToken)
}
req := &structs.ACLTokenBatchUpsertRequest{Tokens: newTokens, AllowCreate: false}
req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true}
resp, err := s.raftApply(structs.ACLTokenUpsertRequestType, req)
resp, err := s.raftApply(structs.ACLTokenSetRequestType, req)
if err != nil {
s.logger.Printf("[ERR] acl: failed to apply acl token upgrade batch: %v", err)
}

View File

@ -984,33 +984,21 @@ func TestLeader_ACL_Initialization(t *testing.T) {
if tt.master != "" {
_, master, err := s1.fsm.State().ACLTokenGetBySecret(nil, tt.master)
if err != nil {
t.Fatalf("err: %v", err)
}
if master == nil {
t.Fatalf("master token wasn't created")
}
require.NoError(t, err)
require.NotNil(t, master)
}
_, anon, err := s1.fsm.State().ACLTokenGetBySecret(nil, anonymousToken)
if err != nil {
t.Fatalf("err: %v", err)
}
if anon == nil {
t.Fatalf("anonymous token wasn't created")
}
require.NoError(t, err)
require.NotNil(t, anon)
canBootstrap, _, err := s1.fsm.State().CanBootstrapACLToken()
if err != nil {
t.Fatalf("err: %v", err)
}
if tt.bootstrap {
if !canBootstrap {
t.Fatalf("bootstrap should be allowed")
}
} else if canBootstrap {
t.Fatalf("bootstrap should not be allowed")
}
require.NoError(t, err)
require.Equal(t, tt.bootstrap, canBootstrap)
_, policy, err := s1.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID)
require.NoError(t, err)
require.NotNil(t, policy)
})
}
}
@ -1156,3 +1144,65 @@ func TestLeader_PersistIntermediateCAs(t *testing.T) {
}
})
}
func TestLeader_ACLUpgrade(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLsEnabled = true
c.ACLMasterToken = "root"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// create a legacy management ACL
mgmt := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "Management token",
Type: structs.ACLTokenTypeManagement,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var mgmt_id string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.Apply", &mgmt, &mgmt_id))
// wait for it to be upgraded
retry.Run(t, func(t *retry.R) {
_, token, err := s1.fsm.State().ACLTokenGetBySecret(nil, mgmt_id)
require.NoError(t, err)
require.NotNil(t, token)
require.NotEqual(t, "", token.AccessorID)
require.Equal(t, structs.ACLTokenTypeManagement, token.Type)
require.Len(t, token.Policies, 1)
require.Equal(t, structs.ACLPolicyGlobalManagementID, token.Policies[0].ID)
})
// create a legacy management ACL
client := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "Management token",
Type: structs.ACLTokenTypeClient,
Rules: `node "" { policy = "read"}`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var client_id string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.Apply", &client, &client_id))
// wait for it to be upgraded
retry.Run(t, func(t *retry.R) {
_, token, err := s1.fsm.State().ACLTokenGetBySecret(nil, client_id)
require.NoError(t, err)
require.NotNil(t, token)
require.NotEqual(t, "", token.AccessorID)
require.Len(t, token.Policies, 0)
require.Equal(t, structs.ACLTokenTypeClient, token.Type)
require.Equal(t, client.ACL.Rules, token.Rules)
})
}

View File

@ -1071,6 +1071,17 @@ func (s *Server) Stats() map[string]map[string]string {
"serf_lan": s.serfLAN.Stats(),
"runtime": runtimeStats(),
}
if s.ACLsEnabled() {
if s.UseLegacyACLs() {
stats["consul"]["acl"] = "legacy"
} else {
stats["consul"]["acl"] = "enabled"
}
} else {
stats["consul"]["acl"] = "disabled"
}
if s.serfWAN != nil {
stats["serf_wan"] = s.serfWAN.Stats()
}

View File

@ -71,7 +71,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.Tags["use_tls"] = "1"
}
if s.config.ACLDatacenter != "" {
if s.acls.ACLsEnabled() {
// we start in legacy mode and allow upgrading later
conf.Tags["acls"] = string(structs.ACLModeLegacy)
} else {

View File

@ -63,7 +63,8 @@ func tokensTableSchema() *memdb.TableSchema {
Name: "acl-tokens",
Indexes: map[string]*memdb.IndexSchema{
"accessor": &memdb.IndexSchema{
Name: "accessor",
Name: "accessor",
// DEPRECATED (ACL-Legacy-Compat) - we should not AllowMissing here once legacy compat is removed
AllowMissing: true,
Unique: true,
Indexer: &memdb.UUIDFieldIndex{
@ -141,15 +142,6 @@ func policiesTableSchema() *memdb.TableSchema {
Lowercase: true,
},
},
"datacenters": &memdb.IndexSchema{
Name: "datacenters",
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringSliceFieldIndex{
Field: "Datacenters",
Lowercase: false,
},
},
},
}
}
@ -220,7 +212,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
}
}
if err := s.aclTokenSetTxn(tx, idx, token, true, false, legacy); err != nil {
if err := s.aclTokenSetTxn(tx, idx, token, false, false, legacy); err != nil {
return fmt.Errorf("failed inserting bootstrap token: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, "acl-tokens"); err != nil {
@ -280,7 +272,7 @@ func (s *Store) ACLTokenSet(idx uint64, token *structs.ACLToken, legacy bool) er
defer tx.Abort()
// Call set on the ACL
if err := s.aclTokenSetTxn(tx, idx, token, true, false, legacy); err != nil {
if err := s.aclTokenSetTxn(tx, idx, token, false, false, legacy); err != nil {
return err
}
@ -292,14 +284,14 @@ func (s *Store) ACLTokenSet(idx uint64, token *structs.ACLToken, legacy bool) er
return nil
}
func (s *Store) ACLTokensUpsert(idx uint64, tokens structs.ACLTokens, allowCreate bool) error {
func (s *Store) ACLTokenBatchSet(idx uint64, tokens structs.ACLTokens, cas bool) error {
tx := s.db.Txn(true)
defer tx.Abort()
for _, token := range tokens {
// this is only used when doing batch insertions for upgrades and replication. Therefore
// we take whatever those said.
if err := s.aclTokenSetTxn(tx, idx, token, allowCreate, true, false); err != nil {
if err := s.aclTokenSetTxn(tx, idx, token, cas, true, false); err != nil {
return err
}
}
@ -314,7 +306,7 @@ func (s *Store) ACLTokensUpsert(idx uint64, tokens structs.ACLTokens, allowCreat
// aclTokenSetTxn is the inner method used to insert an ACL token with the
// proper indexes into the state store.
func (s *Store) aclTokenSetTxn(tx *memdb.Txn, idx uint64, token *structs.ACLToken, allowCreate, allowMissingPolicyIDs, legacy bool) error {
func (s *Store) aclTokenSetTxn(tx *memdb.Txn, idx uint64, token *structs.ACLToken, cas, allowMissingPolicyIDs, legacy bool) error {
// Check that the ID is set
if token.SecretID == "" {
return ErrMissingACLTokenSecret
@ -331,13 +323,29 @@ func (s *Store) aclTokenSetTxn(tx *memdb.Txn, idx uint64, token *structs.ACLToke
return fmt.Errorf("failed token lookup: %s", err)
}
if existing == nil && !allowCreate {
return nil
var original *structs.ACLToken
if existing != nil {
original = existing.(*structs.ACLToken)
}
if legacy && existing != nil {
original := existing.(*structs.ACLToken)
if len(original.Policies) > 0 {
if cas {
// set-if-unset case
if token.ModifyIndex == 0 && original != nil {
return nil
}
// token already deleted
if token.ModifyIndex != 0 && original == nil {
return nil
}
// check for other modifications
if token.ModifyIndex != 0 && token.ModifyIndex != original.ModifyIndex {
return nil
}
}
if legacy && original != nil {
if len(original.Policies) > 0 || original.Type == "" {
return fmt.Errorf("failed inserting acl token: cannot use legacy endpoint to modify a non-legacy token")
}
@ -349,8 +357,16 @@ func (s *Store) aclTokenSetTxn(tx *memdb.Txn, idx uint64, token *structs.ACLToke
}
// Set the indexes
if existing != nil {
token.CreateIndex = existing.(*structs.ACLToken).CreateIndex
if original != nil {
if original.AccessorID != "" && token.AccessorID != original.AccessorID {
return fmt.Errorf("The ACL Token AccessorID field is immutable")
}
if token.SecretID != original.SecretID {
return fmt.Errorf("The ACL Token SecretID field is immutable")
}
token.CreateIndex = original.CreateIndex
token.ModifyIndex = idx
} else {
token.CreateIndex = idx
@ -389,7 +405,7 @@ func (s *Store) aclTokenGet(ws memdb.WatchSet, value, index string) (uint64, *st
return idx, token, nil
}
func (s *Store) ACLTokenBatchRead(ws memdb.WatchSet, accessors []string) (uint64, structs.ACLTokens, error) {
func (s *Store) ACLTokenBatchGet(ws memdb.WatchSet, accessors []string) (uint64, structs.ACLTokens, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -511,19 +527,19 @@ func (s *Store) ACLTokenListUpgradeable(max int) (structs.ACLTokens, <-chan stru
return tokens, iter.WatchCh(), nil
}
// ACLTokenDeleteSecret is used to remove an existing ACL from the state store. If
// ACLTokenDeleteBySecret is used to remove an existing ACL from the state store. If
// the ACL does not exist this is a no-op and no error is returned.
func (s *Store) ACLTokenDeleteSecret(idx uint64, secret string) error {
func (s *Store) ACLTokenDeleteBySecret(idx uint64, secret string) error {
return s.aclTokenDelete(idx, secret, "id")
}
// ACLTokenDeleteAccessor is used to remove an existing ACL from the state store. If
// ACLTokenDeleteByAccessor is used to remove an existing ACL from the state store. If
// the ACL does not exist this is a no-op and no error is returned.
func (s *Store) ACLTokenDeleteAccessor(idx uint64, accessor string) error {
func (s *Store) ACLTokenDeleteByAccessor(idx uint64, accessor string) error {
return s.aclTokenDelete(idx, accessor, "accessor")
}
func (s *Store) ACLTokensDelete(idx uint64, tokenIDs []string) error {
func (s *Store) ACLTokenBatchDelete(idx uint64, tokenIDs []string) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -541,7 +557,9 @@ func (s *Store) aclTokenDelete(idx uint64, value, index string) error {
tx := s.db.Txn(true)
defer tx.Abort()
s.aclTokenDeleteTxn(tx, idx, value, index)
if err := s.aclTokenDeleteTxn(tx, idx, value, index); err != nil {
return err
}
tx.Commit()
return nil
@ -558,6 +576,10 @@ func (s *Store) aclTokenDeleteTxn(tx *memdb.Txn, idx uint64, value, index string
return nil
}
if token.(*structs.ACLToken).AccessorID == structs.ACLTokenAnonymousID {
return fmt.Errorf("Deletion of the builtin anonymous token is not permitted")
}
if err := tx.Delete("acl-tokens", token); err != nil {
return fmt.Errorf("failed deleting acl token: %v", err)
}
@ -567,7 +589,7 @@ func (s *Store) aclTokenDeleteTxn(tx *memdb.Txn, idx uint64, value, index string
return nil
}
func (s *Store) ACLPoliciesUpsert(idx uint64, policies structs.ACLPolicies) error {
func (s *Store) ACLPolicyBatchSet(idx uint64, policies structs.ACLPolicies) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -664,7 +686,7 @@ func (s *Store) ACLPolicyGetByName(ws memdb.WatchSet, name string) (uint64, *str
return s.aclPolicyGet(ws, name, "name")
}
func (s *Store) ACLPolicyBatchRead(ws memdb.WatchSet, ids []string) (uint64, structs.ACLPolicies, error) {
func (s *Store) ACLPolicyBatchGet(ws memdb.WatchSet, ids []string) (uint64, structs.ACLPolicies, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -713,19 +735,11 @@ func (s *Store) aclPolicyGet(ws memdb.WatchSet, value, index string) (uint64, *s
return idx, policy, nil
}
func (s *Store) ACLPolicyList(ws memdb.WatchSet, datacenter string) (uint64, structs.ACLPolicies, error) {
func (s *Store) ACLPolicyList(ws memdb.WatchSet) (uint64, structs.ACLPolicies, error) {
tx := s.db.Txn(false)
defer tx.Abort()
var iter memdb.ResultIterator
var err error
if datacenter != "" {
iter, err = tx.Get("acl-policies", "datacenters", datacenter)
} else {
iter, err = tx.Get("acl-policies", "id")
}
iter, err := tx.Get("acl-policies", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed acl policy lookup: %v", err)
}
@ -750,7 +764,7 @@ func (s *Store) ACLPolicyDeleteByName(idx uint64, name string) error {
return s.aclPolicyDelete(idx, name, "name")
}
func (s *Store) ACLPoliciesDelete(idx uint64, policyIDs []string) error {
func (s *Store) ACLPolicyBatchDelete(idx uint64, policyIDs []string) error {
tx := s.db.Txn(true)
defer tx.Abort()

File diff suppressed because it is too large Load Diff

View File

@ -284,15 +284,19 @@ func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Versio
return true
}
func ServersGetACLMode(members []serf.Member, leader string, datacenter string) (mode structs.ACLMode, leaderMode structs.ACLMode) {
func ServersGetACLMode(members []serf.Member, leader string, datacenter string) (numServers int, mode structs.ACLMode, leaderMode structs.ACLMode) {
numServers = 0
mode = structs.ACLModeEnabled
leaderMode = structs.ACLModeDisabled
leaderMode = structs.ACLModeUnknown
for _, member := range members {
if valid, parts := metadata.IsConsulServer(member); valid {
if datacenter != "" && parts.Datacenter != datacenter {
continue
}
numServers += 1
if memberAddr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String(); memberAddr == leader {
leaderMode = parts.ACLs
}

View File

@ -499,27 +499,27 @@ type ACLReplicationStatus struct {
LastError time.Time
}
// ACLTokenUpsertRequest is used for token creation and update operations
// ACLTokenSetRequest is used for token creation and update operations
// at the RPC layer
type ACLTokenUpsertRequest struct {
type ACLTokenSetRequest struct {
ACLToken ACLToken // Token to manipulate - I really dislike this name but "Token" is taken in the WriteRequest
Datacenter string // The datacenter to perform the request within
WriteRequest
}
func (r *ACLTokenUpsertRequest) RequestDatacenter() string {
func (r *ACLTokenSetRequest) RequestDatacenter() string {
return r.Datacenter
}
// ACLTokenReadRequest is used for token read operations at the RPC layer
type ACLTokenReadRequest struct {
// ACLTokenGetRequest is used for token read operations at the RPC layer
type ACLTokenGetRequest struct {
TokenID string // id used for the token lookup
TokenIDType ACLTokenIDType // The Type of ID used to lookup the token
Datacenter string // The datacenter to perform the request within
QueryOptions
}
func (r *ACLTokenReadRequest) RequestDatacenter() string {
func (r *ACLTokenGetRequest) RequestDatacenter() string {
return r.Datacenter
}
@ -554,27 +554,27 @@ type ACLTokenListResponse struct {
QueryMeta
}
// ACLTokenBatchReadRequest is used for reading multiple tokens, this is
// ACLTokenBatchGetRequest is used for reading multiple tokens, this is
// different from the the token list request in that only tokens with the
// the requested ids are returned
type ACLTokenBatchReadRequest struct {
type ACLTokenBatchGetRequest struct {
AccessorIDs []string // List of accessor ids to fetch
Datacenter string // The datacenter to perform the request within
QueryOptions
}
func (r *ACLTokenBatchReadRequest) RequestDatacenter() string {
func (r *ACLTokenBatchGetRequest) RequestDatacenter() string {
return r.Datacenter
}
// ACLTokenBatchUpsertRequest is used only at the Raft layer
// ACLTokenBatchSetRequest is used only at the Raft layer
// for batching multiple token creation/update operations
//
// This is particularly useful during token replication and during
// automatic legacy token upgrades.
type ACLTokenBatchUpsertRequest struct {
Tokens ACLTokens
AllowCreate bool
type ACLTokenBatchSetRequest struct {
Tokens ACLTokens
CAS bool
}
// ACLTokenBatchDeleteRequest is used only at the Raft layer
@ -603,20 +603,20 @@ type ACLTokenResponse struct {
QueryMeta
}
// ACLTokensResponse returns multiple Tokens associated with the same metadata
type ACLTokensResponse struct {
// ACLTokenBatchResponse returns multiple Tokens associated with the same metadata
type ACLTokenBatchResponse struct {
Tokens []*ACLToken
QueryMeta
}
// ACLPolicyUpsertRequest is used at the RPC layer for creation and update requests
type ACLPolicyUpsertRequest struct {
// ACLPolicySetRequest is used at the RPC layer for creation and update requests
type ACLPolicySetRequest struct {
Policy ACLPolicy // The policy to upsert
Datacenter string // The datacenter to perform the request within
WriteRequest
}
func (r *ACLPolicyUpsertRequest) RequestDatacenter() string {
func (r *ACLPolicySetRequest) RequestDatacenter() string {
return r.Datacenter
}
@ -631,20 +631,19 @@ func (r *ACLPolicyDeleteRequest) RequestDatacenter() string {
return r.Datacenter
}
// ACLPolicyReadRequest is used at the RPC layer to perform policy read operations
type ACLPolicyReadRequest struct {
// ACLPolicyGetRequest is used at the RPC layer to perform policy read operations
type ACLPolicyGetRequest struct {
PolicyID string // id used for the policy lookup
Datacenter string // The datacenter to perform the request within
QueryOptions
}
func (r *ACLPolicyReadRequest) RequestDatacenter() string {
func (r *ACLPolicyGetRequest) RequestDatacenter() string {
return r.Datacenter
}
// ACLPolicyListRequest is used at the RPC layer to request a listing of policies
type ACLPolicyListRequest struct {
DCScope string
Datacenter string // The datacenter to perform the request within
QueryOptions
}
@ -658,15 +657,15 @@ type ACLPolicyListResponse struct {
QueryMeta
}
// ACLPolicyBatchReadRequest is used at the RPC layer to request a subset of
// ACLPolicyBatchGetRequest is used at the RPC layer to request a subset of
// the policies associated with the token used for retrieval
type ACLPolicyBatchReadRequest struct {
type ACLPolicyBatchGetRequest struct {
PolicyIDs []string // List of policy ids to fetch
Datacenter string // The datacenter to perform the request within
QueryOptions
}
func (r *ACLPolicyBatchReadRequest) RequestDatacenter() string {
func (r *ACLPolicyBatchGetRequest) RequestDatacenter() string {
return r.Datacenter
}
@ -676,16 +675,16 @@ type ACLPolicyResponse struct {
QueryMeta
}
type ACLPoliciesResponse struct {
type ACLPolicyBatchResponse struct {
Policies []*ACLPolicy
QueryMeta
}
// ACLPolicyBatchUpsertRequest is used at the Raft layer for batching
// ACLPolicyBatchSetRequest is used at the Raft layer for batching
// multiple policy creations and updates
//
// This is particularly useful during replication
type ACLPolicyBatchUpsertRequest struct {
type ACLPolicyBatchSetRequest struct {
Policies ACLPolicies
}

View File

@ -48,9 +48,9 @@ const (
ConnectCAProviderStateType = 14
ConnectCAConfigType = 15 // FSM snapshots only.
IndexRequestType = 16 // FSM snapshots only.
ACLTokenUpsertRequestType = 17
ACLTokenSetRequestType = 17
ACLTokenDeleteRequestType = 18
ACLPolicyUpsertRequestType = 19
ACLPolicySetRequestType = 19
ACLPolicyDeleteRequestType = 20
)

View File

@ -56,9 +56,9 @@ func TestStructs_Implements(t *testing.T) {
_ RPCInfo = &SessionSpecificRequest{}
_ RPCInfo = &EventFireRequest{}
_ RPCInfo = &ACLPolicyResolveLegacyRequest{}
_ RPCInfo = &ACLPolicyBatchReadRequest{}
_ RPCInfo = &ACLPolicyReadRequest{}
_ RPCInfo = &ACLTokenReadRequest{}
_ RPCInfo = &ACLPolicyBatchGetRequest{}
_ RPCInfo = &ACLPolicyGetRequest{}
_ RPCInfo = &ACLTokenGetRequest{}
_ RPCInfo = &KeyringRequest{}
_ CompoundResponse = &KeyringResponses{}
)