From b781fec6648a3cb603e25c04effa2d673cfbf1cf Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 10 Feb 2021 16:07:21 -0500 Subject: [PATCH 1/4] state: remove duplicate function catalogChecksForNodeService was a duplicate of catalogListServiceChecks --- agent/consul/state/catalog.go | 2 +- agent/consul/state/catalog_oss.go | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 52714468f..25a4a8568 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1317,7 +1317,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st // Delete any checks associated with the service. This will invalidate // sessions as necessary. - checks, err := catalogChecksForNodeService(tx, nodeName, serviceID, entMeta) + checks, err := catalogListServiceChecks(tx, nodeName, serviceID, entMeta) if err != nil { return fmt.Errorf("failed service check lookup: %s", err) } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index cce6cb943..04bd7515d 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -157,11 +157,11 @@ func catalogListChecks(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultItera } func catalogListNodeChecks(tx ReadTxn, node string) (memdb.ResultIterator, error) { - return tx.Get("checks", "node_service_check", node, false) + return tx.Get("checks", indexNodeServiceCheck, node, false) } func catalogListServiceChecks(tx ReadTxn, node string, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get("checks", "node_service", node, service) + return tx.Get("checks", indexNodeService, node, service) } func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error { @@ -177,10 +177,6 @@ func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error return nil } -func catalogChecksForNodeService(tx ReadTxn, node string, service string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get("checks", "node_service", node, service) -} - func validateRegisterRequestTxn(_ ReadTxn, _ *structs.RegisterRequest, _ bool) (*structs.EnterpriseMeta, error) { return nil, nil } From 88a9bd6d3c9126037c73dd797077122f18b95a34 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 10 Feb 2021 19:40:32 -0500 Subject: [PATCH 2/4] state: remove duplicate index on the checks table By using a new pattern for more specific indexes. This allows us to use the same index for both service checks and node checks. It removes the abstraction around memdb.Txn operations, and isolates all of the enterprise differences in a single place (the indexer). --- agent/consul/state/catalog.go | 20 +++++++-- agent/consul/state/catalog_oss.go | 37 ++++++++++++---- agent/consul/state/catalog_schema.go | 43 ++++--------------- .../testdata/TestStateStoreSchema.golden | 4 +- 4 files changed, 56 insertions(+), 48 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 25a4a8568..98650230f 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1315,9 +1315,14 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st return nil } + // TODO: accept a non-pointer value for EnterpriseMeta + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } // Delete any checks associated with the service. This will invalidate // sessions as necessary. - checks, err := catalogListServiceChecks(tx, nodeName, serviceID, entMeta) + q := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta} + checks, err := tx.Get(tableChecks, indexNodeService, q) if err != nil { return fmt.Errorf("failed service check lookup: %s", err) } @@ -1766,6 +1771,13 @@ func (s *Store) deleteCheckCASTxn(tx WriteTxn, idx, cidx uint64, node string, ch return true, nil } +// NodeServiceQuery is a type used to query the checks table. +type NodeServiceQuery struct { + Node string + Service string + structs.EnterpriseMeta +} + // deleteCheckTxn is the inner method used to call a health // check deletion within an existing transaction. func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error { @@ -2137,7 +2149,8 @@ func parseCheckServiceNodes( // First add the node-level checks. These always apply to any // service on the node. var checks structs.HealthChecks - iter, err := catalogListNodeChecks(tx, sn.Node) + q := NodeServiceQuery{Node: sn.Node, EnterpriseMeta: *structs.DefaultEnterpriseMeta()} + iter, err := tx.Get(tableChecks, indexNodeService, q) if err != nil { return 0, nil, err } @@ -2147,7 +2160,8 @@ func parseCheckServiceNodes( } // Now add the service-specific checks. - iter, err = catalogListServiceChecks(tx, sn.Node, sn.ServiceID, &sn.EnterpriseMeta) + q = NodeServiceQuery{Node: sn.Node, Service: sn.ServiceID, EnterpriseMeta: sn.EnterpriseMeta} + iter, err = tx.Get(tableChecks, indexNodeService, q) if err != nil { return 0, nil, err } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 04bd7515d..f0e59eeb6 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -4,6 +4,7 @@ package state import ( "fmt" + "strings" memdb "github.com/hashicorp/go-memdb" @@ -12,6 +13,34 @@ import ( func withEnterpriseSchema(_ *memdb.DBSchema) {} +func indexNodeServiceFromHealthCheck(raw interface{}) ([]byte, error) { + hc, ok := raw.(*structs.HealthCheck) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw) + } + + if hc.Node == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(hc.Node)) + b.String(strings.ToLower(hc.ServiceID)) + return b.Bytes(), nil +} + +func indexFromNodeServiceQuery(arg interface{}) ([]byte, error) { + hc, ok := arg.(NodeServiceQuery) + if !ok { + return nil, fmt.Errorf("unexpected type %T for NodeServiceQuery index", arg) + } + + var b indexBuilder + b.String(strings.ToLower(hc.Node)) + b.String(strings.ToLower(hc.Service)) + return b.Bytes(), nil +} + func serviceIndexName(name string, _ *structs.EnterpriseMeta) string { return fmt.Sprintf("service.%s", name) } @@ -156,14 +185,6 @@ func catalogListChecks(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultItera return tx.Get("checks", "id") } -func catalogListNodeChecks(tx ReadTxn, node string) (memdb.ResultIterator, error) { - return tx.Get("checks", indexNodeServiceCheck, node, false) -} - -func catalogListServiceChecks(tx ReadTxn, node string, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get("checks", indexNodeService, node, service) -} - func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error { // Insert the check if err := tx.Insert("checks", chk); err != nil { diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 6b5ebfbf1..4047df272 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -17,13 +17,12 @@ const ( tableGatewayServices = "gateway-services" tableMeshTopology = "mesh-topology" - indexID = "id" - indexServiceName = "service" - indexConnect = "connect" - indexKind = "kind" - indexStatus = "status" - indexNodeServiceCheck = "node_service_check" - indexNodeService = "node_service" + indexID = "id" + indexServiceName = "service" + indexConnect = "connect" + indexKind = "kind" + indexStatus = "status" + indexNodeService = "node_service" ) // nodesTableSchema returns a new table schema used for storing node @@ -170,37 +169,13 @@ func checksTableSchema() *memdb.TableSchema { Lowercase: true, }, }, - indexNodeServiceCheck: { - Name: indexNodeServiceCheck, - AllowMissing: true, - Unique: false, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, - }, - &memdb.FieldSetIndex{ - Field: "ServiceID", - }, - }, - }, - }, indexNodeService: { Name: indexNodeService, AllowMissing: true, Unique: false, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, - }, - &memdb.StringFieldIndex{ - Field: "ServiceID", - Lowercase: true, - }, - }, + Indexer: indexerSingle{ + fromArgsIndexer: indexFromSingleArg(indexFromNodeServiceQuery), + fromObjectIndexer: indexFromObject(indexNodeServiceFromHealthCheck), }, }, }, diff --git a/agent/consul/state/testdata/TestStateStoreSchema.golden b/agent/consul/state/testdata/TestStateStoreSchema.golden index 2b817d029..9b0abeadd 100644 --- a/agent/consul/state/testdata/TestStateStoreSchema.golden +++ b/agent/consul/state/testdata/TestStateStoreSchema.golden @@ -52,9 +52,7 @@ table=checks index=node allow-missing indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true index=node_service allow-missing - indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceID Lowercase=true] AllowMissing=false - index=node_service_check allow-missing - indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.FieldSetIndex Field=ServiceID] AllowMissing=false + indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle fromArgsIndexer=github.com/hashicorp/consul/agent/consul/state.indexFromSingleArg.func1 fromObjectIndexer=github.com/hashicorp/consul/agent/consul/state.indexNodeServiceFromHealthCheck index=service allow-missing indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceName Lowercase=true index=status From 7e4d693aaa0eb5d8e2f84ce09ed9141dac8ee3c9 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 10 Feb 2021 19:00:13 -0500 Subject: [PATCH 3/4] state: support for functional indexers These new functional indexers provide a few advantages: 1. enterprise differences can be isolated to a single function (the indexer function), making code easier to change 2. as a consequence of (1) we no longer need to wrap all the calls to Txn operations, making code easier to read. 3. by removing reflection we should increase the performance of all operations. One important change is in making all the function signatures the same. https://blog.golang.org/errors-are-values An extra boolean return value for SingleIndexer.FromObject is superfluous. The error value can indicate when the index value could not be created. By removing this extra return value we can use the same signature for both indexer functions. This has the nice properly of a function being usable for both indexing operations. --- agent/consul/state/catalog_schema.go | 4 +- agent/consul/state/indexer.go | 94 +++++++++++++++++++ agent/consul/state/schema_test.go | 39 ++++---- .../testdata/TestStateStoreSchema.golden | 2 +- 4 files changed, 119 insertions(+), 20 deletions(-) create mode 100644 agent/consul/state/indexer.go diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 4047df272..a424c6abb 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -174,8 +174,8 @@ func checksTableSchema() *memdb.TableSchema { AllowMissing: true, Unique: false, Indexer: indexerSingle{ - fromArgsIndexer: indexFromSingleArg(indexFromNodeServiceQuery), - fromObjectIndexer: indexFromObject(indexNodeServiceFromHealthCheck), + readIndex: readIndex(indexFromNodeServiceQuery), + writeIndex: writeIndex(indexNodeServiceFromHealthCheck), }, }, }, diff --git a/agent/consul/state/indexer.go b/agent/consul/state/indexer.go new file mode 100644 index 000000000..dc91e2af8 --- /dev/null +++ b/agent/consul/state/indexer.go @@ -0,0 +1,94 @@ +package state + +import ( + "bytes" + "errors" + "fmt" +) + +// indexerSingle implements both memdb.Indexer and memdb.SingleIndexer. It may +// be used in a memdb.IndexSchema to specify functions that generate the index +// value for memdb.Txn operations. +type indexerSingle struct { + // readIndex is used by memdb for Txn.Get, Txn.First, and other operations + // that read data. + readIndex + // writeIndex is used by memdb for Txn.Insert, Txn.Delete, for operations + // that write data to the index. + writeIndex +} + +// indexerMulti implements both memdb.Indexer and memdb.MultiIndexer. It may +// be used in a memdb.IndexSchema to specify functions that generate the index +// value for memdb.Txn operations. +type indexerMulti struct { + // readIndex is used by memdb for Txn.Get, Txn.First, and other operations + // that read data. + readIndex + // writeIndexMulti is used by memdb for Txn.Insert, Txn.Delete, for operations + // that write data to the index. + writeIndexMulti +} + +// readIndex implements memdb.Indexer. It exists so that a function can be used +// to provide the interface. +// +// Unlike memdb.Indexer, a readIndex function accepts only a single argument. To +// generate an index from multiple values, use a struct type with multiple fields. +type readIndex func(arg interface{}) ([]byte, error) + +func (f readIndex) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("index supports only a single arg") + } + return f(args[0]) +} + +var errMissingValueForIndex = fmt.Errorf("object is missing a value for this index") + +// writeIndex implements memdb.SingleIndexer. It is used so that a function +// can be used to provide this interface. +// +// Instead of a bool return value, writeIndex expects errMissingValueForIndex to +// indicate that an index could not be build for the object. It will translate +// this error into a false value to satisfy the memdb.SingleIndexer interface. +type writeIndex func(raw interface{}) ([]byte, error) + +func (f writeIndex) FromObject(raw interface{}) (bool, []byte, error) { + v, err := f(raw) + if errors.Is(err, errMissingValueForIndex) { + return false, nil, nil + } + return err == nil, v, err +} + +// writeIndexMulti implements memdb.MultiIndexer. It is used so that a function +// can be used to provide this interface. +// +// Instead of a bool return value, writeIndexMulti expects errMissingValueForIndex to +// indicate that an index could not be build for the object. It will translate +// this error into a false value to satisfy the memdb.MultiIndexer interface. +type writeIndexMulti func(raw interface{}) ([][]byte, error) + +func (f writeIndexMulti) FromObject(raw interface{}) (bool, [][]byte, error) { + v, err := f(raw) + if errors.Is(err, errMissingValueForIndex) { + return false, nil, nil + } + return err == nil, v, err +} + +const null = "\x00" + +// indexBuilder is a buffer used to construct memdb index values. +type indexBuilder bytes.Buffer + +// String appends the string and a null terminator to the buffer. +func (b *indexBuilder) String(v string) { + (*bytes.Buffer)(b).WriteString(v) + (*bytes.Buffer)(b).WriteString(null) +} + +func (b *indexBuilder) Bytes() []byte { + return (*bytes.Buffer)(b).Bytes() +} diff --git a/agent/consul/state/schema_test.go b/agent/consul/state/schema_test.go index 8fff28c05..ccb91db0e 100644 --- a/agent/consul/state/schema_test.go +++ b/agent/consul/state/schema_test.go @@ -67,25 +67,30 @@ func formatIndexer(buf *bytes.Buffer, indexer memdb.Indexer) { for i := 0; i < typ.NumField(); i++ { fmt.Fprintf(buf, " %v=", typ.Field(i).Name) - field := v.Field(i) - switch typ.Field(i).Type.Kind() { - case reflect.Slice: - buf.WriteString("[") - for j := 0; j < field.Len(); j++ { - if j != 0 { - buf.WriteString(", ") - } - // TODO: handle other types of slices - formatIndexer(buf, v.Field(i).Index(j).Interface().(memdb.Indexer)) + formatField(buf, v.Field(i)) + } +} + +func formatField(buf *bytes.Buffer, field reflect.Value) { + switch field.Type().Kind() { + case reflect.Slice: + buf.WriteString("[") + for j := 0; j < field.Len(); j++ { + if j != 0 { + buf.WriteString(", ") } - buf.WriteString("]") - case reflect.Func: - // Functions are printed as pointer addresses, which change frequently. - // Instead use the name. - buf.WriteString(runtime.FuncForPC(field.Pointer()).Name()) - default: - fmt.Fprintf(buf, "%v", field) + // TODO: handle other types of slices + formatIndexer(buf, field.Index(j).Interface().(memdb.Indexer)) } + buf.WriteString("]") + case reflect.Func: + // Functions are printed as pointer addresses, which change frequently. + // Instead use the name. + buf.WriteString(runtime.FuncForPC(field.Pointer()).Name()) + case reflect.Interface: + formatField(buf, field.Elem()) + default: + fmt.Fprintf(buf, "%v", field) } } diff --git a/agent/consul/state/testdata/TestStateStoreSchema.golden b/agent/consul/state/testdata/TestStateStoreSchema.golden index 9b0abeadd..764f4e5e6 100644 --- a/agent/consul/state/testdata/TestStateStoreSchema.golden +++ b/agent/consul/state/testdata/TestStateStoreSchema.golden @@ -52,7 +52,7 @@ table=checks index=node allow-missing indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true index=node_service allow-missing - indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle fromArgsIndexer=github.com/hashicorp/consul/agent/consul/state.indexFromSingleArg.func1 fromObjectIndexer=github.com/hashicorp/consul/agent/consul/state.indexNodeServiceFromHealthCheck + indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexNodeServiceFromHealthCheck index=service allow-missing indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceName Lowercase=true index=status From dd45c4cfe4943ca8e978f8e6c8eb501b70edf2ca Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 19 Feb 2021 14:38:07 -0500 Subject: [PATCH 4/4] state: add a test case for memdb indexers --- agent/consul/state/catalog_oss_test.go | 26 ++++++++ agent/consul/state/schema_test.go | 83 +++++++++++++++++++++++++- 2 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 agent/consul/state/catalog_oss_test.go diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go new file mode 100644 index 000000000..270489da1 --- /dev/null +++ b/agent/consul/state/catalog_oss_test.go @@ -0,0 +1,26 @@ +// +build !consulent + +package state + +import "github.com/hashicorp/consul/agent/structs" + +func testIndexerTableChecks() map[string]indexerTestCase { + return map[string]indexerTestCase{ + indexNodeService: { + read: indexValue{ + source: NodeServiceQuery{ + Node: "NoDe", + Service: "SeRvIcE", + }, + expected: []byte("node\x00service\x00"), + }, + write: indexValue{ + source: &structs.HealthCheck{ + Node: "NoDe", + ServiceID: "SeRvIcE", + }, + expected: []byte("node\x00service\x00"), + }, + }, + } +} diff --git a/agent/consul/state/schema_test.go b/agent/consul/state/schema_test.go index ccb91db0e..572e70eb8 100644 --- a/agent/consul/state/schema_test.go +++ b/agent/consul/state/schema_test.go @@ -14,7 +14,9 @@ import ( "github.com/hashicorp/consul/internal/testing/golden" ) -func TestStateStoreSchema(t *testing.T) { +// TODO: once TestNewDBSchema_Indexers has test cases for all tables and indexes +// it is probably safe to remove this test +func TestNewDBSchema(t *testing.T) { schema := newDBSchema() require.NoError(t, schema.Validate()) @@ -103,3 +105,82 @@ func indexNames(table *memdb.TableSchema) []string { sort.Strings(indexes) return indexes } + +type indexerTestCase struct { + read indexValue + write indexValue + prefix []indexValue + writeMulti indexValueMulti +} + +type indexValue struct { + source interface{} + expected []byte +} + +type indexValueMulti struct { + source interface{} + expected [][]byte +} + +func TestNewDBSchema_Indexers(t *testing.T) { + schema := newDBSchema() + require.NoError(t, schema.Validate()) + + var testcases = map[string]func() map[string]indexerTestCase{ + tableChecks: testIndexerTableChecks, + } + + for _, table := range schema.Tables { + if testcases[table.Name] == nil { + continue + } + t.Run(table.Name, func(t *testing.T) { + tableTCs := testcases[table.Name]() + + for _, index := range table.Indexes { + t.Run(index.Name, func(t *testing.T) { + indexer := index.Indexer + tc, ok := tableTCs[index.Name] + if !ok { + t.Skip("TODO: missing test case") + } + + args := []interface{}{tc.read.source} + if s, ok := tc.read.source.([]interface{}); ok { + // Indexes using memdb.CompoundIndex must be expanded to multiple args + args = s + } + + actual, err := indexer.FromArgs(args...) + require.NoError(t, err) + require.Equal(t, tc.read.expected, actual) + + if i, ok := indexer.(memdb.SingleIndexer); ok { + valid, actual, err := i.FromObject(tc.write.source) + require.NoError(t, err) + require.True(t, valid) + require.Equal(t, tc.write.expected, actual) + } + + if i, ok := indexer.(memdb.PrefixIndexer); ok { + for _, c := range tc.prefix { + t.Run("", func(t *testing.T) { + actual, err := i.PrefixFromArgs(c.source) + require.NoError(t, err) + require.Equal(t, c.expected, actual) + }) + } + } + + if i, ok := indexer.(memdb.MultiIndexer); ok { + valid, actual, err := i.FromObject(tc.writeMulti.source) + require.NoError(t, err) + require.True(t, valid) + require.Equal(t, tc.writeMulti.expected, actual) + } + }) + } + }) + } +}