peering: add config to enable/disable peering (#13867)
* peering: add config to enable/disable peering Add config: ``` peering { enabled = true } ``` Defaults to true. When disabled: 1. All peering RPC endpoints will return an error 2. Leader won't start its peering establishment goroutines 3. Leader won't start its peering deletion goroutines
This commit is contained in:
parent
3cbcfd4b13
commit
d21f793b74
|
@ -1341,6 +1341,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
|
|||
// function does not drift.
|
||||
cfg.SerfLANConfig = consul.CloneSerfLANConfig(cfg.SerfLANConfig)
|
||||
|
||||
cfg.PeeringEnabled = runtimeCfg.PeeringEnabled
|
||||
|
||||
enterpriseConsulConfig(cfg, runtimeCfg)
|
||||
return cfg, nil
|
||||
}
|
||||
|
|
|
@ -1014,6 +1014,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
|
|||
NodeMeta: c.NodeMeta,
|
||||
NodeName: b.nodeName(c.NodeName),
|
||||
ReadReplica: boolVal(c.ReadReplica),
|
||||
PeeringEnabled: boolVal(c.Peering.Enabled),
|
||||
PidFile: stringVal(c.PidFile),
|
||||
PrimaryDatacenter: primaryDatacenter,
|
||||
PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways),
|
||||
|
|
|
@ -197,6 +197,7 @@ type Config struct {
|
|||
NodeID *string `mapstructure:"node_id"`
|
||||
NodeMeta map[string]string `mapstructure:"node_meta"`
|
||||
NodeName *string `mapstructure:"node_name"`
|
||||
Peering Peering `mapstructure:"peering"`
|
||||
Performance Performance `mapstructure:"performance"`
|
||||
PidFile *string `mapstructure:"pid_file"`
|
||||
Ports Ports `mapstructure:"ports"`
|
||||
|
@ -887,3 +888,7 @@ type TLS struct {
|
|||
// config merging logic.
|
||||
GRPCModifiedByDeprecatedConfig *struct{} `mapstructure:"-"`
|
||||
}
|
||||
|
||||
type Peering struct {
|
||||
Enabled *bool `mapstructure:"enabled"`
|
||||
}
|
||||
|
|
|
@ -104,6 +104,9 @@ func DefaultSource() Source {
|
|||
kv_max_value_size = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
|
||||
txn_max_req_len = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
|
||||
}
|
||||
peering = {
|
||||
enabled = true
|
||||
}
|
||||
performance = {
|
||||
leave_drain_time = "5s"
|
||||
raft_multiplier = ` + strconv.Itoa(int(consul.DefaultRaftMultiplier)) + `
|
||||
|
|
|
@ -810,6 +810,14 @@ type RuntimeConfig struct {
|
|||
// flag: -non-voting-server
|
||||
ReadReplica bool
|
||||
|
||||
// PeeringEnabled enables cluster peering. This setting only applies for servers.
|
||||
// When disabled, all peering RPC endpoints will return errors,
|
||||
// peering requests from other clusters will receive errors, and any peerings already stored in this server's
|
||||
// state will be ignored.
|
||||
//
|
||||
// hcl: peering { enabled = (true|false) }
|
||||
PeeringEnabled bool
|
||||
|
||||
// PidFile is the file to store our PID in.
|
||||
//
|
||||
// hcl: pid_file = string
|
||||
|
|
|
@ -5548,6 +5548,16 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
|
|||
"tls.grpc was provided but TLS will NOT be enabled on the gRPC listener without an HTTPS listener configured (e.g. via ports.https)",
|
||||
},
|
||||
})
|
||||
run(t, testCase{
|
||||
desc: "peering.enabled defaults to true",
|
||||
args: []string{
|
||||
`-data-dir=` + dataDir,
|
||||
},
|
||||
expected: func(rt *RuntimeConfig) {
|
||||
rt.DataDir = dataDir
|
||||
rt.PeeringEnabled = true
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (tc testCase) run(format string, dataDir string) func(t *testing.T) {
|
||||
|
@ -5955,6 +5965,7 @@ func TestLoad_FullConfig(t *testing.T) {
|
|||
NodeMeta: map[string]string{"5mgGQMBk": "mJLtVMSG", "A7ynFMJB": "0Nx6RGab"},
|
||||
NodeName: "otlLxGaI",
|
||||
ReadReplica: true,
|
||||
PeeringEnabled: true,
|
||||
PidFile: "43xN80Km",
|
||||
PrimaryGateways: []string{"aej8eeZo", "roh2KahS"},
|
||||
PrimaryGatewaysInterval: 18866 * time.Second,
|
||||
|
|
|
@ -235,6 +235,7 @@
|
|||
"NodeID": "",
|
||||
"NodeMeta": {},
|
||||
"NodeName": "",
|
||||
"PeeringEnabled": false,
|
||||
"PidFile": "",
|
||||
"PrimaryDatacenter": "",
|
||||
"PrimaryGateways": [
|
||||
|
|
|
@ -305,6 +305,9 @@ node_meta {
|
|||
node_name = "otlLxGaI"
|
||||
non_voting_server = true
|
||||
partition = ""
|
||||
peering {
|
||||
enabled = true
|
||||
}
|
||||
performance {
|
||||
leave_drain_time = "8265s"
|
||||
raft_multiplier = 5
|
||||
|
|
|
@ -305,6 +305,9 @@
|
|||
"node_name": "otlLxGaI",
|
||||
"non_voting_server": true,
|
||||
"partition": "",
|
||||
"peering": {
|
||||
"enabled": true
|
||||
},
|
||||
"performance": {
|
||||
"leave_drain_time": "8265s",
|
||||
"raft_multiplier": 5,
|
||||
|
|
|
@ -396,6 +396,9 @@ type Config struct {
|
|||
|
||||
RaftBoltDBConfig RaftBoltDBConfig
|
||||
|
||||
// PeeringEnabled enables cluster peering.
|
||||
PeeringEnabled bool
|
||||
|
||||
// Embedded Consul Enterprise specific configuration
|
||||
*EnterpriseConfig
|
||||
}
|
||||
|
@ -512,6 +515,8 @@ func DefaultConfig() *Config {
|
|||
DefaultQueryTime: 300 * time.Second,
|
||||
MaxQueryTime: 600 * time.Second,
|
||||
|
||||
PeeringEnabled: true,
|
||||
|
||||
EnterpriseConfig: DefaultEnterpriseConfig(),
|
||||
}
|
||||
|
||||
|
|
|
@ -315,7 +315,9 @@ func (s *Server) establishLeadership(ctx context.Context) error {
|
|||
|
||||
s.startFederationStateAntiEntropy(ctx)
|
||||
|
||||
s.startPeeringStreamSync(ctx)
|
||||
if s.config.PeeringEnabled {
|
||||
s.startPeeringStreamSync(ctx)
|
||||
}
|
||||
|
||||
s.startDeferredDeletion(ctx)
|
||||
|
||||
|
@ -758,7 +760,9 @@ func (s *Server) stopACLReplication() {
|
|||
}
|
||||
|
||||
func (s *Server) startDeferredDeletion(ctx context.Context) {
|
||||
s.startPeeringDeferredDeletion(ctx)
|
||||
if s.config.PeeringEnabled {
|
||||
s.startPeeringDeferredDeletion(ctx)
|
||||
}
|
||||
s.startTenancyDeferredDeletion(ctx)
|
||||
}
|
||||
|
||||
|
|
|
@ -1036,3 +1036,89 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
|
|||
require.Equal(r, float32(2), metric2.Value) // for d, e services
|
||||
})
|
||||
}
|
||||
|
||||
// Test that the leader doesn't start its peering deletion routing when
|
||||
// peering is disabled.
|
||||
func TestLeader_Peering_NoDeletionWhenPeeringDisabled(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "s1.dc1"
|
||||
c.Datacenter = "dc1"
|
||||
c.TLSConfig.Domain = "consul"
|
||||
c.PeeringEnabled = false
|
||||
})
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
var (
|
||||
peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
|
||||
peerName = "my-peer-s2"
|
||||
lastIdx = uint64(0)
|
||||
)
|
||||
|
||||
// Simulate a peering initiation event by writing a peering to the state store.
|
||||
lastIdx++
|
||||
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID,
|
||||
Name: peerName,
|
||||
}))
|
||||
|
||||
// Mark the peering for deletion to trigger the termination sequence.
|
||||
lastIdx++
|
||||
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID,
|
||||
Name: peerName,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}))
|
||||
|
||||
// The leader routine shouldn't be running so the peering should never get deleted.
|
||||
require.Never(t, func() bool {
|
||||
_, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{
|
||||
Value: peerName,
|
||||
})
|
||||
if err != nil {
|
||||
t.Logf("unexpected err: %s", err)
|
||||
return true
|
||||
}
|
||||
if peering == nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}, 7*time.Second, 1*time.Second, "peering should not have been deleted")
|
||||
}
|
||||
|
||||
// Test that the leader doesn't start its peering establishment routine
|
||||
// when peering is disabled.
|
||||
func TestLeader_Peering_NoEstablishmentWhenPeeringDisabled(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "s1.dc1"
|
||||
c.Datacenter = "dc1"
|
||||
c.TLSConfig.Domain = "consul"
|
||||
c.PeeringEnabled = false
|
||||
})
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
var (
|
||||
peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
|
||||
peerName = "my-peer-s2"
|
||||
lastIdx = uint64(0)
|
||||
)
|
||||
|
||||
// Simulate a peering initiation event by writing a peering to the state store.
|
||||
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID,
|
||||
Name: peerName,
|
||||
PeerServerAddresses: []string{"1.2.3.4"},
|
||||
}))
|
||||
|
||||
require.Never(t, func() bool {
|
||||
_, found := s1.peerStreamTracker.StreamStatus(peerID)
|
||||
return found
|
||||
}, 7*time.Second, 1*time.Second, "peering should not have been established")
|
||||
}
|
||||
|
|
|
@ -794,6 +794,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
|
|||
},
|
||||
Datacenter: config.Datacenter,
|
||||
ConnectEnabled: config.ConnectEnabled,
|
||||
PeeringEnabled: config.PeeringEnabled,
|
||||
})
|
||||
s.peeringServer = p
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ type Config struct {
|
|||
ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error)
|
||||
Datacenter string
|
||||
ConnectEnabled bool
|
||||
PeeringEnabled bool
|
||||
}
|
||||
|
||||
func NewServer(cfg Config) *Server {
|
||||
|
@ -139,6 +140,8 @@ type Store interface {
|
|||
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
||||
}
|
||||
|
||||
var peeringNotEnabledErr = grpcstatus.Error(codes.FailedPrecondition, "peering must be enabled to use this endpoint")
|
||||
|
||||
// GenerateToken implements the PeeringService RPC method to generate a
|
||||
// peering token which is the initial step in establishing a peering relationship
|
||||
// with other Consul clusters.
|
||||
|
@ -146,6 +149,10 @@ func (s *Server) GenerateToken(
|
|||
ctx context.Context,
|
||||
req *pbpeering.GenerateTokenRequest,
|
||||
) (*pbpeering.GenerateTokenResponse, error) {
|
||||
if !s.Config.PeeringEnabled {
|
||||
return nil, peeringNotEnabledErr
|
||||
}
|
||||
|
||||
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
||||
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
@ -251,6 +258,10 @@ func (s *Server) Establish(
|
|||
ctx context.Context,
|
||||
req *pbpeering.EstablishRequest,
|
||||
) (*pbpeering.EstablishResponse, error) {
|
||||
if !s.Config.PeeringEnabled {
|
||||
return nil, peeringNotEnabledErr
|
||||
}
|
||||
|
||||
// validate prior to forwarding to the leader, this saves a network hop
|
||||
if err := dns.ValidateLabel(req.PeerName); err != nil {
|
||||
return nil, fmt.Errorf("%s is not a valid peer name: %w", req.PeerName, err)
|
||||
|
@ -316,6 +327,10 @@ func (s *Server) Establish(
|
|||
}
|
||||
|
||||
func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequest) (*pbpeering.PeeringReadResponse, error) {
|
||||
if !s.Config.PeeringEnabled {
|
||||
return nil, peeringNotEnabledErr
|
||||
}
|
||||
|
||||
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
||||
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
@ -350,6 +365,10 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ
|
|||
}
|
||||
|
||||
func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error) {
|
||||
if !s.Config.PeeringEnabled {
|
||||
return nil, peeringNotEnabledErr
|
||||
}
|
||||
|
||||
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
||||
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
@ -413,6 +432,10 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering
|
|||
// TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store.
|
||||
// Consider removing if we can find another way to populate state store in peering_endpoint_test.go
|
||||
func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRequest) (*pbpeering.PeeringWriteResponse, error) {
|
||||
if !s.Config.PeeringEnabled {
|
||||
return nil, peeringNotEnabledErr
|
||||
}
|
||||
|
||||
if err := s.Backend.EnterpriseCheckPartitions(req.Peering.Partition); err != nil {
|
||||
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
@ -449,6 +472,10 @@ func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRe
|
|||
}
|
||||
|
||||
func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDeleteRequest) (*pbpeering.PeeringDeleteResponse, error) {
|
||||
if !s.Config.PeeringEnabled {
|
||||
return nil, peeringNotEnabledErr
|
||||
}
|
||||
|
||||
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
||||
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
@ -505,6 +532,10 @@ func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelete
|
|||
}
|
||||
|
||||
func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundleReadRequest) (*pbpeering.TrustBundleReadResponse, error) {
|
||||
if !s.Config.PeeringEnabled {
|
||||
return nil, peeringNotEnabledErr
|
||||
}
|
||||
|
||||
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
||||
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
@ -540,6 +571,10 @@ func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundle
|
|||
|
||||
// TODO(peering): rename rpc & request/response to drop the "service" part
|
||||
func (s *Server) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error) {
|
||||
if !s.Config.PeeringEnabled {
|
||||
return nil, peeringNotEnabledErr
|
||||
}
|
||||
|
||||
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
||||
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
|
|
@ -15,6 +15,8 @@ import (
|
|||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
gogrpc "google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
grpcstatus "google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
|
@ -529,6 +531,67 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
|
|||
require.Equal(t, []string{"foo-root-1"}, resp.Bundles[1].RootPEMs)
|
||||
}
|
||||
|
||||
// Test RPC endpoint responses when peering is disabled. They should all return an error.
|
||||
func TestPeeringService_PeeringDisabled(t *testing.T) {
|
||||
// TODO(peering): see note on newTestServer, refactor to not use this
|
||||
s := newTestServer(t, func(c *consul.Config) { c.PeeringEnabled = false })
|
||||
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
// assertFailedResponse is a helper function that checks the error from a gRPC
|
||||
// response is what we expect when peering is disabled.
|
||||
assertFailedResponse := func(t *testing.T, err error) {
|
||||
actErr, ok := grpcstatus.FromError(err)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, codes.FailedPrecondition, actErr.Code())
|
||||
require.Equal(t, "peering must be enabled to use this endpoint", actErr.Message())
|
||||
}
|
||||
|
||||
// Test all the endpoints.
|
||||
|
||||
t.Run("PeeringWrite", func(t *testing.T) {
|
||||
_, err := client.PeeringWrite(ctx, &pbpeering.PeeringWriteRequest{})
|
||||
assertFailedResponse(t, err)
|
||||
})
|
||||
|
||||
t.Run("PeeringRead", func(t *testing.T) {
|
||||
_, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{})
|
||||
assertFailedResponse(t, err)
|
||||
})
|
||||
|
||||
t.Run("PeeringDelete", func(t *testing.T) {
|
||||
_, err := client.PeeringDelete(ctx, &pbpeering.PeeringDeleteRequest{})
|
||||
assertFailedResponse(t, err)
|
||||
})
|
||||
|
||||
t.Run("PeeringList", func(t *testing.T) {
|
||||
_, err := client.PeeringList(ctx, &pbpeering.PeeringListRequest{})
|
||||
assertFailedResponse(t, err)
|
||||
})
|
||||
|
||||
t.Run("Establish", func(t *testing.T) {
|
||||
_, err := client.Establish(ctx, &pbpeering.EstablishRequest{})
|
||||
assertFailedResponse(t, err)
|
||||
})
|
||||
|
||||
t.Run("GenerateToken", func(t *testing.T) {
|
||||
_, err := client.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{})
|
||||
assertFailedResponse(t, err)
|
||||
})
|
||||
|
||||
t.Run("TrustBundleRead", func(t *testing.T) {
|
||||
_, err := client.TrustBundleRead(ctx, &pbpeering.TrustBundleReadRequest{})
|
||||
assertFailedResponse(t, err)
|
||||
})
|
||||
|
||||
t.Run("TrustBundleListByService", func(t *testing.T) {
|
||||
_, err := client.TrustBundleListByService(ctx, &pbpeering.TrustBundleListByServiceRequest{})
|
||||
assertFailedResponse(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
// newTestServer is copied from partition/service_test.go, with the addition of certs/cas.
|
||||
// TODO(peering): these are endpoint tests and should live in the agent/consul
|
||||
// package. Instead, these can be written around a mock client (see testing.go)
|
||||
|
|
|
@ -45,6 +45,8 @@ There are four specific cases covered with increasing complexity:
|
|||
- [ ] Add that to `DefaultSource` in `agent/config/defaults.go`.
|
||||
- [ ] Add a test case to the table test `TestLoad_IntegrationWithFlags` in
|
||||
`agent/config/runtime_test.go`.
|
||||
- [ ] If the config needs to be defaulted for the test server used in unit tests,
|
||||
also add it to `DefaultConfig()` in `agent/consul/defaults.go`.
|
||||
- [ ] **If** your config should take effect on a reload/HUP.
|
||||
- [ ] Add necessary code to to trigger a safe (locked or atomic) update to
|
||||
any state the feature needs changing. This needs to be added to one or
|
||||
|
|
|
@ -551,6 +551,15 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
|
|||
|
||||
- `max_query_time` Equivalent to the [`-max-query-time` command-line flag](/docs/agent/config/cli-flags#_max_query_time).
|
||||
|
||||
- `peering` This object allows setting options for cluster peering.
|
||||
|
||||
The following sub-keys are available:
|
||||
|
||||
- `enabled` ((#peering_enabled)) (Defaults to `true`) Controls whether cluster peering is enabled.
|
||||
Has no effect on Consul clients, only on Consul servers. When disabled, all peering APIs will return
|
||||
an error, any peerings stored in Consul already will be ignored (but they will not be deleted),
|
||||
all peering connections from other clusters will be rejected. This was added in Consul 1.13.0.
|
||||
|
||||
- `partition` <EnterpriseAlert inline /> - This flag is used to set
|
||||
the name of the admin partition the agent belongs to. An agent can only join
|
||||
and communicate with other agents within its admin partition. Review the
|
||||
|
|
Loading…
Reference in New Issue