Replace ring buffer with async version (#14314)

We need to watch for changes to peerings and update the server addresses which get served by the ring buffer.

Also, if there is an active connection for a peer, we are getting up-to-date server addresses from the replication stream and can safely ignore the token's addresses which may be stale.
This commit is contained in:
Chris S. Kim 2022-08-26 10:27:13 -04:00 committed by GitHub
parent f64af3be24
commit a8090268d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 233 additions and 29 deletions

View File

@ -295,13 +295,6 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
return fmt.Errorf("failed to build TLS dial option from peering: %w", err)
}
// Create a ring buffer to cycle through peer addresses in the retry loop below.
buffer := ring.New(len(peer.PeerServerAddresses))
for _, addr := range peer.PeerServerAddresses {
buffer.Value = addr
buffer = buffer.Next()
}
secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
@ -312,27 +305,26 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
logger.Trace("establishing stream to peer")
retryCtx, cancel := context.WithCancel(ctx)
cancelFns[peer.ID] = cancel
streamStatus, err := s.peerStreamTracker.Register(peer.ID)
if err != nil {
return fmt.Errorf("failed to register stream: %v", err)
}
streamCtx, cancel := context.WithCancel(ctx)
cancelFns[peer.ID] = cancel
// Start a goroutine to watch for updates to peer server addresses.
// The latest valid server address can be received from nextServerAddr.
nextServerAddr := make(chan string)
go s.watchPeerServerAddrs(streamCtx, peer, nextServerAddr)
// Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes.
go retryLoopBackoffPeering(retryCtx, logger, func() error {
go retryLoopBackoffPeering(streamCtx, logger, func() error {
// Try a new address on each iteration by advancing the ring buffer on errors.
defer func() {
buffer = buffer.Next()
}()
addr, ok := buffer.Value.(string)
if !ok {
return fmt.Errorf("peer server address type %T is not a string", buffer.Value)
}
addr := <-nextServerAddr
logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(retryCtx, addr,
conn, err := grpc.DialContext(streamCtx, addr,
// TODO(peering): use a grpc.WithStatsHandler here?)
tlsOption,
// For keep alive parameters there is a larger comment in ClientConnPool.dial about that.
@ -349,7 +341,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
defer conn.Close()
client := pbpeerstream.NewPeerStreamServiceClient(conn)
stream, err := client.StreamResources(retryCtx)
stream, err := client.StreamResources(streamCtx)
if err != nil {
return err
}
@ -397,6 +389,74 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
return nil
}
// watchPeerServerAddrs sends an up-to-date peer server address to nextServerAddr.
// It loads the server addresses into a ring buffer and cycles through them until:
// 1. streamCtx is cancelled (peer is deleted)
// 2. the peer is modified and the watchset fires.
//
// In case (2) we refetch the peering and rebuild the ring buffer.
func (s *Server) watchPeerServerAddrs(ctx context.Context, peer *pbpeering.Peering, nextServerAddr chan<- string) {
defer close(nextServerAddr)
// we initialize the ring buffer with the peer passed to `establishStream`
// because the caller has pre-checked `peer.ShouldDial`, guaranteeing
// at least one server address.
//
// IMPORTANT: ringbuf must always be length > 0 or else `<-nextServerAddr` may block.
ringbuf := ring.New(len(peer.PeerServerAddresses))
for _, addr := range peer.PeerServerAddresses {
ringbuf.Value = addr
ringbuf = ringbuf.Next()
}
innerWs := memdb.NewWatchSet()
_, _, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
if err != nil {
s.logger.Warn("failed to watch for changes to peer; server addresses may become stale over time.",
"peer_id", peer.ID,
"error", err)
}
fetchAddrs := func() error {
// reinstantiate innerWs to prevent it from growing indefinitely
innerWs = memdb.NewWatchSet()
_, peering, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
if err != nil {
return fmt.Errorf("failed to fetch peer %q: %w", peer.ID, err)
}
if !peering.IsActive() {
return fmt.Errorf("peer %q is no longer active", peer.ID)
}
if len(peering.PeerServerAddresses) == 0 {
return fmt.Errorf("peer %q has no addresses to dial", peer.ID)
}
ringbuf = ring.New(len(peering.PeerServerAddresses))
for _, addr := range peering.PeerServerAddresses {
ringbuf.Value = addr
ringbuf = ringbuf.Next()
}
return nil
}
for {
select {
case nextServerAddr <- ringbuf.Value.(string):
ringbuf = ringbuf.Next()
case err := <-innerWs.WatchCh(ctx):
if err != nil {
// context was cancelled
return
}
// watch fired so we refetch the peering and rebuild the ring buffer
if err := fetchAddrs(); err != nil {
s.logger.Warn("watchset for peer was fired but failed to update server addresses",
"peer_id", peer.ID,
"error", err)
}
}
}
}
func (s *Server) startPeeringDeferredDeletion(ctx context.Context) {
s.leaderRoutineManager.Start(ctx, peeringDeletionRoutineName, s.runPeeringDeletions)
}

View File

@ -18,6 +18,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
@ -25,6 +26,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
@ -1375,3 +1377,138 @@ func Test_isFailedPreconditionErr(t *testing.T) {
werr := fmt.Errorf("wrapped: %w", err)
assert.True(t, isFailedPreconditionErr(werr))
}
func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
// We want 1s retries for this test
orig := maxRetryBackoff
maxRetryBackoff = 1
t.Cleanup(func() { maxRetryBackoff = orig })
_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
})
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")
// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
acceptorClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialer",
}
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)
// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
})
testrpc.WaitForLeader(t, dialer.RPC, "dc2")
// Create a peering at dialer by establishing a peering with acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
dialerClient := pbpeering.NewPeeringServiceClient(conn)
establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)
p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})
testutil.RunStep(t, "calling establish with active connection does not overwrite server addresses", func(t *testing.T) {
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
// generate a new token from the acceptor
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialer",
}
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)
token, err := acceptor.peeringBackend.DecodeToken([]byte(resp.PeeringToken))
require.NoError(t, err)
// we will update the token with bad addresses to assert it doesn't clobber existing ones
token.ServerAddresses = []string{"1.2.3.4:1234"}
badToken, err := acceptor.peeringBackend.EncodeToken(token)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
// Try establishing.
// This call will only succeed if the bad address was not used in the calls to exchange the peering secret.
establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: string(badToken),
}
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)
p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)
require.NotContains(t, p.Peering.PeerServerAddresses, "1.2.3.4:1234")
})
testutil.RunStep(t, "updated server addresses are picked up by the leader", func(t *testing.T) {
// force close the acceptor's gRPC server so the dialier retries with a new address.
acceptor.externalGRPCServer.Stop()
clone := proto.Clone(p.Peering)
updated := clone.(*pbpeering.Peering)
// start with a bad address so we can assert for a specific error
updated.PeerServerAddresses = append([]string{
"bad",
}, p.Peering.PeerServerAddresses...)
// this write will wake up the watch on the leader to refetch server addresses
require.NoError(t, dialer.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: updated}))
retry.Run(t, func(r *retry.R) {
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
// We assert for this error to be set which would indicate that we iterated
// through a bad address.
require.Contains(r, status.LastSendErrorMessage, "transport: Error while dialing dial tcp: address bad: missing port in address")
require.False(r, status.Connected)
})
})
}

View File

@ -7,12 +7,13 @@ import (
"strings"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/go-memdb"
)
const (
@ -981,7 +982,7 @@ func peeringsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, en
if idx > maxIdx {
maxIdx = idx
}
if peering == nil || !peering.IsActive() {
if !peering.IsActive() {
continue
}
peerings = append(peerings, peering)

View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/proto/pbpeerstream"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
@ -27,6 +26,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbpeerstream"
)
var (
@ -379,6 +379,7 @@ func (s *Server) Establish(
}
var id string
serverAddrs := tok.ServerAddresses
if existing == nil {
id, err = lib.GenerateUUID(s.Backend.CheckPeeringUUID)
if err != nil {
@ -386,6 +387,11 @@ func (s *Server) Establish(
}
} else {
id = existing.ID
// If there is a connected stream, assume that the existing ServerAddresses
// are up to date and do not try to overwrite them with the token's addresses.
if status, ok := s.Tracker.StreamStatus(id); ok && status.Connected {
serverAddrs = existing.PeerServerAddresses
}
}
// validate that this peer name is not being used as an acceptor already
@ -397,7 +403,7 @@ func (s *Server) Establish(
ID: id,
Name: req.PeerName,
PeerCAPems: tok.CA,
PeerServerAddresses: tok.ServerAddresses,
PeerServerAddresses: serverAddrs,
PeerServerName: tok.ServerName,
PeerID: tok.PeerID,
Meta: req.Meta,
@ -418,9 +424,9 @@ func (s *Server) Establish(
}
var exchangeResp *pbpeerstream.ExchangeSecretResponse
// Loop through the token's addresses once, attempting to fetch the long-lived stream secret.
// Loop through the known server addresses once, attempting to fetch the long-lived stream secret.
var dialErrors error
for _, addr := range peering.PeerServerAddresses {
for _, addr := range serverAddrs {
exchangeResp, err = exchangeSecret(ctx, addr, tlsOption, &exchangeReq)
if err != nil {
dialErrors = multierror.Append(dialErrors, fmt.Errorf("failed to exchange peering secret with %q: %w", addr, err))
@ -720,7 +726,7 @@ func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelete
return nil, err
}
if existing == nil || !existing.IsActive() {
if !existing.IsActive() {
// Return early when the Peering doesn't exist or is already marked for deletion.
// We don't return nil because the pb will fail to marshal.
return &pbpeering.PeeringDeleteResponse{}, nil

View File

@ -143,10 +143,10 @@ func PeeringStateFromAPI(t api.PeeringState) PeeringState {
}
func (p *Peering) IsActive() bool {
if p != nil && p.State == PeeringState_TERMINATED {
if p == nil || p.State == PeeringState_TERMINATED {
return false
}
if p == nil || p.DeletedAt == nil {
if p.DeletedAt == nil {
return true
}