From f9b283417105720491b97e316397908425a834f5 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 3 Sep 2020 19:38:03 -0400 Subject: [PATCH] state: convert the remaining functions to ReadTxn Required also converting some of the transaction functions to WriteTxn because TxnRO() called the same helper as TxnRW. This change allows us to return a memdb.Txn for read-only txn instead of wrapping them with state.txn. --- agent/consul/state/acl.go | 8 +- agent/consul/state/acl_oss.go | 30 ++--- agent/consul/state/catalog.go | 112 +++++++++---------- agent/consul/state/catalog_oss.go | 16 +-- agent/consul/state/catalog_test.go | 12 +- agent/consul/state/config_entry_intention.go | 35 +++--- agent/consul/state/connect_ca.go | 4 +- agent/consul/state/federation_state.go | 4 +- agent/consul/state/graveyard.go | 6 +- agent/consul/state/graveyard_oss.go | 4 +- agent/consul/state/intention.go | 31 ++--- agent/consul/state/intention_oss.go | 2 +- agent/consul/state/kvs.go | 20 ++-- agent/consul/state/kvs_oss.go | 10 +- agent/consul/state/memdb.go | 22 ++-- agent/consul/state/prepared_query.go | 2 +- agent/consul/state/session.go | 2 +- agent/consul/state/session_oss.go | 8 +- agent/consul/state/state_store.go | 4 +- agent/consul/state/txn.go | 14 +-- 20 files changed, 171 insertions(+), 175 deletions(-) diff --git a/agent/consul/state/acl.go b/agent/consul/state/acl.go index 1613e753a..61f0b54ba 100644 --- a/agent/consul/state/acl.go +++ b/agent/consul/state/acl.go @@ -434,7 +434,7 @@ func resolveTokenPolicyLinks(tx *txn, token *structs.ACLToken, allowMissing bool // stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated // token only when fixes are needed. If the policy links are still accurate then we just return the original // token. -func fixupTokenPolicyLinks(tx *txn, original *structs.ACLToken) (*structs.ACLToken, error) { +func fixupTokenPolicyLinks(tx ReadTxn, original *structs.ACLToken) (*structs.ACLToken, error) { owned := false token := original @@ -508,7 +508,7 @@ func resolveTokenRoleLinks(tx *txn, token *structs.ACLToken, allowMissing bool) // stale when a linked role was deleted or renamed. This will correct them and generate a newly allocated // token only when fixes are needed. If the role links are still accurate then we just return the original // token. -func fixupTokenRoleLinks(tx *txn, original *structs.ACLToken) (*structs.ACLToken, error) { +func fixupTokenRoleLinks(tx ReadTxn, original *structs.ACLToken) (*structs.ACLToken, error) { owned := false token := original @@ -824,7 +824,7 @@ func (s *Store) ACLTokenBatchGet(ws memdb.WatchSet, accessors []string) (uint64, return idx, tokens, nil } -func aclTokenGetTxn(tx *txn, ws memdb.WatchSet, value, index string, entMeta *structs.EnterpriseMeta) (*structs.ACLToken, error) { +func aclTokenGetTxn(tx ReadTxn, ws memdb.WatchSet, value, index string, entMeta *structs.EnterpriseMeta) (*structs.ACLToken, error) { watchCh, rawToken, err := aclTokenGetFromIndex(tx, value, index, entMeta) if err != nil { return nil, fmt.Errorf("failed acl token lookup: %v", err) @@ -1794,7 +1794,7 @@ func (s *Store) aclAuthMethodGet(ws memdb.WatchSet, name string, entMeta *struct return idx, method, nil } -func getAuthMethodWithTxn(tx *txn, ws memdb.WatchSet, name string, entMeta *structs.EnterpriseMeta) (*structs.ACLAuthMethod, error) { +func getAuthMethodWithTxn(tx ReadTxn, ws memdb.WatchSet, name string, entMeta *structs.EnterpriseMeta) (*structs.ACLAuthMethod, error) { watchCh, rawMethod, err := aclAuthMethodGetByName(tx, name, entMeta) if err != nil { return nil, fmt.Errorf("failed acl auth method lookup: %v", err) diff --git a/agent/consul/state/acl_oss.go b/agent/consul/state/acl_oss.go index 6826d0ac1..36e907b1a 100644 --- a/agent/consul/state/acl_oss.go +++ b/agent/consul/state/acl_oss.go @@ -243,7 +243,7 @@ func aclPolicyDeleteWithPolicy(tx *txn, policy *structs.ACLPolicy, idx uint64) e return nil } -func aclPolicyMaxIndex(tx *txn, _ *structs.ACLPolicy, _ *structs.EnterpriseMeta) uint64 { +func aclPolicyMaxIndex(tx ReadTxn, _ *structs.ACLPolicy, _ *structs.EnterpriseMeta) uint64 { return maxIndexTxn(tx, "acl-policies") } @@ -273,19 +273,19 @@ func aclTokenInsert(tx *txn, token *structs.ACLToken) error { return nil } -func aclTokenGetFromIndex(tx *txn, id string, index string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { +func aclTokenGetFromIndex(tx ReadTxn, id string, index string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { return tx.FirstWatch("acl-tokens", index, id) } -func aclTokenListAll(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListAll(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "id") } -func aclTokenListLocal(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListLocal(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "local", true) } -func aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListGlobal(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "local", false) } @@ -297,7 +297,7 @@ func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (mem return tx.Get("acl-tokens", "roles", role) } -func aclTokenListByAuthMethod(tx *txn, authMethod string, _, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByAuthMethod(tx ReadTxn, authMethod string, _, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "authmethod", authMethod) } @@ -314,7 +314,7 @@ func aclTokenDeleteWithToken(tx *txn, token *structs.ACLToken, idx uint64) error return nil } -func aclTokenMaxIndex(tx *txn, _ *structs.ACLToken, entMeta *structs.EnterpriseMeta) uint64 { +func aclTokenMaxIndex(tx ReadTxn, _ *structs.ACLToken, entMeta *structs.EnterpriseMeta) uint64 { return maxIndexTxn(tx, "acl-tokens") } @@ -372,7 +372,7 @@ func aclRoleDeleteWithRole(tx *txn, role *structs.ACLRole, idx uint64) error { return nil } -func aclRoleMaxIndex(tx *txn, _ *structs.ACLRole, _ *structs.EnterpriseMeta) uint64 { +func aclRoleMaxIndex(tx ReadTxn, _ *structs.ACLRole, _ *structs.EnterpriseMeta) uint64 { return maxIndexTxn(tx, "acl-roles") } @@ -402,15 +402,15 @@ func aclBindingRuleInsert(tx *txn, rule *structs.ACLBindingRule) error { return nil } -func aclBindingRuleGetByID(tx *txn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { +func aclBindingRuleGetByID(tx ReadTxn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { return tx.FirstWatch("acl-binding-rules", "id", id) } -func aclBindingRuleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclBindingRuleList(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-binding-rules", "id") } -func aclBindingRuleListByAuthMethod(tx *txn, method string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclBindingRuleListByAuthMethod(tx ReadTxn, method string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-binding-rules", "authmethod", method) } @@ -427,7 +427,7 @@ func aclBindingRuleDeleteWithRule(tx *txn, rule *structs.ACLBindingRule, idx uin return nil } -func aclBindingRuleMaxIndex(tx *txn, _ *structs.ACLBindingRule, entMeta *structs.EnterpriseMeta) uint64 { +func aclBindingRuleMaxIndex(tx ReadTxn, _ *structs.ACLBindingRule, entMeta *structs.EnterpriseMeta) uint64 { return maxIndexTxn(tx, "acl-binding-rules") } @@ -457,11 +457,11 @@ func aclAuthMethodInsert(tx *txn, method *structs.ACLAuthMethod) error { return nil } -func aclAuthMethodGetByName(tx *txn, method string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { +func aclAuthMethodGetByName(tx ReadTxn, method string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { return tx.FirstWatch("acl-auth-methods", "id", method) } -func aclAuthMethodList(tx *txn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclAuthMethodList(tx ReadTxn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-auth-methods", "id") } @@ -478,7 +478,7 @@ func aclAuthMethodDeleteWithMethod(tx *txn, method *structs.ACLAuthMethod, idx u return nil } -func aclAuthMethodMaxIndex(tx *txn, _ *structs.ACLAuthMethod, entMeta *structs.EnterpriseMeta) uint64 { +func aclAuthMethodMaxIndex(tx ReadTxn, _ *structs.ACLAuthMethod, entMeta *structs.EnterpriseMeta) uint64 { return maxIndexTxn(tx, "acl-auth-methods") } diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 8230f100d..a6ba6195f 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -6,15 +6,16 @@ import ( "reflect" "strings" + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-uuid" + "github.com/mitchellh/copystructure" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/types" - memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-uuid" - "github.com/mitchellh/copystructure" ) const ( @@ -262,10 +263,7 @@ func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) { // performed within a single transaction to avoid race conditions on state // updates. func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error { - if err := s.store.ensureRegistrationTxn(s.tx, idx, true, req); err != nil { - return err - } - return nil + return s.store.ensureRegistrationTxn(s.tx, idx, true, req) } // EnsureRegistration is used to make sure a node, service, and check @@ -282,7 +280,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err return tx.Commit() } -func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error { +func (s *Store) ensureCheckIfNodeMatches(tx WriteTxn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error { if check.Node != node { return fmt.Errorf("check node %q does not match node %q", check.Node, node) @@ -296,7 +294,7 @@ func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, preserveIndexes bo // ensureRegistrationTxn is used to make sure a node, service, and check // registration is performed within a single transaction to avoid race // conditions on state updates. -func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, preserveIndexes bool, req *structs.RegisterRequest) error { +func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes bool, req *structs.RegisterRequest) error { if _, err := validateRegisterRequestTxn(tx, req); err != nil { return err } @@ -378,7 +376,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error { // ensureNoNodeWithSimilarNameTxn checks that no other node has conflict in its name // If allowClashWithoutID then, getting a conflict on another node without ID will be allowed -func ensureNoNodeWithSimilarNameTxn(tx *txn, node *structs.Node, allowClashWithoutID bool) error { +func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWithoutID bool) error { // Retrieve all of the nodes enodes, err := tx.Get("nodes", "id") if err != nil { @@ -414,7 +412,7 @@ func ensureNoNodeWithSimilarNameTxn(tx *txn, node *structs.Node, allowClashWitho // ensureNodeCASTxn updates a node only if the existing index matches the given index. // Returns a bool indicating if a write happened and any error. -func (s *Store) ensureNodeCASTxn(tx *txn, idx uint64, node *structs.Node) (bool, error) { +func (s *Store) ensureNodeCASTxn(tx WriteTxn, idx uint64, node *structs.Node) (bool, error) { // Retrieve the existing entry. existing, err := getNodeTxn(tx, node.Node) if err != nil { @@ -444,7 +442,7 @@ func (s *Store) ensureNodeCASTxn(tx *txn, idx uint64, node *structs.Node) (bool, // ensureNodeTxn is the inner function called to actually create a node // registration or modify an existing one in the state store. It allows // passing in a memdb transaction so it may be part of a larger txn. -func (s *Store) ensureNodeTxn(tx *txn, idx uint64, preserveIndexes bool, node *structs.Node) error { +func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, node *structs.Node) error { // See if there's an existing node with this UUID, and make sure the // name is the same. var n *structs.Node @@ -546,7 +544,7 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) { return idx, node, nil } -func getNodeTxn(tx *txn, nodeName string) (*structs.Node, error) { +func getNodeTxn(tx ReadTxn, nodeName string) (*structs.Node, error) { node, err := tx.First("nodes", "id", nodeName) if err != nil { return nil, fmt.Errorf("node lookup failed: %s", err) @@ -557,7 +555,7 @@ func getNodeTxn(tx *txn, nodeName string) (*structs.Node, error) { return nil, nil } -func getNodeIDTxn(tx *txn, id types.NodeID) (*structs.Node, error) { +func getNodeIDTxn(tx ReadTxn, id types.NodeID) (*structs.Node, error) { strnode := string(id) uuidValue, err := uuid.ParseUUID(strnode) if err != nil { @@ -657,7 +655,7 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error { // deleteNodeCASTxn is used to try doing a node delete operation with a given // raft index. If the CAS index specified is not equal to the last observed index for // the given check, then the call is a noop, otherwise a normal check delete is invoked. -func (s *Store) deleteNodeCASTxn(tx *txn, idx, cidx uint64, nodeName string) (bool, error) { +func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string) (bool, error) { // Look up the node. node, err := getNodeTxn(tx, nodeName) if err != nil { @@ -684,7 +682,7 @@ func (s *Store) deleteNodeCASTxn(tx *txn, idx, cidx uint64, nodeName string) (bo // deleteNodeTxn is the inner method used for removing a node from // the store within a given transaction. -func (s *Store) deleteNodeTxn(tx *txn, idx uint64, nodeName string) error { +func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error { // Look up the node. node, err := tx.First("nodes", "id", nodeName) if err != nil { @@ -791,7 +789,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed") // ensureServiceCASTxn updates a service only if the existing index matches the given index. // Returns an error if the write didn't happen and nil if write was successful. -func ensureServiceCASTxn(tx *txn, idx uint64, node string, svc *structs.NodeService) error { +func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error { // Retrieve the existing service. _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) if err != nil { @@ -816,7 +814,7 @@ func ensureServiceCASTxn(tx *txn, idx uint64, node string, svc *structs.NodeServ // ensureServiceTxn is used to upsert a service registration within an // existing memdb transaction. -func ensureServiceTxn(tx *txn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error { +func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error { // Check for existing service _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) if err != nil { @@ -921,7 +919,7 @@ func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) return serviceListTxn(tx, ws, entMeta) } -func serviceListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) { +func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) { idx := catalogServicesMaxIndex(tx, entMeta) services, err := catalogServiceList(tx, entMeta, true) @@ -1025,7 +1023,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, // * return when the last instance of a service is removed // * block until an instance for this service is available, or another // service is unregistered. -func maxIndexForService(tx *txn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) uint64 { +func maxIndexForService(tx ReadTxn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) uint64 { idx, _ := maxIndexAndWatchChForService(tx, serviceName, serviceExists, checks, entMeta) return idx } @@ -1044,7 +1042,7 @@ func maxIndexForService(tx *txn, serviceName string, serviceExists, checks bool, // returned for the chan. This allows for blocking watchers to _only_ watch this // one chan in the common case, falling back to watching all touched MemDB // indexes in more complicated cases. -func maxIndexAndWatchChForService(tx *txn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) (uint64, <-chan struct{}) { +func maxIndexAndWatchChForService(tx ReadTxn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) (uint64, <-chan struct{}) { if !serviceExists { res, err := catalogServiceLastExtinctionIndex(tx, entMeta) if missingIdx, ok := res.(*IndexEntry); ok && err == nil { @@ -1061,7 +1059,7 @@ func maxIndexAndWatchChForService(tx *txn, serviceName string, serviceExists, ch } // Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes -func maxIndexAndWatchChsForServiceNodes(tx *txn, +func maxIndexAndWatchChsForServiceNodes(tx ReadTxn, nodes structs.ServiceNodes, watchChecks bool) (uint64, []<-chan struct{}) { var watchChans []<-chan struct{} @@ -1268,7 +1266,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta * // parseServiceNodes iterates over a services query and fills in the node details, // returning a ServiceNodes slice. -func parseServiceNodes(tx *txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) { +func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) { // We don't want to track an unlimited number of nodes, so we pull a // top-level watch to use as a fallback. allNodes, err := tx.Get("nodes", "id") @@ -1325,7 +1323,7 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs. return idx, service, nil } -func getNodeServiceTxn(tx *txn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { +func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { // Query the service _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) if err != nil { @@ -1467,7 +1465,7 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string, entMeta *s // deleteServiceCASTxn is used to try doing a service delete operation with a given // raft index. If the CAS index specified is not equal to the last observed index for // the given service, then the call is a noop, otherwise a normal delete is invoked. -func (s *Store) deleteServiceCASTxn(tx *txn, idx, cidx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (bool, error) { +func (s *Store) deleteServiceCASTxn(tx WriteTxn, idx, cidx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (bool, error) { // Look up the service. service, err := getNodeServiceTxn(tx, nodeName, serviceID, entMeta) if err != nil { @@ -1494,7 +1492,7 @@ func (s *Store) deleteServiceCASTxn(tx *txn, idx, cidx uint64, nodeName, service // deleteServiceTxn is the inner method called to remove a service // registration within an existing transaction. -func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error { +func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error { // Look up the service. _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) if err != nil { @@ -1589,7 +1587,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { } // updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node -func updateAllServiceIndexesOfNode(tx *txn, idx uint64, nodeID string) error { +func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string) error { services, err := tx.Get("services", "node", nodeID) if err != nil { return fmt.Errorf("failed updating services for node %s: %s", nodeID, err) @@ -1608,7 +1606,7 @@ func updateAllServiceIndexesOfNode(tx *txn, idx uint64, nodeID string) error { // ensureCheckCASTxn updates a check only if the existing index matches the given index. // Returns a bool indicating if a write happened and any error. -func (s *Store) ensureCheckCASTxn(tx *txn, idx uint64, hc *structs.HealthCheck) (bool, error) { +func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthCheck) (bool, error) { // Retrieve the existing entry. _, existing, err := getNodeCheckTxn(tx, hc.Node, hc.CheckID, &hc.EnterpriseMeta) if err != nil { @@ -1638,7 +1636,7 @@ func (s *Store) ensureCheckCASTxn(tx *txn, idx uint64, hc *structs.HealthCheck) // ensureCheckTxn is used as the inner method to handle inserting // a health check into the state store. It ensures safety against inserting // checks with no matching node or service. -func (s *Store) ensureCheckTxn(tx *txn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error { +func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error { // Check if we have an existing health check _, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) if err != nil { @@ -1743,7 +1741,7 @@ func (s *Store) NodeCheck(nodeName string, checkID types.CheckID, entMeta *struc // nodeCheckTxn is used as the inner method to handle reading a health check // from the state store. -func getNodeCheckTxn(tx *txn, nodeName string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (uint64, *structs.HealthCheck, error) { +func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (uint64, *structs.HealthCheck, error) { // Get the table index. idx := catalogChecksMaxIndex(tx, entMeta) @@ -1859,7 +1857,7 @@ func (s *Store) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters return parseChecksByNodeMeta(tx, ws, idx, iter, filters) } -func checksInStateTxn(tx *txn, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) { +func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) { // Get the table index. idx := catalogChecksMaxIndex(tx, entMeta) @@ -1881,7 +1879,7 @@ func checksInStateTxn(tx *txn, ws memdb.WatchSet, state string, entMeta *structs // parseChecksByNodeMeta is a helper function used to deduplicate some // repetitive code for returning health checks filtered by node metadata fields. -func parseChecksByNodeMeta(tx *txn, ws memdb.WatchSet, +func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet, idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) { // We don't want to track an unlimited number of nodes, so we pull a @@ -1930,7 +1928,7 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID, entM // deleteCheckCASTxn is used to try doing a check delete operation with a given // raft index. If the CAS index specified is not equal to the last observed index for // the given check, then the call is a noop, otherwise a normal check delete is invoked. -func (s *Store) deleteCheckCASTxn(tx *txn, idx, cidx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (bool, error) { +func (s *Store) deleteCheckCASTxn(tx WriteTxn, idx, cidx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (bool, error) { // Try to retrieve the existing health check. _, hc, err := getNodeCheckTxn(tx, node, checkID, entMeta) if err != nil { @@ -1957,7 +1955,7 @@ func (s *Store) deleteCheckCASTxn(tx *txn, idx, cidx uint64, node string, checkI // deleteCheckTxn is the inner method used to call a health // check deletion within an existing transaction. -func (s *Store) deleteCheckTxn(tx *txn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error { +func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error { // Try to retrieve the existing health check. _, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID)) if err != nil { @@ -2101,7 +2099,7 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect return checkServiceNodesTxn(tx, ws, serviceName, connect, entMeta) } -func checkServiceNodesTxn(tx *txn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { +func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { // Function for lookup index := "service" if connect { @@ -2279,7 +2277,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru // and query for an associated node and a set of checks. This is the inner // method used to return a rich set of results from a more simple query. func parseCheckServiceNodes( - tx *txn, ws memdb.WatchSet, idx uint64, + tx ReadTxn, ws memdb.WatchSet, idx uint64, services structs.ServiceNodes, err error) (uint64, structs.CheckServiceNodes, error) { if err != nil { @@ -2404,7 +2402,7 @@ func (s *Store) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind } } -func serviceDumpAllTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { +func serviceDumpAllTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { // Get the table index idx := catalogMaxIndexWatch(tx, ws, entMeta, true) @@ -2422,7 +2420,7 @@ func serviceDumpAllTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMe return parseCheckServiceNodes(tx, nil, idx, results, err) } -func serviceDumpKindTxn(tx *txn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { +func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { // unlike when we are dumping all services here we only need to watch the kind specific index entry for changing (or nodes, checks) // updating any services, nodes or checks will bump the appropriate service kind index so there is no need to watch any of the individual // entries @@ -2446,7 +2444,7 @@ func serviceDumpKindTxn(tx *txn, ws memdb.WatchSet, kind structs.ServiceKind, en // parseNodes takes an iterator over a set of nodes and returns a struct // containing the nodes along with all of their associated services // and/or health checks. -func parseNodes(tx *txn, ws memdb.WatchSet, idx uint64, +func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64, iter memdb.ResultIterator, entMeta *structs.EnterpriseMeta) (uint64, structs.NodeDump, error) { // We don't want to track an unlimited number of services, so we pull a @@ -2506,7 +2504,7 @@ func parseNodes(tx *txn, ws memdb.WatchSet, idx uint64, } // checkSessionsTxn returns the IDs of all sessions associated with a health check -func checkSessionsTxn(tx *txn, hc *structs.HealthCheck) ([]*sessionCheck, error) { +func checkSessionsTxn(tx ReadTxn, hc *structs.HealthCheck) ([]*sessionCheck, error) { mappings, err := getCompoundWithTxn(tx, "session_checks", "node_check", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) if err != nil { return nil, fmt.Errorf("failed session checks lookup: %s", err) @@ -2520,7 +2518,7 @@ func checkSessionsTxn(tx *txn, hc *structs.HealthCheck) ([]*sessionCheck, error) } // updateGatewayServices associates services with gateways as specified in a gateway config entry -func updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error { +func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error { var ( noChange bool gatewayServices structs.GatewayServices @@ -2582,7 +2580,7 @@ func updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMet // insertion into the memdb table, specific to ingress gateways. The boolean // returned indicates that there are no changes necessary to the memdb table. func ingressConfigGatewayServices( - tx *txn, + tx ReadTxn, gateway structs.ServiceName, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta, @@ -2627,7 +2625,7 @@ func ingressConfigGatewayServices( // boolean returned indicates that there are no changes necessary to the memdb // table. func terminatingConfigGatewayServices( - tx *txn, + tx ReadTxn, gateway structs.ServiceName, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta, @@ -2667,7 +2665,7 @@ func terminatingConfigGatewayServices( } // updateGatewayNamespace is used to target all services within a namespace -func updateGatewayNamespace(tx *txn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error { +func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error { services, err := catalogServiceListByKind(tx, structs.ServiceKindTypical, entMeta) if err != nil { return fmt.Errorf("failed querying services: %s", err) @@ -2714,7 +2712,7 @@ func updateGatewayNamespace(tx *txn, idx uint64, service *structs.GatewayService // updateGatewayService associates services with gateways after an eligible event // ie. Registering a service in a namespace targeted by a gateway -func updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService) error { +func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error { // Check if mapping already exists in table if it's already in the table // Avoid insert if nothing changed existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service, mapping.Port) @@ -2749,7 +2747,7 @@ func updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService) // checkWildcardForGatewaysAndUpdate checks whether a service matches a // wildcard definition in gateway config entries and if so adds it the the // gateway-services table. -func checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeService) error { +func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.NodeService) error { // Do not associate non-typical services with gateways or consul services if svc.Kind != structs.ServiceKindTypical || svc.Service == "consul" { return nil @@ -2776,7 +2774,7 @@ func checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeServic return nil } -func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) error { +func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) error { // Clean up association between service name and gateways if needed gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta) if err != nil { @@ -2805,11 +2803,11 @@ func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) erro // serviceGateways returns all GatewayService entries with the given service name. This effectively looks up // all the gateways mapped to this service. -func serviceGateways(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func serviceGateways(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get(gatewayServicesTableName, "service", structs.NewServiceName(name, entMeta)) } -func gatewayServices(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func gatewayServices(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta)) } @@ -2832,7 +2830,7 @@ func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayS return lib.MaxUint64(maxIdx, idx), results, nil } -func (s *Store) collectGatewayServices(tx *txn, ws memdb.WatchSet, iter memdb.ResultIterator) (uint64, structs.GatewayServices, error) { +func (s *Store) collectGatewayServices(tx ReadTxn, ws memdb.WatchSet, iter memdb.ResultIterator) (uint64, structs.GatewayServices, error) { var maxIdx uint64 var results structs.GatewayServices @@ -2858,7 +2856,7 @@ func (s *Store) collectGatewayServices(tx *txn, ws memdb.WatchSet, iter memdb.Re // TODO(ingress): How to handle index rolling back when a config entry is // deleted that references a service? // We might need something like the service_last_extinction index? -func serviceGatewayNodes(tx *txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { +func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { // Look up gateway name associated with the service gws, err := serviceGateways(tx, service, entMeta) if err != nil { @@ -3060,7 +3058,7 @@ func (s *Store) ServiceTopology( // combinedServiceNodesTxn returns typical and connect endpoints for a list of services. // This enabled aggregating checks statuses across both. -func (s *Store) combinedServiceNodesTxn(tx *txn, ws memdb.WatchSet, names []structs.ServiceName) (uint64, structs.CheckServiceNodes, error) { +func (s *Store) combinedServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, names []structs.ServiceName) (uint64, structs.CheckServiceNodes, error) { var ( maxIdx uint64 resp structs.CheckServiceNodes @@ -3177,7 +3175,7 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se } // updateMeshTopology creates associations between the input service and its upstreams in the topology table -func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeService, existing interface{}) error { +func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeService, existing interface{}) error { oldUpstreams := make(map[structs.ServiceName]bool) if e, ok := existing.(*structs.ServiceNode); ok { for _, u := range e.ServiceProxy.Upstreams { @@ -3257,7 +3255,7 @@ func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeServi // cleanupMeshTopology removes a service from the mesh topology table // This is only safe to call when there are no more known instances of this proxy -func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) error { +func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode) error { if service.ServiceKind != structs.ServiceKindConnectProxy { return nil } @@ -3294,7 +3292,7 @@ func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) erro return nil } -func insertGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error { +func insertGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.GatewayService) error { // Only ingress gateways are standalone items in the mesh topology viz if gs.GatewayKind != structs.ServiceKindIngressGateway || gs.Service.Name == structs.WildcardSpecifier { return nil @@ -3315,7 +3313,7 @@ func insertGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.Gatewa return nil } -func deleteGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error { +func deleteGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.GatewayService) error { // Only ingress gateways are standalone items in the mesh topology viz if gs.GatewayKind != structs.ServiceKindIngressGateway { return nil @@ -3331,7 +3329,7 @@ func deleteGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.Gatewa return nil } -func truncateGatewayServiceTopologyMappings(tx *txn, idx uint64, gateway structs.ServiceName, kind string) error { +func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway structs.ServiceName, kind string) error { // Only ingress gateways are standalone items in the mesh topology viz if kind != string(structs.ServiceKindIngressGateway) { return nil diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index d0ca48489..631e6ff1c 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -168,7 +168,7 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) s } } -func catalogUpdateServicesIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error { +func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { // overall services index if err := indexUpdateMaxTxn(tx, idx, "services"); err != nil { return fmt.Errorf("failed updating index: %s", err) @@ -177,7 +177,7 @@ func catalogUpdateServicesIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta return nil } -func catalogUpdateServiceKindIndexes(tx *txn, kind structs.ServiceKind, idx uint64, _ *structs.EnterpriseMeta) error { +func catalogUpdateServiceKindIndexes(tx WriteTxn, kind structs.ServiceKind, idx uint64, _ *structs.EnterpriseMeta) error { // service-kind index if err := indexUpdateMaxTxn(tx, idx, serviceKindIndexName(kind, nil)); err != nil { return fmt.Errorf("failed updating index: %s", err) @@ -186,7 +186,7 @@ func catalogUpdateServiceKindIndexes(tx *txn, kind structs.ServiceKind, idx uint return nil } -func catalogUpdateServiceIndexes(tx *txn, serviceName string, idx uint64, _ *structs.EnterpriseMeta) error { +func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _ *structs.EnterpriseMeta) error { // per-service index if err := indexUpdateMaxTxn(tx, idx, serviceIndexName(serviceName, nil)); err != nil { return fmt.Errorf("failed updating index: %s", err) @@ -195,14 +195,14 @@ func catalogUpdateServiceIndexes(tx *txn, serviceName string, idx uint64, _ *str return nil } -func catalogUpdateServiceExtinctionIndex(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error { +func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil { return fmt.Errorf("failed updating missing service extinction index: %s", err) } return nil } -func catalogInsertService(tx *txn, svc *structs.ServiceNode) error { +func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error { // Insert the service and update the index if err := tx.Insert("services", svc); err != nil { return fmt.Errorf("failed inserting service: %s", err) @@ -269,7 +269,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMe return maxIndexWatchTxn(tx, ws, "nodes", "services") } -func catalogUpdateCheckIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error { +func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { // update the universal index entry if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) @@ -306,7 +306,7 @@ func catalogListServiceChecks(tx ReadTxn, node string, service string, _ *struct return tx.Get("checks", "node_service", node, service) } -func catalogInsertCheck(tx *txn, chk *structs.HealthCheck, idx uint64) error { +func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error { // Insert the check if err := tx.Insert("checks", chk); err != nil { return fmt.Errorf("failed inserting check: %s", err) @@ -323,7 +323,7 @@ func catalogChecksForNodeService(tx ReadTxn, node string, service string, entMet return tx.Get("checks", "node_service", node, service) } -func validateRegisterRequestTxn(_ *txn, _ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) { +func validateRegisterRequestTxn(_ ReadTxn, _ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) { return nil, nil } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 352a3f7ee..af80ac2d4 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -4415,12 +4415,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) { require.NoError(t, tx.Commit()) // ensure no update happened - tx = s.db.Txn(false) + roTxn := s.db.Txn(false) _, nsRead, err := s.NodeService("node1", "foo", nil) require.NoError(t, err) require.NotNil(t, nsRead) require.Equal(t, uint64(2), nsRead.ModifyIndex) - require.NoError(t, tx.Commit()) + roTxn.Commit() ns.ModifyIndex = 99 // attempt to update with a non-matching index @@ -4430,12 +4430,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) { require.NoError(t, tx.Commit()) // ensure no update happened - tx = s.db.Txn(false) + roTxn = s.db.Txn(false) _, nsRead, err = s.NodeService("node1", "foo", nil) require.NoError(t, err) require.NotNil(t, nsRead) require.Equal(t, uint64(2), nsRead.ModifyIndex) - require.NoError(t, tx.Commit()) + roTxn.Commit() ns.ModifyIndex = 2 // update with the matching modify index @@ -4445,12 +4445,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) { require.NoError(t, tx.Commit()) // ensure the update happened - tx = s.db.Txn(false) + roTxn = s.db.Txn(false) _, nsRead, err = s.NodeService("node1", "foo", nil) require.NoError(t, err) require.NotNil(t, nsRead) require.Equal(t, uint64(7), nsRead.ModifyIndex) - require.NoError(t, tx.Commit()) + roTxn.Commit() } func TestStateStore_GatewayServices_Terminating(t *testing.T) { diff --git a/agent/consul/state/config_entry_intention.go b/agent/consul/state/config_entry_intention.go index 464f1e380..c22e48427 100644 --- a/agent/consul/state/config_entry_intention.go +++ b/agent/consul/state/config_entry_intention.go @@ -1,12 +1,12 @@ package state import ( - "encoding/json" "fmt" "sort" - "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) type ServiceIntentionLegacyIDIndex struct { @@ -123,7 +123,7 @@ func (s *ServiceIntentionSourceIndex) FromArgs(args ...interface{}) ([]byte, err return []byte(arg.String() + "\x00"), nil } -func (s *Store) configIntentionsListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) { +func (s *Store) configIntentionsListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) { // unrolled part of configEntriesByKindTxn idx := maxIndexTxn(tx, configTableName) @@ -144,7 +144,7 @@ func (s *Store) configIntentionsListTxn(tx *txn, ws memdb.WatchSet, entMeta *str return idx, results, true, nil } -func (s *Store) configIntentionGetTxn(tx *txn, ws memdb.WatchSet, id string) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) { +func (s *Store) configIntentionGetTxn(tx ReadTxn, ws memdb.WatchSet, id string) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) { idx := maxIndexTxn(tx, configTableName) if idx < 1 { idx = 1 @@ -173,7 +173,7 @@ func (s *Store) configIntentionGetTxn(tx *txn, ws memdb.WatchSet, id string) (ui return idx, nil, nil, nil // Shouldn't happen. } -func (s *Store) configIntentionGetExactTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) { +func (s *Store) configIntentionGetExactTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) { if err := args.Validate(); err != nil { return 0, nil, nil, err } @@ -196,7 +196,7 @@ func (s *Store) configIntentionGetExactTxn(tx *txn, ws memdb.WatchSet, args *str return idx, nil, nil, nil } -func (s *Store) configIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) { +func (s *Store) configIntentionMatchTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) { maxIndex := uint64(1) // Make all the calls and accumulate the results @@ -207,7 +207,7 @@ func (s *Store) configIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *struct // improving that in the future, the test cases shouldn't have to // change for that. - index, ixns, err := s.configIntentionMatchOneTxn(tx, ws, entry, args.Type) + index, ixns, err := configIntentionMatchOneTxn(tx, ws, entry, args.Type) if err != nil { return 0, nil, err } @@ -222,23 +222,23 @@ func (s *Store) configIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *struct return maxIndex, results, nil } -func (s *Store) configIntentionMatchOneTxn( - tx *txn, +func configIntentionMatchOneTxn( + tx ReadTxn, ws memdb.WatchSet, matchEntry structs.IntentionMatchEntry, matchType structs.IntentionMatchType, ) (uint64, structs.Intentions, error) { switch matchType { case structs.IntentionMatchSource: - return s.readSourceIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta()) + return readSourceIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta()) case structs.IntentionMatchDestination: - return s.readDestinationIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta()) + return readDestinationIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta()) default: return 0, nil, fmt.Errorf("invalid intention match type: %s", matchType) } } -func (s *Store) readSourceIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) { +func readSourceIntentionsFromConfigEntriesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) { idx := maxIndexTxn(tx, configTableName) var ( @@ -248,7 +248,7 @@ func (s *Store) readSourceIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.Watch names := getIntentionPrecedenceMatchServiceNames(serviceName, entMeta) for _, sn := range names { - results, err = s.readSourceIntentionsFromConfigEntriesForServiceTxn( + results, err = readSourceIntentionsFromConfigEntriesForServiceTxn( tx, ws, sn.Name, &sn.EnterpriseMeta, results, ) if err != nil { @@ -262,7 +262,7 @@ func (s *Store) readSourceIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.Watch return idx, results, nil } -func (s *Store) readSourceIntentionsFromConfigEntriesForServiceTxn(tx *txn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta, results structs.Intentions) (structs.Intentions, error) { +func readSourceIntentionsFromConfigEntriesForServiceTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta, results structs.Intentions) (structs.Intentions, error) { sn := structs.NewServiceName(serviceName, entMeta) iter, err := tx.Get(configTableName, "intention-source", sn) @@ -283,12 +283,7 @@ func (s *Store) readSourceIntentionsFromConfigEntriesForServiceTxn(tx *txn, ws m return results, nil } -func jd(v interface{}) string { - d, _ := json.MarshalIndent(v, "", " ") - return string(d) -} - -func (s *Store) readDestinationIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) { +func readDestinationIntentionsFromConfigEntriesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) { idx := maxIndexTxn(tx, configTableName) var results structs.Intentions diff --git a/agent/consul/state/connect_ca.go b/agent/consul/state/connect_ca.go index a60025ac1..7a467e75d 100644 --- a/agent/consul/state/connect_ca.go +++ b/agent/consul/state/connect_ca.go @@ -116,7 +116,7 @@ func (s *Store) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, e return caConfigTxn(tx, ws) } -func caConfigTxn(tx *txn, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) { +func caConfigTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) { // Get the CA config ch, c, err := tx.FirstWatch(caConfigTableName, "id") if err != nil { @@ -236,7 +236,7 @@ func (s *Store) CARoots(ws memdb.WatchSet) (uint64, structs.CARoots, error) { return caRootsTxn(tx, ws) } -func caRootsTxn(tx *txn, ws memdb.WatchSet) (uint64, structs.CARoots, error) { +func caRootsTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, structs.CARoots, error) { // Get the index idx := maxIndexTxn(tx, caRootTableName) diff --git a/agent/consul/state/federation_state.go b/agent/consul/state/federation_state.go index 88764d0e9..6854267d6 100644 --- a/agent/consul/state/federation_state.go +++ b/agent/consul/state/federation_state.go @@ -134,7 +134,7 @@ func (s *Store) FederationStateGet(ws memdb.WatchSet, datacenter string) (uint64 return federationStateGetTxn(tx, ws, datacenter) } -func federationStateGetTxn(tx *txn, ws memdb.WatchSet, datacenter string) (uint64, *structs.FederationState, error) { +func federationStateGetTxn(tx ReadTxn, ws memdb.WatchSet, datacenter string) (uint64, *structs.FederationState, error) { // Get the index idx := maxIndexTxn(tx, federationStateTableName) @@ -164,7 +164,7 @@ func (s *Store) FederationStateList(ws memdb.WatchSet) (uint64, []*structs.Feder return federationStateListTxn(tx, ws) } -func federationStateListTxn(tx *txn, ws memdb.WatchSet) (uint64, []*structs.FederationState, error) { +func federationStateListTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*structs.FederationState, error) { // Get the index idx := maxIndexTxn(tx, federationStateTableName) diff --git a/agent/consul/state/graveyard.go b/agent/consul/state/graveyard.go index bfae5f2c2..8230d9b25 100644 --- a/agent/consul/state/graveyard.go +++ b/agent/consul/state/graveyard.go @@ -28,7 +28,7 @@ func NewGraveyard(gc *TombstoneGC) *Graveyard { } // InsertTxn adds a new tombstone. -func (g *Graveyard) InsertTxn(tx *txn, key string, idx uint64, entMeta *structs.EnterpriseMeta) error { +func (g *Graveyard) InsertTxn(tx WriteTxn, key string, idx uint64, entMeta *structs.EnterpriseMeta) error { stone := &Tombstone{ Key: key, Index: idx, @@ -51,7 +51,7 @@ func (g *Graveyard) InsertTxn(tx *txn, key string, idx uint64, entMeta *structs. // GetMaxIndexTxn returns the highest index tombstone whose key matches the // given context, using a prefix match. -func (g *Graveyard) GetMaxIndexTxn(tx *txn, prefix string, entMeta *structs.EnterpriseMeta) (uint64, error) { +func (g *Graveyard) GetMaxIndexTxn(tx ReadTxn, prefix string, entMeta *structs.EnterpriseMeta) (uint64, error) { stones, err := getWithTxn(tx, "tombstones", "id_prefix", prefix, entMeta) if err != nil { return 0, fmt.Errorf("failed querying tombstones: %s", err) @@ -68,7 +68,7 @@ func (g *Graveyard) GetMaxIndexTxn(tx *txn, prefix string, entMeta *structs.Ente } // DumpTxn returns all the tombstones. -func (g *Graveyard) DumpTxn(tx *txn) (memdb.ResultIterator, error) { +func (g *Graveyard) DumpTxn(tx ReadTxn) (memdb.ResultIterator, error) { iter, err := tx.Get("tombstones", "id") if err != nil { return nil, err diff --git a/agent/consul/state/graveyard_oss.go b/agent/consul/state/graveyard_oss.go index 157f23e1d..d4ee5848a 100644 --- a/agent/consul/state/graveyard_oss.go +++ b/agent/consul/state/graveyard_oss.go @@ -6,9 +6,7 @@ import ( "fmt" ) -func (g *Graveyard) insertTombstoneWithTxn(tx *txn, - table string, stone *Tombstone, updateMax bool) error { - +func (g *Graveyard) insertTombstoneWithTxn(tx WriteTxn, _ string, stone *Tombstone, updateMax bool) error { if err := tx.Insert("tombstones", stone); err != nil { return err } diff --git a/agent/consul/state/intention.go b/agent/consul/state/intention.go index 59a82e76c..b66738fb1 100644 --- a/agent/consul/state/intention.go +++ b/agent/consul/state/intention.go @@ -5,10 +5,11 @@ import ( "fmt" "sort" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-memdb" ) const ( @@ -142,7 +143,7 @@ func (s *Store) AreIntentionsInConfigEntries() (bool, error) { return areIntentionsInConfigEntries(tx) } -func areIntentionsInConfigEntries(tx *txn) (bool, error) { +func areIntentionsInConfigEntries(tx ReadTxn) (bool, error) { _, entry, err := systemMetadataGetTxn(tx, nil, structs.SystemMetadataIntentionFormatKey) if err != nil { return false, fmt.Errorf("failed system metadatalookup: %s", err) @@ -178,7 +179,7 @@ func (s *Store) Intentions(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) ( return s.configIntentionsListTxn(tx, ws, entMeta) } -func (s *Store) legacyIntentionsListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) { +func (s *Store) legacyIntentionsListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) { // Get the index idx := maxIndexTxn(tx, intentionsTableName) if idx < 1 { @@ -230,7 +231,7 @@ func (s *Store) LegacyIntentionSet(idx uint64, ixn *structs.Intention) error { // legacyIntentionSetTxn is the inner method used to insert an intention with // the proper indexes into the state store. -func legacyIntentionSetTxn(tx *txn, idx uint64, ixn *structs.Intention) error { +func legacyIntentionSetTxn(tx WriteTxn, idx uint64, ixn *structs.Intention) error { // ID is required if ixn.ID == "" { return ErrMissingIntentionID @@ -301,7 +302,7 @@ func (s *Store) IntentionGet(ws memdb.WatchSet, id string) (uint64, *structs.Ser return s.configIntentionGetTxn(tx, ws, id) } -func (s *Store) legacyIntentionGetTxn(tx *txn, ws memdb.WatchSet, id string) (uint64, *structs.Intention, error) { +func (s *Store) legacyIntentionGetTxn(tx ReadTxn, ws memdb.WatchSet, id string) (uint64, *structs.Intention, error) { // Get the table index. idx := maxIndexTxn(tx, intentionsTableName) if idx < 1 { @@ -340,7 +341,7 @@ func (s *Store) IntentionGetExact(ws memdb.WatchSet, args *structs.IntentionQuer return s.configIntentionGetExactTxn(tx, ws, args) } -func (s *Store) legacyIntentionGetExactTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.Intention, error) { +func (s *Store) legacyIntentionGetExactTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.Intention, error) { if err := args.Validate(); err != nil { return 0, nil, err } @@ -392,7 +393,7 @@ func (s *Store) LegacyIntentionDelete(idx uint64, id string) error { // legacyIntentionDeleteTxn is the inner method used to delete a legacy intention // with the proper indexes into the state store. -func legacyIntentionDeleteTxn(tx *txn, idx uint64, queryID string) error { +func legacyIntentionDeleteTxn(tx WriteTxn, idx uint64, queryID string) error { // Pull the query. wrapped, err := tx.First(intentionsTableName, "id", queryID) if err != nil { @@ -531,7 +532,7 @@ func (s *Store) IntentionMatch(ws memdb.WatchSet, args *structs.IntentionQueryMa return s.configIntentionMatchTxn(tx, ws, args) } -func (s *Store) legacyIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) { +func (s *Store) legacyIntentionMatchTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) { // Get the table index. idx := maxIndexTxn(tx, intentionsTableName) if idx < 1 { @@ -541,7 +542,7 @@ func (s *Store) legacyIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *struct // Make all the calls and accumulate the results results := make([]structs.Intentions, len(args.Entries)) for i, entry := range args.Entries { - ixns, err := s.intentionMatchOneTxn(tx, ws, entry, args.Type) + ixns, err := intentionMatchOneTxn(tx, ws, entry, args.Type) if err != nil { return 0, nil, err } @@ -575,13 +576,13 @@ func (s *Store) IntentionMatchOne( return 0, nil, err } if !usingConfigEntries { - return s.legacyIntentionMatchOneTxn(tx, ws, entry, matchType) + return legacyIntentionMatchOneTxn(tx, ws, entry, matchType) } - return s.configIntentionMatchOneTxn(tx, ws, entry, matchType) + return configIntentionMatchOneTxn(tx, ws, entry, matchType) } -func (s *Store) legacyIntentionMatchOneTxn( - tx *txn, +func legacyIntentionMatchOneTxn( + tx ReadTxn, ws memdb.WatchSet, entry structs.IntentionMatchEntry, matchType structs.IntentionMatchType, @@ -592,7 +593,7 @@ func (s *Store) legacyIntentionMatchOneTxn( idx = 1 } - results, err := s.intentionMatchOneTxn(tx, ws, entry, matchType) + results, err := intentionMatchOneTxn(tx, ws, entry, matchType) if err != nil { return 0, nil, err } @@ -602,7 +603,7 @@ func (s *Store) legacyIntentionMatchOneTxn( return idx, results, nil } -func (s *Store) intentionMatchOneTxn(tx ReadTxn, ws memdb.WatchSet, +func intentionMatchOneTxn(tx ReadTxn, ws memdb.WatchSet, entry structs.IntentionMatchEntry, matchType structs.IntentionMatchType) (structs.Intentions, error) { // Each search entry may require multiple queries to memdb, so this diff --git a/agent/consul/state/intention_oss.go b/agent/consul/state/intention_oss.go index 56944467d..dcdadfa43 100644 --- a/agent/consul/state/intention_oss.go +++ b/agent/consul/state/intention_oss.go @@ -7,7 +7,7 @@ import ( memdb "github.com/hashicorp/go-memdb" ) -func intentionListTxn(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func intentionListTxn(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { // Get all intentions return tx.Get(intentionsTableName, "id") } diff --git a/agent/consul/state/kvs.go b/agent/consul/state/kvs.go index 0926e3573..0dd6c36fa 100644 --- a/agent/consul/state/kvs.go +++ b/agent/consul/state/kvs.go @@ -117,7 +117,7 @@ func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error { // If updateSession is true, then the incoming entry will set the new // session (should be validated before calling this). Otherwise, we will keep // whatever the existing session is. -func kvsSetTxn(tx *txn, idx uint64, entry *structs.DirEntry, updateSession bool) error { +func kvsSetTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry, updateSession bool) error { // Retrieve an existing KV pair existingNode, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta) if err != nil { @@ -170,7 +170,7 @@ func (s *Store) KVSGet(ws memdb.WatchSet, key string, entMeta *structs.Enterpris // kvsGetTxn is the inner method that gets a KVS entry inside an existing // transaction. -func kvsGetTxn(tx *txn, +func kvsGetTxn(tx ReadTxn, ws memdb.WatchSet, key string, entMeta *structs.EnterpriseMeta) (uint64, *structs.DirEntry, error) { // Get the table index. @@ -203,7 +203,7 @@ func (s *Store) KVSList(ws memdb.WatchSet, // kvsListTxn is the inner method that gets a list of KVS entries matching a // prefix. -func (s *Store) kvsListTxn(tx *txn, +func (s *Store) kvsListTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) { // Get the table indexes. @@ -252,7 +252,7 @@ func (s *Store) KVSDelete(idx uint64, key string, entMeta *structs.EnterpriseMet // kvsDeleteTxn is the inner method used to perform the actual deletion // of a key/value pair within an existing transaction. -func (s *Store) kvsDeleteTxn(tx *txn, idx uint64, key string, entMeta *structs.EnterpriseMeta) error { +func (s *Store) kvsDeleteTxn(tx WriteTxn, idx uint64, key string, entMeta *structs.EnterpriseMeta) error { // Look up the entry in the state store. entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) if err != nil { @@ -289,7 +289,7 @@ func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string, entMeta *structs.Ente // kvsDeleteCASTxn is the inner method that does a CAS delete within an existing // transaction. -func (s *Store) kvsDeleteCASTxn(tx *txn, idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) { +func (s *Store) kvsDeleteCASTxn(tx WriteTxn, idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) { // Retrieve the existing kvs entry, if any exists. entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) if err != nil { @@ -330,7 +330,7 @@ func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) { // kvsSetCASTxn is the inner method used to do a CAS inside an existing // transaction. -func kvsSetCASTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) { +func kvsSetCASTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) { // Retrieve the existing entry. existing, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta) if err != nil { @@ -394,7 +394,7 @@ func (s *Store) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) { // kvsLockTxn is the inner method that does a lock inside an existing // transaction. -func kvsLockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) { +func kvsLockTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) { // Verify that a session is present. if entry.Session == "" { return false, fmt.Errorf("missing session") @@ -460,7 +460,7 @@ func (s *Store) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) { // kvsUnlockTxn is the inner method that does an unlock inside an existing // transaction. -func kvsUnlockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) { +func kvsUnlockTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) { // Verify that a session is present. if entry.Session == "" { return false, fmt.Errorf("missing session") @@ -498,7 +498,7 @@ func kvsUnlockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) { // kvsCheckSessionTxn checks to see if the given session matches the current // entry for a key. -func kvsCheckSessionTxn(tx *txn, +func kvsCheckSessionTxn(tx WriteTxn, key string, session string, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) { entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) @@ -519,7 +519,7 @@ func kvsCheckSessionTxn(tx *txn, // kvsCheckIndexTxn checks to see if the given modify index matches the current // entry for a key. -func kvsCheckIndexTxn(tx *txn, +func kvsCheckIndexTxn(tx WriteTxn, key string, cidx uint64, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) { entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) diff --git a/agent/consul/state/kvs_oss.go b/agent/consul/state/kvs_oss.go index b6586a8e5..72389d24f 100644 --- a/agent/consul/state/kvs_oss.go +++ b/agent/consul/state/kvs_oss.go @@ -16,7 +16,7 @@ func kvsIndexer() *memdb.StringFieldIndex { } } -func insertKVTxn(tx *txn, entry *structs.DirEntry, updateMax bool) error { +func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool) error { if err := tx.Insert("kvs", entry); err != nil { return err } @@ -33,7 +33,7 @@ func insertKVTxn(tx *txn, entry *structs.DirEntry, updateMax bool) error { return nil } -func kvsListEntriesTxn(tx *txn, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) { +func kvsListEntriesTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) { var ents structs.DirEntries var lindex uint64 @@ -56,7 +56,7 @@ func kvsListEntriesTxn(tx *txn, ws memdb.WatchSet, prefix string, entMeta *struc // kvsDeleteTreeTxn is the inner method that does a recursive delete inside an // existing transaction. -func (s *Store) kvsDeleteTreeTxn(tx *txn, idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error { +func (s *Store) kvsDeleteTreeTxn(tx WriteTxn, idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error { // For prefix deletes, only insert one tombstone and delete the entire subtree deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix) if err != nil { @@ -77,11 +77,11 @@ func (s *Store) kvsDeleteTreeTxn(tx *txn, idx uint64, prefix string, entMeta *st return nil } -func kvsMaxIndex(tx *txn, entMeta *structs.EnterpriseMeta) uint64 { +func kvsMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 { return maxIndexTxn(tx, "kvs", "tombstones") } -func kvsDeleteWithEntry(tx *txn, entry *structs.DirEntry, idx uint64) error { +func kvsDeleteWithEntry(tx WriteTxn, entry *structs.DirEntry, idx uint64) error { // Delete the entry and update the index. if err := tx.Delete("kvs", entry); err != nil { return fmt.Errorf("failed deleting kvs entry: %s", err) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 5e6bbb604..d7076dacd 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -13,14 +13,22 @@ type ReadTxn interface { Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) First(table, index string, args ...interface{}) (interface{}, error) FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) +} + +// AbortTxn is a ReadTxn that can also be aborted to end the transaction. +type AbortTxn interface { + ReadTxn Abort() } // WriteTxn is implemented by memdb.Txn to perform write operations. type WriteTxn interface { ReadTxn + Defer(func()) + Delete(table string, obj interface{}) error + DeleteAll(table, index string, args ...interface{}) (int, error) + DeletePrefix(table string, index string, prefix string) (bool, error) Insert(table string, obj interface{}) error - Commit() error } // Changes wraps a memdb.Changes to include the index at which these changes @@ -46,20 +54,16 @@ type changeTrackerDB struct { // with write=true. // // Deprecated: use either ReadTxn, or WriteTxn. -func (c *changeTrackerDB) Txn(write bool) *txn { +func (c *changeTrackerDB) Txn(write bool) *memdb.Txn { if write { panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)") } return c.ReadTxn() } -// ReadTxn returns a read-only transaction which behaves exactly the same as -// memdb.Txn -// -// TODO: this could return a regular memdb.Txn if all the state functions accepted -// the ReadTxn interface -func (c *changeTrackerDB) ReadTxn() *txn { - return &txn{Txn: c.db.Txn(false)} +// ReadTxn returns a read-only transaction. +func (c *changeTrackerDB) ReadTxn() *memdb.Txn { + return c.db.Txn(false) } // WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. diff --git a/agent/consul/state/prepared_query.go b/agent/consul/state/prepared_query.go index d4ffa6f1a..3009dc577 100644 --- a/agent/consul/state/prepared_query.go +++ b/agent/consul/state/prepared_query.go @@ -258,7 +258,7 @@ func (s *Store) PreparedQueryDelete(idx uint64, queryID string) error { // preparedQueryDeleteTxn is the inner method used to delete a prepared query // with the proper indexes into the state store. -func preparedQueryDeleteTxn(tx *txn, idx uint64, queryID string) error { +func preparedQueryDeleteTxn(tx WriteTxn, idx uint64, queryID string) error { // Pull the query. wrapped, err := tx.First("prepared-queries", "id", queryID) if err != nil { diff --git a/agent/consul/state/session.go b/agent/consul/state/session.go index e6fb906e1..6308bfd47 100644 --- a/agent/consul/state/session.go +++ b/agent/consul/state/session.go @@ -301,7 +301,7 @@ func (s *Store) SessionDestroy(idx uint64, sessionID string, entMeta *structs.En // deleteSessionTxn is the inner method, which is used to do the actual // session deletion and handle session invalidation, etc. -func (s *Store) deleteSessionTxn(tx *txn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error { +func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error { // Look up the session. sess, err := firstWithTxn(tx, "sessions", "id", sessionID, entMeta) if err != nil { diff --git a/agent/consul/state/session_oss.go b/agent/consul/state/session_oss.go index f7b6517eb..f8c04801e 100644 --- a/agent/consul/state/session_oss.go +++ b/agent/consul/state/session_oss.go @@ -35,7 +35,7 @@ func nodeChecksIndexer() *memdb.CompoundIndex { } } -func sessionDeleteWithSession(tx *txn, session *structs.Session, idx uint64) error { +func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) error { if err := tx.Delete("sessions", session); err != nil { return fmt.Errorf("failed deleting session: %s", err) } @@ -80,11 +80,11 @@ func insertSessionTxn(tx *txn, session *structs.Session, idx uint64, updateMax b return nil } -func allNodeSessionsTxn(tx *txn, node string) (structs.Sessions, error) { +func allNodeSessionsTxn(tx ReadTxn, node string) (structs.Sessions, error) { return nodeSessionsTxn(tx, nil, node, nil) } -func nodeSessionsTxn(tx *txn, +func nodeSessionsTxn(tx ReadTxn, ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) { sessions, err := tx.Get("sessions", "node", node) @@ -100,7 +100,7 @@ func nodeSessionsTxn(tx *txn, return result, nil } -func sessionMaxIndex(tx *txn, entMeta *structs.EnterpriseMeta) uint64 { +func sessionMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 { return maxIndexTxn(tx, "sessions") } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index a02cd864d..e71b995f1 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -122,7 +122,7 @@ type Store struct { // works by starting a read transaction against the whole state store. type Snapshot struct { store *Store - tx *txn + tx AbortTxn lastIndex uint64 } @@ -288,7 +288,7 @@ func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 { // indexUpdateMaxTxn is used when restoring entries and sets the table's index to // the given idx only if it's greater than the current index. -func indexUpdateMaxTxn(tx *txn, idx uint64, table string) error { +func indexUpdateMaxTxn(tx WriteTxn, idx uint64, table string) error { ti, err := tx.First("index", "id", table) if err != nil { return fmt.Errorf("failed to retrieve existing index: %s", err) diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index 04bb62660..f4306d0fb 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -8,7 +8,7 @@ import ( ) // txnKVS handles all KV-related operations. -func (s *Store) txnKVS(tx *txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) { +func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) { var entry *structs.DirEntry var err error @@ -110,7 +110,7 @@ func (s *Store) txnKVS(tx *txn, idx uint64, op *structs.TxnKVOp) (structs.TxnRes } // txnSession handles all Session-related operations. -func txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error { +func txnSession(tx WriteTxn, idx uint64, op *structs.TxnSessionOp) error { var err error switch op.Verb { @@ -129,7 +129,7 @@ func txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error { // txnLegacyIntention handles all Intention-related operations. // // Deprecated: see TxnOp.Intention description -func txnLegacyIntention(tx *txn, idx uint64, op *structs.TxnIntentionOp) error { +func txnLegacyIntention(tx WriteTxn, idx uint64, op *structs.TxnIntentionOp) error { switch op.Op { case structs.IntentionOpCreate, structs.IntentionOpUpdate: return legacyIntentionSetTxn(tx, idx, op.Intention) @@ -145,7 +145,7 @@ func txnLegacyIntention(tx *txn, idx uint64, op *structs.TxnIntentionOp) error { } // txnNode handles all Node-related operations. -func (s *Store) txnNode(tx *txn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) { +func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) { var entry *structs.Node var err error @@ -214,7 +214,7 @@ func (s *Store) txnNode(tx *txn, idx uint64, op *structs.TxnNodeOp) (structs.Txn } // txnService handles all Service-related operations. -func (s *Store) txnService(tx *txn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) { +func (s *Store) txnService(tx WriteTxn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) { switch op.Verb { case api.ServiceGet: entry, err := getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta) @@ -276,7 +276,7 @@ func newTxnResultFromNodeServiceEntry(entry *structs.NodeService) structs.TxnRes } // txnCheck handles all Check-related operations. -func (s *Store) txnCheck(tx *txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) { +func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) { var entry *structs.HealthCheck var err error @@ -338,7 +338,7 @@ func (s *Store) txnCheck(tx *txn, idx uint64, op *structs.TxnCheckOp) (structs.T } // txnDispatch runs the given operations inside the state store transaction. -func (s *Store) txnDispatch(tx *txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { +func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { results := make(structs.TxnResults, 0, len(ops)) errors := make(structs.TxnErrors, 0, len(ops)) for i, op := range ops {