From e32253455082efea9cfc498eac25d4646ead59d9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 4 Aug 2015 13:56:41 -0700 Subject: [PATCH] nomad: state store CRUD for allocations --- nomad/schema.go | 21 ----- nomad/state_store.go | 83 +++++++++++++++++ nomad/state_store_test.go | 189 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 272 insertions(+), 21 deletions(-) diff --git a/nomad/schema.go b/nomad/schema.go index 7b247ef35..6400de70d 100644 --- a/nomad/schema.go +++ b/nomad/schema.go @@ -176,27 +176,6 @@ func allocTableSchema() *memdb.TableSchema { }, }, - // Job index is used to lookup allocations by job. - // It is a compound index on {JobName, TaskGroupName} - "job": &memdb.IndexSchema{ - Name: "job", - AllowMissing: false, - Unique: false, - Indexer: &memdb.CompoundIndex{ - AllowMissing: false, - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "JobName", - Lowercase: true, - }, - &memdb.StringFieldIndex{ - Field: "TaskGroupName", - Lowercase: true, - }, - }, - }, - }, - // Node index is used to lookup allocations by node "node": &memdb.IndexSchema{ Name: "node", diff --git a/nomad/state_store.go b/nomad/state_store.go index a21179a23..5b2fa9410 100644 --- a/nomad/state_store.go +++ b/nomad/state_store.go @@ -401,6 +401,81 @@ func (s *StateStore) Evals() (memdb.ResultIterator, error) { return iter, nil } +// UpdateAllocations is used to evict a set of allocations +// and allocate new ones at the same time. +func (s *StateStore) UpdateAllocations(index uint64, evicts []string, + allocs []*structs.Allocation) error { + txn := s.db.Txn(true) + defer txn.Abort() + + // Handle evictions first + for _, evict := range evicts { + existing, err := txn.First("allocs", "id", evict) + if err != nil { + return fmt.Errorf("alloc lookup failed: %v", err) + } + if existing == nil { + continue + } + if err := txn.Delete("allocs", existing); err != nil { + return fmt.Errorf("alloc delete failed: %v", err) + } + } + + // Handle the allocations + for _, alloc := range allocs { + existing, err := txn.First("allocs", "id", alloc.ID) + if err != nil { + return fmt.Errorf("alloc lookup failed: %v", err) + } + if existing == nil { + alloc.CreateIndex = index + alloc.ModifyIndex = index + } else { + alloc.CreateIndex = existing.(*structs.Allocation).CreateIndex + alloc.ModifyIndex = index + } + if err := txn.Insert("allocs", alloc); err != nil { + return fmt.Errorf("alloc insert failed: %v", err) + } + } + + // Update the indexes + if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Commit() + return nil +} + +// GetAllocByID is used to lookup an allocation by its ID +func (s *StateStore) GetAllocByID(id string) (*structs.Allocation, error) { + txn := s.db.Txn(false) + + existing, err := txn.First("allocs", "id", id) + if err != nil { + return nil, fmt.Errorf("alloc lookup failed: %v", err) + } + + if existing != nil { + return existing.(*structs.Allocation), nil + } + return nil, nil +} + +// Allocs returns an iterator over all the evaluations +func (s *StateStore) Allocs() (memdb.ResultIterator, error) { + txn := s.db.Txn(false) + + // Walk the entire table + iter, err := txn.Get("allocs", "id") + if err != nil { + return nil, err + } + return iter, nil +} + // GetIndex finds the matching index value func (s *StateStore) GetIndex(name string) (uint64, error) { txn := s.db.Txn(false) @@ -452,6 +527,14 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error { return nil } +// AllocRestore is used to restore an allocation +func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error { + if err := r.txn.Insert("allocs", alloc); err != nil { + return fmt.Errorf("alloc insert failed: %v", err) + } + return nil +} + // IndexRestore is used to restore an index func (r *StateRestore) IndexRestore(idx *IndexEntry) error { if err := r.txn.Insert("index", idx); err != nil { diff --git a/nomad/state_store_test.go b/nomad/state_store_test.go index 6c07abb19..afe8f6478 100644 --- a/nomad/state_store_test.go +++ b/nomad/state_store_test.go @@ -121,6 +121,14 @@ func mockEval() *structs.Evaluation { return eval } +func mockAlloc() *structs.Allocation { + alloc := &structs.Allocation{ + ID: generateUUID(), + NodeID: "foo", + } + return alloc +} + func TestStateStore_RegisterNode_GetNode(t *testing.T) { state := testStateStore(t) node := mockNode() @@ -659,6 +667,172 @@ func TestStateStore_RestoreEval(t *testing.T) { } } +func TestStateStore_UpsertAlloc_GetAlloc(t *testing.T) { + state := testStateStore(t) + + alloc := mockAlloc() + err := state.UpdateAllocations(1000, nil, + []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.GetAllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(alloc, out) { + t.Fatalf("bad: %#v %#v", alloc, out) + } + + index, err := state.GetIndex("allocs") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1000 { + t.Fatalf("bad: %d", index) + } +} + +func TestStateStore_UpdateAlloc_GetAlloc(t *testing.T) { + state := testStateStore(t) + alloc := mockAlloc() + + err := state.UpdateAllocations(1000, nil, + []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + alloc2 := mockAlloc() + alloc2.ID = alloc.ID + alloc2.NodeID = alloc.NodeID + ".new" + err = state.UpdateAllocations(1001, nil, + []*structs.Allocation{alloc2}) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.GetAllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(alloc2, out) { + t.Fatalf("bad: %#v %#v", alloc2, out) + } + + if out.CreateIndex != 1000 { + t.Fatalf("bad: %#v", out) + } + if out.ModifyIndex != 1001 { + t.Fatalf("bad: %#v", out) + } + + index, err := state.GetIndex("allocs") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1001 { + t.Fatalf("bad: %d", index) + } +} + +func TestStateStore_EvictAlloc_GetAlloc(t *testing.T) { + state := testStateStore(t) + alloc := mockAlloc() + + err := state.UpdateAllocations(1001, nil, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + err = state.UpdateAllocations(1001, []string{alloc.ID}, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.GetAllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if out != nil { + t.Fatalf("bad: %#v %#v", alloc, out) + } + + index, err := state.GetIndex("allocs") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1001 { + t.Fatalf("bad: %d", index) + } +} + +func TestStateStore_Allocs(t *testing.T) { + state := testStateStore(t) + var allocs []*structs.Allocation + + for i := 0; i < 10; i++ { + alloc := mockAlloc() + allocs = append(allocs, alloc) + } + + err := state.UpdateAllocations(1000, nil, allocs) + if err != nil { + t.Fatalf("err: %v", err) + } + + iter, err := state.Allocs() + if err != nil { + t.Fatalf("err: %v", err) + } + + var out []*structs.Allocation + for { + raw := iter.Next() + if raw == nil { + break + } + out = append(out, raw.(*structs.Allocation)) + } + + sort.Sort(AllocIDSort(allocs)) + sort.Sort(AllocIDSort(out)) + + if !reflect.DeepEqual(allocs, out) { + t.Fatalf("bad: %#v %#v", allocs, out) + } +} + +func TestStateStore_RestoreAlloc(t *testing.T) { + state := testStateStore(t) + + restore, err := state.Restore() + if err != nil { + t.Fatalf("err: %v", err) + } + + alloc := mockAlloc() + err = restore.AllocRestore(alloc) + if err != nil { + t.Fatalf("err: %v", err) + } + + restore.Commit() + + out, err := state.GetAllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(out, alloc) { + t.Fatalf("Bad: %#v %#v", out, alloc) + } +} + // NodeIDSort is used to sort nodes by ID type NodeIDSort []*structs.Node @@ -703,3 +877,18 @@ func (n EvalIDSort) Less(i, j int) bool { func (n EvalIDSort) Swap(i, j int) { n[i], n[j] = n[j], n[i] } + +// AllocIDsort used to sort allocations by id +type AllocIDSort []*structs.Allocation + +func (n AllocIDSort) Len() int { + return len(n) +} + +func (n AllocIDSort) Less(i, j int) bool { + return n[i].ID < n[j].ID +} + +func (n AllocIDSort) Swap(i, j int) { + n[i], n[j] = n[j], n[i] +}