Merge pull request #9796 from hashicorp/dnephin/state-cleanup-catalog-index-oss

state: remove duplicate tableCheck indexes
This commit is contained in:
Daniel Nephin 2021-03-10 12:20:09 -05:00 committed by GitHub
commit 5c5ba9564d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 280 additions and 70 deletions

View File

@ -1316,9 +1316,14 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
return nil
}
// TODO: accept a non-pointer value for EnterpriseMeta
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMeta()
}
// Delete any checks associated with the service. This will invalidate
// sessions as necessary.
checks, err := catalogChecksForNodeService(tx, nodeName, serviceID, entMeta)
q := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta}
checks, err := tx.Get(tableChecks, indexNodeService, q)
if err != nil {
return fmt.Errorf("failed service check lookup: %s", err)
}
@ -1767,6 +1772,13 @@ func (s *Store) deleteCheckCASTxn(tx WriteTxn, idx, cidx uint64, node string, ch
return true, nil
}
// NodeServiceQuery is a type used to query the checks table.
type NodeServiceQuery struct {
Node string
Service string
structs.EnterpriseMeta
}
// deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction.
func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {
@ -2142,7 +2154,8 @@ func parseCheckServiceNodes(
// First add the node-level checks. These always apply to any
// service on the node.
var checks structs.HealthChecks
iter, err := catalogListNodeChecks(tx, sn.Node)
q := NodeServiceQuery{Node: sn.Node, EnterpriseMeta: *structs.DefaultEnterpriseMeta()}
iter, err := tx.Get(tableChecks, indexNodeService, q)
if err != nil {
return 0, nil, err
}
@ -2152,7 +2165,8 @@ func parseCheckServiceNodes(
}
// Now add the service-specific checks.
iter, err = catalogListServiceChecks(tx, sn.Node, sn.ServiceID, &sn.EnterpriseMeta)
q = NodeServiceQuery{Node: sn.Node, Service: sn.ServiceID, EnterpriseMeta: sn.EnterpriseMeta}
iter, err = tx.Get(tableChecks, indexNodeService, q)
if err != nil {
return 0, nil, err
}

View File

@ -4,6 +4,7 @@ package state
import (
"fmt"
"strings"
memdb "github.com/hashicorp/go-memdb"
@ -12,6 +13,34 @@ import (
func withEnterpriseSchema(_ *memdb.DBSchema) {}
func indexNodeServiceFromHealthCheck(raw interface{}) ([]byte, error) {
hc, ok := raw.(*structs.HealthCheck)
if !ok {
return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw)
}
if hc.Node == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(hc.Node))
b.String(strings.ToLower(hc.ServiceID))
return b.Bytes(), nil
}
func indexFromNodeServiceQuery(arg interface{}) ([]byte, error) {
hc, ok := arg.(NodeServiceQuery)
if !ok {
return nil, fmt.Errorf("unexpected type %T for NodeServiceQuery index", arg)
}
var b indexBuilder
b.String(strings.ToLower(hc.Node))
b.String(strings.ToLower(hc.Service))
return b.Bytes(), nil
}
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
return fmt.Sprintf("service.%s", name)
}
@ -156,14 +185,6 @@ func catalogListChecks(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultItera
return tx.Get("checks", "id")
}
func catalogListNodeChecks(tx ReadTxn, node string) (memdb.ResultIterator, error) {
return tx.Get("checks", "node_service_check", node, false)
}
func catalogListServiceChecks(tx ReadTxn, node string, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "node_service", node, service)
}
func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error {
// Insert the check
if err := tx.Insert("checks", chk); err != nil {
@ -177,10 +198,6 @@ func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error
return nil
}
func catalogChecksForNodeService(tx ReadTxn, node string, service string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "node_service", node, service)
}
func validateRegisterRequestTxn(_ ReadTxn, _ *structs.RegisterRequest, _ bool) (*structs.EnterpriseMeta, error) {
return nil, nil
}

View File

@ -0,0 +1,26 @@
// +build !consulent
package state
import "github.com/hashicorp/consul/agent/structs"
func testIndexerTableChecks() map[string]indexerTestCase {
return map[string]indexerTestCase{
indexNodeService: {
read: indexValue{
source: NodeServiceQuery{
Node: "NoDe",
Service: "SeRvIcE",
},
expected: []byte("node\x00service\x00"),
},
write: indexValue{
source: &structs.HealthCheck{
Node: "NoDe",
ServiceID: "SeRvIcE",
},
expected: []byte("node\x00service\x00"),
},
},
}
}

View File

@ -17,13 +17,12 @@ const (
tableGatewayServices = "gateway-services"
tableMeshTopology = "mesh-topology"
indexID = "id"
indexServiceName = "service"
indexConnect = "connect"
indexKind = "kind"
indexStatus = "status"
indexNodeServiceCheck = "node_service_check"
indexNodeService = "node_service"
indexID = "id"
indexServiceName = "service"
indexConnect = "connect"
indexKind = "kind"
indexStatus = "status"
indexNodeService = "node_service"
)
// nodesTableSchema returns a new table schema used for storing node
@ -170,37 +169,13 @@ func checksTableSchema() *memdb.TableSchema {
Lowercase: true,
},
},
indexNodeServiceCheck: {
Name: indexNodeServiceCheck,
AllowMissing: true,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.FieldSetIndex{
Field: "ServiceID",
},
},
},
},
indexNodeService: {
Name: indexNodeService,
AllowMissing: true,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "ServiceID",
Lowercase: true,
},
},
Indexer: indexerSingle{
readIndex: readIndex(indexFromNodeServiceQuery),
writeIndex: writeIndex(indexNodeServiceFromHealthCheck),
},
},
},

View File

@ -0,0 +1,94 @@
package state
import (
"bytes"
"errors"
"fmt"
)
// indexerSingle implements both memdb.Indexer and memdb.SingleIndexer. It may
// be used in a memdb.IndexSchema to specify functions that generate the index
// value for memdb.Txn operations.
type indexerSingle struct {
// readIndex is used by memdb for Txn.Get, Txn.First, and other operations
// that read data.
readIndex
// writeIndex is used by memdb for Txn.Insert, Txn.Delete, for operations
// that write data to the index.
writeIndex
}
// indexerMulti implements both memdb.Indexer and memdb.MultiIndexer. It may
// be used in a memdb.IndexSchema to specify functions that generate the index
// value for memdb.Txn operations.
type indexerMulti struct {
// readIndex is used by memdb for Txn.Get, Txn.First, and other operations
// that read data.
readIndex
// writeIndexMulti is used by memdb for Txn.Insert, Txn.Delete, for operations
// that write data to the index.
writeIndexMulti
}
// readIndex implements memdb.Indexer. It exists so that a function can be used
// to provide the interface.
//
// Unlike memdb.Indexer, a readIndex function accepts only a single argument. To
// generate an index from multiple values, use a struct type with multiple fields.
type readIndex func(arg interface{}) ([]byte, error)
func (f readIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("index supports only a single arg")
}
return f(args[0])
}
var errMissingValueForIndex = fmt.Errorf("object is missing a value for this index")
// writeIndex implements memdb.SingleIndexer. It is used so that a function
// can be used to provide this interface.
//
// Instead of a bool return value, writeIndex expects errMissingValueForIndex to
// indicate that an index could not be build for the object. It will translate
// this error into a false value to satisfy the memdb.SingleIndexer interface.
type writeIndex func(raw interface{}) ([]byte, error)
func (f writeIndex) FromObject(raw interface{}) (bool, []byte, error) {
v, err := f(raw)
if errors.Is(err, errMissingValueForIndex) {
return false, nil, nil
}
return err == nil, v, err
}
// writeIndexMulti implements memdb.MultiIndexer. It is used so that a function
// can be used to provide this interface.
//
// Instead of a bool return value, writeIndexMulti expects errMissingValueForIndex to
// indicate that an index could not be build for the object. It will translate
// this error into a false value to satisfy the memdb.MultiIndexer interface.
type writeIndexMulti func(raw interface{}) ([][]byte, error)
func (f writeIndexMulti) FromObject(raw interface{}) (bool, [][]byte, error) {
v, err := f(raw)
if errors.Is(err, errMissingValueForIndex) {
return false, nil, nil
}
return err == nil, v, err
}
const null = "\x00"
// indexBuilder is a buffer used to construct memdb index values.
type indexBuilder bytes.Buffer
// String appends the string and a null terminator to the buffer.
func (b *indexBuilder) String(v string) {
(*bytes.Buffer)(b).WriteString(v)
(*bytes.Buffer)(b).WriteString(null)
}
func (b *indexBuilder) Bytes() []byte {
return (*bytes.Buffer)(b).Bytes()
}

View File

@ -14,7 +14,9 @@ import (
"github.com/hashicorp/consul/internal/testing/golden"
)
func TestStateStoreSchema(t *testing.T) {
// TODO: once TestNewDBSchema_Indexers has test cases for all tables and indexes
// it is probably safe to remove this test
func TestNewDBSchema(t *testing.T) {
schema := newDBSchema()
require.NoError(t, schema.Validate())
@ -67,25 +69,30 @@ func formatIndexer(buf *bytes.Buffer, indexer memdb.Indexer) {
for i := 0; i < typ.NumField(); i++ {
fmt.Fprintf(buf, " %v=", typ.Field(i).Name)
field := v.Field(i)
switch typ.Field(i).Type.Kind() {
case reflect.Slice:
buf.WriteString("[")
for j := 0; j < field.Len(); j++ {
if j != 0 {
buf.WriteString(", ")
}
// TODO: handle other types of slices
formatIndexer(buf, v.Field(i).Index(j).Interface().(memdb.Indexer))
formatField(buf, v.Field(i))
}
}
func formatField(buf *bytes.Buffer, field reflect.Value) {
switch field.Type().Kind() {
case reflect.Slice:
buf.WriteString("[")
for j := 0; j < field.Len(); j++ {
if j != 0 {
buf.WriteString(", ")
}
buf.WriteString("]")
case reflect.Func:
// Functions are printed as pointer addresses, which change frequently.
// Instead use the name.
buf.WriteString(runtime.FuncForPC(field.Pointer()).Name())
default:
fmt.Fprintf(buf, "%v", field)
// TODO: handle other types of slices
formatIndexer(buf, field.Index(j).Interface().(memdb.Indexer))
}
buf.WriteString("]")
case reflect.Func:
// Functions are printed as pointer addresses, which change frequently.
// Instead use the name.
buf.WriteString(runtime.FuncForPC(field.Pointer()).Name())
case reflect.Interface:
formatField(buf, field.Elem())
default:
fmt.Fprintf(buf, "%v", field)
}
}
@ -98,3 +105,82 @@ func indexNames(table *memdb.TableSchema) []string {
sort.Strings(indexes)
return indexes
}
type indexerTestCase struct {
read indexValue
write indexValue
prefix []indexValue
writeMulti indexValueMulti
}
type indexValue struct {
source interface{}
expected []byte
}
type indexValueMulti struct {
source interface{}
expected [][]byte
}
func TestNewDBSchema_Indexers(t *testing.T) {
schema := newDBSchema()
require.NoError(t, schema.Validate())
var testcases = map[string]func() map[string]indexerTestCase{
tableChecks: testIndexerTableChecks,
}
for _, table := range schema.Tables {
if testcases[table.Name] == nil {
continue
}
t.Run(table.Name, func(t *testing.T) {
tableTCs := testcases[table.Name]()
for _, index := range table.Indexes {
t.Run(index.Name, func(t *testing.T) {
indexer := index.Indexer
tc, ok := tableTCs[index.Name]
if !ok {
t.Skip("TODO: missing test case")
}
args := []interface{}{tc.read.source}
if s, ok := tc.read.source.([]interface{}); ok {
// Indexes using memdb.CompoundIndex must be expanded to multiple args
args = s
}
actual, err := indexer.FromArgs(args...)
require.NoError(t, err)
require.Equal(t, tc.read.expected, actual)
if i, ok := indexer.(memdb.SingleIndexer); ok {
valid, actual, err := i.FromObject(tc.write.source)
require.NoError(t, err)
require.True(t, valid)
require.Equal(t, tc.write.expected, actual)
}
if i, ok := indexer.(memdb.PrefixIndexer); ok {
for _, c := range tc.prefix {
t.Run("", func(t *testing.T) {
actual, err := i.PrefixFromArgs(c.source)
require.NoError(t, err)
require.Equal(t, c.expected, actual)
})
}
}
if i, ok := indexer.(memdb.MultiIndexer); ok {
valid, actual, err := i.FromObject(tc.writeMulti.source)
require.NoError(t, err)
require.True(t, valid)
require.Equal(t, tc.writeMulti.expected, actual)
}
})
}
})
}
}

View File

@ -52,9 +52,7 @@ table=checks
index=node allow-missing
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true
index=node_service allow-missing
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceID Lowercase=true] AllowMissing=false
index=node_service_check allow-missing
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.FieldSetIndex Field=ServiceID] AllowMissing=false
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexNodeServiceFromHealthCheck
index=service allow-missing
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceName Lowercase=true
index=status