Ensure that retryLoopBackoff can be cancelled

We needed to pass a cancellable context into limiter.Wait instead of context.Background(). So the function now takes a context.Context instead of a channel, since most call sites were just passing through a Done channel from a context anyway.
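For background: golang.org/x/time/rate's Limiter.Wait blocks until a token is available or the supplied context is cancelled, so with context.Background() it can never be interrupted. A minimal standalone sketch of that behaviour (example code, not part of this change):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// One token every 10 seconds with a burst of 1, so the second Wait
	// would otherwise block for roughly 10 seconds.
	limiter := rate.NewLimiter(rate.Every(10*time.Second), 1)

	ctx, cancel := context.WithCancel(context.Background())

	// Cancel shortly after starting; Wait should return an error
	// instead of blocking for the full token interval.
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()

	_ = limiter.Wait(ctx) // first token is available immediately
	if err := limiter.Wait(ctx); err != nil {
		fmt.Println("wait cancelled:", err) // e.g. "context canceled"
	}
}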

Fix a goroutine leak in the gateway locator
Matt Keeler 2020-06-24 12:36:14 -04:00
parent e395efdbdc
commit 341aedbce9
GPG Key ID: 04DBAE1857E0081B
4 changed files with 20 additions and 12 deletions

View File

@@ -1,6 +1,7 @@
 package consul

 import (
+	"context"
 	"errors"
 	"math/rand"
 	"sort"
@@ -261,9 +262,9 @@ func NewGatewayLocator(
 var errGatewayLocalStateNotInitialized = errors.New("local state not initialized")

-func (g *GatewayLocator) Run(stopCh <-chan struct{}) {
+func (g *GatewayLocator) Run(ctx context.Context) {
 	var lastFetchIndex uint64

-	retryLoopBackoff(stopCh, func() error {
+	retryLoopBackoff(ctx, func() error {
 		idx, err := g.runOnce(lastFetchIndex)
 		if err != nil {
 			return err

View File

@@ -651,7 +651,7 @@ func (s *Server) secondaryIntermediateCertRenewalWatch(ctx context.Context) erro
 		case <-ctx.Done():
 			return nil
 		case <-time.After(structs.IntermediateCertRenewInterval):
-			retryLoopBackoff(ctx.Done(), func() error {
+			retryLoopBackoff(ctx, func() error {
 				s.caProviderReconfigurationLock.Lock()
 				defer s.caProviderReconfigurationLock.Unlock()
@@ -724,7 +724,7 @@ func (s *Server) secondaryCARootWatch(ctx context.Context) error {
 	connectLogger.Debug("starting Connect CA root replication from primary datacenter", "primary", s.config.PrimaryDatacenter)

-	retryLoopBackoff(ctx.Done(), func() error {
+	retryLoopBackoff(ctx, func() error {
 		var roots structs.IndexedCARoots
 		if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
 			return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err)
@@ -780,7 +780,7 @@ func (s *Server) replicateIntentions(ctx context.Context) error {
 	connectLogger.Debug("starting Connect intention replication from primary datacenter", "primary", s.config.PrimaryDatacenter)

-	retryLoopBackoff(ctx.Done(), func() error {
+	retryLoopBackoff(ctx, func() error {
 		// Always use the latest replication token value in case it changed while looping.
 		args.QueryOptions.Token = s.tokens.ReplicationToken()
@@ -832,14 +832,14 @@ func (s *Server) replicateIntentions(ctx context.Context) error {
 // retryLoopBackoff loops a given function indefinitely, backing off exponentially
 // upon errors up to a maximum of maxRetryBackoff seconds.
-func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(error)) {
+func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) {
 	var failedAttempts uint
 	limiter := rate.NewLimiter(loopRateLimit, retryBucketSize)
 	for {
 		// Rate limit how often we run the loop
-		limiter.Wait(context.Background())
+		limiter.Wait(ctx)

 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			return
 		default:
 		}
@@ -850,8 +850,15 @@ func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(er
 		if err := loopFn(); err != nil {
 			errFn(err)
-			time.Sleep(retryTime)
-			continue
+
+			timer := time.NewTimer(retryTime)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return
+			case <-timer.C:
+				continue
+			}
 		}

 		// Reset the failed attempts after a successful run.
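For reference, the cancellable-backoff shape introduced above can be exercised on its own. A rough, self-contained sketch (simplified: the rate limiter and exponential backoff calculation are omitted, and the names are made up for the example):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff mirrors the pattern from the diff: instead of time.Sleep,
// it waits on a timer and the context at the same time, so callers can stop
// the loop even while it is backing off after an error.
func retryWithBackoff(ctx context.Context, retryTime time.Duration, loopFn func() error, errFn func(error)) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		if err := loopFn(); err != nil {
			errFn(err)

			timer := time.NewTimer(retryTime)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
				continue
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	retryWithBackoff(ctx, 5*time.Second,
		func() error { return errors.New("always failing") },
		func(err error) { fmt.Println("retrying after error:", err) },
	)
	// Even though the backoff is 5s, the loop exits once the context times
	// out at ~500ms rather than sleeping through the full retry interval.
	fmt.Println("loop stopped")
}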

View File

@@ -42,7 +42,7 @@ func (s *Server) stopFederationStateAntiEntropy() {
 func (s *Server) federationStateAntiEntropySync(ctx context.Context) error {
 	var lastFetchIndex uint64

-	retryLoopBackoff(ctx.Done(), func() error {
+	retryLoopBackoff(ctx, func() error {
 		if !s.DatacenterSupportsFederationStates() {
 			return nil
 		}

View File

@@ -525,7 +525,7 @@ func NewServerWithOptions(config *Config, options ...ConsulOption) (*Server, err
 	}

 	if s.gatewayLocator != nil {
-		go s.gatewayLocator.Run(s.shutdownCh)
+		go s.gatewayLocator.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
 	}

 	// Serf and dynamic bind ports
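lib.StopChannelContext is what bridges the server's existing shutdownCh to the new context-based signature; its implementation is not part of this diff. An adapter of that shape looks roughly like the following sketch (illustrative only, details may differ from Consul's lib package):

package main

import (
	"context"
	"fmt"
	"time"
)

// stopChannelContext is an illustrative stand-in for lib.StopChannelContext:
// it adapts a plain stop channel into a context.Context so channel-based
// shutdown can drive context-aware code such as rate.Limiter.Wait.
type stopChannelContext struct {
	StopCh <-chan struct{}
}

var _ context.Context = (*stopChannelContext)(nil)

func (c *stopChannelContext) Deadline() (deadline time.Time, ok bool) { return time.Time{}, false }

func (c *stopChannelContext) Done() <-chan struct{} { return c.StopCh }

func (c *stopChannelContext) Err() error {
	select {
	case <-c.StopCh:
		return context.Canceled
	default:
		return nil
	}
}

func (c *stopChannelContext) Value(key interface{}) interface{} { return nil }

func main() {
	shutdownCh := make(chan struct{})
	ctx := &stopChannelContext{StopCh: shutdownCh}

	go func() {
		time.Sleep(100 * time.Millisecond)
		close(shutdownCh) // closing the stop channel cancels the derived context
	}()

	<-ctx.Done()
	fmt.Println("context cancelled via stop channel:", ctx.Err())
}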