2973 lines
96 KiB
Go
2973 lines
96 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package vault
|
|
|
|
import (
|
|
"context"
|
|
"encoding/csv"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/axiomhq/hyperloglog"
|
|
"github.com/golang/protobuf/proto"
|
|
log "github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/vault/helper/metricsutil"
|
|
"github.com/hashicorp/vault/helper/namespace"
|
|
"github.com/hashicorp/vault/helper/timeutil"
|
|
"github.com/hashicorp/vault/sdk/logical"
|
|
"github.com/hashicorp/vault/vault/activity"
|
|
"go.uber.org/atomic"
|
|
)
|
|
|
|
const (
|
|
// activitySubPath is the directory under the system view where
|
|
// the log will be stored.
|
|
activitySubPath = "counters/activity/"
|
|
activityEntityBasePath = "log/entity/"
|
|
activityTokenBasePath = "log/directtokens/"
|
|
activityQueryBasePath = "queries/"
|
|
activityConfigKey = "config"
|
|
activityIntentLogKey = "endofmonth"
|
|
|
|
// sketch for each month that stores hash of client ids
|
|
distinctClientsBasePath = "log/distinctclients/"
|
|
|
|
// for testing purposes (public as needed)
|
|
ActivityLogPrefix = "sys/counters/activity/log/"
|
|
ActivityPrefix = "sys/counters/activity/"
|
|
|
|
// Time to wait on perf standby before sending fragment
|
|
activityFragmentStandbyTime = 10 * time.Minute
|
|
|
|
// Time between writes of segment to storage
|
|
activitySegmentInterval = 10 * time.Minute
|
|
|
|
// Timeout on RPC calls.
|
|
activityFragmentSendTimeout = 1 * time.Minute
|
|
|
|
// Timeout on storage calls.
|
|
activitySegmentWriteTimeout = 1 * time.Minute
|
|
|
|
// Number of client records to store per segment. Each ClientRecord may
|
|
// consume upto 99 bytes; rounding it to 100bytes. This []byte undergo JSON marshalling
|
|
// before adding them in storage increasing the size by approximately 4/3 times. Considering the storage
|
|
// limit of 512KB per storage entry, we can roughly store 512KB/(100bytes * 4/3) yielding approximately 3820 records.
|
|
ActivitySegmentClientCapacity = 3820
|
|
|
|
// Maximum number of segments per month. This allows for 700K entities per
|
|
// month; 700K/3820 (ActivitySegmentClientCapacity). These limits are geared towards controlling the storage
|
|
// implications of persisting activity logs. If we hit a scenario where the
|
|
// storage consequences are less important in comparison to the accuracy of
|
|
// the client activity, these limits can be further relaxed or even be
|
|
// removed.
|
|
activityLogMaxSegmentPerMonth = 184
|
|
|
|
// trackedTWESegmentPeriod is a time period of a little over a month, and represents
|
|
// the amount of time that needs to pass after a 1.9 or later upgrade to result in
|
|
// all fragments and segments no longer storing token counts in the directtokens
|
|
// storage path.
|
|
trackedTWESegmentPeriod = 35 * 24
|
|
|
|
// Known types of activity events; there's presently two internal event
|
|
// types (tokens/clients with and without entities), but we're beginning
|
|
// to support additional buckets for e.g., ACME requests.
|
|
nonEntityTokenActivityType = "non-entity-token"
|
|
entityActivityType = "entity"
|
|
)
|
|
|
|
type segmentInfo struct {
|
|
startTimestamp int64
|
|
currentClients *activity.EntityActivityLog
|
|
clientSequenceNumber uint64
|
|
// DEPRECATED
|
|
// This field is needed for backward compatibility with fragments
|
|
// and segments created with vault versions before 1.9.
|
|
tokenCount *activity.TokenCount
|
|
}
|
|
|
|
// ActivityLog tracks unique entity counts and non-entity token counts.
|
|
// It handles assembling log fragments (and sending them to the active
|
|
// node), writing log segments, and precomputing queries.
|
|
type ActivityLog struct {
|
|
core *Core
|
|
configOverrides *ActivityLogCoreConfig
|
|
|
|
// ActivityLog.l protects the configuration settings, except enable, and any modifications
|
|
// to the current segment.
|
|
// Acquire "l" before fragmentLock if both must be held.
|
|
l sync.RWMutex
|
|
|
|
// fragmentLock protects enable, partialMonthClientTracker, fragment,
|
|
// standbyFragmentsReceived.
|
|
fragmentLock sync.RWMutex
|
|
|
|
// enabled indicates if the activity log is enabled for this cluster.
|
|
// This is protected by fragmentLock so we can check with only
|
|
// a single synchronization call.
|
|
enabled bool
|
|
|
|
// log destination
|
|
logger log.Logger
|
|
|
|
// metrics sink
|
|
metrics metricsutil.Metrics
|
|
|
|
// view is the storage location used by ActivityLog,
|
|
// defaults to sys/activity.
|
|
view *BarrierView
|
|
|
|
// nodeID is the ID to use for all fragments that
|
|
// are generated.
|
|
// TODO: use secondary ID when available?
|
|
nodeID string
|
|
|
|
// current log fragment (may be nil)
|
|
fragment *activity.LogFragment
|
|
fragmentCreation time.Time
|
|
|
|
// Channel to signal a new fragment has been created
|
|
// so it's appropriate to start the timer.
|
|
newFragmentCh chan struct{}
|
|
|
|
// Channel for sending fragment immediately
|
|
sendCh chan struct{}
|
|
|
|
// Channel for writing fragment immediately
|
|
writeCh chan struct{}
|
|
|
|
// Channel to stop background processing
|
|
doneCh chan struct{}
|
|
|
|
// track metadata and contents of the most recent log segment
|
|
currentSegment segmentInfo
|
|
|
|
// Fragments received from performance standbys
|
|
standbyFragmentsReceived []*activity.LogFragment
|
|
|
|
// precomputed queries
|
|
queryStore *activity.PrecomputedQueryStore
|
|
defaultReportMonths int
|
|
retentionMonths int
|
|
|
|
// channel closed by delete worker when done
|
|
deleteDone chan struct{}
|
|
|
|
// channel closed when deletion at startup is done
|
|
// (for unit test robustness)
|
|
retentionDone chan struct{}
|
|
computationWorkerDone chan struct{}
|
|
|
|
// for testing: is config currently being invalidated. protected by l
|
|
configInvalidationInProgress bool
|
|
|
|
// partialMonthClientTracker tracks active clients this month. Protected by fragmentLock.
|
|
partialMonthClientTracker map[string]*activity.EntityRecord
|
|
|
|
inprocessExport *atomic.Bool
|
|
|
|
// CensusReportDone is a channel used to signal tests upon successful calls
|
|
// to (CensusReporter).Write() in CensusReport.
|
|
CensusReportDone chan bool
|
|
|
|
// CensusReportInterval is the testing configuration for time between
|
|
// Write() calls initiated in CensusReport.
|
|
CensusReportInterval time.Duration
|
|
|
|
// clock is used to support manipulating time in unit and integration tests
|
|
clock timeutil.Clock
|
|
// precomputedQueryWritten receives an element whenever a precomputed query
|
|
// is written. It's used for unit testing
|
|
precomputedQueryWritten chan struct{}
|
|
}
|
|
|
|
// These non-persistent configuration options allow us to disable
|
|
// parts of the implementation for integration testing.
|
|
// The default values should turn everything on.
|
|
type ActivityLogCoreConfig struct {
|
|
// Enable activity log even if the feature flag not set
|
|
ForceEnable bool
|
|
|
|
// Do not start timers to send or persist fragments.
|
|
DisableTimers bool
|
|
|
|
// CensusReportInterval is the testing configuration for time
|
|
CensusReportInterval time.Duration
|
|
|
|
// MinimumRetentionMonths defines the minimum value for retention
|
|
MinimumRetentionMonths int
|
|
|
|
// Clock holds a custom clock to modify time.Now, time.Ticker, time.Timer.
|
|
// If nil, the default functions from the time package are used
|
|
Clock timeutil.Clock
|
|
}
|
|
|
|
// NewActivityLog creates an activity log.
|
|
func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics metricsutil.Metrics) (*ActivityLog, error) {
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clock := core.activityLogConfig.Clock
|
|
if clock == nil {
|
|
clock = timeutil.DefaultClock{}
|
|
}
|
|
a := &ActivityLog{
|
|
core: core,
|
|
configOverrides: &core.activityLogConfig,
|
|
logger: logger,
|
|
view: view,
|
|
metrics: metrics,
|
|
nodeID: hostname,
|
|
newFragmentCh: make(chan struct{}, 1),
|
|
sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size
|
|
writeCh: make(chan struct{}, 1), // same for full segment
|
|
doneCh: make(chan struct{}, 1),
|
|
partialMonthClientTracker: make(map[string]*activity.EntityRecord),
|
|
CensusReportInterval: time.Hour * 1,
|
|
clock: clock,
|
|
currentSegment: segmentInfo{
|
|
startTimestamp: 0,
|
|
currentClients: &activity.EntityActivityLog{
|
|
Clients: make([]*activity.EntityRecord, 0),
|
|
},
|
|
// tokenCount is deprecated, but must still exist for the current segment
|
|
// so the fragment that was using TWEs before the 1.9 changes
|
|
// can be flushed to the current segment.
|
|
tokenCount: &activity.TokenCount{
|
|
CountByNamespaceID: make(map[string]uint64),
|
|
},
|
|
clientSequenceNumber: 0,
|
|
},
|
|
standbyFragmentsReceived: make([]*activity.LogFragment, 0),
|
|
inprocessExport: atomic.NewBool(false),
|
|
precomputedQueryWritten: make(chan struct{}),
|
|
}
|
|
|
|
config, err := a.loadConfigOrDefault(core.activeContext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
a.SetConfigInit(config)
|
|
|
|
a.queryStore = activity.NewPrecomputedQueryStore(
|
|
logger,
|
|
view.SubView(activityQueryBasePath),
|
|
config.RetentionMonths)
|
|
|
|
return a, nil
|
|
}
|
|
|
|
// saveCurrentSegmentToStorage updates the record of Entities or
|
|
// Non Entity Tokens in persistent storage
|
|
// :force: forces a save of tokens/entities even if the in-memory log is empty
|
|
func (a *ActivityLog) saveCurrentSegmentToStorage(ctx context.Context, force bool) error {
|
|
// Prevent simultaneous changes to segment
|
|
a.l.Lock()
|
|
defer a.l.Unlock()
|
|
return a.saveCurrentSegmentToStorageLocked(ctx, force)
|
|
}
|
|
|
|
// Must be called with l held.
|
|
// :force: forces a save of tokens/entities even if the in-memory log is empty
|
|
func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, force bool) error {
|
|
defer a.metrics.MeasureSinceWithLabels([]string{"core", "activity", "segment_write"},
|
|
a.clock.Now(), []metricsutil.Label{})
|
|
|
|
// Swap out the pending fragments
|
|
a.fragmentLock.Lock()
|
|
localFragment := a.fragment
|
|
a.fragment = nil
|
|
standbys := a.standbyFragmentsReceived
|
|
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
|
|
a.fragmentLock.Unlock()
|
|
|
|
// If segment start time is zero, do not update or write
|
|
// (even if force is true). This can happen if activityLog is
|
|
// disabled after a save as been triggered.
|
|
if a.currentSegment.startTimestamp == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Measure the current fragment
|
|
if localFragment != nil {
|
|
a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"},
|
|
float32(len(localFragment.Clients)),
|
|
[]metricsutil.Label{
|
|
{"type", "entity"},
|
|
})
|
|
a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"},
|
|
float32(len(localFragment.NonEntityTokens)),
|
|
[]metricsutil.Label{
|
|
{"type", "direct_token"},
|
|
})
|
|
}
|
|
|
|
// Collect new entities and new tokens.
|
|
saveChanges := false
|
|
newEntities := make(map[string]*activity.EntityRecord)
|
|
for _, f := range append(standbys, localFragment) {
|
|
if f == nil {
|
|
continue
|
|
}
|
|
if len(f.Clients) != 0 || len(f.NonEntityTokens) != 0 {
|
|
saveChanges = true
|
|
}
|
|
for _, e := range f.Clients {
|
|
// We could sort by timestamp to see which is first.
|
|
// We'll ignore that; the order of the append above means
|
|
// that we choose entries in localFragment over those
|
|
// from standby nodes.
|
|
newEntities[e.ClientID] = e
|
|
}
|
|
// As of 1.9, a fragment should no longer have any NonEntityTokens. However
|
|
// in order to not lose any information about the current segment during the
|
|
// month when the client upgrades to 1.9, we must retain this functionality.
|
|
for ns, val := range f.NonEntityTokens {
|
|
// We track these pre-1.9 values in the old location, which is
|
|
// a.currentSegment.tokenCount, as opposed to the counter that stores tokens
|
|
// without entities that have client IDs, namely
|
|
// a.partialMonthClientTracker.nonEntityCountByNamespaceID. This preserves backward
|
|
// compatibility for the precomputedQueryWorkers and the segment storing
|
|
// logic.
|
|
a.currentSegment.tokenCount.CountByNamespaceID[ns] += val
|
|
}
|
|
}
|
|
|
|
if !saveChanges {
|
|
return nil
|
|
}
|
|
|
|
// Will all new entities fit? If not, roll over to a new segment.
|
|
available := ActivitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients)
|
|
remaining := available - len(newEntities)
|
|
excess := 0
|
|
if remaining < 0 {
|
|
excess = -remaining
|
|
}
|
|
|
|
segmentClients := a.currentSegment.currentClients.Clients
|
|
excessClients := make([]*activity.EntityRecord, 0, excess)
|
|
for _, record := range newEntities {
|
|
if available > 0 {
|
|
segmentClients = append(segmentClients, record)
|
|
available -= 1
|
|
} else {
|
|
excessClients = append(excessClients, record)
|
|
}
|
|
}
|
|
a.currentSegment.currentClients.Clients = segmentClients
|
|
|
|
err := a.saveCurrentSegmentInternal(ctx, force)
|
|
if err != nil {
|
|
// The current fragment(s) have already been placed into the in-memory
|
|
// segment, but we may lose any excess (in excessClients).
|
|
// There isn't a good way to unwind the transaction on failure,
|
|
// so we may just lose some records.
|
|
return err
|
|
}
|
|
|
|
if available <= 0 {
|
|
if a.currentSegment.clientSequenceNumber >= activityLogMaxSegmentPerMonth {
|
|
// Cannot send as Warn because it will repeat too often,
|
|
// and disabling/renabling would be complicated.
|
|
a.logger.Trace("too many segments in current month", "dropped", len(excessClients))
|
|
return nil
|
|
}
|
|
|
|
// Rotate to next segment
|
|
a.currentSegment.clientSequenceNumber += 1
|
|
if len(excessClients) > ActivitySegmentClientCapacity {
|
|
a.logger.Warn("too many new active clients, dropping tail", "clients", len(excessClients))
|
|
excessClients = excessClients[:ActivitySegmentClientCapacity]
|
|
}
|
|
a.currentSegment.currentClients.Clients = excessClients
|
|
err := a.saveCurrentSegmentInternal(ctx, force)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// :force: forces a save of tokens/entities even if the in-memory log is empty
|
|
func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error {
|
|
_, err := a.saveSegmentEntitiesInternal(ctx, a.currentSegment, force)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = a.saveSegmentTokensInternal(ctx, a.currentSegment, force)
|
|
return err
|
|
}
|
|
|
|
func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegment segmentInfo, force bool) (string, error) {
|
|
if len(currentSegment.tokenCount.CountByNamespaceID) == 0 && !force {
|
|
return "", nil
|
|
}
|
|
// RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed
|
|
tokenPath := fmt.Sprintf("%s%d/0", activityTokenBasePath, currentSegment.startTimestamp)
|
|
// We must still allow for the tokenCount of the current segment to
|
|
// be written to storage, since if we remove this code we will incur
|
|
// data loss for one segment's worth of TWEs.
|
|
// We can get away with simply using the oldest version stored because
|
|
// the storing of versions was introduced at the same time as this code.
|
|
oldestVersion, oldestUpgradeTime, err := a.core.FindOldestVersionTimestamp()
|
|
switch {
|
|
case err != nil:
|
|
a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error()))
|
|
case len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 &&
|
|
(oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(time.Now())):
|
|
a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion))
|
|
default:
|
|
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
|
|
a.logger.Info("storing nonzero token count")
|
|
}
|
|
}
|
|
tokenCount, err := proto.Marshal(a.currentSegment.tokenCount)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
a.logger.Trace("writing segment", "path", tokenPath)
|
|
err = a.view.Put(ctx, &logical.StorageEntry{
|
|
Key: tokenPath,
|
|
Value: tokenCount,
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return tokenPath, nil
|
|
}
|
|
|
|
func (a *ActivityLog) saveSegmentEntitiesInternal(ctx context.Context, currentSegment segmentInfo, force bool) (string, error) {
|
|
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, currentSegment.startTimestamp, currentSegment.clientSequenceNumber)
|
|
|
|
for _, client := range a.currentSegment.currentClients.Clients {
|
|
// Explicitly catch and throw clear error message if client ID creation and storage
|
|
// results in a []byte that doesn't assert into a valid string.
|
|
if !utf8.ValidString(client.ClientID) {
|
|
return "", fmt.Errorf("client ID %q is not a valid string:", client.ClientID)
|
|
}
|
|
}
|
|
|
|
if len(currentSegment.currentClients.Clients) == 0 && !force {
|
|
return "", nil
|
|
}
|
|
clients, err := proto.Marshal(currentSegment.currentClients)
|
|
if err != nil {
|
|
return entityPath, err
|
|
}
|
|
|
|
a.logger.Trace("writing segment", "path", entityPath)
|
|
err = a.view.Put(ctx, &logical.StorageEntry{
|
|
Key: entityPath,
|
|
Value: clients,
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return entityPath, err
|
|
}
|
|
|
|
// parseSegmentNumberFromPath returns the segment number from a path
|
|
// (and if it exists - it is the last element in the path)
|
|
func parseSegmentNumberFromPath(path string) (int, bool) {
|
|
// as long as both s and sep are not "", len(elems) >= 1
|
|
elems := strings.Split(path, "/")
|
|
segmentNum, err := strconv.Atoi(elems[len(elems)-1])
|
|
if err != nil {
|
|
return 0, false
|
|
}
|
|
|
|
return segmentNum, true
|
|
}
|
|
|
|
// availableLogs returns the start_time(s) (in UTC) associated with months for which logs exist,
|
|
// sorted last to first
|
|
func (a *ActivityLog) availableLogs(ctx context.Context) ([]time.Time, error) {
|
|
paths := make([]string, 0)
|
|
for _, basePath := range []string{activityEntityBasePath, activityTokenBasePath} {
|
|
p, err := a.view.List(ctx, basePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
paths = append(paths, p...)
|
|
}
|
|
|
|
pathSet := make(map[time.Time]struct{})
|
|
out := make([]time.Time, 0)
|
|
for _, path := range paths {
|
|
// generate a set of unique start times
|
|
time, err := timeutil.ParseTimeFromPath(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if _, present := pathSet[time]; !present {
|
|
pathSet[time] = struct{}{}
|
|
out = append(out, time)
|
|
}
|
|
}
|
|
|
|
sort.Slice(out, func(i, j int) bool {
|
|
// sort in reverse order to make processing most recent segment easier
|
|
return out[i].After(out[j])
|
|
})
|
|
|
|
a.logger.Trace("scanned existing logs", "out", out)
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// getMostRecentActivityLogSegment gets the times (in UTC) associated with the most recent
|
|
// contiguous set of activity logs, sorted in decreasing order (latest to earliest)
|
|
func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context) ([]time.Time, error) {
|
|
logTimes, err := a.availableLogs(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return timeutil.GetMostRecentContiguousMonths(logTimes), nil
|
|
}
|
|
|
|
// getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists
|
|
func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, bool, error) {
|
|
p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/")
|
|
if err != nil {
|
|
return 0, false, err
|
|
}
|
|
|
|
highestNum := -1
|
|
for _, path := range p {
|
|
if num, ok := parseSegmentNumberFromPath(path); ok {
|
|
if num > highestNum {
|
|
highestNum = num
|
|
}
|
|
}
|
|
}
|
|
|
|
if highestNum < 0 {
|
|
// numbers less than 0 are invalid. if a negative number is the highest value, there isn't a segment
|
|
return 0, false, nil
|
|
}
|
|
|
|
return uint64(highestNum), true, nil
|
|
}
|
|
|
|
// WalkEntitySegments loads each of the entity segments for a particular start time
|
|
func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, hll *hyperloglog.Sketch, walkFn func(*activity.EntityActivityLog, time.Time, *hyperloglog.Sketch) error) error {
|
|
basePath := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/"
|
|
pathList, err := a.view.List(ctx, basePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, path := range pathList {
|
|
raw, err := a.view.Get(ctx, basePath+path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if raw == nil {
|
|
a.logger.Warn("expected log segment not found", "startTime", startTime, "segment", path)
|
|
continue
|
|
}
|
|
|
|
out := &activity.EntityActivityLog{}
|
|
err = proto.Unmarshal(raw.Value, out)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to parse segment %v%v: %w", basePath, path, err)
|
|
}
|
|
err = walkFn(out, startTime, hll)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to walk entities: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// WalkTokenSegments loads each of the token segments (expected 1) for a particular start time
|
|
func (a *ActivityLog) WalkTokenSegments(ctx context.Context,
|
|
startTime time.Time,
|
|
walkFn func(*activity.TokenCount),
|
|
) error {
|
|
basePath := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/"
|
|
pathList, err := a.view.List(ctx, basePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, path := range pathList {
|
|
raw, err := a.view.Get(ctx, basePath+path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if raw == nil {
|
|
a.logger.Trace("no tokens without entities stored without tracking", "startTime", startTime, "segment", path)
|
|
continue
|
|
}
|
|
out := &activity.TokenCount{}
|
|
err = proto.Unmarshal(raw.Value, out)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to parse token segment %v%v: %w", basePath, path, err)
|
|
}
|
|
walkFn(out)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// loadPriorEntitySegment populates the in-memory tracker for entity IDs that have
|
|
// been active "this month"
|
|
func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time.Time, sequenceNum uint64) error {
|
|
path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
|
|
data, err := a.view.Get(ctx, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if data == nil {
|
|
return nil
|
|
}
|
|
|
|
out := &activity.EntityActivityLog{}
|
|
err = proto.Unmarshal(data.Value, out)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
a.l.RLock()
|
|
a.fragmentLock.Lock()
|
|
// Handle the (unlikely) case where the end of the month has been reached while background loading.
|
|
// Or the feature has been disabled.
|
|
if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp {
|
|
for _, ent := range out.Clients {
|
|
a.partialMonthClientTracker[ent.ClientID] = ent
|
|
}
|
|
}
|
|
a.fragmentLock.Unlock()
|
|
a.l.RUnlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// loadCurrentClientSegment loads the most recent segment (for "this month")
|
|
// into memory (to append new entries), and to the partialMonthClientTracker to
|
|
// avoid duplication call with fragmentLock and l held.
|
|
func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64) error {
|
|
path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
|
|
data, err := a.view.Get(ctx, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if data == nil {
|
|
return nil
|
|
}
|
|
|
|
out := &activity.EntityActivityLog{}
|
|
err = proto.Unmarshal(data.Value, out)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !a.core.perfStandby {
|
|
a.currentSegment = segmentInfo{
|
|
startTimestamp: startTime.Unix(),
|
|
currentClients: &activity.EntityActivityLog{
|
|
Clients: out.Clients,
|
|
},
|
|
tokenCount: a.currentSegment.tokenCount,
|
|
clientSequenceNumber: sequenceNum,
|
|
}
|
|
} else {
|
|
// populate this for edge case checking (if end of month passes while background loading on standby)
|
|
a.currentSegment.startTimestamp = startTime.Unix()
|
|
}
|
|
|
|
for _, client := range out.Clients {
|
|
a.partialMonthClientTracker[client.ClientID] = client
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// tokenCountExists checks if there's a token log for :startTime:
|
|
// this function should be called with the lock held
|
|
func (a *ActivityLog) tokenCountExists(ctx context.Context, startTime time.Time) (bool, error) {
|
|
p, err := a.view.List(ctx, activityTokenBasePath+fmt.Sprint(startTime.Unix())+"/")
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, path := range p {
|
|
if num, ok := parseSegmentNumberFromPath(path); ok && num == 0 {
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// loadTokenCount populates the in-memory representation of activity token count
|
|
// this function should be called with the lock held
|
|
func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) error {
|
|
tokenCountExists, err := a.tokenCountExists(ctx, startTime)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !tokenCountExists {
|
|
return nil
|
|
}
|
|
|
|
path := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/0"
|
|
data, err := a.view.Get(ctx, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if data == nil {
|
|
return nil
|
|
}
|
|
|
|
out := &activity.TokenCount{}
|
|
err = proto.Unmarshal(data.Value, out)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// An empty map is unmarshaled as nil
|
|
if out.CountByNamespaceID == nil {
|
|
out.CountByNamespaceID = make(map[string]uint64)
|
|
}
|
|
|
|
// We must load the tokenCount of the current segment into the activity log
|
|
// so that TWEs counted before the introduction of a client ID for TWEs are
|
|
// still reported in the partial client counts.
|
|
a.currentSegment.tokenCount = out
|
|
|
|
return nil
|
|
}
|
|
|
|
// entityBackgroundLoader loads entity activity log records for start_date `t`
|
|
func (a *ActivityLog) entityBackgroundLoader(ctx context.Context, wg *sync.WaitGroup, t time.Time, seqNums <-chan uint64) {
|
|
defer wg.Done()
|
|
for seqNum := range seqNums {
|
|
select {
|
|
case <-a.doneCh:
|
|
a.logger.Info("background processing told to halt while loading entities", "time", t, "sequence", seqNum)
|
|
return
|
|
default:
|
|
}
|
|
|
|
err := a.loadPriorEntitySegment(ctx, t, seqNum)
|
|
if err != nil {
|
|
a.logger.Error("error loading entity activity log", "time", t, "sequence", seqNum, "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Initialize a new current segment, based on the current time.
|
|
// Call with fragmentLock and l held.
|
|
func (a *ActivityLog) startNewCurrentLogLocked(now time.Time) {
|
|
a.logger.Trace("initializing new log")
|
|
a.resetCurrentLog()
|
|
a.currentSegment.startTimestamp = now.Unix()
|
|
}
|
|
|
|
// Should be called with fragmentLock and l held.
|
|
func (a *ActivityLog) newMonthCurrentLogLocked(currentTime time.Time) {
|
|
a.logger.Trace("continuing log to new month")
|
|
a.resetCurrentLog()
|
|
monthStart := timeutil.StartOfMonth(currentTime.UTC())
|
|
a.currentSegment.startTimestamp = monthStart.Unix()
|
|
}
|
|
|
|
// Initialize a new current segment, based on the given time
|
|
// should be called with fragmentLock and l held.
|
|
func (a *ActivityLog) newSegmentAtGivenTime(t time.Time) {
|
|
timestamp := t.Unix()
|
|
|
|
a.logger.Trace("starting a segment", "timestamp", timestamp)
|
|
a.resetCurrentLog()
|
|
a.currentSegment.startTimestamp = timestamp
|
|
}
|
|
|
|
// Reset all the current segment state.
|
|
// Should be called with fragmentLock and l held.
|
|
func (a *ActivityLog) resetCurrentLog() {
|
|
a.currentSegment.startTimestamp = 0
|
|
a.currentSegment.currentClients = &activity.EntityActivityLog{
|
|
Clients: make([]*activity.EntityRecord, 0),
|
|
}
|
|
|
|
// We must still initialize the tokenCount to recieve tokenCounts from fragments
|
|
// during the month where customers upgrade to 1.9
|
|
a.currentSegment.tokenCount = &activity.TokenCount{
|
|
CountByNamespaceID: make(map[string]uint64),
|
|
}
|
|
|
|
a.currentSegment.clientSequenceNumber = 0
|
|
|
|
a.fragment = nil
|
|
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
|
|
|
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
|
|
}
|
|
|
|
func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) {
|
|
entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp)
|
|
tokenPath := fmt.Sprintf("%v%v/", activityTokenBasePath, startTimestamp)
|
|
|
|
entitySegments, err := a.view.List(ctx, entityPath)
|
|
if err != nil {
|
|
a.logger.Error("could not list entity paths", "error", err)
|
|
return
|
|
}
|
|
for _, p := range entitySegments {
|
|
err = a.view.Delete(ctx, entityPath+p)
|
|
if err != nil {
|
|
a.logger.Error("could not delete entity log", "error", err)
|
|
}
|
|
}
|
|
|
|
tokenSegments, err := a.view.List(ctx, tokenPath)
|
|
if err != nil {
|
|
a.logger.Error("could not list token paths", "error", err)
|
|
return
|
|
}
|
|
for _, p := range tokenSegments {
|
|
err = a.view.Delete(ctx, tokenPath+p)
|
|
if err != nil {
|
|
a.logger.Error("could not delete token log", "error", err)
|
|
}
|
|
}
|
|
|
|
// Allow whoever started this as a goroutine to wait for it to finish.
|
|
close(whenDone)
|
|
}
|
|
|
|
func (a *ActivityLog) WaitForDeletion() {
|
|
a.l.Lock()
|
|
// May be nil, if never set
|
|
doneCh := a.deleteDone
|
|
a.l.Unlock()
|
|
if doneCh != nil {
|
|
select {
|
|
case <-doneCh:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// refreshFromStoredLog loads the appropriate entities/tokencounts for active and performance standbys
|
|
// the most recent segment is loaded synchronously, and older segments are loaded in the background
|
|
// this function expects stateLock to be held
|
|
func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGroup, now time.Time) error {
|
|
a.l.Lock()
|
|
defer a.l.Unlock()
|
|
a.fragmentLock.Lock()
|
|
defer a.fragmentLock.Unlock()
|
|
|
|
decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(decreasingLogTimes) == 0 {
|
|
if a.enabled {
|
|
if a.core.perfStandby {
|
|
// reset the log without updating the timestamp
|
|
a.resetCurrentLog()
|
|
} else {
|
|
a.startNewCurrentLogLocked(now)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
mostRecent := decreasingLogTimes[0]
|
|
|
|
if !a.enabled {
|
|
a.logger.Debug("activity log not enabled, skipping refresh from storage")
|
|
if !a.core.perfStandby && timeutil.IsCurrentMonth(mostRecent, now) {
|
|
a.logger.Debug("activity log is disabled, cleaning up logs for the current month")
|
|
go a.deleteLogWorker(ctx, mostRecent.Unix(), make(chan struct{}))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if timeutil.IsPreviousMonth(mostRecent, now) {
|
|
// no activity logs to load for this month. if we are enabled, interpret
|
|
// it as having missed the rotation, so let it fall through and load
|
|
// if we missed generating the precomputed query, activeFragmentWorker()
|
|
// will clean things up when it runs next
|
|
|
|
a.logger.Trace("no log segments for current month", "mostRecent", mostRecent)
|
|
a.logger.Info("rotating activity log to new month")
|
|
} else if mostRecent.After(now) {
|
|
// we can't do anything if the most recent log is in the future
|
|
a.logger.Warn("timestamp from log to load is in the future", "timestamp", mostRecent)
|
|
return nil
|
|
} else if !timeutil.IsCurrentMonth(mostRecent, now) {
|
|
// the most recent log in storage is 2+ months in the past
|
|
|
|
a.logger.Warn("most recent log in storage is 2 or more months in the past.", "timestamp", mostRecent)
|
|
if a.core.perfStandby {
|
|
// reset the log without updating the timestamp
|
|
a.resetCurrentLog()
|
|
} else {
|
|
a.startNewCurrentLogLocked(now)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// load token counts from storage into memory. As of 1.9, this functionality
|
|
// is still required since without it, we would lose replicated TWE counts for the
|
|
// current segment.
|
|
if !a.core.perfStandby {
|
|
err = a.loadTokenCount(ctx, mostRecent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// load entity logs from storage into memory
|
|
lastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !segmentsExist {
|
|
a.logger.Trace("no entity segments for current month")
|
|
return nil
|
|
}
|
|
|
|
err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment)
|
|
if err != nil || lastSegment == 0 {
|
|
return err
|
|
}
|
|
lastSegment--
|
|
|
|
seqNums := make(chan uint64, lastSegment+1)
|
|
wg.Add(1)
|
|
go a.entityBackgroundLoader(ctx, wg, mostRecent, seqNums)
|
|
|
|
for n := int(lastSegment); n >= 0; n-- {
|
|
seqNums <- uint64(n)
|
|
}
|
|
close(seqNums)
|
|
|
|
return nil
|
|
}
|
|
|
|
// This version is used during construction
|
|
func (a *ActivityLog) SetConfigInit(config activityConfig) {
|
|
switch config.Enabled {
|
|
case "enable":
|
|
a.enabled = true
|
|
case "default":
|
|
a.enabled = activityLogEnabledDefault
|
|
case "disable":
|
|
a.enabled = false
|
|
}
|
|
|
|
if a.configOverrides.ForceEnable {
|
|
a.enabled = true
|
|
}
|
|
|
|
a.defaultReportMonths = config.DefaultReportMonths
|
|
a.retentionMonths = config.RetentionMonths
|
|
|
|
if a.retentionMonths < a.configOverrides.MinimumRetentionMonths {
|
|
a.retentionMonths = a.configOverrides.MinimumRetentionMonths
|
|
}
|
|
|
|
if a.configOverrides.CensusReportInterval > 0 {
|
|
a.CensusReportInterval = a.configOverrides.CensusReportInterval
|
|
}
|
|
}
|
|
|
|
// This version reacts to user changes
|
|
func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
|
|
a.l.Lock()
|
|
defer a.l.Unlock()
|
|
|
|
// enabled is protected by fragmentLock
|
|
a.fragmentLock.Lock()
|
|
originalEnabled := a.enabled
|
|
switch config.Enabled {
|
|
case "enable":
|
|
a.enabled = true
|
|
case "default":
|
|
a.enabled = activityLogEnabledDefault
|
|
case "disable":
|
|
a.enabled = false
|
|
}
|
|
|
|
if a.enabled != originalEnabled {
|
|
a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled)
|
|
}
|
|
|
|
if !a.enabled && a.currentSegment.startTimestamp != 0 {
|
|
a.logger.Trace("deleting current segment")
|
|
a.deleteDone = make(chan struct{})
|
|
// this is called from a request under stateLock, so use activeContext
|
|
go a.deleteLogWorker(a.core.activeContext, a.currentSegment.startTimestamp, a.deleteDone)
|
|
a.resetCurrentLog()
|
|
}
|
|
|
|
forceSave := false
|
|
if a.enabled && a.currentSegment.startTimestamp == 0 {
|
|
a.startNewCurrentLogLocked(a.clock.Now().UTC())
|
|
// Force a save so we can distinguish between
|
|
//
|
|
// Month N-1: present
|
|
// Month N: <blank because we missed the month end>
|
|
//
|
|
// and
|
|
//
|
|
// Month N-1: present
|
|
// Month N: <blank because disabled and re-enabled>
|
|
forceSave = true
|
|
}
|
|
a.fragmentLock.Unlock()
|
|
|
|
if forceSave {
|
|
// l is still held here
|
|
a.saveCurrentSegmentInternal(ctx, true)
|
|
}
|
|
|
|
a.defaultReportMonths = config.DefaultReportMonths
|
|
a.retentionMonths = config.RetentionMonths
|
|
if a.retentionMonths < a.configOverrides.MinimumRetentionMonths {
|
|
a.retentionMonths = a.configOverrides.MinimumRetentionMonths
|
|
}
|
|
|
|
// check for segments out of retention period, if it has changed
|
|
go a.retentionWorker(ctx, a.clock.Now(), a.retentionMonths)
|
|
}
|
|
|
|
// update the enable flag and reset the current log
|
|
func (a *ActivityLog) SetConfigStandby(ctx context.Context, config activityConfig) {
|
|
a.l.Lock()
|
|
defer a.l.Unlock()
|
|
|
|
// enable is protected by fragmentLock
|
|
a.fragmentLock.Lock()
|
|
originalEnabled := a.enabled
|
|
switch config.Enabled {
|
|
case "enable":
|
|
a.enabled = true
|
|
case "default":
|
|
a.enabled = activityLogEnabledDefault
|
|
case "disable":
|
|
a.enabled = false
|
|
}
|
|
|
|
if a.enabled != originalEnabled {
|
|
a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled)
|
|
a.resetCurrentLog()
|
|
}
|
|
a.fragmentLock.Unlock()
|
|
}
|
|
|
|
func (a *ActivityLog) queriesAvailable(ctx context.Context) (bool, error) {
|
|
if a.queryStore == nil {
|
|
return false, nil
|
|
}
|
|
return a.queryStore.QueriesAvailable(ctx)
|
|
}
|
|
|
|
// setupActivityLog hooks up the singleton ActivityLog into Core.
|
|
func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error {
|
|
c.activityLogLock.Lock()
|
|
defer c.activityLogLock.Unlock()
|
|
return c.setupActivityLogLocked(ctx, wg)
|
|
}
|
|
|
|
// setupActivityLogLocked hooks up the singleton ActivityLog into Core.
|
|
// this function should be called with activityLogLock.
|
|
func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup) error {
|
|
logger := c.baseLogger.Named("activity")
|
|
c.AddLogger(logger)
|
|
|
|
if os.Getenv("VAULT_DISABLE_ACTIVITY_LOG") != "" {
|
|
if c.CensusLicensingEnabled() {
|
|
logger.Warn("activity log disabled via environment variable while reporting is enabled. " +
|
|
"Reporting will override, and the activity log will be enabled")
|
|
} else {
|
|
logger.Info("activity log disabled via environment variable")
|
|
return nil
|
|
}
|
|
}
|
|
|
|
view := c.systemBarrierView.SubView(activitySubPath)
|
|
|
|
manager, err := NewActivityLog(c, logger, view, c.metricSink)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.activityLog = manager
|
|
|
|
// load activity log for "this month" into memory
|
|
err = manager.refreshFromStoredLog(manager.core.activeContext, wg, manager.clock.Now().UTC())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Start the background worker, depending on type
|
|
// Lock already held here, can't use .PerfStandby()
|
|
// The workers need to know the current segment time.
|
|
if c.perfStandby {
|
|
go manager.perfStandbyFragmentWorker(ctx)
|
|
} else {
|
|
go manager.activeFragmentWorker(ctx)
|
|
|
|
// Check for any intent log, in the background
|
|
manager.computationWorkerDone = make(chan struct{})
|
|
go func() {
|
|
manager.precomputedQueryWorker(ctx)
|
|
close(manager.computationWorkerDone)
|
|
}()
|
|
|
|
// Catch up on garbage collection
|
|
// Signal when this is done so that unit tests can proceed.
|
|
manager.retentionDone = make(chan struct{})
|
|
go func(months int) {
|
|
manager.retentionWorker(ctx, manager.clock.Now(), months)
|
|
close(manager.retentionDone)
|
|
}(manager.retentionMonths)
|
|
|
|
manager.CensusReportDone = make(chan bool)
|
|
go c.activityLog.CensusReport(ctx, c.CensusAgent(), c.BillingStart())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// stopActivityLogLocked removes the ActivityLog from Core
|
|
// and frees any resources.
|
|
// this function should be called with activityLogLock
|
|
func (c *Core) stopActivityLogLocked() {
|
|
// preSeal may run before startActivityLog got a chance to complete.
|
|
if c.activityLog != nil {
|
|
// Shut down background worker
|
|
close(c.activityLog.doneCh)
|
|
}
|
|
|
|
c.activityLog = nil
|
|
}
|
|
|
|
// stopActivityLog removes the ActivityLog from Core
|
|
// and frees any resources.
|
|
func (c *Core) stopActivityLog() {
|
|
c.activityLogLock.Lock()
|
|
defer c.activityLogLock.Unlock()
|
|
c.stopActivityLogLocked()
|
|
}
|
|
|
|
func (a *ActivityLog) StartOfNextMonth() time.Time {
|
|
a.l.RLock()
|
|
defer a.l.RUnlock()
|
|
var segmentStart time.Time
|
|
if a.currentSegment.startTimestamp == 0 {
|
|
segmentStart = a.clock.Now().UTC()
|
|
} else {
|
|
segmentStart = time.Unix(a.currentSegment.startTimestamp, 0).UTC()
|
|
}
|
|
// Basing this on the segment start will mean we trigger EOM rollover when
|
|
// necessary because we were down.
|
|
return timeutil.StartOfNextMonth(segmentStart)
|
|
}
|
|
|
|
// perfStandbyFragmentWorker handles scheduling fragments
|
|
// to send via RPC; it runs on perf standby nodes only.
|
|
func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
|
|
timer := a.clock.NewTimer(time.Duration(0))
|
|
fragmentWaiting := false
|
|
// Eat first event, so timer is stopped
|
|
<-timer.C
|
|
|
|
endOfMonth := a.clock.NewTimer(a.StartOfNextMonth().Sub(a.clock.Now()))
|
|
if a.configOverrides.DisableTimers {
|
|
endOfMonth.Stop()
|
|
}
|
|
|
|
sendFunc := func() {
|
|
ctx, cancel := context.WithTimeout(ctx, activityFragmentSendTimeout)
|
|
defer cancel()
|
|
err := a.sendCurrentFragment(ctx)
|
|
if err != nil {
|
|
a.logger.Warn("activity log fragment lost", "error", err)
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-a.doneCh:
|
|
// Shutting down activity log.
|
|
if fragmentWaiting && !timer.Stop() {
|
|
<-timer.C
|
|
}
|
|
if !endOfMonth.Stop() {
|
|
<-endOfMonth.C
|
|
}
|
|
return
|
|
case <-a.newFragmentCh:
|
|
// New fragment created, start the timer if not
|
|
// already running
|
|
if !fragmentWaiting {
|
|
fragmentWaiting = true
|
|
if !a.configOverrides.DisableTimers {
|
|
a.logger.Trace("reset fragment timer")
|
|
timer.Reset(activityFragmentStandbyTime)
|
|
}
|
|
}
|
|
case <-timer.C:
|
|
a.logger.Trace("sending fragment on timer expiration")
|
|
fragmentWaiting = false
|
|
sendFunc()
|
|
case <-a.sendCh:
|
|
a.logger.Trace("sending fragment on request")
|
|
// It might be that we get sendCh before fragmentCh
|
|
// if a fragment is created and then immediately fills
|
|
// up to its limit. So we attempt to send even if the timer's
|
|
// not running.
|
|
if fragmentWaiting {
|
|
fragmentWaiting = false
|
|
if !timer.Stop() {
|
|
<-timer.C
|
|
}
|
|
}
|
|
sendFunc()
|
|
case <-endOfMonth.C:
|
|
a.logger.Trace("sending fragment on end of month")
|
|
// Flush the current fragment, if any
|
|
if fragmentWaiting {
|
|
fragmentWaiting = false
|
|
if !timer.Stop() {
|
|
<-timer.C
|
|
}
|
|
}
|
|
sendFunc()
|
|
|
|
// clear active entity set
|
|
a.fragmentLock.Lock()
|
|
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
|
|
|
a.fragmentLock.Unlock()
|
|
|
|
// Set timer for next month.
|
|
// The current segment *probably* hasn't been set yet (via invalidation),
|
|
// so don't rely on it.
|
|
target := timeutil.StartOfNextMonth(a.clock.Now().UTC())
|
|
endOfMonth.Reset(target.Sub(a.clock.Now()))
|
|
}
|
|
}
|
|
}
|
|
|
|
// activeFragmentWorker handles scheduling the write of the next
|
|
// segment. It runs on active nodes only.
|
|
func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
|
|
ticker := a.clock.NewTicker(activitySegmentInterval)
|
|
|
|
endOfMonth := a.clock.NewTimer(a.StartOfNextMonth().Sub(a.clock.Now()))
|
|
if a.configOverrides.DisableTimers {
|
|
endOfMonth.Stop()
|
|
}
|
|
|
|
endOfMonthChannel := endOfMonth.C
|
|
if a.core.activityLogConfig.DisableTimers {
|
|
endOfMonthChannel = nil
|
|
}
|
|
|
|
writeFunc := func() {
|
|
ctx, cancel := context.WithTimeout(ctx, activitySegmentWriteTimeout)
|
|
defer cancel()
|
|
err := a.saveCurrentSegmentToStorage(ctx, false)
|
|
if err != nil {
|
|
a.logger.Warn("activity log segment not saved, current fragment lost", "error", err)
|
|
}
|
|
}
|
|
|
|
// we modify the doneCh in some tests, so let's make sure we don't trip
|
|
// the race detector
|
|
a.l.RLock()
|
|
doneCh := a.doneCh
|
|
a.l.RUnlock()
|
|
|
|
for {
|
|
select {
|
|
case <-doneCh:
|
|
// Shutting down activity log.
|
|
ticker.Stop()
|
|
return
|
|
case <-a.newFragmentCh:
|
|
// Just eat the message; the ticker is
|
|
// already running so we don't need to start it.
|
|
// (But we might change the behavior in the future.)
|
|
a.logger.Trace("new local fragment created")
|
|
continue
|
|
case <-ticker.C:
|
|
// It's harder to disable a Ticker so we'll just ignore it.
|
|
if a.configOverrides.DisableTimers {
|
|
continue
|
|
}
|
|
a.logger.Trace("writing segment on timer expiration")
|
|
writeFunc()
|
|
case <-a.writeCh:
|
|
a.logger.Trace("writing segment on request")
|
|
writeFunc()
|
|
|
|
// Reset the schedule to wait 10 minutes from this forced write.
|
|
ticker.Stop()
|
|
ticker = a.clock.NewTicker(activitySegmentInterval)
|
|
|
|
// Simpler, but ticker.Reset was introduced in go 1.15:
|
|
// ticker.Reset(activitySegmentInterval)
|
|
case currentTime := <-endOfMonthChannel:
|
|
err := a.HandleEndOfMonth(ctx, currentTime.UTC())
|
|
if err != nil {
|
|
a.logger.Error("failed to perform end of month rotation", "error", err)
|
|
}
|
|
|
|
// Garbage collect any segments or queries based on the immediate
|
|
// value of retentionMonths.
|
|
a.l.RLock()
|
|
go a.retentionWorker(ctx, currentTime.UTC(), a.retentionMonths)
|
|
a.l.RUnlock()
|
|
|
|
delta := a.StartOfNextMonth().Sub(a.clock.Now())
|
|
if delta < 20*time.Minute {
|
|
delta = 20 * time.Minute
|
|
}
|
|
a.logger.Trace("scheduling next month", "delta", delta)
|
|
endOfMonth.Reset(delta)
|
|
}
|
|
}
|
|
}
|
|
|
|
type ActivityIntentLog struct {
|
|
PreviousMonth int64 `json:"previous_month"`
|
|
NextMonth int64 `json:"next_month"`
|
|
}
|
|
|
|
// Handle rotation to end-of-month
|
|
// currentTime is an argument for unit-testing purposes
|
|
func (a *ActivityLog) HandleEndOfMonth(ctx context.Context, currentTime time.Time) error {
|
|
// Hold lock to prevent segment or enable changing,
|
|
// disable will apply to *next* month.
|
|
a.l.Lock()
|
|
defer a.l.Unlock()
|
|
|
|
a.fragmentLock.RLock()
|
|
// Don't bother if disabled
|
|
// since l is locked earlier (and SetConfig() is the only way enabled can change)
|
|
// we don't need to worry about enabled changing during this work
|
|
enabled := a.enabled
|
|
a.fragmentLock.RUnlock()
|
|
if !enabled {
|
|
return nil
|
|
}
|
|
|
|
a.logger.Trace("starting end of month processing", "rolloverTime", currentTime)
|
|
|
|
prevSegmentTimestamp := a.currentSegment.startTimestamp
|
|
nextSegmentTimestamp := timeutil.StartOfMonth(currentTime.UTC()).Unix()
|
|
|
|
// Write out an intent log for the rotation with the current and new segment times.
|
|
intentLog := &ActivityIntentLog{
|
|
PreviousMonth: prevSegmentTimestamp,
|
|
NextMonth: nextSegmentTimestamp,
|
|
}
|
|
entry, err := logical.StorageEntryJSON(activityIntentLogKey, intentLog)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = a.view.Put(ctx, entry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Save the current segment; this does not guarantee that fragment will be
|
|
// empty when it returns, but dropping some measurements is acceptable.
|
|
// We use force=true here in case an entry didn't appear this month
|
|
err = a.saveCurrentSegmentToStorageLocked(ctx, true)
|
|
|
|
// Don't return this error, just log it, we are done with that segment anyway.
|
|
if err != nil {
|
|
a.logger.Warn("last save of segment failed", "error", err)
|
|
}
|
|
|
|
// Advance the log; no need to force a save here because we have
|
|
// the intent log written already.
|
|
//
|
|
// On recovery refreshFromStoredLog() will see we're no longer
|
|
// in the previous month, and recover by calling newMonthCurrentLog
|
|
// again and triggering the precomputed query.
|
|
a.fragmentLock.Lock()
|
|
a.newMonthCurrentLogLocked(currentTime)
|
|
a.fragmentLock.Unlock()
|
|
|
|
// Work on precomputed queries in background
|
|
go a.precomputedQueryWorker(ctx)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ResetActivityLog is used to extract the current fragment(s) during
|
|
// integration testing, so that it can be checked in a race-free way.
|
|
func (c *Core) ResetActivityLog() []*activity.LogFragment {
|
|
c.stateLock.RLock()
|
|
a := c.activityLog
|
|
c.stateLock.RUnlock()
|
|
if a == nil {
|
|
return nil
|
|
}
|
|
|
|
allFragments := make([]*activity.LogFragment, 1)
|
|
a.fragmentLock.Lock()
|
|
allFragments[0] = a.fragment
|
|
a.fragment = nil
|
|
|
|
allFragments = append(allFragments, a.standbyFragmentsReceived...)
|
|
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
|
|
a.fragmentLock.Unlock()
|
|
return allFragments
|
|
}
|
|
|
|
func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, timestamp int64) {
|
|
a.AddClientToFragment(entityID, namespaceID, timestamp, false, "")
|
|
}
|
|
|
|
// AddClientToFragment checks a client ID for uniqueness and
|
|
// if not already present, adds it to the current fragment.
|
|
//
|
|
// See note below about AddActivityToFragment.
|
|
func (a *ActivityLog) AddClientToFragment(clientID string, namespaceID string, timestamp int64, isTWE bool, mountAccessor string) {
|
|
// TWE == token without entity
|
|
if isTWE {
|
|
a.AddActivityToFragment(clientID, namespaceID, timestamp, nonEntityTokenActivityType, mountAccessor)
|
|
return
|
|
}
|
|
|
|
a.AddActivityToFragment(clientID, namespaceID, timestamp, entityActivityType, mountAccessor)
|
|
}
|
|
|
|
// AddActivityToFragment adds a client count event of any type to
|
|
// add to the current fragment. ClientIDs must be unique across
|
|
// all types; if not already present, we will add it to the current
|
|
// fragment. The timestamp is a Unix timestamp *without* nanoseconds,
|
|
// as that is what token.CreationTime uses.
|
|
func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string, timestamp int64, activityType string, mountAccessor string) {
|
|
// Check whether entity ID already recorded
|
|
var present bool
|
|
|
|
// TODO: This hack enables separate tracking of events without handling
|
|
// separate storage buckets for counting these event types. Consider
|
|
// removing if the event type is otherwise clear; notably though, this
|
|
// does help ensure clientID uniqueness across different types of tokens,
|
|
// assuming it does not break any other downstream systems.
|
|
if activityType != nonEntityTokenActivityType && activityType != entityActivityType {
|
|
clientID = activityType + "." + clientID
|
|
}
|
|
|
|
a.fragmentLock.RLock()
|
|
if a.enabled {
|
|
_, present = a.partialMonthClientTracker[clientID]
|
|
} else {
|
|
present = true
|
|
}
|
|
a.fragmentLock.RUnlock()
|
|
if present {
|
|
return
|
|
}
|
|
|
|
// Update current fragment with new active entity
|
|
a.fragmentLock.Lock()
|
|
defer a.fragmentLock.Unlock()
|
|
|
|
// Re-check entity ID after re-acquiring lock
|
|
_, present = a.partialMonthClientTracker[clientID]
|
|
if present {
|
|
return
|
|
}
|
|
|
|
a.createCurrentFragment()
|
|
|
|
clientRecord := &activity.EntityRecord{
|
|
ClientID: clientID,
|
|
NamespaceID: namespaceID,
|
|
Timestamp: timestamp,
|
|
MountAccessor: mountAccessor,
|
|
ClientType: activityType,
|
|
}
|
|
|
|
// Track whether the clientID corresponds to a token without an entity or not.
|
|
// This field is backward compatible, as the default is 0, so records created
|
|
// from pre-1.9 activityLog code will automatically be marked as having an entity.
|
|
if activityType != entityActivityType {
|
|
// TODO: This part needs to be modified potentially for separate
|
|
// storage buckets of custom event types. Consider setting the above
|
|
// condition to activityType == nonEntityTokenEventType in the future.
|
|
clientRecord.NonEntity = true
|
|
}
|
|
|
|
a.fragment.Clients = append(a.fragment.Clients, clientRecord)
|
|
a.partialMonthClientTracker[clientRecord.ClientID] = clientRecord
|
|
}
|
|
|
|
// Create the current fragment if it doesn't already exist.
|
|
// Must be called with the lock held.
|
|
func (a *ActivityLog) createCurrentFragment() {
|
|
if a.fragment == nil {
|
|
a.fragment = &activity.LogFragment{
|
|
OriginatingNode: a.nodeID,
|
|
Clients: make([]*activity.EntityRecord, 0, 120),
|
|
NonEntityTokens: make(map[string]uint64),
|
|
}
|
|
a.fragmentCreation = a.clock.Now().UTC()
|
|
|
|
// Signal that a new segment is available, start
|
|
// the timer to send it.
|
|
a.newFragmentCh <- struct{}{}
|
|
}
|
|
}
|
|
|
|
func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) {
|
|
a.logger.Trace("received fragment from standby", "node", fragment.OriginatingNode)
|
|
|
|
a.fragmentLock.Lock()
|
|
defer a.fragmentLock.Unlock()
|
|
|
|
if !a.enabled {
|
|
return
|
|
}
|
|
|
|
for _, e := range fragment.Clients {
|
|
a.partialMonthClientTracker[e.ClientID] = e
|
|
}
|
|
|
|
a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment)
|
|
|
|
// TODO: check if current segment is full and should be written
|
|
}
|
|
|
|
type ResponseCounts struct {
|
|
DistinctEntities int `json:"distinct_entities"`
|
|
EntityClients int `json:"entity_clients"`
|
|
NonEntityTokens int `json:"non_entity_tokens"`
|
|
NonEntityClients int `json:"non_entity_clients"`
|
|
Clients int `json:"clients"`
|
|
}
|
|
|
|
type ResponseNamespace struct {
|
|
NamespaceID string `json:"namespace_id"`
|
|
NamespacePath string `json:"namespace_path"`
|
|
Counts ResponseCounts `json:"counts"`
|
|
Mounts []*ResponseMount `json:"mounts"`
|
|
}
|
|
|
|
type ResponseMonth struct {
|
|
Timestamp string `json:"timestamp"`
|
|
Counts *ResponseCounts `json:"counts"`
|
|
Namespaces []*ResponseNamespace `json:"namespaces"`
|
|
NewClients *ResponseNewClients `json:"new_clients" mapstructure:"new_clients"`
|
|
}
|
|
|
|
type ResponseNewClients struct {
|
|
Counts *ResponseCounts `json:"counts"`
|
|
Namespaces []*ResponseNamespace `json:"namespaces"`
|
|
}
|
|
|
|
type ResponseMount struct {
|
|
MountPath string `json:"mount_path"`
|
|
Counts *ResponseCounts `json:"counts"`
|
|
}
|
|
|
|
// ActivityLogInjectResponse injects a precomputed query into storage for testing.
|
|
func (c *Core) ActivityLogInjectResponse(ctx context.Context, pq *activity.PrecomputedQuery) error {
|
|
c.stateLock.RLock()
|
|
a := c.activityLog
|
|
c.stateLock.RUnlock()
|
|
if a == nil {
|
|
return errors.New("nil activity log")
|
|
}
|
|
return a.queryStore.Put(ctx, pq)
|
|
}
|
|
|
|
func (a *ActivityLog) includeInResponse(query *namespace.Namespace, record *namespace.Namespace) bool {
|
|
if record == nil {
|
|
// Deleted namespace, only include in root queries
|
|
return query.ID == namespace.RootNamespaceID
|
|
}
|
|
return record.HasParent(query)
|
|
}
|
|
|
|
func (a *ActivityLog) DefaultStartTime(endTime time.Time) time.Time {
|
|
// If end time is September 30, then start time should be
|
|
// October 1st to get 12 months of data.
|
|
a.l.RLock()
|
|
defer a.l.RUnlock()
|
|
|
|
monthStart := timeutil.StartOfMonth(endTime)
|
|
return monthStart.AddDate(0, -a.defaultReportMonths+1, 0)
|
|
}
|
|
|
|
func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.Time, limitNamespaces int) (map[string]interface{}, error) {
|
|
var computePartial bool
|
|
|
|
// Change the start time to the beginning of the month, and the end time to be the end
|
|
// of the month.
|
|
startTime = timeutil.StartOfMonth(startTime)
|
|
endTime = timeutil.EndOfMonth(endTime)
|
|
|
|
// If the endTime of the query is the current month, request data from the queryStore
|
|
// with the endTime equal to the end of the last month, and add in the current month
|
|
// data.
|
|
precomputedQueryEndTime := endTime
|
|
if timeutil.IsCurrentMonth(endTime, a.clock.Now().UTC()) {
|
|
precomputedQueryEndTime = timeutil.EndOfMonth(timeutil.MonthsPreviousTo(1, timeutil.StartOfMonth(endTime)))
|
|
computePartial = true
|
|
}
|
|
|
|
pq := &activity.PrecomputedQuery{}
|
|
if startTime.After(precomputedQueryEndTime) && timeutil.IsCurrentMonth(startTime, a.clock.Now().UTC()) {
|
|
// We're only calculating the partial month client count. Skip the precomputation
|
|
// get call.
|
|
pq = &activity.PrecomputedQuery{
|
|
StartTime: startTime,
|
|
EndTime: endTime,
|
|
Namespaces: make([]*activity.NamespaceRecord, 0),
|
|
Months: make([]*activity.MonthRecord, 0),
|
|
}
|
|
} else {
|
|
storedQuery, err := a.queryStore.Get(ctx, startTime, precomputedQueryEndTime)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if storedQuery == nil {
|
|
// If the storedQuery is nil, that means there's no historical data to process. But, it's possible there's
|
|
// still current month data to process, so rather than returning a 204, let's proceed along like we're
|
|
// just querying the current month.
|
|
storedQuery = &activity.PrecomputedQuery{
|
|
StartTime: startTime,
|
|
EndTime: endTime,
|
|
Namespaces: make([]*activity.NamespaceRecord, 0),
|
|
Months: make([]*activity.MonthRecord, 0),
|
|
}
|
|
}
|
|
pq = storedQuery
|
|
}
|
|
|
|
// Calculate the namespace response breakdowns and totals for entities and tokens from the initial
|
|
// namespace data.
|
|
totalEntities, totalTokens, byNamespaceResponse, err := a.calculateByNamespaceResponseForQuery(ctx, pq.Namespaces)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If we need to add the current month's client counts into the total, compute the namespace
|
|
// breakdown for the current month as well.
|
|
var partialByMonth map[int64]*processMonth
|
|
var partialByNamespace map[string]*processByNamespace
|
|
var totalEntitiesCurrent int
|
|
var totalTokensCurrent int
|
|
var byNamespaceResponseCurrent []*ResponseNamespace
|
|
if computePartial {
|
|
// Traverse through current month's activitylog data and group clients
|
|
// into months and namespaces
|
|
a.fragmentLock.RLock()
|
|
partialByMonth, partialByNamespace = a.populateNamespaceAndMonthlyBreakdowns()
|
|
a.fragmentLock.RUnlock()
|
|
|
|
// Convert the byNamespace breakdowns into structs that are
|
|
// consumable by the /activity endpoint, so as to reuse code between these two
|
|
// endpoints.
|
|
byNamespaceComputation := a.transformALNamespaceBreakdowns(partialByNamespace)
|
|
|
|
// Calculate the namespace response breakdowns and totals for entities and tokens from the initial
|
|
// namespace data.
|
|
totalEntitiesCurrent, totalTokensCurrent, byNamespaceResponseCurrent, err = a.calculateByNamespaceResponseForQuery(ctx, byNamespaceComputation)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create a mapping of namespace id to slice index, so that we can efficiently update our results without
|
|
// having to traverse the entire namespace response slice every time.
|
|
nsrMap := make(map[string]int)
|
|
for i, nr := range byNamespaceResponse {
|
|
nsrMap[nr.NamespaceID] = i
|
|
}
|
|
|
|
// Rather than blindly appending, which will create duplicates, check our existing counts against the current
|
|
// month counts, and append or update as necessary. We also want to account for mounts and their counts.
|
|
for _, nrc := range byNamespaceResponseCurrent {
|
|
if ndx, ok := nsrMap[nrc.NamespaceID]; ok {
|
|
existingRecord := byNamespaceResponse[ndx]
|
|
|
|
// Create a map of the existing mounts, so we don't duplicate them
|
|
mountMap := make(map[string]*ResponseCounts)
|
|
for _, erm := range existingRecord.Mounts {
|
|
mountMap[erm.MountPath] = erm.Counts
|
|
}
|
|
|
|
existingRecord.Counts.EntityClients += nrc.Counts.EntityClients
|
|
existingRecord.Counts.Clients += nrc.Counts.Clients
|
|
existingRecord.Counts.DistinctEntities += nrc.Counts.DistinctEntities
|
|
existingRecord.Counts.NonEntityClients += nrc.Counts.NonEntityClients
|
|
existingRecord.Counts.NonEntityTokens += nrc.Counts.NonEntityTokens
|
|
|
|
// Check the current month mounts against the existing mounts and if there are matches, update counts
|
|
// accordingly. If there is no match, append the new mount to the existing mounts, so it will be counted
|
|
// later.
|
|
for _, nrcMount := range nrc.Mounts {
|
|
if existingRecordMountCounts, ook := mountMap[nrcMount.MountPath]; ook {
|
|
existingRecordMountCounts.EntityClients += nrcMount.Counts.EntityClients
|
|
existingRecordMountCounts.Clients += nrcMount.Counts.Clients
|
|
existingRecordMountCounts.DistinctEntities += nrcMount.Counts.DistinctEntities
|
|
existingRecordMountCounts.NonEntityClients += nrcMount.Counts.NonEntityClients
|
|
existingRecordMountCounts.NonEntityTokens += nrcMount.Counts.NonEntityTokens
|
|
} else {
|
|
existingRecord.Mounts = append(existingRecord.Mounts, nrcMount)
|
|
}
|
|
}
|
|
} else {
|
|
byNamespaceResponse = append(byNamespaceResponse, nrc)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sort clients within each namespace
|
|
a.sortALResponseNamespaces(byNamespaceResponse)
|
|
|
|
if limitNamespaces > 0 {
|
|
totalEntities, totalTokens, byNamespaceResponse = a.limitNamespacesInALResponse(byNamespaceResponse, limitNamespaces)
|
|
}
|
|
|
|
distinctEntitiesResponse := totalEntities
|
|
if computePartial {
|
|
currentMonth, err := a.computeCurrentMonthForBillingPeriod(ctx, partialByMonth, startTime, endTime)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Add the namespace attribution for the current month to the newly computed current month value. Note
|
|
// that transformMonthBreakdowns calculates a superstruct of the required namespace struct due to its
|
|
// primary use-case being for precomputedQueryWorker, but we will reuse this code for brevity and extract
|
|
// the namespaces from it.
|
|
currentMonthNamespaceAttribution := a.transformMonthBreakdowns(partialByMonth)
|
|
|
|
// Ensure that there is only one element in this list -- if not, warn.
|
|
if len(currentMonthNamespaceAttribution) > 1 {
|
|
a.logger.Warn("more than one month worth of namespace and mount attribution calculated for "+
|
|
"current month values", "number of months", len(currentMonthNamespaceAttribution))
|
|
}
|
|
if len(currentMonthNamespaceAttribution) == 0 {
|
|
a.logger.Warn("no month data found, returning query with no namespace attribution for current month")
|
|
} else {
|
|
currentMonth.Namespaces = currentMonthNamespaceAttribution[0].Namespaces
|
|
currentMonth.NewClients.Namespaces = currentMonthNamespaceAttribution[0].NewClients.Namespaces
|
|
}
|
|
pq.Months = append(pq.Months, currentMonth)
|
|
distinctEntitiesResponse += pq.Months[len(pq.Months)-1].NewClients.Counts.EntityClients
|
|
}
|
|
|
|
// Now populate the response based on breakdowns.
|
|
responseData := make(map[string]interface{})
|
|
responseData["start_time"] = pq.StartTime.Format(time.RFC3339)
|
|
|
|
// If we computed partial counts, we should return the actual end time we computed counts for, not the pre-computed
|
|
// query end time. If we don't do this, the end_time in the response doesn't match the actual data in the response,
|
|
// which is confusing. Note that regardless of what end time is given, if it falls within the current month, it will
|
|
// be set to the end of the current month. This is definitely suboptimal, and possibly confusing, but still an
|
|
// improvement over using the pre-computed query end time.
|
|
if computePartial {
|
|
responseData["end_time"] = endTime.Format(time.RFC3339)
|
|
} else {
|
|
responseData["end_time"] = pq.EndTime.Format(time.RFC3339)
|
|
}
|
|
|
|
responseData["by_namespace"] = byNamespaceResponse
|
|
responseData["total"] = &ResponseCounts{
|
|
DistinctEntities: distinctEntitiesResponse,
|
|
EntityClients: totalEntities + totalEntitiesCurrent,
|
|
NonEntityTokens: totalTokens + totalTokensCurrent,
|
|
NonEntityClients: totalTokens + totalTokensCurrent,
|
|
Clients: totalEntities + totalEntitiesCurrent + totalTokens + totalTokensCurrent,
|
|
}
|
|
|
|
// Create and populate the month response structs based on the monthly breakdown.
|
|
months, err := a.prepareMonthsResponseForQuery(ctx, pq.Months)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Sort the months and clients within each month before adding the months to the response
|
|
a.sortActivityLogMonthsResponse(months)
|
|
|
|
// Modify the final month output to make response more consumable based on API request
|
|
months = a.modifyResponseMonths(months, startTime, endTime)
|
|
responseData["months"] = months
|
|
|
|
return responseData, nil
|
|
}
|
|
|
|
// modifyResponseMonths fills out various parts of the query structure to help
|
|
// activity log clients parse the returned query.
|
|
func (a *ActivityLog) modifyResponseMonths(months []*ResponseMonth, start time.Time, end time.Time) []*ResponseMonth {
|
|
if len(months) == 0 {
|
|
return months
|
|
}
|
|
start = timeutil.StartOfMonth(start)
|
|
end = timeutil.EndOfMonth(end)
|
|
if timeutil.IsCurrentMonth(end, a.clock.Now().UTC()) {
|
|
end = timeutil.EndOfMonth(timeutil.StartOfMonth(end).AddDate(0, -1, 0))
|
|
}
|
|
modifiedResponseMonths := make([]*ResponseMonth, 0)
|
|
firstMonth, err := time.Parse(time.RFC3339, months[0].Timestamp)
|
|
if err != nil {
|
|
return months
|
|
}
|
|
for start.Before(firstMonth) && !timeutil.IsCurrentMonth(start, firstMonth) {
|
|
monthPlaceholder := &ResponseMonth{Timestamp: start.UTC().Format(time.RFC3339)}
|
|
modifiedResponseMonths = append(modifiedResponseMonths, monthPlaceholder)
|
|
start = timeutil.StartOfMonth(start.AddDate(0, 1, 0))
|
|
}
|
|
modifiedResponseMonths = append(modifiedResponseMonths, months...)
|
|
lastMonthStart, err := time.Parse(time.RFC3339, modifiedResponseMonths[len(modifiedResponseMonths)-1].Timestamp)
|
|
if err != nil {
|
|
return modifiedResponseMonths
|
|
}
|
|
lastMonth := timeutil.EndOfMonth(lastMonthStart)
|
|
for lastMonth.Before(end) && !timeutil.IsCurrentMonth(end, lastMonth) {
|
|
lastMonth = timeutil.StartOfMonth(lastMonth).AddDate(0, 1, 0)
|
|
monthPlaceholder := &ResponseMonth{Timestamp: lastMonth.UTC().Format(time.RFC3339)}
|
|
modifiedResponseMonths = append(modifiedResponseMonths, monthPlaceholder)
|
|
|
|
// reset lastMonth to be the end of the month so we can make an apt comparison
|
|
// in the next loop iteration
|
|
lastMonth = timeutil.EndOfMonth(lastMonth)
|
|
}
|
|
return modifiedResponseMonths
|
|
}
|
|
|
|
type activityConfig struct {
|
|
// DefaultReportMonths are the default number of months that are returned on
|
|
// a report. The zero value uses the system default of 12.
|
|
DefaultReportMonths int `json:"default_report_months"`
|
|
|
|
// RetentionMonths defines the number of months we want to retain data. The
|
|
// zero value uses the system default of 24 months.
|
|
RetentionMonths int `json:"retention_months"`
|
|
|
|
// Enabled is one of enable, disable, default.
|
|
Enabled string `json:"enabled"`
|
|
|
|
CensusReportInterval time.Duration `json:"census_report_interval"`
|
|
}
|
|
|
|
func defaultActivityConfig() activityConfig {
|
|
return activityConfig{
|
|
DefaultReportMonths: 12,
|
|
RetentionMonths: 24,
|
|
Enabled: "default",
|
|
}
|
|
}
|
|
|
|
func (a *ActivityLog) loadConfigOrDefault(ctx context.Context) (activityConfig, error) {
|
|
// Load from storage
|
|
var config activityConfig
|
|
configRaw, err := a.view.Get(ctx, activityConfigKey)
|
|
if err != nil {
|
|
return config, err
|
|
}
|
|
if configRaw == nil {
|
|
return defaultActivityConfig(), nil
|
|
}
|
|
|
|
if err := configRaw.DecodeJSON(&config); err != nil {
|
|
return config, err
|
|
}
|
|
|
|
return config, nil
|
|
}
|
|
|
|
// HandleTokenUsage adds the TokenEntry to the current fragment of the activity log
|
|
// This currently occurs on token usage only.
|
|
func (a *ActivityLog) HandleTokenUsage(ctx context.Context, entry *logical.TokenEntry, clientID string, isTWE bool) error {
|
|
// First, check if a is enabled, so as to avoid the cost of creating an ID for
|
|
// tokens without entities in the case where it not.
|
|
a.fragmentLock.RLock()
|
|
if !a.enabled {
|
|
a.fragmentLock.RUnlock()
|
|
return nil
|
|
}
|
|
a.fragmentLock.RUnlock()
|
|
|
|
// Do not count wrapping tokens in client count
|
|
if IsWrappingToken(entry) {
|
|
return nil
|
|
}
|
|
|
|
// Do not count root tokens in client count.
|
|
if entry.IsRoot() {
|
|
return nil
|
|
}
|
|
|
|
// Tokens created for the purpose of Link should bypass counting for billing purposes
|
|
if entry.InternalMeta != nil && entry.InternalMeta[IgnoreForBilling] == "true" {
|
|
return nil
|
|
}
|
|
|
|
// Look up the mount accessor of the auth method that issued the token, taking care to resolve the token path
|
|
// against the token namespace, which may not be the same as the request namespace!
|
|
tokenNS, err := NamespaceByID(ctx, entry.NamespaceID, a.core)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if tokenNS == nil {
|
|
return namespace.ErrNoNamespace
|
|
}
|
|
tokenCtx := namespace.ContextWithNamespace(ctx, tokenNS)
|
|
mountAccessor := ""
|
|
mountEntry := a.core.router.MatchingMountEntry(tokenCtx, entry.Path)
|
|
if mountEntry != nil {
|
|
mountAccessor = mountEntry.Accessor
|
|
}
|
|
|
|
// Parse an entry's client ID and add it to the activity log
|
|
a.AddClientToFragment(clientID, entry.NamespaceID, entry.CreationTime, isTWE, mountAccessor)
|
|
return nil
|
|
}
|
|
|
|
func (a *ActivityLog) namespaceToLabel(ctx context.Context, nsID string) string {
|
|
ns, err := NamespaceByID(ctx, nsID, a.core)
|
|
if err != nil || ns == nil {
|
|
return fmt.Sprintf("deleted-%v", nsID)
|
|
}
|
|
if ns.Path == "" {
|
|
return "root"
|
|
}
|
|
return ns.Path
|
|
}
|
|
|
|
type (
|
|
summaryByNamespace map[string]*processByNamespace
|
|
summaryByMount map[string]*processMount
|
|
summaryByMonth map[int64]*processMonth
|
|
)
|
|
|
|
type processCounts struct {
|
|
// entityID -> present
|
|
Entities map[string]struct{}
|
|
// count. This exists for backward compatibility
|
|
Tokens uint64
|
|
// clientID -> present
|
|
NonEntities map[string]struct{}
|
|
}
|
|
|
|
func newProcessCounts() *processCounts {
|
|
return &processCounts{
|
|
Entities: make(map[string]struct{}),
|
|
Tokens: 0,
|
|
NonEntities: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
func (p *processCounts) delete(client *activity.EntityRecord) {
|
|
if !p.contains(client) {
|
|
return
|
|
}
|
|
if client.NonEntity {
|
|
delete(p.NonEntities, client.ClientID)
|
|
} else {
|
|
delete(p.Entities, client.ClientID)
|
|
}
|
|
}
|
|
|
|
func (p *processCounts) add(client *activity.EntityRecord) {
|
|
if client.NonEntity {
|
|
p.NonEntities[client.ClientID] = struct{}{}
|
|
} else {
|
|
p.Entities[client.ClientID] = struct{}{}
|
|
}
|
|
}
|
|
|
|
func (p *processCounts) contains(client *activity.EntityRecord) bool {
|
|
if client.NonEntity {
|
|
_, ok := p.NonEntities[client.ClientID]
|
|
return ok
|
|
}
|
|
_, ok := p.Entities[client.ClientID]
|
|
return ok
|
|
}
|
|
|
|
type processMount struct {
|
|
Counts *processCounts
|
|
}
|
|
|
|
func newProcessMount() *processMount {
|
|
return &processMount{
|
|
Counts: newProcessCounts(),
|
|
}
|
|
}
|
|
|
|
func (p *processMount) add(client *activity.EntityRecord) {
|
|
p.Counts.add(client)
|
|
}
|
|
|
|
func (p *processMount) delete(client *activity.EntityRecord) {
|
|
p.Counts.delete(client)
|
|
}
|
|
|
|
func (s summaryByMount) add(client *activity.EntityRecord) {
|
|
if _, present := s[client.MountAccessor]; !present {
|
|
s[client.MountAccessor] = newProcessMount()
|
|
}
|
|
s[client.MountAccessor].add(client)
|
|
}
|
|
|
|
func (s summaryByMount) delete(client *activity.EntityRecord) {
|
|
if m, present := s[client.MountAccessor]; present {
|
|
m.delete(client)
|
|
}
|
|
}
|
|
|
|
type processByNamespace struct {
|
|
Counts *processCounts
|
|
Mounts summaryByMount
|
|
}
|
|
|
|
func newByNamespace() *processByNamespace {
|
|
return &processByNamespace{
|
|
Counts: newProcessCounts(),
|
|
Mounts: make(summaryByMount),
|
|
}
|
|
}
|
|
|
|
func (p *processByNamespace) add(client *activity.EntityRecord) {
|
|
p.Counts.add(client)
|
|
p.Mounts.add(client)
|
|
}
|
|
|
|
func (p *processByNamespace) delete(client *activity.EntityRecord) {
|
|
p.Counts.delete(client)
|
|
p.Mounts.delete(client)
|
|
}
|
|
|
|
func (s summaryByNamespace) add(client *activity.EntityRecord) {
|
|
if _, present := s[client.NamespaceID]; !present {
|
|
s[client.NamespaceID] = newByNamespace()
|
|
}
|
|
s[client.NamespaceID].add(client)
|
|
}
|
|
|
|
func (s summaryByNamespace) delete(client *activity.EntityRecord) {
|
|
if n, present := s[client.NamespaceID]; present {
|
|
n.delete(client)
|
|
}
|
|
}
|
|
|
|
type processNewClients struct {
|
|
Counts *processCounts
|
|
Namespaces summaryByNamespace
|
|
}
|
|
|
|
func newProcessNewClients() *processNewClients {
|
|
return &processNewClients{
|
|
Counts: newProcessCounts(),
|
|
Namespaces: make(summaryByNamespace),
|
|
}
|
|
}
|
|
|
|
func (p *processNewClients) add(client *activity.EntityRecord) {
|
|
p.Counts.add(client)
|
|
p.Namespaces.add(client)
|
|
}
|
|
|
|
func (p *processNewClients) delete(client *activity.EntityRecord) {
|
|
p.Counts.delete(client)
|
|
p.Namespaces.delete(client)
|
|
}
|
|
|
|
type processMonth struct {
|
|
Counts *processCounts
|
|
Namespaces summaryByNamespace
|
|
NewClients *processNewClients
|
|
}
|
|
|
|
func newProcessMonth() *processMonth {
|
|
return &processMonth{
|
|
Counts: newProcessCounts(),
|
|
Namespaces: make(summaryByNamespace),
|
|
NewClients: newProcessNewClients(),
|
|
}
|
|
}
|
|
|
|
func (p *processMonth) add(client *activity.EntityRecord) {
|
|
p.Counts.add(client)
|
|
p.NewClients.add(client)
|
|
p.Namespaces.add(client)
|
|
}
|
|
|
|
func (s summaryByMonth) add(client *activity.EntityRecord, startTime time.Time) {
|
|
monthTimestamp := timeutil.StartOfMonth(startTime).UTC().Unix()
|
|
if _, present := s[monthTimestamp]; !present {
|
|
s[monthTimestamp] = newProcessMonth()
|
|
}
|
|
s[monthTimestamp].add(client)
|
|
}
|
|
|
|
// processClientRecord parses the client record e and stores the breakdowns in
|
|
// the maps provided.
|
|
func processClientRecord(e *activity.EntityRecord, byNamespace summaryByNamespace, byMonth summaryByMonth, startTime time.Time) {
|
|
byNamespace.add(e)
|
|
byMonth.add(e, startTime)
|
|
}
|
|
|
|
// handleEntitySegment processes the record and adds it to the correct month/
|
|
// namespace breakdown maps, as well as to the hyperloglog for the month. New
|
|
// clients are deduplicated in opts.byMonth so that clients will only appear in
|
|
// the first month in which they are seen.
|
|
// This method must be called in reverse chronological order of the months (with
|
|
// the most recent month being called before previous months)
|
|
func (a *ActivityLog) handleEntitySegment(l *activity.EntityActivityLog, segmentTime time.Time, hll *hyperloglog.Sketch, opts pqOptions) error {
|
|
for _, e := range l.Clients {
|
|
|
|
processClientRecord(e, opts.byNamespace, opts.byMonth, segmentTime)
|
|
hll.Insert([]byte(e.ClientID))
|
|
|
|
// step forward in time through the months to check if the client is
|
|
// present. If it is, delete it. This is because the client should only
|
|
// be reported as new in the earliest month that it was seen
|
|
finalMonth := timeutil.StartOfMonth(opts.activePeriodEnd).UTC()
|
|
for currMonth := timeutil.StartOfMonth(segmentTime).UTC(); currMonth.Before(finalMonth); currMonth = timeutil.StartOfNextMonth(currMonth).UTC() {
|
|
// Invalidate the client from being a new client in the next month
|
|
next := timeutil.StartOfNextMonth(currMonth).UTC().Unix()
|
|
if _, present := opts.byMonth[next]; present {
|
|
// delete from the new clients map for the next month
|
|
// this will handle deleting from the per-namespace and per-mount maps of NewClients
|
|
opts.byMonth[next].NewClients.delete(e)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// breakdownTokenSegment handles a TokenCount record, adding it to the namespace breakdown
|
|
func (a *ActivityLog) breakdownTokenSegment(l *activity.TokenCount, byNamespace map[string]*processByNamespace) {
|
|
for nsID, v := range l.CountByNamespaceID {
|
|
if _, present := byNamespace[nsID]; !present {
|
|
byNamespace[nsID] = newByNamespace()
|
|
}
|
|
byNamespace[nsID].Counts.Tokens += v
|
|
}
|
|
}
|
|
|
|
func (a *ActivityLog) writePrecomputedQuery(ctx context.Context, segmentTime time.Time, opts pqOptions) error {
|
|
pq := &activity.PrecomputedQuery{
|
|
StartTime: segmentTime,
|
|
EndTime: opts.endTime,
|
|
Namespaces: make([]*activity.NamespaceRecord, 0, len(opts.byNamespace)),
|
|
Months: make([]*activity.MonthRecord, 0, len(opts.byMonth)),
|
|
}
|
|
// this will transform the byMonth map into the correctly formatted protobuf
|
|
pq.Months = a.transformMonthBreakdowns(opts.byMonth)
|
|
|
|
// the byNamespace map also needs to be transformed into a protobuf
|
|
for nsID, entry := range opts.byNamespace {
|
|
mountRecord := make([]*activity.MountRecord, 0, len(entry.Mounts))
|
|
for mountAccessor, mountData := range entry.Mounts {
|
|
mountRecord = append(mountRecord, &activity.MountRecord{
|
|
MountPath: a.mountAccessorToMountPath(mountAccessor),
|
|
Counts: &activity.CountsRecord{
|
|
EntityClients: len(mountData.Counts.Entities),
|
|
NonEntityClients: int(mountData.Counts.Tokens) + len(mountData.Counts.NonEntities),
|
|
},
|
|
})
|
|
}
|
|
|
|
pq.Namespaces = append(pq.Namespaces, &activity.NamespaceRecord{
|
|
NamespaceID: nsID,
|
|
Entities: uint64(len(entry.Counts.Entities)),
|
|
NonEntityTokens: entry.Counts.Tokens + uint64(len(entry.Counts.NonEntities)),
|
|
Mounts: mountRecord,
|
|
})
|
|
}
|
|
err := a.queryStore.Put(ctx, pq)
|
|
if err != nil {
|
|
a.logger.Warn("failed to store precomputed query", "error", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// pqOptions holds fields that will be used when creating precomputed queries
|
|
// These fields will remain the same for every segment that a precomputed query worker is handling
|
|
type pqOptions struct {
|
|
byNamespace map[string]*processByNamespace
|
|
byMonth map[int64]*processMonth
|
|
// endTime sets the end time of the precomputed query.
|
|
// When invoked on schedule by the precomputedQueryWorker, this is the end of the month that just finished.
|
|
endTime time.Time
|
|
// activePeriodStart is the earliest date in our retention window
|
|
activePeriodStart time.Time
|
|
// activePeriodEnd is the latest date in our retention window.
|
|
// When invoked on schedule by the precomputedQueryWorker, this will be the timestamp of the most recent segment
|
|
// that's present in storage
|
|
activePeriodEnd time.Time
|
|
}
|
|
|
|
// segmentToPrecomputedQuery processes a single segment
|
|
func (a *ActivityLog) segmentToPrecomputedQuery(ctx context.Context, segmentTime time.Time, reader SegmentReader, opts pqOptions) error {
|
|
hyperloglog, err := a.CreateOrFetchHyperlogLog(ctx, segmentTime)
|
|
if err != nil {
|
|
// We were unable to create or fetch the hll, but we should still
|
|
// continue with our precomputation
|
|
a.logger.Warn("unable to create or fetch hyperloglog", "start time", segmentTime, "error", err)
|
|
}
|
|
|
|
// Iterate through entities, adding them to the hyperloglog and the summary maps in opts
|
|
for {
|
|
entity, err := reader.ReadEntity(ctx)
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
a.logger.Warn("failed to read segment", "error", err)
|
|
return err
|
|
}
|
|
err = a.handleEntitySegment(entity, segmentTime, hyperloglog, opts)
|
|
if err != nil {
|
|
a.logger.Warn("failed to handle entity segment", "error", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Store the hyperloglog
|
|
err = a.StoreHyperlogLog(ctx, segmentTime, hyperloglog)
|
|
if err != nil {
|
|
a.logger.Warn("failed to store hyperloglog for month", "start time", segmentTime, "error", err)
|
|
}
|
|
|
|
// Iterate through any tokens and add them to per namespace map
|
|
for {
|
|
token, err := reader.ReadToken(ctx)
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
a.logger.Warn("failed to load token counts", "error", err)
|
|
return err
|
|
}
|
|
a.breakdownTokenSegment(token, opts.byNamespace)
|
|
}
|
|
|
|
// write metrics
|
|
for nsID, entry := range opts.byNamespace {
|
|
// If this is the most recent month, or the start of the reporting period, output
|
|
// a metric for each namespace.
|
|
if segmentTime == opts.activePeriodEnd {
|
|
a.metrics.SetGaugeWithLabels(
|
|
[]string{"identity", "entity", "active", "monthly"},
|
|
float32(len(entry.Counts.Entities)),
|
|
[]metricsutil.Label{
|
|
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
|
|
},
|
|
)
|
|
a.metrics.SetGaugeWithLabels(
|
|
[]string{"identity", "nonentity", "active", "monthly"},
|
|
float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens),
|
|
[]metricsutil.Label{
|
|
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
|
|
},
|
|
)
|
|
} else if segmentTime == opts.activePeriodStart {
|
|
a.metrics.SetGaugeWithLabels(
|
|
[]string{"identity", "entity", "active", "reporting_period"},
|
|
float32(len(entry.Counts.Entities)),
|
|
[]metricsutil.Label{
|
|
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
|
|
},
|
|
)
|
|
a.metrics.SetGaugeWithLabels(
|
|
[]string{"identity", "nonentity", "active", "reporting_period"},
|
|
float32(len(entry.Counts.NonEntities))+float32(entry.Counts.Tokens),
|
|
[]metricsutil.Label{
|
|
{Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)},
|
|
},
|
|
)
|
|
}
|
|
}
|
|
|
|
// convert the maps to the proper format and write them as precomputed queries
|
|
return a.writePrecomputedQuery(ctx, segmentTime, opts)
|
|
}
|
|
|
|
// goroutine to process the request in the intent log, creating precomputed queries.
|
|
// We expect the return value won't be checked, so log errors as they occur
|
|
// (but for unit testing having the error return should help.)
|
|
func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Cancel the context if activity log is shut down.
|
|
// This will cause the next storage operation to fail.
|
|
a.l.RLock()
|
|
// doneCh is modified in some tests, so we don't want to access that member
|
|
// without a lock, but we don't want to hold the lock for the entire lifetime
|
|
// of this goroutine. Passing the channel to the goroutine works here because
|
|
// no tests depend on us accessing the new doneCh after modifying the field.
|
|
go func(done chan struct{}) {
|
|
select {
|
|
case <-done:
|
|
cancel()
|
|
case <-ctx.Done():
|
|
break
|
|
}
|
|
}(a.doneCh)
|
|
a.l.RUnlock()
|
|
|
|
// Load the intent log
|
|
rawIntentLog, err := a.view.Get(ctx, activityIntentLogKey)
|
|
if err != nil {
|
|
a.logger.Warn("could not load intent log", "error", err)
|
|
return err
|
|
}
|
|
if rawIntentLog == nil {
|
|
a.logger.Trace("no intent log found")
|
|
return err
|
|
}
|
|
var intent ActivityIntentLog
|
|
err = json.Unmarshal(rawIntentLog.Value, &intent)
|
|
if err != nil {
|
|
a.logger.Warn("could not parse intent log", "error", err)
|
|
return err
|
|
}
|
|
|
|
// currentMonth could change (from another month end) after we release the lock.
|
|
// But, it's not critical to correct operation; this is a check for intent logs that are
|
|
// too old, and startTimestamp should only go forward (unless it is zero.)
|
|
// If there's an intent log, finish it even if the feature is currently disabled.
|
|
a.l.RLock()
|
|
currentMonth := a.currentSegment.startTimestamp
|
|
// Base retention period on the month we are generating (even in the past)--- a.clock.Now()
|
|
// would work but this will be easier to control in tests.
|
|
retentionWindow := timeutil.MonthsPreviousTo(a.retentionMonths, time.Unix(intent.NextMonth, 0).UTC())
|
|
a.l.RUnlock()
|
|
if currentMonth != 0 && intent.NextMonth != currentMonth {
|
|
a.logger.Warn("intent log does not match current segment",
|
|
"intent", intent.NextMonth, "current", currentMonth)
|
|
return errors.New("intent log is too far in the past")
|
|
}
|
|
|
|
lastMonth := intent.PreviousMonth
|
|
a.logger.Info("computing queries", "month", time.Unix(lastMonth, 0).UTC())
|
|
|
|
times, err := a.availableLogs(ctx)
|
|
if err != nil {
|
|
a.logger.Warn("could not list available logs", "error", err)
|
|
return err
|
|
}
|
|
if len(times) == 0 {
|
|
a.logger.Warn("no months in storage")
|
|
a.view.Delete(ctx, activityIntentLogKey)
|
|
return errors.New("previous month not found")
|
|
}
|
|
if times[0].Unix() != lastMonth {
|
|
a.logger.Warn("last month not in storage", "latest", times[0].Unix())
|
|
a.view.Delete(ctx, activityIntentLogKey)
|
|
return errors.New("previous month not found")
|
|
}
|
|
|
|
byNamespace := make(map[string]*processByNamespace)
|
|
byMonth := make(map[int64]*processMonth)
|
|
|
|
endTime := timeutil.EndOfMonth(time.Unix(lastMonth, 0).UTC())
|
|
activePeriodStart := timeutil.MonthsPreviousTo(a.defaultReportMonths, endTime)
|
|
// If not enough data, report as much as we have in the window
|
|
if activePeriodStart.Before(times[len(times)-1]) {
|
|
activePeriodStart = times[len(times)-1]
|
|
}
|
|
opts := pqOptions{
|
|
byNamespace: byNamespace,
|
|
byMonth: byMonth,
|
|
endTime: endTime,
|
|
activePeriodStart: activePeriodStart,
|
|
activePeriodEnd: times[0],
|
|
}
|
|
// "times" is already in reverse order, start building the per-namespace maps
|
|
// from the last month backward
|
|
for _, startTime := range times {
|
|
// Do not work back further than the current retention window,
|
|
// which will just get deleted anyway.
|
|
if startTime.Before(retentionWindow) {
|
|
break
|
|
}
|
|
reader, err := a.NewSegmentFileReader(ctx, startTime)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = a.segmentToPrecomputedQuery(ctx, startTime, reader, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// delete the intent log
|
|
a.view.Delete(ctx, activityIntentLogKey)
|
|
|
|
a.logger.Info("finished computing queries", "month", endTime)
|
|
|
|
select {
|
|
case a.precomputedQueryWritten <- struct{}{}:
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// goroutine to delete any segments or precomputed queries older than
|
|
// the retention period.
|
|
// We expect the return value won't be checked, so log errors as they occur
|
|
// (but for unit testing having the error return should help.)
|
|
func (a *ActivityLog) retentionWorker(ctx context.Context, currentTime time.Time, retentionMonths int) error {
|
|
if a.core.activityLogConfig.DisableTimers {
|
|
return nil
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Cancel the context if activity log is shut down.
|
|
// This will cause the next storage operation to fail.
|
|
a.l.RLock()
|
|
doneCh := a.doneCh
|
|
a.l.RUnlock()
|
|
go func() {
|
|
select {
|
|
case <-doneCh:
|
|
cancel()
|
|
case <-ctx.Done():
|
|
break
|
|
}
|
|
}()
|
|
|
|
// everything >= the threshold is OK
|
|
retentionThreshold := timeutil.MonthsPreviousTo(retentionMonths, currentTime)
|
|
|
|
available, err := a.availableLogs(ctx)
|
|
if err != nil {
|
|
a.logger.Warn("could not list segments", "error", err)
|
|
return err
|
|
}
|
|
for _, t := range available {
|
|
// One at a time seems OK
|
|
if t.Before(retentionThreshold) {
|
|
a.logger.Trace("deleting segments", "startTime", t)
|
|
a.deleteLogWorker(ctx, t.Unix(), make(chan struct{}))
|
|
}
|
|
}
|
|
|
|
if a.queryStore != nil {
|
|
err = a.queryStore.DeleteQueriesBefore(ctx, retentionThreshold)
|
|
if err != nil {
|
|
a.logger.Warn("deletion of queries failed", "error", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Periodic report of number of active entities, with the current month.
|
|
// We don't break this down by namespace because that would require going to storage (that information
|
|
// is not currently stored in memory.)
|
|
func (a *ActivityLog) PartialMonthMetrics(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
|
|
a.fragmentLock.RLock()
|
|
defer a.fragmentLock.RUnlock()
|
|
if !a.enabled {
|
|
// Empty list
|
|
return []metricsutil.GaugeLabelValues{}, nil
|
|
}
|
|
count := len(a.partialMonthClientTracker)
|
|
|
|
return []metricsutil.GaugeLabelValues{
|
|
{
|
|
Labels: []metricsutil.Label{},
|
|
Value: float32(count),
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (c *Core) activeEntityGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
|
|
c.stateLock.RLock()
|
|
a := c.activityLog
|
|
c.stateLock.RUnlock()
|
|
if a == nil {
|
|
return []metricsutil.GaugeLabelValues{}, nil
|
|
}
|
|
return a.PartialMonthMetrics(ctx)
|
|
}
|
|
|
|
// populateNamespaceAndMonthlyBreakdowns traverses the partial month data
|
|
// stored in memory and groups them by months and namespaces.
|
|
func (a *ActivityLog) populateNamespaceAndMonthlyBreakdowns() (map[int64]*processMonth, map[string]*processByNamespace) {
|
|
// Parse the monthly clients and prepare the breakdowns.
|
|
byNamespace := make(map[string]*processByNamespace)
|
|
byMonth := make(map[int64]*processMonth)
|
|
for _, e := range a.partialMonthClientTracker {
|
|
processClientRecord(e, byNamespace, byMonth, a.clock.Now())
|
|
}
|
|
return byMonth, byNamespace
|
|
}
|
|
|
|
// transformMonthBreakdowns converts a map of unix timestamp -> processMonth to
|
|
// a slice of MonthRecord
|
|
func (a *ActivityLog) transformMonthBreakdowns(byMonth map[int64]*processMonth) []*activity.MonthRecord {
|
|
monthly := make([]*activity.MonthRecord, 0)
|
|
processByNamespaces := func(nsMap map[string]*processByNamespace) []*activity.MonthlyNamespaceRecord {
|
|
nsRecord := make([]*activity.MonthlyNamespaceRecord, 0, len(nsMap))
|
|
for nsID, nsData := range nsMap {
|
|
// Process mount specific data within a namespace within a given month
|
|
mountRecord := make([]*activity.MountRecord, 0, len(nsMap[nsID].Mounts))
|
|
for mountAccessor, mountData := range nsMap[nsID].Mounts {
|
|
mountRecord = append(mountRecord, &activity.MountRecord{
|
|
MountPath: a.mountAccessorToMountPath(mountAccessor),
|
|
Counts: &activity.CountsRecord{
|
|
EntityClients: len(mountData.Counts.Entities),
|
|
NonEntityClients: int(mountData.Counts.Tokens) + len(mountData.Counts.NonEntities),
|
|
},
|
|
})
|
|
}
|
|
|
|
// Process ns specific data within a given month
|
|
nsRecord = append(nsRecord, &activity.MonthlyNamespaceRecord{
|
|
NamespaceID: nsID,
|
|
Counts: &activity.CountsRecord{
|
|
EntityClients: len(nsData.Counts.Entities),
|
|
NonEntityClients: int(nsData.Counts.Tokens) + len(nsData.Counts.NonEntities),
|
|
},
|
|
Mounts: mountRecord,
|
|
})
|
|
}
|
|
return nsRecord
|
|
}
|
|
for timestamp, monthData := range byMonth {
|
|
newClientsNSRecord := processByNamespaces(monthData.NewClients.Namespaces)
|
|
newClientRecord := &activity.NewClientRecord{
|
|
Counts: &activity.CountsRecord{
|
|
EntityClients: len(monthData.NewClients.Counts.Entities),
|
|
NonEntityClients: int(monthData.NewClients.Counts.Tokens) + len(monthData.NewClients.Counts.NonEntities),
|
|
},
|
|
Namespaces: newClientsNSRecord,
|
|
}
|
|
|
|
// Process all the months
|
|
monthly = append(monthly, &activity.MonthRecord{
|
|
Timestamp: timestamp,
|
|
Counts: &activity.CountsRecord{
|
|
EntityClients: len(monthData.Counts.Entities),
|
|
NonEntityClients: int(monthData.Counts.Tokens) + len(monthData.Counts.NonEntities),
|
|
},
|
|
Namespaces: processByNamespaces(monthData.Namespaces),
|
|
NewClients: newClientRecord,
|
|
})
|
|
}
|
|
return monthly
|
|
}
|
|
|
|
func (a *ActivityLog) calculateByNamespaceResponseForQuery(ctx context.Context, byNamespace []*activity.NamespaceRecord) (int, int, []*ResponseNamespace, error) {
|
|
queryNS, err := namespace.FromContext(ctx)
|
|
if err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
byNamespaceResponse := make([]*ResponseNamespace, 0)
|
|
totalEntities := 0
|
|
totalTokens := 0
|
|
|
|
for _, nsRecord := range byNamespace {
|
|
ns, err := NamespaceByID(ctx, nsRecord.NamespaceID, a.core)
|
|
if err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
if a.includeInResponse(queryNS, ns) {
|
|
mountResponse := make([]*ResponseMount, 0, len(nsRecord.Mounts))
|
|
for _, mountRecord := range nsRecord.Mounts {
|
|
mountResponse = append(mountResponse, &ResponseMount{
|
|
MountPath: mountRecord.MountPath,
|
|
Counts: &ResponseCounts{
|
|
DistinctEntities: int(mountRecord.Counts.EntityClients),
|
|
EntityClients: int(mountRecord.Counts.EntityClients),
|
|
NonEntityClients: int(mountRecord.Counts.NonEntityClients),
|
|
NonEntityTokens: int(mountRecord.Counts.NonEntityClients),
|
|
Clients: int(mountRecord.Counts.EntityClients + mountRecord.Counts.NonEntityClients),
|
|
},
|
|
})
|
|
}
|
|
// Sort the mounts in descending order of usage
|
|
sort.Slice(mountResponse, func(i, j int) bool {
|
|
return mountResponse[i].Counts.Clients > mountResponse[j].Counts.Clients
|
|
})
|
|
|
|
var displayPath string
|
|
if ns == nil {
|
|
displayPath = fmt.Sprintf("deleted namespace %q", nsRecord.NamespaceID)
|
|
} else {
|
|
displayPath = ns.Path
|
|
}
|
|
byNamespaceResponse = append(byNamespaceResponse, &ResponseNamespace{
|
|
NamespaceID: nsRecord.NamespaceID,
|
|
NamespacePath: displayPath,
|
|
Counts: ResponseCounts{
|
|
DistinctEntities: int(nsRecord.Entities),
|
|
EntityClients: int(nsRecord.Entities),
|
|
NonEntityTokens: int(nsRecord.NonEntityTokens),
|
|
NonEntityClients: int(nsRecord.NonEntityTokens),
|
|
Clients: int(nsRecord.Entities + nsRecord.NonEntityTokens),
|
|
},
|
|
Mounts: mountResponse,
|
|
})
|
|
totalEntities += int(nsRecord.Entities)
|
|
totalTokens += int(nsRecord.NonEntityTokens)
|
|
}
|
|
}
|
|
return totalEntities, totalTokens, byNamespaceResponse, nil
|
|
}
|
|
|
|
func (a *ActivityLog) prepareMonthsResponseForQuery(ctx context.Context, byMonth []*activity.MonthRecord) ([]*ResponseMonth, error) {
|
|
months := make([]*ResponseMonth, 0, len(byMonth))
|
|
for _, monthsRecord := range byMonth {
|
|
newClientsResponse := &ResponseNewClients{}
|
|
if int(monthsRecord.NewClients.Counts.EntityClients+monthsRecord.NewClients.Counts.NonEntityClients) != 0 {
|
|
newClientsNSResponse, err := a.prepareNamespaceResponse(ctx, monthsRecord.NewClients.Namespaces)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
newClientsResponse.Counts = &ResponseCounts{
|
|
EntityClients: int(monthsRecord.NewClients.Counts.EntityClients),
|
|
NonEntityClients: int(monthsRecord.NewClients.Counts.NonEntityClients),
|
|
Clients: int(monthsRecord.NewClients.Counts.EntityClients + monthsRecord.NewClients.Counts.NonEntityClients),
|
|
}
|
|
newClientsResponse.Namespaces = newClientsNSResponse
|
|
}
|
|
|
|
monthResponse := &ResponseMonth{
|
|
Timestamp: time.Unix(monthsRecord.Timestamp, 0).UTC().Format(time.RFC3339),
|
|
}
|
|
if int(monthsRecord.Counts.EntityClients+monthsRecord.Counts.NonEntityClients) != 0 {
|
|
nsResponse, err := a.prepareNamespaceResponse(ctx, monthsRecord.Namespaces)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
monthResponse.Counts = &ResponseCounts{
|
|
EntityClients: int(monthsRecord.Counts.EntityClients),
|
|
NonEntityClients: int(monthsRecord.Counts.NonEntityClients),
|
|
Clients: int(monthsRecord.Counts.EntityClients + monthsRecord.Counts.NonEntityClients),
|
|
}
|
|
monthResponse.Namespaces = nsResponse
|
|
monthResponse.NewClients = newClientsResponse
|
|
months = append(months, monthResponse)
|
|
}
|
|
}
|
|
return months, nil
|
|
}
|
|
|
|
// prepareNamespaceResponse populates the namespace portion of the activity log response struct
|
|
// from
|
|
func (a *ActivityLog) prepareNamespaceResponse(ctx context.Context, nsRecords []*activity.MonthlyNamespaceRecord) ([]*ResponseNamespace, error) {
|
|
queryNS, err := namespace.FromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
nsResponse := make([]*ResponseNamespace, 0, len(nsRecords))
|
|
for _, nsRecord := range nsRecords {
|
|
if int(nsRecord.Counts.EntityClients) == 0 && int(nsRecord.Counts.NonEntityClients) == 0 {
|
|
continue
|
|
}
|
|
|
|
ns, err := NamespaceByID(ctx, nsRecord.NamespaceID, a.core)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if a.includeInResponse(queryNS, ns) {
|
|
mountResponse := make([]*ResponseMount, 0, len(nsRecord.Mounts))
|
|
for _, mountRecord := range nsRecord.Mounts {
|
|
if int(mountRecord.Counts.EntityClients) == 0 && int(mountRecord.Counts.NonEntityClients) == 0 {
|
|
continue
|
|
}
|
|
|
|
mountResponse = append(mountResponse, &ResponseMount{
|
|
MountPath: mountRecord.MountPath,
|
|
Counts: &ResponseCounts{
|
|
EntityClients: int(mountRecord.Counts.EntityClients),
|
|
NonEntityClients: int(mountRecord.Counts.NonEntityClients),
|
|
Clients: int(mountRecord.Counts.EntityClients + mountRecord.Counts.NonEntityClients),
|
|
},
|
|
})
|
|
}
|
|
|
|
var displayPath string
|
|
if ns == nil {
|
|
displayPath = fmt.Sprintf("deleted namespace %q", nsRecord.NamespaceID)
|
|
} else {
|
|
displayPath = ns.Path
|
|
}
|
|
nsResponse = append(nsResponse, &ResponseNamespace{
|
|
NamespaceID: nsRecord.NamespaceID,
|
|
NamespacePath: displayPath,
|
|
Counts: ResponseCounts{
|
|
EntityClients: int(nsRecord.Counts.EntityClients),
|
|
NonEntityClients: int(nsRecord.Counts.NonEntityClients),
|
|
Clients: int(nsRecord.Counts.EntityClients + nsRecord.Counts.NonEntityClients),
|
|
},
|
|
Mounts: mountResponse,
|
|
})
|
|
}
|
|
}
|
|
return nsResponse, nil
|
|
}
|
|
|
|
// partialMonthClientCount returns the number of clients used so far this month.
|
|
// If activity log is not enabled, the response will be nil
|
|
func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]interface{}, error) {
|
|
a.fragmentLock.RLock()
|
|
defer a.fragmentLock.RUnlock()
|
|
|
|
if !a.enabled {
|
|
// nothing to count
|
|
return nil, nil
|
|
}
|
|
|
|
// Traverse through current month's activitylog data and group clients
|
|
// into months and namespaces
|
|
byMonth, byNamespace := a.populateNamespaceAndMonthlyBreakdowns()
|
|
|
|
// Convert the byNamespace breakdowns into structs that are
|
|
// consumable by the /activity endpoint, so as to reuse code between these two
|
|
// endpoints.
|
|
byNamespaceComputation := a.transformALNamespaceBreakdowns(byNamespace)
|
|
|
|
// Calculate the namespace response breakdowns and totals for entities and tokens from the initial
|
|
// namespace data.
|
|
totalEntities, totalTokens, byNamespaceResponse, err := a.calculateByNamespaceResponseForQuery(ctx, byNamespaceComputation)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Sort clients within each namespace
|
|
a.sortALResponseNamespaces(byNamespaceResponse)
|
|
|
|
// Now populate the response based on breakdowns.
|
|
responseData := make(map[string]interface{})
|
|
responseData["by_namespace"] = byNamespaceResponse
|
|
responseData["distinct_entities"] = totalEntities
|
|
responseData["entity_clients"] = totalEntities
|
|
responseData["non_entity_tokens"] = totalTokens
|
|
responseData["non_entity_clients"] = totalTokens
|
|
responseData["clients"] = totalEntities + totalTokens
|
|
|
|
// The partialMonthClientCount should not have more than one month worth of data.
|
|
// If it does, something has gone wrong and we should warn that the activity log data
|
|
// might be inaccurate.
|
|
if len(byMonth) != 1 {
|
|
monthTimestamps := make([]string, 0)
|
|
for timestamp := range byMonth {
|
|
dateTimeString := time.Unix(timestamp, 0).UTC().Format(time.RFC3339)
|
|
monthTimestamps = append(monthTimestamps, dateTimeString)
|
|
}
|
|
a.logger.Error("more or less than one month of data recorded in current month's activity log", "timestamps", monthTimestamps)
|
|
}
|
|
|
|
// Convert the byMonth breakdowns into structs that are
|
|
// consumable by the /activity endpoint, so as to reuse code between these two
|
|
// endpoints.
|
|
monthlyComputation := a.transformMonthBreakdowns(byMonth)
|
|
|
|
// Create and populate the month response structs based on the monthly breakdown.
|
|
months, err := a.prepareMonthsResponseForQuery(ctx, monthlyComputation)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Sort the months and clients within each month before adding the months to the response
|
|
a.sortActivityLogMonthsResponse(months)
|
|
responseData["months"] = months
|
|
|
|
return responseData, nil
|
|
}
|
|
|
|
func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, format string, startTime, endTime time.Time) error {
|
|
// For capacity reasons only allow a single in-process export at a time.
|
|
// TODO do we really need to do this?
|
|
if !a.inprocessExport.CAS(false, true) {
|
|
return fmt.Errorf("existing export in progress")
|
|
}
|
|
defer a.inprocessExport.Store(false)
|
|
|
|
// Find the months with activity log data that are between the start and end
|
|
// months. We want to walk this in cronological order so the oldest instance of a
|
|
// client usage is recorded, not the most recent.
|
|
times, err := a.availableLogs(ctx)
|
|
if err != nil {
|
|
a.logger.Warn("failed to list available log segments", "error", err)
|
|
return fmt.Errorf("failed to list available log segments: %w", err)
|
|
}
|
|
sort.Slice(times, func(i, j int) bool {
|
|
// sort in chronological order to produce the output we want showing what
|
|
// month an entity first had activity.
|
|
return times[i].Before(times[j])
|
|
})
|
|
|
|
// Filter over just the months we care about
|
|
filteredList := make([]time.Time, 0, len(times))
|
|
for _, t := range times {
|
|
if timeutil.InRange(t, startTime, endTime) {
|
|
filteredList = append(filteredList, t)
|
|
}
|
|
}
|
|
if len(filteredList) == 0 {
|
|
a.logger.Info("no data to export", "start_time", startTime, "end_time", endTime)
|
|
return fmt.Errorf("no data to export in provided time range")
|
|
}
|
|
|
|
actualStartTime := filteredList[len(filteredList)-1]
|
|
a.logger.Trace("choose start time for export", "actualStartTime", actualStartTime, "months_included", filteredList)
|
|
|
|
// Add headers here because we start to immediately write in the csv encoder
|
|
// constructor.
|
|
rw.Header().Add("Content-Disposition", fmt.Sprintf("attachment; filename=\"activity_export_%d_to_%d.%s\"", actualStartTime.Unix(), endTime.Unix(), format))
|
|
rw.Header().Add("Content-Type", fmt.Sprintf("application/%s", format))
|
|
|
|
var encoder encoder
|
|
switch format {
|
|
case "json":
|
|
encoder = newJSONEncoder(rw)
|
|
case "csv":
|
|
var err error
|
|
encoder, err = newCSVEncoder(rw)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create csv encoder: %w", err)
|
|
}
|
|
default:
|
|
return fmt.Errorf("invalid format: %s", format)
|
|
}
|
|
|
|
a.logger.Info("starting activity log export", "start_time", startTime, "end_time", endTime, "format", format)
|
|
|
|
dedupedIds := make(map[string]struct{})
|
|
|
|
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error {
|
|
for _, e := range l.Clients {
|
|
if _, ok := dedupedIds[e.ClientID]; ok {
|
|
continue
|
|
}
|
|
|
|
dedupedIds[e.ClientID] = struct{}{}
|
|
err := encoder.Encode(e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// For each month in the filtered list walk all the log segments
|
|
|
|
for _, startTime := range filteredList {
|
|
err := a.WalkEntitySegments(ctx, startTime, nil, walkEntities)
|
|
if err != nil {
|
|
a.logger.Error("failed to load segments for export", "error", err)
|
|
return fmt.Errorf("failed to load segments for export: %w", err)
|
|
}
|
|
}
|
|
|
|
// Flush and error check the encoder. This is neccessary for buffered
|
|
// encoders like csv.
|
|
encoder.Flush()
|
|
if err := encoder.Error(); err != nil {
|
|
a.logger.Error("failed to flush export encoding", "error", err)
|
|
return fmt.Errorf("failed to flush export encoding: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type encoder interface {
|
|
Encode(*activity.EntityRecord) error
|
|
Flush()
|
|
Error() error
|
|
}
|
|
|
|
var _ encoder = (*jsonEncoder)(nil)
|
|
|
|
type jsonEncoder struct {
|
|
e *json.Encoder
|
|
}
|
|
|
|
func newJSONEncoder(w io.Writer) *jsonEncoder {
|
|
return &jsonEncoder{
|
|
e: json.NewEncoder(w),
|
|
}
|
|
}
|
|
|
|
func (j *jsonEncoder) Encode(er *activity.EntityRecord) error {
|
|
return j.e.Encode(er)
|
|
}
|
|
|
|
// Flush is a no-op because json.Encoder doesn't buffer data
|
|
func (j *jsonEncoder) Flush() {}
|
|
|
|
// Error is a no-op because flushing is a no-op.
|
|
func (j *jsonEncoder) Error() error { return nil }
|
|
|
|
var _ encoder = (*csvEncoder)(nil)
|
|
|
|
type csvEncoder struct {
|
|
*csv.Writer
|
|
}
|
|
|
|
func newCSVEncoder(w io.Writer) (*csvEncoder, error) {
|
|
writer := csv.NewWriter(w)
|
|
|
|
err := writer.Write([]string{
|
|
"client_id",
|
|
"namespace_id",
|
|
"timestamp",
|
|
"non_entity",
|
|
"mount_accessor",
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &csvEncoder{
|
|
Writer: writer,
|
|
}, nil
|
|
}
|
|
|
|
// Encode converts an export bundle into a set of strings and writes them to the
|
|
// csv writer.
|
|
func (c *csvEncoder) Encode(e *activity.EntityRecord) error {
|
|
return c.Writer.Write([]string{
|
|
e.ClientID,
|
|
e.NamespaceID,
|
|
fmt.Sprintf("%d", e.Timestamp),
|
|
fmt.Sprintf("%t", e.NonEntity),
|
|
e.MountAccessor,
|
|
})
|
|
}
|