diff --git a/consul/state_store.go b/consul/state_store.go index 6a43aa027..80870ac5a 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -377,13 +377,20 @@ func (s *StateStore) QueryTables(q string) MDBTables { // EnsureNode is used to ensure a given node exists, with the provided address func (s *StateStore) EnsureNode(index uint64, node structs.Node) error { - // Start a new txn tx, err := s.nodeTable.StartTxn(false, nil) if err != nil { return err } defer tx.Abort() + if err := s.ensureNodeTxn(index, node, tx); err != nil { + return err + } + return tx.Commit() +} +// ensureNodeTxn is used to ensure a given node exists, with the provided address +// within a given txn +func (s *StateStore) ensureNodeTxn(index uint64, node structs.Node, tx *MDBTxn) error { if err := s.nodeTable.InsertTxn(tx, node); err != nil { return err } @@ -391,7 +398,7 @@ func (s *StateStore) EnsureNode(index uint64, node structs.Node) error { return err } tx.Defer(func() { s.watch[s.nodeTable].Notify() }) - return tx.Commit() + return nil } // GetNode returns all the address of the known and if it was found @@ -428,7 +435,14 @@ func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeSe panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() + if err := s.ensureServiceTxn(index, node, ns, tx); err != nil { + return nil + } + return tx.Commit() +} +// ensureServiceTxn is used to ensure a given node exposes a service in a transaction +func (s *StateStore) ensureServiceTxn(index uint64, node string, ns *structs.NodeService, tx *MDBTxn) error { // Ensure the node exists res, err := s.nodeTable.GetTxn(tx, "id", node) if err != nil { @@ -455,7 +469,7 @@ func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeSe return err } tx.Defer(func() { s.watch[s.serviceTable].Notify() }) - return tx.Commit() + return nil } // NodeServices is used to return all the services of a given node @@ -699,17 +713,23 @@ func (s *StateStore) parseServiceNodes(tx *MDBTxn, table *MDBTable, res []interf // EnsureCheck is used to create a check or updates it's state func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error { - // Ensure we have a status - if check.Status == "" { - check.Status = structs.HealthUnknown - } - - // Start the txn tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() + if err := s.ensureCheckTxn(index, check, tx); err != nil { + return err + } + return tx.Commit() +} + +// ensureCheckTxn is used to create a check or updates it's state in a transaction +func (s *StateStore) ensureCheckTxn(index uint64, check *structs.HealthCheck, tx *MDBTxn) error { + // Ensure we have a status + if check.Status == "" { + check.Status = structs.HealthUnknown + } // Ensure the node exists res, err := s.nodeTable.GetTxn(tx, "id", check.Node) @@ -750,7 +770,7 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error return err } tx.Defer(func() { s.watch[s.checkTable].Notify() }) - return tx.Commit() + return nil } // DeleteNodeCheck is used to delete a node health check