peerstream: require a resource subscription to receive updates of that type (#13767)

This mimics xDS's discovery protocol where you must request a resource
explicitly for the exporting side to send those events to you.

As part of this I aligned the overall ResourceURL with the TypeURL that
gets embedded into the encoded protobuf Any construct. The
CheckServiceNodes is now wrapped in a better named "ExportedService"
struct now.
This commit is contained in:
R.B. Boyer 2022-07-15 15:03:40 -05:00 committed by GitHub
parent 7da65c02a6
commit bec4df0679
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 736 additions and 345 deletions

View File

@ -5,10 +5,9 @@ import (
"fmt"
"strings"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/go-hclog"
"google.golang.org/genproto/googleapis/rpc/code"
newproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/agent/cache"
@ -39,7 +38,16 @@ func makeServiceResponse(
logger hclog.Logger,
update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) {
any, csn, err := marshalToProtoAny[*pbservice.IndexedCheckServiceNodes](update.Result)
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
return nil, fmt.Errorf("invalid type for service response: %T", update.Result)
}
export := &pbpeerstream.ExportedService{
Nodes: csn.Nodes,
}
any, err := anypb.New(export)
if err != nil {
return nil, fmt.Errorf("failed to marshal: %w", err)
}
@ -53,9 +61,9 @@ func makeServiceResponse(
//
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
// Case #1 is a no-op for the importing peer.
if len(csn.Nodes) == 0 {
if len(export.Nodes) == 0 {
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: serviceName,
@ -65,7 +73,7 @@ func makeServiceResponse(
// If there are nodes in the response, we push them as an UPSERT operation.
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: serviceName,
@ -84,7 +92,7 @@ func makeCARootsResponse(
}
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLRoots,
ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: "roots",
@ -97,13 +105,13 @@ func makeCARootsResponse(
// the protobuf.Any type, the asserted T type, and any errors
// during marshalling or type assertion.
// `in` MUST be of type T or it returns an error.
func marshalToProtoAny[T proto.Message](in any) (*anypb.Any, T, error) {
func marshalToProtoAny[T newproto.Message](in any) (*anypb.Any, T, error) {
typ, ok := in.(T)
if !ok {
var outType T
return nil, typ, fmt.Errorf("input type is not %T: %T", outType, in)
}
any, err := ptypes.MarshalAny(typ)
any, err := anypb.New(typ)
if err != nil {
return nil, typ, err
}
@ -186,20 +194,23 @@ func (s *Server) handleUpsert(
resource *anypb.Any,
logger hclog.Logger,
) error {
if resource.TypeUrl != resourceURL {
return fmt.Errorf("mismatched resourceURL %q and Any typeUrl %q", resourceURL, resource.TypeUrl)
}
switch resourceURL {
case pbpeerstream.TypeURLService:
case pbpeerstream.TypeURLExportedService:
sn := structs.ServiceNameFromString(resourceID)
sn.OverridePartition(partition)
csn := &pbservice.IndexedCheckServiceNodes{}
if err := ptypes.UnmarshalAny(resource, csn); err != nil {
export := &pbpeerstream.ExportedService{}
if err := resource.UnmarshalTo(export); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err)
}
err := s.handleUpdateService(peerName, partition, sn, csn)
err := s.handleUpdateService(peerName, partition, sn, export)
if err != nil {
logger.Error("did not increment imported services count", "service_name", sn.String(), "error", err)
return err
return fmt.Errorf("did not increment imported services count for service=%q: %w", sn.String(), err)
}
logger.Trace("incrementing imported services count", "service_name", sn.String())
@ -207,9 +218,9 @@ func (s *Server) handleUpsert(
return nil
case pbpeerstream.TypeURLRoots:
case pbpeerstream.TypeURLPeeringTrustBundle:
roots := &pbpeering.PeeringTrustBundle{}
if err := ptypes.UnmarshalAny(resource, roots); err != nil {
if err := resource.UnmarshalTo(roots); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err)
}
@ -232,7 +243,7 @@ func (s *Server) handleUpdateService(
peerName string,
partition string,
sn structs.ServiceName,
pbNodes *pbservice.IndexedCheckServiceNodes,
export *pbpeerstream.ExportedService,
) error {
// Capture instances in the state store for reconciliation later.
_, storedInstances, err := s.GetStore().CheckServiceNodes(nil, sn.Name, &sn.EnterpriseMeta, peerName)
@ -240,7 +251,7 @@ func (s *Server) handleUpdateService(
return fmt.Errorf("failed to read imported services: %w", err)
}
structsNodes, err := pbNodes.CheckServiceNodesToStruct()
structsNodes, err := export.CheckServiceNodesToStruct()
if err != nil {
return fmt.Errorf("failed to convert protobuf instances to structs: %w", err)
}
@ -444,7 +455,7 @@ func (s *Server) handleDelete(
logger hclog.Logger,
) error {
switch resourceURL {
case pbpeerstream.TypeURLService:
case pbpeerstream.TypeURLExportedService:
sn := structs.ServiceNameFromString(resourceID)
sn.OverridePartition(partition)

View File

@ -9,7 +9,6 @@ import (
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/go-hclog"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
@ -99,11 +98,12 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
}
streamReq := HandleStreamRequest{
LocalID: p.ID,
RemoteID: "",
PeerName: p.Name,
Partition: p.Partition,
Stream: stream,
LocalID: p.ID,
RemoteID: "",
PeerName: p.Name,
Partition: p.Partition,
InitialResourceURL: req.ResourceURL,
Stream: stream,
}
err = s.HandleStream(streamReq)
// A nil error indicates that the peering was deleted and the stream needs to be gracefully shutdown.
@ -129,6 +129,9 @@ type HandleStreamRequest struct {
// Partition is the local partition associated with the peer.
Partition string
// InitialResourceURL is the ResourceURL from the initial Request.
InitialResourceURL string
// Stream is the open stream to the peer cluster.
Stream BidirectionalStream
}
@ -183,6 +186,13 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
}
}
remoteSubTracker := newResourceSubscriptionTracker()
if streamReq.InitialResourceURL != "" {
if remoteSubTracker.Subscribe(streamReq.InitialResourceURL) {
logger.Info("subscribing to resource type", "resourceURL", streamReq.InitialResourceURL)
}
}
mgr := newSubscriptionManager(
streamReq.Stream.Context(),
logger,
@ -190,24 +200,31 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
trustDomain,
s.Backend,
s.GetStore,
remoteSubTracker,
)
subCh := mgr.subscribe(streamReq.Stream.Context(), streamReq.LocalID, streamReq.PeerName, streamReq.Partition)
sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService,
PeerID: streamReq.RemoteID,
})
logTraceSend(logger, sub)
// Subscribe to all relevant resource types.
for _, resourceURL := range []string{
pbpeerstream.TypeURLExportedService,
pbpeerstream.TypeURLPeeringTrustBundle,
} {
sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{
ResourceURL: resourceURL,
PeerID: streamReq.RemoteID,
})
logTraceSend(logger, sub)
if err := streamReq.Stream.Send(sub); err != nil {
if err == io.EOF {
logger.Info("stream ended by peer")
status.TrackReceiveError(err.Error())
return nil
if err := streamReq.Stream.Send(sub); err != nil {
if err == io.EOF {
logger.Info("stream ended by peer")
status.TrackReceiveError(err.Error())
return nil
}
// TODO(peering) Test error handling in calls to Send/Recv
status.TrackSendError(err.Error())
return fmt.Errorf("failed to send subscription for %q to stream: %w", resourceURL, err)
}
// TODO(peering) Test error handling in calls to Send/Recv
status.TrackSendError(err.Error())
return fmt.Errorf("failed to send to stream: %v", err)
}
// TODO(peering): Should this be buffered?
@ -289,17 +306,86 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
if !pbpeerstream.KnownTypeURL(req.ResourceURL) {
return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL)
}
switch {
case req.ResponseNonce == "":
// TODO(peering): This can happen on a client peer since they don't try to receive subscriptions before entering HandleStream.
// Should change that behavior or only allow it that one time.
case req.Error != nil && (req.Error.Code != int32(code.Code_OK) || req.Error.Message != ""):
// There are different formats of requests depending upon where in the stream lifecycle we are.
//
// 1. Initial Request: This is the first request being received
// FROM the establishing peer. This is handled specially in
// (*Server).StreamResources BEFORE calling
// (*Server).HandleStream. This takes care of determining what
// the PeerID is for the stream. This is ALSO treated as (2) below.
//
// 2. Subscription Request: This is the first request for a
// given ResourceURL within a stream. The Initial Request (1)
// is always one of these as well.
//
// These must contain a valid ResourceURL with no Error or
// ResponseNonce set.
//
// It is valid to subscribe to the same ResourceURL twice
// within the lifetime of a stream, but all duplicate
// subscriptions are treated as no-ops upon receipt.
//
// 3. ACK Request: This is the message sent in reaction to an
// earlier Response to indicate that the response was processed
// by the other side successfully.
//
// These must contain a ResponseNonce and no Error.
//
// 4. NACK Request: This is the message sent in reaction to an
// earlier Response to indicate that the response was NOT
// processed by the other side successfully.
//
// These must contain a ResponseNonce and an Error.
//
if !remoteSubTracker.IsSubscribed(req.ResourceURL) {
// This must be a new subscription request to add a new
// resource type, vet it like a new request.
if !streamReq.WasDialed() {
if req.PeerID != "" && req.PeerID != streamReq.RemoteID {
// Not necessary after the first request from the dialer,
// but if provided must match.
return grpcstatus.Errorf(codes.InvalidArgument,
"initial subscription requests for a resource type must have consistent PeerID values: got=%q expected=%q",
req.PeerID,
streamReq.RemoteID,
)
}
}
if req.ResponseNonce != "" {
return grpcstatus.Error(codes.InvalidArgument, "initial subscription requests for a resource type must not contain a nonce")
}
if req.Error != nil {
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request for a resource type must not contain an error")
}
if remoteSubTracker.Subscribe(req.ResourceURL) {
logger.Info("subscribing to resource type", "resourceURL", req.ResourceURL)
}
status.TrackAck()
continue
}
// At this point we have a valid ResourceURL and we are subscribed to it.
switch {
case req.ResponseNonce == "" && req.Error != nil:
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request for a resource type must not contain an error")
case req.ResponseNonce != "" && req.Error == nil: // ACK
// TODO(peering): handle ACK fully
status.TrackAck()
case req.ResponseNonce != "" && req.Error != nil: // NACK
// TODO(peering): handle NACK fully
logger.Warn("client peer was unable to apply resource", "code", req.Error.Code, "error", req.Error.Message)
status.TrackNack(fmt.Sprintf("client peer was unable to apply resource: %s", req.Error.Message))
default:
status.TrackAck()
// This branch might be dead code, but it could also happen
// during a stray 're-subscribe' so just ignore the
// message.
}
continue
@ -425,3 +511,63 @@ func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) {
logger.Trace("replication message", "direction", dir, "protobuf", out)
}
// resourceSubscriptionTracker is used to keep track of the ResourceURLs that a
// stream has subscribed to and can notify you when a subscription comes in by
// closing the channels returned by SubscribedChan.
type resourceSubscriptionTracker struct {
// notifierMap keeps track of a notification channel for each resourceURL.
// Keys may exist in here even when they do not exist in 'subscribed' as
// calling SubscribedChan has to possibly create and and hand out a
// notification channel in advance of any notification.
notifierMap map[string]chan struct{}
// subscribed is a set that keeps track of resourceURLs that are currently
// subscribed to. Keys are never deleted. If a key is present in this map
// it is also present in 'notifierMap'.
subscribed map[string]struct{}
}
func newResourceSubscriptionTracker() *resourceSubscriptionTracker {
return &resourceSubscriptionTracker{
subscribed: make(map[string]struct{}),
notifierMap: make(map[string]chan struct{}),
}
}
// IsSubscribed returns true if the given ResourceURL has an active subscription.
func (t *resourceSubscriptionTracker) IsSubscribed(resourceURL string) bool {
_, ok := t.subscribed[resourceURL]
return ok
}
// Subscribe subscribes to the given ResourceURL. It will return true if this
// was the FIRST time a subscription occurred. It will also close the
// notification channel associated with this ResourceURL.
func (t *resourceSubscriptionTracker) Subscribe(resourceURL string) bool {
if _, ok := t.subscribed[resourceURL]; ok {
return false
}
t.subscribed[resourceURL] = struct{}{}
// and notify
ch := t.ensureNotifierChan(resourceURL)
close(ch)
return true
}
// SubscribedChan returns a channel that will be closed when the ResourceURL is
// subscribed using the Subscribe method.
func (t *resourceSubscriptionTracker) SubscribedChan(resourceURL string) <-chan struct{} {
return t.ensureNotifierChan(resourceURL)
}
func (t *resourceSubscriptionTracker) ensureNotifierChan(resourceURL string) chan struct{} {
if ch, ok := t.notifierMap[resourceURL]; ok {
return ch
}
ch := make(chan struct{})
t.notifierMap[resourceURL] = ch
return ch
}

View File

@ -12,15 +12,14 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
@ -97,18 +96,6 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
backend.leaderAddr = "expected:address"
})
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.ErrCh = errCh
go func() {
err := srv.StreamResources(client.ReplicationStream)
if err != nil {
errCh <- err
}
}()
p := writePeeringToBeDialed(t, store, 1, "my-peer")
require.Empty(t, p.PeerID, "should be empty if being dialed")
peerID := p.ID
@ -116,53 +103,73 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
// Set the initial roots and CA configuration.
_, _ = writeInitialRootsAndCA(t, store)
// Receive a subscription from a peer
sub := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeerstream.TypeURLService,
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.ErrCh = errCh
go func() {
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
if err := srv.StreamResources(client.ReplicationStream); err != nil {
errCh <- err
}
}()
// Receive a subscription from a peer. This message arrives while the
// server is a leader and should work.
testutil.RunStep(t, "send subscription request to leader and consume its two requests", func(t *testing.T) {
sub := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeerstream.TypeURLExportedService,
},
},
},
}
err := client.Send(sub)
require.NoError(t, err)
}
err := client.Send(sub)
require.NoError(t, err)
msg, err := client.Recv()
require.NoError(t, err)
require.NotEmpty(t, msg)
msg1, err := client.Recv()
require.NoError(t, err)
require.NotEmpty(t, msg1)
receiveRoots, err := client.Recv()
require.NoError(t, err)
require.NotNil(t, receiveRoots.GetResponse())
require.Equal(t, pbpeerstream.TypeURLRoots, receiveRoots.GetResponse().ResourceURL)
msg2, err := client.Recv()
require.NoError(t, err)
require.NotEmpty(t, msg2)
})
input2 := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService,
ResponseNonce: "1",
// The ACK will be a new request but at this point the server is not the
// leader in the test and this should fail.
testutil.RunStep(t, "ack fails with non leader", func(t *testing.T) {
ack := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1",
},
},
},
}
}
err2 := client.Send(input2)
require.NoError(t, err2)
err := client.Send(ack)
require.NoError(t, err)
// expect error
msg2, err2 := client.Recv()
require.Nil(t, msg2)
require.Error(t, err2)
require.EqualError(t, err2, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming")
// expect error
msg, err := client.Recv()
require.Nil(t, msg)
require.Error(t, err)
require.EqualError(t, err, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming")
// expect a status error
st, ok := status.FromError(err2)
require.True(t, ok, "need to get back a grpc status error")
deets := st.Details()
// expect a status error
st, ok := status.FromError(err)
require.True(t, ok, "need to get back a grpc status error")
// expect a LeaderAddress message
exp := []interface{}{&pbpeerstream.LeaderAddress{Address: "expected:address"}}
prototest.AssertDeepEqual(t, exp, deets)
// expect a LeaderAddress message
expect := []interface{}{
&pbpeerstream.LeaderAddress{Address: "expected:address"},
}
prototest.AssertDeepEqual(t, expect, st.Details())
})
}
func TestStreamResources_Server_FirstRequest(t *testing.T) {
@ -204,7 +211,7 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) {
input: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "api-service",
Nonce: "2",
},
@ -251,7 +258,7 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) {
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: "63b60245-c475-426b-b314-4588d210859d",
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
},
},
},
@ -291,7 +298,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
receiveRoots, err := client.Recv()
require.NoError(t, err)
require.NotNil(t, receiveRoots.GetResponse())
require.Equal(t, pbpeerstream.TypeURLRoots, receiveRoots.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, receiveRoots.GetResponse().ResourceURL)
testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
@ -347,7 +354,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
})
})
var sequence uint64
var lastSendSuccess time.Time
testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
@ -355,18 +361,17 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1",
// Acks do not have an Error populated in the request
},
},
}
lastSendSuccess = it.FutureNow(1)
err := client.Send(ack)
require.NoError(t, err)
sequence++
lastSendSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC()
expect := Status{
Connected: true,
@ -388,7 +393,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "2",
Error: &pbstatus.Status{
Code: int32(code.Code_UNAVAILABLE),
@ -397,12 +402,12 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
},
},
}
lastNack = it.FutureNow(1)
err := client.Send(nack)
require.NoError(t, err)
sequence++
lastNackMsg = "client peer was unable to apply resource: bad bad not good"
lastNack = it.base.Add(time.Duration(sequence) * time.Second).UTC()
expect := Status{
Connected: true,
@ -424,22 +429,22 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "api",
Nonce: "21",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}),
Resource: makeAnyPB(t, &pbpeerstream.ExportedService{}),
},
},
}
lastRecvSuccess = it.FutureNow(1)
err := client.Send(resp)
require.NoError(t, err)
sequence++
expectRoots := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLRoots,
ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle,
ResourceID: "roots",
Resource: makeAnyPB(t, &pbpeering.PeeringTrustBundle{
TrustDomain: connect.TestTrustDomain,
@ -460,15 +465,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expectAck := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "21",
},
},
}
prototest.AssertDeepEqual(t, expectAck, ack)
lastRecvSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC()
api := structs.NewServiceName("api", nil)
expect := Status{
@ -496,7 +499,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "web",
Nonce: "24",
@ -505,9 +508,9 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
},
},
}
lastRecvError = it.FutureNow(1)
err := client.Send(resp)
require.NoError(t, err)
sequence++
ack, err := client.Recv()
require.NoError(t, err)
@ -515,7 +518,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expectNack := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "24",
Error: &pbstatus.Status{
Code: int32(code.Code_INVALID_ARGUMENT),
@ -526,7 +529,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}
prototest.AssertDeepEqual(t, expectNack, ack)
lastRecvError = it.base.Add(time.Duration(sequence) * time.Second).UTC()
lastRecvErrorMsg = `unsupported operation: "OPERATION_UNSPECIFIED"`
api := structs.NewServiceName("api", nil)
@ -552,14 +554,12 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
})
testutil.RunStep(t, "client disconnect marks stream as disconnected", func(t *testing.T) {
lastRecvError = it.FutureNow(1)
disconnectTime := it.FutureNow(2)
lastRecvErrorMsg = io.EOF.Error()
client.Close()
sequence++
lastRecvError := it.base.Add(time.Duration(sequence) * time.Second).UTC()
sequence++
disconnectTime := it.base.Add(time.Duration(sequence) * time.Second).UTC()
api := structs.NewServiceName("api", nil)
expect := Status{
@ -569,8 +569,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
LastNackMessage: lastNackMsg,
DisconnectTime: disconnectTime,
LastReceiveSuccess: lastRecvSuccess,
LastReceiveErrorMessage: io.EOF.Error(),
LastReceiveError: lastRecvError,
LastReceiveErrorMessage: lastRecvErrorMsg,
ImportedServices: map[string]struct{}{
api.String(): {},
},
@ -654,35 +654,35 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLRoots, msg.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
// Roots tested in TestStreamResources_Server_CARootUpdates
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
// no mongo instances exist
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
// proxies can't export because no mesh gateway exists yet
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
var nodes pbpeerstream.ExportedService
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 1)
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
// proxies can't export because no mesh gateway exists yet
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
@ -704,12 +704,12 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
var nodes pbpeerstream.ExportedService
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 1)
pm := nodes.Nodes[0].Service.Connect.PeerMeta
@ -721,12 +721,12 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.Equal(t, spiffeIDs, pm.SpiffeID)
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
var nodes pbpeerstream.ExportedService
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 1)
pm := nodes.Nodes[0].Service.Connect.PeerMeta
@ -758,8 +758,8 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.Equal(r, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
var nodes pbpeerstream.ExportedService
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(r, nodes.Nodes, 1)
})
})
@ -824,12 +824,12 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
testutil.RunStep(t, "initial CA Roots replication", func(t *testing.T) {
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLRoots, msg.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
require.Equal(t, "roots", msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var trustBundle pbpeering.PeeringTrustBundle
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &trustBundle))
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&trustBundle))
require.ElementsMatch(t, []string{rootA.RootCert}, trustBundle.RootPEMs)
expect := connect.SpiffeIDSigningForCluster(clusterID).Host()
@ -853,12 +853,12 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLRoots, msg.GetResponse().ResourceURL)
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
require.Equal(t, "roots", msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var trustBundle pbpeering.PeeringTrustBundle
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &trustBundle))
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&trustBundle))
require.ElementsMatch(t, []string{rootB.RootCert, rootC.RootCert}, trustBundle.RootPEMs)
expect := connect.SpiffeIDSigningForCluster(clusterID).Host()
@ -886,33 +886,57 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
}
}()
// Issue a services subscription to server
init := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeerstream.TypeURLService,
// Issue a services and roots subscription pair to server
for _, resourceURL := range []string{
pbpeerstream.TypeURLExportedService,
pbpeerstream.TypeURLPeeringTrustBundle,
} {
init := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: resourceURL,
},
},
},
}
require.NoError(t, client.Send(init))
}
require.NoError(t, client.Send(init))
// Receive a services subscription from server
receivedSub, err := client.Recv()
// Receive a services and roots subscription request pair from server
receivedSub1, err := client.Recv()
require.NoError(t, err)
receivedSub2, err := client.Recv()
require.NoError(t, err)
expect := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService,
// The PeerID field is only set for the messages coming FROM
// the establishing side and are going to be empty from the
// other side.
PeerID: "",
expect := []*pbpeerstream.ReplicationMessage{
{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLExportedService,
// The PeerID field is only set for the messages coming FROM
// the establishing side and are going to be empty from the
// other side.
PeerID: "",
},
},
},
{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle,
// The PeerID field is only set for the messages coming FROM
// the establishing side and are going to be empty from the
// other side.
PeerID: "",
},
},
},
}
prototest.AssertDeepEqual(t, expect, receivedSub)
got := []*pbpeerstream.ReplicationMessage{
receivedSub1,
receivedSub2,
}
prototest.AssertElementsMatch[*pbpeerstream.ReplicationMessage](t, expect, got)
return client
}
@ -1017,16 +1041,16 @@ func Test_processResponse_Validation(t *testing.T) {
{
name: "valid upsert",
in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "api",
Nonce: "1",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}),
Resource: makeAnyPB(t, &pbpeerstream.ExportedService{}),
},
expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1",
},
},
@ -1036,7 +1060,7 @@ func Test_processResponse_Validation(t *testing.T) {
{
name: "valid delete",
in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "api",
Nonce: "1",
Operation: pbpeerstream.Operation_OPERATION_DELETE,
@ -1044,7 +1068,7 @@ func Test_processResponse_Validation(t *testing.T) {
expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1",
},
},
@ -1075,14 +1099,14 @@ func Test_processResponse_Validation(t *testing.T) {
{
name: "unknown operation",
in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
Nonce: "1",
Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED,
},
expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1",
Error: &pbstatus.Status{
Code: int32(code.Code_INVALID_ARGUMENT),
@ -1096,14 +1120,14 @@ func Test_processResponse_Validation(t *testing.T) {
{
name: "out of range operation",
in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
Nonce: "1",
Operation: pbpeerstream.Operation(100000),
},
expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1",
Error: &pbstatus.Status{
Code: int32(code.Code_INVALID_ARGUMENT),
@ -1163,8 +1187,8 @@ func writeInitialRootsAndCA(t *testing.T, store *state.Store) (string, *structs.
return clusterID, rootA
}
func makeAnyPB(t *testing.T, pb proto.Message) *any.Any {
any, err := ptypes.MarshalAny(pb)
func makeAnyPB(t *testing.T, pb proto.Message) *anypb.Any {
any, err := anypb.New(pb)
require.NoError(t, err)
return any
}
@ -1255,7 +1279,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
type testCase struct {
name string
seed []*structs.RegisterRequest
input *pbservice.IndexedCheckServiceNodes
input *pbpeerstream.ExportedService
expect map[string]structs.CheckServiceNodes
expectedImportedServicesCount int
}
@ -1296,7 +1320,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
}
in := &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: apiSN.String(),
Nonce: "1",
Operation: op,
@ -1322,7 +1346,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
tt := []testCase{
{
name: "upsert two service instances to the same node",
input: &pbservice.IndexedCheckServiceNodes{
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{
@ -1454,7 +1478,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
},
{
name: "upsert two service instances to different nodes",
input: &pbservice.IndexedCheckServiceNodes{
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{
@ -1636,7 +1660,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
},
},
},
input: &pbservice.IndexedCheckServiceNodes{},
input: &pbpeerstream.ExportedService{},
expect: map[string]structs.CheckServiceNodes{
"api": {},
},
@ -1695,7 +1719,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
},
},
// Nil input is for the "api" service.
input: &pbservice.IndexedCheckServiceNodes{},
input: &pbpeerstream.ExportedService{},
expect: map[string]structs.CheckServiceNodes{
"api": {},
// Existing redis service was not affected by deletion.
@ -1761,7 +1785,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
},
},
},
input: &pbservice.IndexedCheckServiceNodes{
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{
@ -1856,7 +1880,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
},
},
},
input: &pbservice.IndexedCheckServiceNodes{
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{
@ -1991,7 +2015,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
},
},
},
input: &pbservice.IndexedCheckServiceNodes{
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{

View File

@ -19,6 +19,13 @@ import (
// streaming machinery instead to be cheaper.
func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Context, state *subscriptionState, peerID string) {
// Wait until this is subscribed-to.
select {
case <-m.serviceSubReady:
case <-ctx.Done():
return
}
// syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend
// match the list of services exported to the peer.
m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) {
@ -34,6 +41,13 @@ func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Contex
// TODO: add a new streaming subscription type to list-by-kind-and-partition since we're getting evictions
func (m *subscriptionManager) notifyMeshGatewaysForPartition(ctx context.Context, state *subscriptionState, partition string) {
// Wait until this is subscribed-to.
select {
case <-m.serviceSubReady:
case <-ctx.Done():
return
}
m.syncViaBlockingQuery(ctx, "mesh-gateways", func(ctx context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) {
// Fetch our current list of all mesh gateways.
entMeta := structs.DefaultEnterpriseMetaInPartition(partition)

View File

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbpeerstream"
"github.com/hashicorp/consul/proto/pbservice"
)
@ -33,12 +34,14 @@ type SubscriptionBackend interface {
// subscriptionManager handlers requests to subscribe to events from an events publisher.
type subscriptionManager struct {
logger hclog.Logger
config Config
trustDomain string
viewStore MaterializedViewStore
backend SubscriptionBackend
getStore func() StateStore
logger hclog.Logger
config Config
trustDomain string
viewStore MaterializedViewStore
backend SubscriptionBackend
getStore func() StateStore
serviceSubReady <-chan struct{}
trustBundlesSubReady <-chan struct{}
}
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
@ -49,18 +52,21 @@ func newSubscriptionManager(
trustDomain string,
backend SubscriptionBackend,
getStore func() StateStore,
remoteSubTracker *resourceSubscriptionTracker,
) *subscriptionManager {
logger = logger.Named("subscriptions")
store := submatview.NewStore(logger.Named("viewstore"))
go store.Run(ctx)
return &subscriptionManager{
logger: logger,
config: config,
trustDomain: trustDomain,
viewStore: store,
backend: backend,
getStore: getStore,
logger: logger,
config: config,
trustDomain: trustDomain,
viewStore: store,
backend: backend,
getStore: getStore,
serviceSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLExportedService),
trustBundlesSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLPeeringTrustBundle),
}
}
@ -297,6 +303,13 @@ func (m *subscriptionManager) notifyRootCAUpdatesForPartition(
updateCh chan<- cache.UpdateEvent,
partition string,
) {
// Wait until this is subscribed-to.
select {
case <-m.trustBundlesSubReady:
case <-ctx.Done():
return
}
var idx uint64
// TODO(peering): retry logic; fail past a threshold
for {

View File

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbpeerstream"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/sdk/testutil"
@ -32,12 +33,16 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
_, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
// Only configure a tracker for catalog events.
tracker := newResourceSubscriptionTracker()
tracker.Subscribe(pbpeerstream.TypeURLExportedService)
mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
Datacenter: "dc1",
ConnectEnabled: true,
}, connect.TestTrustDomain, backend, func() StateStore {
return backend.store
})
}, tracker)
subCh := mgr.subscribe(ctx, id, "my-peering", partition)
var (
@ -442,12 +447,16 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
_, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
// Only configure a tracker for catalog events.
tracker := newResourceSubscriptionTracker()
tracker.Subscribe(pbpeerstream.TypeURLExportedService)
mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
Datacenter: "dc1",
ConnectEnabled: true,
}, connect.TestTrustDomain, backend, func() StateStore {
return backend.store
})
}, tracker)
subCh := mgr.subscribe(ctx, id, "my-peering", partition)
// Register two services that are not yet exported
@ -571,21 +580,21 @@ func TestSubscriptionManager_CARoots(t *testing.T) {
_, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
// Only configure a tracker for CA roots events.
tracker := newResourceSubscriptionTracker()
tracker.Subscribe(pbpeerstream.TypeURLPeeringTrustBundle)
mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
Datacenter: "dc1",
ConnectEnabled: true,
}, connect.TestTrustDomain, backend, func() StateStore {
return backend.store
})
}, tracker)
subCh := mgr.subscribe(ctx, id, "my-peering", partition)
testutil.RunStep(t, "initial events contain trust bundle", func(t *testing.T) {
// events are ordered so we can expect a deterministic list
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
// mesh-gateway assertions are done in other tests
require.Equal(t, subMeshGateway+partition, got.CorrelationID)
},
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, subCARoot, got.CorrelationID)
roots, ok := got.Result.(*pbpeering.PeeringTrustBundle)

View File

@ -2,6 +2,7 @@ package peerstream
import (
"context"
"fmt"
"io"
"sync"
"time"
@ -24,14 +25,7 @@ func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error {
}
func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error) {
select {
case err := <-c.ErrCh:
return nil, err
case r := <-c.ReplicationStream.sendCh:
return r, nil
case <-time.After(10 * time.Millisecond):
return nil, io.EOF
}
return c.RecvWithTimeout(10 * time.Millisecond)
}
func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error) {
@ -61,7 +55,6 @@ type MockStream struct {
recvCh chan *pbpeerstream.ReplicationMessage
ctx context.Context
mu sync.Mutex
}
var _ pbpeerstream.PeerStreamService_StreamResourcesServer = (*MockStream)(nil)
@ -117,12 +110,37 @@ func (s *MockStream) SendHeader(metadata.MD) error {
// SetTrailer implements grpc.ServerStream
func (s *MockStream) SetTrailer(metadata.MD) {}
// incrementalTime is an artificial clock used during testing. For those
// scenarios you would pass around the method pointer for `Now` in places where
// you would be using `time.Now`.
type incrementalTime struct {
base time.Time
next uint64
mu sync.Mutex
}
// Now advances the internal clock by 1 second and returns that value.
func (t *incrementalTime) Now() time.Time {
t.mu.Lock()
defer t.mu.Unlock()
t.next++
return t.base.Add(time.Duration(t.next) * time.Second)
dur := time.Duration(t.next) * time.Second
return t.base.Add(dur)
}
// FutureNow will return a given future value of the Now() function.
// The numerical argument indicates which future Now value you wanted. The
// value must be > 0.
func (t *incrementalTime) FutureNow(n int) time.Time {
if n < 1 {
panic(fmt.Sprintf("argument must be > 1 but was %d", n))
}
t.mu.Lock()
defer t.mu.Unlock()
dur := time.Duration(t.next+uint64(n)) * time.Second
return t.base.Add(dur)
}

View File

@ -0,0 +1,25 @@
package pbpeerstream
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
pbservice "github.com/hashicorp/consul/proto/pbservice"
)
// CheckServiceNodesToStruct converts the contained CheckServiceNodes to their structs equivalent.
func (s *ExportedService) CheckServiceNodesToStruct() ([]structs.CheckServiceNode, error) {
if s == nil {
return nil, nil
}
resp := make([]structs.CheckServiceNode, 0, len(s.Nodes))
for _, pb := range s.Nodes {
instance, err := pbservice.CheckServiceNodeToStructs(pb)
if err != nil {
return resp, fmt.Errorf("failed to convert instance: %w", err)
}
resp = append(resp, *instance)
}
return resp, nil
}

View File

@ -56,3 +56,13 @@ func (msg *LeaderAddress) MarshalBinary() ([]byte, error) {
func (msg *LeaderAddress) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ExportedService) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ExportedService) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}

View File

@ -7,6 +7,7 @@
package pbpeerstream
import (
pbservice "github.com/hashicorp/consul/proto/pbservice"
pbstatus "github.com/hashicorp/consul/proto/pbstatus"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
@ -220,6 +221,54 @@ func (x *LeaderAddress) GetAddress() string {
return ""
}
// ExportedService is one of the types of data returned via peer stream replication.
type ExportedService struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Nodes []*pbservice.CheckServiceNode `protobuf:"bytes,1,rep,name=Nodes,proto3" json:"Nodes,omitempty"`
}
func (x *ExportedService) Reset() {
*x = ExportedService{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ExportedService) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ExportedService) ProtoMessage() {}
func (x *ExportedService) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ExportedService.ProtoReflect.Descriptor instead.
func (*ExportedService) Descriptor() ([]byte, []int) {
return file_proto_pbpeerstream_peerstream_proto_rawDescGZIP(), []int{2}
}
func (x *ExportedService) GetNodes() []*pbservice.CheckServiceNode {
if x != nil {
return x.Nodes
}
return nil
}
// A Request requests to subscribe to a resource of a given type.
type ReplicationMessage_Request struct {
state protoimpl.MessageState
@ -244,7 +293,7 @@ type ReplicationMessage_Request struct {
func (x *ReplicationMessage_Request) Reset() {
*x = ReplicationMessage_Request{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2]
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -257,7 +306,7 @@ func (x *ReplicationMessage_Request) String() string {
func (*ReplicationMessage_Request) ProtoMessage() {}
func (x *ReplicationMessage_Request) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2]
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -323,7 +372,7 @@ type ReplicationMessage_Response struct {
func (x *ReplicationMessage_Response) Reset() {
*x = ReplicationMessage_Response{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3]
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -336,7 +385,7 @@ func (x *ReplicationMessage_Response) String() string {
func (*ReplicationMessage_Response) ProtoMessage() {}
func (x *ReplicationMessage_Response) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3]
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -398,7 +447,7 @@ type ReplicationMessage_Terminated struct {
func (x *ReplicationMessage_Terminated) Reset() {
*x = ReplicationMessage_Terminated{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4]
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -411,7 +460,7 @@ func (x *ReplicationMessage_Terminated) String() string {
func (*ReplicationMessage_Terminated) ProtoMessage() {}
func (x *ReplicationMessage_Terminated) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4]
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -436,92 +485,99 @@ var file_proto_pbpeerstream_peerstream_proto_rawDesc = []byte{
0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x1a, 0x19, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62,
0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x22, 0xe5, 0x05, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x5c, 0x0a, 0x07, 0x72, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x40, 0x2e, 0x68, 0x61,
0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52,
0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x5f, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x68, 0x61, 0x73,
0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e,
0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52,
0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x65, 0x0a, 0x0a, 0x74, 0x65, 0x72,
0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x43, 0x2e,
0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c,
0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74,
0x65, 0x64, 0x48, 0x00, 0x52, 0x0a, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64,
0x1a, 0xa9, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06,
0x50, 0x65, 0x65, 0x72, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x65,
0x65, 0x72, 0x49, 0x44, 0x12, 0x24, 0x0a, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65,
0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x3e, 0x0a, 0x05,
0x45, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x68, 0x61,
0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53,
0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0xe3, 0x01, 0x0a,
0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4e, 0x6f, 0x6e,
0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12,
0x20, 0x0a, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52,
0x4c, 0x12, 0x1e, 0x0a, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x18,
0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49,
0x44, 0x12, 0x30, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75,
0x72, 0x63, 0x65, 0x12, 0x4d, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2f, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1a, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62,
0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x1a, 0x1b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x73, 0x74, 0x61, 0x74,
0x75, 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22,
0xe5, 0x05, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x5c, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x40, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63,
0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72,
0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52,
0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x12, 0x5f, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f,
0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e,
0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4f, 0x70,
0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x1a, 0x0c, 0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64,
0x42, 0x09, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x29, 0x0a, 0x0d, 0x4c,
0x65, 0x61, 0x64, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07,
0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x2a, 0x52, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a, 0x15, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e,
0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14,
0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x50, 0x53, 0x45,
0x52, 0x54, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f,
0x4e, 0x5f, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x02, 0x32, 0x9f, 0x01, 0x0a, 0x11, 0x50,
0x65, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x12, 0x89, 0x01, 0x0a, 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x6f, 0x75,
0x72, 0x63, 0x65, 0x73, 0x12, 0x38, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70,
0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x08, 0x72, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x65, 0x0a, 0x0a, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61,
0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x43, 0x2e, 0x68, 0x61, 0x73, 0x68,
0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74,
0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x2e, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x48, 0x00,
0x52, 0x0a, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x1a, 0xa9, 0x01, 0x0a,
0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x65, 0x65, 0x72,
0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x65, 0x65, 0x72, 0x49, 0x44,
0x12, 0x24, 0x0a, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4e, 0x6f, 0x6e, 0x63,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72,
0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73,
0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x3e, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f,
0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63,
0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72,
0x6e, 0x61, 0x6c, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75,
0x73, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0xe3, 0x01, 0x0a, 0x08, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52,
0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x1e, 0x0a,
0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x12, 0x30, 0x0a,
0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12,
0x4d, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01,
0x28, 0x0e, 0x32, 0x2f, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63,
0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70,
0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x0c,
0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x42, 0x09, 0x0a, 0x07,
0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x29, 0x0a, 0x0d, 0x4c, 0x65, 0x61, 0x64, 0x65,
0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72,
0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65,
0x73, 0x73, 0x22, 0x5c, 0x0a, 0x0f, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x53, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x49, 0x0a, 0x05, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x01,
0x20, 0x03, 0x28, 0x0b, 0x32, 0x33, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70,
0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c,
0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x38,
0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x05, 0x4e, 0x6f, 0x64, 0x65, 0x73,
0x2a, 0x52, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a,
0x15, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45,
0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52,
0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x14,
0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x44, 0x45, 0x4c, 0x45,
0x54, 0x45, 0x10, 0x02, 0x32, 0x9f, 0x01, 0x0a, 0x11, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x0f, 0x53,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x38,
0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75,
0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x9f, 0x02, 0x0a,
0x28, 0x63, 0x6f, 0x6d, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63,
0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70,
0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x0f, 0x50, 0x65, 0x65, 0x72, 0x73,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2e, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f,
0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f,
0x70, 0x62, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xa2, 0x02, 0x04, 0x48,
0x43, 0x49, 0x50, 0xaa, 0x02, 0x24, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e,
0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e,
0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xca, 0x02, 0x24, 0x48, 0x61, 0x73,
0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e,
0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0xe2, 0x02, 0x30, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f,
0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x50, 0x65,
0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61,
0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x27, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70,
0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x3a, 0x3a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e,
0x61, 0x6c, 0x3a, 0x3a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x38, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69,
0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65,
0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e,
0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x9f, 0x02, 0x0a, 0x28, 0x63, 0x6f, 0x6d, 0x2e, 0x68,
0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e,
0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x42, 0x0f, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50,
0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e,
0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x70, 0x65, 0x65, 0x72,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xa2, 0x02, 0x04, 0x48, 0x43, 0x49, 0x50, 0xaa, 0x02, 0x24,
0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c,
0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74,
0x72, 0x65, 0x61, 0x6d, 0xca, 0x02, 0x24, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70,
0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x5c, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xe2, 0x02, 0x30, 0x48, 0x61,
0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02,
0x27, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73,
0x75, 0x6c, 0x3a, 0x3a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x3a, 0x3a, 0x50, 0x65,
0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -537,31 +593,34 @@ func file_proto_pbpeerstream_peerstream_proto_rawDescGZIP() []byte {
}
var file_proto_pbpeerstream_peerstream_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_proto_pbpeerstream_peerstream_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_proto_pbpeerstream_peerstream_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_proto_pbpeerstream_peerstream_proto_goTypes = []interface{}{
(Operation)(0), // 0: hashicorp.consul.internal.peerstream.Operation
(*ReplicationMessage)(nil), // 1: hashicorp.consul.internal.peerstream.ReplicationMessage
(*LeaderAddress)(nil), // 2: hashicorp.consul.internal.peerstream.LeaderAddress
(*ReplicationMessage_Request)(nil), // 3: hashicorp.consul.internal.peerstream.ReplicationMessage.Request
(*ReplicationMessage_Response)(nil), // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Response
(*ReplicationMessage_Terminated)(nil), // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated
(*pbstatus.Status)(nil), // 6: hashicorp.consul.internal.status.Status
(*anypb.Any)(nil), // 7: google.protobuf.Any
(*ExportedService)(nil), // 3: hashicorp.consul.internal.peerstream.ExportedService
(*ReplicationMessage_Request)(nil), // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Request
(*ReplicationMessage_Response)(nil), // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Response
(*ReplicationMessage_Terminated)(nil), // 6: hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated
(*pbservice.CheckServiceNode)(nil), // 7: hashicorp.consul.internal.service.CheckServiceNode
(*pbstatus.Status)(nil), // 8: hashicorp.consul.internal.status.Status
(*anypb.Any)(nil), // 9: google.protobuf.Any
}
var file_proto_pbpeerstream_peerstream_proto_depIdxs = []int32{
3, // 0: hashicorp.consul.internal.peerstream.ReplicationMessage.request:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Request
4, // 1: hashicorp.consul.internal.peerstream.ReplicationMessage.response:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Response
5, // 2: hashicorp.consul.internal.peerstream.ReplicationMessage.terminated:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated
6, // 3: hashicorp.consul.internal.peerstream.ReplicationMessage.Request.Error:type_name -> hashicorp.consul.internal.status.Status
7, // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.Resource:type_name -> google.protobuf.Any
0, // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.operation:type_name -> hashicorp.consul.internal.peerstream.Operation
1, // 6: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:input_type -> hashicorp.consul.internal.peerstream.ReplicationMessage
1, // 7: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:output_type -> hashicorp.consul.internal.peerstream.ReplicationMessage
7, // [7:8] is the sub-list for method output_type
6, // [6:7] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension type_name
6, // [6:6] is the sub-list for extension extendee
0, // [0:6] is the sub-list for field type_name
4, // 0: hashicorp.consul.internal.peerstream.ReplicationMessage.request:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Request
5, // 1: hashicorp.consul.internal.peerstream.ReplicationMessage.response:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Response
6, // 2: hashicorp.consul.internal.peerstream.ReplicationMessage.terminated:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated
7, // 3: hashicorp.consul.internal.peerstream.ExportedService.Nodes:type_name -> hashicorp.consul.internal.service.CheckServiceNode
8, // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Request.Error:type_name -> hashicorp.consul.internal.status.Status
9, // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.Resource:type_name -> google.protobuf.Any
0, // 6: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.operation:type_name -> hashicorp.consul.internal.peerstream.Operation
1, // 7: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:input_type -> hashicorp.consul.internal.peerstream.ReplicationMessage
1, // 8: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:output_type -> hashicorp.consul.internal.peerstream.ReplicationMessage
8, // [8:9] is the sub-list for method output_type
7, // [7:8] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension type_name
7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
}
func init() { file_proto_pbpeerstream_peerstream_proto_init() }
@ -595,7 +654,7 @@ func file_proto_pbpeerstream_peerstream_proto_init() {
}
}
file_proto_pbpeerstream_peerstream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Request); i {
switch v := v.(*ExportedService); i {
case 0:
return &v.state
case 1:
@ -607,7 +666,7 @@ func file_proto_pbpeerstream_peerstream_proto_init() {
}
}
file_proto_pbpeerstream_peerstream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Response); i {
switch v := v.(*ReplicationMessage_Request); i {
case 0:
return &v.state
case 1:
@ -619,6 +678,18 @@ func file_proto_pbpeerstream_peerstream_proto_init() {
}
}
file_proto_pbpeerstream_peerstream_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Response); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_pbpeerstream_peerstream_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Terminated); i {
case 0:
return &v.state
@ -642,7 +713,7 @@ func file_proto_pbpeerstream_peerstream_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_pbpeerstream_peerstream_proto_rawDesc,
NumEnums: 1,
NumMessages: 5,
NumMessages: 6,
NumExtensions: 0,
NumServices: 1,
},

View File

@ -3,6 +3,7 @@ syntax = "proto3";
package hashicorp.consul.internal.peerstream;
import "google/protobuf/any.proto";
import "proto/pbservice/node.proto";
// TODO(peering): Handle this some other way
import "proto/pbstatus/status.proto";
@ -89,3 +90,8 @@ message LeaderAddress {
// address is an ip:port best effort hint at what could be the cluster leader's address
string address = 1;
}
// ExportedService is one of the types of data returned via peer stream replication.
message ExportedService {
repeated hashicorp.consul.internal.service.CheckServiceNode Nodes = 1;
}

View File

@ -1,10 +1,16 @@
package pbpeerstream
const (
TypeURLService = "type.googleapis.com/consul.api.Service"
TypeURLRoots = "type.googleapis.com/consul.api.CARoots"
apiTypePrefix = "type.googleapis.com/"
TypeURLExportedService = apiTypePrefix + "hashicorp.consul.internal.peerstream.ExportedService"
TypeURLPeeringTrustBundle = apiTypePrefix + "hashicorp.consul.internal.peering.PeeringTrustBundle"
)
func KnownTypeURL(s string) bool {
return s == TypeURLService || s == TypeURLRoots
switch s {
case TypeURLExportedService, TypeURLPeeringTrustBundle:
return true
}
return false
}

View File

@ -1,8 +1,6 @@
package pbservice
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/types"
@ -44,23 +42,6 @@ func NewMapHeadersFromStructs(t map[string][]string) map[string]*HeaderValue {
return s
}
// CheckServiceNodesToStruct converts the contained CheckServiceNodes to their structs equivalent.
func (s *IndexedCheckServiceNodes) CheckServiceNodesToStruct() ([]structs.CheckServiceNode, error) {
if s == nil {
return nil, nil
}
resp := make([]structs.CheckServiceNode, 0, len(s.Nodes))
for _, pb := range s.Nodes {
instance, err := CheckServiceNodeToStructs(pb)
if err != nil {
return resp, fmt.Errorf("failed to convert instance: %w", err)
}
resp = append(resp, *instance)
}
return resp, nil
}
// TODO: use mog once it supports pointers and slices
func CheckServiceNodeToStructs(s *CheckServiceNode) (*structs.CheckServiceNode, error) {
if s == nil {

View File

@ -16,3 +16,60 @@ func AssertDeepEqual(t testing.TB, x, y interface{}, opts ...cmp.Option) {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
}
}
// AssertElementsMatch asserts that the specified listX(array, slice...) is
// equal to specified listY(array, slice...) ignoring the order of the
// elements. If there are duplicate elements, the number of appearances of each
// of them in both lists should match.
//
// prototest.AssertElementsMatch(t, [1, 3, 2, 3], [1, 3, 3, 2])
func AssertElementsMatch[V any](
t testing.TB, listX, listY []V, opts ...cmp.Option,
) {
t.Helper()
if len(listX) == 0 && len(listY) == 0 {
return
}
opts = append(opts, protocmp.Transform())
// dump into a map keyed by sliceID
mapX := make(map[int]V)
for i, val := range listX {
mapX[i] = val
}
mapY := make(map[int]V)
for i, val := range listY {
mapY[i] = val
}
var outX, outY []V
for i, itemX := range mapX {
for j, itemY := range mapY {
if diff := cmp.Diff(itemX, itemY, opts...); diff == "" {
outX = append(outX, itemX)
outY = append(outY, itemY)
delete(mapX, i)
delete(mapY, j)
}
}
}
if len(outX) == len(outY) && len(outX) == len(listX) {
return // matches
}
// dump remainder into the slice so we can generate a useful error
for _, itemX := range mapX {
outX = append(outX, itemX)
}
for _, itemY := range mapY {
outY = append(outY, itemY)
}
if diff := cmp.Diff(outX, outY, opts...); diff != "" {
t.Fatalf("assertion failed: slices do not have matching elements\n--- expected\n+++ actual\n%v", diff)
}
}