nomad: CRUD for evals in state store
This commit is contained in:
parent
60e786100b
commit
a69c71b239
|
@ -304,6 +304,100 @@ func (s *StateStore) Jobs() (memdb.ResultIterator, error) {
|
|||
return iter, nil
|
||||
}
|
||||
|
||||
// UpsertEvaluation is used to upsert an evaluation
|
||||
func (s *StateStore) UpsertEval(index uint64, eval *structs.Evaluation) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
|
||||
// Do a nested upsert
|
||||
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
txn.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction
|
||||
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error {
|
||||
// Lookup the evaluation
|
||||
existing, err := txn.First("evals", "id", eval.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("eval lookup failed: %v", err)
|
||||
}
|
||||
|
||||
// Update the indexes
|
||||
if existing != nil {
|
||||
eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex
|
||||
eval.ModifyIndex = index
|
||||
} else {
|
||||
eval.CreateIndex = index
|
||||
eval.ModifyIndex = index
|
||||
}
|
||||
|
||||
// Insert the eval
|
||||
if err := txn.Insert("evals", eval); err != nil {
|
||||
return fmt.Errorf("eval insert failed: %v", err)
|
||||
}
|
||||
if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteEval is used to delete an evaluation
|
||||
func (s *StateStore) DeleteEval(index uint64, evalID string) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
|
||||
// Lookup the node
|
||||
existing, err := txn.First("evals", "id", evalID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("eval lookup failed: %v", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return fmt.Errorf("eval not found")
|
||||
}
|
||||
|
||||
// Delete the node
|
||||
if err := txn.Delete("evals", existing); err != nil {
|
||||
return fmt.Errorf("eval delete failed: %v", err)
|
||||
}
|
||||
if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
txn.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEvalByID is used to lookup an eval by its ID
|
||||
func (s *StateStore) GetEvalByID(id string) (*structs.Evaluation, error) {
|
||||
txn := s.db.Txn(false)
|
||||
|
||||
existing, err := txn.First("evals", "id", id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("eval lookup failed: %v", err)
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
return existing.(*structs.Evaluation), nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Evals returns an iterator over all the evaluations
|
||||
func (s *StateStore) Evals() (memdb.ResultIterator, error) {
|
||||
txn := s.db.Txn(false)
|
||||
|
||||
// Walk the entire table
|
||||
iter, err := txn.Get("evals", "id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// GetIndex finds the matching index value
|
||||
func (s *StateStore) GetIndex(name string) (uint64, error) {
|
||||
txn := s.db.Txn(false)
|
||||
|
@ -347,6 +441,15 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// EvalRestore is used to restore an evaluation
|
||||
func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
|
||||
if err := r.txn.Insert("evals", eval); err != nil {
|
||||
return fmt.Errorf("eval insert failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IndexRestore is used to restore an index
|
||||
func (r *StateRestore) IndexRestore(idx *IndexEntry) error {
|
||||
if err := r.txn.Insert("index", idx); err != nil {
|
||||
return fmt.Errorf("index insert failed: %v", err)
|
||||
|
|
|
@ -110,6 +110,14 @@ func mockJob() *structs.Job {
|
|||
return job
|
||||
}
|
||||
|
||||
func mockEval() *structs.Evaluation {
|
||||
eval := &structs.Evaluation{
|
||||
ID: generateUUID(),
|
||||
Status: structs.EvalStatusPending,
|
||||
}
|
||||
return eval
|
||||
}
|
||||
|
||||
func TestStateStore_RegisterNode_GetNode(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
node := mockNode()
|
||||
|
@ -486,6 +494,168 @@ func TestStateStore_RestoreIndex(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_UpsertEval_GetEval(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
eval := mockEval()
|
||||
|
||||
err := state.UpsertEval(1000, eval)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
out, err := state.GetEvalByID(eval.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(eval, out) {
|
||||
t.Fatalf("bad: %#v %#v", eval, out)
|
||||
}
|
||||
|
||||
index, err := state.GetIndex("evals")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if index != 1000 {
|
||||
t.Fatalf("bad: %d", index)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Update_UpsertEval_GetEval(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
eval := mockEval()
|
||||
|
||||
err := state.UpsertEval(1000, eval)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
eval2 := mockEval()
|
||||
eval2.ID = eval.ID
|
||||
err = state.UpsertEval(1001, eval2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
out, err := state.GetEvalByID(eval.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(eval2, out) {
|
||||
t.Fatalf("bad: %#v %#v", eval2, out)
|
||||
}
|
||||
|
||||
if out.CreateIndex != 1000 {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
if out.ModifyIndex != 1001 {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
|
||||
index, err := state.GetIndex("evals")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if index != 1001 {
|
||||
t.Fatalf("bad: %d", index)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_DeleteEval_GetEval(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
job := mockEval()
|
||||
|
||||
err := state.UpsertEval(1000, job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
err = state.DeleteEval(1001, job.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
out, err := state.GetEvalByID(job.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if out != nil {
|
||||
t.Fatalf("bad: %#v %#v", job, out)
|
||||
}
|
||||
|
||||
index, err := state.GetIndex("evals")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if index != 1001 {
|
||||
t.Fatalf("bad: %d", index)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Evals(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
var evals []*structs.Evaluation
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
eval := mockEval()
|
||||
evals = append(evals, eval)
|
||||
|
||||
err := state.UpsertEval(1000+uint64(i), eval)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
iter, err := state.Evals()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
var out []*structs.Evaluation
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
out = append(out, raw.(*structs.Evaluation))
|
||||
}
|
||||
|
||||
sort.Sort(EvalIDSort(evals))
|
||||
sort.Sort(EvalIDSort(out))
|
||||
|
||||
if !reflect.DeepEqual(evals, out) {
|
||||
t.Fatalf("bad: %#v %#v", evals, out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_RestoreEval(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
|
||||
restore, err := state.Restore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
job := mockEval()
|
||||
err = restore.EvalRestore(job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
restore.Commit()
|
||||
|
||||
out, err := state.GetEvalByID(job.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(out, job) {
|
||||
t.Fatalf("Bad: %#v %#v", out, job)
|
||||
}
|
||||
}
|
||||
|
||||
// NodeIDSort is used to sort nodes by ID
|
||||
type NodeIDSort []*structs.Node
|
||||
|
||||
|
@ -515,3 +685,18 @@ func (n JobIDSort) Less(i, j int) bool {
|
|||
func (n JobIDSort) Swap(i, j int) {
|
||||
n[i], n[j] = n[j], n[i]
|
||||
}
|
||||
|
||||
// EvalIDis used to sort evals by id
|
||||
type EvalIDSort []*structs.Evaluation
|
||||
|
||||
func (n EvalIDSort) Len() int {
|
||||
return len(n)
|
||||
}
|
||||
|
||||
func (n EvalIDSort) Less(i, j int) bool {
|
||||
return n[i].ID < n[j].ID
|
||||
}
|
||||
|
||||
func (n EvalIDSort) Swap(i, j int) {
|
||||
n[i], n[j] = n[j], n[i]
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue