diff --git a/.changelog/17354.txt b/.changelog/17354.txt new file mode 100644 index 000000000..92a6e613f --- /dev/null +++ b/.changelog/17354.txt @@ -0,0 +1,3 @@ +```release-note:improvement +client: prioritize allocation updates to reduce Raft and RPC load +``` diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index d82b238ef..04de68ab4 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -1420,30 +1420,36 @@ func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) { } } -// LastAcknowledgedStateIsCurrent returns true if the current state matches the -// state that was last acknowledged from a server update. This is called from -// the client in the same goroutine that called AcknowledgeState so that we -// can't get a TOCTOU error. -func (ar *allocRunner) LastAcknowledgedStateIsCurrent(a *structs.Allocation) bool { +// GetUpdatePriority returns the update priority based the difference between +// the current state and the state that was last acknowledged from a server +// update. This is called from the client in the same goroutine that called +// AcknowledgeState so that we can't get a TOCTOU error. +func (ar *allocRunner) GetUpdatePriority(a *structs.Allocation) cstructs.AllocUpdatePriority { ar.stateLock.RLock() defer ar.stateLock.RUnlock() last := ar.lastAcknowledgedState if last == nil { - return false + return cstructs.AllocUpdatePriorityTypical } switch { case last.ClientStatus != a.ClientStatus: - return false + return cstructs.AllocUpdatePriorityUrgent case last.ClientDescription != a.ClientDescription: - return false + return cstructs.AllocUpdatePriorityTypical case !last.DeploymentStatus.Equal(a.DeploymentStatus): - return false + return cstructs.AllocUpdatePriorityTypical case !last.NetworkStatus.Equal(a.NetworkStatus): - return false + return cstructs.AllocUpdatePriorityTypical } - return maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool { + + if !maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool { return st.Equal(o) - }) + + }) { + return cstructs.AllocUpdatePriorityTypical + } + + return cstructs.AllocUpdatePriorityNone } diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index a477ae243..775b1d5ff 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -28,6 +28,7 @@ import ( "github.com/hashicorp/nomad/client/serviceregistration" regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/state" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -2458,7 +2459,7 @@ func TestAllocRunner_PreKill_RunOnDone(t *testing.T) { )) } -func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) { +func TestAllocRunner_GetUpdatePriority(t *testing.T) { ci.Parallel(t) alloc := mock.Alloc() @@ -2489,12 +2490,12 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) { NetworkStatus: calloc.NetworkStatus, }) - must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc)) // clientAlloc mutates the state, so verify this doesn't break the check // without state having been updated calloc = ar.clientAlloc(map[string]*structs.TaskState{}) - must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc)) // make a no-op state update ar.SetNetworkStatus(&structs.AllocNetworkStatus{ @@ -2503,14 +2504,19 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) { DNS: &structs.DNSConfig{}, }) calloc = ar.clientAlloc(map[string]*structs.TaskState{}) - must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc)) - // make a state update that should be detected as a change + // make a low priority state update ar.SetNetworkStatus(&structs.AllocNetworkStatus{ InterfaceName: "eth0", - Address: "192.168.2.1", + Address: "192.168.1.2", DNS: &structs.DNSConfig{}, }) calloc = ar.clientAlloc(map[string]*structs.TaskState{}) - must.False(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + must.Eq(t, cstructs.AllocUpdatePriorityTypical, ar.GetUpdatePriority(calloc)) + + // make a state update that should be detected as high priority + ar.SetClientStatus(structs.AllocClientStatusFailed) + calloc = ar.clientAlloc(map[string]*structs.TaskState{}) + must.Eq(t, cstructs.AllocUpdatePriorityUrgent, ar.GetUpdatePriority(calloc)) } diff --git a/client/allocrunner/interfaces/runner.go b/client/allocrunner/interfaces/runner.go index d78894266..6ed59e258 100644 --- a/client/allocrunner/interfaces/runner.go +++ b/client/allocrunner/interfaces/runner.go @@ -35,7 +35,7 @@ type AllocRunner interface { AllocState() *state.State PersistState() error AcknowledgeState(*state.State) - LastAcknowledgedStateIsCurrent(*structs.Allocation) bool + GetUpdatePriority(*structs.Allocation) cstructs.AllocUpdatePriority SetClientStatus(string) Signal(taskName, signal string) error diff --git a/client/client.go b/client/client.go index daf7512a6..18e4cbe86 100644 --- a/client/client.go +++ b/client/client.go @@ -43,6 +43,7 @@ import ( "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/stats" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" @@ -213,8 +214,8 @@ type Client struct { invalidAllocs map[string]struct{} invalidAllocsLock sync.Mutex - // allocUpdates stores allocations that need to be synced to the server. - allocUpdates chan *structs.Allocation + // pendingUpdates stores allocations that need to be synced to the server. + pendingUpdates *pendingClientUpdates // consulService is the Consul handler implementation for managing services // and checks. @@ -366,7 +367,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie logger: logger, rpcLogger: logger.Named("rpc"), allocs: make(map[string]interfaces.AllocRunner), - allocUpdates: make(chan *structs.Allocation, 64), + pendingUpdates: newPendingClientUpdates(), shutdownCh: make(chan struct{}), triggerDiscoveryCh: make(chan struct{}), triggerNodeUpdate: make(chan struct{}, 8), @@ -1322,10 +1323,7 @@ func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) { // Mark alloc as failed so server can handle this failed := makeFailedAlloc(alloc, err) - select { - case c.allocUpdates <- failed: - case <-c.shutdownCh: - } + c.pendingUpdates.add(failed) } // saveState is used to snapshot our state into the data dir. @@ -2099,10 +2097,7 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) { stripped.DeploymentStatus = alloc.DeploymentStatus stripped.NetworkStatus = alloc.NetworkStatus - select { - case c.allocUpdates <- stripped: - case <-c.shutdownCh: - } + c.pendingUpdates.add(stripped) } // PutAllocation stores an allocation or returns an error if it could not be stored. @@ -2114,39 +2109,27 @@ func (c *Client) PutAllocation(alloc *structs.Allocation) error { // server. func (c *Client) allocSync() { syncTicker := time.NewTicker(allocSyncIntv) - updates := make(map[string]*structs.Allocation) + updateTicks := 0 + for { select { case <-c.shutdownCh: syncTicker.Stop() return - case alloc := <-c.allocUpdates: - // Batch the allocation updates until the timer triggers. - updates[alloc.ID] = alloc - case <-syncTicker.C: - // Fast path if there are no updates - if len(updates) == 0 { - continue - } - // Ensure we never send an update before we've had at least one sync - // from the server - select { - case <-c.serversContactedCh: - default: - continue - } - sync := c.filterAcknowledgedUpdates(updates) - if len(sync) == 0 { - // No updates to send - updates = make(map[string]*structs.Allocation, len(updates)) + case <-syncTicker.C: + + updateTicks++ + toSync := c.pendingUpdates.nextBatch(c, updateTicks) + + if len(toSync) == 0 { syncTicker.Reset(allocSyncIntv) continue } // Send to server. args := structs.AllocUpdateRequest{ - Alloc: sync, + Alloc: toSync, WriteRequest: structs.WriteRequest{Region: c.Region()}, } @@ -2156,12 +2139,17 @@ func (c *Client) allocSync() { // Error updating allocations, do *not* clear // updates and retry after backoff c.logger.Error("error updating allocations", "error", err) + + // refill the updates queue with updates that we failed to make + c.pendingUpdates.restore(toSync) syncTicker.Reset(c.retryIntv(allocSyncRetryIntv)) continue } + // Record that we've successfully synced these updates so that it's + // written to disk c.allocLock.RLock() - for _, update := range sync { + for _, update := range toSync { if ar, ok := c.allocs[update.ID]; ok { ar.AcknowledgeState(&arstate.State{ ClientStatus: update.ClientStatus, @@ -2174,35 +2162,15 @@ func (c *Client) allocSync() { } c.allocLock.RUnlock() - // Successfully updated allocs, reset map and ticker. - // Always reset ticker to give loop time to receive - // alloc updates. If the RPC took the ticker interval - // we may call it in a tight loop before draining - // buffered updates. - updates = make(map[string]*structs.Allocation, len(updates)) + // Successfully updated allocs. Reset ticker to give loop time to + // receive new alloc updates. Otherwise if the RPC took the ticker + // interval we may call it in a tight loop reading empty updates. + updateTicks = 0 syncTicker.Reset(allocSyncIntv) } } } -func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) []*structs.Allocation { - sync := make([]*structs.Allocation, 0, len(updates)) - c.allocLock.RLock() - defer c.allocLock.RUnlock() - for allocID, update := range updates { - if ar, ok := c.allocs[allocID]; ok { - if !ar.LastAcknowledgedStateIsCurrent(update) { - sync = append(sync, update) - } - } else { - // no allocrunner (typically a failed placement), so we need - // to send update - sync = append(sync, update) - } - } - return sync -} - // allocUpdates holds the results of receiving updated allocations from the // servers. type allocUpdates struct { @@ -3306,3 +3274,103 @@ func (g *group) AddCh(ch <-chan struct{}) { func (g *group) Wait() { g.wg.Wait() } + +// pendingClientUpdates are the set of allocation updates that the client is +// waiting to send +type pendingClientUpdates struct { + updates map[string]*structs.Allocation + lock sync.Mutex +} + +func newPendingClientUpdates() *pendingClientUpdates { + return &pendingClientUpdates{ + updates: make(map[string]*structs.Allocation, 64), + } +} + +// add overwrites a pending update. The updates we get from the allocrunner are +// lightweight copies of its *structs.Allocation (i.e. just the client state), +// serialized with an internal lock. So the latest update is always the +// authoritative one, and the server only cares about that one. +func (p *pendingClientUpdates) add(alloc *structs.Allocation) { + p.lock.Lock() + defer p.lock.Unlock() + p.updates[alloc.ID] = alloc +} + +// restore refills the pending updates map, but only if a newer update hasn't come in +func (p *pendingClientUpdates) restore(toRestore []*structs.Allocation) { + p.lock.Lock() + defer p.lock.Unlock() + + for _, alloc := range toRestore { + if _, ok := p.updates[alloc.ID]; !ok { + p.updates[alloc.ID] = alloc + } + } +} + +// nextBatch returns a list of client allocation updates we need to make in this +// tick of the allocSync. It returns nil if there's no updates to make yet. The +// caller is responsible for calling restore() if it can't successfully send the +// updates. +func (p *pendingClientUpdates) nextBatch(c *Client, updateTicks int) []*structs.Allocation { + p.lock.Lock() + defer p.lock.Unlock() + + // Fast path if there are no pending updates + if len(p.updates) == 0 { + return nil + } + + // Ensure we never send an update before we've had at least one sync from + // the server + select { + case <-c.serversContactedCh: + default: + return nil + } + + toSync, urgent := p.filterAcknowledgedUpdatesLocked(c) + + // Only update every 5th tick if there's no priority updates + if updateTicks%5 != 0 && !urgent { + return nil + } + + // Clear here so that allocrunners can queue up the next set of updates + // while we're waiting to hear from the server + maps.Clear(p.updates) + + return toSync + +} + +// filteredAcknowledgedUpdatesLocked returns a list of client alloc updates with +// the already-acknowledged updates removed, and the highest priority of any +// update. note: this method requires that p.lock is held +func (p *pendingClientUpdates) filterAcknowledgedUpdatesLocked(c *Client) ([]*structs.Allocation, bool) { + var urgent bool + sync := make([]*structs.Allocation, 0, len(p.updates)) + c.allocLock.RLock() + defer c.allocLock.RUnlock() + + for allocID, update := range p.updates { + if ar, ok := c.allocs[allocID]; ok { + switch ar.GetUpdatePriority(update) { + case cstructs.AllocUpdatePriorityUrgent: + sync = append(sync, update) + urgent = true + case cstructs.AllocUpdatePriorityTypical: + sync = append(sync, update) + case cstructs.AllocUpdatePriorityNone: + // update is dropped + } + } else { + // no allocrunner (typically a failed placement), so we need + // to send update + sync = append(sync, update) + } + } + return sync, urgent +} diff --git a/client/client_interface_test.go b/client/client_interface_test.go index 5a985b7bc..326cd5484 100644 --- a/client/client_interface_test.go +++ b/client/client_interface_test.go @@ -120,9 +120,11 @@ func (ar *emptyAllocRunner) AllocState() *state.State { return ar.allocState.Copy() } -func (ar *emptyAllocRunner) PersistState() error { return nil } -func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {} -func (ar *emptyAllocRunner) LastAcknowledgedStateIsCurrent(*structs.Allocation) bool { return false } +func (ar *emptyAllocRunner) PersistState() error { return nil } +func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {} +func (ar *emptyAllocRunner) GetUpdatePriority(*structs.Allocation) cstructs.AllocUpdatePriority { + return cstructs.AllocUpdatePriorityUrgent +} func (ar *emptyAllocRunner) SetClientStatus(status string) { ar.allocLock.Lock() diff --git a/client/client_test.go b/client/client_test.go index 04148854c..719d81c9e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -765,6 +765,24 @@ func TestClient_SaveRestoreState(t *testing.T) { return fmt.Errorf("expected running client status, got %v", ar.AllocState().ClientStatus) } + + case alloc3: + if ar.AllocState().ClientStatus != structs.AllocClientStatusComplete { + return fmt.Errorf("expected complete client status, got %v", + ar.AllocState().ClientStatus) + } + + // because the client's update will be batched, we need to + // ensure we wait for the server update too + a3, err := store.AllocByID(nil, alloc3) + must.NoError(t, err) + must.NotNil(t, a3) + if alloc3AllocModifyIndex != a3.AllocModifyIndex || + alloc3ModifyIndex >= a3.ModifyIndex { + return fmt.Errorf( + "alloc %s stopped during shutdown should have updated", a3.ID[:8]) + } + default: if ar.AllocState().ClientStatus != structs.AllocClientStatusComplete { return fmt.Errorf("expected complete client status, got %v", @@ -778,33 +796,43 @@ func TestClient_SaveRestoreState(t *testing.T) { wait.Gap(time.Millisecond*30), )) - a1, err = store.AllocByID(nil, alloc1) - must.NoError(t, err) - must.NotNil(t, a1) - test.Eq(t, alloc1AllocModifyIndex, a1.AllocModifyIndex) - test.Eq(t, alloc1ModifyIndex, a1.ModifyIndex, - test.Sprint("alloc still running should not have updated")) + // Because we're asserting that no changes have been made, we have to wait a + // sufficient amount of time to verify that + must.Wait(t, wait.ContinualSuccess( + wait.ErrorFunc(func() error { + a1, err = store.AllocByID(nil, alloc1) + must.NoError(t, err) + must.NotNil(t, a1) - a2, err := store.AllocByID(nil, alloc2) - must.NoError(t, err) - must.NotNil(t, a2) - test.Eq(t, alloc2AllocModifyIndex, a2.AllocModifyIndex) - test.Eq(t, alloc2ModifyIndex, a2.ModifyIndex, - test.Sprintf("alloc %s stopped before shutdown should not have updated", a2.ID[:8])) + if alloc1AllocModifyIndex != a1.AllocModifyIndex || + alloc1ModifyIndex != a1.ModifyIndex { + return fmt.Errorf("alloc still running should not have updated") + } - a3, err := store.AllocByID(nil, alloc3) - must.NoError(t, err) - must.NotNil(t, a3) - test.Eq(t, alloc3AllocModifyIndex, a3.AllocModifyIndex) - test.Greater(t, alloc3ModifyIndex, a3.ModifyIndex, - test.Sprintf("alloc %s stopped during shutdown should have updated", a3.ID[:8])) + a2, err := store.AllocByID(nil, alloc2) + must.NoError(t, err) + must.NotNil(t, a2) + if alloc2AllocModifyIndex != a2.AllocModifyIndex || + alloc2ModifyIndex != a2.ModifyIndex { + return fmt.Errorf( + "alloc %s stopped before shutdown should not have updated", a2.ID[:8]) + } + + // TODO: the alloc has been GC'd so the server will reject any + // update. It'd be nice if we could instrument the server here to + // ensure we didn't send one either. + a4, err := store.AllocByID(nil, alloc4) + must.NoError(t, err) + if a4 != nil { + return fmt.Errorf("garbage collected alloc should not exist") + } + + return nil + }), + wait.Timeout(time.Second*3), + wait.Gap(time.Millisecond*100), + )) - // TODO: the alloc has been GC'd so the server will reject any update. It'd - // be nice if we could instrument the server here to ensure we didn't send - // one either. - a4, err := store.AllocByID(nil, alloc4) - must.NoError(t, err) - test.Nil(t, a4, test.Sprint("garbage collected alloc should not exist")) } func TestClient_AddAllocError(t *testing.T) { diff --git a/client/structs/enum.go b/client/structs/enum.go new file mode 100644 index 000000000..ed11bb1aa --- /dev/null +++ b/client/structs/enum.go @@ -0,0 +1,11 @@ +package structs + +// AllocUpdatePriority indicates the urgency of an allocation update so that the +// client can decide whether to wait longer +type AllocUpdatePriority int + +const ( + AllocUpdatePriorityNone AllocUpdatePriority = iota + AllocUpdatePriorityTypical + AllocUpdatePriorityUrgent +)