Add a new table to query service names by kind

This table purposefully does not index by partition/namespace. It's a
global view into all service names.

This table is intended to replace the current serviceListTxn watch in
intentionTopologyTxn. For cross-partition transparent proxying we need
to be able to calculate upstreams from intentions in any partition. This
means that the existing serviceListTxn function is insufficient since
it's scoped to a partition.

Moving away from that function is also beneficial because it watches the
main "services" table, so watchers will wake up when any instance is
registered or deregistered.
This commit is contained in:
freddygv 2021-12-01 17:44:13 -07:00
parent 97b4068137
commit 142d8193e5
9 changed files with 370 additions and 10 deletions

View File

@ -464,6 +464,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.2")
_, serviceNames, err := fsm.state.ServiceNamesOfKind(nil, structs.ServiceKindTypical)
require.NoError(t, err)
expect := []string{"backend", "db", "frontend", "web"}
for i, sn := range serviceNames {
require.Equal(t, expect[i], sn.Service.Name)
}
// Snapshot
snap, err := fsm.Snapshot()
require.NoError(t, err)
@ -690,10 +698,10 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.Len(t, roots, 2)
// Verify provider state is restored.
_, state, err := fsm2.state.CAProviderState("asdf")
_, provider, err := fsm2.state.CAProviderState("asdf")
require.NoError(t, err)
require.Equal(t, "foo", state.PrivateKey)
require.Equal(t, "bar", state.RootCert)
require.Equal(t, "foo", provider.PrivateKey)
require.Equal(t, "bar", provider.RootCert)
// Verify CA configuration is restored.
_, caConf, err := fsm2.state.CAConfig(nil)
@ -751,6 +759,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, err)
require.Equal(t, meshConfig, meshConfigEntry)
_, restoredServiceNames, err := fsm2.state.ServiceNamesOfKind(nil, structs.ServiceKindTypical)
require.NoError(t, err)
expect = []string{"backend", "db", "frontend", "web"}
for i, sn := range restoredServiceNames {
require.Equal(t, expect[i], sn.Service.Name)
}
// Snapshot
snap, err = fsm2.Snapshot()
require.NoError(t, err)

View File

@ -741,6 +741,9 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err := upsertKindServiceName(tx, idx, svc.Kind, svc.CompoundServiceName()); err != nil {
return fmt.Errorf("failed to persist service name: %v", err)
}
// Update upstream/downstream mappings if it's a connect service
if svc.Kind == structs.ServiceKindConnectProxy || svc.Connect.Native {
@ -1691,6 +1694,9 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
if err := freeServiceVirtualIP(tx, svc.ServiceName, entMeta); err != nil {
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
}
if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil {
return fmt.Errorf("failed to persist service name: %v", err)
}
}
} else {
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
@ -2526,6 +2532,26 @@ func (s *Store) VirtualIPForService(sn structs.ServiceName) (string, error) {
return result.String(), nil
}
func (s *Store) KindServiceNamesOfKind(ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) {
tx := s.db.Txn(false)
defer tx.Abort()
var names []*KindServiceName
iter, err := tx.Get(tableKindServiceNames, indexKindOnly, kind)
if err != nil {
return 0, nil, err
}
ws.Add(iter.WatchCh())
idx := kindServiceNamesMaxIndex(tx, ws, kind)
for name := iter.Next(); name != nil; name = iter.Next() {
ksn := name.(*KindServiceName)
names = append(names, ksn)
}
return idx, names, nil
}
// parseCheckServiceNodes is used to parse through a given set of services,
// and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query.
@ -3862,3 +3888,44 @@ func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway str
return nil
}
func upsertKindServiceName(tx WriteTxn, idx uint64, kind structs.ServiceKind, name structs.ServiceName) error {
q := KindServiceNameQuery{Name: name.Name, Kind: kind, EnterpriseMeta: name.EnterpriseMeta}
existing, err := tx.First(tableKindServiceNames, indexID, q)
if err != nil {
return err
}
// Service name is already known. Nothing to do.
if existing != nil {
return nil
}
ksn := KindServiceName{
Kind: kind,
Service: name,
RaftIndex: structs.RaftIndex{
CreateIndex: idx,
ModifyIndex: idx,
},
}
if err := tx.Insert(tableKindServiceNames, &ksn); err != nil {
return fmt.Errorf("failed inserting %s/%s into %s: %s", kind, name.String(), tableKindServiceNames, err)
}
if err := indexUpdateMaxTxn(tx, idx, kindServiceNameIndexName(kind)); err != nil {
return fmt.Errorf("failed updating %s index: %v", tableKindServiceNames, err)
}
return nil
}
func cleanupKindServiceName(tx WriteTxn, idx uint64, name structs.ServiceName, kind structs.ServiceKind) error {
q := KindServiceNameQuery{Name: name.Name, Kind: kind, EnterpriseMeta: name.EnterpriseMeta}
if _, err := tx.DeleteAll(tableKindServiceNames, indexID, q); err != nil {
return fmt.Errorf("failed to delete %s from %s: %s", name, tableKindServiceNames, err)
}
if err := indexUpdateMaxTxn(tx, idx, kindServiceNameIndexName(kind)); err != nil {
return fmt.Errorf("failed updating %s index: %v", tableKindServiceNames, err)
}
return nil
}

View File

@ -5,6 +5,7 @@ package state
import (
"fmt"
"strings"
memdb "github.com/hashicorp/go-memdb"
@ -18,13 +19,7 @@ func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
}
func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) string {
switch kind {
case structs.ServiceKindTypical:
// needs a special case here
return "service_kind.typical"
default:
return "service_kind." + string(kind)
}
return "service_kind." + kind.Normalized()
}
func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, entMeta *structs.EnterpriseMeta) error {
@ -192,3 +187,22 @@ func validateRegisterRequestTxn(_ ReadTxn, _ *structs.RegisterRequest, _ bool) (
func (s *Store) ValidateRegisterRequest(_ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
return nil, nil
}
func indexFromKindServiceName(arg interface{}) ([]byte, error) {
var b indexBuilder
switch n := arg.(type) {
case KindServiceNameQuery:
b.String(strings.ToLower(string(n.Kind)))
b.String(strings.ToLower(n.Name))
return b.Bytes(), nil
case *KindServiceName:
b.String(strings.ToLower(string(n.Kind)))
b.String(strings.ToLower(n.Service.Name))
return b.Bytes(), nil
default:
return nil, fmt.Errorf("type must be KindServiceNameQuery or *KindServiceName: %T", arg)
}
}

View File

@ -412,3 +412,40 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
},
}
}
func testIndexerTableKindServiceNames() map[string]indexerTestCase {
obj := &KindServiceName{
Service: structs.ServiceName{
Name: "web-sidecar-proxy",
},
Kind: structs.ServiceKindConnectProxy,
}
return map[string]indexerTestCase{
indexID: {
read: indexValue{
source: &KindServiceName{
Service: structs.ServiceName{
Name: "web-sidecar-proxy",
},
Kind: structs.ServiceKindConnectProxy,
},
expected: []byte("connect-proxy\x00web-sidecar-proxy\x00"),
},
write: indexValue{
source: obj,
expected: []byte("connect-proxy\x00web-sidecar-proxy\x00"),
},
},
indexKind: {
read: indexValue{
source: structs.ServiceKindConnectProxy,
expected: []byte("connect-proxy\x00"),
},
write: indexValue{
source: obj,
expected: []byte("connect-proxy\x00"),
},
},
}
}

View File

@ -19,6 +19,7 @@ const (
tableMeshTopology = "mesh-topology"
tableServiceVirtualIPs = "service-virtual-ips"
tableFreeVirtualIPs = "free-virtual-ips"
tableKindServiceNames = "kind-service-names"
indexID = "id"
indexService = "service"
@ -661,3 +662,80 @@ func freeVirtualIPTableSchema() *memdb.TableSchema {
},
}
}
type KindServiceName struct {
Kind structs.ServiceKind
Service structs.ServiceName
structs.RaftIndex
}
func kindServiceNameTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: tableKindServiceNames,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: indexerSingle{
readIndex: indexFromKindServiceName,
writeIndex: indexFromKindServiceName,
},
},
indexKindOnly: {
Name: indexKindOnly,
AllowMissing: false,
Unique: false,
Indexer: indexerSingle{
readIndex: indexFromKindServiceNameKindOnly,
writeIndex: indexFromKindServiceNameKindOnly,
},
},
},
}
}
// KindServiceNameQuery is used to lookup service names by kind or enterprise meta.
type KindServiceNameQuery struct {
Kind structs.ServiceKind
Name string
structs.EnterpriseMeta
}
// NamespaceOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q KindServiceNameQuery) NamespaceOrDefault() string {
return q.EnterpriseMeta.NamespaceOrDefault()
}
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q KindServiceNameQuery) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault()
}
func indexFromKindServiceNameKindOnly(raw interface{}) ([]byte, error) {
switch x := raw.(type) {
case *KindServiceName:
var b indexBuilder
b.String(strings.ToLower(string(x.Kind)))
return b.Bytes(), nil
case structs.ServiceKind:
var b indexBuilder
b.String(strings.ToLower(string(x)))
return b.Bytes(), nil
default:
return nil, fmt.Errorf("type must be *KindServiceName or structs.ServiceKind: %T", raw)
}
}
func kindServiceNamesMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind) uint64 {
return maxIndexWatchTxn(tx, ws, kindServiceNameIndexName(kind))
}
func kindServiceNameIndexName(kind structs.ServiceKind) string {
return "kind_service_names." + kind.Normalized()
}

View File

@ -7656,6 +7656,143 @@ func TestProtocolForIngressGateway(t *testing.T) {
}
}
func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
s := testStateStore(t)
// Create the service registration.
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
services := []structs.NodeService{
{
Kind: structs.ServiceKindIngressGateway,
ID: "ingress-gateway",
Service: "ingress-gateway",
Address: "2.2.2.2",
Port: 2222,
EnterpriseMeta: *entMeta,
},
{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway",
Service: "mesh-gateway",
Address: "4.4.4.4",
Port: 4444,
EnterpriseMeta: *entMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "connect-proxy",
Service: "connect-proxy",
Address: "1.1.1.1",
Port: 1111,
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "foo"},
EnterpriseMeta: *entMeta,
},
{
Kind: structs.ServiceKindTerminatingGateway,
ID: "terminating-gateway",
Service: "terminating-gateway",
Address: "3.3.3.3",
Port: 3333,
EnterpriseMeta: *entMeta,
},
{
Kind: structs.ServiceKindTypical,
ID: "web",
Service: "web",
Address: "5.5.5.5",
Port: 5555,
EnterpriseMeta: *entMeta,
},
}
var idx uint64
testRegisterNode(t, s, idx, "node1")
for _, svc := range services {
idx++
require.NoError(t, s.EnsureService(idx, "node1", &svc))
// Ensure the service name was stored for all of them under the appropriate kind
gotIdx, gotNames, err := s.KindServiceNamesOfKind(nil, svc.Kind)
require.NoError(t, err)
require.Equal(t, idx, gotIdx)
require.Len(t, gotNames, 1)
require.Equal(t, svc.CompoundServiceName(), gotNames[0].Service)
require.Equal(t, svc.Kind, gotNames[0].Kind)
}
// Register another ingress gateway and there should be two names under the kind index
newIngress := structs.NodeService{
Kind: structs.ServiceKindIngressGateway,
ID: "new-ingress-gateway",
Service: "new-ingress-gateway",
Address: "6.6.6.6",
Port: 6666,
EnterpriseMeta: *entMeta,
}
idx++
require.NoError(t, s.EnsureService(idx, "node1", &newIngress))
gotIdx, got, err := s.KindServiceNamesOfKind(nil, structs.ServiceKindIngressGateway)
require.NoError(t, err)
require.Equal(t, idx, gotIdx)
expect := []*KindServiceName{
{
Kind: structs.ServiceKindIngressGateway,
Service: structs.NewServiceName("ingress-gateway", nil),
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
{
Kind: structs.ServiceKindIngressGateway,
Service: structs.NewServiceName("new-ingress-gateway", nil),
RaftIndex: structs.RaftIndex{
CreateIndex: idx,
ModifyIndex: idx,
},
},
}
require.Equal(t, expect, got)
// Deregister an ingress gateway and the index should not slide back
idx++
require.NoError(t, s.DeleteService(idx, "node1", "new-ingress-gateway", entMeta))
gotIdx, got, err = s.ServiceNamesOfKind(nil, structs.ServiceKindIngressGateway)
require.NoError(t, err)
require.Equal(t, idx, gotIdx)
require.Equal(t, expect[:1], got)
// Registering another instance of a known service should not bump the kind index
newMGW := structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway-1",
Service: "mesh-gateway",
Address: "7.7.7.7",
Port: 7777,
EnterpriseMeta: *entMeta,
}
idx++
require.NoError(t, s.EnsureService(idx, "node1", &newMGW))
gotIdx, _, err = s.KindServiceNamesOfKind(nil, structs.ServiceKindMeshGateway)
require.NoError(t, err)
require.Equal(t, uint64(2), gotIdx)
// Deregister the single typical service and the service name should also be dropped
idx++
require.NoError(t, s.DeleteService(idx, "node1", "web", entMeta))
gotIdx, got, err = s.KindServiceNamesOfKind(nil, structs.ServiceKindTypical)
require.NoError(t, err)
require.Equal(t, idx, gotIdx)
require.Empty(t, got)
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {

View File

@ -40,6 +40,7 @@ func newDBSchema() *memdb.DBSchema {
tombstonesTableSchema,
usageTableSchema,
freeVirtualIPTableSchema,
kindServiceNameTableSchema,
)
withEnterpriseSchema(db)
return db

View File

@ -50,6 +50,7 @@ func TestNewDBSchema_Indexers(t *testing.T) {
tableMeshTopology: testIndexerTableMeshTopology,
tableGatewayServices: testIndexerTableGatewayServices,
tableServiceVirtualIPs: testIndexerTableServiceVirtualIPs,
tableKindServiceNames: testIndexerTableKindServiceNames,
// KV
tableKVs: testIndexerTableKVs,
tableTombstones: testIndexerTableTombstones,

View File

@ -72,6 +72,7 @@ const (
SystemMetadataRequestType = 31
ServiceVirtualIPRequestType = 32
FreeVirtualIPRequestType = 33
KindServiceNamesType = 34
)
// if a new request type is added above it must be
@ -114,6 +115,7 @@ var requestTypeStrings = map[MessageType]string{
SystemMetadataRequestType: "SystemMetadata",
ServiceVirtualIPRequestType: "ServiceVirtualIP",
FreeVirtualIPRequestType: "FreeVirtualIP",
KindServiceNamesType: "KindServiceName",
}
const (
@ -1029,6 +1031,13 @@ type ServiceNodes []*ServiceNode
// ServiceKind is the kind of service being registered.
type ServiceKind string
func (k ServiceKind) Normalized() string {
if k == ServiceKindTypical {
return "typical"
}
return string(k)
}
const (
// ServiceKindTypical is a typical, classic Consul service. This is
// represented by the absence of a value. This was chosen for ease of