diff --git a/nomad/fsm.go b/nomad/fsm.go index 722801134..fa9d54aca 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -520,6 +520,9 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} panic(fmt.Errorf("failed to decode request: %v", err)) } + // Perform all store updates atomically to ensure a consistent views for store readers. + // A partial update may increment the snapshot index, allowing eval brokers to process + // evals for jobs whose deregistering didn't get committed yet. err := n.state.WithWriteTransaction(func(tx state.Txn) error { for jobNS, options := range req.Jobs { if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil { @@ -540,6 +543,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} return err } + // perform the side effects outside the transactions n.handleUpsertedEvals(req.Evals) return nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1b953fc64..191a2ea3b 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -14,6 +14,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// Txn is a transaction against a state store. +// This can be a read or write transaction. type Txn = *memdb.Txn const ( @@ -925,6 +927,8 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return nil } +// UpsertJobTxn is used to register a job or update a job definition, like UpsertJob, +// but in a transcation. Useful for when making multiple modifications atomically func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error { return s.upsertJobImpl(index, job, false, txn) } @@ -1019,6 +1023,8 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { return err } +// DeleteJobTxn is used to deregister a job, like DeleteJob, +// but in a transcation. Useful for when making multiple modifications atomically func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error { // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { @@ -1206,6 +1212,8 @@ func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs. return s.JobByIDTxn(ws, namespace, id, txn) } +// JobByIDTxn is used to lookup a job by its ID, like JobByID. JobByID returns the job version +// accessable through in the transaction func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) { // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { @@ -1534,6 +1542,8 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) return err } +// DeletePeriodicLaunchTxn is used to delete the periodic launch, like DeletePeriodicLaunch +// but in a transcation. Useful for when making multiple modifications atomically func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error { // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { @@ -1610,6 +1620,8 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro return err } +// UpsertEvals is used to upsert a set of evaluations, like UpsertEvals +// but in a transcation. Useful for when making multiple modifications atomically func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error { // Do a nested upsert jobs := make(map[structs.NamespacedID]string, len(evals)) @@ -3919,6 +3931,9 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon return nil } +// WithWriteTransaction executes the passed function within a write transaction, +// and returns its result. If the invocation returns no error, the transaction +// is committed; otherwise, it's aborted. func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error { tx := s.db.Txn(true) defer tx.Abort()