client: always wait 200ms before sending updates
Always wait 200ms before calling the Node.UpdateAlloc RPC to send allocation updates to servers. Prior to this change we only reset the update ticker when an error was encountered. This meant the 200ms ticker was running while the RPC was being performed. If the RPC was slow due to network latency or server load and took >=200ms, the ticker would tick during the RPC. Then on the next loop the select would randomly choose between the two viable cases: receive an update or fire the RPC again. If the RPC case won it would immediately loop again due to there being no updates to send. When the update chan receive is selected a single update is added to the slice. The odds are then 50/50 that the subsequent loop will send the single update instead of receiving any more updates. This could cause a couple of problems: 1. Since only a small number of updates are sent, the chan buffer may fill, applying backpressure, and slowing down other client operations. 2. The small number of updates sent may already be stale and not represent the current state of the allocation locally. A risk here is that it's hard to reason about how this will interact with the 50ms batches on servers when the servers are under load. A further improvement would be to completely remove the alloc update chan and instead use a mutex to build a map of alloc updates. I wanted to test the lowest risk possible change on loaded servers first before making more drastic changes.
This commit is contained in:
parent
9bd1f267d2
commit
5ec065b180
|
@ -1904,7 +1904,6 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
|
|||
// allocSync is a long lived function that batches allocation updates to the
|
||||
// server.
|
||||
func (c *Client) allocSync() {
|
||||
staggered := false
|
||||
syncTicker := time.NewTicker(allocSyncIntv)
|
||||
updates := make(map[string]*structs.Allocation)
|
||||
for {
|
||||
|
@ -1933,19 +1932,24 @@ func (c *Client) allocSync() {
|
|||
}
|
||||
|
||||
var resp structs.GenericResponse
|
||||
if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil {
|
||||
err := c.RPC("Node.UpdateAlloc", &args, &resp)
|
||||
if err != nil {
|
||||
// Error updating allocations, do *not* clear
|
||||
// updates and retry after backoff
|
||||
c.logger.Error("error updating allocations", "error", err)
|
||||
syncTicker.Stop()
|
||||
syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
|
||||
staggered = true
|
||||
} else {
|
||||
updates = make(map[string]*structs.Allocation)
|
||||
if staggered {
|
||||
syncTicker.Stop()
|
||||
syncTicker = time.NewTicker(allocSyncIntv)
|
||||
staggered = false
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Successfully updated allocs, reset map and ticker.
|
||||
// Always reset ticker to give loop time to receive
|
||||
// alloc updates. If the RPC took the ticker interval
|
||||
// we may call it in a tight loop before draining
|
||||
// buffered updates.
|
||||
updates = make(map[string]*structs.Allocation, len(updates))
|
||||
syncTicker.Stop()
|
||||
syncTicker = time.NewTicker(allocSyncIntv)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue