state: use tx.First instead of tx.FirstWatch
Where appropriate. After removing the helper function a bunch of these calls can be changed to tx.First.
This commit is contained in:
parent
49938bc472
commit
abbe5c3701
|
@ -141,7 +141,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
|
|||
// node info above to make sure we actually need to update the service
|
||||
// definition in order to prevent useless churn if nothing has changed.
|
||||
if req.Service != nil {
|
||||
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID})
|
||||
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -194,7 +194,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi
|
|||
if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID {
|
||||
// Look up the existing node's Serf health check to see if it's failed.
|
||||
// If it is, the node can be renamed.
|
||||
_, enodeCheck, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)})
|
||||
enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err)
|
||||
}
|
||||
|
@ -602,7 +602,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed")
|
|||
// Returns an error if the write didn't happen and nil if write was successful.
|
||||
func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error {
|
||||
// Retrieve the existing service.
|
||||
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
||||
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -627,7 +627,7 @@ func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.Node
|
|||
// existing memdb transaction.
|
||||
func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
|
||||
// Check for existing service
|
||||
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
||||
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -1148,7 +1148,7 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.
|
|||
}
|
||||
|
||||
// Query the service
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
||||
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
||||
}
|
||||
|
@ -1321,8 +1321,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
|
||||
// Look up the service.
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
||||
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -1471,7 +1470,7 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe
|
|||
// checks with no matching node or service.
|
||||
func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
|
||||
// Check if we have an existing health check
|
||||
_, existing, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)})
|
||||
existing, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed health check lookup: %s", err)
|
||||
}
|
||||
|
@ -1503,7 +1502,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
|
|||
// If the check is associated with a service, check that we have
|
||||
// a registration for the service.
|
||||
if hc.ServiceID != "" {
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID})
|
||||
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -1584,7 +1583,7 @@ func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta
|
|||
}
|
||||
|
||||
// Return the check.
|
||||
_, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)})
|
||||
check, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)})
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
||||
}
|
||||
|
@ -1810,7 +1809,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ
|
|||
}
|
||||
|
||||
// Try to retrieve the existing health check.
|
||||
_, hc, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)})
|
||||
hc, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)})
|
||||
if err != nil {
|
||||
return fmt.Errorf("check lookup failed: %s", err)
|
||||
}
|
||||
|
@ -1825,7 +1824,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ
|
|||
return err
|
||||
}
|
||||
|
||||
_, svcRaw, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID})
|
||||
svcRaw, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed retrieving service from state store: %v", err)
|
||||
}
|
||||
|
|
|
@ -2067,7 +2067,7 @@ func TestStateStore_DeleteService(t *testing.T) {
|
|||
// that it actually is removed in the state store.
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
_, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"})
|
||||
check, err := tx.First(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"})
|
||||
if err != nil || check != nil {
|
||||
t.Fatalf("bad: %#v (err: %s)", check, err)
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, s
|
|||
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
||||
service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serv
|
|||
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
||||
service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -163,7 +163,7 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64,
|
|||
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
_, c, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)})
|
||||
c, err := tx.First(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue