nomad: adding UpdateAllocFromClient for client based updates
This commit is contained in:
parent
93a5b50483
commit
922eb37484
|
@ -475,6 +475,54 @@ func (s *StateStore) Evals() (memdb.ResultIterator, error) {
|
||||||
return iter, nil
|
return iter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateAllocFromClient is used to update an allocation based on input
|
||||||
|
// from a client. While the schedulers are the authority on the allocation for
|
||||||
|
// most things, some updates are authoritative from the client. Specifically,
|
||||||
|
// the desired state comes from the schedulers, while the actual state comes
|
||||||
|
// from clients.
|
||||||
|
func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocation) error {
|
||||||
|
txn := s.db.Txn(true)
|
||||||
|
defer txn.Abort()
|
||||||
|
|
||||||
|
// Look for existing alloc
|
||||||
|
existing, err := txn.First("allocs", "id", alloc.ID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("alloc lookup failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nothing to do if this does not exist
|
||||||
|
if existing == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
exist := existing.(*structs.Allocation)
|
||||||
|
|
||||||
|
// Copy everything from the existing allocation
|
||||||
|
copyAlloc := new(structs.Allocation)
|
||||||
|
*copyAlloc = *exist
|
||||||
|
|
||||||
|
// Pull in anything the client is the authority on
|
||||||
|
copyAlloc.ClientStatus = alloc.ClientStatus
|
||||||
|
copyAlloc.ClientDescription = alloc.ClientDescription
|
||||||
|
|
||||||
|
// Update the modify index
|
||||||
|
copyAlloc.ModifyIndex = index
|
||||||
|
|
||||||
|
// Update the allocation
|
||||||
|
if err := txn.Insert("allocs", copyAlloc); err != nil {
|
||||||
|
return fmt.Errorf("alloc insert failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the indexes
|
||||||
|
if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
|
||||||
|
return fmt.Errorf("index update failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := map[string]struct{}{alloc.NodeID: struct{}{}}
|
||||||
|
txn.Defer(func() { s.watch.notifyAllocs(nodes) })
|
||||||
|
txn.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateAllocations is used to evict a set of allocations
|
// UpdateAllocations is used to evict a set of allocations
|
||||||
// and allocate new ones at the same time.
|
// and allocate new ones at the same time.
|
||||||
func (s *StateStore) UpdateAllocations(index uint64, allocs []*structs.Allocation) error {
|
func (s *StateStore) UpdateAllocations(index uint64, allocs []*structs.Allocation) error {
|
||||||
|
|
|
@ -611,6 +611,43 @@ func TestStateStore_RestoreEval(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateStore_UpdateAllocFromClient(t *testing.T) {
|
||||||
|
state := testStateStore(t)
|
||||||
|
|
||||||
|
alloc := mock.Alloc()
|
||||||
|
err := state.UpdateAllocations(1000, []*structs.Allocation{alloc})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
update := new(structs.Allocation)
|
||||||
|
*update = *alloc
|
||||||
|
update.ClientStatus = structs.AllocClientStatusFailed
|
||||||
|
|
||||||
|
err = state.UpdateAllocFromClient(1001, update)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := state.GetAllocByID(alloc.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
update.ModifyIndex = 1001
|
||||||
|
if !reflect.DeepEqual(update, out) {
|
||||||
|
t.Fatalf("bad: %#v %#v", update, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
index, err := state.GetIndex("allocs")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if index != 1001 {
|
||||||
|
t.Fatalf("bad: %d", index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStateStore_UpsertAlloc_GetAlloc(t *testing.T) {
|
func TestStateStore_UpsertAlloc_GetAlloc(t *testing.T) {
|
||||||
state := testStateStore(t)
|
state := testStateStore(t)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue