Ensure that fewer goroutines survive after a test completes (#14197)

* Various changes to try to ensure that fewer goroutines survive after a test completes:
* add Core.ShutdownWait that doesn't return until shutdown is done
* create the usedCodes cache on seal and nil it out on pre-seal so that the finalizer kills the janitor goroutine
* stop seal health checks on seal rather than wait for them to discover the active context is done
* make sure all lease-loading goroutines are done before returning from restore
* make uniquePoliciesGc discover closed quitCh immediately instead of only when the ticker fires
* make sure all loading goroutines are done before returning from loadEntities, loadCachedEntitiesOfLocalAliases
This commit is contained in:
Nick Cabatoff 2022-02-23 10:33:52 -05:00 committed by GitHub
parent a0bfb70579
commit 2551a3e8ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 70 additions and 31 deletions

3
changelog/14197.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
core: Small changes to ensure goroutines terminate in tests
```

View File

@ -1149,6 +1149,15 @@ func (c *Core) Shutdown() error {
return err return err
} }
func (c *Core) ShutdownWait() error {
donech := c.ShutdownDone()
err := c.Shutdown()
if donech != nil {
<-donech
}
return err
}
// ShutdownDone returns a channel that will be closed after Shutdown completes // ShutdownDone returns a channel that will be closed after Shutdown completes
func (c *Core) ShutdownDone() <-chan struct{} { func (c *Core) ShutdownDone() <-chan struct{} {
return c.shutdownDoneCh return c.shutdownDoneCh
@ -2233,6 +2242,7 @@ func (c *Core) postUnseal(ctx context.Context, ctxCancelFunc context.CancelFunc,
} }
} }
c.loginMFABackend.usedCodes = cache.New(0, 30*time.Second)
c.logger.Info("post-unseal setup complete") c.logger.Info("post-unseal setup complete")
return nil return nil
} }
@ -2243,6 +2253,9 @@ func (c *Core) preSeal() error {
defer metrics.MeasureSince([]string{"core", "pre_seal"}, time.Now()) defer metrics.MeasureSince([]string{"core", "pre_seal"}, time.Now())
c.logger.Info("pre-seal teardown starting") c.logger.Info("pre-seal teardown starting")
if seal, ok := c.seal.(*autoSeal); ok {
seal.StopHealthCheck()
}
// Clear any pending funcs // Clear any pending funcs
c.postUnsealFuncs = nil c.postUnsealFuncs = nil
c.activeTime = time.Time{} c.activeTime = time.Time{}
@ -2305,6 +2318,7 @@ func (c *Core) preSeal() error {
seal.StopHealthCheck() seal.StopHealthCheck()
} }
c.loginMFABackend.usedCodes = nil
preSealPhysical(c) preSealPhysical(c)
c.logger.Info("pre-seal teardown complete") c.logger.Info("pre-seal teardown complete")

View File

@ -364,11 +364,15 @@ func TestCore_Shutdown(t *testing.T) {
// verify the channel returned by ShutdownDone is closed after Finalize // verify the channel returned by ShutdownDone is closed after Finalize
func TestCore_ShutdownDone(t *testing.T) { func TestCore_ShutdownDone(t *testing.T) {
c, _, _ := TestCoreUnsealed(t) c := TestCoreWithSealAndUINoCleanup(t, &CoreConfig{})
testCoreUnsealed(t, c)
doneCh := c.ShutdownDone() doneCh := c.ShutdownDone()
go func() { go func() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
c.Shutdown() err := c.Shutdown()
if err != nil {
t.Fatal(err)
}
}() }()
select { select {

View File

@ -742,16 +742,17 @@ func (m *ExpirationManager) Restore(errorFunc func()) (retErr error) {
}() }()
// Ensure all keys on the chan are processed // Ensure all keys on the chan are processed
LOOP:
for i := 0; i < leaseCount; i++ { for i := 0; i < leaseCount; i++ {
select { select {
case err := <-errs: case err = <-errs:
// Close all go routines // Close all go routines
close(quit) close(quit)
return err break LOOP
case <-m.quitCh: case <-m.quitCh:
close(quit) close(quit)
return nil break LOOP
case <-result: case <-result:
} }
@ -759,6 +760,9 @@ func (m *ExpirationManager) Restore(errorFunc func()) (retErr error) {
// Let all go routines finish // Let all go routines finish
wg.Wait() wg.Wait()
if err != nil {
return err
}
m.restoreModeLock.Lock() m.restoreModeLock.Lock()
atomic.StoreInt32(m.restoreMode, 0) atomic.StoreInt32(m.restoreMode, 0)
@ -1714,7 +1718,11 @@ func (m *ExpirationManager) inMemoryLeaseInfo(le *leaseEntry) *leaseEntry {
func (m *ExpirationManager) uniquePoliciesGc() { func (m *ExpirationManager) uniquePoliciesGc() {
for { for {
<-m.emptyUniquePolicies.C select {
case <-m.quitCh:
return
case <-m.emptyUniquePolicies.C:
}
// If the maximum lease is a month, and we blow away the unique // If the maximum lease is a month, and we blow away the unique
// policy cache every week, the pessimal case is 4x larger space // policy cache every week, the pessimal case is 4x larger space

View File

@ -269,6 +269,13 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er
close(broker) close(broker)
}() }()
defer func() {
// Let all go routines finish
wg.Wait()
i.logger.Info("cached entities of local aliases restored")
}()
// Restore each key by pulling from the result chan // Restore each key by pulling from the result chan
for j := 0; j < len(existing); j++ { for j := 0; j < len(existing); j++ {
select { select {
@ -306,13 +313,6 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er
} }
} }
// Let all go routines finish
wg.Wait()
if i.logger.IsInfo() {
i.logger.Info("cached entities of local aliases restored")
}
return nil return nil
} }
@ -391,13 +391,13 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
}() }()
// Restore each key by pulling from the result chan // Restore each key by pulling from the result chan
LOOP:
for j := 0; j < len(existing); j++ { for j := 0; j < len(existing); j++ {
select { select {
case err := <-errs: case err = <-errs:
// Close all go routines // Close all go routines
close(quit) close(quit)
break LOOP
return err
case bucket := <-result: case bucket := <-result:
// If there is no entry, nothing to restore // If there is no entry, nothing to restore
@ -474,6 +474,9 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
// Let all go routines finish // Let all go routines finish
wg.Wait() wg.Wait()
if err != nil {
return err
}
// Flatten the map into a list of keys, in order to log them // Flatten the map into a list of keys, in order to log them
duplicatedAccessorsList := make([]string, len(duplicatedAccessors)) duplicatedAccessorsList := make([]string, len(duplicatedAccessors))

View File

@ -140,7 +140,6 @@ func NewMFABackend(core *Core, logger hclog.Logger, prefix string, schemaFuncs [
mfaLogger: logger.Named("mfa"), mfaLogger: logger.Named("mfa"),
namespacer: core, namespacer: core,
methodTable: prefix, methodTable: prefix,
usedCodes: cache.New(0, 30*time.Second),
} }
} }

View File

@ -199,6 +199,7 @@ func TestAutoSeal_HealthCheck(t *testing.T) {
sealHealthTestIntervalUnhealthy = 10 * time.Millisecond sealHealthTestIntervalUnhealthy = 10 * time.Millisecond
setErr(errors.New("disconnected")) setErr(errors.New("disconnected"))
autoSeal.StartHealthCheck() autoSeal.StartHealthCheck()
defer autoSeal.StopHealthCheck()
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)

View File

@ -161,6 +161,23 @@ func TestCoreUI(t testing.T, enableUI bool) *Core {
} }
func TestCoreWithSealAndUI(t testing.T, opts *CoreConfig) *Core { func TestCoreWithSealAndUI(t testing.T, opts *CoreConfig) *Core {
c := TestCoreWithSealAndUINoCleanup(t, opts)
t.Cleanup(func() {
defer func() {
if r := recover(); r != nil {
t.Log("panic closing core during cleanup", "panic", r)
}
}()
err := c.ShutdownWait()
if err != nil {
t.Logf("shutdown returned error: %v", err)
}
})
return c
}
func TestCoreWithSealAndUINoCleanup(t testing.T, opts *CoreConfig) *Core {
logger := logging.NewVaultLogger(log.Trace) logger := logging.NewVaultLogger(log.Trace)
physicalBackend, err := physInmem.NewInmem(nil, logger) physicalBackend, err := physInmem.NewInmem(nil, logger)
if err != nil { if err != nil {
@ -209,15 +226,6 @@ func TestCoreWithSealAndUI(t testing.T, opts *CoreConfig) *Core {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
t.Cleanup(func() {
defer func() {
if r := recover(); r != nil {
t.Log("panic closing core during cleanup", "panic", r)
}
}()
c.Shutdown()
})
return c return c
} }
@ -389,10 +397,6 @@ func testCoreUnsealed(t testing.T, core *Core) (*Core, [][]byte, string) {
} }
testCoreAddSecretMount(t, core, token) testCoreAddSecretMount(t, core, token)
t.Cleanup(func() {
core.Shutdown()
})
return core, keys, token return core, keys, token
} }
@ -452,7 +456,10 @@ func TestCoreUnsealedBackend(t testing.T, backend physical.Backend) (*Core, [][]
t.Log("panic closing core during cleanup", "panic", r) t.Log("panic closing core during cleanup", "panic", r)
} }
}() }()
core.Shutdown() err := core.ShutdownWait()
if err != nil {
t.Logf("shutdown returned error: %v", err)
}
}) })
return core, keys, token return core, keys, token