diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index 187d59e97..d6e07ddd8 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -57,6 +57,12 @@ func (s *Server) revokeEnterpriseLeadership() error { return nil } +func (s *Server) startTenancyDeferredDeletion(ctx context.Context) { +} + +func (s *Server) stopTenancyDeferredDeletion() { +} + func (s *Server) validateEnterpriseRequest(entMeta *acl.EnterpriseMeta, write bool) error { return nil } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 0d9359c14..d38e5015a 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -47,6 +47,9 @@ var LeaderSummaries = []prometheus.SummaryDefinition{ const ( newLeaderEvent = "consul:new-leader" barrierWriteTimeout = 2 * time.Minute + + defaultDeletionRoundBurst int = 5 // number replication round bursts + defaultDeletionApplyRate rate.Limit = 10 // raft applies per second ) var ( @@ -313,6 +316,8 @@ func (s *Server) establishLeadership(ctx context.Context) error { s.startPeeringStreamSync(ctx) + s.startDeferredDeletion(ctx) + if err := s.startConnectLeader(ctx); err != nil { return err } @@ -751,6 +756,16 @@ func (s *Server) stopACLReplication() { s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName) } +func (s *Server) startDeferredDeletion(ctx context.Context) { + s.startPeeringDeferredDeletion(ctx) + s.startTenancyDeferredDeletion(ctx) +} + +func (s *Server) stopDeferredDeletion() { + s.leaderRoutineManager.Stop(peeringDeletionRoutineName) + s.stopTenancyDeferredDeletion() +} + func (s *Server) startConfigReplication(ctx context.Context) { if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter { // replication shouldn't run in the primary DC diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index 5efc52edf..dea09ebed 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -12,12 +12,17 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-uuid" + "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/rpc/peering" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/proto/pbpeering" ) @@ -297,3 +302,149 @@ func newPeerDialer(peerAddr string) func(context.Context, string) (net.Conn, err return conn, nil } } + +func (s *Server) startPeeringDeferredDeletion(ctx context.Context) { + s.leaderRoutineManager.Start(ctx, peeringDeletionRoutineName, s.runPeeringDeletions) +} + +// runPeeringDeletions watches for peerings marked for deletions and then cleans up data for them. +func (s *Server) runPeeringDeletions(ctx context.Context) error { + logger := s.loggers.Named(logging.Peering) + + // This limiter's purpose is to control the rate of raft applies caused by the deferred deletion + // process. This includes deletion of the peerings themselves in addition to any peering data + raftLimiter := rate.NewLimiter(defaultDeletionApplyRate, int(defaultDeletionApplyRate)) + for { + ws := memdb.NewWatchSet() + state := s.fsm.State() + _, peerings, err := s.fsm.State().PeeringListDeleted(ws) + if err != nil { + logger.Warn("encountered an error while searching for deleted peerings", "error", err) + continue + } + + if len(peerings) == 0 { + ws.Add(state.AbandonCh()) + + // wait for a peering to be deleted or the routine to be cancelled + if err := ws.WatchCtx(ctx); err != nil { + return err + } + continue + } + + for _, p := range peerings { + s.removePeeringAndData(ctx, logger, raftLimiter, p.Name, acl.PartitionOrDefault(p.Partition)) + } + } +} + +// removepPeeringAndData removes data imported for a peering and the peering itself. +func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger, limiter *rate.Limiter, peer string, partition string) { + logger = logger.With("peer", peer, "partition", partition) + + // First delete all imported data. + // By deleting all imported nodes we also delete all services and checks registered on them. + if err := s.deleteAllNodes(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil { + logger.Error("Failed to remove Nodes for peer", "error", err) + return + } + if err := s.deleteTrustBundleFromPeer(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil { + logger.Error("Failed to remove trust bundle for peer", "error", err) + return + } + + if err := limiter.Wait(ctx); err != nil { + return + } + + // Once all imported data is deleted, the peering itself is also deleted. + req := pbpeering.PeeringDeleteRequest{ + Name: peer, + Partition: partition, + } + _, err := s.raftApplyProtobuf(structs.PeeringDeleteType, &req) + if err != nil { + logger.Error("failed to apply full peering deletion", "error", err) + return + } +} + +// deleteAllNodes will delete all nodes in a partition or all nodes imported from a given peer name. +func (s *Server) deleteAllNodes(ctx context.Context, limiter *rate.Limiter, entMeta acl.EnterpriseMeta, peerName string) error { + // Same as ACL batch upsert size + nodeBatchSizeBytes := 256 * 1024 + + _, nodes, err := s.fsm.State().NodeDump(nil, &entMeta, peerName) + if err != nil { + return err + } + if len(nodes) == 0 { + return nil + } + + i := 0 + for { + var ops structs.TxnOps + for batchSize := 0; batchSize < nodeBatchSizeBytes && i < len(nodes); i++ { + entry := nodes[i] + + op := structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeDelete, + Node: structs.Node{ + Node: entry.Node, + Partition: entry.Partition, + PeerName: entry.PeerName, + }, + }, + } + ops = append(ops, &op) + + // Add entries to the transaction until it reaches the max batch size + batchSize += len(entry.Node) + len(entry.Partition) + } + + // Send each batch as a TXN Req to avoid sending one at a time + req := structs.TxnRequest{ + Datacenter: s.config.Datacenter, + Ops: ops, + } + if len(req.Ops) > 0 { + if err := limiter.Wait(ctx); err != nil { + return err + } + + _, err := s.raftApplyMsgpack(structs.TxnRequestType, &req) + if err != nil { + return err + } + } else { + break + } + } + + return nil +} + +// deleteTrustBundleFromPeer deletes the trust bundle imported from a peer, if present. +func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Limiter, entMeta acl.EnterpriseMeta, peerName string) error { + _, bundle, err := s.fsm.State().PeeringTrustBundleRead(nil, state.Query{Value: peerName, EnterpriseMeta: entMeta}) + if err != nil { + return err + } + if bundle == nil { + return nil + } + + if err := limiter.Wait(ctx); err != nil { + return err + } + + req := pbpeering.PeeringTrustBundleDeleteRequest{ + Name: peerName, + Partition: entMeta.PartitionOrDefault(), + } + _, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, &req) + return err +} diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index feb520e14..e881b144e 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/api" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -107,7 +109,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { Value: "my-peer-s2", }) require.NoError(r, err) - require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State) + require.Nil(r, peering) }) } @@ -196,6 +198,162 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { Value: "my-peer-s1", }) require.NoError(r, err) - require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State) + require.Nil(r, peering) }) } + +func TestLeader_Peering_DeferredDeletion(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + // TODO(peering): Configure with TLS + _, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s1.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + var ( + peerName = "my-peer-s2" + defaultMeta = acl.DefaultEnterpriseMeta() + lastIdx = uint64(0) + ) + + // Simulate a peering initiation event by writing a peering to the state store. + lastIdx++ + require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + Name: peerName, + })) + + // Insert imported data: nodes, services, checks, trust bundle + lastIdx = insertTestPeeringData(t, s1.fsm.State(), peerName, lastIdx) + + // Mark the peering for deletion to trigger the termination sequence. + lastIdx++ + require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + Name: peerName, + DeletedAt: structs.TimeToProto(time.Now()), + })) + + // Ensure imported data is gone: + retry.Run(t, func(r *retry.R) { + _, csn, err := s1.fsm.State().ServiceDump(nil, "", false, defaultMeta, peerName) + require.NoError(r, err) + require.Len(r, csn, 0) + + _, checks, err := s1.fsm.State().ChecksInState(nil, api.HealthAny, defaultMeta, peerName) + require.NoError(r, err) + require.Len(r, checks, 0) + + _, nodes, err := s1.fsm.State().NodeDump(nil, defaultMeta, peerName) + require.NoError(r, err) + require.Len(r, nodes, 0) + + _, tb, err := s1.fsm.State().PeeringTrustBundleRead(nil, state.Query{Value: peerName}) + require.NoError(r, err) + require.Nil(r, tb) + }) + + // The leader routine should pick up the deletion and finish deleting the peering. + retry.Run(t, func(r *retry.R) { + _, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{ + Value: peerName, + }) + require.NoError(r, err) + require.Nil(r, peering) + }) +} + +func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastIdx uint64) uint64 { + lastIdx++ + require.NoError(t, store.PeeringTrustBundleWrite(lastIdx, &pbpeering.PeeringTrustBundle{ + TrustDomain: "952e6bd1-f4d6-47f7-83ff-84b31babaa17", + PeerName: peer, + RootPEMs: []string{"certificate bundle"}, + })) + + lastIdx++ + require.NoError(t, store.EnsureRegistration(lastIdx, &structs.RegisterRequest{ + Node: "aaa", + Address: "10.0.0.1", + PeerName: peer, + Service: &structs.NodeService{ + Service: "a-service", + ID: "a-service-1", + Port: 8080, + PeerName: peer, + }, + Checks: structs.HealthChecks{ + { + CheckID: "a-service-1-check", + ServiceName: "a-service", + ServiceID: "a-service-1", + Node: "aaa", + PeerName: peer, + }, + { + CheckID: structs.SerfCheckID, + Node: "aaa", + PeerName: peer, + }, + }, + })) + + lastIdx++ + require.NoError(t, store.EnsureRegistration(lastIdx, &structs.RegisterRequest{ + Node: "bbb", + Address: "10.0.0.2", + PeerName: peer, + Service: &structs.NodeService{ + Service: "b-service", + ID: "b-service-1", + Port: 8080, + PeerName: peer, + }, + Checks: structs.HealthChecks{ + { + CheckID: "b-service-1-check", + ServiceName: "b-service", + ServiceID: "b-service-1", + Node: "bbb", + PeerName: peer, + }, + { + CheckID: structs.SerfCheckID, + Node: "bbb", + PeerName: peer, + }, + }, + })) + + lastIdx++ + require.NoError(t, store.EnsureRegistration(lastIdx, &structs.RegisterRequest{ + Node: "ccc", + Address: "10.0.0.3", + PeerName: peer, + Service: &structs.NodeService{ + Service: "c-service", + ID: "c-service-1", + Port: 8080, + PeerName: peer, + }, + Checks: structs.HealthChecks{ + { + CheckID: "c-service-1-check", + ServiceName: "c-service", + ServiceID: "c-service-1", + Node: "ccc", + PeerName: peer, + }, + { + CheckID: structs.SerfCheckID, + Node: "ccc", + PeerName: peer, + }, + }, + })) + + return lastIdx +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 344efb167..0b8702cba 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -126,6 +126,7 @@ const ( backgroundCAInitializationRoutineName = "CA initialization" virtualIPCheckRoutineName = "virtual IP version check" peeringStreamsRoutineName = "streaming peering resources" + peeringDeletionRoutineName = "peering deferred deletion" ) var ( diff --git a/agent/peering_endpoint_test.go b/agent/peering_endpoint_test.go index 412ea4ae4..07dd9bfaf 100644 --- a/agent/peering_endpoint_test.go +++ b/agent/peering_endpoint_test.go @@ -12,13 +12,12 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/encoding/protojson" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbpeering" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" + "github.com/stretchr/testify/require" ) var validCA = ` @@ -344,18 +343,14 @@ func TestHTTP_Peering_Delete(t *testing.T) { require.Equal(t, "", resp.Body.String()) }) - t.Run("now the token is marked for deletion", func(t *testing.T) { - req, err := http.NewRequest("GET", "/v1/peering/foo", nil) - require.NoError(t, err) - resp := httptest.NewRecorder() - a.srv.h.ServeHTTP(resp, req) - require.Equal(t, http.StatusOK, resp.Code) - - var p pbpeering.Peering - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.NoError(t, protojson.Unmarshal(body, &p)) - require.False(t, p.IsActive()) + t.Run("now the token is deleted and reads should yield a 404", func(t *testing.T) { + retry.Run(t, func(r *retry.R) { + req, err := http.NewRequest("GET", "/v1/peering/foo", nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + require.Equal(r, http.StatusNotFound, resp.Code) + }) }) t.Run("delete a token that does not exist", func(t *testing.T) { diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 2cedfc80b..1e462dd9e 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" @@ -297,11 +298,14 @@ func TestPeeringService_Delete(t *testing.T) { _, err = client.PeeringDelete(ctx, &pbpeering.PeeringDeleteRequest{Name: "foo"}) require.NoError(t, err) - // "foo" peering must only be marked for deletion, rather than actually be deleted. - _, resp, err := s.Server.FSM().State().PeeringRead(nil, state.Query{Value: "foo"}) - require.NoError(t, err) - require.NotNil(t, resp.DeletedAt) - require.False(t, resp.IsActive()) + retry.Run(t, func(r *retry.R) { + _, resp, err := s.Server.FSM().State().PeeringRead(nil, state.Query{Value: "foo"}) + require.NoError(r, err) + + // Initially the peering will be marked for deletion but eventually the leader + // routine will clean it up. + require.Nil(r, resp) + }) } func TestPeeringService_List(t *testing.T) { diff --git a/api/peering_test.go b/api/peering_test.go index ea93e6345..015da723f 100644 --- a/api/peering_test.go +++ b/api/peering_test.go @@ -212,11 +212,12 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) { require.NoError(t, err) require.NotNil(t, wm) - // Read to see if the token is marked for deletion - resp, qm, err := c.Peerings().Read(ctx, "peer1", nil) - require.NoError(t, err) - require.NotNil(t, qm) - require.NotNil(t, resp) - require.False(t, resp.DeletedAt.IsZero()) + // Read to see if the token is gone + retry.Run(t, func(r *retry.R) { + resp, qm, err := c.Peerings().Read(ctx, "peer1", nil) + require.NoError(r, err) + require.NotNil(r, qm) + require.Nil(r, resp) + }) }) }