Make TestActivityLog_MultipleFragmentsAndSegments timeout on its own (#11490)
* The main driver for this change was to make the read from a.newFragmentCh timeout quickly rather than waiting for the test timeout (much longer). While testing the change I observed a panic during shutdown, but it was swallowed and moreover there was no stack trace so it wasn't obvious. I'm hoping we can get rid of the recover, so I fixed the issue in the activitylog tests that needed it.
This commit is contained in:
parent
67374ba14d
commit
663ad150a7
|
@ -456,7 +456,6 @@ func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime
|
|||
func (a *ActivityLog) WalkEntitySegments(ctx context.Context,
|
||||
startTime time.Time,
|
||||
walkFn func(*activity.EntityActivityLog)) error {
|
||||
|
||||
basePath := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/"
|
||||
pathList, err := a.view.List(ctx, basePath)
|
||||
if err != nil {
|
||||
|
@ -486,7 +485,6 @@ func (a *ActivityLog) WalkEntitySegments(ctx context.Context,
|
|||
func (a *ActivityLog) WalkTokenSegments(ctx context.Context,
|
||||
startTime time.Time,
|
||||
walkFn func(*activity.TokenCount)) error {
|
||||
|
||||
basePath := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/"
|
||||
pathList, err := a.view.List(ctx, basePath)
|
||||
if err != nil {
|
||||
|
@ -995,7 +993,7 @@ func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error {
|
|||
|
||||
// stopActivityLog removes the ActivityLog from Core
|
||||
// and frees any resources.
|
||||
func (c *Core) stopActivityLog() error {
|
||||
func (c *Core) stopActivityLog() {
|
||||
if c.tokenStore != nil {
|
||||
c.tokenStore.SetActivityLog(nil)
|
||||
}
|
||||
|
@ -1007,7 +1005,6 @@ func (c *Core) stopActivityLog() error {
|
|||
}
|
||||
|
||||
c.activityLog = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *ActivityLog) StartOfNextMonth() time.Time {
|
||||
|
@ -1526,14 +1523,20 @@ func (a *ActivityLog) precomputedQueryWorker() error {
|
|||
|
||||
// Cancel the context if activity log is shut down.
|
||||
// This will cause the next storage operation to fail.
|
||||
go func() {
|
||||
a.l.RLock()
|
||||
// doneCh is modified in some tests, so we don't want to access that member
|
||||
// without a lock, but we don't want to hold the lock for the entire lifetime
|
||||
// of this goroutine. Passing the channel to the goroutine works here because
|
||||
// no tests depend on us accessing the new doneCh after modifying the field.
|
||||
go func(done chan struct{}) {
|
||||
select {
|
||||
case <-a.doneCh:
|
||||
case <-done:
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
break
|
||||
}
|
||||
}()
|
||||
}(a.doneCh)
|
||||
a.l.RUnlock()
|
||||
|
||||
// Load the intent log
|
||||
rawIntentLog, err := a.view.Get(ctx, activityIntentLogKey)
|
||||
|
|
|
@ -431,6 +431,11 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
|
|||
|
||||
// Stop timers for test purposes
|
||||
close(a.doneCh)
|
||||
defer func() {
|
||||
a.l.Lock()
|
||||
a.doneCh = make(chan struct{}, 1)
|
||||
a.l.Unlock()
|
||||
}()
|
||||
|
||||
startTimestamp := a.GetStartTimestamp()
|
||||
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
|
||||
|
@ -516,7 +521,11 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
|
|||
a.receivedFragment(fragment1)
|
||||
a.receivedFragment(fragment2)
|
||||
|
||||
<-a.newFragmentCh
|
||||
select {
|
||||
case <-a.newFragmentCh:
|
||||
case <-time.After(time.Minute):
|
||||
t.Fatal("timed out waiting for new fragment")
|
||||
}
|
||||
|
||||
err = a.saveCurrentSegmentToStorage(context.Background(), false)
|
||||
if err != nil {
|
||||
|
@ -1376,6 +1385,9 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi
|
|||
|
||||
var wg sync.WaitGroup
|
||||
close(a.doneCh)
|
||||
defer func() {
|
||||
a.doneCh = make(chan struct{}, 1)
|
||||
}()
|
||||
|
||||
err := a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC())
|
||||
if err != nil {
|
||||
|
@ -2249,7 +2261,8 @@ func TestActivityLog_PrecomputeCancel(t *testing.T) {
|
|||
|
||||
// This will block if the shutdown didn't work.
|
||||
go func() {
|
||||
a.precomputedQueryWorker()
|
||||
// We expect this to error because of BlockingInmemStorage
|
||||
_ = a.precomputedQueryWorker()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
|
|
|
@ -2154,9 +2154,7 @@ func (c *Core) preSeal() error {
|
|||
if err := c.stopExpiration(); err != nil {
|
||||
result = multierror.Append(result, errwrap.Wrapf("error stopping expiration: {{err}}", err))
|
||||
}
|
||||
if err := c.stopActivityLog(); err != nil {
|
||||
result = multierror.Append(result, errwrap.Wrapf("error stopping activity log: {{err}}", err))
|
||||
}
|
||||
c.stopActivityLog()
|
||||
if err := c.teardownCredentials(context.Background()); err != nil {
|
||||
result = multierror.Append(result, errwrap.Wrapf("error tearing down credentials: {{err}}", err))
|
||||
}
|
||||
|
|
|
@ -554,7 +554,8 @@ func benchmarkExpirationBackend(b *testing.B, physicalBackend physical.Backend,
|
|||
}
|
||||
|
||||
func TestExpiration_Restore(t *testing.T) {
|
||||
exp := mockExpiration(t)
|
||||
c, _, _ := TestCoreUnsealed(t)
|
||||
exp := c.expiration
|
||||
noop := &NoopBackend{}
|
||||
_, barrier, _ := mockBarrier(t)
|
||||
view := NewBarrierView(barrier, "logical/")
|
||||
|
@ -601,7 +602,7 @@ func TestExpiration_Restore(t *testing.T) {
|
|||
}
|
||||
|
||||
// Stop everything
|
||||
err = exp.Stop()
|
||||
err = c.stopExpiration()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
|
@ -359,11 +359,6 @@ func testCoreUnsealed(t testing.T, core *Core) (*Core, [][]byte, string) {
|
|||
testCoreAddSecretMount(t, core, token)
|
||||
|
||||
t.Cleanup(func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Log("panic closing core during cleanup", "panic", r)
|
||||
}
|
||||
}()
|
||||
core.Shutdown()
|
||||
})
|
||||
return core, keys, token
|
||||
|
@ -1871,7 +1866,6 @@ func (testCluster *TestCluster) newCore(t testing.T, idx int, coreConfig *CoreCo
|
|||
func (testCluster *TestCluster) setupClusterListener(
|
||||
t testing.T, idx int, core *Core, coreConfig *CoreConfig,
|
||||
opts *TestClusterOptions, listeners []*TestListener, handler http.Handler) {
|
||||
|
||||
if coreConfig.ClusterAddr == "" {
|
||||
return
|
||||
}
|
||||
|
@ -2058,7 +2052,6 @@ func (tc *TestCluster) initCores(t testing.T, opts *TestClusterOptions, addAudit
|
|||
func (testCluster *TestCluster) getAPIClient(
|
||||
t testing.T, opts *TestClusterOptions,
|
||||
port int, tlsConfig *tls.Config) *api.Client {
|
||||
|
||||
transport := cleanhttp.DefaultPooledTransport()
|
||||
transport.TLSClientConfig = tlsConfig.Clone()
|
||||
if err := http2.ConfigureTransport(transport); err != nil {
|
||||
|
|
|
@ -1150,7 +1150,7 @@ func TestTokenStore_CreateLookup_ExpirationInRestoreMode(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
err = ts.expiration.Stop()
|
||||
err = c.stopExpiration()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue