34140ff3e0
Previously, public referred to gRPC services that are both exposed on the dedicated gRPC port and have their definitions in the proto-public directory (so were considered usable by 3rd parties). Whereas private referred to services on the multiplexed server port that are only usable by agents and other servers. Now, we're splitting these definitions, such that external/internal refers to the port and public/private refers to whether they can be used by 3rd parties. This is necessary because the peering replication API needs to be exposed on the dedicated port, but is not (yet) suitable for use by 3rd parties.
423 lines
14 KiB
Go
423 lines
14 KiB
Go
package peerstream
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
|
|
"github.com/golang/protobuf/jsonpb"
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/hashicorp/go-hclog"
|
|
"google.golang.org/genproto/googleapis/rpc/code"
|
|
"google.golang.org/grpc/codes"
|
|
grpcstatus "google.golang.org/grpc/status"
|
|
|
|
"github.com/hashicorp/consul/agent/connect"
|
|
external "github.com/hashicorp/consul/agent/grpc-external"
|
|
"github.com/hashicorp/consul/proto/pbpeering"
|
|
"github.com/hashicorp/consul/proto/pbpeerstream"
|
|
)
|
|
|
|
type BidirectionalStream interface {
|
|
Send(*pbpeerstream.ReplicationMessage) error
|
|
Recv() (*pbpeerstream.ReplicationMessage, error)
|
|
Context() context.Context
|
|
}
|
|
|
|
// StreamResources handles incoming streaming connections.
|
|
func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamResourcesServer) error {
|
|
logger := s.Logger.Named("stream-resources").With("request_id", external.TraceID())
|
|
|
|
logger.Trace("Started processing request")
|
|
defer logger.Trace("Finished processing request")
|
|
|
|
// NOTE: this code should have similar error handling to the new-request
|
|
// handling code in HandleStream()
|
|
|
|
if !s.Backend.IsLeader() {
|
|
// we are not the leader so we will hang up on the dialer
|
|
|
|
logger.Error("cannot establish a peering stream on a follower node")
|
|
|
|
st, err := grpcstatus.New(codes.FailedPrecondition,
|
|
"cannot establish a peering stream on a follower node").WithDetails(
|
|
&pbpeerstream.LeaderAddress{Address: s.Backend.GetLeaderAddress()})
|
|
if err != nil {
|
|
logger.Error(fmt.Sprintf("failed to marshal the leader address in response; err: %v", err))
|
|
return grpcstatus.Error(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
|
|
} else {
|
|
return st.Err()
|
|
}
|
|
}
|
|
|
|
// Initial message on a new stream must be a new subscription request.
|
|
first, err := stream.Recv()
|
|
if err != nil {
|
|
logger.Error("failed to establish stream", "error", err)
|
|
return err
|
|
}
|
|
|
|
// TODO(peering) Make request contain a list of resources, so that roots and services can be
|
|
// subscribed to with a single request. See:
|
|
// https://github.com/envoyproxy/data-plane-api/blob/main/envoy/service/discovery/v3/discovery.proto#L46
|
|
req := first.GetRequest()
|
|
if req == nil {
|
|
return grpcstatus.Error(codes.InvalidArgument, "first message when initiating a peering must be a subscription request")
|
|
}
|
|
logger.Trace("received initial replication request from peer")
|
|
logTraceRecv(logger, req)
|
|
|
|
if req.PeerID == "" {
|
|
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must specify a PeerID")
|
|
}
|
|
if req.ResponseNonce != "" {
|
|
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must not contain a nonce")
|
|
}
|
|
if req.Error != nil {
|
|
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must not contain an error")
|
|
}
|
|
if !pbpeerstream.KnownTypeURL(req.ResourceURL) {
|
|
return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL)
|
|
}
|
|
|
|
_, p, err := s.GetStore().PeeringReadByID(nil, req.PeerID)
|
|
if err != nil {
|
|
logger.Error("failed to look up peer", "peer_id", req.PeerID, "error", err)
|
|
return grpcstatus.Error(codes.Internal, "failed to find PeerID: "+req.PeerID)
|
|
}
|
|
if p == nil {
|
|
return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID)
|
|
}
|
|
|
|
// TODO(peering): If the peering is marked as deleted, send a Terminated message and return
|
|
// TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it
|
|
logger.Info("accepted initial replication request from peer", "peer_id", p.ID)
|
|
|
|
if p.PeerID != "" {
|
|
return grpcstatus.Error(codes.InvalidArgument, "expected PeerID to be empty; the wrong end of peering is being dialed")
|
|
}
|
|
|
|
streamReq := HandleStreamRequest{
|
|
LocalID: p.ID,
|
|
RemoteID: "",
|
|
PeerName: p.Name,
|
|
Partition: p.Partition,
|
|
Stream: stream,
|
|
}
|
|
err = s.HandleStream(streamReq)
|
|
// A nil error indicates that the peering was deleted and the stream needs to be gracefully shutdown.
|
|
if err == nil {
|
|
s.DrainStream(streamReq)
|
|
return nil
|
|
}
|
|
|
|
logger.Error("error handling stream", "peer_name", p.Name, "peer_id", req.PeerID, "error", err)
|
|
return err
|
|
}
|
|
|
|
type HandleStreamRequest struct {
|
|
// LocalID is the UUID for the peering in the local Consul datacenter.
|
|
LocalID string
|
|
|
|
// RemoteID is the UUID for the peering from the perspective of the peer.
|
|
RemoteID string
|
|
|
|
// PeerName is the name of the peering.
|
|
PeerName string
|
|
|
|
// Partition is the local partition associated with the peer.
|
|
Partition string
|
|
|
|
// Stream is the open stream to the peer cluster.
|
|
Stream BidirectionalStream
|
|
}
|
|
|
|
func (r HandleStreamRequest) WasDialed() bool {
|
|
return r.RemoteID == ""
|
|
}
|
|
|
|
// DrainStream attempts to gracefully drain the stream when the connection is going to be torn down.
|
|
// Tearing down the connection too quickly can lead our peer receiving a context cancellation error before the stream termination message.
|
|
// Handling the termination message is important to set the expectation that the peering will not be reestablished unless recreated.
|
|
func (s *Server) DrainStream(req HandleStreamRequest) {
|
|
for {
|
|
// Ensure that we read until an error, or the peer has nothing more to send.
|
|
if _, err := req.Stream.Recv(); err != nil {
|
|
if err != io.EOF {
|
|
s.Logger.Warn("failed to tear down stream gracefully: peer may not have received termination message",
|
|
"peer_name", req.PeerName, "peer_id", req.LocalID, "error", err)
|
|
}
|
|
break
|
|
}
|
|
// Since the peering is being torn down we discard all replication messages without an error.
|
|
// We want to avoid importing new data at this point.
|
|
}
|
|
}
|
|
|
|
// The localID provided is the locally-generated identifier for the peering.
|
|
// The remoteID is an identifier that the remote peer recognizes for the peering.
|
|
func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
|
|
// TODO: pass logger down from caller?
|
|
logger := s.Logger.Named("stream").
|
|
With("peer_name", streamReq.PeerName).
|
|
With("peer_id", streamReq.LocalID).
|
|
With("dialed", streamReq.WasDialed())
|
|
logger.Trace("handling stream for peer")
|
|
|
|
status, err := s.Tracker.Connected(streamReq.LocalID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to register stream: %v", err)
|
|
}
|
|
|
|
// TODO(peering) Also need to clear subscriptions associated with the peer
|
|
defer s.Tracker.Disconnected(streamReq.LocalID)
|
|
|
|
var trustDomain string
|
|
if s.ConnectEnabled {
|
|
// Read the TrustDomain up front - we do not allow users to change the ClusterID
|
|
// so reading it once at the beginning of the stream is sufficient.
|
|
trustDomain, err = getTrustDomain(s.GetStore(), logger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
mgr := newSubscriptionManager(
|
|
streamReq.Stream.Context(),
|
|
logger,
|
|
s.Config,
|
|
trustDomain,
|
|
s.Backend,
|
|
s.GetStore,
|
|
)
|
|
subCh := mgr.subscribe(streamReq.Stream.Context(), streamReq.LocalID, streamReq.PeerName, streamReq.Partition)
|
|
|
|
sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{
|
|
ResourceURL: pbpeerstream.TypeURLService,
|
|
PeerID: streamReq.RemoteID,
|
|
})
|
|
logTraceSend(logger, sub)
|
|
|
|
if err := streamReq.Stream.Send(sub); err != nil {
|
|
if err == io.EOF {
|
|
logger.Info("stream ended by peer")
|
|
status.TrackReceiveError(err.Error())
|
|
return nil
|
|
}
|
|
// TODO(peering) Test error handling in calls to Send/Recv
|
|
status.TrackSendError(err.Error())
|
|
return fmt.Errorf("failed to send to stream: %v", err)
|
|
}
|
|
|
|
// TODO(peering): Should this be buffered?
|
|
recvChan := make(chan *pbpeerstream.ReplicationMessage)
|
|
go func() {
|
|
defer close(recvChan)
|
|
for {
|
|
msg, err := streamReq.Stream.Recv()
|
|
if err == nil {
|
|
logTraceRecv(logger, msg)
|
|
recvChan <- msg
|
|
continue
|
|
}
|
|
|
|
if err == io.EOF {
|
|
logger.Info("stream ended by peer")
|
|
status.TrackReceiveError(err.Error())
|
|
return
|
|
}
|
|
logger.Error("failed to receive from stream", "error", err)
|
|
status.TrackReceiveError(err.Error())
|
|
return
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
// When the doneCh is closed that means that the peering was deleted locally.
|
|
case <-status.Done():
|
|
logger.Info("ending stream")
|
|
|
|
term := &pbpeerstream.ReplicationMessage{
|
|
Payload: &pbpeerstream.ReplicationMessage_Terminated_{
|
|
Terminated: &pbpeerstream.ReplicationMessage_Terminated{},
|
|
},
|
|
}
|
|
logTraceSend(logger, term)
|
|
|
|
if err := streamReq.Stream.Send(term); err != nil {
|
|
status.TrackSendError(err.Error())
|
|
return fmt.Errorf("failed to send to stream: %v", err)
|
|
}
|
|
|
|
logger.Trace("deleting stream status")
|
|
s.Tracker.DeleteStatus(streamReq.LocalID)
|
|
|
|
return nil
|
|
|
|
case msg, open := <-recvChan:
|
|
if !open {
|
|
logger.Trace("no longer receiving data on the stream")
|
|
return nil
|
|
}
|
|
|
|
// NOTE: this code should have similar error handling to the
|
|
// initial handling code in StreamResources()
|
|
|
|
if !s.Backend.IsLeader() {
|
|
// we are not the leader anymore so we will hang up on the dialer
|
|
logger.Error("node is not a leader anymore; cannot continue streaming")
|
|
|
|
st, err := grpcstatus.New(codes.FailedPrecondition,
|
|
"node is not a leader anymore; cannot continue streaming").WithDetails(
|
|
&pbpeerstream.LeaderAddress{Address: s.Backend.GetLeaderAddress()})
|
|
if err != nil {
|
|
logger.Error(fmt.Sprintf("failed to marshal the leader address in response; err: %v", err))
|
|
return grpcstatus.Error(codes.FailedPrecondition, "node is not a leader anymore; cannot continue streaming")
|
|
} else {
|
|
return st.Err()
|
|
}
|
|
}
|
|
|
|
if req := msg.GetRequest(); req != nil {
|
|
if !pbpeerstream.KnownTypeURL(req.ResourceURL) {
|
|
return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL)
|
|
}
|
|
switch {
|
|
case req.ResponseNonce == "":
|
|
// TODO(peering): This can happen on a client peer since they don't try to receive subscriptions before entering HandleStream.
|
|
// Should change that behavior or only allow it that one time.
|
|
|
|
case req.Error != nil && (req.Error.Code != int32(code.Code_OK) || req.Error.Message != ""):
|
|
logger.Warn("client peer was unable to apply resource", "code", req.Error.Code, "error", req.Error.Message)
|
|
status.TrackNack(fmt.Sprintf("client peer was unable to apply resource: %s", req.Error.Message))
|
|
|
|
default:
|
|
status.TrackAck()
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if resp := msg.GetResponse(); resp != nil {
|
|
// TODO(peering): Ensure there's a nonce
|
|
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, resp)
|
|
if err != nil {
|
|
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
|
|
status.TrackReceiveError(err.Error())
|
|
} else {
|
|
status.TrackReceiveSuccess()
|
|
}
|
|
|
|
logTraceSend(logger, reply)
|
|
if err := streamReq.Stream.Send(reply); err != nil {
|
|
status.TrackSendError(err.Error())
|
|
return fmt.Errorf("failed to send to stream: %v", err)
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if term := msg.GetTerminated(); term != nil {
|
|
logger.Info("peering was deleted by our peer: marking peering as terminated and cleaning up imported resources")
|
|
|
|
// Once marked as terminated, a separate deferred deletion routine will clean up imported resources.
|
|
if err := s.Backend.PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: streamReq.LocalID}); err != nil {
|
|
logger.Error("failed to mark peering as terminated: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
case update := <-subCh:
|
|
var resp *pbpeerstream.ReplicationMessage_Response
|
|
switch {
|
|
case strings.HasPrefix(update.CorrelationID, subExportedService):
|
|
resp, err = makeServiceResponse(logger, update)
|
|
if err != nil {
|
|
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
|
logger.Error("failed to create service response", "error", err)
|
|
continue
|
|
}
|
|
|
|
case strings.HasPrefix(update.CorrelationID, subMeshGateway):
|
|
// TODO(Peering): figure out how to sync this separately
|
|
|
|
case update.CorrelationID == subCARoot:
|
|
resp, err = makeCARootsResponse(logger, update)
|
|
if err != nil {
|
|
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
|
logger.Error("failed to create ca roots response", "error", err)
|
|
continue
|
|
}
|
|
|
|
default:
|
|
logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID)
|
|
continue
|
|
}
|
|
if resp == nil {
|
|
continue
|
|
}
|
|
|
|
replResp := makeReplicationResponse(resp)
|
|
|
|
logTraceSend(logger, replResp)
|
|
if err := streamReq.Stream.Send(replResp); err != nil {
|
|
status.TrackSendError(err.Error())
|
|
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) {
|
|
_, cfg, err := store.CAConfig(nil)
|
|
switch {
|
|
case err != nil:
|
|
logger.Error("failed to read Connect CA Config", "error", err)
|
|
return "", grpcstatus.Error(codes.Internal, "failed to read Connect CA Config")
|
|
case cfg == nil:
|
|
logger.Warn("cannot begin stream because Connect CA is not yet initialized")
|
|
return "", grpcstatus.Error(codes.FailedPrecondition, "Connect CA is not yet initialized")
|
|
}
|
|
return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
|
|
}
|
|
|
|
func (s *Server) StreamStatus(peer string) (resp Status, found bool) {
|
|
return s.Tracker.StreamStatus(peer)
|
|
}
|
|
|
|
// ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.
|
|
func (s *Server) ConnectedStreams() map[string]chan struct{} {
|
|
return s.Tracker.ConnectedStreams()
|
|
}
|
|
|
|
func logTraceRecv(logger hclog.Logger, pb proto.Message) {
|
|
logTraceProto(logger, pb, true)
|
|
}
|
|
|
|
func logTraceSend(logger hclog.Logger, pb proto.Message) {
|
|
logTraceProto(logger, pb, false)
|
|
}
|
|
|
|
func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) {
|
|
if !logger.IsTrace() {
|
|
return
|
|
}
|
|
|
|
dir := "sent"
|
|
if received {
|
|
dir = "received"
|
|
}
|
|
|
|
m := jsonpb.Marshaler{
|
|
Indent: " ",
|
|
}
|
|
out, err := m.MarshalToString(pb)
|
|
if err != nil {
|
|
out = "<ERROR: " + err.Error() + ">"
|
|
}
|
|
|
|
logger.Trace("replication message", "direction", dir, "protobuf", out)
|
|
}
|