Implement Leader Routine Management (#6580)

* Implement leader routine manager

Switch over the following to use it for go routine management:

• Config entry Replication
• ACL replication - tokens, policies, roles and legacy tokens
• ACL legacy token upgrade
• ACL token reaping
• Intention Replication
• Secondary CA Roots Watching
• CA Root Pruning

Also added the StopAll call into the Server Shutdown method to ensure all leader routines get killed off when shutting down.

This should be mostly unnecessary as `revokeLeadership` should manually stop each one but just in case we really want these to go away (eventually).
This commit is contained in:
Matt Keeler 2019-10-04 13:08:45 -04:00 committed by GitHub
parent 29f0616708
commit b0b57588d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 482 additions and 353 deletions

View File

@ -9,17 +9,27 @@ import (
"golang.org/x/time/rate"
)
func (s *Server) startACLTokenReaping() {
s.aclTokenReapLock.Lock()
defer s.aclTokenReapLock.Unlock()
func (s *Server) reapExpiredTokens(ctx context.Context) error {
limiter := rate.NewLimiter(aclTokenReapingRateLimit, aclTokenReapingBurst)
for {
if err := limiter.Wait(ctx); err != nil {
return err
}
if s.aclTokenReapEnabled {
return
if s.LocalTokensEnabled() {
if _, err := s.reapExpiredLocalACLTokens(); err != nil {
s.logger.Printf("[ERR] acl: error reaping expired local ACL tokens: %v", err)
}
}
if s.InACLDatacenter() {
if _, err := s.reapExpiredGlobalACLTokens(); err != nil {
s.logger.Printf("[ERR] acl: error reaping expired global ACL tokens: %v", err)
}
}
}
}
ctx, cancel := context.WithCancel(context.Background())
s.aclTokenReapCancel = cancel
func (s *Server) startACLTokenReaping() {
// Do a quick check for config settings that would imply the goroutine
// below will just spin forever.
//
@ -30,41 +40,11 @@ func (s *Server) startACLTokenReaping() {
return
}
go func() {
limiter := rate.NewLimiter(aclTokenReapingRateLimit, aclTokenReapingBurst)
for {
if err := limiter.Wait(ctx); err != nil {
return
}
if s.LocalTokensEnabled() {
if _, err := s.reapExpiredLocalACLTokens(); err != nil {
s.logger.Printf("[ERR] acl: error reaping expired local ACL tokens: %v", err)
}
}
if s.InACLDatacenter() {
if _, err := s.reapExpiredGlobalACLTokens(); err != nil {
s.logger.Printf("[ERR] acl: error reaping expired global ACL tokens: %v", err)
}
}
}
}()
s.aclTokenReapEnabled = true
s.leaderRoutineManager.Start(aclTokenReapingRoutineName, s.reapExpiredTokens)
}
func (s *Server) stopACLTokenReaping() {
s.aclTokenReapLock.Lock()
defer s.aclTokenReapLock.Unlock()
if !s.aclTokenReapEnabled {
return
}
s.aclTokenReapCancel()
s.aclTokenReapCancel = nil
s.aclTokenReapEnabled = false
s.leaderRoutineManager.Stop(aclTokenReapingRoutineName)
}
func (s *Server) reapExpiredGlobalACLTokens() (int, error) {

View File

@ -649,239 +649,244 @@ func (s *Server) initializeACLs(upgrade bool) error {
return nil
}
func (s *Server) startACLUpgrade() {
s.aclUpgradeLock.Lock()
defer s.aclUpgradeLock.Unlock()
// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
limiter := rate.NewLimiter(aclUpgradeRateLimit, int(aclUpgradeRateLimit))
for {
if err := limiter.Wait(ctx); err != nil {
return err
}
if s.aclUpgradeEnabled {
return
}
// actually run the upgrade here
state := s.fsm.State()
tokens, waitCh, err := state.ACLTokenListUpgradeable(aclUpgradeBatchSize)
if err != nil {
s.logger.Printf("[WARN] acl: encountered an error while searching for tokens without accessor ids: %v", err)
}
// No need to check expiration time here, as that only exists for v2 tokens.
ctx, cancel := context.WithCancel(context.Background())
s.aclUpgradeCancel = cancel
if len(tokens) == 0 {
ws := memdb.NewWatchSet()
ws.Add(state.AbandonCh())
ws.Add(waitCh)
ws.Add(ctx.Done())
go func() {
limiter := rate.NewLimiter(aclUpgradeRateLimit, int(aclUpgradeRateLimit))
for {
if err := limiter.Wait(ctx); err != nil {
return
}
// wait for more tokens to need upgrading or the aclUpgradeCh to be closed
ws.Watch(nil)
continue
}
// actually run the upgrade here
state := s.fsm.State()
tokens, waitCh, err := state.ACLTokenListUpgradeable(aclUpgradeBatchSize)
if err != nil {
s.logger.Printf("[WARN] acl: encountered an error while searching for tokens without accessor ids: %v", err)
}
// No need to check expiration time here, as that only exists for v2 tokens.
if len(tokens) == 0 {
ws := memdb.NewWatchSet()
ws.Add(state.AbandonCh())
ws.Add(waitCh)
ws.Add(ctx.Done())
// wait for more tokens to need upgrading or the aclUpgradeCh to be closed
ws.Watch(nil)
var newTokens structs.ACLTokens
for _, token := range tokens {
// This should be entirely unnecessary but is just a small safeguard against changing accessor IDs
if token.AccessorID != "" {
continue
}
var newTokens structs.ACLTokens
for _, token := range tokens {
// This should be entirely unnecessary but is just a small safeguard against changing accessor IDs
if token.AccessorID != "" {
newToken := *token
if token.SecretID == anonymousToken {
newToken.AccessorID = structs.ACLTokenAnonymousID
} else {
accessor, err := lib.GenerateUUID(s.checkTokenUUID)
if err != nil {
s.logger.Printf("[WARN] acl: failed to generate accessor during token auto-upgrade: %v", err)
continue
}
newToken := *token
if token.SecretID == anonymousToken {
newToken.AccessorID = structs.ACLTokenAnonymousID
} else {
accessor, err := lib.GenerateUUID(s.checkTokenUUID)
if err != nil {
s.logger.Printf("[WARN] acl: failed to generate accessor during token auto-upgrade: %v", err)
continue
}
newToken.AccessorID = accessor
}
// Assign the global-management policy to legacy management tokens
if len(newToken.Policies) == 0 &&
len(newToken.ServiceIdentities) == 0 &&
len(newToken.Roles) == 0 &&
newToken.Type == structs.ACLTokenTypeManagement {
newToken.Policies = append(newToken.Policies, structs.ACLTokenPolicyLink{ID: structs.ACLPolicyGlobalManagementID})
}
// need to copy these as we are going to do a CAS operation.
newToken.CreateIndex = token.CreateIndex
newToken.ModifyIndex = token.ModifyIndex
newToken.SetHash(true)
newTokens = append(newTokens, &newToken)
newToken.AccessorID = accessor
}
req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true}
resp, err := s.raftApply(structs.ACLTokenSetRequestType, req)
if err != nil {
s.logger.Printf("[ERR] acl: failed to apply acl token upgrade batch: %v", err)
// Assign the global-management policy to legacy management tokens
if len(newToken.Policies) == 0 &&
len(newToken.ServiceIdentities) == 0 &&
len(newToken.Roles) == 0 &&
newToken.Type == structs.ACLTokenTypeManagement {
newToken.Policies = append(newToken.Policies, structs.ACLTokenPolicyLink{ID: structs.ACLPolicyGlobalManagementID})
}
if err, ok := resp.(error); ok {
s.logger.Printf("[ERR] acl: failed to apply acl token upgrade batch: %v", err)
}
// need to copy these as we are going to do a CAS operation.
newToken.CreateIndex = token.CreateIndex
newToken.ModifyIndex = token.ModifyIndex
newToken.SetHash(true)
newTokens = append(newTokens, &newToken)
}
}()
s.aclUpgradeEnabled = true
req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true}
resp, err := s.raftApply(structs.ACLTokenSetRequestType, req)
if err != nil {
s.logger.Printf("[ERR] acl: failed to apply acl token upgrade batch: %v", err)
}
if err, ok := resp.(error); ok {
s.logger.Printf("[ERR] acl: failed to apply acl token upgrade batch: %v", err)
}
}
}
func (s *Server) startACLUpgrade() {
if s.config.PrimaryDatacenter != s.config.Datacenter {
// token upgrades should only run in the primary
return
}
s.leaderRoutineManager.Start(aclUpgradeRoutineName, s.legacyACLTokenUpgrade)
}
func (s *Server) stopACLUpgrade() {
s.aclUpgradeLock.Lock()
defer s.aclUpgradeLock.Unlock()
s.leaderRoutineManager.Stop(aclUpgradeRoutineName)
}
if !s.aclUpgradeEnabled {
return
// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) runLegacyACLReplication(ctx context.Context) error {
var lastRemoteIndex uint64
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
for {
if err := limiter.Wait(ctx); err != nil {
return err
}
if s.tokens.ReplicationToken() == "" {
continue
}
index, exit, err := s.replicateLegacyACLs(lastRemoteIndex, ctx)
if exit {
return nil
}
if err != nil {
lastRemoteIndex = 0
s.updateACLReplicationStatusError()
s.logger.Printf("[WARN] consul: Legacy ACL replication error (will retry if still leader): %v", err)
} else {
lastRemoteIndex = index
s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
s.logger.Printf("[DEBUG] consul: Legacy ACL replication completed through remote index %d", index)
}
}
s.aclUpgradeCancel()
s.aclUpgradeCancel = nil
s.aclUpgradeEnabled = false
}
func (s *Server) startLegacyACLReplication() {
s.aclReplicationLock.Lock()
defer s.aclReplicationLock.Unlock()
if s.InACLDatacenter() {
return
}
if s.aclReplicationEnabled {
// unlike some other leader routines this initializes some extra state
// and therefore we want to prevent re-initialization if things are already
// running
if s.leaderRoutineManager.IsRunning(legacyACLReplicationRoutineName) {
return
}
s.initReplicationStatus()
ctx, cancel := context.WithCancel(context.Background())
s.aclReplicationCancel = cancel
go func() {
var lastRemoteIndex uint64
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
for {
if err := limiter.Wait(ctx); err != nil {
return
}
if s.tokens.ReplicationToken() == "" {
continue
}
index, exit, err := s.replicateLegacyACLs(lastRemoteIndex, ctx)
if exit {
return
}
if err != nil {
lastRemoteIndex = 0
s.updateACLReplicationStatusError()
s.logger.Printf("[WARN] consul: Legacy ACL replication error (will retry if still leader): %v", err)
} else {
lastRemoteIndex = index
s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
s.logger.Printf("[DEBUG] consul: Legacy ACL replication completed through remote index %d", index)
}
}
}()
s.leaderRoutineManager.Start(legacyACLReplicationRoutineName, s.runLegacyACLReplication)
s.logger.Printf("[INFO] acl: started legacy ACL replication")
s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy)
s.aclReplicationEnabled = true
}
func (s *Server) startACLReplication() {
s.aclReplicationLock.Lock()
defer s.aclReplicationLock.Unlock()
if s.InACLDatacenter() {
return
}
if s.aclReplicationEnabled {
// unlike some other leader routines this initializes some extra state
// and therefore we want to prevent re-initialization if things are already
// running
if s.leaderRoutineManager.IsRunning(aclPolicyReplicationRoutineName) {
return
}
s.initReplicationStatus()
ctx, cancel := context.WithCancel(context.Background())
s.aclReplicationCancel = cancel
s.startACLReplicator(ctx, structs.ACLReplicatePolicies, s.replicateACLPolicies)
s.startACLReplicator(ctx, structs.ACLReplicateRoles, s.replicateACLRoles)
s.leaderRoutineManager.Start(aclPolicyReplicationRoutineName, s.runACLPolicyReplicator)
s.leaderRoutineManager.Start(aclRoleReplicationRoutineName, s.runACLRoleReplicator)
if s.config.ACLTokenReplication {
s.startACLReplicator(ctx, structs.ACLReplicateTokens, s.replicateACLTokens)
s.leaderRoutineManager.Start(aclTokenReplicationRoutineName, s.runACLTokenReplicator)
s.updateACLReplicationStatusRunning(structs.ACLReplicateTokens)
} else {
s.updateACLReplicationStatusRunning(structs.ACLReplicatePolicies)
}
s.aclReplicationEnabled = true
}
type replicateFunc func(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error)
func (s *Server) startACLReplicator(ctx context.Context, replicationType structs.ACLReplicationType, replicateFunc replicateFunc) {
go func() {
var failedAttempts uint
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) runACLPolicyReplicator(ctx context.Context) error {
s.logger.Printf("[INFO] acl: started ACL Policy replication")
var lastRemoteIndex uint64
for {
if err := limiter.Wait(ctx); err != nil {
return
}
return s.runACLReplicator(ctx, structs.ACLReplicatePolicies, s.replicateACLPolicies)
}
if s.tokens.ReplicationToken() == "" {
continue
}
// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) runACLRoleReplicator(ctx context.Context) error {
s.logger.Printf("[INFO] acl: started ACL Role replication")
return s.runACLReplicator(ctx, structs.ACLReplicateRoles, s.replicateACLRoles)
}
index, exit, err := replicateFunc(ctx, lastRemoteIndex)
if exit {
return
}
// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) runACLTokenReplicator(ctx context.Context) error {
return s.runACLReplicator(ctx, structs.ACLReplicateTokens, s.replicateACLTokens)
}
if err != nil {
lastRemoteIndex = 0
s.updateACLReplicationStatusError()
s.logger.Printf("[WARN] consul: ACL %s replication error (will retry if still leader): %v", replicationType.SingularNoun(), err)
if (1 << failedAttempts) < aclReplicationMaxRetryBackoff {
failedAttempts++
}
// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) runACLReplicator(ctx context.Context, replicationType structs.ACLReplicationType, replicateFunc replicateFunc) error {
var failedAttempts uint
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
select {
case <-ctx.Done():
return
case <-time.After((1 << failedAttempts) * time.Second):
// do nothing
}
} else {
lastRemoteIndex = index
s.updateACLReplicationStatusIndex(replicationType, index)
s.logger.Printf("[DEBUG] consul: ACL %s replication completed through remote index %d", replicationType.SingularNoun(), index)
failedAttempts = 0
}
var lastRemoteIndex uint64
for {
if err := limiter.Wait(ctx); err != nil {
return err
}
}()
s.logger.Printf("[INFO] acl: started ACL %s replication", replicationType.SingularNoun())
if s.tokens.ReplicationToken() == "" {
continue
}
index, exit, err := replicateFunc(ctx, lastRemoteIndex)
if exit {
return nil
}
if err != nil {
lastRemoteIndex = 0
s.updateACLReplicationStatusError()
s.logger.Printf("[WARN] consul: ACL %s replication error (will retry if still leader): %v", replicationType.SingularNoun(), err)
if (1 << failedAttempts) < aclReplicationMaxRetryBackoff {
failedAttempts++
}
select {
case <-ctx.Done():
return nil
case <-time.After((1 << failedAttempts) * time.Second):
// do nothing
}
} else {
lastRemoteIndex = index
s.updateACLReplicationStatusIndex(replicationType, index)
s.logger.Printf("[DEBUG] consul: ACL %s replication completed through remote index %d", replicationType.SingularNoun(), index)
failedAttempts = 0
}
}
}
func (s *Server) stopACLReplication() {
s.aclReplicationLock.Lock()
defer s.aclReplicationLock.Unlock()
if !s.aclReplicationEnabled {
return
}
s.aclReplicationCancel()
s.aclReplicationCancel = nil
s.updateACLReplicationStatusStopped()
s.aclReplicationEnabled = false
// these will be no-ops when not started
s.leaderRoutineManager.Stop(legacyACLReplicationRoutineName)
s.leaderRoutineManager.Stop(aclPolicyReplicationRoutineName)
s.leaderRoutineManager.Stop(aclRoleReplicationRoutineName)
s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
}
func (s *Server) startConfigReplication() {
@ -890,12 +895,12 @@ func (s *Server) startConfigReplication() {
return
}
s.configReplicator.Start()
s.leaderRoutineManager.Start(configReplicationRoutineName, s.configReplicator.Run)
}
func (s *Server) stopConfigReplication() {
// will be a no-op when not started
s.configReplicator.Stop()
s.leaderRoutineManager.Stop(configReplicationRoutineName)
}
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary

View File

@ -439,52 +439,30 @@ func (s *Server) generateCASignRequest(csr string) *structs.CASignRequest {
// startConnectLeader starts multi-dc connect leader routines.
func (s *Server) startConnectLeader() {
s.connectLock.Lock()
defer s.connectLock.Unlock()
if s.connectEnabled {
return
}
s.connectCh = make(chan struct{})
// Start the Connect secondary DC actions if enabled.
if s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter {
go s.secondaryCARootWatch(s.connectCh)
go s.replicateIntentions(s.connectCh)
s.leaderRoutineManager.Start(secondaryCARootWatchRoutineName, s.secondaryCARootWatch)
s.leaderRoutineManager.Start(intentionReplicationRoutineName, s.replicateIntentions)
}
go s.runCARootPruning(s.connectCh)
s.connectEnabled = true
s.leaderRoutineManager.Start(caRootPruningRoutineName, s.runCARootPruning)
}
// stopConnectLeader stops connect specific leader functions.
func (s *Server) stopConnectLeader() {
s.connectLock.Lock()
defer s.connectLock.Unlock()
if !s.connectEnabled {
return
}
s.actingSecondaryLock.Lock()
s.actingSecondaryCA = false
s.actingSecondaryLock.Unlock()
close(s.connectCh)
s.connectEnabled = false
s.leaderRoutineManager.Stop(secondaryCARootWatchRoutineName)
s.leaderRoutineManager.Stop(intentionReplicationRoutineName)
s.leaderRoutineManager.Stop(caRootPruningRoutineName)
}
func (s *Server) runCARootPruning(stopCh <-chan struct{}) {
func (s *Server) runCARootPruning(ctx context.Context) error {
ticker := time.NewTicker(caRootPruneInterval)
defer ticker.Stop()
for {
select {
case <-stopCh:
return
case <-ctx.Done():
return nil
case <-ticker.C:
if err := s.pruneCARoots(); err != nil {
s.logger.Printf("[ERR] connect: error pruning CA roots: %v", err)
@ -549,7 +527,7 @@ func (s *Server) pruneCARoots() error {
// secondaryCARootWatch maintains a blocking query to the primary datacenter's
// ConnectCA.Roots endpoint to monitor when it needs to request a new signed
// intermediate certificate.
func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
func (s *Server) secondaryCARootWatch(ctx context.Context) error {
args := structs.DCSpecificRequest{
Datacenter: s.config.PrimaryDatacenter,
QueryOptions: structs.QueryOptions{
@ -559,7 +537,7 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
s.logger.Printf("[DEBUG] connect: starting Connect CA root replication from primary datacenter %q", s.config.PrimaryDatacenter)
retryLoopBackoff(stopCh, func() error {
retryLoopBackoff(ctx.Done(), func() error {
var roots structs.IndexedCARoots
if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err)
@ -598,18 +576,20 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
}, func(err error) {
s.logger.Printf("[ERR] connect: %v", err)
})
return nil
}
// replicateIntentions executes a blocking query to the primary datacenter to replicate
// the intentions there to the local state.
func (s *Server) replicateIntentions(stopCh <-chan struct{}) {
func (s *Server) replicateIntentions(ctx context.Context) error {
args := structs.DCSpecificRequest{
Datacenter: s.config.PrimaryDatacenter,
}
s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter)
retryLoopBackoff(stopCh, func() error {
retryLoopBackoff(ctx.Done(), func() error {
// Always use the latest replication token value in case it changed while looping.
args.QueryOptions.Token = s.tokens.ReplicationToken()
@ -653,6 +633,7 @@ func (s *Server) replicateIntentions(stopCh <-chan struct{}) {
}, func(err error) {
s.logger.Printf("[ERR] connect: error replicating intentions: %v", err)
})
return nil
}
// retryLoopBackoff loops a given function indefinitely, backing off exponentially

View File

@ -0,0 +1,120 @@
package consul
import (
"context"
"log"
"os"
"sync"
)
type LeaderRoutine func(ctx context.Context) error
type leaderRoutine struct {
running bool
cancel context.CancelFunc
}
type LeaderRoutineManager struct {
lock sync.RWMutex
logger *log.Logger
routines map[string]*leaderRoutine
}
func NewLeaderRoutineManager(logger *log.Logger) *LeaderRoutineManager {
if logger == nil {
logger = log.New(os.Stderr, "", log.LstdFlags)
}
return &LeaderRoutineManager{
logger: logger,
routines: make(map[string]*leaderRoutine),
}
}
func (m *LeaderRoutineManager) IsRunning(name string) bool {
m.lock.Lock()
defer m.lock.Unlock()
if routine, ok := m.routines[name]; ok {
return routine.running
}
return false
}
func (m *LeaderRoutineManager) Start(name string, routine LeaderRoutine) error {
return m.StartWithContext(nil, name, routine)
}
func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name string, routine LeaderRoutine) error {
m.lock.Lock()
defer m.lock.Unlock()
if instance, ok := m.routines[name]; ok && instance.running {
return nil
}
if parentCtx == nil {
parentCtx = context.Background()
}
ctx, cancel := context.WithCancel(parentCtx)
instance := &leaderRoutine{
running: true,
cancel: cancel,
}
go func() {
err := routine(ctx)
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
m.logger.Printf("[ERROR] leader: %s routine exited with error: %v", name, err)
} else {
m.logger.Printf("[DEBUG] leader: stopped %s routine", name)
}
m.lock.Lock()
instance.running = false
m.lock.Unlock()
}()
m.routines[name] = instance
m.logger.Printf("[INFO] leader: started %s routine", name)
return nil
}
func (m *LeaderRoutineManager) Stop(name string) error {
m.lock.Lock()
defer m.lock.Unlock()
instance, ok := m.routines[name]
if !ok {
// no running instance
return nil
}
if !instance.running {
return nil
}
m.logger.Printf("[DEBUG] leader: stopping %s routine", name)
instance.cancel()
delete(m.routines, name)
return nil
}
func (m *LeaderRoutineManager) StopAll() {
m.lock.Lock()
defer m.lock.Unlock()
for name, routine := range m.routines {
if !routine.running {
continue
}
m.logger.Printf("[DEBUG] leader: stopping %s routine", name)
routine.cancel()
}
// just whipe out the entire map
m.routines = make(map[string]*leaderRoutine)
}

View File

@ -0,0 +1,73 @@
package consul
import (
"context"
"sync/atomic"
"testing"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
)
func TestLeaderRoutineManager(t *testing.T) {
t.Parallel()
var runs uint32
var running uint32
// tlog := testutil.NewCancellableTestLogger(t)
// defer tlog.Cancel()
mgr := NewLeaderRoutineManager(testutil.TestLogger(t))
run := func(ctx context.Context) error {
atomic.StoreUint32(&running, 1)
defer atomic.StoreUint32(&running, 0)
atomic.AddUint32(&runs, 1)
<-ctx.Done()
return nil
}
// IsRunning on unregistered service should be false
require.False(t, mgr.IsRunning("not-found"))
// start
require.NoError(t, mgr.Start("run", run))
require.True(t, mgr.IsRunning("run"))
retry.Run(t, func(r *retry.R) {
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
})
require.NoError(t, mgr.Stop("run"))
// ensure the background go routine was actually cancelled
retry.Run(t, func(r *retry.R) {
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
})
// restart and stop
require.NoError(t, mgr.Start("run", run))
retry.Run(t, func(r *retry.R) {
require.Equal(r, uint32(2), atomic.LoadUint32(&runs))
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
})
require.NoError(t, mgr.Stop("run"))
retry.Run(t, func(r *retry.R) {
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
})
// start with a context
ctx, cancel := context.WithCancel(context.Background())
require.NoError(t, mgr.StartWithContext(ctx, "run", run))
cancel()
// The function should exit of its own accord due to the parent
// context being canceled
retry.Run(t, func(r *retry.R) {
require.Equal(r, uint32(3), atomic.LoadUint32(&runs))
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
// the task should automatically set itself to not running if
// it exits early
require.False(r, mgr.IsRunning("run"))
})
}

View File

@ -5,7 +5,7 @@ import (
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/consul/lib"
@ -40,15 +40,12 @@ type ReplicatorConfig struct {
type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64) (index uint64, exit bool, err error)
type Replicator struct {
name string
lock sync.RWMutex
running bool
cancel context.CancelFunc
ctx context.Context
limiter *rate.Limiter
waiter *lib.RetryWaiter
replicate ReplicatorFunc
logger *log.Logger
name string
limiter *rate.Limiter
waiter *lib.RetryWaiter
replicateFn ReplicatorFunc
logger *log.Logger
lastRemoteIndex uint64
}
func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
@ -74,64 +71,45 @@ func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
}
waiter := lib.NewRetryWaiter(minFailures, 0*time.Second, maxWait, lib.NewJitterRandomStagger(10))
return &Replicator{
name: config.Name,
running: false,
limiter: limiter,
waiter: waiter,
replicate: config.ReplicateFn,
logger: config.Logger,
name: config.Name,
limiter: limiter,
waiter: waiter,
replicateFn: config.ReplicateFn,
logger: config.Logger,
}, nil
}
func (r *Replicator) Start() {
r.lock.Lock()
defer r.lock.Unlock()
if r.running {
return
}
r.ctx, r.cancel = context.WithCancel(context.Background())
go r.run()
r.running = true
r.logger.Printf("[INFO] replication: started %s replication", r.name)
}
func (r *Replicator) run() {
var lastRemoteIndex uint64
func (r *Replicator) Run(ctx context.Context) error {
defer r.logger.Printf("[INFO] replication: stopped %s replication", r.name)
for {
// This ensures we aren't doing too many successful replication rounds - mostly useful when
// the data within the primary datacenter is changing rapidly but we try to limit the amount
// of resources replication into the secondary datacenter should take
if err := r.limiter.Wait(r.ctx); err != nil {
return
if err := r.limiter.Wait(ctx); err != nil {
return nil
}
// Perform a single round of replication
index, exit, err := r.replicate(r.ctx, lastRemoteIndex)
index, exit, err := r.replicateFn(ctx, atomic.LoadUint64(&r.lastRemoteIndex))
if exit {
// the replication function told us to exit
return
return nil
}
if err != nil {
// reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during
// the next round of replication
lastRemoteIndex = 0
atomic.StoreUint64(&r.lastRemoteIndex, 0)
r.logger.Printf("[WARN] replication: %s replication error (will retry if still leader): %v", r.name, err)
} else {
lastRemoteIndex = index
atomic.StoreUint64(&r.lastRemoteIndex, index)
r.logger.Printf("[DEBUG] replication: %s replication completed through remote index %d", r.name, index)
}
select {
case <-r.ctx.Done():
return
case <-ctx.Done():
return nil
// wait some amount of time to prevent churning through many replication rounds while replication is failing
case <-r.waiter.WaitIfErr(err):
// do nothing
@ -139,16 +117,6 @@ func (r *Replicator) run() {
}
}
func (r *Replicator) Stop() {
r.lock.Lock()
defer r.lock.Unlock()
if !r.running {
return
}
r.logger.Printf("[DEBUG] replication: stopping %s replication", r.name)
r.cancel()
r.cancel = nil
r.running = false
func (r *Replicator) Index() uint64 {
return atomic.LoadUint64(&r.lastRemoteIndex)
}

View File

@ -4,15 +4,19 @@ import (
"context"
"testing"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
)
func TestReplicationRestart(t *testing.T) {
mgr := NewLeaderRoutineManager(testutil.TestLogger(t))
config := ReplicatorConfig{
Name: "mock",
ReplicateFn: func(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
return 1, false, nil
},
Rate: 1,
Burst: 1,
}
@ -20,9 +24,9 @@ func TestReplicationRestart(t *testing.T) {
repl, err := NewReplicator(&config)
require.NoError(t, err)
repl.Start()
repl.Stop()
repl.Start()
mgr.Start("mock", repl.Run)
mgr.Stop("mock")
mgr.Start("mock", repl.Run)
// Previously this would have segfaulted
repl.Stop()
mgr.Stop("mock")
}

View File

@ -1,7 +1,6 @@
package consul
import (
"context"
"errors"
"fmt"
"io"
@ -88,6 +87,19 @@ const (
reconcileChSize = 256
)
const (
legacyACLReplicationRoutineName = "legacy ACL replication"
aclPolicyReplicationRoutineName = "ACL policy replication"
aclRoleReplicationRoutineName = "ACL role replication"
aclTokenReplicationRoutineName = "ACL token replication"
aclTokenReapingRoutineName = "acl token reaping"
aclUpgradeRoutineName = "legacy ACL token upgrade"
caRootPruningRoutineName = "CA root pruning"
configReplicationRoutineName = "config entry replication"
intentionReplicationRoutineName = "intention replication"
secondaryCARootWatchRoutineName = "secondary CA roots watch"
)
var (
ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled")
)
@ -101,24 +113,6 @@ type Server struct {
// acls is used to resolve tokens to effective policies
acls *ACLResolver
// aclUpgradeCancel is used to cancel the ACL upgrade goroutine when we
// lose leadership
aclUpgradeCancel context.CancelFunc
aclUpgradeLock sync.RWMutex
aclUpgradeEnabled bool
// aclReplicationCancel is used to shut down the ACL replication goroutine
// when we lose leadership
aclReplicationCancel context.CancelFunc
aclReplicationLock sync.RWMutex
aclReplicationEnabled bool
// aclTokenReapCancel is used to shut down the ACL Token expiration reap
// goroutine when we lose leadership.
aclTokenReapCancel context.CancelFunc
aclTokenReapLock sync.RWMutex
aclTokenReapEnabled bool
aclAuthMethodValidators map[string]*authMethodValidatorEntry
aclAuthMethodValidatorLock sync.RWMutex
@ -271,15 +265,13 @@ type Server struct {
shutdownCh chan struct{}
shutdownLock sync.Mutex
// State for multi-dc connect leader logic
connectLock sync.RWMutex
connectEnabled bool
connectCh chan struct{}
// State for whether this datacenter is acting as a secondary CA.
actingSecondaryCA bool
actingSecondaryLock sync.RWMutex
// Manager to handle starting/stopping go routines when establishing/revoking raft leadership
leaderRoutineManager *LeaderRoutineManager
// embedded struct to hold all the enterprise specific data
EnterpriseServer
}
@ -354,24 +346,25 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store, tl
// Create server.
s := &Server{
config: config,
tokens: tokens,
connPool: connPool,
eventChLAN: make(chan serf.Event, serfEventChSize),
eventChWAN: make(chan serf.Event, serfEventChSize),
logger: logger,
leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, reconcileChSize),
router: router.NewRouter(logger, config.Datacenter),
rpcServer: rpc.NewServer(),
insecureRPCServer: rpc.NewServer(),
tlsConfigurator: tlsConfigurator,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverLookup: NewServerLookup(),
shutdownCh: shutdownCh,
config: config,
tokens: tokens,
connPool: connPool,
eventChLAN: make(chan serf.Event, serfEventChSize),
eventChWAN: make(chan serf.Event, serfEventChSize),
logger: logger,
leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, reconcileChSize),
router: router.NewRouter(logger, config.Datacenter),
rpcServer: rpc.NewServer(),
insecureRPCServer: rpc.NewServer(),
tlsConfigurator: tlsConfigurator,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverLookup: NewServerLookup(),
shutdownCh: shutdownCh,
leaderRoutineManager: NewLeaderRoutineManager(logger),
}
// Initialize enterprise specific server functionality
@ -812,6 +805,11 @@ func (s *Server) Shutdown() error {
s.shutdown = true
close(s.shutdownCh)
// ensure that any leader routines still running get canceled
if s.leaderRoutineManager != nil {
s.leaderRoutineManager.StopAll()
}
if s.serfLAN != nil {
s.serfLAN.Shutdown()
}