From 0af749c92e314a7de967d0f0120f0df4a6148cb0 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Thu, 27 Aug 2020 10:18:53 -0400 Subject: [PATCH] Transaction change tracking This commit wraps memdb.DB with a changeTrackerDB, a thin wrapper that enables go-memdb's TrackChanges on all write transactions. When a transaction is committed, the changes are sent to an eventPublisher, which will be used to create and emit change events. --- nomad/event/event.go | 13 ++ nomad/fsm.go | 4 +- nomad/fsm_test.go | 20 ++- nomad/state/autopilot.go | 14 +- nomad/state/node_events.go | 175 ++++++++++++++++++++++ nomad/state/node_events_test.go | 147 ++++++++++++++++++ nomad/state/state_changes.go | 148 +++++++++++++++++++ nomad/state/state_store.go | 254 ++++++++++++++++---------------- nomad/state/state_store_oss.go | 5 +- nomad/state/state_store_test.go | 16 +- 10 files changed, 641 insertions(+), 155 deletions(-) create mode 100644 nomad/event/event.go create mode 100644 nomad/state/node_events.go create mode 100644 nomad/state/node_events_test.go create mode 100644 nomad/state/state_changes.go diff --git a/nomad/event/event.go b/nomad/event/event.go new file mode 100644 index 000000000..49e809ac8 --- /dev/null +++ b/nomad/event/event.go @@ -0,0 +1,13 @@ +package event + +type Event struct { + Topic string + Key string + Index uint64 + Payload interface{} +} + +type EventPublisher struct{} + +func NewPublisher() *EventPublisher { return &EventPublisher{} } +func (e EventPublisher) Publish(events []Event) {} diff --git a/nomad/fsm.go b/nomad/fsm.go index 98a65590b..22663fe17 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -574,7 +574,7 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - err := n.state.WithWriteTransaction(func(tx state.Txn) error { + err := n.state.WithWriteTransaction(index, func(tx state.Txn) error { err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx) if err != nil { @@ -612,7 +612,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} // Perform all store updates atomically to ensure a consistent view for store readers. // A partial update may increment the snapshot index, allowing eval brokers to process // evals for jobs whose deregistering didn't get committed yet.
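For a concrete picture of the shape being introduced, here is a minimal, hypothetical use of the new event package (the publisher in this commit is still a no-op stub, so Publish simply discards the batch):

    package main

    import (
        "fmt"

        "github.com/hashicorp/nomad/nomad/event"
    )

    func main() {
        // NewPublisher returns the stub publisher added by this commit.
        p := event.NewPublisher()

        // An Event pairs a table-derived Topic and an entity Key with the
        // raft index of the write and an opaque payload.
        p.Publish([]event.Event{{
            Topic:   "NodeEvent",
            Key:     "some-node-id",
            Index:   100,
            Payload: "node ready",
        }})

        fmt.Println("batch handed to the stub publisher")
    }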
- err := n.state.WithWriteTransaction(func(tx state.Txn) error { + err := n.state.WithWriteTransaction(index, func(tx state.Txn) error { for jobNS, options := range req.Jobs { if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil { n.logger.Error("deregistering job failed", "job", jobNS, "error", err) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index e371b5d2a..714471654 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2866,22 +2866,22 @@ func TestFSM_ReconcileSummaries(t *testing.T) { // Add a node node := mock.Node() - state.UpsertNode(800, node) + require.NoError(t, state.UpsertNode(800, node)) // Make a job so that none of the tasks can be placed job1 := mock.Job() job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000 - state.UpsertJob(1000, job1) + require.NoError(t, state.UpsertJob(1000, job1)) // make a job which can make partial progress alloc := mock.Alloc() alloc.NodeID = node.ID - state.UpsertJob(1010, alloc.Job) - state.UpsertAllocs(1011, []*structs.Allocation{alloc}) + require.NoError(t, state.UpsertJob(1010, alloc.Job)) + require.NoError(t, state.UpsertAllocs(1011, []*structs.Allocation{alloc})) // Delete the summaries - state.DeleteJobSummary(1030, job1.Namespace, job1.ID) - state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID) + require.NoError(t, state.DeleteJobSummary(1030, job1.Namespace, job1.ID)) + require.NoError(t, state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID)) req := structs.GenericRequest{} buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req) @@ -2895,7 +2895,9 @@ func TestFSM_ReconcileSummaries(t *testing.T) { } ws := memdb.NewWatchSet() - out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID) + out1, err := state.JobSummaryByID(ws, job1.Namespace, job1.ID) + require.NoError(t, err) + expected := structs.JobSummary{ JobID: job1.ID, Namespace: job1.Namespace, @@ -2914,7 +2916,9 @@ func TestFSM_ReconcileSummaries(t *testing.T) { // This exercises the code path which adds the allocations made by the // planner and the number of unplaced allocations in the reconcile summaries // codepath - out2, _ := state.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID) + out2, err := state.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID) + require.NoError(t, err) + expected = structs.JobSummary{ JobID: alloc.Job.ID, Namespace: alloc.Job.Namespace, diff --git a/nomad/state/autopilot.go b/nomad/state/autopilot.go index 79447b2ce..149baaba6 100644 --- a/nomad/state/autopilot.go +++ b/nomad/state/autopilot.go @@ -45,11 +45,11 @@ func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) } // AutopilotSetConfig is used to set the current Autopilot configuration. -func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error { - tx := s.db.Txn(true) +func (s *StateStore) AutopilotSetConfig(index uint64, config *structs.AutopilotConfig) error { + tx := s.db.WriteTxn(index) defer tx.Abort() - s.autopilotSetConfigTxn(idx, tx, config) + s.autopilotSetConfigTxn(index, tx, config) tx.Commit() return nil @@ -58,8 +58,8 @@ func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotCon // AutopilotCASConfig is used to try updating the Autopilot configuration with a // given Raft index. 
If the CAS index specified is not equal to the last observed index // for the config, then the call is a noop. -func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) { - tx := s.db.Txn(true) +func (s *StateStore) AutopilotCASConfig(index, cidx uint64, config *structs.AutopilotConfig) (bool, error) { + tx := s.db.WriteTxn(index) defer tx.Abort() // Check for an existing config @@ -76,13 +76,13 @@ func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.Autopi return false, nil } - s.autopilotSetConfigTxn(idx, tx, config) + s.autopilotSetConfigTxn(index, tx, config) tx.Commit() return true, nil } -func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error { +func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *txn, config *structs.AutopilotConfig) error { // Check for an existing config existing, err := tx.First("autopilot-config", "id") if err != nil { diff --git a/nomad/state/node_events.go b/nomad/state/node_events.go new file mode 100644 index 000000000..6a56b3a22 --- /dev/null +++ b/nomad/state/node_events.go @@ -0,0 +1,175 @@ +package state + +import ( + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/event" + "github.com/hashicorp/nomad/nomad/structs" +) + +// NodeEvent represents a node event change on a given Node. +type NodeEvent struct { + Message string + NodeID string } + +// NodeDrainEvent holds information related to a Node Drain +type NodeDrainEvent struct { + NodeID string + Allocs []string + DrainStrategy structs.DrainStrategy + Message string +} + +func (s *StateStore) NodeEventsFromChanges(tx ReadTxn, changes Changes) ([]event.Event, error) { + var events []event.Event + + var nodeChanges map[string]*memdb.Change + + markNode := func(node string, nodeChange *memdb.Change) { + if nodeChanges == nil { + nodeChanges = make(map[string]*memdb.Change) + } + ch := nodeChanges[node] + if ch == nil { + nodeChanges[node] = nodeChange + } + } + + for _, change := range changes.Changes { + switch change.Table { + case "nodes": + nRaw := change.After + if change.After == nil { + nRaw = change.Before + } + n := nRaw.(*structs.Node) + changeCopy := change + markNode(n.ID, &changeCopy) + } + } + + for node, change := range nodeChanges { + if change != nil && change.Deleted() { + // TODO Node delete event + continue + } + + ne, err := s.statusEventsForNode(tx, node, change) + if err != nil { + return nil, err + } + // Rebuild node events + events = append(events, ne...)
+ } + return events, nil +} + +func (s *StateStore) statusEventsForNode(tx ReadTxn, node string, change *memdb.Change) ([]event.Event, error) { + events := []event.Event{} + if change.Created() { + n := change.After.(*structs.Node) + for _, e := range n.Events { + nodeEvent := NodeEvent{Message: e.Message, NodeID: node} + e := event.Event{ + Topic: "NodeEvent", + Key: node, + Payload: nodeEvent, + } + events = append(events, e) + } + } else if change.Updated() { + nbefore := change.Before.(*structs.Node) + nafter := change.After.(*structs.Node) + newEvents := s.newNodeEvents(nbefore.Events, nafter.Events) + for _, e := range newEvents { + if s.isNodeDrainEvent(nbefore, nafter, newEvents) { + allocs, err := s.AllocsByNodeTx(tx, node) + if err != nil { + return []event.Event{}, err + } + var allocIDs []string + for _, a := range allocs { + allocIDs = append(allocIDs, a.ID) + } + + nde := NodeDrainEvent{ + NodeID: node, + DrainStrategy: *nafter.DrainStrategy, + Allocs: allocIDs, + Message: e.Message, + } + e := event.Event{ + Topic: "NodeEvent", + Key: node, + Payload: nde, + } + events = append(events, e) + } else { + + ne := NodeEvent{ + Message: e.Message, + NodeID: node, + } + e := event.Event{ + Topic: "NodeEvent", + Key: node, + Payload: ne, + } + events = append(events, e) + } + } + } + + return events, nil +} + +func (s *StateStore) newNodeEvents(before, after []*structs.NodeEvent) []*structs.NodeEvent { + events := []*structs.NodeEvent{} + if len(before) == len(after) { + return nil + } + + for _, e := range after { + found := false + for _, be := range before { + if e.String() == be.String() { + found = true + break + } + } + if !found { + events = append(events, e) + } + } + return events +} + +func (s *StateStore) isNodeDrainEvent(before, after *structs.Node, newEvents []*structs.NodeEvent) bool { + if before.Drain != after.Drain { + return true + } + + for _, e := range newEvents { + if e.Subsystem == structs.NodeEventSubsystemDrain { + return true + } + } + return false +} + +func (s *StateStore) AllocsByNodeTx(tx ReadTxn, node string) ([]*structs.Allocation, error) { + iter, err := tx.Get("allocs", "node_prefix", node) + if err != nil { + return nil, err + } + + var out []*structs.Allocation + for { + raw := iter.Next() + if raw == nil { + break + } + out = append(out, raw.(*structs.Allocation)) + } + return out, nil +} diff --git a/nomad/state/node_events_test.go b/nomad/state/node_events_test.go new file mode 100644 index 000000000..cc6258a1c --- /dev/null +++ b/nomad/state/node_events_test.go @@ -0,0 +1,147 @@ +package state + +import ( + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/event" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func allocID() string { return "e5bcbac313d6c-e29b-11c4-a5cd-5949157" } + +func mockNode(id string) *structs.Node { + node := mock.Node() + node.ID = id + return node +} + +func TestNodeEventsFromChanges(t *testing.T) { + cases := []struct { + Name string + Setup func(s *StateStore, index uint64) error + Mutate func(s *StateStore, tx *txn) error + WantEvents []event.Event + WantErr bool + }{ + { + Name: "new node registered", + Setup: func(s *StateStore, idx uint64) error { + req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94") + return s.UpsertNode(idx, req) + }, + Mutate: func(s *StateStore, tx *txn) error { + event := &structs.NodeEvent{ + Message: "Node ready foo", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } 
+ return s.updateNodeStatusTxn(tx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusReady, time.Now().UnixNano(), event) + }, + WantEvents: []event.Event{ + { + Topic: "NodeEvent", + Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94", + Payload: NodeEvent{ + Message: "Node ready foo", + NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94", + }, + }, + }, + }, + { + Name: "only new events", + Setup: func(s *StateStore, idx uint64) error { + req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94") + require.NoError(t, s.UpsertNode(idx, req)) + event := &structs.NodeEvent{ + Message: "Node foo initializing", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } + return s.UpdateNodeStatus(idx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusInit, time.Now().UnixNano(), event) + }, + Mutate: func(s *StateStore, tx *txn) error { + event := &structs.NodeEvent{ + Message: "Node foo ready", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } + return s.updateNodeStatusTxn(tx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusReady, time.Now().UnixNano(), event) + }, + WantEvents: []event.Event{ + { + Topic: "NodeEvent", + Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94", + Payload: NodeEvent{ + Message: "Node foo ready", + NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94", + }, + }, + }, + }, + { + Name: "node drain event", + Setup: func(s *StateStore, idx uint64) error { + + req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94") + require.NoError(t, s.UpsertNode(idx, req)) + event := &structs.NodeEvent{ + Message: "Node foo initializing", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } + require.NoError(t, s.UpdateNodeStatus(idx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusInit, time.Now().UnixNano(), event)) + alloc := mock.Alloc() + alloc.NodeID = req.ID + alloc.ID = allocID() + return s.UpsertAllocs(idx, []*structs.Allocation{alloc}) + + }, + Mutate: func(s *StateStore, tx *txn) error { + event := &structs.NodeEvent{ + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } + event.SetMessage("Node drain strategy set") + event.SetSubsystem(structs.NodeEventSubsystemDrain) + drain := &structs.DrainStrategy{} + return s.updateNodeDrainImpl(tx, tx.Index, "8218b700-7e26-aac0-06d8-ff3b15f44e94", drain, false, time.Now().UnixNano(), event) + }, + WantEvents: []event.Event{ + { + Topic: "NodeEvent", + Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94", + Payload: NodeDrainEvent{ + Message: "Node drain strategy set", + NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94", + Allocs: []string{allocID()}, + }, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + s := testStateStore(t) + + if tc.Setup != nil { + require.NoError(t, tc.Setup(s, 10)) + } + + tx := s.db.WriteTxn(100) + require.NoError(t, tc.Mutate(s, tx)) + + got, err := s.NodeEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100}) + if tc.WantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.WantEvents, got) + }) + } +} diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go new file mode 100644 index 000000000..5d82c618d --- /dev/null +++ b/nomad/state/state_changes.go @@ -0,0 +1,148 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/event" +) + +// ReadTxn is implemented by memdb.Txn to perform read operations. 
+type ReadTxn interface { + Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) + First(table, index string, args ...interface{}) (interface{}, error) + FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) + Abort() +} + +// Changes wraps a memdb.Changes to include the index at which these changes +// were made. +type Changes struct { + // Index is the latest index at the time these changes were committed. + Index uint64 + Changes memdb.Changes +} + +// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on +// all write transactions. When the transaction is committed the changes are +// sent to the eventPublisher which will create and emit change events. +type changeTrackerDB struct { + db *memdb.MemDB + publisher eventPublisher + processChanges func(ReadTxn, Changes) ([]event.Event, error) +} + +type eventPublisher interface { + Publish(events []event.Event) +} + +// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting +// code may use it to create a read-only transaction, but it will panic if called +// with write=true. +// +// Deprecated: use either ReadTxn, or WriteTxn. +func (c *changeTrackerDB) Txn(write bool) *txn { + if write { + panic("don't use db.Txn(true), use db.WriteTxn(idx uint64)") + } + return c.ReadTxn() +} + +// ReadTxn returns a read-only transaction which behaves exactly the same as +// memdb.Txn +// +// TODO: this could return a regular memdb.Txn if all the state functions accepted +// the ReadTxn interface +func (c *changeTrackerDB) ReadTxn() *txn { + return &txn{Txn: c.db.Txn(false)} +} + +// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. +// It will track changes and publish events for the changes when Commit +// is called. +// +// The idx argument must be the index of the current Raft operation. Almost +// all mutations to state should happen as part of a raft apply so the index of +// the log being applied can be passed to WriteTxn. +// The exceptional cases are transactions that are executed on an empty +// memdb.DB as part of Restore, and those executed by tests where we insert +// data directly into the DB. These cases may use WriteTxnRestore. +func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { + t := &txn{ + Txn: c.db.Txn(true), + Index: idx, + publish: c.publish, + } + t.Txn.TrackChanges() + return t +} + +func (c *changeTrackerDB) publish(changes Changes) error { + readOnlyTx := c.db.Txn(false) + defer readOnlyTx.Abort() + + events, err := c.processChanges(readOnlyTx, changes) + if err != nil { + return fmt.Errorf("failed generating events from changes: %v", err) + } + c.publisher.Publish(events) + return nil +} + +// WriteTxnRestore returns a wrapped RW transaction that does NOT have change +// tracking enabled. This should only be used in Restore where we need to +// replace the entire contents of the Store without a need to track the changes. +// WriteTxnRestore uses a zero index since the whole restore doesn't really occur +// at one index - the effect is to write many values that were previously +// written across many indexes. +func (c *changeTrackerDB) WriteTxnRestore() *txn { + return &txn{ + Txn: c.db.Txn(true), + Index: 0, + } +} + +// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. +// +// This can not be done with txn.Defer because the callback passed to Defer is +// invoked after commit completes, and because the callback can not return an +// error.
Any errors from the callback would be lost, which would result in a // missing change event, even though the state store had changed. +type txn struct { + *memdb.Txn + // Index in raft where the write is occurring. The value is zero for a + // read-only, or WriteTxnRestore transaction. + // Index is stored so that it may be passed along to any subscribers as part + // of a change event. + Index uint64 + publish func(changes Changes) error +} + +// Commit first pushes changes to EventPublisher, then calls Commit on the +// underlying transaction. +// +// Note that this function, unlike memdb.Txn, returns an error which must be checked +// by the caller. A non-nil error indicates that a commit failed and was not +// applied. +func (tx *txn) Commit() error { + // publish may be nil if this is a read-only or WriteTxnRestore transaction. + // In those cases changes should also be empty, and there will be nothing + // to publish. + if tx.publish != nil { + changes := Changes{ + Index: tx.Index, + Changes: tx.Txn.Changes(), + } + if err := tx.publish(changes); err != nil { + return err + } + } + + tx.Txn.Commit() + return nil +} + +func processDBChanges(tx ReadTxn, changes Changes) ([]event.Event, error) { + // TODO: add handlers here. + return []event.Event{}, nil +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d8ebbabd9..bd6ad970f 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -7,18 +7,19 @@ import ( "sort" "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" multierror "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/event" "github.com/hashicorp/nomad/nomad/structs" ) // Txn is a transaction against a state store. // This can be a read or write transaction. -type Txn = *memdb.Txn +type Txn = *txn const ( // NodeRegisterEventReregistered is the message used when the node becomes @@ -55,7 +57,7 @@ type StateStoreConfig struct { // considered a constant and NEVER modified in place. type StateStore struct { logger log.Logger - db *memdb.MemDB + db *changeTrackerDB // config is the passed in configuration config *StateStoreConfig @@ -76,10 +78,14 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { // Create the state store s := &StateStore{ logger: config.Logger.Named("state_store"), - db: db, config: config, abandonCh: make(chan struct{}), } + s.db = &changeTrackerDB{ + db: db, + publisher: event.NewPublisher(), + processChanges: processDBChanges, + } // Initialize the state store with required enterprise objects if err := s.enterpriseInit(); err != nil { @@ -98,12 +104,20 @@ func (s *StateStore) Config() *StateStoreConfig { // we use MemDB, we just need to snapshot the state of the underlying // database. func (s *StateStore) Snapshot() (*StateSnapshot, error) { + memDB := s.db.db + memDBSnap := memDB.Snapshot() + + store := StateStore{ + logger: s.logger, + config: s.config, + } + store.db = &changeTrackerDB{ + db: memDBSnap, + publisher: event.NewPublisher(), // TODO nil publisher + processChanges: processDBChanges, // TODO nil processChanges + } snap := &StateSnapshot{ - StateStore: StateStore{ - logger: s.logger, - config: s.config, - db: s.db.Snapshot(), - }, + StateStore: store, } return snap, nil } @@ -167,7 +181,7 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State // state by minimizing the number of transactions and checking // overhead.
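Since Commit's contract changes here, a minimal sketch of a write path through changeTrackerDB may help (the helper and its "examples" table are illustrative only, not part of this diff). Unlike memdb.Txn.Commit, the wrapped Commit can fail, and a failed publish means the write is never applied:

    package state

    import "fmt"

    // upsertExample is a hypothetical write path through changeTrackerDB;
    // the "examples" table does not exist in Nomad's schema.
    func upsertExample(db *changeTrackerDB, index uint64, value interface{}) error {
        txn := db.WriteTxn(index) // WriteTxn enables change tracking
        defer txn.Abort()         // a no-op once Commit has succeeded

        if err := txn.Insert("examples", value); err != nil {
            return fmt.Errorf("insert failed: %v", err)
        }

        // Commit publishes Changes{Index, Changes} first and only then
        // commits the underlying memdb transaction, so an error here
        // means the write was not applied.
        return txn.Commit()
    }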
func (s *StateStore) Restore() (*StateRestore, error) { - txn := s.db.Txn(true) + txn := s.db.WriteTxnRestore() r := &StateRestore{ txn: txn, } @@ -253,7 +267,7 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return err } - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Upsert the newly created or updated deployment @@ -347,7 +361,7 @@ func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) { // upsertDeploymentUpdates updates the deployments given the passed status // updates. -func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error { +func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *txn) error { for _, u := range updates { if err := s.updateDeploymentStatusImpl(index, u, txn); err != nil { return err @@ -359,7 +373,7 @@ func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.De // UpsertJobSummary upserts a job summary into the state store. func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Check if the job summary already exists @@ -394,7 +408,7 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma // DeleteJobSummary deletes the job summary with the given ID. This is for // testing purposes only. func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Delete the job summary @@ -411,7 +425,7 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error // UpsertDeployment is used to insert a new deployment. If cancelPrior is set to // true, all prior deployments for the same job will be cancelled. 
func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.upsertDeploymentImpl(index, deployment, txn); err != nil { return err @@ -420,7 +434,7 @@ func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployme return nil } -func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, txn *memdb.Txn) error { +func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, txn *txn) error { // Check if the deployment already exists existing, err := txn.First("deployment", "id", deployment.ID) if err != nil { @@ -516,7 +530,7 @@ func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*st return s.deploymentByIDImpl(ws, deploymentID, txn) } -func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *memdb.Txn) (*structs.Deployment, error) { +func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *txn) (*structs.Deployment, error) { watchCh, existing, err := txn.FirstWatch("deployment", "id", deploymentID) if err != nil { return nil, fmt.Errorf("deployment lookup failed: %v", err) @@ -602,7 +616,7 @@ func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID // DeleteDeployment is used to delete a set of deployments by ID func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if len(deploymentIDs) == 0 { @@ -636,7 +650,7 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro // UpsertScalingEvent is used to insert a new scaling event. // Only the most recent JobTrackedScalingEvents will be kept. func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventRequest) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Get the existing events @@ -720,7 +734,7 @@ func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID stri // This is assumed to be triggered by the client, so we retain the value // of drain/eligibility which is set by the scheduler. 
func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Check if the node already exists @@ -781,7 +795,7 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error { return fmt.Errorf("node ids missing") } - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, nodeID := range nodes { @@ -814,9 +828,19 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error { // UpdateNodeStatus is used to update the status of a node func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() + if err := s.updateNodeStatusTxn(txn, index, nodeID, status, updatedAt, event); err != nil { + return err + } + + txn.Commit() + return nil +} + +func (s *StateStore) updateNodeStatusTxn(txn *txn, index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error { + // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -844,17 +868,15 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updat if err := txn.Insert("nodes", copyNode); err != nil { return fmt.Errorf("node update failed: %v", err) } - if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil { return fmt.Errorf("index update failed: %v", err) } - - txn.Commit() return nil } // BatchUpdateNodeDrain is used to update the drain of a node set of nodes func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for node, update := range updates { if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil { @@ -869,7 +891,7 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil { return err @@ -878,7 +900,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, return nil } -func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string, +func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { // Lookup the node @@ -925,7 +947,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st // UpdateNodeEligibility is used to update the scheduling eligibility of a node func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Lookup the node @@ -971,7 +993,7 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil // UpsertNodeEvents adds the node events to the nodes, rotating events as // necessary. 
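The UpdateNodeStatus/updateNodeStatusTxn split above is the pattern this change applies across the store: the exported method owns the transaction lifecycle, while an unexported helper operates on a caller-supplied transaction, so the FSM and tests can compose several mutations into one tracked commit. A hypothetical example of the shape (UpsertWidget and the "widgets" table are invented for illustration):

    package state

    import "fmt"

    // UpsertWidget owns the transaction: open, delegate, commit.
    func (s *StateStore) UpsertWidget(index uint64, w interface{}) error {
        txn := s.db.WriteTxn(index)
        defer txn.Abort()

        if err := s.upsertWidgetTxn(txn, index, w); err != nil {
            return err
        }

        // the call sites in this diff discard Commit's error; checking it
        // is safer now that Commit can fail during publish
        return txn.Commit()
    }

    // upsertWidgetTxn does the work against a live transaction, so several
    // such helpers can share a single tracked commit.
    func (s *StateStore) upsertWidgetTxn(txn *txn, index uint64, w interface{}) error {
        if err := txn.Insert("widgets", w); err != nil {
            return fmt.Errorf("widget insert failed: %v", err)
        }
        if err := txn.Insert("index", &IndexEntry{"widgets", index}); err != nil {
            return fmt.Errorf("index update failed: %v", err)
        }
        return nil
    }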
func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for nodeID, events := range nodeEvents { @@ -987,7 +1009,7 @@ func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*str // upsertNodeEvent upserts a node event for a respective node. It also maintains // that a fixed number of node events are ever stored simultaneously, deleting // older events once this bound has been reached. -func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *memdb.Txn) error { +func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *txn) error { // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -1031,7 +1053,7 @@ func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEv // upsertNodeCSIPlugins indexes csi plugins for volume retrieval, with health. It's called // on upsertNodeEvents, so that event driven health changes are updated -func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error { +func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error { loop := func(info *structs.CSIInfo) error { raw, err := txn.First("csi_plugins", "id", info.PluginID) @@ -1147,7 +1169,7 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro } // deleteNodeCSIPlugins cleans up CSIInfo node health status, called in DeleteNode -func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error { +func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error { if len(node.CSIControllerPlugins) == 0 && len(node.CSINodePlugins) == 0 { return nil } @@ -1190,7 +1212,7 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro } // updateOrGCPlugin updates a plugin but will delete it if the plugin is empty -func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) error { +func updateOrGCPlugin(index uint64, txn *txn, plug *structs.CSIPlugin) error { plug.ModifyIndex = index if plug.IsEmpty() { @@ -1209,7 +1231,7 @@ func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) err // deleteJobFromPlugins removes the allocations of this job from any plugins the job is // running, possibly deleting the plugin if it's no longer in use. 
It's called in DeleteJobTxn -func (s *StateStore) deleteJobFromPlugins(index uint64, txn *memdb.Txn, job *structs.Job) error { +func (s *StateStore) deleteJobFromPlugins(index uint64, txn *txn, job *structs.Job) error { ws := memdb.NewWatchSet() summary, err := s.JobSummaryByID(ws, job.Namespace, job.ID) if err != nil { @@ -1362,7 +1384,7 @@ func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) { // UpsertJob is used to register a job or update a job definition func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.upsertJobImpl(index, job, false, txn); err != nil { return err @@ -1378,7 +1400,7 @@ func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error } // upsertJobImpl is the implementation for registering a job or updating a job definition -func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error { +func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *txn) error { // Assert the namespace exists if exists, err := s.namespaceExists(txn, job.Namespace); err != nil { return err @@ -1465,7 +1487,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b // DeleteJob is used to deregister a job func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() err := s.DeleteJobTxn(index, namespace, jobID, txn) @@ -1577,7 +1599,7 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn } // deleteJobScalingPolicies deletes any scaling policies associated with the job -func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error { iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn) if err != nil { return fmt.Errorf("getting job scaling policies for deletion failed: %v", err) @@ -1610,7 +1632,7 @@ func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, tx } // deleteJobVersions deletes all versions of the given job. -func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *txn) error { iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID) if err != nil { return err @@ -1650,7 +1672,7 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd // upsertJobVersion inserts a job into its historic version table and limits the // number of job versions that are tracked. -func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) error { // Insert the job if err := txn.Insert("job_version", job); err != nil { return fmt.Errorf("failed to insert job into job_version table: %v", err) @@ -1743,7 +1765,7 @@ func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([ // jobVersionByID is the underlying implementation for retrieving all tracked // versions of a job and is called under an existing transaction. A watch set // can optionally be passed in to add the job histories to the watch set.
-func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { +func (s *StateStore) jobVersionByID(txn *txn, ws *memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { // Get all the historic jobs for this ID iter, err := txn.Get("job_version", "id_prefix", namespace, id) if err != nil { @@ -1788,7 +1810,7 @@ func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, // jobByIDAndVersionImpl returns the job identified by its ID and Version. The // passed watchset may be nil. func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id string, - version uint64, txn *memdb.Txn) (*structs.Job, error) { + version uint64, txn *txn) (*structs.Job, error) { watchCh, existing, err := txn.FirstWatch("job_version", "id", namespace, id, version) if err != nil { @@ -1842,7 +1864,7 @@ func (s *StateStore) JobsByNamespace(ws memdb.WatchSet, namespace string) (memdb } // jobsByNamespaceImpl returns an iterator over all the jobs for the given namespace -func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, txn *memdb.Txn) (memdb.ResultIterator, error) { +func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, txn *txn) (memdb.ResultIterator, error) { // Walk the entire jobs table iter, err := txn.Get("jobs", "id_prefix", namespace, "") if err != nil { @@ -1949,7 +1971,7 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) // CSIVolumeRegister adds a volume to the server store, failing if it already exists func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, v := range volumes { @@ -2121,7 +2143,7 @@ func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string) // CSIVolumeClaim updates the volume's claim count and allocation list func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *structs.CSIVolumeClaim) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() ws := memdb.NewWatchSet() @@ -2189,7 +2211,7 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s // CSIVolumeDeregister removes the volume from the server func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string, force bool) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, id := range ids { @@ -2418,7 +2440,7 @@ func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPl // is currently no raft message for this, as it's intended to support // testing use cases. func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() existing, err := txn.First("csi_plugins", "id", plug.ID) @@ -2444,7 +2466,7 @@ func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) erro // DeleteCSIPlugin deletes the plugin if it's not in use. func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() ws := memdb.NewWatchSet() @@ -2475,7 +2497,7 @@ func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error { // UpsertPeriodicLaunch is used to register a launch or update it. 
func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Check if the job already exists @@ -2507,7 +2529,7 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic // DeletePeriodicLaunch is used to delete the periodic launch func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() err := s.DeletePeriodicLaunchTxn(index, namespace, jobID, txn) @@ -2575,7 +2597,7 @@ func (s *StateStore) PeriodicLaunches(ws memdb.WatchSet) (memdb.ResultIterator, // UpsertEvals is used to upsert a set of evaluations func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() err := s.UpsertEvalsTxn(index, evals, txn) @@ -2611,7 +2633,7 @@ func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, t } // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction -func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { +func (s *StateStore) nestedUpsertEval(txn *txn, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation existing, err := txn.First("evals", "id", eval.ID) if err != nil { @@ -2702,7 +2724,7 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct // updateEvalModifyIndex is used to update the modify index of an evaluation that has been // through a scheduler pass. This is done as part of plan apply. It ensures that when a subsequent // scheduler workers process a re-queued evaluation it sees any partial updates from the plan apply. -func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID string) error { +func (s *StateStore) updateEvalModifyIndex(txn *txn, index uint64, evalID string) error { // Lookup the evaluation existing, err := txn.First("evals", "id", evalID) if err != nil { @@ -2728,7 +2750,7 @@ func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID // DeleteEval is used to delete an evaluation func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() jobs := make(map[structs.NamespacedID]string, len(evals)) @@ -2898,7 +2920,7 @@ func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memd // the desired state comes from the schedulers, while the actual state comes // from clients. 
func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allocation) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Handle each of the updated allocations @@ -2918,7 +2940,7 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo } // nestedUpdateAllocFromClient is used to nest an update of an allocation with client status -func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, alloc *structs.Allocation) error { +func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *structs.Allocation) error { // Look for existing alloc existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { @@ -3007,7 +3029,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a // UpsertAllocs is used to evict a set of allocations and allocate new ones at // the same time. func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.upsertAllocsImpl(index, allocs, txn); err != nil { return err @@ -3018,7 +3040,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er // upsertAllocs is the actual implementation of UpsertAllocs so that it may be // used with an existing transaction. -func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation, txn *memdb.Txn) error { +func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation, txn *txn) error { // Handle the allocations jobs := make(map[structs.NamespacedID]string, 1) for _, alloc := range allocs { @@ -3142,7 +3164,7 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Handle each of the updated allocations @@ -3170,7 +3192,7 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[str // nestedUpdateAllocDesiredTransition is used to nest an update of an // allocations desired transition func (s *StateStore) nestedUpdateAllocDesiredTransition( - txn *memdb.Txn, index uint64, allocID string, + txn *txn, index uint64, allocID string, transition *structs.DesiredTransition) error { // Look for existing alloc @@ -3438,7 +3460,7 @@ func (s *StateStore) AllocsByNamespace(ws memdb.WatchSet, namespace string) (mem // allocsByNamespaceImpl returns an iterator over all the allocations in the // namespace -func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *memdb.Txn, namespace string) (memdb.ResultIterator, error) { +func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *txn, namespace string) (memdb.ResultIterator, error) { // Walk the entire table iter, err := txn.Get("allocs", "namespace", namespace) if err != nil { @@ -3452,7 +3474,7 @@ func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *memdb.Txn, na // UpsertVaultAccessors is used to register a set of Vault Accessors func (s *StateStore) UpsertVaultAccessor(index uint64, accessors []*structs.VaultAccessor) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, accessor := range accessors { @@ -3475,7 +3497,7 @@ func (s *StateStore) UpsertVaultAccessor(index uint64, accessors []*structs.Vaul // DeleteVaultAccessors is used to delete 
a set of Vault Accessors func (s *StateStore) DeleteVaultAccessors(index uint64, accessors []*structs.VaultAccessor) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Lookup the accessor @@ -3583,7 +3605,7 @@ const siTokenAccessorTable = "si_token_accessors" // UpsertSITokenAccessors is used to register a set of Service Identity token accessors. func (s *StateStore) UpsertSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, accessor := range accessors { @@ -3607,7 +3629,7 @@ func (s *StateStore) UpsertSITokenAccessors(index uint64, accessors []*structs.S // DeleteSITokenAccessors is used to delete a set of Service Identity token accessors. func (s *StateStore) DeleteSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Lookup each accessor @@ -3706,7 +3728,7 @@ func (s *StateStore) SITokenAccessorsByNode(ws memdb.WatchSet, nodeID string) ([ // UpdateDeploymentStatus is used to make deployment status updates and // potentially make a evaluation func (s *StateStore) UpdateDeploymentStatus(index uint64, req *structs.DeploymentStatusUpdateRequest) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.updateDeploymentStatusImpl(index, req.DeploymentUpdate, txn); err != nil { @@ -3732,7 +3754,7 @@ func (s *StateStore) UpdateDeploymentStatus(index uint64, req *structs.Deploymen } // updateDeploymentStatusImpl is used to make deployment status updates -func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.DeploymentStatusUpdate, txn *memdb.Txn) error { +func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.DeploymentStatusUpdate, txn *txn) error { // Retrieve deployment ws := memdb.NewWatchSet() deployment, err := s.deploymentByIDImpl(ws, u.DeploymentID, txn) @@ -3773,7 +3795,7 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym // UpdateJobStability updates the stability of the given job and version to the // desired status. 
func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, jobVersion uint64, stable bool) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.updateJobStabilityImpl(index, namespace, jobID, jobVersion, stable, txn); err != nil { @@ -3785,7 +3807,7 @@ func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, j } // updateJobStabilityImpl updates the stability of the given job and version -func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *memdb.Txn) error { +func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *txn) error { // Get the job that is referenced job, err := s.jobByIDAndVersionImpl(nil, namespace, jobID, jobVersion, txn) if err != nil { @@ -3810,7 +3832,7 @@ func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID strin // UpdateDeploymentPromotion is used to promote canaries in a deployment and // potentially make a evaluation func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyDeploymentPromoteRequest) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Retrieve deployment and ensure it is not terminal and is active @@ -3953,7 +3975,7 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD // UpdateDeploymentAllocHealth is used to update the health of allocations as // part of the deployment and potentially make a evaluation func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.ApplyDeploymentAllocHealthRequest) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Retrieve deployment and ensure it is not terminal and is active @@ -4087,19 +4109,6 @@ func (s *StateStore) Index(name string) (uint64, error) { return out.(*IndexEntry).Value, nil } -// RemoveIndex is a helper method to remove an index for testing purposes -func (s *StateStore) RemoveIndex(name string) error { - txn := s.db.Txn(true) - defer txn.Abort() - - if _, err := txn.DeleteAll("index", "id", name); err != nil { - return err - } - - txn.Commit() - return nil -} - // Indexes returns an iterator over all the indexes func (s *StateStore) Indexes() (memdb.ResultIterator, error) { txn := s.db.Txn(false) @@ -4115,7 +4124,7 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) { // ReconcileJobSummaries re-creates summaries for all jobs present in the state // store func (s *StateStore) ReconcileJobSummaries(index uint64) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Get all the jobs @@ -4274,7 +4283,7 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { // setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. // It takes a map of job IDs to an optional forceStatus string. It returns an // error if the job doesn't exist or setJobStatus fails. -func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn, +func (s *StateStore) setJobStatuses(index uint64, txn *txn, jobs map[structs.NamespacedID]string, evalDelete bool) error { for tuple, forceStatus := range jobs { @@ -4300,7 +4309,7 @@ func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn, // called because an evaluation is being deleted (potentially because of garbage // collection). If forceStatus is non-empty, the job's status will be set to the // passed status. 
-func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, +func (s *StateStore) setJobStatus(index uint64, txn *txn, job *structs.Job, evalDelete bool, forceStatus string) error { // Capture the current status so we can check if there is a change @@ -4399,7 +4408,7 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, return nil } -func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) { +func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) (string, error) { // System, Periodic and Parameterized jobs are running until explicitly // stopped if job.Type == structs.JobTypeSystem || job.IsParameterized() || job.IsPeriodic() { @@ -4456,7 +4465,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b // updateSummaryWithJob creates or updates job summaries when new jobs are // upserted or existing ones are updated func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, - txn *memdb.Txn) error { + txn *txn) error { // Update the job summary summaryRaw, err := txn.First("job_summary", "id", job.Namespace, job.ID) @@ -4511,7 +4520,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, // updateJobScalingPolicies upserts any scaling policies contained in the job and removes // any previous scaling policies that were removed from the job -func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error { ws := memdb.NewWatchSet() @@ -4623,7 +4632,7 @@ func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, t // updateDeploymentWithAlloc is used to update the deployment state associated // with the given allocation. The passed alloc may be updated if the deployment // status has changed to capture the modify index at which it has changed. -func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *memdb.Txn) error { +func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *txn) error { // Nothing to do if the allocation is not associated with a deployment if alloc.DeploymentID == "" { return nil @@ -4730,7 +4739,7 @@ func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *st // updateSummaryWithAlloc updates the job summary when allocations are updated // or inserted func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation, - existingAlloc *structs.Allocation, txn *memdb.Txn) error { + existingAlloc *structs.Allocation, txn *txn) error { // We don't have to update the summary if the job is missing if alloc.Job == nil { @@ -4849,7 +4859,7 @@ // updatePluginWithAlloc updates the CSI plugins for an alloc when the // allocation is updated or inserted with a terminal server status.
func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation, - txn *memdb.Txn) error { + txn *txn) error { if !alloc.ServerTerminalStatus() { return nil } @@ -4920,7 +4930,7 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J // UpsertACLPolicies is used to create or update a set of ACL policies func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, policy := range policies { @@ -4962,7 +4972,7 @@ func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPoli // DeleteACLPolicies deletes the policies with the given names func (s *StateStore) DeleteACLPolicies(index uint64, names []string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Delete the policy @@ -5022,7 +5032,7 @@ func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error // UpsertACLTokens is used to create or update a set of ACL tokens func (s *StateStore) UpsertACLTokens(index uint64, tokens []*structs.ACLToken) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, token := range tokens { @@ -5069,7 +5079,7 @@ func (s *StateStore) UpsertACLTokens(index uint64, tokens []*structs.ACLToken) e // DeleteACLTokens deletes the tokens with the given accessor ids func (s *StateStore) DeleteACLTokens(index uint64, ids []string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Delete the tokens @@ -5184,7 +5194,7 @@ func (s *StateStore) CanBootstrapACLToken() (bool, uint64, error) { // BootstrapACLToken is used to create an initial ACL token func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs.ACLToken) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Check if we have already done a bootstrap @@ -5240,11 +5250,11 @@ func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, } // SchedulerSetConfig is used to set the current Scheduler configuration. -func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error { - tx := s.db.Txn(true) +func (s *StateStore) SchedulerSetConfig(index uint64, config *structs.SchedulerConfiguration) error { + tx := s.db.WriteTxn(index) defer tx.Abort() - s.schedulerSetConfigTxn(idx, tx, config) + s.schedulerSetConfigTxn(index, tx, config) tx.Commit() return nil @@ -5269,7 +5279,7 @@ func (s *StateStore) ClusterMetadata(ws memdb.WatchSet) (*structs.ClusterMetadat } func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetadata) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.setClusterMetadata(txn, meta); err != nil { @@ -5283,8 +5293,8 @@ func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetad // WithWriteTransaction executes the passed function within a write transaction, // and returns its result. If the invocation returns no error, the transaction // is committed; otherwise, it's aborted. 
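Given the signature change that follows, here is a sketch of how an FSM handler is expected to call WithWriteTransaction after this change (applyExample and its body are hypothetical; the real call sites are the applyDeregisterJob and applyBatchDeregisterJob hunks near the top of this diff):

    package nomad

    import "github.com/hashicorp/nomad/nomad/state"

    // applyExample mirrors the updated FSM call sites: the raft index of the
    // log entry being applied is threaded through so change events carry it.
    func (n *nomadFSM) applyExample(index uint64) error {
        return n.state.WithWriteTransaction(index, func(tx state.Txn) error {
            // one or more state mutations using tx go here; returning a
            // non-nil error aborts the transaction instead of committing it
            return nil
        })
    }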
@@ -5283,8 +5292,8 @@ func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetad
 // WithWriteTransaction executes the passed function within a write transaction,
 // and returns its result. If the invocation returns no error, the transaction
 // is committed; otherwise, it's aborted.
-func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error {
-	tx := s.db.Txn(true)
+func (s *StateStore) WithWriteTransaction(index uint64, fn func(Txn) error) error {
+	tx := s.db.WriteTxn(index)
 	defer tx.Abort()
 
 	err := fn(tx)
@@ -5297,8 +5306,8 @@ func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error {
 // SchedulerCASConfig is used to update the scheduler configuration with a
 // given Raft index. If the CAS index specified is not equal to the last observed index
 // for the config, then the call is a noop.
-func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) {
-	tx := s.db.Txn(true)
+func (s *StateStore) SchedulerCASConfig(index, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) {
+	tx := s.db.WriteTxn(index)
 	defer tx.Abort()
 
 	// Check for an existing config
@@ -5315,13 +5324,13 @@ func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.Schedu
 		return false, nil
 	}
 
-	s.schedulerSetConfigTxn(idx, tx, config)
+	s.schedulerSetConfigTxn(index, tx, config)
 
 	tx.Commit()
 	return true, nil
 }
 
-func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error {
+func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *txn, config *structs.SchedulerConfiguration) error {
 	// Check for an existing config
 	existing, err := tx.First("scheduler_config", "id")
 	if err != nil {
@@ -5342,7 +5351,7 @@ func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *st
 	return nil
 }
 
-func (s *StateStore) setClusterMetadata(txn *memdb.Txn, meta *structs.ClusterMetadata) error {
+func (s *StateStore) setClusterMetadata(txn *txn, meta *structs.ClusterMetadata) error {
 	// Check for an existing config, if it exists, sanity check the cluster ID matches
 	existing, err := txn.First("cluster_meta", "id")
 	if err != nil {
@@ -5367,7 +5376,7 @@ func (s *StateStore) setClusterMetadata(txn *memdb.Txn, meta *structs.ClusterMet
 
 // UpsertScalingPolicy is used to insert a new scaling policy.
 func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*structs.ScalingPolicy) error {
-	txn := s.db.Txn(true)
+	txn := s.db.WriteTxn(index)
 	defer txn.Abort()
 
 	if err := s.UpsertScalingPoliciesTxn(index, scalingPolicies, txn); err != nil {
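With the Raft index now threaded through, a caller inside the state package would look roughly like this (hypothetical snippet: index and job stand in for real values, and Txn is assumed to expose the embedded memdb operations):

    // A nil return commits the transaction (publishing any tracked
    // changes); returning an error aborts everything done inside it.
    err := s.WithWriteTransaction(index, func(tx Txn) error {
        return tx.Insert("jobs", job)
    })
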
@@ -5380,7 +5389,7 @@ func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*stru
 
 // upsertScalingPolicy is used to insert a new scaling policy.
 func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*structs.ScalingPolicy,
-	txn *memdb.Txn) error {
+	txn *txn) error {
 
 	hadUpdates := false
 
@@ -5427,7 +5436,7 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s
 }
 
 func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error {
-	txn := s.db.Txn(true)
+	txn := s.db.WriteTxn(index)
 	defer txn.Abort()
 
 	err := s.DeleteScalingPoliciesTxn(index, ids, txn)
@@ -5439,7 +5448,7 @@ func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error {
 }
 
 // DeleteScalingPolicies is used to delete a set of scaling policies by ID
-func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *memdb.Txn) error {
+func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *txn) error {
 	if len(ids) == 0 {
 		return nil
 	}
@@ -5491,19 +5500,7 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str
 	}
 
 	ws.Add(iter.WatchCh())
-
-	filter := func(raw interface{}) bool {
-		d, ok := raw.(*structs.ScalingPolicy)
-		if !ok {
-			return true
-		}
-
-		return d.Target[structs.ScalingTargetNamespace] != namespace
-	}
-
-	// Wrap the iterator in a filter
-	wrap := memdb.NewFilterIterator(iter, filter)
-	return wrap, nil
+	return iter, nil
 }
 
 func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
@@ -5512,7 +5509,7 @@ func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID st
 }
 
 func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string,
-	txn *memdb.Txn) (memdb.ResultIterator, error) {
+	txn *txn) (memdb.ResultIterator, error) {
 
 	iter, err := txn.Get("scaling_policy", "target_prefix", namespace, jobID)
 	if err != nil {
@@ -5665,7 +5662,7 @@ func getPreemptedAllocDesiredDescription(PreemptedByAllocID string) string {
 // restoring state by only using a single large transaction
 // instead of thousands of sub transactions
 type StateRestore struct {
-	txn *memdb.Txn
+	txn *txn
 }
 
 // Abort is used to abort the restore operation
diff --git a/nomad/state/state_store_oss.go b/nomad/state/state_store_oss.go
index 3ef4e600b..487f84213 100644
--- a/nomad/state/state_store_oss.go
+++ b/nomad/state/state_store_oss.go
@@ -3,7 +3,6 @@
 package state
 
 import (
-	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -14,13 +13,13 @@ func (s *StateStore) enterpriseInit() error {
 }
 
 // namespaceExists returns whether a namespace exists
-func (s *StateStore) namespaceExists(txn *memdb.Txn, namespace string) (bool, error) {
+func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) {
 	return namespace == structs.DefaultNamespace, nil
 }
 
 // updateEntWithAlloc is used to update Nomad Enterprise objects when an allocation is
 // added/modified/deleted
-func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *memdb.Txn) error {
+func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error {
 	return nil
 }
 
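The exported *Txn variants exist so several store operations can share a single tracked transaction, which the batch-delete test in the next file exercises. A hypothetical composition over the scaling-policy helpers above (ids and policies are illustrative inputs, and Txn is assumed to alias the internal *txn type):

    // Hypothetical: delete stale policies and upsert replacements
    // atomically, in one tracked transaction.
    err := state.WithWriteTransaction(index, func(tx Txn) error {
        if err := state.DeleteScalingPoliciesTxn(index, ids, tx); err != nil {
            return err
        }
        return state.UpsertScalingPoliciesTxn(index, policies, tx)
    })
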
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 4458bcaa9..cb3d94c61 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -1903,7 +1903,7 @@ func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) {
 
 	// Actually delete
 	const deletionIndex = uint64(10001)
-	err = state.WithWriteTransaction(func(txn Txn) error {
+	err = state.WithWriteTransaction(deletionIndex, func(txn Txn) error {
 		for i, job := range jobs {
 			err := state.DeleteJobTxn(deletionIndex, job.Namespace, job.ID, txn)
 			require.NoError(t, err, "failed at %d %e", i, err)
@@ -6053,19 +6053,20 @@ func TestStateStore_RestoreAlloc(t *testing.T) {
 
 func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
 	t.Parallel()
+	index := uint64(0)
 	state := testStateStore(t)
-	txn := state.db.Txn(true)
+	txn := state.db.WriteTxn(index)
 
 	// Create and insert a mock job.
 	job := mock.Job()
 	job.Status = ""
-	job.ModifyIndex = 0
+	job.ModifyIndex = index
 	if err := txn.Insert("jobs", job); err != nil {
 		t.Fatalf("job insert failed: %v", err)
 	}
 
 	exp := "foobar"
-	index := uint64(1000)
+	index = uint64(1000)
 	if err := state.setJobStatus(index, txn, job, false, exp); err != nil {
 		t.Fatalf("setJobStatus() failed: %v", err)
 	}
@@ -6088,8 +6089,9 @@ func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
 
 func TestStateStore_SetJobStatus_NoOp(t *testing.T) {
 	t.Parallel()
+	index := uint64(0)
 	state := testStateStore(t)
-	txn := state.db.Txn(true)
+	txn := state.db.WriteTxn(index)
 
 	// Create and insert a mock job that should be pending.
 	job := mock.Job()
@@ -6099,7 +6101,7 @@ func TestStateStore_SetJobStatus_NoOp(t *testing.T) {
 		t.Fatalf("job insert failed: %v", err)
 	}
 
-	index := uint64(1000)
+	index = uint64(1000)
 	if err := state.setJobStatus(index, txn, job, false, ""); err != nil {
 		t.Fatalf("setJobStatus() failed: %v", err)
 	}
@@ -6119,7 +6121,7 @@ func TestStateStore_SetJobStatus(t *testing.T) {
 	t.Parallel()
 
 	state := testStateStore(t)
-	txn := state.db.Txn(true)
+	txn := state.db.WriteTxn(uint64(0))
 
 	// Create and insert a mock job that should be pending but has an incorrect
 	// status.