feat(peering): validate server name conflicts on establish
This commit is contained in:
parent
ea4d95a5c6
commit
b37a2ba889
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
peering: Validate peering tokens for server name conflicts
|
||||
```
|
|
@ -1370,6 +1370,11 @@ func (s *Server) WANMembers() []serf.Member {
|
|||
return s.serfWAN.Members()
|
||||
}
|
||||
|
||||
// GetPeeringBackend is a test helper.
|
||||
func (s *Server) GetPeeringBackend() peering.Backend {
|
||||
return s.peeringBackend
|
||||
}
|
||||
|
||||
// RemoveFailedNode is used to remove a failed node from the cluster.
|
||||
func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *acl.EnterpriseMeta) error {
|
||||
var removeFn func(*serf.Serf, string) error
|
||||
|
|
|
@ -267,8 +267,8 @@ func TestHTTP_Peering_Establish(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Success", func(t *testing.T) {
|
||||
a2 := NewTestAgent(t, "")
|
||||
testrpc.WaitForTestAgent(t, a2.RPC, "dc1")
|
||||
a2 := NewTestAgent(t, `datacenter = "dc2"`)
|
||||
testrpc.WaitForTestAgent(t, a2.RPC, "dc2")
|
||||
|
||||
bodyBytes, err := json.Marshal(&pbpeering.GenerateTokenRequest{
|
||||
PeerName: "foo",
|
||||
|
|
|
@ -374,7 +374,7 @@ func (s *Server) Establish(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.validatePeeringInPartition(tok.PeerID, entMeta.PartitionOrEmpty()); err != nil {
|
||||
if err := s.validatePeeringLocality(tok, entMeta.PartitionOrEmpty()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -463,15 +463,23 @@ func (s *Server) Establish(
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
// validatePeeringInPartition makes sure that we don't create a peering in the same partition. We validate by looking at
|
||||
// the remotePeerID from the PeeringToken and looking up for a peering in the partition. If there is one and the
|
||||
// request partition is the same, then we are attempting to peer within the partition, which we shouldn't.
|
||||
func (s *Server) validatePeeringInPartition(remotePeerID, partition string) error {
|
||||
_, peering, err := s.Backend.Store().PeeringReadByID(nil, remotePeerID)
|
||||
// validatePeeringLocality makes sure that we don't create a peering in the cluster/partition it was generated.
|
||||
// We validate by looking at the remote PeerID from the PeeringToken and looking up that peering in the partition.
|
||||
// If there is one and the request partition is the same, then we are attempting to peer within the partition, which we shouldn't.
|
||||
// We also perform a check to verify if the ServerName of the PeeringToken overlaps with our own, we do not process it
|
||||
// unless we've been able to find the peering in the store, i.e. this peering is between two local partitions.
|
||||
func (s *Server) validatePeeringLocality(token *structs.PeeringToken, partition string) error {
|
||||
_, peering, err := s.Backend.Store().PeeringReadByID(nil, token.PeerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read peering by ID: %w", err)
|
||||
}
|
||||
|
||||
// If the token has the same server name as this cluster, but we can't find the peering
|
||||
// in our store, it indicates a naming conflict.
|
||||
if s.Backend.GetServerName() == token.ServerName && peering == nil {
|
||||
return fmt.Errorf("conflict - peering token's server name matches the current cluster's server name, %q, but there is no record in the database", s.Backend.GetServerName())
|
||||
}
|
||||
|
||||
if peering != nil && acl.EqualPartitions(peering.GetPartition(), partition) {
|
||||
return fmt.Errorf("cannot create a peering within the same partition (ENT) or cluster (OSS)")
|
||||
}
|
||||
|
|
|
@ -345,8 +345,8 @@ func TestPeeringService_Establish_Validation(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// We define a valid peering by a peering that does not occur over the same server addresses
|
||||
func TestPeeringService_Establish_validPeeringInPartition(t *testing.T) {
|
||||
// Loopback peering within the same cluster/partion should throw an error
|
||||
func TestPeeringService_Establish_invalidPeeringInSamePartition(t *testing.T) {
|
||||
// TODO(peering): see note on newTestServer, refactor to not use this
|
||||
s := newTestServer(t, nil)
|
||||
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
|
||||
|
@ -369,12 +369,48 @@ func TestPeeringService_Establish_validPeeringInPartition(t *testing.T) {
|
|||
require.Nil(t, respE)
|
||||
}
|
||||
|
||||
// When tokens have the same name as the dialing cluster but are unknown by ID, we
|
||||
// should be throwing an error to note the server name conflict.
|
||||
func TestPeeringService_Establish_serverNameConflict(t *testing.T) {
|
||||
// TODO(peering): see note on newTestServer, refactor to not use this
|
||||
s := newTestServer(t, nil)
|
||||
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
// Manufacture token to have the same server name but a PeerID not in the store.
|
||||
id, err := uuid.GenerateUUID()
|
||||
require.NoError(t, err, "could not generate uuid")
|
||||
peeringToken := structs.PeeringToken{
|
||||
ServerAddresses: []string{"1.2.3.4:8502"},
|
||||
ServerName: s.Server.GetPeeringBackend().GetServerName(),
|
||||
EstablishmentSecret: "foo",
|
||||
PeerID: id,
|
||||
}
|
||||
|
||||
jsonToken, err := json.Marshal(peeringToken)
|
||||
require.NoError(t, err, "could not marshal peering token")
|
||||
base64Token := base64.StdEncoding.EncodeToString(jsonToken)
|
||||
|
||||
establishReq := &pbpeering.EstablishRequest{
|
||||
PeerName: "peerTwo",
|
||||
PeeringToken: base64Token,
|
||||
}
|
||||
|
||||
respE, errE := client.Establish(ctx, establishReq)
|
||||
require.Error(t, errE)
|
||||
require.Contains(t, errE.Error(), "conflict - peering token's server name matches the current cluster's server name")
|
||||
require.Nil(t, respE)
|
||||
}
|
||||
|
||||
func TestPeeringService_Establish(t *testing.T) {
|
||||
// TODO(peering): see note on newTestServer, refactor to not use this
|
||||
s1 := newTestServer(t, nil)
|
||||
client1 := pbpeering.NewPeeringServiceClient(s1.ClientConn(t))
|
||||
|
||||
s2 := newTestServer(t, func(conf *consul.Config) {
|
||||
conf.Datacenter = "dc2"
|
||||
conf.GRPCPort = 5301
|
||||
})
|
||||
client2 := pbpeering.NewPeeringServiceClient(s2.ClientConn(t))
|
||||
|
@ -1070,6 +1106,7 @@ func TestPeeringService_validatePeer(t *testing.T) {
|
|||
|
||||
s2 := newTestServer(t, func(conf *consul.Config) {
|
||||
conf.GRPCPort = 5301
|
||||
conf.Datacenter = "dc2"
|
||||
})
|
||||
client2 := pbpeering.NewPeeringServiceClient(s2.ClientConn(t))
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ func TestAPI_Peering_ACLDeny(t *testing.T) {
|
|||
serverConfig.ACL.Enabled = true
|
||||
serverConfig.ACL.DefaultPolicy = "deny"
|
||||
serverConfig.Ports.GRPC = 5301
|
||||
serverConfig.Datacenter = "dc2"
|
||||
})
|
||||
defer s2.Stop()
|
||||
|
||||
|
|
|
@ -32,11 +32,11 @@ func TestEstablishCommand(t *testing.T) {
|
|||
acceptor := agent.NewTestAgent(t, ``)
|
||||
t.Cleanup(func() { _ = acceptor.Shutdown() })
|
||||
|
||||
dialer := agent.NewTestAgent(t, ``)
|
||||
dialer := agent.NewTestAgent(t, `datacenter = "dc2"`)
|
||||
t.Cleanup(func() { _ = dialer.Shutdown() })
|
||||
|
||||
testrpc.WaitForTestAgent(t, acceptor.RPC, "dc1")
|
||||
testrpc.WaitForTestAgent(t, dialer.RPC, "dc1")
|
||||
testrpc.WaitForTestAgent(t, dialer.RPC, "dc2")
|
||||
|
||||
acceptingClient := acceptor.Client()
|
||||
dialingClient := dialer.Client()
|
||||
|
|
Loading…
Reference in New Issue