From 78c76f07739764c70db971f8be28b9e69fb59ea0 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 2 Jun 2020 16:34:56 -0400 Subject: [PATCH] Handle return value from txn.Commit --- agent/consul/fsm/fsm.go | 4 +- agent/consul/state/acl.go | 63 +++++++++----------------- agent/consul/state/autopilot.go | 7 ++- agent/consul/state/catalog.go | 21 +++------ agent/consul/state/catalog_test.go | 12 ++--- agent/consul/state/config_entry.go | 10 ++-- agent/consul/state/connect_ca.go | 25 ++++------ agent/consul/state/coordinate.go | 3 +- agent/consul/state/coordinate_test.go | 2 +- agent/consul/state/federation_state.go | 12 ++--- agent/consul/state/graveyard_test.go | 12 +++-- agent/consul/state/intention.go | 6 +-- agent/consul/state/kvs.go | 28 +++++------- agent/consul/state/memdb_wrapper.go | 4 +- agent/consul/state/prepared_query.go | 6 +-- agent/consul/state/session.go | 6 +-- agent/consul/state/state_store.go | 4 +- agent/consul/state/state_store_test.go | 2 +- agent/consul/state/txn.go | 7 ++- 19 files changed, 95 insertions(+), 139 deletions(-) diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 7b2b7b7b0..85abe1928 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -202,7 +202,9 @@ func (c *FSM) Restore(old io.ReadCloser) error { return fmt.Errorf("Unrecognized msg type %d", msg) } } - restore.Commit() + if err := restore.Commit(); err != nil { + return err + } // External code might be calling State(), so we need to synchronize // here to make sure we swap in the new state store atomically. diff --git a/agent/consul/state/acl.go b/agent/consul/state/acl.go index d67a16e25..0757cbbab 100644 --- a/agent/consul/state/acl.go +++ b/agent/consul/state/acl.go @@ -310,8 +310,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le if err := tx.Insert("index", &IndexEntry{"acl-token-bootstrap", idx}); err != nil { return fmt.Errorf("failed to mark ACL bootstrapping as complete: %v", err) } - tx.Commit() - return nil + return tx.Commit() } // CanBootstrapACLToken checks if bootstrapping is possible and returns the reset index @@ -636,8 +635,7 @@ func (s *Store) ACLTokenSet(idx uint64, token *structs.ACLToken, legacy bool) er return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) ACLTokenBatchSet(idx uint64, tokens structs.ACLTokens, cas, allowMissingPolicyAndRoleIDs, prohibitUnprivileged bool) error { @@ -650,8 +648,7 @@ func (s *Store) ACLTokenBatchSet(idx uint64, tokens structs.ACLTokens, cas, allo } } - tx.Commit() - return nil + return tx.Commit() } // aclTokenSetTxn is the inner method used to insert an ACL token with the @@ -1030,8 +1027,7 @@ func (s *Store) ACLTokenBatchDelete(idx uint64, tokenIDs []string) error { } } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclTokenDelete(idx uint64, value, index string, entMeta *structs.EnterpriseMeta) error { @@ -1042,8 +1038,7 @@ func (s *Store) aclTokenDelete(idx uint64, value, index string, entMeta *structs return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclTokenDeleteTxn(tx *txnWrapper, idx uint64, value, index string, entMeta *structs.EnterpriseMeta) error { @@ -1099,8 +1094,7 @@ func (s *Store) ACLPolicyBatchSet(idx uint64, policies structs.ACLPolicies) erro } } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) ACLPolicySet(idx uint64, policy *structs.ACLPolicy) error { @@ -1111,8 +1105,7 @@ func (s *Store) ACLPolicySet(idx uint64, policy *structs.ACLPolicy) error { return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclPolicySetTxn(tx *txnWrapper, idx uint64, policy *structs.ACLPolicy) error { @@ -1277,8 +1270,7 @@ func (s *Store) ACLPolicyBatchDelete(idx uint64, policyIDs []string) error { return err } } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclPolicyDelete(idx uint64, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) error { @@ -1289,8 +1281,7 @@ func (s *Store) aclPolicyDelete(idx uint64, value string, fn aclPolicyGetFn, ent return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclPolicyDeleteTxn(tx *txnWrapper, idx uint64, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) error { @@ -1323,8 +1314,7 @@ func (s *Store) ACLRoleBatchSet(idx uint64, roles structs.ACLRoles, allowMissing } } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) ACLRoleSet(idx uint64, role *structs.ACLRole) error { @@ -1335,8 +1325,7 @@ func (s *Store) ACLRoleSet(idx uint64, role *structs.ACLRole) error { return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclRoleSetTxn(tx *txnWrapper, idx uint64, role *structs.ACLRole, allowMissing bool) error { @@ -1518,8 +1507,7 @@ func (s *Store) ACLRoleBatchDelete(idx uint64, roleIDs []string) error { return err } } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclRoleDelete(idx uint64, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) error { @@ -1530,8 +1518,7 @@ func (s *Store) aclRoleDelete(idx uint64, value string, fn aclRoleGetFn, entMeta return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclRoleDeleteTxn(tx *txnWrapper, idx uint64, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) error { @@ -1560,8 +1547,7 @@ func (s *Store) ACLBindingRuleBatchSet(idx uint64, rules structs.ACLBindingRules } } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) ACLBindingRuleSet(idx uint64, rule *structs.ACLBindingRule) error { @@ -1571,8 +1557,7 @@ func (s *Store) ACLBindingRuleSet(idx uint64, rule *structs.ACLBindingRule) erro if err := s.aclBindingRuleSetTxn(tx, idx, rule); err != nil { return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclBindingRuleSetTxn(tx *txnWrapper, idx uint64, rule *structs.ACLBindingRule) error { @@ -1677,8 +1662,7 @@ func (s *Store) ACLBindingRuleBatchDelete(idx uint64, bindingRuleIDs []string) e for _, bindingRuleID := range bindingRuleIDs { s.aclBindingRuleDeleteTxn(tx, idx, bindingRuleID, nil) } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclBindingRuleDelete(idx uint64, id string, entMeta *structs.EnterpriseMeta) error { @@ -1689,8 +1673,7 @@ func (s *Store) aclBindingRuleDelete(idx uint64, id string, entMeta *structs.Ent return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclBindingRuleDeleteTxn(tx *txnWrapper, idx uint64, id string, entMeta *structs.EnterpriseMeta) error { @@ -1748,8 +1731,7 @@ func (s *Store) ACLAuthMethodBatchSet(idx uint64, methods structs.ACLAuthMethods return err } } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) ACLAuthMethodSet(idx uint64, method *structs.ACLAuthMethod) error { @@ -1760,8 +1742,7 @@ func (s *Store) ACLAuthMethodSet(idx uint64, method *structs.ACLAuthMethod) erro return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclAuthMethodSetTxn(tx *txnWrapper, idx uint64, method *structs.ACLAuthMethod) error { @@ -1865,8 +1846,7 @@ func (s *Store) ACLAuthMethodBatchDelete(idx uint64, names []string, entMeta *st s.aclAuthMethodDeleteTxn(tx, idx, name, entMeta) } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclAuthMethodDelete(idx uint64, name string, entMeta *structs.EnterpriseMeta) error { @@ -1877,8 +1857,7 @@ func (s *Store) aclAuthMethodDelete(idx uint64, name string, entMeta *structs.En return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) aclAuthMethodDeleteTxn(tx *txnWrapper, idx uint64, name string, entMeta *structs.EnterpriseMeta) error { diff --git a/agent/consul/state/autopilot.go b/agent/consul/state/autopilot.go index 02ddc6c4d..c4c877918 100644 --- a/agent/consul/state/autopilot.go +++ b/agent/consul/state/autopilot.go @@ -81,8 +81,7 @@ func (s *Store) AutopilotSetConfig(idx uint64, config *autopilot.Config) error { return err } - tx.Commit() - return nil + return tx.Commit() } // AutopilotCASConfig is used to try updating the Autopilot configuration with a @@ -110,8 +109,8 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) ( return false, err } - tx.Commit() - return true, nil + err = tx.Commit() + return err == nil, err } func (s *Store) autopilotSetConfigTxn(idx uint64, tx *txnWrapper, config *autopilot.Config) error { diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index b8275b25d..f8249dd37 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -233,8 +233,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) ensureCheckIfNodeMatches(tx *txnWrapper, idx uint64, node string, check *structs.HealthCheck) error { @@ -324,8 +323,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error { return err } - tx.Commit() - return nil + return tx.Commit() } // ensureNoNodeWithSimilarNameTxn checks that no other node has conflict in its name @@ -599,8 +597,7 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error { return err } - tx.Commit() - return nil + return tx.Commit() } // deleteNodeCASTxn is used to try doing a node delete operation with a given @@ -733,8 +730,7 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService) return err } - tx.Commit() - return nil + return tx.Commit() } var errCASCompareFailed = errors.New("compare-and-set: comparison failed") @@ -1403,8 +1399,7 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string, entMeta *s return err } - tx.Commit() - return nil + return tx.Commit() } // deleteServiceCASTxn is used to try doing a service delete operation with a given @@ -1541,8 +1536,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { return err } - tx.Commit() - return nil + return tx.Commit() } // updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node @@ -1879,8 +1873,7 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID, entM return err } - tx.Commit() - return nil + return tx.Commit() } // deleteCheckCASTxn is used to try doing a check delete operation with a given diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index e9054612c..7faf72471 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -4385,7 +4385,7 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) { tx := s.db.WriteTxnRestore() err := s.ensureServiceCASTxn(tx, 3, "node1", &ns) require.Equal(t, err, errCASCompareFailed) - tx.Commit() + require.NoError(t, tx.Commit()) // ensure no update happened tx = s.db.Txn(false) @@ -4393,14 +4393,14 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) { require.NoError(t, err) require.NotNil(t, nsRead) require.Equal(t, uint64(2), nsRead.ModifyIndex) - tx.Commit() + require.NoError(t, tx.Commit()) ns.ModifyIndex = 99 // attempt to update with a non-matching index tx = s.db.WriteTxnRestore() err = s.ensureServiceCASTxn(tx, 4, "node1", &ns) require.Equal(t, err, errCASCompareFailed) - tx.Commit() + require.NoError(t, tx.Commit()) // ensure no update happened tx = s.db.Txn(false) @@ -4408,14 +4408,14 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) { require.NoError(t, err) require.NotNil(t, nsRead) require.Equal(t, uint64(2), nsRead.ModifyIndex) - tx.Commit() + require.NoError(t, tx.Commit()) ns.ModifyIndex = 2 // update with the matching modify index tx = s.db.WriteTxnRestore() err = s.ensureServiceCASTxn(tx, 7, "node1", &ns) require.NoError(t, err) - tx.Commit() + require.NoError(t, tx.Commit()) // ensure the update happened tx = s.db.Txn(false) @@ -4423,7 +4423,7 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) { require.NoError(t, err) require.NotNil(t, nsRead) require.Equal(t, uint64(7), nsRead.ModifyIndex) - tx.Commit() + require.NoError(t, tx.Commit()) } func TestStateStore_GatewayServices_Terminating(t *testing.T) { diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index ea06bbd43..310ece95e 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -174,8 +174,7 @@ func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry, entMeta return err } - tx.Commit() - return nil + return tx.Commit() } // ensureConfigEntryTxn upserts a config entry inside of a transaction. @@ -246,8 +245,8 @@ func (s *Store) EnsureConfigEntryCAS(idx, cidx uint64, conf structs.ConfigEntry, return false, err } - tx.Commit() - return true, nil + err = tx.Commit() + return err == nil, err } func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *structs.EnterpriseMeta) error { @@ -294,8 +293,7 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct return fmt.Errorf("failed updating index: %s", err) } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) insertConfigEntryWithTxn(tx *txnWrapper, idx uint64, conf structs.ConfigEntry) error { diff --git a/agent/consul/state/connect_ca.go b/agent/consul/state/connect_ca.go index 5c39b6b91..1453db886 100644 --- a/agent/consul/state/connect_ca.go +++ b/agent/consul/state/connect_ca.go @@ -142,8 +142,7 @@ func (s *Store) CASetConfig(idx uint64, config *structs.CAConfiguration) error { return err } - tx.Commit() - return nil + return tx.Commit() } // CACheckAndSetConfig is used to try updating the CA configuration with a @@ -171,8 +170,8 @@ func (s *Store) CACheckAndSetConfig(idx, cidx uint64, config *structs.CAConfigur return false, err } - tx.Commit() - return true, nil + err = tx.Commit() + return err == nil, err } func (s *Store) caSetConfigTxn(idx uint64, tx *txnWrapper, config *structs.CAConfiguration) error { @@ -336,8 +335,8 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro return false, fmt.Errorf("failed updating index: %s", err) } - tx.Commit() - return true, nil + err = tx.Commit() + return err == nil, err } // CAProviderState is used to pull the built-in provider states from the snapshot. @@ -417,9 +416,8 @@ func (s *Store) CASetProviderState(idx uint64, state *structs.CAConsulProviderSt return false, fmt.Errorf("failed updating index: %s", err) } - tx.Commit() - - return true, nil + err = tx.Commit() + return err == nil, err } // CADeleteProviderState is used to remove the built-in Consul CA provider @@ -447,9 +445,7 @@ func (s *Store) CADeleteProviderState(idx uint64, id string) error { return fmt.Errorf("failed updating index: %s", err) } - tx.Commit() - - return nil + return tx.Commit() } func (s *Store) CALeafSetIndex(idx uint64, index uint64) error { @@ -504,7 +500,6 @@ func (s *Store) CAIncrementProviderSerialNumber(idx uint64) (uint64, error) { return 0, fmt.Errorf("failed updating index: %s", err) } - tx.Commit() - - return next, nil + err = tx.Commit() + return next, err } diff --git a/agent/consul/state/coordinate.go b/agent/consul/state/coordinate.go index a7b806532..dc9958ab6 100644 --- a/agent/consul/state/coordinate.go +++ b/agent/consul/state/coordinate.go @@ -167,6 +167,5 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e return fmt.Errorf("failed updating index: %s", err) } - tx.Commit() - return nil + return tx.Commit() } diff --git a/agent/consul/state/coordinate_test.go b/agent/consul/state/coordinate_test.go index d8f98d6ae..e4d27ccb2 100644 --- a/agent/consul/state/coordinate_test.go +++ b/agent/consul/state/coordinate_test.go @@ -274,7 +274,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { if err := tx.Insert("coordinates", badUpdate); err != nil { t.Fatalf("err: %v", err) } - tx.Commit() + require.NoError(t, tx.Commit()) // Snapshot the coordinates. snap := s.Snapshot() diff --git a/agent/consul/state/federation_state.go b/agent/consul/state/federation_state.go index b5fe7d103..cd770f582 100644 --- a/agent/consul/state/federation_state.go +++ b/agent/consul/state/federation_state.go @@ -68,8 +68,7 @@ func (s *Store) FederationStateBatchSet(idx uint64, configs structs.FederationSt } } - tx.Commit() - return nil + return tx.Commit() } // FederationStateSet is called to do an upsert of a given federation state. @@ -81,8 +80,7 @@ func (s *Store) FederationStateSet(idx uint64, config *structs.FederationState) return err } - tx.Commit() - return nil + return tx.Commit() } // federationStateSetTxn upserts a federation state inside of a transaction. @@ -191,8 +189,7 @@ func (s *Store) FederationStateDelete(idx uint64, datacenter string) error { return err } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) FederationStateBatchDelete(idx uint64, datacenters []string) error { @@ -205,8 +202,7 @@ func (s *Store) FederationStateBatchDelete(idx uint64, datacenters []string) err } } - tx.Commit() - return nil + return tx.Commit() } func (s *Store) federationStateDeleteTxn(tx *txnWrapper, idx uint64, datacenter string) error { diff --git a/agent/consul/state/graveyard_test.go b/agent/consul/state/graveyard_test.go index 285b63bb7..7b7d533d2 100644 --- a/agent/consul/state/graveyard_test.go +++ b/agent/consul/state/graveyard_test.go @@ -3,6 +3,8 @@ package state import ( "testing" "time" + + "github.com/stretchr/testify/require" ) func TestGraveyard_Lifecycle(t *testing.T) { @@ -29,7 +31,7 @@ func TestGraveyard_Lifecycle(t *testing.T) { if err := g.InsertTxn(tx, "some/other/path", 9, nil); err != nil { t.Fatalf("err: %s", err) } - tx.Commit() + require.NoError(t, tx.Commit()) }() // Check some prefixes. @@ -68,7 +70,7 @@ func TestGraveyard_Lifecycle(t *testing.T) { if err := g.ReapTxn(tx, 6); err != nil { t.Fatalf("err: %s", err) } - tx.Commit() + require.NoError(t, tx.Commit()) }() // Check prefixes to see that the reap took effect at the right index. @@ -142,7 +144,7 @@ func TestGraveyard_GC_Trigger(t *testing.T) { if err := g.InsertTxn(tx, "foo/in/the/house", 2, nil); err != nil { t.Fatalf("err: %s", err) } - tx.Commit() + require.NoError(t, tx.Commit()) }() // Make sure the GC got hinted. @@ -185,7 +187,7 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) { if err := g.InsertTxn(tx, "some/other/path", 9, nil); err != nil { t.Fatalf("err: %s", err) } - tx.Commit() + require.NoError(t, tx.Commit()) }() // Verify the index was set correctly. @@ -240,7 +242,7 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) { t.Fatalf("err: %s", err) } } - tx.Commit() + require.NoError(t, tx.Commit()) }() // Verify that the restore works. diff --git a/agent/consul/state/intention.go b/agent/consul/state/intention.go index f812dee2b..71219f0d8 100644 --- a/agent/consul/state/intention.go +++ b/agent/consul/state/intention.go @@ -164,8 +164,7 @@ func (s *Store) IntentionSet(idx uint64, ixn *structs.Intention) error { return err } - tx.Commit() - return nil + return tx.Commit() } // intentionSetTxn is the inner method used to insert an intention with @@ -260,8 +259,7 @@ func (s *Store) IntentionDelete(idx uint64, id string) error { return fmt.Errorf("failed intention delete: %s", err) } - tx.Commit() - return nil + return tx.Commit() } // intentionDeleteTxn is the inner method used to delete a intention diff --git a/agent/consul/state/kvs.go b/agent/consul/state/kvs.go index 55d15c4cb..0ca71c120 100644 --- a/agent/consul/state/kvs.go +++ b/agent/consul/state/kvs.go @@ -96,8 +96,7 @@ func (s *Store) ReapTombstones(idx uint64, index uint64) error { return fmt.Errorf("failed to reap kvs tombstones: %s", err) } - tx.Commit() - return nil + return tx.Commit() } // KVSSet is used to store a key/value pair. @@ -110,8 +109,7 @@ func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error { return err } - tx.Commit() - return nil + return tx.Commit() } // kvsSetTxn is used to insert or update a key/value pair in the state @@ -249,8 +247,7 @@ func (s *Store) KVSDelete(idx uint64, key string, entMeta *structs.EnterpriseMet return err } - tx.Commit() - return nil + return tx.Commit() } // kvsDeleteTxn is the inner method used to perform the actual deletion @@ -286,8 +283,8 @@ func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string, entMeta *structs.Ente return false, err } - tx.Commit() - return true, nil + err = tx.Commit() + return err == nil, err } // kvsDeleteCASTxn is the inner method that does a CAS delete within an existing @@ -327,8 +324,8 @@ func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) { return false, err } - tx.Commit() - return true, nil + err = tx.Commit() + return err == nil, err } // kvsSetCASTxn is the inner method used to do a CAS inside an existing @@ -371,8 +368,7 @@ func (s *Store) KVSDeleteTree(idx uint64, prefix string, entMeta *structs.Enterp return err } - tx.Commit() - return nil + return tx.Commit() } // KVSLockDelay returns the expiration time for any lock delay associated with @@ -392,8 +388,8 @@ func (s *Store) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) { return false, err } - tx.Commit() - return true, nil + err = tx.Commit() + return err == nil, err } // kvsLockTxn is the inner method that does a lock inside an existing @@ -458,8 +454,8 @@ func (s *Store) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) { return false, err } - tx.Commit() - return true, nil + err = tx.Commit() + return err == nil, err } // kvsUnlockTxn is the inner method that does an unlock inside an existing diff --git a/agent/consul/state/memdb_wrapper.go b/agent/consul/state/memdb_wrapper.go index cfa063641..a3c0dae41 100644 --- a/agent/consul/state/memdb_wrapper.go +++ b/agent/consul/state/memdb_wrapper.go @@ -85,11 +85,9 @@ type txnWrapper struct { // Note that this function, unlike memdb.Txn, returns an error which must be checked // by the caller. A non-nil error indicates that a commit failed and was not // applied. -// TODO: currently none of the callers check error, should they all be changed? func (tx *txnWrapper) Commit() error { // changes may be empty if this is a read-only or WriteTxnRestore transaction. - //changes := tx.Txn.Changes() - // TODO: publish changes + // TODO: publish changes: changes := tx.Txn.Changes() tx.Txn.Commit() return nil diff --git a/agent/consul/state/prepared_query.go b/agent/consul/state/prepared_query.go index 3b3c240f2..0c226e22b 100644 --- a/agent/consul/state/prepared_query.go +++ b/agent/consul/state/prepared_query.go @@ -137,8 +137,7 @@ func (s *Store) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error return err } - tx.Commit() - return nil + return tx.Commit() } // preparedQuerySetTxn is the inner method used to insert a prepared query with @@ -254,8 +253,7 @@ func (s *Store) PreparedQueryDelete(idx uint64, queryID string) error { return fmt.Errorf("failed prepared query delete: %s", err) } - tx.Commit() - return nil + return tx.Commit() } // preparedQueryDeleteTxn is the inner method used to delete a prepared query diff --git a/agent/consul/state/session.go b/agent/consul/state/session.go index 991ee96b7..7181d608d 100644 --- a/agent/consul/state/session.go +++ b/agent/consul/state/session.go @@ -170,8 +170,7 @@ func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error { return err } - tx.Commit() - return nil + return tx.Commit() } // sessionCreateTxn is the inner method used for creating session entries in @@ -297,8 +296,7 @@ func (s *Store) SessionDestroy(idx uint64, sessionID string, entMeta *structs.En return err } - tx.Commit() - return nil + return tx.Commit() } // deleteSessionTxn is the inner method, which is used to do the actual diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 758b67bc3..f796fb32c 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -221,8 +221,8 @@ func (s *Restore) Abort() { // Commit commits the changes made by a restore. This or Abort should always be // called. -func (s *Restore) Commit() { - s.tx.Commit() +func (s *Restore) Commit() error { + return s.tx.Commit() } // AbandonCh returns a channel you can wait on to know if the state store was diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index 0288ec1b4..2c418af1c 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -300,7 +300,7 @@ func TestStateStore_indexUpdateMaxTxn(t *testing.T) { if err := indexUpdateMaxTxn(tx, 3, "nodes"); err != nil { t.Fatalf("err: %s", err) } - tx.Commit() + require.NoError(t, tx.Commit()) if max := s.maxIndex("nodes"); max != 3 { t.Fatalf("bad max: %d", max) diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index be2602fef..8fe84b3e0 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -390,7 +390,12 @@ func (s *Store) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, struc return nil, errors } - tx.Commit() + err := tx.Commit() + if err != nil { + return nil, structs.TxnErrors{ + {What: err.Error(), OpIndex: 0}, + } + } return results, nil }