diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go
index e2d8b4ef0..5efc52edf 100644
--- a/agent/consul/leader_peering.go
+++ b/agent/consul/leader_peering.go
@@ -114,13 +114,19 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
 	for _, peer := range peers {
 		logger.Trace("evaluating stored peer", "peer", peer.Name, "should_dial", peer.ShouldDial(), "sequence_id", seq)
 
-		if !peer.ShouldDial() {
+		if !peer.IsActive() {
+			// The peering is marked for deletion, so there is no need to dial or track it.
 			continue
 		}
 
-		// TODO(peering) Account for deleted peers that are still in the state store
+		// Track all active peerings, since the reconciliation loop below applies to the token generator as well.
 		stored[peer.ID] = struct{}{}
 
+		if !peer.ShouldDial() {
+			// We do not need to dial peerings where we generated the peering token.
+			continue
+		}
+
 		status, found := s.peeringService.StreamStatus(peer.ID)
 
 		// TODO(peering): If there is new peering data and a connected stream, should we tear down the stream?
@@ -179,6 +185,8 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
 }
 
 func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer *pbpeering.Peering, cancelFns map[string]context.CancelFunc) error {
+	logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID)
+
 	tlsOption := grpc.WithInsecure()
 	if len(peer.PeerCAPems) > 0 {
 		var haveCerts bool
@@ -208,7 +216,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
 		buffer = buffer.Next()
 	}
 
-	logger.Trace("establishing stream to peer", "peer_id", peer.ID)
+	logger.Trace("establishing stream to peer")
 
 	retryCtx, cancel := context.WithCancel(ctx)
 	cancelFns[peer.ID] = cancel
@@ -224,7 +232,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
 			return fmt.Errorf("peer server address type %T is not a string", buffer.Value)
 		}
 
-		logger.Trace("dialing peer", "peer_id", peer.ID, "addr", addr)
+		logger.Trace("dialing peer", "addr", addr)
 		conn, err := grpc.DialContext(retryCtx, addr,
 			grpc.WithContextDialer(newPeerDialer(addr)),
 			grpc.WithBlock(),
@@ -241,16 +249,23 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
 			return err
 		}
 
-		err = s.peeringService.HandleStream(peering.HandleStreamRequest{
+		streamReq := peering.HandleStreamRequest{
 			LocalID:   peer.ID,
 			RemoteID:  peer.PeerID,
 			PeerName:  peer.Name,
 			Partition: peer.Partition,
 			Stream:    stream,
-		})
+		}
+		err = s.peeringService.HandleStream(streamReq)
 
+		// A nil error indicates that the peering was deleted and the stream needs to be gracefully shut down.
 		if err == nil {
+			stream.CloseSend()
+			s.peeringService.DrainStream(streamReq)
+			// This will cancel the retry-er context, letting us break out of this loop when we want to shut down the stream.
 			cancel()
+
+			logger.Info("closed outbound stream")
 		}
 
 		return err
diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go
index dd79529b3..feb520e14 100644
--- a/agent/consul/leader_peering_test.go
+++ b/agent/consul/leader_peering_test.go
@@ -88,10 +88,12 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
 		require.True(r, status.Connected)
 	})
 
-	// Delete the peering to trigger the termination sequence
-	require.NoError(t, s2.fsm.State().PeeringDelete(2000, state.Query{
-		Value: "my-peer-s1",
-	}))
+	// Delete the peering to trigger the termination sequence.
+	deleted := &pbpeering.Peering{
+		Name:      "my-peer-s1",
+		DeletedAt: structs.TimeToProto(time.Now()),
+	}
+	require.NoError(t, s2.fsm.State().PeeringWrite(2000, deleted))
 	s2.logger.Trace("deleted peering for my-peer-s1")
 
 	retry.Run(t, func(r *retry.R) {
@@ -175,10 +177,12 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
 		require.True(r, status.Connected)
 	})
 
-	// Delete the peering from the server peer to trigger the termination sequence
-	require.NoError(t, s1.fsm.State().PeeringDelete(2000, state.Query{
-		Value: "my-peer-s2",
-	}))
+	// Delete the peering from the server peer to trigger the termination sequence.
+	deleted := &pbpeering.Peering{
+		Name:      "my-peer-s2",
+		DeletedAt: structs.TimeToProto(time.Now()),
+	}
+	require.NoError(t, s1.fsm.State().PeeringWrite(2000, deleted))
 	s2.logger.Trace("deleted peering for my-peer-s1")
 
 	retry.Run(t, func(r *retry.R) {
@@ -186,7 +190,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
 		require.False(r, found)
 	})
 
-	// s2 should have received the termination message and updated the peering state
+	// s2 should have received the termination message and updated the peering state.
 	retry.Run(t, func(r *retry.R) {
 		_, peering, err := s2.fsm.State().PeeringRead(nil, state.Query{
 			Value: "my-peer-s1",
diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go
index 285c0edc3..bfb6f7f82 100644
--- a/agent/consul/state/peering.go
+++ b/agent/consul/state/peering.go
@@ -17,8 +17,6 @@ import (
 const (
 	tablePeering             = "peering"
 	tablePeeringTrustBundles = "peering-trust-bundles"
-
-	indexDeleted = "deleted"
 )
 
 func peeringTableSchema() *memdb.TableSchema {
diff --git a/agent/consul/state/peering_test.go b/agent/consul/state/peering_test.go
index 6dd9ef6c0..66c5e7486 100644
--- a/agent/consul/state/peering_test.go
+++ b/agent/consul/state/peering_test.go
@@ -1051,8 +1051,7 @@ func TestStateStore_PeeringsForService(t *testing.T) {
 		},
 		{
 			peering: &pbpeering.Peering{
-				Name:  "peer2",
-				State: pbpeering.PeeringState_TERMINATED,
+				Name: "peer2",
 			},
 			delete: true,
 		},
diff --git a/agent/consul/state/schema.go b/agent/consul/state/schema.go
index d863d7efe..c60bea856 100644
--- a/agent/consul/state/schema.go
+++ b/agent/consul/state/schema.go
@@ -64,7 +64,10 @@ type IndexEntry struct {
 	Value uint64
 }
 
-const tableIndex = "index"
+const (
+	tableIndex   = "index"
+	indexDeleted = "deleted"
+)
 
 // indexTableSchema returns a new table schema used for tracking various the
 // latest raft index for a table or entities within a table.
diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go
index cc4e8bd01..c42084cea 100644
--- a/agent/rpc/peering/service.go
+++ b/agent/rpc/peering/service.go
@@ -556,17 +556,24 @@ func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResource
 
 	// TODO(peering): If the peering is marked as deleted, send a Terminated message and return
 	// TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it
-	s.logger.Info("accepted initial replication request from peer", "peer_id", req.PeerID)
+	s.logger.Info("accepted initial replication request from peer", "peer_id", p.ID)
 
-	// For server peers both of these ID values are the same, because we generated a token with a local ID,
-	// and the client peer dials using that same ID.
-	return s.HandleStream(HandleStreamRequest{
+	streamReq := HandleStreamRequest{
 		LocalID:   p.ID,
 		RemoteID:  p.PeerID,
 		PeerName:  p.Name,
 		Partition: p.Partition,
 		Stream:    stream,
-	})
+	}
+	err = s.HandleStream(streamReq)
+	// A nil error indicates that the peering was deleted and the stream needs to be gracefully shut down.
+	if err == nil {
+		s.DrainStream(streamReq)
+		return nil
+	}
+
+	s.logger.Error("error handling stream", "peer_name", p.Name, "peer_id", req.PeerID, "error", err)
+	return err
 }
 
 type HandleStreamRequest struct {
@@ -586,10 +593,28 @@ type HandleStreamRequest struct {
 	Stream BidirectionalStream
 }
 
+// DrainStream attempts to gracefully drain the stream when the connection is going to be torn down.
+// Tearing down the connection too quickly can lead to our peer receiving a context cancellation error before the stream termination message.
+// Handling the termination message is important to set the expectation that the peering will not be reestablished unless recreated.
+func (s *Service) DrainStream(req HandleStreamRequest) {
+	for {
+		// Ensure that we read until an error, or the peer has nothing more to send.
+		if _, err := req.Stream.Recv(); err != nil {
+			if err != io.EOF {
+				s.logger.Warn("failed to tear down stream gracefully: peer may not have received termination message",
+					"peer_name", req.PeerName, "peer_id", req.LocalID, "error", err)
+			}
+			break
+		}
+		// Since the peering is being torn down, we discard all replication messages without an error.
+		// We want to avoid importing new data at this point.
+	}
+}
+
 // The localID provided is the locally-generated identifier for the peering.
 // The remoteID is an identifier that the remote peer recognizes for the peering.
 func (s *Service) HandleStream(req HandleStreamRequest) error {
-	logger := s.logger.Named("stream").With("peer_id", req.LocalID)
+	logger := s.logger.Named("stream").With("peer_name", req.PeerName, "peer_id", req.LocalID)
 	logger.Trace("handling stream for peer")
 
 	status, err := s.streams.connected(req.LocalID)
@@ -646,25 +671,20 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
 		defer close(recvChan)
 		for {
 			msg, err := req.Stream.Recv()
+			if err == nil {
+				logTraceRecv(logger, msg)
+				recvChan <- msg
+				continue
+			}
+
 			if err == io.EOF {
 				logger.Info("stream ended by peer")
 				status.trackReceiveError(err.Error())
 				return
 			}
-			if e, ok := grpcstatus.FromError(err); ok {
-				// Cancelling the stream is not an error, that means we or our peer intended to terminate the peering.
-				if e.Code() == codes.Canceled {
-					return
-				}
-			}
-			if err != nil {
-				logger.Error("failed to receive from stream", "error", err)
-				status.trackReceiveError(err.Error())
-				return
-			}
-
-			logTraceRecv(logger, msg)
-			recvChan <- msg
+			logger.Error("failed to receive from stream", "error", err)
+			status.trackReceiveError(err.Error())
+			return
 		}
 	}()
 
@@ -693,13 +713,12 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
 
 		case msg, open := <-recvChan:
 			if !open {
-				// No longer receiving data on the stream.
+ logger.Trace("no longer receiving data on the stream") return nil } if !s.Backend.IsLeader() { // we are not the leader anymore so we will hang up on the dialer - logger.Error("node is not a leader anymore; cannot continue streaming") st, err := grpcstatus.New(codes.FailedPrecondition, @@ -750,11 +769,11 @@ func (s *Service) HandleStream(req HandleStreamRequest) error { } if term := msg.GetTerminated(); term != nil { - logger.Info("received peering termination message, cleaning up imported resources") + logger.Info("peering was deleted by our peer: marking peering as terminated and cleaning up imported resources") // Once marked as terminated, a separate deferred deletion routine will clean up imported resources. if err := s.Backend.Apply().PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: req.LocalID}); err != nil { - return err + logger.Error("failed to mark peering as terminated: %w", err) } return nil } diff --git a/proto/pbpeering/peering.go b/proto/pbpeering/peering.go index 1a0a734f2..f090ebef7 100644 --- a/proto/pbpeering/peering.go +++ b/proto/pbpeering/peering.go @@ -88,7 +88,7 @@ func (msg *EstablishRequest) Timeout(rpcHoldTimeout time.Duration, maxQueryTime // If we generated a token for this peer we did not store our server addresses under PeerServerAddresses. // These server addresses are for dialing, and only the peer initiating the peering will do the dialing. func (p *Peering) ShouldDial() bool { - return len(p.PeerServerAddresses) > 0 && p.State != PeeringState_TERMINATED + return len(p.PeerServerAddresses) > 0 } func (x ReplicationMessage_Response_Operation) GoString() string { @@ -178,6 +178,9 @@ func PeeringStateFromAPI(t api.PeeringState) PeeringState { } func (p *Peering) IsActive() bool { + if p != nil && p.State == PeeringState_TERMINATED { + return false + } if p == nil || p.DeletedAt == nil { return true }