Backport invalidation changes (#10292)

* merge activity log invalidation work from vault-enterprise PR 1546

* skip failing test due to enabled config on oss

Co-authored-by: Mark Gritter <mgritter@hashicorp.com>
This commit is contained in:
swayne275 2020-10-30 18:11:12 -06:00 committed by GitHub
parent 7f01a58aee
commit dffd85e09a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 331 additions and 129 deletions

View File

@ -8,6 +8,11 @@ import (
"time"
)
func StartOfPreviousMonth(t time.Time) time.Time {
year, month, _ := t.Date()
return time.Date(year, month, 1, 0, 0, 0, 0, t.Location()).AddDate(0, -1, 0)
}
func StartOfMonth(t time.Time) time.Time {
year, month, _ := t.Date()
return time.Date(year, month, 1, 0, 0, 0, 0, t.Location())
@ -99,8 +104,8 @@ func InRange(t, start, end time.Time) bool {
(t.Equal(end) || t.Before(end))
}
// Used when a storage path has the form <timestamp>/,
// where timestamp is a Unix timestamp.
// ParseTimeFromPath returns a UTC time from a path of the form '<timestamp>/',
// where <timestamp> is a Unix timestamp
func ParseTimeFromPath(path string) (time.Time, error) {
elems := strings.Split(path, "/")
if len(elems) == 1 {

View File

@ -6,6 +6,33 @@ import (
"time"
)
func TestTimeutil_StartOfPreviousMonth(t *testing.T) {
testCases := []struct {
Input time.Time
Expected time.Time
}{
{
Input: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC),
Expected: time.Date(2019, 12, 1, 0, 0, 0, 0, time.UTC),
},
{
Input: time.Date(2020, 1, 15, 0, 0, 0, 0, time.UTC),
Expected: time.Date(2019, 12, 1, 0, 0, 0, 0, time.UTC),
},
{
Input: time.Date(2020, 3, 31, 23, 59, 59, 999999999, time.UTC),
Expected: time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC),
},
}
for _, tc := range testCases {
result := StartOfPreviousMonth(tc.Input)
if !result.Equal(tc.Expected) {
t.Errorf("start of month before %v is %v, got %v", tc.Input, tc.Expected, result)
}
}
}
func TestTimeutil_StartOfMonth(t *testing.T) {
testCases := []struct {
Input time.Time

View File

@ -157,12 +157,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
return nil, err
}
emptyEntityActivityLog := &activity.EntityActivityLog{
Entities: make([]*activity.EntityRecord, 0),
}
emptyTokenCount := &activity.TokenCount{
CountByNamespaceID: make(map[string]uint64),
}
a := &ActivityLog{
core: core,
configOverrides: &core.activityLogConfig,
@ -176,9 +170,13 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
doneCh: make(chan struct{}, 1),
activeEntities: make(map[string]struct{}),
currentSegment: segmentInfo{
startTimestamp: 0,
currentEntities: emptyEntityActivityLog,
tokenCount: emptyTokenCount,
startTimestamp: 0,
currentEntities: &activity.EntityActivityLog{
Entities: make([]*activity.EntityRecord, 0),
},
tokenCount: &activity.TokenCount{
CountByNamespaceID: make(map[string]uint64),
},
entitySequenceNumber: 0,
},
standbyFragmentsReceived: make([]*activity.LogFragment, 0),
@ -199,8 +197,24 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
return a, nil
}
// Return the in-memory activeEntities from an activity log
func (c *Core) GetActiveEntities() map[string]struct{} {
out := make(map[string]struct{})
c.stateLock.RLock()
c.activityLog.fragmentLock.RLock()
for k, v := range c.activityLog.activeEntities {
out[k] = v
}
c.activityLog.fragmentLock.RUnlock()
c.stateLock.RUnlock()
return out
}
// saveCurrentSegmentToStorage updates the record of Entities or
// Non Entity Tokens in persistent storage
// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveCurrentSegmentToStorage(ctx context.Context, force bool) error {
// Prevent simultaneous changes to segment
a.l.Lock()
@ -209,6 +223,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorage(ctx context.Context, force boo
}
// Must be called with l held.
// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, force bool) error {
defer a.metrics.MeasureSinceWithLabels([]string{"core", "activity", "segment_write"},
time.Now(), []metricsutil.Label{})
@ -319,13 +334,12 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
return nil
}
// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error {
entityPath := fmt.Sprintf("log/entity/%d/%d", a.currentSegment.startTimestamp, a.currentSegment.entitySequenceNumber)
// RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed
tokenPath := fmt.Sprintf("log/directtokens/%d/0", a.currentSegment.startTimestamp)
// TODO: have a member function on segmentInfo struct to do the below two
// blocks
if len(a.currentSegment.currentEntities.Entities) > 0 || force {
entities, err := proto.Marshal(a.currentSegment.currentEntities)
if err != nil {
@ -374,7 +388,8 @@ func parseSegmentNumberFromPath(path string) (int, bool) {
return segmentNum, true
}
// availableLogs returns the start_time(s) associated with months for which logs exist, sorted last to first
// availableLogs returns the start_time(s) (in UTC) associated with months for which logs exist,
// sorted last to first
func (a *ActivityLog) availableLogs(ctx context.Context) ([]time.Time, error) {
paths := make([]string, 0)
for _, basePath := range []string{activityEntityBasePath, activityTokenBasePath} {
@ -411,6 +426,8 @@ func (a *ActivityLog) availableLogs(ctx context.Context) ([]time.Time, error) {
return out, nil
}
// getMostRecentActivityLogSegment gets the times (in UTC) associated with the most recent
// contiguous set of activity logs, sorted in decreasing order (latest to earliest)
func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context) ([]time.Time, error) {
logTimes, err := a.availableLogs(ctx)
if err != nil {
@ -536,6 +553,7 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time
// loadCurrentEntitySegment loads the most recent segment (for "this month") into memory
// (to append new entries), and to the activeEntities to avoid duplication
// call with fragmentLock and l held
func (a *ActivityLog) loadCurrentEntitySegment(ctx context.Context, startTime time.Time, sequenceNum uint64) error {
path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
data, err := a.view.Get(ctx, path)
@ -653,19 +671,26 @@ func (a *ActivityLog) newMonthCurrentLogLocked(currentTime time.Time) {
a.currentSegment.startTimestamp = monthStart.Unix()
}
// Initialize a new current segment, based on the given time
// should be called with fragmentLock and l held.
func (a *ActivityLog) newSegmentAtGivenTime(t time.Time) {
timestamp := t.Unix()
a.logger.Trace("starting a segment", "timestamp", timestamp)
a.resetCurrentLog()
a.currentSegment.startTimestamp = timestamp
}
// Reset all the current segment state.
// Should be called with fragmentLock and l held.
func (a *ActivityLog) resetCurrentLog() {
emptyEntityActivityLog := &activity.EntityActivityLog{
a.currentSegment.startTimestamp = 0
a.currentSegment.currentEntities = &activity.EntityActivityLog{
Entities: make([]*activity.EntityRecord, 0),
}
emptyTokenCount := &activity.TokenCount{
a.currentSegment.tokenCount = &activity.TokenCount{
CountByNamespaceID: make(map[string]uint64),
}
a.currentSegment.startTimestamp = 0
a.currentSegment.currentEntities = emptyEntityActivityLog
a.currentSegment.tokenCount = emptyTokenCount
a.currentSegment.entitySequenceNumber = 0
a.fragment = nil
@ -707,43 +732,73 @@ func (a *ActivityLog) deleteLogWorker(startTimestamp int64, whenDone chan struct
close(whenDone)
}
// refreshFromStoredLog loads entity segments and token counts into memory (for "this month" only)
// this will synchronously load the most recent entity segment (and the token counts) into memory,
// and then kick off a background task to load the rest of the segments
//
// This method is called during init so we don't acquire the normally-required locks in it.
// refreshFromStoredLog loads the appropriate entities/tokencounts for active and performance standbys
// the most recent segment is loaded synchronously, and older segments are loaded in the background
// this function expects stateLock to be held
func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGroup) error {
a.l.Lock()
defer a.l.Unlock()
a.fragmentLock.Lock()
defer a.fragmentLock.Unlock()
decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx)
if err != nil {
return err
}
if len(decreasingLogTimes) == 0 {
// If no logs exist, and we are enabled, then
// start with the current timestamp
if a.enabled {
if a.core.perfStandby {
// reset the log without updating the timestamp
a.resetCurrentLog()
} else {
a.startNewCurrentLogLocked()
}
}
return nil
}
mostRecent := decreasingLogTimes[0]
if !a.enabled {
a.logger.Debug("activity log not enabled, skipping refresh from storage")
if !a.core.perfStandby && timeutil.IsCurrentMonth(mostRecent, time.Now().UTC()) {
a.logger.Debug("activity log is disabled, cleaning up logs for the current month")
go a.deleteLogWorker(mostRecent.Unix(), make(chan struct{}))
}
return nil
}
now := time.Now().UTC()
if timeutil.IsPreviousMonth(mostRecent, now) {
// no activity logs to load for this month. if we are enabled, interpret
// it as having missed the rotation, so let it fall through and load
// if we missed generating the precomputed query, activeFragmentWorker()
// will clean things up when it runs next
a.logger.Trace("no log segments for current month", "mostRecent", mostRecent)
a.logger.Info("rotating activity log to new month")
} else if mostRecent.After(now) {
// we can't do anything if the most recent log is in the future
a.logger.Warn("timestamp from log to load is in the future", "timestamp", mostRecent)
return nil
} else if !timeutil.IsCurrentMonth(mostRecent, now) {
// the most recent log in storage is 2+ months in the past
a.logger.Warn("most recent log in storage is 2 or more months in the past.", "timestamp", mostRecent)
if a.core.perfStandby {
// reset the log without updating the timestamp
a.resetCurrentLog()
} else {
a.startNewCurrentLogLocked()
}
return nil
}
mostRecent := decreasingLogTimes[0]
if !timeutil.IsCurrentMonth(mostRecent, time.Now().UTC()) {
// no activity logs to load for this month
// If we are enabled, interpret it as having missed
// the rotation.
if a.enabled {
a.logger.Trace("no log segments for current month", "mostRecent", mostRecent)
a.logger.Info("rotating activity log to new month")
a.newMonthCurrentLogLocked(time.Now().UTC())
}
return nil
}
if !a.enabled {
a.logger.Warn("activity log exists but is disabled, cleaning up")
go a.deleteLogWorker(mostRecent.Unix(), make(chan struct{}))
return nil
}
// load token counts from storage into memory
if !a.core.perfStandby {
err = a.loadTokenCount(ctx, mostRecent)
if err != nil {
@ -751,6 +806,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
}
}
// load entity logs from storage into memory
lastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent)
if err != nil {
return err
@ -804,6 +860,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
// enabled is protected by fragmentLock
a.fragmentLock.Lock()
originalEnabled := a.enabled
switch config.Enabled {
case "enable":
a.enabled = true
@ -813,6 +870,10 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
a.enabled = false
}
if a.enabled != originalEnabled {
a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled)
}
if !a.enabled && a.currentSegment.startTimestamp != 0 {
a.logger.Trace("deleting current segment")
a.deleteDone = make(chan struct{})
@ -848,6 +909,30 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
go a.retentionWorker(time.Now(), a.retentionMonths)
}
// update the enable flag and reset the current log
func (a *ActivityLog) SetConfigStandby(ctx context.Context, config activityConfig) {
a.l.Lock()
defer a.l.Unlock()
// enable is protected by fragmentLock
a.fragmentLock.Lock()
originalEnabled := a.enabled
switch config.Enabled {
case "enable":
a.enabled = true
case "default":
a.enabled = activityLogEnabledDefault
case "disable":
a.enabled = false
}
if a.enabled != originalEnabled {
a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled)
a.resetCurrentLog()
}
a.fragmentLock.Unlock()
}
func (a *ActivityLog) queriesAvailable(ctx context.Context) (bool, error) {
if a.queryStore == nil {
return false, nil
@ -1111,6 +1196,8 @@ func (a *ActivityLog) HandleEndOfMonth(currentTime time.Time) error {
a.fragmentLock.RLock()
// Don't bother if disabled
// since l is locked earlier (and SetConfig() is the only way enabled can change)
// we don't need to worry about enabled changing during this work
enabled := a.enabled
a.fragmentLock.RUnlock()
if !enabled {
@ -1149,7 +1236,7 @@ func (a *ActivityLog) HandleEndOfMonth(currentTime time.Time) error {
// Advance the log; no need to force a save here because we have
// the intent log written already.
//
// On recovery refreshFromStoredLock() will see we're no longer
// On recovery refreshFromStoredLog() will see we're no longer
// in the previous month, and recover by calling newMonthCurrentLog
// again and triggering the precomputed query.
a.fragmentLock.Lock()

View File

@ -22,7 +22,9 @@ import (
)
const (
logPrefix = "sys/counters/activity/log/"
logPrefix = "sys/counters/activity/log/"
activityPrefix = "sys/counters/activity/"
activityConfigPath = "sys/counters/activity/config"
)
func TestActivityLog_Creation(t *testing.T) {
@ -98,11 +100,12 @@ func TestActivityLog_Creation(t *testing.T) {
func checkExpectedEntitiesInMap(t *testing.T, a *ActivityLog, entityIDs []string) {
t.Helper()
if len(a.activeEntities) != len(entityIDs) {
t.Fatalf("mismatched number of entities, expected %v got %v", len(entityIDs), a.activeEntities)
activeEntities := a.core.GetActiveEntities()
if len(activeEntities) != len(entityIDs) {
t.Fatalf("mismatched number of entities, expected %v got %v", len(entityIDs), activeEntities)
}
for _, e := range entityIDs {
if _, present := a.activeEntities[e]; !present {
if _, present := activeEntities[e]; !present {
t.Errorf("entity ID %q is missing", e)
}
}
@ -213,8 +216,6 @@ func expectedEntityIDs(t *testing.T, out *activity.EntityActivityLog, ids []stri
}
}
// TODO setup predicate for what we expect (both positive and negative case) for testing
// factor things out into predicates and actions so test body is compact.
func TestActivityLog_SaveTokensToStorage(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
a := core.activityLog
@ -964,6 +965,10 @@ func activeEntitiesEqual(t *testing.T, active map[string]struct{}, test []*activ
func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) {
t.Helper()
a.l.Lock()
defer a.l.Unlock()
a.fragmentLock.Lock()
defer a.fragmentLock.Unlock()
a.currentSegment = segmentInfo{
startTimestamp: time.Time{}.Unix(),
currentEntities: &activity.EntityActivityLog{
@ -1069,8 +1074,9 @@ func TestActivityLog_loadCurrentEntitySegment(t *testing.T) {
t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Entities, a.currentSegment.currentEntities, tc.path)
}
if !activeEntitiesEqual(t, a.activeEntities, tc.entities.Entities) {
t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, a.activeEntities, tc.path)
activeEntities := core.GetActiveEntities()
if !activeEntitiesEqual(t, activeEntities, tc.entities.Entities) {
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, activeEntities, tc.path)
}
a.resetEntitiesInMemory(t)
@ -1148,8 +1154,12 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) {
ctx := context.Background()
for _, tc := range testCases {
if tc.refresh {
a.l.Lock()
a.fragmentLock.Lock()
a.activeEntities = make(map[string]struct{})
a.currentSegment.startTimestamp = tc.time
a.fragmentLock.Unlock()
a.l.Unlock()
}
err := a.loadPriorEntitySegment(ctx, time.Unix(tc.time, 0), tc.seqNum)
@ -1157,8 +1167,9 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) {
t.Fatalf("got error loading data for %q: %v", tc.path, err)
}
if !activeEntitiesEqual(t, a.activeEntities, tc.entities.Entities) {
t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, a.activeEntities, tc.path)
activeEntities := core.GetActiveEntities()
if !activeEntitiesEqual(t, activeEntities, tc.entities.Entities) {
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, activeEntities, tc.path)
}
}
}
@ -1258,13 +1269,15 @@ func TestActivityLog_StopAndRestart(t *testing.T) {
}
func setupActivityRecordsInStorage(t *testing.T, includeEntities, includeTokens bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) {
// :base: is the timestamp to start from for the setup logic (use to simulate newest log from past or future)
// entity records returned include [0] data from a previous month and [1:] data from the current month
// token counts returned are from the current month
func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities, includeTokens bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) {
t.Helper()
core, _, _ := TestCoreUnsealed(t)
a := core.activityLog
now := time.Now().UTC()
monthsAgo := now.AddDate(0, -3, 0)
monthsAgo := base.AddDate(0, -3, 0)
var entityRecords []*activity.EntityRecord
if includeEntities {
@ -1309,8 +1322,8 @@ func setupActivityRecordsInStorage(t *testing.T, includeEntities, includeTokens
}
writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData1)
writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(now.Unix())+"/0", entityData2)
writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(now.Unix())+"/1", entityData3)
writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(base.Unix())+"/0", entityData2)
writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(base.Unix())+"/1", entityData3)
}
var tokenRecords map[string]uint64
@ -1329,14 +1342,14 @@ func setupActivityRecordsInStorage(t *testing.T, includeEntities, includeTokens
t.Fatalf(err.Error())
}
writeToStorage(t, core, logPrefix+"directtokens/"+fmt.Sprint(now.Unix())+"/0", tokenData)
writeToStorage(t, core, logPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData)
}
return a, entityRecords, tokenRecords
}
func TestActivityLog_refreshFromStoredLog(t *testing.T) {
a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, true, true)
a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true)
a.enabled = true
var wg sync.WaitGroup
@ -1361,46 +1374,15 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) {
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID)
}
if !activeEntitiesEqual(t, a.activeEntities, expectedActive.Entities) {
activeEntities := a.core.GetActiveEntities()
if !activeEntitiesEqual(t, activeEntities, expectedActive.Entities) {
// we expect activeEntities to be loaded for the entire month
t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v", expectedActive.Entities, a.activeEntities)
}
}
func TestActivityLog_refreshFromStoredLogOnStandby(t *testing.T) {
a, expectedEntityRecords, _ := setupActivityRecordsInStorage(t, true, true)
a.enabled = true
a.core.perfStandby = true
var wg sync.WaitGroup
err := a.refreshFromStoredLog(context.Background(), &wg)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
expectedActive := &activity.EntityActivityLog{
Entities: expectedEntityRecords[1:],
}
if !activeEntitiesEqual(t, a.activeEntities, expectedActive.Entities) {
// we expect activeEntities to be loaded for the entire month
t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v", expectedActive.Entities, a.activeEntities)
}
// we expect nothing to be loaded to a.currentSegment (other than startTimestamp for end of month checking)
if len(a.currentSegment.currentEntities.Entities) > 0 {
t.Errorf("currentSegment entities should not be populated. got: %v", a.currentSegment.currentEntities)
}
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
t.Errorf("currentSegment token counts should not be populated. got: %v", a.currentSegment.tokenCount.CountByNamespaceID)
}
if a.currentSegment.entitySequenceNumber != 0 {
t.Errorf("currentSegment sequence number should be 0. got: %v", a.currentSegment.entitySequenceNumber)
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expectedActive.Entities, activeEntities)
}
}
func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testing.T) {
a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, true, true)
a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true)
a.enabled = true
var wg sync.WaitGroup
@ -1424,14 +1406,15 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID)
}
if !activeEntitiesEqual(t, a.activeEntities, expected.Entities) {
activeEntities := a.core.GetActiveEntities()
if !activeEntitiesEqual(t, activeEntities, expected.Entities) {
// we only expect activeEntities to be loaded for the newest segment (for the current month)
t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v", expected.Entities, a.activeEntities)
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expected.Entities, activeEntities)
}
}
func TestActivityLog_refreshFromStoredLogContextCancelled(t *testing.T) {
a, _, _ := setupActivityRecordsInStorage(t, true, true)
a, _, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true)
var wg sync.WaitGroup
ctx, cancelFn := context.WithCancel(context.Background())
@ -1444,7 +1427,7 @@ func TestActivityLog_refreshFromStoredLogContextCancelled(t *testing.T) {
}
func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
a, expectedEntityRecords, _ := setupActivityRecordsInStorage(t, true, false)
a, expectedEntityRecords, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, false)
a.enabled = true
var wg sync.WaitGroup
@ -1464,8 +1447,9 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
// we expect all segments for the current month to be loaded
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, a.currentSegment.currentEntities)
}
if !activeEntitiesEqual(t, a.activeEntities, expectedActive.Entities) {
t.Errorf("bad data loaded into active entites. expected only set of EntityID from %v in %v", expectedActive.Entities, a.activeEntities)
activeEntities := a.core.GetActiveEntities()
if !activeEntitiesEqual(t, activeEntities, expectedActive.Entities) {
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expectedActive.Entities, activeEntities)
}
// we expect no tokens
@ -1475,7 +1459,7 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
}
func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) {
a, _, expectedTokenCounts := setupActivityRecordsInStorage(t, false, true)
a, _, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), false, true)
a.enabled = true
var wg sync.WaitGroup
@ -1493,21 +1477,37 @@ func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) {
if len(a.currentSegment.currentEntities.Entities) > 0 {
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentEntities)
}
if len(a.activeEntities) > 0 {
t.Errorf("expected no active entity segment to be loaded. got: %v", a.activeEntities)
activeEntities := a.core.GetActiveEntities()
if len(activeEntities) > 0 {
t.Errorf("expected no active entity segment to be loaded. got: %v", activeEntities)
}
}
func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) {
a, _, _ := setupActivityRecordsInStorage(t, false, false)
a.enabled = true
// verify current segment refreshed with non-nil empty components and the :expectedStart: timestamp
// note: if :verifyTimeNotZero: is true, ignore :expectedStart: and just make sure the timestamp
// isn't 0
func expectCurrentSegmentRefreshed(t *testing.T, a *ActivityLog, expectedStart int64, verifyTimeNotZero bool) {
t.Helper()
var wg sync.WaitGroup
err := a.refreshFromStoredLog(context.Background(), &wg)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
a.l.RLock()
defer a.l.RUnlock()
a.fragmentLock.RLock()
defer a.fragmentLock.RUnlock()
if a.currentSegment.currentEntities == nil {
t.Fatalf("expected non-nil currentSegment.currentEntities")
}
if a.currentSegment.currentEntities.Entities == nil {
t.Errorf("expected non-nil currentSegment.currentEntities.Entities")
}
if a.activeEntities == nil {
t.Errorf("expected non-nil activeEntities")
}
if a.currentSegment.tokenCount == nil {
t.Fatalf("expected non-nil currentSegment.tokenCount")
}
if a.currentSegment.tokenCount.CountByNamespaceID == nil {
t.Errorf("expected non-nil currentSegment.tokenCount.CountByNamespaceID")
}
wg.Wait()
if len(a.currentSegment.currentEntities.Entities) > 0 {
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentEntities)
@ -1518,6 +1518,103 @@ func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) {
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
t.Errorf("expected no token counts to be loaded. got: %v", a.currentSegment.tokenCount.CountByNamespaceID)
}
if verifyTimeNotZero {
if a.currentSegment.startTimestamp == 0 {
t.Error("bad start timestamp. expected no reset but timestamp was reset")
}
} else if a.currentSegment.startTimestamp != expectedStart {
t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentSegment.startTimestamp)
}
}
func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) {
now := time.Now().UTC()
a, _, _ := setupActivityRecordsInStorage(t, now, false, false)
a.enabled = true
var wg sync.WaitGroup
err := a.refreshFromStoredLog(context.Background(), &wg)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
expectCurrentSegmentRefreshed(t, a, now.Unix(), false)
}
func TestActivityLog_refreshFromStoredLogTwoMonthsPrevious(t *testing.T) {
// test what happens when the most recent data is from month M-2 (or earlier - same effect)
now := time.Now().UTC()
twoMonthsAgoStart := timeutil.StartOfPreviousMonth(timeutil.StartOfPreviousMonth(now))
a, _, _ := setupActivityRecordsInStorage(t, twoMonthsAgoStart, true, true)
a.enabled = true
var wg sync.WaitGroup
err := a.refreshFromStoredLog(context.Background(), &wg)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
expectCurrentSegmentRefreshed(t, a, now.Unix(), false)
}
func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) {
// test what happens when most recent data is from month M-1
// we expect to load the data from the previous month so that the activeFragmentWorker
// can handle end of month rotations
monthStart := timeutil.StartOfMonth(time.Now().UTC())
oneMonthAgoStart := timeutil.StartOfPreviousMonth(monthStart)
a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, oneMonthAgoStart, true, true)
a.enabled = true
var wg sync.WaitGroup
err := a.refreshFromStoredLog(context.Background(), &wg)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
expectedActive := &activity.EntityActivityLog{
Entities: expectedEntityRecords[1:],
}
expectedCurrent := &activity.EntityActivityLog{
Entities: expectedEntityRecords[2:],
}
if !entityRecordsEqual(t, a.currentSegment.currentEntities.Entities, expectedCurrent.Entities) {
// we only expect the newest entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, a.currentSegment.currentEntities)
}
if !reflect.DeepEqual(a.currentSegment.tokenCount.CountByNamespaceID, expectedTokenCounts) {
// we expect all token counts to be loaded
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID)
}
activeEntities := a.core.GetActiveEntities()
if !activeEntitiesEqual(t, activeEntities, expectedActive.Entities) {
// we expect activeEntities to be loaded for the entire month
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expectedActive.Entities, activeEntities)
}
}
func TestActivityLog_refreshFromStoredLogNextMonth(t *testing.T) {
t.Skip("works on enterprise, fails on oss (oss boots with activity log disabled)")
// test what happens when most recent data is from month M+1
nextMonthStart := timeutil.StartOfNextMonth(time.Now().UTC())
a, _, _ := setupActivityRecordsInStorage(t, nextMonthStart, true, true)
a.enabled = true
var wg sync.WaitGroup
err := a.refreshFromStoredLog(context.Background(), &wg)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
// we can't know exactly what the timestamp should be set to, just that it shouldn't be zero
expectCurrentSegmentRefreshed(t, a, time.Now().Unix(), true)
}
func TestActivityLog_IncludeNamespace(t *testing.T) {
@ -1685,21 +1782,7 @@ func TestActivityLog_EnableDisable(t *testing.T) {
}
expectMissingSegment(t, core, path)
if a.currentSegment.startTimestamp != 0 {
t.Errorf("bad startTimestamp, expected 0 got %v", a.currentSegment.startTimestamp)
}
if len(a.currentSegment.currentEntities.Entities) != 0 {
t.Errorf("expected empty currentEntities, got %v", a.currentSegment.currentEntities.Entities)
}
if len(a.currentSegment.tokenCount.CountByNamespaceID) != 0 {
t.Errorf("expected empty tokens, got %v", a.currentSegment.tokenCount.CountByNamespaceID)
}
if len(a.activeEntities) != 0 {
t.Errorf("expected empty activeEntities, got %v", a.activeEntities)
}
if a.fragment != nil {
t.Errorf("expected nil fragment")
}
expectCurrentSegmentRefreshed(t, a, 0, false)
// enable (if not already) which force-writes an empty segment
enableRequest()