Merge pull request #11182 from hashicorp/dnephin/acl-legacy-remove-upgrade
acl: remove upgrade from legacy, start in non-legacy mode
This commit is contained in:
commit
e03b7e4c68
|
@ -54,14 +54,6 @@ const (
|
|||
// are not allowed to be displayed.
|
||||
redactedToken = "<hidden>"
|
||||
|
||||
// aclUpgradeBatchSize controls how many tokens we look at during each round of upgrading. Individual raft logs
|
||||
// will be further capped using the aclBatchUpsertSize. This limit just prevents us from creating a single slice
|
||||
// with all tokens in it.
|
||||
aclUpgradeBatchSize = 128
|
||||
|
||||
// aclUpgradeRateLimit is the number of batch upgrade requests per second allowed.
|
||||
aclUpgradeRateLimit rate.Limit = 1.0
|
||||
|
||||
// aclTokenReapingRateLimit is the number of batch token reaping requests per second allowed.
|
||||
aclTokenReapingRateLimit rate.Limit = 1.0
|
||||
|
||||
|
@ -77,20 +69,6 @@ const (
|
|||
// due to the data being more variable in its size.
|
||||
aclBatchUpsertSize = 256 * 1024
|
||||
|
||||
// DEPRECATED (ACL-Legacy-Compat) aclModeCheck* are all only for legacy usage
|
||||
// aclModeCheckMinInterval is the minimum amount of time between checking if the
|
||||
// agent should be using the new or legacy ACL system. All the places it is
|
||||
// currently used will backoff as it detects that it is remaining in legacy mode.
|
||||
// However the initial min value is kept small so that new cluster creation
|
||||
// can enter into new ACL mode quickly.
|
||||
// TODO(ACL-Legacy-Compat): remove
|
||||
aclModeCheckMinInterval = 50 * time.Millisecond
|
||||
|
||||
// aclModeCheckMaxInterval controls the maximum interval for how often the agent
|
||||
// checks if it should be using the new or legacy ACL system.
|
||||
// TODO(ACL-Legacy-Compat): remove
|
||||
aclModeCheckMaxInterval = 30 * time.Second
|
||||
|
||||
// Maximum number of re-resolution requests to be made if the token is modified between
|
||||
// resolving the token and resolving its policies that would remove one of its policies.
|
||||
tokenPolicyResolutionMaxRetries = 5
|
||||
|
|
|
@ -1393,9 +1393,6 @@ func TestACLEndpoint_TokenDelete(t *testing.T) {
|
|||
// Try to join
|
||||
joinWAN(t, s2, s1)
|
||||
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
|
||||
// Ensure s2 is authoritative.
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
||||
|
||||
|
@ -3632,9 +3629,6 @@ func TestACLEndpoint_SecureIntroEndpoints_LocalTokensDisabled(t *testing.T) {
|
|||
// Try to join
|
||||
joinWAN(t, s2, s1)
|
||||
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
|
||||
acl2 := ACL{srv: s2}
|
||||
var ignored bool
|
||||
|
||||
|
@ -3736,9 +3730,6 @@ func TestACLEndpoint_SecureIntroEndpoints_OnlyCreateLocalData(t *testing.T) {
|
|||
// Try to join
|
||||
joinWAN(t, s2, s1)
|
||||
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
|
||||
// Ensure s2 is authoritative.
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
||||
|
||||
|
@ -4623,9 +4614,6 @@ func TestACLEndpoint_Login_with_TokenLocality(t *testing.T) {
|
|||
|
||||
joinWAN(t, s2, s1)
|
||||
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
|
||||
// Ensure s2 is authoritative.
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
||||
|
||||
|
|
|
@ -327,11 +327,6 @@ func TestACLReplication_Tokens(t *testing.T) {
|
|||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Wait for legacy acls to be disabled so we are clear that
|
||||
// legacy replication isn't meddling.
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
||||
|
||||
// Create a bunch of new tokens and policies
|
||||
|
@ -543,12 +538,7 @@ func TestACLReplication_Policies(t *testing.T) {
|
|||
// Try to join.
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Wait for legacy acls to be disabled so we are clear that
|
||||
// legacy replication isn't meddling.
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root"))
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicatePolicies, 1, 0, 0)
|
||||
|
||||
// Create a bunch of new policies
|
||||
|
@ -700,7 +690,6 @@ func TestACLReplication_TokensRedacted(t *testing.T) {
|
|||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
||||
waitForNewACLs(t, s2)
|
||||
|
||||
// ensures replication is working ok
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -820,11 +809,6 @@ func TestACLReplication_AllTypes(t *testing.T) {
|
|||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Wait for legacy acls to be disabled so we are clear that
|
||||
// legacy replication isn't meddling.
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
||||
|
||||
const (
|
||||
|
|
|
@ -1,12 +1,10 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib/serf"
|
||||
)
|
||||
|
||||
var serverACLCacheConfig *structs.ACLCachesConfig = &structs.ACLCachesConfig{
|
||||
|
@ -84,73 +82,10 @@ func (s *Server) checkBindingRuleUUID(id string) (bool, error) {
|
|||
return !structs.ACLIDReserved(id), nil
|
||||
}
|
||||
|
||||
func (s *Server) updateSerfTags(key, value string) {
|
||||
// Update the LAN serf
|
||||
serf.UpdateTag(s.serfLAN, key, value)
|
||||
|
||||
if s.serfWAN != nil {
|
||||
serf.UpdateTag(s.serfWAN, key, value)
|
||||
}
|
||||
|
||||
s.updateEnterpriseSerfTags(key, value)
|
||||
}
|
||||
|
||||
// TODO:
|
||||
func (s *Server) updateACLAdvertisement() {
|
||||
// One thing to note is that once in new ACL mode the server will
|
||||
// never transition to legacy ACL mode. This is not currently a
|
||||
// supported use case.
|
||||
s.updateSerfTags("acls", string(structs.ACLModeEnabled))
|
||||
}
|
||||
|
||||
func (s *Server) canUpgradeToNewACLs(isLeader bool) bool {
|
||||
if atomic.LoadInt32(&s.useNewACLs) != 0 {
|
||||
// can't upgrade because we are already upgraded
|
||||
return false
|
||||
}
|
||||
|
||||
// Check to see if we already upgraded the last time we ran by seeing if we
|
||||
// have a copy of any global management policy stored locally. This should
|
||||
// always be true because policies always replicate.
|
||||
_, mgmtPolicy, err := s.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, structs.DefaultEnterpriseMetaInDefaultPartition())
|
||||
if err != nil {
|
||||
s.logger.Warn("Failed to get the builtin global-management policy to check for a completed ACL upgrade; skipping this optimization", "error", err)
|
||||
} else if mgmtPolicy != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if !s.InACLDatacenter() {
|
||||
foundServers, mode, _ := ServersGetACLMode(s, "", s.config.PrimaryDatacenter)
|
||||
if mode != structs.ACLModeEnabled || !foundServers {
|
||||
s.logger.Debug("Cannot upgrade to new ACLs, servers in acl datacenter are not yet upgraded", "PrimaryDatacenter", s.config.PrimaryDatacenter, "mode", mode, "found", foundServers)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
leaderAddr := string(s.raft.Leader())
|
||||
foundServers, mode, leaderMode := ServersGetACLMode(s, leaderAddr, s.config.Datacenter)
|
||||
if isLeader {
|
||||
if mode == structs.ACLModeLegacy {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
if leaderMode == structs.ACLModeEnabled {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
s.logger.Debug("Cannot upgrade to new ACLs", "leaderMode", leaderMode, "mode", mode, "found", foundServers, "leader", leaderAddr)
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *Server) InACLDatacenter() bool {
|
||||
return s.config.PrimaryDatacenter == "" || s.config.Datacenter == s.config.PrimaryDatacenter
|
||||
}
|
||||
|
||||
func (s *Server) UseLegacyACLs() bool {
|
||||
return atomic.LoadInt32(&s.useNewACLs) == 0
|
||||
}
|
||||
|
||||
func (s *Server) LocalTokensEnabled() bool {
|
||||
// in ACL datacenter so local tokens are always enabled
|
||||
if s.InACLDatacenter() {
|
||||
|
|
|
@ -58,9 +58,6 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) {
|
|||
if !s.config.ACLsEnabled {
|
||||
return 0, nil
|
||||
}
|
||||
if s.UseLegacyACLs() {
|
||||
return 0, nil
|
||||
}
|
||||
if local == global {
|
||||
return 0, fmt.Errorf("cannot reap both local and global tokens in the same request")
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//go:build !consulent
|
||||
// +build !consulent
|
||||
|
||||
package consul
|
||||
|
@ -86,9 +87,3 @@ func (s *Server) validateEnterpriseIntentionNamespace(ns string, _ bool) error {
|
|||
func addEnterpriseSerfTags(_ map[string]string, _ *structs.EnterpriseMeta) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
// updateEnterpriseSerfTags in enterprise will update any instances of Serf with the tag that
|
||||
// are not the normal LAN or WAN serf instances (network segments and network areas)
|
||||
func (_ *Server) updateEnterpriseSerfTags(_, _ string) {
|
||||
// do nothing
|
||||
}
|
||||
|
|
|
@ -165,16 +165,6 @@ func joinWAN(t *testing.T, member, leader *Server) {
|
|||
}
|
||||
}
|
||||
|
||||
func waitForNewACLs(t *testing.T, server *Server) {
|
||||
t.Helper()
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.False(r, server.UseLegacyACLs(), "Server cannot use new ACLs")
|
||||
})
|
||||
|
||||
require.False(t, server.UseLegacyACLs(), "Server cannot use new ACLs")
|
||||
}
|
||||
|
||||
func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationType structs.ACLReplicationType, minPolicyIndex, minTokenIndex, minRoleIndex uint64) {
|
||||
t.Helper()
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
|
|
@ -1606,7 +1606,6 @@ func TestIntentionList_acl(t *testing.T) {
|
|||
defer codec.Close()
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
waitForNewACLs(t, s1)
|
||||
|
||||
token, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", `service_prefix "foo" { policy = "write" }`)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -13,7 +13,6 @@ import (
|
|||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/raft"
|
||||
|
@ -68,11 +67,6 @@ func (s *Server) monitorLeadership() {
|
|||
// cleanup and to ensure we never run multiple leader loops.
|
||||
raftNotifyCh := s.raftNotifyCh
|
||||
|
||||
aclModeCheckWait := aclModeCheckMinInterval
|
||||
var aclUpgradeCh <-chan time.Time
|
||||
if s.config.ACLsEnabled {
|
||||
aclUpgradeCh = time.After(aclModeCheckWait)
|
||||
}
|
||||
var weAreLeaderCh chan struct{}
|
||||
var leaderLoop sync.WaitGroup
|
||||
for {
|
||||
|
@ -105,33 +99,6 @@ func (s *Server) monitorLeadership() {
|
|||
weAreLeaderCh = nil
|
||||
s.logger.Info("cluster leadership lost")
|
||||
}
|
||||
case <-aclUpgradeCh:
|
||||
if atomic.LoadInt32(&s.useNewACLs) == 0 {
|
||||
aclModeCheckWait = aclModeCheckWait * 2
|
||||
if aclModeCheckWait > aclModeCheckMaxInterval {
|
||||
aclModeCheckWait = aclModeCheckMaxInterval
|
||||
}
|
||||
aclUpgradeCh = time.After(aclModeCheckWait)
|
||||
|
||||
if canUpgrade := s.canUpgradeToNewACLs(weAreLeaderCh != nil); canUpgrade {
|
||||
if weAreLeaderCh != nil {
|
||||
if err := s.initializeACLs(&lib.StopChannelContext{StopCh: weAreLeaderCh}, true); err != nil {
|
||||
s.logger.Error("error transitioning to using new ACLs", "error", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
s.logger.Debug("transitioning out of legacy ACL mode")
|
||||
atomic.StoreInt32(&s.useNewACLs, 1)
|
||||
s.updateACLAdvertisement()
|
||||
|
||||
// setting this to nil ensures that we will never hit this case again
|
||||
aclUpgradeCh = nil
|
||||
}
|
||||
} else {
|
||||
// establishLeadership probably transitioned us
|
||||
aclUpgradeCh = nil
|
||||
}
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
}
|
||||
|
@ -305,15 +272,7 @@ WAIT:
|
|||
// state is up-to-date.
|
||||
func (s *Server) establishLeadership(ctx context.Context) error {
|
||||
start := time.Now()
|
||||
// check for the upgrade here - this helps us transition to new ACLs much
|
||||
// quicker if this is a new cluster or this is a test agent
|
||||
if canUpgrade := s.canUpgradeToNewACLs(true); canUpgrade {
|
||||
if err := s.initializeACLs(ctx, true); err != nil {
|
||||
return err
|
||||
}
|
||||
atomic.StoreInt32(&s.useNewACLs, 1)
|
||||
s.updateACLAdvertisement()
|
||||
} else if err := s.initializeACLs(ctx, false); err != nil {
|
||||
if err := s.initializeACLs(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -400,7 +359,7 @@ func (s *Server) revokeLeadership() {
|
|||
|
||||
// initializeACLs is used to setup the ACLs if we are the leader
|
||||
// and need to do this.
|
||||
func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
|
||||
func (s *Server) initializeACLs(ctx context.Context) error {
|
||||
if !s.config.ACLsEnabled {
|
||||
return nil
|
||||
}
|
||||
|
@ -573,11 +532,6 @@ func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
|
|||
// launch the upgrade go routine to generate accessors for everything
|
||||
s.startACLUpgrade(ctx)
|
||||
} else {
|
||||
if upgrade {
|
||||
s.stopACLReplication()
|
||||
}
|
||||
|
||||
// ACL replication is now mandatory
|
||||
s.startACLReplication(ctx)
|
||||
}
|
||||
|
||||
|
@ -586,9 +540,19 @@ func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// This function is only intended to be run as a managed go routine, it will block until
|
||||
// the context passed in indicates that it should exit.
|
||||
// legacyACLTokenUpgrade runs a single time to upgrade any tokens that may
|
||||
// have been created immediately before the Consul upgrade, or any legacy tokens
|
||||
// from a restored snapshot.
|
||||
// TODO(ACL-Legacy-Compat): remove in phase 2
|
||||
func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
|
||||
// aclUpgradeRateLimit is the number of batch upgrade requests per second allowed.
|
||||
const aclUpgradeRateLimit rate.Limit = 1.0
|
||||
|
||||
// aclUpgradeBatchSize controls how many tokens we look at during each round of upgrading. Individual raft logs
|
||||
// will be further capped using the aclBatchUpsertSize. This limit just prevents us from creating a single slice
|
||||
// with all tokens in it.
|
||||
const aclUpgradeBatchSize = 128
|
||||
|
||||
limiter := rate.NewLimiter(aclUpgradeRateLimit, int(aclUpgradeRateLimit))
|
||||
for {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
|
@ -597,21 +561,16 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
|
|||
|
||||
// actually run the upgrade here
|
||||
state := s.fsm.State()
|
||||
tokens, waitCh, err := state.ACLTokenListUpgradeable(aclUpgradeBatchSize)
|
||||
tokens, _, err := state.ACLTokenListUpgradeable(aclUpgradeBatchSize)
|
||||
if err != nil {
|
||||
s.logger.Warn("encountered an error while searching for tokens without accessor ids", "error", err)
|
||||
}
|
||||
// No need to check expiration time here, as that only exists for v2 tokens.
|
||||
|
||||
if len(tokens) == 0 {
|
||||
ws := memdb.NewWatchSet()
|
||||
ws.Add(state.AbandonCh())
|
||||
ws.Add(waitCh)
|
||||
ws.Add(ctx.Done())
|
||||
|
||||
// wait for more tokens to need upgrading or the aclUpgradeCh to be closed
|
||||
ws.Watch(nil)
|
||||
continue
|
||||
// No new legacy tokens can be created, so we can exit
|
||||
s.stopACLUpgrade() // required to prevent goroutine leak, according to TestAgentLeaks_Server
|
||||
return nil
|
||||
}
|
||||
|
||||
var newTokens structs.ACLTokens
|
||||
|
@ -660,6 +619,8 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO(ACL-Legacy-Compat): remove in phase 2. Keeping it for now so that we
|
||||
// can upgrade any tokens created immediately before the upgrade happens.
|
||||
func (s *Server) startACLUpgrade(ctx context.Context) {
|
||||
if s.config.PrimaryDatacenter != s.config.Datacenter {
|
||||
// token upgrades should only run in the primary
|
||||
|
|
|
@ -240,9 +240,6 @@ func TestLeader_SecondaryCA_Initialize(t *testing.T) {
|
|||
|
||||
testrpc.WaitForLeader(t, s2.RPC, "secondary")
|
||||
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
|
||||
// Ensure s2 is authoritative.
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
||||
|
||||
|
|
|
@ -382,7 +382,7 @@ func TestLeader_FederationStateAntiEntropyPruning_ACLDeny(t *testing.T) {
|
|||
// Try to join.
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root"))
|
||||
|
||||
// Create the ACL token.
|
||||
opWriteToken, err := upsertTestTokenWithPolicyRules(client, "root", "dc1", `operator = "write"`)
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
tokenStore "github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/api"
|
||||
libserf "github.com/hashicorp/consul/lib/serf"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
|
@ -1257,9 +1258,6 @@ func TestLeader_ACLUpgrade_IsStickyEvenIfSerfTagsRegress(t *testing.T) {
|
|||
joinWAN(t, s2, s1)
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
waitForLeaderEstablishment(t, s2)
|
||||
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicatePolicies, 1, 0, 0)
|
||||
|
||||
// Everybody has the management policy.
|
||||
|
@ -1296,9 +1294,6 @@ func TestLeader_ACLUpgrade_IsStickyEvenIfSerfTagsRegress(t *testing.T) {
|
|||
defer s2new.Shutdown()
|
||||
|
||||
waitForLeaderEstablishment(t, s2new)
|
||||
|
||||
// It should be able to transition without connectivity to the primary.
|
||||
waitForNewACLs(t, s2new)
|
||||
}
|
||||
|
||||
func TestLeader_ConfigEntryBootstrap(t *testing.T) {
|
||||
|
@ -1507,7 +1502,7 @@ func TestDatacenterSupportsFederationStates(t *testing.T) {
|
|||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
s1.updateSerfTags("ft_fs", "0")
|
||||
updateSerfTags(s1, "ft_fs", "0")
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
|
@ -1562,7 +1557,7 @@ func TestDatacenterSupportsFederationStates(t *testing.T) {
|
|||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
s1.updateSerfTags("ft_fs", "0")
|
||||
updateSerfTags(s1, "ft_fs", "0")
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
|
@ -1737,7 +1732,7 @@ func TestDatacenterSupportsFederationStates(t *testing.T) {
|
|||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
s1.updateSerfTags("ft_fs", "0")
|
||||
updateSerfTags(s1, "ft_fs", "0")
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
|
@ -1775,6 +1770,14 @@ func TestDatacenterSupportsFederationStates(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func updateSerfTags(s *Server, key, value string) {
|
||||
libserf.UpdateTag(s.serfLAN, key, value)
|
||||
|
||||
if s.serfWAN != nil {
|
||||
libserf.UpdateTag(s.serfWAN, key, value)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
|
@ -873,11 +873,6 @@ func TestRPC_LocalTokenStrippedOnForward(t *testing.T) {
|
|||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Wait for legacy acls to be disabled so we are clear that
|
||||
// legacy replication isn't meddling.
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
||||
|
||||
// create simple kv policy
|
||||
|
@ -1010,11 +1005,6 @@ func TestRPC_LocalTokenStrippedOnForward_GRPC(t *testing.T) {
|
|||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Wait for legacy acls to be disabled so we are clear that
|
||||
// legacy replication isn't meddling.
|
||||
waitForNewACLs(t, s1)
|
||||
waitForNewACLs(t, s2)
|
||||
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
||||
|
||||
// create simple service policy
|
||||
|
|
|
@ -134,10 +134,6 @@ type Server struct {
|
|||
|
||||
aclAuthMethodValidators authmethod.Cache
|
||||
|
||||
// DEPRECATED (ACL-Legacy-Compat) - only needed while we support both
|
||||
// useNewACLs is used to determine whether we can use new ACLs or not
|
||||
useNewACLs int32
|
||||
|
||||
// autopilot is the Autopilot instance for this server.
|
||||
autopilot *autopilot.Autopilot
|
||||
|
||||
|
@ -428,7 +424,6 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
|||
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
|
||||
|
||||
s.aclConfig = newACLConfig(logger)
|
||||
s.useNewACLs = 0
|
||||
aclConfig := ACLResolverConfig{
|
||||
Config: config.ACLResolverSettings,
|
||||
Delegate: s,
|
||||
|
@ -1346,11 +1341,7 @@ func (s *Server) Stats() map[string]map[string]string {
|
|||
}
|
||||
|
||||
if s.config.ACLsEnabled {
|
||||
if s.UseLegacyACLs() {
|
||||
stats["consul"]["acl"] = "legacy"
|
||||
} else {
|
||||
stats["consul"]["acl"] = "enabled"
|
||||
}
|
||||
stats["consul"]["acl"] = "enabled"
|
||||
} else {
|
||||
stats["consul"]["acl"] = "disabled"
|
||||
}
|
||||
|
|
|
@ -72,6 +72,8 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
|
|||
conf.Tags["use_tls"] = "1"
|
||||
}
|
||||
|
||||
// TODO(ACL-Legacy-Compat): remove in phase 2. These are kept for now to
|
||||
// allow for upgrades.
|
||||
if s.acls.ACLsEnabled() {
|
||||
conf.Tags[metadata.TagACLs] = string(structs.ACLModeEnabled)
|
||||
} else {
|
||||
|
|
|
@ -730,6 +730,7 @@ func (s *Store) ACLTokenList(ws memdb.WatchSet, local, global bool, policy, role
|
|||
return idx, result, nil
|
||||
}
|
||||
|
||||
// TODO(ACL-Legacy-Compat): remove in phase 2
|
||||
func (s *Store) ACLTokenListUpgradeable(max int) (structs.ACLTokens, <-chan struct{}, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
|
|
@ -107,6 +107,7 @@ func tokensTableSchema() *memdb.TableSchema {
|
|||
|
||||
//DEPRECATED (ACL-Legacy-Compat) - This index is only needed while we support upgrading v1 to v2 acls
|
||||
// This table indexes all the ACL tokens that do not have an AccessorID
|
||||
// TODO(ACL-Legacy-Compat): remove in phase 2
|
||||
"needs-upgrade": {
|
||||
Name: "needs-upgrade",
|
||||
AllowMissing: false,
|
||||
|
|
|
@ -4,10 +4,10 @@ import (
|
|||
"runtime"
|
||||
"strconv"
|
||||
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
)
|
||||
|
||||
// CanServersUnderstandProtocol checks to see if all the servers in the given
|
||||
|
@ -158,72 +158,3 @@ func (c *Client) CheckServers(datacenter string, fn func(*metadata.Server) bool)
|
|||
|
||||
c.router.CheckServers(datacenter, fn)
|
||||
}
|
||||
|
||||
type serversACLMode struct {
|
||||
// leader is the address of the leader
|
||||
leader string
|
||||
|
||||
// mode indicates the overall ACL mode of the servers
|
||||
mode structs.ACLMode
|
||||
|
||||
// leaderMode is the ACL mode of the leader server
|
||||
leaderMode structs.ACLMode
|
||||
|
||||
// indicates that at least one server was processed
|
||||
found bool
|
||||
}
|
||||
|
||||
func (s *serversACLMode) init(leader string) {
|
||||
s.leader = leader
|
||||
s.mode = structs.ACLModeEnabled
|
||||
s.leaderMode = structs.ACLModeUnknown
|
||||
s.found = false
|
||||
}
|
||||
|
||||
func (s *serversACLMode) update(srv *metadata.Server) bool {
|
||||
if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed {
|
||||
// they are left or something so regardless we treat these servers as meeting
|
||||
// the version requirement
|
||||
return true
|
||||
}
|
||||
|
||||
// mark that we processed at least one server
|
||||
s.found = true
|
||||
|
||||
if srvAddr := srv.Addr.String(); srvAddr == s.leader {
|
||||
s.leaderMode = srv.ACLs
|
||||
}
|
||||
|
||||
switch srv.ACLs {
|
||||
case structs.ACLModeDisabled:
|
||||
// anything disabled means we cant enable ACLs
|
||||
s.mode = structs.ACLModeDisabled
|
||||
case structs.ACLModeEnabled:
|
||||
// do nothing
|
||||
case structs.ACLModeLegacy:
|
||||
// This covers legacy mode and older server versions that don't advertise ACL support
|
||||
if s.mode != structs.ACLModeDisabled && s.mode != structs.ACLModeUnknown {
|
||||
s.mode = structs.ACLModeLegacy
|
||||
}
|
||||
default:
|
||||
if s.mode != structs.ACLModeDisabled {
|
||||
s.mode = structs.ACLModeUnknown
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// ServersGetACLMode checks all the servers in a particular datacenter and determines
|
||||
// what the minimum ACL mode amongst them is and what the leaders ACL mode is.
|
||||
// The "found" return value indicates whether there were any servers considered in
|
||||
// this datacenter. If that is false then the other mode return values are meaningless
|
||||
// as they will be ACLModeEnabled and ACLModeUnkown respectively.
|
||||
func ServersGetACLMode(provider checkServersProvider, leaderAddr string, datacenter string) (found bool, mode structs.ACLMode, leaderMode structs.ACLMode) {
|
||||
var state serversACLMode
|
||||
state.init(leaderAddr)
|
||||
|
||||
provider.CheckServers(datacenter, state.update)
|
||||
|
||||
return state.found, state.mode, state.leaderMode
|
||||
}
|
||||
|
|
|
@ -2,15 +2,14 @@ package consul
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"regexp"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
)
|
||||
|
||||
func TestUtil_CanServersUnderstandProtocol(t *testing.T) {
|
||||
|
@ -246,126 +245,3 @@ func TestServersInDCMeetMinimumVersion(t *testing.T) {
|
|||
require.Equal(t, tc.expectedFound, found)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServersGetACLMode(t *testing.T) {
|
||||
t.Parallel()
|
||||
makeServer := func(datacenter string, acls structs.ACLMode, status serf.MemberStatus, addr net.IP) metadata.Server {
|
||||
return metadata.Server{
|
||||
Name: "foo",
|
||||
ShortName: "foo",
|
||||
ID: "asdf",
|
||||
Port: 10000,
|
||||
Expect: 3,
|
||||
RaftVersion: 3,
|
||||
Status: status,
|
||||
WanJoinPort: 1234,
|
||||
Version: 1,
|
||||
Addr: &net.TCPAddr{IP: addr, Port: 10000},
|
||||
// shouldn't matter for these tests
|
||||
Build: *version.Must(version.NewVersion("1.7.0")),
|
||||
Datacenter: datacenter,
|
||||
ACLs: acls,
|
||||
}
|
||||
}
|
||||
|
||||
type tcase struct {
|
||||
servers testServersProvider
|
||||
leaderAddr string
|
||||
datacenter string
|
||||
foundServers bool
|
||||
minMode structs.ACLMode
|
||||
leaderMode structs.ACLMode
|
||||
}
|
||||
|
||||
cases := map[string]tcase{
|
||||
"filter-members": {
|
||||
servers: testServersProvider{
|
||||
makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})),
|
||||
makeServer("primary", structs.ACLModeLegacy, serf.StatusFailed, net.IP([]byte{127, 0, 0, 2})),
|
||||
// filtered datacenter
|
||||
makeServer("secondary", structs.ACLModeUnknown, serf.StatusAlive, net.IP([]byte{127, 0, 0, 4})),
|
||||
// filtered status
|
||||
makeServer("primary", structs.ACLModeUnknown, serf.StatusLeaving, net.IP([]byte{127, 0, 0, 5})),
|
||||
// filtered status
|
||||
makeServer("primary", structs.ACLModeUnknown, serf.StatusLeft, net.IP([]byte{127, 0, 0, 6})),
|
||||
// filtered status
|
||||
makeServer("primary", structs.ACLModeUnknown, serf.StatusNone, net.IP([]byte{127, 0, 0, 7})),
|
||||
},
|
||||
foundServers: true,
|
||||
leaderAddr: "127.0.0.1:10000",
|
||||
datacenter: "primary",
|
||||
minMode: structs.ACLModeLegacy,
|
||||
leaderMode: structs.ACLModeLegacy,
|
||||
},
|
||||
"disabled": {
|
||||
servers: testServersProvider{
|
||||
makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})),
|
||||
makeServer("primary", structs.ACLModeUnknown, serf.StatusAlive, net.IP([]byte{127, 0, 0, 2})),
|
||||
makeServer("primary", structs.ACLModeDisabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 3})),
|
||||
},
|
||||
foundServers: true,
|
||||
leaderAddr: "127.0.0.1:10000",
|
||||
datacenter: "primary",
|
||||
minMode: structs.ACLModeDisabled,
|
||||
leaderMode: structs.ACLModeLegacy,
|
||||
},
|
||||
"unknown": {
|
||||
servers: testServersProvider{
|
||||
makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})),
|
||||
makeServer("primary", structs.ACLModeUnknown, serf.StatusAlive, net.IP([]byte{127, 0, 0, 2})),
|
||||
},
|
||||
foundServers: true,
|
||||
leaderAddr: "127.0.0.1:10000",
|
||||
datacenter: "primary",
|
||||
minMode: structs.ACLModeUnknown,
|
||||
leaderMode: structs.ACLModeLegacy,
|
||||
},
|
||||
"legacy": {
|
||||
servers: testServersProvider{
|
||||
makeServer("primary", structs.ACLModeEnabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})),
|
||||
makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 2})),
|
||||
},
|
||||
foundServers: true,
|
||||
leaderAddr: "127.0.0.1:10000",
|
||||
datacenter: "primary",
|
||||
minMode: structs.ACLModeLegacy,
|
||||
leaderMode: structs.ACLModeEnabled,
|
||||
},
|
||||
"enabled": {
|
||||
servers: testServersProvider{
|
||||
makeServer("primary", structs.ACLModeEnabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})),
|
||||
makeServer("primary", structs.ACLModeEnabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 2})),
|
||||
makeServer("primary", structs.ACLModeEnabled, serf.StatusAlive, net.IP([]byte{127, 0, 0, 3})),
|
||||
},
|
||||
foundServers: true,
|
||||
leaderAddr: "127.0.0.1:10000",
|
||||
datacenter: "primary",
|
||||
minMode: structs.ACLModeEnabled,
|
||||
leaderMode: structs.ACLModeEnabled,
|
||||
},
|
||||
"failed-members": {
|
||||
servers: testServersProvider{
|
||||
makeServer("primary", structs.ACLModeLegacy, serf.StatusAlive, net.IP([]byte{127, 0, 0, 1})),
|
||||
makeServer("primary", structs.ACLModeUnknown, serf.StatusFailed, net.IP([]byte{127, 0, 0, 2})),
|
||||
makeServer("primary", structs.ACLModeLegacy, serf.StatusFailed, net.IP([]byte{127, 0, 0, 3})),
|
||||
},
|
||||
foundServers: true,
|
||||
leaderAddr: "127.0.0.1:10000",
|
||||
datacenter: "primary",
|
||||
minMode: structs.ACLModeUnknown,
|
||||
leaderMode: structs.ACLModeLegacy,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range cases {
|
||||
name := name
|
||||
tc := tc
|
||||
t.Run(name, func(t *testing.T) {
|
||||
actualServers, actualMinMode, actualLeaderMode := ServersGetACLMode(tc.servers, tc.leaderAddr, tc.datacenter)
|
||||
|
||||
require.Equal(t, tc.minMode, actualMinMode)
|
||||
require.Equal(t, tc.leaderMode, actualLeaderMode)
|
||||
require.Equal(t, tc.foundServers, actualServers)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,8 +9,6 @@ import (
|
|||
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// Key is used in maps and for equality tests. A key is based on endpoints.
|
||||
|
@ -42,7 +40,6 @@ type Server struct {
|
|||
Addr net.Addr
|
||||
Status serf.MemberStatus
|
||||
ReadReplica bool
|
||||
ACLs structs.ACLMode
|
||||
FeatureFlags map[string]int
|
||||
|
||||
// If true, use TLS when connecting to this server
|
||||
|
@ -97,13 +94,6 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
var acls structs.ACLMode
|
||||
if aclMode, ok := m.Tags[TagACLs]; ok {
|
||||
acls = structs.ACLMode(aclMode)
|
||||
} else {
|
||||
acls = structs.ACLModeUnknown
|
||||
}
|
||||
|
||||
segmentAddrs := make(map[string]string)
|
||||
segmentPorts := make(map[string]int)
|
||||
featureFlags := make(map[string]int)
|
||||
|
@ -188,12 +178,12 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
|
|||
UseTLS: useTLS,
|
||||
// DEPRECATED - remove nonVoter check once support for that tag is removed
|
||||
ReadReplica: nonVoter || readReplica,
|
||||
ACLs: acls,
|
||||
FeatureFlags: featureFlags,
|
||||
}
|
||||
return true, parts
|
||||
}
|
||||
|
||||
// TODO(ACL-Legacy-Compat): remove in phase 2
|
||||
const TagACLs = "acls"
|
||||
|
||||
const featureFlagPrefix = "ft_"
|
||||
|
|
|
@ -6,12 +6,13 @@ import (
|
|||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
func testTLSCertificates(serverName string) (cert string, key string, cacert string, err error) {
|
||||
|
|
|
@ -20,16 +20,10 @@ import (
|
|||
type ACLMode string
|
||||
|
||||
const (
|
||||
// ACLs are disabled by configuration
|
||||
// ACLModeDisabled indicates the ACL system is disabled
|
||||
ACLModeDisabled ACLMode = "0"
|
||||
// ACLs are enabled
|
||||
// ACLModeEnabled indicates the ACL system is enabled
|
||||
ACLModeEnabled ACLMode = "1"
|
||||
// DEPRECATED (ACL-Legacy-Compat) - only needed while legacy ACLs are supported
|
||||
// ACLs are enabled and using legacy ACLs
|
||||
ACLModeLegacy ACLMode = "2"
|
||||
// DEPRECATED (ACL-Legacy-Compat) - only needed while legacy ACLs are supported
|
||||
// ACLs are assumed enabled but not being advertised
|
||||
ACLModeUnknown ACLMode = "3"
|
||||
)
|
||||
|
||||
type ACLTokenIDType string
|
||||
|
|
Loading…
Reference in New Issue