From 36a3d00f0ddcd68f9c7a96e079c6c9d0ecd32eaa Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Thu, 13 Oct 2022 12:04:27 +0100 Subject: [PATCH] bug: fix goroutine leaks caused by incorrect usage of `WatchCh` (#14916) memdb's `WatchCh` method creates a goroutine that will publish to the returned channel when the watchset is triggered or the given context is canceled. Although this is called out in its godoc comment, it's not obvious that this method creates a goroutine whose lifecycle you need to manage. In the xDS capacity controller, we were calling `WatchCh` on each iteration of the control loop, meaning the number of goroutines would grow on each autopilot event until there was catalog churn. In the catalog config source, we were calling `WatchCh` with the background context, meaning that the goroutine would keep running after the sync loop had terminated. --- .changelog/14916.txt | 3 +++ agent/consul/xdscapacity/capacity.go | 10 +++++----- agent/proxycfg-sources/catalog/config_source.go | 9 ++++++++- 3 files changed, 16 insertions(+), 6 deletions(-) create mode 100644 .changelog/14916.txt diff --git a/.changelog/14916.txt b/.changelog/14916.txt new file mode 100644 index 000000000..201242489 --- /dev/null +++ b/.changelog/14916.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: fix goroutine/memory leaks in the xDS subsystem (these were present regardless of whether or not xDS was in-use) +``` diff --git a/agent/consul/xdscapacity/capacity.go b/agent/consul/xdscapacity/capacity.go index 2e24a09e0..f1974b964 100644 --- a/agent/consul/xdscapacity/capacity.go +++ b/agent/consul/xdscapacity/capacity.go @@ -79,7 +79,7 @@ func NewController(cfg Config) *Controller { func (c *Controller) Run(ctx context.Context) { defer close(c.doneCh) - ws, numProxies, err := c.countProxies(ctx) + watchCh, numProxies, err := c.countProxies(ctx) if err != nil { return } @@ -90,8 +90,8 @@ func (c *Controller) Run(ctx context.Context) { case s := <-c.serverCh: numServers = s 
c.updateMaxSessions(numServers, numProxies) - case <-ws.WatchCh(ctx): - ws, numProxies, err = c.countProxies(ctx) + case <-watchCh: + watchCh, numProxies, err = c.countProxies(ctx) if err != nil { return } @@ -170,7 +170,7 @@ func (c *Controller) updateMaxSessions(numServers, numProxies uint32) { // countProxies counts the number of registered proxy services, retrying on // error until the given context is cancelled. -func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32, error) { +func (c *Controller) countProxies(ctx context.Context) (<-chan error, uint32, error) { retryWaiter := &retry.Waiter{ MinFailures: 1, MinWait: 1 * time.Second, @@ -200,7 +200,7 @@ func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32, count += uint32(kindCount) } } - return ws, count, nil + return ws.WatchCh(ctx), count, nil } } diff --git a/agent/proxycfg-sources/catalog/config_source.go b/agent/proxycfg-sources/catalog/config_source.go index 56f222fcd..6a49e2812 100644 --- a/agent/proxycfg-sources/catalog/config_source.go +++ b/agent/proxycfg-sources/catalog/config_source.go @@ -141,10 +141,17 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg. } syncLoop := func(ws memdb.WatchSet) { + // Cancel the context on return to clean up the goroutine started by WatchCh. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { select { - case <-ws.WatchCh(context.Background()): + case <-ws.WatchCh(ctx): // Something changed, unblock and re-run the query. + // + // It is expected that all other branches of this select will return and + // cancel the context given to WatchCh (to clean up its goroutine). case doneCh := <-closeCh: // All watchers of this service (xDS streams) have gone away, so it's time // to free its resources.