peering: reconcile/ hint active state for list (#13619)

Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
This commit is contained in:
alex 2022-06-29 09:43:50 -07:00 committed by GitHub
parent 7672532b05
commit a8ae8de20e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 11 deletions

View File

@ -344,7 +344,11 @@ func (s *Service) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadReq
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &pbpeering.PeeringReadResponse{Peering: peering}, nil if peering == nil {
return &pbpeering.PeeringReadResponse{Peering: nil}, nil
}
cp := copyPeeringWithNewState(peering, s.reconciledStreamStateHint(peering.ID, peering.State))
return &pbpeering.PeeringReadResponse{Peering: cp}, nil
} }
func (s *Service) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error) { func (s *Service) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error) {
@ -370,7 +374,28 @@ func (s *Service) PeeringList(ctx context.Context, req *pbpeering.PeeringListReq
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &pbpeering.PeeringListResponse{Peerings: peerings}, nil
// reconcile the actual peering state; need to copy over the ds for peering
var cPeerings []*pbpeering.Peering
for _, p := range peerings {
cp := copyPeeringWithNewState(p, s.reconciledStreamStateHint(p.ID, p.State))
cPeerings = append(cPeerings, cp)
}
return &pbpeering.PeeringListResponse{Peerings: cPeerings}, nil
}
// TODO(peering): Maybe get rid of this when actually monitoring the stream health
// reconciledStreamStateHint peaks into the streamTracker and determines whether a peering should be marked
// as PeeringState.Active or not
func (s *Service) reconciledStreamStateHint(pID string, pState pbpeering.PeeringState) pbpeering.PeeringState {
streamState, found := s.streams.streamStatus(pID)
if found && streamState.Connected {
return pbpeering.PeeringState_ACTIVE
}
// default, no reconciliation
return pState
} }
// TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store. // TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store.
@ -930,3 +955,21 @@ func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) {
logger.Trace("replication message", "direction", dir, "protobuf", out) logger.Trace("replication message", "direction", dir, "protobuf", out)
} }
func copyPeeringWithNewState(p *pbpeering.Peering, state pbpeering.PeeringState) *pbpeering.Peering {
return &pbpeering.Peering{
ID: p.ID,
Name: p.Name,
Partition: p.Partition,
DeletedAt: p.DeletedAt,
Meta: p.Meta,
PeerID: p.PeerID,
PeerCAPems: p.PeerCAPems,
PeerServerAddresses: p.PeerServerAddresses,
PeerServerName: p.PeerServerName,
CreateIndex: p.CreateIndex,
ModifyIndex: p.ModifyIndex,
State: state,
}
}

View File

@ -853,6 +853,26 @@ func Test_StreamHandler_UpsertServices(t *testing.T) {
run(t, tc) run(t, tc)
}) })
} }
// call PeeringRead and look at the peering state; the peering state must be active
{
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
resp, err := srv.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: localPeerName})
require.NoError(t, err)
require.Equal(t, pbpeering.PeeringState_ACTIVE, resp.Peering.State)
}
// call PeeringList and look at the peering state; the peering state must be active
{
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
resp, err := srv.PeeringList(ctx, &pbpeering.PeeringListRequest{})
require.NoError(t, err)
require.Equal(t, pbpeering.PeeringState_ACTIVE, resp.Peerings[0].State)
}
} }
// newTestServer is copied from partition/service_test.go, with the addition of certs/cas. // newTestServer is copied from partition/service_test.go, with the addition of certs/cas.

View File

@ -140,11 +140,21 @@ func TestAPI_Peering_GenerateToken(t *testing.T) {
func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) { func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClientWithCA(t) c, s := makeClient(t) // this is "dc1"
defer s.Stop() defer s.Stop()
s.WaitForSerfCheck(t) s.WaitForSerfCheck(t)
ctx, cancel := context.WithTimeout(context.Background(), DefaultCtxDuration) // make a "client" server in second DC for peering
c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.Datacenter = "dc2"
})
defer s2.Stop()
testutil.RunStep(t, "register services to get synced dc2", func(t *testing.T) {
testNodeServiceCheckRegistrations(t, c2, "dc2")
})
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() defer cancel()
var token1 string var token1 string
@ -175,12 +185,6 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) {
require.Equal(t, map[string]string{"foo": "bar"}, resp.Meta) require.Equal(t, map[string]string{"foo": "bar"}, resp.Meta)
}) })
// make a "client" server in second DC for peering
c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.Datacenter = "dc2"
})
defer s2.Stop()
testutil.RunStep(t, "establish peering", func(t *testing.T) { testutil.RunStep(t, "establish peering", func(t *testing.T) {
i := PeeringEstablishRequest{ i := PeeringEstablishRequest{
Datacenter: c2.config.Datacenter, Datacenter: c2.config.Datacenter,
@ -201,8 +205,42 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) {
// require that the peering state is not undefined // require that the peering state is not undefined
require.Equal(r, PeeringStateInitial, resp.State) require.Equal(r, PeeringStateInitial, resp.State)
require.Equal(r, map[string]string{"foo": "bar"}, resp.Meta) require.Equal(r, map[string]string{"foo": "bar"}, resp.Meta)
})
})
// TODO(peering) -- let's go all the way and test in code either here or somewhere else that PeeringState does move to Active testutil.RunStep(t, "look for active state of peering in dc2", func(t *testing.T) {
// read and list the peer to make sure the status transitions to active
retry.Run(t, func(r *retry.R) {
peering, qm, err := c2.Peerings().Read(ctx, "peer1", nil)
require.NoError(r, err)
require.NotNil(r, qm)
require.NotNil(r, peering)
require.Equal(r, PeeringStateActive, peering.State)
peerings, qm, err := c2.Peerings().List(ctx, nil)
require.NoError(r, err)
require.NotNil(r, qm)
require.NotNil(r, peerings)
require.Equal(r, PeeringStateActive, peerings[0].State)
})
})
testutil.RunStep(t, "look for active state of peering in dc1", func(t *testing.T) {
// read and list the peer to make sure the status transitions to active
retry.Run(t, func(r *retry.R) {
peering, qm, err := c.Peerings().Read(ctx, "peer1", nil)
require.NoError(r, err)
require.NotNil(r, qm)
require.NotNil(r, peering)
require.Equal(r, PeeringStateActive, peering.State)
peerings, qm, err := c.Peerings().List(ctx, nil)
require.NoError(r, err)
require.NotNil(r, qm)
require.NotNil(r, peerings)
require.Equal(r, PeeringStateActive, peerings[0].State)
}) })
}) })