From bec4df0679ac6d5f00199a8276b3c6ebcef318c4 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Fri, 15 Jul 2022 15:03:40 -0500 Subject: [PATCH] peerstream: require a resource subscription to receive updates of that type (#13767) This mimics xDS's discovery protocol where you must request a resource explicitly for the exporting side to send those events to you. As part of this I aligned the overall ResourceURL with the TypeURL that gets embedded into the encoded protobuf Any construct. The CheckServiceNodes is now wrapped in a better named "ExportedService" struct now. --- .../services/peerstream/replication.go | 51 +-- .../services/peerstream/stream_resources.go | 196 +++++++++-- .../services/peerstream/stream_test.go | 304 ++++++++++-------- .../peerstream/subscription_blocking.go | 14 + .../peerstream/subscription_manager.go | 37 ++- .../peerstream/subscription_manager_test.go | 23 +- .../services/peerstream/testing.go | 38 ++- proto/pbpeerstream/convert.go | 25 ++ proto/pbpeerstream/peerstream.pb.binary.go | 10 + proto/pbpeerstream/peerstream.pb.go | 289 ++++++++++------- proto/pbpeerstream/peerstream.proto | 6 + proto/pbpeerstream/types.go | 12 +- proto/pbservice/convert.go | 19 -- proto/prototest/testing.go | 57 ++++ 14 files changed, 736 insertions(+), 345 deletions(-) create mode 100644 proto/pbpeerstream/convert.go diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index e21b48a63..c69d705d3 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -5,10 +5,9 @@ import ( "fmt" "strings" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" "github.com/hashicorp/go-hclog" "google.golang.org/genproto/googleapis/rpc/code" + newproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/consul/agent/cache" @@ -39,7 +38,16 @@ func makeServiceResponse( logger hclog.Logger, update cache.UpdateEvent, ) (*pbpeerstream.ReplicationMessage_Response, error) { - any, csn, err := marshalToProtoAny[*pbservice.IndexedCheckServiceNodes](update.Result) + csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) + if !ok { + return nil, fmt.Errorf("invalid type for service response: %T", update.Result) + } + + export := &pbpeerstream.ExportedService{ + Nodes: csn.Nodes, + } + + any, err := anypb.New(export) if err != nil { return nil, fmt.Errorf("failed to marshal: %w", err) } @@ -53,9 +61,9 @@ func makeServiceResponse( // // We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that. // Case #1 is a no-op for the importing peer. - if len(csn.Nodes) == 0 { + if len(export.Nodes) == 0 { return &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, // TODO(peering): Nonce management Nonce: "", ResourceID: serviceName, @@ -65,7 +73,7 @@ func makeServiceResponse( // If there are nodes in the response, we push them as an UPSERT operation. return &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, // TODO(peering): Nonce management Nonce: "", ResourceID: serviceName, @@ -84,7 +92,7 @@ func makeCARootsResponse( } return &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLRoots, + ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle, // TODO(peering): Nonce management Nonce: "", ResourceID: "roots", @@ -97,13 +105,13 @@ func makeCARootsResponse( // the protobuf.Any type, the asserted T type, and any errors // during marshalling or type assertion. // `in` MUST be of type T or it returns an error. -func marshalToProtoAny[T proto.Message](in any) (*anypb.Any, T, error) { +func marshalToProtoAny[T newproto.Message](in any) (*anypb.Any, T, error) { typ, ok := in.(T) if !ok { var outType T return nil, typ, fmt.Errorf("input type is not %T: %T", outType, in) } - any, err := ptypes.MarshalAny(typ) + any, err := anypb.New(typ) if err != nil { return nil, typ, err } @@ -186,20 +194,23 @@ func (s *Server) handleUpsert( resource *anypb.Any, logger hclog.Logger, ) error { + if resource.TypeUrl != resourceURL { + return fmt.Errorf("mismatched resourceURL %q and Any typeUrl %q", resourceURL, resource.TypeUrl) + } + switch resourceURL { - case pbpeerstream.TypeURLService: + case pbpeerstream.TypeURLExportedService: sn := structs.ServiceNameFromString(resourceID) sn.OverridePartition(partition) - csn := &pbservice.IndexedCheckServiceNodes{} - if err := ptypes.UnmarshalAny(resource, csn); err != nil { + export := &pbpeerstream.ExportedService{} + if err := resource.UnmarshalTo(export); err != nil { return fmt.Errorf("failed to unmarshal resource: %w", err) } - err := s.handleUpdateService(peerName, partition, sn, csn) + err := s.handleUpdateService(peerName, partition, sn, export) if err != nil { - logger.Error("did not increment imported services count", "service_name", sn.String(), "error", err) - return err + return fmt.Errorf("did not increment imported services count for service=%q: %w", sn.String(), err) } logger.Trace("incrementing imported services count", "service_name", sn.String()) @@ -207,9 +218,9 @@ func (s *Server) handleUpsert( return nil - case pbpeerstream.TypeURLRoots: + case pbpeerstream.TypeURLPeeringTrustBundle: roots := &pbpeering.PeeringTrustBundle{} - if err := ptypes.UnmarshalAny(resource, roots); err != nil { + if err := resource.UnmarshalTo(roots); err != nil { return fmt.Errorf("failed to unmarshal resource: %w", err) } @@ -232,7 +243,7 @@ func (s *Server) handleUpdateService( peerName string, partition string, sn structs.ServiceName, - pbNodes *pbservice.IndexedCheckServiceNodes, + export *pbpeerstream.ExportedService, ) error { // Capture instances in the state store for reconciliation later. _, storedInstances, err := s.GetStore().CheckServiceNodes(nil, sn.Name, &sn.EnterpriseMeta, peerName) @@ -240,7 +251,7 @@ func (s *Server) handleUpdateService( return fmt.Errorf("failed to read imported services: %w", err) } - structsNodes, err := pbNodes.CheckServiceNodesToStruct() + structsNodes, err := export.CheckServiceNodesToStruct() if err != nil { return fmt.Errorf("failed to convert protobuf instances to structs: %w", err) } @@ -444,7 +455,7 @@ func (s *Server) handleDelete( logger hclog.Logger, ) error { switch resourceURL { - case pbpeerstream.TypeURLService: + case pbpeerstream.TypeURLExportedService: sn := structs.ServiceNameFromString(resourceID) sn.OverridePartition(partition) diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 57bc350c0..26d5a7b00 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -9,7 +9,6 @@ import ( "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/hashicorp/go-hclog" - "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" @@ -99,11 +98,12 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes } streamReq := HandleStreamRequest{ - LocalID: p.ID, - RemoteID: "", - PeerName: p.Name, - Partition: p.Partition, - Stream: stream, + LocalID: p.ID, + RemoteID: "", + PeerName: p.Name, + Partition: p.Partition, + InitialResourceURL: req.ResourceURL, + Stream: stream, } err = s.HandleStream(streamReq) // A nil error indicates that the peering was deleted and the stream needs to be gracefully shutdown. @@ -129,6 +129,9 @@ type HandleStreamRequest struct { // Partition is the local partition associated with the peer. Partition string + // InitialResourceURL is the ResourceURL from the initial Request. + InitialResourceURL string + // Stream is the open stream to the peer cluster. Stream BidirectionalStream } @@ -183,6 +186,13 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error { } } + remoteSubTracker := newResourceSubscriptionTracker() + if streamReq.InitialResourceURL != "" { + if remoteSubTracker.Subscribe(streamReq.InitialResourceURL) { + logger.Info("subscribing to resource type", "resourceURL", streamReq.InitialResourceURL) + } + } + mgr := newSubscriptionManager( streamReq.Stream.Context(), logger, @@ -190,24 +200,31 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error { trustDomain, s.Backend, s.GetStore, + remoteSubTracker, ) subCh := mgr.subscribe(streamReq.Stream.Context(), streamReq.LocalID, streamReq.PeerName, streamReq.Partition) - sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - PeerID: streamReq.RemoteID, - }) - logTraceSend(logger, sub) + // Subscribe to all relevant resource types. + for _, resourceURL := range []string{ + pbpeerstream.TypeURLExportedService, + pbpeerstream.TypeURLPeeringTrustBundle, + } { + sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{ + ResourceURL: resourceURL, + PeerID: streamReq.RemoteID, + }) + logTraceSend(logger, sub) - if err := streamReq.Stream.Send(sub); err != nil { - if err == io.EOF { - logger.Info("stream ended by peer") - status.TrackReceiveError(err.Error()) - return nil + if err := streamReq.Stream.Send(sub); err != nil { + if err == io.EOF { + logger.Info("stream ended by peer") + status.TrackReceiveError(err.Error()) + return nil + } + // TODO(peering) Test error handling in calls to Send/Recv + status.TrackSendError(err.Error()) + return fmt.Errorf("failed to send subscription for %q to stream: %w", resourceURL, err) } - // TODO(peering) Test error handling in calls to Send/Recv - status.TrackSendError(err.Error()) - return fmt.Errorf("failed to send to stream: %v", err) } // TODO(peering): Should this be buffered? @@ -289,17 +306,86 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error { if !pbpeerstream.KnownTypeURL(req.ResourceURL) { return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL) } - switch { - case req.ResponseNonce == "": - // TODO(peering): This can happen on a client peer since they don't try to receive subscriptions before entering HandleStream. - // Should change that behavior or only allow it that one time. - case req.Error != nil && (req.Error.Code != int32(code.Code_OK) || req.Error.Message != ""): + // There are different formats of requests depending upon where in the stream lifecycle we are. + // + // 1. Initial Request: This is the first request being received + // FROM the establishing peer. This is handled specially in + // (*Server).StreamResources BEFORE calling + // (*Server).HandleStream. This takes care of determining what + // the PeerID is for the stream. This is ALSO treated as (2) below. + // + // 2. Subscription Request: This is the first request for a + // given ResourceURL within a stream. The Initial Request (1) + // is always one of these as well. + // + // These must contain a valid ResourceURL with no Error or + // ResponseNonce set. + // + // It is valid to subscribe to the same ResourceURL twice + // within the lifetime of a stream, but all duplicate + // subscriptions are treated as no-ops upon receipt. + // + // 3. ACK Request: This is the message sent in reaction to an + // earlier Response to indicate that the response was processed + // by the other side successfully. + // + // These must contain a ResponseNonce and no Error. + // + // 4. NACK Request: This is the message sent in reaction to an + // earlier Response to indicate that the response was NOT + // processed by the other side successfully. + // + // These must contain a ResponseNonce and an Error. + // + if !remoteSubTracker.IsSubscribed(req.ResourceURL) { + // This must be a new subscription request to add a new + // resource type, vet it like a new request. + + if !streamReq.WasDialed() { + if req.PeerID != "" && req.PeerID != streamReq.RemoteID { + // Not necessary after the first request from the dialer, + // but if provided must match. + return grpcstatus.Errorf(codes.InvalidArgument, + "initial subscription requests for a resource type must have consistent PeerID values: got=%q expected=%q", + req.PeerID, + streamReq.RemoteID, + ) + } + } + if req.ResponseNonce != "" { + return grpcstatus.Error(codes.InvalidArgument, "initial subscription requests for a resource type must not contain a nonce") + } + if req.Error != nil { + return grpcstatus.Error(codes.InvalidArgument, "initial subscription request for a resource type must not contain an error") + } + + if remoteSubTracker.Subscribe(req.ResourceURL) { + logger.Info("subscribing to resource type", "resourceURL", req.ResourceURL) + } + status.TrackAck() + continue + } + + // At this point we have a valid ResourceURL and we are subscribed to it. + + switch { + case req.ResponseNonce == "" && req.Error != nil: + return grpcstatus.Error(codes.InvalidArgument, "initial subscription request for a resource type must not contain an error") + + case req.ResponseNonce != "" && req.Error == nil: // ACK + // TODO(peering): handle ACK fully + status.TrackAck() + + case req.ResponseNonce != "" && req.Error != nil: // NACK + // TODO(peering): handle NACK fully logger.Warn("client peer was unable to apply resource", "code", req.Error.Code, "error", req.Error.Message) status.TrackNack(fmt.Sprintf("client peer was unable to apply resource: %s", req.Error.Message)) default: - status.TrackAck() + // This branch might be dead code, but it could also happen + // during a stray 're-subscribe' so just ignore the + // message. } continue @@ -425,3 +511,63 @@ func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) { logger.Trace("replication message", "direction", dir, "protobuf", out) } + +// resourceSubscriptionTracker is used to keep track of the ResourceURLs that a +// stream has subscribed to and can notify you when a subscription comes in by +// closing the channels returned by SubscribedChan. +type resourceSubscriptionTracker struct { + // notifierMap keeps track of a notification channel for each resourceURL. + // Keys may exist in here even when they do not exist in 'subscribed' as + // calling SubscribedChan has to possibly create and and hand out a + // notification channel in advance of any notification. + notifierMap map[string]chan struct{} + + // subscribed is a set that keeps track of resourceURLs that are currently + // subscribed to. Keys are never deleted. If a key is present in this map + // it is also present in 'notifierMap'. + subscribed map[string]struct{} +} + +func newResourceSubscriptionTracker() *resourceSubscriptionTracker { + return &resourceSubscriptionTracker{ + subscribed: make(map[string]struct{}), + notifierMap: make(map[string]chan struct{}), + } +} + +// IsSubscribed returns true if the given ResourceURL has an active subscription. +func (t *resourceSubscriptionTracker) IsSubscribed(resourceURL string) bool { + _, ok := t.subscribed[resourceURL] + return ok +} + +// Subscribe subscribes to the given ResourceURL. It will return true if this +// was the FIRST time a subscription occurred. It will also close the +// notification channel associated with this ResourceURL. +func (t *resourceSubscriptionTracker) Subscribe(resourceURL string) bool { + if _, ok := t.subscribed[resourceURL]; ok { + return false + } + t.subscribed[resourceURL] = struct{}{} + + // and notify + ch := t.ensureNotifierChan(resourceURL) + close(ch) + + return true +} + +// SubscribedChan returns a channel that will be closed when the ResourceURL is +// subscribed using the Subscribe method. +func (t *resourceSubscriptionTracker) SubscribedChan(resourceURL string) <-chan struct{} { + return t.ensureNotifierChan(resourceURL) +} + +func (t *resourceSubscriptionTracker) ensureNotifierChan(resourceURL string) chan struct{} { + if ch, ok := t.notifierMap[resourceURL]; ok { + return ch + } + ch := make(chan struct{}) + t.notifierMap[resourceURL] = ch + return ch +} diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 1074f5960..1e3117ecc 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -12,15 +12,14 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" - "github.com/golang/protobuf/ptypes/any" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" @@ -97,18 +96,6 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { backend.leaderAddr = "expected:address" }) - client := NewMockClient(context.Background()) - - errCh := make(chan error, 1) - client.ErrCh = errCh - - go func() { - err := srv.StreamResources(client.ReplicationStream) - if err != nil { - errCh <- err - } - }() - p := writePeeringToBeDialed(t, store, 1, "my-peer") require.Empty(t, p.PeerID, "should be empty if being dialed") peerID := p.ID @@ -116,53 +103,73 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { // Set the initial roots and CA configuration. _, _ = writeInitialRootsAndCA(t, store) - // Receive a subscription from a peer - sub := &pbpeerstream.ReplicationMessage{ - Payload: &pbpeerstream.ReplicationMessage_Request_{ - Request: &pbpeerstream.ReplicationMessage_Request{ - PeerID: peerID, - ResourceURL: pbpeerstream.TypeURLService, + client := NewMockClient(context.Background()) + + errCh := make(chan error, 1) + client.ErrCh = errCh + + go func() { + // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv(). + // This matches gRPC's behavior when an error is returned by a server. + if err := srv.StreamResources(client.ReplicationStream); err != nil { + errCh <- err + } + }() + + // Receive a subscription from a peer. This message arrives while the + // server is a leader and should work. + testutil.RunStep(t, "send subscription request to leader and consume its two requests", func(t *testing.T) { + sub := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Request_{ + Request: &pbpeerstream.ReplicationMessage_Request{ + PeerID: peerID, + ResourceURL: pbpeerstream.TypeURLExportedService, + }, }, - }, - } - err := client.Send(sub) - require.NoError(t, err) + } + err := client.Send(sub) + require.NoError(t, err) - msg, err := client.Recv() - require.NoError(t, err) - require.NotEmpty(t, msg) + msg1, err := client.Recv() + require.NoError(t, err) + require.NotEmpty(t, msg1) - receiveRoots, err := client.Recv() - require.NoError(t, err) - require.NotNil(t, receiveRoots.GetResponse()) - require.Equal(t, pbpeerstream.TypeURLRoots, receiveRoots.GetResponse().ResourceURL) + msg2, err := client.Recv() + require.NoError(t, err) + require.NotEmpty(t, msg2) + }) - input2 := &pbpeerstream.ReplicationMessage{ - Payload: &pbpeerstream.ReplicationMessage_Request_{ - Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - ResponseNonce: "1", + // The ACK will be a new request but at this point the server is not the + // leader in the test and this should fail. + testutil.RunStep(t, "ack fails with non leader", func(t *testing.T) { + ack := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Request_{ + Request: &pbpeerstream.ReplicationMessage_Request{ + ResourceURL: pbpeerstream.TypeURLExportedService, + ResponseNonce: "1", + }, }, - }, - } + } - err2 := client.Send(input2) - require.NoError(t, err2) + err := client.Send(ack) + require.NoError(t, err) - // expect error - msg2, err2 := client.Recv() - require.Nil(t, msg2) - require.Error(t, err2) - require.EqualError(t, err2, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming") + // expect error + msg, err := client.Recv() + require.Nil(t, msg) + require.Error(t, err) + require.EqualError(t, err, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming") - // expect a status error - st, ok := status.FromError(err2) - require.True(t, ok, "need to get back a grpc status error") - deets := st.Details() + // expect a status error + st, ok := status.FromError(err) + require.True(t, ok, "need to get back a grpc status error") - // expect a LeaderAddress message - exp := []interface{}{&pbpeerstream.LeaderAddress{Address: "expected:address"}} - prototest.AssertDeepEqual(t, exp, deets) + // expect a LeaderAddress message + expect := []interface{}{ + &pbpeerstream.LeaderAddress{Address: "expected:address"}, + } + prototest.AssertDeepEqual(t, expect, st.Details()) + }) } func TestStreamResources_Server_FirstRequest(t *testing.T) { @@ -204,7 +211,7 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) { input: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Response_{ Response: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResourceID: "api-service", Nonce: "2", }, @@ -251,7 +258,7 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) { Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ PeerID: "63b60245-c475-426b-b314-4588d210859d", - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, }, }, }, @@ -291,7 +298,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) { receiveRoots, err := client.Recv() require.NoError(t, err) require.NotNil(t, receiveRoots.GetResponse()) - require.Equal(t, pbpeerstream.TypeURLRoots, receiveRoots.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, receiveRoots.GetResponse().ResourceURL) testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) { retry.Run(t, func(r *retry.R) { @@ -347,7 +354,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }) }) - var sequence uint64 var lastSendSuccess time.Time testutil.RunStep(t, "ack tracked as success", func(t *testing.T) { @@ -355,18 +361,17 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ PeerID: peerID, - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResponseNonce: "1", // Acks do not have an Error populated in the request }, }, } + + lastSendSuccess = it.FutureNow(1) err := client.Send(ack) require.NoError(t, err) - sequence++ - - lastSendSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC() expect := Status{ Connected: true, @@ -388,7 +393,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ PeerID: peerID, - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResponseNonce: "2", Error: &pbstatus.Status{ Code: int32(code.Code_UNAVAILABLE), @@ -397,12 +402,12 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }, }, } + + lastNack = it.FutureNow(1) err := client.Send(nack) require.NoError(t, err) - sequence++ lastNackMsg = "client peer was unable to apply resource: bad bad not good" - lastNack = it.base.Add(time.Duration(sequence) * time.Second).UTC() expect := Status{ Connected: true, @@ -424,22 +429,22 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { resp := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Response_{ Response: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResourceID: "api", Nonce: "21", Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}), + Resource: makeAnyPB(t, &pbpeerstream.ExportedService{}), }, }, } + lastRecvSuccess = it.FutureNow(1) err := client.Send(resp) require.NoError(t, err) - sequence++ expectRoots := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Response_{ Response: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLRoots, + ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle, ResourceID: "roots", Resource: makeAnyPB(t, &pbpeering.PeeringTrustBundle{ TrustDomain: connect.TestTrustDomain, @@ -460,15 +465,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expectAck := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResponseNonce: "21", }, }, } prototest.AssertDeepEqual(t, expectAck, ack) - lastRecvSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC() - api := structs.NewServiceName("api", nil) expect := Status{ @@ -496,7 +499,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { resp := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Response_{ Response: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResourceID: "web", Nonce: "24", @@ -505,9 +508,9 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }, }, } + lastRecvError = it.FutureNow(1) err := client.Send(resp) require.NoError(t, err) - sequence++ ack, err := client.Recv() require.NoError(t, err) @@ -515,7 +518,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expectNack := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResponseNonce: "24", Error: &pbstatus.Status{ Code: int32(code.Code_INVALID_ARGUMENT), @@ -526,7 +529,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { } prototest.AssertDeepEqual(t, expectNack, ack) - lastRecvError = it.base.Add(time.Duration(sequence) * time.Second).UTC() lastRecvErrorMsg = `unsupported operation: "OPERATION_UNSPECIFIED"` api := structs.NewServiceName("api", nil) @@ -552,14 +554,12 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }) testutil.RunStep(t, "client disconnect marks stream as disconnected", func(t *testing.T) { + lastRecvError = it.FutureNow(1) + disconnectTime := it.FutureNow(2) + lastRecvErrorMsg = io.EOF.Error() + client.Close() - sequence++ - lastRecvError := it.base.Add(time.Duration(sequence) * time.Second).UTC() - - sequence++ - disconnectTime := it.base.Add(time.Duration(sequence) * time.Second).UTC() - api := structs.NewServiceName("api", nil) expect := Status{ @@ -569,8 +569,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { LastNackMessage: lastNackMsg, DisconnectTime: disconnectTime, LastReceiveSuccess: lastRecvSuccess, - LastReceiveErrorMessage: io.EOF.Error(), LastReceiveError: lastRecvError, + LastReceiveErrorMessage: lastRecvErrorMsg, ImportedServices: map[string]struct{}{ api.String(): {}, }, @@ -654,35 +654,35 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { expectReplEvents(t, client, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLRoots, msg.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL) // Roots tested in TestStreamResources_Server_CARootUpdates }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { // no mongo instances exist - require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) require.Equal(t, mongoSN, msg.GetResponse().ResourceID) require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation) require.Nil(t, msg.GetResponse().Resource) }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { // proxies can't export because no mesh gateway exists yet - require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation) require.Nil(t, msg.GetResponse().Resource) }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) require.Equal(t, mysqlSN, msg.GetResponse().ResourceID) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) - var nodes pbservice.IndexedCheckServiceNodes - require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) + var nodes pbpeerstream.ExportedService + require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes)) require.Len(t, nodes.Nodes, 1) }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { // proxies can't export because no mesh gateway exists yet - require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID) require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation) require.Nil(t, msg.GetResponse().Resource) @@ -704,12 +704,12 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { expectReplEvents(t, client, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) - var nodes pbservice.IndexedCheckServiceNodes - require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) + var nodes pbpeerstream.ExportedService + require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes)) require.Len(t, nodes.Nodes, 1) pm := nodes.Nodes[0].Service.Connect.PeerMeta @@ -721,12 +721,12 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { require.Equal(t, spiffeIDs, pm.SpiffeID) }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) - var nodes pbservice.IndexedCheckServiceNodes - require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) + var nodes pbpeerstream.ExportedService + require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes)) require.Len(t, nodes.Nodes, 1) pm := nodes.Nodes[0].Service.Connect.PeerMeta @@ -758,8 +758,8 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { require.Equal(r, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) - var nodes pbservice.IndexedCheckServiceNodes - require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) + var nodes pbpeerstream.ExportedService + require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes)) require.Len(r, nodes.Nodes, 1) }) }) @@ -824,12 +824,12 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) { testutil.RunStep(t, "initial CA Roots replication", func(t *testing.T) { expectReplEvents(t, client, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLRoots, msg.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL) require.Equal(t, "roots", msg.GetResponse().ResourceID) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) var trustBundle pbpeering.PeeringTrustBundle - require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &trustBundle)) + require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&trustBundle)) require.ElementsMatch(t, []string{rootA.RootCert}, trustBundle.RootPEMs) expect := connect.SpiffeIDSigningForCluster(clusterID).Host() @@ -853,12 +853,12 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) { expectReplEvents(t, client, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLRoots, msg.GetResponse().ResourceURL) + require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL) require.Equal(t, "roots", msg.GetResponse().ResourceID) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) var trustBundle pbpeering.PeeringTrustBundle - require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &trustBundle)) + require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&trustBundle)) require.ElementsMatch(t, []string{rootB.RootCert, rootC.RootCert}, trustBundle.RootPEMs) expect := connect.SpiffeIDSigningForCluster(clusterID).Host() @@ -886,33 +886,57 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s } }() - // Issue a services subscription to server - init := &pbpeerstream.ReplicationMessage{ - Payload: &pbpeerstream.ReplicationMessage_Request_{ - Request: &pbpeerstream.ReplicationMessage_Request{ - PeerID: peerID, - ResourceURL: pbpeerstream.TypeURLService, + // Issue a services and roots subscription pair to server + for _, resourceURL := range []string{ + pbpeerstream.TypeURLExportedService, + pbpeerstream.TypeURLPeeringTrustBundle, + } { + init := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Request_{ + Request: &pbpeerstream.ReplicationMessage_Request{ + PeerID: peerID, + ResourceURL: resourceURL, + }, }, - }, + } + require.NoError(t, client.Send(init)) } - require.NoError(t, client.Send(init)) - // Receive a services subscription from server - receivedSub, err := client.Recv() + // Receive a services and roots subscription request pair from server + receivedSub1, err := client.Recv() + require.NoError(t, err) + receivedSub2, err := client.Recv() require.NoError(t, err) - expect := &pbpeerstream.ReplicationMessage{ - Payload: &pbpeerstream.ReplicationMessage_Request_{ - Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - // The PeerID field is only set for the messages coming FROM - // the establishing side and are going to be empty from the - // other side. - PeerID: "", + expect := []*pbpeerstream.ReplicationMessage{ + { + Payload: &pbpeerstream.ReplicationMessage_Request_{ + Request: &pbpeerstream.ReplicationMessage_Request{ + ResourceURL: pbpeerstream.TypeURLExportedService, + // The PeerID field is only set for the messages coming FROM + // the establishing side and are going to be empty from the + // other side. + PeerID: "", + }, + }, + }, + { + Payload: &pbpeerstream.ReplicationMessage_Request_{ + Request: &pbpeerstream.ReplicationMessage_Request{ + ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle, + // The PeerID field is only set for the messages coming FROM + // the establishing side and are going to be empty from the + // other side. + PeerID: "", + }, }, }, } - prototest.AssertDeepEqual(t, expect, receivedSub) + got := []*pbpeerstream.ReplicationMessage{ + receivedSub1, + receivedSub2, + } + prototest.AssertElementsMatch[*pbpeerstream.ReplicationMessage](t, expect, got) return client } @@ -1017,16 +1041,16 @@ func Test_processResponse_Validation(t *testing.T) { { name: "valid upsert", in: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResourceID: "api", Nonce: "1", Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}), + Resource: makeAnyPB(t, &pbpeerstream.ExportedService{}), }, expect: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResponseNonce: "1", }, }, @@ -1036,7 +1060,7 @@ func Test_processResponse_Validation(t *testing.T) { { name: "valid delete", in: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResourceID: "api", Nonce: "1", Operation: pbpeerstream.Operation_OPERATION_DELETE, @@ -1044,7 +1068,7 @@ func Test_processResponse_Validation(t *testing.T) { expect: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResponseNonce: "1", }, }, @@ -1075,14 +1099,14 @@ func Test_processResponse_Validation(t *testing.T) { { name: "unknown operation", in: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, Nonce: "1", Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, }, expect: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResponseNonce: "1", Error: &pbstatus.Status{ Code: int32(code.Code_INVALID_ARGUMENT), @@ -1096,14 +1120,14 @@ func Test_processResponse_Validation(t *testing.T) { { name: "out of range operation", in: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, Nonce: "1", Operation: pbpeerstream.Operation(100000), }, expect: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResponseNonce: "1", Error: &pbstatus.Status{ Code: int32(code.Code_INVALID_ARGUMENT), @@ -1163,8 +1187,8 @@ func writeInitialRootsAndCA(t *testing.T, store *state.Store) (string, *structs. return clusterID, rootA } -func makeAnyPB(t *testing.T, pb proto.Message) *any.Any { - any, err := ptypes.MarshalAny(pb) +func makeAnyPB(t *testing.T, pb proto.Message) *anypb.Any { + any, err := anypb.New(pb) require.NoError(t, err) return any } @@ -1255,7 +1279,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { type testCase struct { name string seed []*structs.RegisterRequest - input *pbservice.IndexedCheckServiceNodes + input *pbpeerstream.ExportedService expect map[string]structs.CheckServiceNodes expectedImportedServicesCount int } @@ -1296,7 +1320,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { } in := &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, + ResourceURL: pbpeerstream.TypeURLExportedService, ResourceID: apiSN.String(), Nonce: "1", Operation: op, @@ -1322,7 +1346,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { tt := []testCase{ { name: "upsert two service instances to the same node", - input: &pbservice.IndexedCheckServiceNodes{ + input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ @@ -1454,7 +1478,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { }, { name: "upsert two service instances to different nodes", - input: &pbservice.IndexedCheckServiceNodes{ + input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ @@ -1636,7 +1660,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { }, }, }, - input: &pbservice.IndexedCheckServiceNodes{}, + input: &pbpeerstream.ExportedService{}, expect: map[string]structs.CheckServiceNodes{ "api": {}, }, @@ -1695,7 +1719,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { }, }, // Nil input is for the "api" service. - input: &pbservice.IndexedCheckServiceNodes{}, + input: &pbpeerstream.ExportedService{}, expect: map[string]structs.CheckServiceNodes{ "api": {}, // Existing redis service was not affected by deletion. @@ -1761,7 +1785,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { }, }, }, - input: &pbservice.IndexedCheckServiceNodes{ + input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ @@ -1856,7 +1880,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { }, }, }, - input: &pbservice.IndexedCheckServiceNodes{ + input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ @@ -1991,7 +2015,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { }, }, }, - input: &pbservice.IndexedCheckServiceNodes{ + input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ diff --git a/agent/grpc-external/services/peerstream/subscription_blocking.go b/agent/grpc-external/services/peerstream/subscription_blocking.go index c2720dcdb..d11e03d55 100644 --- a/agent/grpc-external/services/peerstream/subscription_blocking.go +++ b/agent/grpc-external/services/peerstream/subscription_blocking.go @@ -19,6 +19,13 @@ import ( // streaming machinery instead to be cheaper. func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Context, state *subscriptionState, peerID string) { + // Wait until this is subscribed-to. + select { + case <-m.serviceSubReady: + case <-ctx.Done(): + return + } + // syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend // match the list of services exported to the peer. m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) { @@ -34,6 +41,13 @@ func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Contex // TODO: add a new streaming subscription type to list-by-kind-and-partition since we're getting evictions func (m *subscriptionManager) notifyMeshGatewaysForPartition(ctx context.Context, state *subscriptionState, partition string) { + // Wait until this is subscribed-to. + select { + case <-m.serviceSubReady: + case <-ctx.Done(): + return + } + m.syncViaBlockingQuery(ctx, "mesh-gateways", func(ctx context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) { // Fetch our current list of all mesh gateways. entMeta := structs.DefaultEnterpriseMetaInPartition(partition) diff --git a/agent/grpc-external/services/peerstream/subscription_manager.go b/agent/grpc-external/services/peerstream/subscription_manager.go index 33726a216..0c69b0338 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager.go +++ b/agent/grpc-external/services/peerstream/subscription_manager.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbpeering" + "github.com/hashicorp/consul/proto/pbpeerstream" "github.com/hashicorp/consul/proto/pbservice" ) @@ -33,12 +34,14 @@ type SubscriptionBackend interface { // subscriptionManager handlers requests to subscribe to events from an events publisher. type subscriptionManager struct { - logger hclog.Logger - config Config - trustDomain string - viewStore MaterializedViewStore - backend SubscriptionBackend - getStore func() StateStore + logger hclog.Logger + config Config + trustDomain string + viewStore MaterializedViewStore + backend SubscriptionBackend + getStore func() StateStore + serviceSubReady <-chan struct{} + trustBundlesSubReady <-chan struct{} } // TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering. @@ -49,18 +52,21 @@ func newSubscriptionManager( trustDomain string, backend SubscriptionBackend, getStore func() StateStore, + remoteSubTracker *resourceSubscriptionTracker, ) *subscriptionManager { logger = logger.Named("subscriptions") store := submatview.NewStore(logger.Named("viewstore")) go store.Run(ctx) return &subscriptionManager{ - logger: logger, - config: config, - trustDomain: trustDomain, - viewStore: store, - backend: backend, - getStore: getStore, + logger: logger, + config: config, + trustDomain: trustDomain, + viewStore: store, + backend: backend, + getStore: getStore, + serviceSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLExportedService), + trustBundlesSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLPeeringTrustBundle), } } @@ -297,6 +303,13 @@ func (m *subscriptionManager) notifyRootCAUpdatesForPartition( updateCh chan<- cache.UpdateEvent, partition string, ) { + // Wait until this is subscribed-to. + select { + case <-m.trustBundlesSubReady: + case <-ctx.Done(): + return + } + var idx uint64 // TODO(peering): retry logic; fail past a threshold for { diff --git a/agent/grpc-external/services/peerstream/subscription_manager_test.go b/agent/grpc-external/services/peerstream/subscription_manager_test.go index cd12b2c22..1a5269817 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager_test.go +++ b/agent/grpc-external/services/peerstream/subscription_manager_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbpeering" + "github.com/hashicorp/consul/proto/pbpeerstream" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/sdk/testutil" @@ -32,12 +33,16 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { _, id := backend.ensurePeering(t, "my-peering") partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() + // Only configure a tracker for catalog events. + tracker := newResourceSubscriptionTracker() + tracker.Subscribe(pbpeerstream.TypeURLExportedService) + mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ Datacenter: "dc1", ConnectEnabled: true, }, connect.TestTrustDomain, backend, func() StateStore { return backend.store - }) + }, tracker) subCh := mgr.subscribe(ctx, id, "my-peering", partition) var ( @@ -442,12 +447,16 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) { _, id := backend.ensurePeering(t, "my-peering") partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() + // Only configure a tracker for catalog events. + tracker := newResourceSubscriptionTracker() + tracker.Subscribe(pbpeerstream.TypeURLExportedService) + mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ Datacenter: "dc1", ConnectEnabled: true, }, connect.TestTrustDomain, backend, func() StateStore { return backend.store - }) + }, tracker) subCh := mgr.subscribe(ctx, id, "my-peering", partition) // Register two services that are not yet exported @@ -571,21 +580,21 @@ func TestSubscriptionManager_CARoots(t *testing.T) { _, id := backend.ensurePeering(t, "my-peering") partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() + // Only configure a tracker for CA roots events. + tracker := newResourceSubscriptionTracker() + tracker.Subscribe(pbpeerstream.TypeURLPeeringTrustBundle) + mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ Datacenter: "dc1", ConnectEnabled: true, }, connect.TestTrustDomain, backend, func() StateStore { return backend.store - }) + }, tracker) subCh := mgr.subscribe(ctx, id, "my-peering", partition) testutil.RunStep(t, "initial events contain trust bundle", func(t *testing.T) { // events are ordered so we can expect a deterministic list expectEvents(t, subCh, - func(t *testing.T, got cache.UpdateEvent) { - // mesh-gateway assertions are done in other tests - require.Equal(t, subMeshGateway+partition, got.CorrelationID) - }, func(t *testing.T, got cache.UpdateEvent) { require.Equal(t, subCARoot, got.CorrelationID) roots, ok := got.Result.(*pbpeering.PeeringTrustBundle) diff --git a/agent/grpc-external/services/peerstream/testing.go b/agent/grpc-external/services/peerstream/testing.go index 939c38dfa..1f85b2b78 100644 --- a/agent/grpc-external/services/peerstream/testing.go +++ b/agent/grpc-external/services/peerstream/testing.go @@ -2,6 +2,7 @@ package peerstream import ( "context" + "fmt" "io" "sync" "time" @@ -24,14 +25,7 @@ func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error { } func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error) { - select { - case err := <-c.ErrCh: - return nil, err - case r := <-c.ReplicationStream.sendCh: - return r, nil - case <-time.After(10 * time.Millisecond): - return nil, io.EOF - } + return c.RecvWithTimeout(10 * time.Millisecond) } func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error) { @@ -61,7 +55,6 @@ type MockStream struct { recvCh chan *pbpeerstream.ReplicationMessage ctx context.Context - mu sync.Mutex } var _ pbpeerstream.PeerStreamService_StreamResourcesServer = (*MockStream)(nil) @@ -117,12 +110,37 @@ func (s *MockStream) SendHeader(metadata.MD) error { // SetTrailer implements grpc.ServerStream func (s *MockStream) SetTrailer(metadata.MD) {} +// incrementalTime is an artificial clock used during testing. For those +// scenarios you would pass around the method pointer for `Now` in places where +// you would be using `time.Now`. type incrementalTime struct { base time.Time next uint64 + mu sync.Mutex } +// Now advances the internal clock by 1 second and returns that value. func (t *incrementalTime) Now() time.Time { + t.mu.Lock() + defer t.mu.Unlock() t.next++ - return t.base.Add(time.Duration(t.next) * time.Second) + + dur := time.Duration(t.next) * time.Second + + return t.base.Add(dur) +} + +// FutureNow will return a given future value of the Now() function. +// The numerical argument indicates which future Now value you wanted. The +// value must be > 0. +func (t *incrementalTime) FutureNow(n int) time.Time { + if n < 1 { + panic(fmt.Sprintf("argument must be > 1 but was %d", n)) + } + t.mu.Lock() + defer t.mu.Unlock() + + dur := time.Duration(t.next+uint64(n)) * time.Second + + return t.base.Add(dur) } diff --git a/proto/pbpeerstream/convert.go b/proto/pbpeerstream/convert.go new file mode 100644 index 000000000..b0df6c42a --- /dev/null +++ b/proto/pbpeerstream/convert.go @@ -0,0 +1,25 @@ +package pbpeerstream + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" + pbservice "github.com/hashicorp/consul/proto/pbservice" +) + +// CheckServiceNodesToStruct converts the contained CheckServiceNodes to their structs equivalent. +func (s *ExportedService) CheckServiceNodesToStruct() ([]structs.CheckServiceNode, error) { + if s == nil { + return nil, nil + } + + resp := make([]structs.CheckServiceNode, 0, len(s.Nodes)) + for _, pb := range s.Nodes { + instance, err := pbservice.CheckServiceNodeToStructs(pb) + if err != nil { + return resp, fmt.Errorf("failed to convert instance: %w", err) + } + resp = append(resp, *instance) + } + return resp, nil +} diff --git a/proto/pbpeerstream/peerstream.pb.binary.go b/proto/pbpeerstream/peerstream.pb.binary.go index 39dbdb814..c5d928949 100644 --- a/proto/pbpeerstream/peerstream.pb.binary.go +++ b/proto/pbpeerstream/peerstream.pb.binary.go @@ -56,3 +56,13 @@ func (msg *LeaderAddress) MarshalBinary() ([]byte, error) { func (msg *LeaderAddress) UnmarshalBinary(b []byte) error { return proto.Unmarshal(b, msg) } + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *ExportedService) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *ExportedService) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} diff --git a/proto/pbpeerstream/peerstream.pb.go b/proto/pbpeerstream/peerstream.pb.go index e9da9cc56..8b71b4e8c 100644 --- a/proto/pbpeerstream/peerstream.pb.go +++ b/proto/pbpeerstream/peerstream.pb.go @@ -7,6 +7,7 @@ package pbpeerstream import ( + pbservice "github.com/hashicorp/consul/proto/pbservice" pbstatus "github.com/hashicorp/consul/proto/pbstatus" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" @@ -220,6 +221,54 @@ func (x *LeaderAddress) GetAddress() string { return "" } +// ExportedService is one of the types of data returned via peer stream replication. +type ExportedService struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Nodes []*pbservice.CheckServiceNode `protobuf:"bytes,1,rep,name=Nodes,proto3" json:"Nodes,omitempty"` +} + +func (x *ExportedService) Reset() { + *x = ExportedService{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ExportedService) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExportedService) ProtoMessage() {} + +func (x *ExportedService) ProtoReflect() protoreflect.Message { + mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExportedService.ProtoReflect.Descriptor instead. +func (*ExportedService) Descriptor() ([]byte, []int) { + return file_proto_pbpeerstream_peerstream_proto_rawDescGZIP(), []int{2} +} + +func (x *ExportedService) GetNodes() []*pbservice.CheckServiceNode { + if x != nil { + return x.Nodes + } + return nil +} + // A Request requests to subscribe to a resource of a given type. type ReplicationMessage_Request struct { state protoimpl.MessageState @@ -244,7 +293,7 @@ type ReplicationMessage_Request struct { func (x *ReplicationMessage_Request) Reset() { *x = ReplicationMessage_Request{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2] + mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -257,7 +306,7 @@ func (x *ReplicationMessage_Request) String() string { func (*ReplicationMessage_Request) ProtoMessage() {} func (x *ReplicationMessage_Request) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2] + mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -323,7 +372,7 @@ type ReplicationMessage_Response struct { func (x *ReplicationMessage_Response) Reset() { *x = ReplicationMessage_Response{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3] + mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -336,7 +385,7 @@ func (x *ReplicationMessage_Response) String() string { func (*ReplicationMessage_Response) ProtoMessage() {} func (x *ReplicationMessage_Response) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3] + mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -398,7 +447,7 @@ type ReplicationMessage_Terminated struct { func (x *ReplicationMessage_Terminated) Reset() { *x = ReplicationMessage_Terminated{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4] + mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -411,7 +460,7 @@ func (x *ReplicationMessage_Terminated) String() string { func (*ReplicationMessage_Terminated) ProtoMessage() {} func (x *ReplicationMessage_Terminated) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4] + mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -436,92 +485,99 @@ var file_proto_pbpeerstream_peerstream_proto_rawDesc = []byte{ 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, - 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x22, 0xe5, 0x05, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x5c, 0x0a, 0x07, 0x72, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x40, 0x2e, 0x68, 0x61, - 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, - 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x5f, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x68, 0x61, 0x73, - 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, - 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, - 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x65, 0x0a, 0x0a, 0x74, 0x65, 0x72, - 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x43, 0x2e, - 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, - 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, - 0x65, 0x64, 0x48, 0x00, 0x52, 0x0a, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, - 0x1a, 0xa9, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, - 0x50, 0x65, 0x65, 0x72, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x65, - 0x65, 0x72, 0x49, 0x44, 0x12, 0x24, 0x0a, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65, - 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x3e, 0x0a, 0x05, - 0x45, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x68, 0x61, - 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0xe3, 0x01, 0x0a, - 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4e, 0x6f, 0x6e, - 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, - 0x20, 0x0a, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, - 0x4c, 0x12, 0x1e, 0x0a, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, - 0x44, 0x12, 0x30, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x12, 0x4d, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2f, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1a, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x1a, 0x1b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0xe5, 0x05, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x5c, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x40, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, + 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, + 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x5f, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4f, 0x70, - 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x1a, 0x0c, 0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, - 0x42, 0x09, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x29, 0x0a, 0x0d, 0x4c, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, - 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, - 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x2a, 0x52, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a, 0x15, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, - 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, - 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x50, 0x53, 0x45, - 0x52, 0x54, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, - 0x4e, 0x5f, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x02, 0x32, 0x9f, 0x01, 0x0a, 0x11, 0x50, - 0x65, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x89, 0x01, 0x0a, 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x73, 0x12, 0x38, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, + 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x08, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x65, 0x0a, 0x0a, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, + 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x43, 0x2e, 0x68, 0x61, 0x73, 0x68, + 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x2e, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x48, 0x00, + 0x52, 0x0a, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x1a, 0xa9, 0x01, 0x0a, + 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x65, 0x65, 0x72, + 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x65, 0x65, 0x72, 0x49, 0x44, + 0x12, 0x24, 0x0a, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4e, 0x6f, 0x6e, 0x63, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x3e, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, + 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, + 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0xe3, 0x01, 0x0a, 0x08, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x1e, 0x0a, + 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x12, 0x30, 0x0a, + 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, + 0x4d, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x2f, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, + 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x0c, + 0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x42, 0x09, 0x0a, 0x07, + 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x29, 0x0a, 0x0d, 0x4c, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, + 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, + 0x73, 0x73, 0x22, 0x5c, 0x0a, 0x0f, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x49, 0x0a, 0x05, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x33, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, - 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x38, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x05, 0x4e, 0x6f, 0x64, 0x65, 0x73, + 0x2a, 0x52, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a, + 0x15, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, + 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, + 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x14, + 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x44, 0x45, 0x4c, 0x45, + 0x54, 0x45, 0x10, 0x02, 0x32, 0x9f, 0x01, 0x0a, 0x11, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x0f, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x38, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x9f, 0x02, 0x0a, - 0x28, 0x63, 0x6f, 0x6d, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, - 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, - 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x0f, 0x50, 0x65, 0x65, 0x72, 0x73, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2e, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, - 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x70, 0x62, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xa2, 0x02, 0x04, 0x48, - 0x43, 0x49, 0x50, 0xaa, 0x02, 0x24, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, - 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, - 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xca, 0x02, 0x24, 0x48, 0x61, 0x73, - 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, - 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0xe2, 0x02, 0x30, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, - 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x50, 0x65, - 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, - 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x27, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, - 0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x3a, 0x3a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x61, 0x6c, 0x3a, 0x3a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x38, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, + 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, + 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x9f, 0x02, 0x0a, 0x28, 0x63, 0x6f, 0x6d, 0x2e, 0x68, + 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x42, 0x0f, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, + 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x70, 0x65, 0x65, 0x72, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xa2, 0x02, 0x04, 0x48, 0x43, 0x49, 0x50, 0xaa, 0x02, 0x24, + 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, + 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0xca, 0x02, 0x24, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, + 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x5c, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xe2, 0x02, 0x30, 0x48, 0x61, + 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, + 0x27, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73, + 0x75, 0x6c, 0x3a, 0x3a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x3a, 0x3a, 0x50, 0x65, + 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -537,31 +593,34 @@ func file_proto_pbpeerstream_peerstream_proto_rawDescGZIP() []byte { } var file_proto_pbpeerstream_peerstream_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_proto_pbpeerstream_peerstream_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_proto_pbpeerstream_peerstream_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_proto_pbpeerstream_peerstream_proto_goTypes = []interface{}{ (Operation)(0), // 0: hashicorp.consul.internal.peerstream.Operation (*ReplicationMessage)(nil), // 1: hashicorp.consul.internal.peerstream.ReplicationMessage (*LeaderAddress)(nil), // 2: hashicorp.consul.internal.peerstream.LeaderAddress - (*ReplicationMessage_Request)(nil), // 3: hashicorp.consul.internal.peerstream.ReplicationMessage.Request - (*ReplicationMessage_Response)(nil), // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Response - (*ReplicationMessage_Terminated)(nil), // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated - (*pbstatus.Status)(nil), // 6: hashicorp.consul.internal.status.Status - (*anypb.Any)(nil), // 7: google.protobuf.Any + (*ExportedService)(nil), // 3: hashicorp.consul.internal.peerstream.ExportedService + (*ReplicationMessage_Request)(nil), // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Request + (*ReplicationMessage_Response)(nil), // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Response + (*ReplicationMessage_Terminated)(nil), // 6: hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated + (*pbservice.CheckServiceNode)(nil), // 7: hashicorp.consul.internal.service.CheckServiceNode + (*pbstatus.Status)(nil), // 8: hashicorp.consul.internal.status.Status + (*anypb.Any)(nil), // 9: google.protobuf.Any } var file_proto_pbpeerstream_peerstream_proto_depIdxs = []int32{ - 3, // 0: hashicorp.consul.internal.peerstream.ReplicationMessage.request:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Request - 4, // 1: hashicorp.consul.internal.peerstream.ReplicationMessage.response:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Response - 5, // 2: hashicorp.consul.internal.peerstream.ReplicationMessage.terminated:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated - 6, // 3: hashicorp.consul.internal.peerstream.ReplicationMessage.Request.Error:type_name -> hashicorp.consul.internal.status.Status - 7, // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.Resource:type_name -> google.protobuf.Any - 0, // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.operation:type_name -> hashicorp.consul.internal.peerstream.Operation - 1, // 6: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:input_type -> hashicorp.consul.internal.peerstream.ReplicationMessage - 1, // 7: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:output_type -> hashicorp.consul.internal.peerstream.ReplicationMessage - 7, // [7:8] is the sub-list for method output_type - 6, // [6:7] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 4, // 0: hashicorp.consul.internal.peerstream.ReplicationMessage.request:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Request + 5, // 1: hashicorp.consul.internal.peerstream.ReplicationMessage.response:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Response + 6, // 2: hashicorp.consul.internal.peerstream.ReplicationMessage.terminated:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated + 7, // 3: hashicorp.consul.internal.peerstream.ExportedService.Nodes:type_name -> hashicorp.consul.internal.service.CheckServiceNode + 8, // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Request.Error:type_name -> hashicorp.consul.internal.status.Status + 9, // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.Resource:type_name -> google.protobuf.Any + 0, // 6: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.operation:type_name -> hashicorp.consul.internal.peerstream.Operation + 1, // 7: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:input_type -> hashicorp.consul.internal.peerstream.ReplicationMessage + 1, // 8: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:output_type -> hashicorp.consul.internal.peerstream.ReplicationMessage + 8, // [8:9] is the sub-list for method output_type + 7, // [7:8] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_proto_pbpeerstream_peerstream_proto_init() } @@ -595,7 +654,7 @@ func file_proto_pbpeerstream_peerstream_proto_init() { } } file_proto_pbpeerstream_peerstream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReplicationMessage_Request); i { + switch v := v.(*ExportedService); i { case 0: return &v.state case 1: @@ -607,7 +666,7 @@ func file_proto_pbpeerstream_peerstream_proto_init() { } } file_proto_pbpeerstream_peerstream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReplicationMessage_Response); i { + switch v := v.(*ReplicationMessage_Request); i { case 0: return &v.state case 1: @@ -619,6 +678,18 @@ func file_proto_pbpeerstream_peerstream_proto_init() { } } file_proto_pbpeerstream_peerstream_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReplicationMessage_Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_pbpeerstream_peerstream_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ReplicationMessage_Terminated); i { case 0: return &v.state @@ -642,7 +713,7 @@ func file_proto_pbpeerstream_peerstream_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_pbpeerstream_peerstream_proto_rawDesc, NumEnums: 1, - NumMessages: 5, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/pbpeerstream/peerstream.proto b/proto/pbpeerstream/peerstream.proto index ee19a2df7..54be6e4b7 100644 --- a/proto/pbpeerstream/peerstream.proto +++ b/proto/pbpeerstream/peerstream.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package hashicorp.consul.internal.peerstream; import "google/protobuf/any.proto"; +import "proto/pbservice/node.proto"; // TODO(peering): Handle this some other way import "proto/pbstatus/status.proto"; @@ -89,3 +90,8 @@ message LeaderAddress { // address is an ip:port best effort hint at what could be the cluster leader's address string address = 1; } + +// ExportedService is one of the types of data returned via peer stream replication. +message ExportedService { + repeated hashicorp.consul.internal.service.CheckServiceNode Nodes = 1; +} diff --git a/proto/pbpeerstream/types.go b/proto/pbpeerstream/types.go index 52f32487d..df300cccd 100644 --- a/proto/pbpeerstream/types.go +++ b/proto/pbpeerstream/types.go @@ -1,10 +1,16 @@ package pbpeerstream const ( - TypeURLService = "type.googleapis.com/consul.api.Service" - TypeURLRoots = "type.googleapis.com/consul.api.CARoots" + apiTypePrefix = "type.googleapis.com/" + + TypeURLExportedService = apiTypePrefix + "hashicorp.consul.internal.peerstream.ExportedService" + TypeURLPeeringTrustBundle = apiTypePrefix + "hashicorp.consul.internal.peering.PeeringTrustBundle" ) func KnownTypeURL(s string) bool { - return s == TypeURLService || s == TypeURLRoots + switch s { + case TypeURLExportedService, TypeURLPeeringTrustBundle: + return true + } + return false } diff --git a/proto/pbservice/convert.go b/proto/pbservice/convert.go index 02895adf9..d5233dd99 100644 --- a/proto/pbservice/convert.go +++ b/proto/pbservice/convert.go @@ -1,8 +1,6 @@ package pbservice import ( - "fmt" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/types" @@ -44,23 +42,6 @@ func NewMapHeadersFromStructs(t map[string][]string) map[string]*HeaderValue { return s } -// CheckServiceNodesToStruct converts the contained CheckServiceNodes to their structs equivalent. -func (s *IndexedCheckServiceNodes) CheckServiceNodesToStruct() ([]structs.CheckServiceNode, error) { - if s == nil { - return nil, nil - } - - resp := make([]structs.CheckServiceNode, 0, len(s.Nodes)) - for _, pb := range s.Nodes { - instance, err := CheckServiceNodeToStructs(pb) - if err != nil { - return resp, fmt.Errorf("failed to convert instance: %w", err) - } - resp = append(resp, *instance) - } - return resp, nil -} - // TODO: use mog once it supports pointers and slices func CheckServiceNodeToStructs(s *CheckServiceNode) (*structs.CheckServiceNode, error) { if s == nil { diff --git a/proto/prototest/testing.go b/proto/prototest/testing.go index c196d77b3..275d8502b 100644 --- a/proto/prototest/testing.go +++ b/proto/prototest/testing.go @@ -16,3 +16,60 @@ func AssertDeepEqual(t testing.TB, x, y interface{}, opts ...cmp.Option) { t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) } } + +// AssertElementsMatch asserts that the specified listX(array, slice...) is +// equal to specified listY(array, slice...) ignoring the order of the +// elements. If there are duplicate elements, the number of appearances of each +// of them in both lists should match. +// +// prototest.AssertElementsMatch(t, [1, 3, 2, 3], [1, 3, 3, 2]) +func AssertElementsMatch[V any]( + t testing.TB, listX, listY []V, opts ...cmp.Option, +) { + t.Helper() + + if len(listX) == 0 && len(listY) == 0 { + return + } + + opts = append(opts, protocmp.Transform()) + + // dump into a map keyed by sliceID + mapX := make(map[int]V) + for i, val := range listX { + mapX[i] = val + } + + mapY := make(map[int]V) + for i, val := range listY { + mapY[i] = val + } + + var outX, outY []V + for i, itemX := range mapX { + for j, itemY := range mapY { + if diff := cmp.Diff(itemX, itemY, opts...); diff == "" { + outX = append(outX, itemX) + outY = append(outY, itemY) + delete(mapX, i) + delete(mapY, j) + } + } + } + + if len(outX) == len(outY) && len(outX) == len(listX) { + return // matches + } + + // dump remainder into the slice so we can generate a useful error + for _, itemX := range mapX { + outX = append(outX, itemX) + } + for _, itemY := range mapY { + outY = append(outY, itemY) + } + + if diff := cmp.Diff(outX, outY, opts...); diff != "" { + t.Fatalf("assertion failed: slices do not have matching elements\n--- expected\n+++ actual\n%v", diff) + } +}