diff --git a/command/acl_role_delete_test.go b/command/acl_role_delete_test.go index 0afe31494..217c06654 100644 --- a/command/acl_role_delete_test.go +++ b/command/acl_role_delete_test.go @@ -68,7 +68,7 @@ func TestACLRoleDeleteCommand_Run(t *testing.T) { Policies: []*structs.ACLRolePolicyLink{{Name: aclPolicy.Name}}, } err = srv.Agent.Server().State().UpsertACLRoles( - structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}) + structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}, false) require.NoError(t, err) // Delete the existing ACL role. diff --git a/command/acl_role_info_test.go b/command/acl_role_info_test.go index 0dfac81e8..4f4cc5db5 100644 --- a/command/acl_role_info_test.go +++ b/command/acl_role_info_test.go @@ -68,7 +68,7 @@ func TestACLRoleInfoCommand_Run(t *testing.T) { Policies: []*structs.ACLRolePolicyLink{{Name: aclPolicy.Name}}, } err = srv.Agent.Server().State().UpsertACLRoles( - structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}) + structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}, false) require.NoError(t, err) // Look up the ACL role using its ID. diff --git a/command/acl_role_list_test.go b/command/acl_role_list_test.go index 9c529f591..7c7855edf 100644 --- a/command/acl_role_list_test.go +++ b/command/acl_role_list_test.go @@ -60,7 +60,7 @@ func TestACLRoleListCommand_Run(t *testing.T) { Policies: []*structs.ACLRolePolicyLink{{Name: aclPolicy.Name}}, } err = srv.Agent.Server().State().UpsertACLRoles( - structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}) + structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}, false) require.NoError(t, err) // Perform a listing to get the created role. 
diff --git a/command/acl_role_update_test.go b/command/acl_role_update_test.go index 9c02d3696..3e0449bc8 100644 --- a/command/acl_role_update_test.go +++ b/command/acl_role_update_test.go @@ -71,7 +71,7 @@ func TestACLRoleUpdateCommand_Run(t *testing.T) { } err = srv.Agent.Server().State().UpsertACLRoles( - structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}) + structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}, false) require.NoError(t, err) // Try a merge update without setting any parameters to update. diff --git a/command/agent/acl_endpoint_test.go b/command/agent/acl_endpoint_test.go index 25350fb0f..00f61c1ed 100644 --- a/command/agent/acl_endpoint_test.go +++ b/command/agent/acl_endpoint_test.go @@ -636,7 +636,7 @@ func TestHTTPServer_ACLRoleListRequest(t *testing.T) { // Create two ACL roles and put these directly into state. aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, srv.server.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles)) + require.NoError(t, srv.server.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles, false)) // Build the HTTP request. req, err := http.NewRequest(http.MethodGet, "/v1/acl/roles", nil) @@ -669,7 +669,7 @@ func TestHTTPServer_ACLRoleListRequest(t *testing.T) { // using a custom prefix. aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} aclRoles[1].ID = "badger-badger-badger-" + uuid.Generate() - require.NoError(t, srv.server.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles)) + require.NoError(t, srv.server.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles, false)) // Build the HTTP request. req, err := http.NewRequest(http.MethodGet, "/v1/acl/roles?prefix=badger-badger-badger", nil) @@ -901,7 +901,7 @@ func TestHTTPServer_ACLRoleSpecificRequest(t *testing.T) { // Create a mock role and put directly into state. 
mockACLRole := mock.ACLRole() require.NoError(t, srv.server.State().UpsertACLRoles( - structs.MsgTypeTestSetup, 20, []*structs.ACLRole{mockACLRole})) + structs.MsgTypeTestSetup, 20, []*structs.ACLRole{mockACLRole}, false)) url := fmt.Sprintf("/v1/acl/role/name/%s", mockACLRole.Name) @@ -935,7 +935,7 @@ func TestHTTPServer_ACLRoleSpecificRequest(t *testing.T) { // Create a mock role and put directly into state. mockACLRole := mock.ACLRole() require.NoError(t, srv.server.State().UpsertACLRoles( - structs.MsgTypeTestSetup, 20, []*structs.ACLRole{mockACLRole})) + structs.MsgTypeTestSetup, 20, []*structs.ACLRole{mockACLRole}, false)) url := fmt.Sprintf("/v1/acl/role/%s", mockACLRole.ID) diff --git a/nomad/acl_endpoint.go b/nomad/acl_endpoint.go index d4161dd30..3de416ee7 100644 --- a/nomad/acl_endpoint.go +++ b/nomad/acl_endpoint.go @@ -1128,15 +1128,18 @@ func (a *ACL) UpsertRoles( // as ensure the policies exist within state. for _, policyLink := range role.Policies { - // Perform a state look up for the policy. An error or not being - // able to find the policy is terminal. We can include the name in - // the error message as it has previously been validated. - existing, err := stateSnapshot.ACLPolicyByName(nil, policyLink.Name) - if err != nil { - return structs.NewErrRPCCodedf(http.StatusInternalServerError, "policy lookup failed: %v", err) - } - if existing == nil { - return structs.NewErrRPCCodedf(http.StatusBadRequest, "cannot find policy %s", policyLink.Name) + // If the RPC does not allow for missing policies, perform a state + // look up for the policy. An error or not being able to find the + // policy is terminal. We can include the name in the error message + // as it has previously been validated. 
+ if !args.AllowMissingPolicies { + existing, err := stateSnapshot.ACLPolicyByName(nil, policyLink.Name) + if err != nil { + return structs.NewErrRPCCodedf(http.StatusInternalServerError, "policy lookup failed: %v", err) + } + if existing == nil { + return structs.NewErrRPCCodedf(http.StatusBadRequest, "cannot find policy %s", policyLink.Name) + } } // If the policy name is not found within our map, this means we @@ -1274,6 +1277,12 @@ func (a *ACL) ListRoles( queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, stateStore *state.StateStore) error { + // The iteration below appends directly to the reply object, so in + // order for blocking queries to work properly we must ensure the + // ACLRoles are reset. This allows the blocking query run function + // to work as expected. + reply.ACLRoles = nil + var ( err error iter memdb.ResultIterator @@ -1305,6 +1314,59 @@ func (a *ACL) ListRoles( }) } +// GetRolesByID is used to get a set of ACL Roles as defined by their ID. This +// endpoint is used by the replication process and uses a specific response in +// order to make that process easier. +func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACLRolesByIDResponse) error { + + // This endpoint is only used by the replication process which is only + // running on ACL enabled clusters, so this check should never be + // triggered. + if !a.srv.config.ACLEnabled { + return aclDisabled + } + + if done, err := a.srv.forward(structs.ACLGetRolesByIDRPCMethod, args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "acl", "get_roles_id"}, time.Now()) + + // Check that the caller has a management token and that ACLs are enabled + // properly. 
+ if acl, err := a.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if acl == nil || !acl.IsManagement() { + return structs.ErrPermissionDenied + } + + // Set up and return the blocking query + return a.srv.blockingRPC(&blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, stateStore *state.StateStore) error { + + // Instantiate the output map to the correct maximum length. + reply.ACLRoles = make(map[string]*structs.ACLRole, len(args.ACLRoleIDs)) + + // Look for the ACL role and add this to our mapping if we have + // found it. + for _, roleID := range args.ACLRoleIDs { + out, err := stateStore.GetACLRoleByID(ws, roleID) + if err != nil { + return err + } + if out != nil { + reply.ACLRoles[out.ID] = out + } + } + + // Use the index table to populate the query meta as we have no way + // of tracking the max index on deletes. + return a.srv.setReplyQueryMeta(stateStore, state.TableACLRoles, &reply.QueryMeta) + }, + }) +} + // GetRoleByID is used to look up an individual ACL role using its ID. func (a *ACL) GetRoleByID( args *structs.ACLRoleByIDRequest, diff --git a/nomad/acl_endpoint_test.go b/nomad/acl_endpoint_test.go index dfff2caff..1e01eb1f2 100644 --- a/nomad/acl_endpoint_test.go +++ b/nomad/acl_endpoint_test.go @@ -1657,7 +1657,7 @@ func TestACLEndpoint_UpsertTokens(t *testing.T) { aclRole1.Policies = []*structs.ACLRolePolicyLink{{Name: policy1.Name}} require.NoError(t, testServer.fsm.State().UpsertACLRoles( - structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole1})) + structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole1}, false)) // Create a token which references the created ACL role. 
This // role reference is duplicated to ensure the handler @@ -1710,7 +1710,7 @@ func TestACLEndpoint_UpsertTokens(t *testing.T) { aclRole1.Policies = []*structs.ACLRolePolicyLink{{Name: policy1.Name}} require.NoError(t, testServer.fsm.State().UpsertACLRoles( - structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole1})) + structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole1}, false)) // Create an ACL token with both ACL role and policy links. tokenReq1 := &structs.ACLTokenUpsertRequest{ @@ -2019,7 +2019,7 @@ func TestACL_DeleteRolesByID(t *testing.T) { // Create two ACL roles and put these directly into state. aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles)) + require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false)) // Attempt to delete an ACL role without setting an auth token. This should // fail. @@ -2094,7 +2094,7 @@ func TestACL_ListRoles(t *testing.T) { aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} aclRoles[0].ID = "prefix-" + uuid.Generate() aclRoles[1].ID = "prefix-" + uuid.Generate() - require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles)) + require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false)) // Try listing roles without a valid ACL token. aclRoleReq1 := &structs.ACLRolesListRequest{ @@ -2145,6 +2145,133 @@ func TestACL_ListRoles(t *testing.T) { err = msgpackrpc.CallWithCodec(codec, structs.ACLListRolesRPCMethod, aclRoleReq4, &aclRoleResp4) require.NoError(t, err) require.Len(t, aclRoleResp4.ACLRoles, 2) + + // Now test a blocking query, where we wait for an update to the list which + // is triggered by a deletion. 
+ type res struct { + err error + reply *structs.ACLRolesListResponse + } + resultCh := make(chan *res) + + go func(resultCh chan *res) { + aclRoleReq5 := &structs.ACLRolesListRequest{ + QueryOptions: structs.QueryOptions{ + Region: DefaultRegion, + AuthToken: aclRootToken.SecretID, + MinQueryIndex: aclRoleResp4.Index, + MaxQueryTime: 10 * time.Second, + }, + } + var aclRoleResp5 structs.ACLRolesListResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLListRolesRPCMethod, aclRoleReq5, &aclRoleResp5) + resultCh <- &res{err: err, reply: &aclRoleResp5} + }(resultCh) + + // Delete an ACL role from state which should return the blocking query. + require.NoError(t, testServer.fsm.State().DeleteACLRolesByID( + structs.MsgTypeTestSetup, aclRoleResp4.Index+10, []string{aclRoles[0].ID})) + + // Wait until the test within the routine is complete. + result := <-resultCh + require.NoError(t, result.err) + require.Len(t, result.reply.ACLRoles, 1) + require.NotEqual(t, result.reply.ACLRoles[0].ID, aclRoles[0].ID) +} + +func TestACL_GetRolesByID(t *testing.T) { + ci.Parallel(t) + + testServer, aclRootToken, testServerCleanupFn := TestACLServer(t, nil) + defer testServerCleanupFn() + codec := rpcClient(t, testServer) + testutil.WaitForLeader(t, testServer.RPC) + + // Try reading a role without setting a correct auth token. + aclRoleReq1 := &structs.ACLRolesByIDRequest{ + QueryOptions: structs.QueryOptions{ + Region: DefaultRegion, + }, + } + var aclRoleResp1 structs.ACLRolesByIDResponse + err := msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq1, &aclRoleResp1) + require.ErrorContains(t, err, "Permission denied") + require.Empty(t, aclRoleResp1.ACLRoles) + + // Try reading a role that doesn't exist. 
+ aclRoleReq2 := &structs.ACLRolesByIDRequest{ + ACLRoleIDs: []string{"nope"}, + QueryOptions: structs.QueryOptions{ + Region: DefaultRegion, + AuthToken: aclRootToken.SecretID, + }, + } + var aclRoleResp2 structs.ACLRolesByIDResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq2, &aclRoleResp2) + require.NoError(t, err) + require.Empty(t, aclRoleResp2.ACLRoles) + + // Create the policies our ACL roles wants to link to. + policy1 := mock.ACLPolicy() + policy1.Name = "mocked-test-policy-1" + policy2 := mock.ACLPolicy() + policy2.Name = "mocked-test-policy-2" + + require.NoError(t, testServer.fsm.State().UpsertACLPolicies( + structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy1, policy2})) + + // Create two ACL roles and put these directly into state. + aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} + require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles, false)) + + // Try reading both roles that are within state. + aclRoleReq3 := &structs.ACLRolesByIDRequest{ + ACLRoleIDs: []string{aclRoles[0].ID, aclRoles[1].ID}, + QueryOptions: structs.QueryOptions{ + Region: DefaultRegion, + AuthToken: aclRootToken.SecretID, + }, + } + var aclRoleResp3 structs.ACLRolesByIDResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq3, &aclRoleResp3) + require.NoError(t, err) + require.Len(t, aclRoleResp3.ACLRoles, 2) + require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[0].ID) + require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[1].ID) + + // Now test a blocking query, where we wait for an update to the set which + // is triggered by a deletion. 
+ type res struct { + err error + reply *structs.ACLRolesByIDResponse + } + resultCh := make(chan *res) + + go func(resultCh chan *res) { + aclRoleReq4 := &structs.ACLRolesByIDRequest{ + ACLRoleIDs: []string{aclRoles[0].ID, aclRoles[1].ID}, + QueryOptions: structs.QueryOptions{ + Region: DefaultRegion, + AuthToken: aclRootToken.SecretID, + MinQueryIndex: aclRoleResp3.Index, + MaxQueryTime: 10 * time.Second, + }, + } + var aclRoleResp4 structs.ACLRolesByIDResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq4, &aclRoleResp4) + resultCh <- &res{err: err, reply: &aclRoleResp4} + }(resultCh) + + // Delete an ACL role from state which should return the blocking query. + require.NoError(t, testServer.fsm.State().DeleteACLRolesByID( + structs.MsgTypeTestSetup, aclRoleResp3.Index+10, []string{aclRoles[0].ID})) + + // Wait for the result and then test it. + result := <-resultCh + require.NoError(t, result.err) + require.Len(t, result.reply.ACLRoles, 1) + _, ok := result.reply.ACLRoles[aclRoles[1].ID] + require.True(t, ok) } func TestACL_GetRoleByID(t *testing.T) { @@ -2166,7 +2293,7 @@ func TestACL_GetRoleByID(t *testing.T) { // Create two ACL roles and put these directly into state. aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles)) + require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false)) // Try reading a role without setting a correct auth token. aclRoleReq1 := &structs.ACLRoleByIDRequest{ @@ -2236,7 +2363,7 @@ func TestACL_GetRoleByName(t *testing.T) { // Create two ACL roles and put these directly into state. 
aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles)) + require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false)) // Try reading a role without setting a correct auth token. aclRoleReq1 := &structs.ACLRoleByNameRequest{ diff --git a/nomad/acl_test.go b/nomad/acl_test.go index acd5455db..4be4e1f4b 100644 --- a/nomad/acl_test.go +++ b/nomad/acl_test.go @@ -195,7 +195,7 @@ func TestResolveACLToken(t *testing.T) { {Name: policy2.Name}, } err = testServer.State().UpsertACLRoles( - structs.MsgTypeTestSetup, 30, []*structs.ACLRole{aclRole}) + structs.MsgTypeTestSetup, 30, []*structs.ACLRole{aclRole}, false) require.NoError(t, err) clientToken := mock.ACLToken() @@ -221,7 +221,7 @@ func TestResolveACLToken(t *testing.T) { // permissions are updated. aclRole.Policies = []*structs.ACLRolePolicyLink{} err = testServer.State().UpsertACLRoles( - structs.MsgTypeTestSetup, 40, []*structs.ACLRole{aclRole}) + structs.MsgTypeTestSetup, 40, []*structs.ACLRole{aclRole}, false) require.NoError(t, err) aclResp, err = testServer.ResolveToken(clientToken.SecretID) @@ -265,7 +265,7 @@ func TestResolveACLToken(t *testing.T) { aclRole := mock.ACLRole() aclRole.Policies = []*structs.ACLRolePolicyLink{{Name: policy2.Name}} err = testServer.State().UpsertACLRoles( - structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole}) + structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole}, false) require.NoError(t, err) // Create a token which references the policy and role. 
diff --git a/nomad/fsm.go b/nomad/fsm.go index abfca1ce7..0b9232231 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -2036,7 +2036,7 @@ func (n *nomadFSM) applyACLRolesUpsert(msgType structs.MessageType, buf []byte, panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpsertACLRoles(msgType, index, req.ACLRoles); err != nil { + if err := n.state.UpsertACLRoles(msgType, index, req.ACLRoles, req.AllowMissingPolicies); err != nil { n.logger.Error("UpsertACLRoles failed", "error", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 3f18cad57..ff858c2d5 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2911,7 +2911,7 @@ func TestFSM_SnapshotRestore_ACLRoles(t *testing.T) { // Generate and upsert some ACL roles. aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false)) // Perform a snapshot restore. restoredFSM := testSnapshotRestore(t, fsm) @@ -3497,7 +3497,7 @@ func TestFSM_ApplyACLRolesDeleteByID(t *testing.T) { // Generate and upsert two ACL roles. aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, fsm.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles)) + require.NoError(t, fsm.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false)) // Build and apply our message. 
req := structs.ACLRolesDeleteByIDRequest{ACLRoleIDs: []string{aclRoles[0].ID, aclRoles[1].ID}} diff --git a/nomad/leader.go b/nomad/leader.go index 756f1d025..503981d66 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -395,6 +395,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { default: go s.replicateACLPolicies(stopCh) go s.replicateACLTokens(stopCh) + go s.replicateACLRoles(stopCh) go s.replicateNamespaces(stopCh) } } @@ -1689,6 +1690,229 @@ func diffACLTokens(store *state.StateStore, minIndex uint64, remoteList []*struc return } +// replicateACLRoles is used to replicate ACL Roles from the authoritative +// region to this region. The loop should only be run on the leader within the +// federated region. +func (s *Server) replicateACLRoles(stopCh chan struct{}) { + + // Generate our request object. We only need to do this once and reuse it + // for every RPC request. The MinQueryIndex is updated after every + // successful replication loop, so the next query acts as a blocking query + // and only returns upon a change in the authoritative region. + req := structs.ACLRolesListRequest{ + QueryOptions: structs.QueryOptions{ + AllowStale: true, + Region: s.config.AuthoritativeRegion, + }, + } + + // Create our replication rate limiter for ACL roles and log a lovely + // message to indicate the process is starting. + limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit)) + s.logger.Debug("starting ACL Role replication from authoritative region", + "authoritative_region", req.Region) + + // Enter the main ACL Role replication loop that will only exit when the + // stopCh is closed. + // + // Any error encountered will use the replicationBackoffContinue function + // which handles replication backoff and shutdown coordination in the event + // of an error inside the loop. + for { + select { + case <-stopCh: + return + default: + + // Rate limit how often we attempt replication. 
It is OK to ignore + // the error as the context will never be cancelled and the limit + // parameters are controlled internally. + _ = limiter.Wait(context.Background()) + + // Set the replication token on each replication iteration so that + // it is always current and can handle agent SIGHUP reloads. + req.AuthToken = s.ReplicationToken() + + var resp structs.ACLRolesListResponse + + // Make the list RPC request to the authoritative region, so we + // capture the latest ACL role listing. + err := s.forwardRegion(s.config.AuthoritativeRegion, structs.ACLListRolesRPCMethod, &req, &resp) + if err != nil { + s.logger.Error("failed to fetch ACL Roles from authoritative region", "error", err) + if s.replicationBackoffContinue(stopCh) { + continue + } else { + return + } + } + + // Perform a two-way diff on the ACL roles. + toDelete, toUpdate := diffACLRoles(s.State(), req.MinQueryIndex, resp.ACLRoles) + + // A significant amount of time could pass between the last check + // on whether we should stop the replication process. Therefore, do + // a check here, before calling Raft. + select { + case <-stopCh: + return + default: + } + + // If we have ACL roles to delete, make this call directly to Raft. + if len(toDelete) > 0 { + args := structs.ACLRolesDeleteByIDRequest{ACLRoleIDs: toDelete} + _, _, err := s.raftApply(structs.ACLRolesDeleteByIDRequestType, &args) + + // If the error was because we lost leadership while calling + // Raft, avoid logging as this can be confusing to operators. + if err != nil { + if err != raft.ErrLeadershipLost { + s.logger.Error("failed to delete ACL roles", "error", err) + } + if s.replicationBackoffContinue(stopCh) { + continue + } else { + return + } + } + } + + // Fetch any outdated roles. 
+ var fetched []*structs.ACLRole + if len(toUpdate) > 0 { + req := structs.ACLRolesByIDRequest{ + ACLRoleIDs: toUpdate, + QueryOptions: structs.QueryOptions{ + Region: s.config.AuthoritativeRegion, + AuthToken: s.ReplicationToken(), + AllowStale: true, + MinQueryIndex: resp.Index - 1, + }, + } + var reply structs.ACLRolesByIDResponse + if err := s.forwardRegion(s.config.AuthoritativeRegion, structs.ACLGetRolesByIDRPCMethod, &req, &reply); err != nil { + s.logger.Error("failed to fetch ACL Roles from authoritative region", "error", err) + if s.replicationBackoffContinue(stopCh) { + continue + } else { + return + } + } + for _, aclRole := range reply.ACLRoles { + fetched = append(fetched, aclRole) + } + } + + // Update local roles + if len(fetched) > 0 { + + // The replication of ACL roles and policies are independent, + // therefore we cannot ensure the policies linked within the + // role are present. We must set allow missing to true. + args := structs.ACLRolesUpsertRequest{ + ACLRoles: fetched, + AllowMissingPolicies: true, + } + + // Perform the upsert directly via Raft. + _, _, err := s.raftApply(structs.ACLRolesUpsertRequestType, &args) + if err != nil { + s.logger.Error("failed to update ACL roles", "error", err) + if s.replicationBackoffContinue(stopCh) { + continue + } else { + return + } + } + } + + // Update the minimum query index, blocks until there is a change. + req.MinQueryIndex = resp.Index + } + } +} + +// replicationBackoffContinue should be used when a replication loop encounters +// an error and wants to wait until either the backoff time has been met, or +// the stopCh has been closed. The boolean indicates whether the replication +// process should continue. 
+// +// Typical use: +// +// if s.replicationBackoffContinue(stopCh) { +// continue +// } else { +// return +// } +func (s *Server) replicationBackoffContinue(stopCh chan struct{}) bool { + + timer, timerStopFn := helper.NewSafeTimer(s.config.ReplicationBackoff) + defer timerStopFn() + + select { + case <-timer.C: + return true + case <-stopCh: + return false + } +} + +// diffACLRoles is used to perform a two-way diff between the local ACL Roles +// and the remote Roles to determine which roles need to be deleted or +// updated. The returned slices contain ACL Role IDs. +func diffACLRoles( + store *state.StateStore, minIndex uint64, remoteList []*structs.ACLRole) ( + delete []string, update []string) { + + // The local ACL role tracking is keyed by the role ID and the value is the + // hash of the role. + local := make(map[string][]byte) + + // The remote ACL role tracking is keyed by the role ID; the value is an + // empty struct as we already have the full object. + remote := make(map[string]struct{}) + + // Read all the ACL roles currently held within our local state. This panic + // will only happen as a developer making a mistake with naming the index + // to use. + iter, err := store.GetACLRoles(nil) + if err != nil { + panic(fmt.Sprintf("failed to iterate local ACL roles: %v", err)) + } + + // Iterate the local ACL roles and add them to our tracking of local roles. + for raw := iter.Next(); raw != nil; raw = iter.Next() { + aclRole := raw.(*structs.ACLRole) + local[aclRole.ID] = aclRole.Hash + } + + // Iterate over the remote ACL roles. + for _, remoteACLRole := range remoteList { + remote[remoteACLRole.ID] = struct{}{} + + // Identify whether the ACL role is within the local state. If it is + // not, add this to our update list. + if localHash, ok := local[remoteACLRole.ID]; !ok { + update = append(update, remoteACLRole.ID) + + // Check if ACL role is newer remotely and there is a hash + // mismatch. 
+ } else if remoteACLRole.ModifyIndex > minIndex && !bytes.Equal(localHash, remoteACLRole.Hash) { + update = append(update, remoteACLRole.ID) + } + } + + // If we have ACL roles within state which are no longer present in the + // authoritative region we should delete them. + for localACLRole := range local { + if _, ok := remote[localACLRole]; !ok { + delete = append(delete, localACLRole) + } + } + return +} + // getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig { state := s.fsm.State() @@ -1807,10 +2031,10 @@ func (s *Server) generateClusterID() (string, error) { // // The function checks the server is the leader and uses a mutex to avoid any // potential timings problems. Consider the following timings: -// - operator updates the configuration via the API -// - the RPC handler applies the change via Raft -// - leadership transitions with write barrier -// - the RPC handler call this function to enact the change +// - operator updates the configuration via the API +// - the RPC handler applies the change via Raft +// - leadership transitions with write barrier +// - the RPC handler call this function to enact the change // // The mutex also protects against a situation where leadership is revoked // while this function is being called. 
Ensuring the correct series of actions diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 2bd1c1070..6cf117ba4 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -1026,6 +1026,113 @@ func TestLeader_DiffACLTokens(t *testing.T) { assert.Equal(t, []string{p3.AccessorID, p4.AccessorID}, update) } +func TestServer_replicationBackoffContinue(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + testFn func() + }{ + { + name: "leadership lost", + testFn: func() { + + // Create a test server with a long enough backoff that we will + // be able to close the channel before it fires, but not too + // long that the test having problems means CI will hang + // forever. + testServer, testServerCleanup := TestServer(t, func(c *Config) { + c.ReplicationBackoff = 5 * time.Second + }) + defer testServerCleanup() + + // Create our stop channel which is used by the server to + // indicate leadership loss. + stopCh := make(chan struct{}) + + // The resultCh is used to block and collect the output from + // the test routine. + resultCh := make(chan bool, 1) + + // Run a routine to collect the result and close the channel + // straight away. + go func() { + output := testServer.replicationBackoffContinue(stopCh) + resultCh <- output + }() + + close(stopCh) + + actualResult := <-resultCh + require.False(t, actualResult) + }, + }, + { + name: "backoff continue", + testFn: func() { + + // Create a test server with a short backoff. + testServer, testServerCleanup := TestServer(t, func(c *Config) { + c.ReplicationBackoff = 10 * time.Nanosecond + }) + defer testServerCleanup() + + // Create our stop channel which is used by the server to + // indicate leadership loss. + stopCh := make(chan struct{}) + + // The resultCh is used to block and collect the output from + // the test routine. + resultCh := make(chan bool, 1) + + // Run a routine to collect the result without closing stopCh. 
+ go func() { + output := testServer.replicationBackoffContinue(stopCh) + resultCh <- output + }() + + actualResult := <-resultCh + require.True(t, actualResult) + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.testFn() + }) + } +} + +func Test_diffACLRoles(t *testing.T) { + ci.Parallel(t) + + stateStore := state.TestStateStore(t) + + // Build an initial baseline of ACL Roles. + aclRole0 := mock.ACLRole() + aclRole1 := mock.ACLRole() + aclRole2 := mock.ACLRole() + aclRole3 := mock.ACLRole() + + // Upsert these into our local state. Use copies, so we can alter the roles + // directly and use within the diff func. + err := stateStore.UpsertACLRoles(structs.MsgTypeTestSetup, 50, + []*structs.ACLRole{aclRole0.Copy(), aclRole1.Copy(), aclRole2.Copy(), aclRole3.Copy()}, true) + require.NoError(t, err) + + // Modify the ACL roles to create a number of differences. These roles + // represent the state of the authoritative region. + aclRole2.ModifyIndex = 50 + aclRole3.ModifyIndex = 200 + aclRole3.Hash = []byte{0, 1, 2, 3} + aclRole4 := mock.ACLRole() + + // Run the diff function and test the output. + toDelete, toUpdate := diffACLRoles(stateStore, 50, []*structs.ACLRole{aclRole2, aclRole3, aclRole4}) + require.ElementsMatch(t, []string{aclRole0.ID, aclRole1.ID}, toDelete) + require.ElementsMatch(t, []string{aclRole3.ID, aclRole4.ID}, toUpdate) +} + func TestLeader_UpgradeRaftVersion(t *testing.T) { ci.Parallel(t) diff --git a/nomad/state/state_store_acl.go b/nomad/state/state_store_acl.go index 1f9f273ca..faaa85ff5 100644 --- a/nomad/state/state_store_acl.go +++ b/nomad/state/state_store_acl.go @@ -39,7 +39,7 @@ func expiresIndexName(global bool) string { // It uses a single write transaction for efficiency, however, any error means // no entries will be committed. 
func (s *StateStore) UpsertACLRoles( - msgType structs.MessageType, index uint64, roles []*structs.ACLRole) error { + msgType structs.MessageType, index uint64, roles []*structs.ACLRole, allowMissingPolicies bool) error { // Grab a write transaction. txn := s.db.WriteTxnMsgT(msgType, index) @@ -53,7 +53,7 @@ func (s *StateStore) UpsertACLRoles( // fail via the txn.Abort() defer. for _, role := range roles { - roleUpdated, err := s.upsertACLRoleTxn(index, txn, role) + roleUpdated, err := s.upsertACLRoleTxn(index, txn, role, allowMissingPolicies) if err != nil { return err } @@ -79,7 +79,7 @@ func (s *StateStore) UpsertACLRoles( // provided write transaction. It is the responsibility of the caller to update // the index table. func (s *StateStore) upsertACLRoleTxn( - index uint64, txn *txn, role *structs.ACLRole) (bool, error) { + index uint64, txn *txn, role *structs.ACLRole, allowMissingPolicies bool) (bool, error) { // Ensure the role hash is not zero to provide defense in depth. This // should be done outside the state store, so we do not spend time here @@ -92,8 +92,10 @@ func (s *StateStore) upsertACLRoleTxn( // could mean that by the time the state call is invoked, another Raft // update has deleted policies detailed in role. Therefore, check again // while in our write txn. - if err := s.validateACLRolePolicyLinksTxn(txn, role); err != nil { - return false, err + if !allowMissingPolicies { + if err := s.validateACLRolePolicyLinksTxn(txn, role); err != nil { + return false, err + } } existing, err := txn.First(TableACLRoles, indexID, role.ID) diff --git a/nomad/state/state_store_acl_test.go b/nomad/state/state_store_acl_test.go index 16e375579..8458e88e7 100644 --- a/nomad/state/state_store_acl_test.go +++ b/nomad/state/state_store_acl_test.go @@ -159,7 +159,7 @@ func TestStateStore_UpsertACLRoles(t *testing.T) { // straight into state. It should fail because the ACL policies do not // exist. 
mockedACLRoles := []*structs.ACLRole{mock.ACLRole()} - err := testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles) + err := testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles, false) require.ErrorContains(t, err, "policy not found") // Create the policies our ACL roles wants to link to and then try the @@ -171,7 +171,7 @@ func TestStateStore_UpsertACLRoles(t *testing.T) { require.NoError(t, testState.UpsertACLPolicies( structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy1, policy2})) - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles, false)) // Check that the index for the table was modified as expected. initialIndex, err := testState.Index(TableACLRoles) @@ -200,7 +200,7 @@ func TestStateStore_UpsertACLRoles(t *testing.T) { // Try writing the same ACL roles to state which should not result in an // update to the table index. - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 30, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 30, mockedACLRoles, false)) reInsertActualIndex, err := testState.Index(TableACLRoles) require.NoError(t, err) must.Eq(t, 20, reInsertActualIndex) @@ -211,7 +211,7 @@ func TestStateStore_UpsertACLRoles(t *testing.T) { updatedMockedACLRole.Policies = []*structs.ACLRolePolicyLink{{Name: "mocked-test-policy-1"}} updatedMockedACLRole.SetHash() require.NoError(t, testState.UpsertACLRoles( - structs.MsgTypeTestSetup, 30, []*structs.ACLRole{updatedMockedACLRole})) + structs.MsgTypeTestSetup, 30, []*structs.ACLRole{updatedMockedACLRole}, false)) // Check that the index for the table was modified as expected. 
updatedIndex, err := testState.Index(TableACLRoles) @@ -235,6 +235,17 @@ func TestStateStore_UpsertACLRoles(t *testing.T) { must.Eq(t, 30, aclRole.ModifyIndex) } require.Equal(t, 1, count, "incorrect number of ACL roles found") + + // Now try inserting an ACL role using the missing policies' argument to + // simulate replication. + replicatedACLRole := mock.ACLRole() + replicatedACLRole.Policies = []*structs.ACLRolePolicyLink{{Name: "nope"}} + require.NoError(t, testState.UpsertACLRoles( + structs.MsgTypeTestSetup, 40, []*structs.ACLRole{replicatedACLRole}, true)) + + replicatedACLRoleResp, err := testState.GetACLRoleByName(ws, replicatedACLRole.Name) + require.NoError(t, err) + must.Eq(t, replicatedACLRole.Hash, replicatedACLRoleResp.Hash) } func TestStateStore_ValidateACLRolePolicyLinks(t *testing.T) { @@ -245,7 +256,7 @@ func TestStateStore_ValidateACLRolePolicyLinks(t *testing.T) { mockedACLRoles := []*structs.ACLRole{mock.ACLRole()} // This should error as no policies exist within state. - err := testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles) + err := testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles, false) require.ErrorContains(t, err, "ACL policy not found") // Upsert one ACL policy and retry the role which should still fail. @@ -253,7 +264,7 @@ func TestStateStore_ValidateACLRolePolicyLinks(t *testing.T) { policy1.Name = "mocked-test-policy-1" require.NoError(t, testState.UpsertACLPolicies(structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy1})) - err = testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles) + err = testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles, false) require.ErrorContains(t, err, "ACL policy not found") // Upsert the second ACL policy. 
The ACL role should now upsert into state @@ -262,7 +273,7 @@ func TestStateStore_ValidateACLRolePolicyLinks(t *testing.T) { policy2.Name = "mocked-test-policy-2" require.NoError(t, testState.UpsertACLPolicies(structs.MsgTypeTestSetup, 20, []*structs.ACLPolicy{policy2})) - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 30, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 30, mockedACLRoles, false)) } func TestStateStore_DeleteACLRolesByID(t *testing.T) { @@ -281,7 +292,7 @@ func TestStateStore_DeleteACLRolesByID(t *testing.T) { // Generate a some mocked ACL roles for testing and upsert these straight // into state. mockedACLRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles, false)) // Try and delete a role using a name that doesn't exist. This should // return an error and not change the index for the table. @@ -353,7 +364,7 @@ func TestStateStore_GetACLRoles(t *testing.T) { // Generate a some mocked ACL roles for testing and upsert these straight // into state. mockedACLRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles, false)) // List the ACL roles and ensure they are exactly as we expect. ws := memdb.NewWatchSet() @@ -391,7 +402,7 @@ func TestStateStore_GetACLRoleByID(t *testing.T) { // Generate a some mocked ACL roles for testing and upsert these straight // into state. 
mockedACLRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles, false)) ws := memdb.NewWatchSet() @@ -426,7 +437,7 @@ func TestStateStore_GetACLRoleByName(t *testing.T) { // Generate a some mocked ACL roles for testing and upsert these straight // into state. mockedACLRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles, false)) ws := memdb.NewWatchSet() @@ -464,7 +475,7 @@ func TestStateStore_GetACLRoleByIDPrefix(t *testing.T) { mockedACLRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} mockedACLRoles[0].ID = "test-prefix-" + uuid.Generate() mockedACLRoles[1].ID = "test-prefix-" + uuid.Generate() - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 10, mockedACLRoles, false)) ws := memdb.NewWatchSet() @@ -513,7 +524,7 @@ func TestStateStore_fixTokenRoleLinks(t *testing.T) { // Generate a some mocked ACL roles for testing and upsert these straight // into state. mockedACLRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles, false)) // Create an ACL token linking to the ACL role. token1 := mock.ACLToken() @@ -548,7 +559,7 @@ func TestStateStore_fixTokenRoleLinks(t *testing.T) { // Generate a some mocked ACL roles for testing and upsert these straight // into state. 
mockedACLRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles, false)) // Create an ACL token linking to the ACL roles. token1 := mock.ACLToken() @@ -588,7 +599,7 @@ func TestStateStore_fixTokenRoleLinks(t *testing.T) { // Generate a some mocked ACL roles for testing and upsert these straight // into state. mockedACLRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()} - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 20, mockedACLRoles, false)) // Create an ACL token linking to the ACL roles. token1 := mock.ACLToken() @@ -598,7 +609,7 @@ func TestStateStore_fixTokenRoleLinks(t *testing.T) { // Now change the name of one of the ACL roles. mockedACLRoles[0].Name = "badger-badger-badger" - require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 40, mockedACLRoles)) + require.NoError(t, testState.UpsertACLRoles(structs.MsgTypeTestSetup, 40, mockedACLRoles, false)) // Perform the fix and check the returned token contains the // correct roles. diff --git a/nomad/structs/acl.go b/nomad/structs/acl.go index 8ab6862b1..4c7cc5c3e 100644 --- a/nomad/structs/acl.go +++ b/nomad/structs/acl.go @@ -56,6 +56,14 @@ const ( // Reply: ACLRolesListResponse ACLListRolesRPCMethod = "ACL.ListRoles" + // ACLGetRolesByIDRPCMethod is the RPC method for detailing a number of ACL + // roles using their ID. This is an internal only RPC endpoint and used by + // the ACL Role replication process. + // + // Args: ACLRolesByIDRequest + // Reply: ACLRolesByIDResponse + ACLGetRolesByIDRPCMethod = "ACL.GetRolesByID" + // ACLGetRoleByIDRPCMethod is the RPC method for detailing an individual // ACL role using its ID. 
// @@ -360,6 +368,12 @@ func (a *ACLRole) Copy() *ACLRole { // roles. type ACLRolesUpsertRequest struct { ACLRoles []*ACLRole + + // AllowMissingPolicies skips the ACL Role policy link verification and is + // used by the replication process. The replication cannot ensure policies + // are present before ACL Roles are replicated. + AllowMissingPolicies bool + WriteRequest } @@ -395,6 +409,20 @@ type ACLRolesListResponse struct { QueryMeta } +// ACLRolesByIDRequest is the request object when performing a lookup of +// multiple roles by the ID. +type ACLRolesByIDRequest struct { + ACLRoleIDs []string + QueryOptions +} + +// ACLRolesByIDResponse is the response object when performing a lookup of +// multiple roles by their IDs. +type ACLRolesByIDResponse struct { + ACLRoles map[string]*ACLRole + QueryMeta +} + // ACLRoleByIDRequest is the request object to perform a lookup of an ACL // role using a specific ID. type ACLRoleByIDRequest struct { diff --git a/nomad/structs/acl_test.go b/nomad/structs/acl_test.go index 1a9b426c4..ca41ab6c5 100644 --- a/nomad/structs/acl_test.go +++ b/nomad/structs/acl_test.go @@ -682,6 +682,11 @@ func Test_ACLRolesListRequest(t *testing.T) { require.True(t, req.IsRead()) } +func Test_ACLRolesByIDRequest(t *testing.T) { + req := ACLRolesByIDRequest{} + require.True(t, req.IsRead()) +} + func Test_ACLRoleByIDRequest(t *testing.T) { req := ACLRoleByIDRequest{} require.True(t, req.IsRead())