client: de-duplicate alloc updates and gate during restore (#17074)

When client nodes are restarted, all allocations that have been scheduled on the
node have their modify index updated, including terminal allocations. There are
several contributing factors:

* The `allocSync` method that updates the servers isn't gated on first contact
  with the servers. This means that if a server updates the desired state while
  the client is down, the `allocSync` races with the `Node.GetClientAllocs`
  RPC. This typically results in the client updating the server with "running"
  and then immediately thereafter with "complete". (A short sketch of the
  gating fix follows this list.)

* The `allocSync` method unconditionally sends the `Node.UpdateAlloc` RPC, even
  when we could assert that the server has definitely seen the client state.
  The allocrunner may also queue up updates even if we gate sending them, so we
  end up with a race between the allocrunner overwriting its internal state
  with a newer update and `allocSync` sending the stale or duplicate update.
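
The fix for the first race is visible in the `allocSync` hunk later in this
diff: the loop does a non-blocking receive on `c.serversContactedCh` and skips
the batch until the client has pulled from a server at least once. Below is a
self-contained toy sketch of that pattern; the `client` struct, `pending`
field, and `trySync` helper are illustrative stand-ins, not the real types.

```go
package main

import "fmt"

// client is an illustrative stand-in for the Nomad client; the real client
// closes serversContactedCh after its first successful pull from the servers.
type client struct {
	serversContactedCh chan struct{}
	pending            []string // alloc IDs with queued status updates
}

// trySync mimics one tick of the allocSync loop: until the client has
// synchronized with a server at least once, queued updates are held back, so
// a stale pre-restart view can't race the server's newer desired state.
func (c *client) trySync() {
	select {
	case <-c.serversContactedCh:
		// first contact has happened; safe to push updates
	default:
		fmt.Printf("no server contact yet, holding %d update(s)\n", len(c.pending))
		return
	}
	fmt.Printf("sending %d update(s)\n", len(c.pending))
	c.pending = nil
}

func main() {
	c := &client{serversContactedCh: make(chan struct{}), pending: []string{"alloc-1"}}
	c.trySync()                 // held back: no server contact yet
	close(c.serversContactedCh) // first pull from the servers completed
	c.trySync()                 // now the queued update goes out
}
```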

This changeset adds tracking of server-acknowledged state to the
allocrunner. This state is checked in `allocSync` before an update is added to
the batch, and it is updated when `Node.UpdateAlloc` returns successfully. To
implement this we need to be able to equality-check the updates against the
last acknowledged state. We also need to persist the last acknowledged state in
the client state DB; otherwise we'd drop unacknowledged updates across client
restarts.
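
A condensed sketch of that equality check follows; the full version is
`LastAcknowledgedStateIsCurrent` in the allocrunner hunk below, which also
compares task states, deployment status, and network status. The `ackState`
type and `isCurrent` helper here are cut-down stand-ins for illustration only.

```go
package main

import "fmt"

// ackState is a cut-down stand-in for the allocrunner's acknowledged state;
// the real struct also carries TaskStates, DeploymentStatus, and NetworkStatus.
type ackState struct {
	ClientStatus      string
	ClientDescription string
}

// isCurrent mirrors the shape of LastAcknowledgedStateIsCurrent: an update is
// redundant only when every tracked field matches what the server last
// acknowledged; if nothing has been acknowledged yet, always send.
func isCurrent(last *ackState, status, desc string) bool {
	if last == nil {
		return false
	}
	return last.ClientStatus == status && last.ClientDescription == desc
}

func main() {
	var last *ackState
	fmt.Println(isCurrent(last, "complete", "complete")) // false: nothing acknowledged, send

	// after Node.UpdateAlloc succeeds, the sent values are recorded...
	last = &ackState{ClientStatus: "complete", ClientDescription: "complete"}
	fmt.Println(isCurrent(last, "complete", "complete")) // true: duplicate, skip

	// ...while a genuine change still goes out
	fmt.Println(isCurrent(last, "failed", "task failed")) // false: send
}
```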

The client restart test has been expanded to cover a variety of allocation
states, including allocs stopped before shutdown, allocs stopped by the server
while the client is down, and allocs that have been completely GC'd on the
server while the client is down. I've also bench tested scenarios where the task
workload is killed while the client is down, resulting in a failed restore.
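
The core invariant those scenarios check shows up in the expanded test below:
an allocation the restarted client has nothing new to report about must keep
its server-side `ModifyIndex`, while an allocation whose state legitimately
changed while the client was down should see it advance. Here is a trivial
sketch of that comparison; the `serverAlloc` type, `report` helper, and index
values are made up for illustration.

```go
package main

import "fmt"

// serverAlloc is a made-up stand-in for the server's view of an allocation,
// holding only the field needed for the before/after comparison.
type serverAlloc struct {
	ID          string
	ModifyIndex uint64
}

// report compares an alloc's ModifyIndex before and after a client restart.
func report(before, after serverAlloc) {
	if before.ModifyIndex == after.ModifyIndex {
		fmt.Printf("%s: untouched at index %d (no redundant update sent)\n",
			after.ID, after.ModifyIndex)
		return
	}
	fmt.Printf("%s: index moved %d -> %d (state really changed)\n",
		after.ID, before.ModifyIndex, after.ModifyIndex)
}

func main() {
	// alloc left running across the restart: its index must not move
	report(serverAlloc{ID: "alloc1", ModifyIndex: 102}, serverAlloc{ID: "alloc1", ModifyIndex: 102})
	// alloc stopped by the server while the client was down: its index should move
	report(serverAlloc{ID: "alloc3", ModifyIndex: 110}, serverAlloc{ID: "alloc3", ModifyIndex: 125})
}
```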

Fixes #16381
Tim Gross, 2023-05-11 09:05:24 -04:00 (committed by GitHub)
commit 9ed75e1f72, parent 4abb3e03ca
12 changed files with 630 additions and 85 deletions

.changelog/17074.txt (new file)

@ -0,0 +1,3 @@
```release-note:improvement
client: de-duplicate allocation client status updates and prevent allocation client status updates from being sent until clients have first synchronized with the server
```


@ -11,6 +11,7 @@ import (
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"golang.org/x/exp/maps"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@ -123,6 +124,10 @@ type allocRunner struct {
state *state.State
stateLock sync.RWMutex
// lastAcknowledgedState is the alloc runner state that was last
// acknowledged by the server (may lag behind ar.state)
lastAcknowledgedState *state.State
stateDB cstate.StateDB
// allocDir is used to build the allocations directory structure.
@ -738,8 +743,9 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
return states
}
// clientAlloc takes in the task states and returns an Allocation populated
// with Client specific fields
// clientAlloc takes in the task states and returns an Allocation populated with
// Client specific fields. Note: this mutates the allocRunner's state to store
// the taskStates!
func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation {
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
@ -1394,3 +1400,50 @@ func (ar *allocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capa
return tr.DriverCapabilities()
}
// AcknowledgeState is called by the client's alloc sync when a given client
// state has been acknowledged by the server
func (ar *allocRunner) AcknowledgeState(a *state.State) {
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
ar.lastAcknowledgedState = a
ar.persistLastAcknowledgedState(a)
}
// persistLastAcknowledgedState stores the last client state acknowledged by the server
func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) {
if err := ar.stateDB.PutAcknowledgedState(ar.id, a); err != nil {
// While any persistence errors are very bad, the worst case scenario
// for failing to persist last acknowledged state is that if the agent
// is restarted it will send the update again.
ar.logger.Error("error storing acknowledged allocation status", "error", err)
}
}
// LastAcknowledgedStateIsCurrent returns true if the current state matches the
// state that was last acknowledged from a server update. This is called from
// the client in the same goroutine that called AcknowledgeState so that we
// can't get a TOCTOU error.
func (ar *allocRunner) LastAcknowledgedStateIsCurrent(a *structs.Allocation) bool {
ar.stateLock.RLock()
defer ar.stateLock.RUnlock()
last := ar.lastAcknowledgedState
if last == nil {
return false
}
switch {
case last.ClientStatus != a.ClientStatus:
return false
case last.ClientDescription != a.ClientDescription:
return false
case !last.DeploymentStatus.Equal(a.DeploymentStatus):
return false
case !last.NetworkStatus.Equal(a.NetworkStatus):
return false
}
return maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool {
return st.Equal(o)
})
}


@ -14,8 +14,13 @@ import (
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allochealth"
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/allocrunner/tasklifecycle"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
@ -26,9 +31,6 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"
)
// destroy does a blocking destroy on an alloc runner
@ -2443,3 +2445,59 @@ func TestAllocRunner_PreKill_RunOnDone(t *testing.T) {
wait.Gap(500*time.Millisecond),
))
}
func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{"run_for": "2ms"}
alloc.DesiredStatus = "stop"
conf, cleanup := testAllocRunnerConfig(t, alloc.Copy())
t.Cleanup(cleanup)
ar, err := NewAllocRunner(conf)
must.NoError(t, err)
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.1.1",
DNS: &structs.DNSConfig{},
})
calloc := ar.clientAlloc(map[string]*structs.TaskState{})
ar.AcknowledgeState(&arstate.State{
ClientStatus: calloc.ClientStatus,
ClientDescription: calloc.ClientDescription,
DeploymentStatus: calloc.DeploymentStatus,
TaskStates: calloc.TaskStates,
NetworkStatus: calloc.NetworkStatus,
})
must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))
// clientAlloc mutates the state, so verify this doesn't break the check
// without state having been updated
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))
// make a no-op state update
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.1.1",
DNS: &structs.DNSConfig{},
})
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))
// make a state update that should be detected as a change
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.2.1",
DNS: &structs.DNSConfig{},
})
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.False(t, ar.LastAcknowledgedStateIsCurrent(calloc))
}


@ -160,6 +160,8 @@ type AllocRunner interface {
Signal(taskName, signal string) error
GetTaskEventHandler(taskName string) drivermanager.EventHandler
PersistState() error
AcknowledgeState(*arstate.State)
LastAcknowledgedStateIsCurrent(*structs.Allocation) bool
RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartRunning(taskEvent *structs.TaskEvent) error
@ -512,7 +514,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.serviceRegWrapper = wrapper.NewHandlerWrapper(c.logger, c.consulService, c.nomadService)
// Batching of initial fingerprints is done to reduce the number of node
// updates sent to the server on startup. This is the first RPC to the servers
// updates sent to the server on startup.
go c.batchFirstFingerprints()
// create heartbeatStop. We go after the first attempt to connect to the server, so
@ -1270,6 +1272,14 @@ func (c *Client) restoreState() error {
continue
}
allocState, err := c.stateDB.GetAcknowledgedState(alloc.ID)
if err != nil {
c.logger.Error("error restoring last acknowledged alloc state, will update again",
err, "alloc_id", alloc.ID)
} else {
ar.AcknowledgeState(allocState)
}
// Maybe mark the alloc for halt on missing server heartbeats
if c.heartbeatStop.shouldStop(alloc) {
err = c.heartbeatStop.stopAlloc(alloc.ID)
@ -2144,10 +2154,20 @@ func (c *Client) allocSync() {
if len(updates) == 0 {
continue
}
// Ensure we never send an update before we've had at least one sync
// from the server
select {
case <-c.serversContactedCh:
default:
continue
}
sync := make([]*structs.Allocation, 0, len(updates))
for _, alloc := range updates {
sync = append(sync, alloc)
sync := c.filterAcknowledgedUpdates(updates)
if len(sync) == 0 {
// No updates to send
updates = make(map[string]*structs.Allocation, len(updates))
syncTicker.Reset(allocSyncIntv)
continue
}
// Send to server.
@ -2162,23 +2182,53 @@ func (c *Client) allocSync() {
// Error updating allocations, do *not* clear
// updates and retry after backoff
c.logger.Error("error updating allocations", "error", err)
syncTicker.Stop()
syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
syncTicker.Reset(c.retryIntv(allocSyncRetryIntv))
continue
}
c.allocLock.RLock()
for _, update := range sync {
if ar, ok := c.allocs[update.ID]; ok {
ar.AcknowledgeState(&arstate.State{
ClientStatus: update.ClientStatus,
ClientDescription: update.ClientDescription,
DeploymentStatus: update.DeploymentStatus,
TaskStates: update.TaskStates,
NetworkStatus: update.NetworkStatus,
})
}
}
c.allocLock.RUnlock()
// Successfully updated allocs, reset map and ticker.
// Always reset ticker to give loop time to receive
// alloc updates. If the RPC took the ticker interval
// we may call it in a tight loop before draining
// buffered updates.
updates = make(map[string]*structs.Allocation, len(updates))
syncTicker.Stop()
syncTicker = time.NewTicker(allocSyncIntv)
syncTicker.Reset(allocSyncIntv)
}
}
}
func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) []*structs.Allocation {
sync := make([]*structs.Allocation, 0, len(updates))
c.allocLock.RLock()
defer c.allocLock.RUnlock()
for allocID, update := range updates {
if ar, ok := c.allocs[allocID]; ok {
if !ar.LastAcknowledgedStateIsCurrent(update) {
sync = append(sync, update)
}
} else {
// no allocrunner (typically a failed placement), so we need
// to send the update
sync = append(sync, update)
}
}
return sync
}
// allocUpdates holds the results of receiving updated allocations from the
// servers.
type allocUpdates struct {


@ -16,6 +16,7 @@ import (
memdb "github.com/hashicorp/go-memdb"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -554,67 +555,175 @@ func waitTilNodeReady(client *Client, t *testing.T) {
})
}
// TestClient_SaveRestoreState exercises the allocrunner restore code paths
// after a client restart. It runs several jobs in different states and asserts
// the expected final state and server updates.
func TestClient_SaveRestoreState(t *testing.T) {
ci.Parallel(t)
s1, _, cleanupS1 := testServer(t, nil)
defer cleanupS1()
t.Cleanup(cleanupS1)
testutil.WaitForLeader(t, s1.RPC)
c1, cleanupC1 := TestClient(t, func(c *config.Config) {
c.DevMode = false
c.RPCHandler = s1
})
defer cleanupC1()
t.Cleanup(func() {
for _, ar := range c1.getAllocRunners() {
ar.Destroy()
}
for _, ar := range c1.getAllocRunners() {
<-ar.DestroyCh()
}
cleanupC1()
})
// Wait until the node is ready
waitTilNodeReady(c1, t)
// Create mock allocations
job := mock.Job()
alloc1 := mock.Alloc()
alloc1.NodeID = c1.Node().ID
alloc1.Job = job
alloc1.JobID = job.ID
alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
alloc1.ClientStatus = structs.AllocClientStatusRunning
migrateStrategy := structs.DefaultMigrateStrategy()
migrateStrategy.MinHealthyTime = time.Millisecond
migrateStrategy.HealthCheck = structs.MigrateStrategyHealthStates
state := s1.State()
if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
// Create mock jobs and allocations that will start up fast
setup := func(id string) *structs.Job {
job := mock.MinJob()
job.ID = id
job.TaskGroups[0].Migrate = migrateStrategy
must.NoError(t, s1.RPC("Job.Register", &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global", Namespace: job.Namespace},
}, &structs.JobRegisterResponse{}))
return job
}
// Allocations should get registered
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
ar := c1.allocs[alloc1.ID]
c1.allocLock.RUnlock()
if ar == nil {
return false, fmt.Errorf("nil alloc runner")
}
if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// job1: will be left running
// job2: will be stopped before shutdown
// job3: will be stopped after shutdown
// job4: will be stopped and GC'd after shutdown
job1, job2, job3, job4 := setup("job1"), setup("job2"), setup("job3"), setup("job4")
// Shutdown the client, saves state
if err := c1.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
// Allocations should be placed
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
c1.allocLock.RLock()
defer c1.allocLock.RUnlock()
if len(c1.allocs) != 4 {
return fmt.Errorf("expected 4 alloc runners")
}
for _, ar := range c1.allocs {
if ar.AllocState().ClientStatus != structs.AllocClientStatusRunning {
return fmt.Errorf("expected running client status, got %v",
ar.AllocState().ClientStatus)
}
}
return nil
}),
wait.Timeout(time.Second*10),
wait.Gap(time.Millisecond*30),
))
store := s1.State()
allocIDforJob := func(job *structs.Job) string {
allocs, err := store.AllocsByJob(nil, job.Namespace, job.ID, false)
must.NoError(t, err)
must.Len(t, 1, allocs) // we should only ever get 1 in this test
return allocs[0].ID
}
alloc1 := allocIDforJob(job1)
alloc2 := allocIDforJob(job2)
alloc3 := allocIDforJob(job3)
alloc4 := allocIDforJob(job4)
t.Logf("alloc1=%s alloc2=%s alloc3=%s alloc4=%s", alloc1, alloc2, alloc3, alloc4)
// Stop the 2nd job before we shut down
must.NoError(t, s1.RPC("Job.Deregister", &structs.JobDeregisterRequest{
JobID: job2.ID,
WriteRequest: structs.WriteRequest{Region: "global", Namespace: job2.Namespace},
}, &structs.JobDeregisterResponse{}))
var alloc2ModifyIndex uint64
var alloc2AllocModifyIndex uint64
// Wait till we're sure the client has received the stop and updated the server
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
alloc, err := store.AllocByID(nil, alloc2)
must.NotNil(t, alloc)
must.NoError(t, err)
if alloc.ClientStatus != structs.AllocClientStatusComplete {
// note that the allocrunner is non-nil until it's been
// client-GC'd, so we're just looking to make sure the client
// has updated the server
return fmt.Errorf("alloc2 should have been marked completed")
}
alloc2ModifyIndex = alloc.ModifyIndex
alloc2AllocModifyIndex = alloc.AllocModifyIndex
return nil
}),
wait.Timeout(time.Second*20),
wait.Gap(time.Millisecond*30),
))
t.Log("shutting down client")
must.NoError(t, c1.Shutdown()) // note: this saves the client state DB
// Stop the 3rd job while we're down
must.NoError(t, s1.RPC("Job.Deregister", &structs.JobDeregisterRequest{
JobID: job3.ID,
WriteRequest: structs.WriteRequest{Region: "global", Namespace: job3.Namespace},
}, &structs.JobDeregisterResponse{}))
// Stop and purge the 4th job while we're down
must.NoError(t, s1.RPC("Job.Deregister", &structs.JobDeregisterRequest{
JobID: job4.ID,
Purge: true,
WriteRequest: structs.WriteRequest{Region: "global", Namespace: job4.Namespace},
}, &structs.JobDeregisterResponse{}))
// Ensure the allocation has been deleted as well
must.NoError(t, s1.RPC("Eval.Reap", &structs.EvalReapRequest{
Allocs: []string{alloc4},
WriteRequest: structs.WriteRequest{Region: "global"},
}, &structs.GenericResponse{}))
var alloc3AllocModifyIndex uint64
var alloc3ModifyIndex uint64
// Wait till we're sure the scheduler has marked alloc3 for stop and deleted alloc4
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
alloc, err := store.AllocByID(nil, alloc3)
must.NotNil(t, alloc)
must.NoError(t, err)
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
return fmt.Errorf("alloc3 should have been marked for stop")
}
alloc3ModifyIndex = alloc.ModifyIndex
alloc3AllocModifyIndex = alloc.AllocModifyIndex
alloc, err = store.AllocByID(nil, alloc4)
must.NoError(t, err)
if alloc != nil {
return fmt.Errorf("alloc4 should have been deleted")
}
return nil
}),
wait.Timeout(time.Second*5),
wait.Gap(time.Millisecond*30),
))
a1, err := store.AllocByID(nil, alloc1)
must.NoError(t, err)
must.NotNil(t, a1)
alloc1AllocModifyIndex := a1.AllocModifyIndex
alloc1ModifyIndex := a1.ModifyIndex
t.Log("starting new client")
// Create a new client
logger := testlog.HCLogger(t)
c1.config.Logger = logger
consulCatalog := consul.NewMockCatalog(logger)
@ -625,34 +734,77 @@ func TestClient_SaveRestoreState(t *testing.T) {
c1.config.PluginSingletonLoader = singleton.NewSingletonLoader(logger, c1.config.PluginLoader)
c2, err := NewClient(c1.config, consulCatalog, nil, mockService, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer c2.Shutdown()
must.NoError(t, err)
// Ensure the allocation is running
testutil.WaitForResult(func() (bool, error) {
c2.allocLock.RLock()
ar := c2.allocs[alloc1.ID]
c2.allocLock.RUnlock()
status := ar.Alloc().ClientStatus
alive := status == structs.AllocClientStatusRunning || status == structs.AllocClientStatusPending
if !alive {
return false, fmt.Errorf("incorrect client status: %#v", ar.Alloc())
t.Cleanup(func() {
for _, ar := range c2.getAllocRunners() {
ar.Destroy()
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
for _, ar := range c2.getAllocRunners() {
<-ar.DestroyCh()
}
c2.Shutdown()
})
// Destroy all the allocations
for _, ar := range c2.getAllocRunners() {
ar.Destroy()
}
// Ensure only the expected allocation is running
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
c2.allocLock.RLock()
defer c2.allocLock.RUnlock()
if len(c2.allocs) != 3 {
// the GC'd alloc will not have restored AR
return fmt.Errorf("expected 3 alloc runners")
}
for allocID, ar := range c2.allocs {
if ar == nil {
return fmt.Errorf("nil alloc runner")
}
switch allocID {
case alloc1:
if ar.AllocState().ClientStatus != structs.AllocClientStatusRunning {
return fmt.Errorf("expected running client status, got %v",
ar.AllocState().ClientStatus)
}
default:
if ar.AllocState().ClientStatus != structs.AllocClientStatusComplete {
return fmt.Errorf("expected complete client status, got %v",
ar.AllocState().ClientStatus)
}
}
}
return nil
}),
wait.Timeout(time.Second*10),
wait.Gap(time.Millisecond*30),
))
for _, ar := range c2.getAllocRunners() {
<-ar.DestroyCh()
}
a1, err = store.AllocByID(nil, alloc1)
must.NoError(t, err)
must.NotNil(t, a1)
test.Eq(t, alloc1AllocModifyIndex, a1.AllocModifyIndex)
test.Eq(t, alloc1ModifyIndex, a1.ModifyIndex,
test.Sprint("alloc still running should not have updated"))
a2, err := store.AllocByID(nil, alloc2)
must.NoError(t, err)
must.NotNil(t, a2)
test.Eq(t, alloc2AllocModifyIndex, a2.AllocModifyIndex)
test.Eq(t, alloc2ModifyIndex, a2.ModifyIndex,
test.Sprintf("alloc %s stopped before shutdown should not have updated", a2.ID[:8]))
a3, err := store.AllocByID(nil, alloc3)
must.NoError(t, err)
must.NotNil(t, a3)
test.Eq(t, alloc3AllocModifyIndex, a3.AllocModifyIndex)
test.Greater(t, alloc3ModifyIndex, a3.ModifyIndex,
test.Sprintf("alloc %s stopped during shutdown should have updated", a3.ID[:8]))
// TODO: the alloc has been GC'd so the server will reject any update. It'd
// be nice if we could instrument the server here to ensure we didn't send
// one either.
a4, err := store.AllocByID(nil, alloc4)
must.NoError(t, err)
test.Nil(t, a4, test.Sprint("garbage collected alloc should not exist"))
}
func TestClient_AddAllocError(t *testing.T) {


@ -11,6 +11,7 @@ import (
"time"
hclog "github.com/hashicorp/go-hclog"
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
@ -32,6 +33,7 @@ allocations/
|--> alloc -> allocEntry{*structs.Allocation}
|--> deploy_status -> deployStatusEntry{*structs.AllocDeploymentStatus}
|--> network_status -> networkStatusEntry{*structs.AllocNetworkStatus}
|--> acknowledged_state -> acknowledgedStateEntry{*arstate.State}
|--> task-<name>/
|--> local_state -> *trstate.LocalState # Local-only state
|--> task_state -> *structs.TaskState # Syncs to servers
@ -83,6 +85,9 @@ var (
// stored under
allocNetworkStatusKey = []byte("network_status")
// acknowledgedStateKey is the key *arstate.State is stored under
acknowledgedStateKey = []byte("acknowledged_state")
// checkResultsBucket is the bucket name in which check query results are stored
checkResultsBucket = []byte("check_results")
@ -392,6 +397,54 @@ func (s *BoltStateDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkSta
return entry.NetworkStatus, nil
}
// PutAcknowledgedState stores an allocation's last acknowledged state or
// returns an error if it could not be stored.
func (s *BoltStateDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error {
return s.updateWithOptions(opts, func(tx *boltdd.Tx) error {
allocBkt, err := getAllocationBucket(tx, allocID)
if err != nil {
return err
}
entry := acknowledgedStateEntry{
State: state,
}
return allocBkt.Put(acknowledgedStateKey, &entry)
})
}
// GetAcknowledgedState retrieves an allocation's last acknowledged state
func (s *BoltStateDB) GetAcknowledgedState(allocID string) (*arstate.State, error) {
var entry acknowledgedStateEntry
err := s.db.View(func(tx *boltdd.Tx) error {
allAllocsBkt := tx.Bucket(allocationsBucketName)
if allAllocsBkt == nil {
// No state, return
return nil
}
allocBkt := allAllocsBkt.Bucket([]byte(allocID))
if allocBkt == nil {
// No state for alloc, return
return nil
}
return allocBkt.Get(acknowledgedStateKey, &entry)
})
// It's valid for this field to be nil/missing
if boltdd.IsErrNotFound(err) {
return nil, nil
}
if err != nil {
return nil, err
}
return entry.State, nil
}
// GetTaskRunnerState returns the LocalState and TaskState for a
// TaskRunner. LocalState or TaskState will be nil if they do not exist.
//
@ -851,6 +904,12 @@ func (s *BoltStateDB) init() error {
})
}
// acknowledgedStateEntry wraps values in the acknowledged_state bucket, so we
// can expand it in the future if need be
type acknowledgedStateEntry struct {
State *arstate.State
}
// updateWithOptions enables adjustments to db.Update operation, including Batch mode.
func (s *BoltStateDB) updateWithOptions(opts []WriteOption, updateFn func(tx *boltdd.Tx) error) error {
writeOpts := mergeWriteOptions(opts)


@ -6,6 +6,7 @@ package state
import (
"fmt"
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
@ -52,6 +53,14 @@ func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus,
return fmt.Errorf("Error!")
}
func (m *ErrDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error {
return fmt.Errorf("Error!")
}
func (m *ErrDB) GetAcknowledgedState(allocID string) (*arstate.State, error) {
return nil, fmt.Errorf("Error!")
}
func (m *ErrDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
return nil, nil, fmt.Errorf("Error!")
}


@ -7,6 +7,7 @@ import (
"sync"
"github.com/hashicorp/go-hclog"
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
@ -28,6 +29,9 @@ type MemDB struct {
// alloc_id -> value
networkStatus map[string]*structs.AllocNetworkStatus
// alloc_id -> value
acknowledgedState map[string]*arstate.State
// alloc_id -> task_name -> value
localTaskState map[string]map[string]*state.LocalState
taskState map[string]map[string]*structs.TaskState
@ -55,13 +59,14 @@ type MemDB struct {
func NewMemDB(logger hclog.Logger) *MemDB {
logger = logger.Named("memdb")
return &MemDB{
allocs: make(map[string]*structs.Allocation),
deployStatus: make(map[string]*structs.AllocDeploymentStatus),
networkStatus: make(map[string]*structs.AllocNetworkStatus),
localTaskState: make(map[string]map[string]*state.LocalState),
taskState: make(map[string]map[string]*structs.TaskState),
checks: make(checks.ClientResults),
logger: logger,
allocs: make(map[string]*structs.Allocation),
deployStatus: make(map[string]*structs.AllocDeploymentStatus),
networkStatus: make(map[string]*structs.AllocNetworkStatus),
acknowledgedState: make(map[string]*arstate.State),
localTaskState: make(map[string]map[string]*state.LocalState),
taskState: make(map[string]map[string]*structs.TaskState),
checks: make(checks.ClientResults),
logger: logger,
}
}
@ -118,6 +123,19 @@ func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus,
return nil
}
func (m *MemDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error {
m.mu.Lock()
defer m.mu.Unlock()
m.acknowledgedState[allocID] = state
return nil
}
func (m *MemDB) GetAcknowledgedState(allocID string) (*arstate.State, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.acknowledgedState[allocID], nil
}
func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
m.mu.RLock()
defer m.mu.RUnlock()


@ -4,6 +4,7 @@
package state
import (
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
@ -47,6 +48,12 @@ func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus,
return nil
}
func (n NoopDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error {
return nil
}
func (n NoopDB) GetAcknowledgedState(allocID string) (*arstate.State, error) { return nil, nil }
func (n NoopDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
return nil, nil, nil
}


@ -4,6 +4,7 @@
package state
import (
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
@ -44,6 +45,14 @@ type StateDB interface {
// PutNetworkStatus puts the allocation's network status. It may be nil.
PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error
// PutAcknowledgedState stores an allocation's last acknowledged state or
// returns an error if it could not be stored.
PutAcknowledgedState(string, *arstate.State, ...WriteOption) error
// GetAcknowledgedState retrieves an allocation's last acknowledged
// state. It may be nil even if there's no error
GetAcknowledgedState(string) (*arstate.State, error)
// GetTaskRunnerState returns the LocalState and TaskState for a
// TaskRunner. Either state may be nil if it is not found, but if an
// error is encountered only the error will be non-nil.


@ -41,7 +41,8 @@ func diffAllocs(existing map[string]uint64, allocs *allocUpdates) *diffResult {
continue
}
// Check for an update
// Check for an update (note: AllocModifyIndex is only updated for
// server updates)
if pulled && alloc.AllocModifyIndex > existIndex {
result.updated = append(result.updated, alloc)
continue


@ -2823,6 +2823,13 @@ func (d *DNSConfig) Copy() *DNSConfig {
}
}
func (d *DNSConfig) IsZero() bool {
if d == nil {
return true
}
return len(d.Options) == 0 && len(d.Searches) == 0 && len(d.Servers) == 0
}
// NetworkResource is used to represent available network
// resources
type NetworkResource struct {
@ -8358,6 +8365,16 @@ func (h *TaskHandle) Copy() *TaskHandle {
return &newTH
}
func (h *TaskHandle) Equal(o *TaskHandle) bool {
if h == nil || o == nil {
return h == o
}
if h.Version != o.Version {
return false
}
return bytes.Equal(h.DriverState, o.DriverState)
}
// Set of possible states for a task.
const (
TaskStatePending = "pending" // The task is waiting to be run.
@ -8437,6 +8454,37 @@ func (ts *TaskState) Successful() bool {
return ts.State == TaskStateDead && !ts.Failed
}
func (ts *TaskState) Equal(o *TaskState) bool {
if ts.State != o.State {
return false
}
if ts.Failed != o.Failed {
return false
}
if ts.Restarts != o.Restarts {
return false
}
if ts.LastRestart != o.LastRestart {
return false
}
if ts.StartedAt != o.StartedAt {
return false
}
if ts.FinishedAt != o.FinishedAt {
return false
}
if !slices.EqualFunc(ts.Events, o.Events, func(ts, o *TaskEvent) bool {
return ts.Equal(o)
}) {
return false
}
if !ts.TaskHandle.Equal(o.TaskHandle) {
return false
}
return true
}
const (
// TaskSetupFailure indicates that the task could not be started due to a
// a setup failure.
@ -8769,6 +8817,31 @@ func (e *TaskEvent) GoString() string {
return fmt.Sprintf("%v - %v", e.Time, e.Type)
}
// Equal on TaskEvent ignores the deprecated fields
func (e *TaskEvent) Equal(o *TaskEvent) bool {
if e == nil || o == nil {
return e == o
}
if e.Type != o.Type {
return false
}
if e.Time != o.Time {
return false
}
if e.Message != o.Message {
return false
}
if e.DisplayMessage != o.DisplayMessage {
return false
}
if !maps.Equal(e.Details, o.Details) {
return false
}
return true
}
// SetDisplayMessage sets the display message of TaskEvent
func (e *TaskEvent) SetDisplayMessage(msg string) *TaskEvent {
e.DisplayMessage = msg
@ -11257,6 +11330,41 @@ func (a *AllocNetworkStatus) Copy() *AllocNetworkStatus {
}
}
func (a *AllocNetworkStatus) Equal(o *AllocNetworkStatus) bool {
// note: this accounts for when DNSConfig is non-nil but empty
switch {
case a == nil && o.IsZero():
return true
case o == nil && a.IsZero():
return true
case a == nil || o == nil:
return a == o
}
switch {
case a.InterfaceName != o.InterfaceName:
return false
case a.Address != o.Address:
return false
case !a.DNS.Equal(o.DNS):
return false
}
return true
}
func (a *AllocNetworkStatus) IsZero() bool {
if a == nil {
return true
}
if a.InterfaceName != "" || a.Address != "" {
return false
}
if !a.DNS.IsZero() {
return false
}
return true
}
// NetworkStatus is an interface satisfied by alloc runner, for acquiring the
// network status of an allocation.
type NetworkStatus interface {
@ -11333,6 +11441,24 @@ func (a *AllocDeploymentStatus) Copy() *AllocDeploymentStatus {
return c
}
func (a *AllocDeploymentStatus) Equal(o *AllocDeploymentStatus) bool {
if a == nil || o == nil {
return a == o
}
switch {
case !pointer.Eq(a.Healthy, o.Healthy):
return false
case a.Timestamp != o.Timestamp:
return false
case a.Canary != o.Canary:
return false
case a.ModifyIndex != o.ModifyIndex:
return false
}
return true
}
const (
EvalStatusBlocked = "blocked"
EvalStatusPending = "pending"