VAULT-14733: Split logic of precomputedQueryWorker (#20073)

* split precomputed query worker and add unit tests

* add new client delete method and test

* add changelog

* fixes from pr review

* add missing comment

* fix comparison
This commit is contained in:
miagilepner 2023-05-16 16:29:18 +02:00 committed by GitHub
parent b483288703
commit f14a039a65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 610 additions and 160 deletions

3
changelog/20073.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
core/activity: refactor the activity log's generation of precomputed queries
```

View File

@ -1895,6 +1895,17 @@ func newProcessCounts() *processCounts {
}
}
func (p *processCounts) delete(client *activity.EntityRecord) {
if !p.contains(client) {
return
}
if client.NonEntity {
delete(p.NonEntities, client.ClientID)
} else {
delete(p.Entities, client.ClientID)
}
}
func (p *processCounts) add(client *activity.EntityRecord) {
if client.NonEntity {
p.NonEntities[client.ClientID] = struct{}{}
@ -1926,6 +1937,10 @@ func (p *processMount) add(client *activity.EntityRecord) {
p.Counts.add(client)
}
func (p *processMount) delete(client *activity.EntityRecord) {
p.Counts.delete(client)
}
func (s summaryByMount) add(client *activity.EntityRecord) {
if _, present := s[client.MountAccessor]; !present {
s[client.MountAccessor] = newProcessMount()
@ -1933,6 +1948,12 @@ func (s summaryByMount) add(client *activity.EntityRecord) {
s[client.MountAccessor].add(client)
}
func (s summaryByMount) delete(client *activity.EntityRecord) {
if m, present := s[client.MountAccessor]; present {
m.delete(client)
}
}
type processByNamespace struct {
Counts *processCounts
Mounts summaryByMount
@ -1950,6 +1971,11 @@ func (p *processByNamespace) add(client *activity.EntityRecord) {
p.Mounts.add(client)
}
func (p *processByNamespace) delete(client *activity.EntityRecord) {
p.Counts.delete(client)
p.Mounts.delete(client)
}
func (s summaryByNamespace) add(client *activity.EntityRecord) {
if _, present := s[client.NamespaceID]; !present {
s[client.NamespaceID] = newByNamespace()
@ -1957,6 +1983,12 @@ func (s summaryByNamespace) add(client *activity.EntityRecord) {
s[client.NamespaceID].add(client)
}
func (s summaryByNamespace) delete(client *activity.EntityRecord) {
if n, present := s[client.NamespaceID]; present {
n.delete(client)
}
}
type processNewClients struct {
Counts *processCounts
Namespaces summaryByNamespace
@ -1974,6 +2006,11 @@ func (p *processNewClients) add(client *activity.EntityRecord) {
p.Namespaces.add(client)
}
func (p *processNewClients) delete(client *activity.EntityRecord) {
p.Counts.delete(client)
p.Namespaces.delete(client)
}
type processMonth struct {
Counts *processCounts
Namespaces summaryByNamespace
@ -2009,6 +2046,185 @@ func processClientRecord(e *activity.EntityRecord, byNamespace summaryByNamespac
byMonth.add(e, startTime)
}
// handleEntitySegment processes the record and adds it to the correct month/
// namespace breakdown maps, as well as to the hyperloglog for the month. New
// clients are deduplicated in opts.byMonth so that clients will only appear in
// the first month in which they are seen.
// This method must be called in reverse chronological order of the months (with
// the most recent month being called before previous months)
func (a *ActivityLog) handleEntitySegment(l *activity.EntityActivityLog, segmentTime time.Time, hll *hyperloglog.Sketch, opts pqOptions) error {
for _, e := range l.Clients {
processClientRecord(e, opts.byNamespace, opts.byMonth, segmentTime)
hll.Insert([]byte(e.ClientID))
// step forward in time through the months to check if the client is
// present. If it is, delete it. This is because the client should only
// be reported as new in the earliest month that it was seen
finalMonth := timeutil.StartOfMonth(opts.activePeriodEnd).UTC()
for currMonth := timeutil.StartOfMonth(segmentTime).UTC(); currMonth.Before(finalMonth); currMonth = timeutil.StartOfNextMonth(currMonth).UTC() {
// Invalidate the client from being a new client in the next month
next := timeutil.StartOfNextMonth(currMonth).UTC().Unix()
if _, present := opts.byMonth[next]; present {
// delete from the new clients map for the next month
// this will handle deleting from the per-namespace and per-mount maps of NewClients
opts.byMonth[next].NewClients.delete(e)
}
}
}
return nil
}
// breakdownTokenSegment handles a TokenCount record, adding it to the namespace breakdown
func (a *ActivityLog) breakdownTokenSegment(l *activity.TokenCount, byNamespace map[string]*processByNamespace) {
for nsID, v := range l.CountByNamespaceID {
if _, present := byNamespace[nsID]; !present {
byNamespace[nsID] = newByNamespace()
}
byNamespace[nsID].Counts.Tokens += v
}
}
func (a *ActivityLog) writePrecomputedQuery(ctx context.Context, segmentTime time.Time, opts pqOptions) error {
pq := &activity.PrecomputedQuery{
StartTime: segmentTime,
EndTime: opts.endTime,
Namespaces: make([]*activity.NamespaceRecord, 0, len(opts.byNamespace)),
Months: make([]*activity.MonthRecord, 0, len(opts.byMonth)),
}
// this will transform the byMonth map into the correctly formatted protobuf
pq.Months = a.transformMonthBreakdowns(opts.byMonth)
// the byNamespace map also needs to be transformed into a protobuf
for nsID, entry := range opts.byNamespace {
mountRecord := make([]*activity.MountRecord, 0, len(entry.Mounts))
for mountAccessor, mountData := range entry.Mounts {
mountRecord = append(mountRecord, &activity.MountRecord{
MountPath: a.mountAccessorToMountPath(mountAccessor),
Counts: &activity.CountsRecord{
EntityClients: len(mountData.Counts.Entities),
NonEntityClients: int(mountData.Counts.Tokens) + len(mountData.Counts.NonEntities),
},
})
}
pq.Namespaces = append(pq.Namespaces, &activity.NamespaceRecord{
NamespaceID: nsID,
Entities: uint64(len(entry.Counts.Entities)),
NonEntityTokens: entry.Counts.Tokens + uint64(len(entry.Counts.NonEntities)),
Mounts: mountRecord,
})
}
err := a.queryStore.Put(ctx, pq)
if err != nil {
a.logger.Warn("failed to store precomputed query", "error", err)
}
return nil
}
// pqOptions holds fields that will be used when creating precomputed queries
// These fields will remain the same for every segment that a precomputed query worker is handling
type pqOptions struct {
byNamespace map[string]*processByNamespace
byMonth map[int64]*processMonth
// endTime sets the end time of the precomputed query.
// When invoked on schedule by the precomputedQueryWorker, this is the end of the month that just finished.
endTime time.Time
// activePeriodStart is the earliest date in our retention window
activePeriodStart time.Time
// activePeriodEnd is the latest date in our retention window.
// When invoked on schedule by the precomputedQueryWorker, this will be the timestamp of the most recent segment
// that's present in storage
activePeriodEnd time.Time
}
// segmentToPrecomputedQuery processes a single segment
func (a *ActivityLog) segmentToPrecomputedQuery(ctx context.Context, segmentTime time.Time, reader SegmentReader, opts pqOptions) error {
hyperloglog, err := a.CreateOrFetchHyperlogLog(ctx, segmentTime)
if err != nil {
// We were unable to create or fetch the hll, but we should still
// continue with our precomputation
a.logger.Warn("unable to create or fetch hyperloglog", "start time", segmentTime, "error", err)
}
// Iterate through entities, adding them to the hyperloglog and the summary maps in opts
for {
entity, err := reader.ReadEntity(ctx)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
a.logger.Warn("failed to read segment", "error", err)
return err
}
err = a.handleEntitySegment(entity, segmentTime, hyperloglog, opts)
if err != nil {
a.logger.Warn("failed to handle entity segment", "error", err)
return err
}
}
// Store the hyperloglog
err = a.StoreHyperlogLog(ctx, segmentTime, hyperloglog)
if err != nil {
a.logger.Warn("failed to store hyperloglog for month", "start time", segmentTime, "error", err)
}
// Iterate through any tokens and add them to per namespace map
for {
token, err := reader.ReadToken(ctx)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
a.logger.Warn("failed to load token counts", "error", err)
return err
}
a.breakdownTokenSegment(token, opts.byNamespace)
}
// write metrics
for nsID, entry := range opts.byNamespace {
// If this is the most recent month, or the start of the reporting period, output
// a metric for each namespace.
if segmentTime == opts.activePeriodEnd {
a.metrics.SetGaugeWithLabels(
[]string{"identity", "entity", "active", "monthly"},
float32(len(entry.Counts.Entities)),
[]metricsutil.Label{
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
},
)
a.metrics.SetGaugeWithLabels(
[]string{"identity", "nonentity", "active", "monthly"},
float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens),
[]metricsutil.Label{
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
},
)
} else if segmentTime == opts.activePeriodStart {
a.metrics.SetGaugeWithLabels(
[]string{"identity", "entity", "active", "reporting_period"},
float32(len(entry.Counts.Entities)),
[]metricsutil.Label{
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
},
)
a.metrics.SetGaugeWithLabels(
[]string{"identity", "nonentity", "active", "reporting_period"},
float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens),
[]metricsutil.Label{
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
},
)
}
}
// convert the maps to the proper format and write them as precomputed queries
return a.writePrecomputedQuery(ctx, segmentTime, opts)
}
// goroutine to process the request in the intent log, creating precomputed queries.
// We expect the return value won't be checked, so log errors as they occur
// (but for unit testing having the error return should help.)
@ -2085,187 +2301,38 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
return errors.New("previous month not found")
}
// "times" is already in reverse order, start building the per-namespace maps
// from the last month backward
byNamespace := make(map[string]*processByNamespace)
byMonth := make(map[int64]*processMonth)
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error {
for _, e := range l.Clients {
processClientRecord(e, byNamespace, byMonth, startTime)
// We maintain an hyperloglog for each month
// hyperloglog is a sketch (hyperloglog data-structure) containing client ID's in a given month
// hyperloglog is used in activity log to get the approximate number new clients in the current billing month
// by counting the number of distinct clients in all the months including current month
// (this can be done by merging the hyperloglog all months with current month hyperloglog)
// and subtracting the number of distinct clients in the current month
// NOTE: current month here is not the month of startTime but the time period from the start of the current month,
// up until the time that this request was made.
hll.Insert([]byte(e.ClientID))
// The byMonth map will be filled in the reverse order of time. For
// example, if the billing period is from Jan to June, the byMonth
// will be filled for June first, May next and so on till Jan. When
// processing a client for the current month, it has been added as a
// new client above. Now, we check if that client is also used in
// the subsequent months (on any given month, byMonth map has
// already been processed for all the subsequent months due to the
// reverse ordering). If yes, we remove those references. This way a
// client is considered new only in the earliest month of its use in
// the billing period.
for currMonth := timeutil.StartOfMonth(startTime).UTC(); currMonth != timeutil.StartOfMonth(times[0]).UTC(); currMonth = timeutil.StartOfNextMonth(currMonth).UTC() {
// Invalidate the client from being a new client in the next month
next := timeutil.StartOfNextMonth(currMonth).UTC().Unix()
if _, present := byMonth[next]; !present {
continue
}
newClients := byMonth[next].NewClients
// Remove the client from the top level counts within the month.
if e.NonEntity {
delete(newClients.Counts.NonEntities, e.ClientID)
} else {
delete(newClients.Counts.Entities, e.ClientID)
}
if _, present := newClients.Namespaces[e.NamespaceID]; present {
// Remove the client from the namespace within the month.
if e.NonEntity {
delete(newClients.Namespaces[e.NamespaceID].Counts.NonEntities, e.ClientID)
} else {
delete(newClients.Namespaces[e.NamespaceID].Counts.Entities, e.ClientID)
}
if _, present := newClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor]; present {
// Remove the client from the mount within the namespace within the month.
if e.NonEntity {
delete(newClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor].Counts.NonEntities, e.ClientID)
} else {
delete(newClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor].Counts.Entities, e.ClientID)
}
}
}
}
}
return nil
}
walkTokens := func(l *activity.TokenCount) {
for nsID, v := range l.CountByNamespaceID {
if _, present := byNamespace[nsID]; !present {
byNamespace[nsID] = newByNamespace()
}
byNamespace[nsID].Counts.Tokens += v
}
}
endTime := timeutil.EndOfMonth(time.Unix(lastMonth, 0).UTC())
activePeriodStart := timeutil.MonthsPreviousTo(a.defaultReportMonths, endTime)
// If not enough data, report as much as we have in the window
if activePeriodStart.Before(times[len(times)-1]) {
activePeriodStart = times[len(times)-1]
}
opts := pqOptions{
byNamespace: byNamespace,
byMonth: byMonth,
endTime: endTime,
activePeriodStart: activePeriodStart,
activePeriodEnd: times[0],
}
// "times" is already in reverse order, start building the per-namespace maps
// from the last month backward
for _, startTime := range times {
// Do not work back further than the current retention window,
// which will just get deleted anyway.
if startTime.Before(retentionWindow) {
break
}
hyperloglog, err := a.CreateOrFetchHyperlogLog(ctx, startTime)
reader, err := a.NewSegmentFileReader(ctx, startTime)
if err != nil {
// We were unable to create or fetch the hll, but we should still
// continue with our precomputation
a.logger.Warn("unable to create or fetch hyperloglog", "start time", startTime, "error", err)
}
err = a.WalkEntitySegments(ctx, startTime, hyperloglog, walkEntities)
if err != nil {
a.logger.Warn("failed to load previous segments", "error", err)
return err
}
// Store the hyperloglog
err = a.StoreHyperlogLog(ctx, startTime, hyperloglog)
err = a.segmentToPrecomputedQuery(ctx, startTime, reader, opts)
if err != nil {
a.logger.Warn("failed to store hyperloglog for month", "start time", startTime, "error", err)
}
err = a.WalkTokenSegments(ctx, startTime, walkTokens)
if err != nil {
a.logger.Warn("failed to load previous token counts", "error", err)
return err
}
// Save the work to date in a record
pq := &activity.PrecomputedQuery{
StartTime: startTime,
EndTime: endTime,
Namespaces: make([]*activity.NamespaceRecord, 0, len(byNamespace)),
Months: make([]*activity.MonthRecord, 0, len(byMonth)),
}
pq.Months = a.transformMonthBreakdowns(byMonth)
for nsID, entry := range byNamespace {
mountRecord := make([]*activity.MountRecord, 0, len(entry.Mounts))
for mountAccessor, mountData := range entry.Mounts {
mountRecord = append(mountRecord, &activity.MountRecord{
MountPath: a.mountAccessorToMountPath(mountAccessor),
Counts: &activity.CountsRecord{
EntityClients: len(mountData.Counts.Entities),
NonEntityClients: int(mountData.Counts.Tokens) + len(mountData.Counts.NonEntities),
},
})
}
pq.Namespaces = append(pq.Namespaces, &activity.NamespaceRecord{
NamespaceID: nsID,
Entities: uint64(len(entry.Counts.Entities)),
NonEntityTokens: entry.Counts.Tokens + uint64(len(entry.Counts.NonEntities)),
Mounts: mountRecord,
})
// If this is the most recent month, or the start of the reporting period, output
// a metric for each namespace.
if startTime == times[0] {
a.metrics.SetGaugeWithLabels(
[]string{"identity", "entity", "active", "monthly"},
float32(len(entry.Counts.Entities)),
[]metricsutil.Label{
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
},
)
a.metrics.SetGaugeWithLabels(
[]string{"identity", "nonentity", "active", "monthly"},
float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens),
[]metricsutil.Label{
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
},
)
} else if startTime == activePeriodStart {
a.metrics.SetGaugeWithLabels(
[]string{"identity", "entity", "active", "reporting_period"},
float32(len(entry.Counts.Entities)),
[]metricsutil.Label{
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
},
)
a.metrics.SetGaugeWithLabels(
[]string{"identity", "nonentity", "active", "reporting_period"},
float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens),
[]metricsutil.Label{
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
},
)
}
}
err = a.queryStore.Put(ctx, pq)
if err != nil {
a.logger.Warn("failed to store precomputed query", "error", err)
}
}
// delete the intent log
@ -2371,6 +2438,8 @@ func (a *ActivityLog) populateNamespaceAndMonthlyBreakdowns() (map[int64]*proces
return byMonth, byNamespace
}
// transformMonthBreakdowns converts a map of unix timestamp -> processMonth to
// a slice of MonthRecord
func (a *ActivityLog) transformMonthBreakdowns(byMonth map[int64]*processMonth) []*activity.MonthRecord {
monthly := make([]*activity.MonthRecord, 0)
processByNamespaces := func(nsMap map[string]*processByNamespace) []*activity.MonthlyNamespaceRecord {

View File

@ -4243,6 +4243,50 @@ func TestActivityLog_partialMonthClientCountWithMultipleMountPaths(t *testing.T)
}
}
// TestActivityLog_processNewClients_delete ensures that the correct clients are deleted from a processNewClients struct
func TestActivityLog_processNewClients_delete(t *testing.T) {
mount := "mount"
namespace := "namespace"
clientID := "client-id"
run := func(t *testing.T, isNonEntity bool) {
t.Helper()
record := &activity.EntityRecord{
MountAccessor: mount,
NamespaceID: namespace,
ClientID: clientID,
NonEntity: isNonEntity,
}
newClients := newProcessNewClients()
newClients.add(record)
require.True(t, newClients.Counts.contains(record))
require.True(t, newClients.Namespaces[namespace].Counts.contains(record))
require.True(t, newClients.Namespaces[namespace].Mounts[mount].Counts.contains(record))
newClients.delete(record)
byNS := newClients.Namespaces
counts := newClients.Counts
require.NotContains(t, counts.NonEntities, clientID)
require.NotContains(t, counts.Entities, clientID)
require.NotContains(t, counts.NonEntities, clientID)
require.NotContains(t, counts.Entities, clientID)
require.NotContains(t, byNS[namespace].Mounts[mount].Counts.NonEntities, clientID)
require.NotContains(t, byNS[namespace].Counts.NonEntities, clientID)
require.NotContains(t, byNS[namespace].Mounts[mount].Counts.Entities, clientID)
require.NotContains(t, byNS[namespace].Counts.Entities, clientID)
}
t.Run("entity", func(t *testing.T) {
run(t, false)
})
t.Run("non-entity", func(t *testing.T) {
run(t, true)
})
}
// TestActivityLog_processClientRecord calls processClientRecord for an entity and a non-entity record and verifies that
// the record is present in the namespace and month maps
func TestActivityLog_processClientRecord(t *testing.T) {
@ -4301,3 +4345,337 @@ func TestActivityLog_processClientRecord(t *testing.T) {
run(t, false)
})
}
func verifyByNamespaceContains(t *testing.T, s summaryByNamespace, clients ...*activity.EntityRecord) {
t.Helper()
for _, c := range clients {
require.Contains(t, s, c.NamespaceID)
counts := s[c.NamespaceID].Counts
require.True(t, counts.contains(c))
mounts := s[c.NamespaceID].Mounts
require.Contains(t, mounts, c.MountAccessor)
require.True(t, mounts[c.MountAccessor].Counts.contains(c))
}
}
func (s summaryByMonth) firstSeen(t *testing.T, client *activity.EntityRecord) time.Time {
t.Helper()
var seen int64
for month, data := range s {
present := data.NewClients.Counts.contains(client)
if present {
if seen != 0 {
require.Fail(t, "client seen more than once", client.ClientID, s)
}
seen = month
}
}
return time.Unix(seen, 0).UTC()
}
// TestActivityLog_handleEntitySegment verifies that the by namespace and by month summaries are correctly filled in a
// variety of scenarios
func TestActivityLog_handleEntitySegment(t *testing.T) {
finalTime := timeutil.StartOfMonth(time.Date(2022, 12, 1, 0, 0, 0, 0, time.UTC))
addMonths := func(i int) time.Time {
return timeutil.StartOfMonth(finalTime.AddDate(0, i, 0))
}
currentSegmentClients := make([]*activity.EntityRecord, 0, 3)
for i := 0; i < 3; i++ {
currentSegmentClients = append(currentSegmentClients, &activity.EntityRecord{
ClientID: fmt.Sprintf("id-%d", i),
NamespaceID: fmt.Sprintf("ns-%d", i),
MountAccessor: fmt.Sprintf("mnt-%d", i),
NonEntity: i == 0,
})
}
a := &ActivityLog{}
t.Run("older segment empty", func(t *testing.T) {
hll := hyperloglog.New()
byNS := make(summaryByNamespace)
byMonth := make(summaryByMonth)
segmentTime := addMonths(-3)
// our 3 clients were seen 3 months ago, with no other clients having been seen
err := a.handleEntitySegment(&activity.EntityActivityLog{Clients: currentSegmentClients}, segmentTime, hll, pqOptions{
byNamespace: byNS,
byMonth: byMonth,
endTime: timeutil.EndOfMonth(segmentTime),
activePeriodStart: addMonths(-12),
activePeriodEnd: addMonths(12),
})
require.NoError(t, err)
require.Len(t, byNS, 3)
verifyByNamespaceContains(t, byNS, currentSegmentClients...)
require.Len(t, byMonth, 1)
// they should all be registered as having first been seen 3 months ago
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[0]), segmentTime)
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[1]), segmentTime)
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[2]), segmentTime)
// and all 3 should be in the hyperloglog
require.Equal(t, hll.Estimate(), uint64(3))
})
t.Run("older segment clients seen earlier", func(t *testing.T) {
hll := hyperloglog.New()
byNS := make(summaryByNamespace)
byNS.add(currentSegmentClients[0])
byNS.add(currentSegmentClients[1])
byMonth := make(summaryByMonth)
segmentTime := addMonths(-3)
seenBefore2Months := addMonths(-2)
seenBefore1Month := addMonths(-1)
// client 0 was seen 2 months ago
byMonth.add(currentSegmentClients[0], seenBefore2Months)
// client 1 was seen 1 month ago
byMonth.add(currentSegmentClients[1], seenBefore1Month)
// handle clients 0, 1, and 2 as having been seen 3 months ago
err := a.handleEntitySegment(&activity.EntityActivityLog{Clients: currentSegmentClients}, segmentTime, hll, pqOptions{
byNamespace: byNS,
byMonth: byMonth,
endTime: timeutil.EndOfMonth(segmentTime),
activePeriodStart: addMonths(-12),
activePeriodEnd: addMonths(12),
})
require.NoError(t, err)
require.Len(t, byNS, 3)
verifyByNamespaceContains(t, byNS, currentSegmentClients...)
// we expect that they will only be registered as new 3 months ago, because that's when they were first seen
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[0]), segmentTime)
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[1]), segmentTime)
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[2]), segmentTime)
require.Equal(t, hll.Estimate(), uint64(3))
})
t.Run("disjoint set of clients", func(t *testing.T) {
hll := hyperloglog.New()
byNS := make(summaryByNamespace)
byNS.add(currentSegmentClients[0])
byNS.add(currentSegmentClients[1])
byMonth := make(summaryByMonth)
segmentTime := addMonths(-3)
seenBefore2Months := addMonths(-2)
seenBefore1Month := addMonths(-1)
// client 0 was seen 2 months ago
byMonth.add(currentSegmentClients[0], seenBefore2Months)
// client 1 was seen 1 month ago
byMonth.add(currentSegmentClients[1], seenBefore1Month)
// handle client 2 as having been seen 3 months ago
err := a.handleEntitySegment(&activity.EntityActivityLog{Clients: currentSegmentClients[2:]}, segmentTime, hll, pqOptions{
byNamespace: byNS,
byMonth: byMonth,
endTime: timeutil.EndOfMonth(segmentTime),
activePeriodStart: addMonths(-12),
activePeriodEnd: addMonths(12),
})
require.NoError(t, err)
require.Len(t, byNS, 3)
verifyByNamespaceContains(t, byNS, currentSegmentClients...)
// client 2 should be added to the map, and the other clients should stay where they were
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[0]), seenBefore2Months)
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[1]), seenBefore1Month)
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[2]), segmentTime)
// the hyperloglog will have 1 element, because there was only 1 client in the segment
require.Equal(t, hll.Estimate(), uint64(1))
})
t.Run("new clients same namespaces", func(t *testing.T) {
hll := hyperloglog.New()
byNS := make(summaryByNamespace)
byNS.add(currentSegmentClients[0])
byNS.add(currentSegmentClients[1])
byNS.add(currentSegmentClients[2])
byMonth := make(summaryByMonth)
segmentTime := addMonths(-3)
seenBefore2Months := addMonths(-2)
seenBefore1Month := addMonths(-1)
// client 0 and 2 were seen 2 months ago
byMonth.add(currentSegmentClients[0], seenBefore2Months)
byMonth.add(currentSegmentClients[2], seenBefore2Months)
// client 1 was seen 1 month ago
byMonth.add(currentSegmentClients[1], seenBefore1Month)
// create 3 additional clients
// these have ns-1, ns-2, ns-3 and mnt-1, mnt-2, mnt-3
moreSegmentClients := make([]*activity.EntityRecord, 0, 3)
for i := 0; i < 3; i++ {
moreSegmentClients = append(moreSegmentClients, &activity.EntityRecord{
ClientID: fmt.Sprintf("id-%d", i+3),
NamespaceID: fmt.Sprintf("ns-%d", i),
MountAccessor: fmt.Sprintf("ns-%d", i),
NonEntity: i == 1,
})
}
// 3 new clients have been seen 3 months ago
err := a.handleEntitySegment(&activity.EntityActivityLog{Clients: moreSegmentClients}, segmentTime, hll, pqOptions{
byNamespace: byNS,
byMonth: byMonth,
endTime: timeutil.EndOfMonth(segmentTime),
activePeriodStart: addMonths(-12),
activePeriodEnd: addMonths(12),
})
require.NoError(t, err)
// there are only 3 namespaces, since both currentSegmentClients and moreSegmentClients use the same namespaces
require.Len(t, byNS, 3)
verifyByNamespaceContains(t, byNS, currentSegmentClients...)
verifyByNamespaceContains(t, byNS, moreSegmentClients...)
// The segment clients that have already been seen have their same first seen dates
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[0]), seenBefore2Months)
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[1]), seenBefore1Month)
require.Equal(t, byMonth.firstSeen(t, currentSegmentClients[2]), seenBefore2Months)
// and the new clients should be first seen at segmentTime
require.Equal(t, byMonth.firstSeen(t, moreSegmentClients[0]), segmentTime)
require.Equal(t, byMonth.firstSeen(t, moreSegmentClients[1]), segmentTime)
require.Equal(t, byMonth.firstSeen(t, moreSegmentClients[2]), segmentTime)
// the hyperloglog will have 3 elements, because there were the 3 new elements in moreSegmentClients seen
require.Equal(t, hll.Estimate(), uint64(3))
})
}
// TestActivityLog_breakdownTokenSegment verifies that tokens are correctly added to a map that tracks counts per namespace
func TestActivityLog_breakdownTokenSegment(t *testing.T) {
toAdd := map[string]uint64{
"a": 1,
"b": 2,
"c": 3,
}
a := &ActivityLog{}
testCases := []struct {
name string
existingNamespaceCounts map[string]uint64
wantCounts map[string]uint64
}{
{
name: "empty",
wantCounts: toAdd,
},
{
name: "some overlap",
existingNamespaceCounts: map[string]uint64{
"a": 2,
"z": 1,
},
wantCounts: map[string]uint64{
"a": 3,
"b": 2,
"c": 3,
"z": 1,
},
},
{
name: "disjoint sets",
existingNamespaceCounts: map[string]uint64{
"z": 5,
"y": 3,
"x": 2,
},
wantCounts: map[string]uint64{
"a": 1,
"b": 2,
"c": 3,
"z": 5,
"y": 3,
"x": 2,
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
byNamespace := make(map[string]*processByNamespace)
for k, v := range tc.existingNamespaceCounts {
byNamespace[k] = newByNamespace()
byNamespace[k].Counts.Tokens = v
}
a.breakdownTokenSegment(&activity.TokenCount{CountByNamespaceID: toAdd}, byNamespace)
got := make(map[string]uint64)
for k, v := range byNamespace {
got[k] = v.Counts.Tokens
}
require.Equal(t, tc.wantCounts, got)
})
}
}
// TestActivityLog_writePrecomputedQuery calls writePrecomputedQuery for a segment with 1 non entity and 1 entity client,
// which have different namespaces and mounts. The precomputed query is then retrieved from storage and we verify that
// the data structure is filled correctly
func TestActivityLog_writePrecomputedQuery(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
a := core.activityLog
a.SetEnable(true)
byMonth := make(summaryByMonth)
byNS := make(summaryByNamespace)
clientEntity := &activity.EntityRecord{
ClientID: "id-1",
NamespaceID: "ns-1",
MountAccessor: "mnt-1",
}
clientNonEntity := &activity.EntityRecord{
ClientID: "id-2",
NamespaceID: "ns-2",
MountAccessor: "mnt-2",
NonEntity: true,
}
now := time.Now()
// add the 2 clients to the namespace and month summaries
processClientRecord(clientEntity, byNS, byMonth, now)
processClientRecord(clientNonEntity, byNS, byMonth, now)
endTime := timeutil.EndOfMonth(now)
opts := pqOptions{
byNamespace: byNS,
byMonth: byMonth,
endTime: endTime,
}
err := a.writePrecomputedQuery(context.Background(), now, opts)
require.NoError(t, err)
// read the query back from storage
val, err := a.queryStore.Get(context.Background(), now, endTime)
require.NoError(t, err)
require.Equal(t, now.UTC().Unix(), val.StartTime.UTC().Unix())
require.Equal(t, endTime.UTC().Unix(), val.EndTime.UTC().Unix())
// ns-1 and ns-2 should both be present in the results
require.Len(t, val.Namespaces, 2)
require.Len(t, val.Months, 1)
resultByNS := make(map[string]*activity.NamespaceRecord)
for _, ns := range val.Namespaces {
resultByNS[ns.NamespaceID] = ns
}
ns1 := resultByNS["ns-1"]
ns2 := resultByNS["ns-2"]
require.Equal(t, ns1.Entities, uint64(1))
require.Equal(t, ns1.NonEntityTokens, uint64(0))
require.Equal(t, ns2.Entities, uint64(0))
require.Equal(t, ns2.NonEntityTokens, uint64(1))
require.Len(t, ns1.Mounts, 1)
require.Len(t, ns2.Mounts, 1)
// ns-1 needs to have mnt-1
require.Contains(t, ns1.Mounts[0].MountPath, "mnt-1")
// ns-2 needs to have mnt-2
require.Contains(t, ns2.Mounts[0].MountPath, "mnt-2")
require.Equal(t, 1, ns1.Mounts[0].Counts.EntityClients)
require.Equal(t, 0, ns1.Mounts[0].Counts.NonEntityClients)
require.Equal(t, 0, ns2.Mounts[0].Counts.EntityClients)
require.Equal(t, 1, ns2.Mounts[0].Counts.NonEntityClients)
monthRecord := val.Months[0]
// there should only be one month present, since the clients were added with the same timestamp
require.Equal(t, monthRecord.Timestamp, timeutil.StartOfMonth(now).UTC().Unix())
require.Equal(t, 1, monthRecord.Counts.NonEntityClients)
require.Equal(t, 1, monthRecord.Counts.EntityClients)
require.Len(t, monthRecord.Namespaces, 2)
require.Len(t, monthRecord.NewClients.Namespaces, 2)
require.Equal(t, 1, monthRecord.NewClients.Counts.EntityClients)
require.Equal(t, 1, monthRecord.NewClients.Counts.NonEntityClients)
}