From 4be4dd7af0dc5b11133abf91fd7967000422b4c8 Mon Sep 17 00:00:00 2001
From: Derek Menteer <105233703+hashi-derek@users.noreply.github.com>
Date: Fri, 10 Feb 2023 09:47:17 -0600
Subject: [PATCH] Fix peering acceptors in secondary datacenters. (#16230)

Prior to this commit, secondary datacenters could not be initialized
as peering acceptors if ACLs were enabled. This is due to the fact
that internal server-to-server API calls would fail because the
management token was not generated.

This PR makes it so that both primary and secondary datacenters
generate their own management token whenever a leader is elected in
their respective clusters.
---
 .changelog/16230.txt                          |   3 +
 agent/consul/leader.go                        |  26 ++--
 agent/consul/leader_test.go                   |  45 +++++++
 .../test/wanfed/wanfed_peering_test.go        | 118 ++++++++++++++++++
 4 files changed, 179 insertions(+), 13 deletions(-)
 create mode 100644 .changelog/16230.txt
 create mode 100644 test/integration/consul-container/test/wanfed/wanfed_peering_test.go

diff --git a/.changelog/16230.txt b/.changelog/16230.txt
new file mode 100644
index 000000000..81e574798
--- /dev/null
+++ b/.changelog/16230.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+peering: Fix issue where secondary wan-federated datacenters could not be used as peering acceptors.
+```
diff --git a/agent/consul/leader.go b/agent/consul/leader.go
index fce72a32d..d5eb00fbb 100644
--- a/agent/consul/leader.go
+++ b/agent/consul/leader.go
@@ -509,23 +509,23 @@ func (s *Server) initializeACLs(ctx context.Context) error {
 		if err := s.InsertAnonymousToken(); err != nil {
 			return err
 		}
-
-		// Generate or rotate the server management token on leadership transitions.
-		// This token is used by Consul servers for authn/authz when making
-		// requests to themselves through public APIs such as the agent cache.
-		// It is stored as system metadata because it is internally
-		// managed and users are not meant to see it or interact with it.
-		secretID, err := lib.GenerateUUID(nil)
-		if err != nil {
-			return fmt.Errorf("failed to generate the secret ID for the server management token: %w", err)
-		}
-		if err := s.setSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil {
-			return fmt.Errorf("failed to persist server management token: %w", err)
-		}
 	} else {
 		s.startACLReplication(ctx)
 	}
 
+	// Generate or rotate the server management token on leadership transitions.
+	// This token is used by Consul servers for authn/authz when making
+	// requests to themselves through public APIs such as the agent cache.
+	// It is stored as system metadata because it is internally
+	// managed and users are not meant to see it or interact with it.
+	secretID, err := lib.GenerateUUID(nil)
+	if err != nil {
+		return fmt.Errorf("failed to generate the secret ID for the server management token: %w", err)
+	}
+	if err := s.setSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil {
+		return fmt.Errorf("failed to persist server management token: %w", err)
+	}
+
 	s.startACLTokenReaping(ctx)
 
 	return nil
diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go
index 3bbe08bc6..e8bcb39a6 100644
--- a/agent/consul/leader_test.go
+++ b/agent/consul/leader_test.go
@@ -1310,6 +1310,51 @@ func TestLeader_ACL_Initialization(t *testing.T) {
 	}
 }
 
+func TestLeader_ACL_Initialization_SecondaryDC(t *testing.T) {
+	if testing.Short() {
+		t.Skip("too slow for testing.Short")
+	}
+
+	t.Parallel()
+
+	dir1, s1 := testServerWithConfig(t, func(c *Config) {
+		c.Bootstrap = true
+		c.Datacenter = "dc1"
+		c.PrimaryDatacenter = "dc1"
+		c.ACLsEnabled = true
+	})
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+	testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
+
+	dir2, s2 := testServerWithConfig(t, func(c *Config) {
+		c.Bootstrap = true
+		c.Datacenter = "dc2"
+		c.PrimaryDatacenter = "dc1"
+		c.ACLsEnabled = true
+	})
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+	testrpc.WaitForTestAgent(t, s2.RPC, "dc2")
+
+	// Check dc1's management token
+	serverToken1, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID)
+	require.NoError(t, err)
+	require.NotEmpty(t, serverToken1)
+	_, err = uuid.ParseUUID(serverToken1)
+	require.NoError(t, err)
+
+	// Check dc2's management token
+	serverToken2, err := s2.getSystemMetadata(structs.ServerManagementTokenAccessorID)
+	require.NoError(t, err)
+	require.NotEmpty(t, serverToken2)
+	_, err = uuid.ParseUUID(serverToken2)
+	require.NoError(t, err)
+
+	// Ensure the tokens were not replicated between clusters.
+	require.NotEqual(t, serverToken1, serverToken2)
+}
+
 func TestLeader_ACLUpgrade_IsStickyEvenIfSerfTagsRegress(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
diff --git a/test/integration/consul-container/test/wanfed/wanfed_peering_test.go b/test/integration/consul-container/test/wanfed/wanfed_peering_test.go
new file mode 100644
index 000000000..10b008745
--- /dev/null
+++ b/test/integration/consul-container/test/wanfed/wanfed_peering_test.go
@@ -0,0 +1,118 @@
+package peering
+
+import (
+	"context"
+	"testing"
+
+	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/sdk/testutil/retry"
+	libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
+	libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
+	libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPeering_WanFedSecondaryDC(t *testing.T) {
+	t.Parallel()
+
+	_, c1Agent := createCluster(t, "primary", func(c *libcluster.ConfigBuilder) {
+		c.Set("primary_datacenter", "primary")
+		// Enable ACLs, since they affect how the peering certificates are generated.
+		c.Set("acl.enabled", true)
+	})
+
+	c2, c2Agent := createCluster(t, "secondary", func(c *libcluster.ConfigBuilder) {
+		c.Set("primary_datacenter", "primary")
+		c.Set("retry_join_wan", []string{c1Agent.GetIP()})
+		// Enable ACLs, since they affect how the peering certificates are generated.
+		c.Set("acl.enabled", true)
+	})
+
+	c3, c3Agent := createCluster(t, "alpha", nil)
+
+	t.Run("secondary dc services are visible in primary dc", func(t *testing.T) {
+		createConnectService(t, c2)
+		assertCatalogService(t, c1Agent.GetClient(), "static-server", &api.QueryOptions{Datacenter: "secondary"})
+	})
+
+	t.Run("secondary dc can peer to alpha dc", func(t *testing.T) {
+		// Create the gateway
+		_, err := libservice.NewGatewayService(context.Background(), "mesh", "mesh", c3.Servers()[0])
+		require.NoError(t, err)
+
+		// Create the peering connection
+		require.NoError(t, c3.PeerWithCluster(c2Agent.GetClient(), "secondary-to-alpha", "alpha-to-secondary"))
+		libassert.PeeringStatus(t, c2Agent.GetClient(), "secondary-to-alpha", api.PeeringStateActive)
+	})
+
+	t.Run("secondary dc can access services in alpha dc", func(t *testing.T) {
+		service := createConnectService(t, c3)
+		require.NoError(t, service.Export("default", "alpha-to-secondary", c3Agent.GetClient()))
+
+		// Create a testing sidecar to proxy requests through
+		clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(c2Agent, "secondary-to-alpha", false)
+		require.NoError(t, err)
+		assertCatalogService(t, c2Agent.GetClient(), "static-client-sidecar-proxy", nil)
+
+		// Ensure envoy is configured for the peer service and healthy.
+		_, adminPort := clientConnectProxy.GetAdminAddr()
+		libassert.AssertUpstreamEndpointStatus(t, adminPort, "static-server.default.secondary-to-alpha.external", "HEALTHY", 1)
+		libassert.AssertEnvoyMetricAtMost(t, adminPort, "cluster.static-server.default.secondary-to-alpha.external.", "upstream_cx_total", 0)
+
+		// Make a call to the peered service multiple times.
+		_, port := clientConnectProxy.GetAddr()
+		for i := 0; i < 10; i++ {
+			libassert.HTTPServiceEchoes(t, "localhost", port, "")
+			libassert.AssertEnvoyMetricAtLeast(t, adminPort, "cluster.static-server.default.secondary-to-alpha.external.", "upstream_cx_total", i)
+		}
+	})
+}
+
+func assertCatalogService(t *testing.T, c *api.Client, svc string, opts *api.QueryOptions) {
+	retry.Run(t, func(r *retry.R) {
+		services, _, err := c.Catalog().Service(svc, "", opts)
+		if err != nil {
+			r.Fatal("error reading catalog data", err)
+		}
+		if len(services) == 0 {
+			r.Fatal("did not find catalog entry for ", svc)
+		}
+	})
+}
+
+func createCluster(t *testing.T, dc string, f func(c *libcluster.ConfigBuilder)) (*libcluster.Cluster, libcluster.Agent) {
+	ctx := libcluster.NewBuildContext(t, libcluster.BuildOptions{Datacenter: dc})
+	conf := libcluster.NewConfigBuilder(ctx).Advanced(f)
+
+	cluster, err := libcluster.New(t, []libcluster.Config{*conf.ToAgentConfig(t)})
+	require.NoError(t, err)
+
+	client := cluster.Agents[0].GetClient()
+
+	libcluster.WaitForLeader(t, cluster, client)
+	libcluster.WaitForMembers(t, client, 1)
+
+	agent, err := cluster.Leader()
+	require.NoError(t, err)
+	return cluster, agent
+}
+
+func createConnectService(t *testing.T, cluster *libcluster.Cluster) libservice.Service {
+	node := cluster.Agents[0]
+	client := node.GetClient()
+
+	// Create a service and proxy instance
+	opts := libservice.ServiceOpts{
+		Name:     libservice.StaticServerServiceName,
+		ID:       libservice.StaticServerServiceName,
+		HTTPPort: 8080,
+		GRPCPort: 8079,
+	}
+	serverConnectProxy, _, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, &opts)
+	require.NoError(t, err)
+
+	assertCatalogService(t, client, "static-server-sidecar-proxy", nil)
+	assertCatalogService(t, client, "static-server", nil)
+
+	return serverConnectProxy
+}