Handle return value from txn.Commit

This commit is contained in:
Daniel Nephin 2020-06-02 16:34:56 -04:00
parent 50db8f409a
commit 78c76f0773
19 changed files with 95 additions and 139 deletions

View File

@ -202,7 +202,9 @@ func (c *FSM) Restore(old io.ReadCloser) error {
return fmt.Errorf("Unrecognized msg type %d", msg)
}
}
restore.Commit()
if err := restore.Commit(); err != nil {
return err
}
// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.

View File

@ -310,8 +310,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
if err := tx.Insert("index", &IndexEntry{"acl-token-bootstrap", idx}); err != nil {
return fmt.Errorf("failed to mark ACL bootstrapping as complete: %v", err)
}
tx.Commit()
return nil
return tx.Commit()
}
// CanBootstrapACLToken checks if bootstrapping is possible and returns the reset index
@ -636,8 +635,7 @@ func (s *Store) ACLTokenSet(idx uint64, token *structs.ACLToken, legacy bool) er
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) ACLTokenBatchSet(idx uint64, tokens structs.ACLTokens, cas, allowMissingPolicyAndRoleIDs, prohibitUnprivileged bool) error {
@ -650,8 +648,7 @@ func (s *Store) ACLTokenBatchSet(idx uint64, tokens structs.ACLTokens, cas, allo
}
}
tx.Commit()
return nil
return tx.Commit()
}
// aclTokenSetTxn is the inner method used to insert an ACL token with the
@ -1030,8 +1027,7 @@ func (s *Store) ACLTokenBatchDelete(idx uint64, tokenIDs []string) error {
}
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclTokenDelete(idx uint64, value, index string, entMeta *structs.EnterpriseMeta) error {
@ -1042,8 +1038,7 @@ func (s *Store) aclTokenDelete(idx uint64, value, index string, entMeta *structs
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclTokenDeleteTxn(tx *txnWrapper, idx uint64, value, index string, entMeta *structs.EnterpriseMeta) error {
@ -1099,8 +1094,7 @@ func (s *Store) ACLPolicyBatchSet(idx uint64, policies structs.ACLPolicies) erro
}
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) ACLPolicySet(idx uint64, policy *structs.ACLPolicy) error {
@ -1111,8 +1105,7 @@ func (s *Store) ACLPolicySet(idx uint64, policy *structs.ACLPolicy) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclPolicySetTxn(tx *txnWrapper, idx uint64, policy *structs.ACLPolicy) error {
@ -1277,8 +1270,7 @@ func (s *Store) ACLPolicyBatchDelete(idx uint64, policyIDs []string) error {
return err
}
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclPolicyDelete(idx uint64, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) error {
@ -1289,8 +1281,7 @@ func (s *Store) aclPolicyDelete(idx uint64, value string, fn aclPolicyGetFn, ent
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclPolicyDeleteTxn(tx *txnWrapper, idx uint64, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) error {
@ -1323,8 +1314,7 @@ func (s *Store) ACLRoleBatchSet(idx uint64, roles structs.ACLRoles, allowMissing
}
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) ACLRoleSet(idx uint64, role *structs.ACLRole) error {
@ -1335,8 +1325,7 @@ func (s *Store) ACLRoleSet(idx uint64, role *structs.ACLRole) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclRoleSetTxn(tx *txnWrapper, idx uint64, role *structs.ACLRole, allowMissing bool) error {
@ -1518,8 +1507,7 @@ func (s *Store) ACLRoleBatchDelete(idx uint64, roleIDs []string) error {
return err
}
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclRoleDelete(idx uint64, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) error {
@ -1530,8 +1518,7 @@ func (s *Store) aclRoleDelete(idx uint64, value string, fn aclRoleGetFn, entMeta
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclRoleDeleteTxn(tx *txnWrapper, idx uint64, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) error {
@ -1560,8 +1547,7 @@ func (s *Store) ACLBindingRuleBatchSet(idx uint64, rules structs.ACLBindingRules
}
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) ACLBindingRuleSet(idx uint64, rule *structs.ACLBindingRule) error {
@ -1571,8 +1557,7 @@ func (s *Store) ACLBindingRuleSet(idx uint64, rule *structs.ACLBindingRule) erro
if err := s.aclBindingRuleSetTxn(tx, idx, rule); err != nil {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclBindingRuleSetTxn(tx *txnWrapper, idx uint64, rule *structs.ACLBindingRule) error {
@ -1677,8 +1662,7 @@ func (s *Store) ACLBindingRuleBatchDelete(idx uint64, bindingRuleIDs []string) e
for _, bindingRuleID := range bindingRuleIDs {
s.aclBindingRuleDeleteTxn(tx, idx, bindingRuleID, nil)
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclBindingRuleDelete(idx uint64, id string, entMeta *structs.EnterpriseMeta) error {
@ -1689,8 +1673,7 @@ func (s *Store) aclBindingRuleDelete(idx uint64, id string, entMeta *structs.Ent
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclBindingRuleDeleteTxn(tx *txnWrapper, idx uint64, id string, entMeta *structs.EnterpriseMeta) error {
@ -1748,8 +1731,7 @@ func (s *Store) ACLAuthMethodBatchSet(idx uint64, methods structs.ACLAuthMethods
return err
}
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) ACLAuthMethodSet(idx uint64, method *structs.ACLAuthMethod) error {
@ -1760,8 +1742,7 @@ func (s *Store) ACLAuthMethodSet(idx uint64, method *structs.ACLAuthMethod) erro
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclAuthMethodSetTxn(tx *txnWrapper, idx uint64, method *structs.ACLAuthMethod) error {
@ -1865,8 +1846,7 @@ func (s *Store) ACLAuthMethodBatchDelete(idx uint64, names []string, entMeta *st
s.aclAuthMethodDeleteTxn(tx, idx, name, entMeta)
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclAuthMethodDelete(idx uint64, name string, entMeta *structs.EnterpriseMeta) error {
@ -1877,8 +1857,7 @@ func (s *Store) aclAuthMethodDelete(idx uint64, name string, entMeta *structs.En
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) aclAuthMethodDeleteTxn(tx *txnWrapper, idx uint64, name string, entMeta *structs.EnterpriseMeta) error {

View File

@ -81,8 +81,7 @@ func (s *Store) AutopilotSetConfig(idx uint64, config *autopilot.Config) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
@ -110,8 +109,8 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) (
return false, err
}
tx.Commit()
return true, nil
err = tx.Commit()
return err == nil, err
}
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *txnWrapper, config *autopilot.Config) error {

View File

@ -233,8 +233,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) ensureCheckIfNodeMatches(tx *txnWrapper, idx uint64, node string, check *structs.HealthCheck) error {
@ -324,8 +323,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// ensureNoNodeWithSimilarNameTxn checks that no other node has conflict in its name
@ -599,8 +597,7 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// deleteNodeCASTxn is used to try doing a node delete operation with a given
@ -733,8 +730,7 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService)
return err
}
tx.Commit()
return nil
return tx.Commit()
}
var errCASCompareFailed = errors.New("compare-and-set: comparison failed")
@ -1403,8 +1399,7 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string, entMeta *s
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// deleteServiceCASTxn is used to try doing a service delete operation with a given
@ -1541,8 +1536,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node
@ -1879,8 +1873,7 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID, entM
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// deleteCheckCASTxn is used to try doing a check delete operation with a given

View File

@ -4385,7 +4385,7 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
tx := s.db.WriteTxnRestore()
err := s.ensureServiceCASTxn(tx, 3, "node1", &ns)
require.Equal(t, err, errCASCompareFailed)
tx.Commit()
require.NoError(t, tx.Commit())
// ensure no update happened
tx = s.db.Txn(false)
@ -4393,14 +4393,14 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, nsRead)
require.Equal(t, uint64(2), nsRead.ModifyIndex)
tx.Commit()
require.NoError(t, tx.Commit())
ns.ModifyIndex = 99
// attempt to update with a non-matching index
tx = s.db.WriteTxnRestore()
err = s.ensureServiceCASTxn(tx, 4, "node1", &ns)
require.Equal(t, err, errCASCompareFailed)
tx.Commit()
require.NoError(t, tx.Commit())
// ensure no update happened
tx = s.db.Txn(false)
@ -4408,14 +4408,14 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, nsRead)
require.Equal(t, uint64(2), nsRead.ModifyIndex)
tx.Commit()
require.NoError(t, tx.Commit())
ns.ModifyIndex = 2
// update with the matching modify index
tx = s.db.WriteTxnRestore()
err = s.ensureServiceCASTxn(tx, 7, "node1", &ns)
require.NoError(t, err)
tx.Commit()
require.NoError(t, tx.Commit())
// ensure the update happened
tx = s.db.Txn(false)
@ -4423,7 +4423,7 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, nsRead)
require.Equal(t, uint64(7), nsRead.ModifyIndex)
tx.Commit()
require.NoError(t, tx.Commit())
}
func TestStateStore_GatewayServices_Terminating(t *testing.T) {

View File

@ -174,8 +174,7 @@ func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry, entMeta
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// ensureConfigEntryTxn upserts a config entry inside of a transaction.
@ -246,8 +245,8 @@ func (s *Store) EnsureConfigEntryCAS(idx, cidx uint64, conf structs.ConfigEntry,
return false, err
}
tx.Commit()
return true, nil
err = tx.Commit()
return err == nil, err
}
func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *structs.EnterpriseMeta) error {
@ -294,8 +293,7 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct
return fmt.Errorf("failed updating index: %s", err)
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) insertConfigEntryWithTxn(tx *txnWrapper, idx uint64, conf structs.ConfigEntry) error {

View File

@ -142,8 +142,7 @@ func (s *Store) CASetConfig(idx uint64, config *structs.CAConfiguration) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// CACheckAndSetConfig is used to try updating the CA configuration with a
@ -171,8 +170,8 @@ func (s *Store) CACheckAndSetConfig(idx, cidx uint64, config *structs.CAConfigur
return false, err
}
tx.Commit()
return true, nil
err = tx.Commit()
return err == nil, err
}
func (s *Store) caSetConfigTxn(idx uint64, tx *txnWrapper, config *structs.CAConfiguration) error {
@ -336,8 +335,8 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro
return false, fmt.Errorf("failed updating index: %s", err)
}
tx.Commit()
return true, nil
err = tx.Commit()
return err == nil, err
}
// CAProviderState is used to pull the built-in provider states from the snapshot.
@ -417,9 +416,8 @@ func (s *Store) CASetProviderState(idx uint64, state *structs.CAConsulProviderSt
return false, fmt.Errorf("failed updating index: %s", err)
}
tx.Commit()
return true, nil
err = tx.Commit()
return err == nil, err
}
// CADeleteProviderState is used to remove the built-in Consul CA provider
@ -447,9 +445,7 @@ func (s *Store) CADeleteProviderState(idx uint64, id string) error {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) CALeafSetIndex(idx uint64, index uint64) error {
@ -504,7 +500,6 @@ func (s *Store) CAIncrementProviderSerialNumber(idx uint64) (uint64, error) {
return 0, fmt.Errorf("failed updating index: %s", err)
}
tx.Commit()
return next, nil
err = tx.Commit()
return next, err
}

View File

@ -167,6 +167,5 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e
return fmt.Errorf("failed updating index: %s", err)
}
tx.Commit()
return nil
return tx.Commit()
}

View File

@ -274,7 +274,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
if err := tx.Insert("coordinates", badUpdate); err != nil {
t.Fatalf("err: %v", err)
}
tx.Commit()
require.NoError(t, tx.Commit())
// Snapshot the coordinates.
snap := s.Snapshot()

View File

@ -68,8 +68,7 @@ func (s *Store) FederationStateBatchSet(idx uint64, configs structs.FederationSt
}
}
tx.Commit()
return nil
return tx.Commit()
}
// FederationStateSet is called to do an upsert of a given federation state.
@ -81,8 +80,7 @@ func (s *Store) FederationStateSet(idx uint64, config *structs.FederationState)
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// federationStateSetTxn upserts a federation state inside of a transaction.
@ -191,8 +189,7 @@ func (s *Store) FederationStateDelete(idx uint64, datacenter string) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) FederationStateBatchDelete(idx uint64, datacenters []string) error {
@ -205,8 +202,7 @@ func (s *Store) FederationStateBatchDelete(idx uint64, datacenters []string) err
}
}
tx.Commit()
return nil
return tx.Commit()
}
func (s *Store) federationStateDeleteTxn(tx *txnWrapper, idx uint64, datacenter string) error {

View File

@ -3,6 +3,8 @@ package state
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestGraveyard_Lifecycle(t *testing.T) {
@ -29,7 +31,7 @@ func TestGraveyard_Lifecycle(t *testing.T) {
if err := g.InsertTxn(tx, "some/other/path", 9, nil); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
require.NoError(t, tx.Commit())
}()
// Check some prefixes.
@ -68,7 +70,7 @@ func TestGraveyard_Lifecycle(t *testing.T) {
if err := g.ReapTxn(tx, 6); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
require.NoError(t, tx.Commit())
}()
// Check prefixes to see that the reap took effect at the right index.
@ -142,7 +144,7 @@ func TestGraveyard_GC_Trigger(t *testing.T) {
if err := g.InsertTxn(tx, "foo/in/the/house", 2, nil); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
require.NoError(t, tx.Commit())
}()
// Make sure the GC got hinted.
@ -185,7 +187,7 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) {
if err := g.InsertTxn(tx, "some/other/path", 9, nil); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
require.NoError(t, tx.Commit())
}()
// Verify the index was set correctly.
@ -240,7 +242,7 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) {
t.Fatalf("err: %s", err)
}
}
tx.Commit()
require.NoError(t, tx.Commit())
}()
// Verify that the restore works.

View File

@ -164,8 +164,7 @@ func (s *Store) IntentionSet(idx uint64, ixn *structs.Intention) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// intentionSetTxn is the inner method used to insert an intention with
@ -260,8 +259,7 @@ func (s *Store) IntentionDelete(idx uint64, id string) error {
return fmt.Errorf("failed intention delete: %s", err)
}
tx.Commit()
return nil
return tx.Commit()
}
// intentionDeleteTxn is the inner method used to delete a intention

View File

@ -96,8 +96,7 @@ func (s *Store) ReapTombstones(idx uint64, index uint64) error {
return fmt.Errorf("failed to reap kvs tombstones: %s", err)
}
tx.Commit()
return nil
return tx.Commit()
}
// KVSSet is used to store a key/value pair.
@ -110,8 +109,7 @@ func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// kvsSetTxn is used to insert or update a key/value pair in the state
@ -249,8 +247,7 @@ func (s *Store) KVSDelete(idx uint64, key string, entMeta *structs.EnterpriseMet
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// kvsDeleteTxn is the inner method used to perform the actual deletion
@ -286,8 +283,8 @@ func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string, entMeta *structs.Ente
return false, err
}
tx.Commit()
return true, nil
err = tx.Commit()
return err == nil, err
}
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
@ -327,8 +324,8 @@ func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
return false, err
}
tx.Commit()
return true, nil
err = tx.Commit()
return err == nil, err
}
// kvsSetCASTxn is the inner method used to do a CAS inside an existing
@ -371,8 +368,7 @@ func (s *Store) KVSDeleteTree(idx uint64, prefix string, entMeta *structs.Enterp
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// KVSLockDelay returns the expiration time for any lock delay associated with
@ -392,8 +388,8 @@ func (s *Store) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
return false, err
}
tx.Commit()
return true, nil
err = tx.Commit()
return err == nil, err
}
// kvsLockTxn is the inner method that does a lock inside an existing
@ -458,8 +454,8 @@ func (s *Store) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
return false, err
}
tx.Commit()
return true, nil
err = tx.Commit()
return err == nil, err
}
// kvsUnlockTxn is the inner method that does an unlock inside an existing

View File

@ -85,11 +85,9 @@ type txnWrapper struct {
// Note that this function, unlike memdb.Txn, returns an error which must be checked
// by the caller. A non-nil error indicates that a commit failed and was not
// applied.
// TODO: currently none of the callers check error, should they all be changed?
func (tx *txnWrapper) Commit() error {
// changes may be empty if this is a read-only or WriteTxnRestore transaction.
//changes := tx.Txn.Changes()
// TODO: publish changes
// TODO: publish changes: changes := tx.Txn.Changes()
tx.Txn.Commit()
return nil

View File

@ -137,8 +137,7 @@ func (s *Store) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// preparedQuerySetTxn is the inner method used to insert a prepared query with
@ -254,8 +253,7 @@ func (s *Store) PreparedQueryDelete(idx uint64, queryID string) error {
return fmt.Errorf("failed prepared query delete: %s", err)
}
tx.Commit()
return nil
return tx.Commit()
}
// preparedQueryDeleteTxn is the inner method used to delete a prepared query

View File

@ -170,8 +170,7 @@ func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error {
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// sessionCreateTxn is the inner method used for creating session entries in
@ -297,8 +296,7 @@ func (s *Store) SessionDestroy(idx uint64, sessionID string, entMeta *structs.En
return err
}
tx.Commit()
return nil
return tx.Commit()
}
// deleteSessionTxn is the inner method, which is used to do the actual

View File

@ -221,8 +221,8 @@ func (s *Restore) Abort() {
// Commit commits the changes made by a restore. This or Abort should always be
// called.
func (s *Restore) Commit() {
s.tx.Commit()
func (s *Restore) Commit() error {
return s.tx.Commit()
}
// AbandonCh returns a channel you can wait on to know if the state store was

View File

@ -300,7 +300,7 @@ func TestStateStore_indexUpdateMaxTxn(t *testing.T) {
if err := indexUpdateMaxTxn(tx, 3, "nodes"); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
require.NoError(t, tx.Commit())
if max := s.maxIndex("nodes"); max != 3 {
t.Fatalf("bad max: %d", max)

View File

@ -390,7 +390,12 @@ func (s *Store) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, struc
return nil, errors
}
tx.Commit()
err := tx.Commit()
if err != nil {
return nil, structs.TxnErrors{
{What: err.Error(), OpIndex: 0},
}
}
return results, nil
}