package vault import ( "context" "encoding/json" "errors" "fmt" "os" "sort" "strconv" "strings" "sync" "time" "unicode/utf8" "github.com/golang/protobuf/proto" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/vault/helper/metricsutil" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/helper/timeutil" "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/vault/activity" "github.com/mitchellh/copystructure" ) const ( // activitySubPath is the directory under the system view where // the log will be stored. activitySubPath = "counters/activity/" activityEntityBasePath = "log/entity/" activityTokenBasePath = "log/directtokens/" activityQueryBasePath = "queries/" activityConfigKey = "config" activityIntentLogKey = "endofmonth" // for testing purposes (public as needed) ActivityLogPrefix = "sys/counters/activity/log/" ActivityPrefix = "sys/counters/activity/" // Time to wait on perf standby before sending fragment activityFragmentStandbyTime = 10 * time.Minute // Time between writes of segment to storage activitySegmentInterval = 10 * time.Minute // Timeout on RPC calls. activityFragmentSendTimeout = 1 * time.Minute // Timeout on storage calls. activitySegmentWriteTimeout = 1 * time.Minute // Number of entity records to store per segment // Estimated as 512KiB / 64 bytes = 8192, rounded down activitySegmentEntityCapacity = 8000 // Maximum number of segments per month activityLogMaxSegmentPerMonth = 81 // Number of records (entity or token) to store in a // standby fragment before sending it to the active node. // Estimates as 8KiB / 64 bytes = 128 activityFragmentStandbyCapacity = 128 // trackedTWESegmentPeriod is a time period of a little over a month, and represents // the amount of time that needs to pass after a 1.9 or later upgrade to result in // all fragments and segments no longer storing token counts in the directtokens // storage path. trackedTWESegmentPeriod = 35 * 24 ) type segmentInfo struct { startTimestamp int64 currentClients *activity.EntityActivityLog clientSequenceNumber uint64 // DEPRECATED // This field is needed for backward compatibility with fragments // and segments created with vault versions before 1.9. tokenCount *activity.TokenCount } type clients struct { distinctEntities uint64 distinctNonEntities uint64 } // ActivityLog tracks unique entity counts and non-entity token counts. // It handles assembling log fragments (and sending them to the active // node), writing log segments, and precomputing queries. type ActivityLog struct { core *Core configOverrides *ActivityLogCoreConfig // ActivityLog.l protects the configuration settings, except enable, and any modifications // to the current segment. // Acquire "l" before fragmentLock if both must be held. l sync.RWMutex // fragmentLock protects enable, activeClients, fragment, standbyFragmentsReceived fragmentLock sync.RWMutex // enabled indicates if the activity log is enabled for this cluster. // This is protected by fragmentLock so we can check with only // a single synchronization call. enabled bool // log destination logger log.Logger // metrics sink metrics metricsutil.Metrics // view is the storage location used by ActivityLog, // defaults to sys/activity. view *BarrierView // nodeID is the ID to use for all fragments that // are generated. // TODO: use secondary ID when available? nodeID string // current log fragment (may be nil) fragment *activity.LogFragment fragmentCreation time.Time // Channel to signal a new fragment has been created // so it's appropriate to start the timer. newFragmentCh chan struct{} // Channel for sending fragment immediately sendCh chan struct{} // Channel for writing fragment immediately writeCh chan struct{} // Channel to stop background processing doneCh chan struct{} // track metadata and contents of the most recent log segment currentSegment segmentInfo // Fragments received from performance standbys standbyFragmentsReceived []*activity.LogFragment // precomputed queries queryStore *activity.PrecomputedQueryStore defaultReportMonths int retentionMonths int // channel closed by delete worker when done deleteDone chan struct{} // channel closed when deletion at startup is done // (for unit test robustness) retentionDone chan struct{} // for testing: is config currently being invalidated. protected by l configInvalidationInProgress bool // clientTracker tracks active clients this month. Protected by fragmentLock. clientTracker *ClientTracker } type ClientTracker struct { // All known active clients this month; use fragmentLock read-locked // to check whether it already exists. activeClients map[string]struct{} entityCountByNamespaceID map[string]uint64 nonEntityCountByNamespaceID map[string]uint64 } // These non-persistent configuration options allow us to disable // parts of the implementation for integration testing. // The default values should turn everything on. type ActivityLogCoreConfig struct { // Enable activity log even if the feature flag not set ForceEnable bool // Do not start timers to send or persist fragments. DisableTimers bool } // NewActivityLog creates an activity log. func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics metricsutil.Metrics) (*ActivityLog, error) { hostname, err := os.Hostname() if err != nil { return nil, err } a := &ActivityLog{ core: core, configOverrides: &core.activityLogConfig, logger: logger, view: view, metrics: metrics, nodeID: hostname, newFragmentCh: make(chan struct{}, 1), sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size writeCh: make(chan struct{}, 1), // same for full segment doneCh: make(chan struct{}, 1), clientTracker: &ClientTracker{ activeClients: make(map[string]struct{}), entityCountByNamespaceID: make(map[string]uint64), nonEntityCountByNamespaceID: make(map[string]uint64), }, currentSegment: segmentInfo{ startTimestamp: 0, currentClients: &activity.EntityActivityLog{ Clients: make([]*activity.EntityRecord, 0), }, // tokenCount is deprecated, but must still exist for the current segment // so the fragment that was using TWEs before the 1.9 changes // can be flushed to the current segment. tokenCount: &activity.TokenCount{ CountByNamespaceID: make(map[string]uint64), }, clientSequenceNumber: 0, }, standbyFragmentsReceived: make([]*activity.LogFragment, 0), } config, err := a.loadConfigOrDefault(core.activeContext) if err != nil { return nil, err } a.SetConfigInit(config) a.queryStore = activity.NewPrecomputedQueryStore( logger, view.SubView(activityQueryBasePath), config.RetentionMonths) return a, nil } // saveCurrentSegmentToStorage updates the record of Entities or // Non Entity Tokens in persistent storage // :force: forces a save of tokens/entities even if the in-memory log is empty func (a *ActivityLog) saveCurrentSegmentToStorage(ctx context.Context, force bool) error { // Prevent simultaneous changes to segment a.l.Lock() defer a.l.Unlock() return a.saveCurrentSegmentToStorageLocked(ctx, force) } // Must be called with l held. // :force: forces a save of tokens/entities even if the in-memory log is empty func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, force bool) error { defer a.metrics.MeasureSinceWithLabels([]string{"core", "activity", "segment_write"}, time.Now(), []metricsutil.Label{}) // Swap out the pending fragments a.fragmentLock.Lock() localFragment := a.fragment a.fragment = nil standbys := a.standbyFragmentsReceived a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.fragmentLock.Unlock() // If segment start time is zero, do not update or write // (even if force is true). This can happen if activityLog is // disabled after a save as been triggered. if a.currentSegment.startTimestamp == 0 { return nil } // Measure the current fragment if localFragment != nil { a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"}, float32(len(localFragment.Clients)), []metricsutil.Label{ {"type", "entity"}, }) a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"}, float32(len(localFragment.NonEntityTokens)), []metricsutil.Label{ {"type", "direct_token"}, }) } // Collect new entities and new tokens. saveChanges := false newEntities := make(map[string]*activity.EntityRecord) for _, f := range append(standbys, localFragment) { if f == nil { continue } for _, e := range f.Clients { // We could sort by timestamp to see which is first. // We'll ignore that; the order of the append above means // that we choose entries in localFragment over those // from standby nodes. newEntities[e.ClientID] = e saveChanges = true } // As of 1.9, a fragment should no longer have any NonEntityTokens. However // in order to not lose any information about the current segment during the // month when the client upgrades to 1.9, we must retain this functionality. for ns, val := range f.NonEntityTokens { // We track these pre-1.9 values in the old location, which is // a.currentSegment.tokenCount, as opposed to the counter that stores tokens // without entities that have client IDs, namely // a.clientTracker.nonEntityCountByNamespaceID. This preserves backward // compatibility for the precomputedQueryWorkers and the segment storing // logic. a.currentSegment.tokenCount.CountByNamespaceID[ns] += val saveChanges = true } } if !saveChanges { return nil } // Will all new entities fit? If not, roll over to a new segment. available := activitySegmentEntityCapacity - len(a.currentSegment.currentClients.Clients) remaining := available - len(newEntities) excess := 0 if remaining < 0 { excess = -remaining } segmentClients := a.currentSegment.currentClients.Clients excessClients := make([]*activity.EntityRecord, 0, excess) for _, record := range newEntities { if available > 0 { segmentClients = append(segmentClients, record) available -= 1 } else { excessClients = append(excessClients, record) } } a.currentSegment.currentClients.Clients = segmentClients err := a.saveCurrentSegmentInternal(ctx, force) if err != nil { // The current fragment(s) have already been placed into the in-memory // segment, but we may lose any excess (in excessClients). // There isn't a good way to unwind the transaction on failure, // so we may just lose some records. return err } if available <= 0 { if a.currentSegment.clientSequenceNumber >= activityLogMaxSegmentPerMonth { // Cannot send as Warn because it will repeat too often, // and disabling/renabling would be complicated. a.logger.Trace("too many segments in current month", "dropped", len(excessClients)) return nil } // Rotate to next segment a.currentSegment.clientSequenceNumber += 1 if len(excessClients) > activitySegmentEntityCapacity { a.logger.Warn("too many new active entities, dropping tail", "entities", len(excessClients)) excessClients = excessClients[:activitySegmentEntityCapacity] } a.currentSegment.currentClients.Clients = excessClients err := a.saveCurrentSegmentInternal(ctx, force) if err != nil { return err } } return nil } // :force: forces a save of tokens/entities even if the in-memory log is empty func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error { entityPath := fmt.Sprintf("log/entity/%d/%d", a.currentSegment.startTimestamp, a.currentSegment.clientSequenceNumber) // RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed tokenPath := fmt.Sprintf("log/directtokens/%d/0", a.currentSegment.startTimestamp) for _, client := range a.currentSegment.currentClients.Clients { // Explicitly catch and throw clear error message if client ID creation and storage // results in a []byte that doesn't assert into a valid string. if !utf8.ValidString(client.ClientID) { return fmt.Errorf("client ID %q is not a valid string:", client.ClientID) } } if len(a.currentSegment.currentClients.Clients) > 0 || force { entities, err := proto.Marshal(a.currentSegment.currentClients) if err != nil { return err } a.logger.Trace("writing segment", "path", entityPath) err = a.view.Put(ctx, &logical.StorageEntry{ Key: entityPath, Value: entities, }) if err != nil { return err } } // We must still allow for the tokenCount of the current segment to // be written to storage, since if we remove this code we will incur // data loss for one segment's worth of TWEs. if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 || force { // We can get away with simply using the oldest version stored because // the storing of versions was introduced at the same time as this code. oldestVersion, oldestUpgradeTime, err := a.core.FindOldestVersionTimestamp() switch { case err != nil: a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error())) case len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 && (oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(time.Now())): a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion)) default: if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 { a.logger.Info("storing nonzero token count") } } tokenCount, err := proto.Marshal(a.currentSegment.tokenCount) if err != nil { return err } a.logger.Trace("writing segment", "path", tokenPath) err = a.view.Put(ctx, &logical.StorageEntry{ Key: tokenPath, Value: tokenCount, }) if err != nil { return err } } return nil } // parseSegmentNumberFromPath returns the segment number from a path // (and if it exists - it is the last element in the path) func parseSegmentNumberFromPath(path string) (int, bool) { // as long as both s and sep are not "", len(elems) >= 1 elems := strings.Split(path, "/") segmentNum, err := strconv.Atoi(elems[len(elems)-1]) if err != nil { return 0, false } return segmentNum, true } // availableLogs returns the start_time(s) (in UTC) associated with months for which logs exist, // sorted last to first func (a *ActivityLog) availableLogs(ctx context.Context) ([]time.Time, error) { paths := make([]string, 0) for _, basePath := range []string{activityEntityBasePath, activityTokenBasePath} { p, err := a.view.List(ctx, basePath) if err != nil { return nil, err } paths = append(paths, p...) } pathSet := make(map[time.Time]struct{}) out := make([]time.Time, 0) for _, path := range paths { // generate a set of unique start times time, err := timeutil.ParseTimeFromPath(path) if err != nil { return nil, err } if _, present := pathSet[time]; !present { pathSet[time] = struct{}{} out = append(out, time) } } sort.Slice(out, func(i, j int) bool { // sort in reverse order to make processing most recent segment easier return out[i].After(out[j]) }) a.logger.Trace("scanned existing logs", "out", out) return out, nil } // getMostRecentActivityLogSegment gets the times (in UTC) associated with the most recent // contiguous set of activity logs, sorted in decreasing order (latest to earliest) func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context) ([]time.Time, error) { logTimes, err := a.availableLogs(ctx) if err != nil { return nil, err } return timeutil.GetMostRecentContiguousMonths(logTimes), nil } // getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, bool, error) { p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") if err != nil { return 0, false, err } highestNum := -1 for _, path := range p { if num, ok := parseSegmentNumberFromPath(path); ok { if num > highestNum { highestNum = num } } } if highestNum < 0 { // numbers less than 0 are invalid. if a negative number is the highest value, there isn't a segment return 0, false, nil } return uint64(highestNum), true, nil } // WalkEntitySegments loads each of the entity segments for a particular start time func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, walkFn func(*activity.EntityActivityLog)) error { basePath := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" pathList, err := a.view.List(ctx, basePath) if err != nil { return err } for _, path := range pathList { raw, err := a.view.Get(ctx, basePath+path) if err != nil { return err } if raw == nil { a.logger.Warn("expected log segment not found", "startTime", startTime, "segment", path) continue } out := &activity.EntityActivityLog{} err = proto.Unmarshal(raw.Value, out) if err != nil { return fmt.Errorf("unable to parse segment %v%v: %w", basePath, path, err) } walkFn(out) } return nil } // WalkTokenSegments loads each of the token segments (expected 1) for a particular start time func (a *ActivityLog) WalkTokenSegments(ctx context.Context, startTime time.Time, walkFn func(*activity.TokenCount)) error { basePath := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/" pathList, err := a.view.List(ctx, basePath) if err != nil { return err } for _, path := range pathList { raw, err := a.view.Get(ctx, basePath+path) if err != nil { return err } if raw == nil { a.logger.Trace("no tokens without entities stored without tracking", "startTime", startTime, "segment", path) continue } out := &activity.TokenCount{} err = proto.Unmarshal(raw.Value, out) if err != nil { return fmt.Errorf("unable to parse token segment %v%v: %w", basePath, path, err) } walkFn(out) } return nil } // loadPriorEntitySegment populates the in-memory tracker for entity IDs that have // been active "this month" func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time.Time, sequenceNum uint64) error { path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) data, err := a.view.Get(ctx, path) if err != nil { return err } if data == nil { return nil } out := &activity.EntityActivityLog{} err = proto.Unmarshal(data.Value, out) if err != nil { return err } a.l.RLock() a.fragmentLock.Lock() // Handle the (unlikely) case where the end of the month has been reached while background loading. // Or the feature has been disabled. if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp { for _, ent := range out.Clients { a.clientTracker.addClient(ent) } } a.fragmentLock.Unlock() a.l.RUnlock() return nil } // loadCurrentClientSegment loads the most recent segment (for "this month") into memory // (to append new entries), and to the activeClients to avoid duplication // call with fragmentLock and l held func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64) error { path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) data, err := a.view.Get(ctx, path) if err != nil { return err } if data == nil { return nil } out := &activity.EntityActivityLog{} err = proto.Unmarshal(data.Value, out) if err != nil { return err } if !a.core.perfStandby { a.currentSegment = segmentInfo{ startTimestamp: startTime.Unix(), currentClients: &activity.EntityActivityLog{ Clients: out.Clients, }, tokenCount: a.currentSegment.tokenCount, clientSequenceNumber: sequenceNum, } } else { // populate this for edge case checking (if end of month passes while background loading on standby) a.currentSegment.startTimestamp = startTime.Unix() } for _, ent := range out.Clients { a.clientTracker.addClient(ent) } return nil } // tokenCountExists checks if there's a token log for :startTime: // this function should be called with the lock held func (a *ActivityLog) tokenCountExists(ctx context.Context, startTime time.Time) (bool, error) { p, err := a.view.List(ctx, activityTokenBasePath+fmt.Sprint(startTime.Unix())+"/") if err != nil { return false, err } for _, path := range p { if num, ok := parseSegmentNumberFromPath(path); ok && num == 0 { return true, nil } } return false, nil } // loadTokenCount populates the in-memory representation of activity token count // this function should be called with the lock held func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) error { tokenCountExists, err := a.tokenCountExists(ctx, startTime) if err != nil { return err } if !tokenCountExists { return nil } path := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/0" data, err := a.view.Get(ctx, path) if err != nil { return err } if data == nil { return nil } out := &activity.TokenCount{} err = proto.Unmarshal(data.Value, out) if err != nil { return err } // An empty map is unmarshaled as nil if out.CountByNamespaceID == nil { out.CountByNamespaceID = make(map[string]uint64) } // We must load the tokenCount of the current segment into the activity log // so that TWEs counted before the introduction of a client ID for TWEs are // still reported in the partial client counts. a.currentSegment.tokenCount = out return nil } // entityBackgroundLoader loads entity activity log records for start_date `t` func (a *ActivityLog) entityBackgroundLoader(ctx context.Context, wg *sync.WaitGroup, t time.Time, seqNums <-chan uint64) { defer wg.Done() for seqNum := range seqNums { select { case <-a.doneCh: a.logger.Info("background processing told to halt while loading entities", "time", t, "sequence", seqNum) return default: } err := a.loadPriorEntitySegment(ctx, t, seqNum) if err != nil { a.logger.Error("error loading entity activity log", "time", t, "sequence", seqNum, "err", err) } } } // Initialize a new current segment, based on the current time. // Call with fragmentLock and l held. func (a *ActivityLog) startNewCurrentLogLocked(now time.Time) { a.logger.Trace("initializing new log") a.resetCurrentLog() a.currentSegment.startTimestamp = now.Unix() } // Should be called with fragmentLock and l held. func (a *ActivityLog) newMonthCurrentLogLocked(currentTime time.Time) { a.logger.Trace("continuing log to new month") a.resetCurrentLog() monthStart := timeutil.StartOfMonth(currentTime.UTC()) a.currentSegment.startTimestamp = monthStart.Unix() } // Initialize a new current segment, based on the given time // should be called with fragmentLock and l held. func (a *ActivityLog) newSegmentAtGivenTime(t time.Time) { timestamp := t.Unix() a.logger.Trace("starting a segment", "timestamp", timestamp) a.resetCurrentLog() a.currentSegment.startTimestamp = timestamp } // Reset all the current segment state. // Should be called with fragmentLock and l held. func (a *ActivityLog) resetCurrentLog() { a.currentSegment.startTimestamp = 0 a.currentSegment.currentClients = &activity.EntityActivityLog{ Clients: make([]*activity.EntityRecord, 0), } // We must still initialize the tokenCount to recieve tokenCounts from fragments // during the month where customers upgrade to 1.9 a.currentSegment.tokenCount = &activity.TokenCount{ CountByNamespaceID: make(map[string]uint64), } a.currentSegment.clientSequenceNumber = 0 a.fragment = nil a.clientTracker.activeClients = make(map[string]struct{}) a.clientTracker.entityCountByNamespaceID = make(map[string]uint64) a.clientTracker.nonEntityCountByNamespaceID = make(map[string]uint64) a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) } func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) { entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp) tokenPath := fmt.Sprintf("%v%v/", activityTokenBasePath, startTimestamp) entitySegments, err := a.view.List(ctx, entityPath) if err != nil { a.logger.Error("could not list entity paths", "error", err) return } for _, p := range entitySegments { err = a.view.Delete(ctx, entityPath+p) if err != nil { a.logger.Error("could not delete entity log", "error", err) } } tokenSegments, err := a.view.List(ctx, tokenPath) if err != nil { a.logger.Error("could not list token paths", "error", err) return } for _, p := range tokenSegments { err = a.view.Delete(ctx, tokenPath+p) if err != nil { a.logger.Error("could not delete token log", "error", err) } } // Allow whoever started this as a goroutine to wait for it to finish. close(whenDone) } func (a *ActivityLog) WaitForDeletion() { a.l.Lock() // May be nil, if never set doneCh := a.deleteDone a.l.Unlock() if doneCh != nil { select { case <-doneCh: break } } } // refreshFromStoredLog loads the appropriate entities/tokencounts for active and performance standbys // the most recent segment is loaded synchronously, and older segments are loaded in the background // this function expects stateLock to be held func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGroup, now time.Time) error { a.l.Lock() defer a.l.Unlock() a.fragmentLock.Lock() defer a.fragmentLock.Unlock() decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx) if err != nil { return err } if len(decreasingLogTimes) == 0 { if a.enabled { if a.core.perfStandby { // reset the log without updating the timestamp a.resetCurrentLog() } else { a.startNewCurrentLogLocked(now) } } return nil } mostRecent := decreasingLogTimes[0] if !a.enabled { a.logger.Debug("activity log not enabled, skipping refresh from storage") if !a.core.perfStandby && timeutil.IsCurrentMonth(mostRecent, now) { a.logger.Debug("activity log is disabled, cleaning up logs for the current month") go a.deleteLogWorker(ctx, mostRecent.Unix(), make(chan struct{})) } return nil } if timeutil.IsPreviousMonth(mostRecent, now) { // no activity logs to load for this month. if we are enabled, interpret // it as having missed the rotation, so let it fall through and load // if we missed generating the precomputed query, activeFragmentWorker() // will clean things up when it runs next a.logger.Trace("no log segments for current month", "mostRecent", mostRecent) a.logger.Info("rotating activity log to new month") } else if mostRecent.After(now) { // we can't do anything if the most recent log is in the future a.logger.Warn("timestamp from log to load is in the future", "timestamp", mostRecent) return nil } else if !timeutil.IsCurrentMonth(mostRecent, now) { // the most recent log in storage is 2+ months in the past a.logger.Warn("most recent log in storage is 2 or more months in the past.", "timestamp", mostRecent) if a.core.perfStandby { // reset the log without updating the timestamp a.resetCurrentLog() } else { a.startNewCurrentLogLocked(now) } return nil } // load token counts from storage into memory. As of 1.9, this functionality // is still required since without it, we would lose replicated TWE counts for the // current segment. if !a.core.perfStandby { err = a.loadTokenCount(ctx, mostRecent) if err != nil { return err } } // load entity logs from storage into memory lastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent) if err != nil { return err } if !segmentsExist { a.logger.Trace("no entity segments for current month") return nil } err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment) if err != nil || lastSegment == 0 { return err } lastSegment-- seqNums := make(chan uint64, lastSegment+1) wg.Add(1) go a.entityBackgroundLoader(ctx, wg, mostRecent, seqNums) for n := int(lastSegment); n >= 0; n-- { seqNums <- uint64(n) } close(seqNums) return nil } // This version is used during construction func (a *ActivityLog) SetConfigInit(config activityConfig) { switch config.Enabled { case "enable": a.enabled = true case "default": a.enabled = activityLogEnabledDefault case "disable": a.enabled = false } if a.configOverrides.ForceEnable { a.enabled = true } a.defaultReportMonths = config.DefaultReportMonths a.retentionMonths = config.RetentionMonths } // This version reacts to user changes func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { a.l.Lock() defer a.l.Unlock() // enabled is protected by fragmentLock a.fragmentLock.Lock() originalEnabled := a.enabled switch config.Enabled { case "enable": a.enabled = true case "default": a.enabled = activityLogEnabledDefault case "disable": a.enabled = false } if a.enabled != originalEnabled { a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled) } if !a.enabled && a.currentSegment.startTimestamp != 0 { a.logger.Trace("deleting current segment") a.deleteDone = make(chan struct{}) // this is called from a request under stateLock, so use activeContext go a.deleteLogWorker(a.core.activeContext, a.currentSegment.startTimestamp, a.deleteDone) a.resetCurrentLog() } forceSave := false if a.enabled && a.currentSegment.startTimestamp == 0 { a.startNewCurrentLogLocked(time.Now().UTC()) // Force a save so we can distinguish between // // Month N-1: present // Month N: // // and // // Month N-1: present // Month N: forceSave = true } a.fragmentLock.Unlock() if forceSave { // l is still held here a.saveCurrentSegmentInternal(ctx, true) } a.defaultReportMonths = config.DefaultReportMonths a.retentionMonths = config.RetentionMonths // check for segments out of retention period, if it has changed go a.retentionWorker(ctx, time.Now(), a.retentionMonths) } // update the enable flag and reset the current log func (a *ActivityLog) SetConfigStandby(ctx context.Context, config activityConfig) { a.l.Lock() defer a.l.Unlock() // enable is protected by fragmentLock a.fragmentLock.Lock() originalEnabled := a.enabled switch config.Enabled { case "enable": a.enabled = true case "default": a.enabled = activityLogEnabledDefault case "disable": a.enabled = false } if a.enabled != originalEnabled { a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled) a.resetCurrentLog() } a.fragmentLock.Unlock() } func (a *ActivityLog) queriesAvailable(ctx context.Context) (bool, error) { if a.queryStore == nil { return false, nil } return a.queryStore.QueriesAvailable(ctx) } // setupActivityLog hooks up the singleton ActivityLog into Core. func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error { logger := c.baseLogger.Named("activity") c.AddLogger(logger) if os.Getenv("VAULT_DISABLE_ACTIVITY_LOG") != "" { logger.Info("activity log disabled via environment variable") return nil } view := c.systemBarrierView.SubView(activitySubPath) manager, err := NewActivityLog(c, logger, view, c.metricSink) if err != nil { return err } c.activityLog = manager // load activity log for "this month" into memory err = manager.refreshFromStoredLog(manager.core.activeContext, wg, time.Now().UTC()) if err != nil { return err } // Start the background worker, depending on type // Lock already held here, can't use .PerfStandby() // The workers need to know the current segment time. if c.perfStandby { go manager.perfStandbyFragmentWorker(ctx) } else { go manager.activeFragmentWorker(ctx) // Check for any intent log, in the background go manager.precomputedQueryWorker(ctx) // Catch up on garbage collection // Signal when this is done so that unit tests can proceed. manager.retentionDone = make(chan struct{}) go func() { manager.retentionWorker(ctx, time.Now(), manager.retentionMonths) close(manager.retentionDone) }() } return nil } // stopActivityLog removes the ActivityLog from Core // and frees any resources. func (c *Core) stopActivityLog() { // preSeal may run before startActivityLog got a chance to complete. if c.activityLog != nil { // Shut down background worker close(c.activityLog.doneCh) } c.activityLog = nil } func (a *ActivityLog) StartOfNextMonth() time.Time { a.l.RLock() defer a.l.RUnlock() var segmentStart time.Time if a.currentSegment.startTimestamp == 0 { segmentStart = time.Now().UTC() } else { segmentStart = time.Unix(a.currentSegment.startTimestamp, 0).UTC() } // Basing this on the segment start will mean we trigger EOM rollover when // necessary because we were down. return timeutil.StartOfNextMonth(segmentStart) } // perfStandbyFragmentWorker handles scheduling fragments // to send via RPC; it runs on perf standby nodes only. func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) { timer := time.NewTimer(time.Duration(0)) fragmentWaiting := false // Eat first event, so timer is stopped <-timer.C endOfMonth := time.NewTimer(a.StartOfNextMonth().Sub(time.Now())) if a.configOverrides.DisableTimers { endOfMonth.Stop() } sendFunc := func() { ctx, cancel := context.WithTimeout(ctx, activityFragmentSendTimeout) defer cancel() err := a.sendCurrentFragment(ctx) if err != nil { a.logger.Warn("activity log fragment lost", "error", err) } } for { select { case <-a.doneCh: // Shutting down activity log. if fragmentWaiting && !timer.Stop() { <-timer.C } if !endOfMonth.Stop() { <-endOfMonth.C } return case <-a.newFragmentCh: // New fragment created, start the timer if not // already running if !fragmentWaiting { fragmentWaiting = true if !a.configOverrides.DisableTimers { a.logger.Trace("reset fragment timer") timer.Reset(activityFragmentStandbyTime) } } case <-timer.C: a.logger.Trace("sending fragment on timer expiration") fragmentWaiting = false sendFunc() case <-a.sendCh: a.logger.Trace("sending fragment on request") // It might be that we get sendCh before fragmentCh // if a fragment is created and then immediately fills // up to its limit. So we attempt to send even if the timer's // not running. if fragmentWaiting { fragmentWaiting = false if !timer.Stop() { <-timer.C } } sendFunc() case <-endOfMonth.C: a.logger.Trace("sending fragment on end of month") // Flush the current fragment, if any if fragmentWaiting { fragmentWaiting = false if !timer.Stop() { <-timer.C } } sendFunc() // clear active entity set a.fragmentLock.Lock() a.clientTracker.activeClients = make(map[string]struct{}) a.clientTracker.entityCountByNamespaceID = make(map[string]uint64) a.clientTracker.nonEntityCountByNamespaceID = make(map[string]uint64) a.fragmentLock.Unlock() // Set timer for next month. // The current segment *probably* hasn't been set yet (via invalidation), // so don't rely on it. target := timeutil.StartOfNextMonth(time.Now().UTC()) endOfMonth.Reset(target.Sub(time.Now())) } } } // activeFragmentWorker handles scheduling the write of the next // segment. It runs on active nodes only. func (a *ActivityLog) activeFragmentWorker(ctx context.Context) { ticker := time.NewTicker(activitySegmentInterval) endOfMonth := time.NewTimer(a.StartOfNextMonth().Sub(time.Now())) if a.configOverrides.DisableTimers { endOfMonth.Stop() } writeFunc := func() { ctx, cancel := context.WithTimeout(ctx, activitySegmentWriteTimeout) defer cancel() err := a.saveCurrentSegmentToStorage(ctx, false) if err != nil { a.logger.Warn("activity log segment not saved, current fragment lost", "error", err) } } // we modify the doneCh in some tests, so let's make sure we don't trip // the race detector a.l.RLock() doneCh := a.doneCh a.l.RUnlock() for { select { case <-doneCh: // Shutting down activity log. ticker.Stop() return case <-a.newFragmentCh: // Just eat the message; the ticker is // already running so we don't need to start it. // (But we might change the behavior in the future.) a.logger.Trace("new local fragment created") continue case <-ticker.C: // It's harder to disable a Ticker so we'll just ignore it. if a.configOverrides.DisableTimers { continue } a.logger.Trace("writing segment on timer expiration") writeFunc() case <-a.writeCh: a.logger.Trace("writing segment on request") writeFunc() // Reset the schedule to wait 10 minutes from this forced write. ticker.Stop() ticker = time.NewTicker(activitySegmentInterval) // Simpler, but ticker.Reset was introduced in go 1.15: // ticker.Reset(activitySegmentInterval) case currentTime := <-endOfMonth.C: err := a.HandleEndOfMonth(ctx, currentTime.UTC()) if err != nil { a.logger.Error("failed to perform end of month rotation", "error", err) } // Garbage collect any segments or queries based on the immediate // value of retentionMonths. a.l.RLock() go a.retentionWorker(ctx, currentTime.UTC(), a.retentionMonths) a.l.RUnlock() delta := a.StartOfNextMonth().Sub(time.Now()) if delta < 20*time.Minute { delta = 20 * time.Minute } a.logger.Trace("scheduling next month", "delta", delta) endOfMonth.Reset(delta) } } } type ActivityIntentLog struct { PreviousMonth int64 `json:"previous_month"` NextMonth int64 `json:"next_month"` } // Handle rotation to end-of-month // currentTime is an argument for unit-testing purposes func (a *ActivityLog) HandleEndOfMonth(ctx context.Context, currentTime time.Time) error { // Hold lock to prevent segment or enable changing, // disable will apply to *next* month. a.l.Lock() defer a.l.Unlock() a.fragmentLock.RLock() // Don't bother if disabled // since l is locked earlier (and SetConfig() is the only way enabled can change) // we don't need to worry about enabled changing during this work enabled := a.enabled a.fragmentLock.RUnlock() if !enabled { return nil } a.logger.Trace("starting end of month processing", "rolloverTime", currentTime) prevSegmentTimestamp := a.currentSegment.startTimestamp nextSegmentTimestamp := timeutil.StartOfMonth(currentTime.UTC()).Unix() // Write out an intent log for the rotation with the current and new segment times. intentLog := &ActivityIntentLog{ PreviousMonth: prevSegmentTimestamp, NextMonth: nextSegmentTimestamp, } entry, err := logical.StorageEntryJSON(activityIntentLogKey, intentLog) if err != nil { return err } err = a.view.Put(ctx, entry) if err != nil { return err } // Save the current segment; this does not guarantee that fragment will be // empty when it returns, but dropping some measurements is acceptable. // We use force=true here in case an entry didn't appear this month err = a.saveCurrentSegmentToStorageLocked(ctx, true) // Don't return this error, just log it, we are done with that segment anyway. if err != nil { a.logger.Warn("last save of segment failed", "error", err) } // Advance the log; no need to force a save here because we have // the intent log written already. // // On recovery refreshFromStoredLog() will see we're no longer // in the previous month, and recover by calling newMonthCurrentLog // again and triggering the precomputed query. a.fragmentLock.Lock() a.newMonthCurrentLogLocked(currentTime) a.fragmentLock.Unlock() // Work on precomputed queries in background go a.precomputedQueryWorker(ctx) return nil } // ResetActivityLog is used to extract the current fragment(s) during // integration testing, so that it can be checked in a race-free way. func (c *Core) ResetActivityLog() []*activity.LogFragment { c.stateLock.RLock() a := c.activityLog c.stateLock.RUnlock() if a == nil { return nil } allFragments := make([]*activity.LogFragment, 1) a.fragmentLock.Lock() allFragments[0] = a.fragment a.fragment = nil allFragments = append(allFragments, a.standbyFragmentsReceived...) a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.fragmentLock.Unlock() return allFragments } func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, timestamp int64) { a.AddClientToFragment(entityID, namespaceID, timestamp, false) } // AddClientToFragment checks a client ID for uniqueness and // if not already present, adds it to the current fragment. // The timestamp is a Unix timestamp *without* nanoseconds, as that // is what token.CreationTime uses. func (a *ActivityLog) AddClientToFragment(clientID string, namespaceID string, timestamp int64, isTWE bool) { // Check whether entity ID already recorded var present bool a.fragmentLock.RLock() if a.enabled { _, present = a.clientTracker.activeClients[clientID] } else { present = true } a.fragmentLock.RUnlock() if present { return } // Update current fragment with new active entity a.fragmentLock.Lock() defer a.fragmentLock.Unlock() // Re-check entity ID after re-acquiring lock _, present = a.clientTracker.activeClients[clientID] if present { return } a.createCurrentFragment() clientRecord := &activity.EntityRecord{ ClientID: clientID, NamespaceID: namespaceID, Timestamp: timestamp, } // Track whether the clientID corresponds to a token without an entity or not. // This field is backward compatible, as the default is 0, so records created // from pre-1.9 activityLog code will automatically be marked as having an entity. if isTWE { clientRecord.NonEntity = true } a.fragment.Clients = append(a.fragment.Clients, clientRecord) a.clientTracker.addClient(clientRecord) } // Create the current fragment if it doesn't already exist. // Must be called with the lock held. func (a *ActivityLog) createCurrentFragment() { if a.fragment == nil { a.fragment = &activity.LogFragment{ OriginatingNode: a.nodeID, Clients: make([]*activity.EntityRecord, 0, 120), NonEntityTokens: make(map[string]uint64), } a.fragmentCreation = time.Now().UTC() // Signal that a new segment is available, start // the timer to send it. a.newFragmentCh <- struct{}{} } } func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) { a.logger.Trace("received fragment from standby", "node", fragment.OriginatingNode) a.fragmentLock.Lock() defer a.fragmentLock.Unlock() if !a.enabled { return } for _, e := range fragment.Clients { a.clientTracker.addClient(e) } a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment) // TODO: check if current segment is full and should be written } type ClientCountResponse struct { DistinctEntities int `json:"distinct_entities"` NonEntityTokens int `json:"non_entity_tokens"` Clients int `json:"clients"` } type ClientCountInNamespace struct { NamespaceID string `json:"namespace_id"` NamespacePath string `json:"namespace_path"` Counts ClientCountResponse `json:"counts"` } // ActivityLogInjectResponse injects a precomputed query into storage for testing. func (c *Core) ActivityLogInjectResponse(ctx context.Context, pq *activity.PrecomputedQuery) error { c.stateLock.RLock() a := c.activityLog c.stateLock.RUnlock() if a == nil { return errors.New("nil activity log") } return a.queryStore.Put(ctx, pq) } func (a *ActivityLog) includeInResponse(query *namespace.Namespace, record *namespace.Namespace) bool { if record == nil { // Deleted namespace, only include in root queries return query.ID == namespace.RootNamespaceID } return record.HasParent(query) } func (a *ActivityLog) DefaultStartTime(endTime time.Time) time.Time { // If end time is September 30, then start time should be // October 1st to get 12 months of data. a.l.RLock() defer a.l.RUnlock() monthStart := timeutil.StartOfMonth(endTime) return monthStart.AddDate(0, -a.defaultReportMonths+1, 0) } func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.Time) (map[string]interface{}, error) { queryNS, err := namespace.FromContext(ctx) if err != nil { return nil, err } pq, err := a.queryStore.Get(ctx, startTime, endTime) if err != nil { return nil, err } if pq == nil { return nil, nil } responseData := make(map[string]interface{}) responseData["start_time"] = pq.StartTime.Format(time.RFC3339) responseData["end_time"] = pq.EndTime.Format(time.RFC3339) byNamespace := make([]*ClientCountInNamespace, 0) totalEntities := 0 totalTokens := 0 for _, nsRecord := range pq.Namespaces { ns, err := NamespaceByID(ctx, nsRecord.NamespaceID, a.core) if err != nil { return nil, err } if a.includeInResponse(queryNS, ns) { var displayPath string if ns == nil { displayPath = fmt.Sprintf("deleted namespace %q", nsRecord.NamespaceID) } else { displayPath = ns.Path } byNamespace = append(byNamespace, &ClientCountInNamespace{ NamespaceID: nsRecord.NamespaceID, NamespacePath: displayPath, Counts: ClientCountResponse{ DistinctEntities: int(nsRecord.Entities), NonEntityTokens: int(nsRecord.NonEntityTokens), Clients: int(nsRecord.Entities + nsRecord.NonEntityTokens), }, }) totalEntities += int(nsRecord.Entities) totalTokens += int(nsRecord.NonEntityTokens) } } sort.Slice(byNamespace, func(i, j int) bool { return byNamespace[i].Counts.Clients > byNamespace[j].Counts.Clients }) responseData["by_namespace"] = byNamespace responseData["total"] = &ClientCountResponse{ DistinctEntities: totalEntities, NonEntityTokens: totalTokens, Clients: totalEntities + totalTokens, } return responseData, nil } type activityConfig struct { // DefaultReportMonths are the default number of months that are returned on // a report. The zero value uses the system default of 12. DefaultReportMonths int `json:"default_report_months"` // RetentionMonths defines the number of months we want to retain data. The // zero value uses the system default of 24 months. RetentionMonths int `json:"retention_months"` // Enabled is one of enable, disable, default. Enabled string `json:"enabled"` } func defaultActivityConfig() activityConfig { return activityConfig{ DefaultReportMonths: 12, RetentionMonths: 24, Enabled: "default", } } func (a *ActivityLog) loadConfigOrDefault(ctx context.Context) (activityConfig, error) { // Load from storage var config activityConfig configRaw, err := a.view.Get(ctx, activityConfigKey) if err != nil { return config, err } if configRaw == nil { return defaultActivityConfig(), nil } if err := configRaw.DecodeJSON(&config); err != nil { return config, err } return config, nil } // HandleTokenUsage adds the TokenEntry to the current fragment of the activity log // This currently occurs on token usage only. func (a *ActivityLog) HandleTokenUsage(entry *logical.TokenEntry, clientID string, isTWE bool) { // First, check if a is enabled, so as to avoid the cost of creating an ID for // tokens without entities in the case where it not. a.fragmentLock.RLock() if !a.enabled { a.fragmentLock.RUnlock() return } a.fragmentLock.RUnlock() // Do not count wrapping tokens in client count if IsWrappingToken(entry) { return } // Do not count root tokens in client count. if entry.IsRoot() { return } // Parse an entry's client ID and add it to the activity log a.AddClientToFragment(clientID, entry.NamespaceID, entry.CreationTime, isTWE) } func (a *ActivityLog) namespaceToLabel(ctx context.Context, nsID string) string { ns, err := NamespaceByID(ctx, nsID, a.core) if err != nil || ns == nil { return fmt.Sprintf("deleted-%v", nsID) } if ns.Path == "" { return "root" } return ns.Path } // goroutine to process the request in the intent log, creating precomputed queries. // We expect the return value won't be checked, so log errors as they occur // (but for unit testing having the error return should help.) func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() // Cancel the context if activity log is shut down. // This will cause the next storage operation to fail. a.l.RLock() // doneCh is modified in some tests, so we don't want to access that member // without a lock, but we don't want to hold the lock for the entire lifetime // of this goroutine. Passing the channel to the goroutine works here because // no tests depend on us accessing the new doneCh after modifying the field. go func(done chan struct{}) { select { case <-done: cancel() case <-ctx.Done(): break } }(a.doneCh) a.l.RUnlock() // Load the intent log rawIntentLog, err := a.view.Get(ctx, activityIntentLogKey) if err != nil { a.logger.Warn("could not load intent log", "error", err) return err } if rawIntentLog == nil { a.logger.Trace("no intent log found") return err } var intent ActivityIntentLog err = json.Unmarshal(rawIntentLog.Value, &intent) if err != nil { a.logger.Warn("could not parse intent log", "error", err) return err } // currentMonth could change (from another month end) after we release the lock. // But, it's not critical to correct operation; this is a check for intent logs that are // too old, and startTimestamp should only go forward (unless it is zero.) // If there's an intent log, finish it even if the feature is currently disabled. a.l.RLock() currentMonth := a.currentSegment.startTimestamp // Base retention period on the month we are generating (even in the past)--- time.Now() // would work but this will be easier to control in tests. retentionWindow := timeutil.MonthsPreviousTo(a.retentionMonths, time.Unix(intent.NextMonth, 0).UTC()) a.l.RUnlock() if currentMonth != 0 && intent.NextMonth != currentMonth { a.logger.Warn("intent log does not match current segment", "intent", intent.NextMonth, "current", currentMonth) return errors.New("intent log is too far in the past") } lastMonth := intent.PreviousMonth a.logger.Info("computing queries", "month", time.Unix(lastMonth, 0).UTC()) times, err := a.getMostRecentActivityLogSegment(ctx) if err != nil { a.logger.Warn("could not list recent segments", "error", err) return err } if len(times) == 0 { a.logger.Warn("no months in storage") a.view.Delete(ctx, activityIntentLogKey) return errors.New("previous month not found") } if times[0].Unix() != lastMonth { a.logger.Warn("last month not in storage", "latest", times[0].Unix()) a.view.Delete(ctx, activityIntentLogKey) return errors.New("previous month not found") } // "times" is already in reverse order, start building the per-namespace maps // from the last month backward type NamespaceCounts struct { // entityID -> present Entities map[string]struct{} // count. This exists for backward compatibility Tokens uint64 // clientID -> present NonEntities map[string]struct{} } byNamespace := make(map[string]*NamespaceCounts) createNs := func(namespaceID string) { if _, namespacePresent := byNamespace[namespaceID]; !namespacePresent { byNamespace[namespaceID] = &NamespaceCounts{ Entities: make(map[string]struct{}), Tokens: 0, NonEntities: make(map[string]struct{}), } } } walkEntities := func(l *activity.EntityActivityLog) { for _, e := range l.Clients { createNs(e.NamespaceID) if e.NonEntity == true { byNamespace[e.NamespaceID].NonEntities[e.ClientID] = struct{}{} } else { byNamespace[e.NamespaceID].Entities[e.ClientID] = struct{}{} } } } walkTokens := func(l *activity.TokenCount) { for nsID, v := range l.CountByNamespaceID { createNs(nsID) byNamespace[nsID].Tokens += v } } endTime := timeutil.EndOfMonth(time.Unix(lastMonth, 0).UTC()) activePeriodStart := timeutil.MonthsPreviousTo(a.defaultReportMonths, endTime) // If not enough data, report as much as we have in the window if activePeriodStart.Before(times[len(times)-1]) { activePeriodStart = times[len(times)-1] } for _, startTime := range times { // Do not work back further than the current retention window, // which will just get deleted anyway. if startTime.Before(retentionWindow) { break } err = a.WalkEntitySegments(ctx, startTime, walkEntities) if err != nil { a.logger.Warn("failed to load previous segments", "error", err) return err } err = a.WalkTokenSegments(ctx, startTime, walkTokens) if err != nil { a.logger.Warn("failed to load previous token counts", "error", err) return err } // Save the work to date in a record pq := &activity.PrecomputedQuery{ StartTime: startTime, EndTime: endTime, Namespaces: make([]*activity.NamespaceRecord, 0, len(byNamespace)), } for nsID, counts := range byNamespace { pq.Namespaces = append(pq.Namespaces, &activity.NamespaceRecord{ NamespaceID: nsID, Entities: uint64(len(counts.Entities)), NonEntityTokens: counts.Tokens + uint64(len(counts.NonEntities)), }) // If this is the most recent month, or the start of the reporting period, output // a metric for each namespace. if startTime == times[0] { a.metrics.SetGaugeWithLabels( []string{"identity", "entity", "active", "monthly"}, float32(len(counts.Entities)), []metricsutil.Label{ {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, }, ) a.metrics.SetGaugeWithLabels( []string{"identity", "nonentity", "active", "monthly"}, float32(len(counts.NonEntities))+float32(counts.Tokens), []metricsutil.Label{ {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, }, ) } else if startTime == activePeriodStart { a.metrics.SetGaugeWithLabels( []string{"identity", "entity", "active", "reporting_period"}, float32(len(counts.Entities)), []metricsutil.Label{ {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, }, ) a.metrics.SetGaugeWithLabels( []string{"identity", "nonentity", "active", "reporting_period"}, float32(len(counts.NonEntities))+float32(counts.Tokens), []metricsutil.Label{ {Name: "namespace", Value: a.namespaceToLabel(ctx, nsID)}, }, ) } } err = a.queryStore.Put(ctx, pq) if err != nil { a.logger.Warn("failed to store precomputed query", "error", err) } } // delete the intent log a.view.Delete(ctx, activityIntentLogKey) a.logger.Info("finished computing queries", "month", endTime) return nil } // goroutine to delete any segments or precomputed queries older than // the retention period. // We expect the return value won't be checked, so log errors as they occur // (but for unit testing having the error return should help.) func (a *ActivityLog) retentionWorker(ctx context.Context, currentTime time.Time, retentionMonths int) error { ctx, cancel := context.WithCancel(ctx) defer cancel() // Cancel the context if activity log is shut down. // This will cause the next storage operation to fail. a.l.RLock() doneCh := a.doneCh a.l.RUnlock() go func() { select { case <-doneCh: cancel() case <-ctx.Done(): break } }() // everything >= the threshold is OK retentionThreshold := timeutil.MonthsPreviousTo(retentionMonths, currentTime) available, err := a.availableLogs(ctx) if err != nil { a.logger.Warn("could not list segments", "error", err) return err } for _, t := range available { // One at a time seems OK if t.Before(retentionThreshold) { a.logger.Trace("deleting segments", "startTime", t) a.deleteLogWorker(ctx, t.Unix(), make(chan struct{})) } } if a.queryStore != nil { err = a.queryStore.DeleteQueriesBefore(ctx, retentionThreshold) if err != nil { a.logger.Warn("deletion of queries failed", "error", err) } return err } return nil } // Periodic report of number of active entities, with the current month. // We don't break this down by namespace because that would require going to storage (that information // is not currently stored in memory.) func (a *ActivityLog) PartialMonthMetrics(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) { a.fragmentLock.RLock() defer a.fragmentLock.RUnlock() if !a.enabled { // Empty list return []metricsutil.GaugeLabelValues{}, nil } count := len(a.clientTracker.activeClients) return []metricsutil.GaugeLabelValues{ { Labels: []metricsutil.Label{}, Value: float32(count), }, }, nil } func (c *Core) activeEntityGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) { c.stateLock.RLock() a := c.activityLog c.stateLock.RUnlock() if a == nil { return []metricsutil.GaugeLabelValues{}, nil } return a.PartialMonthMetrics(ctx) } // partialMonthClientCount returns the number of clients used so far this month. // If activity log is not enabled, the response will be nil func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]interface{}, error) { a.fragmentLock.RLock() defer a.fragmentLock.RUnlock() if !a.enabled { // nothing to count return nil, nil } byNamespace := make([]*ClientCountInNamespace, 0) responseData := make(map[string]interface{}) totalEntities := 0 totalTokens := 0 nonEntityTokensMapInterface, err := copystructure.Copy(a.clientTracker.nonEntityCountByNamespaceID) if err != nil { return nil, fmt.Errorf("error making deep copy of nonEntityCounts: %+w", err) } nonEntityTokensMap := nonEntityTokensMapInterface.(map[string]uint64) // Merge the tokenCounts created pre-clientID with the newly counted // clientID tokens, if tokenCounts exist. for nsID, count := range a.currentSegment.tokenCount.CountByNamespaceID { nonEntityTokensMap[nsID] += count } clientCountTable := createClientCountTable(a.clientTracker.entityCountByNamespaceID, nonEntityTokensMap) queryNS, err := namespace.FromContext(ctx) if err != nil { return nil, err } for nsID, clients := range clientCountTable { ns, err := NamespaceByID(ctx, nsID, a.core) if err != nil { return nil, err } // Only include namespaces that are the queryNS or within it. If queryNS is the // root namespace, include all namespaces, even those which have been deleted. if a.includeInResponse(queryNS, ns) { var displayPath string if ns == nil { displayPath = fmt.Sprintf("deleted namespace %q", nsID) } else { displayPath = ns.Path } byNamespace = append(byNamespace, &ClientCountInNamespace{ NamespaceID: nsID, NamespacePath: displayPath, Counts: ClientCountResponse{ DistinctEntities: int(clients.distinctEntities), NonEntityTokens: int(clients.distinctNonEntities), Clients: int(clients.distinctEntities + clients.distinctNonEntities), }, }) totalEntities += int(clients.distinctEntities) totalTokens += int(clients.distinctNonEntities) } } sort.Slice(byNamespace, func(i, j int) bool { return byNamespace[i].NamespaceID < byNamespace[j].NamespaceID }) responseData["by_namespace"] = byNamespace responseData["distinct_entities"] = totalEntities responseData["non_entity_tokens"] = totalTokens responseData["clients"] = totalEntities + totalTokens return responseData, nil } // createClientCountTable maps the entitycount and token count to the namespace id. func createClientCountTable(entityMap map[string]uint64, nonEntityMap map[string]uint64) map[string]*clients { clientCountTable := make(map[string]*clients) for nsID, count := range entityMap { if _, ok := clientCountTable[nsID]; !ok { clientCountTable[nsID] = &clients{distinctEntities: 0, distinctNonEntities: 0} } clientCountTable[nsID].distinctEntities += count } for nsID, count := range nonEntityMap { if _, ok := clientCountTable[nsID]; !ok { clientCountTable[nsID] = &clients{distinctEntities: 0, distinctNonEntities: 0} } clientCountTable[nsID].distinctNonEntities += count } return clientCountTable } func (ct *ClientTracker) addClient(e *activity.EntityRecord) { if _, ok := ct.activeClients[e.ClientID]; !ok { ct.activeClients[e.ClientID] = struct{}{} if e.NonEntity == true { ct.nonEntityCountByNamespaceID[e.NamespaceID] += 1 } else { ct.entityCountByNamespaceID[e.NamespaceID] += 1 } } }