peering: use ShouldDial to validate peer role (#13823)
Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
This commit is contained in:
parent
d21f793b74
commit
b60ebc022e
|
@ -194,13 +194,22 @@ func (s *Server) GenerateToken(
|
|||
}
|
||||
}
|
||||
|
||||
peeringOrNil, err := s.getExistingPeering(req.PeerName, req.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// validate that this peer name is not being used as a dialer already
|
||||
if err = validatePeer(peeringOrNil, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
canRetry := true
|
||||
RETRY_ONCE:
|
||||
id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
writeReq := pbpeering.PeeringWriteRequest{
|
||||
Peering: &pbpeering.Peering{
|
||||
ID: id,
|
||||
|
@ -290,17 +299,32 @@ func (s *Server) Establish(
|
|||
|
||||
defer metrics.MeasureSince([]string{"peering", "establish"}, time.Now())
|
||||
|
||||
peeringOrNil, err := s.getExistingPeering(req.PeerName, req.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// validate that this peer name is not being used as an acceptor already
|
||||
if err = validatePeer(peeringOrNil, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var id string
|
||||
if peeringOrNil != nil {
|
||||
id = peeringOrNil.ID
|
||||
} else {
|
||||
id, err = lib.GenerateUUID(s.Backend.CheckPeeringUUID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// convert ServiceAddress values to strings
|
||||
serverAddrs := make([]string, len(tok.ServerAddresses))
|
||||
for i, addr := range tok.ServerAddresses {
|
||||
serverAddrs[i] = addr
|
||||
}
|
||||
|
||||
id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// as soon as a peering is written with a list of ServerAddresses that is
|
||||
// non-empty, the leader routine will see the peering and attempt to
|
||||
// establish a connection with the remote peer.
|
||||
|
@ -622,16 +646,12 @@ func (s *Server) TrustBundleListByService(ctx context.Context, req *pbpeering.Tr
|
|||
}
|
||||
|
||||
func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (string, error) {
|
||||
q := state.Query{
|
||||
Value: strings.ToLower(peerName),
|
||||
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(partition),
|
||||
}
|
||||
_, peering, err := s.Backend.Store().PeeringRead(nil, q)
|
||||
peeringOrNil, err := s.getExistingPeering(peerName, partition)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if peering != nil {
|
||||
return peering.ID, nil
|
||||
if peeringOrNil != nil {
|
||||
return peeringOrNil.ID, nil
|
||||
}
|
||||
|
||||
id, err := lib.GenerateUUID(s.Backend.CheckPeeringUUID)
|
||||
|
@ -641,6 +661,36 @@ func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (strin
|
|||
return id, nil
|
||||
}
|
||||
|
||||
func (s *Server) getExistingPeering(peerName, partition string) (*pbpeering.Peering, error) {
|
||||
q := state.Query{
|
||||
Value: strings.ToLower(peerName),
|
||||
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(partition),
|
||||
}
|
||||
_, peering, err := s.Backend.Store().PeeringRead(nil, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return peering, nil
|
||||
}
|
||||
|
||||
// validatePeer enforces the following rule for an existing peering:
|
||||
// - if a peering already exists, it can only be used as an acceptor or dialer
|
||||
//
|
||||
// We define a DIALER as a peering that has server addresses (or a peering that is created via the Establish endpoint)
|
||||
// Conversely, we define an ACCEPTOR as a peering that is created via the GenerateToken endpoint
|
||||
func validatePeer(peering *pbpeering.Peering, allowedToDial bool) error {
|
||||
if peering != nil && peering.ShouldDial() != allowedToDial {
|
||||
if allowedToDial {
|
||||
return fmt.Errorf("cannot create peering with name: %q; there is an existing peering expecting to be dialed", peering.Name)
|
||||
} else {
|
||||
return fmt.Errorf("cannot create peering with name: %q; there is already an established peering", peering.Name)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
|
||||
var copyP pbpeering.Peering
|
||||
proto.Merge(©P, p)
|
||||
|
|
|
@ -531,6 +531,81 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
|
|||
require.Equal(t, []string{"foo-root-1"}, resp.Bundles[1].RootPEMs)
|
||||
}
|
||||
|
||||
func TestPeeringService_validatePeer(t *testing.T) {
|
||||
dir := testutil.TempDir(t, "consul")
|
||||
signer, _, _ := tlsutil.GeneratePrivateKey()
|
||||
ca, _, _ := tlsutil.GenerateCA(tlsutil.CAOpts{Signer: signer})
|
||||
cafile := path.Join(dir, "cacert.pem")
|
||||
require.NoError(t, ioutil.WriteFile(cafile, []byte(ca), 0600))
|
||||
|
||||
s := newTestServer(t, func(c *consul.Config) {
|
||||
c.SerfLANConfig.MemberlistConfig.AdvertiseAddr = "127.0.0.1"
|
||||
c.TLSConfig.GRPC.CAFile = cafile
|
||||
c.DataDir = dir
|
||||
})
|
||||
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
testutil.RunStep(t, "generate a token", func(t *testing.T) {
|
||||
req := pbpeering.GenerateTokenRequest{PeerName: "peerB"}
|
||||
resp, err := client.GenerateToken(ctx, &req)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, resp)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "generate a token with the same name", func(t *testing.T) {
|
||||
req := pbpeering.GenerateTokenRequest{PeerName: "peerB"}
|
||||
resp, err := client.GenerateToken(ctx, &req)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, resp)
|
||||
})
|
||||
|
||||
validToken := peering.TestPeeringToken("83474a06-cca4-4ff4-99a4-4152929c8160")
|
||||
validTokenJSON, _ := json.Marshal(&validToken)
|
||||
validTokenB64 := base64.StdEncoding.EncodeToString(validTokenJSON)
|
||||
|
||||
testutil.RunStep(t, "send an establish request for a different peer name", func(t *testing.T) {
|
||||
resp, err := client.Establish(ctx, &pbpeering.EstablishRequest{
|
||||
PeerName: "peer1-usw1",
|
||||
PeeringToken: validTokenB64,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, resp)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "send an establish request for a different peer name again", func(t *testing.T) {
|
||||
resp, err := client.Establish(ctx, &pbpeering.EstablishRequest{
|
||||
PeerName: "peer1-usw1",
|
||||
PeeringToken: validTokenB64,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, resp)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "attempt to generate token with the same name used as dialer", func(t *testing.T) {
|
||||
req := pbpeering.GenerateTokenRequest{PeerName: "peer1-usw1"}
|
||||
resp, err := client.GenerateToken(ctx, &req)
|
||||
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(),
|
||||
"cannot create peering with name: \"peer1-usw1\"; there is already an established peering")
|
||||
require.Nil(t, resp)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "attempt to establish the with the same name used as acceptor", func(t *testing.T) {
|
||||
resp, err := client.Establish(ctx, &pbpeering.EstablishRequest{
|
||||
PeerName: "peerB",
|
||||
PeeringToken: validTokenB64,
|
||||
})
|
||||
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(),
|
||||
"cannot create peering with name: \"peerB\"; there is an existing peering expecting to be dialed")
|
||||
require.Nil(t, resp)
|
||||
})
|
||||
}
|
||||
|
||||
// Test RPC endpoint responses when peering is disabled. They should all return an error.
|
||||
func TestPeeringService_PeeringDisabled(t *testing.T) {
|
||||
// TODO(peering): see note on newTestServer, refactor to not use this
|
||||
|
|
Loading…
Reference in New Issue