peering: refactor reconcile, cleanup (#13795)

Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
This commit is contained in:
alex 2022-07-19 11:43:29 -07:00 committed by GitHub
parent 6b653fb827
commit 64b3705a31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 59 additions and 71 deletions

View File

@ -750,7 +750,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
name string
description string
exportedService structs.ExportedServicesConfigEntry
expectedImportedExportedServicesCount uint64 // same count for a server that imports the services form a server that exports them
expectedImportedServsCount uint64
expectedExportedServsCount uint64
}
testCases := []testCase{
@ -770,7 +771,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
},
},
},
expectedImportedExportedServicesCount: 4, // 3 services from above + the "consul" service
expectedImportedServsCount: 4, // 3 services from above + the "consul" service
expectedExportedServsCount: 4, // 3 services from above + the "consul" service
},
{
name: "no sync",
@ -778,7 +780,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
exportedService: structs.ExportedServicesConfigEntry{
Name: "default",
},
expectedImportedExportedServicesCount: 0, // we want to see this decremented from 4 --> 0
expectedImportedServsCount: 0, // we want to see this decremented from 4 --> 0
expectedExportedServsCount: 0, // we want to see this decremented from 4 --> 0
},
{
name: "just a, b services",
@ -804,7 +807,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
},
},
},
expectedImportedExportedServicesCount: 2,
expectedImportedServsCount: 2,
expectedExportedServsCount: 2,
},
{
name: "unexport b service",
@ -822,7 +826,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
},
},
},
expectedImportedExportedServicesCount: 1,
expectedImportedServsCount: 1,
expectedExportedServsCount: 1,
},
{
name: "export c service",
@ -848,7 +853,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
},
},
},
expectedImportedExportedServicesCount: 2,
expectedImportedServsCount: 2,
expectedExportedServsCount: 2,
},
}
@ -872,13 +878,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient2.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
require.NoError(r, err)
require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ImportedServiceCount)
require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ImportedServiceCount)
// on List
resp2, err2 := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{})
require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ImportedServiceCount)
require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ImportedServiceCount)
})
// Check that exported services count on S1 are what we expect
@ -887,13 +893,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
require.NoError(r, err)
require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ExportedServiceCount)
require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ExportedServiceCount)
// on List
resp2, err2 := peeringClient.PeeringList(ctx, &pbpeering.PeeringListRequest{})
require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ExportedServiceCount)
require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ExportedServiceCount)
})
})
}

View File

@ -43,7 +43,6 @@ func makeServiceResponse(
sn := structs.ServiceNameFromString(serviceName)
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
logger.Error("did not increment or decrement exported services count", "service_name", serviceName)
return nil, fmt.Errorf("invalid type for service response: %T", update.Result)
}
@ -63,7 +62,6 @@ func makeServiceResponse(
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
// Case #1 is a no-op for the importing peer.
if len(csn.Nodes) == 0 {
logger.Trace("decrementing exported services count", "service_name", sn.String())
mst.RemoveExportedService(sn)
return &pbpeerstream.ReplicationMessage_Response{
@ -75,7 +73,6 @@ func makeServiceResponse(
}, nil
}
logger.Trace("incrementing exported services count", "service_name", sn.String())
mst.TrackExportedService(sn)
// If there are nodes in the response, we push them as an UPSERT operation.
@ -220,7 +217,6 @@ func (s *Server) handleUpsert(
return fmt.Errorf("did not increment imported services count for service=%q: %w", sn.String(), err)
}
logger.Trace("incrementing imported services count", "service_name", sn.String())
mutableStatus.TrackImportedService(sn)
return nil
@ -468,11 +464,9 @@ func (s *Server) handleDelete(
err := s.handleUpdateService(peerName, partition, sn, nil)
if err != nil {
logger.Error("did not decrement imported services count", "service_name", sn.String(), "error", err)
return err
}
logger.Trace("decrementing imported services count", "service_name", sn.String())
mutableStatus.RemoveImportedService(sn)
return nil

View File

@ -173,6 +173,14 @@ type Status struct {
ExportedServices map[string]struct{}
}
func (s *Status) GetImportedServicesCount() uint64 {
return uint64(len(s.ImportedServices))
}
func (s *Status) GetExportedServicesCount() uint64 {
return uint64(len(s.ExportedServices))
}
func newMutableStatus(now func() time.Time, connected bool) *MutableStatus {
return &MutableStatus{
Status: Status{

View File

@ -13,6 +13,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
@ -338,17 +339,7 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ
return &pbpeering.PeeringReadResponse{Peering: nil}, nil
}
cp := copyPeeringWithNewState(peering, s.reconciledStreamStateHint(peering.ID, peering.State))
// add imported services count
st, found := s.Tracker.StreamStatus(peering.ID)
if !found {
s.Logger.Trace("did not find peer in stream tracker when reading peer", "peerID", peering.ID)
} else {
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
cp.ExportedServiceCount = uint64(len(st.ExportedServices))
}
cp := s.reconcilePeering(peering)
return &pbpeering.PeeringReadResponse{Peering: cp}, nil
}
@ -379,34 +370,38 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
// reconcile the actual peering state; need to copy over the ds for peering
var cPeerings []*pbpeering.Peering
for _, p := range peerings {
cp := copyPeeringWithNewState(p, s.reconciledStreamStateHint(p.ID, p.State))
// add imported services count
st, found := s.Tracker.StreamStatus(p.ID)
if !found {
s.Logger.Trace("did not find peer in stream tracker when listing peers", "peerID", p.ID)
} else {
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
cp.ExportedServiceCount = uint64(len(st.ExportedServices))
}
cp := s.reconcilePeering(p)
cPeerings = append(cPeerings, cp)
}
return &pbpeering.PeeringListResponse{Peerings: cPeerings}, nil
}
// TODO(peering): Maybe get rid of this when actually monitoring the stream health
// reconciledStreamStateHint peaks into the streamTracker and determines whether a peering should be marked
// as PeeringState.Active or not
func (s *Server) reconciledStreamStateHint(pID string, pState pbpeering.PeeringState) pbpeering.PeeringState {
streamState, found := s.Tracker.StreamStatus(pID)
// TODO(peering): Get rid of this func when we stop using the stream tracker for imported/ exported services and the peering state
// reconcilePeering enriches the peering with the following information:
// -- PeeringState.Active if the peering is active
// -- ImportedServicesCount and ExportedServicesCount
// NOTE: we return a new peering with this additional data
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
streamState, found := s.Tracker.StreamStatus(peering.ID)
if !found {
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
" exported services count or reconcile peering state", "peerID", peering.ID)
return peering
} else {
cp := copyPeering(peering)
if found && streamState.Connected {
return pbpeering.PeeringState_ACTIVE
// reconcile pbpeering.PeeringState_Active
if streamState.Connected {
cp.State = pbpeering.PeeringState_ACTIVE
}
// default, no reconciliation
return pState
// add imported & exported services counts
cp.ImportedServiceCount = streamState.GetImportedServicesCount()
cp.ExportedServiceCount = streamState.GetExportedServicesCount()
return cp
}
}
// TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store.
@ -605,22 +600,9 @@ func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (strin
return id, nil
}
func copyPeeringWithNewState(p *pbpeering.Peering, state pbpeering.PeeringState) *pbpeering.Peering {
return &pbpeering.Peering{
ID: p.ID,
Name: p.Name,
Partition: p.Partition,
DeletedAt: p.DeletedAt,
Meta: p.Meta,
PeerID: p.PeerID,
PeerCAPems: p.PeerCAPems,
PeerServerAddresses: p.PeerServerAddresses,
PeerServerName: p.PeerServerName,
CreateIndex: p.CreateIndex,
ModifyIndex: p.ModifyIndex,
ImportedServiceCount: p.ImportedServiceCount,
ExportedServiceCount: p.ExportedServiceCount,
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
var copyP pbpeering.Peering
proto.Merge(&copyP, p)
State: state,
}
return &copyP
}

View File

@ -152,9 +152,7 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) {
})
defer s2.Stop()
testutil.RunStep(t, "register services to get synced dc2", func(t *testing.T) {
testNodeServiceCheckRegistrations(t, c2, "dc2")
})
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()