lib/retry: extract a new package from lib
This commit is contained in:
parent
5524a43f10
commit
0c7f9c72d7
|
@ -7,13 +7,14 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/proto/pbautoconf"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// AutoConfig is all the state necessary for being able to parse a configuration
|
||||
|
@ -24,7 +25,7 @@ type AutoConfig struct {
|
|||
acConfig Config
|
||||
logger hclog.Logger
|
||||
cache Cache
|
||||
waiter *lib.RetryWaiter
|
||||
waiter *retry.Waiter
|
||||
config *config.RuntimeConfig
|
||||
autoConfigResponse *pbautoconf.AutoConfigResponse
|
||||
autoConfigSource config.Source
|
||||
|
@ -84,7 +85,7 @@ func New(config Config) (*AutoConfig, error) {
|
|||
}
|
||||
|
||||
if config.Waiter == nil {
|
||||
config.Waiter = lib.NewRetryWaiter(1, 0, 10*time.Minute, lib.NewJitterRandomStagger(25))
|
||||
config.Waiter = retry.NewRetryWaiter(1, 0, 10*time.Minute, retry.NewJitterRandomStagger(25))
|
||||
}
|
||||
|
||||
return &AutoConfig{
|
||||
|
|
|
@ -11,6 +11,9 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
|
@ -18,13 +21,11 @@ import (
|
|||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/proto/pbautoconf"
|
||||
"github.com/hashicorp/consul/proto/pbconfig"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
testretry "github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
|
||||
|
||||
type configLoader struct {
|
||||
|
@ -412,7 +413,7 @@ func TestInitialConfiguration_retries(t *testing.T) {
|
|||
mcfg.Config.Loader = loader.Load
|
||||
|
||||
// reduce the retry wait times to make this test run faster
|
||||
mcfg.Config.Waiter = lib.NewRetryWaiter(2, 0, 1*time.Millisecond, nil)
|
||||
mcfg.Config.Waiter = retry.NewWaiter(2, 0, 1*time.Millisecond, nil)
|
||||
|
||||
indexedRoots, cert, extraCerts := mcfg.setupInitialTLS(t, "autoconf", "dc1", "secret")
|
||||
|
||||
|
@ -927,7 +928,7 @@ func TestRootsUpdate(t *testing.T) {
|
|||
// however there is no deterministic way to know once its been written outside of maybe a filesystem
|
||||
// event notifier. That seems a little heavy handed just for this and especially to do in any sort
|
||||
// of cross platform way.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
testretry.Run(t, func(r *testretry.R) {
|
||||
resp, err := testAC.ac.readPersistedAutoConfig()
|
||||
require.NoError(r, err)
|
||||
require.Equal(r, secondRoots.ActiveRootID, resp.CARoots.GetActiveRootID())
|
||||
|
@ -972,7 +973,7 @@ func TestCertUpdate(t *testing.T) {
|
|||
// persisting these to disk happens after all the things we would wait for in assertCertUpdated
|
||||
// will have fired. There is no deterministic way to know once its been written so we wrap
|
||||
// this in a retry.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
testretry.Run(t, func(r *testretry.R) {
|
||||
resp, err := testAC.ac.readPersistedAutoConfig()
|
||||
require.NoError(r, err)
|
||||
|
||||
|
@ -1099,7 +1100,7 @@ func TestFallback(t *testing.T) {
|
|||
|
||||
// persisting these to disk happens after the RPC we waited on above will have fired
|
||||
// There is no deterministic way to know once its been written so we wrap this in a retry.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
testretry.Run(t, func(r *testretry.R) {
|
||||
resp, err := testAC.ac.readPersistedAutoConfig()
|
||||
require.NoError(r, err)
|
||||
|
||||
|
|
|
@ -11,16 +11,17 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAutoEncrypt_generateCSR(t *testing.T) {
|
||||
|
@ -247,7 +248,7 @@ func TestAutoEncrypt_InitialCerts(t *testing.T) {
|
|||
resp.VerifyServerHostname = true
|
||||
})
|
||||
|
||||
mcfg.Config.Waiter = lib.NewRetryWaiter(2, 0, 1*time.Millisecond, nil)
|
||||
mcfg.Config.Waiter = retry.NewRetryWaiter(2, 0, 1*time.Millisecond, nil)
|
||||
|
||||
ac := AutoConfig{
|
||||
config: &config.RuntimeConfig{
|
||||
|
|
|
@ -5,12 +5,13 @@ import (
|
|||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
)
|
||||
|
||||
// DirectRPC is the interface that needs to be satisifed for AutoConfig to be able to perform
|
||||
|
@ -77,7 +78,7 @@ type Config struct {
|
|||
// jitter of 25% of the wait time. Setting this is mainly useful for
|
||||
// testing purposes to allow testing out the retrying functionality without
|
||||
// having the test take minutes/hours to complete.
|
||||
Waiter *lib.RetryWaiter
|
||||
Waiter *retry.Waiter
|
||||
|
||||
// Loader merges source with the existing FileSources and returns the complete
|
||||
// RuntimeConfig.
|
||||
|
|
|
@ -7,10 +7,11 @@ import (
|
|||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -46,7 +47,7 @@ type ReplicatorConfig struct {
|
|||
|
||||
type Replicator struct {
|
||||
limiter *rate.Limiter
|
||||
waiter *lib.RetryWaiter
|
||||
waiter *retry.Waiter
|
||||
delegate ReplicatorDelegate
|
||||
logger hclog.Logger
|
||||
lastRemoteIndex uint64
|
||||
|
@ -75,7 +76,7 @@ func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
|
|||
if minFailures < 0 {
|
||||
minFailures = 0
|
||||
}
|
||||
waiter := lib.NewRetryWaiter(minFailures, 0*time.Second, maxWait, lib.NewJitterRandomStagger(10))
|
||||
waiter := retry.NewRetryWaiter(minFailures, 0*time.Second, maxWait, retry.NewJitterRandomStagger(10))
|
||||
return &Replicator{
|
||||
limiter: limiter,
|
||||
waiter: waiter,
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package lib
|
||||
package retry
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -39,12 +41,12 @@ func (j *JitterRandomStagger) AddJitter(baseTime time.Duration) time.Duration {
|
|||
|
||||
// time.Duration is actually a type alias for int64 which is why casting
|
||||
// to the duration type and then dividing works
|
||||
return baseTime + RandomStagger((baseTime*time.Duration(j.percent))/100)
|
||||
return baseTime + lib.RandomStagger((baseTime*time.Duration(j.percent))/100)
|
||||
}
|
||||
|
||||
// RetryWaiter will record failed and successful operations and provide
|
||||
// a channel to wait on before a failed operation can be retried.
|
||||
type RetryWaiter struct {
|
||||
type Waiter struct {
|
||||
minFailures uint
|
||||
minWait time.Duration
|
||||
maxWait time.Duration
|
||||
|
@ -53,7 +55,7 @@ type RetryWaiter struct {
|
|||
}
|
||||
|
||||
// Creates a new RetryWaiter
|
||||
func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitter) *RetryWaiter {
|
||||
func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitter) *Waiter {
|
||||
if minFailures < 0 {
|
||||
minFailures = defaultMinFailures
|
||||
}
|
||||
|
@ -66,7 +68,7 @@ func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitt
|
|||
minWait = 0 * time.Nanosecond
|
||||
}
|
||||
|
||||
return &RetryWaiter{
|
||||
return &Waiter{
|
||||
minFailures: uint(minFailures),
|
||||
minWait: minWait,
|
||||
maxWait: maxWait,
|
||||
|
@ -77,7 +79,7 @@ func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitt
|
|||
|
||||
// calculates the necessary wait time before the
|
||||
// next operation should be allowed.
|
||||
func (rw *RetryWaiter) calculateWait() time.Duration {
|
||||
func (rw *Waiter) calculateWait() time.Duration {
|
||||
waitTime := rw.minWait
|
||||
if rw.failures > rw.minFailures {
|
||||
shift := rw.failures - rw.minFailures - 1
|
||||
|
@ -104,7 +106,7 @@ func (rw *RetryWaiter) calculateWait() time.Duration {
|
|||
// calculates the waitTime and returns a chan
|
||||
// that will become selectable once that amount
|
||||
// of time has elapsed.
|
||||
func (rw *RetryWaiter) wait() <-chan struct{} {
|
||||
func (rw *Waiter) wait() <-chan struct{} {
|
||||
waitTime := rw.calculateWait()
|
||||
ch := make(chan struct{})
|
||||
if waitTime > 0 {
|
||||
|
@ -119,28 +121,33 @@ func (rw *RetryWaiter) wait() <-chan struct{} {
|
|||
|
||||
// Marks that an operation is successful which resets the failure count.
|
||||
// The chan that is returned will be immediately selectable
|
||||
func (rw *RetryWaiter) Success() <-chan struct{} {
|
||||
func (rw *Waiter) Success() <-chan struct{} {
|
||||
rw.Reset()
|
||||
return rw.wait()
|
||||
}
|
||||
|
||||
// Marks that an operation failed. The chan returned will be selectable
|
||||
// once the calculated retry wait amount of time has elapsed
|
||||
func (rw *RetryWaiter) Failed() <-chan struct{} {
|
||||
func (rw *Waiter) Failed() <-chan struct{} {
|
||||
rw.failures += 1
|
||||
ch := rw.wait()
|
||||
return ch
|
||||
}
|
||||
|
||||
// Resets the internal failure counter
|
||||
func (rw *RetryWaiter) Reset() {
|
||||
// Resets the internal failure counter.
|
||||
func (rw *Waiter) Reset() {
|
||||
rw.failures = 0
|
||||
}
|
||||
|
||||
// Failures returns the current number of consecutive failures recorded.
|
||||
func (rw *Waiter) Failures() int {
|
||||
return int(rw.failures)
|
||||
}
|
||||
|
||||
// WaitIf is a convenice method to record whether the last
|
||||
// operation was a success or failure and return a chan that
|
||||
// will be selectablw when the next operation can be done.
|
||||
func (rw *RetryWaiter) WaitIf(failure bool) <-chan struct{} {
|
||||
func (rw *Waiter) WaitIf(failure bool) <-chan struct{} {
|
||||
if failure {
|
||||
return rw.Failed()
|
||||
}
|
||||
|
@ -151,6 +158,6 @@ func (rw *RetryWaiter) WaitIf(failure bool) <-chan struct{} {
|
|||
// operation was a success or failure based on whether the err
|
||||
// is nil and then return a chan that will be selectable when
|
||||
// the next operation can be done.
|
||||
func (rw *RetryWaiter) WaitIfErr(err error) <-chan struct{} {
|
||||
func (rw *Waiter) WaitIfErr(err error) <-chan struct{} {
|
||||
return rw.WaitIf(err != nil)
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package lib
|
||||
package retry
|
||||
|
||||
import (
|
||||
"fmt"
|
Loading…
Reference in New Issue