From 0fb96afe31754056e556ca26ac223bf4f45259bd Mon Sep 17 00:00:00 2001
From: freddygv
Date: Tue, 2 Feb 2021 11:31:14 -0700
Subject: [PATCH 1/7] Avoid potential deadlock using non-blocking send

Deadlock scenario:

1. Due to scheduling, the state runner sends one snapshot into snapCh
   and then attempts to send a second. The first send succeeds because
   the channel is buffered, but the second blocks.

2. Separately, Manager.Watch is called by the xDS server after getting
   a discovery request from Envoy. This function acquires the manager
   lock and then blocks on receiving the CurrentSnapshot from the
   state runner.

3. Separately, there is a Manager goroutine that reads the snapshots
   from the channel in step 1. These reads are done to notify proxy
   watchers, but they require holding the manager lock. This goroutine
   goes to acquire that lock, but can't because it is held by step 2.

Now, the goroutine from step 3 is waiting on the one from step 2 to
release the lock. The goroutine from step 2 won't release the lock
until the goroutine in step 1 advances. But the goroutine in step 1 is
waiting for the one in step 3. Deadlock.

By making this send non-blocking, step 1 above can proceed. The
coalesce timer will be reset and a new valid snapshot will be
delivered after it elapses or when one is requested by xDS.
---
 agent/proxycfg/state.go | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index 703cb66af..59968e83e 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -622,7 +622,14 @@ func (s *state) run() {
                 )
                 continue
             }
-            s.snapCh <- *snapCopy
+
+            select {
+            case s.snapCh <- *snapCopy:
+                // try to send
+            default:
+                // avoid blocking if a snapshot is already buffered
+            }
+
             // Allow the next change to trigger a send
             coalesceTimer = nil
 
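The fix above is the standard non-blocking send: a select with a default branch never parks the sending goroutine. Below is a minimal standalone sketch of the pattern, not Consul code — the snapshot type, the deliver helper, and the channel capacity are hypothetical stand-ins; only the select/default shape comes from the diff.

```go
package main

import "fmt"

// snapshot stands in for the much larger proxycfg.ConfigSnapshot.
type snapshot struct{ version int }

// deliver attempts a non-blocking send. If the buffered channel still
// holds an unconsumed snapshot, the send is skipped instead of blocking,
// which breaks the wait cycle described in the commit message.
func deliver(snapCh chan snapshot, snap snapshot) bool {
	select {
	case snapCh <- snap:
		return true // delivered
	default:
		return false // buffer full; receiver hasn't drained it yet
	}
}

func main() {
	snapCh := make(chan snapshot, 1) // buffered, like s.snapCh

	fmt.Println(deliver(snapCh, snapshot{version: 1})) // true: buffer was empty
	fmt.Println(deliver(snapCh, snapshot{version: 2})) // false: would have blocked before this patch
}
```

Before this patch the runner performed a plain blocking send, so a full buffer parked the state runner while Manager.Watch still held the lock the notifier goroutine needed; with the default branch the runner simply records a failed delivery and keeps looping.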
From a0be7dcc1d95d3d97e72d52945dfe621e911a7b9 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Tue, 2 Feb 2021 12:26:38 -0700
Subject: [PATCH 2/7] Add trace logs to proxycfg state runner and xds srv

---
 agent/proxycfg/state.go | 40 +++++++++++++++++++++++++++++-----------
 agent/xds/server.go     | 12 ++++++++++++
 logging/names.go        |  1 +
 3 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index 59968e83e..fd9bdc802 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -583,6 +583,8 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
 }
 
 func (s *state) run() {
+    logger := s.logger.Named(logging.ProxyConfig)
+
     // Close the channel we return from Watch when we stop so consumers can stop
     // watching and clean up their goroutines. It's important we do this here and
     // not in Close since this routine sends on this chan and so might panic if it
@@ -603,10 +605,13 @@ func (s *state) run() {
         case <-s.ctx.Done():
             return
         case u := <-s.ch:
+            logger.Trace("A blocking query returned; handling snapshot update",
+                "proxy-id", s.proxyID.String(),
+            )
+
             if err := s.handleUpdate(u, &snap); err != nil {
-                s.logger.Error("watch error",
-                    "id", u.CorrelationID,
-                    "error", err,
+                logger.Error("Failed to handle update from watch",
+                    "id", u.CorrelationID, "error", err,
                 )
                 continue
             }
@@ -616,18 +621,24 @@ func (s *state) run() {
             // etc on future updates.
             snapCopy, err := snap.Clone()
             if err != nil {
-                s.logger.Error("Failed to copy config snapshot for proxy",
-                    "proxy", s.proxyID,
-                    "error", err,
+                logger.Error("Failed to copy config snapshot for proxy",
+                    "proxy-id", s.proxyID.String(), "error", err,
                 )
                 continue
             }
 
             select {
+            // try to send
             case s.snapCh <- *snapCopy:
-                // try to send
+                logger.Trace("Delivered new snapshot to proxy config watchers",
+                    "proxy-id", s.proxyID.String(),
+                )
+
+            // avoid blocking if a snapshot is already buffered
             default:
-                // avoid blocking if a snapshot is already buffered
+                logger.Trace("Failed to deliver new snapshot to proxy config watchers",
+                    "proxy-id", s.proxyID.String(),
+                )
             }
 
             // Allow the next change to trigger a send
@@ -638,18 +649,25 @@ func (s *state) run() {
             continue
 
         case replyCh := <-s.reqCh:
+            logger.Trace("A proxy config snapshot was requested",
+                "proxy-id", s.proxyID.String(),
+            )
+
             if !snap.Valid() {
                 // Not valid yet just respond with nil and move on to next task.
                 replyCh <- nil
+
+                logger.Trace("The proxy's config snapshot is not valid yet",
+                    "proxy-id", s.proxyID.String(),
+                )
                 continue
             }
             // Make a deep copy of snap so we don't mutate any of the embedded structs
             // etc on future updates.
             snapCopy, err := snap.Clone()
             if err != nil {
-                s.logger.Error("Failed to copy config snapshot for proxy",
-                    "proxy", s.proxyID,
-                    "error", err,
+                logger.Error("Failed to copy config snapshot for proxy",
+                    "proxy-id", s.proxyID.String(), "error", err,
                 )
                 continue
             }
diff --git a/agent/xds/server.go b/agent/xds/server.go
index d8a7ecbe5..b2506a753 100644
--- a/agent/xds/server.go
+++ b/agent/xds/server.go
@@ -4,6 +4,7 @@ import (
     "context"
     "errors"
     "fmt"
+    "github.com/hashicorp/consul/logging"
     "sync/atomic"
     "time"
 
@@ -164,6 +165,8 @@ const (
 )
 
 func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
+    logger := s.Logger.Named(logging.XDS)
+
     // xDS requires a unique nonce to correlate response/request pairs
     var nonce uint64
 
@@ -324,6 +327,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
             // state machine.
             defer watchCancel()
 
+            logger.Trace("watching proxy, pending initial proxycfg snapshot",
+                "proxy-id", proxyID.String())
+
             // Now wait for the config so we can check ACL
             state = statePendingInitialConfig
         case statePendingInitialConfig:
@@ -335,6 +341,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
             // Got config, try to authenticate next.
             state = stateRunning
 
+            logger.Trace("Got initial config snapshot",
+                "proxy-id", cfgSnap.ProxyID.String())
+
             // Let's actually process the config we just got or we'll miss responding
             fallthrough
         case stateRunning:
@@ -346,6 +355,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
             // timer is first started.
             extendAuthTimer()
 
+            logger.Trace("Invoking all xDS resource handlers and sending new data if there is any",
+                "proxy-id", cfgSnap.ProxyID.String())
+
             // See if any handlers need to have the current (possibly new) config
             // sent. Note the order here is actually significant so we can't just
             // range the map which has no determined order. It's important because:
diff --git a/logging/names.go b/logging/names.go
index 544990061..b9616b167 100644
--- a/logging/names.go
+++ b/logging/names.go
@@ -56,5 +56,6 @@ const (
     UIMetricsProxy string = "ui_metrics_proxy"
     WAN            string = "wan"
     Watch          string = "watch"
+    XDS            string = "xds"
     Vault          string = "vault"
 )
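The named sub-loggers added in this patch follow the usual go-hclog pattern. A small sketch of that API, with an illustrative logger name and key/value pairs rather than the patch's exact values:

```go
package main

import hclog "github.com/hashicorp/go-hclog"

func main() {
	base := hclog.New(&hclog.LoggerOptions{
		Name:  "agent",
		Level: hclog.Trace, // trace lines are dropped at the default Info level
	})

	// Named appends a subsystem to the logger name ("agent.xds" here),
	// mirroring s.Logger.Named(logging.XDS) in the patch.
	logger := base.Named("xds")

	// Structured key/value pairs ride along with the message.
	logger.Trace("watching proxy, pending initial proxycfg snapshot",
		"proxy-id", "web-sidecar-proxy")
}
```

Named loggers let operators filter trace output per subsystem, which is why the patch also registers the new "xds" name in logging/names.go.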
From 57c29aba5d3c38df756e009b2b64f144dce62e01 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Fri, 5 Feb 2021 15:14:49 -0700
Subject: [PATCH 3/7] Update proxycfg logging, labels were already attached

---
 agent/proxycfg/state.go | 32 ++++++++++----------------------
 1 file changed, 10 insertions(+), 22 deletions(-)

diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index fd9bdc802..fcb0302eb 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -583,8 +583,6 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
 }
 
 func (s *state) run() {
-    logger := s.logger.Named(logging.ProxyConfig)
-
     // Close the channel we return from Watch when we stop so consumers can stop
     // watching and clean up their goroutines. It's important we do this here and
     // not in Close since this routine sends on this chan and so might panic if it
@@ -605,12 +603,10 @@ func (s *state) run() {
         case <-s.ctx.Done():
             return
         case u := <-s.ch:
-            logger.Trace("A blocking query returned; handling snapshot update",
-                "proxy-id", s.proxyID.String(),
-            )
+            s.logger.Trace("A blocking query returned; handling snapshot update")
 
             if err := s.handleUpdate(u, &snap); err != nil {
-                logger.Error("Failed to handle update from watch",
+                s.logger.Error("Failed to handle update from watch",
                     "id", u.CorrelationID, "error", err,
                 )
                 continue
@@ -621,8 +617,8 @@ func (s *state) run() {
             // etc on future updates.
             snapCopy, err := snap.Clone()
             if err != nil {
-                logger.Error("Failed to copy config snapshot for proxy",
-                    "proxy-id", s.proxyID.String(), "error", err,
+                s.logger.Error("Failed to copy config snapshot for proxy",
+                    "error", err,
                 )
                 continue
             }
@@ -630,15 +626,11 @@ func (s *state) run() {
             select {
             // try to send
             case s.snapCh <- *snapCopy:
-                logger.Trace("Delivered new snapshot to proxy config watchers",
-                    "proxy-id", s.proxyID.String(),
-                )
+                s.logger.Trace("Delivered new snapshot to proxy config watchers")
 
             // avoid blocking if a snapshot is already buffered
             default:
-                logger.Trace("Failed to deliver new snapshot to proxy config watchers",
-                    "proxy-id", s.proxyID.String(),
-                )
+                s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
             }
 
             // Allow the next change to trigger a send
@@ -649,25 +641,21 @@ func (s *state) run() {
             continue
 
         case replyCh := <-s.reqCh:
-            logger.Trace("A proxy config snapshot was requested",
-                "proxy-id", s.proxyID.String(),
-            )
+            s.logger.Trace("A proxy config snapshot was requested")
 
             if !snap.Valid() {
                 // Not valid yet just respond with nil and move on to next task.
                 replyCh <- nil
 
-                logger.Trace("The proxy's config snapshot is not valid yet",
-                    "proxy-id", s.proxyID.String(),
-                )
+                s.logger.Trace("The proxy's config snapshot is not valid yet")
                 continue
             }
             // Make a deep copy of snap so we don't mutate any of the embedded structs
             // etc on future updates.
             snapCopy, err := snap.Clone()
             if err != nil {
-                logger.Error("Failed to copy config snapshot for proxy",
-                    "proxy-id", s.proxyID.String(), "error", err,
+                s.logger.Error("Failed to copy config snapshot for proxy",
+                    "error", err,
                 )
                 continue
             }
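This cleanup works because an hclog logger can carry labels attached once via With: every line the child logger emits already includes them, so repeating proxy-id at each call site was redundant. A sketch of that mechanism, again with illustrative names:

```go
package main

import hclog "github.com/hashicorp/go-hclog"

func main() {
	base := hclog.New(&hclog.LoggerOptions{Name: "proxycfg", Level: hclog.Trace})

	// With returns a child logger that permanently carries these key/value
	// pairs, which is how the state's logger already has the proxy's ID.
	logger := base.With("proxy-id", "web-sidecar-proxy")

	// No explicit label here, yet "proxy-id=web-sidecar-proxy" still
	// appears in the output -- this is why the patch drops the argument.
	logger.Trace("A proxy config snapshot was requested")
}
```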
From 8de6b2590c25be4003bbce8023bc0e260053fad3 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Fri, 5 Feb 2021 15:15:52 -0700
Subject: [PATCH 4/7] Make xDS labeling consistent with proxycfg

---
 agent/xds/server.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/agent/xds/server.go b/agent/xds/server.go
index b2506a753..7986b8015 100644
--- a/agent/xds/server.go
+++ b/agent/xds/server.go
@@ -328,7 +328,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
             defer watchCancel()
 
             logger.Trace("watching proxy, pending initial proxycfg snapshot",
-                "proxy-id", proxyID.String())
+                "service_id", proxyID.String())
 
             // Now wait for the config so we can check ACL
             state = statePendingInitialConfig
@@ -342,7 +342,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
             state = stateRunning
 
             logger.Trace("Got initial config snapshot",
-                "proxy-id", cfgSnap.ProxyID.String())
+                "service_id", cfgSnap.ProxyID.String())
 
             // Let's actually process the config we just got or we'll miss responding
             fallthrough
@@ -356,7 +356,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
             extendAuthTimer()
 
             logger.Trace("Invoking all xDS resource handlers and sending new data if there is any",
-                "proxy-id", cfgSnap.ProxyID.String())
+                "service_id", cfgSnap.ProxyID.String())
 
             // See if any handlers need to have the current (possibly new) config
             // sent. Note the order here is actually significant so we can't just

From 0a8f2f2105b954f933eecf337e5d611ae4301cc0 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Fri, 5 Feb 2021 18:00:59 -0700
Subject: [PATCH 5/7] Retry send after timer fires, in case no updates occur

---
 agent/proxycfg/state.go | 25 +++++++++++++++++--------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index fcb0302eb..ad1ef61ec 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -628,18 +628,27 @@ func (s *state) run() {
             case s.snapCh <- *snapCopy:
                 s.logger.Trace("Delivered new snapshot to proxy config watchers")
 
-            // avoid blocking if a snapshot is already buffered
+                // Allow the next change to trigger a send
+                coalesceTimer = nil
+
+                // Skip rest of loop - there is nothing to send since nothing changed on
+                // this iteration
+                continue
+
+            // avoid blocking if a snapshot is already buffered, but queue up a retry with a timer
             default:
                 s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
+
+                if coalesceTimer == nil {
+                    coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
+                        sendCh <- struct{}{}
+                    })
+                }
+
+                // Do not reset coalesceTimer since we just queued a timer-based refresh
+                continue
             }
 
-            // Allow the next change to trigger a send
-            coalesceTimer = nil
-
-            // Skip rest of loop - there is nothing to send since nothing changed on
-            // this iteration
-            continue
-
         case replyCh := <-s.reqCh:
             s.logger.Trace("A proxy config snapshot was requested")
 
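The retry in patch 5 reduces to time.AfterFunc arming a one-shot timer whose callback nudges the run loop over a channel. A compressed sketch of that shape follows — sendCh, coalesceTimer, and coalesceTimeout mirror the run loop's names, while the buffered capacity and the rest of the scaffolding are illustrative only:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const coalesceTimeout = 200 * time.Millisecond

	// sendCh plays the role of the run loop's wake-up channel; the real
	// loop receives from it in the same select as s.ch and s.reqCh.
	sendCh := make(chan struct{}, 1)
	var coalesceTimer *time.Timer

	// After a failed non-blocking delivery, queue exactly one retry. The
	// nil check keeps repeated failures from stacking up extra timers.
	if coalesceTimer == nil {
		coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
			sendCh <- struct{}{} // nudge the loop even if no watch fires
		})
	}

	<-sendCh            // the run loop would receive this in its select
	coalesceTimer = nil // allow the next failed send to queue another retry
	fmt.Println("timer fired; retrying snapshot delivery")
}
```

Guarding with coalesceTimer == nil means repeated failed sends arm at most one pending retry, and the default branch deliberately does not reset the timer, so the queued refresh survives until it fires.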
From a417f88e447e73b2b8f3ca201414d66ed0863dfe Mon Sep 17 00:00:00 2001
From: freddygv
Date: Mon, 8 Feb 2021 09:45:45 -0700
Subject: [PATCH 6/7] Update comments on avoiding proxycfg deadlock

---
 agent/proxycfg/state.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index ad1ef61ec..59d11a2f1 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -624,7 +624,7 @@ func (s *state) run() {
             }
 
             select {
-            // try to send
+            // Try to send
             case s.snapCh <- *snapCopy:
                 s.logger.Trace("Delivered new snapshot to proxy config watchers")
 
@@ -635,10 +635,12 @@ func (s *state) run() {
                 // this iteration
                 continue
 
-            // avoid blocking if a snapshot is already buffered, but queue up a retry with a timer
+            // Avoid blocking if a snapshot is already buffered in snapCh as this can result in a deadlock.
+            // See PR #9689 for more details.
             default:
                 s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
 
+                // Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
                 if coalesceTimer == nil {
                     coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
                         sendCh <- struct{}{}

From 87d4b1911c85823c7def25c4fd050f94d9938ece Mon Sep 17 00:00:00 2001
From: freddygv
Date: Mon, 8 Feb 2021 09:45:58 -0700
Subject: [PATCH 7/7] Add changelog entry

---
 .changelog/9689.txt | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 .changelog/9689.txt

diff --git a/.changelog/9689.txt b/.changelog/9689.txt
new file mode 100644
index 000000000..85f78ac90
--- /dev/null
+++ b/.changelog/9689.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+proxycfg: avoid potential deadlock in delivering proxy snapshot to watchers.
+```
\ No newline at end of file