diff --git a/agent/consul/acl_replication.go b/agent/consul/acl_replication.go index a6fabf668..6d469c5cb 100644 --- a/agent/consul/acl_replication.go +++ b/agent/consul/acl_replication.go @@ -171,32 +171,49 @@ func (s *Server) fetchACLPolicies(lastRemoteIndex uint64) (*structs.ACLPolicyLis return &response, nil } -func diffACLTokens(local structs.ACLTokens, remote structs.ACLTokenListStubs, lastRemoteIndex uint64) ([]string, []string) { +type tokenDiffResults struct { + LocalDeletes []string + LocalUpserts []string + LocalSkipped int + RemoteSkipped int +} + +func diffACLTokens(local structs.ACLTokens, remote structs.ACLTokenListStubs, lastRemoteIndex uint64) tokenDiffResults { + // Note: items with empty AccessorIDs will bubble up to the top. local.Sort() remote.Sort() - var deletions []string - var updates []string + var res tokenDiffResults var localIdx int var remoteIdx int for localIdx, remoteIdx = 0, 0; localIdx < len(local) && remoteIdx < len(remote); { + if local[localIdx].AccessorID == "" { + res.LocalSkipped++ + localIdx += 1 + continue + } + if remote[remoteIdx].AccessorID == "" { + res.RemoteSkipped++ + remoteIdx += 1 + continue + } if local[localIdx].AccessorID == remote[remoteIdx].AccessorID { // policy is in both the local and remote state - need to check raft indices and Hash if remote[remoteIdx].ModifyIndex > lastRemoteIndex && !bytes.Equal(remote[remoteIdx].Hash, local[localIdx].Hash) { - updates = append(updates, remote[remoteIdx].AccessorID) + res.LocalUpserts = append(res.LocalUpserts, remote[remoteIdx].AccessorID) } // increment both indices when equal localIdx += 1 remoteIdx += 1 } else if local[localIdx].AccessorID < remote[remoteIdx].AccessorID { // policy no longer in remoted state - needs deleting - deletions = append(deletions, local[localIdx].AccessorID) + res.LocalDeletes = append(res.LocalDeletes, local[localIdx].AccessorID) // increment just the local index localIdx += 1 } else { // local state doesn't have this policy - needs updating - updates = append(updates, remote[remoteIdx].AccessorID) + res.LocalUpserts = append(res.LocalUpserts, remote[remoteIdx].AccessorID) // increment just the remote index remoteIdx += 1 @@ -204,14 +221,22 @@ func diffACLTokens(local structs.ACLTokens, remote structs.ACLTokenListStubs, la } for ; localIdx < len(local); localIdx += 1 { - deletions = append(deletions, local[localIdx].AccessorID) + if local[localIdx].AccessorID != "" { + res.LocalDeletes = append(res.LocalDeletes, local[localIdx].AccessorID) + } else { + res.LocalSkipped++ + } } for ; remoteIdx < len(remote); remoteIdx += 1 { - updates = append(updates, remote[remoteIdx].AccessorID) + if remote[remoteIdx].AccessorID != "" { + res.LocalUpserts = append(res.LocalUpserts, remote[remoteIdx].AccessorID) + } else { + res.RemoteSkipped++ + } } - return deletions, updates + return res } func (s *Server) deleteLocalACLTokens(deletions []string, ctx context.Context) (bool, error) { @@ -453,12 +478,17 @@ func (s *Server) replicateACLTokens(lastRemoteIndex uint64, ctx context.Context) // Calculate the changes required to bring the state into sync and then // apply them. - deletions, updates := diffACLTokens(local, remote.Tokens, lastRemoteIndex) - s.logger.Printf("[DEBUG] acl: token replication - deletions: %d, updates: %d", len(deletions), len(updates)) + res := diffACLTokens(local, remote.Tokens, lastRemoteIndex) + if res.LocalSkipped > 0 || res.RemoteSkipped > 0 { + s.logger.Printf("[DEBUG] acl: token replication - deletions: %d, updates: %d, skipped: %d, skippedRemote: %d", + len(res.LocalDeletes), len(res.LocalUpserts), res.LocalSkipped, res.RemoteSkipped) + } else { + s.logger.Printf("[DEBUG] acl: token replication - deletions: %d, updates: %d", len(res.LocalDeletes), len(res.LocalUpserts)) + } var tokens *structs.ACLTokenBatchResponse - if len(updates) > 0 { - tokens, err = s.fetchACLTokensBatch(updates) + if len(res.LocalUpserts) > 0 { + tokens, err = s.fetchACLTokensBatch(res.LocalUpserts) if err != nil { return 0, false, fmt.Errorf("failed to retrieve ACL token updates: %v", err) } @@ -466,10 +496,10 @@ func (s *Server) replicateACLTokens(lastRemoteIndex uint64, ctx context.Context) s.logger.Printf("[DEBUG] acl: token replication - downloaded %d tokens", len(tokens.Tokens)) } - if len(deletions) > 0 { + if len(res.LocalDeletes) > 0 { s.logger.Printf("[DEBUG] acl: token replication - performing deletions") - exit, err := s.deleteLocalACLTokens(deletions, ctx) + exit, err := s.deleteLocalACLTokens(res.LocalDeletes, ctx) if exit { return 0, true, nil } @@ -479,7 +509,7 @@ func (s *Server) replicateACLTokens(lastRemoteIndex uint64, ctx context.Context) s.logger.Printf("[DEBUG] acl: token replication - finished deletions") } - if len(updates) > 0 { + if len(res.LocalUpserts) > 0 { s.logger.Printf("[DEBUG] acl: token replication - performing updates") exit, err := s.updateLocalACLTokens(tokens.Tokens, ctx) if exit { diff --git a/agent/consul/acl_replication_test.go b/agent/consul/acl_replication_test.go index 7f3c74f4c..0bc277601 100644 --- a/agent/consul/acl_replication_test.go +++ b/agent/consul/acl_replication_test.go @@ -126,6 +126,24 @@ func TestACLReplication_diffACLPolicies(t *testing.T) { func TestACLReplication_diffACLTokens(t *testing.T) { local := structs.ACLTokens{ + // When a just-upgraded (1.3->1.4+) secondary DC is replicating from an + // upgraded primary DC (1.4+), the local state for tokens predating the + // upgrade will lack AccessorIDs. + // + // The primary DC will lazily perform the update to assign AccessorIDs, + // and that new update will come across the wire locally as a new + // insert. + // + // We simulate that scenario here with 'token0' having no AccessorID in + // the secondary (local) DC and having an AccessorID assigned in the + // payload retrieved from the primary (remote) DC. + &structs.ACLToken{ + AccessorID: "", + SecretID: "5128289f-c22c-4d32-936e-7662443f1a55", + Description: "token0 - old and not yet upgraded", + Hash: []byte{1, 2, 3, 4}, + RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 3}, + }, &structs.ACLToken{ AccessorID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc", SecretID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc", @@ -157,6 +175,14 @@ func TestACLReplication_diffACLTokens(t *testing.T) { } remote := structs.ACLTokenListStubs{ + &structs.ACLTokenListStub{ + AccessorID: "72fac6a3-a014-41c8-9cb2-8d9a5e935f3d", + //SecretID: "5128289f-c22c-4d32-936e-7662443f1a55", (formerly) + Description: "token0 - old and not yet upgraded locally", + Hash: []byte{1, 2, 3, 4}, + CreateIndex: 1, + ModifyIndex: 3, + }, &structs.ACLTokenListStub{ AccessorID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc", Description: "token1 - already in sync", @@ -185,35 +211,66 @@ func TestACLReplication_diffACLTokens(t *testing.T) { CreateIndex: 1, ModifyIndex: 50, }, + // When a 1.4+ secondary DC is replicating from a 1.4+ primary DC, + // tokens created using the legacy APIs will not initially have + // AccessorIDs assigned. That assignment is lazy (but in quick + // succession). + // + // The secondary (local) will see these in the api response as a stub + // with "" as the AccessorID. + // + // We simulate that here to verify that the secondary does the right + // thing by skipping them until it sees them with nonempty AccessorIDs. + &structs.ACLTokenListStub{ + AccessorID: "", + Description: "token6 - pending async AccessorID assignment", + Hash: []byte{1, 2, 3, 4}, + CreateIndex: 51, + ModifyIndex: 51, + }, } // Do the full diff. This full exercises the main body of the loop - deletions, updates := diffACLTokens(local, remote, 28) - require.Len(t, updates, 2) - require.ElementsMatch(t, updates, []string{ - "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926", - "539f1cb6-40aa-464f-ae66-a900d26bc1b2"}) + t.Run("full-diff", func(t *testing.T) { + res := diffACLTokens(local, remote, 28) + require.Equal(t, 1, res.LocalSkipped) + require.Equal(t, 1, res.RemoteSkipped) + require.Len(t, res.LocalUpserts, 3) + require.ElementsMatch(t, res.LocalUpserts, []string{ + "72fac6a3-a014-41c8-9cb2-8d9a5e935f3d", + "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926", + "539f1cb6-40aa-464f-ae66-a900d26bc1b2"}) - require.Len(t, deletions, 1) - require.Equal(t, "e9d33298-6490-4466-99cb-ba93af64fa76", deletions[0]) + require.Len(t, res.LocalDeletes, 1) + require.Equal(t, "e9d33298-6490-4466-99cb-ba93af64fa76", res.LocalDeletes[0]) + }) - deletions, updates = diffACLTokens(local, nil, 28) - require.Len(t, updates, 0) - require.Len(t, deletions, 4) - require.ElementsMatch(t, deletions, []string{ - "44ef9aec-7654-4401-901b-4d4a8b3c80fc", - "8ea41efb-8519-4091-bc91-c42da0cda9ae", - "539f1cb6-40aa-464f-ae66-a900d26bc1b2", - "e9d33298-6490-4466-99cb-ba93af64fa76"}) + t.Run("only-local", func(t *testing.T) { + res := diffACLTokens(local, nil, 28) + require.Equal(t, 1, res.LocalSkipped) + require.Equal(t, 0, res.RemoteSkipped) + require.Len(t, res.LocalUpserts, 0) + require.Len(t, res.LocalDeletes, 4) + require.ElementsMatch(t, res.LocalDeletes, []string{ + "44ef9aec-7654-4401-901b-4d4a8b3c80fc", + "8ea41efb-8519-4091-bc91-c42da0cda9ae", + "539f1cb6-40aa-464f-ae66-a900d26bc1b2", + "e9d33298-6490-4466-99cb-ba93af64fa76"}) + }) - deletions, updates = diffACLTokens(nil, remote, 28) - require.Len(t, deletions, 0) - require.Len(t, updates, 4) - require.ElementsMatch(t, updates, []string{ - "44ef9aec-7654-4401-901b-4d4a8b3c80fc", - "8ea41efb-8519-4091-bc91-c42da0cda9ae", - "539f1cb6-40aa-464f-ae66-a900d26bc1b2", - "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926"}) + t.Run("only-remote", func(t *testing.T) { + res := diffACLTokens(nil, remote, 28) + require.Equal(t, 0, res.LocalSkipped) + require.Equal(t, 1, res.RemoteSkipped) + require.Len(t, res.LocalDeletes, 0) + require.Len(t, res.LocalUpserts, 5) + require.ElementsMatch(t, res.LocalUpserts, []string{ + "72fac6a3-a014-41c8-9cb2-8d9a5e935f3d", + "44ef9aec-7654-4401-901b-4d4a8b3c80fc", + "8ea41efb-8519-4091-bc91-c42da0cda9ae", + "539f1cb6-40aa-464f-ae66-a900d26bc1b2", + "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926"}) + }) } func TestACLReplication_Tokens(t *testing.T) { diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 71e615801..05a1992ac 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -629,6 +629,8 @@ func (s *Server) startACLUpgrade() { newToken.CreateIndex = token.CreateIndex newToken.ModifyIndex = token.ModifyIndex + newToken.SetHash(true) + newTokens = append(newTokens, &newToken) }