Query and Precompute Non-Contiguous Segments in the Activity Log (#15352)

* query and precompute non-contiguous segments in the activity log

* changelog

* newline formatting

* make fmt

* report listener and storage types as found keys

* report listener and storage types as found keys

* Update vault/activity_log_test.go

Co-authored-by: Chris Capurso <1036769+ccapurso@users.noreply.github.com>

* review comments

* merge conflict

* merge conflict

* merge conflict

* fix unchecked merge conflict

Co-authored-by: Chris Capurso <1036769+ccapurso@users.noreply.github.com>
This commit is contained in:
Hridoy Roy 2022-05-17 12:17:32 -07:00 committed by GitHub
parent 24e8b73c73
commit 679ccc81a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 284 additions and 56 deletions

3
changelog/15352.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
core/activity: allow client counts to be precomputed and queried on non-contiguous chunks of data
```

View File

@ -476,6 +476,16 @@ func testLoadConfigFile(t *testing.T) {
}
}
func testUnknownFieldValidationStorageAndListener(t *testing.T) {
config, err := LoadConfigFile("./test-fixtures/storage-listener-config.json")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(config.UnusedKeys) != 0 {
t.Fatalf("unused keys for valid config are %+v\n", config.UnusedKeys)
}
}
func testUnknownFieldValidation(t *testing.T) {
config, err := LoadConfigFile("./test-fixtures/config.hcl")
if err != nil {
@ -525,16 +535,6 @@ func testUnknownFieldValidation(t *testing.T) {
}
}
func testUnknownFieldValidationStorageAndListener(t *testing.T) {
config, err := LoadConfigFile("./test-fixtures/storage-listener-config.json")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(config.UnusedKeys) != 0 {
t.Fatalf("unused keys for valid config are %+v\n", config.UnusedKeys)
}
}
func testLoadConfigFile_json(t *testing.T) {
config, err := LoadConfigFile("./test-fixtures/config.hcl.json")
if err != nil {

View File

@ -1,17 +1,17 @@
{
"api_addr": "https://localhost:8200",
"default_lease_ttl": "6h",
"disable_mlock": true,
"listener": {
"tcp": {
"address": "0.0.0.0:8200"
}
},
"log_level": "info",
"storage": {
"consul": {
"address": "127.0.0.1:8500"
}
},
"ui": true
}
"api_addr": "https://localhost:8200",
"default_lease_ttl": "6h",
"disable_mlock": true,
"listener": {
"tcp": {
"address": "0.0.0.0:8200"
}
},
"log_level": "info",
"storage": {
"consul": {
"address": "127.0.0.1:8500"
}
},
"ui": true
}

View File

@ -134,6 +134,30 @@ func (s *PrecomputedQueryStore) listEndTimes(ctx context.Context, startTime time
return endTimes, nil
}
func (s *PrecomputedQueryStore) getMaxEndTime(ctx context.Context, startTime time.Time, endTimeBound time.Time) (time.Time, error) {
rawEndTimes, err := s.view.List(ctx, fmt.Sprintf("%v/", startTime.Unix()))
if err != nil {
return time.Time{}, err
}
maxEndTime := time.Time{}
for _, raw := range rawEndTimes {
val, err := strconv.ParseInt(raw, 10, 64)
if err != nil {
s.logger.Warn("could not parse precomputed query end time", "key", raw)
continue
}
endTime := time.Unix(val, 0).UTC()
s.logger.Trace("end time in consideration is", "end time", endTime, "end time bound", endTimeBound)
if endTime.After(maxEndTime) && !endTime.After(endTimeBound) {
s.logger.Trace("end time has been updated")
maxEndTime = endTime
}
}
return maxEndTime, nil
}
func (s *PrecomputedQueryStore) QueriesAvailable(ctx context.Context) (bool, error) {
startTimes, err := s.listStartTimes(ctx)
if err != nil {
@ -181,7 +205,7 @@ func (s *PrecomputedQueryStore) Get(ctx context.Context, startTime, endTime time
}
s.logger.Trace("retrieved start times from storage", "startTimes", startTimes)
filteredList := make([]time.Time, 0, len(startTimes))
filteredList := make([]time.Time, 0)
for _, t := range startTimes {
if timeutil.InRange(t, startTime, endTime) {
filteredList = append(filteredList, t)
@ -196,42 +220,45 @@ func (s *PrecomputedQueryStore) Get(ctx context.Context, startTime, endTime time
sort.Slice(filteredList, func(i, j int) bool {
return filteredList[i].After(filteredList[j])
})
contiguous := timeutil.GetMostRecentContiguousMonths(filteredList)
actualStartTime := contiguous[len(contiguous)-1]
s.logger.Trace("chose start time", "actualStartTime", actualStartTime, "contiguous", contiguous)
endTimes, err := s.listEndTimes(ctx, actualStartTime)
if err != nil {
return nil, err
}
s.logger.Trace("retrieved end times from storage", "endTimes", endTimes)
// Might happen if there's a race with GC
if len(endTimes) == 0 {
s.logger.Warn("missing end times", "start time", actualStartTime)
return nil, nil
}
var actualEndTime time.Time
for _, t := range endTimes {
if timeutil.InRange(t, startTime, endTime) {
if actualEndTime.IsZero() || t.After(actualEndTime) {
actualEndTime = t
}
closestStartTime := time.Time{}
closestEndTime := time.Time{}
maxTimeDifference := time.Duration(0)
for i := len(filteredList) - 1; i >= 0; i-- {
testStartTime := filteredList[i]
s.logger.Trace("trying test start times", "startTime", testStartTime, "filteredList", filteredList)
testEndTime, err := s.getMaxEndTime(ctx, testStartTime, endTime)
if err != nil {
return nil, err
}
if testEndTime.IsZero() {
// Might happen if there's a race with GC
s.logger.Warn("missing end times", "start time", testStartTime)
continue
}
s.logger.Trace("retrieved max end time from storage", "endTime", testEndTime)
diff := testEndTime.Sub(testStartTime)
if diff >= maxTimeDifference {
closestStartTime = testStartTime
closestEndTime = testEndTime
maxTimeDifference = diff
s.logger.Trace("updating closest times")
}
}
if actualEndTime.IsZero() {
s.logger.Warn("no end time in range", "start time", actualStartTime)
s.logger.Trace("chose start end end times", "startTime", closestStartTime, "endTime")
if closestStartTime.IsZero() || closestEndTime.IsZero() {
s.logger.Warn("no start or end time in range", "start time", closestStartTime, "end time", closestEndTime)
return nil, nil
}
path := fmt.Sprintf("%v/%v", actualStartTime.Unix(), actualEndTime.Unix())
path := fmt.Sprintf("%v/%v", closestStartTime.Unix(), closestEndTime.Unix())
entry, err := s.view.Get(ctx, path)
if err != nil {
return nil, err
}
if entry == nil {
s.logger.Warn("no end time entry found", "start time", actualStartTime, "end time", actualEndTime)
s.logger.Warn("no end time entry found", "start time", closestStartTime, "end time", closestEndTime)
return nil, nil
}

View File

@ -483,6 +483,23 @@ func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context) ([]ti
return timeutil.GetMostRecentContiguousMonths(logTimes), nil
}
// getMostRecentActivityLogSegment gets the times (in UTC) associated with the most recent
// contiguous set of activity logs, sorted in decreasing order (latest to earliest)
func (a *ActivityLog) getMostRecentNonContiguousActivityLogSegments(ctx context.Context) ([]time.Time, error) {
logTimes, err := a.availableLogs(ctx)
if err != nil {
return nil, err
}
if len(logTimes) <= 12 {
return logTimes, nil
}
contiguousMonths := timeutil.GetMostRecentContiguousMonths(logTimes)
if len(contiguousMonths) >= 12 {
return contiguousMonths, nil
}
return logTimes[:12], nil
}
// getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists
func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, bool, error) {
p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/")
@ -2015,7 +2032,7 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
lastMonth := intent.PreviousMonth
a.logger.Info("computing queries", "month", time.Unix(lastMonth, 0).UTC())
times, err := a.getMostRecentActivityLogSegment(ctx)
times, err := a.getMostRecentNonContiguousActivityLogSegments(ctx)
if err != nil {
a.logger.Warn("could not list recent segments", "error", err)
return err

View File

@ -2357,11 +2357,11 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) {
"deleted-ccccc",
5.0,
},
// august-september values
// january-september values
{
"identity.nonentity.active.reporting_period",
"root",
1220.0,
1223.0,
},
{
"identity.nonentity.active.reporting_period",
@ -2399,7 +2399,7 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) {
}
}
if !found {
t.Errorf("No guage found for %v %v",
t.Errorf("No gauge found for %v %v",
g.Name, g.NamespaceLabel)
}
}
@ -2776,6 +2776,187 @@ func TestActivityLog_Precompute(t *testing.T) {
}
}
// TestActivityLog_Precompute_SkipMonth will put two non-contiguous chunks of
// data in the activity log, and then run precomputedQueryWorker. Finally it
// will perform a query get over the skip month and expect a query for the entire
// time segment (non-contiguous)
func TestActivityLog_Precompute_SkipMonth(t *testing.T) {
timeutil.SkipAtEndOfMonth(t)
august := time.Date(2020, 8, 15, 12, 0, 0, 0, time.UTC)
september := timeutil.StartOfMonth(time.Date(2020, 9, 1, 0, 0, 0, 0, time.UTC))
october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
december := timeutil.StartOfMonth(time.Date(2020, 12, 1, 0, 0, 0, 0, time.UTC))
core, _, _, _ := TestCoreUnsealedWithMetrics(t)
a := core.activityLog
ctx := namespace.RootContext(nil)
entityRecords := make([]*activity.EntityRecord, 45)
for i := range entityRecords {
entityRecords[i] = &activity.EntityRecord{
ClientID: fmt.Sprintf("111122222-3333-4444-5555-%012v", i),
NamespaceID: "root",
Timestamp: time.Now().Unix(),
}
}
toInsert := []struct {
StartTime int64
Segment uint64
Clients []*activity.EntityRecord
}{
{
august.Unix(),
0,
entityRecords[:20],
},
{
september.Unix(),
0,
entityRecords[20:30],
},
{
november.Unix(),
0,
entityRecords[30:45],
},
}
// Note that precomputedQuery worker doesn't filter
// for times <= the one it was asked to do. Is that a problem?
// Here, it means that we can't insert everything *first* and do multiple
// test cases, we have to write logs incrementally.
doInsert := func(i int) {
t.Helper()
segment := toInsert[i]
eal := &activity.EntityActivityLog{
Clients: segment.Clients,
}
data, err := proto.Marshal(eal)
if err != nil {
t.Fatal(err)
}
path := fmt.Sprintf("%ventity/%v/%v", ActivityLogPrefix, segment.StartTime, segment.Segment)
WriteToStorage(t, core, path, data)
}
expectedCounts := []struct {
StartTime time.Time
EndTime time.Time
ByNamespace map[string]int
}{
// First test case
{
august,
timeutil.EndOfMonth(september),
map[string]int{
"root": 30,
},
},
// Second test case
{
august,
timeutil.EndOfMonth(november),
map[string]int{
"root": 45,
},
},
}
checkPrecomputedQuery := func(i int) {
t.Helper()
pq, err := a.queryStore.Get(ctx, expectedCounts[i].StartTime, expectedCounts[i].EndTime)
if err != nil {
t.Fatal(err)
}
if pq == nil {
t.Errorf("empty result for %v -- %v", expectedCounts[i].StartTime, expectedCounts[i].EndTime)
}
if len(pq.Namespaces) != len(expectedCounts[i].ByNamespace) {
t.Errorf("mismatched number of namespaces, expected %v got %v",
len(expectedCounts[i].ByNamespace), len(pq.Namespaces))
}
for _, nsRecord := range pq.Namespaces {
val, ok := expectedCounts[i].ByNamespace[nsRecord.NamespaceID]
if !ok {
t.Errorf("unexpected namespace %v", nsRecord.NamespaceID)
continue
}
if uint64(val) != nsRecord.Entities {
t.Errorf("wrong number of entities in %v: expected %v, got %v",
nsRecord.NamespaceID, val, nsRecord.Entities)
}
}
if !pq.StartTime.Equal(expectedCounts[i].StartTime) {
t.Errorf("mismatched start time: expected %v got %v",
expectedCounts[i].StartTime, pq.StartTime)
}
if !pq.EndTime.Equal(expectedCounts[i].EndTime) {
t.Errorf("mismatched end time: expected %v got %v",
expectedCounts[i].EndTime, pq.EndTime)
}
}
testCases := []struct {
InsertUpTo int // index in the toInsert array
PrevMonth int64
NextMonth int64
ExpectedUpTo int // index in the expectedCounts array
}{
{
1,
september.Unix(),
october.Unix(),
0,
},
{
2,
november.Unix(),
december.Unix(),
1,
},
}
inserted := -1
for _, tc := range testCases {
t.Logf("tc %+v", tc)
// Persists across loops
for inserted < tc.InsertUpTo {
inserted += 1
t.Logf("inserting segment %v", inserted)
doInsert(inserted)
}
intent := &ActivityIntentLog{
PreviousMonth: tc.PrevMonth,
NextMonth: tc.NextMonth,
}
data, err := json.Marshal(intent)
if err != nil {
t.Fatal(err)
}
WriteToStorage(t, core, "sys/counters/activity/endofmonth", data)
// Pretend we've successfully rolled over to the following month
a.SetStartTimestamp(tc.NextMonth)
err = a.precomputedQueryWorker(ctx)
if err != nil {
t.Fatal(err)
}
expectMissingSegment(t, core, "sys/counters/activity/endofmonth")
for i := 0; i <= tc.ExpectedUpTo; i++ {
checkPrecomputedQuery(i)
}
}
}
// TestActivityLog_PrecomputeNonEntityTokensWithID is the same test as
// TestActivityLog_Precompute, except all the clients are tokens without
// entities. This ensures the deduplication logic and separation logic between