bug: fix goroutine leaks caused by incorrect usage of `WatchCh` (#14916)

memdb's `WatchCh` method creates a goroutine that will publish to the
returned channel when the watchset is triggered or the given context
is canceled. Although this is called out in its godoc comment, it's
not obvious that this method creates a goroutine who's lifecycle you
need to manage.

In the xDS capacity controller, we were calling `WatchCh` on each
iteration of the control loop, meaning the number of goroutines would
grow on each autopilot event until there was catalog churn.

In the catalog config source, we were calling `WatchCh` with the
background context, meaning that the goroutine would keep running after
the sync loop had terminated.
This commit is contained in:
Dan Upton 2022-10-13 12:04:27 +01:00 committed by GitHub
parent 56580d6fa6
commit 36a3d00f0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 6 deletions

3
.changelog/14916.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
server: fix goroutine/memory leaks in the xDS subsystem (these were present regardless of whether or not xDS was in-use)
```

View File

@ -79,7 +79,7 @@ func NewController(cfg Config) *Controller {
func (c *Controller) Run(ctx context.Context) { func (c *Controller) Run(ctx context.Context) {
defer close(c.doneCh) defer close(c.doneCh)
ws, numProxies, err := c.countProxies(ctx) watchCh, numProxies, err := c.countProxies(ctx)
if err != nil { if err != nil {
return return
} }
@ -90,8 +90,8 @@ func (c *Controller) Run(ctx context.Context) {
case s := <-c.serverCh: case s := <-c.serverCh:
numServers = s numServers = s
c.updateMaxSessions(numServers, numProxies) c.updateMaxSessions(numServers, numProxies)
case <-ws.WatchCh(ctx): case <-watchCh:
ws, numProxies, err = c.countProxies(ctx) watchCh, numProxies, err = c.countProxies(ctx)
if err != nil { if err != nil {
return return
} }
@ -170,7 +170,7 @@ func (c *Controller) updateMaxSessions(numServers, numProxies uint32) {
// countProxies counts the number of registered proxy services, retrying on // countProxies counts the number of registered proxy services, retrying on
// error until the given context is cancelled. // error until the given context is cancelled.
func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32, error) { func (c *Controller) countProxies(ctx context.Context) (<-chan error, uint32, error) {
retryWaiter := &retry.Waiter{ retryWaiter := &retry.Waiter{
MinFailures: 1, MinFailures: 1,
MinWait: 1 * time.Second, MinWait: 1 * time.Second,
@ -200,7 +200,7 @@ func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32,
count += uint32(kindCount) count += uint32(kindCount)
} }
} }
return ws, count, nil return ws.WatchCh(ctx), count, nil
} }
} }

View File

@ -141,10 +141,17 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg.
} }
syncLoop := func(ws memdb.WatchSet) { syncLoop := func(ws memdb.WatchSet) {
// Cancel the context on return to clean up the goroutine started by WatchCh.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for { for {
select { select {
case <-ws.WatchCh(context.Background()): case <-ws.WatchCh(ctx):
// Something changed, unblock and re-run the query. // Something changed, unblock and re-run the query.
//
// It is expected that all other branches of this select will return and
// cancel the context given to WatchCh (to clean up its goroutine).
case doneCh := <-closeCh: case doneCh := <-closeCh:
// All watchers of this service (xDS streams) have gone away, so it's time // All watchers of this service (xDS streams) have gone away, so it's time
// to free its resources. // to free its resources.