diff --git a/agent/agent.go b/agent/agent.go index a7c89a727..197434e77 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1341,6 +1341,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co // function does not drift. cfg.SerfLANConfig = consul.CloneSerfLANConfig(cfg.SerfLANConfig) + cfg.PeeringEnabled = runtimeCfg.PeeringEnabled + enterpriseConsulConfig(cfg, runtimeCfg) return cfg, nil } diff --git a/agent/config/builder.go b/agent/config/builder.go index f855aae51..70c5d044c 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1014,6 +1014,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) { NodeMeta: c.NodeMeta, NodeName: b.nodeName(c.NodeName), ReadReplica: boolVal(c.ReadReplica), + PeeringEnabled: boolVal(c.Peering.Enabled), PidFile: stringVal(c.PidFile), PrimaryDatacenter: primaryDatacenter, PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways), diff --git a/agent/config/config.go b/agent/config/config.go index c4f752a82..23e7550aa 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -197,6 +197,7 @@ type Config struct { NodeID *string `mapstructure:"node_id"` NodeMeta map[string]string `mapstructure:"node_meta"` NodeName *string `mapstructure:"node_name"` + Peering Peering `mapstructure:"peering"` Performance Performance `mapstructure:"performance"` PidFile *string `mapstructure:"pid_file"` Ports Ports `mapstructure:"ports"` @@ -887,3 +888,7 @@ type TLS struct { // config merging logic. GRPCModifiedByDeprecatedConfig *struct{} `mapstructure:"-"` } + +type Peering struct { + Enabled *bool `mapstructure:"enabled"` +} diff --git a/agent/config/default.go b/agent/config/default.go index 951d9f126..d0cc2865d 100644 --- a/agent/config/default.go +++ b/agent/config/default.go @@ -104,6 +104,9 @@ func DefaultSource() Source { kv_max_value_size = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + ` txn_max_req_len = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + ` } + peering = { + enabled = true + } performance = { leave_drain_time = "5s" raft_multiplier = ` + strconv.Itoa(int(consul.DefaultRaftMultiplier)) + ` diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 2ae9888ae..db46c2184 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -810,6 +810,14 @@ type RuntimeConfig struct { // flag: -non-voting-server ReadReplica bool + // PeeringEnabled enables cluster peering. This setting only applies for servers. + // When disabled, all peering RPC endpoints will return errors, + // peering requests from other clusters will receive errors, and any peerings already stored in this server's + // state will be ignored. + // + // hcl: peering { enabled = (true|false) } + PeeringEnabled bool + // PidFile is the file to store our PID in. // // hcl: pid_file = string diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 0963ec07f..b05b31491 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -5548,6 +5548,16 @@ func TestLoad_IntegrationWithFlags(t *testing.T) { "tls.grpc was provided but TLS will NOT be enabled on the gRPC listener without an HTTPS listener configured (e.g. via ports.https)", }, }) + run(t, testCase{ + desc: "peering.enabled defaults to true", + args: []string{ + `-data-dir=` + dataDir, + }, + expected: func(rt *RuntimeConfig) { + rt.DataDir = dataDir + rt.PeeringEnabled = true + }, + }) } func (tc testCase) run(format string, dataDir string) func(t *testing.T) { @@ -5955,6 +5965,7 @@ func TestLoad_FullConfig(t *testing.T) { NodeMeta: map[string]string{"5mgGQMBk": "mJLtVMSG", "A7ynFMJB": "0Nx6RGab"}, NodeName: "otlLxGaI", ReadReplica: true, + PeeringEnabled: true, PidFile: "43xN80Km", PrimaryGateways: []string{"aej8eeZo", "roh2KahS"}, PrimaryGatewaysInterval: 18866 * time.Second, diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden index 25fbba0c0..b5d72f864 100644 --- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden +++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden @@ -235,6 +235,7 @@ "NodeID": "", "NodeMeta": {}, "NodeName": "", + "PeeringEnabled": false, "PidFile": "", "PrimaryDatacenter": "", "PrimaryGateways": [ diff --git a/agent/config/testdata/full-config.hcl b/agent/config/testdata/full-config.hcl index bb544b54a..ed8203296 100644 --- a/agent/config/testdata/full-config.hcl +++ b/agent/config/testdata/full-config.hcl @@ -305,6 +305,9 @@ node_meta { node_name = "otlLxGaI" non_voting_server = true partition = "" +peering { + enabled = true +} performance { leave_drain_time = "8265s" raft_multiplier = 5 diff --git a/agent/config/testdata/full-config.json b/agent/config/testdata/full-config.json index 36f52e681..8294a27b7 100644 --- a/agent/config/testdata/full-config.json +++ b/agent/config/testdata/full-config.json @@ -305,6 +305,9 @@ "node_name": "otlLxGaI", "non_voting_server": true, "partition": "", + "peering": { + "enabled": true + }, "performance": { "leave_drain_time": "8265s", "raft_multiplier": 5, diff --git a/agent/consul/config.go b/agent/consul/config.go index 50235c681..469ccc919 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -396,6 +396,9 @@ type Config struct { RaftBoltDBConfig RaftBoltDBConfig + // PeeringEnabled enables cluster peering. + PeeringEnabled bool + // Embedded Consul Enterprise specific configuration *EnterpriseConfig } @@ -512,6 +515,8 @@ func DefaultConfig() *Config { DefaultQueryTime: 300 * time.Second, MaxQueryTime: 600 * time.Second, + PeeringEnabled: true, + EnterpriseConfig: DefaultEnterpriseConfig(), } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index eb197deb3..389b79056 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -315,7 +315,9 @@ func (s *Server) establishLeadership(ctx context.Context) error { s.startFederationStateAntiEntropy(ctx) - s.startPeeringStreamSync(ctx) + if s.config.PeeringEnabled { + s.startPeeringStreamSync(ctx) + } s.startDeferredDeletion(ctx) @@ -758,7 +760,9 @@ func (s *Server) stopACLReplication() { } func (s *Server) startDeferredDeletion(ctx context.Context) { - s.startPeeringDeferredDeletion(ctx) + if s.config.PeeringEnabled { + s.startPeeringDeferredDeletion(ctx) + } s.startTenancyDeferredDeletion(ctx) } diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 06cbda43d..feaf5be02 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -1036,3 +1036,89 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) { require.Equal(r, float32(2), metric2.Value) // for d, e services }) } + +// Test that the leader doesn't start its peering deletion routing when +// peering is disabled. +func TestLeader_Peering_NoDeletionWhenPeeringDisabled(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s1.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.PeeringEnabled = false + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + var ( + peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + peerName = "my-peer-s2" + lastIdx = uint64(0) + ) + + // Simulate a peering initiation event by writing a peering to the state store. + lastIdx++ + require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, + Name: peerName, + })) + + // Mark the peering for deletion to trigger the termination sequence. + lastIdx++ + require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, + Name: peerName, + DeletedAt: structs.TimeToProto(time.Now()), + })) + + // The leader routine shouldn't be running so the peering should never get deleted. + require.Never(t, func() bool { + _, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{ + Value: peerName, + }) + if err != nil { + t.Logf("unexpected err: %s", err) + return true + } + if peering == nil { + return true + } + return false + }, 7*time.Second, 1*time.Second, "peering should not have been deleted") +} + +// Test that the leader doesn't start its peering establishment routine +// when peering is disabled. +func TestLeader_Peering_NoEstablishmentWhenPeeringDisabled(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s1.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.PeeringEnabled = false + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + var ( + peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + peerName = "my-peer-s2" + lastIdx = uint64(0) + ) + + // Simulate a peering initiation event by writing a peering to the state store. + require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, + Name: peerName, + PeerServerAddresses: []string{"1.2.3.4"}, + })) + + require.Never(t, func() bool { + _, found := s1.peerStreamTracker.StreamStatus(peerID) + return found + }, 7*time.Second, 1*time.Second, "peering should not have been established") +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 3c240c5f7..a14253d80 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -794,6 +794,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler }, Datacenter: config.Datacenter, ConnectEnabled: config.ConnectEnabled, + PeeringEnabled: config.PeeringEnabled, }) s.peeringServer = p diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 1d0d219f6..a8ca3b199 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -56,6 +56,7 @@ type Config struct { ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) Datacenter string ConnectEnabled bool + PeeringEnabled bool } func NewServer(cfg Config) *Server { @@ -139,6 +140,8 @@ type Store interface { TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) } +var peeringNotEnabledErr = grpcstatus.Error(codes.FailedPrecondition, "peering must be enabled to use this endpoint") + // GenerateToken implements the PeeringService RPC method to generate a // peering token which is the initial step in establishing a peering relationship // with other Consul clusters. @@ -146,6 +149,10 @@ func (s *Server) GenerateToken( ctx context.Context, req *pbpeering.GenerateTokenRequest, ) (*pbpeering.GenerateTokenResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -251,6 +258,10 @@ func (s *Server) Establish( ctx context.Context, req *pbpeering.EstablishRequest, ) (*pbpeering.EstablishResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + // validate prior to forwarding to the leader, this saves a network hop if err := dns.ValidateLabel(req.PeerName); err != nil { return nil, fmt.Errorf("%s is not a valid peer name: %w", req.PeerName, err) @@ -316,6 +327,10 @@ func (s *Server) Establish( } func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequest) (*pbpeering.PeeringReadResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -350,6 +365,10 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ } func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -413,6 +432,10 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering // TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store. // Consider removing if we can find another way to populate state store in peering_endpoint_test.go func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRequest) (*pbpeering.PeeringWriteResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Peering.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -449,6 +472,10 @@ func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRe } func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDeleteRequest) (*pbpeering.PeeringDeleteResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -505,6 +532,10 @@ func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelete } func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundleReadRequest) (*pbpeering.TrustBundleReadResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -540,6 +571,10 @@ func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundle // TODO(peering): rename rpc & request/response to drop the "service" part func (s *Server) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 939a304d2..c7e37c91d 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -15,6 +15,8 @@ import ( "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" gogrpc "google.golang.org/grpc" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul" @@ -529,6 +531,67 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) { require.Equal(t, []string{"foo-root-1"}, resp.Bundles[1].RootPEMs) } +// Test RPC endpoint responses when peering is disabled. They should all return an error. +func TestPeeringService_PeeringDisabled(t *testing.T) { + // TODO(peering): see note on newTestServer, refactor to not use this + s := newTestServer(t, func(c *consul.Config) { c.PeeringEnabled = false }) + client := pbpeering.NewPeeringServiceClient(s.ClientConn(t)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + // assertFailedResponse is a helper function that checks the error from a gRPC + // response is what we expect when peering is disabled. + assertFailedResponse := func(t *testing.T, err error) { + actErr, ok := grpcstatus.FromError(err) + require.True(t, ok) + require.Equal(t, codes.FailedPrecondition, actErr.Code()) + require.Equal(t, "peering must be enabled to use this endpoint", actErr.Message()) + } + + // Test all the endpoints. + + t.Run("PeeringWrite", func(t *testing.T) { + _, err := client.PeeringWrite(ctx, &pbpeering.PeeringWriteRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("PeeringRead", func(t *testing.T) { + _, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("PeeringDelete", func(t *testing.T) { + _, err := client.PeeringDelete(ctx, &pbpeering.PeeringDeleteRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("PeeringList", func(t *testing.T) { + _, err := client.PeeringList(ctx, &pbpeering.PeeringListRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("Establish", func(t *testing.T) { + _, err := client.Establish(ctx, &pbpeering.EstablishRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("GenerateToken", func(t *testing.T) { + _, err := client.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("TrustBundleRead", func(t *testing.T) { + _, err := client.TrustBundleRead(ctx, &pbpeering.TrustBundleReadRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("TrustBundleListByService", func(t *testing.T) { + _, err := client.TrustBundleListByService(ctx, &pbpeering.TrustBundleListByServiceRequest{}) + assertFailedResponse(t, err) + }) +} + // newTestServer is copied from partition/service_test.go, with the addition of certs/cas. // TODO(peering): these are endpoint tests and should live in the agent/consul // package. Instead, these can be written around a mock client (see testing.go) diff --git a/docs/config/checklist-adding-config-fields.md b/docs/config/checklist-adding-config-fields.md index e17139411..7a47eb841 100644 --- a/docs/config/checklist-adding-config-fields.md +++ b/docs/config/checklist-adding-config-fields.md @@ -45,6 +45,8 @@ There are four specific cases covered with increasing complexity: - [ ] Add that to `DefaultSource` in `agent/config/defaults.go`. - [ ] Add a test case to the table test `TestLoad_IntegrationWithFlags` in `agent/config/runtime_test.go`. + - [ ] If the config needs to be defaulted for the test server used in unit tests, + also add it to `DefaultConfig()` in `agent/consul/defaults.go`. - [ ] **If** your config should take effect on a reload/HUP. - [ ] Add necessary code to to trigger a safe (locked or atomic) update to any state the feature needs changing. This needs to be added to one or diff --git a/website/content/docs/agent/config/config-files.mdx b/website/content/docs/agent/config/config-files.mdx index a8eaba6d5..6b902987e 100644 --- a/website/content/docs/agent/config/config-files.mdx +++ b/website/content/docs/agent/config/config-files.mdx @@ -551,6 +551,15 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `max_query_time` Equivalent to the [`-max-query-time` command-line flag](/docs/agent/config/cli-flags#_max_query_time). +- `peering` This object allows setting options for cluster peering. + + The following sub-keys are available: + + - `enabled` ((#peering_enabled)) (Defaults to `true`) Controls whether cluster peering is enabled. + Has no effect on Consul clients, only on Consul servers. When disabled, all peering APIs will return + an error, any peerings stored in Consul already will be ignored (but they will not be deleted), + all peering connections from other clusters will be rejected. This was added in Consul 1.13.0. + - `partition` - This flag is used to set the name of the admin partition the agent belongs to. An agent can only join and communicate with other agents within its admin partition. Review the