Implement config entry replication (#5706)
This commit is contained in:
parent 51f2b9d1d4
commit 3b5d38fb49

@@ -321,6 +321,20 @@ type Config struct {
    // user events. This function should not block.
    UserEventHandler func(serf.UserEvent)

    // ConfigReplicationRate is the max number of replication rounds that can
    // be run per second. Note that either 1 or 2 RPCs are used during each
    // replication round.
    ConfigReplicationRate int

    // ConfigReplicationBurst is how many replication rounds can be bursted
    // after a period of idleness.
    ConfigReplicationBurst int

    // ConfigReplicationApplyLimit is the max number of replication-related
    // apply operations that we allow during a one second period. This is
    // used to limit the amount of Raft bandwidth used for replication.
    ConfigReplicationApplyLimit int

    // CoordinateUpdatePeriod controls how long a server batches coordinate
    // updates before applying them in a Raft transaction. A larger period
    // leads to fewer Raft transactions, but also the stored coordinates
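
The three ConfigReplication* knobs above mirror the existing ACLReplication* settings: Rate and Burst feed a token-bucket limiter that gates whole replication rounds, while ApplyLimit spaces out the individual Raft applies inside a round (see reconcileLocalConfig further down). A minimal, self-contained sketch of that split, assuming golang.org/x/time/rate; the helper names limitRounds and spaceApplies are illustrative, not part of this commit:

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

// limitRounds gates how often a full replication round may start,
// mirroring ConfigReplicationRate / ConfigReplicationBurst.
func limitRounds(ctx context.Context, ratePerSec, burst int, round func() error) error {
    limiter := rate.NewLimiter(rate.Limit(ratePerSec), burst)
    for {
        if err := limiter.Wait(ctx); err != nil {
            return err // context cancelled
        }
        if err := round(); err != nil {
            return err
        }
    }
}

// spaceApplies spreads the apply operations of one round over time,
// mirroring ConfigReplicationApplyLimit (ops per second).
func spaceApplies(ctx context.Context, applyLimit int, ops []func() error) error {
    ticker := time.NewTicker(time.Second / time.Duration(applyLimit))
    defer ticker.Stop()
    for i, op := range ops {
        if err := op(); err != nil {
            return err
        }
        if i < len(ops)-1 {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-ticker.C:
                // rate slot available, continue with the next apply
            }
        }
    }
    return nil
}

func main() {
    ops := []func() error{
        func() error { fmt.Println("apply 1"); return nil },
        func() error { fmt.Println("apply 2"); return nil },
    }
    _ = spaceApplies(context.Background(), 100, ops)
}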

@@ -432,26 +446,29 @@ func DefaultConfig() *Config {
    }

    conf := &Config{
        Build: version.Version,
        Datacenter: DefaultDC,
        NodeName: hostname,
        RPCAddr: DefaultRPCAddr,
        RaftConfig: raft.DefaultConfig(),
        SerfLANConfig: lib.SerfDefaultConfig(),
        SerfWANConfig: lib.SerfDefaultConfig(),
        SerfFloodInterval: 60 * time.Second,
        ReconcileInterval: 60 * time.Second,
        ProtocolVersion: ProtocolVersion2Compatible,
        ACLPolicyTTL: 30 * time.Second,
        ACLTokenTTL: 30 * time.Second,
        ACLDefaultPolicy: "allow",
        ACLDownPolicy: "extend-cache",
        ACLReplicationRate: 1,
        ACLReplicationBurst: 5,
        ACLReplicationApplyLimit: 100, // ops / sec
        ConfigReplicationRate: 1,
        ConfigReplicationBurst: 5,
        ConfigReplicationApplyLimit: 100, // ops / sec
        TombstoneTTL: 15 * time.Minute,
        TombstoneTTLGranularity: 30 * time.Second,
        SessionTTLMin: 10 * time.Second,

        // These are tuned to provide a total throughput of 128 updates
        // per second. If you update these, you should update the client-

@@ -132,6 +132,43 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe
    })
}

// ListAll returns all the known configuration entries
func (c *ConfigEntry) ListAll(args *structs.DCSpecificRequest, reply *structs.IndexedGenericConfigEntries) error {
    if done, err := c.srv.forward("ConfigEntry.ListAll", args, args, reply); done {
        return err
    }
    defer metrics.MeasureSince([]string{"config_entry", "listAll"}, time.Now())

    // Fetch the ACL token, if any.
    rule, err := c.srv.ResolveToken(args.Token)
    if err != nil {
        return err
    }

    return c.srv.blockingQuery(
        &args.QueryOptions,
        &reply.QueryMeta,
        func(ws memdb.WatchSet, state *state.Store) error {
            index, entries, err := state.ConfigEntries(ws)
            if err != nil {
                return err
            }

            // Filter the entries returned by ACL permissions.
            filteredEntries := make([]structs.ConfigEntry, 0, len(entries))
            for _, entry := range entries {
                if rule != nil && !entry.CanRead(rule) {
                    continue
                }
                filteredEntries = append(filteredEntries, entry)
            }

            reply.Entries = filteredEntries
            reply.Index = index
            return nil
        })
}

// Delete deletes a config entry.
func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) error {
    if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done {

@@ -274,6 +274,49 @@ func TestConfigEntry_List(t *testing.T) {
    require.Equal(expected, out)
}

func TestConfigEntry_ListAll(t *testing.T) {
    t.Parallel()

    require := require.New(t)

    dir1, s1 := testServer(t)
    defer os.RemoveAll(dir1)
    defer s1.Shutdown()
    codec := rpcClient(t, s1)
    defer codec.Close()

    // Create some dummy config entries in the state store to look up.
    state := s1.fsm.State()
    expected := structs.IndexedGenericConfigEntries{
        Entries: []structs.ConfigEntry{
            &structs.ProxyConfigEntry{
                Kind: structs.ProxyDefaults,
                Name: "global",
            },
            &structs.ServiceConfigEntry{
                Kind: structs.ServiceDefaults,
                Name: "bar",
            },
            &structs.ServiceConfigEntry{
                Kind: structs.ServiceDefaults,
                Name: "foo",
            },
        },
    }
    require.NoError(state.EnsureConfigEntry(1, expected.Entries[0]))
    require.NoError(state.EnsureConfigEntry(2, expected.Entries[1]))
    require.NoError(state.EnsureConfigEntry(3, expected.Entries[2]))

    args := structs.DCSpecificRequest{
        Datacenter: "dc1",
    }
    var out structs.IndexedGenericConfigEntries
    require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ListAll", &args, &out))

    expected.QueryMeta = out.QueryMeta
    require.Equal(expected, out)
}

func TestConfigEntry_List_ACLDeny(t *testing.T) {
    t.Parallel()

@@ -355,6 +398,86 @@ operator = "read"
    require.Equal(structs.ProxyDefaults, proxyConf.Kind)
}

func TestConfigEntry_ListAll_ACLDeny(t *testing.T) {
    t.Parallel()

    require := require.New(t)

    dir1, s1 := testServerWithConfig(t, func(c *Config) {
        c.ACLDatacenter = "dc1"
        c.ACLsEnabled = true
        c.ACLMasterToken = "root"
        c.ACLDefaultPolicy = "deny"
    })
    defer os.RemoveAll(dir1)
    defer s1.Shutdown()
    testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
    codec := rpcClient(t, s1)
    defer codec.Close()

    // Create the ACL.
    arg := structs.ACLRequest{
        Datacenter: "dc1",
        Op: structs.ACLSet,
        ACL: structs.ACL{
            Name: "User token",
            Type: structs.ACLTokenTypeClient,
            Rules: `
service "foo" {
policy = "read"
}
operator = "read"
`,
        },
        WriteRequest: structs.WriteRequest{Token: "root"},
    }
    var id string
    if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
        t.Fatalf("err: %v", err)
    }

    // Create some dummy service/proxy configs to be looked up.
    state := s1.fsm.State()
    require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
        Kind: structs.ProxyDefaults,
        Name: structs.ProxyConfigGlobal,
    }))
    require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
        Kind: structs.ServiceDefaults,
        Name: "foo",
    }))
    require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
        Kind: structs.ServiceDefaults,
        Name: "db",
    }))

    // This should filter out the "db" service since we don't have permissions for it.
    args := structs.ConfigEntryQuery{
        Datacenter: s1.config.Datacenter,
        QueryOptions: structs.QueryOptions{Token: id},
    }
    var out structs.IndexedGenericConfigEntries
    err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.ListAll", &args, &out)
    require.NoError(err)
    require.Len(out.Entries, 2)
    svcIndex := 0
    proxyIndex := 1
    if out.Entries[0].GetKind() == structs.ProxyDefaults {
        svcIndex = 1
        proxyIndex = 0
    }

    svcConf, ok := out.Entries[svcIndex].(*structs.ServiceConfigEntry)
    require.True(ok)
    proxyConf, ok := out.Entries[proxyIndex].(*structs.ProxyConfigEntry)
    require.True(ok)

    require.Equal("foo", svcConf.Name)
    require.Equal(structs.ServiceDefaults, svcConf.Kind)
    require.Equal(structs.ProxyConfigGlobal, proxyConf.Name)
    require.Equal(structs.ProxyDefaults, proxyConf.Kind)
}

func TestConfigEntry_Delete(t *testing.T) {
    t.Parallel()

@@ -0,0 +1,199 @@
package consul

import (
    "context"
    "fmt"
    "sort"
    "time"

    "github.com/armon/go-metrics"
    "github.com/hashicorp/consul/agent/structs"
)

func cmpConfigLess(first structs.ConfigEntry, second structs.ConfigEntry) bool {
    return first.GetKind() < second.GetKind() || (first.GetKind() == second.GetKind() && first.GetName() < second.GetName())
}

func configSort(configs []structs.ConfigEntry) {
    sort.Slice(configs, func(i, j int) bool {
        return cmpConfigLess(configs[i], configs[j])
    })
}

func diffConfigEntries(local []structs.ConfigEntry, remote []structs.ConfigEntry, lastRemoteIndex uint64) ([]structs.ConfigEntry, []structs.ConfigEntry) {
    configSort(local)
    configSort(remote)

    var deletions []structs.ConfigEntry
    var updates []structs.ConfigEntry
    var localIdx int
    var remoteIdx int
    for localIdx, remoteIdx = 0, 0; localIdx < len(local) && remoteIdx < len(remote); {
        if local[localIdx].GetKind() == remote[remoteIdx].GetKind() && local[localIdx].GetName() == remote[remoteIdx].GetName() {
            // config is in both the local and remote state - need to check raft indices
            if remote[remoteIdx].GetRaftIndex().ModifyIndex > lastRemoteIndex {
                updates = append(updates, remote[remoteIdx])
            }
            // increment both indices when equal
            localIdx += 1
            remoteIdx += 1
        } else if cmpConfigLess(local[localIdx], remote[remoteIdx]) {
            // config no longer in remote state - needs deleting
            deletions = append(deletions, local[localIdx])

            // increment just the local index
            localIdx += 1
        } else {
            // local state doesn't have this config - needs updating
            updates = append(updates, remote[remoteIdx])

            // increment just the remote index
            remoteIdx += 1
        }
    }

    for ; localIdx < len(local); localIdx += 1 {
        deletions = append(deletions, local[localIdx])
    }

    for ; remoteIdx < len(remote); remoteIdx += 1 {
        updates = append(updates, remote[remoteIdx])
    }

    return deletions, updates
}

func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.ConfigEntry, op structs.ConfigEntryOp) (bool, error) {
    ticker := time.NewTicker(time.Second / time.Duration(s.config.ConfigReplicationApplyLimit))
    defer ticker.Stop()

    for i, entry := range configs {
        req := structs.ConfigEntryRequest{
            Op: op,
            Datacenter: s.config.Datacenter,
            Entry: entry,
        }

        resp, err := s.raftApply(structs.ConfigEntryRequestType, &req)
        if err != nil {
            return false, fmt.Errorf("Failed to apply config %s: %v", op, err)
        }
        if respErr, ok := resp.(error); ok {
            return false, fmt.Errorf("Failed to apply config %s: %v", op, respErr)
        }

        if i < len(configs)-1 {
            select {
            case <-ctx.Done():
                return true, nil
            case <-ticker.C:
                // do nothing - ready for the next batch
            }
        }
    }

    return false, nil
}

func (s *Server) fetchConfigEntries(lastRemoteIndex uint64) (*structs.IndexedGenericConfigEntries, error) {
    defer metrics.MeasureSince([]string{"leader", "replication", "config-entries", "fetch"}, time.Now())

    req := structs.DCSpecificRequest{
        Datacenter: s.config.PrimaryDatacenter,
        QueryOptions: structs.QueryOptions{
            AllowStale: true,
            MinQueryIndex: lastRemoteIndex,
            Token: s.tokens.ReplicationToken(),
        },
    }

    var response structs.IndexedGenericConfigEntries
    if err := s.RPC("ConfigEntry.ListAll", &req, &response); err != nil {
        return nil, err
    }

    return &response, nil
}

func (s *Server) replicateConfig(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
    remote, err := s.fetchConfigEntries(lastRemoteIndex)
    if err != nil {
        return 0, false, fmt.Errorf("failed to retrieve remote config entries: %v", err)
    }

    s.logger.Printf("[DEBUG] replication: finished fetching config entries: %d", len(remote.Entries))

    // Need to check if we should be stopping. This will be common as the fetching process is a blocking
    // RPC which could have been hanging around for a long time and during that time leadership could
    // have been lost.
    select {
    case <-ctx.Done():
        return 0, true, nil
    default:
        // do nothing
    }

    // Measure everything after the remote query, which can block for long
    // periods of time. This metric is a good measure of how expensive the
    // replication process is.
    defer metrics.MeasureSince([]string{"leader", "replication", "config", "apply"}, time.Now())

    _, local, err := s.fsm.State().ConfigEntries(nil)
    if err != nil {
        return 0, false, fmt.Errorf("failed to retrieve local config entries: %v", err)
    }

    // If the remote index ever goes backwards, it's a good indication that
    // the remote side was rebuilt and we should do a full sync since we
    // can't make any assumptions about what's going on.
    //
    // Resetting lastRemoteIndex to 0 will work because we never consider local
    // raft indices. Instead we compare the raft modify index in the response object
    // with the lastRemoteIndex (only when we already have a config entry of the same kind/name)
    // to determine if an update is needed. Resetting lastRemoteIndex to 0 then has the effect
    // of making us think all the local state is out of date and any matching entries should
    // still be updated.
    //
    // The lastRemoteIndex is not used when the entry exists either only in the local state or
    // only in the remote state. In those situations we need to either delete it or create it.
    if remote.QueryMeta.Index < lastRemoteIndex {
        s.logger.Printf("[WARN] replication: Config Entry replication remote index moved backwards (%d to %d), forcing a full Config Entry sync", lastRemoteIndex, remote.QueryMeta.Index)
        lastRemoteIndex = 0
    }

    s.logger.Printf("[DEBUG] replication: Config Entry replication - local: %d, remote: %d", len(local), len(remote.Entries))
    // Calculate the changes required to bring the state into sync and then
    // apply them.
    deletions, updates := diffConfigEntries(local, remote.Entries, lastRemoteIndex)

    s.logger.Printf("[DEBUG] replication: Config Entry replication - deletions: %d, updates: %d", len(deletions), len(updates))

    if len(deletions) > 0 {
        s.logger.Printf("[DEBUG] replication: Config Entry replication - performing %d deletions", len(deletions))

        exit, err := s.reconcileLocalConfig(ctx, deletions, structs.ConfigEntryDelete)
        if exit {
            return 0, true, nil
        }
        if err != nil {
            return 0, false, fmt.Errorf("failed to delete local config entries: %v", err)
        }
        s.logger.Printf("[DEBUG] replication: Config Entry replication - finished deletions")
    }

    if len(updates) > 0 {
        s.logger.Printf("[DEBUG] replication: Config Entry replication - performing %d updates", len(updates))
        exit, err := s.reconcileLocalConfig(ctx, updates, structs.ConfigEntryUpsert)
        if exit {
            return 0, true, nil
        }
        if err != nil {
            return 0, false, fmt.Errorf("failed to update local config entries: %v", err)
        }
        s.logger.Printf("[DEBUG] replication: Config Entry replication - finished updates")
    }

    // Return the index we got back from the remote side, since we've synced
    // up with the remote state as of that index.
    return remote.QueryMeta.Index, false, nil
}
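
diffConfigEntries above is a single merge pass over the two kind/name-sorted slices: entries present only locally become deletions, entries present only remotely become updates, and entries present on both sides are re-upserted only when their remote ModifyIndex is newer than lastRemoteIndex. A small self-contained illustration of the same walk using toy string-keyed entries (the entry type and diff helper here are hypothetical stand-ins, not the commit's types):

package main

import (
    "fmt"
    "sort"
)

// toy stand-in: a config entry is identified by a "kind/name" key and carries
// the remote ModifyIndex used to decide whether a matching entry needs updating.
type entry struct {
    key         string
    modifyIndex uint64
}

func diff(local, remote []entry, lastRemoteIndex uint64) (deletions, updates []entry) {
    sort.Slice(local, func(i, j int) bool { return local[i].key < local[j].key })
    sort.Slice(remote, func(i, j int) bool { return remote[i].key < remote[j].key })

    l, r := 0, 0
    for l < len(local) && r < len(remote) {
        switch {
        case local[l].key == remote[r].key:
            // present on both sides: only re-apply if the remote copy changed
            if remote[r].modifyIndex > lastRemoteIndex {
                updates = append(updates, remote[r])
            }
            l++
            r++
        case local[l].key < remote[r].key:
            // only local: the primary no longer has it, so delete it locally
            deletions = append(deletions, local[l])
            l++
        default:
            // only remote: create it locally
            updates = append(updates, remote[r])
            r++
        }
    }
    deletions = append(deletions, local[l:]...)
    updates = append(updates, remote[r:]...)
    return deletions, updates
}

func main() {
    local := []entry{{"service-defaults/api", 3}, {"service-defaults/old", 2}}
    remote := []entry{{"proxy-defaults/global", 9}, {"service-defaults/api", 8}}
    del, upd := diff(local, remote, 5)
    fmt.Println(del) // [{service-defaults/old 2}] -> deleted locally
    fmt.Println(upd) // [{proxy-defaults/global 9} {service-defaults/api 8}] -> upserted locally
}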

@@ -0,0 +1,165 @@
package consul

import (
    "fmt"
    "os"
    "testing"

    "github.com/hashicorp/consul/agent/structs"
    "github.com/hashicorp/consul/sdk/testutil/retry"
    "github.com/hashicorp/consul/testrpc"
    "github.com/stretchr/testify/require"
)

func TestReplication_ConfigEntries(t *testing.T) {
    t.Parallel()
    dir1, s1 := testServerWithConfig(t, func(c *Config) {
        c.PrimaryDatacenter = "dc1"
    })
    defer os.RemoveAll(dir1)
    defer s1.Shutdown()
    testrpc.WaitForLeader(t, s1.RPC, "dc1")
    client := rpcClient(t, s1)
    defer client.Close()

    dir2, s2 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc2"
        c.PrimaryDatacenter = "dc1"
        c.ConfigReplicationRate = 100
        c.ConfigReplicationBurst = 100
        c.ConfigReplicationApplyLimit = 1000000
    })
    testrpc.WaitForLeader(t, s2.RPC, "dc2")
    defer os.RemoveAll(dir2)
    defer s2.Shutdown()

    // Try to join.
    joinWAN(t, s2, s1)
    testrpc.WaitForLeader(t, s1.RPC, "dc1")
    testrpc.WaitForLeader(t, s1.RPC, "dc2")

    // Create some new configuration entries
    var entries []structs.ConfigEntry
    for i := 0; i < 50; i++ {
        arg := structs.ConfigEntryRequest{
            Datacenter: "dc1",
            Op: structs.ConfigEntryUpsert,
            Entry: &structs.ServiceConfigEntry{
                Kind: structs.ServiceDefaults,
                Name: fmt.Sprintf("svc-%d", i),
                Protocol: "tcp",
            },
        }

        var out struct{}
        require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out))
        entries = append(entries, arg.Entry)
    }

    arg := structs.ConfigEntryRequest{
        Datacenter: "dc1",
        Op: structs.ConfigEntryUpsert,
        Entry: &structs.ProxyConfigEntry{
            Kind: structs.ProxyDefaults,
            Name: "global",
            Config: map[string]interface{}{
                "foo": "bar",
                "bar": 1,
            },
        },
    }

    var out struct{}
    require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out))
    entries = append(entries, arg.Entry)

    checkSame := func(t *retry.R) error {
        _, remote, err := s1.fsm.State().ConfigEntries(nil)
        require.NoError(t, err)
        _, local, err := s2.fsm.State().ConfigEntries(nil)
        require.NoError(t, err)

        require.Len(t, local, len(remote))
        for i, entry := range remote {
            require.Equal(t, entry.GetKind(), local[i].GetKind())
            require.Equal(t, entry.GetName(), local[i].GetName())

            // more validations
            switch entry.GetKind() {
            case structs.ServiceDefaults:
                localSvc, ok := local[i].(*structs.ServiceConfigEntry)
                require.True(t, ok)
                remoteSvc, ok := entry.(*structs.ServiceConfigEntry)
                require.True(t, ok)

                require.Equal(t, remoteSvc.Protocol, localSvc.Protocol)
                require.Equal(t, remoteSvc.Connect, localSvc.Connect)
            case structs.ProxyDefaults:
                localProxy, ok := local[i].(*structs.ProxyConfigEntry)
                require.True(t, ok)
                remoteProxy, ok := entry.(*structs.ProxyConfigEntry)
                require.True(t, ok)

                require.Equal(t, remoteProxy.Config, localProxy.Config)
            }
        }
        return nil
    }

    // Wait for the replica to converge.
    retry.Run(t, func(r *retry.R) {
        checkSame(r)
    })

    // Update those entries
    for i := 0; i < 50; i++ {
        arg := structs.ConfigEntryRequest{
            Datacenter: "dc1",
            Op: structs.ConfigEntryUpsert,
            Entry: &structs.ServiceConfigEntry{
                Kind: structs.ServiceDefaults,
                Name: fmt.Sprintf("svc-%d", i),
                Protocol: "udp",
            },
        }

        var out struct{}
        require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out))
    }

    arg = structs.ConfigEntryRequest{
        Datacenter: "dc1",
        Op: structs.ConfigEntryUpsert,
        Entry: &structs.ProxyConfigEntry{
            Kind: structs.ProxyDefaults,
            Name: "global",
            Config: map[string]interface{}{
                "foo": "baz",
                "baz": 2,
            },
        },
    }

    require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out))

    // Wait for the replica to converge.
    retry.Run(t, func(r *retry.R) {
        checkSame(r)
    })

    for _, entry := range entries {
        arg := structs.ConfigEntryRequest{
            Datacenter: "dc1",
            Op: structs.ConfigEntryDelete,
            Entry: entry,
        }

        var out struct{}
        require.NoError(t, s1.RPC("ConfigEntry.Delete", &arg, &out))
    }

    // Wait for the replica to converge.
    retry.Run(t, func(r *retry.R) {
        checkSame(r)
    })
}

@@ -269,6 +269,8 @@ func (s *Server) establishLeadership() error {
        return err
    }

    s.startConfigReplication()

    s.startEnterpriseLeader()

    s.startCARootPruning()

@@ -289,6 +291,8 @@ func (s *Server) revokeLeadership() error {
        return err
    }

    s.stopConfigReplication()

    s.stopEnterpriseLeader()

    s.stopCARootPruning()

@@ -848,6 +852,20 @@ func (s *Server) stopACLReplication() {
    s.aclReplicationEnabled = false
}

func (s *Server) startConfigReplication() {
    if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
        // replication shouldn't run in the primary DC
        return
    }

    s.configReplicator.Start()
}

func (s *Server) stopConfigReplication() {
    // will be a no-op when not started
    s.configReplicator.Stop()
}

// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config {
    state := s.fsm.State()

@@ -0,0 +1,155 @@
package consul

import (
    "context"
    "fmt"
    "log"
    "os"
    "sync"
    "time"

    "github.com/hashicorp/consul/lib"
    "golang.org/x/time/rate"
)

const (
    // replicationDefaultMaxRetryWait is the maximum amount of time to wait
    // between failed blocking queries when backing off.
    replicationDefaultMaxRetryWait = 120 * time.Second

    replicationDefaultRate = 1
)

type ReplicatorConfig struct {
    // Name to be used in various logging
    Name string
    // Function to perform the actual replication
    ReplicateFn ReplicatorFunc
    // The number of replication rounds per second that are allowed
    Rate int
    // The number of replication rounds that can be done in a burst
    Burst int
    // Minimum number of RPC failures to ignore before backing off
    MinFailures int
    // Maximum wait time between failing RPCs
    MaxRetryWait time.Duration
    // Where to send our logs
    Logger *log.Logger
}

type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64) (index uint64, exit bool, err error)

type Replicator struct {
    name string
    lock sync.RWMutex
    running bool
    cancel context.CancelFunc
    ctx context.Context
    limiter *rate.Limiter
    waiter *lib.RetryWaiter
    replicate ReplicatorFunc
    logger *log.Logger
}

func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
    if config == nil {
        return nil, fmt.Errorf("Cannot create the Replicator without a config")
    }
    if config.ReplicateFn == nil {
        return nil, fmt.Errorf("Cannot create the Replicator without a ReplicateFn set in the config")
    }
    if config.Logger == nil {
        config.Logger = log.New(os.Stderr, "", log.LstdFlags)
    }
    ctx, cancel := context.WithCancel(context.Background())
    limiter := rate.NewLimiter(rate.Limit(config.Rate), config.Burst)

    maxWait := config.MaxRetryWait
    if maxWait == 0 {
        maxWait = replicationDefaultMaxRetryWait
    }

    minFailures := config.MinFailures
    if minFailures < 0 {
        minFailures = 0
    }
    waiter := lib.NewRetryWaiter(minFailures, 0*time.Second, maxWait, lib.NewJitterRandomStagger(10))
    return &Replicator{
        name: config.Name,
        running: false,
        cancel: cancel,
        ctx: ctx,
        limiter: limiter,
        waiter: waiter,
        replicate: config.ReplicateFn,
        logger: config.Logger,
    }, nil
}

func (r *Replicator) Start() {
    r.lock.Lock()
    defer r.lock.Unlock()

    if r.running {
        return
    }

    go r.run()

    r.running = true
    r.logger.Printf("[INFO] replication: started %s replication", r.name)
}

func (r *Replicator) run() {
    var lastRemoteIndex uint64

    defer r.logger.Printf("[INFO] replication: stopped %s replication", r.name)

    for {
        // This ensures we aren't doing too many successful replication rounds - mostly useful when
        // the data within the primary datacenter is changing rapidly but we want to limit the amount
        // of resources that replicating into the secondary datacenter consumes
        if err := r.limiter.Wait(r.ctx); err != nil {
            return
        }

        // Perform a single round of replication
        index, exit, err := r.replicate(r.ctx, lastRemoteIndex)
        if exit {
            // the replication function told us to exit
            return
        }

        if err != nil {
            // reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during
            // the next round of replication
            lastRemoteIndex = 0
            r.logger.Printf("[WARN] replication: %s replication error (will retry if still leader): %v", r.name, err)
        } else {
            lastRemoteIndex = index
            r.logger.Printf("[DEBUG] replication: %s replication completed through remote index %d", r.name, index)
        }

        select {
        case <-r.ctx.Done():
            return
        // wait some amount of time to prevent churning through many replication rounds while replication is failing
        case <-r.waiter.WaitIfErr(err):
            // do nothing
        }
    }
}

func (r *Replicator) Stop() {
    r.lock.Lock()
    defer r.lock.Unlock()

    if !r.running {
        return
    }

    r.logger.Printf("[DEBUG] replication: stopping %s replication", r.name)
    r.cancel()
    r.cancel = nil
    r.running = false
}

@@ -138,6 +138,10 @@ type Server struct {
    // Consul configuration
    config *Config

    // configReplicator is used to manage the leader's replication routines for
    // centralized config
    configReplicator *Replicator

    // tokens holds ACL tokens initially from the configuration, but can
    // be updated at runtime, so should always be used instead of going to
    // the configuration directly.

@@ -347,6 +351,19 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store, tl
        return nil, err
    }

    configReplicatorConfig := ReplicatorConfig{
        Name: "Config Entry",
        ReplicateFn: s.replicateConfig,
        Rate: s.config.ConfigReplicationRate,
        Burst: s.config.ConfigReplicationBurst,
        Logger: logger,
    }
    s.configReplicator, err = NewReplicator(&configReplicatorConfig)
    if err != nil {
        s.Shutdown()
        return nil, err
    }

    // Initialize the stats fetcher that autopilot will use.
    s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)

@@ -107,7 +107,7 @@ func (s *Store) ConfigEntry(ws memdb.WatchSet, kind, name string) (uint64, struc

// ConfigEntries is called to get all config entry objects.
func (s *Store) ConfigEntries(ws memdb.WatchSet) (uint64, []structs.ConfigEntry, error) {
-    return s.ConfigEntriesByKind(nil, "")
+    return s.ConfigEntriesByKind(ws, "")
}

// ConfigEntriesByKind is called to get all config entry objects with the given kind.

@@ -1298,6 +1298,73 @@ func (c *IndexedConfigEntries) UnmarshalBinary(data []byte) error {
    return nil
}

type IndexedGenericConfigEntries struct {
    Entries []ConfigEntry
    QueryMeta
}

func (c *IndexedGenericConfigEntries) MarshalBinary() (data []byte, err error) {
    // bs will grow if needed but allocate enough to avoid reallocation in common
    // case.
    bs := make([]byte, 128)
    enc := codec.NewEncoderBytes(&bs, msgpackHandle)

    if err := enc.Encode(len(c.Entries)); err != nil {
        return nil, err
    }

    for _, entry := range c.Entries {
        if err := enc.Encode(entry.GetKind()); err != nil {
            return nil, err
        }
        if err := enc.Encode(entry); err != nil {
            return nil, err
        }
    }

    if err := enc.Encode(c.QueryMeta); err != nil {
        return nil, err
    }

    return bs, nil
}

func (c *IndexedGenericConfigEntries) UnmarshalBinary(data []byte) error {
    // First decode the number of entries.
    var numEntries int
    dec := codec.NewDecoderBytes(data, msgpackHandle)
    if err := dec.Decode(&numEntries); err != nil {
        return err
    }

    // Then decode the slice of ConfigEntries
    c.Entries = make([]ConfigEntry, numEntries)
    for i := 0; i < numEntries; i++ {
        var kind string
        if err := dec.Decode(&kind); err != nil {
            return err
        }

        entry, err := MakeConfigEntry(kind, "")
        if err != nil {
            return err
        }

        if err := dec.Decode(entry); err != nil {
            return err
        }

        c.Entries[i] = entry
    }

    if err := dec.Decode(&c.QueryMeta); err != nil {
        return err
    }

    return nil
}

// DirEntry is used to represent a directory entry. This is
// used for values in our Key-Value store.
type DirEntry struct {
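
Because ConfigEntry is an interface, the encoder above writes each entry's Kind before its payload so that UnmarshalBinary can ask MakeConfigEntry for the right concrete type before decoding into it. The sketch below shows the same count/kind/payload framing with encoding/json and two toy entry types; the real code uses the msgpack codec and Consul's own structs, so the types and helpers here are illustrative only:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
)

// A decoder cannot allocate "an interface", so each payload is preceded
// by its kind tag, which tells us which concrete struct to allocate.
type ConfigEntry interface{ GetKind() string }

type ServiceConfigEntry struct {
    Name     string
    Protocol string
}

func (s *ServiceConfigEntry) GetKind() string { return "service-defaults" }

type ProxyConfigEntry struct {
    Name   string
    Config map[string]interface{}
}

func (p *ProxyConfigEntry) GetKind() string { return "proxy-defaults" }

func makeEntry(kind string) (ConfigEntry, error) {
    switch kind {
    case "service-defaults":
        return &ServiceConfigEntry{}, nil
    case "proxy-defaults":
        return &ProxyConfigEntry{}, nil
    }
    return nil, fmt.Errorf("unknown kind %q", kind)
}

func marshal(entries []ConfigEntry) ([]byte, error) {
    var buf bytes.Buffer
    enc := json.NewEncoder(&buf)
    if err := enc.Encode(len(entries)); err != nil { // count first
        return nil, err
    }
    for _, e := range entries {
        if err := enc.Encode(e.GetKind()); err != nil { // then the kind tag
            return nil, err
        }
        if err := enc.Encode(e); err != nil { // then the payload
            return nil, err
        }
    }
    return buf.Bytes(), nil
}

func unmarshal(data []byte) ([]ConfigEntry, error) {
    dec := json.NewDecoder(bytes.NewReader(data))
    var n int
    if err := dec.Decode(&n); err != nil {
        return nil, err
    }
    out := make([]ConfigEntry, n)
    for i := 0; i < n; i++ {
        var kind string
        if err := dec.Decode(&kind); err != nil {
            return nil, err
        }
        e, err := makeEntry(kind) // allocate the right concrete type
        if err != nil {
            return nil, err
        }
        if err := dec.Decode(e); err != nil {
            return nil, err
        }
        out[i] = e
    }
    return out, nil
}

func main() {
    in := []ConfigEntry{
        &ProxyConfigEntry{Name: "global", Config: map[string]interface{}{"foo": "bar"}},
        &ServiceConfigEntry{Name: "web", Protocol: "http"},
    }
    data, _ := marshal(in)
    out, _ := unmarshal(data)
    fmt.Println(out[0].GetKind(), out[1].GetKind()) // proxy-defaults service-defaults
}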

@@ -0,0 +1,156 @@
package lib

import (
    "time"
)

const (
    defaultMinFailures = 0
    defaultMaxWait = 2 * time.Minute
)

// Jitter is the interface used for offloading jitter calculations from the RetryWaiter
type Jitter interface {
    AddJitter(baseTime time.Duration) time.Duration
}

// JitterRandomStagger calculates a random jitter between 0 and up to a specific percentage of the baseTime
type JitterRandomStagger struct {
    // int64 because we are going to be doing math against an int64 to represent nanoseconds
    percent int64
}

// NewJitterRandomStagger creates a new JitterRandomStagger
func NewJitterRandomStagger(percent int) *JitterRandomStagger {
    if percent < 0 {
        percent = 0
    }

    return &JitterRandomStagger{
        percent: int64(percent),
    }
}

// AddJitter implements the Jitter interface
func (j *JitterRandomStagger) AddJitter(baseTime time.Duration) time.Duration {
    if j.percent == 0 {
        return baseTime
    }

    // time.Duration is an int64 under the hood, which is why multiplying
    // and dividing durations like this works
    return baseTime + RandomStagger((baseTime*time.Duration(j.percent))/100)
}

// RetryWaiter will record failed and successful operations and provide
// a channel to wait on before a failed operation can be retried.
type RetryWaiter struct {
    minFailures uint
    minWait time.Duration
    maxWait time.Duration
    jitter Jitter
    failures uint
}

// NewRetryWaiter creates a new RetryWaiter
func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitter) *RetryWaiter {
    if minFailures < 0 {
        minFailures = defaultMinFailures
    }

    if maxWait <= 0 {
        maxWait = defaultMaxWait
    }

    if minWait <= 0 {
        minWait = 0 * time.Nanosecond
    }

    return &RetryWaiter{
        minFailures: uint(minFailures),
        minWait: minWait,
        maxWait: maxWait,
        failures: 0,
        jitter: jitter,
    }
}

// calculateWait calculates the necessary wait time before the
// next operation should be allowed.
func (rw *RetryWaiter) calculateWait() time.Duration {
    waitTime := rw.minWait
    if rw.failures > rw.minFailures {
        shift := rw.failures - rw.minFailures - 1
        waitTime = rw.maxWait
        if shift < 31 {
            waitTime = (1 << shift) * time.Second
        }
        if waitTime > rw.maxWait {
            waitTime = rw.maxWait
        }

        if rw.jitter != nil {
            waitTime = rw.jitter.AddJitter(waitTime)
        }
    }

    if waitTime < rw.minWait {
        waitTime = rw.minWait
    }

    return waitTime
}

// wait calculates the waitTime and returns a chan
// that will become selectable once that amount
// of time has elapsed.
func (rw *RetryWaiter) wait() <-chan struct{} {
    waitTime := rw.calculateWait()
    ch := make(chan struct{})
    if waitTime > 0 {
        time.AfterFunc(waitTime, func() { close(ch) })
    } else {
        // if there should be 0 wait time then we ensure
        // that the chan will be immediately selectable
        close(ch)
    }
    return ch
}

// Success marks that an operation was successful, which resets the failure
// count. The chan that is returned will be immediately selectable.
func (rw *RetryWaiter) Success() <-chan struct{} {
    rw.Reset()
    return rw.wait()
}

// Failed marks that an operation failed. The chan returned will be selectable
// once the calculated retry wait amount of time has elapsed.
func (rw *RetryWaiter) Failed() <-chan struct{} {
    rw.failures += 1
    ch := rw.wait()
    return ch
}

// Reset resets the internal failure counter.
func (rw *RetryWaiter) Reset() {
    rw.failures = 0
}

// WaitIf is a convenience method to record whether the last
// operation was a success or failure and return a chan that
// will be selectable when the next operation can be done.
func (rw *RetryWaiter) WaitIf(failure bool) <-chan struct{} {
    if failure {
        return rw.Failed()
    }
    return rw.Success()
}

// WaitIfErr is a convenience method to record whether the last
// operation was a success or failure based on whether the err
// is nil and then return a chan that will be selectable when
// the next operation can be done.
func (rw *RetryWaiter) WaitIfErr(err error) <-chan struct{} {
    return rw.WaitIf(err != nil)
}
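
The RetryWaiter is meant to sit at the bottom of a retry loop: report each attempt's outcome through WaitIf or WaitIfErr and select on the returned channel, so failures back off roughly exponentially (1s, 2s, 4s, ... capped at maxWait, plus jitter) while successes reset the counter and unblock immediately, which is exactly how Replicator.run uses it above. A minimal usage sketch, assuming the lib package added by this commit; doWork and its timings are illustrative:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/hashicorp/consul/lib"
)

// doWork stands in for an RPC or blocking query that may fail transiently.
func doWork(attempt int) error {
    if attempt < 3 {
        return fmt.Errorf("transient failure on attempt %d", attempt)
    }
    return nil
}

func main() {
    // Ignore no failures, no minimum wait, cap the backoff at 10s and
    // add up to 10% random jitter on top of each wait.
    waiter := lib.NewRetryWaiter(0, 0, 10*time.Second, lib.NewJitterRandomStagger(10))

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    for attempt := 1; ; attempt++ {
        err := doWork(attempt)
        if err == nil {
            fmt.Println("succeeded on attempt", attempt)
            return
        }
        fmt.Println(err)

        select {
        case <-ctx.Done():
            return // give up when the overall deadline expires
        case <-waiter.WaitIfErr(err):
            // backoff elapsed (roughly 1s, then 2s, then 4s, ...); try again
        }
    }
}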

@@ -0,0 +1,184 @@
package lib

import (
    "fmt"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func TestJitterRandomStagger(t *testing.T) {
    t.Parallel()

    t.Run("0 percent", func(t *testing.T) {
        t.Parallel()
        jitter := NewJitterRandomStagger(0)
        for i := 0; i < 10; i++ {
            baseTime := time.Duration(i) * time.Second
            require.Equal(t, baseTime, jitter.AddJitter(baseTime))
        }
    })

    t.Run("10 percent", func(t *testing.T) {
        t.Parallel()
        jitter := NewJitterRandomStagger(10)
        for i := 0; i < 10; i++ {
            baseTime := 5000 * time.Millisecond
            maxTime := 5500 * time.Millisecond
            newTime := jitter.AddJitter(baseTime)
            require.True(t, newTime > baseTime)
            require.True(t, newTime <= maxTime)
        }
    })

    t.Run("100 percent", func(t *testing.T) {
        t.Parallel()
        jitter := NewJitterRandomStagger(100)
        for i := 0; i < 10; i++ {
            baseTime := 1234 * time.Millisecond
            maxTime := 2468 * time.Millisecond
            newTime := jitter.AddJitter(baseTime)
            require.True(t, newTime > baseTime)
            require.True(t, newTime <= maxTime)
        }
    })
}

func TestRetryWaiter_calculateWait(t *testing.T) {
    t.Parallel()

    t.Run("Defaults", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(0, 0, 0, nil)

        require.Equal(t, 0*time.Nanosecond, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 1*time.Second, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 2*time.Second, rw.calculateWait())
        rw.failures = 31
        require.Equal(t, defaultMaxWait, rw.calculateWait())
    })

    t.Run("Minimum Wait", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(0, 5*time.Second, 0, nil)

        require.Equal(t, 5*time.Second, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 5*time.Second, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 5*time.Second, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 5*time.Second, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 8*time.Second, rw.calculateWait())
    })

    t.Run("Minimum Failures", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(5, 0, 0, nil)
        require.Equal(t, 0*time.Nanosecond, rw.calculateWait())
        rw.failures += 5
        require.Equal(t, 0*time.Nanosecond, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 1*time.Second, rw.calculateWait())
    })

    t.Run("Maximum Wait", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(0, 0, 5*time.Second, nil)
        require.Equal(t, 0*time.Nanosecond, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 1*time.Second, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 2*time.Second, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 4*time.Second, rw.calculateWait())
        rw.failures += 1
        require.Equal(t, 5*time.Second, rw.calculateWait())
        rw.failures = 31
        require.Equal(t, 5*time.Second, rw.calculateWait())
    })
}

func TestRetryWaiter_WaitChans(t *testing.T) {
    t.Parallel()

    t.Run("Minimum Wait - Success", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(0, 250*time.Millisecond, 0, nil)

        select {
        case <-time.After(200 * time.Millisecond):
        case <-rw.Success():
            require.Fail(t, "minimum wait not respected")
        }
    })

    t.Run("Minimum Wait - WaitIf", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(0, 250*time.Millisecond, 0, nil)

        select {
        case <-time.After(200 * time.Millisecond):
        case <-rw.WaitIf(false):
            require.Fail(t, "minimum wait not respected")
        }
    })

    t.Run("Minimum Wait - WaitIfErr", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(0, 250*time.Millisecond, 0, nil)

        select {
        case <-time.After(200 * time.Millisecond):
        case <-rw.WaitIfErr(nil):
            require.Fail(t, "minimum wait not respected")
        }
    })

    t.Run("Maximum Wait - Failed", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(0, 0, 250*time.Millisecond, nil)

        select {
        case <-time.After(500 * time.Millisecond):
            require.Fail(t, "maximum wait not respected")
        case <-rw.Failed():
        }
    })

    t.Run("Maximum Wait - WaitIf", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(0, 0, 250*time.Millisecond, nil)

        select {
        case <-time.After(500 * time.Millisecond):
            require.Fail(t, "maximum wait not respected")
        case <-rw.WaitIf(true):
        }
    })

    t.Run("Maximum Wait - WaitIfErr", func(t *testing.T) {
        t.Parallel()

        rw := NewRetryWaiter(0, 0, 250*time.Millisecond, nil)

        select {
        case <-time.After(500 * time.Millisecond):
            require.Fail(t, "maximum wait not respected")
        case <-rw.WaitIfErr(fmt.Errorf("Fake Error")):
        }
    })
}