remove node events for state track changing pr

remove Txn and update calls with ReadTxn()

constructor for changetrackerdb
This commit is contained in:
Drew Bailey 2020-09-04 09:53:36 -04:00
parent d5f6d3b3c5
commit 28aa0387e9
No known key found for this signature in database
GPG Key ID: FBA61B9FB7CCE1A7
6 changed files with 102 additions and 434 deletions

View File

@ -27,7 +27,7 @@ func autopilotConfigTableSchema() *memdb.TableSchema {
// AutopilotConfig is used to get the current Autopilot configuration.
func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
tx := s.db.Txn(false)
tx := s.db.ReadTxn()
defer tx.Abort()
// Get the autopilot config

View File

@ -1,175 +0,0 @@
package state
import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/event"
"github.com/hashicorp/nomad/nomad/structs"
)
// NodeEvent represents a NodeEvent change on a given Node.
type NodeEvent struct {
Message string
NodeID string
}
// NNodeDrainEvent holds information related to a Node Drain
type NodeDrainEvent struct {
NodeID string
Allocs []string
DrainStrategy structs.DrainStrategy
Message string
}
func (s *StateStore) NodeEventsFromChanges(tx ReadTxn, changes Changes) ([]event.Event, error) {
var events []event.Event
var nodeChanges map[string]*memdb.Change
markNode := func(node string, nodeChange *memdb.Change) {
if nodeChanges == nil {
nodeChanges = make(map[string]*memdb.Change)
}
ch := nodeChanges[node]
if ch == nil {
nodeChanges[node] = nodeChange
}
}
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
nRaw := change.After
if change.After == nil {
nRaw = change.Before
}
n := nRaw.(*structs.Node)
changeCopy := change
markNode(n.ID, &changeCopy)
}
}
for node, change := range nodeChanges {
if change != nil && change.Deleted() {
// TODO Node delete event
continue
}
ne, err := s.statusEventsForNode(tx, node, change)
if err != nil {
return nil, err
}
// Rebuild node node events
events = append(events, ne...)
}
return events, nil
}
func (s *StateStore) statusEventsForNode(tx ReadTxn, node string, change *memdb.Change) ([]event.Event, error) {
events := []event.Event{}
if change.Created() {
n := change.After.(*structs.Node)
for _, e := range n.Events {
nodeEvent := NodeEvent{Message: e.Message, NodeID: node}
e := event.Event{
Topic: "NodeEvent",
Key: node,
Payload: nodeEvent,
}
events = append(events, e)
}
} else if change.Updated() {
nbefore := change.Before.(*structs.Node)
nafter := change.After.(*structs.Node)
newEvents := s.newNodeEvents(nbefore.Events, nafter.Events)
for _, e := range newEvents {
if s.isNodeDrainEvent(nbefore, nafter, newEvents) {
allocs, err := s.AllocsByNodeTx(tx, node)
if err != nil {
return []event.Event{}, err
}
var allocIDs []string
for _, a := range allocs {
allocIDs = append(allocIDs, a.ID)
}
nde := NodeDrainEvent{
NodeID: node,
DrainStrategy: *nafter.DrainStrategy,
Allocs: allocIDs,
Message: e.Message,
}
e := event.Event{
Topic: "NodeEvent",
Key: node,
Payload: nde,
}
events = append(events, e)
} else {
ne := NodeEvent{
Message: e.Message,
NodeID: node,
}
e := event.Event{
Topic: "NodeEvent",
Key: node,
Payload: ne,
}
events = append(events, e)
}
}
}
return events, nil
}
func (s *StateStore) newNodeEvents(before, after []*structs.NodeEvent) []*structs.NodeEvent {
events := []*structs.NodeEvent{}
if len(before) == len(after) {
return nil
}
for _, e := range after {
found := false
for _, be := range before {
if e.String() == be.String() {
found = true
break
}
}
if !found {
events = append(events, e)
}
}
return events
}
func (s *StateStore) isNodeDrainEvent(before, after *structs.Node, newEvents []*structs.NodeEvent) bool {
if before.Drain != after.Drain {
return true
}
for _, e := range newEvents {
if e.Subsystem == structs.NodeEventSubsystemDrain {
return true
}
}
return false
}
func (s *StateStore) AllocsByNodeTx(tx ReadTxn, node string) ([]*structs.Allocation, error) {
iter, err := tx.Get("allocs", "node_prefix", node)
if err != nil {
return nil, err
}
var out []*structs.Allocation
for {
raw := iter.Next()
if raw == nil {
break
}
out = append(out, raw.(*structs.Allocation))
}
return out, nil
}

View File

@ -1,147 +0,0 @@
package state
import (
"testing"
"time"
"github.com/hashicorp/nomad/nomad/event"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func allocID() string { return "e5bcbac313d6c-e29b-11c4-a5cd-5949157" }
func mockNode(id string) *structs.Node {
node := mock.Node()
node.ID = id
return node
}
func TestNodeEventsFromChanges(t *testing.T) {
cases := []struct {
Name string
Setup func(s *StateStore, index uint64) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []event.Event
WantErr bool
}{
{
Name: "new node registered",
Setup: func(s *StateStore, idx uint64) error {
req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94")
return s.UpsertNode(idx, req)
},
Mutate: func(s *StateStore, tx *txn) error {
event := &structs.NodeEvent{
Message: "Node ready foo",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
return s.updateNodeStatusTxn(tx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusReady, time.Now().UnixNano(), event)
},
WantEvents: []event.Event{
{
Topic: "NodeEvent",
Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
Payload: NodeEvent{
Message: "Node ready foo",
NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
},
},
},
},
{
Name: "only new events",
Setup: func(s *StateStore, idx uint64) error {
req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94")
require.NoError(t, s.UpsertNode(idx, req))
event := &structs.NodeEvent{
Message: "Node foo initializing",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
return s.UpdateNodeStatus(idx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusInit, time.Now().UnixNano(), event)
},
Mutate: func(s *StateStore, tx *txn) error {
event := &structs.NodeEvent{
Message: "Node foo ready",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
return s.updateNodeStatusTxn(tx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusReady, time.Now().UnixNano(), event)
},
WantEvents: []event.Event{
{
Topic: "NodeEvent",
Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
Payload: NodeEvent{
Message: "Node foo ready",
NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
},
},
},
},
{
Name: "node drain event",
Setup: func(s *StateStore, idx uint64) error {
req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94")
require.NoError(t, s.UpsertNode(idx, req))
event := &structs.NodeEvent{
Message: "Node foo initializing",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
require.NoError(t, s.UpdateNodeStatus(idx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusInit, time.Now().UnixNano(), event))
alloc := mock.Alloc()
alloc.NodeID = req.ID
alloc.ID = allocID()
return s.UpsertAllocs(idx, []*structs.Allocation{alloc})
},
Mutate: func(s *StateStore, tx *txn) error {
event := &structs.NodeEvent{
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
event.SetMessage("Node drain strategy set")
event.SetSubsystem(structs.NodeEventSubsystemDrain)
drain := &structs.DrainStrategy{}
return s.updateNodeDrainImpl(tx, tx.Index, "8218b700-7e26-aac0-06d8-ff3b15f44e94", drain, false, time.Now().UnixNano(), event)
},
WantEvents: []event.Event{
{
Topic: "NodeEvent",
Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
Payload: NodeDrainEvent{
Message: "Node drain strategy set",
NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
Allocs: []string{allocID()},
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
s := testStateStore(t)
if tc.Setup != nil {
require.NoError(t, tc.Setup(s, 10))
}
tx := s.db.WriteTxn(100)
require.NoError(t, tc.Mutate(s, tx))
got, err := s.NodeEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100})
if tc.WantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.WantEvents, got)
})
}
}

View File

@ -32,6 +32,16 @@ type changeTrackerDB struct {
processChanges func(ReadTxn, Changes) ([]event.Event, error)
}
func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn changeProcessor) *changeTrackerDB {
return &changeTrackerDB{
db: db,
publisher: event.NewPublisher(),
processChanges: changesFn,
}
}
type changeProcessor func(ReadTxn, Changes) ([]event.Event, error)
type eventPublisher interface {
Publish(events []event.Event)
}
@ -42,18 +52,6 @@ type noOpPublisher struct{}
func (n *noOpPublisher) Publish(events []event.Event) {}
func noOpProcessChanges(ReadTxn, Changes) ([]event.Event, error) { return []event.Event{}, nil }
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
// code may use it to create a read-only transaction, but it will panic if called
// with write=true.
//
// Deprecated: use either ReadTxn, or WriteTxn.
func (c *changeTrackerDB) Txn(write bool) *txn {
if write {
panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)")
}
return c.ReadTxn()
}
// ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn
//

View File

@ -80,11 +80,7 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
config: config,
abandonCh: make(chan struct{}),
}
s.db = &changeTrackerDB{
db: db,
publisher: event.NewPublisher(),
processChanges: processDBChanges,
}
s.db = NewChangeTrackerDB(db, event.NewPublisher(), processDBChanges)
// Initialize the state store with required enterprise objects
if err := s.enterpriseInit(); err != nil {
@ -109,12 +105,9 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
logger: s.logger,
config: s.config,
}
// create a changeTrackerDB that doesn't process changes
store.db = &changeTrackerDB{
db: memDBSnap,
publisher: &noOpPublisher{},
processChanges: noOpProcessChanges,
}
store.db = NewChangeTrackerDB(memDBSnap, &noOpPublisher{}, noOpProcessChanges)
snap := &StateSnapshot{
StateStore: store,
}
@ -470,7 +463,7 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl
}
func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire deployments table
iter, err := txn.Get("deployment", "id")
@ -483,7 +476,7 @@ func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error
}
func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire deployments table
iter, err := txn.Get("deployment", "namespace", namespace)
@ -496,7 +489,7 @@ func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string)
}
func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire deployments table
iter, err := txn.Get("deployment", "id_prefix", deploymentID)
@ -525,7 +518,7 @@ func deploymentNamespaceFilter(namespace string) func(interface{}) bool {
}
func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*structs.Deployment, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
return s.deploymentByIDImpl(ws, deploymentID, txn)
}
@ -544,7 +537,7 @@ func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string,
}
func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Deployment, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
var job *structs.Job
// Read job from state store
@ -587,7 +580,7 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID stri
// LatestDeploymentByJobID returns the latest deployment for the given job. The
// latest is determined strictly by CreateIndex.
func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get an iterator over the deployments
iter, err := txn.Get("deployment", "job", namespace, jobID)
@ -700,7 +693,7 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR
// ScalingEvents returns an iterator over all the job scaling events
func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire scaling_event table
iter, err := txn.Get("scaling_event", "id")
@ -714,7 +707,7 @@ func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, err
}
func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, uint64, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID)
if err != nil {
@ -1325,7 +1318,7 @@ func (s *StateStore) deleteJobFromPlugins(index uint64, txn *txn, job *structs.J
// NodeByID is used to lookup a node by ID
func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("nodes", "id", nodeID)
if err != nil {
@ -1341,7 +1334,7 @@ func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node,
// NodesByIDPrefix is used to lookup nodes by prefix
func (s *StateStore) NodesByIDPrefix(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("nodes", "id_prefix", nodeID)
if err != nil {
@ -1354,7 +1347,7 @@ func (s *StateStore) NodesByIDPrefix(ws memdb.WatchSet, nodeID string) (memdb.Re
// NodeBySecretID is used to lookup a node by SecretID
func (s *StateStore) NodeBySecretID(ws memdb.WatchSet, secretID string) (*structs.Node, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("nodes", "secret_id", secretID)
if err != nil {
@ -1370,7 +1363,7 @@ func (s *StateStore) NodeBySecretID(ws memdb.WatchSet, secretID string) (*struct
// Nodes returns an iterator over all the nodes
func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire nodes table
iter, err := txn.Get("nodes", "id")
@ -1721,7 +1714,7 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
// JobByID is used to lookup a job by its ID. JobByID returns the current/latest job
// version.
func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
return s.JobByIDTxn(ws, namespace, id, txn)
}
@ -1742,7 +1735,7 @@ func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn
// JobsByIDPrefix is used to lookup a job by prefix
func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("jobs", "id_prefix", namespace, id)
if err != nil {
@ -1756,7 +1749,7 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (me
// JobVersionsByID returns all the tracked versions of a job.
func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
return s.jobVersionByID(txn, &ws, namespace, id)
}
@ -1802,7 +1795,7 @@ func (s *StateStore) jobVersionByID(txn *txn, ws *memdb.WatchSet, namespace, id
// JobByIDAndVersion returns the job identified by its ID and Version. The
// passed watchset may be nil.
func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
return s.jobByIDAndVersionImpl(ws, namespace, id, version, txn)
}
@ -1829,7 +1822,7 @@ func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id stri
}
func (s *StateStore) JobVersions(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire deployments table
iter, err := txn.Get("job_version", "id")
@ -1843,7 +1836,7 @@ func (s *StateStore) JobVersions(ws memdb.WatchSet) (memdb.ResultIterator, error
// Jobs returns an iterator over all the jobs
func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire jobs table
iter, err := txn.Get("jobs", "id")
@ -1858,7 +1851,7 @@ func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
// JobsByNamespace returns an iterator over all the jobs for the given namespace
func (s *StateStore) JobsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
return s.jobsByNamespaceImpl(ws, namespace, txn)
}
@ -1877,7 +1870,7 @@ func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, tx
// JobsByPeriodic returns an iterator over all the periodic or non-periodic jobs.
func (s *StateStore) JobsByPeriodic(ws memdb.WatchSet, periodic bool) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("jobs", "periodic", periodic)
if err != nil {
@ -1892,7 +1885,7 @@ func (s *StateStore) JobsByPeriodic(ws memdb.WatchSet, periodic bool) (memdb.Res
// JobsByScheduler returns an iterator over all the jobs with the specific
// scheduler type.
func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Return an iterator for jobs with the specific type.
iter, err := txn.Get("jobs", "type", schedulerType)
@ -1908,7 +1901,7 @@ func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (m
// JobsByGC returns an iterator over all jobs eligible or uneligible for garbage
// collection.
func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("jobs", "gc", gc)
if err != nil {
@ -1922,7 +1915,7 @@ func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator,
// JobSummary returns a job summary object which matches a specific id.
func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("job_summary", "id", namespace, jobID)
if err != nil {
@ -1942,7 +1935,7 @@ func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string)
// JobSummaries walks the entire job summary table and returns all the job
// summary objects
func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("job_summary", "id")
if err != nil {
@ -1956,7 +1949,7 @@ func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, erro
// JobSummaryByPrefix is used to look up Job Summary by id prefix
func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("job_summary", "id_prefix", namespace, id)
if err != nil {
@ -2020,7 +2013,7 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
// CSIVolumes returns the unfiltered list of all volumes
func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
defer txn.Abort()
iter, err := txn.Get("csi_volumes", "id")
@ -2036,7 +2029,7 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error)
// CSIVolumeByID is used to lookup a single volume. Returns a copy of the volume
// because its plugins are denormalized to provide accurate Health.
func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, obj, err := txn.FirstWatch("csi_volumes", "id_prefix", namespace, id)
if err != nil {
@ -2054,7 +2047,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st
// CSIVolumes looks up csi_volumes by pluginID
func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("csi_volumes", "plugin_id", pluginID)
if err != nil {
@ -2076,7 +2069,7 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID
// CSIVolumesByIDPrefix supports search
func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID)
if err != nil {
@ -2115,7 +2108,7 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb
// Lookup the raw CSIVolumes to match the other list interfaces
iter := NewSliceIterator()
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
for id, namespace := range ids {
raw, err := txn.First("csi_volumes", "id", namespace, id)
if err != nil {
@ -2129,7 +2122,7 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb
// CSIVolumesByNamespace looks up the entire csi_volumes table
func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("csi_volumes", "id_prefix", namespace, "")
if err != nil {
@ -2282,7 +2275,7 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs
return nil, nil
}
// Lookup CSIPlugin, the health records, and calculate volume health
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
defer txn.Abort()
plug, err := s.CSIPluginByID(ws, vol.PluginID)
@ -2360,7 +2353,7 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol
// CSIPlugins returns the unfiltered list of all plugin health status
func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
defer txn.Abort()
iter, err := txn.Get("csi_plugins", "id")
@ -2375,7 +2368,7 @@ func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error)
// CSIPluginsByIDPrefix supports search
func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("csi_plugins", "id_prefix", pluginID)
if err != nil {
@ -2389,7 +2382,7 @@ func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (m
// CSIPluginByID returns the one named CSIPlugin
func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
defer txn.Abort()
raw, err := txn.First("csi_plugins", "id_prefix", id)
@ -2564,7 +2557,7 @@ func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID stri
// PeriodicLaunchByID is used to lookup a periodic launch by the periodic job
// ID.
func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) (*structs.PeriodicLaunch, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("periodic_launch", "id", namespace, id)
if err != nil {
@ -2581,7 +2574,7 @@ func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string)
// PeriodicLaunches returns an iterator over all the periodic launches
func (s *StateStore) PeriodicLaunches(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("periodic_launch", "id")
@ -2805,7 +2798,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
// EvalByID is used to lookup an eval by its ID
func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("evals", "id", id)
if err != nil {
@ -2823,7 +2816,7 @@ func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation
// EvalsByIDPrefix is used to lookup evaluations by prefix in a particular
// namespace
func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get an iterator over all evals by the id prefix
iter, err := txn.Get("evals", "id_prefix", id)
@ -2853,7 +2846,7 @@ func evalNamespaceFilter(namespace string) func(interface{}) bool {
// EvalsByJob returns all the evaluations by job id
func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Evaluation, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get an iterator over the node allocations
iter, err := txn.Get("evals", "job_prefix", namespace, jobID)
@ -2884,7 +2877,7 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*
// Evals returns an iterator over all the evaluations
func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("evals", "id")
@ -2900,7 +2893,7 @@ func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) {
// EvalsByNamespace returns an iterator over all the evaluations in the given
// namespace
func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("evals", "namespace", namespace)
@ -3225,7 +3218,7 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition(
// AllocByID is used to lookup an allocation by its ID
func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocation, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("allocs", "id", id)
if err != nil {
@ -3242,7 +3235,7 @@ func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocatio
// AllocsByIDPrefix is used to lookup allocs by prefix
func (s *StateStore) AllocsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("allocs", "id_prefix", id)
if err != nil {
@ -3271,7 +3264,7 @@ func allocNamespaceFilter(namespace string) func(interface{}) bool {
// AllocsByIDPrefix is used to lookup allocs by prefix
func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[string]bool, prefix string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
var iter memdb.ResultIterator
var err error
@ -3303,7 +3296,7 @@ func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[st
// AllocsByNode returns all the allocations by node
func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get an iterator over the node allocations, using only the
// node prefix which ignores the terminal status
@ -3327,7 +3320,7 @@ func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Al
// AllocsByNode returns all the allocations by node and terminal status
func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get an iterator over the node allocations
iter, err := txn.Get("allocs", "node", node, terminal)
@ -3350,7 +3343,7 @@ func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, termin
// AllocsByJob returns allocations by job id
func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, anyCreateIndex bool) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get the job
var job *structs.Job
@ -3391,7 +3384,7 @@ func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, any
// AllocsByEval returns all the allocations by eval id
func (s *StateStore) AllocsByEval(ws memdb.WatchSet, evalID string) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get an iterator over the eval allocations
iter, err := txn.Get("allocs", "eval", evalID)
@ -3414,7 +3407,7 @@ func (s *StateStore) AllocsByEval(ws memdb.WatchSet, evalID string) ([]*structs.
// AllocsByDeployment returns all the allocations by deployment id
func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get an iterator over the deployments allocations
iter, err := txn.Get("allocs", "deployment", deploymentID)
@ -3437,7 +3430,7 @@ func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string)
// Allocs returns an iterator over all the evaluations
func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("allocs", "id")
@ -3453,7 +3446,7 @@ func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
// AllocsByNamespace returns an iterator over all the allocations in the
// namespace
func (s *StateStore) AllocsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
return s.allocsByNamespaceImpl(ws, txn, namespace)
}
@ -3517,7 +3510,7 @@ func (s *StateStore) DeleteVaultAccessors(index uint64, accessors []*structs.Vau
// VaultAccessor returns the given Vault accessor
func (s *StateStore) VaultAccessor(ws memdb.WatchSet, accessor string) (*structs.VaultAccessor, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("vault_accessors", "id", accessor)
if err != nil {
@ -3535,7 +3528,7 @@ func (s *StateStore) VaultAccessor(ws memdb.WatchSet, accessor string) (*structs
// VaultAccessors returns an iterator of Vault accessors.
func (s *StateStore) VaultAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("vault_accessors", "id")
if err != nil {
@ -3549,7 +3542,7 @@ func (s *StateStore) VaultAccessors(ws memdb.WatchSet) (memdb.ResultIterator, er
// VaultAccessorsByAlloc returns all the Vault accessors by alloc id
func (s *StateStore) VaultAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.VaultAccessor, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get an iterator over the accessors
iter, err := txn.Get("vault_accessors", "alloc_id", allocID)
@ -3572,7 +3565,7 @@ func (s *StateStore) VaultAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([
// VaultAccessorsByNode returns all the Vault accessors by node id
func (s *StateStore) VaultAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.VaultAccessor, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Get an iterator over the accessors
iter, err := txn.Get("vault_accessors", "node_id", nodeID)
@ -3650,7 +3643,7 @@ func (s *StateStore) DeleteSITokenAccessors(index uint64, accessors []*structs.S
// SITokenAccessor returns the given Service Identity token accessor.
func (s *StateStore) SITokenAccessor(ws memdb.WatchSet, accessorID string) (*structs.SITokenAccessor, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
defer txn.Abort()
watchCh, existing, err := txn.FirstWatch(siTokenAccessorTable, "id", accessorID)
@ -3669,7 +3662,7 @@ func (s *StateStore) SITokenAccessor(ws memdb.WatchSet, accessorID string) (*str
// SITokenAccessors returns an iterator of Service Identity token accessors.
func (s *StateStore) SITokenAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
defer txn.Abort()
iter, err := txn.Get(siTokenAccessorTable, "id")
@ -3684,7 +3677,7 @@ func (s *StateStore) SITokenAccessors(ws memdb.WatchSet) (memdb.ResultIterator,
// SITokenAccessorsByAlloc returns all the Service Identity token accessors by alloc ID.
func (s *StateStore) SITokenAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.SITokenAccessor, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
defer txn.Abort()
// Get an iterator over the accessors
@ -3705,7 +3698,7 @@ func (s *StateStore) SITokenAccessorsByAlloc(ws memdb.WatchSet, allocID string)
// SITokenAccessorsByNode returns all the Service Identity token accessors by node ID.
func (s *StateStore) SITokenAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.SITokenAccessor, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
defer txn.Abort()
// Get an iterator over the accessors
@ -4095,7 +4088,7 @@ func (s *StateStore) LatestIndex() (uint64, error) {
// Index finds the matching index value
func (s *StateStore) Index(name string) (uint64, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Lookup the first matching index
out, err := txn.First("index", "id", name)
@ -4110,7 +4103,7 @@ func (s *StateStore) Index(name string) (uint64, error) {
// Indexes returns an iterator over all the indexes
func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire nodes table
iter, err := txn.Get("index", "id")
@ -4988,7 +4981,7 @@ func (s *StateStore) DeleteACLPolicies(index uint64, names []string) error {
// ACLPolicyByName is used to lookup a policy by name
func (s *StateStore) ACLPolicyByName(ws memdb.WatchSet, name string) (*structs.ACLPolicy, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("acl_policy", "id", name)
if err != nil {
@ -5004,7 +4997,7 @@ func (s *StateStore) ACLPolicyByName(ws memdb.WatchSet, name string) (*structs.A
// ACLPolicyByNamePrefix is used to lookup policies by prefix
func (s *StateStore) ACLPolicyByNamePrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("acl_policy", "id_prefix", prefix)
if err != nil {
@ -5017,7 +5010,7 @@ func (s *StateStore) ACLPolicyByNamePrefix(ws memdb.WatchSet, prefix string) (me
// ACLPolicies returns an iterator over all the acl policies
func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("acl_policy", "id")
@ -5099,7 +5092,7 @@ func (s *StateStore) ACLTokenByAccessorID(ws memdb.WatchSet, id string) (*struct
return nil, fmt.Errorf("acl token lookup failed: missing accessor id")
}
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("acl_token", "id", id)
if err != nil {
@ -5119,7 +5112,7 @@ func (s *StateStore) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*st
return nil, fmt.Errorf("acl token lookup failed: missing secret id")
}
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("acl_token", "secret", secretID)
if err != nil {
@ -5135,7 +5128,7 @@ func (s *StateStore) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*st
// ACLTokenByAccessorIDPrefix is used to lookup tokens by prefix
func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("acl_token", "id_prefix", prefix)
if err != nil {
@ -5147,7 +5140,7 @@ func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string
// ACLTokens returns an iterator over all the tokens
func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("acl_token", "id")
@ -5160,7 +5153,7 @@ func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error)
// ACLTokensByGlobal returns an iterator over all the tokens filtered by global value
func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("acl_token", "global", globalVal)
@ -5173,7 +5166,7 @@ func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool) (memdb
// CanBootstrapACLToken checks if bootstrapping is possible and returns the reset index
func (s *StateStore) CanBootstrapACLToken() (bool, uint64, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Lookup the bootstrap sentinel
out, err := txn.First("index", "id", "acl_token_bootstrap")
@ -5230,7 +5223,7 @@ func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs
// SchedulerConfig is used to get the current Scheduler configuration.
func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) {
tx := s.db.Txn(false)
tx := s.db.ReadTxn()
defer tx.Abort()
// Get the scheduler config
@ -5259,7 +5252,7 @@ func (s *StateStore) SchedulerSetConfig(index uint64, config *structs.SchedulerC
}
func (s *StateStore) ClusterMetadata(ws memdb.WatchSet) (*structs.ClusterMetadata, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
defer txn.Abort()
// Get the cluster metadata
@ -5477,7 +5470,7 @@ func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *t
// ScalingPolicies returns an iterator over all the scaling policies
func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// Walk the entire scaling_policy table
iter, err := txn.Get("scaling_policy", "id")
@ -5491,7 +5484,7 @@ func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, e
}
func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
iter, err := txn.Get("scaling_policy", "target_prefix", namespace)
if err != nil {
@ -5514,7 +5507,7 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str
}
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
return s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn)
}
@ -5543,7 +5536,7 @@ func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID
}
func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
watchCh, existing, err := txn.FirstWatch("scaling_policy", "id", id)
if err != nil {
@ -5560,7 +5553,7 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S
func (s *StateStore) ScalingPolicyByTarget(ws memdb.WatchSet, target map[string]string) (*structs.ScalingPolicy,
error) {
txn := s.db.Txn(false)
txn := s.db.ReadTxn()
// currently, only scaling policy type is against a task group
namespace := target[structs.ScalingTargetNamespace]

View File

@ -6157,7 +6157,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs(t *testing.T) {
job := mock.Job()
state := testStateStore(t)
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@ -6173,7 +6173,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) {
job := mock.PeriodicJob()
state := testStateStore(t)
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@ -6189,7 +6189,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) {
job := mock.Job()
state := testStateStore(t)
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@ -6223,7 +6223,7 @@ func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@ -6249,7 +6249,7 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@ -6266,7 +6266,7 @@ func TestStateStore_GetJobStatus_PeriodicJob(t *testing.T) {
state := testStateStore(t)
job := mock.PeriodicJob()
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@ -6295,7 +6295,7 @@ func TestStateStore_GetJobStatus_ParameterizedJob(t *testing.T) {
job := mock.Job()
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@ -6331,7 +6331,7 @@ func TestStateStore_SetJobStatus_PendingEval(t *testing.T) {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@ -6359,7 +6359,7 @@ func TestStateStore_SetJobStatus_SystemJob(t *testing.T) {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@ -8696,7 +8696,6 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
require.ElementsMatch([]string{policy2.ID}, policiesInNS2)
}
func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
t.Parallel()