peering: track imported services (#13718)

alex 2022-07-15 10:20:43 -07:00 committed by GitHub
parent d9643ca499
commit 70ad4804b6
6 changed files with 422 additions and 43 deletions


@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
)
func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
@ -309,11 +310,6 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId
Node: "aaa",
PeerName: peer,
},
{
CheckID: structs.SerfCheckID,
Node: "aaa",
PeerName: peer,
},
},
}))
@ -336,11 +332,6 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId
Node: "bbb",
PeerName: peer,
},
{
CheckID: structs.SerfCheckID,
Node: "bbb",
PeerName: peer,
},
},
}))
@ -363,13 +354,269 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId
Node: "ccc",
PeerName: peer,
},
{
CheckID: structs.SerfCheckID,
Node: "ccc",
PeerName: peer,
},
},
}))
return lastIdx
}
// TODO(peering): once we move away from leader-only requests for PeeringList, consider moving this test to consul/server_test
func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
// TODO(peering): Configure with TLS
_, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s1.dc1"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
peeringClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-s2",
}
resp, err := peeringClient.GenerateToken(ctx, &req)
require.NoError(t, err)
tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)
var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))
var (
s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
lastIdx = uint64(0)
)
// Bring up s2 and store s1's token so that it attempts to dial.
_, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s2.dc2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
// Simulate a peering initiation event by writing a peering with data from a peering token.
// Eventually the leader in dc2 should dial and connect to the leader in dc1.
p := &pbpeering.Peering{
ID: s2PeerID,
Name: "my-peer-s1",
PeerID: token.PeerID,
PeerCAPems: token.CA,
PeerServerName: token.ServerName,
PeerServerAddresses: token.ServerAddresses,
}
require.True(t, p.ShouldDial())
lastIdx++
require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, p))
// Add services to s1 to be synced to s2.
lastIdx++
require.NoError(t, s1.FSM().State().EnsureRegistration(lastIdx, &structs.RegisterRequest{
ID: types.NodeID(generateUUID()),
Node: "aaa",
Address: "10.0.0.1",
Service: &structs.NodeService{
Service: "a-service",
ID: "a-service-1",
Port: 8080,
},
Checks: structs.HealthChecks{
{
CheckID: "a-service-1-check",
ServiceName: "a-service",
ServiceID: "a-service-1",
Node: "aaa",
},
},
}))
lastIdx++
require.NoError(t, s1.FSM().State().EnsureRegistration(lastIdx, &structs.RegisterRequest{
ID: types.NodeID(generateUUID()),
Node: "bbb",
Address: "10.0.0.2",
Service: &structs.NodeService{
Service: "b-service",
ID: "b-service-1",
Port: 8080,
},
Checks: structs.HealthChecks{
{
CheckID: "b-service-1-check",
ServiceName: "b-service",
ServiceID: "b-service-1",
Node: "bbb",
},
},
}))
lastIdx++
require.NoError(t, s1.FSM().State().EnsureRegistration(lastIdx, &structs.RegisterRequest{
ID: types.NodeID(generateUUID()),
Node: "ccc",
Address: "10.0.0.3",
Service: &structs.NodeService{
Service: "c-service",
ID: "c-service-1",
Port: 8080,
},
Checks: structs.HealthChecks{
{
CheckID: "c-service-1-check",
ServiceName: "c-service",
ServiceID: "c-service-1",
Node: "ccc",
},
},
}))
// Finished adding services.
type testCase struct {
name string
description string
exportedService structs.ExportedServicesConfigEntry
expectedImportedServicesCount uint64
}
testCases := []testCase{
{
name: "wildcard",
description: "for a wildcard exported services, we want to see all services synced",
exportedService: structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: structs.WildcardSpecifier,
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peer-s2",
},
},
},
},
},
expectedImportedServicesCount: 4, // 3 services from above + the "consul" service
},
{
name: "no sync",
description: "update the config entry to allow no service sync",
exportedService: structs.ExportedServicesConfigEntry{
Name: "default",
},
expectedImportedServicesCount: 0, // we want to see this decremented from 4 --> 0
},
{
name: "just a, b services",
description: "export just two services",
exportedService: structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "a-service",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peer-s2",
},
},
},
{
Name: "b-service",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peer-s2",
},
},
},
},
},
expectedImportedServicesCount: 2,
},
{
name: "unexport b service",
description: "by unexporting b we want to see the count decrement eventually",
exportedService: structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "a-service",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peer-s2",
},
},
},
},
},
expectedImportedServicesCount: 1,
},
{
name: "export c service",
description: "now export the c service and expect the count to increment",
exportedService: structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "a-service",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peer-s2",
},
},
},
{
Name: "c-service",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peer-s2",
},
},
},
},
},
expectedImportedServicesCount: 2,
},
}
conn2, err := grpc.DialContext(ctx, s2.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s2.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn2.Close()
peeringClient2 := pbpeering.NewPeeringServiceClient(conn2)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
lastIdx++
require.NoError(t, s1.fsm.State().EnsureConfigEntry(lastIdx, &tc.exportedService))
retry.Run(t, func(r *retry.R) {
resp2, err := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{})
require.NoError(r, err)
require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedImportedServicesCount, resp2.Peerings[0].ImportedServiceCount)
})
})
}
}


@ -113,7 +113,9 @@ func marshalToProtoAny[T proto.Message](in any) (*anypb.Any, T, error) {
func (s *Server) processResponse(
peerName string,
partition string,
mutableStatus *MutableStatus,
resp *pbpeerstream.ReplicationMessage_Response,
logger hclog.Logger,
) (*pbpeerstream.ReplicationMessage, error) {
if !pbpeerstream.KnownTypeURL(resp.ResourceURL) {
err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
@ -137,7 +139,7 @@ func (s *Server) processResponse(
), err
}
if err := s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource); err != nil {
if err := s.handleUpsert(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, resp.Resource, logger); err != nil {
return makeNACKReply(
resp.ResourceURL,
resp.Nonce,
@ -149,7 +151,7 @@ func (s *Server) processResponse(
return makeACKReply(resp.ResourceURL, resp.Nonce), nil
case pbpeerstream.Operation_OPERATION_DELETE:
if err := s.handleDelete(peerName, partition, resp.ResourceURL, resp.ResourceID); err != nil {
if err := s.handleDelete(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, logger); err != nil {
return makeNACKReply(
resp.ResourceURL,
resp.Nonce,
@ -178,9 +180,11 @@ func (s *Server) processResponse(
func (s *Server) handleUpsert(
peerName string,
partition string,
mutableStatus *MutableStatus,
resourceURL string,
resourceID string,
resource *anypb.Any,
logger hclog.Logger,
) error {
switch resourceURL {
case pbpeerstream.TypeURLService:
@ -192,7 +196,16 @@ func (s *Server) handleUpsert(
return fmt.Errorf("failed to unmarshal resource: %w", err)
}
return s.handleUpdateService(peerName, partition, sn, csn)
err := s.handleUpdateService(peerName, partition, sn, csn)
if err != nil {
logger.Error("did not increment imported services count", "service_name", sn.String(), "error", err)
return err
}
logger.Trace("incrementing imported services count", "service_name", sn.String())
mutableStatus.TrackImportedService(sn)
return nil
case pbpeerstream.TypeURLRoots:
roots := &pbpeering.PeeringTrustBundle{}
@ -425,14 +438,26 @@ func (s *Server) handleUpsertRoots(
func (s *Server) handleDelete(
peerName string,
partition string,
mutableStatus *MutableStatus,
resourceURL string,
resourceID string,
logger hclog.Logger,
) error {
switch resourceURL {
case pbpeerstream.TypeURLService:
sn := structs.ServiceNameFromString(resourceID)
sn.OverridePartition(partition)
return s.handleUpdateService(peerName, partition, sn, nil)
err := s.handleUpdateService(peerName, partition, sn, nil)
if err != nil {
logger.Error("did not decrement imported services count", "service_name", sn.String(), "error", err)
return err
}
logger.Trace("decrementing imported services count", "service_name", sn.String())
mutableStatus.RemoveImportedService(sn)
return nil
default:
return fmt.Errorf("unexpected resourceURL: %s", resourceURL)


@ -302,7 +302,7 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
if resp := msg.GetResponse(); resp != nil {
// TODO(peering): Ensure there's a nonce
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, resp)
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp, logger)
if err != nil {
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
status.TrackReceiveError(err.Error())


@ -475,6 +475,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastReceiveSuccess: lastRecvSuccess,
ImportedServices: map[string]struct{}{"api": {}},
}
retry.Run(t, func(r *retry.R) {
@ -532,6 +533,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
LastReceiveSuccess: lastRecvSuccess,
LastReceiveError: lastRecvError,
LastReceiveErrorMessage: lastRecvErrorMsg,
ImportedServices: map[string]struct{}{"api": {}},
}
retry.Run(t, func(r *retry.R) {
@ -559,6 +561,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
LastReceiveSuccess: lastRecvSuccess,
LastReceiveErrorMessage: io.EOF.Error(),
LastReceiveError: lastRecvError,
ImportedServices: map[string]struct{}{"api": {}},
}
retry.Run(t, func(r *retry.R) {
@ -968,6 +971,9 @@ func (b *testStreamBackend) CatalogDeregister(req *structs.DeregisterRequest) er
}
func Test_processResponse_Validation(t *testing.T) {
peerName := "billing"
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
type testCase struct {
name string
in *pbpeerstream.ReplicationMessage_Response
@ -975,10 +981,18 @@ func Test_processResponse_Validation(t *testing.T) {
wantErr bool
}
srv, _ := newTestServer(t, nil)
srv, store := newTestServer(t, nil)
require.NoError(t, store.PeeringWrite(31, &pbpeering.Peering{
ID: peerID,
Name: peerName},
))
// connect the stream
mst, err := srv.Tracker.Connected(peerID)
require.NoError(t, err)
run := func(t *testing.T, tc testCase) {
reply, err := srv.processResponse("", "", tc.in)
reply, err := srv.processResponse(peerName, "", mst, tc.in, srv.Logger)
if tc.wantErr {
require.Error(t, err)
} else {
@ -1218,8 +1232,8 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test
}
}
func TestHandleUpdateService(t *testing.T) {
srv, _ := newTestServer(t, func(c *Config) {
func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
srv, store := newTestServer(t, func(c *Config) {
backend := c.Backend.(*testStreamBackend)
backend.leader = func() bool {
return false
@ -1227,13 +1241,15 @@ func TestHandleUpdateService(t *testing.T) {
})
type testCase struct {
name string
seed []*structs.RegisterRequest
input *pbservice.IndexedCheckServiceNodes
expect map[string]structs.CheckServiceNodes
name string
seed []*structs.RegisterRequest
input *pbservice.IndexedCheckServiceNodes
expect map[string]structs.CheckServiceNodes
expectedImportedServicesCount int
}
peerName := "billing"
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
remoteMeta := pbcommon.NewEnterpriseMetaFromStructs(*structs.DefaultEnterpriseMetaInPartition("billing-ap"))
// "api" service is imported from the billing-ap partition, corresponding to the billing peer.
@ -1241,14 +1257,43 @@ func TestHandleUpdateService(t *testing.T) {
defaultMeta := *acl.DefaultEnterpriseMeta()
apiSN := structs.NewServiceName("api", &defaultMeta)
// create a peering in the state store
require.NoError(t, store.PeeringWrite(31, &pbpeering.Peering{
ID: peerID,
Name: peerName},
))
// connect the stream
mst, err := srv.Tracker.Connected(peerID)
require.NoError(t, err)
run := func(t *testing.T, tc testCase) {
// Seed the local catalog with some data to reconcile against.
// and increment the tracker's imported services count
for _, reg := range tc.seed {
require.NoError(t, srv.Backend.CatalogRegister(reg))
mst.TrackImportedService(reg.Service.CompoundServiceName())
}
var op pbpeerstream.Operation
if len(tc.input.Nodes) == 0 {
op = pbpeerstream.Operation_OPERATION_DELETE
} else {
op = pbpeerstream.Operation_OPERATION_UPSERT
}
in := &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
ResourceID: apiSN.String(),
Nonce: "1",
Operation: op,
Resource: makeAnyPB(t, tc.input),
}
// Simulate an update arriving for billing/api.
require.NoError(t, srv.handleUpdateService(peerName, acl.DefaultPartitionName, apiSN, tc.input))
_, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, in, srv.Logger)
require.NoError(t, err)
for svc, expect := range tc.expect {
t.Run(svc, func(t *testing.T) {
@ -1257,6 +1302,9 @@ func TestHandleUpdateService(t *testing.T) {
requireEqualInstances(t, expect, got)
})
}
// assert the imported services count modifications
require.Equal(t, tc.expectedImportedServicesCount, mst.GetImportedServicesCount())
}
tt := []testCase{
@ -1390,6 +1438,7 @@ func TestHandleUpdateService(t *testing.T) {
},
},
},
expectedImportedServicesCount: 1,
},
{
name: "upsert two service instances to different nodes",
@ -1521,6 +1570,7 @@ func TestHandleUpdateService(t *testing.T) {
},
},
},
expectedImportedServicesCount: 1,
},
{
name: "receiving a nil input leads to deleting data in the catalog",
@ -1574,10 +1624,11 @@ func TestHandleUpdateService(t *testing.T) {
},
},
},
input: nil,
input: &pbservice.IndexedCheckServiceNodes{},
expect: map[string]structs.CheckServiceNodes{
"api": {},
},
expectedImportedServicesCount: 0,
},
{
name: "deleting one service name from a node does not delete other service names",
@ -1632,7 +1683,7 @@ func TestHandleUpdateService(t *testing.T) {
},
},
// Empty input is for the "api" service.
input: nil,
input: &pbservice.IndexedCheckServiceNodes{},
expect: map[string]structs.CheckServiceNodes{
"api": {},
// Existing redis service was not affected by deletion.
@ -1668,6 +1719,7 @@ func TestHandleUpdateService(t *testing.T) {
},
},
},
expectedImportedServicesCount: 1,
},
{
name: "service checks are cleaned up when not present in a response",
@ -1738,6 +1790,7 @@ func TestHandleUpdateService(t *testing.T) {
},
},
},
expectedImportedServicesCount: 2,
},
{
name: "node checks are cleaned up when not present in a response",
@ -1872,6 +1925,7 @@ func TestHandleUpdateService(t *testing.T) {
},
},
},
expectedImportedServicesCount: 2,
},
{
name: "replacing a service instance on a node cleans up the old instance",
@ -2019,6 +2073,7 @@ func TestHandleUpdateService(t *testing.T) {
},
},
},
expectedImportedServicesCount: 2,
},
}


@ -4,9 +4,11 @@ import (
"fmt"
"sync"
"time"
"github.com/hashicorp/consul/agent/structs"
)
// Tracker contains a map of (PeerID -> Status).
// Tracker contains a map of (PeerID -> MutableStatus).
// As streams are opened and closed we track details about their status.
type Tracker struct {
mu sync.RWMutex
@ -142,6 +144,10 @@ type Status struct {
// - The error message when we failed to store a resource replicated FROM the peer.
// - The last error message when receiving from the stream.
LastReceiveErrorMessage string
// TODO(peering): consider keeping track of imported service counts through raft
// ImportedServices is a set that keeps track of which service names are imported for the peer
ImportedServices map[string]struct{}
}
func newMutableStatus(now func() time.Time) *MutableStatus {
@ -222,3 +228,28 @@ func (s *MutableStatus) GetStatus() Status {
return copy
}
func (s *MutableStatus) RemoveImportedService(sn structs.ServiceName) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.ImportedServices, sn.String())
}
func (s *MutableStatus) TrackImportedService(sn structs.ServiceName) {
s.mu.Lock()
defer s.mu.Unlock()
if s.ImportedServices == nil {
s.ImportedServices = make(map[string]struct{})
}
s.ImportedServices[sn.String()] = struct{}{}
}
func (s *MutableStatus) GetImportedServicesCount() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.ImportedServices)
}
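Because ImportedServices is keyed by service name, tracking the same service repeatedly is idempotent and the count reflects distinct names, not instances. A minimal sketch of that behavior (a hypothetical test inside the same package, assuming its unexported newMutableStatus and the usual test imports: testing, time, require, structs, acl):

func TestMutableStatus_ImportedServices_sketch(t *testing.T) {
	status := newMutableStatus(time.Now)

	api := structs.NewServiceName("api", acl.DefaultEnterpriseMeta())
	web := structs.NewServiceName("web", acl.DefaultEnterpriseMeta())

	// Tracking the same service name twice does not inflate the count.
	status.TrackImportedService(api)
	status.TrackImportedService(api)
	status.TrackImportedService(web)
	require.Equal(t, 2, status.GetImportedServicesCount())

	// Removing a name shrinks the set by exactly one.
	status.RemoveImportedService(web)
	require.Equal(t, 1, status.GetImportedServicesCount())
}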


@ -337,7 +337,17 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ
if peering == nil {
return &pbpeering.PeeringReadResponse{Peering: nil}, nil
}
cp := copyPeeringWithNewState(peering, s.reconciledStreamStateHint(peering.ID, peering.State))
// add imported services count
st, found := s.Tracker.StreamStatus(peering.ID)
if !found {
s.Logger.Trace("did not find peer in stream tracker when reading peer", "peerID", peering.ID)
} else {
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
}
return &pbpeering.PeeringReadResponse{Peering: cp}, nil
}
@ -369,6 +379,15 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
var cPeerings []*pbpeering.Peering
for _, p := range peerings {
cp := copyPeeringWithNewState(p, s.reconciledStreamStateHint(p.ID, p.State))
// add imported services count
st, found := s.Tracker.StreamStatus(p.ID)
if !found {
s.Logger.Trace("did not find peer in stream tracker when listing peers", "peerID", p.ID)
} else {
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
}
cPeerings = append(cPeerings, cp)
}
return &pbpeering.PeeringListResponse{Peerings: cPeerings}, nil
@ -586,17 +605,19 @@ func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (strin
func copyPeeringWithNewState(p *pbpeering.Peering, state pbpeering.PeeringState) *pbpeering.Peering {
return &pbpeering.Peering{
ID: p.ID,
Name: p.Name,
Partition: p.Partition,
DeletedAt: p.DeletedAt,
Meta: p.Meta,
PeerID: p.PeerID,
PeerCAPems: p.PeerCAPems,
PeerServerAddresses: p.PeerServerAddresses,
PeerServerName: p.PeerServerName,
CreateIndex: p.CreateIndex,
ModifyIndex: p.ModifyIndex,
ID: p.ID,
Name: p.Name,
Partition: p.Partition,
DeletedAt: p.DeletedAt,
Meta: p.Meta,
PeerID: p.PeerID,
PeerCAPems: p.PeerCAPems,
PeerServerAddresses: p.PeerServerAddresses,
PeerServerName: p.PeerServerName,
CreateIndex: p.CreateIndex,
ModifyIndex: p.ModifyIndex,
ImportedServiceCount: p.ImportedServiceCount,
ExportedServiceCount: p.ExportedServiceCount,
State: state,
}
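With the count copied through copyPeeringWithNewState, callers see it on every Peering returned by PeeringRead and PeeringList. A caller-side sketch (printImportedCounts is a hypothetical helper; conn is assumed to be dialed against a server's RPC address the same way the leader test above dials, with imports context, fmt, grpc, and pbpeering):

// printImportedCounts lists peerings over gRPC and prints the new ImportedServiceCount
// field for each one.
func printImportedCounts(ctx context.Context, conn *grpc.ClientConn) error {
	client := pbpeering.NewPeeringServiceClient(conn)

	resp, err := client.PeeringList(ctx, &pbpeering.PeeringListRequest{})
	if err != nil {
		return err
	}
	for _, p := range resp.Peerings {
		// The count mirrors the stream tracker's set of distinct imported service names.
		fmt.Printf("peer %q imports %d services\n", p.Name, p.ImportedServiceCount)
	}
	return nil
}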