06972fae0c
The allocrunner sends several updates to the server during the early lifecycle of an allocation and its tasks. Clients batch-up allocation updates every 200ms, but experiments like the C2M challenge has shown that even with this batching, servers can be overwhelmed with client updates during high volume deployments. Benchmarking done in #9451 has shown that client updates can easily represent ~70% of all Nomad Raft traffic. Each allocation sends many updates during its lifetime, but only those that change the `ClientStatus` field are critical for progressing a deployment or kicking off a reschedule to recover from failures. Add a priority to the client allocation sync and update the `syncTicker` receiver so that we only send an update if there's a high priority update waiting, or on every 5th tick. This means when there are no high priority updates, the client will send updates at most every 1s instead of 200ms. Benchmarks have shown this can reduce overall Raft traffic by 10%, as well as reduce client-to-server RPC traffic. This changeset also switches from a channel-based collection of updates to a shared buffer, so as to split batching from sending and prevent backpressure onto the allocrunner when the RPC is slow. This doesn't have a major performance benefit in the benchmarks but makes the implementation of the prioritized update simpler. Fixes: #9451
168 lines
5.1 KiB
Go
168 lines
5.1 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package client
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/hashicorp/nomad/ci"
|
|
"github.com/hashicorp/nomad/client/allocdir"
|
|
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
|
"github.com/hashicorp/nomad/client/allocrunner/state"
|
|
"github.com/hashicorp/nomad/client/config"
|
|
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
|
|
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/plugins/device"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
"github.com/hashicorp/nomad/testutil"
|
|
)
|
|
|
|
// TestEmptyAllocRunner demonstrates the minimum interface necessary to
|
|
// implement a mock AllocRunner that can report client status back to the server
|
|
func TestEmptyAllocRunner(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, _, cleanupS1 := testServer(t, nil)
|
|
defer cleanupS1()
|
|
|
|
_, cleanup := TestClient(t, func(c *config.Config) {
|
|
c.RPCHandler = s1
|
|
c.AllocRunnerFactory = newEmptyAllocRunnerFunc
|
|
})
|
|
defer cleanup()
|
|
|
|
job := mock.Job()
|
|
job.Constraints = nil
|
|
job.TaskGroups[0].Constraints = nil
|
|
job.TaskGroups[0].Count = 1
|
|
task := job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"run_for": "10s",
|
|
}
|
|
task.Services = nil
|
|
|
|
// WaitForRunning polls the server until the ClientStatus is running
|
|
testutil.WaitForRunning(t, s1.RPC, job)
|
|
}
|
|
|
|
type emptyAllocRunner struct {
|
|
c cinterfaces.AllocStateHandler
|
|
alloc *structs.Allocation
|
|
allocState *state.State
|
|
allocLock sync.RWMutex
|
|
}
|
|
|
|
func newEmptyAllocRunnerFunc(conf *config.AllocRunnerConfig) (interfaces.AllocRunner, error) {
|
|
return &emptyAllocRunner{
|
|
c: conf.StateUpdater,
|
|
alloc: conf.Alloc,
|
|
allocState: &state.State{},
|
|
}, nil
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) Alloc() *structs.Allocation {
|
|
ar.allocLock.RLock()
|
|
defer ar.allocLock.RUnlock()
|
|
return ar.alloc.Copy()
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) Run() {
|
|
ar.allocLock.RLock()
|
|
defer ar.allocLock.RUnlock()
|
|
ar.alloc.ClientStatus = "running"
|
|
ar.c.AllocStateUpdated(ar.alloc)
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) Restore() error { return nil }
|
|
func (ar *emptyAllocRunner) Update(update *structs.Allocation) {
|
|
ar.allocLock.Lock()
|
|
defer ar.allocLock.Unlock()
|
|
ar.alloc = update
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) Reconnect(update *structs.Allocation) error {
|
|
ar.allocLock.Lock()
|
|
defer ar.allocLock.Unlock()
|
|
ar.alloc = update
|
|
return nil
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) Shutdown() {}
|
|
func (ar *emptyAllocRunner) Destroy() {}
|
|
|
|
func (ar *emptyAllocRunner) IsDestroyed() bool { return false }
|
|
func (ar *emptyAllocRunner) IsMigrating() bool { return false }
|
|
func (ar *emptyAllocRunner) IsWaiting() bool { return false }
|
|
|
|
func (ar *emptyAllocRunner) WaitCh() <-chan struct{} { return make(chan struct{}) }
|
|
|
|
func (ar *emptyAllocRunner) DestroyCh() <-chan struct{} {
|
|
ch := make(chan struct{})
|
|
close(ch)
|
|
return ch
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) ShutdownCh() <-chan struct{} {
|
|
ch := make(chan struct{})
|
|
close(ch)
|
|
return ch
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) AllocState() *state.State {
|
|
ar.allocLock.RLock()
|
|
defer ar.allocLock.RUnlock()
|
|
return ar.allocState.Copy()
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) PersistState() error { return nil }
|
|
func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {}
|
|
func (ar *emptyAllocRunner) GetUpdatePriority(*structs.Allocation) cstructs.AllocUpdatePriority {
|
|
return cstructs.AllocUpdatePriorityUrgent
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) SetClientStatus(status string) {
|
|
ar.allocLock.Lock()
|
|
defer ar.allocLock.Unlock()
|
|
ar.alloc.ClientStatus = status
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) Signal(taskName, signal string) error { return nil }
|
|
func (ar *emptyAllocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error {
|
|
return nil
|
|
}
|
|
func (ar *emptyAllocRunner) RestartRunning(taskEvent *structs.TaskEvent) error { return nil }
|
|
func (ar *emptyAllocRunner) RestartAll(taskEvent *structs.TaskEvent) error { return nil }
|
|
|
|
func (ar *emptyAllocRunner) GetTaskEventHandler(taskName string) drivermanager.EventHandler {
|
|
return nil
|
|
}
|
|
func (ar *emptyAllocRunner) GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler {
|
|
return nil
|
|
}
|
|
func (ar *emptyAllocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (ar *emptyAllocRunner) StatsReporter() interfaces.AllocStatsReporter { return ar }
|
|
func (ar *emptyAllocRunner) Listener() *cstructs.AllocListener { return nil }
|
|
func (ar *emptyAllocRunner) GetAllocDir() *allocdir.AllocDir { return nil }
|
|
|
|
// LatestAllocStats lets this empty runner implement AllocStatsReporter
|
|
func (ar *emptyAllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
|
|
return &cstructs.AllocResourceUsage{
|
|
ResourceUsage: &cstructs.ResourceUsage{
|
|
MemoryStats: &cstructs.MemoryStats{},
|
|
CpuStats: &cstructs.CpuStats{},
|
|
DeviceStats: []*device.DeviceGroupStats{},
|
|
},
|
|
Tasks: map[string]*cstructs.TaskResourceUsage{},
|
|
Timestamp: 0,
|
|
}, nil
|
|
}
|