state: convert the remaining functions to ReadTxn

Required also converting some of the transaction functions to WriteTxn
because TxnRO() called the same helper as TxnRW.

This change allows us to return a memdb.Txn for read-only txn instead of
wrapping them with state.txn.
This commit is contained in:
Daniel Nephin 2020-09-03 19:38:03 -04:00
parent 26387cdc0e
commit f9b2834171
20 changed files with 171 additions and 175 deletions

View File

@ -434,7 +434,7 @@ func resolveTokenPolicyLinks(tx *txn, token *structs.ACLToken, allowMissing bool
// stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated
// token only when fixes are needed. If the policy links are still accurate then we just return the original
// token.
func fixupTokenPolicyLinks(tx *txn, original *structs.ACLToken) (*structs.ACLToken, error) {
func fixupTokenPolicyLinks(tx ReadTxn, original *structs.ACLToken) (*structs.ACLToken, error) {
owned := false
token := original
@ -508,7 +508,7 @@ func resolveTokenRoleLinks(tx *txn, token *structs.ACLToken, allowMissing bool)
// stale when a linked role was deleted or renamed. This will correct them and generate a newly allocated
// token only when fixes are needed. If the role links are still accurate then we just return the original
// token.
func fixupTokenRoleLinks(tx *txn, original *structs.ACLToken) (*structs.ACLToken, error) {
func fixupTokenRoleLinks(tx ReadTxn, original *structs.ACLToken) (*structs.ACLToken, error) {
owned := false
token := original
@ -824,7 +824,7 @@ func (s *Store) ACLTokenBatchGet(ws memdb.WatchSet, accessors []string) (uint64,
return idx, tokens, nil
}
func aclTokenGetTxn(tx *txn, ws memdb.WatchSet, value, index string, entMeta *structs.EnterpriseMeta) (*structs.ACLToken, error) {
func aclTokenGetTxn(tx ReadTxn, ws memdb.WatchSet, value, index string, entMeta *structs.EnterpriseMeta) (*structs.ACLToken, error) {
watchCh, rawToken, err := aclTokenGetFromIndex(tx, value, index, entMeta)
if err != nil {
return nil, fmt.Errorf("failed acl token lookup: %v", err)
@ -1794,7 +1794,7 @@ func (s *Store) aclAuthMethodGet(ws memdb.WatchSet, name string, entMeta *struct
return idx, method, nil
}
func getAuthMethodWithTxn(tx *txn, ws memdb.WatchSet, name string, entMeta *structs.EnterpriseMeta) (*structs.ACLAuthMethod, error) {
func getAuthMethodWithTxn(tx ReadTxn, ws memdb.WatchSet, name string, entMeta *structs.EnterpriseMeta) (*structs.ACLAuthMethod, error) {
watchCh, rawMethod, err := aclAuthMethodGetByName(tx, name, entMeta)
if err != nil {
return nil, fmt.Errorf("failed acl auth method lookup: %v", err)

View File

@ -243,7 +243,7 @@ func aclPolicyDeleteWithPolicy(tx *txn, policy *structs.ACLPolicy, idx uint64) e
return nil
}
func aclPolicyMaxIndex(tx *txn, _ *structs.ACLPolicy, _ *structs.EnterpriseMeta) uint64 {
func aclPolicyMaxIndex(tx ReadTxn, _ *structs.ACLPolicy, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-policies")
}
@ -273,19 +273,19 @@ func aclTokenInsert(tx *txn, token *structs.ACLToken) error {
return nil
}
func aclTokenGetFromIndex(tx *txn, id string, index string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
func aclTokenGetFromIndex(tx ReadTxn, id string, index string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-tokens", index, id)
}
func aclTokenListAll(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListAll(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "id")
}
func aclTokenListLocal(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListLocal(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "local", true)
}
func aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListGlobal(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "local", false)
}
@ -297,7 +297,7 @@ func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (mem
return tx.Get("acl-tokens", "roles", role)
}
func aclTokenListByAuthMethod(tx *txn, authMethod string, _, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListByAuthMethod(tx ReadTxn, authMethod string, _, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "authmethod", authMethod)
}
@ -314,7 +314,7 @@ func aclTokenDeleteWithToken(tx *txn, token *structs.ACLToken, idx uint64) error
return nil
}
func aclTokenMaxIndex(tx *txn, _ *structs.ACLToken, entMeta *structs.EnterpriseMeta) uint64 {
func aclTokenMaxIndex(tx ReadTxn, _ *structs.ACLToken, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-tokens")
}
@ -372,7 +372,7 @@ func aclRoleDeleteWithRole(tx *txn, role *structs.ACLRole, idx uint64) error {
return nil
}
func aclRoleMaxIndex(tx *txn, _ *structs.ACLRole, _ *structs.EnterpriseMeta) uint64 {
func aclRoleMaxIndex(tx ReadTxn, _ *structs.ACLRole, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-roles")
}
@ -402,15 +402,15 @@ func aclBindingRuleInsert(tx *txn, rule *structs.ACLBindingRule) error {
return nil
}
func aclBindingRuleGetByID(tx *txn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
func aclBindingRuleGetByID(tx ReadTxn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-binding-rules", "id", id)
}
func aclBindingRuleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclBindingRuleList(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-binding-rules", "id")
}
func aclBindingRuleListByAuthMethod(tx *txn, method string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclBindingRuleListByAuthMethod(tx ReadTxn, method string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-binding-rules", "authmethod", method)
}
@ -427,7 +427,7 @@ func aclBindingRuleDeleteWithRule(tx *txn, rule *structs.ACLBindingRule, idx uin
return nil
}
func aclBindingRuleMaxIndex(tx *txn, _ *structs.ACLBindingRule, entMeta *structs.EnterpriseMeta) uint64 {
func aclBindingRuleMaxIndex(tx ReadTxn, _ *structs.ACLBindingRule, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-binding-rules")
}
@ -457,11 +457,11 @@ func aclAuthMethodInsert(tx *txn, method *structs.ACLAuthMethod) error {
return nil
}
func aclAuthMethodGetByName(tx *txn, method string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
func aclAuthMethodGetByName(tx ReadTxn, method string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-auth-methods", "id", method)
}
func aclAuthMethodList(tx *txn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclAuthMethodList(tx ReadTxn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-auth-methods", "id")
}
@ -478,7 +478,7 @@ func aclAuthMethodDeleteWithMethod(tx *txn, method *structs.ACLAuthMethod, idx u
return nil
}
func aclAuthMethodMaxIndex(tx *txn, _ *structs.ACLAuthMethod, entMeta *structs.EnterpriseMeta) uint64 {
func aclAuthMethodMaxIndex(tx ReadTxn, _ *structs.ACLAuthMethod, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-auth-methods")
}

View File

@ -6,15 +6,16 @@ import (
"reflect"
"strings"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/copystructure"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/copystructure"
)
const (
@ -262,10 +263,7 @@ func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) {
// performed within a single transaction to avoid race conditions on state
// updates.
func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
if err := s.store.ensureRegistrationTxn(s.tx, idx, true, req); err != nil {
return err
}
return nil
return s.store.ensureRegistrationTxn(s.tx, idx, true, req)
}
// EnsureRegistration is used to make sure a node, service, and check
@ -282,7 +280,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err
return tx.Commit()
}
func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error {
func (s *Store) ensureCheckIfNodeMatches(tx WriteTxn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error {
if check.Node != node {
return fmt.Errorf("check node %q does not match node %q",
check.Node, node)
@ -296,7 +294,7 @@ func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, preserveIndexes bo
// ensureRegistrationTxn is used to make sure a node, service, and check
// registration is performed within a single transaction to avoid race
// conditions on state updates.
func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, preserveIndexes bool, req *structs.RegisterRequest) error {
func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes bool, req *structs.RegisterRequest) error {
if _, err := validateRegisterRequestTxn(tx, req); err != nil {
return err
}
@ -378,7 +376,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
// ensureNoNodeWithSimilarNameTxn checks that no other node has conflict in its name
// If allowClashWithoutID then, getting a conflict on another node without ID will be allowed
func ensureNoNodeWithSimilarNameTxn(tx *txn, node *structs.Node, allowClashWithoutID bool) error {
func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWithoutID bool) error {
// Retrieve all of the nodes
enodes, err := tx.Get("nodes", "id")
if err != nil {
@ -414,7 +412,7 @@ func ensureNoNodeWithSimilarNameTxn(tx *txn, node *structs.Node, allowClashWitho
// ensureNodeCASTxn updates a node only if the existing index matches the given index.
// Returns a bool indicating if a write happened and any error.
func (s *Store) ensureNodeCASTxn(tx *txn, idx uint64, node *structs.Node) (bool, error) {
func (s *Store) ensureNodeCASTxn(tx WriteTxn, idx uint64, node *structs.Node) (bool, error) {
// Retrieve the existing entry.
existing, err := getNodeTxn(tx, node.Node)
if err != nil {
@ -444,7 +442,7 @@ func (s *Store) ensureNodeCASTxn(tx *txn, idx uint64, node *structs.Node) (bool,
// ensureNodeTxn is the inner function called to actually create a node
// registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn.
func (s *Store) ensureNodeTxn(tx *txn, idx uint64, preserveIndexes bool, node *structs.Node) error {
func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, node *structs.Node) error {
// See if there's an existing node with this UUID, and make sure the
// name is the same.
var n *structs.Node
@ -546,7 +544,7 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
return idx, node, nil
}
func getNodeTxn(tx *txn, nodeName string) (*structs.Node, error) {
func getNodeTxn(tx ReadTxn, nodeName string) (*structs.Node, error) {
node, err := tx.First("nodes", "id", nodeName)
if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err)
@ -557,7 +555,7 @@ func getNodeTxn(tx *txn, nodeName string) (*structs.Node, error) {
return nil, nil
}
func getNodeIDTxn(tx *txn, id types.NodeID) (*structs.Node, error) {
func getNodeIDTxn(tx ReadTxn, id types.NodeID) (*structs.Node, error) {
strnode := string(id)
uuidValue, err := uuid.ParseUUID(strnode)
if err != nil {
@ -657,7 +655,7 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error {
// deleteNodeCASTxn is used to try doing a node delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for
// the given check, then the call is a noop, otherwise a normal check delete is invoked.
func (s *Store) deleteNodeCASTxn(tx *txn, idx, cidx uint64, nodeName string) (bool, error) {
func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string) (bool, error) {
// Look up the node.
node, err := getNodeTxn(tx, nodeName)
if err != nil {
@ -684,7 +682,7 @@ func (s *Store) deleteNodeCASTxn(tx *txn, idx, cidx uint64, nodeName string) (bo
// deleteNodeTxn is the inner method used for removing a node from
// the store within a given transaction.
func (s *Store) deleteNodeTxn(tx *txn, idx uint64, nodeName string) error {
func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
// Look up the node.
node, err := tx.First("nodes", "id", nodeName)
if err != nil {
@ -791,7 +789,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed")
// ensureServiceCASTxn updates a service only if the existing index matches the given index.
// Returns an error if the write didn't happen and nil if write was successful.
func ensureServiceCASTxn(tx *txn, idx uint64, node string, svc *structs.NodeService) error {
func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error {
// Retrieve the existing service.
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
if err != nil {
@ -816,7 +814,7 @@ func ensureServiceCASTxn(tx *txn, idx uint64, node string, svc *structs.NodeServ
// ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction.
func ensureServiceTxn(tx *txn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
// Check for existing service
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
if err != nil {
@ -921,7 +919,7 @@ func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta)
return serviceListTxn(tx, ws, entMeta)
}
func serviceListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
idx := catalogServicesMaxIndex(tx, entMeta)
services, err := catalogServiceList(tx, entMeta, true)
@ -1025,7 +1023,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
// * return when the last instance of a service is removed
// * block until an instance for this service is available, or another
// service is unregistered.
func maxIndexForService(tx *txn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) uint64 {
func maxIndexForService(tx ReadTxn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) uint64 {
idx, _ := maxIndexAndWatchChForService(tx, serviceName, serviceExists, checks, entMeta)
return idx
}
@ -1044,7 +1042,7 @@ func maxIndexForService(tx *txn, serviceName string, serviceExists, checks bool,
// returned for the chan. This allows for blocking watchers to _only_ watch this
// one chan in the common case, falling back to watching all touched MemDB
// indexes in more complicated cases.
func maxIndexAndWatchChForService(tx *txn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) (uint64, <-chan struct{}) {
func maxIndexAndWatchChForService(tx ReadTxn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) (uint64, <-chan struct{}) {
if !serviceExists {
res, err := catalogServiceLastExtinctionIndex(tx, entMeta)
if missingIdx, ok := res.(*IndexEntry); ok && err == nil {
@ -1061,7 +1059,7 @@ func maxIndexAndWatchChForService(tx *txn, serviceName string, serviceExists, ch
}
// Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes
func maxIndexAndWatchChsForServiceNodes(tx *txn,
func maxIndexAndWatchChsForServiceNodes(tx ReadTxn,
nodes structs.ServiceNodes, watchChecks bool) (uint64, []<-chan struct{}) {
var watchChans []<-chan struct{}
@ -1268,7 +1266,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta *
// parseServiceNodes iterates over a services query and fills in the node details,
// returning a ServiceNodes slice.
func parseServiceNodes(tx *txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
// We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback.
allNodes, err := tx.Get("nodes", "id")
@ -1325,7 +1323,7 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs.
return idx, service, nil
}
func getNodeServiceTxn(tx *txn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
// Query the service
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
if err != nil {
@ -1467,7 +1465,7 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string, entMeta *s
// deleteServiceCASTxn is used to try doing a service delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for
// the given service, then the call is a noop, otherwise a normal delete is invoked.
func (s *Store) deleteServiceCASTxn(tx *txn, idx, cidx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (bool, error) {
func (s *Store) deleteServiceCASTxn(tx WriteTxn, idx, cidx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (bool, error) {
// Look up the service.
service, err := getNodeServiceTxn(tx, nodeName, serviceID, entMeta)
if err != nil {
@ -1494,7 +1492,7 @@ func (s *Store) deleteServiceCASTxn(tx *txn, idx, cidx uint64, nodeName, service
// deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction.
func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error {
func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error {
// Look up the service.
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
if err != nil {
@ -1589,7 +1587,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
}
// updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node
func updateAllServiceIndexesOfNode(tx *txn, idx uint64, nodeID string) error {
func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string) error {
services, err := tx.Get("services", "node", nodeID)
if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", nodeID, err)
@ -1608,7 +1606,7 @@ func updateAllServiceIndexesOfNode(tx *txn, idx uint64, nodeID string) error {
// ensureCheckCASTxn updates a check only if the existing index matches the given index.
// Returns a bool indicating if a write happened and any error.
func (s *Store) ensureCheckCASTxn(tx *txn, idx uint64, hc *structs.HealthCheck) (bool, error) {
func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthCheck) (bool, error) {
// Retrieve the existing entry.
_, existing, err := getNodeCheckTxn(tx, hc.Node, hc.CheckID, &hc.EnterpriseMeta)
if err != nil {
@ -1638,7 +1636,7 @@ func (s *Store) ensureCheckCASTxn(tx *txn, idx uint64, hc *structs.HealthCheck)
// ensureCheckTxn is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting
// checks with no matching node or service.
func (s *Store) ensureCheckTxn(tx *txn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
// Check if we have an existing health check
_, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
if err != nil {
@ -1743,7 +1741,7 @@ func (s *Store) NodeCheck(nodeName string, checkID types.CheckID, entMeta *struc
// nodeCheckTxn is used as the inner method to handle reading a health check
// from the state store.
func getNodeCheckTxn(tx *txn, nodeName string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (uint64, *structs.HealthCheck, error) {
func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (uint64, *structs.HealthCheck, error) {
// Get the table index.
idx := catalogChecksMaxIndex(tx, entMeta)
@ -1859,7 +1857,7 @@ func (s *Store) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters
return parseChecksByNodeMeta(tx, ws, idx, iter, filters)
}
func checksInStateTxn(tx *txn, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) {
func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) {
// Get the table index.
idx := catalogChecksMaxIndex(tx, entMeta)
@ -1881,7 +1879,7 @@ func checksInStateTxn(tx *txn, ws memdb.WatchSet, state string, entMeta *structs
// parseChecksByNodeMeta is a helper function used to deduplicate some
// repetitive code for returning health checks filtered by node metadata fields.
func parseChecksByNodeMeta(tx *txn, ws memdb.WatchSet,
func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet,
idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) {
// We don't want to track an unlimited number of nodes, so we pull a
@ -1930,7 +1928,7 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID, entM
// deleteCheckCASTxn is used to try doing a check delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for
// the given check, then the call is a noop, otherwise a normal check delete is invoked.
func (s *Store) deleteCheckCASTxn(tx *txn, idx, cidx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (bool, error) {
func (s *Store) deleteCheckCASTxn(tx WriteTxn, idx, cidx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (bool, error) {
// Try to retrieve the existing health check.
_, hc, err := getNodeCheckTxn(tx, node, checkID, entMeta)
if err != nil {
@ -1957,7 +1955,7 @@ func (s *Store) deleteCheckCASTxn(tx *txn, idx, cidx uint64, node string, checkI
// deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction.
func (s *Store) deleteCheckTxn(tx *txn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {
func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {
// Try to retrieve the existing health check.
_, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID))
if err != nil {
@ -2101,7 +2099,7 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
return checkServiceNodesTxn(tx, ws, serviceName, connect, entMeta)
}
func checkServiceNodesTxn(tx *txn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// Function for lookup
index := "service"
if connect {
@ -2279,7 +2277,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
// and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query.
func parseCheckServiceNodes(
tx *txn, ws memdb.WatchSet, idx uint64,
tx ReadTxn, ws memdb.WatchSet, idx uint64,
services structs.ServiceNodes,
err error) (uint64, structs.CheckServiceNodes, error) {
if err != nil {
@ -2404,7 +2402,7 @@ func (s *Store) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind
}
}
func serviceDumpAllTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
func serviceDumpAllTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// Get the table index
idx := catalogMaxIndexWatch(tx, ws, entMeta, true)
@ -2422,7 +2420,7 @@ func serviceDumpAllTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMe
return parseCheckServiceNodes(tx, nil, idx, results, err)
}
func serviceDumpKindTxn(tx *txn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// unlike when we are dumping all services here we only need to watch the kind specific index entry for changing (or nodes, checks)
// updating any services, nodes or checks will bump the appropriate service kind index so there is no need to watch any of the individual
// entries
@ -2446,7 +2444,7 @@ func serviceDumpKindTxn(tx *txn, ws memdb.WatchSet, kind structs.ServiceKind, en
// parseNodes takes an iterator over a set of nodes and returns a struct
// containing the nodes along with all of their associated services
// and/or health checks.
func parseNodes(tx *txn, ws memdb.WatchSet, idx uint64,
func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
iter memdb.ResultIterator, entMeta *structs.EnterpriseMeta) (uint64, structs.NodeDump, error) {
// We don't want to track an unlimited number of services, so we pull a
@ -2506,7 +2504,7 @@ func parseNodes(tx *txn, ws memdb.WatchSet, idx uint64,
}
// checkSessionsTxn returns the IDs of all sessions associated with a health check
func checkSessionsTxn(tx *txn, hc *structs.HealthCheck) ([]*sessionCheck, error) {
func checkSessionsTxn(tx ReadTxn, hc *structs.HealthCheck) ([]*sessionCheck, error) {
mappings, err := getCompoundWithTxn(tx, "session_checks", "node_check", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
if err != nil {
return nil, fmt.Errorf("failed session checks lookup: %s", err)
@ -2520,7 +2518,7 @@ func checkSessionsTxn(tx *txn, hc *structs.HealthCheck) ([]*sessionCheck, error)
}
// updateGatewayServices associates services with gateways as specified in a gateway config entry
func updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error {
func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error {
var (
noChange bool
gatewayServices structs.GatewayServices
@ -2582,7 +2580,7 @@ func updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMet
// insertion into the memdb table, specific to ingress gateways. The boolean
// returned indicates that there are no changes necessary to the memdb table.
func ingressConfigGatewayServices(
tx *txn,
tx ReadTxn,
gateway structs.ServiceName,
conf structs.ConfigEntry,
entMeta *structs.EnterpriseMeta,
@ -2627,7 +2625,7 @@ func ingressConfigGatewayServices(
// boolean returned indicates that there are no changes necessary to the memdb
// table.
func terminatingConfigGatewayServices(
tx *txn,
tx ReadTxn,
gateway structs.ServiceName,
conf structs.ConfigEntry,
entMeta *structs.EnterpriseMeta,
@ -2667,7 +2665,7 @@ func terminatingConfigGatewayServices(
}
// updateGatewayNamespace is used to target all services within a namespace
func updateGatewayNamespace(tx *txn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error {
func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error {
services, err := catalogServiceListByKind(tx, structs.ServiceKindTypical, entMeta)
if err != nil {
return fmt.Errorf("failed querying services: %s", err)
@ -2714,7 +2712,7 @@ func updateGatewayNamespace(tx *txn, idx uint64, service *structs.GatewayService
// updateGatewayService associates services with gateways after an eligible event
// ie. Registering a service in a namespace targeted by a gateway
func updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService) error {
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
// Check if mapping already exists in table if it's already in the table
// Avoid insert if nothing changed
existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service, mapping.Port)
@ -2749,7 +2747,7 @@ func updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService)
// checkWildcardForGatewaysAndUpdate checks whether a service matches a
// wildcard definition in gateway config entries and if so adds it the the
// gateway-services table.
func checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeService) error {
func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.NodeService) error {
// Do not associate non-typical services with gateways or consul services
if svc.Kind != structs.ServiceKindTypical || svc.Service == "consul" {
return nil
@ -2776,7 +2774,7 @@ func checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeServic
return nil
}
func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) error {
func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) error {
// Clean up association between service name and gateways if needed
gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
@ -2805,11 +2803,11 @@ func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) erro
// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
// all the gateways mapped to this service.
func serviceGateways(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func serviceGateways(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "service", structs.NewServiceName(name, entMeta))
}
func gatewayServices(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func gatewayServices(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta))
}
@ -2832,7 +2830,7 @@ func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayS
return lib.MaxUint64(maxIdx, idx), results, nil
}
func (s *Store) collectGatewayServices(tx *txn, ws memdb.WatchSet, iter memdb.ResultIterator) (uint64, structs.GatewayServices, error) {
func (s *Store) collectGatewayServices(tx ReadTxn, ws memdb.WatchSet, iter memdb.ResultIterator) (uint64, structs.GatewayServices, error) {
var maxIdx uint64
var results structs.GatewayServices
@ -2858,7 +2856,7 @@ func (s *Store) collectGatewayServices(tx *txn, ws memdb.WatchSet, iter memdb.Re
// TODO(ingress): How to handle index rolling back when a config entry is
// deleted that references a service?
// We might need something like the service_last_extinction index?
func serviceGatewayNodes(tx *txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
// Look up gateway name associated with the service
gws, err := serviceGateways(tx, service, entMeta)
if err != nil {
@ -3060,7 +3058,7 @@ func (s *Store) ServiceTopology(
// combinedServiceNodesTxn returns typical and connect endpoints for a list of services.
// This enabled aggregating checks statuses across both.
func (s *Store) combinedServiceNodesTxn(tx *txn, ws memdb.WatchSet, names []structs.ServiceName) (uint64, structs.CheckServiceNodes, error) {
func (s *Store) combinedServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, names []structs.ServiceName) (uint64, structs.CheckServiceNodes, error) {
var (
maxIdx uint64
resp structs.CheckServiceNodes
@ -3177,7 +3175,7 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se
}
// updateMeshTopology creates associations between the input service and its upstreams in the topology table
func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeService, existing interface{}) error {
func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeService, existing interface{}) error {
oldUpstreams := make(map[structs.ServiceName]bool)
if e, ok := existing.(*structs.ServiceNode); ok {
for _, u := range e.ServiceProxy.Upstreams {
@ -3257,7 +3255,7 @@ func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeServi
// cleanupMeshTopology removes a service from the mesh topology table
// This is only safe to call when there are no more known instances of this proxy
func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) error {
func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode) error {
if service.ServiceKind != structs.ServiceKindConnectProxy {
return nil
}
@ -3294,7 +3292,7 @@ func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) erro
return nil
}
func insertGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error {
func insertGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.GatewayService) error {
// Only ingress gateways are standalone items in the mesh topology viz
if gs.GatewayKind != structs.ServiceKindIngressGateway || gs.Service.Name == structs.WildcardSpecifier {
return nil
@ -3315,7 +3313,7 @@ func insertGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.Gatewa
return nil
}
func deleteGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error {
func deleteGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.GatewayService) error {
// Only ingress gateways are standalone items in the mesh topology viz
if gs.GatewayKind != structs.ServiceKindIngressGateway {
return nil
@ -3331,7 +3329,7 @@ func deleteGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.Gatewa
return nil
}
func truncateGatewayServiceTopologyMappings(tx *txn, idx uint64, gateway structs.ServiceName, kind string) error {
func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway structs.ServiceName, kind string) error {
// Only ingress gateways are standalone items in the mesh topology viz
if kind != string(structs.ServiceKindIngressGateway) {
return nil

View File

@ -168,7 +168,7 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) s
}
}
func catalogUpdateServicesIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error {
func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
// overall services index
if err := indexUpdateMaxTxn(tx, idx, "services"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
@ -177,7 +177,7 @@ func catalogUpdateServicesIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta
return nil
}
func catalogUpdateServiceKindIndexes(tx *txn, kind structs.ServiceKind, idx uint64, _ *structs.EnterpriseMeta) error {
func catalogUpdateServiceKindIndexes(tx WriteTxn, kind structs.ServiceKind, idx uint64, _ *structs.EnterpriseMeta) error {
// service-kind index
if err := indexUpdateMaxTxn(tx, idx, serviceKindIndexName(kind, nil)); err != nil {
return fmt.Errorf("failed updating index: %s", err)
@ -186,7 +186,7 @@ func catalogUpdateServiceKindIndexes(tx *txn, kind structs.ServiceKind, idx uint
return nil
}
func catalogUpdateServiceIndexes(tx *txn, serviceName string, idx uint64, _ *structs.EnterpriseMeta) error {
func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _ *structs.EnterpriseMeta) error {
// per-service index
if err := indexUpdateMaxTxn(tx, idx, serviceIndexName(serviceName, nil)); err != nil {
return fmt.Errorf("failed updating index: %s", err)
@ -195,14 +195,14 @@ func catalogUpdateServiceIndexes(tx *txn, serviceName string, idx uint64, _ *str
return nil
}
func catalogUpdateServiceExtinctionIndex(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error {
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil {
return fmt.Errorf("failed updating missing service extinction index: %s", err)
}
return nil
}
func catalogInsertService(tx *txn, svc *structs.ServiceNode) error {
func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
// Insert the service and update the index
if err := tx.Insert("services", svc); err != nil {
return fmt.Errorf("failed inserting service: %s", err)
@ -269,7 +269,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMe
return maxIndexWatchTxn(tx, ws, "nodes", "services")
}
func catalogUpdateCheckIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error {
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
// update the universal index entry
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
@ -306,7 +306,7 @@ func catalogListServiceChecks(tx ReadTxn, node string, service string, _ *struct
return tx.Get("checks", "node_service", node, service)
}
func catalogInsertCheck(tx *txn, chk *structs.HealthCheck, idx uint64) error {
func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error {
// Insert the check
if err := tx.Insert("checks", chk); err != nil {
return fmt.Errorf("failed inserting check: %s", err)
@ -323,7 +323,7 @@ func catalogChecksForNodeService(tx ReadTxn, node string, service string, entMet
return tx.Get("checks", "node_service", node, service)
}
func validateRegisterRequestTxn(_ *txn, _ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
func validateRegisterRequestTxn(_ ReadTxn, _ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
return nil, nil
}

View File

@ -4415,12 +4415,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
require.NoError(t, tx.Commit())
// ensure no update happened
tx = s.db.Txn(false)
roTxn := s.db.Txn(false)
_, nsRead, err := s.NodeService("node1", "foo", nil)
require.NoError(t, err)
require.NotNil(t, nsRead)
require.Equal(t, uint64(2), nsRead.ModifyIndex)
require.NoError(t, tx.Commit())
roTxn.Commit()
ns.ModifyIndex = 99
// attempt to update with a non-matching index
@ -4430,12 +4430,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
require.NoError(t, tx.Commit())
// ensure no update happened
tx = s.db.Txn(false)
roTxn = s.db.Txn(false)
_, nsRead, err = s.NodeService("node1", "foo", nil)
require.NoError(t, err)
require.NotNil(t, nsRead)
require.Equal(t, uint64(2), nsRead.ModifyIndex)
require.NoError(t, tx.Commit())
roTxn.Commit()
ns.ModifyIndex = 2
// update with the matching modify index
@ -4445,12 +4445,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
require.NoError(t, tx.Commit())
// ensure the update happened
tx = s.db.Txn(false)
roTxn = s.db.Txn(false)
_, nsRead, err = s.NodeService("node1", "foo", nil)
require.NoError(t, err)
require.NotNil(t, nsRead)
require.Equal(t, uint64(7), nsRead.ModifyIndex)
require.NoError(t, tx.Commit())
roTxn.Commit()
}
func TestStateStore_GatewayServices_Terminating(t *testing.T) {

View File

@ -1,12 +1,12 @@
package state
import (
"encoding/json"
"fmt"
"sort"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
)
type ServiceIntentionLegacyIDIndex struct {
@ -123,7 +123,7 @@ func (s *ServiceIntentionSourceIndex) FromArgs(args ...interface{}) ([]byte, err
return []byte(arg.String() + "\x00"), nil
}
func (s *Store) configIntentionsListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) {
func (s *Store) configIntentionsListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) {
// unrolled part of configEntriesByKindTxn
idx := maxIndexTxn(tx, configTableName)
@ -144,7 +144,7 @@ func (s *Store) configIntentionsListTxn(tx *txn, ws memdb.WatchSet, entMeta *str
return idx, results, true, nil
}
func (s *Store) configIntentionGetTxn(tx *txn, ws memdb.WatchSet, id string) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) {
func (s *Store) configIntentionGetTxn(tx ReadTxn, ws memdb.WatchSet, id string) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) {
idx := maxIndexTxn(tx, configTableName)
if idx < 1 {
idx = 1
@ -173,7 +173,7 @@ func (s *Store) configIntentionGetTxn(tx *txn, ws memdb.WatchSet, id string) (ui
return idx, nil, nil, nil // Shouldn't happen.
}
func (s *Store) configIntentionGetExactTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) {
func (s *Store) configIntentionGetExactTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) {
if err := args.Validate(); err != nil {
return 0, nil, nil, err
}
@ -196,7 +196,7 @@ func (s *Store) configIntentionGetExactTxn(tx *txn, ws memdb.WatchSet, args *str
return idx, nil, nil, nil
}
func (s *Store) configIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) {
func (s *Store) configIntentionMatchTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) {
maxIndex := uint64(1)
// Make all the calls and accumulate the results
@ -207,7 +207,7 @@ func (s *Store) configIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *struct
// improving that in the future, the test cases shouldn't have to
// change for that.
index, ixns, err := s.configIntentionMatchOneTxn(tx, ws, entry, args.Type)
index, ixns, err := configIntentionMatchOneTxn(tx, ws, entry, args.Type)
if err != nil {
return 0, nil, err
}
@ -222,23 +222,23 @@ func (s *Store) configIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *struct
return maxIndex, results, nil
}
func (s *Store) configIntentionMatchOneTxn(
tx *txn,
func configIntentionMatchOneTxn(
tx ReadTxn,
ws memdb.WatchSet,
matchEntry structs.IntentionMatchEntry,
matchType structs.IntentionMatchType,
) (uint64, structs.Intentions, error) {
switch matchType {
case structs.IntentionMatchSource:
return s.readSourceIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta())
return readSourceIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta())
case structs.IntentionMatchDestination:
return s.readDestinationIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta())
return readDestinationIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta())
default:
return 0, nil, fmt.Errorf("invalid intention match type: %s", matchType)
}
}
func (s *Store) readSourceIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) {
func readSourceIntentionsFromConfigEntriesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) {
idx := maxIndexTxn(tx, configTableName)
var (
@ -248,7 +248,7 @@ func (s *Store) readSourceIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.Watch
names := getIntentionPrecedenceMatchServiceNames(serviceName, entMeta)
for _, sn := range names {
results, err = s.readSourceIntentionsFromConfigEntriesForServiceTxn(
results, err = readSourceIntentionsFromConfigEntriesForServiceTxn(
tx, ws, sn.Name, &sn.EnterpriseMeta, results,
)
if err != nil {
@ -262,7 +262,7 @@ func (s *Store) readSourceIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.Watch
return idx, results, nil
}
func (s *Store) readSourceIntentionsFromConfigEntriesForServiceTxn(tx *txn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta, results structs.Intentions) (structs.Intentions, error) {
func readSourceIntentionsFromConfigEntriesForServiceTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta, results structs.Intentions) (structs.Intentions, error) {
sn := structs.NewServiceName(serviceName, entMeta)
iter, err := tx.Get(configTableName, "intention-source", sn)
@ -283,12 +283,7 @@ func (s *Store) readSourceIntentionsFromConfigEntriesForServiceTxn(tx *txn, ws m
return results, nil
}
func jd(v interface{}) string {
d, _ := json.MarshalIndent(v, "", " ")
return string(d)
}
func (s *Store) readDestinationIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) {
func readDestinationIntentionsFromConfigEntriesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) {
idx := maxIndexTxn(tx, configTableName)
var results structs.Intentions

View File

@ -116,7 +116,7 @@ func (s *Store) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, e
return caConfigTxn(tx, ws)
}
func caConfigTxn(tx *txn, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) {
func caConfigTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) {
// Get the CA config
ch, c, err := tx.FirstWatch(caConfigTableName, "id")
if err != nil {
@ -236,7 +236,7 @@ func (s *Store) CARoots(ws memdb.WatchSet) (uint64, structs.CARoots, error) {
return caRootsTxn(tx, ws)
}
func caRootsTxn(tx *txn, ws memdb.WatchSet) (uint64, structs.CARoots, error) {
func caRootsTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, structs.CARoots, error) {
// Get the index
idx := maxIndexTxn(tx, caRootTableName)

View File

@ -134,7 +134,7 @@ func (s *Store) FederationStateGet(ws memdb.WatchSet, datacenter string) (uint64
return federationStateGetTxn(tx, ws, datacenter)
}
func federationStateGetTxn(tx *txn, ws memdb.WatchSet, datacenter string) (uint64, *structs.FederationState, error) {
func federationStateGetTxn(tx ReadTxn, ws memdb.WatchSet, datacenter string) (uint64, *structs.FederationState, error) {
// Get the index
idx := maxIndexTxn(tx, federationStateTableName)
@ -164,7 +164,7 @@ func (s *Store) FederationStateList(ws memdb.WatchSet) (uint64, []*structs.Feder
return federationStateListTxn(tx, ws)
}
func federationStateListTxn(tx *txn, ws memdb.WatchSet) (uint64, []*structs.FederationState, error) {
func federationStateListTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*structs.FederationState, error) {
// Get the index
idx := maxIndexTxn(tx, federationStateTableName)

View File

@ -28,7 +28,7 @@ func NewGraveyard(gc *TombstoneGC) *Graveyard {
}
// InsertTxn adds a new tombstone.
func (g *Graveyard) InsertTxn(tx *txn, key string, idx uint64, entMeta *structs.EnterpriseMeta) error {
func (g *Graveyard) InsertTxn(tx WriteTxn, key string, idx uint64, entMeta *structs.EnterpriseMeta) error {
stone := &Tombstone{
Key: key,
Index: idx,
@ -51,7 +51,7 @@ func (g *Graveyard) InsertTxn(tx *txn, key string, idx uint64, entMeta *structs.
// GetMaxIndexTxn returns the highest index tombstone whose key matches the
// given context, using a prefix match.
func (g *Graveyard) GetMaxIndexTxn(tx *txn, prefix string, entMeta *structs.EnterpriseMeta) (uint64, error) {
func (g *Graveyard) GetMaxIndexTxn(tx ReadTxn, prefix string, entMeta *structs.EnterpriseMeta) (uint64, error) {
stones, err := getWithTxn(tx, "tombstones", "id_prefix", prefix, entMeta)
if err != nil {
return 0, fmt.Errorf("failed querying tombstones: %s", err)
@ -68,7 +68,7 @@ func (g *Graveyard) GetMaxIndexTxn(tx *txn, prefix string, entMeta *structs.Ente
}
// DumpTxn returns all the tombstones.
func (g *Graveyard) DumpTxn(tx *txn) (memdb.ResultIterator, error) {
func (g *Graveyard) DumpTxn(tx ReadTxn) (memdb.ResultIterator, error) {
iter, err := tx.Get("tombstones", "id")
if err != nil {
return nil, err

View File

@ -6,9 +6,7 @@ import (
"fmt"
)
func (g *Graveyard) insertTombstoneWithTxn(tx *txn,
table string, stone *Tombstone, updateMax bool) error {
func (g *Graveyard) insertTombstoneWithTxn(tx WriteTxn, _ string, stone *Tombstone, updateMax bool) error {
if err := tx.Insert("tombstones", stone); err != nil {
return err
}

View File

@ -5,10 +5,11 @@ import (
"fmt"
"sort"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)
const (
@ -142,7 +143,7 @@ func (s *Store) AreIntentionsInConfigEntries() (bool, error) {
return areIntentionsInConfigEntries(tx)
}
func areIntentionsInConfigEntries(tx *txn) (bool, error) {
func areIntentionsInConfigEntries(tx ReadTxn) (bool, error) {
_, entry, err := systemMetadataGetTxn(tx, nil, structs.SystemMetadataIntentionFormatKey)
if err != nil {
return false, fmt.Errorf("failed system metadatalookup: %s", err)
@ -178,7 +179,7 @@ func (s *Store) Intentions(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (
return s.configIntentionsListTxn(tx, ws, entMeta)
}
func (s *Store) legacyIntentionsListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) {
func (s *Store) legacyIntentionsListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) {
// Get the index
idx := maxIndexTxn(tx, intentionsTableName)
if idx < 1 {
@ -230,7 +231,7 @@ func (s *Store) LegacyIntentionSet(idx uint64, ixn *structs.Intention) error {
// legacyIntentionSetTxn is the inner method used to insert an intention with
// the proper indexes into the state store.
func legacyIntentionSetTxn(tx *txn, idx uint64, ixn *structs.Intention) error {
func legacyIntentionSetTxn(tx WriteTxn, idx uint64, ixn *structs.Intention) error {
// ID is required
if ixn.ID == "" {
return ErrMissingIntentionID
@ -301,7 +302,7 @@ func (s *Store) IntentionGet(ws memdb.WatchSet, id string) (uint64, *structs.Ser
return s.configIntentionGetTxn(tx, ws, id)
}
func (s *Store) legacyIntentionGetTxn(tx *txn, ws memdb.WatchSet, id string) (uint64, *structs.Intention, error) {
func (s *Store) legacyIntentionGetTxn(tx ReadTxn, ws memdb.WatchSet, id string) (uint64, *structs.Intention, error) {
// Get the table index.
idx := maxIndexTxn(tx, intentionsTableName)
if idx < 1 {
@ -340,7 +341,7 @@ func (s *Store) IntentionGetExact(ws memdb.WatchSet, args *structs.IntentionQuer
return s.configIntentionGetExactTxn(tx, ws, args)
}
func (s *Store) legacyIntentionGetExactTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.Intention, error) {
func (s *Store) legacyIntentionGetExactTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.Intention, error) {
if err := args.Validate(); err != nil {
return 0, nil, err
}
@ -392,7 +393,7 @@ func (s *Store) LegacyIntentionDelete(idx uint64, id string) error {
// legacyIntentionDeleteTxn is the inner method used to delete a legacy intention
// with the proper indexes into the state store.
func legacyIntentionDeleteTxn(tx *txn, idx uint64, queryID string) error {
func legacyIntentionDeleteTxn(tx WriteTxn, idx uint64, queryID string) error {
// Pull the query.
wrapped, err := tx.First(intentionsTableName, "id", queryID)
if err != nil {
@ -531,7 +532,7 @@ func (s *Store) IntentionMatch(ws memdb.WatchSet, args *structs.IntentionQueryMa
return s.configIntentionMatchTxn(tx, ws, args)
}
func (s *Store) legacyIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) {
func (s *Store) legacyIntentionMatchTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) {
// Get the table index.
idx := maxIndexTxn(tx, intentionsTableName)
if idx < 1 {
@ -541,7 +542,7 @@ func (s *Store) legacyIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *struct
// Make all the calls and accumulate the results
results := make([]structs.Intentions, len(args.Entries))
for i, entry := range args.Entries {
ixns, err := s.intentionMatchOneTxn(tx, ws, entry, args.Type)
ixns, err := intentionMatchOneTxn(tx, ws, entry, args.Type)
if err != nil {
return 0, nil, err
}
@ -575,13 +576,13 @@ func (s *Store) IntentionMatchOne(
return 0, nil, err
}
if !usingConfigEntries {
return s.legacyIntentionMatchOneTxn(tx, ws, entry, matchType)
return legacyIntentionMatchOneTxn(tx, ws, entry, matchType)
}
return s.configIntentionMatchOneTxn(tx, ws, entry, matchType)
return configIntentionMatchOneTxn(tx, ws, entry, matchType)
}
func (s *Store) legacyIntentionMatchOneTxn(
tx *txn,
func legacyIntentionMatchOneTxn(
tx ReadTxn,
ws memdb.WatchSet,
entry structs.IntentionMatchEntry,
matchType structs.IntentionMatchType,
@ -592,7 +593,7 @@ func (s *Store) legacyIntentionMatchOneTxn(
idx = 1
}
results, err := s.intentionMatchOneTxn(tx, ws, entry, matchType)
results, err := intentionMatchOneTxn(tx, ws, entry, matchType)
if err != nil {
return 0, nil, err
}
@ -602,7 +603,7 @@ func (s *Store) legacyIntentionMatchOneTxn(
return idx, results, nil
}
func (s *Store) intentionMatchOneTxn(tx ReadTxn, ws memdb.WatchSet,
func intentionMatchOneTxn(tx ReadTxn, ws memdb.WatchSet,
entry structs.IntentionMatchEntry, matchType structs.IntentionMatchType) (structs.Intentions, error) {
// Each search entry may require multiple queries to memdb, so this

View File

@ -7,7 +7,7 @@ import (
memdb "github.com/hashicorp/go-memdb"
)
func intentionListTxn(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func intentionListTxn(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// Get all intentions
return tx.Get(intentionsTableName, "id")
}

View File

@ -117,7 +117,7 @@ func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error {
// If updateSession is true, then the incoming entry will set the new
// session (should be validated before calling this). Otherwise, we will keep
// whatever the existing session is.
func kvsSetTxn(tx *txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
func kvsSetTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
// Retrieve an existing KV pair
existingNode, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil {
@ -170,7 +170,7 @@ func (s *Store) KVSGet(ws memdb.WatchSet, key string, entMeta *structs.Enterpris
// kvsGetTxn is the inner method that gets a KVS entry inside an existing
// transaction.
func kvsGetTxn(tx *txn,
func kvsGetTxn(tx ReadTxn,
ws memdb.WatchSet, key string, entMeta *structs.EnterpriseMeta) (uint64, *structs.DirEntry, error) {
// Get the table index.
@ -203,7 +203,7 @@ func (s *Store) KVSList(ws memdb.WatchSet,
// kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix.
func (s *Store) kvsListTxn(tx *txn,
func (s *Store) kvsListTxn(tx ReadTxn,
ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
// Get the table indexes.
@ -252,7 +252,7 @@ func (s *Store) KVSDelete(idx uint64, key string, entMeta *structs.EnterpriseMet
// kvsDeleteTxn is the inner method used to perform the actual deletion
// of a key/value pair within an existing transaction.
func (s *Store) kvsDeleteTxn(tx *txn, idx uint64, key string, entMeta *structs.EnterpriseMeta) error {
func (s *Store) kvsDeleteTxn(tx WriteTxn, idx uint64, key string, entMeta *structs.EnterpriseMeta) error {
// Look up the entry in the state store.
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil {
@ -289,7 +289,7 @@ func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string, entMeta *structs.Ente
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
// transaction.
func (s *Store) kvsDeleteCASTxn(tx *txn, idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) {
func (s *Store) kvsDeleteCASTxn(tx WriteTxn, idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) {
// Retrieve the existing kvs entry, if any exists.
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil {
@ -330,7 +330,7 @@ func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsSetCASTxn is the inner method used to do a CAS inside an existing
// transaction.
func kvsSetCASTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) {
func kvsSetCASTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Retrieve the existing entry.
existing, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil {
@ -394,7 +394,7 @@ func (s *Store) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsLockTxn is the inner method that does a lock inside an existing
// transaction.
func kvsLockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) {
func kvsLockTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present.
if entry.Session == "" {
return false, fmt.Errorf("missing session")
@ -460,7 +460,7 @@ func (s *Store) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsUnlockTxn is the inner method that does an unlock inside an existing
// transaction.
func kvsUnlockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) {
func kvsUnlockTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present.
if entry.Session == "" {
return false, fmt.Errorf("missing session")
@ -498,7 +498,7 @@ func kvsUnlockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsCheckSessionTxn checks to see if the given session matches the current
// entry for a key.
func kvsCheckSessionTxn(tx *txn,
func kvsCheckSessionTxn(tx WriteTxn,
key string, session string, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) {
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
@ -519,7 +519,7 @@ func kvsCheckSessionTxn(tx *txn,
// kvsCheckIndexTxn checks to see if the given modify index matches the current
// entry for a key.
func kvsCheckIndexTxn(tx *txn,
func kvsCheckIndexTxn(tx WriteTxn,
key string, cidx uint64, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) {
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)

View File

@ -16,7 +16,7 @@ func kvsIndexer() *memdb.StringFieldIndex {
}
}
func insertKVTxn(tx *txn, entry *structs.DirEntry, updateMax bool) error {
func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool) error {
if err := tx.Insert("kvs", entry); err != nil {
return err
}
@ -33,7 +33,7 @@ func insertKVTxn(tx *txn, entry *structs.DirEntry, updateMax bool) error {
return nil
}
func kvsListEntriesTxn(tx *txn, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
func kvsListEntriesTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
var ents structs.DirEntries
var lindex uint64
@ -56,7 +56,7 @@ func kvsListEntriesTxn(tx *txn, ws memdb.WatchSet, prefix string, entMeta *struc
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
// existing transaction.
func (s *Store) kvsDeleteTreeTxn(tx *txn, idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error {
func (s *Store) kvsDeleteTreeTxn(tx WriteTxn, idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error {
// For prefix deletes, only insert one tombstone and delete the entire subtree
deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix)
if err != nil {
@ -77,11 +77,11 @@ func (s *Store) kvsDeleteTreeTxn(tx *txn, idx uint64, prefix string, entMeta *st
return nil
}
func kvsMaxIndex(tx *txn, entMeta *structs.EnterpriseMeta) uint64 {
func kvsMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "kvs", "tombstones")
}
func kvsDeleteWithEntry(tx *txn, entry *structs.DirEntry, idx uint64) error {
func kvsDeleteWithEntry(tx WriteTxn, entry *structs.DirEntry, idx uint64) error {
// Delete the entry and update the index.
if err := tx.Delete("kvs", entry); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)

View File

@ -13,14 +13,22 @@ type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
First(table, index string, args ...interface{}) (interface{}, error)
FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error)
}
// AbortTxn is a ReadTxn that can also be aborted to end the transaction.
type AbortTxn interface {
ReadTxn
Abort()
}
// WriteTxn is implemented by memdb.Txn to perform write operations.
type WriteTxn interface {
ReadTxn
Defer(func())
Delete(table string, obj interface{}) error
DeleteAll(table, index string, args ...interface{}) (int, error)
DeletePrefix(table string, index string, prefix string) (bool, error)
Insert(table string, obj interface{}) error
Commit() error
}
// Changes wraps a memdb.Changes to include the index at which these changes
@ -46,20 +54,16 @@ type changeTrackerDB struct {
// with write=true.
//
// Deprecated: use either ReadTxn, or WriteTxn.
func (c *changeTrackerDB) Txn(write bool) *txn {
func (c *changeTrackerDB) Txn(write bool) *memdb.Txn {
if write {
panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)")
}
return c.ReadTxn()
}
// ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn
//
// TODO: this could return a regular memdb.Txn if all the state functions accepted
// the ReadTxn interface
func (c *changeTrackerDB) ReadTxn() *txn {
return &txn{Txn: c.db.Txn(false)}
// ReadTxn returns a read-only transaction.
func (c *changeTrackerDB) ReadTxn() *memdb.Txn {
return c.db.Txn(false)
}
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.

View File

@ -258,7 +258,7 @@ func (s *Store) PreparedQueryDelete(idx uint64, queryID string) error {
// preparedQueryDeleteTxn is the inner method used to delete a prepared query
// with the proper indexes into the state store.
func preparedQueryDeleteTxn(tx *txn, idx uint64, queryID string) error {
func preparedQueryDeleteTxn(tx WriteTxn, idx uint64, queryID string) error {
// Pull the query.
wrapped, err := tx.First("prepared-queries", "id", queryID)
if err != nil {

View File

@ -301,7 +301,7 @@ func (s *Store) SessionDestroy(idx uint64, sessionID string, entMeta *structs.En
// deleteSessionTxn is the inner method, which is used to do the actual
// session deletion and handle session invalidation, etc.
func (s *Store) deleteSessionTxn(tx *txn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error {
func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error {
// Look up the session.
sess, err := firstWithTxn(tx, "sessions", "id", sessionID, entMeta)
if err != nil {

View File

@ -35,7 +35,7 @@ func nodeChecksIndexer() *memdb.CompoundIndex {
}
}
func sessionDeleteWithSession(tx *txn, session *structs.Session, idx uint64) error {
func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) error {
if err := tx.Delete("sessions", session); err != nil {
return fmt.Errorf("failed deleting session: %s", err)
}
@ -80,11 +80,11 @@ func insertSessionTxn(tx *txn, session *structs.Session, idx uint64, updateMax b
return nil
}
func allNodeSessionsTxn(tx *txn, node string) (structs.Sessions, error) {
func allNodeSessionsTxn(tx ReadTxn, node string) (structs.Sessions, error) {
return nodeSessionsTxn(tx, nil, node, nil)
}
func nodeSessionsTxn(tx *txn,
func nodeSessionsTxn(tx ReadTxn,
ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) {
sessions, err := tx.Get("sessions", "node", node)
@ -100,7 +100,7 @@ func nodeSessionsTxn(tx *txn,
return result, nil
}
func sessionMaxIndex(tx *txn, entMeta *structs.EnterpriseMeta) uint64 {
func sessionMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "sessions")
}

View File

@ -122,7 +122,7 @@ type Store struct {
// works by starting a read transaction against the whole state store.
type Snapshot struct {
store *Store
tx *txn
tx AbortTxn
lastIndex uint64
}
@ -288,7 +288,7 @@ func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
// indexUpdateMaxTxn is used when restoring entries and sets the table's index to
// the given idx only if it's greater than the current index.
func indexUpdateMaxTxn(tx *txn, idx uint64, table string) error {
func indexUpdateMaxTxn(tx WriteTxn, idx uint64, table string) error {
ti, err := tx.First("index", "id", table)
if err != nil {
return fmt.Errorf("failed to retrieve existing index: %s", err)

View File

@ -8,7 +8,7 @@ import (
)
// txnKVS handles all KV-related operations.
func (s *Store) txnKVS(tx *txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
var entry *structs.DirEntry
var err error
@ -110,7 +110,7 @@ func (s *Store) txnKVS(tx *txn, idx uint64, op *structs.TxnKVOp) (structs.TxnRes
}
// txnSession handles all Session-related operations.
func txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error {
func txnSession(tx WriteTxn, idx uint64, op *structs.TxnSessionOp) error {
var err error
switch op.Verb {
@ -129,7 +129,7 @@ func txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error {
// txnLegacyIntention handles all Intention-related operations.
//
// Deprecated: see TxnOp.Intention description
func txnLegacyIntention(tx *txn, idx uint64, op *structs.TxnIntentionOp) error {
func txnLegacyIntention(tx WriteTxn, idx uint64, op *structs.TxnIntentionOp) error {
switch op.Op {
case structs.IntentionOpCreate, structs.IntentionOpUpdate:
return legacyIntentionSetTxn(tx, idx, op.Intention)
@ -145,7 +145,7 @@ func txnLegacyIntention(tx *txn, idx uint64, op *structs.TxnIntentionOp) error {
}
// txnNode handles all Node-related operations.
func (s *Store) txnNode(tx *txn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) {
func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) {
var entry *structs.Node
var err error
@ -214,7 +214,7 @@ func (s *Store) txnNode(tx *txn, idx uint64, op *structs.TxnNodeOp) (structs.Txn
}
// txnService handles all Service-related operations.
func (s *Store) txnService(tx *txn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) {
func (s *Store) txnService(tx WriteTxn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) {
switch op.Verb {
case api.ServiceGet:
entry, err := getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta)
@ -276,7 +276,7 @@ func newTxnResultFromNodeServiceEntry(entry *structs.NodeService) structs.TxnRes
}
// txnCheck handles all Check-related operations.
func (s *Store) txnCheck(tx *txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
var entry *structs.HealthCheck
var err error
@ -338,7 +338,7 @@ func (s *Store) txnCheck(tx *txn, idx uint64, op *structs.TxnCheckOp) (structs.T
}
// txnDispatch runs the given operations inside the state store transaction.
func (s *Store) txnDispatch(tx *txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops {