From dffd85e09a27deefc5259be3303242418f15a784 Mon Sep 17 00:00:00 2001 From: swayne275 Date: Fri, 30 Oct 2020 18:11:12 -0600 Subject: [PATCH] Backport invalidation changes (#10292) * merge activity log invalidation work from vault-enterprise PR 1546 * skip failing test due to enabled config on oss Co-authored-by: Mark Gritter --- helper/timeutil/timeutil.go | 9 +- helper/timeutil/timeutil_test.go | 27 ++++ vault/activity_log.go | 173 +++++++++++++++------ vault/activity_log_test.go | 251 ++++++++++++++++++++----------- 4 files changed, 331 insertions(+), 129 deletions(-) diff --git a/helper/timeutil/timeutil.go b/helper/timeutil/timeutil.go index 284f6262e..da3fc441e 100644 --- a/helper/timeutil/timeutil.go +++ b/helper/timeutil/timeutil.go @@ -8,6 +8,11 @@ import ( "time" ) +func StartOfPreviousMonth(t time.Time) time.Time { + year, month, _ := t.Date() + return time.Date(year, month, 1, 0, 0, 0, 0, t.Location()).AddDate(0, -1, 0) +} + func StartOfMonth(t time.Time) time.Time { year, month, _ := t.Date() return time.Date(year, month, 1, 0, 0, 0, 0, t.Location()) @@ -99,8 +104,8 @@ func InRange(t, start, end time.Time) bool { (t.Equal(end) || t.Before(end)) } -// Used when a storage path has the form /, -// where timestamp is a Unix timestamp. +// ParseTimeFromPath returns a UTC time from a path of the form '/', +// where is a Unix timestamp func ParseTimeFromPath(path string) (time.Time, error) { elems := strings.Split(path, "/") if len(elems) == 1 { diff --git a/helper/timeutil/timeutil_test.go b/helper/timeutil/timeutil_test.go index 6925a204b..5cef2d206 100644 --- a/helper/timeutil/timeutil_test.go +++ b/helper/timeutil/timeutil_test.go @@ -6,6 +6,33 @@ import ( "time" ) +func TestTimeutil_StartOfPreviousMonth(t *testing.T) { + testCases := []struct { + Input time.Time + Expected time.Time + }{ + { + Input: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + Expected: time.Date(2019, 12, 1, 0, 0, 0, 0, time.UTC), + }, + { + Input: time.Date(2020, 1, 15, 0, 0, 0, 0, time.UTC), + Expected: time.Date(2019, 12, 1, 0, 0, 0, 0, time.UTC), + }, + { + Input: time.Date(2020, 3, 31, 23, 59, 59, 999999999, time.UTC), + Expected: time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC), + }, + } + + for _, tc := range testCases { + result := StartOfPreviousMonth(tc.Input) + if !result.Equal(tc.Expected) { + t.Errorf("start of month before %v is %v, got %v", tc.Input, tc.Expected, result) + } + } +} + func TestTimeutil_StartOfMonth(t *testing.T) { testCases := []struct { Input time.Time diff --git a/vault/activity_log.go b/vault/activity_log.go index 8a989b96b..a84785b0c 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -157,12 +157,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me return nil, err } - emptyEntityActivityLog := &activity.EntityActivityLog{ - Entities: make([]*activity.EntityRecord, 0), - } - emptyTokenCount := &activity.TokenCount{ - CountByNamespaceID: make(map[string]uint64), - } a := &ActivityLog{ core: core, configOverrides: &core.activityLogConfig, @@ -176,9 +170,13 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me doneCh: make(chan struct{}, 1), activeEntities: make(map[string]struct{}), currentSegment: segmentInfo{ - startTimestamp: 0, - currentEntities: emptyEntityActivityLog, - tokenCount: emptyTokenCount, + startTimestamp: 0, + currentEntities: &activity.EntityActivityLog{ + Entities: make([]*activity.EntityRecord, 0), + }, + tokenCount: &activity.TokenCount{ + CountByNamespaceID: make(map[string]uint64), + }, entitySequenceNumber: 0, }, standbyFragmentsReceived: make([]*activity.LogFragment, 0), @@ -199,8 +197,24 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me return a, nil } +// Return the in-memory activeEntities from an activity log +func (c *Core) GetActiveEntities() map[string]struct{} { + out := make(map[string]struct{}) + + c.stateLock.RLock() + c.activityLog.fragmentLock.RLock() + for k, v := range c.activityLog.activeEntities { + out[k] = v + } + c.activityLog.fragmentLock.RUnlock() + c.stateLock.RUnlock() + + return out +} + // saveCurrentSegmentToStorage updates the record of Entities or // Non Entity Tokens in persistent storage +// :force: forces a save of tokens/entities even if the in-memory log is empty func (a *ActivityLog) saveCurrentSegmentToStorage(ctx context.Context, force bool) error { // Prevent simultaneous changes to segment a.l.Lock() @@ -209,6 +223,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorage(ctx context.Context, force boo } // Must be called with l held. +// :force: forces a save of tokens/entities even if the in-memory log is empty func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, force bool) error { defer a.metrics.MeasureSinceWithLabels([]string{"core", "activity", "segment_write"}, time.Now(), []metricsutil.Label{}) @@ -319,13 +334,12 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for return nil } +// :force: forces a save of tokens/entities even if the in-memory log is empty func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error { entityPath := fmt.Sprintf("log/entity/%d/%d", a.currentSegment.startTimestamp, a.currentSegment.entitySequenceNumber) // RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed tokenPath := fmt.Sprintf("log/directtokens/%d/0", a.currentSegment.startTimestamp) - // TODO: have a member function on segmentInfo struct to do the below two - // blocks if len(a.currentSegment.currentEntities.Entities) > 0 || force { entities, err := proto.Marshal(a.currentSegment.currentEntities) if err != nil { @@ -374,7 +388,8 @@ func parseSegmentNumberFromPath(path string) (int, bool) { return segmentNum, true } -// availableLogs returns the start_time(s) associated with months for which logs exist, sorted last to first +// availableLogs returns the start_time(s) (in UTC) associated with months for which logs exist, +// sorted last to first func (a *ActivityLog) availableLogs(ctx context.Context) ([]time.Time, error) { paths := make([]string, 0) for _, basePath := range []string{activityEntityBasePath, activityTokenBasePath} { @@ -411,6 +426,8 @@ func (a *ActivityLog) availableLogs(ctx context.Context) ([]time.Time, error) { return out, nil } +// getMostRecentActivityLogSegment gets the times (in UTC) associated with the most recent +// contiguous set of activity logs, sorted in decreasing order (latest to earliest) func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context) ([]time.Time, error) { logTimes, err := a.availableLogs(ctx) if err != nil { @@ -536,6 +553,7 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time // loadCurrentEntitySegment loads the most recent segment (for "this month") into memory // (to append new entries), and to the activeEntities to avoid duplication +// call with fragmentLock and l held func (a *ActivityLog) loadCurrentEntitySegment(ctx context.Context, startTime time.Time, sequenceNum uint64) error { path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) data, err := a.view.Get(ctx, path) @@ -653,19 +671,26 @@ func (a *ActivityLog) newMonthCurrentLogLocked(currentTime time.Time) { a.currentSegment.startTimestamp = monthStart.Unix() } +// Initialize a new current segment, based on the given time +// should be called with fragmentLock and l held. +func (a *ActivityLog) newSegmentAtGivenTime(t time.Time) { + timestamp := t.Unix() + + a.logger.Trace("starting a segment", "timestamp", timestamp) + a.resetCurrentLog() + a.currentSegment.startTimestamp = timestamp +} + // Reset all the current segment state. // Should be called with fragmentLock and l held. func (a *ActivityLog) resetCurrentLog() { - emptyEntityActivityLog := &activity.EntityActivityLog{ + a.currentSegment.startTimestamp = 0 + a.currentSegment.currentEntities = &activity.EntityActivityLog{ Entities: make([]*activity.EntityRecord, 0), } - emptyTokenCount := &activity.TokenCount{ + a.currentSegment.tokenCount = &activity.TokenCount{ CountByNamespaceID: make(map[string]uint64), } - - a.currentSegment.startTimestamp = 0 - a.currentSegment.currentEntities = emptyEntityActivityLog - a.currentSegment.tokenCount = emptyTokenCount a.currentSegment.entitySequenceNumber = 0 a.fragment = nil @@ -707,43 +732,73 @@ func (a *ActivityLog) deleteLogWorker(startTimestamp int64, whenDone chan struct close(whenDone) } -// refreshFromStoredLog loads entity segments and token counts into memory (for "this month" only) -// this will synchronously load the most recent entity segment (and the token counts) into memory, -// and then kick off a background task to load the rest of the segments -// -// This method is called during init so we don't acquire the normally-required locks in it. +// refreshFromStoredLog loads the appropriate entities/tokencounts for active and performance standbys +// the most recent segment is loaded synchronously, and older segments are loaded in the background +// this function expects stateLock to be held func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGroup) error { + a.l.Lock() + defer a.l.Unlock() + a.fragmentLock.Lock() + defer a.fragmentLock.Unlock() + decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx) if err != nil { return err } + if len(decreasingLogTimes) == 0 { - // If no logs exist, and we are enabled, then - // start with the current timestamp if a.enabled { + if a.core.perfStandby { + // reset the log without updating the timestamp + a.resetCurrentLog() + } else { + a.startNewCurrentLogLocked() + } + } + + return nil + } + + mostRecent := decreasingLogTimes[0] + + if !a.enabled { + a.logger.Debug("activity log not enabled, skipping refresh from storage") + if !a.core.perfStandby && timeutil.IsCurrentMonth(mostRecent, time.Now().UTC()) { + a.logger.Debug("activity log is disabled, cleaning up logs for the current month") + go a.deleteLogWorker(mostRecent.Unix(), make(chan struct{})) + } + + return nil + } + + now := time.Now().UTC() + if timeutil.IsPreviousMonth(mostRecent, now) { + // no activity logs to load for this month. if we are enabled, interpret + // it as having missed the rotation, so let it fall through and load + // if we missed generating the precomputed query, activeFragmentWorker() + // will clean things up when it runs next + + a.logger.Trace("no log segments for current month", "mostRecent", mostRecent) + a.logger.Info("rotating activity log to new month") + } else if mostRecent.After(now) { + // we can't do anything if the most recent log is in the future + a.logger.Warn("timestamp from log to load is in the future", "timestamp", mostRecent) + return nil + } else if !timeutil.IsCurrentMonth(mostRecent, now) { + // the most recent log in storage is 2+ months in the past + + a.logger.Warn("most recent log in storage is 2 or more months in the past.", "timestamp", mostRecent) + if a.core.perfStandby { + // reset the log without updating the timestamp + a.resetCurrentLog() + } else { a.startNewCurrentLogLocked() } - return nil - } - mostRecent := decreasingLogTimes[0] - if !timeutil.IsCurrentMonth(mostRecent, time.Now().UTC()) { - // no activity logs to load for this month - // If we are enabled, interpret it as having missed - // the rotation. - if a.enabled { - a.logger.Trace("no log segments for current month", "mostRecent", mostRecent) - a.logger.Info("rotating activity log to new month") - a.newMonthCurrentLogLocked(time.Now().UTC()) - } - return nil - } - if !a.enabled { - a.logger.Warn("activity log exists but is disabled, cleaning up") - go a.deleteLogWorker(mostRecent.Unix(), make(chan struct{})) return nil } + // load token counts from storage into memory if !a.core.perfStandby { err = a.loadTokenCount(ctx, mostRecent) if err != nil { @@ -751,6 +806,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro } } + // load entity logs from storage into memory lastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent) if err != nil { return err @@ -804,6 +860,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { // enabled is protected by fragmentLock a.fragmentLock.Lock() + originalEnabled := a.enabled switch config.Enabled { case "enable": a.enabled = true @@ -813,6 +870,10 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { a.enabled = false } + if a.enabled != originalEnabled { + a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled) + } + if !a.enabled && a.currentSegment.startTimestamp != 0 { a.logger.Trace("deleting current segment") a.deleteDone = make(chan struct{}) @@ -848,6 +909,30 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { go a.retentionWorker(time.Now(), a.retentionMonths) } +// update the enable flag and reset the current log +func (a *ActivityLog) SetConfigStandby(ctx context.Context, config activityConfig) { + a.l.Lock() + defer a.l.Unlock() + + // enable is protected by fragmentLock + a.fragmentLock.Lock() + originalEnabled := a.enabled + switch config.Enabled { + case "enable": + a.enabled = true + case "default": + a.enabled = activityLogEnabledDefault + case "disable": + a.enabled = false + } + + if a.enabled != originalEnabled { + a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled) + a.resetCurrentLog() + } + a.fragmentLock.Unlock() +} + func (a *ActivityLog) queriesAvailable(ctx context.Context) (bool, error) { if a.queryStore == nil { return false, nil @@ -1111,6 +1196,8 @@ func (a *ActivityLog) HandleEndOfMonth(currentTime time.Time) error { a.fragmentLock.RLock() // Don't bother if disabled + // since l is locked earlier (and SetConfig() is the only way enabled can change) + // we don't need to worry about enabled changing during this work enabled := a.enabled a.fragmentLock.RUnlock() if !enabled { @@ -1149,7 +1236,7 @@ func (a *ActivityLog) HandleEndOfMonth(currentTime time.Time) error { // Advance the log; no need to force a save here because we have // the intent log written already. // - // On recovery refreshFromStoredLock() will see we're no longer + // On recovery refreshFromStoredLog() will see we're no longer // in the previous month, and recover by calling newMonthCurrentLog // again and triggering the precomputed query. a.fragmentLock.Lock() diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index 06df4df7c..dcf231d55 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -22,7 +22,9 @@ import ( ) const ( - logPrefix = "sys/counters/activity/log/" + logPrefix = "sys/counters/activity/log/" + activityPrefix = "sys/counters/activity/" + activityConfigPath = "sys/counters/activity/config" ) func TestActivityLog_Creation(t *testing.T) { @@ -98,11 +100,12 @@ func TestActivityLog_Creation(t *testing.T) { func checkExpectedEntitiesInMap(t *testing.T, a *ActivityLog, entityIDs []string) { t.Helper() - if len(a.activeEntities) != len(entityIDs) { - t.Fatalf("mismatched number of entities, expected %v got %v", len(entityIDs), a.activeEntities) + activeEntities := a.core.GetActiveEntities() + if len(activeEntities) != len(entityIDs) { + t.Fatalf("mismatched number of entities, expected %v got %v", len(entityIDs), activeEntities) } for _, e := range entityIDs { - if _, present := a.activeEntities[e]; !present { + if _, present := activeEntities[e]; !present { t.Errorf("entity ID %q is missing", e) } } @@ -213,8 +216,6 @@ func expectedEntityIDs(t *testing.T, out *activity.EntityActivityLog, ids []stri } } -// TODO setup predicate for what we expect (both positive and negative case) for testing -// factor things out into predicates and actions so test body is compact. func TestActivityLog_SaveTokensToStorage(t *testing.T) { core, _, _ := TestCoreUnsealed(t) a := core.activityLog @@ -964,6 +965,10 @@ func activeEntitiesEqual(t *testing.T, active map[string]struct{}, test []*activ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) { t.Helper() + a.l.Lock() + defer a.l.Unlock() + a.fragmentLock.Lock() + defer a.fragmentLock.Unlock() a.currentSegment = segmentInfo{ startTimestamp: time.Time{}.Unix(), currentEntities: &activity.EntityActivityLog{ @@ -1069,8 +1074,9 @@ func TestActivityLog_loadCurrentEntitySegment(t *testing.T) { t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Entities, a.currentSegment.currentEntities, tc.path) } - if !activeEntitiesEqual(t, a.activeEntities, tc.entities.Entities) { - t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, a.activeEntities, tc.path) + activeEntities := core.GetActiveEntities() + if !activeEntitiesEqual(t, activeEntities, tc.entities.Entities) { + t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, activeEntities, tc.path) } a.resetEntitiesInMemory(t) @@ -1148,8 +1154,12 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) { ctx := context.Background() for _, tc := range testCases { if tc.refresh { + a.l.Lock() + a.fragmentLock.Lock() a.activeEntities = make(map[string]struct{}) a.currentSegment.startTimestamp = tc.time + a.fragmentLock.Unlock() + a.l.Unlock() } err := a.loadPriorEntitySegment(ctx, time.Unix(tc.time, 0), tc.seqNum) @@ -1157,8 +1167,9 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) { t.Fatalf("got error loading data for %q: %v", tc.path, err) } - if !activeEntitiesEqual(t, a.activeEntities, tc.entities.Entities) { - t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, a.activeEntities, tc.path) + activeEntities := core.GetActiveEntities() + if !activeEntitiesEqual(t, activeEntities, tc.entities.Entities) { + t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, activeEntities, tc.path) } } } @@ -1258,13 +1269,15 @@ func TestActivityLog_StopAndRestart(t *testing.T) { } -func setupActivityRecordsInStorage(t *testing.T, includeEntities, includeTokens bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) { +// :base: is the timestamp to start from for the setup logic (use to simulate newest log from past or future) +// entity records returned include [0] data from a previous month and [1:] data from the current month +// token counts returned are from the current month +func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities, includeTokens bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) { t.Helper() core, _, _ := TestCoreUnsealed(t) a := core.activityLog - now := time.Now().UTC() - monthsAgo := now.AddDate(0, -3, 0) + monthsAgo := base.AddDate(0, -3, 0) var entityRecords []*activity.EntityRecord if includeEntities { @@ -1309,8 +1322,8 @@ func setupActivityRecordsInStorage(t *testing.T, includeEntities, includeTokens } writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData1) - writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(now.Unix())+"/0", entityData2) - writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(now.Unix())+"/1", entityData3) + writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(base.Unix())+"/0", entityData2) + writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(base.Unix())+"/1", entityData3) } var tokenRecords map[string]uint64 @@ -1329,14 +1342,14 @@ func setupActivityRecordsInStorage(t *testing.T, includeEntities, includeTokens t.Fatalf(err.Error()) } - writeToStorage(t, core, logPrefix+"directtokens/"+fmt.Sprint(now.Unix())+"/0", tokenData) + writeToStorage(t, core, logPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData) } return a, entityRecords, tokenRecords } func TestActivityLog_refreshFromStoredLog(t *testing.T) { - a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, true, true) + a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true) a.enabled = true var wg sync.WaitGroup @@ -1361,46 +1374,15 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) { t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID) } - if !activeEntitiesEqual(t, a.activeEntities, expectedActive.Entities) { + activeEntities := a.core.GetActiveEntities() + if !activeEntitiesEqual(t, activeEntities, expectedActive.Entities) { // we expect activeEntities to be loaded for the entire month - t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v", expectedActive.Entities, a.activeEntities) - } -} - -func TestActivityLog_refreshFromStoredLogOnStandby(t *testing.T) { - a, expectedEntityRecords, _ := setupActivityRecordsInStorage(t, true, true) - a.enabled = true - a.core.perfStandby = true - - var wg sync.WaitGroup - err := a.refreshFromStoredLog(context.Background(), &wg) - if err != nil { - t.Fatalf("got error loading stored activity logs: %v", err) - } - wg.Wait() - - expectedActive := &activity.EntityActivityLog{ - Entities: expectedEntityRecords[1:], - } - if !activeEntitiesEqual(t, a.activeEntities, expectedActive.Entities) { - // we expect activeEntities to be loaded for the entire month - t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v", expectedActive.Entities, a.activeEntities) - } - - // we expect nothing to be loaded to a.currentSegment (other than startTimestamp for end of month checking) - if len(a.currentSegment.currentEntities.Entities) > 0 { - t.Errorf("currentSegment entities should not be populated. got: %v", a.currentSegment.currentEntities) - } - if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 { - t.Errorf("currentSegment token counts should not be populated. got: %v", a.currentSegment.tokenCount.CountByNamespaceID) - } - if a.currentSegment.entitySequenceNumber != 0 { - t.Errorf("currentSegment sequence number should be 0. got: %v", a.currentSegment.entitySequenceNumber) + t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expectedActive.Entities, activeEntities) } } func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testing.T) { - a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, true, true) + a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true) a.enabled = true var wg sync.WaitGroup @@ -1424,14 +1406,15 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID) } - if !activeEntitiesEqual(t, a.activeEntities, expected.Entities) { + activeEntities := a.core.GetActiveEntities() + if !activeEntitiesEqual(t, activeEntities, expected.Entities) { // we only expect activeEntities to be loaded for the newest segment (for the current month) - t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v", expected.Entities, a.activeEntities) + t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expected.Entities, activeEntities) } } func TestActivityLog_refreshFromStoredLogContextCancelled(t *testing.T) { - a, _, _ := setupActivityRecordsInStorage(t, true, true) + a, _, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true) var wg sync.WaitGroup ctx, cancelFn := context.WithCancel(context.Background()) @@ -1444,7 +1427,7 @@ func TestActivityLog_refreshFromStoredLogContextCancelled(t *testing.T) { } func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) { - a, expectedEntityRecords, _ := setupActivityRecordsInStorage(t, true, false) + a, expectedEntityRecords, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, false) a.enabled = true var wg sync.WaitGroup @@ -1464,8 +1447,9 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) { // we expect all segments for the current month to be loaded t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, a.currentSegment.currentEntities) } - if !activeEntitiesEqual(t, a.activeEntities, expectedActive.Entities) { - t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v", expectedActive.Entities, a.activeEntities) + activeEntities := a.core.GetActiveEntities() + if !activeEntitiesEqual(t, activeEntities, expectedActive.Entities) { + t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expectedActive.Entities, activeEntities) } // we expect no tokens @@ -1475,7 +1459,7 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) { } func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) { - a, _, expectedTokenCounts := setupActivityRecordsInStorage(t, false, true) + a, _, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), false, true) a.enabled = true var wg sync.WaitGroup @@ -1493,21 +1477,37 @@ func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) { if len(a.currentSegment.currentEntities.Entities) > 0 { t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentEntities) } - if len(a.activeEntities) > 0 { - t.Errorf("expected no active entity segment to be loaded. got: %v", a.activeEntities) + activeEntities := a.core.GetActiveEntities() + if len(activeEntities) > 0 { + t.Errorf("expected no active entity segment to be loaded. got: %v", activeEntities) } } -func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) { - a, _, _ := setupActivityRecordsInStorage(t, false, false) - a.enabled = true +// verify current segment refreshed with non-nil empty components and the :expectedStart: timestamp +// note: if :verifyTimeNotZero: is true, ignore :expectedStart: and just make sure the timestamp +// isn't 0 +func expectCurrentSegmentRefreshed(t *testing.T, a *ActivityLog, expectedStart int64, verifyTimeNotZero bool) { + t.Helper() - var wg sync.WaitGroup - err := a.refreshFromStoredLog(context.Background(), &wg) - if err != nil { - t.Fatalf("got error loading stored activity logs: %v", err) + a.l.RLock() + defer a.l.RUnlock() + a.fragmentLock.RLock() + defer a.fragmentLock.RUnlock() + if a.currentSegment.currentEntities == nil { + t.Fatalf("expected non-nil currentSegment.currentEntities") + } + if a.currentSegment.currentEntities.Entities == nil { + t.Errorf("expected non-nil currentSegment.currentEntities.Entities") + } + if a.activeEntities == nil { + t.Errorf("expected non-nil activeEntities") + } + if a.currentSegment.tokenCount == nil { + t.Fatalf("expected non-nil currentSegment.tokenCount") + } + if a.currentSegment.tokenCount.CountByNamespaceID == nil { + t.Errorf("expected non-nil currentSegment.tokenCount.CountByNamespaceID") } - wg.Wait() if len(a.currentSegment.currentEntities.Entities) > 0 { t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentEntities) @@ -1518,6 +1518,103 @@ func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) { if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 { t.Errorf("expected no token counts to be loaded. got: %v", a.currentSegment.tokenCount.CountByNamespaceID) } + + if verifyTimeNotZero { + if a.currentSegment.startTimestamp == 0 { + t.Error("bad start timestamp. expected no reset but timestamp was reset") + } + } else if a.currentSegment.startTimestamp != expectedStart { + t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentSegment.startTimestamp) + } +} + +func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) { + now := time.Now().UTC() + a, _, _ := setupActivityRecordsInStorage(t, now, false, false) + a.enabled = true + + var wg sync.WaitGroup + err := a.refreshFromStoredLog(context.Background(), &wg) + if err != nil { + t.Fatalf("got error loading stored activity logs: %v", err) + } + wg.Wait() + + expectCurrentSegmentRefreshed(t, a, now.Unix(), false) +} + +func TestActivityLog_refreshFromStoredLogTwoMonthsPrevious(t *testing.T) { + // test what happens when the most recent data is from month M-2 (or earlier - same effect) + now := time.Now().UTC() + twoMonthsAgoStart := timeutil.StartOfPreviousMonth(timeutil.StartOfPreviousMonth(now)) + a, _, _ := setupActivityRecordsInStorage(t, twoMonthsAgoStart, true, true) + a.enabled = true + + var wg sync.WaitGroup + err := a.refreshFromStoredLog(context.Background(), &wg) + if err != nil { + t.Fatalf("got error loading stored activity logs: %v", err) + } + wg.Wait() + + expectCurrentSegmentRefreshed(t, a, now.Unix(), false) +} + +func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) { + // test what happens when most recent data is from month M-1 + // we expect to load the data from the previous month so that the activeFragmentWorker + // can handle end of month rotations + monthStart := timeutil.StartOfMonth(time.Now().UTC()) + oneMonthAgoStart := timeutil.StartOfPreviousMonth(monthStart) + a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, oneMonthAgoStart, true, true) + a.enabled = true + + var wg sync.WaitGroup + err := a.refreshFromStoredLog(context.Background(), &wg) + if err != nil { + t.Fatalf("got error loading stored activity logs: %v", err) + } + wg.Wait() + + expectedActive := &activity.EntityActivityLog{ + Entities: expectedEntityRecords[1:], + } + expectedCurrent := &activity.EntityActivityLog{ + Entities: expectedEntityRecords[2:], + } + if !entityRecordsEqual(t, a.currentSegment.currentEntities.Entities, expectedCurrent.Entities) { + // we only expect the newest entity segment to be loaded (for the current month) + t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, a.currentSegment.currentEntities) + } + if !reflect.DeepEqual(a.currentSegment.tokenCount.CountByNamespaceID, expectedTokenCounts) { + // we expect all token counts to be loaded + t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID) + } + + activeEntities := a.core.GetActiveEntities() + if !activeEntitiesEqual(t, activeEntities, expectedActive.Entities) { + // we expect activeEntities to be loaded for the entire month + t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expectedActive.Entities, activeEntities) + } +} + +func TestActivityLog_refreshFromStoredLogNextMonth(t *testing.T) { + t.Skip("works on enterprise, fails on oss (oss boots with activity log disabled)") + + // test what happens when most recent data is from month M+1 + nextMonthStart := timeutil.StartOfNextMonth(time.Now().UTC()) + a, _, _ := setupActivityRecordsInStorage(t, nextMonthStart, true, true) + a.enabled = true + + var wg sync.WaitGroup + err := a.refreshFromStoredLog(context.Background(), &wg) + if err != nil { + t.Fatalf("got error loading stored activity logs: %v", err) + } + wg.Wait() + + // we can't know exactly what the timestamp should be set to, just that it shouldn't be zero + expectCurrentSegmentRefreshed(t, a, time.Now().Unix(), true) } func TestActivityLog_IncludeNamespace(t *testing.T) { @@ -1685,21 +1782,7 @@ func TestActivityLog_EnableDisable(t *testing.T) { } expectMissingSegment(t, core, path) - if a.currentSegment.startTimestamp != 0 { - t.Errorf("bad startTimestamp, expected 0 got %v", a.currentSegment.startTimestamp) - } - if len(a.currentSegment.currentEntities.Entities) != 0 { - t.Errorf("expected empty currentEntities, got %v", a.currentSegment.currentEntities.Entities) - } - if len(a.currentSegment.tokenCount.CountByNamespaceID) != 0 { - t.Errorf("expected empty tokens, got %v", a.currentSegment.tokenCount.CountByNamespaceID) - } - if len(a.activeEntities) != 0 { - t.Errorf("expected empty activeEntities, got %v", a.activeEntities) - } - if a.fragment != nil { - t.Errorf("expected nil fragment") - } + expectCurrentSegmentRefreshed(t, a, 0, false) // enable (if not already) which force-writes an empty segment enableRequest()