Add leader routine to clean up peerings
Once a peering is marked for deletion a new leader routine will now clean up all imported resources and then the peering itself. A lot of the logic was grabbed from the namespace/partitions deferred deletions but with a handful of simplifications: - The rate limiting is not configurable. - Deleting imported nodes/services/checks is done by deleting nodes with the Txn API. The services and checks are deleted as a side-effect. - There is no "round rate limiter" like with namespaces and partitions. This is because peerings are purely local, and deleting a peering in the datacenter does not depend on deleting data from other DCs like with WAN-federated namespaces. All rate limiting is handled by the Raft rate limiter.
This commit is contained in:
parent
2adb9f7c8a
commit
a5283e4361
|
@ -57,6 +57,12 @@ func (s *Server) revokeEnterpriseLeadership() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) startTenancyDeferredDeletion(ctx context.Context) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) stopTenancyDeferredDeletion() {
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) validateEnterpriseRequest(entMeta *acl.EnterpriseMeta, write bool) error {
|
func (s *Server) validateEnterpriseRequest(entMeta *acl.EnterpriseMeta, write bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,9 @@ var LeaderSummaries = []prometheus.SummaryDefinition{
|
||||||
const (
|
const (
|
||||||
newLeaderEvent = "consul:new-leader"
|
newLeaderEvent = "consul:new-leader"
|
||||||
barrierWriteTimeout = 2 * time.Minute
|
barrierWriteTimeout = 2 * time.Minute
|
||||||
|
|
||||||
|
defaultDeletionRoundBurst int = 5 // number replication round bursts
|
||||||
|
defaultDeletionApplyRate rate.Limit = 10 // raft applies per second
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -313,6 +316,8 @@ func (s *Server) establishLeadership(ctx context.Context) error {
|
||||||
|
|
||||||
s.startPeeringStreamSync(ctx)
|
s.startPeeringStreamSync(ctx)
|
||||||
|
|
||||||
|
s.startDeferredDeletion(ctx)
|
||||||
|
|
||||||
if err := s.startConnectLeader(ctx); err != nil {
|
if err := s.startConnectLeader(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -751,6 +756,16 @@ func (s *Server) stopACLReplication() {
|
||||||
s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
|
s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) startDeferredDeletion(ctx context.Context) {
|
||||||
|
s.startPeeringDeferredDeletion(ctx)
|
||||||
|
s.startTenancyDeferredDeletion(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) stopDeferredDeletion() {
|
||||||
|
s.leaderRoutineManager.Stop(peeringDeletionRoutineName)
|
||||||
|
s.stopTenancyDeferredDeletion()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) startConfigReplication(ctx context.Context) {
|
func (s *Server) startConfigReplication(ctx context.Context) {
|
||||||
if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
|
if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
|
||||||
// replication shouldn't run in the primary DC
|
// replication shouldn't run in the primary DC
|
||||||
|
|
|
@ -12,12 +12,17 @@ import (
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
"github.com/hashicorp/go-uuid"
|
"github.com/hashicorp/go-uuid"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/rpc/peering"
|
"github.com/hashicorp/consul/agent/rpc/peering"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
"github.com/hashicorp/consul/logging"
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -297,3 +302,149 @@ func newPeerDialer(peerAddr string) func(context.Context, string) (net.Conn, err
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) startPeeringDeferredDeletion(ctx context.Context) {
|
||||||
|
s.leaderRoutineManager.Start(ctx, peeringDeletionRoutineName, s.runPeeringDeletions)
|
||||||
|
}
|
||||||
|
|
||||||
|
// runPeeringDeletions watches for peerings marked for deletions and then cleans up data for them.
|
||||||
|
func (s *Server) runPeeringDeletions(ctx context.Context) error {
|
||||||
|
logger := s.loggers.Named(logging.Peering)
|
||||||
|
|
||||||
|
// This limiter's purpose is to control the rate of raft applies caused by the deferred deletion
|
||||||
|
// process. This includes deletion of the peerings themselves in addition to any peering data
|
||||||
|
raftLimiter := rate.NewLimiter(defaultDeletionApplyRate, int(defaultDeletionApplyRate))
|
||||||
|
for {
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
state := s.fsm.State()
|
||||||
|
_, peerings, err := s.fsm.State().PeeringListDeleted(ws)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("encountered an error while searching for deleted peerings", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(peerings) == 0 {
|
||||||
|
ws.Add(state.AbandonCh())
|
||||||
|
|
||||||
|
// wait for a peering to be deleted or the routine to be cancelled
|
||||||
|
if err := ws.WatchCtx(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range peerings {
|
||||||
|
s.removePeeringAndData(ctx, logger, raftLimiter, p.Name, acl.PartitionOrDefault(p.Partition))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// removepPeeringAndData removes data imported for a peering and the peering itself.
|
||||||
|
func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger, limiter *rate.Limiter, peer string, partition string) {
|
||||||
|
logger = logger.With("peer", peer, "partition", partition)
|
||||||
|
|
||||||
|
// First delete all imported data.
|
||||||
|
// By deleting all imported nodes we also delete all services and checks registered on them.
|
||||||
|
if err := s.deleteAllNodes(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil {
|
||||||
|
logger.Error("Failed to remove Nodes for peer", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := s.deleteTrustBundleFromPeer(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil {
|
||||||
|
logger.Error("Failed to remove trust bundle for peer", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := limiter.Wait(ctx); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Once all imported data is deleted, the peering itself is also deleted.
|
||||||
|
req := pbpeering.PeeringDeleteRequest{
|
||||||
|
Name: peer,
|
||||||
|
Partition: partition,
|
||||||
|
}
|
||||||
|
_, err := s.raftApplyProtobuf(structs.PeeringDeleteType, &req)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to apply full peering deletion", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteAllNodes will delete all nodes in a partition or all nodes imported from a given peer name.
|
||||||
|
func (s *Server) deleteAllNodes(ctx context.Context, limiter *rate.Limiter, entMeta acl.EnterpriseMeta, peerName string) error {
|
||||||
|
// Same as ACL batch upsert size
|
||||||
|
nodeBatchSizeBytes := 256 * 1024
|
||||||
|
|
||||||
|
_, nodes, err := s.fsm.State().NodeDump(nil, &entMeta, peerName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
for {
|
||||||
|
var ops structs.TxnOps
|
||||||
|
for batchSize := 0; batchSize < nodeBatchSizeBytes && i < len(nodes); i++ {
|
||||||
|
entry := nodes[i]
|
||||||
|
|
||||||
|
op := structs.TxnOp{
|
||||||
|
Node: &structs.TxnNodeOp{
|
||||||
|
Verb: api.NodeDelete,
|
||||||
|
Node: structs.Node{
|
||||||
|
Node: entry.Node,
|
||||||
|
Partition: entry.Partition,
|
||||||
|
PeerName: entry.PeerName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ops = append(ops, &op)
|
||||||
|
|
||||||
|
// Add entries to the transaction until it reaches the max batch size
|
||||||
|
batchSize += len(entry.Node) + len(entry.Partition)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send each batch as a TXN Req to avoid sending one at a time
|
||||||
|
req := structs.TxnRequest{
|
||||||
|
Datacenter: s.config.Datacenter,
|
||||||
|
Ops: ops,
|
||||||
|
}
|
||||||
|
if len(req.Ops) > 0 {
|
||||||
|
if err := limiter.Wait(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := s.raftApplyMsgpack(structs.TxnRequestType, &req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteTrustBundleFromPeer deletes the trust bundle imported from a peer, if present.
|
||||||
|
func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Limiter, entMeta acl.EnterpriseMeta, peerName string) error {
|
||||||
|
_, bundle, err := s.fsm.State().PeeringTrustBundleRead(nil, state.Query{Value: peerName, EnterpriseMeta: entMeta})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if bundle == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := limiter.Wait(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
req := pbpeering.PeeringTrustBundleDeleteRequest{
|
||||||
|
Name: peerName,
|
||||||
|
Partition: entMeta.PartitionOrDefault(),
|
||||||
|
}
|
||||||
|
_, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, &req)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -7,6 +7,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
@ -107,7 +109,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
|
||||||
Value: "my-peer-s2",
|
Value: "my-peer-s2",
|
||||||
})
|
})
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State)
|
require.Nil(r, peering)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,6 +198,162 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
|
||||||
Value: "my-peer-s1",
|
Value: "my-peer-s1",
|
||||||
})
|
})
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State)
|
require.Nil(r, peering)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLeader_Peering_DeferredDeletion(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(peering): Configure with TLS
|
||||||
|
_, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.NodeName = "s1.dc1"
|
||||||
|
c.Datacenter = "dc1"
|
||||||
|
c.TLSConfig.Domain = "consul"
|
||||||
|
})
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
var (
|
||||||
|
peerName = "my-peer-s2"
|
||||||
|
defaultMeta = acl.DefaultEnterpriseMeta()
|
||||||
|
lastIdx = uint64(0)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Simulate a peering initiation event by writing a peering to the state store.
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||||
|
Name: peerName,
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Insert imported data: nodes, services, checks, trust bundle
|
||||||
|
lastIdx = insertTestPeeringData(t, s1.fsm.State(), peerName, lastIdx)
|
||||||
|
|
||||||
|
// Mark the peering for deletion to trigger the termination sequence.
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||||
|
Name: peerName,
|
||||||
|
DeletedAt: structs.TimeToProto(time.Now()),
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Ensure imported data is gone:
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
_, csn, err := s1.fsm.State().ServiceDump(nil, "", false, defaultMeta, peerName)
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.Len(r, csn, 0)
|
||||||
|
|
||||||
|
_, checks, err := s1.fsm.State().ChecksInState(nil, api.HealthAny, defaultMeta, peerName)
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.Len(r, checks, 0)
|
||||||
|
|
||||||
|
_, nodes, err := s1.fsm.State().NodeDump(nil, defaultMeta, peerName)
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.Len(r, nodes, 0)
|
||||||
|
|
||||||
|
_, tb, err := s1.fsm.State().PeeringTrustBundleRead(nil, state.Query{Value: peerName})
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.Nil(r, tb)
|
||||||
|
})
|
||||||
|
|
||||||
|
// The leader routine should pick up the deletion and finish deleting the peering.
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
_, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{
|
||||||
|
Value: peerName,
|
||||||
|
})
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.Nil(r, peering)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastIdx uint64) uint64 {
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, store.PeeringTrustBundleWrite(lastIdx, &pbpeering.PeeringTrustBundle{
|
||||||
|
TrustDomain: "952e6bd1-f4d6-47f7-83ff-84b31babaa17",
|
||||||
|
PeerName: peer,
|
||||||
|
RootPEMs: []string{"certificate bundle"},
|
||||||
|
}))
|
||||||
|
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, store.EnsureRegistration(lastIdx, &structs.RegisterRequest{
|
||||||
|
Node: "aaa",
|
||||||
|
Address: "10.0.0.1",
|
||||||
|
PeerName: peer,
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "a-service",
|
||||||
|
ID: "a-service-1",
|
||||||
|
Port: 8080,
|
||||||
|
PeerName: peer,
|
||||||
|
},
|
||||||
|
Checks: structs.HealthChecks{
|
||||||
|
{
|
||||||
|
CheckID: "a-service-1-check",
|
||||||
|
ServiceName: "a-service",
|
||||||
|
ServiceID: "a-service-1",
|
||||||
|
Node: "aaa",
|
||||||
|
PeerName: peer,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
CheckID: structs.SerfCheckID,
|
||||||
|
Node: "aaa",
|
||||||
|
PeerName: peer,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, store.EnsureRegistration(lastIdx, &structs.RegisterRequest{
|
||||||
|
Node: "bbb",
|
||||||
|
Address: "10.0.0.2",
|
||||||
|
PeerName: peer,
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "b-service",
|
||||||
|
ID: "b-service-1",
|
||||||
|
Port: 8080,
|
||||||
|
PeerName: peer,
|
||||||
|
},
|
||||||
|
Checks: structs.HealthChecks{
|
||||||
|
{
|
||||||
|
CheckID: "b-service-1-check",
|
||||||
|
ServiceName: "b-service",
|
||||||
|
ServiceID: "b-service-1",
|
||||||
|
Node: "bbb",
|
||||||
|
PeerName: peer,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
CheckID: structs.SerfCheckID,
|
||||||
|
Node: "bbb",
|
||||||
|
PeerName: peer,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, store.EnsureRegistration(lastIdx, &structs.RegisterRequest{
|
||||||
|
Node: "ccc",
|
||||||
|
Address: "10.0.0.3",
|
||||||
|
PeerName: peer,
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "c-service",
|
||||||
|
ID: "c-service-1",
|
||||||
|
Port: 8080,
|
||||||
|
PeerName: peer,
|
||||||
|
},
|
||||||
|
Checks: structs.HealthChecks{
|
||||||
|
{
|
||||||
|
CheckID: "c-service-1-check",
|
||||||
|
ServiceName: "c-service",
|
||||||
|
ServiceID: "c-service-1",
|
||||||
|
Node: "ccc",
|
||||||
|
PeerName: peer,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
CheckID: structs.SerfCheckID,
|
||||||
|
Node: "ccc",
|
||||||
|
PeerName: peer,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
return lastIdx
|
||||||
|
}
|
||||||
|
|
|
@ -126,6 +126,7 @@ const (
|
||||||
backgroundCAInitializationRoutineName = "CA initialization"
|
backgroundCAInitializationRoutineName = "CA initialization"
|
||||||
virtualIPCheckRoutineName = "virtual IP version check"
|
virtualIPCheckRoutineName = "virtual IP version check"
|
||||||
peeringStreamsRoutineName = "streaming peering resources"
|
peeringStreamsRoutineName = "streaming peering resources"
|
||||||
|
peeringDeletionRoutineName = "peering deferred deletion"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -12,13 +12,12 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"google.golang.org/protobuf/encoding/protojson"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
var validCA = `
|
var validCA = `
|
||||||
|
@ -344,18 +343,14 @@ func TestHTTP_Peering_Delete(t *testing.T) {
|
||||||
require.Equal(t, "", resp.Body.String())
|
require.Equal(t, "", resp.Body.String())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("now the token is marked for deletion", func(t *testing.T) {
|
t.Run("now the token is deleted and reads should yield a 404", func(t *testing.T) {
|
||||||
req, err := http.NewRequest("GET", "/v1/peering/foo", nil)
|
retry.Run(t, func(r *retry.R) {
|
||||||
require.NoError(t, err)
|
req, err := http.NewRequest("GET", "/v1/peering/foo", nil)
|
||||||
resp := httptest.NewRecorder()
|
require.NoError(t, err)
|
||||||
a.srv.h.ServeHTTP(resp, req)
|
resp := httptest.NewRecorder()
|
||||||
require.Equal(t, http.StatusOK, resp.Code)
|
a.srv.h.ServeHTTP(resp, req)
|
||||||
|
require.Equal(r, http.StatusNotFound, resp.Code)
|
||||||
var p pbpeering.Peering
|
})
|
||||||
body, err := io.ReadAll(resp.Body)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NoError(t, protojson.Unmarshal(body, &p))
|
|
||||||
require.False(t, p.IsActive())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("delete a token that does not exist", func(t *testing.T) {
|
t.Run("delete a token that does not exist", func(t *testing.T) {
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-uuid"
|
"github.com/hashicorp/go-uuid"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -297,11 +298,14 @@ func TestPeeringService_Delete(t *testing.T) {
|
||||||
_, err = client.PeeringDelete(ctx, &pbpeering.PeeringDeleteRequest{Name: "foo"})
|
_, err = client.PeeringDelete(ctx, &pbpeering.PeeringDeleteRequest{Name: "foo"})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// "foo" peering must only be marked for deletion, rather than actually be deleted.
|
retry.Run(t, func(r *retry.R) {
|
||||||
_, resp, err := s.Server.FSM().State().PeeringRead(nil, state.Query{Value: "foo"})
|
_, resp, err := s.Server.FSM().State().PeeringRead(nil, state.Query{Value: "foo"})
|
||||||
require.NoError(t, err)
|
require.NoError(r, err)
|
||||||
require.NotNil(t, resp.DeletedAt)
|
|
||||||
require.False(t, resp.IsActive())
|
// Initially the peering will be marked for deletion but eventually the leader
|
||||||
|
// routine will clean it up.
|
||||||
|
require.Nil(r, resp)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPeeringService_List(t *testing.T) {
|
func TestPeeringService_List(t *testing.T) {
|
||||||
|
|
|
@ -212,11 +212,12 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, wm)
|
require.NotNil(t, wm)
|
||||||
|
|
||||||
// Read to see if the token is marked for deletion
|
// Read to see if the token is gone
|
||||||
resp, qm, err := c.Peerings().Read(ctx, "peer1", nil)
|
retry.Run(t, func(r *retry.R) {
|
||||||
require.NoError(t, err)
|
resp, qm, err := c.Peerings().Read(ctx, "peer1", nil)
|
||||||
require.NotNil(t, qm)
|
require.NoError(r, err)
|
||||||
require.NotNil(t, resp)
|
require.NotNil(r, qm)
|
||||||
require.False(t, resp.DeletedAt.IsZero())
|
require.Nil(r, resp)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue