Merge pull request #9435 from hashicorp/f-allocupdate-timer

client: always wait 200ms before sending updates
This commit is contained in:
Michael Schurter 2020-12-01 08:45:17 -08:00 committed by GitHub
commit ea0e1789f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 12 deletions

View File

@ -22,6 +22,7 @@ IMPROVEMENTS:
* client: Use ec2 CPU perf data from AWS API [[GH-7830](https://github.com/hashicorp/nomad/issues/7830)]
* client: Added support for Azure fingerprinting. [[GH-8979](https://github.com/hashicorp/nomad/issues/8979)]
* client: Batch state store writes to reduce disk IO. [[GH-9093](https://github.com/hashicorp/nomad/issues/9093)]
* client: Reduce rate of sending allocation updates when servers are slow. [[GH-9435](https://github.com/hashicorp/nomad/issues/9435)]
* client: Added support for fingerprinting the client node's Consul segment. [[GH-7214](https://github.com/hashicorp/nomad/issues/7214)]
* client: Added `NOMAD_JOB_ID` and `NOMAD_PARENT_JOB_ID` environment variables to those made available to jobs. [[GH-8967](https://github.com/hashicorp/nomad/issues/8967)]
* client: Updated consul-template to v0.25.0 - config `function_blacklist` deprecated and replaced with `function_denylist` [[GH-8988](https://github.com/hashicorp/nomad/pull/8988)]

View File

@ -1904,7 +1904,6 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
// allocSync is a long lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
staggered := false
syncTicker := time.NewTicker(allocSyncIntv)
updates := make(map[string]*structs.Allocation)
for {
@ -1933,19 +1932,24 @@ func (c *Client) allocSync() {
}
var resp structs.GenericResponse
if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil {
err := c.RPC("Node.UpdateAlloc", &args, &resp)
if err != nil {
// Error updating allocations, do *not* clear
// updates and retry after backoff
c.logger.Error("error updating allocations", "error", err)
syncTicker.Stop()
syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
staggered = true
} else {
updates = make(map[string]*structs.Allocation)
if staggered {
syncTicker.Stop()
syncTicker = time.NewTicker(allocSyncIntv)
staggered = false
}
continue
}
// Successfully updated allocs, reset map and ticker.
// Always reset ticker to give loop time to receive
// alloc updates. If the RPC took the ticker interval
// we may call it in a tight loop before draining
// buffered updates.
updates = make(map[string]*structs.Allocation, len(updates))
syncTicker.Stop()
syncTicker = time.NewTicker(allocSyncIntv)
}
}
}

View File

@ -1144,8 +1144,13 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
updates := n.updates
evals := n.evals
future := n.updateFuture
n.updates = nil
n.evals = nil
// Assume future update patterns will be similar to
// current batch and set cap appropriately to avoid
// slice resizing.
n.updates = make([]*structs.Allocation, 0, len(updates))
n.evals = make([]*structs.Evaluation, 0, len(evals))
n.updateFuture = nil
n.updateTimer = nil
n.updatesLock.Unlock()