peering: don't track imported services/nodes in usage

Services/nodes that are imported from other peers are stored in
state. We don't want to count those as part of our own cluster's usage.
This commit is contained in:
Luke Kysow 2022-07-27 09:08:51 -07:00
parent 0e10e5b765
commit 92c1f30359
3 changed files with 141 additions and 3 deletions

View File

@ -57,17 +57,19 @@ func testStateStore(t *testing.T) *Store {
return s
}
// testRegisterNode registers a node into the state store.
func testRegisterNode(t *testing.T, s *Store, idx uint64, nodeID string) {
testRegisterNodeOpts(t, s, idx, nodeID)
}
// testRegisterNodeWithChange registers a node and ensures it gets different from previous registration
// testRegisterNodeWithChange registers a node and ensures it gets different from previous registration.
func testRegisterNodeWithChange(t *testing.T, s *Store, idx uint64, nodeID string) {
testRegisterNodeOpts(t, s, idx, nodeID, regNodeWithMeta(map[string]string{
"version": fmt.Sprint(idx),
}))
}
// testRegisterNodeWithMeta registers a node into the state store wth meta.
func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string, meta map[string]string) {
testRegisterNodeOpts(t, s, idx, nodeID, regNodeWithMeta(meta))
}
@ -81,6 +83,15 @@ func regNodeWithMeta(meta map[string]string) func(*structs.Node) error {
}
}
// testRegisterNodePeer registers a node into the state store that was imported from peer.
func testRegisterNodePeer(t *testing.T, s *Store, idx uint64, nodeID string, peer string) {
testRegisterNodeOpts(t, s, idx, nodeID, func(node *structs.Node) error {
node.PeerName = peer
return nil
})
}
// testRegisterNodeOpts registers a node into the state store and runs opts to modify it prior to writing.
func testRegisterNodeOpts(t *testing.T, s *Store, idx uint64, nodeID string, opts ...regNodeOption) {
node := &structs.Node{Node: nodeID}
for _, opt := range opts {
@ -93,11 +104,12 @@ func testRegisterNodeOpts(t *testing.T, s *Store, idx uint64, nodeID string, opt
t.Fatalf("err: %s", err)
}
tx := s.db.Txn(false)
tx := s.db.ReadTxn()
defer tx.Abort()
n, err := tx.First(tableNodes, indexID, Query{
Value: nodeID,
EnterpriseMeta: *node.GetEnterpriseMeta(),
PeerName: node.PeerName,
})
if err != nil {
t.Fatalf("err: %s", err)
@ -107,10 +119,28 @@ func testRegisterNodeOpts(t *testing.T, s *Store, idx uint64, nodeID string, opt
}
}
// testRegisterServicePeer registers a service into the state store that was imported from peer.
func testRegisterServicePeer(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, peer string) {
testRegisterServiceOpts(t, s, idx, nodeID, serviceID, func(service *structs.NodeService) {
service.PeerName = peer
})
}
// testRegisterServiceOpts registers a service into the state store and runs opts to modify it prior to writing.
func testRegisterServiceOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, opts ...func(service *structs.NodeService)) {
testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, false, opts...)
}
// testRegisterServiceWithChange registers a service and allow ensuring the consul index is updated
// even if service already exists if using `modifyAccordingIndex`.
// This is done by setting the transaction ID in "version" meta so service will be updated if it already exists
func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) {
testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, modifyAccordingIndex)
}
// testRegisterServiceWithChangeOpts is the same as testRegisterServiceWithChange with the addition of opts that can
// modify the service prior to writing.
func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool, opts ...func(service *structs.NodeService)) {
meta := make(map[string]string)
if modifyAccordingIndex {
meta["version"] = fmt.Sprint(idx)
@ -122,13 +152,17 @@ func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, s
Port: 1111,
Meta: meta,
}
for _, o := range opts {
o(svc)
}
if err := s.EnsureService(idx, nodeID, svc); err != nil {
t.Fatalf("err: %s", err)
}
tx := s.db.Txn(false)
defer tx.Abort()
service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID, PeerName: svc.PeerName})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -205,6 +239,17 @@ func testRegisterCheckWithPartition(t *testing.T, s *Store, idx uint64,
}
func testRegisterSidecarProxy(t *testing.T, s *Store, idx uint64, nodeID string, targetServiceID string) {
testRegisterSidecarProxyOpts(t, s, idx, nodeID, targetServiceID)
}
// testRegisterSidecarProxyPeer adds a sidecar proxy to the state store that was imported from peer.
func testRegisterSidecarProxyPeer(t *testing.T, s *Store, idx uint64, nodeID string, targetServiceID string, peer string) {
testRegisterSidecarProxyOpts(t, s, idx, nodeID, targetServiceID, func(service *structs.NodeService) {
service.PeerName = peer
})
}
func testRegisterSidecarProxyOpts(t *testing.T, s *Store, idx uint64, nodeID string, targetServiceID string, opts ...func(service *structs.NodeService)) {
svc := &structs.NodeService{
ID: targetServiceID + "-sidecar-proxy",
Service: targetServiceID + "-sidecar-proxy",
@ -215,10 +260,24 @@ func testRegisterSidecarProxy(t *testing.T, s *Store, idx uint64, nodeID string,
DestinationServiceID: targetServiceID,
},
}
for _, o := range opts {
o(svc)
}
require.NoError(t, s.EnsureService(idx, nodeID, svc))
}
func testRegisterConnectNativeService(t *testing.T, s *Store, idx uint64, nodeID string, serviceID string) {
testRegisterConnectNativeServiceOpts(t, s, idx, nodeID, serviceID)
}
// testRegisterConnectNativeServicePeer adds a connect native service to the state store that was imported from peer.
func testRegisterConnectNativeServicePeer(t *testing.T, s *Store, idx uint64, nodeID string, serviceID string, peer string) {
testRegisterConnectNativeServiceOpts(t, s, idx, nodeID, serviceID, func(service *structs.NodeService) {
service.PeerName = peer
})
}
func testRegisterConnectNativeServiceOpts(t *testing.T, s *Store, idx uint64, nodeID string, serviceID string, opts ...func(service *structs.NodeService)) {
svc := &structs.NodeService{
ID: serviceID,
Service: serviceID,
@ -227,6 +286,9 @@ func testRegisterConnectNativeService(t *testing.T, s *Store, idx uint64, nodeID
Native: true,
},
}
for _, o := range opts {
o(svc)
}
require.NoError(t, s.EnsureService(idx, nodeID, svc))
}

View File

@ -99,11 +99,20 @@ func updateUsage(tx WriteTxn, changes Changes) error {
switch change.Table {
case tableNodes:
node := changeObject(change).(*structs.Node)
if node.PeerName != "" {
// TODO(peering) track peered nodes separately. For now not tracking to avoid double billing.
continue
}
usageDeltas[change.Table] += delta
addEnterpriseNodeUsage(usageDeltas, change)
case tableServices:
svc := changeObject(change).(*structs.ServiceNode)
if svc.PeerName != "" {
// TODO(peering) track peered services separately. For now not tracking to avoid double billing.
continue
}
usageDeltas[change.Table] += delta
addEnterpriseServiceInstanceUsage(usageDeltas, change)

View File

@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestStateStore_Usage_NodeUsage(t *testing.T) {
@ -45,6 +46,32 @@ func TestStateStore_Usage_NodeUsage_Delete(t *testing.T) {
require.Equal(t, usage.Nodes, 1)
}
// Test that nodes added/deleted from peers are not counted.
func TestStateStore_Usage_NodeUsagePeering(t *testing.T) {
s := testStateStore(t)
// Register two nodes, one local, one from a remote peer.
testRegisterNodePeer(t, s, 0, "node1", "peer")
testRegisterNode(t, s, 1, "node2")
testutil.RunStep(t, "write node", func(t *testing.T) {
// Test that we're only tracking the local node.
idx, usage, err := s.NodeUsage()
require.NoError(t, err)
require.Equal(t, uint64(1), idx)
require.Equal(t, 1, usage.Nodes)
})
testutil.RunStep(t, "delete node", func(t *testing.T) {
// Deregister the remote peered node. Count should remain the same.
require.NoError(t, s.DeleteNode(2, "node1", nil, "peer"))
idx, usage, err := s.NodeUsage()
require.NoError(t, err)
require.Equal(t, uint64(1), idx)
require.Equal(t, 1, usage.Nodes)
})
}
func TestStateStore_Usage_KVUsage(t *testing.T) {
s := testStateStore(t)
@ -164,6 +191,46 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
}
}
// Test that services from remote peers aren't counted in writes or deletes.
func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
s := testStateStore(t)
peerName := "peer"
// Register remote peer node/services.
testRegisterNodePeer(t, s, 0, "node1", peerName)
testRegisterServicePeer(t, s, 1, "node1", "service1", peerName)
testRegisterSidecarProxyPeer(t, s, 2, "node1", "service1", peerName)
testRegisterConnectNativeServicePeer(t, s, 3, "node1", "service-native", peerName)
// Register local node/services.
testRegisterNode(t, s, 4, "node2")
testRegisterService(t, s, 5, "node2", "service1")
testRegisterSidecarProxy(t, s, 6, "node2", "service1")
testRegisterConnectNativeService(t, s, 7, "node2", "service-native")
testutil.RunStep(t, "writes", func(t *testing.T) {
idx, usage, err := s.ServiceUsage()
require.NoError(t, err)
require.Equal(t, uint64(7), idx)
require.Equal(t, 3, usage.Services)
require.Equal(t, 3, usage.ServiceInstances)
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
})
testutil.RunStep(t, "deletes", func(t *testing.T) {
require.NoError(t, s.DeleteNode(7, "node1", nil, peerName))
require.NoError(t, s.DeleteNode(8, "node2", nil, ""))
idx, usage, err := s.ServiceUsage()
require.NoError(t, err)
require.Equal(t, uint64(8), idx)
require.Equal(t, 0, usage.Services)
require.Equal(t, 0, usage.ServiceInstances)
require.Equal(t, 0, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 0, usage.ConnectServiceInstances[connectNativeInstancesTable])
})
}
func TestStateStore_Usage_Restore(t *testing.T) {
s := testStateStore(t)
restore := s.Restore()