From f14a039a6538004e30130a7b2ef35ddc331ad204 Mon Sep 17 00:00:00 2001 From: miagilepner Date: Tue, 16 May 2023 16:29:18 +0200 Subject: [PATCH] VAULT-14733: Split logic of precomputedQueryWorker (#20073) * split precomputed query worker and add unit tests * add new client delete method and test * add changelog * fixes from pr review * add missing comment * fix comparison --- changelog/20073.txt | 3 + vault/activity_log.go | 389 ++++++++++++++++++++++--------------- vault/activity_log_test.go | 378 +++++++++++++++++++++++++++++++++++ 3 files changed, 610 insertions(+), 160 deletions(-) create mode 100644 changelog/20073.txt diff --git a/changelog/20073.txt b/changelog/20073.txt new file mode 100644 index 000000000..10c21a58b --- /dev/null +++ b/changelog/20073.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core/activity: refactor the activity log's generation of precomputed queries +``` \ No newline at end of file diff --git a/vault/activity_log.go b/vault/activity_log.go index 4fa0a8068..cf87287fa 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -1895,6 +1895,17 @@ func newProcessCounts() *processCounts { } } +func (p *processCounts) delete(client *activity.EntityRecord) { + if !p.contains(client) { + return + } + if client.NonEntity { + delete(p.NonEntities, client.ClientID) + } else { + delete(p.Entities, client.ClientID) + } +} + func (p *processCounts) add(client *activity.EntityRecord) { if client.NonEntity { p.NonEntities[client.ClientID] = struct{}{} @@ -1926,6 +1937,10 @@ func (p *processMount) add(client *activity.EntityRecord) { p.Counts.add(client) } +func (p *processMount) delete(client *activity.EntityRecord) { + p.Counts.delete(client) +} + func (s summaryByMount) add(client *activity.EntityRecord) { if _, present := s[client.MountAccessor]; !present { s[client.MountAccessor] = newProcessMount() @@ -1933,6 +1948,12 @@ func (s summaryByMount) add(client *activity.EntityRecord) { s[client.MountAccessor].add(client) } +func (s 
summaryByMount) delete(client *activity.EntityRecord) { + if m, present := s[client.MountAccessor]; present { + m.delete(client) + } +} + type processByNamespace struct { Counts *processCounts Mounts summaryByMount @@ -1950,6 +1971,11 @@ func (p *processByNamespace) add(client *activity.EntityRecord) { p.Mounts.add(client) } +func (p *processByNamespace) delete(client *activity.EntityRecord) { + p.Counts.delete(client) + p.Mounts.delete(client) +} + func (s summaryByNamespace) add(client *activity.EntityRecord) { if _, present := s[client.NamespaceID]; !present { s[client.NamespaceID] = newByNamespace() @@ -1957,6 +1983,12 @@ func (s summaryByNamespace) add(client *activity.EntityRecord) { s[client.NamespaceID].add(client) } +func (s summaryByNamespace) delete(client *activity.EntityRecord) { + if n, present := s[client.NamespaceID]; present { + n.delete(client) + } +} + type processNewClients struct { Counts *processCounts Namespaces summaryByNamespace @@ -1974,6 +2006,11 @@ func (p *processNewClients) add(client *activity.EntityRecord) { p.Namespaces.add(client) } +func (p *processNewClients) delete(client *activity.EntityRecord) { + p.Counts.delete(client) + p.Namespaces.delete(client) +} + type processMonth struct { Counts *processCounts Namespaces summaryByNamespace @@ -2009,6 +2046,185 @@ func processClientRecord(e *activity.EntityRecord, byNamespace summaryByNamespac byMonth.add(e, startTime) } +// handleEntitySegment processes the record and adds it to the correct month/ +// namespace breakdown maps, as well as to the hyperloglog for the month. New +// clients are deduplicated in opts.byMonth so that clients will only appear in +// the first month in which they are seen. 
+// This method must be called in reverse chronological order of the months (with +// the most recent month being called before previous months) +func (a *ActivityLog) handleEntitySegment(l *activity.EntityActivityLog, segmentTime time.Time, hll *hyperloglog.Sketch, opts pqOptions) error { + for _, e := range l.Clients { + + processClientRecord(e, opts.byNamespace, opts.byMonth, segmentTime) + hll.Insert([]byte(e.ClientID)) + + // step forward in time through the months to check if the client is + // present. If it is, delete it. This is because the client should only + // be reported as new in the earliest month that it was seen + finalMonth := timeutil.StartOfMonth(opts.activePeriodEnd).UTC() + for currMonth := timeutil.StartOfMonth(segmentTime).UTC(); currMonth.Before(finalMonth); currMonth = timeutil.StartOfNextMonth(currMonth).UTC() { + // Invalidate the client from being a new client in the next month + next := timeutil.StartOfNextMonth(currMonth).UTC().Unix() + if _, present := opts.byMonth[next]; present { + // delete from the new clients map for the next month + // this will handle deleting from the per-namespace and per-mount maps of NewClients + opts.byMonth[next].NewClients.delete(e) + } + } + } + + return nil +} + +// breakdownTokenSegment handles a TokenCount record, adding it to the namespace breakdown +func (a *ActivityLog) breakdownTokenSegment(l *activity.TokenCount, byNamespace map[string]*processByNamespace) { + for nsID, v := range l.CountByNamespaceID { + if _, present := byNamespace[nsID]; !present { + byNamespace[nsID] = newByNamespace() + } + byNamespace[nsID].Counts.Tokens += v + } +} + +func (a *ActivityLog) writePrecomputedQuery(ctx context.Context, segmentTime time.Time, opts pqOptions) error { + pq := &activity.PrecomputedQuery{ + StartTime: segmentTime, + EndTime: opts.endTime, + Namespaces: make([]*activity.NamespaceRecord, 0, len(opts.byNamespace)), + Months: make([]*activity.MonthRecord, 0, len(opts.byMonth)), + } + // this will 
transform the byMonth map into the correctly formatted protobuf + pq.Months = a.transformMonthBreakdowns(opts.byMonth) + + // the byNamespace map also needs to be transformed into a protobuf + for nsID, entry := range opts.byNamespace { + mountRecord := make([]*activity.MountRecord, 0, len(entry.Mounts)) + for mountAccessor, mountData := range entry.Mounts { + mountRecord = append(mountRecord, &activity.MountRecord{ + MountPath: a.mountAccessorToMountPath(mountAccessor), + Counts: &activity.CountsRecord{ + EntityClients: len(mountData.Counts.Entities), + NonEntityClients: int(mountData.Counts.Tokens) + len(mountData.Counts.NonEntities), + }, + }) + } + + pq.Namespaces = append(pq.Namespaces, &activity.NamespaceRecord{ + NamespaceID: nsID, + Entities: uint64(len(entry.Counts.Entities)), + NonEntityTokens: entry.Counts.Tokens + uint64(len(entry.Counts.NonEntities)), + Mounts: mountRecord, + }) + } + err := a.queryStore.Put(ctx, pq) + if err != nil { + a.logger.Warn("failed to store precomputed query", "error", err) + } + return nil +} + +// pqOptions holds fields that will be used when creating precomputed queries +// These fields will remain the same for every segment that a precomputed query worker is handling +type pqOptions struct { + byNamespace map[string]*processByNamespace + byMonth map[int64]*processMonth + // endTime sets the end time of the precomputed query. + // When invoked on schedule by the precomputedQueryWorker, this is the end of the month that just finished. + endTime time.Time + // activePeriodStart is the earliest date in our retention window + activePeriodStart time.Time + // activePeriodEnd is the latest date in our retention window. 
+ // When invoked on schedule by the precomputedQueryWorker, this will be the timestamp of the most recent segment + // that's present in storage + activePeriodEnd time.Time +} + +// segmentToPrecomputedQuery processes a single segment +func (a *ActivityLog) segmentToPrecomputedQuery(ctx context.Context, segmentTime time.Time, reader SegmentReader, opts pqOptions) error { + hyperloglog, err := a.CreateOrFetchHyperlogLog(ctx, segmentTime) + if err != nil { + // We were unable to create or fetch the hll, but we should still + // continue with our precomputation + a.logger.Warn("unable to create or fetch hyperloglog", "start time", segmentTime, "error", err) + } + + // Iterate through entities, adding them to the hyperloglog and the summary maps in opts + for { + entity, err := reader.ReadEntity(ctx) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + a.logger.Warn("failed to read segment", "error", err) + return err + } + err = a.handleEntitySegment(entity, segmentTime, hyperloglog, opts) + if err != nil { + a.logger.Warn("failed to handle entity segment", "error", err) + return err + } + } + + // Store the hyperloglog + err = a.StoreHyperlogLog(ctx, segmentTime, hyperloglog) + if err != nil { + a.logger.Warn("failed to store hyperloglog for month", "start time", segmentTime, "error", err) + } + + // Iterate through any tokens and add them to per namespace map + for { + token, err := reader.ReadToken(ctx) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + a.logger.Warn("failed to load token counts", "error", err) + return err + } + a.breakdownTokenSegment(token, opts.byNamespace) + } + + // write metrics + for nsID, entry := range opts.byNamespace { + // If this is the most recent month, or the start of the reporting period, output + // a metric for each namespace. 
+ if segmentTime == opts.activePeriodEnd { + a.metrics.SetGaugeWithLabels( + []string{"identity", "entity", "active", "monthly"}, + float32(len(entry.Counts.Entities)), + []metricsutil.Label{ + {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, + }, + ) + a.metrics.SetGaugeWithLabels( + []string{"identity", "nonentity", "active", "monthly"}, + float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens), + []metricsutil.Label{ + {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, + }, + ) + } else if segmentTime == opts.activePeriodStart { + a.metrics.SetGaugeWithLabels( + []string{"identity", "entity", "active", "reporting_period"}, + float32(len(entry.Counts.Entities)), + []metricsutil.Label{ + {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, + }, + ) + a.metrics.SetGaugeWithLabels( + []string{"identity", "nonentity", "active", "reporting_period"}, + float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens), + []metricsutil.Label{ + {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, + }, + ) + } + } + + // convert the maps to the proper format and write them as precomputed queries + return a.writePrecomputedQuery(ctx, segmentTime, opts) +} + // goroutine to process the request in the intent log, creating precomputed queries. // We expect the return value won't be checked, so log errors as they occur // (but for unit testing having the error return should help.) 
@@ -2085,187 +2301,38 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error { return errors.New("previous month not found") } - // "times" is already in reverse order, start building the per-namespace maps - // from the last month backward - byNamespace := make(map[string]*processByNamespace) byMonth := make(map[int64]*processMonth) - walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error { - for _, e := range l.Clients { - - processClientRecord(e, byNamespace, byMonth, startTime) - - // We maintain an hyperloglog for each month - // hyperloglog is a sketch (hyperloglog data-structure) containing client ID's in a given month - // hyperloglog is used in activity log to get the approximate number new clients in the current billing month - // by counting the number of distinct clients in all the months including current month - // (this can be done by merging the hyperloglog all months with current month hyperloglog) - // and subtracting the number of distinct clients in the current month - // NOTE: current month here is not the month of startTime but the time period from the start of the current month, - // up until the time that this request was made. - hll.Insert([]byte(e.ClientID)) - - // The byMonth map will be filled in the reverse order of time. For - // example, if the billing period is from Jan to June, the byMonth - // will be filled for June first, May next and so on till Jan. When - // processing a client for the current month, it has been added as a - // new client above. Now, we check if that client is also used in - // the subsequent months (on any given month, byMonth map has - // already been processed for all the subsequent months due to the - // reverse ordering). If yes, we remove those references. This way a - // client is considered new only in the earliest month of its use in - // the billing period. 
- for currMonth := timeutil.StartOfMonth(startTime).UTC(); currMonth != timeutil.StartOfMonth(times[0]).UTC(); currMonth = timeutil.StartOfNextMonth(currMonth).UTC() { - // Invalidate the client from being a new client in the next month - next := timeutil.StartOfNextMonth(currMonth).UTC().Unix() - if _, present := byMonth[next]; !present { - continue - } - - newClients := byMonth[next].NewClients - - // Remove the client from the top level counts within the month. - if e.NonEntity { - delete(newClients.Counts.NonEntities, e.ClientID) - } else { - delete(newClients.Counts.Entities, e.ClientID) - } - - if _, present := newClients.Namespaces[e.NamespaceID]; present { - // Remove the client from the namespace within the month. - if e.NonEntity { - delete(newClients.Namespaces[e.NamespaceID].Counts.NonEntities, e.ClientID) - } else { - delete(newClients.Namespaces[e.NamespaceID].Counts.Entities, e.ClientID) - } - if _, present := newClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor]; present { - // Remove the client from the mount within the namespace within the month. 
- if e.NonEntity { - delete(newClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor].Counts.NonEntities, e.ClientID) - } else { - delete(newClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor].Counts.Entities, e.ClientID) - } - } - } - } - } - - return nil - } - - walkTokens := func(l *activity.TokenCount) { - for nsID, v := range l.CountByNamespaceID { - if _, present := byNamespace[nsID]; !present { - byNamespace[nsID] = newByNamespace() - } - byNamespace[nsID].Counts.Tokens += v - } - } - endTime := timeutil.EndOfMonth(time.Unix(lastMonth, 0).UTC()) activePeriodStart := timeutil.MonthsPreviousTo(a.defaultReportMonths, endTime) // If not enough data, report as much as we have in the window if activePeriodStart.Before(times[len(times)-1]) { activePeriodStart = times[len(times)-1] } - + opts := pqOptions{ + byNamespace: byNamespace, + byMonth: byMonth, + endTime: endTime, + activePeriodStart: activePeriodStart, + activePeriodEnd: times[0], + } + // "times" is already in reverse order, start building the per-namespace maps + // from the last month backward for _, startTime := range times { // Do not work back further than the current retention window, // which will just get deleted anyway. 
if startTime.Before(retentionWindow) { break } - - hyperloglog, err := a.CreateOrFetchHyperlogLog(ctx, startTime) + reader, err := a.NewSegmentFileReader(ctx, startTime) if err != nil { - // We were unable to create or fetch the hll, but we should still - // continue with our precomputation - a.logger.Warn("unable to create or fetch hyperloglog", "start time", startTime, "error", err) - } - err = a.WalkEntitySegments(ctx, startTime, hyperloglog, walkEntities) - if err != nil { - a.logger.Warn("failed to load previous segments", "error", err) return err } - // Store the hyperloglog - err = a.StoreHyperlogLog(ctx, startTime, hyperloglog) + err = a.segmentToPrecomputedQuery(ctx, startTime, reader, opts) if err != nil { - a.logger.Warn("failed to store hyperloglog for month", "start time", startTime, "error", err) - } - err = a.WalkTokenSegments(ctx, startTime, walkTokens) - if err != nil { - a.logger.Warn("failed to load previous token counts", "error", err) return err } - - // Save the work to date in a record - pq := &activity.PrecomputedQuery{ - StartTime: startTime, - EndTime: endTime, - Namespaces: make([]*activity.NamespaceRecord, 0, len(byNamespace)), - Months: make([]*activity.MonthRecord, 0, len(byMonth)), - } - pq.Months = a.transformMonthBreakdowns(byMonth) - - for nsID, entry := range byNamespace { - mountRecord := make([]*activity.MountRecord, 0, len(entry.Mounts)) - for mountAccessor, mountData := range entry.Mounts { - mountRecord = append(mountRecord, &activity.MountRecord{ - MountPath: a.mountAccessorToMountPath(mountAccessor), - Counts: &activity.CountsRecord{ - EntityClients: len(mountData.Counts.Entities), - NonEntityClients: int(mountData.Counts.Tokens) + len(mountData.Counts.NonEntities), - }, - }) - } - - pq.Namespaces = append(pq.Namespaces, &activity.NamespaceRecord{ - NamespaceID: nsID, - Entities: uint64(len(entry.Counts.Entities)), - NonEntityTokens: entry.Counts.Tokens + uint64(len(entry.Counts.NonEntities)), - Mounts: mountRecord, - }) - 
- // If this is the most recent month, or the start of the reporting period, output - // a metric for each namespace. - if startTime == times[0] { - a.metrics.SetGaugeWithLabels( - []string{"identity", "entity", "active", "monthly"}, - float32(len(entry.Counts.Entities)), - []metricsutil.Label{ - {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, - }, - ) - a.metrics.SetGaugeWithLabels( - []string{"identity", "nonentity", "active", "monthly"}, - float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens), - []metricsutil.Label{ - {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, - }, - ) - } else if startTime == activePeriodStart { - a.metrics.SetGaugeWithLabels( - []string{"identity", "entity", "active", "reporting_period"}, - float32(len(entry.Counts.Entities)), - []metricsutil.Label{ - {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, - }, - ) - a.metrics.SetGaugeWithLabels( - []string{"identity", "nonentity", "active", "reporting_period"}, - float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens), - []metricsutil.Label{ - {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, - }, - ) - } - } - - err = a.queryStore.Put(ctx, pq) - if err != nil { - a.logger.Warn("failed to store precomputed query", "error", err) - } } // delete the intent log @@ -2371,6 +2438,8 @@ func (a *ActivityLog) populateNamespaceAndMonthlyBreakdowns() (map[int64]*proces return byMonth, byNamespace } +// transformMonthBreakdowns converts a map of unix timestamp -> processMonth to +// a slice of MonthRecord func (a *ActivityLog) transformMonthBreakdowns(byMonth map[int64]*processMonth) []*activity.MonthRecord { monthly := make([]*activity.MonthRecord, 0) processByNamespaces := func(nsMap map[string]*processByNamespace) []*activity.MonthlyNamespaceRecord { diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index 4416d78be..2ea84a209 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -4243,6 +4243,50 
@@ func TestActivityLog_partialMonthClientCountWithMultipleMountPaths(t *testing.T) } } +// TestActivityLog_processNewClients_delete ensures that the correct clients are deleted from a processNewClients struct +func TestActivityLog_processNewClients_delete(t *testing.T) { + mount := "mount" + namespace := "namespace" + clientID := "client-id" + run := func(t *testing.T, isNonEntity bool) { + t.Helper() + record := &activity.EntityRecord{ + MountAccessor: mount, + NamespaceID: namespace, + ClientID: clientID, + NonEntity: isNonEntity, + } + newClients := newProcessNewClients() + newClients.add(record) + + require.True(t, newClients.Counts.contains(record)) + require.True(t, newClients.Namespaces[namespace].Counts.contains(record)) + require.True(t, newClients.Namespaces[namespace].Mounts[mount].Counts.contains(record)) + + newClients.delete(record) + + byNS := newClients.Namespaces + counts := newClients.Counts + require.NotContains(t, counts.NonEntities, clientID) + require.NotContains(t, counts.Entities, clientID) + + require.NotContains(t, counts.NonEntities, clientID) + require.NotContains(t, counts.Entities, clientID) + + require.NotContains(t, byNS[namespace].Mounts[mount].Counts.NonEntities, clientID) + require.NotContains(t, byNS[namespace].Counts.NonEntities, clientID) + + require.NotContains(t, byNS[namespace].Mounts[mount].Counts.Entities, clientID) + require.NotContains(t, byNS[namespace].Counts.Entities, clientID) + } + t.Run("entity", func(t *testing.T) { + run(t, false) + }) + t.Run("non-entity", func(t *testing.T) { + run(t, true) + }) +} + // TestActivityLog_processClientRecord calls processClientRecord for an entity and a non-entity record and verifies that // the record is present in the namespace and month maps func TestActivityLog_processClientRecord(t *testing.T) { @@ -4301,3 +4345,337 @@ func TestActivityLog_processClientRecord(t *testing.T) { run(t, false) }) } + +func verifyByNamespaceContains(t *testing.T, s summaryByNamespace, clients 
...*activity.EntityRecord) { + t.Helper() + for _, c := range clients { + require.Contains(t, s, c.NamespaceID) + counts := s[c.NamespaceID].Counts + require.True(t, counts.contains(c)) + mounts := s[c.NamespaceID].Mounts + require.Contains(t, mounts, c.MountAccessor) + require.True(t, mounts[c.MountAccessor].Counts.contains(c)) + } +} + +func (s summaryByMonth) firstSeen(t *testing.T, client *activity.EntityRecord) time.Time { + t.Helper() + var seen int64 + for month, data := range s { + present := data.NewClients.Counts.contains(client) + if present { + if seen != 0 { + require.Fail(t, "client seen more than once", client.ClientID, s) + } + seen = month + } + } + return time.Unix(seen, 0).UTC() +} + +// TestActivityLog_handleEntitySegment verifies that the by namespace and by month summaries are correctly filled in a +// variety of scenarios +func TestActivityLog_handleEntitySegment(t *testing.T) { + finalTime := timeutil.StartOfMonth(time.Date(2022, 12, 1, 0, 0, 0, 0, time.UTC)) + addMonths := func(i int) time.Time { + return timeutil.StartOfMonth(finalTime.AddDate(0, i, 0)) + } + currentSegmentClients := make([]*activity.EntityRecord, 0, 3) + for i := 0; i < 3; i++ { + currentSegmentClients = append(currentSegmentClients, &activity.EntityRecord{ + ClientID: fmt.Sprintf("id-%d", i), + NamespaceID: fmt.Sprintf("ns-%d", i), + MountAccessor: fmt.Sprintf("mnt-%d", i), + NonEntity: i == 0, + }) + } + a := &ActivityLog{} + t.Run("older segment empty", func(t *testing.T) { + hll := hyperloglog.New() + byNS := make(summaryByNamespace) + byMonth := make(summaryByMonth) + segmentTime := addMonths(-3) + // our 3 clients were seen 3 months ago, with no other clients having been seen + err := a.handleEntitySegment(&activity.EntityActivityLog{Clients: currentSegmentClients}, segmentTime, hll, pqOptions{ + byNamespace: byNS, + byMonth: byMonth, + endTime: timeutil.EndOfMonth(segmentTime), + activePeriodStart: addMonths(-12), + activePeriodEnd: addMonths(12), + }) + 
require.NoError(t, err) + require.Len(t, byNS, 3) + verifyByNamespaceContains(t, byNS, currentSegmentClients...) + require.Len(t, byMonth, 1) + // they should all be registered as having first been seen 3 months ago + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[0]), segmentTime) + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[1]), segmentTime) + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[2]), segmentTime) + // and all 3 should be in the hyperloglog + require.Equal(t, hll.Estimate(), uint64(3)) + }) + t.Run("older segment clients seen earlier", func(t *testing.T) { + hll := hyperloglog.New() + byNS := make(summaryByNamespace) + byNS.add(currentSegmentClients[0]) + byNS.add(currentSegmentClients[1]) + byMonth := make(summaryByMonth) + segmentTime := addMonths(-3) + seenBefore2Months := addMonths(-2) + seenBefore1Month := addMonths(-1) + + // client 0 was seen 2 months ago + byMonth.add(currentSegmentClients[0], seenBefore2Months) + // client 1 was seen 1 month ago + byMonth.add(currentSegmentClients[1], seenBefore1Month) + + // handle clients 0, 1, and 2 as having been seen 3 months ago + err := a.handleEntitySegment(&activity.EntityActivityLog{Clients: currentSegmentClients}, segmentTime, hll, pqOptions{ + byNamespace: byNS, + byMonth: byMonth, + endTime: timeutil.EndOfMonth(segmentTime), + activePeriodStart: addMonths(-12), + activePeriodEnd: addMonths(12), + }) + require.NoError(t, err) + require.Len(t, byNS, 3) + verifyByNamespaceContains(t, byNS, currentSegmentClients...) 
+ // we expect that they will only be registered as new 3 months ago, because that's when they were first seen + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[0]), segmentTime) + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[1]), segmentTime) + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[2]), segmentTime) + + require.Equal(t, hll.Estimate(), uint64(3)) + }) + t.Run("disjoint set of clients", func(t *testing.T) { + hll := hyperloglog.New() + byNS := make(summaryByNamespace) + byNS.add(currentSegmentClients[0]) + byNS.add(currentSegmentClients[1]) + byMonth := make(summaryByMonth) + segmentTime := addMonths(-3) + seenBefore2Months := addMonths(-2) + seenBefore1Month := addMonths(-1) + + // client 0 was seen 2 months ago + byMonth.add(currentSegmentClients[0], seenBefore2Months) + // client 1 was seen 1 month ago + byMonth.add(currentSegmentClients[1], seenBefore1Month) + + // handle client 2 as having been seen 3 months ago + err := a.handleEntitySegment(&activity.EntityActivityLog{Clients: currentSegmentClients[2:]}, segmentTime, hll, pqOptions{ + byNamespace: byNS, + byMonth: byMonth, + endTime: timeutil.EndOfMonth(segmentTime), + activePeriodStart: addMonths(-12), + activePeriodEnd: addMonths(12), + }) + require.NoError(t, err) + require.Len(t, byNS, 3) + verifyByNamespaceContains(t, byNS, currentSegmentClients...) 
+ // client 2 should be added to the map, and the other clients should stay where they were + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[0]), seenBefore2Months) + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[1]), seenBefore1Month) + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[2]), segmentTime) + // the hyperloglog will have 1 element, because there was only 1 client in the segment + require.Equal(t, hll.Estimate(), uint64(1)) + }) + t.Run("new clients same namespaces", func(t *testing.T) { + hll := hyperloglog.New() + byNS := make(summaryByNamespace) + byNS.add(currentSegmentClients[0]) + byNS.add(currentSegmentClients[1]) + byNS.add(currentSegmentClients[2]) + byMonth := make(summaryByMonth) + segmentTime := addMonths(-3) + seenBefore2Months := addMonths(-2) + seenBefore1Month := addMonths(-1) + + // client 0 and 2 were seen 2 months ago + byMonth.add(currentSegmentClients[0], seenBefore2Months) + byMonth.add(currentSegmentClients[2], seenBefore2Months) + // client 1 was seen 1 month ago + byMonth.add(currentSegmentClients[1], seenBefore1Month) + + // create 3 additional clients + // these reuse namespaces ns-0, ns-1, ns-2; their mount accessors are also set to ns-0, ns-1, ns-2 + moreSegmentClients := make([]*activity.EntityRecord, 0, 3) + for i := 0; i < 3; i++ { + moreSegmentClients = append(moreSegmentClients, &activity.EntityRecord{ + ClientID: fmt.Sprintf("id-%d", i+3), + NamespaceID: fmt.Sprintf("ns-%d", i), + MountAccessor: fmt.Sprintf("ns-%d", i), + NonEntity: i == 1, + }) + } + // 3 new clients have been seen 3 months ago + err := a.handleEntitySegment(&activity.EntityActivityLog{Clients: moreSegmentClients}, segmentTime, hll, pqOptions{ + byNamespace: byNS, + byMonth: byMonth, + endTime: timeutil.EndOfMonth(segmentTime), + activePeriodStart: addMonths(-12), + activePeriodEnd: addMonths(12), + }) + require.NoError(t, err) + // there are only 3 namespaces, since both currentSegmentClients and moreSegmentClients use the same namespaces + require.Len(t, 
byNS, 3) + verifyByNamespaceContains(t, byNS, currentSegmentClients...) + verifyByNamespaceContains(t, byNS, moreSegmentClients...) + // The segment clients that had already been seen keep their original first-seen dates + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[0]), seenBefore2Months) + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[1]), seenBefore1Month) + require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[2]), seenBefore2Months) + // and the new clients should be first seen at segmentTime + require.Equal(t, byMonth.firstSeen(t, moreSegmentClients[0]), segmentTime) + require.Equal(t, byMonth.firstSeen(t, moreSegmentClients[1]), segmentTime) + require.Equal(t, byMonth.firstSeen(t, moreSegmentClients[2]), segmentTime) + // the hyperloglog will have 3 elements, because only the 3 new clients in moreSegmentClients were inserted into it + require.Equal(t, hll.Estimate(), uint64(3)) + }) +} + +// TestActivityLog_breakdownTokenSegment verifies that tokens are correctly added to a map that tracks counts per namespace +func TestActivityLog_breakdownTokenSegment(t *testing.T) { + toAdd := map[string]uint64{ + "a": 1, + "b": 2, + "c": 3, + } + a := &ActivityLog{} + testCases := []struct { + name string + existingNamespaceCounts map[string]uint64 + wantCounts map[string]uint64 + }{ + { + name: "empty", + wantCounts: toAdd, + }, + { + name: "some overlap", + existingNamespaceCounts: map[string]uint64{ + "a": 2, + "z": 1, + }, + wantCounts: map[string]uint64{ + "a": 3, + "b": 2, + "c": 3, + "z": 1, + }, + }, + { + name: "disjoint sets", + existingNamespaceCounts: map[string]uint64{ + "z": 5, + "y": 3, + "x": 2, + }, + wantCounts: map[string]uint64{ + "a": 1, + "b": 2, + "c": 3, + "z": 5, + "y": 3, + "x": 2, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + byNamespace := make(map[string]*processByNamespace) + for k, v := range tc.existingNamespaceCounts { + byNamespace[k] = newByNamespace() + 
byNamespace[k].Counts.Tokens = v + } + a.breakdownTokenSegment(&activity.TokenCount{CountByNamespaceID: toAdd}, byNamespace) + got := make(map[string]uint64) + for k, v := range byNamespace { + got[k] = v.Counts.Tokens + } + require.Equal(t, tc.wantCounts, got) + }) + } +} + +// TestActivityLog_writePrecomputedQuery calls writePrecomputedQuery for a segment with 1 non entity and 1 entity client, +// which have different namespaces and mounts. The precomputed query is then retrieved from storage and we verify that +// the data structure is filled correctly +func TestActivityLog_writePrecomputedQuery(t *testing.T) { + core, _, _ := TestCoreUnsealed(t) + + a := core.activityLog + a.SetEnable(true) + + byMonth := make(summaryByMonth) + byNS := make(summaryByNamespace) + clientEntity := &activity.EntityRecord{ + ClientID: "id-1", + NamespaceID: "ns-1", + MountAccessor: "mnt-1", + } + clientNonEntity := &activity.EntityRecord{ + ClientID: "id-2", + NamespaceID: "ns-2", + MountAccessor: "mnt-2", + NonEntity: true, + } + now := time.Now() + + // add the 2 clients to the namespace and month summaries + processClientRecord(clientEntity, byNS, byMonth, now) + processClientRecord(clientNonEntity, byNS, byMonth, now) + + endTime := timeutil.EndOfMonth(now) + opts := pqOptions{ + byNamespace: byNS, + byMonth: byMonth, + endTime: endTime, + } + + err := a.writePrecomputedQuery(context.Background(), now, opts) + require.NoError(t, err) + + // read the query back from storage + val, err := a.queryStore.Get(context.Background(), now, endTime) + require.NoError(t, err) + require.Equal(t, now.UTC().Unix(), val.StartTime.UTC().Unix()) + require.Equal(t, endTime.UTC().Unix(), val.EndTime.UTC().Unix()) + + // ns-1 and ns-2 should both be present in the results + require.Len(t, val.Namespaces, 2) + require.Len(t, val.Months, 1) + resultByNS := make(map[string]*activity.NamespaceRecord) + for _, ns := range val.Namespaces { + resultByNS[ns.NamespaceID] = ns + } + ns1 := resultByNS["ns-1"] 
+ ns2 := resultByNS["ns-2"] + + require.Equal(t, ns1.Entities, uint64(1)) + require.Equal(t, ns1.NonEntityTokens, uint64(0)) + require.Equal(t, ns2.Entities, uint64(0)) + require.Equal(t, ns2.NonEntityTokens, uint64(1)) + + require.Len(t, ns1.Mounts, 1) + require.Len(t, ns2.Mounts, 1) + // ns-1 needs to have mnt-1 + require.Contains(t, ns1.Mounts[0].MountPath, "mnt-1") + // ns-2 needs to have mnt-2 + require.Contains(t, ns2.Mounts[0].MountPath, "mnt-2") + + require.Equal(t, 1, ns1.Mounts[0].Counts.EntityClients) + require.Equal(t, 0, ns1.Mounts[0].Counts.NonEntityClients) + require.Equal(t, 0, ns2.Mounts[0].Counts.EntityClients) + require.Equal(t, 1, ns2.Mounts[0].Counts.NonEntityClients) + + monthRecord := val.Months[0] + // there should only be one month present, since the clients were added with the same timestamp + require.Equal(t, monthRecord.Timestamp, timeutil.StartOfMonth(now).UTC().Unix()) + require.Equal(t, 1, monthRecord.Counts.NonEntityClients) + require.Equal(t, 1, monthRecord.Counts.EntityClients) + require.Len(t, monthRecord.Namespaces, 2) + require.Len(t, monthRecord.NewClients.Namespaces, 2) + require.Equal(t, 1, monthRecord.NewClients.Counts.EntityClients) + require.Equal(t, 1, monthRecord.NewClients.Counts.NonEntityClients) +}