Vault 936: use core.activeContext in ActivityLog (#13083)
* update activity log to use core's activeContext for cleaner worker termination * update tests to use core activeContext instead of generic context * pass context around instead * revert context change * undo test context changes * change worker context * accidentally undid context for fcn signature changes
This commit is contained in:
parent
3bfa4fa267
commit
e137045050
|
@ -782,12 +782,10 @@ func (a *ActivityLog) resetCurrentLog() {
|
|||
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
|
||||
}
|
||||
|
||||
func (a *ActivityLog) deleteLogWorker(startTimestamp int64, whenDone chan struct{}) {
|
||||
ctx := namespace.RootContext(nil)
|
||||
func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) {
|
||||
entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp)
|
||||
tokenPath := fmt.Sprintf("%v%v/", activityTokenBasePath, startTimestamp)
|
||||
|
||||
// TODO: handle seal gracefully, if we're still working?
|
||||
entitySegments, err := a.view.List(ctx, entityPath)
|
||||
if err != nil {
|
||||
a.logger.Error("could not list entity paths", "error", err)
|
||||
|
@ -862,7 +860,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
|
|||
a.logger.Debug("activity log not enabled, skipping refresh from storage")
|
||||
if !a.core.perfStandby && timeutil.IsCurrentMonth(mostRecent, now) {
|
||||
a.logger.Debug("activity log is disabled, cleaning up logs for the current month")
|
||||
go a.deleteLogWorker(mostRecent.Unix(), make(chan struct{}))
|
||||
go a.deleteLogWorker(ctx, mostRecent.Unix(), make(chan struct{}))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -975,7 +973,8 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
|
|||
if !a.enabled && a.currentSegment.startTimestamp != 0 {
|
||||
a.logger.Trace("deleting current segment")
|
||||
a.deleteDone = make(chan struct{})
|
||||
go a.deleteLogWorker(a.currentSegment.startTimestamp, a.deleteDone)
|
||||
// this is called from a request under stateLock, so use activeContext
|
||||
go a.deleteLogWorker(a.core.activeContext, a.currentSegment.startTimestamp, a.deleteDone)
|
||||
a.resetCurrentLog()
|
||||
}
|
||||
|
||||
|
@ -1004,7 +1003,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
|
|||
a.retentionMonths = config.RetentionMonths
|
||||
|
||||
// check for segments out of retention period, if it has changed
|
||||
go a.retentionWorker(time.Now(), a.retentionMonths)
|
||||
go a.retentionWorker(ctx, time.Now(), a.retentionMonths)
|
||||
}
|
||||
|
||||
// update the enable flag and reset the current log
|
||||
|
@ -1066,18 +1065,18 @@ func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error {
|
|||
// Lock already held here, can't use .PerfStandby()
|
||||
// The workers need to know the current segment time.
|
||||
if c.perfStandby {
|
||||
go manager.perfStandbyFragmentWorker()
|
||||
go manager.perfStandbyFragmentWorker(ctx)
|
||||
} else {
|
||||
go manager.activeFragmentWorker()
|
||||
go manager.activeFragmentWorker(ctx)
|
||||
|
||||
// Check for any intent log, in the background
|
||||
go manager.precomputedQueryWorker()
|
||||
go manager.precomputedQueryWorker(ctx)
|
||||
|
||||
// Catch up on garbage collection
|
||||
// Signal when this is done so that unit tests can proceed.
|
||||
manager.retentionDone = make(chan struct{})
|
||||
go func() {
|
||||
manager.retentionWorker(time.Now(), manager.retentionMonths)
|
||||
manager.retentionWorker(ctx, time.Now(), manager.retentionMonths)
|
||||
close(manager.retentionDone)
|
||||
}()
|
||||
}
|
||||
|
@ -1113,7 +1112,7 @@ func (a *ActivityLog) StartOfNextMonth() time.Time {
|
|||
|
||||
// perfStandbyFragmentWorker handles scheduling fragments
|
||||
// to send via RPC; it runs on perf standby nodes only.
|
||||
func (a *ActivityLog) perfStandbyFragmentWorker() {
|
||||
func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
|
||||
timer := time.NewTimer(time.Duration(0))
|
||||
fragmentWaiting := false
|
||||
// Eat first event, so timer is stopped
|
||||
|
@ -1125,7 +1124,7 @@ func (a *ActivityLog) perfStandbyFragmentWorker() {
|
|||
}
|
||||
|
||||
sendFunc := func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), activityFragmentSendTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, activityFragmentSendTimeout)
|
||||
defer cancel()
|
||||
err := a.sendCurrentFragment(ctx)
|
||||
if err != nil {
|
||||
|
@ -1200,7 +1199,7 @@ func (a *ActivityLog) perfStandbyFragmentWorker() {
|
|||
|
||||
// activeFragmentWorker handles scheduling the write of the next
|
||||
// segment. It runs on active nodes only.
|
||||
func (a *ActivityLog) activeFragmentWorker() {
|
||||
func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
|
||||
ticker := time.NewTicker(activitySegmentInterval)
|
||||
|
||||
endOfMonth := time.NewTimer(a.StartOfNextMonth().Sub(time.Now()))
|
||||
|
@ -1209,7 +1208,7 @@ func (a *ActivityLog) activeFragmentWorker() {
|
|||
}
|
||||
|
||||
writeFunc := func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), activitySegmentWriteTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, activitySegmentWriteTimeout)
|
||||
defer cancel()
|
||||
err := a.saveCurrentSegmentToStorage(ctx, false)
|
||||
if err != nil {
|
||||
|
@ -1252,7 +1251,7 @@ func (a *ActivityLog) activeFragmentWorker() {
|
|||
// Simpler, but ticker.Reset was introduced in go 1.15:
|
||||
// ticker.Reset(activitySegmentInterval)
|
||||
case currentTime := <-endOfMonth.C:
|
||||
err := a.HandleEndOfMonth(currentTime.UTC())
|
||||
err := a.HandleEndOfMonth(ctx, currentTime.UTC())
|
||||
if err != nil {
|
||||
a.logger.Error("failed to perform end of month rotation", "error", err)
|
||||
}
|
||||
|
@ -1260,7 +1259,7 @@ func (a *ActivityLog) activeFragmentWorker() {
|
|||
// Garbage collect any segments or queries based on the immediate
|
||||
// value of retentionMonths.
|
||||
a.l.RLock()
|
||||
go a.retentionWorker(currentTime.UTC(), a.retentionMonths)
|
||||
go a.retentionWorker(ctx, currentTime.UTC(), a.retentionMonths)
|
||||
a.l.RUnlock()
|
||||
|
||||
delta := a.StartOfNextMonth().Sub(time.Now())
|
||||
|
@ -1280,9 +1279,7 @@ type ActivityIntentLog struct {
|
|||
|
||||
// Handle rotation to end-of-month
|
||||
// currentTime is an argument for unit-testing purposes
|
||||
func (a *ActivityLog) HandleEndOfMonth(currentTime time.Time) error {
|
||||
ctx := namespace.RootContext(nil)
|
||||
|
||||
func (a *ActivityLog) HandleEndOfMonth(ctx context.Context, currentTime time.Time) error {
|
||||
// Hold lock to prevent segment or enable changing,
|
||||
// disable will apply to *next* month.
|
||||
a.l.Lock()
|
||||
|
@ -1338,7 +1335,7 @@ func (a *ActivityLog) HandleEndOfMonth(currentTime time.Time) error {
|
|||
a.fragmentLock.Unlock()
|
||||
|
||||
// Work on precomputed queries in background
|
||||
go a.precomputedQueryWorker()
|
||||
go a.precomputedQueryWorker(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1678,8 +1675,8 @@ func (a *ActivityLog) namespaceToLabel(ctx context.Context, nsID string) string
|
|||
// goroutine to process the request in the intent log, creating precomputed queries.
|
||||
// We expect the return value won't be checked, so log errors as they occur
|
||||
// (but for unit testing having the error return should help.)
|
||||
func (a *ActivityLog) precomputedQueryWorker() error {
|
||||
ctx, cancel := context.WithCancel(namespace.RootContext(nil))
|
||||
func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
// Cancel the context if activity log is shut down.
|
||||
|
@ -1883,8 +1880,8 @@ func (a *ActivityLog) precomputedQueryWorker() error {
|
|||
// the retention period.
|
||||
// We expect the return value won't be checked, so log errors as they occur
|
||||
// (but for unit testing having the error return should help.)
|
||||
func (a *ActivityLog) retentionWorker(currentTime time.Time, retentionMonths int) error {
|
||||
ctx, cancel := context.WithCancel(namespace.RootContext(nil))
|
||||
func (a *ActivityLog) retentionWorker(ctx context.Context, currentTime time.Time, retentionMonths int) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
// Cancel the context if activity log is shut down.
|
||||
|
@ -1913,7 +1910,7 @@ func (a *ActivityLog) retentionWorker(currentTime time.Time, retentionMonths int
|
|||
// One at a time seems OK
|
||||
if t.Before(retentionThreshold) {
|
||||
a.logger.Trace("deleting segments", "startTime", t)
|
||||
a.deleteLogWorker(t.Unix(), make(chan struct{}))
|
||||
a.deleteLogWorker(ctx, t.Unix(), make(chan struct{}))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1835,7 +1835,7 @@ func TestActivityLog_DeleteWorker(t *testing.T) {
|
|||
doneCh := make(chan struct{})
|
||||
timeout := time.After(20 * time.Second)
|
||||
|
||||
go a.deleteLogWorker(1111, doneCh)
|
||||
go a.deleteLogWorker(namespace.RootContext(nil), 1111, doneCh)
|
||||
select {
|
||||
case <-doneCh:
|
||||
break
|
||||
|
@ -1986,7 +1986,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||
month2 := timeutil.StartOfNextMonth(month1)
|
||||
|
||||
// Trigger end-of-month
|
||||
a.HandleEndOfMonth(month1)
|
||||
a.HandleEndOfMonth(ctx, month1)
|
||||
|
||||
// Check segment is present, with 1 entity
|
||||
path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, segment0)
|
||||
|
@ -2028,7 +2028,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||
|
||||
a.AddEntityToFragment(id2, "root", time.Now().Unix())
|
||||
|
||||
a.HandleEndOfMonth(month2)
|
||||
a.HandleEndOfMonth(ctx, month2)
|
||||
segment2 := a.GetStartTimestamp()
|
||||
|
||||
a.AddEntityToFragment(id3, "root", time.Now().Unix())
|
||||
|
@ -2370,7 +2370,7 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) {
|
|||
// Pretend we've successfully rolled over to the following month
|
||||
a.SetStartTimestamp(tc.NextMonth)
|
||||
|
||||
err = a.precomputedQueryWorker()
|
||||
err = a.precomputedQueryWorker(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -2741,7 +2741,7 @@ func TestActivityLog_Precompute(t *testing.T) {
|
|||
// Pretend we've successfully rolled over to the following month
|
||||
a.SetStartTimestamp(tc.NextMonth)
|
||||
|
||||
err = a.precomputedQueryWorker()
|
||||
err = a.precomputedQueryWorker(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -3076,7 +3076,7 @@ func TestActivityLog_PrecomputeNonEntityTokensWithID(t *testing.T) {
|
|||
// Pretend we've successfully rolled over to the following month
|
||||
a.SetStartTimestamp(tc.NextMonth)
|
||||
|
||||
err = a.precomputedQueryWorker()
|
||||
err = a.precomputedQueryWorker(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -3201,7 +3201,7 @@ func TestActivityLog_PrecomputeCancel(t *testing.T) {
|
|||
// This will block if the shutdown didn't work.
|
||||
go func() {
|
||||
// We expect this to error because of BlockingInmemStorage
|
||||
_ = a.precomputedQueryWorker()
|
||||
_ = a.precomputedQueryWorker(namespace.RootContext(nil))
|
||||
close(done)
|
||||
}()
|
||||
|
||||
|
@ -3337,9 +3337,10 @@ func TestActivityLog_Deletion(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
ctx := namespace.RootContext(nil)
|
||||
t.Log("24 months")
|
||||
now := times[len(times)-1]
|
||||
err := a.retentionWorker(now, 24)
|
||||
err := a.retentionWorker(ctx, now, 24)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -3348,7 +3349,7 @@ func TestActivityLog_Deletion(t *testing.T) {
|
|||
}
|
||||
|
||||
t.Log("12 months")
|
||||
err = a.retentionWorker(now, 12)
|
||||
err = a.retentionWorker(ctx, now, 12)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -3360,7 +3361,7 @@ func TestActivityLog_Deletion(t *testing.T) {
|
|||
}
|
||||
|
||||
t.Log("1 month")
|
||||
err = a.retentionWorker(now, 1)
|
||||
err = a.retentionWorker(ctx, now, 1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -3371,7 +3372,7 @@ func TestActivityLog_Deletion(t *testing.T) {
|
|||
checkPresent(21)
|
||||
|
||||
t.Log("0 months")
|
||||
err = a.retentionWorker(now, 0)
|
||||
err = a.retentionWorker(ctx, now, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue