Avoid potential proxycfg/xDS deadlock using non-blocking send
This commit is contained in:
commit
5a50b26767
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
proxycfg: avoid potential deadlock in delivering proxy snapshot to watchers.
|
||||||
|
```
|
|
@ -603,10 +603,11 @@ func (s *state) run() {
|
||||||
case <-s.ctx.Done():
|
case <-s.ctx.Done():
|
||||||
return
|
return
|
||||||
case u := <-s.ch:
|
case u := <-s.ch:
|
||||||
|
s.logger.Trace("A blocking query returned; handling snapshot update")
|
||||||
|
|
||||||
if err := s.handleUpdate(u, &snap); err != nil {
|
if err := s.handleUpdate(u, &snap); err != nil {
|
||||||
s.logger.Error("watch error",
|
s.logger.Error("Failed to handle update from watch",
|
||||||
"id", u.CorrelationID,
|
"id", u.CorrelationID, "error", err,
|
||||||
"error", err,
|
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -617,23 +618,47 @@ func (s *state) run() {
|
||||||
snapCopy, err := snap.Clone()
|
snapCopy, err := snap.Clone()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("Failed to copy config snapshot for proxy",
|
s.logger.Error("Failed to copy config snapshot for proxy",
|
||||||
"proxy", s.proxyID,
|
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.snapCh <- *snapCopy
|
|
||||||
// Allow the next change to trigger a send
|
|
||||||
coalesceTimer = nil
|
|
||||||
|
|
||||||
// Skip rest of loop - there is nothing to send since nothing changed on
|
select {
|
||||||
// this iteration
|
// Try to send
|
||||||
continue
|
case s.snapCh <- *snapCopy:
|
||||||
|
s.logger.Trace("Delivered new snapshot to proxy config watchers")
|
||||||
|
|
||||||
|
// Allow the next change to trigger a send
|
||||||
|
coalesceTimer = nil
|
||||||
|
|
||||||
|
// Skip rest of loop - there is nothing to send since nothing changed on
|
||||||
|
// this iteration
|
||||||
|
continue
|
||||||
|
|
||||||
|
// Avoid blocking if a snapshot is already buffered in snapCh as this can result in a deadlock.
|
||||||
|
// See PR #9689 for more details.
|
||||||
|
default:
|
||||||
|
s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
|
||||||
|
|
||||||
|
// Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
|
||||||
|
if coalesceTimer == nil {
|
||||||
|
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
|
||||||
|
sendCh <- struct{}{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do not reset coalesceTimer since we just queued a timer-based refresh
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
case replyCh := <-s.reqCh:
|
case replyCh := <-s.reqCh:
|
||||||
|
s.logger.Trace("A proxy config snapshot was requested")
|
||||||
|
|
||||||
if !snap.Valid() {
|
if !snap.Valid() {
|
||||||
// Not valid yet just respond with nil and move on to next task.
|
// Not valid yet just respond with nil and move on to next task.
|
||||||
replyCh <- nil
|
replyCh <- nil
|
||||||
|
|
||||||
|
s.logger.Trace("The proxy's config snapshot is not valid yet")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Make a deep copy of snap so we don't mutate any of the embedded structs
|
// Make a deep copy of snap so we don't mutate any of the embedded structs
|
||||||
|
@ -641,7 +666,6 @@ func (s *state) run() {
|
||||||
snapCopy, err := snap.Clone()
|
snapCopy, err := snap.Clone()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("Failed to copy config snapshot for proxy",
|
s.logger.Error("Failed to copy config snapshot for proxy",
|
||||||
"proxy", s.proxyID,
|
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/hashicorp/consul/logging"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -164,6 +165,8 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
|
func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
|
||||||
|
logger := s.Logger.Named(logging.XDS)
|
||||||
|
|
||||||
// xDS requires a unique nonce to correlate response/request pairs
|
// xDS requires a unique nonce to correlate response/request pairs
|
||||||
var nonce uint64
|
var nonce uint64
|
||||||
|
|
||||||
|
@ -324,6 +327,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
||||||
// state machine.
|
// state machine.
|
||||||
defer watchCancel()
|
defer watchCancel()
|
||||||
|
|
||||||
|
logger.Trace("watching proxy, pending initial proxycfg snapshot",
|
||||||
|
"service_id", proxyID.String())
|
||||||
|
|
||||||
// Now wait for the config so we can check ACL
|
// Now wait for the config so we can check ACL
|
||||||
state = statePendingInitialConfig
|
state = statePendingInitialConfig
|
||||||
case statePendingInitialConfig:
|
case statePendingInitialConfig:
|
||||||
|
@ -335,6 +341,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
||||||
// Got config, try to authenticate next.
|
// Got config, try to authenticate next.
|
||||||
state = stateRunning
|
state = stateRunning
|
||||||
|
|
||||||
|
logger.Trace("Got initial config snapshot",
|
||||||
|
"service_id", cfgSnap.ProxyID.String())
|
||||||
|
|
||||||
// Lets actually process the config we just got or we'll mis responding
|
// Lets actually process the config we just got or we'll mis responding
|
||||||
fallthrough
|
fallthrough
|
||||||
case stateRunning:
|
case stateRunning:
|
||||||
|
@ -346,6 +355,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
||||||
// timer is first started.
|
// timer is first started.
|
||||||
extendAuthTimer()
|
extendAuthTimer()
|
||||||
|
|
||||||
|
logger.Trace("Invoking all xDS resource handlers and sending new data if there is any",
|
||||||
|
"service_id", cfgSnap.ProxyID.String())
|
||||||
|
|
||||||
// See if any handlers need to have the current (possibly new) config
|
// See if any handlers need to have the current (possibly new) config
|
||||||
// sent. Note the order here is actually significant so we can't just
|
// sent. Note the order here is actually significant so we can't just
|
||||||
// range the map which has no determined order. It's important because:
|
// range the map which has no determined order. It's important because:
|
||||||
|
|
|
@ -56,5 +56,6 @@ const (
|
||||||
UIMetricsProxy string = "ui_metrics_proxy"
|
UIMetricsProxy string = "ui_metrics_proxy"
|
||||||
WAN string = "wan"
|
WAN string = "wan"
|
||||||
Watch string = "watch"
|
Watch string = "watch"
|
||||||
|
XDS string = "xds"
|
||||||
Vault string = "vault"
|
Vault string = "vault"
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue