Comment public functions and batch write txn
This commit is contained in:
parent
9c0a15f3ce
commit
8513b3cccb
|
@ -520,6 +520,9 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{}
|
|||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
// Perform all store updates atomically to ensure a consistent views for store readers.
|
||||
// A partial update may increment the snapshot index, allowing eval brokers to process
|
||||
// evals for jobs whose deregistering didn't get committed yet.
|
||||
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
|
||||
for jobNS, options := range req.Jobs {
|
||||
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil {
|
||||
|
@ -540,6 +543,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{}
|
|||
return err
|
||||
}
|
||||
|
||||
// perform the side effects outside the transactions
|
||||
n.handleUpsertedEvals(req.Evals)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -14,6 +14,8 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// Txn is a transaction against a state store.
|
||||
// This can be a read or write transaction.
|
||||
type Txn = *memdb.Txn
|
||||
|
||||
const (
|
||||
|
@ -925,6 +927,8 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// UpsertJobTxn is used to register a job or update a job definition, like UpsertJob,
|
||||
// but in a transcation. Useful for when making multiple modifications atomically
|
||||
func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error {
|
||||
return s.upsertJobImpl(index, job, false, txn)
|
||||
}
|
||||
|
@ -1019,6 +1023,8 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// DeleteJobTxn is used to deregister a job, like DeleteJob,
|
||||
// but in a transcation. Useful for when making multiple modifications atomically
|
||||
func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error {
|
||||
// COMPAT 0.7: Upgrade old objects that do not have namespaces
|
||||
if namespace == "" {
|
||||
|
@ -1206,6 +1212,8 @@ func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.
|
|||
return s.JobByIDTxn(ws, namespace, id, txn)
|
||||
}
|
||||
|
||||
// JobByIDTxn is used to lookup a job by its ID, like JobByID. JobByID returns the job version
|
||||
// accessable through in the transaction
|
||||
func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) {
|
||||
// COMPAT 0.7: Upgrade old objects that do not have namespaces
|
||||
if namespace == "" {
|
||||
|
@ -1534,6 +1542,8 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string)
|
|||
return err
|
||||
}
|
||||
|
||||
// DeletePeriodicLaunchTxn is used to delete the periodic launch, like DeletePeriodicLaunch
|
||||
// but in a transcation. Useful for when making multiple modifications atomically
|
||||
func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error {
|
||||
// COMPAT 0.7: Upgrade old objects that do not have namespaces
|
||||
if namespace == "" {
|
||||
|
@ -1610,6 +1620,8 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
|
|||
return err
|
||||
}
|
||||
|
||||
// UpsertEvals is used to upsert a set of evaluations, like UpsertEvals
|
||||
// but in a transcation. Useful for when making multiple modifications atomically
|
||||
func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error {
|
||||
// Do a nested upsert
|
||||
jobs := make(map[structs.NamespacedID]string, len(evals))
|
||||
|
@ -3919,6 +3931,9 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon
|
|||
return nil
|
||||
}
|
||||
|
||||
// WithWriteTransaction executes the passed function within a write transaction,
|
||||
// and returns its result. If the invocation returns no error, the transaction
|
||||
// is committed; otherwise, it's aborted.
|
||||
func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
|
Loading…
Reference in New Issue