Fix Racy Activity Log Tests (#10484)
* fix racy activity log tests and move testing utilities elsewhere * remove TODO * move SetEnable out of activity log * clarify not waiting on waitgroup * remove todo
This commit is contained in:
parent
f6138a98d1
commit
88eaf5f4c3
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -127,3 +128,14 @@ func MonthsPreviousTo(months int, now time.Time) time.Time {
|
|||
firstOfMonth := StartOfMonth(now.UTC())
|
||||
return firstOfMonth.AddDate(0, -months, 0)
|
||||
}
|
||||
|
||||
// Skip this test if too close to the end of a month!
|
||||
func SkipAtEndOfMonth(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
thisMonth := StartOfMonth(time.Now().UTC())
|
||||
endOfMonth := EndOfMonth(thisMonth)
|
||||
if endOfMonth.Sub(time.Now()) < 10*time.Minute {
|
||||
t.Skip("too close to end of month")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,10 @@ const (
|
|||
activityConfigKey = "config"
|
||||
activityIntentLogKey = "endofmonth"
|
||||
|
||||
// for testing purposes (public as needed)
|
||||
ActivityLogPrefix = "sys/counters/activity/log/"
|
||||
ActivityPrefix = "sys/counters/activity/"
|
||||
|
||||
// Time to wait on perf standby before sending fragment
|
||||
activityFragmentStandbyTime = 10 * time.Minute
|
||||
|
||||
|
@ -120,8 +124,6 @@ type ActivityLog struct {
|
|||
activeEntities map[string]struct{}
|
||||
|
||||
// track metadata and contents of the most recent log segment
|
||||
// currentSegment is currently unprotected by a mutex, because it is updated
|
||||
// only by the worker performing rotation.
|
||||
currentSegment segmentInfo
|
||||
|
||||
// Fragments received from performance standbys
|
||||
|
@ -132,11 +134,11 @@ type ActivityLog struct {
|
|||
defaultReportMonths int
|
||||
retentionMonths int
|
||||
|
||||
// cancel function to stop loading entities/tokens from storage to memory
|
||||
activityCancel context.CancelFunc
|
||||
|
||||
// channel closed by delete worker when done
|
||||
deleteDone chan struct{}
|
||||
|
||||
// for testing: is config currently being invalidated. protected by l
|
||||
configInvalidationInProgress bool
|
||||
}
|
||||
|
||||
// These non-persistent configuration options allow us to disable
|
||||
|
@ -197,21 +199,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
|
|||
return a, nil
|
||||
}
|
||||
|
||||
// Return the in-memory activeEntities from an activity log
|
||||
func (c *Core) GetActiveEntities() map[string]struct{} {
|
||||
out := make(map[string]struct{})
|
||||
|
||||
c.stateLock.RLock()
|
||||
c.activityLog.fragmentLock.RLock()
|
||||
for k, v := range c.activityLog.activeEntities {
|
||||
out[k] = v
|
||||
}
|
||||
c.activityLog.fragmentLock.RUnlock()
|
||||
c.stateLock.RUnlock()
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
// saveCurrentSegmentToStorage updates the record of Entities or
|
||||
// Non Entity Tokens in persistent storage
|
||||
// :force: forces a save of tokens/entities even if the in-memory log is empty
|
||||
|
@ -637,7 +624,7 @@ func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) e
|
|||
return nil
|
||||
}
|
||||
|
||||
// entityBackgroundLoader loads entity activity log records for start_date :t:
|
||||
// entityBackgroundLoader loads entity activity log records for start_date `t`
|
||||
func (a *ActivityLog) entityBackgroundLoader(ctx context.Context, wg *sync.WaitGroup, t time.Time, seqNums <-chan uint64) {
|
||||
defer wg.Done()
|
||||
for seqNum := range seqNums {
|
||||
|
@ -953,7 +940,7 @@ func (a *ActivityLog) queriesAvailable(ctx context.Context) (bool, error) {
|
|||
}
|
||||
|
||||
// setupActivityLog hooks up the singleton ActivityLog into Core.
|
||||
func (c *Core) setupActivityLog(ctx context.Context) error {
|
||||
func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error {
|
||||
logger := c.baseLogger.Named("activity")
|
||||
c.AddLogger(logger)
|
||||
|
||||
|
@ -971,10 +958,7 @@ func (c *Core) setupActivityLog(ctx context.Context) error {
|
|||
c.activityLog = manager
|
||||
|
||||
// load activity log for "this month" into memory
|
||||
refreshCtx, cancelFunc := context.WithCancel(namespace.RootContext(nil))
|
||||
manager.activityCancel = cancelFunc
|
||||
var wg sync.WaitGroup
|
||||
err = manager.refreshFromStoredLog(refreshCtx, &wg, time.Now().UTC())
|
||||
err = manager.refreshFromStoredLog(manager.core.activeContext, wg, time.Now().UTC())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1011,10 +995,6 @@ func (c *Core) stopActivityLog() error {
|
|||
if c.activityLog != nil {
|
||||
// Shut down background worker
|
||||
close(c.activityLog.doneCh)
|
||||
// cancel refreshing logs from storage
|
||||
if c.activityLog.activityCancel != nil {
|
||||
c.activityLog.activityCancel()
|
||||
}
|
||||
}
|
||||
|
||||
c.activityLog = nil
|
||||
|
|
|
@ -21,17 +21,11 @@ import (
|
|||
"github.com/hashicorp/vault/vault/activity"
|
||||
)
|
||||
|
||||
const (
|
||||
logPrefix = "sys/counters/activity/log/"
|
||||
activityPrefix = "sys/counters/activity/"
|
||||
activityConfigPath = "sys/counters/activity/config"
|
||||
)
|
||||
|
||||
func TestActivityLog_Creation(t *testing.T) {
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
|
||||
a := core.activityLog
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
if a == nil {
|
||||
t.Fatal("no activity log found")
|
||||
|
@ -114,7 +108,7 @@ func checkExpectedEntitiesInMap(t *testing.T, a *ActivityLog, entityIDs []string
|
|||
func TestActivityLog_UniqueEntities(t *testing.T) {
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
a := core.activityLog
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
id1 := "11111111-1111-1111-1111-111111111111"
|
||||
t1 := time.Now()
|
||||
|
@ -182,18 +176,6 @@ func expectMissingSegment(t *testing.T, c *Core, path string) {
|
|||
}
|
||||
}
|
||||
|
||||
func writeToStorage(t *testing.T, c *Core, path string, data []byte) {
|
||||
t.Helper()
|
||||
err := c.barrier.Put(context.Background(), &logical.StorageEntry{
|
||||
Key: path,
|
||||
Value: data,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to write %s to %s", data, path)
|
||||
}
|
||||
}
|
||||
|
||||
func expectedEntityIDs(t *testing.T, out *activity.EntityActivityLog, ids []string) {
|
||||
t.Helper()
|
||||
|
||||
|
@ -218,19 +200,20 @@ func expectedEntityIDs(t *testing.T, out *activity.EntityActivityLog, ids []stri
|
|||
|
||||
func TestActivityLog_SaveTokensToStorage(t *testing.T) {
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
ctx := context.Background()
|
||||
|
||||
a := core.activityLog
|
||||
a.enabled = true
|
||||
// set a nonzero segment
|
||||
a.currentSegment.startTimestamp = time.Now().Unix()
|
||||
a.SetStandbyEnable(ctx, true)
|
||||
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment
|
||||
|
||||
nsIDs := [...]string{"ns1_id", "ns2_id", "ns3_id"}
|
||||
path := fmt.Sprintf("%sdirecttokens/%d/0", logPrefix, a.currentSegment.startTimestamp)
|
||||
path := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogPrefix, a.GetStartTimestamp())
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
a.AddTokenToFragment(nsIDs[0])
|
||||
}
|
||||
a.AddTokenToFragment(nsIDs[1])
|
||||
err := a.saveCurrentSegmentToStorage(context.Background(), false)
|
||||
err := a.saveCurrentSegmentToStorage(ctx, false)
|
||||
if err != nil {
|
||||
t.Fatalf("got error writing tokens to storage: %v", err)
|
||||
}
|
||||
|
@ -262,7 +245,7 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) {
|
|||
|
||||
a.AddTokenToFragment(nsIDs[0])
|
||||
a.AddTokenToFragment(nsIDs[2])
|
||||
err = a.saveCurrentSegmentToStorage(context.Background(), false)
|
||||
err = a.saveCurrentSegmentToStorage(ctx, false)
|
||||
if err != nil {
|
||||
t.Fatalf("got error writing tokens to storage: %v", err)
|
||||
}
|
||||
|
@ -298,10 +281,11 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) {
|
|||
|
||||
func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
ctx := context.Background()
|
||||
|
||||
a := core.activityLog
|
||||
a.enabled = true
|
||||
// set a nonzero segment
|
||||
a.currentSegment.startTimestamp = time.Now().Unix()
|
||||
a.SetStandbyEnable(ctx, true)
|
||||
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment
|
||||
|
||||
now := time.Now()
|
||||
ids := []string{"11111111-1111-1111-1111-111111111111", "22222222-2222-2222-2222-222222222222", "33333333-2222-2222-2222-222222222222"}
|
||||
|
@ -310,11 +294,11 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
|
|||
now.Add(1 * time.Second).Unix(),
|
||||
now.Add(2 * time.Second).Unix(),
|
||||
}
|
||||
path := fmt.Sprintf("%sentity/%d/0", logPrefix, a.currentSegment.startTimestamp)
|
||||
path := fmt.Sprintf("%sentity/%d/0", ActivityLogPrefix, a.GetStartTimestamp())
|
||||
|
||||
a.AddEntityToFragment(ids[0], "root", times[0])
|
||||
a.AddEntityToFragment(ids[1], "root2", times[1])
|
||||
err := a.saveCurrentSegmentToStorage(context.Background(), false)
|
||||
err := a.saveCurrentSegmentToStorage(ctx, false)
|
||||
if err != nil {
|
||||
t.Fatalf("got error writing entities to storage: %v", err)
|
||||
}
|
||||
|
@ -332,7 +316,7 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
|
|||
|
||||
a.AddEntityToFragment(ids[0], "root", times[2])
|
||||
a.AddEntityToFragment(ids[2], "root", times[2])
|
||||
err = a.saveCurrentSegmentToStorage(context.Background(), false)
|
||||
err = a.saveCurrentSegmentToStorage(ctx, false)
|
||||
if err != nil {
|
||||
t.Fatalf("got error writing segments to storage: %v", err)
|
||||
}
|
||||
|
@ -349,7 +333,7 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
|
|||
func TestActivityLog_ReceivedFragment(t *testing.T) {
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
a := core.activityLog
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
ids := []string{
|
||||
"11111111-1111-1111-1111-111111111111",
|
||||
|
@ -419,7 +403,7 @@ func TestActivityLog_availableLogs(t *testing.T) {
|
|||
expectedTimes := [...]time.Time{time.Unix(1000000, 0), time.Unix(1111, 0), time.Unix(992, 0)}
|
||||
|
||||
for _, path := range paths {
|
||||
writeToStorage(t, core, logPrefix+path, []byte("test"))
|
||||
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
|
||||
}
|
||||
|
||||
// verify above files are there, and dates in correct order
|
||||
|
@ -443,18 +427,16 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
|
|||
a := core.activityLog
|
||||
|
||||
// enabled check is now inside AddEntityToFragment
|
||||
a.enabled = true
|
||||
// set a nonzero segment
|
||||
a.l.Lock()
|
||||
a.currentSegment.startTimestamp = time.Now().Unix()
|
||||
a.l.Unlock()
|
||||
a.SetEnable(true)
|
||||
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment
|
||||
|
||||
// Stop timers for test purposes
|
||||
close(a.doneCh)
|
||||
|
||||
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", a.currentSegment.startTimestamp)
|
||||
path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", a.currentSegment.startTimestamp)
|
||||
tokenPath := fmt.Sprintf("sys/counters/activity/log/directtokens/%d/0", a.currentSegment.startTimestamp)
|
||||
startTimestamp := a.GetStartTimestamp()
|
||||
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
|
||||
path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp)
|
||||
tokenPath := fmt.Sprintf("sys/counters/activity/log/directtokens/%d/0", startTimestamp)
|
||||
|
||||
genID := func(i int) string {
|
||||
return fmt.Sprintf("11111111-1111-1111-1111-%012d", i)
|
||||
|
@ -542,8 +524,9 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
|
|||
t.Fatalf("got error writing entities to storage: %v", err)
|
||||
}
|
||||
|
||||
if a.currentSegment.entitySequenceNumber != 1 {
|
||||
t.Fatalf("expected sequence number 1, got %v", a.currentSegment.entitySequenceNumber)
|
||||
seqNum := a.GetEntitySequenceNumber()
|
||||
if seqNum != 1 {
|
||||
t.Fatalf("expected sequence number 1, got %v", seqNum)
|
||||
}
|
||||
|
||||
protoSegment0 = readSegmentFromStorage(t, core, path0)
|
||||
|
@ -793,7 +776,7 @@ func TestActivityLog_getLastEntitySegmentNumber(t *testing.T) {
|
|||
a := core.activityLog
|
||||
paths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/0", "entity/1111/1"}
|
||||
for _, path := range paths {
|
||||
writeToStorage(t, core, logPrefix+path, []byte("test"))
|
||||
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
|
@ -848,7 +831,7 @@ func TestActivityLog_tokenCountExists(t *testing.T) {
|
|||
a := core.activityLog
|
||||
paths := [...]string{"directtokens/992/0", "directtokens/1001/foo", "directtokens/1111/0", "directtokens/2222/1"}
|
||||
for _, path := range paths {
|
||||
writeToStorage(t, core, logPrefix+path, []byte("test"))
|
||||
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
|
@ -948,22 +931,6 @@ func entityRecordsEqual(t *testing.T, record1, record2 []*activity.EntityRecord)
|
|||
return true
|
||||
}
|
||||
|
||||
func activeEntitiesEqual(t *testing.T, active map[string]struct{}, test []*activity.EntityRecord) bool {
|
||||
t.Helper()
|
||||
|
||||
if len(active) != len(test) {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, ent := range test {
|
||||
if _, ok := active[ent.EntityID]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
|
@ -993,7 +960,7 @@ func TestActivityLog_loadCurrentEntitySegment(t *testing.T) {
|
|||
tokenCount := &activity.TokenCount{
|
||||
CountByNamespaceID: tokenRecords,
|
||||
}
|
||||
a.currentSegment.tokenCount = tokenCount
|
||||
a.SetTokenCount(tokenCount)
|
||||
|
||||
// setup in-storage data to load for testing
|
||||
entityRecords := []*activity.EntityRecord{
|
||||
|
@ -1052,7 +1019,7 @@ func TestActivityLog_loadCurrentEntitySegment(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
writeToStorage(t, core, logPrefix+tc.path, data)
|
||||
WriteToStorage(t, core, ActivityLogPrefix+tc.path, data)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
@ -1061,23 +1028,28 @@ func TestActivityLog_loadCurrentEntitySegment(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("got error loading data for %q: %v", tc.path, err)
|
||||
}
|
||||
if !reflect.DeepEqual(a.currentSegment.tokenCount.CountByNamespaceID, tokenCount.CountByNamespaceID) {
|
||||
if !reflect.DeepEqual(a.GetCountByNamespaceID(), tokenCount.CountByNamespaceID) {
|
||||
t.Errorf("this function should not wipe out the in-memory token count")
|
||||
}
|
||||
|
||||
// verify accurate data in in-memory current segment
|
||||
if a.currentSegment.startTimestamp != tc.time {
|
||||
t.Errorf("bad timestamp loaded. expected: %v, got: %v for path %q", tc.time, a.currentSegment.startTimestamp, tc.path)
|
||||
startTimestamp := a.GetStartTimestamp()
|
||||
if startTimestamp != tc.time {
|
||||
t.Errorf("bad timestamp loaded. expected: %v, got: %v for path %q", tc.time, startTimestamp, tc.path)
|
||||
}
|
||||
if a.currentSegment.entitySequenceNumber != tc.seqNum {
|
||||
t.Errorf("bad sequence number loaded. expected: %v, got: %v for path %q", tc.seqNum, a.currentSegment.entitySequenceNumber, tc.path)
|
||||
|
||||
seqNum := a.GetEntitySequenceNumber()
|
||||
if seqNum != tc.seqNum {
|
||||
t.Errorf("bad sequence number loaded. expected: %v, got: %v for path %q", tc.seqNum, seqNum, tc.path)
|
||||
}
|
||||
if !entityRecordsEqual(t, a.currentSegment.currentEntities.Entities, tc.entities.Entities) {
|
||||
t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Entities, a.currentSegment.currentEntities, tc.path)
|
||||
|
||||
currentEntities := a.GetCurrentEntities()
|
||||
if !entityRecordsEqual(t, currentEntities.Entities, tc.entities.Entities) {
|
||||
t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Entities, currentEntities, tc.path)
|
||||
}
|
||||
|
||||
activeEntities := core.GetActiveEntities()
|
||||
if !activeEntitiesEqual(t, activeEntities, tc.entities.Entities) {
|
||||
if !ActiveEntitiesEqual(activeEntities, tc.entities.Entities) {
|
||||
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, activeEntities, tc.path)
|
||||
}
|
||||
|
||||
|
@ -1088,7 +1060,7 @@ func TestActivityLog_loadCurrentEntitySegment(t *testing.T) {
|
|||
func TestActivityLog_loadPriorEntitySegment(t *testing.T) {
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
a := core.activityLog
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
// setup in-storage data to load for testing
|
||||
entityRecords := []*activity.EntityRecord{
|
||||
|
@ -1150,7 +1122,7 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
writeToStorage(t, core, logPrefix+tc.path, data)
|
||||
WriteToStorage(t, core, ActivityLogPrefix+tc.path, data)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
@ -1170,7 +1142,7 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) {
|
|||
}
|
||||
|
||||
activeEntities := core.GetActiveEntities()
|
||||
if !activeEntitiesEqual(t, activeEntities, tc.entities.Entities) {
|
||||
if !ActiveEntitiesEqual(activeEntities, tc.entities.Entities) {
|
||||
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v for path %q", tc.entities.Entities, activeEntities, tc.path)
|
||||
}
|
||||
}
|
||||
|
@ -1210,7 +1182,7 @@ func TestActivityLog_loadTokenCount(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
writeToStorage(t, core, logPrefix+tc.path, data)
|
||||
WriteToStorage(t, core, ActivityLogPrefix+tc.path, data)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
@ -1220,8 +1192,10 @@ func TestActivityLog_loadTokenCount(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("got error loading data for %q: %v", tc.path, err)
|
||||
}
|
||||
if !reflect.DeepEqual(a.currentSegment.tokenCount.CountByNamespaceID, tokenRecords) {
|
||||
t.Errorf("bad token count loaded. expected: %v got: %v for path %q", tokenRecords, a.currentSegment.tokenCount.CountByNamespaceID, tc.path)
|
||||
|
||||
nsCount := a.GetCountByNamespaceID()
|
||||
if !reflect.DeepEqual(nsCount, tokenRecords) {
|
||||
t.Errorf("bad token count loaded. expected: %v got: %v for path %q", tokenRecords, nsCount, tc.path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1261,10 +1235,12 @@ func TestActivityLog_StopAndRestart(t *testing.T) {
|
|||
|
||||
// Simulate seal/unseal cycle
|
||||
core.stopActivityLog()
|
||||
core.setupActivityLog(ctx)
|
||||
var wg sync.WaitGroup
|
||||
core.setupActivityLog(ctx, &wg)
|
||||
wg.Wait()
|
||||
|
||||
a = core.activityLog
|
||||
if a.currentSegment.tokenCount.CountByNamespaceID == nil {
|
||||
if a.GetCountByNamespaceID() == nil {
|
||||
t.Fatalf("nil token count map")
|
||||
}
|
||||
|
||||
|
@ -1330,9 +1306,9 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities
|
|||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData1)
|
||||
writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(base.Unix())+"/0", entityData2)
|
||||
writeToStorage(t, core, logPrefix+"entity/"+fmt.Sprint(base.Unix())+"/1", entityData3)
|
||||
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData1)
|
||||
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/0", entityData2)
|
||||
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/1", entityData3)
|
||||
}
|
||||
|
||||
var tokenRecords map[string]uint64
|
||||
|
@ -1351,7 +1327,7 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities
|
|||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
writeToStorage(t, core, logPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData)
|
||||
WriteToStorage(t, core, ActivityLogPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData)
|
||||
}
|
||||
|
||||
return a, entityRecords, tokenRecords
|
||||
|
@ -1359,7 +1335,7 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities
|
|||
|
||||
func TestActivityLog_refreshFromStoredLog(t *testing.T) {
|
||||
a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true)
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
err := a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC())
|
||||
|
@ -1374,17 +1350,21 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) {
|
|||
expectedCurrent := &activity.EntityActivityLog{
|
||||
Entities: expectedEntityRecords[2:],
|
||||
}
|
||||
if !entityRecordsEqual(t, a.currentSegment.currentEntities.Entities, expectedCurrent.Entities) {
|
||||
|
||||
currentEntities := a.GetCurrentEntities()
|
||||
if !entityRecordsEqual(t, currentEntities.Entities, expectedCurrent.Entities) {
|
||||
// we only expect the newest entity segment to be loaded (for the current month)
|
||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, a.currentSegment.currentEntities)
|
||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
|
||||
}
|
||||
if !reflect.DeepEqual(a.currentSegment.tokenCount.CountByNamespaceID, expectedTokenCounts) {
|
||||
|
||||
nsCount := a.GetCountByNamespaceID()
|
||||
if !reflect.DeepEqual(nsCount, expectedTokenCounts) {
|
||||
// we expect all token counts to be loaded
|
||||
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID)
|
||||
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, nsCount)
|
||||
}
|
||||
|
||||
activeEntities := a.core.GetActiveEntities()
|
||||
if !activeEntitiesEqual(t, activeEntities, expectedActive.Entities) {
|
||||
if !ActiveEntitiesEqual(activeEntities, expectedActive.Entities) {
|
||||
// we expect activeEntities to be loaded for the entire month
|
||||
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expectedActive.Entities, activeEntities)
|
||||
}
|
||||
|
@ -1392,7 +1372,7 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) {
|
|||
|
||||
func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testing.T) {
|
||||
a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true)
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
close(a.doneCh)
|
||||
|
@ -1406,17 +1386,21 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi
|
|||
expected := &activity.EntityActivityLog{
|
||||
Entities: expectedEntityRecords[2:],
|
||||
}
|
||||
if !entityRecordsEqual(t, a.currentSegment.currentEntities.Entities, expected.Entities) {
|
||||
|
||||
currentEntities := a.GetCurrentEntities()
|
||||
if !entityRecordsEqual(t, currentEntities.Entities, expected.Entities) {
|
||||
// we only expect the newest entity segment to be loaded (for the current month)
|
||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expected, a.currentSegment.currentEntities)
|
||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expected, currentEntities)
|
||||
}
|
||||
if !reflect.DeepEqual(a.currentSegment.tokenCount.CountByNamespaceID, expectedTokenCounts) {
|
||||
|
||||
nsCount := a.GetCountByNamespaceID()
|
||||
if !reflect.DeepEqual(nsCount, expectedTokenCounts) {
|
||||
// we expect all token counts to be loaded
|
||||
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID)
|
||||
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, nsCount)
|
||||
}
|
||||
|
||||
activeEntities := a.core.GetActiveEntities()
|
||||
if !activeEntitiesEqual(t, activeEntities, expected.Entities) {
|
||||
if !ActiveEntitiesEqual(activeEntities, expected.Entities) {
|
||||
// we only expect activeEntities to be loaded for the newest segment (for the current month)
|
||||
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expected.Entities, activeEntities)
|
||||
}
|
||||
|
@ -1437,7 +1421,7 @@ func TestActivityLog_refreshFromStoredLogContextCancelled(t *testing.T) {
|
|||
|
||||
func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
|
||||
a, expectedEntityRecords, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, false)
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
err := a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC())
|
||||
|
@ -1452,24 +1436,27 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
|
|||
expectedCurrent := &activity.EntityActivityLog{
|
||||
Entities: expectedEntityRecords[2:],
|
||||
}
|
||||
if !entityRecordsEqual(t, a.currentSegment.currentEntities.Entities, expectedCurrent.Entities) {
|
||||
|
||||
currentEntities := a.GetCurrentEntities()
|
||||
if !entityRecordsEqual(t, currentEntities.Entities, expectedCurrent.Entities) {
|
||||
// we expect all segments for the current month to be loaded
|
||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, a.currentSegment.currentEntities)
|
||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
|
||||
}
|
||||
activeEntities := a.core.GetActiveEntities()
|
||||
if !activeEntitiesEqual(t, activeEntities, expectedActive.Entities) {
|
||||
if !ActiveEntitiesEqual(activeEntities, expectedActive.Entities) {
|
||||
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expectedActive.Entities, activeEntities)
|
||||
}
|
||||
|
||||
// we expect no tokens
|
||||
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
|
||||
t.Errorf("expected no token counts to be loaded. got: %v", a.currentSegment.tokenCount.CountByNamespaceID)
|
||||
nsCount := a.GetCountByNamespaceID()
|
||||
if len(nsCount) > 0 {
|
||||
t.Errorf("expected no token counts to be loaded. got: %v", nsCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) {
|
||||
a, _, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), false, true)
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
err := a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC())
|
||||
|
@ -1478,13 +1465,15 @@ func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) {
|
|||
}
|
||||
wg.Wait()
|
||||
|
||||
if !reflect.DeepEqual(a.currentSegment.tokenCount.CountByNamespaceID, expectedTokenCounts) {
|
||||
nsCount := a.GetCountByNamespaceID()
|
||||
if !reflect.DeepEqual(nsCount, expectedTokenCounts) {
|
||||
// we expect all token counts to be loaded
|
||||
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID)
|
||||
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, nsCount)
|
||||
}
|
||||
|
||||
if len(a.currentSegment.currentEntities.Entities) > 0 {
|
||||
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentEntities)
|
||||
currentEntities := a.GetCurrentEntities()
|
||||
if len(currentEntities.Entities) > 0 {
|
||||
t.Errorf("expected no current entity segment to be loaded. got: %v", currentEntities)
|
||||
}
|
||||
activeEntities := a.core.GetActiveEntities()
|
||||
if len(activeEntities) > 0 {
|
||||
|
@ -1492,55 +1481,10 @@ func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// verify current segment refreshed with non-nil empty components and the :expectedStart: timestamp
|
||||
// note: if :verifyTimeNotZero: is true, ignore :expectedStart: and just make sure the timestamp
|
||||
// isn't 0
|
||||
func expectCurrentSegmentRefreshed(t *testing.T, a *ActivityLog, expectedStart int64, verifyTimeNotZero bool) {
|
||||
t.Helper()
|
||||
|
||||
a.l.RLock()
|
||||
defer a.l.RUnlock()
|
||||
a.fragmentLock.RLock()
|
||||
defer a.fragmentLock.RUnlock()
|
||||
if a.currentSegment.currentEntities == nil {
|
||||
t.Fatalf("expected non-nil currentSegment.currentEntities")
|
||||
}
|
||||
if a.currentSegment.currentEntities.Entities == nil {
|
||||
t.Errorf("expected non-nil currentSegment.currentEntities.Entities")
|
||||
}
|
||||
if a.activeEntities == nil {
|
||||
t.Errorf("expected non-nil activeEntities")
|
||||
}
|
||||
if a.currentSegment.tokenCount == nil {
|
||||
t.Fatalf("expected non-nil currentSegment.tokenCount")
|
||||
}
|
||||
if a.currentSegment.tokenCount.CountByNamespaceID == nil {
|
||||
t.Errorf("expected non-nil currentSegment.tokenCount.CountByNamespaceID")
|
||||
}
|
||||
|
||||
if len(a.currentSegment.currentEntities.Entities) > 0 {
|
||||
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentEntities)
|
||||
}
|
||||
if len(a.activeEntities) > 0 {
|
||||
t.Errorf("expected no active entity segment to be loaded. got: %v", a.activeEntities)
|
||||
}
|
||||
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
|
||||
t.Errorf("expected no token counts to be loaded. got: %v", a.currentSegment.tokenCount.CountByNamespaceID)
|
||||
}
|
||||
|
||||
if verifyTimeNotZero {
|
||||
if a.currentSegment.startTimestamp == 0 {
|
||||
t.Error("bad start timestamp. expected no reset but timestamp was reset")
|
||||
}
|
||||
} else if a.currentSegment.startTimestamp != expectedStart {
|
||||
t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentSegment.startTimestamp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) {
|
||||
now := time.Now().UTC()
|
||||
a, _, _ := setupActivityRecordsInStorage(t, now, false, false)
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
err := a.refreshFromStoredLog(context.Background(), &wg, now)
|
||||
|
@ -1549,7 +1493,7 @@ func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) {
|
|||
}
|
||||
wg.Wait()
|
||||
|
||||
expectCurrentSegmentRefreshed(t, a, now.Unix(), false)
|
||||
a.ExpectCurrentSegmentRefreshed(t, now.Unix(), false)
|
||||
}
|
||||
|
||||
func TestActivityLog_refreshFromStoredLogTwoMonthsPrevious(t *testing.T) {
|
||||
|
@ -1557,7 +1501,7 @@ func TestActivityLog_refreshFromStoredLogTwoMonthsPrevious(t *testing.T) {
|
|||
now := time.Now().UTC()
|
||||
twoMonthsAgoStart := timeutil.StartOfPreviousMonth(timeutil.StartOfPreviousMonth(now))
|
||||
a, _, _ := setupActivityRecordsInStorage(t, twoMonthsAgoStart, true, true)
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
err := a.refreshFromStoredLog(context.Background(), &wg, now)
|
||||
|
@ -1566,7 +1510,7 @@ func TestActivityLog_refreshFromStoredLogTwoMonthsPrevious(t *testing.T) {
|
|||
}
|
||||
wg.Wait()
|
||||
|
||||
expectCurrentSegmentRefreshed(t, a, now.Unix(), false)
|
||||
a.ExpectCurrentSegmentRefreshed(t, now.Unix(), false)
|
||||
}
|
||||
|
||||
func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) {
|
||||
|
@ -1576,7 +1520,7 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) {
|
|||
monthStart := timeutil.StartOfMonth(time.Now().UTC())
|
||||
oneMonthAgoStart := timeutil.StartOfPreviousMonth(monthStart)
|
||||
a, expectedEntityRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, oneMonthAgoStart, true, true)
|
||||
a.enabled = true
|
||||
a.SetEnable(true)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
err := a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC())
|
||||
|
@ -1591,17 +1535,21 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) {
|
|||
expectedCurrent := &activity.EntityActivityLog{
|
||||
Entities: expectedEntityRecords[2:],
|
||||
}
|
||||
if !entityRecordsEqual(t, a.currentSegment.currentEntities.Entities, expectedCurrent.Entities) {
|
||||
|
||||
currentEntities := a.GetCurrentEntities()
|
||||
if !entityRecordsEqual(t, currentEntities.Entities, expectedCurrent.Entities) {
|
||||
// we only expect the newest entity segment to be loaded (for the current month)
|
||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, a.currentSegment.currentEntities)
|
||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
|
||||
}
|
||||
if !reflect.DeepEqual(a.currentSegment.tokenCount.CountByNamespaceID, expectedTokenCounts) {
|
||||
|
||||
nsCount := a.GetCountByNamespaceID()
|
||||
if !reflect.DeepEqual(nsCount, expectedTokenCounts) {
|
||||
// we expect all token counts to be loaded
|
||||
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, a.currentSegment.tokenCount.CountByNamespaceID)
|
||||
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, nsCount)
|
||||
}
|
||||
|
||||
activeEntities := a.core.GetActiveEntities()
|
||||
if !activeEntitiesEqual(t, activeEntities, expectedActive.Entities) {
|
||||
if !ActiveEntitiesEqual(activeEntities, expectedActive.Entities) {
|
||||
// we expect activeEntities to be loaded for the entire month
|
||||
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v", expectedActive.Entities, activeEntities)
|
||||
}
|
||||
|
@ -1667,7 +1615,7 @@ func TestActivityLog_DeleteWorker(t *testing.T) {
|
|||
"directtokens/1112/1",
|
||||
}
|
||||
for _, path := range paths {
|
||||
writeToStorage(t, core, logPrefix+path, []byte("test"))
|
||||
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
|
||||
}
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
|
@ -1682,28 +1630,18 @@ func TestActivityLog_DeleteWorker(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check segments still present
|
||||
readSegmentFromStorage(t, core, logPrefix+"entity/1112/1")
|
||||
readSegmentFromStorage(t, core, logPrefix+"directtokens/1112/1")
|
||||
readSegmentFromStorage(t, core, ActivityLogPrefix+"entity/1112/1")
|
||||
readSegmentFromStorage(t, core, ActivityLogPrefix+"directtokens/1112/1")
|
||||
|
||||
// Check other segments not present
|
||||
expectMissingSegment(t, core, logPrefix+"entity/1111/1")
|
||||
expectMissingSegment(t, core, logPrefix+"entity/1111/2")
|
||||
expectMissingSegment(t, core, logPrefix+"entity/1111/3")
|
||||
expectMissingSegment(t, core, logPrefix+"directtokens/1111/1")
|
||||
}
|
||||
|
||||
// Skip this test if too close to the end of a month!
|
||||
// TODO: move testhelper?
|
||||
func SkipAtEndOfMonth(t *testing.T) {
|
||||
thisMonth := timeutil.StartOfMonth(time.Now().UTC())
|
||||
endOfMonth := timeutil.EndOfMonth(thisMonth)
|
||||
if endOfMonth.Sub(time.Now()) < 10*time.Minute {
|
||||
t.Skip("too close to end of month")
|
||||
}
|
||||
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/1")
|
||||
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/2")
|
||||
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/3")
|
||||
expectMissingSegment(t, core, ActivityLogPrefix+"directtokens/1111/1")
|
||||
}
|
||||
|
||||
func TestActivityLog_EnableDisable(t *testing.T) {
|
||||
SkipAtEndOfMonth(t)
|
||||
timeutil.SkipAtEndOfMonth(t)
|
||||
|
||||
core, b, _ := testCoreSystemBackend(t)
|
||||
a := core.activityLog
|
||||
|
@ -1746,15 +1684,15 @@ func TestActivityLog_EnableDisable(t *testing.T) {
|
|||
a.AddEntityToFragment(id1, "root", time.Now().Unix())
|
||||
a.AddEntityToFragment(id2, "root", time.Now().Unix())
|
||||
|
||||
a.currentSegment.startTimestamp -= 10
|
||||
seg1 := a.currentSegment.startTimestamp
|
||||
a.SetStartTimestamp(a.GetStartTimestamp() - 10)
|
||||
seg1 := a.GetStartTimestamp()
|
||||
err := a.saveCurrentSegmentToStorage(ctx, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// verify segment exists
|
||||
path := fmt.Sprintf("%ventity/%v/0", logPrefix, seg1)
|
||||
path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, seg1)
|
||||
readSegmentFromStorage(t, core, path)
|
||||
|
||||
// Add in-memory fragment
|
||||
|
@ -1772,27 +1710,27 @@ func TestActivityLog_EnableDisable(t *testing.T) {
|
|||
}
|
||||
|
||||
expectMissingSegment(t, core, path)
|
||||
expectCurrentSegmentRefreshed(t, a, 0, false)
|
||||
a.ExpectCurrentSegmentRefreshed(t, 0, false)
|
||||
|
||||
// enable (if not already) which force-writes an empty segment
|
||||
enableRequest()
|
||||
|
||||
seg2 := a.currentSegment.startTimestamp
|
||||
seg2 := a.GetStartTimestamp()
|
||||
if seg1 >= seg2 {
|
||||
t.Errorf("bad second segment timestamp, %v >= %v", seg1, seg2)
|
||||
}
|
||||
|
||||
// Verify empty segments are present
|
||||
path = fmt.Sprintf("%ventity/%v/0", logPrefix, seg2)
|
||||
path = fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, seg2)
|
||||
readSegmentFromStorage(t, core, path)
|
||||
|
||||
path = fmt.Sprintf("%vdirecttokens/%v/0", logPrefix, seg2)
|
||||
path = fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogPrefix, seg2)
|
||||
readSegmentFromStorage(t, core, path)
|
||||
}
|
||||
|
||||
func TestActivityLog_EndOfMonth(t *testing.T) {
|
||||
// We only want *fake* end of months, *real* ones are too scary.
|
||||
SkipAtEndOfMonth(t)
|
||||
timeutil.SkipAtEndOfMonth(t)
|
||||
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
a := core.activityLog
|
||||
|
@ -1811,7 +1749,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||
a.AddEntityToFragment(id1, "root", time.Now().Unix())
|
||||
|
||||
month0 := time.Now().UTC()
|
||||
segment0 := a.currentSegment.startTimestamp
|
||||
segment0 := a.GetStartTimestamp()
|
||||
month1 := month0.AddDate(0, 1, 0)
|
||||
month2 := month0.AddDate(0, 2, 0)
|
||||
|
||||
|
@ -1819,7 +1757,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||
a.HandleEndOfMonth(month1)
|
||||
|
||||
// Check segment is present, with 1 entity
|
||||
path := fmt.Sprintf("%ventity/%v/0", logPrefix, segment0)
|
||||
path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, segment0)
|
||||
protoSegment := readSegmentFromStorage(t, core, path)
|
||||
out := &activity.EntityActivityLog{}
|
||||
err := proto.Unmarshal(protoSegment.Value, out)
|
||||
|
@ -1827,7 +1765,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
segment1 := a.currentSegment.startTimestamp
|
||||
segment1 := a.GetStartTimestamp()
|
||||
expectedTimestamp := timeutil.StartOfMonth(month1).Unix()
|
||||
if segment1 != expectedTimestamp {
|
||||
t.Errorf("expected segment timestamp %v got %v", expectedTimestamp, segment1)
|
||||
|
@ -1855,7 +1793,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||
a.AddEntityToFragment(id2, "root", time.Now().Unix())
|
||||
|
||||
a.HandleEndOfMonth(month2)
|
||||
segment2 := a.currentSegment.startTimestamp
|
||||
segment2 := a.GetStartTimestamp()
|
||||
|
||||
a.AddEntityToFragment(id3, "root", time.Now().Unix())
|
||||
|
||||
|
@ -1876,7 +1814,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||
|
||||
for i, tc := range testCases {
|
||||
t.Logf("checking segment %v timestamp %v", i, tc.SegmentTimestamp)
|
||||
path := fmt.Sprintf("%ventity/%v/0", logPrefix, tc.SegmentTimestamp)
|
||||
path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, tc.SegmentTimestamp)
|
||||
protoSegment := readSegmentFromStorage(t, core, path)
|
||||
out := &activity.EntityActivityLog{}
|
||||
err = proto.Unmarshal(protoSegment.Value, out)
|
||||
|
@ -1898,7 +1836,7 @@ func TestActivityLog_SaveAfterDisable(t *testing.T) {
|
|||
})
|
||||
|
||||
a.AddEntityToFragment("1111-1111-11111111", "root", time.Now().Unix())
|
||||
startTimestamp := a.currentSegment.startTimestamp
|
||||
startTimestamp := a.GetStartTimestamp()
|
||||
|
||||
// This kicks off an asynchronous delete
|
||||
a.SetConfig(ctx, activityConfig{
|
||||
|
@ -1921,15 +1859,15 @@ func TestActivityLog_SaveAfterDisable(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
path := logPrefix + "entity/0/0"
|
||||
path := ActivityLogPrefix + "entity/0/0"
|
||||
expectMissingSegment(t, core, path)
|
||||
|
||||
path = fmt.Sprintf("%ventity/%v/0", logPrefix, startTimestamp)
|
||||
path = fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, startTimestamp)
|
||||
expectMissingSegment(t, core, path)
|
||||
}
|
||||
|
||||
func TestActivityLog_Precompute(t *testing.T) {
|
||||
SkipAtEndOfMonth(t)
|
||||
timeutil.SkipAtEndOfMonth(t)
|
||||
|
||||
january := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
august := time.Date(2020, 8, 15, 12, 0, 0, 0, time.UTC)
|
||||
|
@ -2015,8 +1953,8 @@ func TestActivityLog_Precompute(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
path := fmt.Sprintf("%ventity/%v/%v", logPrefix, segment.StartTime, segment.Segment)
|
||||
writeToStorage(t, core, path, data)
|
||||
path := fmt.Sprintf("%ventity/%v/%v", ActivityLogPrefix, segment.StartTime, segment.Segment)
|
||||
WriteToStorage(t, core, path, data)
|
||||
}
|
||||
|
||||
expectedCounts := []struct {
|
||||
|
@ -2164,12 +2102,10 @@ func TestActivityLog_Precompute(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
writeToStorage(t, core, "sys/counters/activity/endofmonth", data)
|
||||
WriteToStorage(t, core, "sys/counters/activity/endofmonth", data)
|
||||
|
||||
// Pretend we've successfully rolled over to the following month
|
||||
a.l.Lock()
|
||||
a.currentSegment.startTimestamp = tc.NextMonth
|
||||
a.l.Unlock()
|
||||
a.SetStartTimestamp(tc.NextMonth)
|
||||
|
||||
err = a.precomputedQueryWorker()
|
||||
if err != nil {
|
||||
|
@ -2237,7 +2173,7 @@ func TestActivityLog_PrecomputeCancel(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestActivityLog_NextMonthStart(t *testing.T) {
|
||||
SkipAtEndOfMonth(t)
|
||||
timeutil.SkipAtEndOfMonth(t)
|
||||
|
||||
now := time.Now().UTC()
|
||||
year, month, _ := now.Date()
|
||||
|
@ -2266,9 +2202,7 @@ func TestActivityLog_NextMonthStart(t *testing.T) {
|
|||
|
||||
for _, tc := range testCases {
|
||||
t.Logf("segmentStart=%v", tc.SegmentStart)
|
||||
a.l.Lock()
|
||||
a.currentSegment.startTimestamp = tc.SegmentStart
|
||||
a.l.Unlock()
|
||||
a.SetStartTimestamp(tc.SegmentStart)
|
||||
|
||||
actual := a.StartOfNextMonth()
|
||||
if !actual.Equal(tc.ExpectedTime) {
|
||||
|
@ -2278,7 +2212,7 @@ func TestActivityLog_NextMonthStart(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestActivityLog_Deletion(t *testing.T) {
|
||||
SkipAtEndOfMonth(t)
|
||||
timeutil.SkipAtEndOfMonth(t)
|
||||
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
a := core.activityLog
|
||||
|
@ -2314,20 +2248,20 @@ func TestActivityLog_Deletion(t *testing.T) {
|
|||
for i, start := range times {
|
||||
// no entities in some months, just for fun
|
||||
for j := 0; j < (i+3)%5; j++ {
|
||||
entityPath := fmt.Sprintf("%ventity/%v/%v", logPrefix, start.Unix(), j)
|
||||
entityPath := fmt.Sprintf("%ventity/%v/%v", ActivityLogPrefix, start.Unix(), j)
|
||||
paths[i] = append(paths[i], entityPath)
|
||||
writeToStorage(t, core, entityPath, []byte("test"))
|
||||
WriteToStorage(t, core, entityPath, []byte("test"))
|
||||
}
|
||||
tokenPath := fmt.Sprintf("%vdirecttokens/%v/0", logPrefix, start.Unix())
|
||||
tokenPath := fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogPrefix, start.Unix())
|
||||
paths[i] = append(paths[i], tokenPath)
|
||||
writeToStorage(t, core, tokenPath, []byte("test"))
|
||||
WriteToStorage(t, core, tokenPath, []byte("test"))
|
||||
|
||||
// No queries for November yet
|
||||
if i < novIndex {
|
||||
for _, endTime := range times[i+1 : novIndex] {
|
||||
queryPath := fmt.Sprintf("sys/counters/activity/queries/%v/%v", start.Unix(), endTime.Unix())
|
||||
paths[i] = append(paths[i], queryPath)
|
||||
writeToStorage(t, core, queryPath, []byte("test"))
|
||||
WriteToStorage(t, core, queryPath, []byte("test"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
package vault
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/vault/activity"
|
||||
)
|
||||
|
||||
// Return the in-memory activeEntities from an activity log
|
||||
func (c *Core) GetActiveEntities() map[string]struct{} {
|
||||
out := make(map[string]struct{})
|
||||
|
||||
c.stateLock.RLock()
|
||||
c.activityLog.fragmentLock.RLock()
|
||||
for k, v := range c.activityLog.activeEntities {
|
||||
out[k] = v
|
||||
}
|
||||
c.activityLog.fragmentLock.RUnlock()
|
||||
c.stateLock.RUnlock()
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
// GetCurrentEntities returns the current entity activity log
|
||||
func (a *ActivityLog) GetCurrentEntities() *activity.EntityActivityLog {
|
||||
a.l.RLock()
|
||||
defer a.l.RUnlock()
|
||||
return a.currentSegment.currentEntities
|
||||
}
|
||||
|
||||
// WriteToStorage is used to put entity data in storage
|
||||
// `path` should be the complete path (not relative to the view)
|
||||
func WriteToStorage(t *testing.T, c *Core, path string, data []byte) {
|
||||
t.Helper()
|
||||
err := c.barrier.Put(context.Background(), &logical.StorageEntry{
|
||||
Key: path,
|
||||
Value: data,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to write %s\nto %s\nerror: %v", data, path, err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetStandbyEnable sets enabled on a performance standby (using config)
|
||||
func (a *ActivityLog) SetStandbyEnable(ctx context.Context, enabled bool) {
|
||||
var enableStr string
|
||||
if enabled {
|
||||
enableStr = "enable"
|
||||
} else {
|
||||
enableStr = "disable"
|
||||
}
|
||||
|
||||
// TODO only patch enabled?
|
||||
a.SetConfigStandby(ctx, activityConfig{
|
||||
DefaultReportMonths: 12,
|
||||
RetentionMonths: 24,
|
||||
Enabled: enableStr,
|
||||
})
|
||||
}
|
||||
|
||||
// ExpectCurrentSegmentRefreshed verifies that the current segment has been refreshed
|
||||
// non-nil empty components and updated with the `expectedStart` timestamp
|
||||
// Note: if `verifyTimeNotZero` is true, ignore `expectedStart` and just make sure the timestamp isn't 0
|
||||
func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart int64, verifyTimeNotZero bool) {
|
||||
t.Helper()
|
||||
|
||||
a.l.RLock()
|
||||
defer a.l.RUnlock()
|
||||
a.fragmentLock.RLock()
|
||||
defer a.fragmentLock.RUnlock()
|
||||
if a.currentSegment.currentEntities == nil {
|
||||
t.Fatalf("expected non-nil currentSegment.currentEntities")
|
||||
}
|
||||
if a.currentSegment.currentEntities.Entities == nil {
|
||||
t.Errorf("expected non-nil currentSegment.currentEntities.Entities")
|
||||
}
|
||||
if a.activeEntities == nil {
|
||||
t.Errorf("expected non-nil activeEntities")
|
||||
}
|
||||
if a.currentSegment.tokenCount == nil {
|
||||
t.Fatalf("expected non-nil currentSegment.tokenCount")
|
||||
}
|
||||
if a.currentSegment.tokenCount.CountByNamespaceID == nil {
|
||||
t.Errorf("expected non-nil currentSegment.tokenCount.CountByNamespaceID")
|
||||
}
|
||||
|
||||
if len(a.currentSegment.currentEntities.Entities) > 0 {
|
||||
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentEntities)
|
||||
}
|
||||
if len(a.activeEntities) > 0 {
|
||||
t.Errorf("expected no active entity segment to be loaded. got: %v", a.activeEntities)
|
||||
}
|
||||
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
|
||||
t.Errorf("expected no token counts to be loaded. got: %v", a.currentSegment.tokenCount.CountByNamespaceID)
|
||||
}
|
||||
|
||||
if verifyTimeNotZero {
|
||||
if a.currentSegment.startTimestamp == 0 {
|
||||
t.Error("bad start timestamp. expected no reset but timestamp was reset")
|
||||
}
|
||||
} else if a.currentSegment.startTimestamp != expectedStart {
|
||||
t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentSegment.startTimestamp)
|
||||
}
|
||||
}
|
||||
|
||||
// ActiveEntitiesEqual checks that only the set of `test` exists in `active`
|
||||
func ActiveEntitiesEqual(active map[string]struct{}, test []*activity.EntityRecord) bool {
|
||||
if len(active) != len(test) {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, ent := range test {
|
||||
if _, ok := active[ent.EntityID]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// GetStartTimestamp returns the start timestamp on an activity log
|
||||
func (a *ActivityLog) GetStartTimestamp() int64 {
|
||||
a.l.RLock()
|
||||
defer a.l.RUnlock()
|
||||
return a.currentSegment.startTimestamp
|
||||
}
|
||||
|
||||
// SetStartTimestamp sets the start timestamp on an activity log
|
||||
func (a *ActivityLog) SetStartTimestamp(timestamp int64) {
|
||||
a.l.Lock()
|
||||
defer a.l.Unlock()
|
||||
a.currentSegment.startTimestamp = timestamp
|
||||
}
|
||||
|
||||
// SetTokenCount sets the tokenCount on an activity log
|
||||
func (a *ActivityLog) SetTokenCount(tokenCount *activity.TokenCount) {
|
||||
a.l.Lock()
|
||||
defer a.l.Unlock()
|
||||
a.currentSegment.tokenCount = tokenCount
|
||||
}
|
||||
|
||||
// GetCountByNamespaceID returns the count of tokens by namespace ID
|
||||
func (a *ActivityLog) GetCountByNamespaceID() map[string]uint64 {
|
||||
a.l.RLock()
|
||||
defer a.l.RUnlock()
|
||||
return a.currentSegment.tokenCount.CountByNamespaceID
|
||||
}
|
||||
|
||||
// GetEntitySequenceNumber returns the current entity sequence number
|
||||
func (a *ActivityLog) GetEntitySequenceNumber() uint64 {
|
||||
a.l.RLock()
|
||||
defer a.l.RUnlock()
|
||||
return a.currentSegment.entitySequenceNumber
|
||||
}
|
||||
|
||||
// SetEnable sets the enabled flag on the activity log
|
||||
func (a *ActivityLog) SetEnable(enabled bool) {
|
||||
a.l.Lock()
|
||||
defer a.l.Unlock()
|
||||
a.fragmentLock.Lock()
|
||||
defer a.fragmentLock.Unlock()
|
||||
a.enabled = enabled
|
||||
}
|
|
@ -1926,7 +1926,9 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c
|
|||
if err := c.setupAuditedHeadersConfig(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.setupActivityLog(ctx); err != nil {
|
||||
// not waiting on wg to avoid changing existing behavior
|
||||
var wg sync.WaitGroup
|
||||
if err := c.setupActivityLog(ctx, &wg); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue