Merge pull request #14694 from hashicorp/peering/mgw-addrs

This commit is contained in:
Freddy 2022-10-03 12:35:11 -06:00 committed by GitHub
commit 7ea726ed1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 353 additions and 84 deletions

View File

@ -9,10 +9,12 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
"github.com/hashicorp/consul/agent/rpc/peering"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/proto/pbpeering"
)
@ -57,9 +59,38 @@ func (b *PeeringBackend) GetAgentCACertificates() ([]string, error) {
return b.srv.tlsConfigurator.GRPCManualCAPems(), nil
}
// GetServerAddresses looks up server node addresses from the state store.
// GetServerAddresses looks up server or mesh gateway addresses from the state store.
func (b *PeeringBackend) GetServerAddresses() ([]string, error) {
state := b.srv.fsm.State()
_, rawEntry, err := b.srv.fsm.State().ConfigEntry(nil, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta())
if err != nil {
return nil, fmt.Errorf("failed to read mesh config entry: %w", err)
}
meshConfig, ok := rawEntry.(*structs.MeshConfigEntry)
if ok && meshConfig.Peering != nil && meshConfig.Peering.PeerThroughMeshGateways {
return meshGatewayAdresses(b.srv.fsm.State())
}
return serverAddresses(b.srv.fsm.State())
}
func meshGatewayAdresses(state *state.Store) ([]string, error) {
_, nodes, err := state.ServiceDump(nil, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword)
if err != nil {
return nil, fmt.Errorf("failed to dump gateway addresses: %w", err)
}
var addrs []string
for _, node := range nodes {
_, addr, port := node.BestAddress(true)
addrs = append(addrs, ipaddr.FormatAddressPort(addr, port))
}
if len(addrs) == 0 {
return nil, fmt.Errorf("servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered")
}
return addrs, nil
}
func serverAddresses(state *state.Store) ([]string, error) {
_, nodes, err := state.ServiceNodes(nil, "consul", structs.DefaultEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword)
if err != nil {
return nil, err

View File

@ -2,10 +2,14 @@ package consul
import (
"context"
"fmt"
"net"
"testing"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/require"
gogrpc "google.golang.org/grpc"
@ -17,7 +21,9 @@ import (
)
func TestPeeringBackend_ForwardToLeader(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, conf1 := testServerConfig(t)
server1, err := newServer(t, conf1)
@ -60,6 +66,83 @@ func TestPeeringBackend_ForwardToLeader(t *testing.T) {
})
}
func TestPeeringBackend_GetServerAddresses(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, cfg := testServerConfig(t)
cfg.GRPCTLSPort = freeport.GetOne(t)
srv, err := newServer(t, cfg)
require.NoError(t, err)
testrpc.WaitForLeader(t, srv.RPC, "dc1")
backend := NewPeeringBackend(srv)
testutil.RunStep(t, "peer to servers", func(t *testing.T) {
addrs, err := backend.GetServerAddresses()
require.NoError(t, err)
expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort)
require.Equal(t, []string{expect}, addrs)
})
testutil.RunStep(t, "existence of mesh config entry is not enough to peer through gateways", func(t *testing.T) {
mesh := structs.MeshConfigEntry{
// Enable unrelated config.
TransparentProxy: structs.TransparentProxyMeshConfig{
MeshDestinationsOnly: true,
},
}
require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))
addrs, err := backend.GetServerAddresses()
require.NoError(t, err)
// Still expect server address because PeerThroughMeshGateways was not enabled.
expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort)
require.Equal(t, []string{expect}, addrs)
})
testutil.RunStep(t, "cannot peer through gateways without registered gateways", func(t *testing.T) {
mesh := structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{PeerThroughMeshGateways: true},
}
require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))
addrs, err := backend.GetServerAddresses()
require.Nil(t, addrs)
testutil.RequireErrorContains(t, err,
"servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered")
})
testutil.RunStep(t, "peer through mesh gateways", func(t *testing.T) {
reg := structs.RegisterRequest{
ID: types.NodeID("b5489ca9-f5e9-4dba-a779-61fec4e8e364"),
Node: "gw-node",
Address: "1.2.3.4",
TaggedAddresses: map[string]string{
structs.TaggedAddressWAN: "172.217.22.14",
},
Service: &structs.NodeService{
ID: "mesh-gateway",
Service: "mesh-gateway",
Kind: structs.ServiceKindMeshGateway,
Port: 443,
TaggedAddresses: map[string]structs.ServiceAddress{
structs.TaggedAddressWAN: {Address: "154.238.12.252", Port: 8443},
},
},
}
require.NoError(t, srv.fsm.State().EnsureRegistration(2, &reg))
addrs, err := backend.GetServerAddresses()
require.NoError(t, err)
require.Equal(t, []string{"154.238.12.252:8443"}, addrs)
})
}
func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
@ -79,7 +162,9 @@ func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn,
}
func TestPeerStreamService_ForwardToLeader(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, conf1 := testServerConfig(t)
server1, err := newServer(t, conf1)

View File

@ -150,7 +150,6 @@ func (m *CertManager) watchServerToken(ctx context.Context) {
// Cancel existing the leaf cert watch and spin up new one any time the server token changes.
// The watch needs the current token as set by the leader since certificate signing requests go to the leader.
fmt.Println("canceling and resetting")
cancel()
notifyCtx, cancel = context.WithCancel(ctx)

View File

@ -123,5 +123,6 @@ type StateStore interface {
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error)
ConfigEntry(ws memdb.WatchSet, kind, name string, entMeta *acl.EnterpriseMeta) (uint64, structs.ConfigEntry, error)
AbandonCh() <-chan struct{}
}

View File

@ -640,9 +640,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
continue
}
case strings.HasPrefix(update.CorrelationID, subMeshGateway):
// TODO(Peering): figure out how to sync this separately
case update.CorrelationID == subCARoot:
resp, err = makeCARootsResponse(update)
if err != nil {

View File

@ -98,25 +98,25 @@ func (m *subscriptionManager) syncViaBlockingQuery(
ws.Add(store.AbandonCh())
ws.Add(ctx.Done())
if result, err := queryFn(ctx, store, ws); err != nil {
if result, err := queryFn(ctx, store, ws); err != nil && ctx.Err() == nil {
logger.Error("failed to sync from query", "error", err)
} else {
// Block for any changes to the state store.
updateCh <- cache.UpdateEvent{
CorrelationID: correlationID,
Result: result,
select {
case <-ctx.Done():
return
case updateCh <- cache.UpdateEvent{CorrelationID: correlationID, Result: result}:
}
// Block for any changes to the state store.
ws.WatchCtx(ctx)
}
if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.Error("failed to wait before re-trying sync", "error", err)
}
select {
case <-ctx.Done():
err := waiter.Wait(ctx)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
default:
} else if err != nil {
logger.Error("failed to wait before re-trying sync", "error", err)
}
}
}

View File

@ -6,9 +6,13 @@ import (
"fmt"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
@ -247,16 +251,10 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
pending := &pendingPayload{}
// Directly replicate information about our mesh gateways to the consuming side.
// TODO(peering): should we scrub anything before replicating this?
if err := pending.Add(meshGatewayPayloadID, u.CorrelationID, csn); err != nil {
return err
}
if state.exportList != nil {
// Trigger public events for all synthetic discovery chain replies.
for chainName, info := range state.connectServices {
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info)
m.collectPendingEventForDiscoveryChain(ctx, state, pending, chainName, info)
}
}
@ -490,7 +488,7 @@ func (m *subscriptionManager) syncDiscoveryChains(
state.connectServices[chainName] = info
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info)
m.collectPendingEventForDiscoveryChain(ctx, state, pending, chainName, info)
}
// if it was dropped, try to emit an DELETE event
@ -517,7 +515,7 @@ func (m *subscriptionManager) syncDiscoveryChains(
}
}
func (m *subscriptionManager) emitEventForDiscoveryChain(
func (m *subscriptionManager) collectPendingEventForDiscoveryChain(
ctx context.Context,
state *subscriptionState,
pending *pendingPayload,
@ -738,32 +736,118 @@ func (m *subscriptionManager) notifyServerAddrUpdates(
ctx context.Context,
updateCh chan<- cache.UpdateEvent,
) {
// Wait until this is subscribed-to.
// Wait until server address updates are subscribed-to.
select {
case <-m.serverAddrsSubReady:
case <-ctx.Done():
return
}
var idx uint64
// TODO(peering): retry logic; fail past a threshold
for {
var err error
// Typically, this function will block inside `m.subscribeServerAddrs` and only return on error.
// Errors are logged and the watch is retried.
idx, err = m.subscribeServerAddrs(ctx, idx, updateCh)
if errors.Is(err, stream.ErrSubForceClosed) {
m.logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt resume")
} else if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
m.logger.Warn("failed to subscribe to server addresses, will attempt resume", "error", err.Error())
} else {
m.logger.Trace(err.Error())
}
configNotifyCh := m.notifyMeshConfigUpdates(ctx)
// Intentionally initialized to empty values.
// These are set after the first mesh config entry update arrives.
var queryCtx context.Context
cancel := func() {}
useGateways := false
for {
select {
case <-ctx.Done():
cancel()
return
case event := <-configNotifyCh:
entry, ok := event.Result.(*structs.MeshConfigEntry)
if event.Result != nil && !ok {
m.logger.Error(fmt.Sprintf("saw unexpected type %T for mesh config entry: falling back to pushing direct server addresses", event.Result))
}
if entry != nil && entry.Peering != nil && entry.Peering.PeerThroughMeshGateways {
useGateways = true
} else {
useGateways = false
}
// Cancel and re-set watches based on the updated config entry.
cancel()
queryCtx, cancel = context.WithCancel(ctx)
if useGateways {
go m.notifyServerMeshGatewayAddresses(queryCtx, updateCh)
} else {
go m.ensureServerAddrSubscription(queryCtx, updateCh)
}
}
}
}
func (m *subscriptionManager) notifyMeshConfigUpdates(ctx context.Context) <-chan cache.UpdateEvent {
const meshConfigWatch = "mesh-config-entry"
notifyCh := make(chan cache.UpdateEvent, 1)
go m.syncViaBlockingQuery(ctx, meshConfigWatch, func(ctx_ context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) {
_, rawEntry, err := store.ConfigEntry(ws, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta())
if err != nil {
return nil, fmt.Errorf("failed to get mesh config entry: %w", err)
}
return rawEntry, nil
}, meshConfigWatch, notifyCh)
return notifyCh
}
func (m *subscriptionManager) notifyServerMeshGatewayAddresses(ctx context.Context, updateCh chan<- cache.UpdateEvent) {
m.syncViaBlockingQuery(ctx, "mesh-gateways", func(ctx context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) {
_, nodes, err := store.ServiceDump(ws, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword)
if err != nil {
return nil, fmt.Errorf("failed to watch mesh gateways services for servers: %w", err)
}
var gatewayAddrs []string
for _, csn := range nodes {
_, addr, port := csn.BestAddress(true)
gatewayAddrs = append(gatewayAddrs, ipaddr.FormatAddressPort(addr, port))
}
if len(gatewayAddrs) == 0 {
return nil, errors.New("configured to peer through mesh gateways but no mesh gateways are registered")
}
// We may return an empty list if there are no gateway addresses.
return &pbpeering.PeeringServerAddresses{
Addresses: gatewayAddrs,
}, nil
}, subServerAddrs, updateCh)
}
func (m *subscriptionManager) ensureServerAddrSubscription(ctx context.Context, updateCh chan<- cache.UpdateEvent) {
waiter := &retry.Waiter{
MinFailures: 1,
Factor: 500 * time.Millisecond,
MaxWait: 60 * time.Second,
Jitter: retry.NewJitter(100),
}
logger := m.logger.With("queryType", "server-addresses")
var idx uint64
for {
var err error
idx, err = m.subscribeServerAddrs(ctx, idx, updateCh)
if errors.Is(err, stream.ErrSubForceClosed) {
logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt resume")
} else if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.Warn("failed to subscribe to server addresses, will attempt resume", "error", err.Error())
} else if err != nil {
logger.Trace(err.Error())
return
}
if err := waiter.Wait(ctx); err != nil {
return
default:
}
}
}
@ -826,17 +910,22 @@ func (m *subscriptionManager) subscribeServerAddrs(
grpcAddr := srv.Address + ":" + strconv.Itoa(srv.ExtGRPCPort)
serverAddrs = append(serverAddrs, grpcAddr)
}
if len(serverAddrs) == 0 {
m.logger.Warn("did not find any server addresses with external gRPC ports to publish")
continue
}
updateCh <- cache.UpdateEvent{
u := cache.UpdateEvent{
CorrelationID: subServerAddrs,
Result: &pbpeering.PeeringServerAddresses{
Addresses: serverAddrs,
},
}
select {
case <-ctx.Done():
return 0, ctx.Err()
case updateCh <- u:
}
}
}

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -49,10 +50,7 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
subCh := mgr.subscribe(ctx, id, "my-peering", partition)
var (
gatewayCorrID = subMeshGateway + partition
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
)
@ -60,11 +58,7 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkExportedServices(t, got, []string{})
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 0)
},
)
})
// Initially add in L4 failover so that later we can test removing it. We
// cannot do the other way around because it would fail validation to
@ -300,17 +294,6 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
},
}, res.Nodes[0])
},
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, gatewayCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("mgw", "10.1.1.1", partition),
Service: pbService("mesh-gateway", "gateway-1", "gateway", 8443, nil),
}, res.Nodes[0])
},
)
})
@ -434,13 +417,6 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, gatewayCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 0)
},
)
@ -506,8 +482,6 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
backend.ensureService(t, "zip", mongo.Service)
var (
gatewayCorrID = subMeshGateway + partition
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
mongoCorrID = subExportedService + structs.NewServiceName("mongo", nil).String()
chainCorrID = subExportedService + structs.NewServiceName("chain", nil).String()
@ -521,9 +495,6 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkExportedServices(t, got, []string{})
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 0)
})
// At this point in time we'll have a mesh-gateway notification with no
@ -597,9 +568,6 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlProxyCorrID, 1, "mysql-sidecar-proxy", string(structs.ServiceKindConnectProxy))
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 1, "gateway", string(structs.ServiceKindMeshGateway))
},
)
})
}
@ -741,6 +709,102 @@ func TestSubscriptionManager_ServerAddrs(t *testing.T) {
},
)
})
testutil.RunStep(t, "flipped to peering through mesh gateways", func(t *testing.T) {
require.NoError(t, backend.store.EnsureConfigEntry(1, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}))
select {
case <-time.After(100 * time.Millisecond):
case <-subCh:
t.Fatal("expected to time out: no mesh gateways are registered")
}
})
testutil.RunStep(t, "registered and received a mesh gateway", func(t *testing.T) {
reg := structs.RegisterRequest{
ID: types.NodeID("b5489ca9-f5e9-4dba-a779-61fec4e8e364"),
Node: "gw-node",
Address: "1.2.3.4",
TaggedAddresses: map[string]string{
structs.TaggedAddressWAN: "172.217.22.14",
},
Service: &structs.NodeService{
ID: "mesh-gateway",
Service: "mesh-gateway",
Kind: structs.ServiceKindMeshGateway,
Port: 443,
TaggedAddresses: map[string]structs.ServiceAddress{
structs.TaggedAddressWAN: {Address: "154.238.12.252", Port: 8443},
},
},
}
require.NoError(t, backend.store.EnsureRegistration(2, &reg))
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, subServerAddrs, got.CorrelationID)
addrs, ok := got.Result.(*pbpeering.PeeringServerAddresses)
require.True(t, ok)
require.Equal(t, []string{"154.238.12.252:8443"}, addrs.GetAddresses())
},
)
})
testutil.RunStep(t, "registered and received a second mesh gateway", func(t *testing.T) {
reg := structs.RegisterRequest{
ID: types.NodeID("e4cc0af3-5c09-4ddf-94a9-5840e427bc45"),
Node: "gw-node-2",
Address: "1.2.3.5",
TaggedAddresses: map[string]string{
structs.TaggedAddressWAN: "172.217.22.15",
},
Service: &structs.NodeService{
ID: "mesh-gateway",
Service: "mesh-gateway",
Kind: structs.ServiceKindMeshGateway,
Port: 443,
},
}
require.NoError(t, backend.store.EnsureRegistration(3, &reg))
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, subServerAddrs, got.CorrelationID)
addrs, ok := got.Result.(*pbpeering.PeeringServerAddresses)
require.True(t, ok)
require.Equal(t, []string{"154.238.12.252:8443", "172.217.22.15:443"}, addrs.GetAddresses())
},
)
})
testutil.RunStep(t, "disabled peering through gateways and received server addresses", func(t *testing.T) {
require.NoError(t, backend.store.EnsureConfigEntry(4, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: false,
},
}))
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, subServerAddrs, got.CorrelationID)
addrs, ok := got.Result.(*pbpeering.PeeringServerAddresses)
require.True(t, ok)
// New subscriptions receive a snapshot from the event publisher.
// At the start of the test the handler registered a mock that only returns a single address.
require.Equal(t, []string{"198.18.0.1:8502"}, addrs.GetAddresses())
},
)
})
}
type testSubscriptionBackend struct {

View File

@ -117,7 +117,8 @@ type Backend interface {
// GetAgentCACertificates returns the CA certificate to be returned in the peering token data
GetAgentCACertificates() ([]string, error)
// GetServerAddresses returns the addresses used for establishing a peering connection
// GetServerAddresses returns the addresses used for establishing a peering connection.
// These may be server addresses or mesh gateway addresses if peering through mesh gateways.
GetServerAddresses() ([]string, error)
// GetServerName returns the SNI to be returned in the peering token data which

View File

@ -96,7 +96,9 @@ func (w *Waiter) Failures() int {
// Every call to Wait increments the failures count, so Reset must be called
// after Wait when there wasn't a failure.
//
// Wait will return ctx.Err() if the context is cancelled.
// The only non-nil error that Wait returns will come from ctx.Err(),
// such as when the context is canceled. This makes it suitable for
// long-running routines that do not get re-initialized, such as replication.
func (w *Waiter) Wait(ctx context.Context) error {
w.failures++
timer := time.NewTimer(w.delay())