diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 859353726..9e5c09b76 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -475,6 +475,54 @@ func (s *StateStore) Evals() (memdb.ResultIterator, error) { return iter, nil } +// UpdateAllocFromClient is used to update an allocation based on input +// from a client. While the schedulers are the authority on the allocation for +// most things, some updates are authoritative from the client. Specifically, +// the desired state comes from the schedulers, while the actual state comes +// from clients. +func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocation) error { + txn := s.db.Txn(true) + defer txn.Abort() + + // Look for existing alloc + existing, err := txn.First("allocs", "id", alloc.ID) + if err != nil { + return fmt.Errorf("alloc lookup failed: %v", err) + } + + // Nothing to do if this does not exist + if existing == nil { + return nil + } + exist := existing.(*structs.Allocation) + + // Copy everything from the existing allocation + copyAlloc := new(structs.Allocation) + *copyAlloc = *exist + + // Pull in anything the client is the authority on + copyAlloc.ClientStatus = alloc.ClientStatus + copyAlloc.ClientDescription = alloc.ClientDescription + + // Update the modify index + copyAlloc.ModifyIndex = index + + // Update the allocation + if err := txn.Insert("allocs", copyAlloc); err != nil { + return fmt.Errorf("alloc insert failed: %v", err) + } + + // Update the indexes + if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + nodes := map[string]struct{}{alloc.NodeID: struct{}{}} + txn.Defer(func() { s.watch.notifyAllocs(nodes) }) + txn.Commit() + return nil +} + // UpdateAllocations is used to evict a set of allocations // and allocate new ones at the same time. func (s *StateStore) UpdateAllocations(index uint64, allocs []*structs.Allocation) error { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 593374349..b6c971e5e 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -611,6 +611,43 @@ func TestStateStore_RestoreEval(t *testing.T) { } } +func TestStateStore_UpdateAllocFromClient(t *testing.T) { + state := testStateStore(t) + + alloc := mock.Alloc() + err := state.UpdateAllocations(1000, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + update := new(structs.Allocation) + *update = *alloc + update.ClientStatus = structs.AllocClientStatusFailed + + err = state.UpdateAllocFromClient(1001, update) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.GetAllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + update.ModifyIndex = 1001 + if !reflect.DeepEqual(update, out) { + t.Fatalf("bad: %#v %#v", update, out) + } + + index, err := state.GetIndex("allocs") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1001 { + t.Fatalf("bad: %d", index) + } +} + func TestStateStore_UpsertAlloc_GetAlloc(t *testing.T) { state := testStateStore(t)