open-vault/vault/activity/query.go
Hridoy Roy 679ccc81a5
Query and Precompute Non-Contiguous Segments in the Activity Log (#15352)
* query and precompute non-contiguous segments in the activity log

* changelog

* newline formatting

* make fmt

* report listener and storage types as found keys

* report listener and storage types as found keys

* Update vault/activity_log_test.go

Co-authored-by: Chris Capurso <1036769+ccapurso@users.noreply.github.com>

* review comments

* merge conflict

* merge conflict

* merge conflict

* fix unchecked merge conflict

Co-authored-by: Chris Capurso <1036769+ccapurso@users.noreply.github.com>
2022-05-17 12:17:32 -07:00

310 lines
8.6 KiB
Go

package activity
import (
"context"
"encoding/json"
"errors"
"fmt"
"sort"
"strconv"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/timeutil"
"github.com/hashicorp/vault/sdk/helper/compressutil"
"github.com/hashicorp/vault/sdk/logical"
)
// NamespaceRecord is the per-namespace element of PrecomputedQuery.Namespaces.
// It carries two unsigned counters and a list of per-mount records, and is
// serialized to JSON with the keys given in the field tags.
type NamespaceRecord struct {
// NamespaceID identifies the namespace the counters belong to.
NamespaceID string `json:"namespace_id"`
// Entities and NonEntityTokens are the two counters stored for the
// namespace (presumably entity clients and non-entity-token clients;
// confirm against the code that fills them in).
Entities uint64 `json:"entities"`
NonEntityTokens uint64 `json:"non_entity_tokens"`
// Mounts holds one MountRecord per mount recorded for this namespace.
Mounts []*MountRecord `json:"mounts"`
}
// CountsRecord is a pair of client counters, split into entity clients and
// non-entity clients. It is embedded by pointer in the month, namespace,
// mount and new-client records of this file.
type CountsRecord struct {
EntityClients int `json:"entity_clients"`
NonEntityClients int `json:"non_entity_clients"`
}
// NewClientRecord is the value stored in MonthRecord.NewClients: an overall
// CountsRecord plus a per-namespace breakdown.
type NewClientRecord struct {
// Counts is the overall count for the record.
Counts *CountsRecord `json:"counts"`
// Namespaces breaks the counts down by namespace.
Namespaces []*MonthlyNamespaceRecord `json:"namespaces"`
}
// MonthRecord is the per-month element of PrecomputedQuery.Months.
type MonthRecord struct {
// Timestamp identifies the month as an integer.
// NOTE(review): looks like Unix seconds, matching the storage keys used
// by PrecomputedQueryStore; confirm against the code that fills it in.
Timestamp int64 `json:"timestamp"`
// Counts is the overall count for the month.
Counts *CountsRecord `json:"counts"`
// Namespaces breaks the month's counts down by namespace.
Namespaces []*MonthlyNamespaceRecord `json:"namespaces"`
// NewClients holds the separate new-client counts for the month.
NewClients *NewClientRecord `json:"new_clients"`
}
// MonthlyNamespaceRecord is the per-namespace element used inside MonthRecord
// and NewClientRecord: a namespace ID, its counts, and a per-mount breakdown.
type MonthlyNamespaceRecord struct {
NamespaceID string `json:"namespace_id"`
Counts *CountsRecord `json:"counts"`
Mounts []*MountRecord `json:"mounts"`
}
// MountRecord associates a mount path with a CountsRecord. It is the element
// type of the Mounts slices in NamespaceRecord and MonthlyNamespaceRecord.
type MountRecord struct {
MountPath string `json:"mount_path"`
Counts *CountsRecord `json:"counts"`
}
// PrecomputedQuery is the unit stored and retrieved by PrecomputedQueryStore.
// It is marshaled to JSON as a whole by Put and unmarshaled by Get.
type PrecomputedQuery struct {
// StartTime and EndTime bound the period the query covers. They carry no
// JSON tags, so they are encoded under their Go field names. Put also
// uses their Unix-second values to build the storage key.
StartTime time.Time
EndTime time.Time
// Namespaces holds the per-namespace records for the period.
Namespaces []*NamespaceRecord `json:"namespaces"`
// Months holds the per-month records for the period.
Months []*MonthRecord `json:"months"`
}
// PrecomputedQueryStore reads and writes PrecomputedQuery values through a
// logical.Storage view, keyed as "<start unix seconds>/<end unix seconds>".
type PrecomputedQueryStore struct {
// logger receives the trace and warning messages emitted by the store.
logger log.Logger
// view is the storage all keys are listed, read, written and deleted in.
view logical.Storage
}
// NewPrecomputedQueryStore builds a store that logs to logger and keeps its
// data in view. The view should already be scoped to the subdirectory the
// store is meant to use, like "queries".
//
// retentionMonths is accepted as part of the signature but is not used by
// this constructor.
func NewPrecomputedQueryStore(logger log.Logger, view logical.Storage, retentionMonths int) *PrecomputedQueryStore {
	store := new(PrecomputedQueryStore)
	store.logger = logger
	store.view = view
	return store
}
// Put stores p under the key "<start unix seconds>/<end unix seconds>".
// The query is marshaled to JSON and LZ4-compressed before being written.
// Any marshaling, compression or storage error is returned unchanged.
func (s *PrecomputedQueryStore) Put(ctx context.Context, p *PrecomputedQuery) error {
	key := fmt.Sprintf("%v/%v", p.StartTime.Unix(), p.EndTime.Unix())

	encoded, err := json.Marshal(p)
	if err != nil {
		return err
	}

	lz4Config := &compressutil.CompressionConfig{
		Type: compressutil.CompressionTypeLZ4,
	}
	payload, err := compressutil.Compress(encoded, lz4Config)
	if err != nil {
		return err
	}

	entry := &logical.StorageEntry{
		Key:   key,
		Value: payload,
	}
	return s.view.Put(ctx, entry)
}
// listStartTimes lists the top-level keys of the view and converts each one
// to a time with timeutil.ParseTimeFromPath. Keys that fail to parse are
// logged at warn level and skipped, so the result may be shorter than the
// listing.
//
// The listing is not cached: every call costs one storage operation, which
// was judged a marginal price to pay.
func (s *PrecomputedQueryStore) listStartTimes(ctx context.Context) ([]time.Time, error) {
	keys, err := s.view.List(ctx, "")
	if err != nil {
		return nil, err
	}

	parsed := make([]time.Time, 0, len(keys))
	for _, key := range keys {
		if start, parseErr := timeutil.ParseTimeFromPath(key); parseErr == nil {
			parsed = append(parsed, start)
		} else {
			s.logger.Warn("could not parse precomputed query subdirectory", "key", key)
		}
	}
	return parsed, nil
}
// listEndTimes lists the keys stored under "<startTime unix seconds>/" and
// interprets each as a base-10 Unix timestamp in seconds, returned in UTC.
// Keys that are not valid integers are logged at warn level and skipped.
func (s *PrecomputedQueryStore) listEndTimes(ctx context.Context, startTime time.Time) ([]time.Time, error) {
	prefix := fmt.Sprintf("%v/", startTime.Unix())
	keys, err := s.view.List(ctx, prefix)
	if err != nil {
		return nil, err
	}

	parsed := make([]time.Time, 0, len(keys))
	for _, key := range keys {
		if secs, parseErr := strconv.ParseInt(key, 10, 64); parseErr == nil {
			parsed = append(parsed, time.Unix(secs, 0).UTC())
		} else {
			s.logger.Warn("could not parse precomputed query end time", "key", key)
		}
	}
	return parsed, nil
}
// getMaxEndTime returns the latest end time stored under startTime that is
// not after endTimeBound. End times are read from the keys listed under
// "<startTime unix seconds>/"; keys that are not valid base-10 integers are
// logged at warn level and ignored.
//
// The zero time.Time is returned when no stored end time qualifies.
func (s *PrecomputedQueryStore) getMaxEndTime(ctx context.Context, startTime time.Time, endTimeBound time.Time) (time.Time, error) {
	prefix := fmt.Sprintf("%v/", startTime.Unix())
	keys, err := s.view.List(ctx, prefix)
	if err != nil {
		return time.Time{}, err
	}

	var latest time.Time
	for _, key := range keys {
		secs, parseErr := strconv.ParseInt(key, 10, 64)
		if parseErr != nil {
			s.logger.Warn("could not parse precomputed query end time", "key", key)
			continue
		}

		candidate := time.Unix(secs, 0).UTC()
		s.logger.Trace("end time in consideration is", "end time", candidate, "end time bound", endTimeBound)

		// Skip candidates past the bound or no later than the current best.
		if candidate.After(endTimeBound) || !candidate.After(latest) {
			continue
		}
		s.logger.Trace("end time has been updated")
		latest = candidate
	}
	return latest, nil
}
// QueriesAvailable reports whether the store holds at least one parseable
// start-time key. A listing error is returned together with false.
func (s *PrecomputedQueryStore) QueriesAvailable(ctx context.Context) (bool, error) {
	starts, err := s.listStartTimes(ctx)
	if err != nil {
		return false, err
	}
	available := len(starts) != 0
	return available, nil
}
// Get returns the stored precomputed query that covers the largest part of
// the requested range, or (nil, nil) when no suitable query is stored.
//
// startTime is first moved to timeutil.StartOfMonth and endTime to
// timeutil.EndOfMonth. The search then works as follows:
//
//  1. List every stored start time and keep those that timeutil.InRange
//     reports as inside [startTime, endTime].
//  2. For each remaining start time, look up the latest stored end time
//     that is not after endTime (getMaxEndTime).
//  3. Pick the (start, end) pair with the longest duration. Candidates are
//     visited from the oldest start time to the newest and compared with
//     ">=", so a tie goes to the newer start time.
//
// The chosen entry is read from "<start unix seconds>/<end unix seconds>",
// decompressed if necessary, and unmarshaled from JSON.
//
// An error is returned when startTime is after endTime, when storage fails,
// or when the stored payload cannot be decompressed or decoded.
func (s *PrecomputedQueryStore) Get(ctx context.Context, startTime, endTime time.Time) (*PrecomputedQuery, error) {
	if startTime.After(endTime) {
		return nil, errors.New("start time is after end time")
	}
	startTime = timeutil.StartOfMonth(startTime)
	endTime = timeutil.EndOfMonth(endTime)
	s.logger.Trace("searching for matching queries", "startTime", startTime, "endTime", endTime)

	startTimes, err := s.listStartTimes(ctx)
	if err != nil {
		return nil, err
	}
	s.logger.Trace("retrieved start times from storage", "startTimes", startTimes)

	// Step 1: keep only the start times inside the requested range.
	filteredList := make([]time.Time, 0, len(startTimes))
	for _, t := range startTimes {
		if timeutil.InRange(t, startTime, endTime) {
			filteredList = append(filteredList, t)
		}
	}
	s.logger.Trace("filtered to range", "startTimes", filteredList)
	if len(filteredList) == 0 {
		return nil, nil
	}

	// Sort newest first. The loop below walks the slice from the back, so
	// candidates are examined from the oldest start time to the newest.
	sort.Slice(filteredList, func(i, j int) bool {
		return filteredList[i].After(filteredList[j])
	})

	// Steps 2 and 3: find the pair spanning the longest duration.
	closestStartTime := time.Time{}
	closestEndTime := time.Time{}
	maxTimeDifference := time.Duration(0)
	for i := len(filteredList) - 1; i >= 0; i-- {
		testStartTime := filteredList[i]
		s.logger.Trace("trying test start times", "startTime", testStartTime, "filteredList", filteredList)
		testEndTime, err := s.getMaxEndTime(ctx, testStartTime, endTime)
		if err != nil {
			return nil, err
		}
		if testEndTime.IsZero() {
			// Might happen if there's a race with GC
			s.logger.Warn("missing end times", "start time", testStartTime)
			continue
		}
		s.logger.Trace("retrieved max end time from storage", "endTime", testEndTime)
		diff := testEndTime.Sub(testStartTime)
		if diff >= maxTimeDifference {
			closestStartTime = testStartTime
			closestEndTime = testEndTime
			maxTimeDifference = diff
			s.logger.Trace("updating closest times")
		}
	}
	// Every key needs a value: the original call left "endTime" dangling.
	s.logger.Trace("chose start end end times", "startTime", closestStartTime, "endTime", closestEndTime)

	if closestStartTime.IsZero() || closestEndTime.IsZero() {
		s.logger.Warn("no start or end time in range", "start time", closestStartTime, "end time", closestEndTime)
		return nil, nil
	}

	path := fmt.Sprintf("%v/%v", closestStartTime.Unix(), closestEndTime.Unix())
	entry, err := s.view.Get(ctx, path)
	if err != nil {
		return nil, err
	}
	if entry == nil {
		// The key was listed but is gone by the time it is read.
		s.logger.Warn("no end time entry found", "start time", closestStartTime, "end time", closestEndTime)
		return nil, nil
	}

	value, notCompressed, err := compressutil.Decompress(entry.Value)
	if err != nil {
		return nil, err
	}
	if notCompressed {
		// Decompress reported an uncompressed payload; use the raw bytes.
		value = entry.Value
	}

	p := &PrecomputedQuery{}
	err = json.Unmarshal(value, p)
	if err != nil {
		s.logger.Warn("failed query lookup at", "path", path)
		return nil, err
	}
	return p, nil
}
// DeleteQueriesBefore removes every stored query whose start time is before
// retentionThreshold. Only the start time decides deletion; all end-time
// entries under a qualifying start time are deleted regardless of value.
//
// The first listing or delete error aborts the sweep and is returned, which
// may leave some entries already deleted.
func (s *PrecomputedQueryStore) DeleteQueriesBefore(ctx context.Context, retentionThreshold time.Time) error {
	starts, err := s.listStartTimes(ctx)
	if err != nil {
		return err
	}

	for _, start := range starts {
		if !start.Before(retentionThreshold) {
			continue
		}

		prefix := fmt.Sprintf("%v/", start.Unix())
		endKeys, err := s.view.List(ctx, prefix)
		if err != nil {
			return err
		}

		s.logger.Trace("deleting queries", "startTime", start)
		for _, endKey := range endKeys {
			if err := s.view.Delete(ctx, prefix+endKey); err != nil {
				return err
			}
		}
	}
	return nil
}