diff --git a/changelog/17028.txt b/changelog/17028.txt new file mode 100644 index 000000000..fd4944044 --- /dev/null +++ b/changelog/17028.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core: Activity log goroutine management improvements to allow tests to be more deterministic. +``` \ No newline at end of file diff --git a/vault/activity_log.go b/vault/activity_log.go index e6c9c42df..865fd24be 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -161,7 +161,8 @@ type ActivityLog struct { // channel closed when deletion at startup is done // (for unit test robustness) - retentionDone chan struct{} + retentionDone chan struct{} + computationWorkerDone chan struct{} // for testing: is config currently being invalidated. protected by l configInvalidationInProgress bool @@ -1062,15 +1063,19 @@ func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error { go manager.activeFragmentWorker(ctx) // Check for any intent log, in the background - go manager.precomputedQueryWorker(ctx) + manager.computationWorkerDone = make(chan struct{}) + go func() { + manager.precomputedQueryWorker(ctx) + close(manager.computationWorkerDone) + }() // Catch up on garbage collection // Signal when this is done so that unit tests can proceed. manager.retentionDone = make(chan struct{}) - go func() { - manager.retentionWorker(ctx, time.Now(), manager.retentionMonths) + go func(months int) { + manager.retentionWorker(ctx, time.Now(), months) close(manager.retentionDone) - }() + }(manager.retentionMonths) } return nil @@ -1198,6 +1203,11 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) { endOfMonth.Stop() } + endOfMonthChannel := endOfMonth.C + if a.core.activityLogConfig.DisableTimers { + endOfMonthChannel = nil + } + writeFunc := func() { ctx, cancel := context.WithTimeout(ctx, activitySegmentWriteTimeout) defer cancel() @@ -1212,6 +1222,7 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) { a.l.RLock() doneCh := a.doneCh a.l.RUnlock() + for { select { case <-doneCh: @@ -1241,7 +1252,7 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) { // Simpler, but ticker.Reset was introduced in go 1.15: // ticker.Reset(activitySegmentInterval) - case currentTime := <-endOfMonth.C: + case currentTime := <-endOfMonthChannel: err := a.HandleEndOfMonth(ctx, currentTime.UTC()) if err != nil { a.logger.Error("failed to perform end of month rotation", "error", err) @@ -2165,6 +2176,10 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error { // We expect the return value won't be checked, so log errors as they occur // (but for unit testing having the error return should help.) func (a *ActivityLog) retentionWorker(ctx context.Context, currentTime time.Time, retentionMonths int) error { + if a.core.activityLogConfig.DisableTimers { + return nil + } + ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index dd25bacf0..40373a131 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -1787,7 +1787,12 @@ func TestActivityLog_Export(t *testing.T) { october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)) november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC)) - core, _, _, _ := TestCoreUnsealedWithMetrics(t) + core, _, _ := TestCoreUnsealedWithConfig(t, &CoreConfig{ + ActivityLogConfig: ActivityLogCoreConfig{ + DisableTimers: true, + ForceEnable: true, + }, + }) a := core.activityLog ctx := namespace.RootContext(nil) @@ -2278,8 +2283,16 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) { october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)) november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC)) - core, _, _, sink := TestCoreUnsealedWithMetrics(t) + conf := &CoreConfig{ + ActivityLogConfig: ActivityLogCoreConfig{ + ForceEnable: true, + DisableTimers: true, + }, + } + sink := SetupMetrics(conf) + core, _, _ := TestCoreUnsealedWithConfig(t, conf) a := core.activityLog + <-a.computationWorkerDone ctx := namespace.RootContext(nil) // Generate overlapping sets of entity IDs from this list. @@ -2704,7 +2717,14 @@ func TestActivityLog_Precompute(t *testing.T) { october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)) november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC)) - core, _, _, sink := TestCoreUnsealedWithMetrics(t) + conf := &CoreConfig{ + ActivityLogConfig: ActivityLogCoreConfig{ + ForceEnable: true, + DisableTimers: true, + }, + } + sink := SetupMetrics(conf) + core, _, _ := TestCoreUnsealedWithConfig(t, conf) a := core.activityLog ctx := namespace.RootContext(nil) @@ -3038,7 +3058,12 @@ func TestActivityLog_Precompute_SkipMonth(t *testing.T) { november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC)) december := timeutil.StartOfMonth(time.Date(2020, 12, 1, 0, 0, 0, 0, time.UTC)) - core, _, _, _ := TestCoreUnsealedWithMetrics(t) + core, _, _ := TestCoreUnsealedWithConfig(t, &CoreConfig{ + ActivityLogConfig: ActivityLogCoreConfig{ + ForceEnable: true, + DisableTimers: true, + }, + }) a := core.activityLog ctx := namespace.RootContext(nil) @@ -3219,7 +3244,14 @@ func TestActivityLog_PrecomputeNonEntityTokensWithID(t *testing.T) { october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)) november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC)) - core, _, _, sink := TestCoreUnsealedWithMetrics(t) + conf := &CoreConfig{ + ActivityLogConfig: ActivityLogCoreConfig{ + ForceEnable: true, + DisableTimers: true, + }, + } + sink := SetupMetrics(conf) + core, _, _ := TestCoreUnsealedWithConfig(t, conf) a := core.activityLog ctx := namespace.RootContext(nil) diff --git a/vault/testing.go b/vault/testing.go index 8ba5f4930..fea43e699 100644 --- a/vault/testing.go +++ b/vault/testing.go @@ -225,6 +225,8 @@ func TestCoreWithSealAndUINoCleanup(t testing.T, opts *CoreConfig) *Core { conf.AuditBackends[k] = v } + conf.ActivityLogConfig = opts.ActivityLogConfig + c, err := NewCore(conf) if err != nil { t.Fatalf("err: %s", err) @@ -363,16 +365,21 @@ func TestCoreUnsealed(t testing.T) (*Core, [][]byte, string) { return testCoreUnsealed(t, core) } +func SetupMetrics(conf *CoreConfig) *metrics.InmemSink { + inmemSink := metrics.NewInmemSink(1000000*time.Hour, 2000000*time.Hour) + conf.MetricSink = metricsutil.NewClusterMetricSink("test-cluster", inmemSink) + conf.MetricsHelper = metricsutil.NewMetricsHelper(inmemSink, false) + return inmemSink +} + func TestCoreUnsealedWithMetrics(t testing.T) (*Core, [][]byte, string, *metrics.InmemSink) { t.Helper() - inmemSink := metrics.NewInmemSink(1000000*time.Hour, 2000000*time.Hour) conf := &CoreConfig{ BuiltinRegistry: NewMockBuiltinRegistry(), - MetricSink: metricsutil.NewClusterMetricSink("test-cluster", inmemSink), - MetricsHelper: metricsutil.NewMetricsHelper(inmemSink, false), } + sink := SetupMetrics(conf) core, keys, root := testCoreUnsealed(t, TestCoreWithSealAndUI(t, conf)) - return core, keys, root, inmemSink + return core, keys, root, sink } // TestCoreUnsealedRaw returns a pure in-memory core that is already