open-consul/agent/consul/leader_peering.go

748 lines
25 KiB
Go
Raw Permalink Normal View History

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package consul
import (
"container/ring"
"context"
"errors"
"fmt"
"math"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-uuid"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
grpcstatus "google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/private/pbcommon"
Protobuf Refactoring for Multi-Module Cleanliness (#16302) Protobuf Refactoring for Multi-Module Cleanliness This commit includes the following: Moves all packages that were within proto/ to proto/private Rewrites imports to account for the packages being moved Adds in buf.work.yaml to enable buf workspaces Names the proto-public buf module so that we can override the Go package imports within proto/buf.yaml Bumps the buf version dependency to 1.14.0 (I was trying out the version to see if it would get around an issue - it didn't but it also doesn't break things and it seemed best to keep up with the toolchain changes) Why: In the future we will need to consume other protobuf dependencies such as the Google HTTP annotations for openapi generation or grpc-gateway usage. There were some recent changes to have our own ratelimiting annotations. The two combined were not working when I was trying to use them together (attempting to rebase another branch) Buf workspaces should be the solution to the problem Buf workspaces means that each module will have generated Go code that embeds proto file names relative to the proto dir and not the top level repo root. This resulted in proto file name conflicts in the Go global protobuf type registry. The solution to that was to add in a private/ directory into the path within the proto/ directory. That then required rewriting all the imports. Is this safe? AFAICT yes The gRPC wire protocol doesn't seem to care about the proto file names (although the Go grpc code does tack on the proto file name as Metadata in the ServiceDesc) Other than imports, there were no changes to any generated code as a result of this.
2023-02-17 21:14:46 +00:00
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/proto/private/pbpeerstream"
)
var leaderExportedServicesCountKeyDeprecated = []string{"consul", "peering", "exported_services"}
var leaderExportedServicesCountKey = []string{"peering", "exported_services"}
var leaderHealthyPeeringKeyDeprecated = []string{"consul", "peering", "healthy"}
var leaderHealthyPeeringKey = []string{"peering", "healthy"}
var LeaderPeeringMetrics = []prometheus.GaugeDefinition{
{
Name: leaderExportedServicesCountKeyDeprecated,
Help: fmt.Sprint("Deprecated - please use ", strings.Join(leaderExportedServicesCountKey, "_")),
},
{
Name: leaderExportedServicesCountKey,
Help: "A gauge that tracks how many services are exported for the peering. " +
"The labels are \"peer_name\", \"peer_id\" and, for enterprise, \"partition\". " +
"We emit this metric every 9 seconds",
},
{
Name: leaderHealthyPeeringKeyDeprecated,
Help: fmt.Sprint("Deprecated - please use ", strings.Join(leaderExportedServicesCountKey, "_")),
},
{
Name: leaderHealthyPeeringKey,
Help: "A gauge that tracks how if a peering is healthy (1) or not (0). " +
"The labels are \"peer_name\", \"peer_id\" and, for enterprise, \"partition\". " +
"We emit this metric every 9 seconds",
},
}
var (
// fastConnRetryTimeout is how long we wait between retrying connections following the "fast" path
// which is triggered on specific connection errors.
fastConnRetryTimeout = 8 * time.Millisecond
// maxFastConnRetries is the maximum number of fast connection retries before we follow exponential backoff.
maxFastConnRetries = uint(5)
// maxFastRetryBackoff is the maximum amount of time we'll wait between retries following the fast path.
maxFastRetryBackoff = 8192 * time.Millisecond
// maxRetryBackoffPeering is the maximum number of seconds we'll wait between retries when attempting to re-establish a peering connection.
maxRetryBackoffPeering = 64
)
func (s *Server) startPeeringStreamSync(ctx context.Context) {
s.leaderRoutineManager.Start(ctx, peeringStreamsRoutineName, s.runPeeringSync)
s.leaderRoutineManager.Start(ctx, peeringStreamsMetricsRoutineName, s.runPeeringMetrics)
}
func (s *Server) runPeeringMetrics(ctx context.Context) error {
ticker := time.NewTicker(s.config.MetricsReportingInterval)
defer ticker.Stop()
logger := s.logger.Named(logging.PeeringMetrics)
defaultMetrics := metrics.Default
for {
select {
case <-ctx.Done():
logger.Info("stopping peering metrics")
// "Zero-out" the metric on exit so that when prometheus scrapes this
// metric from a non-leader, it does not get a stale value.
metrics.SetGauge(leaderExportedServicesCountKeyDeprecated, float32(0))
metrics.SetGauge(leaderExportedServicesCountKey, float32(0))
return nil
case <-ticker.C:
if err := s.emitPeeringMetricsOnce(defaultMetrics()); err != nil {
s.logger.Error("error emitting peering stream metrics", "error", err)
}
}
}
}
func (s *Server) emitPeeringMetricsOnce(metricsImpl *metrics.Metrics) error {
_, peers, err := s.fsm.State().PeeringList(nil, *structs.NodeEnterpriseMetaInPartition(structs.WildcardSpecifier))
if err != nil {
return err
}
for _, peer := range peers {
part := peer.Partition
labels := []metrics.Label{
{Name: "peer_name", Value: peer.Name},
{Name: "peer_id", Value: peer.ID},
}
if part != "" {
labels = append(labels, metrics.Label{Name: "partition", Value: part})
}
status, found := s.peerStreamServer.StreamStatus(peer.ID)
if found {
// exported services count metric
esc := status.GetExportedServicesCount()
metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKeyDeprecated, float32(esc), labels)
metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKey, float32(esc), labels)
}
// peering health metric
healthy := 0
switch {
case status.NeverConnected:
case s.peerStreamServer.Tracker.IsHealthy(status):
healthy = 1
}
metricsImpl.SetGaugeWithLabels(leaderHealthyPeeringKeyDeprecated, float32(healthy), labels)
metricsImpl.SetGaugeWithLabels(leaderHealthyPeeringKey, float32(healthy), labels)
}
return nil
}
func (s *Server) runPeeringSync(ctx context.Context) error {
logger := s.logger.Named("peering-syncer")
cancelFns := make(map[string]context.CancelFunc)
retryLoopBackoff(ctx, func() error {
if err := s.syncPeeringsAndBlock(ctx, logger, cancelFns); err != nil {
return err
}
return nil
}, func(err error) {
s.logger.Error("error syncing peering streams from state store", "error", err)
})
return nil
}
func (s *Server) stopPeeringStreamSync() {
// will be a no-op when not started
s.leaderRoutineManager.Stop(peeringStreamsRoutineName)
s.leaderRoutineManager.Stop(peeringStreamsMetricsRoutineName)
}
// syncPeeringsAndBlock is a long-running goroutine that is responsible for watching
// changes to peerings in the state store and managing streams to those peers.
func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, cancelFns map[string]context.CancelFunc) error {
// We have to be careful not to introduce a data race here. We want to
// compare the current known peerings in the state store with known
// connected streams to know when we should TERMINATE stray peerings.
//
// If you read the current peerings from the state store, then read the
// current established streams you could lose the data race and have the
// sequence of events be:
//
// 1. list peerings [A,B,C]
// 2. persist new peering [D]
// 3. accept new stream for [D]
// 4. list streams [A,B,C,D]
// 5. terminate [D]
//
// Which is wrong. If we instead ensure that (4) happens before (1), given
// that you can't get an established stream without first passing a "does
// this peering exist in the state store?" inquiry then this happens:
//
// 1. list streams [A,B,C]
// 2. list peerings [A,B,C]
// 3. persist new peering [D]
// 4. accept new stream for [D]
// 5. terminate []
//
// Or even this is fine:
//
// 1. list streams [A,B,C]
// 2. persist new peering [D]
// 3. accept new stream for [D]
// 4. list peerings [A,B,C,D]
// 5. terminate []
connectedStreams := s.peerStreamServer.ConnectedStreams()
state := s.fsm.State()
// Pull the state store contents and set up to block for changes.
ws := memdb.NewWatchSet()
ws.Add(state.AbandonCh())
ws.Add(ctx.Done())
_, peers, err := state.PeeringList(ws, *structs.NodeEnterpriseMetaInPartition(structs.WildcardSpecifier))
if err != nil {
return err
}
// TODO(peering) Adjust this debug info.
// Generate a UUID to trace different passes through this function.
seq, err := uuid.GenerateUUID()
if err != nil {
s.logger.Debug("failed to generate sequence uuid while syncing peerings")
}
logger.Trace("syncing new list of peers", "num_peers", len(peers), "sequence_id", seq)
// Stored tracks the unique set of peers that should be dialed.
// It is used to reconcile the list of active streams.
stored := make(map[string]struct{})
var merr *multierror.Error
// Create connections and streams to peers in the state store that do not have an active stream.
for _, peer := range peers {
logger.Trace("evaluating stored peer", "peer", peer.Name, "should_dial", peer.ShouldDial(), "sequence_id", seq)
if !peer.IsActive() {
// The peering was marked for deletion by ourselves or our peer, no need to dial or track them.
continue
}
// Track all active peerings,since the reconciliation loop below applies to the token generator as well.
stored[peer.ID] = struct{}{}
if !peer.ShouldDial() {
// We do not need to dial peerings where we generated the peering token.
continue
}
// We may have written this peering to the store to trigger xDS updates, but still in the process of establishing.
// If there isn't a secret yet, we're still trying to reach the other server.
logger.Trace("reading peering secret", "sequence_id", seq)
secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" {
continue
}
status, found := s.peerStreamServer.StreamStatus(peer.ID)
if found && status.Connected {
// Nothing to do when we already have an active stream to the peer.
// Updated data will only be used if the stream becomes disconnected
// since there's no need to tear down an active stream.
continue
}
logger.Trace("ensuring stream to peer", "peer_id", peer.ID, "sequence_id", seq)
if cancel, ok := cancelFns[peer.ID]; ok {
// If the peer is known but we're not connected, clean up the retry-er and start over.
// There may be new data in the state store that would enable us to get out of an error state.
logger.Trace("cancelling context to re-establish stream", "peer_id", peer.ID, "sequence_id", seq)
cancel()
}
if err := s.establishStream(ctx, logger, peer, secret, cancelFns); err != nil {
// TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs.
// Lockable status isn't available here though. Could report it via the peering.Service?
logger.Error("error establishing peering stream", "peer_id", peer.ID, "error", err)
merr = multierror.Append(merr, err)
// Continue on errors to avoid one bad peering from blocking the establishment and cleanup of others.
continue
}
}
logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq)
// Clean up active streams of peerings that were deleted from the state store.
for stream, doneCh := range connectedStreams {
if _, ok := stored[stream]; ok {
// Active stream is in the state store, nothing to do.
continue
}
select {
case <-doneCh:
// channel is closed, do nothing to avoid a panic
default:
logger.Trace("tearing down stream for deleted peer", "peer_id", stream, "sequence_id", seq)
close(doneCh)
}
}
logger.Trace("blocking for changes", "sequence_id", seq)
// Block for any changes to the state store.
ws.WatchCtx(ctx)
logger.Trace("unblocked", "sequence_id", seq)
return merr.ErrorOrNil()
}
func (s *Server) establishStream(ctx context.Context,
logger hclog.Logger,
peer *pbpeering.Peering,
secret *pbpeering.PeeringSecrets,
cancelFns map[string]context.CancelFunc) error {
logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID)
if peer.PeerID == "" {
return fmt.Errorf("expected PeerID to be non empty; the wrong end of peering is being dialed")
}
tlsOption, err := peer.TLSDialOption()
if err != nil {
return fmt.Errorf("failed to build TLS dial option from peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" {
return errors.New("missing stream secret for peering stream authorization, peering must be re-established")
}
logger.Trace("establishing stream to peer")
streamStatus, err := s.peerStreamServer.Tracker.Register(peer.ID)
if err != nil {
return fmt.Errorf("failed to register stream: %v", err)
}
streamCtx, cancel := context.WithCancel(ctx)
cancelFns[peer.ID] = cancel
// Start a goroutine to watch for updates to peer server addresses.
// The latest valid server address can be received from nextServerAddr.
nextServerAddr := make(chan string)
go s.watchAddresses(streamCtx, peer.ID, nextServerAddr)
// Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes.
go retryLoopBackoffPeering(streamCtx, logger, func() error {
// Try a new address on each iteration by advancing the ring buffer on errors.
addr, stillOpen := <-nextServerAddr
if !stillOpen {
// If the channel was closed that means the context was canceled, so we return.
return streamCtx.Err()
}
opts := []grpc.DialOption{
tlsOption,
// TODO(peering): Use a grpc.WithStatsHandler here.
// This should wait until the grpc-external server is wired up with a stats handler in NET-50.
// For keep alive parameters there is a larger comment in ClientConnPool.dial about that.
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
// send keepalive pings even if there is no active streams
PermitWithoutStream: true,
}),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(8*1024*1024), grpc.MaxCallRecvMsgSize(8*1024*1024)),
}
2022-10-13 21:55:55 +00:00
logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(streamCtx, addr, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
defer conn.Close()
client := pbpeerstream.NewPeerStreamServiceClient(conn)
stream, err := client.StreamResources(streamCtx)
if err != nil {
return err
}
initialReq := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Open_{
Open: &pbpeerstream.ReplicationMessage_Open{
PeerID: peer.PeerID,
StreamSecretID: secret.GetStream().GetActiveSecretID(),
Remote: &pbpeering.RemoteInfo{
Partition: peer.Partition,
Datacenter: s.config.Datacenter,
Locality: pbcommon.LocalityToProto(s.config.Locality),
},
},
},
}
if err := stream.Send(initialReq); err != nil {
return fmt.Errorf("failed to send initial stream request: %w", err)
}
streamReq := peerstream.HandleStreamRequest{
LocalID: peer.ID,
RemoteID: peer.PeerID,
PeerName: peer.Name,
Partition: peer.Partition,
Stream: stream,
}
err = s.peerStreamServer.HandleStream(streamReq)
// A nil error indicates that the peering was deleted and the stream needs to be gracefully shutdown.
if err == nil {
stream.CloseSend()
s.peerStreamServer.DrainStream(streamReq)
cancel()
logger.Info("closed outbound stream")
}
return err
}, func(err error) {
// TODO(peering): why are we using TrackSendError here? This could also be a receive error.
streamStatus.TrackSendError(err.Error())
switch {
case isErrCode(err, codes.FailedPrecondition):
logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting",
"error", err)
case isErrCode(err, codes.ResourceExhausted):
logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting",
"error", err)
case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
logger.Debug("stream context was canceled", "error", err)
case err != nil:
logger.Error("error managing peering stream", "error", err)
}
}, peeringRetryTimeout)
return nil
}
// watchAddresses sends an up-to-date address to nextServerAddr.
// These could be either remote peer server addresses, or local mesh gateways.
// The function loads the addresses into a ring buffer and cycles through them until:
// 1. streamCtx is cancelled (peer is deleted or we're re-establishing the stream with new data)
// 2. the peer, Mesh config entry, or (optionally) mesh gateway address set is modified, and the watchset fires.
//
// In case (2) we re-fetch all the data sources and rebuild the ring buffer.
// In the event that the PeerThroughMeshGateways is set in the Mesh entry, we front-load the ring buffer with
// local mesh gateway addresses, so we can try those first, with the option to fall back to remote server addresses.
func (s *Server) watchAddresses(ctx context.Context, peerID string, nextServerAddr chan<- string) {
defer close(nextServerAddr)
var ringbuf *ring.Ring
var ws memdb.WatchSet
fetchAddresses := func() error {
// Re-instantiate ws since it can only be watched once.
ws = memdb.NewWatchSet()
newRing, _, err := s.peeringBackend.GetDialAddresses(s.logger, ws, peerID)
if err != nil {
return fmt.Errorf("failed to fetch updated addresses to dial peer: %w", err)
}
ringbuf = newRing
return nil
}
// Initialize the first ring buffer.
if err := fetchAddresses(); err != nil {
s.logger.Warn("error fetching addresses", "peer_id", peerID, "error", err)
}
for {
select {
case nextServerAddr <- ringbuf.Value.(string):
ringbuf = ringbuf.Next()
case err := <-ws.WatchCh(ctx):
if err != nil {
// Context was cancelled.
return
}
// Watch fired so we re-fetch the necessary addresses and replace the ring buffer.
if err := fetchAddresses(); err != nil {
s.logger.Warn("watch for new addresses fired but the address list to dial may not have been updated",
"peer_id", peerID,
"error", err)
}
}
}
}
func (s *Server) startPeeringDeferredDeletion(ctx context.Context) {
s.leaderRoutineManager.Start(ctx, peeringDeletionRoutineName, s.runPeeringDeletions)
}
// runPeeringDeletions watches for peerings marked for deletions and then cleans up data for them.
func (s *Server) runPeeringDeletions(ctx context.Context) error {
logger := s.loggers.Named(logging.Peering)
// This limiter's purpose is to control the rate of raft applies caused by the deferred deletion
// process. This includes deletion of the peerings themselves in addition to any peering data
raftLimiter := rate.NewLimiter(defaultDeletionApplyRate, int(defaultDeletionApplyRate))
for {
2022-08-25 15:25:59 +00:00
select {
case <-ctx.Done():
return nil
default:
}
ws := memdb.NewWatchSet()
state := s.fsm.State()
_, peerings, err := s.fsm.State().PeeringListDeleted(ws)
if err != nil {
logger.Warn("encountered an error while searching for deleted peerings", "error", err)
continue
}
if len(peerings) == 0 {
ws.Add(state.AbandonCh())
// wait for a peering to be deleted or the routine to be cancelled
if err := ws.WatchCtx(ctx); err != nil {
return err
}
continue
}
for _, p := range peerings {
s.removePeeringAndData(ctx, logger, raftLimiter, p)
}
}
}
// removepPeeringAndData removes data imported for a peering and the peering itself.
func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger, limiter *rate.Limiter, peer *pbpeering.Peering) {
logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID)
entMeta := *structs.NodeEnterpriseMetaInPartition(peer.Partition)
// First delete all imported data.
// By deleting all imported nodes we also delete all services and checks registered on them.
if err := s.deleteAllNodes(ctx, limiter, entMeta, peer.Name); err != nil {
logger.Error("Failed to remove Nodes for peer", "error", err)
return
}
if err := s.deleteTrustBundleFromPeer(ctx, limiter, entMeta, peer.Name); err != nil {
logger.Error("Failed to remove trust bundle for peer", "error", err)
return
}
if err := limiter.Wait(ctx); err != nil {
return
}
if peer.State == pbpeering.PeeringState_TERMINATED {
// For peerings terminated by our peer we only clean up the local data, we do not delete the peering itself.
// This is to avoid a situation where the peering disappears without the local operator's knowledge.
return
}
// Once all imported data is deleted, the peering itself is also deleted.
req := &pbpeering.PeeringDeleteRequest{
Name: peer.Name,
Partition: acl.PartitionOrDefault(peer.Partition),
}
_, err := s.raftApplyProtobuf(structs.PeeringDeleteType, req)
if err != nil {
logger.Error("failed to apply full peering deletion", "error", err)
return
}
}
// deleteAllNodes will delete all nodes in a partition or all nodes imported from a given peer name.
func (s *Server) deleteAllNodes(ctx context.Context, limiter *rate.Limiter, entMeta acl.EnterpriseMeta, peerName string) error {
// Same as ACL batch upsert size
nodeBatchSizeBytes := 256 * 1024
_, nodes, err := s.fsm.State().NodeDump(nil, &entMeta, peerName)
if err != nil {
return err
}
if len(nodes) == 0 {
return nil
}
i := 0
for {
var ops structs.TxnOps
for batchSize := 0; batchSize < nodeBatchSizeBytes && i < len(nodes); i++ {
entry := nodes[i]
op := structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeDelete,
Node: structs.Node{
Node: entry.Node,
Partition: entry.Partition,
PeerName: entry.PeerName,
},
},
}
ops = append(ops, &op)
// Add entries to the transaction until it reaches the max batch size
batchSize += len(entry.Node) + len(entry.Partition) + len(entry.PeerName)
}
// Send each batch as a TXN Req to avoid sending one at a time
req := structs.TxnRequest{
Datacenter: s.config.Datacenter,
Ops: ops,
}
if len(req.Ops) > 0 {
if err := limiter.Wait(ctx); err != nil {
return err
}
_, err := s.raftApplyMsgpack(structs.TxnRequestType, &req)
if err != nil {
return err
}
} else {
break
}
}
return nil
}
// deleteTrustBundleFromPeer deletes the trust bundle imported from a peer, if present.
func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Limiter, entMeta acl.EnterpriseMeta, peerName string) error {
_, bundle, err := s.fsm.State().PeeringTrustBundleRead(nil, state.Query{Value: peerName, EnterpriseMeta: entMeta})
if err != nil {
return err
}
if bundle == nil {
return nil
}
if err := limiter.Wait(ctx); err != nil {
return err
}
req := &pbpeering.PeeringTrustBundleDeleteRequest{
Name: peerName,
Partition: entMeta.PartitionOrDefault(),
}
_, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, req)
return err
}
// retryLoopBackoffPeering re-runs loopFn with a backoff on error. errFn is run whenever
// loopFn returns an error. retryTimeFn is used to calculate the time between retries on error.
// It is passed the number of errors in a row that loopFn has returned and the latest error
// from loopFn.
//
// This function is modelled off of retryLoopBackoffHandleSuccess but is specific to peering
// because peering needs to use different retry times depending on which error is returned.
// This function doesn't use a rate limiter, unlike retryLoopBackoffHandleSuccess, because
// the rate limiter is only needed in the success case when loopFn returns nil and we want to
// loop again. In the peering case, we exit on a successful loop so we don't need the limter.
func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn func() error, errFn func(error),
retryTimeFn func(failedAttempts uint, loopErr error) time.Duration) {
var failedAttempts uint
var err error
for {
if err = loopFn(); err != nil {
errFn(err)
if failedAttempts < math.MaxUint {
failedAttempts++
}
retryTime := retryTimeFn(failedAttempts, err)
logger.Trace("in connection retry backoff", "delay", retryTime)
timer := time.NewTimer(retryTime)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
continue
}
return
}
}
// peeringRetryTimeout returns the time that should be waited between re-establishing a peering
// connection after an error. We follow the default backoff from retryLoopBackoff
// unless the error is a "failed precondition" error in which case we retry much more quickly.
// Retrying quickly is important in the case of a failed precondition error because we expect it to resolve
// quickly. For example in the case of connecting with a follower through a load balancer, we just need to retry
// until our request lands on a leader.
func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration {
if loopErr != nil && isErrCode(loopErr, codes.FailedPrecondition) {
// Wait a constant time for the first number of retries.
if failedAttempts <= maxFastConnRetries {
return fastConnRetryTimeout
}
// From here, follow an exponential backoff maxing out at maxFastRetryBackoff.
// The below equation multiples the constantRetryTimeout by 2^n where n is the number of failed attempts
// we're on, starting at 1 now that we're past our maxFastConnRetries.
// For example if fastConnRetryTimeout == 8ms and maxFastConnRetries == 5, then at 6 failed retries
// we'll do 8ms * 2^1 = 16ms, then 8ms * 2^2 = 32ms, etc.
ms := fastConnRetryTimeout * (1 << (failedAttempts - maxFastConnRetries))
if ms > maxFastRetryBackoff {
return maxFastRetryBackoff
}
return ms
}
// if the message sent is too large probably should not retry at all
if loopErr != nil && isErrCode(loopErr, codes.ResourceExhausted) {
return maxFastRetryBackoff
}
// Else we go with the default backoff from retryLoopBackoff.
if (1 << failedAttempts) < maxRetryBackoffPeering {
return (1 << failedAttempts) * time.Second
}
return time.Duration(maxRetryBackoffPeering) * time.Second
}
// isErrCode returns true if err is a gRPC error with given error code.
func isErrCode(err error, code codes.Code) bool {
if err == nil {
return false
}
// Handle wrapped errors, since status.FromError does a naive assertion.
var statusErr interface {
GRPCStatus() *grpcstatus.Status
}
if errors.As(err, &statusErr) {
return statusErr.GRPCStatus().Code() == code
}
grpcErr, ok := grpcstatus.FromError(err)
if !ok {
return false
}
return grpcErr.Code() == code
}