Make some activity log tests less flaky (#17028)
* OSS parts of ent #3157. Some activity log tests were flaky because background workers could race with them. We now overload DisableTimers to stop some of those workers from running, and add channels that tests can use to wait for the other workers to complete before testing starts.
* Add changelog (CL) entry.
This commit is contained in:
parent
cd574b5cc6
commit
7842b861b3
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:improvement
|
||||||
|
core: Activity log goroutine management improvements to allow tests to be more deterministic.
|
||||||
|
```
|
|
@ -162,6 +162,7 @@ type ActivityLog struct {
|
||||||
// channel closed when deletion at startup is done
|
// channel closed when deletion at startup is done
|
||||||
// (for unit test robustness)
|
// (for unit test robustness)
|
||||||
retentionDone chan struct{}
|
retentionDone chan struct{}
|
||||||
|
computationWorkerDone chan struct{}
|
||||||
|
|
||||||
// for testing: is config currently being invalidated. protected by l
|
// for testing: is config currently being invalidated. protected by l
|
||||||
configInvalidationInProgress bool
|
configInvalidationInProgress bool
|
||||||
|
@ -1062,15 +1063,19 @@ func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error {
|
||||||
go manager.activeFragmentWorker(ctx)
|
go manager.activeFragmentWorker(ctx)
|
||||||
|
|
||||||
// Check for any intent log, in the background
|
// Check for any intent log, in the background
|
||||||
go manager.precomputedQueryWorker(ctx)
|
manager.computationWorkerDone = make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
manager.precomputedQueryWorker(ctx)
|
||||||
|
close(manager.computationWorkerDone)
|
||||||
|
}()
|
||||||
|
|
||||||
// Catch up on garbage collection
|
// Catch up on garbage collection
|
||||||
// Signal when this is done so that unit tests can proceed.
|
// Signal when this is done so that unit tests can proceed.
|
||||||
manager.retentionDone = make(chan struct{})
|
manager.retentionDone = make(chan struct{})
|
||||||
go func() {
|
go func(months int) {
|
||||||
manager.retentionWorker(ctx, time.Now(), manager.retentionMonths)
|
manager.retentionWorker(ctx, time.Now(), months)
|
||||||
close(manager.retentionDone)
|
close(manager.retentionDone)
|
||||||
}()
|
}(manager.retentionMonths)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -1198,6 +1203,11 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
|
||||||
endOfMonth.Stop()
|
endOfMonth.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
endOfMonthChannel := endOfMonth.C
|
||||||
|
if a.core.activityLogConfig.DisableTimers {
|
||||||
|
endOfMonthChannel = nil
|
||||||
|
}
|
||||||
|
|
||||||
writeFunc := func() {
|
writeFunc := func() {
|
||||||
ctx, cancel := context.WithTimeout(ctx, activitySegmentWriteTimeout)
|
ctx, cancel := context.WithTimeout(ctx, activitySegmentWriteTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -1212,6 +1222,7 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
|
||||||
a.l.RLock()
|
a.l.RLock()
|
||||||
doneCh := a.doneCh
|
doneCh := a.doneCh
|
||||||
a.l.RUnlock()
|
a.l.RUnlock()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
|
@ -1241,7 +1252,7 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
|
||||||
|
|
||||||
// Simpler, but ticker.Reset was introduced in go 1.15:
|
// Simpler, but ticker.Reset was introduced in go 1.15:
|
||||||
// ticker.Reset(activitySegmentInterval)
|
// ticker.Reset(activitySegmentInterval)
|
||||||
case currentTime := <-endOfMonth.C:
|
case currentTime := <-endOfMonthChannel:
|
||||||
err := a.HandleEndOfMonth(ctx, currentTime.UTC())
|
err := a.HandleEndOfMonth(ctx, currentTime.UTC())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.logger.Error("failed to perform end of month rotation", "error", err)
|
a.logger.Error("failed to perform end of month rotation", "error", err)
|
||||||
|
@ -2165,6 +2176,10 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
|
||||||
// We expect the return value won't be checked, so log errors as they occur
|
// We expect the return value won't be checked, so log errors as they occur
|
||||||
// (but for unit testing having the error return should help.)
|
// (but for unit testing having the error return should help.)
|
||||||
func (a *ActivityLog) retentionWorker(ctx context.Context, currentTime time.Time, retentionMonths int) error {
|
func (a *ActivityLog) retentionWorker(ctx context.Context, currentTime time.Time, retentionMonths int) error {
|
||||||
|
if a.core.activityLogConfig.DisableTimers {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
|
|
@ -1787,7 +1787,12 @@ func TestActivityLog_Export(t *testing.T) {
|
||||||
october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
|
october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
|
||||||
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
core, _, _, _ := TestCoreUnsealedWithMetrics(t)
|
core, _, _ := TestCoreUnsealedWithConfig(t, &CoreConfig{
|
||||||
|
ActivityLogConfig: ActivityLogCoreConfig{
|
||||||
|
DisableTimers: true,
|
||||||
|
ForceEnable: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
a := core.activityLog
|
a := core.activityLog
|
||||||
ctx := namespace.RootContext(nil)
|
ctx := namespace.RootContext(nil)
|
||||||
|
|
||||||
|
@ -2278,8 +2283,16 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) {
|
||||||
october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
|
october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
|
||||||
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
core, _, _, sink := TestCoreUnsealedWithMetrics(t)
|
conf := &CoreConfig{
|
||||||
|
ActivityLogConfig: ActivityLogCoreConfig{
|
||||||
|
ForceEnable: true,
|
||||||
|
DisableTimers: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
sink := SetupMetrics(conf)
|
||||||
|
core, _, _ := TestCoreUnsealedWithConfig(t, conf)
|
||||||
a := core.activityLog
|
a := core.activityLog
|
||||||
|
<-a.computationWorkerDone
|
||||||
ctx := namespace.RootContext(nil)
|
ctx := namespace.RootContext(nil)
|
||||||
|
|
||||||
// Generate overlapping sets of entity IDs from this list.
|
// Generate overlapping sets of entity IDs from this list.
|
||||||
|
@ -2704,7 +2717,14 @@ func TestActivityLog_Precompute(t *testing.T) {
|
||||||
october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
|
october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
|
||||||
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
core, _, _, sink := TestCoreUnsealedWithMetrics(t)
|
conf := &CoreConfig{
|
||||||
|
ActivityLogConfig: ActivityLogCoreConfig{
|
||||||
|
ForceEnable: true,
|
||||||
|
DisableTimers: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
sink := SetupMetrics(conf)
|
||||||
|
core, _, _ := TestCoreUnsealedWithConfig(t, conf)
|
||||||
a := core.activityLog
|
a := core.activityLog
|
||||||
ctx := namespace.RootContext(nil)
|
ctx := namespace.RootContext(nil)
|
||||||
|
|
||||||
|
@ -3038,7 +3058,12 @@ func TestActivityLog_Precompute_SkipMonth(t *testing.T) {
|
||||||
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
||||||
december := timeutil.StartOfMonth(time.Date(2020, 12, 1, 0, 0, 0, 0, time.UTC))
|
december := timeutil.StartOfMonth(time.Date(2020, 12, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
core, _, _, _ := TestCoreUnsealedWithMetrics(t)
|
core, _, _ := TestCoreUnsealedWithConfig(t, &CoreConfig{
|
||||||
|
ActivityLogConfig: ActivityLogCoreConfig{
|
||||||
|
ForceEnable: true,
|
||||||
|
DisableTimers: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
a := core.activityLog
|
a := core.activityLog
|
||||||
ctx := namespace.RootContext(nil)
|
ctx := namespace.RootContext(nil)
|
||||||
|
|
||||||
|
@ -3219,7 +3244,14 @@ func TestActivityLog_PrecomputeNonEntityTokensWithID(t *testing.T) {
|
||||||
october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
|
october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
|
||||||
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
core, _, _, sink := TestCoreUnsealedWithMetrics(t)
|
conf := &CoreConfig{
|
||||||
|
ActivityLogConfig: ActivityLogCoreConfig{
|
||||||
|
ForceEnable: true,
|
||||||
|
DisableTimers: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
sink := SetupMetrics(conf)
|
||||||
|
core, _, _ := TestCoreUnsealedWithConfig(t, conf)
|
||||||
a := core.activityLog
|
a := core.activityLog
|
||||||
ctx := namespace.RootContext(nil)
|
ctx := namespace.RootContext(nil)
|
||||||
|
|
||||||
|
|
|
@ -225,6 +225,8 @@ func TestCoreWithSealAndUINoCleanup(t testing.T, opts *CoreConfig) *Core {
|
||||||
conf.AuditBackends[k] = v
|
conf.AuditBackends[k] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
conf.ActivityLogConfig = opts.ActivityLogConfig
|
||||||
|
|
||||||
c, err := NewCore(conf)
|
c, err := NewCore(conf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
|
@ -363,16 +365,21 @@ func TestCoreUnsealed(t testing.T) (*Core, [][]byte, string) {
|
||||||
return testCoreUnsealed(t, core)
|
return testCoreUnsealed(t, core)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SetupMetrics(conf *CoreConfig) *metrics.InmemSink {
|
||||||
|
inmemSink := metrics.NewInmemSink(1000000*time.Hour, 2000000*time.Hour)
|
||||||
|
conf.MetricSink = metricsutil.NewClusterMetricSink("test-cluster", inmemSink)
|
||||||
|
conf.MetricsHelper = metricsutil.NewMetricsHelper(inmemSink, false)
|
||||||
|
return inmemSink
|
||||||
|
}
|
||||||
|
|
||||||
func TestCoreUnsealedWithMetrics(t testing.T) (*Core, [][]byte, string, *metrics.InmemSink) {
|
func TestCoreUnsealedWithMetrics(t testing.T) (*Core, [][]byte, string, *metrics.InmemSink) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
inmemSink := metrics.NewInmemSink(1000000*time.Hour, 2000000*time.Hour)
|
|
||||||
conf := &CoreConfig{
|
conf := &CoreConfig{
|
||||||
BuiltinRegistry: NewMockBuiltinRegistry(),
|
BuiltinRegistry: NewMockBuiltinRegistry(),
|
||||||
MetricSink: metricsutil.NewClusterMetricSink("test-cluster", inmemSink),
|
|
||||||
MetricsHelper: metricsutil.NewMetricsHelper(inmemSink, false),
|
|
||||||
}
|
}
|
||||||
|
sink := SetupMetrics(conf)
|
||||||
core, keys, root := testCoreUnsealed(t, TestCoreWithSealAndUI(t, conf))
|
core, keys, root := testCoreUnsealed(t, TestCoreWithSealAndUI(t, conf))
|
||||||
return core, keys, root, inmemSink
|
return core, keys, root, sink
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestCoreUnsealedRaw returns a pure in-memory core that is already
|
// TestCoreUnsealedRaw returns a pure in-memory core that is already
|
||||||
|
|
Loading…
Reference in New Issue