ActivityLog Implement HyperLogLog Store Functionality During Precomputation (#16146)
* adding hll for each month * add changelog * removing influxdb * removing influxdb * removing influxdb * changing switch to if-else for semgrep
This commit is contained in:
parent
c57d053d28
commit
42b13448f9
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
core/activity: generate hyperloglogs containing clientIds for each month during precomputation
|
||||
```
|
2
go.mod
2
go.mod
|
@ -32,6 +32,7 @@ require (
|
|||
github.com/armon/go-radix v1.0.0
|
||||
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
|
||||
github.com/aws/aws-sdk-go v1.43.4
|
||||
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a
|
||||
github.com/cenkalti/backoff/v3 v3.2.2
|
||||
github.com/chrismalek/oktasdk-go v0.0.0-20181212195951-3430665dfaa0
|
||||
github.com/client9/misspell v0.3.4
|
||||
|
@ -255,6 +256,7 @@ require (
|
|||
github.com/couchbase/gocbcore/v10 v10.0.4 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba // indirect
|
||||
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
|
||||
github.com/digitalocean/godo v1.7.5 // indirect
|
||||
github.com/dimchansky/utfbom v1.1.1 // indirect
|
||||
github.com/docker/cli v20.10.9+incompatible // indirect
|
||||
|
|
5
go.sum
5
go.sum
|
@ -273,6 +273,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.6.1 h1:1Pls85C5CFjhE3aH+h85/hyAk89kQ
|
|||
github.com/aws/aws-sdk-go-v2/service/sts v1.6.1/go.mod h1:hLZ/AnkIKHLuPGjEiyghNEdvJ2PP0MgOxcmv9EBJ4xs=
|
||||
github.com/aws/smithy-go v1.7.0 h1:+cLHMRrDZvQ4wk+KuQ9yH6eEg6KZEJ9RI2IkDqnygCg=
|
||||
github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
|
||||
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a h1:eqjiAL3qooftPm8b9C1GsSSRcmlw7iOva8vdBTmV2PY=
|
||||
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a/go.mod h1:2stgcRjl6QmW+gU2h5E7BQXg4HU0gzxKWDuT5HviN9s=
|
||||
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA=
|
||||
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
|
||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||
|
@ -499,6 +501,8 @@ github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba h1:p6poVbjHDkK
|
|||
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0=
|
||||
github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8=
|
||||
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
|
||||
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
|
||||
github.com/digitalocean/godo v1.7.5 h1:JOQbAO6QT1GGjor0doT0mXefX2FgUDPOpYh2RaXA+ko=
|
||||
github.com/digitalocean/godo v1.7.5/go.mod h1:h6faOIcZ8lWIwNQ+DN7b3CgX4Kwby5T+nbpNqkUIozU=
|
||||
|
@ -1093,6 +1097,7 @@ github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
|
|||
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
|
||||
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/influxdata/influxdb v1.7.6/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig=
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
|
||||
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/axiomhq/hyperloglog"
|
||||
"github.com/golang/protobuf/proto"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/vault/helper/metricsutil"
|
||||
|
@ -36,6 +37,9 @@ const (
|
|||
activityConfigKey = "config"
|
||||
activityIntentLogKey = "endofmonth"
|
||||
|
||||
// sketch for each month that stores hash of client ids
|
||||
distinctClientsBasePath = "log/distinctclients/"
|
||||
|
||||
// for testing purposes (public as needed)
|
||||
ActivityLogPrefix = "sys/counters/activity/log/"
|
||||
ActivityPrefix = "sys/counters/activity/"
|
||||
|
@ -363,6 +367,53 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
|
|||
return nil
|
||||
}
|
||||
|
||||
// CreateOrFetchHyperlogLog creates a new hyperlogLog for each startTime (month) if it does not exist in storage.
|
||||
// hyperlogLog is used here to solve count-distinct problem i.e, to count the number of distinct clients
|
||||
// In activity log, hyperloglog is a sketch containing clientID's in a given month
|
||||
func (a *ActivityLog) CreateOrFetchHyperlogLog(ctx context.Context, startTime time.Time) *hyperloglog.Sketch {
|
||||
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
|
||||
hll := hyperloglog.New()
|
||||
a.logger.Trace("fetching hyperloglog ", "path", monthlyHLLPath)
|
||||
data, err := a.view.Get(ctx, monthlyHLLPath)
|
||||
if err != nil {
|
||||
a.logger.Error("error fetching hyperloglog", "path", monthlyHLLPath, "error", err)
|
||||
return hll
|
||||
}
|
||||
if data == nil {
|
||||
a.logger.Trace("creating hyperloglog ", "path", monthlyHLLPath)
|
||||
err = a.StoreHyperlogLog(ctx, startTime, hll)
|
||||
if err != nil {
|
||||
a.logger.Error("error storing hyperloglog", "path", monthlyHLLPath, "error", err)
|
||||
return hll
|
||||
}
|
||||
} else {
|
||||
err = hll.UnmarshalBinary(data.Value)
|
||||
if err != nil {
|
||||
a.logger.Error("error unmarshaling hyperloglog", "path", monthlyHLLPath, "error", err)
|
||||
return hll
|
||||
}
|
||||
}
|
||||
return hll
|
||||
}
|
||||
|
||||
// StoreHyperlogLog stores the hyperloglog (a sketch containing client IDs) for startTime (month) in storage
|
||||
func (a *ActivityLog) StoreHyperlogLog(ctx context.Context, startTime time.Time, newHll *hyperloglog.Sketch) error {
|
||||
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
|
||||
a.logger.Trace("storing hyperloglog ", "path", monthlyHLLPath)
|
||||
marshalledHll, err := newHll.MarshalBinary()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = a.view.Put(ctx, &logical.StorageEntry{
|
||||
Key: monthlyHLLPath,
|
||||
Value: marshalledHll,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// :force: forces a save of tokens/entities even if the in-memory log is empty
|
||||
func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error {
|
||||
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, a.currentSegment.startTimestamp, a.currentSegment.clientSequenceNumber)
|
||||
|
@ -515,7 +566,7 @@ func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime
|
|||
}
|
||||
|
||||
// WalkEntitySegments loads each of the entity segments for a particular start time
|
||||
func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, walkFn func(*activity.EntityActivityLog, time.Time) error) error {
|
||||
func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, hll *hyperloglog.Sketch, walkFn func(*activity.EntityActivityLog, time.Time, *hyperloglog.Sketch) error) error {
|
||||
basePath := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/"
|
||||
pathList, err := a.view.List(ctx, basePath)
|
||||
if err != nil {
|
||||
|
@ -537,7 +588,7 @@ func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Tim
|
|||
if err != nil {
|
||||
return fmt.Errorf("unable to parse segment %v%v: %w", basePath, path, err)
|
||||
}
|
||||
err = walkFn(out, startTime)
|
||||
err = walkFn(out, startTime, hll)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to walk entities: %w", err)
|
||||
}
|
||||
|
@ -2069,10 +2120,21 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
|
|||
byNamespace := make(map[string]*processByNamespace)
|
||||
byMonth := make(map[int64]*processMonth)
|
||||
|
||||
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time) error {
|
||||
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error {
|
||||
for _, e := range l.Clients {
|
||||
|
||||
processClientRecord(e, byNamespace, byMonth, startTime)
|
||||
|
||||
// We maintain an hyperloglog for each month
|
||||
// hyperloglog is a sketch (hyperloglog data-structure) containing client ID's in a given month
|
||||
// hyperloglog is used in activity log to get the approximate number new clients in the current billing month
|
||||
// by counting the number of distinct clients in all the months including current month
|
||||
// (this can be done by merging the hyperloglog all months with current month hyperloglog)
|
||||
// and subtracting the number of distinct clients in the current month
|
||||
// NOTE: current month here is not the month of startTime but the time period from the start of the current month,
|
||||
// up until the time that this request was made.
|
||||
hll.Insert([]byte(e.ClientID))
|
||||
|
||||
// The byMonth map will be filled in the reverse order of time. For
|
||||
// example, if the billing period is from Jan to June, the byMonth
|
||||
// will be filled for June first, May next and so on till Jan. When
|
||||
|
@ -2144,11 +2206,17 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
|
|||
break
|
||||
}
|
||||
|
||||
err = a.WalkEntitySegments(ctx, startTime, walkEntities)
|
||||
hyperloglog := a.CreateOrFetchHyperlogLog(ctx, startTime)
|
||||
err = a.WalkEntitySegments(ctx, startTime, hyperloglog, walkEntities)
|
||||
if err != nil {
|
||||
a.logger.Warn("failed to load previous segments", "error", err)
|
||||
return err
|
||||
}
|
||||
// Store the hyperloglog
|
||||
err = a.StoreHyperlogLog(ctx, startTime, hyperloglog)
|
||||
if err != nil {
|
||||
a.logger.Warn("failed to store hyperloglog for month", "start time", startTime, "error", err)
|
||||
}
|
||||
err = a.WalkTokenSegments(ctx, startTime, walkTokens)
|
||||
if err != nil {
|
||||
a.logger.Warn("failed to load previous token counts", "error", err)
|
||||
|
@ -2646,7 +2714,8 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f
|
|||
a.logger.Info("starting activity log export", "start_time", startTime, "end_time", endTime, "format", format)
|
||||
|
||||
dedupedIds := make(map[string]struct{})
|
||||
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time) error {
|
||||
|
||||
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error {
|
||||
for _, e := range l.Clients {
|
||||
if _, ok := dedupedIds[e.ClientID]; ok {
|
||||
continue
|
||||
|
@ -2663,8 +2732,9 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f
|
|||
}
|
||||
|
||||
// For each month in the filtered list walk all the log segments
|
||||
|
||||
for _, startTime := range filteredList {
|
||||
err := a.WalkEntitySegments(ctx, startTime, walkEntities)
|
||||
err := a.WalkEntitySegments(ctx, startTime, nil, walkEntities)
|
||||
if err != nil {
|
||||
a.logger.Error("failed to load segments for export", "error", err)
|
||||
return fmt.Errorf("failed to load segments for export: %w", err)
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/axiomhq/hyperloglog"
|
||||
"github.com/go-test/deep"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/vault/helper/constants"
|
||||
|
@ -472,6 +473,34 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
|
|||
expectedEntityIDs(t, out, ids)
|
||||
}
|
||||
|
||||
// Test to check store hyperloglog and fetch hyperloglog from storage
|
||||
func TestActivityLog_StoreAndReadHyperloglog(t *testing.T) {
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
ctx := context.Background()
|
||||
|
||||
a := core.activityLog
|
||||
a.SetStandbyEnable(ctx, true)
|
||||
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment
|
||||
currentMonth := timeutil.StartOfMonth(time.Now())
|
||||
currentMonthHll := hyperloglog.New()
|
||||
currentMonthHll.Insert([]byte("a"))
|
||||
currentMonthHll.Insert([]byte("a"))
|
||||
currentMonthHll.Insert([]byte("b"))
|
||||
currentMonthHll.Insert([]byte("c"))
|
||||
currentMonthHll.Insert([]byte("d"))
|
||||
currentMonthHll.Insert([]byte("d"))
|
||||
|
||||
err := a.StoreHyperlogLog(ctx, currentMonth, currentMonthHll)
|
||||
if err != nil {
|
||||
t.Fatalf("error storing hyperloglog in storage: %v", err)
|
||||
}
|
||||
fetchedHll := a.CreateOrFetchHyperlogLog(ctx, currentMonth)
|
||||
// check the distinct count stored from hll
|
||||
if fetchedHll.Estimate() != 4 {
|
||||
t.Fatalf("wrong number of distinct elements: expected: 5 actual: %v", fetchedHll.Estimate())
|
||||
}
|
||||
}
|
||||
|
||||
func TestModifyResponseMonthsNilAppend(t *testing.T) {
|
||||
end := time.Now().UTC()
|
||||
start := timeutil.StartOfMonth(end).AddDate(0, -5, 0)
|
||||
|
|
Loading…
Reference in New Issue