acl: fixes ACL replication for legacy tokens without AccessorIDs (#4885)
This commit is contained in:
parent
751f8552b2
commit
2e29f234b1
|
@ -171,32 +171,49 @@ func (s *Server) fetchACLPolicies(lastRemoteIndex uint64) (*structs.ACLPolicyLis
|
|||
return &response, nil
|
||||
}
|
||||
|
||||
func diffACLTokens(local structs.ACLTokens, remote structs.ACLTokenListStubs, lastRemoteIndex uint64) ([]string, []string) {
|
||||
type tokenDiffResults struct {
|
||||
LocalDeletes []string
|
||||
LocalUpserts []string
|
||||
LocalSkipped int
|
||||
RemoteSkipped int
|
||||
}
|
||||
|
||||
func diffACLTokens(local structs.ACLTokens, remote structs.ACLTokenListStubs, lastRemoteIndex uint64) tokenDiffResults {
|
||||
// Note: items with empty AccessorIDs will bubble up to the top.
|
||||
local.Sort()
|
||||
remote.Sort()
|
||||
|
||||
var deletions []string
|
||||
var updates []string
|
||||
var res tokenDiffResults
|
||||
var localIdx int
|
||||
var remoteIdx int
|
||||
for localIdx, remoteIdx = 0, 0; localIdx < len(local) && remoteIdx < len(remote); {
|
||||
if local[localIdx].AccessorID == "" {
|
||||
res.LocalSkipped++
|
||||
localIdx += 1
|
||||
continue
|
||||
}
|
||||
if remote[remoteIdx].AccessorID == "" {
|
||||
res.RemoteSkipped++
|
||||
remoteIdx += 1
|
||||
continue
|
||||
}
|
||||
if local[localIdx].AccessorID == remote[remoteIdx].AccessorID {
|
||||
// policy is in both the local and remote state - need to check raft indices and Hash
|
||||
if remote[remoteIdx].ModifyIndex > lastRemoteIndex && !bytes.Equal(remote[remoteIdx].Hash, local[localIdx].Hash) {
|
||||
updates = append(updates, remote[remoteIdx].AccessorID)
|
||||
res.LocalUpserts = append(res.LocalUpserts, remote[remoteIdx].AccessorID)
|
||||
}
|
||||
// increment both indices when equal
|
||||
localIdx += 1
|
||||
remoteIdx += 1
|
||||
} else if local[localIdx].AccessorID < remote[remoteIdx].AccessorID {
|
||||
// policy no longer in remoted state - needs deleting
|
||||
deletions = append(deletions, local[localIdx].AccessorID)
|
||||
res.LocalDeletes = append(res.LocalDeletes, local[localIdx].AccessorID)
|
||||
|
||||
// increment just the local index
|
||||
localIdx += 1
|
||||
} else {
|
||||
// local state doesn't have this policy - needs updating
|
||||
updates = append(updates, remote[remoteIdx].AccessorID)
|
||||
res.LocalUpserts = append(res.LocalUpserts, remote[remoteIdx].AccessorID)
|
||||
|
||||
// increment just the remote index
|
||||
remoteIdx += 1
|
||||
|
@ -204,14 +221,22 @@ func diffACLTokens(local structs.ACLTokens, remote structs.ACLTokenListStubs, la
|
|||
}
|
||||
|
||||
for ; localIdx < len(local); localIdx += 1 {
|
||||
deletions = append(deletions, local[localIdx].AccessorID)
|
||||
if local[localIdx].AccessorID != "" {
|
||||
res.LocalDeletes = append(res.LocalDeletes, local[localIdx].AccessorID)
|
||||
} else {
|
||||
res.LocalSkipped++
|
||||
}
|
||||
}
|
||||
|
||||
for ; remoteIdx < len(remote); remoteIdx += 1 {
|
||||
updates = append(updates, remote[remoteIdx].AccessorID)
|
||||
if remote[remoteIdx].AccessorID != "" {
|
||||
res.LocalUpserts = append(res.LocalUpserts, remote[remoteIdx].AccessorID)
|
||||
} else {
|
||||
res.RemoteSkipped++
|
||||
}
|
||||
}
|
||||
|
||||
return deletions, updates
|
||||
return res
|
||||
}
|
||||
|
||||
func (s *Server) deleteLocalACLTokens(deletions []string, ctx context.Context) (bool, error) {
|
||||
|
@ -453,12 +478,17 @@ func (s *Server) replicateACLTokens(lastRemoteIndex uint64, ctx context.Context)
|
|||
|
||||
// Calculate the changes required to bring the state into sync and then
|
||||
// apply them.
|
||||
deletions, updates := diffACLTokens(local, remote.Tokens, lastRemoteIndex)
|
||||
s.logger.Printf("[DEBUG] acl: token replication - deletions: %d, updates: %d", len(deletions), len(updates))
|
||||
res := diffACLTokens(local, remote.Tokens, lastRemoteIndex)
|
||||
if res.LocalSkipped > 0 || res.RemoteSkipped > 0 {
|
||||
s.logger.Printf("[DEBUG] acl: token replication - deletions: %d, updates: %d, skipped: %d, skippedRemote: %d",
|
||||
len(res.LocalDeletes), len(res.LocalUpserts), res.LocalSkipped, res.RemoteSkipped)
|
||||
} else {
|
||||
s.logger.Printf("[DEBUG] acl: token replication - deletions: %d, updates: %d", len(res.LocalDeletes), len(res.LocalUpserts))
|
||||
}
|
||||
|
||||
var tokens *structs.ACLTokenBatchResponse
|
||||
if len(updates) > 0 {
|
||||
tokens, err = s.fetchACLTokensBatch(updates)
|
||||
if len(res.LocalUpserts) > 0 {
|
||||
tokens, err = s.fetchACLTokensBatch(res.LocalUpserts)
|
||||
if err != nil {
|
||||
return 0, false, fmt.Errorf("failed to retrieve ACL token updates: %v", err)
|
||||
}
|
||||
|
@ -466,10 +496,10 @@ func (s *Server) replicateACLTokens(lastRemoteIndex uint64, ctx context.Context)
|
|||
s.logger.Printf("[DEBUG] acl: token replication - downloaded %d tokens", len(tokens.Tokens))
|
||||
}
|
||||
|
||||
if len(deletions) > 0 {
|
||||
if len(res.LocalDeletes) > 0 {
|
||||
s.logger.Printf("[DEBUG] acl: token replication - performing deletions")
|
||||
|
||||
exit, err := s.deleteLocalACLTokens(deletions, ctx)
|
||||
exit, err := s.deleteLocalACLTokens(res.LocalDeletes, ctx)
|
||||
if exit {
|
||||
return 0, true, nil
|
||||
}
|
||||
|
@ -479,7 +509,7 @@ func (s *Server) replicateACLTokens(lastRemoteIndex uint64, ctx context.Context)
|
|||
s.logger.Printf("[DEBUG] acl: token replication - finished deletions")
|
||||
}
|
||||
|
||||
if len(updates) > 0 {
|
||||
if len(res.LocalUpserts) > 0 {
|
||||
s.logger.Printf("[DEBUG] acl: token replication - performing updates")
|
||||
exit, err := s.updateLocalACLTokens(tokens.Tokens, ctx)
|
||||
if exit {
|
||||
|
|
|
@ -126,6 +126,24 @@ func TestACLReplication_diffACLPolicies(t *testing.T) {
|
|||
|
||||
func TestACLReplication_diffACLTokens(t *testing.T) {
|
||||
local := structs.ACLTokens{
|
||||
// When a just-upgraded (1.3->1.4+) secondary DC is replicating from an
|
||||
// upgraded primary DC (1.4+), the local state for tokens predating the
|
||||
// upgrade will lack AccessorIDs.
|
||||
//
|
||||
// The primary DC will lazily perform the update to assign AccessorIDs,
|
||||
// and that new update will come across the wire locally as a new
|
||||
// insert.
|
||||
//
|
||||
// We simulate that scenario here with 'token0' having no AccessorID in
|
||||
// the secondary (local) DC and having an AccessorID assigned in the
|
||||
// payload retrieved from the primary (remote) DC.
|
||||
&structs.ACLToken{
|
||||
AccessorID: "",
|
||||
SecretID: "5128289f-c22c-4d32-936e-7662443f1a55",
|
||||
Description: "token0 - old and not yet upgraded",
|
||||
Hash: []byte{1, 2, 3, 4},
|
||||
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 3},
|
||||
},
|
||||
&structs.ACLToken{
|
||||
AccessorID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc",
|
||||
SecretID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc",
|
||||
|
@ -157,6 +175,14 @@ func TestACLReplication_diffACLTokens(t *testing.T) {
|
|||
}
|
||||
|
||||
remote := structs.ACLTokenListStubs{
|
||||
&structs.ACLTokenListStub{
|
||||
AccessorID: "72fac6a3-a014-41c8-9cb2-8d9a5e935f3d",
|
||||
//SecretID: "5128289f-c22c-4d32-936e-7662443f1a55", (formerly)
|
||||
Description: "token0 - old and not yet upgraded locally",
|
||||
Hash: []byte{1, 2, 3, 4},
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 3,
|
||||
},
|
||||
&structs.ACLTokenListStub{
|
||||
AccessorID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc",
|
||||
Description: "token1 - already in sync",
|
||||
|
@ -185,35 +211,66 @@ func TestACLReplication_diffACLTokens(t *testing.T) {
|
|||
CreateIndex: 1,
|
||||
ModifyIndex: 50,
|
||||
},
|
||||
// When a 1.4+ secondary DC is replicating from a 1.4+ primary DC,
|
||||
// tokens created using the legacy APIs will not initially have
|
||||
// AccessorIDs assigned. That assignment is lazy (but in quick
|
||||
// succession).
|
||||
//
|
||||
// The secondary (local) will see these in the api response as a stub
|
||||
// with "" as the AccessorID.
|
||||
//
|
||||
// We simulate that here to verify that the secondary does the right
|
||||
// thing by skipping them until it sees them with nonempty AccessorIDs.
|
||||
&structs.ACLTokenListStub{
|
||||
AccessorID: "",
|
||||
Description: "token6 - pending async AccessorID assignment",
|
||||
Hash: []byte{1, 2, 3, 4},
|
||||
CreateIndex: 51,
|
||||
ModifyIndex: 51,
|
||||
},
|
||||
}
|
||||
|
||||
// Do the full diff. This full exercises the main body of the loop
|
||||
deletions, updates := diffACLTokens(local, remote, 28)
|
||||
require.Len(t, updates, 2)
|
||||
require.ElementsMatch(t, updates, []string{
|
||||
t.Run("full-diff", func(t *testing.T) {
|
||||
res := diffACLTokens(local, remote, 28)
|
||||
require.Equal(t, 1, res.LocalSkipped)
|
||||
require.Equal(t, 1, res.RemoteSkipped)
|
||||
require.Len(t, res.LocalUpserts, 3)
|
||||
require.ElementsMatch(t, res.LocalUpserts, []string{
|
||||
"72fac6a3-a014-41c8-9cb2-8d9a5e935f3d",
|
||||
"c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926",
|
||||
"539f1cb6-40aa-464f-ae66-a900d26bc1b2"})
|
||||
|
||||
require.Len(t, deletions, 1)
|
||||
require.Equal(t, "e9d33298-6490-4466-99cb-ba93af64fa76", deletions[0])
|
||||
require.Len(t, res.LocalDeletes, 1)
|
||||
require.Equal(t, "e9d33298-6490-4466-99cb-ba93af64fa76", res.LocalDeletes[0])
|
||||
})
|
||||
|
||||
deletions, updates = diffACLTokens(local, nil, 28)
|
||||
require.Len(t, updates, 0)
|
||||
require.Len(t, deletions, 4)
|
||||
require.ElementsMatch(t, deletions, []string{
|
||||
t.Run("only-local", func(t *testing.T) {
|
||||
res := diffACLTokens(local, nil, 28)
|
||||
require.Equal(t, 1, res.LocalSkipped)
|
||||
require.Equal(t, 0, res.RemoteSkipped)
|
||||
require.Len(t, res.LocalUpserts, 0)
|
||||
require.Len(t, res.LocalDeletes, 4)
|
||||
require.ElementsMatch(t, res.LocalDeletes, []string{
|
||||
"44ef9aec-7654-4401-901b-4d4a8b3c80fc",
|
||||
"8ea41efb-8519-4091-bc91-c42da0cda9ae",
|
||||
"539f1cb6-40aa-464f-ae66-a900d26bc1b2",
|
||||
"e9d33298-6490-4466-99cb-ba93af64fa76"})
|
||||
})
|
||||
|
||||
deletions, updates = diffACLTokens(nil, remote, 28)
|
||||
require.Len(t, deletions, 0)
|
||||
require.Len(t, updates, 4)
|
||||
require.ElementsMatch(t, updates, []string{
|
||||
t.Run("only-remote", func(t *testing.T) {
|
||||
res := diffACLTokens(nil, remote, 28)
|
||||
require.Equal(t, 0, res.LocalSkipped)
|
||||
require.Equal(t, 1, res.RemoteSkipped)
|
||||
require.Len(t, res.LocalDeletes, 0)
|
||||
require.Len(t, res.LocalUpserts, 5)
|
||||
require.ElementsMatch(t, res.LocalUpserts, []string{
|
||||
"72fac6a3-a014-41c8-9cb2-8d9a5e935f3d",
|
||||
"44ef9aec-7654-4401-901b-4d4a8b3c80fc",
|
||||
"8ea41efb-8519-4091-bc91-c42da0cda9ae",
|
||||
"539f1cb6-40aa-464f-ae66-a900d26bc1b2",
|
||||
"c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926"})
|
||||
})
|
||||
}
|
||||
|
||||
func TestACLReplication_Tokens(t *testing.T) {
|
||||
|
|
|
@ -629,6 +629,8 @@ func (s *Server) startACLUpgrade() {
|
|||
newToken.CreateIndex = token.CreateIndex
|
||||
newToken.ModifyIndex = token.ModifyIndex
|
||||
|
||||
newToken.SetHash(true)
|
||||
|
||||
newTokens = append(newTokens, &newToken)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue