From 41a97360caa3f97eca62d2c3d92c1825efa8b583 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 29 Sep 2021 14:36:55 -0400 Subject: [PATCH 1/7] acl: fix test failures caused by remocving legacy ACLs This commit two test failures: 1. Remove check for "in legacy ACL mode", the actual upgrade will be removed in a following commit. 2. Use the root token in WaitForLeader, because without it the test was failing with ACL not found. --- agent/consul/acl_endpoint.go | 5 ----- agent/consul/leader_federation_state_ae_test.go | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/agent/consul/acl_endpoint.go b/agent/consul/acl_endpoint.go index 4311c4434..c9bbeaabc 100644 --- a/agent/consul/acl_endpoint.go +++ b/agent/consul/acl_endpoint.go @@ -170,11 +170,6 @@ func (a *ACL) aclPreCheck() error { if !a.srv.config.ACLsEnabled { return acl.ErrDisabled } - - if a.srv.UseLegacyACLs() { - return fmt.Errorf("The ACL system is currently in legacy mode.") - } - return nil } diff --git a/agent/consul/leader_federation_state_ae_test.go b/agent/consul/leader_federation_state_ae_test.go index 897133496..13504ec04 100644 --- a/agent/consul/leader_federation_state_ae_test.go +++ b/agent/consul/leader_federation_state_ae_test.go @@ -383,7 +383,7 @@ func TestLeader_FederationStateAntiEntropyPruning_ACLDeny(t *testing.T) { // Try to join. joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s1.RPC, "dc1") - testrpc.WaitForLeader(t, s1.RPC, "dc2") + testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root")) // Create the ACL token. opWriteToken, err := upsertTestTokenWithPolicyRules(client, "root", "dc1", `operator = "write"`) From ebb23886058270d029f1c188122d64eef481c94d Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 22 Sep 2021 19:34:14 -0400 Subject: [PATCH 2/7] acl: remove legacy ACL upgrades from Server As part of removing the legacy ACL system --- agent/consul/acl.go | 14 --- agent/consul/acl_endpoint_test.go | 12 --- agent/consul/acl_replication_test.go | 16 --- agent/consul/acl_server.go | 65 ------------ agent/consul/acl_token_exp.go | 3 - agent/consul/helper_test.go | 10 -- agent/consul/intention_endpoint_test.go | 1 - agent/consul/leader.go | 49 +-------- agent/consul/leader_connect_test.go | 3 - agent/consul/leader_test.go | 24 +++-- agent/consul/rpc_test.go | 10 -- agent/consul/server.go | 11 +- agent/consul/util.go | 19 +--- agent/consul/util_test.go | 128 +----------------------- 14 files changed, 23 insertions(+), 342 deletions(-) diff --git a/agent/consul/acl.go b/agent/consul/acl.go index 59ee3e71e..273c558af 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -77,20 +77,6 @@ const ( // due to the data being more variable in its size. aclBatchUpsertSize = 256 * 1024 - // DEPRECATED (ACL-Legacy-Compat) aclModeCheck* are all only for legacy usage - // aclModeCheckMinInterval is the minimum amount of time between checking if the - // agent should be using the new or legacy ACL system. All the places it is - // currently used will backoff as it detects that it is remaining in legacy mode. - // However the initial min value is kept small so that new cluster creation - // can enter into new ACL mode quickly. - // TODO(ACL-Legacy-Compat): remove - aclModeCheckMinInterval = 50 * time.Millisecond - - // aclModeCheckMaxInterval controls the maximum interval for how often the agent - // checks if it should be using the new or legacy ACL system. - // TODO(ACL-Legacy-Compat): remove - aclModeCheckMaxInterval = 30 * time.Second - // Maximum number of re-resolution requests to be made if the token is modified between // resolving the token and resolving its policies that would remove one of its policies. tokenPolicyResolutionMaxRetries = 5 diff --git a/agent/consul/acl_endpoint_test.go b/agent/consul/acl_endpoint_test.go index e216ee01c..a7e24dc21 100644 --- a/agent/consul/acl_endpoint_test.go +++ b/agent/consul/acl_endpoint_test.go @@ -1393,9 +1393,6 @@ func TestACLEndpoint_TokenDelete(t *testing.T) { // Try to join joinWAN(t, s2, s1) - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) - // Ensure s2 is authoritative. waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) @@ -3632,9 +3629,6 @@ func TestACLEndpoint_SecureIntroEndpoints_LocalTokensDisabled(t *testing.T) { // Try to join joinWAN(t, s2, s1) - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) - acl2 := ACL{srv: s2} var ignored bool @@ -3736,9 +3730,6 @@ func TestACLEndpoint_SecureIntroEndpoints_OnlyCreateLocalData(t *testing.T) { // Try to join joinWAN(t, s2, s1) - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) - // Ensure s2 is authoritative. waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) @@ -4623,9 +4614,6 @@ func TestACLEndpoint_Login_with_TokenLocality(t *testing.T) { joinWAN(t, s2, s1) - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) - // Ensure s2 is authoritative. waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) diff --git a/agent/consul/acl_replication_test.go b/agent/consul/acl_replication_test.go index 41045a5f4..f46964ea9 100644 --- a/agent/consul/acl_replication_test.go +++ b/agent/consul/acl_replication_test.go @@ -327,11 +327,6 @@ func TestACLReplication_Tokens(t *testing.T) { joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc2") - - // Wait for legacy acls to be disabled so we are clear that - // legacy replication isn't meddling. - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) // Create a bunch of new tokens and policies @@ -544,11 +539,6 @@ func TestACLReplication_Policies(t *testing.T) { joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc2") - - // Wait for legacy acls to be disabled so we are clear that - // legacy replication isn't meddling. - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) waitForNewACLReplication(t, s2, structs.ACLReplicatePolicies, 1, 0, 0) // Create a bunch of new policies @@ -700,7 +690,6 @@ func TestACLReplication_TokensRedacted(t *testing.T) { joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s2.RPC, "dc2") testrpc.WaitForLeader(t, s2.RPC, "dc1") - waitForNewACLs(t, s2) // ensures replication is working ok retry.Run(t, func(r *retry.R) { @@ -820,11 +809,6 @@ func TestACLReplication_AllTypes(t *testing.T) { joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc2") - - // Wait for legacy acls to be disabled so we are clear that - // legacy replication isn't meddling. - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) const ( diff --git a/agent/consul/acl_server.go b/agent/consul/acl_server.go index 025012c1f..ecb6019aa 100644 --- a/agent/consul/acl_server.go +++ b/agent/consul/acl_server.go @@ -1,12 +1,10 @@ package consul import ( - "sync/atomic" "time" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/lib/serf" ) var serverACLCacheConfig *structs.ACLCachesConfig = &structs.ACLCachesConfig{ @@ -84,73 +82,10 @@ func (s *Server) checkBindingRuleUUID(id string) (bool, error) { return !structs.ACLIDReserved(id), nil } -func (s *Server) updateSerfTags(key, value string) { - // Update the LAN serf - serf.UpdateTag(s.serfLAN, key, value) - - if s.serfWAN != nil { - serf.UpdateTag(s.serfWAN, key, value) - } - - s.updateEnterpriseSerfTags(key, value) -} - -// TODO: -func (s *Server) updateACLAdvertisement() { - // One thing to note is that once in new ACL mode the server will - // never transition to legacy ACL mode. This is not currently a - // supported use case. - s.updateSerfTags("acls", string(structs.ACLModeEnabled)) -} - -func (s *Server) canUpgradeToNewACLs(isLeader bool) bool { - if atomic.LoadInt32(&s.useNewACLs) != 0 { - // can't upgrade because we are already upgraded - return false - } - - // Check to see if we already upgraded the last time we ran by seeing if we - // have a copy of any global management policy stored locally. This should - // always be true because policies always replicate. - _, mgmtPolicy, err := s.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, structs.DefaultEnterpriseMetaInDefaultPartition()) - if err != nil { - s.logger.Warn("Failed to get the builtin global-management policy to check for a completed ACL upgrade; skipping this optimization", "error", err) - } else if mgmtPolicy != nil { - return true - } - - if !s.InACLDatacenter() { - foundServers, mode, _ := ServersGetACLMode(s, "", s.config.PrimaryDatacenter) - if mode != structs.ACLModeEnabled || !foundServers { - s.logger.Debug("Cannot upgrade to new ACLs, servers in acl datacenter are not yet upgraded", "PrimaryDatacenter", s.config.PrimaryDatacenter, "mode", mode, "found", foundServers) - return false - } - } - - leaderAddr := string(s.raft.Leader()) - foundServers, mode, leaderMode := ServersGetACLMode(s, leaderAddr, s.config.Datacenter) - if isLeader { - if mode == structs.ACLModeLegacy { - return true - } - } else { - if leaderMode == structs.ACLModeEnabled { - return true - } - } - - s.logger.Debug("Cannot upgrade to new ACLs", "leaderMode", leaderMode, "mode", mode, "found", foundServers, "leader", leaderAddr) - return false -} - func (s *Server) InACLDatacenter() bool { return s.config.PrimaryDatacenter == "" || s.config.Datacenter == s.config.PrimaryDatacenter } -func (s *Server) UseLegacyACLs() bool { - return atomic.LoadInt32(&s.useNewACLs) == 0 -} - func (s *Server) LocalTokensEnabled() bool { // in ACL datacenter so local tokens are always enabled if s.InACLDatacenter() { diff --git a/agent/consul/acl_token_exp.go b/agent/consul/acl_token_exp.go index 5ea72b8fe..9ad3f1ec4 100644 --- a/agent/consul/acl_token_exp.go +++ b/agent/consul/acl_token_exp.go @@ -58,9 +58,6 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) { if !s.config.ACLsEnabled { return 0, nil } - if s.UseLegacyACLs() { - return 0, nil - } if local == global { return 0, fmt.Errorf("cannot reap both local and global tokens in the same request") } diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 7167fa35f..e8477bd00 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -165,16 +165,6 @@ func joinWAN(t *testing.T, member, leader *Server) { } } -func waitForNewACLs(t *testing.T, server *Server) { - t.Helper() - - retry.Run(t, func(r *retry.R) { - require.False(r, server.UseLegacyACLs(), "Server cannot use new ACLs") - }) - - require.False(t, server.UseLegacyACLs(), "Server cannot use new ACLs") -} - func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationType structs.ACLReplicationType, minPolicyIndex, minTokenIndex, minRoleIndex uint64) { t.Helper() retry.Run(t, func(r *retry.R) { diff --git a/agent/consul/intention_endpoint_test.go b/agent/consul/intention_endpoint_test.go index 230e09c86..59f450ff9 100644 --- a/agent/consul/intention_endpoint_test.go +++ b/agent/consul/intention_endpoint_test.go @@ -1606,7 +1606,6 @@ func TestIntentionList_acl(t *testing.T) { defer codec.Close() waitForLeaderEstablishment(t, s1) - waitForNewACLs(t, s1) token, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", `service_prefix "foo" { policy = "write" }`) require.NoError(t, err) diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 91b1451c7..a1b012163 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -68,11 +68,6 @@ func (s *Server) monitorLeadership() { // cleanup and to ensure we never run multiple leader loops. raftNotifyCh := s.raftNotifyCh - aclModeCheckWait := aclModeCheckMinInterval - var aclUpgradeCh <-chan time.Time - if s.config.ACLsEnabled { - aclUpgradeCh = time.After(aclModeCheckWait) - } var weAreLeaderCh chan struct{} var leaderLoop sync.WaitGroup for { @@ -105,33 +100,6 @@ func (s *Server) monitorLeadership() { weAreLeaderCh = nil s.logger.Info("cluster leadership lost") } - case <-aclUpgradeCh: - if atomic.LoadInt32(&s.useNewACLs) == 0 { - aclModeCheckWait = aclModeCheckWait * 2 - if aclModeCheckWait > aclModeCheckMaxInterval { - aclModeCheckWait = aclModeCheckMaxInterval - } - aclUpgradeCh = time.After(aclModeCheckWait) - - if canUpgrade := s.canUpgradeToNewACLs(weAreLeaderCh != nil); canUpgrade { - if weAreLeaderCh != nil { - if err := s.initializeACLs(&lib.StopChannelContext{StopCh: weAreLeaderCh}, true); err != nil { - s.logger.Error("error transitioning to using new ACLs", "error", err) - continue - } - } - - s.logger.Debug("transitioning out of legacy ACL mode") - atomic.StoreInt32(&s.useNewACLs, 1) - s.updateACLAdvertisement() - - // setting this to nil ensures that we will never hit this case again - aclUpgradeCh = nil - } - } else { - // establishLeadership probably transitioned us - aclUpgradeCh = nil - } case <-s.shutdownCh: return } @@ -305,15 +273,7 @@ WAIT: // state is up-to-date. func (s *Server) establishLeadership(ctx context.Context) error { start := time.Now() - // check for the upgrade here - this helps us transition to new ACLs much - // quicker if this is a new cluster or this is a test agent - if canUpgrade := s.canUpgradeToNewACLs(true); canUpgrade { - if err := s.initializeACLs(ctx, true); err != nil { - return err - } - atomic.StoreInt32(&s.useNewACLs, 1) - s.updateACLAdvertisement() - } else if err := s.initializeACLs(ctx, false); err != nil { + if err := s.initializeACLs(ctx); err != nil { return err } @@ -400,7 +360,7 @@ func (s *Server) revokeLeadership() { // initializeACLs is used to setup the ACLs if we are the leader // and need to do this. -func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error { +func (s *Server) initializeACLs(ctx context.Context) error { if !s.config.ACLsEnabled { return nil } @@ -573,11 +533,6 @@ func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error { // launch the upgrade go routine to generate accessors for everything s.startACLUpgrade(ctx) } else { - if upgrade { - s.stopACLReplication() - } - - // ACL replication is now mandatory s.startACLReplication(ctx) } diff --git a/agent/consul/leader_connect_test.go b/agent/consul/leader_connect_test.go index fe32e4ed1..0151d068c 100644 --- a/agent/consul/leader_connect_test.go +++ b/agent/consul/leader_connect_test.go @@ -240,9 +240,6 @@ func TestLeader_SecondaryCA_Initialize(t *testing.T) { testrpc.WaitForLeader(t, s2.RPC, "secondary") - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) - // Ensure s2 is authoritative. waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 1dcaeda86..0e54c009c 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/consul/agent/structs" tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" + libserf "github.com/hashicorp/consul/lib/serf" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -1257,9 +1258,6 @@ func TestLeader_ACLUpgrade_IsStickyEvenIfSerfTagsRegress(t *testing.T) { joinWAN(t, s2, s1) waitForLeaderEstablishment(t, s1) waitForLeaderEstablishment(t, s2) - - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) waitForNewACLReplication(t, s2, structs.ACLReplicatePolicies, 1, 0, 0) // Everybody has the management policy. @@ -1296,9 +1294,6 @@ func TestLeader_ACLUpgrade_IsStickyEvenIfSerfTagsRegress(t *testing.T) { defer s2new.Shutdown() waitForLeaderEstablishment(t, s2new) - - // It should be able to transition without connectivity to the primary. - waitForNewACLs(t, s2new) } func TestLeader_ConfigEntryBootstrap(t *testing.T) { @@ -1507,7 +1502,7 @@ func TestDatacenterSupportsFederationStates(t *testing.T) { defer os.RemoveAll(dir1) defer s1.Shutdown() - s1.updateSerfTags("ft_fs", "0") + updateSerfTags(s1, "ft_fs", "0") waitForLeaderEstablishment(t, s1) @@ -1562,7 +1557,7 @@ func TestDatacenterSupportsFederationStates(t *testing.T) { defer os.RemoveAll(dir1) defer s1.Shutdown() - s1.updateSerfTags("ft_fs", "0") + updateSerfTags(s1, "ft_fs", "0") waitForLeaderEstablishment(t, s1) @@ -1737,7 +1732,7 @@ func TestDatacenterSupportsFederationStates(t *testing.T) { defer os.RemoveAll(dir1) defer s1.Shutdown() - s1.updateSerfTags("ft_fs", "0") + updateSerfTags(s1, "ft_fs", "0") waitForLeaderEstablishment(t, s1) @@ -1775,6 +1770,17 @@ func TestDatacenterSupportsFederationStates(t *testing.T) { }) } +func updateSerfTags(s *Server, key, value string) { + // Update the LAN serf + libserf.UpdateTag(s.serfLAN, key, value) + + if s.serfWAN != nil { + libserf.UpdateTag(s.serfWAN, key, value) + } + + s.updateEnterpriseSerfTags(key, value) +} + func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index f1b23872a..89bfe38eb 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -873,11 +873,6 @@ func TestRPC_LocalTokenStrippedOnForward(t *testing.T) { joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc2") - - // Wait for legacy acls to be disabled so we are clear that - // legacy replication isn't meddling. - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) // create simple kv policy @@ -1010,11 +1005,6 @@ func TestRPC_LocalTokenStrippedOnForward_GRPC(t *testing.T) { joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc2") - - // Wait for legacy acls to be disabled so we are clear that - // legacy replication isn't meddling. - waitForNewACLs(t, s1) - waitForNewACLs(t, s2) waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) // create simple service policy diff --git a/agent/consul/server.go b/agent/consul/server.go index 974ee6878..00fe5e378 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -134,10 +134,6 @@ type Server struct { aclAuthMethodValidators authmethod.Cache - // DEPRECATED (ACL-Legacy-Compat) - only needed while we support both - // useNewACLs is used to determine whether we can use new ACLs or not - useNewACLs int32 - // autopilot is the Autopilot instance for this server. autopilot *autopilot.Autopilot @@ -428,7 +424,6 @@ func NewServer(config *Config, flat Deps) (*Server, error) { s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter) s.aclConfig = newACLConfig(logger) - s.useNewACLs = 0 aclConfig := ACLResolverConfig{ Config: config.ACLResolverSettings, Delegate: s, @@ -1346,11 +1341,7 @@ func (s *Server) Stats() map[string]map[string]string { } if s.config.ACLsEnabled { - if s.UseLegacyACLs() { - stats["consul"]["acl"] = "legacy" - } else { - stats["consul"]["acl"] = "enabled" - } + stats["consul"]["acl"] = "enabled" } else { stats["consul"]["acl"] = "disabled" } diff --git a/agent/consul/util.go b/agent/consul/util.go index d79f6c3ed..afc30451c 100644 --- a/agent/consul/util.go +++ b/agent/consul/util.go @@ -4,10 +4,11 @@ import ( "runtime" "strconv" - "github.com/hashicorp/consul/agent/metadata" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-version" "github.com/hashicorp/serf/serf" + + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/structs" ) // CanServersUnderstandProtocol checks to see if all the servers in the given @@ -213,17 +214,3 @@ func (s *serversACLMode) update(srv *metadata.Server) bool { return true } - -// ServersGetACLMode checks all the servers in a particular datacenter and determines -// what the minimum ACL mode amongst them is and what the leaders ACL mode is. -// The "found" return value indicates whether there were any servers considered in -// this datacenter. If that is false then the other mode return values are meaningless -// as they will be ACLModeEnabled and ACLModeUnkown respectively. -func ServersGetACLMode(provider checkServersProvider, leaderAddr string, datacenter string) (found bool, mode structs.ACLMode, leaderMode structs.ACLMode) { - var state serversACLMode - state.init(leaderAddr) - - provider.CheckServers(datacenter, state.update) - - return state.found, state.mode, state.leaderMode -} diff --git a/agent/consul/util_test.go b/agent/consul/util_test.go index 7a68f035e..48b608e69 100644 --- a/agent/consul/util_test.go +++ b/agent/consul/util_test.go @@ -2,15 +2,14 @@ package consul import ( "fmt" - "net" "regexp" "testing" - "github.com/hashicorp/consul/agent/metadata" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-version" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/metadata" ) func TestUtil_CanServersUnderstandProtocol(t *testing.T) { @@ -246,126 +245,3 @@ func TestServersInDCMeetMinimumVersion(t *testing.T) { require.Equal(t, tc.expectedFound, found) } } - -func TestServersGetACLMode(t *testing.T) { - t.Parallel() - makeServer := func(datacenter string, acls structs.ACLMode, status serf.MemberStatus, addr net.IP) metadata.Server { - return metadata.Server{ - Name: "foo", - ShortName: "foo", - ID: "asdf", - Port: 10000, - Expect: 3, - RaftVersion: 3, - Status: status, - WanJoinPort: 1234, - Version: 1, - Addr: &net.TCPAddr{IP: addr, Port: 10000}, - // shouldn't matter for these tests - Build: *version.Must(version.NewVersion("1.7.0")), - Datacenter: datacenter, - ACLs: acls, - } - } - - type tcase struct { - servers testServersProvider - leaderAddr string - datacenter string - foundServers bool - minMode structs.ACLMode - leaderMode structs.ACLMode - } - - cases := map[string]tcase{ - "filter-members": { - servers: testServersProvider{ - makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})), - makeServer("primary", structs.ACLModeLegacy, serf.StatusFailed, net.IP([]byte{127, 0, 0, 2})), - // filtered datacenter - makeServer("secondary", structs.ACLModeUnknown, serf.StatusAlive, net.IP([]byte{127, 0, 0, 4})), - // filtered status - makeServer("primary", structs.ACLModeUnknown, serf.StatusLeaving, net.IP([]byte{127, 0, 0, 5})), - // filtered status - makeServer("primary", structs.ACLModeUnknown, serf.StatusLeft, net.IP([]byte{127, 0, 0, 6})), - // filtered status - makeServer("primary", structs.ACLModeUnknown, serf.StatusNone, net.IP([]byte{127, 0, 0, 7})), - }, - foundServers: true, - leaderAddr: "127.0.0.1:10000", - datacenter: "primary", - minMode: structs.ACLModeLegacy, - leaderMode: structs.ACLModeLegacy, - }, - "disabled": { - servers: testServersProvider{ - makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})), - makeServer("primary", structs.ACLModeUnknown, serf.StatusAlive, net.IP([]byte{127, 0, 0, 2})), - makeServer("primary", structs.ACLModeDisabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 3})), - }, - foundServers: true, - leaderAddr: "127.0.0.1:10000", - datacenter: "primary", - minMode: structs.ACLModeDisabled, - leaderMode: structs.ACLModeLegacy, - }, - "unknown": { - servers: testServersProvider{ - makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})), - makeServer("primary", structs.ACLModeUnknown, serf.StatusAlive, net.IP([]byte{127, 0, 0, 2})), - }, - foundServers: true, - leaderAddr: "127.0.0.1:10000", - datacenter: "primary", - minMode: structs.ACLModeUnknown, - leaderMode: structs.ACLModeLegacy, - }, - "legacy": { - servers: testServersProvider{ - makeServer("primary", structs.ACLModeEnabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})), - makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 2})), - }, - foundServers: true, - leaderAddr: "127.0.0.1:10000", - datacenter: "primary", - minMode: structs.ACLModeLegacy, - leaderMode: structs.ACLModeEnabled, - }, - "enabled": { - servers: testServersProvider{ - makeServer("primary", structs.ACLModeEnabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})), - makeServer("primary", structs.ACLModeEnabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 2})), - makeServer("primary", structs.ACLModeEnabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 3})), - }, - foundServers: true, - leaderAddr: "127.0.0.1:10000", - datacenter: "primary", - minMode: structs.ACLModeEnabled, - leaderMode: structs.ACLModeEnabled, - }, - "failed-members": { - servers: testServersProvider{ - makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})), - makeServer("primary", structs.ACLModeUnknown, serf.StatusFailed, net.IP([]byte{127, 0, 0, 2})), - makeServer("primary", structs.ACLModeLegacy, serf.StatusFailed, net.IP([]byte{127, 0, 0, 3})), - }, - foundServers: true, - leaderAddr: "127.0.0.1:10000", - datacenter: "primary", - minMode: structs.ACLModeUnknown, - leaderMode: structs.ACLModeLegacy, - }, - } - - for name, tc := range cases { - name := name - tc := tc - t.Run(name, func(t *testing.T) { - actualServers, actualMinMode, actualLeaderMode := ServersGetACLMode(tc.servers, tc.leaderAddr, tc.datacenter) - - require.Equal(t, tc.minMode, actualMinMode) - require.Equal(t, tc.leaderMode, actualLeaderMode) - require.Equal(t, tc.foundServers, actualServers) - }) - } -} From b866e3c4f43ac504694d8e7c5042ecff95008e32 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 29 Sep 2021 15:21:30 -0400 Subject: [PATCH 3/7] acl: fix test failure For some reason removing legacy ACL upgrade requires using an ACL token now for this WaitForLeader. --- agent/consul/acl_replication_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/consul/acl_replication_test.go b/agent/consul/acl_replication_test.go index f46964ea9..8bc1e8c24 100644 --- a/agent/consul/acl_replication_test.go +++ b/agent/consul/acl_replication_test.go @@ -538,7 +538,7 @@ func TestACLReplication_Policies(t *testing.T) { // Try to join. joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s1.RPC, "dc1") - testrpc.WaitForLeader(t, s1.RPC, "dc2") + testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root")) waitForNewACLReplication(t, s2, structs.ACLReplicatePolicies, 1, 0, 0) // Create a bunch of new policies From f21097beda3e09d9df411d1a46ee7f8d0649687a Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 29 Sep 2021 15:45:11 -0400 Subject: [PATCH 4/7] acl: remove reading of serf acl tags We no long need to read the acl serf tag, because servers are always either ACL enabled or ACL disabled. We continue to write the tag so that during an upgarde older servers will see the tag. --- agent/consul/server_serf.go | 2 ++ agent/consul/util.go | 56 ------------------------------------- agent/metadata/server.go | 12 +------- agent/structs/acl.go | 10 ++----- 4 files changed, 5 insertions(+), 75 deletions(-) diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index f72c64c34..1950c6c32 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -72,6 +72,8 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.Tags["use_tls"] = "1" } + // TODO(ACL-Legacy-Compat): remove in phase 2. These are kept for now to + // allow for upgrades. if s.acls.ACLsEnabled() { conf.Tags[metadata.TagACLs] = string(structs.ACLModeEnabled) } else { diff --git a/agent/consul/util.go b/agent/consul/util.go index afc30451c..09e69381a 100644 --- a/agent/consul/util.go +++ b/agent/consul/util.go @@ -8,7 +8,6 @@ import ( "github.com/hashicorp/serf/serf" "github.com/hashicorp/consul/agent/metadata" - "github.com/hashicorp/consul/agent/structs" ) // CanServersUnderstandProtocol checks to see if all the servers in the given @@ -159,58 +158,3 @@ func (c *Client) CheckServers(datacenter string, fn func(*metadata.Server) bool) c.router.CheckServers(datacenter, fn) } - -type serversACLMode struct { - // leader is the address of the leader - leader string - - // mode indicates the overall ACL mode of the servers - mode structs.ACLMode - - // leaderMode is the ACL mode of the leader server - leaderMode structs.ACLMode - - // indicates that at least one server was processed - found bool -} - -func (s *serversACLMode) init(leader string) { - s.leader = leader - s.mode = structs.ACLModeEnabled - s.leaderMode = structs.ACLModeUnknown - s.found = false -} - -func (s *serversACLMode) update(srv *metadata.Server) bool { - if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed { - // they are left or something so regardless we treat these servers as meeting - // the version requirement - return true - } - - // mark that we processed at least one server - s.found = true - - if srvAddr := srv.Addr.String(); srvAddr == s.leader { - s.leaderMode = srv.ACLs - } - - switch srv.ACLs { - case structs.ACLModeDisabled: - // anything disabled means we cant enable ACLs - s.mode = structs.ACLModeDisabled - case structs.ACLModeEnabled: - // do nothing - case structs.ACLModeLegacy: - // This covers legacy mode and older server versions that don't advertise ACL support - if s.mode != structs.ACLModeDisabled && s.mode != structs.ACLModeUnknown { - s.mode = structs.ACLModeLegacy - } - default: - if s.mode != structs.ACLModeDisabled { - s.mode = structs.ACLModeUnknown - } - } - - return true -} diff --git a/agent/metadata/server.go b/agent/metadata/server.go index b77d1d6d0..6fdad57c8 100644 --- a/agent/metadata/server.go +++ b/agent/metadata/server.go @@ -9,8 +9,6 @@ import ( "github.com/hashicorp/go-version" "github.com/hashicorp/serf/serf" - - "github.com/hashicorp/consul/agent/structs" ) // Key is used in maps and for equality tests. A key is based on endpoints. @@ -42,7 +40,6 @@ type Server struct { Addr net.Addr Status serf.MemberStatus ReadReplica bool - ACLs structs.ACLMode FeatureFlags map[string]int // If true, use TLS when connecting to this server @@ -97,13 +94,6 @@ func IsConsulServer(m serf.Member) (bool, *Server) { return false, nil } - var acls structs.ACLMode - if aclMode, ok := m.Tags[TagACLs]; ok { - acls = structs.ACLMode(aclMode) - } else { - acls = structs.ACLModeUnknown - } - segmentAddrs := make(map[string]string) segmentPorts := make(map[string]int) featureFlags := make(map[string]int) @@ -188,12 +178,12 @@ func IsConsulServer(m serf.Member) (bool, *Server) { UseTLS: useTLS, // DEPRECATED - remove nonVoter check once support for that tag is removed ReadReplica: nonVoter || readReplica, - ACLs: acls, FeatureFlags: featureFlags, } return true, parts } +// TODO(ACL-Legacy-Compat): remove in phase 2 const TagACLs = "acls" const featureFlagPrefix = "ft_" diff --git a/agent/structs/acl.go b/agent/structs/acl.go index 576199262..f4b944daf 100644 --- a/agent/structs/acl.go +++ b/agent/structs/acl.go @@ -20,16 +20,10 @@ import ( type ACLMode string const ( - // ACLs are disabled by configuration + // ACLModeDisabled indicates the ACL system is disabled ACLModeDisabled ACLMode = "0" - // ACLs are enabled + // ACLModeEnabled indicates the ACL system is enabled ACLModeEnabled ACLMode = "1" - // DEPRECATED (ACL-Legacy-Compat) - only needed while legacy ACLs are supported - // ACLs are enabled and using legacy ACLs - ACLModeLegacy ACLMode = "2" - // DEPRECATED (ACL-Legacy-Compat) - only needed while legacy ACLs are supported - // ACLs are assumed enabled but not being advertised - ACLModeUnknown ACLMode = "3" ) type ACLTokenIDType string From 0c077d052721d04d162112defbf2b60493f9a74e Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 29 Sep 2021 16:14:36 -0400 Subject: [PATCH 5/7] acl: only run startACLUpgrade once Since legacy ACL tokens can no longer be created we only need to run this upgrade a single time when leadership is estalbished. --- agent/consul/acl.go | 8 -------- agent/consul/leader.go | 29 +++++++++++++++++------------ agent/consul/state/acl.go | 1 + agent/consul/state/acl_schema.go | 1 + 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/agent/consul/acl.go b/agent/consul/acl.go index 273c558af..a5c4010f1 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -54,14 +54,6 @@ const ( // are not allowed to be displayed. redactedToken = "" - // aclUpgradeBatchSize controls how many tokens we look at during each round of upgrading. Individual raft logs - // will be further capped using the aclBatchUpsertSize. This limit just prevents us from creating a single slice - // with all tokens in it. - aclUpgradeBatchSize = 128 - - // aclUpgradeRateLimit is the number of batch upgrade requests per second allowed. - aclUpgradeRateLimit rate.Limit = 1.0 - // aclTokenReapingRateLimit is the number of batch token reaping requests per second allowed. aclTokenReapingRateLimit rate.Limit = 1.0 diff --git a/agent/consul/leader.go b/agent/consul/leader.go index a1b012163..f5b0cec6a 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -13,7 +13,6 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" "github.com/hashicorp/go-version" "github.com/hashicorp/raft" @@ -541,9 +540,19 @@ func (s *Server) initializeACLs(ctx context.Context) error { return nil } -// This function is only intended to be run as a managed go routine, it will block until -// the context passed in indicates that it should exit. +// legacyACLTokenUpgrade runs a single time to upgrade any tokens that may +// have been created immediately before the Consul upgrade, or any legacy tokens +// from a restored snapshot. +// TODO(ACL-Legacy-Compat): remove in phase 2 func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error { + // aclUpgradeRateLimit is the number of batch upgrade requests per second allowed. + const aclUpgradeRateLimit rate.Limit = 1.0 + + // aclUpgradeBatchSize controls how many tokens we look at during each round of upgrading. Individual raft logs + // will be further capped using the aclBatchUpsertSize. This limit just prevents us from creating a single slice + // with all tokens in it. + const aclUpgradeBatchSize = 128 + limiter := rate.NewLimiter(aclUpgradeRateLimit, int(aclUpgradeRateLimit)) for { if err := limiter.Wait(ctx); err != nil { @@ -552,21 +561,15 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error { // actually run the upgrade here state := s.fsm.State() - tokens, waitCh, err := state.ACLTokenListUpgradeable(aclUpgradeBatchSize) + tokens, _, err := state.ACLTokenListUpgradeable(aclUpgradeBatchSize) if err != nil { s.logger.Warn("encountered an error while searching for tokens without accessor ids", "error", err) } // No need to check expiration time here, as that only exists for v2 tokens. if len(tokens) == 0 { - ws := memdb.NewWatchSet() - ws.Add(state.AbandonCh()) - ws.Add(waitCh) - ws.Add(ctx.Done()) - - // wait for more tokens to need upgrading or the aclUpgradeCh to be closed - ws.Watch(nil) - continue + // No new legacy tokens can be created, so we can exit + return nil } var newTokens structs.ACLTokens @@ -615,6 +618,8 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error { } } +// TODO(ACL-Legacy-Compat): remove in phase 2. Keeping it for now so that we +// can upgrade any tokens created immediately before the upgrade happens. func (s *Server) startACLUpgrade(ctx context.Context) { if s.config.PrimaryDatacenter != s.config.Datacenter { // token upgrades should only run in the primary diff --git a/agent/consul/state/acl.go b/agent/consul/state/acl.go index fb19377a3..5497dfe92 100644 --- a/agent/consul/state/acl.go +++ b/agent/consul/state/acl.go @@ -728,6 +728,7 @@ func (s *Store) ACLTokenList(ws memdb.WatchSet, local, global bool, policy, role return idx, result, nil } +// TODO(ACL-Legacy-Compat): remove in phase 2 func (s *Store) ACLTokenListUpgradeable(max int) (structs.ACLTokens, <-chan struct{}, error) { tx := s.db.Txn(false) defer tx.Abort() diff --git a/agent/consul/state/acl_schema.go b/agent/consul/state/acl_schema.go index 20210182f..1e8f415f4 100644 --- a/agent/consul/state/acl_schema.go +++ b/agent/consul/state/acl_schema.go @@ -107,6 +107,7 @@ func tokensTableSchema() *memdb.TableSchema { //DEPRECATED (ACL-Legacy-Compat) - This index is only needed while we support upgrading v1 to v2 acls // This table indexes all the ACL tokens that do not have an AccessorID + // TODO(ACL-Legacy-Compat): remove in phase 2 "needs-upgrade": { Name: "needs-upgrade", AllowMissing: false, From ec935a2486cf08d2e456a3991bddccc938c14c34 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 29 Sep 2021 17:36:43 -0400 Subject: [PATCH 6/7] acl: call stop for the upgrade goroutine when done TestAgentLeaks_Server was reporting a goroutine leak without this. Not sure if it would actually be a leak in production or if this is due to the test setup, but seems easy enough to call it this way until we remove legacyACLTokenUpgrade. --- agent/consul/leader.go | 1 + agent/routine-leak-checker/leak_test.go | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/agent/consul/leader.go b/agent/consul/leader.go index f5b0cec6a..0c5b4b50d 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -569,6 +569,7 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error { if len(tokens) == 0 { // No new legacy tokens can be created, so we can exit + s.stopACLUpgrade() // required to prevent goroutine leak, according to TestAgentLeaks_Server return nil } diff --git a/agent/routine-leak-checker/leak_test.go b/agent/routine-leak-checker/leak_test.go index abc6ab6db..7b8de34c7 100644 --- a/agent/routine-leak-checker/leak_test.go +++ b/agent/routine-leak-checker/leak_test.go @@ -6,12 +6,13 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/tlsutil" - "github.com/stretchr/testify/require" - "go.uber.org/goleak" ) func testTLSCertificates(serverName string) (cert string, key string, cacert string, err error) { From b9f0014d70cb8dc665092ff94aa2ba5da81b3f97 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 4 Oct 2021 17:01:51 -0400 Subject: [PATCH 7/7] acl: remove updateEnterpriseSerfTags The only remaining caller is a test helper, and the tests don't use the enterprise gossip pools. --- agent/consul/enterprise_server_oss.go | 7 +------ agent/consul/leader_test.go | 3 --- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index 8561d2de1..f729fd810 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -1,3 +1,4 @@ +//go:build !consulent // +build !consulent package consul @@ -86,9 +87,3 @@ func (s *Server) validateEnterpriseIntentionNamespace(ns string, _ bool) error { func addEnterpriseSerfTags(_ map[string]string, _ *structs.EnterpriseMeta) { // do nothing } - -// updateEnterpriseSerfTags in enterprise will update any instances of Serf with the tag that -// are not the normal LAN or WAN serf instances (network segments and network areas) -func (_ *Server) updateEnterpriseSerfTags(_, _ string) { - // do nothing -} diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 0e54c009c..c6544148d 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -1771,14 +1771,11 @@ func TestDatacenterSupportsFederationStates(t *testing.T) { } func updateSerfTags(s *Server, key, value string) { - // Update the LAN serf libserf.UpdateTag(s.serfLAN, key, value) if s.serfWAN != nil { libserf.UpdateTag(s.serfWAN, key, value) } - - s.updateEnterpriseSerfTags(key, value) } func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) {