state: rename table name constants to new pattern

Using Apps Hungarian Notation for these constants makes the memdb queries more readable.
This commit is contained in:
Daniel Nephin 2021-01-28 20:48:51 -05:00
parent 09425b22a1
commit afdbf2a8ef
5 changed files with 75 additions and 78 deletions

View file

@ -18,15 +18,9 @@ import (
"github.com/hashicorp/consul/types"
)
const (
servicesTableName = "services"
gatewayServicesTableName = "gateway-services"
topologyTableName = "mesh-topology"
// serviceLastExtinctionIndexName keeps track of the last raft index when the last instance
// indexServiceExtinction keeps track of the last raft index when the last instance
// of any service was unregistered. This is used by blocking queries on missing services.
serviceLastExtinctionIndexName = "service_last_extinction"
)
const indexServiceExtinction = "service_last_extinction"
const (
// minUUIDLookupLen is used as a minimum length of a node name required before
@ -2087,7 +2081,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
if err != nil {
return 0, nil, err
}
idx := maxIndexTxn(tx, gatewayServicesTableName)
idx := maxIndexTxn(tx, tableGatewayServices)
return lib.MaxUint64(maxIdx, idx), results, nil
}
@ -2361,7 +2355,7 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en
// Delete all associated with gateway first, to avoid keeping mappings that were removed
sn := structs.NewServiceName(conf.GetName(), entMeta)
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil {
if _, err := tx.DeleteAll(tableGatewayServices, "gateway", sn); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := truncateGatewayServiceTopologyMappings(tx, idx, sn, conf.GetKind()); err != nil {
@ -2389,7 +2383,7 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en
}
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
return nil
@ -2499,7 +2493,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
continue
}
existing, err := tx.First(gatewayServicesTableName, "id", service.Gateway, sn.CompoundServiceName(), service.Port)
existing, err := tx.First(tableGatewayServices, "id", service.Gateway, sn.CompoundServiceName(), service.Port)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
}
@ -2534,7 +2528,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
// Check if mapping already exists in table if it's already in the table
// Avoid insert if nothing changed
existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service, mapping.Port)
existing, err := tx.First(tableGatewayServices, "id", mapping.Gateway, mapping.Service, mapping.Port)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
}
@ -2549,11 +2543,11 @@ func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayServi
}
mapping.ModifyIndex = idx
if err := tx.Insert(gatewayServicesTableName, mapping); err != nil {
if err := tx.Insert(tableGatewayServices, mapping); err != nil {
return fmt.Errorf("failed inserting gateway service mapping: %s", err)
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
@ -2613,10 +2607,10 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
// Otherwise the service was specified in the config entry, and the association should be maintained
// for when the service is re-registered
if m.FromWildcard {
if err := tx.Delete(gatewayServicesTableName, m); err != nil {
if err := tx.Delete(tableGatewayServices, m); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
if err := deleteGatewayServiceTopologyMapping(tx, idx, m); err != nil {
@ -2630,18 +2624,18 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
// all the gateways mapped to this service.
func serviceGateways(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "service", structs.NewServiceName(name, entMeta))
return tx.Get(tableGatewayServices, "service", structs.NewServiceName(name, entMeta))
}
func gatewayServices(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta))
return tx.Get(tableGatewayServices, "gateway", structs.NewServiceName(name, entMeta))
}
func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayServices, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
iter, err := tx.Get(gatewayServicesTableName, "id")
iter, err := tx.Get(tableGatewayServices, "id")
if err != nil {
return 0, nil, fmt.Errorf("failed to dump gateway-services: %s", err)
}
@ -2651,7 +2645,7 @@ func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayS
if err != nil {
return 0, nil, err
}
idx := maxIndexTxn(tx, gatewayServicesTableName)
idx := maxIndexTxn(tx, tableGatewayServices)
return lib.MaxUint64(maxIdx, idx), results, nil
}
@ -2968,9 +2962,9 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se
index = "upstream"
}
iter, err := tx.Get(topologyTableName, index, service)
iter, err := tx.Get(tableMeshTopology, index, service)
if err != nil {
return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
return 0, nil, fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err)
}
ws.Add(iter.WatchCh())
@ -2993,7 +2987,7 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se
// TODO (freddy) This needs a tombstone to avoid the index sliding back on mapping deletion
// Using the table index here means that blocking queries will wake up more often than they should
tableIdx := maxIndexTxn(tx, topologyTableName)
tableIdx := maxIndexTxn(tx, tableMeshTopology)
if tableIdx > idx {
idx = tableIdx
}
@ -3024,9 +3018,9 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace)
upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta)
obj, err := tx.First(topologyTableName, "id", upstream, downstream)
obj, err := tx.First(tableMeshTopology, "id", upstream, downstream)
if err != nil {
return fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err)
}
sid := svc.CompoundServiceID()
uid := structs.UniqueID(node, sid.String())
@ -3057,22 +3051,22 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
},
}
}
if err := tx.Insert(topologyTableName, mapping); err != nil {
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
if err := tx.Insert(tableMeshTopology, mapping); err != nil {
return fmt.Errorf("failed inserting %s mapping: %s", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
}
inserted[upstream] = true
}
for u := range oldUpstreams {
if !inserted[u] {
if _, err := tx.DeleteAll(topologyTableName, "id", u, downstream); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
if _, err := tx.DeleteAll(tableMeshTopology, "id", u, downstream); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
}
}
}
@ -3090,9 +3084,9 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
sid := service.CompoundServiceID()
uid := structs.UniqueID(service.Node, sid.String())
iter, err := tx.Get(topologyTableName, "downstream", sn)
iter, err := tx.Get(tableMeshTopology, "downstream", sn)
if err != nil {
return fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err)
}
mappings := make([]*structs.UpstreamDownstream, 0)
@ -3118,17 +3112,17 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
delete(copy.Refs, uid)
if len(copy.Refs) == 0 {
if err := tx.Delete(topologyTableName, m); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
if err := tx.Delete(tableMeshTopology, m); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
}
continue
}
if err := tx.Insert(topologyTableName, copy); err != nil {
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
if err := tx.Insert(tableMeshTopology, copy); err != nil {
return fmt.Errorf("failed inserting %s mapping: %s", tableMeshTopology, err)
}
}
return nil
@ -3145,11 +3139,11 @@ func insertGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga
Downstream: gs.Gateway,
RaftIndex: gs.RaftIndex,
}
if err := tx.Insert(topologyTableName, &mapping); err != nil {
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
if err := tx.Insert(tableMeshTopology, &mapping); err != nil {
return fmt.Errorf("failed inserting %s mapping: %s", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
}
return nil
@ -3161,11 +3155,11 @@ func deleteGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga
return nil
}
if _, err := tx.DeleteAll(topologyTableName, "id", gs.Service, gs.Gateway); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
if _, err := tx.DeleteAll(tableMeshTopology, "id", gs.Service, gs.Gateway); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
}
return nil
@ -3177,11 +3171,11 @@ func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway str
return nil
}
if _, err := tx.DeleteAll(topologyTableName, "downstream", gateway); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
if _, err := tx.DeleteAll(tableMeshTopology, "downstream", gateway); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
}
return nil

View file

@ -54,7 +54,7 @@ func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _
}
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil {
if err := tx.Insert("index", &IndexEntry{indexServiceExtinction, idx}); err != nil {
return fmt.Errorf("failed updating missing service extinction index: %s", err)
}
return nil
@ -110,7 +110,7 @@ func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.En
}
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
return tx.First("index", "id", serviceLastExtinctionIndexName)
return tx.First("index", "id", indexServiceExtinction)
}
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {

View file

@ -14,6 +14,8 @@ const (
tableNodes = "nodes"
tableServices = "services"
tableChecks = "checks"
tableGatewayServices = "gateway-services"
tableMeshTopology = "mesh-topology"
indexID = "id"
indexServiceName = "service"
@ -205,11 +207,11 @@ func checksTableSchema() *memdb.TableSchema {
}
}
// gatewayServicesTableNameSchema returns a new table schema used to store information
// gatewayServicesTableSchema returns a new table schema used to store information
// about services associated with terminating gateways.
func gatewayServicesTableNameSchema() *memdb.TableSchema {
func gatewayServicesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: gatewayServicesTableName,
Name: tableGatewayServices,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
@ -249,11 +251,11 @@ func gatewayServicesTableNameSchema() *memdb.TableSchema {
}
}
// topologyTableNameSchema returns a new table schema used to store information
// meshTopologyTableSchema returns a new table schema used to store information
// relating upstream and downstream services
func topologyTableNameSchema() *memdb.TableSchema {
func meshTopologyTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: topologyTableName,
Name: tableMeshTopology,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
@ -350,6 +352,6 @@ func init() {
registerSchema(nodesTableSchema)
registerSchema(servicesTableSchema)
registerSchema(checksTableSchema)
registerSchema(gatewayServicesTableNameSchema)
registerSchema(topologyTableNameSchema)
registerSchema(gatewayServicesTableSchema)
registerSchema(meshTopologyTableSchema)
}

View file

@ -273,20 +273,20 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *s
sn := structs.NewServiceName(name, entMeta)
if kind == structs.TerminatingGateway || kind == structs.IngressGateway {
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil {
if _, err := tx.DeleteAll(tableGatewayServices, "gateway", sn); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
}
// Also clean up associations in the mesh topology table for ingress gateways
if kind == structs.IngressGateway {
if _, err := tx.DeleteAll(topologyTableName, "downstream", sn); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
if _, err := tx.DeleteAll(tableMeshTopology, "downstream", sn); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
}
}

View file

@ -3,8 +3,9 @@ package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
)
const (
@ -101,7 +102,7 @@ func updateUsage(tx WriteTxn, changes Changes) error {
// This will happen when restoring from a snapshot, just take the max index
// of the tables we are tracking.
if idx == 0 {
idx = maxIndexTxn(tx, "nodes", servicesTableName)
idx = maxIndexTxn(tx, "nodes", tableServices)
}
return writeUsageDeltas(tx, idx, usageDeltas)
@ -110,7 +111,7 @@ func updateUsage(tx WriteTxn, changes Changes) error {
func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
for svc, delta := range serviceNameChanges {
serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.Name, &svc.EnterpriseMeta)
serviceIter, err := getWithTxn(tx, tableServices, "service", svc.Name, &svc.EnterpriseMeta)
if err != nil {
return nil, err
}
@ -226,7 +227,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
serviceInstances, err := firstUsageEntry(tx, servicesTableName)
serviceInstances, err := firstUsageEntry(tx, tableServices)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}