nomad: state store CRUD for allocations
This commit is contained in:
parent
1034eec8b3
commit
e322534550
|
@ -176,27 +176,6 @@ func allocTableSchema() *memdb.TableSchema {
|
|||
},
|
||||
},
|
||||
|
||||
// Job index is used to lookup allocations by job.
|
||||
// It is a compound index on {JobName, TaskGroupName}
|
||||
"job": &memdb.IndexSchema{
|
||||
Name: "job",
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
AllowMissing: false,
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "JobName",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "TaskGroupName",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
// Node index is used to lookup allocations by node
|
||||
"node": &memdb.IndexSchema{
|
||||
Name: "node",
|
||||
|
|
|
@ -401,6 +401,81 @@ func (s *StateStore) Evals() (memdb.ResultIterator, error) {
|
|||
return iter, nil
|
||||
}
|
||||
|
||||
// UpdateAllocations is used to evict a set of allocations
|
||||
// and allocate new ones at the same time.
|
||||
func (s *StateStore) UpdateAllocations(index uint64, evicts []string,
|
||||
allocs []*structs.Allocation) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
|
||||
// Handle evictions first
|
||||
for _, evict := range evicts {
|
||||
existing, err := txn.First("allocs", "id", evict)
|
||||
if err != nil {
|
||||
return fmt.Errorf("alloc lookup failed: %v", err)
|
||||
}
|
||||
if existing == nil {
|
||||
continue
|
||||
}
|
||||
if err := txn.Delete("allocs", existing); err != nil {
|
||||
return fmt.Errorf("alloc delete failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the allocations
|
||||
for _, alloc := range allocs {
|
||||
existing, err := txn.First("allocs", "id", alloc.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("alloc lookup failed: %v", err)
|
||||
}
|
||||
if existing == nil {
|
||||
alloc.CreateIndex = index
|
||||
alloc.ModifyIndex = index
|
||||
} else {
|
||||
alloc.CreateIndex = existing.(*structs.Allocation).CreateIndex
|
||||
alloc.ModifyIndex = index
|
||||
}
|
||||
if err := txn.Insert("allocs", alloc); err != nil {
|
||||
return fmt.Errorf("alloc insert failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Update the indexes
|
||||
if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
txn.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAllocByID is used to lookup an allocation by its ID
|
||||
func (s *StateStore) GetAllocByID(id string) (*structs.Allocation, error) {
|
||||
txn := s.db.Txn(false)
|
||||
|
||||
existing, err := txn.First("allocs", "id", id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("alloc lookup failed: %v", err)
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
return existing.(*structs.Allocation), nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Allocs returns an iterator over all the evaluations
|
||||
func (s *StateStore) Allocs() (memdb.ResultIterator, error) {
|
||||
txn := s.db.Txn(false)
|
||||
|
||||
// Walk the entire table
|
||||
iter, err := txn.Get("allocs", "id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// GetIndex finds the matching index value
|
||||
func (s *StateStore) GetIndex(name string) (uint64, error) {
|
||||
txn := s.db.Txn(false)
|
||||
|
@ -452,6 +527,14 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// AllocRestore is used to restore an allocation
|
||||
func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
|
||||
if err := r.txn.Insert("allocs", alloc); err != nil {
|
||||
return fmt.Errorf("alloc insert failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IndexRestore is used to restore an index
|
||||
func (r *StateRestore) IndexRestore(idx *IndexEntry) error {
|
||||
if err := r.txn.Insert("index", idx); err != nil {
|
||||
|
|
|
@ -121,6 +121,14 @@ func mockEval() *structs.Evaluation {
|
|||
return eval
|
||||
}
|
||||
|
||||
func mockAlloc() *structs.Allocation {
|
||||
alloc := &structs.Allocation{
|
||||
ID: generateUUID(),
|
||||
NodeID: "foo",
|
||||
}
|
||||
return alloc
|
||||
}
|
||||
|
||||
func TestStateStore_RegisterNode_GetNode(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
node := mockNode()
|
||||
|
@ -659,6 +667,172 @@ func TestStateStore_RestoreEval(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_UpsertAlloc_GetAlloc(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
|
||||
alloc := mockAlloc()
|
||||
err := state.UpdateAllocations(1000, nil,
|
||||
[]*structs.Allocation{alloc})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
out, err := state.GetAllocByID(alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(alloc, out) {
|
||||
t.Fatalf("bad: %#v %#v", alloc, out)
|
||||
}
|
||||
|
||||
index, err := state.GetIndex("allocs")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if index != 1000 {
|
||||
t.Fatalf("bad: %d", index)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_UpdateAlloc_GetAlloc(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
alloc := mockAlloc()
|
||||
|
||||
err := state.UpdateAllocations(1000, nil,
|
||||
[]*structs.Allocation{alloc})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
alloc2 := mockAlloc()
|
||||
alloc2.ID = alloc.ID
|
||||
alloc2.NodeID = alloc.NodeID + ".new"
|
||||
err = state.UpdateAllocations(1001, nil,
|
||||
[]*structs.Allocation{alloc2})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
out, err := state.GetAllocByID(alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(alloc2, out) {
|
||||
t.Fatalf("bad: %#v %#v", alloc2, out)
|
||||
}
|
||||
|
||||
if out.CreateIndex != 1000 {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
if out.ModifyIndex != 1001 {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
|
||||
index, err := state.GetIndex("allocs")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if index != 1001 {
|
||||
t.Fatalf("bad: %d", index)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_EvictAlloc_GetAlloc(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
alloc := mockAlloc()
|
||||
|
||||
err := state.UpdateAllocations(1001, nil, []*structs.Allocation{alloc})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
err = state.UpdateAllocations(1001, []string{alloc.ID}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
out, err := state.GetAllocByID(alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if out != nil {
|
||||
t.Fatalf("bad: %#v %#v", alloc, out)
|
||||
}
|
||||
|
||||
index, err := state.GetIndex("allocs")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if index != 1001 {
|
||||
t.Fatalf("bad: %d", index)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Allocs(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
var allocs []*structs.Allocation
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
alloc := mockAlloc()
|
||||
allocs = append(allocs, alloc)
|
||||
}
|
||||
|
||||
err := state.UpdateAllocations(1000, nil, allocs)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
iter, err := state.Allocs()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
var out []*structs.Allocation
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
out = append(out, raw.(*structs.Allocation))
|
||||
}
|
||||
|
||||
sort.Sort(AllocIDSort(allocs))
|
||||
sort.Sort(AllocIDSort(out))
|
||||
|
||||
if !reflect.DeepEqual(allocs, out) {
|
||||
t.Fatalf("bad: %#v %#v", allocs, out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_RestoreAlloc(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
|
||||
restore, err := state.Restore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
alloc := mockAlloc()
|
||||
err = restore.AllocRestore(alloc)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
restore.Commit()
|
||||
|
||||
out, err := state.GetAllocByID(alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(out, alloc) {
|
||||
t.Fatalf("Bad: %#v %#v", out, alloc)
|
||||
}
|
||||
}
|
||||
|
||||
// NodeIDSort is used to sort nodes by ID
|
||||
type NodeIDSort []*structs.Node
|
||||
|
||||
|
@ -703,3 +877,18 @@ func (n EvalIDSort) Less(i, j int) bool {
|
|||
func (n EvalIDSort) Swap(i, j int) {
|
||||
n[i], n[j] = n[j], n[i]
|
||||
}
|
||||
|
||||
// AllocIDsort used to sort allocations by id
|
||||
type AllocIDSort []*structs.Allocation
|
||||
|
||||
func (n AllocIDSort) Len() int {
|
||||
return len(n)
|
||||
}
|
||||
|
||||
func (n AllocIDSort) Less(i, j int) bool {
|
||||
return n[i].ID < n[j].ID
|
||||
}
|
||||
|
||||
func (n AllocIDSort) Swap(i, j int) {
|
||||
n[i], n[j] = n[j], n[i]
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue