diff --git a/changelog/15352.txt b/changelog/15352.txt new file mode 100644 index 000000000..c9ce4c04d --- /dev/null +++ b/changelog/15352.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core/activity: allow client counts to be precomputed and queried on non-contiguous chunks of data +``` \ No newline at end of file diff --git a/command/server/config_test_helpers.go b/command/server/config_test_helpers.go index 06bb3a432..27921c70c 100644 --- a/command/server/config_test_helpers.go +++ b/command/server/config_test_helpers.go @@ -476,6 +476,16 @@ func testLoadConfigFile(t *testing.T) { } } +func testUnknownFieldValidationStorageAndListener(t *testing.T) { + config, err := LoadConfigFile("./test-fixtures/storage-listener-config.json") + if err != nil { + t.Fatalf("err: %s", err) + } + if len(config.UnusedKeys) != 0 { + t.Fatalf("unused keys for valid config are %+v\n", config.UnusedKeys) + } +} + func testUnknownFieldValidation(t *testing.T) { config, err := LoadConfigFile("./test-fixtures/config.hcl") if err != nil { @@ -525,16 +535,6 @@ func testUnknownFieldValidation(t *testing.T) { } } -func testUnknownFieldValidationStorageAndListener(t *testing.T) { - config, err := LoadConfigFile("./test-fixtures/storage-listener-config.json") - if err != nil { - t.Fatalf("err: %s", err) - } - if len(config.UnusedKeys) != 0 { - t.Fatalf("unused keys for valid config are %+v\n", config.UnusedKeys) - } -} - func testLoadConfigFile_json(t *testing.T) { config, err := LoadConfigFile("./test-fixtures/config.hcl.json") if err != nil { diff --git a/command/server/test-fixtures/storage-listener-config.json b/command/server/test-fixtures/storage-listener-config.json index 30d25ab4e..505c3b077 100644 --- a/command/server/test-fixtures/storage-listener-config.json +++ b/command/server/test-fixtures/storage-listener-config.json @@ -1,17 +1,17 @@ { - "api_addr": "https://localhost:8200", - "default_lease_ttl": "6h", - "disable_mlock": true, - "listener": { - "tcp": { - "address": 
"0.0.0.0:8200" - } - }, - "log_level": "info", - "storage": { - "consul": { - "address": "127.0.0.1:8500" - } - }, - "ui": true - } \ No newline at end of file + "api_addr": "https://localhost:8200", + "default_lease_ttl": "6h", + "disable_mlock": true, + "listener": { + "tcp": { + "address": "0.0.0.0:8200" + } + }, + "log_level": "info", + "storage": { + "consul": { + "address": "127.0.0.1:8500" + } + }, + "ui": true +} \ No newline at end of file diff --git a/vault/activity/query.go b/vault/activity/query.go index 9b15f8024..6d20e9867 100644 --- a/vault/activity/query.go +++ b/vault/activity/query.go @@ -134,6 +134,30 @@ func (s *PrecomputedQueryStore) listEndTimes(ctx context.Context, startTime time return endTimes, nil } +func (s *PrecomputedQueryStore) getMaxEndTime(ctx context.Context, startTime time.Time, endTimeBound time.Time) (time.Time, error) { + rawEndTimes, err := s.view.List(ctx, fmt.Sprintf("%v/", startTime.Unix())) + if err != nil { + return time.Time{}, err + } + + maxEndTime := time.Time{} + for _, raw := range rawEndTimes { + val, err := strconv.ParseInt(raw, 10, 64) + if err != nil { + s.logger.Warn("could not parse precomputed query end time", "key", raw) + continue + } + endTime := time.Unix(val, 0).UTC() + s.logger.Trace("end time in consideration is", "end time", endTime, "end time bound", endTimeBound) + if endTime.After(maxEndTime) && !endTime.After(endTimeBound) { + s.logger.Trace("end time has been updated") + maxEndTime = endTime + } + + } + return maxEndTime, nil +} + func (s *PrecomputedQueryStore) QueriesAvailable(ctx context.Context) (bool, error) { startTimes, err := s.listStartTimes(ctx) if err != nil { @@ -181,7 +205,7 @@ func (s *PrecomputedQueryStore) Get(ctx context.Context, startTime, endTime time } s.logger.Trace("retrieved start times from storage", "startTimes", startTimes) - filteredList := make([]time.Time, 0, len(startTimes)) + filteredList := make([]time.Time, 0) for _, t := range startTimes { if timeutil.InRange(t, 
startTime, endTime) { filteredList = append(filteredList, t) @@ -196,42 +220,45 @@ func (s *PrecomputedQueryStore) Get(ctx context.Context, startTime, endTime time sort.Slice(filteredList, func(i, j int) bool { return filteredList[i].After(filteredList[j]) }) - contiguous := timeutil.GetMostRecentContiguousMonths(filteredList) - actualStartTime := contiguous[len(contiguous)-1] - s.logger.Trace("chose start time", "actualStartTime", actualStartTime, "contiguous", contiguous) - - endTimes, err := s.listEndTimes(ctx, actualStartTime) - if err != nil { - return nil, err - } - s.logger.Trace("retrieved end times from storage", "endTimes", endTimes) - - // Might happen if there's a race with GC - if len(endTimes) == 0 { - s.logger.Warn("missing end times", "start time", actualStartTime) - return nil, nil - } - var actualEndTime time.Time - for _, t := range endTimes { - if timeutil.InRange(t, startTime, endTime) { - if actualEndTime.IsZero() || t.After(actualEndTime) { - actualEndTime = t - } + closestStartTime := time.Time{} + closestEndTime := time.Time{} + maxTimeDifference := time.Duration(0) + for i := len(filteredList) - 1; i >= 0; i-- { + testStartTime := filteredList[i] + s.logger.Trace("trying test start times", "startTime", testStartTime, "filteredList", filteredList) + testEndTime, err := s.getMaxEndTime(ctx, testStartTime, endTime) + if err != nil { + return nil, err + } + if testEndTime.IsZero() { + // Might happen if there's a race with GC + s.logger.Warn("missing end times", "start time", testStartTime) + continue + } + s.logger.Trace("retrieved max end time from storage", "endTime", testEndTime) + diff := testEndTime.Sub(testStartTime) + if diff >= maxTimeDifference { + closestStartTime = testStartTime + closestEndTime = testEndTime + maxTimeDifference = diff + s.logger.Trace("updating closest times") + } } - if actualEndTime.IsZero() { - s.logger.Warn("no end time in range", "start time", actualStartTime) + s.logger.Trace("chose start and end times", 
"startTime", closestStartTime, "endTime", closestEndTime) + + if closestStartTime.IsZero() || closestEndTime.IsZero() { + s.logger.Warn("no start or end time in range", "start time", closestStartTime, "end time", closestEndTime) return nil, nil } - path := fmt.Sprintf("%v/%v", actualStartTime.Unix(), actualEndTime.Unix()) + path := fmt.Sprintf("%v/%v", closestStartTime.Unix(), closestEndTime.Unix()) entry, err := s.view.Get(ctx, path) if err != nil { return nil, err } if entry == nil { - s.logger.Warn("no end time entry found", "start time", actualStartTime, "end time", actualEndTime) + s.logger.Warn("no end time entry found", "start time", closestStartTime, "end time", closestEndTime) return nil, nil } diff --git a/vault/activity_log.go b/vault/activity_log.go index d319f48a6..68fb88958 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -483,6 +483,23 @@ func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context) ([]ti return timeutil.GetMostRecentContiguousMonths(logTimes), nil } +// getMostRecentNonContiguousActivityLogSegments gets the times (in UTC) associated with the most recent +// activity log segments, up to 12 months back, sorted in decreasing order (latest to earliest); the set need not be contiguous +func (a *ActivityLog) getMostRecentNonContiguousActivityLogSegments(ctx context.Context) ([]time.Time, error) { + logTimes, err := a.availableLogs(ctx) + if err != nil { + return nil, err + } + if len(logTimes) <= 12 { + return logTimes, nil + } + contiguousMonths := timeutil.GetMostRecentContiguousMonths(logTimes) + if len(contiguousMonths) >= 12 { + return contiguousMonths, nil + } + return logTimes[:12], nil +} + // getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, bool, error) { p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") @@ -2015,7 +2032,7 @@ func (a *ActivityLog) precomputedQueryWorker(ctx 
context.Context) error { lastMonth := intent.PreviousMonth a.logger.Info("computing queries", "month", time.Unix(lastMonth, 0).UTC()) - times, err := a.getMostRecentActivityLogSegment(ctx) + times, err := a.getMostRecentNonContiguousActivityLogSegments(ctx) if err != nil { a.logger.Warn("could not list recent segments", "error", err) return err diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index 85da89a68..044433cd5 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -2357,11 +2357,11 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) { "deleted-ccccc", 5.0, }, - // august-september values + // january-september values { "identity.nonentity.active.reporting_period", "root", - 1220.0, + 1223.0, }, { "identity.nonentity.active.reporting_period", @@ -2399,7 +2399,7 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) { } } if !found { - t.Errorf("No guage found for %v %v", + t.Errorf("No gauge found for %v %v", g.Name, g.NamespaceLabel) } } @@ -2776,6 +2776,187 @@ func TestActivityLog_Precompute(t *testing.T) { } } +// TestActivityLog_Precompute_SkipMonth will put two non-contiguous chunks of +// data in the activity log, and then run precomputedQueryWorker. 
Finally it +// will perform a query get over the skip month and expect a query for the entire +// time segment (non-contiguous) +func TestActivityLog_Precompute_SkipMonth(t *testing.T) { + timeutil.SkipAtEndOfMonth(t) + + august := time.Date(2020, 8, 15, 12, 0, 0, 0, time.UTC) + september := timeutil.StartOfMonth(time.Date(2020, 9, 1, 0, 0, 0, 0, time.UTC)) + october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)) + november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC)) + december := timeutil.StartOfMonth(time.Date(2020, 12, 1, 0, 0, 0, 0, time.UTC)) + + core, _, _, _ := TestCoreUnsealedWithMetrics(t) + a := core.activityLog + ctx := namespace.RootContext(nil) + + entityRecords := make([]*activity.EntityRecord, 45) + + for i := range entityRecords { + entityRecords[i] = &activity.EntityRecord{ + ClientID: fmt.Sprintf("111122222-3333-4444-5555-%012v", i), + NamespaceID: "root", + Timestamp: time.Now().Unix(), + } + } + + toInsert := []struct { + StartTime int64 + Segment uint64 + Clients []*activity.EntityRecord + }{ + { + august.Unix(), + 0, + entityRecords[:20], + }, + { + september.Unix(), + 0, + entityRecords[20:30], + }, + { + november.Unix(), + 0, + entityRecords[30:45], + }, + } + + // Note that precomputedQuery worker doesn't filter + // for times <= the one it was asked to do. Is that a problem? + // Here, it means that we can't insert everything *first* and do multiple + // test cases, we have to write logs incrementally. 
+ doInsert := func(i int) { + t.Helper() + segment := toInsert[i] + eal := &activity.EntityActivityLog{ + Clients: segment.Clients, + } + data, err := proto.Marshal(eal) + if err != nil { + t.Fatal(err) + } + path := fmt.Sprintf("%ventity/%v/%v", ActivityLogPrefix, segment.StartTime, segment.Segment) + WriteToStorage(t, core, path, data) + } + + expectedCounts := []struct { + StartTime time.Time + EndTime time.Time + ByNamespace map[string]int + }{ + // First test case + { + august, + timeutil.EndOfMonth(september), + map[string]int{ + "root": 30, + }, + }, + // Second test case + { + august, + timeutil.EndOfMonth(november), + map[string]int{ + "root": 45, + }, + }, + } + + checkPrecomputedQuery := func(i int) { + t.Helper() + pq, err := a.queryStore.Get(ctx, expectedCounts[i].StartTime, expectedCounts[i].EndTime) + if err != nil { + t.Fatal(err) + } + if pq == nil { + t.Errorf("empty result for %v -- %v", expectedCounts[i].StartTime, expectedCounts[i].EndTime) + } + if len(pq.Namespaces) != len(expectedCounts[i].ByNamespace) { + t.Errorf("mismatched number of namespaces, expected %v got %v", + len(expectedCounts[i].ByNamespace), len(pq.Namespaces)) + } + for _, nsRecord := range pq.Namespaces { + val, ok := expectedCounts[i].ByNamespace[nsRecord.NamespaceID] + if !ok { + t.Errorf("unexpected namespace %v", nsRecord.NamespaceID) + continue + } + if uint64(val) != nsRecord.Entities { + t.Errorf("wrong number of entities in %v: expected %v, got %v", + nsRecord.NamespaceID, val, nsRecord.Entities) + } + } + if !pq.StartTime.Equal(expectedCounts[i].StartTime) { + t.Errorf("mismatched start time: expected %v got %v", + expectedCounts[i].StartTime, pq.StartTime) + } + if !pq.EndTime.Equal(expectedCounts[i].EndTime) { + t.Errorf("mismatched end time: expected %v got %v", + expectedCounts[i].EndTime, pq.EndTime) + } + } + + testCases := []struct { + InsertUpTo int // index in the toInsert array + PrevMonth int64 + NextMonth int64 + ExpectedUpTo int // index in the 
expectedCounts array + }{ + { + 1, + september.Unix(), + october.Unix(), + 0, + }, + { + 2, + november.Unix(), + december.Unix(), + 1, + }, + } + + inserted := -1 + for _, tc := range testCases { + t.Logf("tc %+v", tc) + + // Persists across loops + for inserted < tc.InsertUpTo { + inserted += 1 + t.Logf("inserting segment %v", inserted) + doInsert(inserted) + } + + intent := &ActivityIntentLog{ + PreviousMonth: tc.PrevMonth, + NextMonth: tc.NextMonth, + } + data, err := json.Marshal(intent) + if err != nil { + t.Fatal(err) + } + WriteToStorage(t, core, "sys/counters/activity/endofmonth", data) + + // Pretend we've successfully rolled over to the following month + a.SetStartTimestamp(tc.NextMonth) + + err = a.precomputedQueryWorker(ctx) + if err != nil { + t.Fatal(err) + } + + expectMissingSegment(t, core, "sys/counters/activity/endofmonth") + + for i := 0; i <= tc.ExpectedUpTo; i++ { + checkPrecomputedQuery(i) + } + } +} + // TestActivityLog_PrecomputeNonEntityTokensWithID is the same test as // TestActivityLog_Precompute, except all the clients are tokens without // entities. This ensures the deduplication logic and separation logic between