Transaction change tracking

This commit wraps memdb.DB with a changeTrackerDB, which is a thin
wrapper around memdb.DB which enables go-memdb's TrackChanges on all write
transactions. When the transaction is comitted the changes are sent to
an eventPublisher which will be used to create and emit change events.

debugging TestFSM_ReconcileSummaries

wip

revert back rebase

revert back rebase

fix snapshot to actually use a snapshot
This commit is contained in:
Drew Bailey 2020-08-27 10:18:53 -04:00
parent 8e3fc429b2
commit 0af749c92e
No known key found for this signature in database
GPG key ID: FBA61B9FB7CCE1A7
10 changed files with 641 additions and 155 deletions

13
nomad/event/event.go Normal file
View file

@ -0,0 +1,13 @@
package event
type Event struct {
Topic string
Key string
Index uint64
Payload interface{}
}
type EventPublisher struct{}
func NewPublisher() *EventPublisher { return &EventPublisher{} }
func (e EventPublisher) Publish(events []Event) {}

View file

@ -574,7 +574,7 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
err := n.state.WithWriteTransaction(index, func(tx state.Txn) error {
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx)
if err != nil {
@ -612,7 +612,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{}
// Perform all store updates atomically to ensure a consistent view for store readers.
// A partial update may increment the snapshot index, allowing eval brokers to process
// evals for jobs whose deregistering didn't get committed yet.
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
err := n.state.WithWriteTransaction(index, func(tx state.Txn) error {
for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS, "error", err)

View file

@ -2866,22 +2866,22 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
// Add a node
node := mock.Node()
state.UpsertNode(800, node)
require.NoError(t, state.UpsertNode(800, node))
// Make a job so that none of the tasks can be placed
job1 := mock.Job()
job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000
state.UpsertJob(1000, job1)
require.NoError(t, state.UpsertJob(1000, job1))
// make a job which can make partial progress
alloc := mock.Alloc()
alloc.NodeID = node.ID
state.UpsertJob(1010, alloc.Job)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})
require.NoError(t, state.UpsertJob(1010, alloc.Job))
require.NoError(t, state.UpsertAllocs(1011, []*structs.Allocation{alloc}))
// Delete the summaries
state.DeleteJobSummary(1030, job1.Namespace, job1.ID)
state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID)
require.NoError(t, state.DeleteJobSummary(1030, job1.Namespace, job1.ID))
require.NoError(t, state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID))
req := structs.GenericRequest{}
buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req)
@ -2895,7 +2895,9 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
}
ws := memdb.NewWatchSet()
out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID)
out1, err := state.JobSummaryByID(ws, job1.Namespace, job1.ID)
require.NoError(t, err)
expected := structs.JobSummary{
JobID: job1.ID,
Namespace: job1.Namespace,
@ -2914,7 +2916,9 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
// This exercises the code path which adds the allocations made by the
// planner and the number of unplaced allocations in the reconcile summaries
// codepath
out2, _ := state.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID)
out2, err := state.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID)
require.NoError(t, err)
expected = structs.JobSummary{
JobID: alloc.Job.ID,
Namespace: alloc.Job.Namespace,

View file

@ -45,11 +45,11 @@ func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error)
}
// AutopilotSetConfig is used to set the current Autopilot configuration.
func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
tx := s.db.Txn(true)
func (s *StateStore) AutopilotSetConfig(index uint64, config *structs.AutopilotConfig) error {
tx := s.db.WriteTxn(index)
defer tx.Abort()
s.autopilotSetConfigTxn(idx, tx, config)
s.autopilotSetConfigTxn(index, tx, config)
tx.Commit()
return nil
@ -58,8 +58,8 @@ func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotCon
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
tx := s.db.Txn(true)
func (s *StateStore) AutopilotCASConfig(index, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
tx := s.db.WriteTxn(index)
defer tx.Abort()
// Check for an existing config
@ -76,13 +76,13 @@ func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.Autopi
return false, nil
}
s.autopilotSetConfigTxn(idx, tx, config)
s.autopilotSetConfigTxn(index, tx, config)
tx.Commit()
return true, nil
}
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *txn, config *structs.AutopilotConfig) error {
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {

175
nomad/state/node_events.go Normal file
View file

@ -0,0 +1,175 @@
package state
import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/event"
"github.com/hashicorp/nomad/nomad/structs"
)
// NodeEvent represents a NodeEvent change on a given Node.
type NodeEvent struct {
Message string
NodeID string
}
// NNodeDrainEvent holds information related to a Node Drain
type NodeDrainEvent struct {
NodeID string
Allocs []string
DrainStrategy structs.DrainStrategy
Message string
}
func (s *StateStore) NodeEventsFromChanges(tx ReadTxn, changes Changes) ([]event.Event, error) {
var events []event.Event
var nodeChanges map[string]*memdb.Change
markNode := func(node string, nodeChange *memdb.Change) {
if nodeChanges == nil {
nodeChanges = make(map[string]*memdb.Change)
}
ch := nodeChanges[node]
if ch == nil {
nodeChanges[node] = nodeChange
}
}
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
nRaw := change.After
if change.After == nil {
nRaw = change.Before
}
n := nRaw.(*structs.Node)
changeCopy := change
markNode(n.ID, &changeCopy)
}
}
for node, change := range nodeChanges {
if change != nil && change.Deleted() {
// TODO Node delete event
continue
}
ne, err := s.statusEventsForNode(tx, node, change)
if err != nil {
return nil, err
}
// Rebuild node node events
events = append(events, ne...)
}
return events, nil
}
func (s *StateStore) statusEventsForNode(tx ReadTxn, node string, change *memdb.Change) ([]event.Event, error) {
events := []event.Event{}
if change.Created() {
n := change.After.(*structs.Node)
for _, e := range n.Events {
nodeEvent := NodeEvent{Message: e.Message, NodeID: node}
e := event.Event{
Topic: "NodeEvent",
Key: node,
Payload: nodeEvent,
}
events = append(events, e)
}
} else if change.Updated() {
nbefore := change.Before.(*structs.Node)
nafter := change.After.(*structs.Node)
newEvents := s.newNodeEvents(nbefore.Events, nafter.Events)
for _, e := range newEvents {
if s.isNodeDrainEvent(nbefore, nafter, newEvents) {
allocs, err := s.AllocsByNodeTx(tx, node)
if err != nil {
return []event.Event{}, err
}
var allocIDs []string
for _, a := range allocs {
allocIDs = append(allocIDs, a.ID)
}
nde := NodeDrainEvent{
NodeID: node,
DrainStrategy: *nafter.DrainStrategy,
Allocs: allocIDs,
Message: e.Message,
}
e := event.Event{
Topic: "NodeEvent",
Key: node,
Payload: nde,
}
events = append(events, e)
} else {
ne := NodeEvent{
Message: e.Message,
NodeID: node,
}
e := event.Event{
Topic: "NodeEvent",
Key: node,
Payload: ne,
}
events = append(events, e)
}
}
}
return events, nil
}
func (s *StateStore) newNodeEvents(before, after []*structs.NodeEvent) []*structs.NodeEvent {
events := []*structs.NodeEvent{}
if len(before) == len(after) {
return nil
}
for _, e := range after {
found := false
for _, be := range before {
if e.String() == be.String() {
found = true
break
}
}
if !found {
events = append(events, e)
}
}
return events
}
func (s *StateStore) isNodeDrainEvent(before, after *structs.Node, newEvents []*structs.NodeEvent) bool {
if before.Drain != after.Drain {
return true
}
for _, e := range newEvents {
if e.Subsystem == structs.NodeEventSubsystemDrain {
return true
}
}
return false
}
func (s *StateStore) AllocsByNodeTx(tx ReadTxn, node string) ([]*structs.Allocation, error) {
iter, err := tx.Get("allocs", "node_prefix", node)
if err != nil {
return nil, err
}
var out []*structs.Allocation
for {
raw := iter.Next()
if raw == nil {
break
}
out = append(out, raw.(*structs.Allocation))
}
return out, nil
}

View file

@ -0,0 +1,147 @@
package state
import (
"testing"
"time"
"github.com/hashicorp/nomad/nomad/event"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func allocID() string { return "e5bcbac313d6c-e29b-11c4-a5cd-5949157" }
func mockNode(id string) *structs.Node {
node := mock.Node()
node.ID = id
return node
}
func TestNodeEventsFromChanges(t *testing.T) {
cases := []struct {
Name string
Setup func(s *StateStore, index uint64) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []event.Event
WantErr bool
}{
{
Name: "new node registered",
Setup: func(s *StateStore, idx uint64) error {
req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94")
return s.UpsertNode(idx, req)
},
Mutate: func(s *StateStore, tx *txn) error {
event := &structs.NodeEvent{
Message: "Node ready foo",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
return s.updateNodeStatusTxn(tx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusReady, time.Now().UnixNano(), event)
},
WantEvents: []event.Event{
{
Topic: "NodeEvent",
Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
Payload: NodeEvent{
Message: "Node ready foo",
NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
},
},
},
},
{
Name: "only new events",
Setup: func(s *StateStore, idx uint64) error {
req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94")
require.NoError(t, s.UpsertNode(idx, req))
event := &structs.NodeEvent{
Message: "Node foo initializing",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
return s.UpdateNodeStatus(idx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusInit, time.Now().UnixNano(), event)
},
Mutate: func(s *StateStore, tx *txn) error {
event := &structs.NodeEvent{
Message: "Node foo ready",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
return s.updateNodeStatusTxn(tx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusReady, time.Now().UnixNano(), event)
},
WantEvents: []event.Event{
{
Topic: "NodeEvent",
Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
Payload: NodeEvent{
Message: "Node foo ready",
NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
},
},
},
},
{
Name: "node drain event",
Setup: func(s *StateStore, idx uint64) error {
req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94")
require.NoError(t, s.UpsertNode(idx, req))
event := &structs.NodeEvent{
Message: "Node foo initializing",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
require.NoError(t, s.UpdateNodeStatus(idx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusInit, time.Now().UnixNano(), event))
alloc := mock.Alloc()
alloc.NodeID = req.ID
alloc.ID = allocID()
return s.UpsertAllocs(idx, []*structs.Allocation{alloc})
},
Mutate: func(s *StateStore, tx *txn) error {
event := &structs.NodeEvent{
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
event.SetMessage("Node drain strategy set")
event.SetSubsystem(structs.NodeEventSubsystemDrain)
drain := &structs.DrainStrategy{}
return s.updateNodeDrainImpl(tx, tx.Index, "8218b700-7e26-aac0-06d8-ff3b15f44e94", drain, false, time.Now().UnixNano(), event)
},
WantEvents: []event.Event{
{
Topic: "NodeEvent",
Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
Payload: NodeDrainEvent{
Message: "Node drain strategy set",
NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94",
Allocs: []string{allocID()},
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
s := testStateStore(t)
if tc.Setup != nil {
require.NoError(t, tc.Setup(s, 10))
}
tx := s.db.WriteTxn(100)
require.NoError(t, tc.Mutate(s, tx))
got, err := s.NodeEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100})
if tc.WantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.WantEvents, got)
})
}
}

View file

@ -0,0 +1,148 @@
package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/event"
)
// ReadTxn is implemented by memdb.Txn to perform read operations.
type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
First(table, index string, args ...interface{}) (interface{}, error)
FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error)
Abort()
}
// Changes wraps a memdb.Changes to include the index at which these changes
// were made.
type Changes struct {
// Index is the latest index at the time these changes were committed.
Index uint64
Changes memdb.Changes
}
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are
// sent to the eventPublisher which will create and emit change events.
type changeTrackerDB struct {
db *memdb.MemDB
publisher eventPublisher
processChanges func(ReadTxn, Changes) ([]event.Event, error)
}
type eventPublisher interface {
Publish(events []event.Event)
}
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
// code may use it to create a read-only transaction, but it will panic if called
// with write=true.
//
// Deprecated: use either ReadTxn, or WriteTxn.
func (c *changeTrackerDB) Txn(write bool) *txn {
if write {
panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)")
}
return c.ReadTxn()
}
// ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn
//
// TODO: this could return a regular memdb.Txn if all the state functions accepted
// the ReadTxn interface
func (c *changeTrackerDB) ReadTxn() *txn {
return &txn{Txn: c.db.Txn(false)}
}
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.
// It will track changes and publish events for the changes when Commit
// is called.
//
// The idx argument must be the index of the current Raft operation. Almost
// all mutations to state should happen as part of a raft apply so the index of
// the log being applied can be passed to WriteTxn.
// The exceptional cases are transactions that are executed on an empty
// memdb.DB as part of Restore, and those executed by tests where we insert
// data directly into the DB. These cases may use WriteTxnRestore.
func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txn{
Txn: c.db.Txn(true),
Index: idx,
publish: c.publish,
}
t.Txn.TrackChanges()
return t
}
func (c *changeTrackerDB) publish(changes Changes) error {
readOnlyTx := c.db.Txn(false)
defer readOnlyTx.Abort()
events, err := c.processChanges(readOnlyTx, changes)
if err != nil {
return fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.Publish(events)
return nil
}
// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
// tracking enabled. This should only be used in Restore where we need to
// replace the entire contents of the Store without a need to track the changes.
// WriteTxnRestore uses a zero index since the whole restore doesn't really occur
// at one index - the effect is to write many values that were previously
// written across many indexes.
func (c *changeTrackerDB) WriteTxnRestore() *txn {
return &txn{
Txn: c.db.Txn(true),
Index: 0,
}
}
// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
//
// This can not be done with txn.Defer because the callback passed to Defer is
// invoked after commit completes, and because the callback can not return an
// error. Any errors from the callback would be lost, which would result in a
// missing change event, even though the state store had changed.
type txn struct {
*memdb.Txn
// Index in raft where the write is occurring. The value is zero for a
// read-only, or WriteTxnRestore transaction.
// Index is stored so that it may be passed along to any subscribers as part
// of a change event.
Index uint64
publish func(changes Changes) error
}
// Commit first pushes changes to EventPublisher, then calls Commit on the
// underlying transaction.
//
// Note that this function, unlike memdb.Txn, returns an error which must be checked
// by the caller. A non-nil error indicates that a commit failed and was not
// applied.
func (tx *txn) Commit() error {
// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases changes should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}
if err := tx.publish(changes); err != nil {
return err
}
}
tx.Txn.Commit()
return nil
}
func processDBChanges(tx ReadTxn, changes Changes) ([]event.Event, error) {
// TODO: add handlers here.
return []event.Event{}, nil
}

View file

@ -7,18 +7,20 @@ import (
"sort"
"time"
"github.com/davecgh/go-spew/spew"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/event"
"github.com/hashicorp/nomad/nomad/structs"
)
// Txn is a transaction against a state store.
// This can be a read or write transaction.
type Txn = *memdb.Txn
type Txn = *txn
const (
// NodeRegisterEventReregistered is the message used when the node becomes
@ -55,7 +57,7 @@ type StateStoreConfig struct {
// considered a constant and NEVER modified in place.
type StateStore struct {
logger log.Logger
db *memdb.MemDB
db *changeTrackerDB
// config is the passed in configuration
config *StateStoreConfig
@ -76,10 +78,14 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
// Create the state store
s := &StateStore{
logger: config.Logger.Named("state_store"),
db: db,
config: config,
abandonCh: make(chan struct{}),
}
s.db = &changeTrackerDB{
db: db,
publisher: event.NewPublisher(),
processChanges: processDBChanges,
}
// Initialize the state store with required enterprise objects
if err := s.enterpriseInit(); err != nil {
@ -98,12 +104,20 @@ func (s *StateStore) Config() *StateStoreConfig {
// we use MemDB, we just need to snapshot the state of the underlying
// database.
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
snap := &StateSnapshot{
StateStore: StateStore{
memDB := s.db.db
memDBSnap := memDB.Snapshot()
store := StateStore{
logger: s.logger,
config: s.config,
db: s.db.Snapshot(),
},
}
store.db = &changeTrackerDB{
db: memDBSnap,
publisher: event.NewPublisher(), // TODO nil publishelklr
processChanges: processDBChanges, // nil
}
snap := &StateSnapshot{
StateStore: store,
}
return snap, nil
}
@ -167,7 +181,7 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State
// state by minimizing the number of transactions and checking
// overhead.
func (s *StateStore) Restore() (*StateRestore, error) {
txn := s.db.Txn(true)
txn := s.db.WriteTxnRestore()
r := &StateRestore{
txn: txn,
}
@ -253,7 +267,7 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
return err
}
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Upsert the newly created or updated deployment
@ -347,7 +361,7 @@ func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) {
// upsertDeploymentUpdates updates the deployments given the passed status
// updates.
func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error {
func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *txn) error {
for _, u := range updates {
if err := s.updateDeploymentStatusImpl(index, u, txn); err != nil {
return err
@ -359,7 +373,7 @@ func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.De
// UpsertJobSummary upserts a job summary into the state store.
func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Check if the job summary already exists
@ -394,7 +408,7 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma
// DeleteJobSummary deletes the job summary with the given ID. This is for
// testing purposes only.
func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Delete the job summary
@ -411,7 +425,7 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error
// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to
// true, all prior deployments for the same job will be cancelled.
func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if err := s.upsertDeploymentImpl(index, deployment, txn); err != nil {
return err
@ -420,7 +434,7 @@ func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployme
return nil
}
func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, txn *memdb.Txn) error {
func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, txn *txn) error {
// Check if the deployment already exists
existing, err := txn.First("deployment", "id", deployment.ID)
if err != nil {
@ -516,7 +530,7 @@ func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*st
return s.deploymentByIDImpl(ws, deploymentID, txn)
}
func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *memdb.Txn) (*structs.Deployment, error) {
func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *txn) (*structs.Deployment, error) {
watchCh, existing, err := txn.FirstWatch("deployment", "id", deploymentID)
if err != nil {
return nil, fmt.Errorf("deployment lookup failed: %v", err)
@ -602,7 +616,7 @@ func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID
// DeleteDeployment is used to delete a set of deployments by ID
func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if len(deploymentIDs) == 0 {
@ -636,7 +650,7 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
// UpsertScalingEvent is used to insert a new scaling event.
// Only the most recent JobTrackedScalingEvents will be kept.
func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventRequest) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Get the existing events
@ -720,7 +734,7 @@ func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID stri
// This is assumed to be triggered by the client, so we retain the value
// of drain/eligibility which is set by the scheduler.
func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Check if the node already exists
@ -781,7 +795,7 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error {
return fmt.Errorf("node ids missing")
}
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
for _, nodeID := range nodes {
@ -814,9 +828,19 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error {
// UpdateNodeStatus is used to update the status of a node
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if err := s.updateNodeStatusTxn(txn, index, nodeID, status, updatedAt, event); err != nil {
return err
}
txn.Commit()
return nil
}
func (s *StateStore) updateNodeStatusTxn(txn *txn, index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@ -844,17 +868,15 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updat
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}
// BatchUpdateNodeDrain is used to update the drain of a node set of nodes
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
for node, update := range updates {
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil {
@ -869,7 +891,7 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates
func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil {
return err
@ -878,7 +900,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
return nil
}
func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string,
func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string,
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {
// Lookup the node
@ -925,7 +947,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
// UpdateNodeEligibility is used to update the scheduling eligibility of a node
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Lookup the node
@ -971,7 +993,7 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil
// UpsertNodeEvents adds the node events to the nodes, rotating events as
// necessary.
func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
for nodeID, events := range nodeEvents {
@ -987,7 +1009,7 @@ func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*str
// upsertNodeEvent upserts a node event for a respective node. It also maintains
// that a fixed number of node events are ever stored simultaneously, deleting
// older events once this bound has been reached.
func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *memdb.Txn) error {
func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *txn) error {
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@ -1031,7 +1053,7 @@ func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEv
// upsertNodeCSIPlugins indexes csi plugins for volume retrieval, with health. It's called
// on upsertNodeEvents, so that event driven health changes are updated
func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error {
func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
loop := func(info *structs.CSIInfo) error {
raw, err := txn.First("csi_plugins", "id", info.PluginID)
@ -1147,7 +1169,7 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
}
// deleteNodeCSIPlugins cleans up CSIInfo node health status, called in DeleteNode
func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error {
func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
if len(node.CSIControllerPlugins) == 0 && len(node.CSINodePlugins) == 0 {
return nil
}
@ -1190,7 +1212,7 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
}
// updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) error {
func updateOrGCPlugin(index uint64, txn *txn, plug *structs.CSIPlugin) error {
plug.ModifyIndex = index
if plug.IsEmpty() {
@ -1209,7 +1231,7 @@ func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) err
// deleteJobFromPlugins removes the allocations of this job from any plugins the job is
// running, possibly deleting the plugin if it's no longer in use. It's called in DeleteJobTxn
func (s *StateStore) deleteJobFromPlugins(index uint64, txn *memdb.Txn, job *structs.Job) error {
func (s *StateStore) deleteJobFromPlugin(index uint64, txn *txn, job *structs.Job) error {
ws := memdb.NewWatchSet()
summary, err := s.JobSummaryByID(ws, job.Namespace, job.ID)
if err != nil {
@ -1362,7 +1384,7 @@ func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) {
// UpsertJob is used to register a job or update a job definition
func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if err := s.upsertJobImpl(index, job, false, txn); err != nil {
return err
@ -1378,7 +1400,7 @@ func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error
}
// upsertJobImpl is the implementation for registering a job or updating a job definition
func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error {
func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *txn) error {
// Assert the namespace exists
if exists, err := s.namespaceExists(txn, job.Namespace); err != nil {
return err
@ -1465,7 +1487,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b
// DeleteJob is used to deregister a job
func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
err := s.DeleteJobTxn(index, namespace, jobID, txn)
@ -1577,7 +1599,7 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
}
// deleteJobScalingPolicies deletes any scaling policies associated with the job
func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error {
func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error {
iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn)
if err != nil {
return fmt.Errorf("getting job scaling policies for deletion failed: %v", err)
@ -1610,7 +1632,7 @@ func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, tx
}
// deleteJobVersions deletes all versions of the given job.
func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error {
func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *txn) error {
iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID)
if err != nil {
return err
@ -1650,7 +1672,7 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd
// upsertJobVersion inserts a job into its historic version table and limits the
// number of job versions that are tracked.
func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error {
func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) error {
// Insert the job
if err := txn.Insert("job_version", job); err != nil {
return fmt.Errorf("failed to insert job into job_version table: %v", err)
@ -1743,7 +1765,7 @@ func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([
// jobVersionByID is the underlying implementation for retrieving all tracked
// versions of a job and is called under an existing transaction. A watch set
// can optionally be passed in to add the job histories to the watch set.
func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
func (s *StateStore) jobVersionByID(txn *txn, ws *memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
// Get all the historic jobs for this ID
iter, err := txn.Get("job_version", "id_prefix", namespace, id)
if err != nil {
@ -1788,7 +1810,7 @@ func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string,
// jobByIDAndVersionImpl returns the job identified by its ID and Version. The
// passed watchset may be nil.
func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id string,
version uint64, txn *memdb.Txn) (*structs.Job, error) {
version uint64, txn *txn) (*structs.Job, error) {
watchCh, existing, err := txn.FirstWatch("job_version", "id", namespace, id, version)
if err != nil {
@ -1842,7 +1864,7 @@ func (s *StateStore) JobsByNamespace(ws memdb.WatchSet, namespace string) (memdb
}
// jobsByNamespaceImpl returns an iterator over all the jobs for the given namespace
func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, txn *memdb.Txn) (memdb.ResultIterator, error) {
func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, txn *txn) (memdb.ResultIterator, error) {
// Walk the entire jobs table
iter, err := txn.Get("jobs", "id_prefix", namespace, "")
if err != nil {
@ -1949,7 +1971,7 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string)
// CSIVolumeRegister adds a volume to the server store, failing if it already exists
func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
for _, v := range volumes {
@ -2121,7 +2143,7 @@ func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string)
// CSIVolumeClaim updates the volume's claim count and allocation list
func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *structs.CSIVolumeClaim) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
ws := memdb.NewWatchSet()
@ -2189,7 +2211,7 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s
// CSIVolumeDeregister removes the volume from the server
func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string, force bool) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
for _, id := range ids {
@ -2418,7 +2440,7 @@ func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPl
// is currently no raft message for this, as it's intended to support
// testing use cases.
func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
existing, err := txn.First("csi_plugins", "id", plug.ID)
@ -2444,7 +2466,7 @@ func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) erro
// DeleteCSIPlugin deletes the plugin if it's not in use.
func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
ws := memdb.NewWatchSet()
@ -2475,7 +2497,7 @@ func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error {
// UpsertPeriodicLaunch is used to register a launch or update it.
func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Check if the job already exists
@ -2507,7 +2529,7 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic
// DeletePeriodicLaunch is used to delete the periodic launch
func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
err := s.DeletePeriodicLaunchTxn(index, namespace, jobID, txn)
@ -2575,7 +2597,7 @@ func (s *StateStore) PeriodicLaunches(ws memdb.WatchSet) (memdb.ResultIterator,
// UpsertEvals is used to upsert a set of evaluations
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
err := s.UpsertEvalsTxn(index, evals, txn)
@ -2611,7 +2633,7 @@ func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, t
}
// nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error {
func (s *StateStore) nestedUpsertEval(txn *txn, index uint64, eval *structs.Evaluation) error {
// Lookup the evaluation
existing, err := txn.First("evals", "id", eval.ID)
if err != nil {
@ -2702,7 +2724,7 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
// updateEvalModifyIndex is used to update the modify index of an evaluation that has been
// through a scheduler pass. This is done as part of plan apply. It ensures that when a subsequent
// scheduler workers process a re-queued evaluation it sees any partial updates from the plan apply.
func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID string) error {
func (s *StateStore) updateEvalModifyIndex(txn *txn, index uint64, evalID string) error {
// Lookup the evaluation
existing, err := txn.First("evals", "id", evalID)
if err != nil {
@ -2728,7 +2750,7 @@ func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID
// DeleteEval is used to delete an evaluation
func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
jobs := make(map[structs.NamespacedID]string, len(evals))
@ -2898,7 +2920,7 @@ func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memd
// the desired state comes from the schedulers, while the actual state comes
// from clients.
func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allocation) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Handle each of the updated allocations
@ -2918,7 +2940,7 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo
}
// nestedUpdateAllocFromClient is used to nest an update of an allocation with client status
func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, alloc *structs.Allocation) error {
func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *structs.Allocation) error {
// Look for existing alloc
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
@ -3007,7 +3029,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
// UpsertAllocs is used to evict a set of allocations and allocate new ones at
// the same time.
func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if err := s.upsertAllocsImpl(index, allocs, txn); err != nil {
return err
@ -3018,7 +3040,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
// upsertAllocs is the actual implementation of UpsertAllocs so that it may be
// used with an existing transaction.
func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation, txn *memdb.Txn) error {
func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation, txn *txn) error {
// Handle the allocations
jobs := make(map[structs.NamespacedID]string, 1)
for _, alloc := range allocs {
@ -3142,7 +3164,7 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[string]*structs.DesiredTransition,
evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Handle each of the updated allocations
@ -3170,7 +3192,7 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[str
// nestedUpdateAllocDesiredTransition is used to nest an update of an
// allocations desired transition
func (s *StateStore) nestedUpdateAllocDesiredTransition(
txn *memdb.Txn, index uint64, allocID string,
txn *txn, index uint64, allocID string,
transition *structs.DesiredTransition) error {
// Look for existing alloc
@ -3438,7 +3460,7 @@ func (s *StateStore) AllocsByNamespace(ws memdb.WatchSet, namespace string) (mem
// allocsByNamespaceImpl returns an iterator over all the allocations in the
// namespace
func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *memdb.Txn, namespace string) (memdb.ResultIterator, error) {
func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *txn, namespace string) (memdb.ResultIterator, error) {
// Walk the entire table
iter, err := txn.Get("allocs", "namespace", namespace)
if err != nil {
@ -3452,7 +3474,7 @@ func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *memdb.Txn, na
// UpsertVaultAccessors is used to register a set of Vault Accessors
func (s *StateStore) UpsertVaultAccessor(index uint64, accessors []*structs.VaultAccessor) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
for _, accessor := range accessors {
@ -3475,7 +3497,7 @@ func (s *StateStore) UpsertVaultAccessor(index uint64, accessors []*structs.Vaul
// DeleteVaultAccessors is used to delete a set of Vault Accessors
func (s *StateStore) DeleteVaultAccessors(index uint64, accessors []*structs.VaultAccessor) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Lookup the accessor
@ -3583,7 +3605,7 @@ const siTokenAccessorTable = "si_token_accessors"
// UpsertSITokenAccessors is used to register a set of Service Identity token accessors.
func (s *StateStore) UpsertSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
for _, accessor := range accessors {
@ -3607,7 +3629,7 @@ func (s *StateStore) UpsertSITokenAccessors(index uint64, accessors []*structs.S
// DeleteSITokenAccessors is used to delete a set of Service Identity token accessors.
func (s *StateStore) DeleteSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Lookup each accessor
@ -3706,7 +3728,7 @@ func (s *StateStore) SITokenAccessorsByNode(ws memdb.WatchSet, nodeID string) ([
// UpdateDeploymentStatus is used to make deployment status updates and
// potentially make a evaluation
func (s *StateStore) UpdateDeploymentStatus(index uint64, req *structs.DeploymentStatusUpdateRequest) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if err := s.updateDeploymentStatusImpl(index, req.DeploymentUpdate, txn); err != nil {
@ -3732,7 +3754,7 @@ func (s *StateStore) UpdateDeploymentStatus(index uint64, req *structs.Deploymen
}
// updateDeploymentStatusImpl is used to make deployment status updates
func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.DeploymentStatusUpdate, txn *memdb.Txn) error {
func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.DeploymentStatusUpdate, txn *txn) error {
// Retrieve deployment
ws := memdb.NewWatchSet()
deployment, err := s.deploymentByIDImpl(ws, u.DeploymentID, txn)
@ -3773,7 +3795,7 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym
// UpdateJobStability updates the stability of the given job and version to the
// desired status.
func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, jobVersion uint64, stable bool) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if err := s.updateJobStabilityImpl(index, namespace, jobID, jobVersion, stable, txn); err != nil {
@ -3785,7 +3807,7 @@ func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, j
}
// updateJobStabilityImpl updates the stability of the given job and version
func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *memdb.Txn) error {
func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *txn) error {
// Get the job that is referenced
job, err := s.jobByIDAndVersionImpl(nil, namespace, jobID, jobVersion, txn)
if err != nil {
@ -3810,7 +3832,7 @@ func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID strin
// UpdateDeploymentPromotion is used to promote canaries in a deployment and
// potentially make a evaluation
func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyDeploymentPromoteRequest) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Retrieve deployment and ensure it is not terminal and is active
@ -3953,7 +3975,7 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD
// UpdateDeploymentAllocHealth is used to update the health of allocations as
// part of the deployment and potentially make a evaluation
func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.ApplyDeploymentAllocHealthRequest) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Retrieve deployment and ensure it is not terminal and is active
@ -4087,19 +4109,6 @@ func (s *StateStore) Index(name string) (uint64, error) {
return out.(*IndexEntry).Value, nil
}
// RemoveIndex is a helper method to remove an index for testing purposes
func (s *StateStore) RemoveIndex(name string) error {
txn := s.db.Txn(true)
defer txn.Abort()
if _, err := txn.DeleteAll("index", "id", name); err != nil {
return err
}
txn.Commit()
return nil
}
// Indexes returns an iterator over all the indexes
func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
@ -4115,7 +4124,7 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
// ReconcileJobSummaries re-creates summaries for all jobs present in the state
// store
func (s *StateStore) ReconcileJobSummaries(index uint64) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Get all the jobs
@ -4274,7 +4283,7 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID.
// It takes a map of job IDs to an optional forceStatus string. It returns an
// error if the job doesn't exist or setJobStatus fails.
func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn,
func (s *StateStore) setJobStatuses(index uint64, txn *txn,
jobs map[structs.NamespacedID]string, evalDelete bool) error {
for tuple, forceStatus := range jobs {
@ -4300,7 +4309,7 @@ func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn,
// called because an evaluation is being deleted (potentially because of garbage
// collection). If forceStatus is non-empty, the job's status will be set to the
// passed status.
func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn,
func (s *StateStore) setJobStatus(index uint64, txn *txn,
job *structs.Job, evalDelete bool, forceStatus string) error {
// Capture the current status so we can check if there is a change
@ -4399,7 +4408,7 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn,
return nil
}
func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) {
func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) (string, error) {
// System, Periodic and Parameterized jobs are running until explicitly
// stopped
if job.Type == structs.JobTypeSystem || job.IsParameterized() || job.IsPeriodic() {
@ -4456,7 +4465,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
// updateSummaryWithJob creates or updates job summaries when new jobs are
// upserted or existing ones are updated
func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
txn *memdb.Txn) error {
txn *txn) error {
// Update the job summary
summaryRaw, err := txn.First("job_summary", "id", job.Namespace, job.ID)
@ -4511,7 +4520,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
// updateJobScalingPolicies upserts any scaling policies contained in the job and removes
// any previous scaling policies that were removed from the job
func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error {
func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error {
ws := memdb.NewWatchSet()
@ -4623,7 +4632,7 @@ func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, t
// updateDeploymentWithAlloc is used to update the deployment state associated
// with the given allocation. The passed alloc may be updated if the deployment
// status has changed to capture the modify index at which it has changed.
func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *memdb.Txn) error {
func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *txn) error {
// Nothing to do if the allocation is not associated with a deployment
if alloc.DeploymentID == "" {
return nil
@ -4730,7 +4739,7 @@ func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *st
// updateSummaryWithAlloc updates the job summary when allocations are updated
// or inserted
func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation,
existingAlloc *structs.Allocation, txn *memdb.Txn) error {
existingAlloc *structs.Allocation, txn *txn) error {
// We don't have to update the summary if the job is missing
if alloc.Job == nil {
@ -4780,6 +4789,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
}
switch alloc.ClientStatus {
case structs.AllocClientStatusPending:
spew.Dump("STARTING += 1")
tgSummary.Starting += 1
if tgSummary.Queued > 0 {
tgSummary.Queued -= 1
@ -4849,7 +4859,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
// updatePluginWithAlloc updates the CSI plugins for an alloc when the
// allocation is updated or inserted with a terminal server status.
func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation,
txn *memdb.Txn) error {
txn *txn) error {
if !alloc.ServerTerminalStatus() {
return nil
}
@ -4920,7 +4930,7 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J
// UpsertACLPolicies is used to create or update a set of ACL policies
func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
for _, policy := range policies {
@ -4962,7 +4972,7 @@ func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPoli
// DeleteACLPolicies deletes the policies with the given names
func (s *StateStore) DeleteACLPolicies(index uint64, names []string) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Delete the policy
@ -5022,7 +5032,7 @@ func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error
// UpsertACLTokens is used to create or update a set of ACL tokens
func (s *StateStore) UpsertACLTokens(index uint64, tokens []*structs.ACLToken) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
for _, token := range tokens {
@ -5069,7 +5079,7 @@ func (s *StateStore) UpsertACLTokens(index uint64, tokens []*structs.ACLToken) e
// DeleteACLTokens deletes the tokens with the given accessor ids
func (s *StateStore) DeleteACLTokens(index uint64, ids []string) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Delete the tokens
@ -5184,7 +5194,7 @@ func (s *StateStore) CanBootstrapACLToken() (bool, uint64, error) {
// BootstrapACLToken is used to create an initial ACL token
func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs.ACLToken) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
// Check if we have already done a bootstrap
@ -5240,11 +5250,11 @@ func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration,
}
// SchedulerSetConfig is used to set the current Scheduler configuration.
func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error {
tx := s.db.Txn(true)
func (s *StateStore) SchedulerSetConfig(index uint64, config *structs.SchedulerConfiguration) error {
tx := s.db.WriteTxn(index)
defer tx.Abort()
s.schedulerSetConfigTxn(idx, tx, config)
s.schedulerSetConfigTxn(index, tx, config)
tx.Commit()
return nil
@ -5269,7 +5279,7 @@ func (s *StateStore) ClusterMetadata(ws memdb.WatchSet) (*structs.ClusterMetadat
}
func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetadata) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if err := s.setClusterMetadata(txn, meta); err != nil {
@ -5283,8 +5293,8 @@ func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetad
// WithWriteTransaction executes the passed function within a write transaction,
// and returns its result. If the invocation returns no error, the transaction
// is committed; otherwise, it's aborted.
func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error {
tx := s.db.Txn(true)
func (s *StateStore) WithWriteTransaction(index uint64, fn func(Txn) error) error {
tx := s.db.WriteTxn(index)
defer tx.Abort()
err := fn(tx)
@ -5297,8 +5307,8 @@ func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error {
// SchedulerCASConfig is used to update the scheduler configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop.
func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) {
tx := s.db.Txn(true)
func (s *StateStore) SchedulerCASConfig(index, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) {
tx := s.db.WriteTxn(index)
defer tx.Abort()
// Check for an existing config
@ -5315,13 +5325,13 @@ func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.Schedu
return false, nil
}
s.schedulerSetConfigTxn(idx, tx, config)
s.schedulerSetConfigTxn(index, tx, config)
tx.Commit()
return true, nil
}
func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error {
func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *txn, config *structs.SchedulerConfiguration) error {
// Check for an existing config
existing, err := tx.First("scheduler_config", "id")
if err != nil {
@ -5342,7 +5352,7 @@ func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *st
return nil
}
func (s *StateStore) setClusterMetadata(txn *memdb.Txn, meta *structs.ClusterMetadata) error {
func (s *StateStore) setClusterMetadata(txn *txn, meta *structs.ClusterMetadata) error {
// Check for an existing config, if it exists, sanity check the cluster ID matches
existing, err := txn.First("cluster_meta", "id")
if err != nil {
@ -5367,7 +5377,7 @@ func (s *StateStore) setClusterMetadata(txn *memdb.Txn, meta *structs.ClusterMet
// UpsertScalingPolicy is used to insert a new scaling policy.
func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*structs.ScalingPolicy) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
if err := s.UpsertScalingPoliciesTxn(index, scalingPolicies, txn); err != nil {
@ -5380,7 +5390,7 @@ func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*stru
// upsertScalingPolicy is used to insert a new scaling policy.
func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*structs.ScalingPolicy,
txn *memdb.Txn) error {
txn *txn) error {
hadUpdates := false
@ -5427,7 +5437,7 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s
}
func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error {
txn := s.db.Txn(true)
txn := s.db.WriteTxn(index)
defer txn.Abort()
err := s.DeleteScalingPoliciesTxn(index, ids, txn)
@ -5439,7 +5449,7 @@ func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error {
}
// DeleteScalingPolicies is used to delete a set of scaling policies by ID
func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *memdb.Txn) error {
func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *txn) error {
if len(ids) == 0 {
return nil
}
@ -5491,19 +5501,7 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str
}
ws.Add(iter.WatchCh())
filter := func(raw interface{}) bool {
d, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}
return d.Target[structs.ScalingTargetNamespace] != namespace
}
// Wrap the iterator in a filter
wrap := memdb.NewFilterIterator(iter, filter)
return wrap, nil
return iter, nil
}
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
@ -5512,7 +5510,7 @@ func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID st
}
func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string,
txn *memdb.Txn) (memdb.ResultIterator, error) {
txn *txn) (memdb.ResultIterator, error) {
iter, err := txn.Get("scaling_policy", "target_prefix", namespace, jobID)
if err != nil {
@ -5665,7 +5663,7 @@ func getPreemptedAllocDesiredDescription(PreemptedByAllocID string) string {
// restoring state by only using a single large transaction
// instead of thousands of sub transactions
type StateRestore struct {
txn *memdb.Txn
txn *txn
}
// Abort is used to abort the restore operation

View file

@ -3,7 +3,6 @@
package state
import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -14,13 +13,13 @@ func (s *StateStore) enterpriseInit() error {
}
// namespaceExists returns whether a namespace exists
func (s *StateStore) namespaceExists(txn *memdb.Txn, namespace string) (bool, error) {
func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) {
return namespace == structs.DefaultNamespace, nil
}
// updateEntWithAlloc is used to update Nomad Enterprise objects when an allocation is
// added/modified/deleted
func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *memdb.Txn) error {
func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error {
return nil
}

View file

@ -1903,7 +1903,7 @@ func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) {
// Actually delete
const deletionIndex = uint64(10001)
err = state.WithWriteTransaction(func(txn Txn) error {
err = state.WithWriteTransaction(deletionIndex, func(txn Txn) error {
for i, job := range jobs {
err := state.DeleteJobTxn(deletionIndex, job.Namespace, job.ID, txn)
require.NoError(t, err, "failed at %d %e", i, err)
@ -6053,19 +6053,20 @@ func TestStateStore_RestoreAlloc(t *testing.T) {
func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
t.Parallel()
index := uint64(0)
state := testStateStore(t)
txn := state.db.Txn(true)
txn := state.db.WriteTxn(index)
// Create and insert a mock job.
job := mock.Job()
job.Status = ""
job.ModifyIndex = 0
job.ModifyIndex = index
if err := txn.Insert("jobs", job); err != nil {
t.Fatalf("job insert failed: %v", err)
}
exp := "foobar"
index := uint64(1000)
index = uint64(1000)
if err := state.setJobStatus(index, txn, job, false, exp); err != nil {
t.Fatalf("setJobStatus() failed: %v", err)
}
@ -6088,8 +6089,9 @@ func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
func TestStateStore_SetJobStatus_NoOp(t *testing.T) {
t.Parallel()
index := uint64(0)
state := testStateStore(t)
txn := state.db.Txn(true)
txn := state.db.WriteTxn(index)
// Create and insert a mock job that should be pending.
job := mock.Job()
@ -6099,7 +6101,7 @@ func TestStateStore_SetJobStatus_NoOp(t *testing.T) {
t.Fatalf("job insert failed: %v", err)
}
index := uint64(1000)
index = uint64(1000)
if err := state.setJobStatus(index, txn, job, false, ""); err != nil {
t.Fatalf("setJobStatus() failed: %v", err)
}
@ -6119,7 +6121,7 @@ func TestStateStore_SetJobStatus(t *testing.T) {
t.Parallel()
state := testStateStore(t)
txn := state.db.Txn(true)
txn := state.db.WriteTxn(uint64(0))
// Create and insert a mock job that should be pending but has an incorrect
// status.