[OSS] Add upsert handling for receiving CheckServiceNode (#13061)

This commit is contained in:
Freddy 2022-05-12 15:04:44 -06:00 committed by GitHub
parent 42aec5caf4
commit 8894365c5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 628 additions and 85 deletions

View File

@ -100,6 +100,10 @@ func (m *EnterpriseMeta) UnsetPartition() {
// do nothing // do nothing
} }
func (m *EnterpriseMeta) OverridePartition(_ string) {
// do nothing
}
func NewEnterpriseMetaWithPartition(_, _ string) EnterpriseMeta { func NewEnterpriseMetaWithPartition(_, _ string) EnterpriseMeta {
return emptyEnterpriseMeta return emptyEnterpriseMeta
} }

View File

@ -112,17 +112,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
n := node.(*structs.Node) n := node.(*structs.Node)
nodeEntMeta := n.GetEnterpriseMeta() nodeEntMeta := n.GetEnterpriseMeta()
req := structs.RegisterRequest{ req := n.ToRegisterRequest()
ID: n.ID,
Node: n.Node,
Datacenter: n.Datacenter,
Address: n.Address,
TaggedAddresses: n.TaggedAddresses,
NodeMeta: n.Meta,
RaftIndex: n.RaftIndex,
EnterpriseMeta: *nodeEntMeta,
PeerName: n.PeerName,
}
// Register the node itself // Register the node itself
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {

View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"net" "net"
"github.com/hashicorp/consul/agent/rpc/peering"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
@ -207,7 +208,13 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
return err return err
} }
err = s.peeringService.HandleStream(peer.ID, peer.PeerID, stream) err = s.peeringService.HandleStream(peering.HandleStreamRequest{
LocalID: peer.ID,
RemoteID: peer.PeerID,
PeerName: peer.Name,
Partition: peer.Partition,
Stream: stream,
})
if err == nil { if err == nil {
// This will cancel the retry-er context, letting us break out of this loop when we want to shut down the stream. // This will cancel the retry-er context, letting us break out of this loop when we want to shut down the stream.
cancel() cancel()

View File

@ -123,4 +123,9 @@ func (a *peeringApply) PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDR
return err return err
} }
func (a *peeringApply) CatalogRegister(req *structs.RegisterRequest) error {
_, err := a.srv.leaderRaftApply("Catalog.Register", structs.RegisterRequestType, req)
return err
}
var _ peering.Apply = (*peeringApply)(nil) var _ peering.Apply = (*peeringApply)(nil)

View File

@ -30,6 +30,7 @@ import (
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbstatus" "github.com/hashicorp/consul/proto/pbstatus"
"github.com/hashicorp/consul/types"
) )
var ( var (
@ -105,6 +106,7 @@ type Backend interface {
// Store provides a read-only interface for querying Peering data. // Store provides a read-only interface for querying Peering data.
type Store interface { type Store interface {
PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error) PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error)
PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error)
AbandonCh() <-chan struct{} AbandonCh() <-chan struct{}
@ -115,6 +117,7 @@ type Apply interface {
PeeringWrite(req *pbpeering.PeeringWriteRequest) error PeeringWrite(req *pbpeering.PeeringWriteRequest) error
PeeringDelete(req *pbpeering.PeeringDeleteRequest) error PeeringDelete(req *pbpeering.PeeringDeleteRequest) error
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
CatalogRegister(req *structs.RegisterRequest) error
} }
// GenerateToken implements the PeeringService RPC method to generate a // GenerateToken implements the PeeringService RPC method to generate a
@ -405,44 +408,75 @@ func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResource
return grpcstatus.Error(codes.InvalidArgument, fmt.Sprintf("subscription request to unknown resource URL: %s", req.ResourceURL)) return grpcstatus.Error(codes.InvalidArgument, fmt.Sprintf("subscription request to unknown resource URL: %s", req.ResourceURL))
} }
// TODO(peering): Validate that a peering exists for this peer _, p, err := s.Backend.Store().PeeringReadByID(nil, req.PeerID)
if err != nil {
s.logger.Error("failed to look up peer", "peer_id", req.PeerID, "error", err)
return grpcstatus.Error(codes.Internal, "failed to find PeerID: "+req.PeerID)
}
if p == nil {
return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID)
}
// TODO(peering): If the peering is marked as deleted, send a Terminated message and return // TODO(peering): If the peering is marked as deleted, send a Terminated message and return
// TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it // TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it
s.logger.Info("accepted initial replication request from peer", "peer_id", req.PeerID) s.logger.Info("accepted initial replication request from peer", "peer_id", req.PeerID)
// For server peers both of these ID values are the same, because we generated a token with a local ID, // For server peers both of these ID values are the same, because we generated a token with a local ID,
// and the client peer dials using that same ID. // and the client peer dials using that same ID.
return s.HandleStream(req.PeerID, req.PeerID, stream) return s.HandleStream(HandleStreamRequest{
LocalID: req.PeerID,
RemoteID: req.PeerID,
PeerName: p.Name,
Partition: p.Partition,
Stream: stream,
})
}
type HandleStreamRequest struct {
// LocalID is the UUID for the peering in the local Consul datacenter.
LocalID string
// RemoteID is the UUID for the peering from the perspective of the peer.
RemoteID string
// PeerName is the name of the peering.
PeerName string
// Partition is the local partition associated with the peer.
Partition string
// Stream is the open stream to the peer cluster.
Stream BidirectionalStream
} }
// The localID provided is the locally-generated identifier for the peering. // The localID provided is the locally-generated identifier for the peering.
// The remoteID is an identifier that the remote peer recognizes for the peering. // The remoteID is an identifier that the remote peer recognizes for the peering.
func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStream) error { func (s *Service) HandleStream(req HandleStreamRequest) error {
logger := s.logger.Named("stream").With("peer_id", localID) logger := s.logger.Named("stream").With("peer_id", req.LocalID)
logger.Trace("handling stream for peer") logger.Trace("handling stream for peer")
status, err := s.streams.connected(localID) status, err := s.streams.connected(req.LocalID)
if err != nil { if err != nil {
return fmt.Errorf("failed to register stream: %v", err) return fmt.Errorf("failed to register stream: %v", err)
} }
// TODO(peering) Also need to clear subscriptions associated with the peer // TODO(peering) Also need to clear subscriptions associated with the peer
defer s.streams.disconnected(localID) defer s.streams.disconnected(req.LocalID)
mgr := newSubscriptionManager(stream.Context(), logger, s.Backend) mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend)
subCh := mgr.subscribe(stream.Context(), localID) subCh := mgr.subscribe(req.Stream.Context(), req.LocalID)
sub := &pbpeering.ReplicationMessage{ sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{ Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{ Request: &pbpeering.ReplicationMessage_Request{
ResourceURL: pbpeering.TypeURLService, ResourceURL: pbpeering.TypeURLService,
PeerID: remoteID, PeerID: req.RemoteID,
}, },
}, },
} }
logTraceSend(logger, sub) logTraceSend(logger, sub)
if err := stream.Send(sub); err != nil { if err := req.Stream.Send(sub); err != nil {
if err == io.EOF { if err == io.EOF {
logger.Info("stream ended by peer") logger.Info("stream ended by peer")
status.trackReceiveError(err.Error()) status.trackReceiveError(err.Error())
@ -458,7 +492,7 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
go func() { go func() {
defer close(recvChan) defer close(recvChan)
for { for {
msg, err := stream.Recv() msg, err := req.Stream.Recv()
if err == io.EOF { if err == io.EOF {
logger.Info("stream ended by peer") logger.Info("stream ended by peer")
status.trackReceiveError(err.Error()) status.trackReceiveError(err.Error())
@ -494,13 +528,13 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
} }
logTraceSend(logger, term) logTraceSend(logger, term)
if err := stream.Send(term); err != nil { if err := req.Stream.Send(term); err != nil {
status.trackSendError(err.Error()) status.trackSendError(err.Error())
return fmt.Errorf("failed to send to stream: %v", err) return fmt.Errorf("failed to send to stream: %v", err)
} }
logger.Trace("deleting stream status") logger.Trace("deleting stream status")
s.streams.deleteStatus(localID) s.streams.deleteStatus(req.LocalID)
return nil return nil
@ -528,7 +562,8 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
} }
if resp := msg.GetResponse(); resp != nil { if resp := msg.GetResponse(); resp != nil {
req, err := processResponse(resp) // TODO(peering): Ensure there's a nonce
reply, err := s.processResponse(req.PeerName, req.Partition, resp)
if err != nil { if err != nil {
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
status.trackReceiveError(err.Error()) status.trackReceiveError(err.Error())
@ -536,8 +571,8 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
status.trackReceiveSuccess() status.trackReceiveSuccess()
} }
logTraceSend(logger, req) logTraceSend(logger, reply)
if err := stream.Send(req); err != nil { if err := req.Stream.Send(reply); err != nil {
status.trackSendError(err.Error()) status.trackSendError(err.Error())
return fmt.Errorf("failed to send to stream: %v", err) return fmt.Errorf("failed to send to stream: %v", err)
} }
@ -549,7 +584,7 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
logger.Info("received peering termination message, cleaning up imported resources") logger.Info("received peering termination message, cleaning up imported resources")
// Once marked as terminated, a separate deferred deletion routine will clean up imported resources. // Once marked as terminated, a separate deferred deletion routine will clean up imported resources.
if err := s.Backend.Apply().PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: localID}); err != nil { if err := s.Backend.Apply().PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: req.LocalID}); err != nil {
return err return err
} }
return nil return nil
@ -558,7 +593,7 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
case update := <-subCh: case update := <-subCh:
switch { switch {
case strings.HasPrefix(update.CorrelationID, subExportedService): case strings.HasPrefix(update.CorrelationID, subExportedService):
if err := pushServiceResponse(logger, stream, status, update); err != nil { if err := pushServiceResponse(logger, req.Stream, status, update); err != nil {
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err) return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
} }
@ -667,7 +702,7 @@ func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbp
return msg return msg
} }
func processResponse(resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) { func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) {
var ( var (
err error err error
errCode code.Code errCode code.Code
@ -682,7 +717,10 @@ func processResponse(resp *pbpeering.ReplicationMessage_Response) (*pbpeering.Re
switch resp.Operation { switch resp.Operation {
case pbpeering.ReplicationMessage_Response_UPSERT: case pbpeering.ReplicationMessage_Response_UPSERT:
err = handleUpsert(resp.ResourceURL, resp.Resource) if resp.Resource == nil {
break
}
err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource)
if err != nil { if err != nil {
errCode = code.Code_INTERNAL errCode = code.Code_INTERNAL
errMsg = err.Error() errMsg = err.Error()
@ -710,8 +748,90 @@ func processResponse(resp *pbpeering.ReplicationMessage_Response) (*pbpeering.Re
return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err
} }
func handleUpsert(resourceURL string, resource *anypb.Any) error { func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error {
// TODO(peering): implement csn := &pbservice.IndexedCheckServiceNodes{}
err := ptypes.UnmarshalAny(resource, csn)
if err != nil {
return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
if csn == nil || len(csn.Nodes) == 0 {
return nil
}
type checkTuple struct {
checkID types.CheckID
serviceID string
nodeID types.NodeID
acl.EnterpriseMeta
}
var (
nodes = make(map[types.NodeID]*structs.Node)
services = make(map[types.NodeID][]*structs.NodeService)
checks = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck)
)
for _, pbinstance := range csn.Nodes {
instance, err := pbservice.CheckServiceNodeToStructs(pbinstance)
if err != nil {
return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
nodes[instance.Node.ID] = instance.Node
services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service)
if _, ok := checks[instance.Node.ID]; !ok {
checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck)
}
for _, c := range instance.Checks {
tuple := checkTuple{
checkID: c.CheckID,
serviceID: c.ServiceID,
nodeID: instance.Node.ID,
EnterpriseMeta: c.EnterpriseMeta,
}
checks[instance.Node.ID][tuple] = c
}
}
for nodeID, node := range nodes {
// For all nodes, services, and checks we override the peer name and partition to be
// the local partition and local name for the peer.
node.PeerName, node.Partition = peerName, partition
// First register the node
req := node.ToRegisterRequest()
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
// Then register all services on that node
for _, svc := range services[nodeID] {
svc.PeerName = peerName
svc.OverridePartition(partition)
req.Service = svc
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
}
req.Service = nil
// Then register all checks on that node
var chks structs.HealthChecks
for _, c := range checks[nodeID] {
c.PeerName = peerName
c.OverridePartition(partition)
chks = append(chks, c)
}
req.Checks = chks
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
}
return nil return nil
} }

View File

@ -11,14 +11,18 @@ import (
"testing" "testing"
"time" "time"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
gogrpc "google.golang.org/grpc" gogrpc "google.golang.org/grpc"
"github.com/hashicorp/consul/agent/consul/state"
grpc "github.com/hashicorp/consul/agent/grpc/private" grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/resolver" "github.com/hashicorp/consul/agent/grpc/private/resolver"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
@ -307,6 +311,359 @@ func TestPeeringService_List(t *testing.T) {
prototest.AssertDeepEqual(t, expect, resp) prototest.AssertDeepEqual(t, expect, resp)
} }
func Test_StreamHandler_UpsertServices(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
type testCase struct {
name string
msg *pbpeering.ReplicationMessage_Response
input structs.CheckServiceNodes
expect structs.CheckServiceNodes
}
s := newTestServer(t, nil)
testrpc.WaitForLeader(t, s.Server.RPC, "dc1")
srv := peering.NewService(testutil.Logger(t), consul.NewPeeringBackend(s.Server, nil))
require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{
Name: "my-peer",
}))
_, p, err := s.Server.FSM().State().PeeringRead(nil, state.Query{Value: "my-peer"})
require.NoError(t, err)
client := peering.NewMockClient(context.Background())
errCh := make(chan error, 1)
client.ErrCh = errCh
go func() {
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
err := srv.StreamResources(client.ReplicationStream)
if err != nil {
errCh <- err
}
}()
sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
PeerID: p.ID,
ResourceURL: pbpeering.TypeURLService,
},
},
}
require.NoError(t, client.Send(sub))
// Receive subscription request from peer for our services
_, err = client.Recv()
require.NoError(t, err)
remoteEntMeta := structs.DefaultEnterpriseMetaInPartition("remote-partition")
localEntMeta := acl.DefaultEnterpriseMeta()
localPeerName := "my-peer"
// Scrub data we don't need for the assertions below.
scrubCheckServiceNodes := func(instances structs.CheckServiceNodes) {
for _, csn := range instances {
csn.Node.RaftIndex = structs.RaftIndex{}
csn.Service.TaggedAddresses = nil
csn.Service.Weights = nil
csn.Service.RaftIndex = structs.RaftIndex{}
csn.Service.Proxy = structs.ConnectProxyConfig{}
for _, c := range csn.Checks {
c.RaftIndex = structs.RaftIndex{}
c.Definition = structs.HealthCheckDefinition{}
}
}
}
run := func(t *testing.T, tc testCase) {
pbCSN := &pbservice.IndexedCheckServiceNodes{}
for _, csn := range tc.input {
pbCSN.Nodes = append(pbCSN.Nodes, pbservice.NewCheckServiceNodeFromStructs(&csn))
}
any, err := ptypes.MarshalAny(pbCSN)
require.NoError(t, err)
tc.msg.Resource = any
resp := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Response_{
Response: tc.msg,
},
}
require.NoError(t, client.Send(resp))
msg, err := client.RecvWithTimeout(1 * time.Second)
require.NoError(t, err)
req := msg.GetRequest()
require.NotNil(t, req)
require.Equal(t, tc.msg.Nonce, req.Nonce)
require.Nil(t, req.Error)
_, got, err := s.Server.FSM().State().CombinedCheckServiceNodes(nil, structs.NewServiceName("api", nil), localPeerName)
require.NoError(t, err)
scrubCheckServiceNodes(got)
require.Equal(t, tc.expect, got)
}
// NOTE: These test cases do not run against independent state stores, they show sequential updates for a given service.
// Every new upsert must replace the data from the previous case.
tt := []testCase{
{
name: "upsert an instance on a node",
msg: &pbpeering.ReplicationMessage_Response{
ResourceURL: pbpeering.TypeURLService,
ResourceID: "api",
Nonce: "1",
Operation: pbpeering.ReplicationMessage_Response_UPSERT,
},
input: structs.CheckServiceNodes{
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: remoteEntMeta.PartitionOrEmpty(),
},
Service: &structs.NodeService{
Kind: "",
ID: "api-1",
Service: "api",
Port: 8080,
EnterpriseMeta: *remoteEntMeta,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *remoteEntMeta,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
ServiceName: "api",
Node: "node-1",
Status: api.HealthCritical,
EnterpriseMeta: *remoteEntMeta,
},
},
},
},
expect: structs.CheckServiceNodes{
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: localPeerName,
},
Service: &structs.NodeService{
Kind: "",
ID: "api-1",
Service: "api",
Port: 8080,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
ServiceName: "api",
Node: "node-1",
Status: api.HealthCritical,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
},
},
},
},
{
name: "upsert two instances on the same node",
msg: &pbpeering.ReplicationMessage_Response{
ResourceURL: pbpeering.TypeURLService,
ResourceID: "api",
Nonce: "2",
Operation: pbpeering.ReplicationMessage_Response_UPSERT,
},
input: structs.CheckServiceNodes{
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: remoteEntMeta.PartitionOrEmpty(),
},
Service: &structs.NodeService{
Kind: "",
ID: "api-1",
Service: "api",
Port: 8080,
EnterpriseMeta: *remoteEntMeta,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *remoteEntMeta,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
ServiceName: "api",
Node: "node-1",
Status: api.HealthCritical,
EnterpriseMeta: *remoteEntMeta,
},
},
},
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: remoteEntMeta.PartitionOrEmpty(),
},
Service: &structs.NodeService{
Kind: "",
ID: "api-2",
Service: "api",
Port: 9090,
EnterpriseMeta: *remoteEntMeta,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *remoteEntMeta,
},
{
CheckID: "api-2-check",
ServiceID: "api-2",
ServiceName: "api",
Node: "node-1",
Status: api.HealthWarning,
EnterpriseMeta: *remoteEntMeta,
},
},
},
},
expect: structs.CheckServiceNodes{
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: localPeerName,
},
Service: &structs.NodeService{
Kind: "",
ID: "api-1",
Service: "api",
Port: 8080,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
ServiceName: "api",
Node: "node-1",
Status: api.HealthCritical,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
},
},
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: localPeerName,
},
Service: &structs.NodeService{
Kind: "",
ID: "api-2",
Service: "api",
Port: 9090,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
{
CheckID: "api-2-check",
ServiceID: "api-2",
ServiceName: "api",
Node: "node-1",
Status: api.HealthWarning,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
},
},
},
},
}
for _, tc := range tt {
runStep(t, tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}
// newTestServer is copied from partition/service_test.go, with the addition of certs/cas. // newTestServer is copied from partition/service_test.go, with the addition of certs/cas.
// TODO(peering): these are endpoint tests and should live in the agent/consul // TODO(peering): these are endpoint tests and should live in the agent/consul
// package. Instead, these can be written around a mock client (see testing.go) // package. Instead, these can be written around a mock client (see testing.go)

View File

@ -32,16 +32,23 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) {
} }
run := func(t *testing.T, tc testCase) { run := func(t *testing.T, tc testCase) {
srv := NewService(testutil.Logger(t), nil) publisher := stream.NewEventPublisher(10 * time.Second)
client := newMockClient(context.Background()) store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store,
pub: publisher,
})
client := NewMockClient(context.Background())
errCh := make(chan error, 1) errCh := make(chan error, 1)
client.errCh = errCh client.ErrCh = errCh
go func() { go func() {
// Pass errors from server handler into errCh so that they can be seen by the client on Recv(). // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server. // This matches gRPC's behavior when an error is returned by a server.
err := srv.StreamResources(client.replicationStream) err := srv.StreamResources(client.ReplicationStream)
if err != nil { if err != nil {
errCh <- err errCh <- err
} }
@ -103,6 +110,18 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) {
}, },
wantErr: status.Error(codes.InvalidArgument, "subscription request to unknown resource URL: nomad.Job"), wantErr: status.Error(codes.InvalidArgument, "subscription request to unknown resource URL: nomad.Job"),
}, },
{
name: "unknown peer",
input: &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
PeerID: "63b60245-c475-426b-b314-4588d210859d",
ResourceURL: pbpeering.TypeURLService,
},
},
},
wantErr: status.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: 63b60245-c475-426b-b314-4588d210859d"),
},
} }
for _, tc := range tt { for _, tc := range tt {
@ -127,21 +146,30 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
} }
srv.streams.timeNow = it.Now srv.streams.timeNow = it.Now
client := newMockClient(context.Background()) client := NewMockClient(context.Background())
errCh := make(chan error, 1) errCh := make(chan error, 1)
client.errCh = errCh client.ErrCh = errCh
go func() { go func() {
// Pass errors from server handler into errCh so that they can be seen by the client on Recv(). // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server. // This matches gRPC's behavior when an error is returned by a server.
if err := srv.StreamResources(client.replicationStream); err != nil { if err := srv.StreamResources(client.ReplicationStream); err != nil {
errCh <- err errCh <- err
} }
}() }()
peering := pbpeering.Peering{
Name: "my-peer",
}
require.NoError(t, store.PeeringWrite(0, &peering))
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peer"})
require.NoError(t, err)
// Receive a subscription from a peer // Receive a subscription from a peer
peerID := "63b60245-c475-426b-b314-4588d210859d" peerID := p.ID
sub := &pbpeering.ReplicationMessage{ sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{ Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{ Request: &pbpeering.ReplicationMessage_Request{
@ -150,7 +178,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
}, },
}, },
} }
err := client.Send(sub) err = client.Send(sub)
require.NoError(t, err) require.NoError(t, err)
testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) { testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
@ -209,14 +237,23 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
} }
srv.streams.timeNow = it.Now srv.streams.timeNow = it.Now
client := newMockClient(context.Background()) client := NewMockClient(context.Background())
errCh := make(chan error, 1) errCh := make(chan error, 1)
go func() { go func() {
errCh <- srv.StreamResources(client.replicationStream) errCh <- srv.StreamResources(client.ReplicationStream)
}() }()
peerID := "63b60245-c475-426b-b314-4588d210859d" peering := pbpeering.Peering{
Name: "my-peer",
}
require.NoError(t, store.PeeringWrite(0, &peering))
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peer"})
require.NoError(t, err)
peerID := p.ID
sub := &pbpeering.ReplicationMessage{ sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{ Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{ Request: &pbpeering.ReplicationMessage_Request{
@ -225,7 +262,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}, },
}, },
} }
err := client.Send(sub) err = client.Send(sub)
require.NoError(t, err) require.NoError(t, err)
testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) { testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
@ -483,15 +520,15 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
pub: publisher, pub: publisher,
}) })
client := newMockClient(context.Background()) client := NewMockClient(context.Background())
errCh := make(chan error, 1) errCh := make(chan error, 1)
client.errCh = errCh client.ErrCh = errCh
go func() { go func() {
// Pass errors from server handler into errCh so that they can be seen by the client on Recv(). // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server. // This matches gRPC's behavior when an error is returned by a server.
if err := srv.StreamResources(client.replicationStream); err != nil { if err := srv.StreamResources(client.ReplicationStream); err != nil {
errCh <- err errCh <- err
} }
}() }()
@ -683,7 +720,7 @@ func (b *testStreamBackend) Apply() Apply {
return nil return nil
} }
func Test_processResponse(t *testing.T) { func Test_processResponse_Validation(t *testing.T) {
type testCase struct { type testCase struct {
name string name string
in *pbpeering.ReplicationMessage_Response in *pbpeering.ReplicationMessage_Response
@ -691,8 +728,15 @@ func Test_processResponse(t *testing.T) {
wantErr bool wantErr bool
} }
publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store,
pub: publisher,
})
run := func(t *testing.T, tc testCase) { run := func(t *testing.T, tc testCase) {
reply, err := processResponse(tc.in) reply, err := srv.processResponse("", "", tc.in)
if tc.wantErr { if tc.wantErr {
require.Error(t, err) require.Error(t, err)
} else { } else {

View File

@ -47,6 +47,8 @@ func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo {
// NewMaterializer implements submatview.Request // NewMaterializer implements submatview.Request
func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) { func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) {
reqFn := func(index uint64) *pbsubscribe.SubscribeRequest { reqFn := func(index uint64) *pbsubscribe.SubscribeRequest {
// TODO(peering): We need to be able to receive both connect proxies and typical service instances for a given name.
// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
r := &pbsubscribe.SubscribeRequest{ r := &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: e.req.ServiceName, Key: e.req.ServiceName,

View File

@ -75,52 +75,52 @@ func TestPeeringToken(peerID string) structs.PeeringToken {
} }
} }
type mockClient struct { type MockClient struct {
mu sync.Mutex mu sync.Mutex
errCh chan error
replicationStream *mockStream ErrCh chan error
ReplicationStream *MockStream
} }
func (c *mockClient) Send(r *pbpeering.ReplicationMessage) error { func (c *MockClient) Send(r *pbpeering.ReplicationMessage) error {
c.replicationStream.recvCh <- r c.ReplicationStream.recvCh <- r
return nil return nil
} }
func (c *mockClient) Recv() (*pbpeering.ReplicationMessage, error) { func (c *MockClient) Recv() (*pbpeering.ReplicationMessage, error) {
select { select {
case err := <-c.errCh: case err := <-c.ErrCh:
return nil, err return nil, err
case r := <-c.replicationStream.sendCh: case r := <-c.ReplicationStream.sendCh:
return r, nil return r, nil
case <-time.After(10 * time.Millisecond): case <-time.After(10 * time.Millisecond):
return nil, io.EOF return nil, io.EOF
} }
} }
func (c *mockClient) RecvWithTimeout(dur time.Duration) (*pbpeering.ReplicationMessage, error) { func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeering.ReplicationMessage, error) {
select { select {
case err := <-c.errCh: case err := <-c.ErrCh:
return nil, err return nil, err
case r := <-c.replicationStream.sendCh: case r := <-c.ReplicationStream.sendCh:
return r, nil return r, nil
case <-time.After(dur): case <-time.After(dur):
return nil, io.EOF return nil, io.EOF
} }
} }
func (c *mockClient) Close() { func (c *MockClient) Close() {
close(c.replicationStream.recvCh) close(c.ReplicationStream.recvCh)
} }
func newMockClient(ctx context.Context) *mockClient { func NewMockClient(ctx context.Context) *MockClient {
return &mockClient{ return &MockClient{
replicationStream: newTestReplicationStream(ctx), ReplicationStream: newTestReplicationStream(ctx),
} }
} }
// mockStream mocks peering.PeeringService_StreamResourcesServer // MockStream mocks peering.PeeringService_StreamResourcesServer
type mockStream struct { type MockStream struct {
sendCh chan *pbpeering.ReplicationMessage sendCh chan *pbpeering.ReplicationMessage
recvCh chan *pbpeering.ReplicationMessage recvCh chan *pbpeering.ReplicationMessage
@ -128,10 +128,10 @@ type mockStream struct {
mu sync.Mutex mu sync.Mutex
} }
var _ pbpeering.PeeringService_StreamResourcesServer = (*mockStream)(nil) var _ pbpeering.PeeringService_StreamResourcesServer = (*MockStream)(nil)
func newTestReplicationStream(ctx context.Context) *mockStream { func newTestReplicationStream(ctx context.Context) *MockStream {
return &mockStream{ return &MockStream{
sendCh: make(chan *pbpeering.ReplicationMessage, 1), sendCh: make(chan *pbpeering.ReplicationMessage, 1),
recvCh: make(chan *pbpeering.ReplicationMessage, 1), recvCh: make(chan *pbpeering.ReplicationMessage, 1),
ctx: ctx, ctx: ctx,
@ -139,13 +139,13 @@ func newTestReplicationStream(ctx context.Context) *mockStream {
} }
// Send implements pbpeering.PeeringService_StreamResourcesServer // Send implements pbpeering.PeeringService_StreamResourcesServer
func (s *mockStream) Send(r *pbpeering.ReplicationMessage) error { func (s *MockStream) Send(r *pbpeering.ReplicationMessage) error {
s.sendCh <- r s.sendCh <- r
return nil return nil
} }
// Recv implements pbpeering.PeeringService_StreamResourcesServer // Recv implements pbpeering.PeeringService_StreamResourcesServer
func (s *mockStream) Recv() (*pbpeering.ReplicationMessage, error) { func (s *MockStream) Recv() (*pbpeering.ReplicationMessage, error) {
r := <-s.recvCh r := <-s.recvCh
if r == nil { if r == nil {
return nil, io.EOF return nil, io.EOF
@ -154,32 +154,32 @@ func (s *mockStream) Recv() (*pbpeering.ReplicationMessage, error) {
} }
// Context implements grpc.ServerStream and grpc.ClientStream // Context implements grpc.ServerStream and grpc.ClientStream
func (s *mockStream) Context() context.Context { func (s *MockStream) Context() context.Context {
return s.ctx return s.ctx
} }
// SendMsg implements grpc.ServerStream and grpc.ClientStream // SendMsg implements grpc.ServerStream and grpc.ClientStream
func (s *mockStream) SendMsg(m interface{}) error { func (s *MockStream) SendMsg(m interface{}) error {
return nil return nil
} }
// RecvMsg implements grpc.ServerStream and grpc.ClientStream // RecvMsg implements grpc.ServerStream and grpc.ClientStream
func (s *mockStream) RecvMsg(m interface{}) error { func (s *MockStream) RecvMsg(m interface{}) error {
return nil return nil
} }
// SetHeader implements grpc.ServerStream // SetHeader implements grpc.ServerStream
func (s *mockStream) SetHeader(metadata.MD) error { func (s *MockStream) SetHeader(metadata.MD) error {
return nil return nil
} }
// SendHeader implements grpc.ServerStream // SendHeader implements grpc.ServerStream
func (s *mockStream) SendHeader(metadata.MD) error { func (s *MockStream) SendHeader(metadata.MD) error {
return nil return nil
} }
// SetTrailer implements grpc.ServerStream // SetTrailer implements grpc.ServerStream
func (s *mockStream) SetTrailer(metadata.MD) {} func (s *MockStream) SetTrailer(metadata.MD) {}
type incrementalTime struct { type incrementalTime struct {
base time.Time base time.Time

View File

@ -855,6 +855,20 @@ func (n *Node) BestAddress(wan bool) string {
return n.Address return n.Address
} }
func (n *Node) ToRegisterRequest() RegisterRequest {
return RegisterRequest{
ID: n.ID,
Node: n.Node,
Datacenter: n.Datacenter,
Address: n.Address,
TaggedAddresses: n.TaggedAddresses,
NodeMeta: n.Meta,
RaftIndex: n.RaftIndex,
EnterpriseMeta: *n.GetEnterpriseMeta(),
PeerName: n.PeerName,
}
}
type Nodes []*Node type Nodes []*Node
// IsSame return whether nodes are similar without taking into account // IsSame return whether nodes are similar without taking into account