cleanup: prevent leaks from time.After

This PR replaces uses of time.After with a safe helper function that
creates a time.Timer to use instead. The helper returns both the
time.Timer and a StopFunc that the caller must invoke (typically via
defer) so the timer is always stopped and cannot leak.

Unlike time.NewTimer, the helper function does not panic if the given
duration is <= 0.
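
To illustrate the leak (a minimal sketch, not code from this PR; leakyLoop and heartbeat are illustrative names): each call to time.After creates a timer that cannot be stopped, so any iteration that takes a different select branch abandons a live timer until its duration elapses.

```go
package sketch

import (
	"context"
	"time"
)

// leakyLoop shows the pattern this PR removes: time.After allocates a
// fresh timer on every iteration, and that timer cannot be stopped. If
// ctx is canceled, the pending timer lingers until its 10s elapse, and
// hot loops can accumulate many such timers.
func leakyLoop(ctx context.Context, heartbeat func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Second):
			heartbeat()
		}
	}
}
```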
Seth Hoenig 2022-02-02 10:59:53 -06:00
parent c4cff5359f
commit db2347a86c
16 changed files with 144 additions and 21 deletions

.changelog/11983.txt (new file)

@@ -0,0 +1,3 @@
+```release-note:bug
+cleanup: prevent leaks from time.After
+```


@@ -13,6 +13,12 @@ import (
     "github.com/gorilla/websocket"
 )
 
+const (
+    // heartbeatInterval is the amount of time to wait between sending heartbeats
+    // during an exec streaming operation
+    heartbeatInterval = 10 * time.Second
+)
+
 type execSession struct {
     client *Client
     alloc  *Allocation
@@ -177,15 +183,19 @@ func (s *execSession) startTransmit(ctx context.Context, conn *websocket.Conn) <
     // send a heartbeat every 10 seconds
     go func() {
+        t := time.NewTimer(heartbeatInterval)
+        defer t.Stop()
+
         for {
+            t.Reset(heartbeatInterval)
+
             select {
             case <-ctx.Done():
                 return
-            // heartbeat message
-            case <-time.After(10 * time.Second):
+            case <-t.C:
+                // heartbeat message
                 send(&execStreamingInputHeartbeat)
             }
         }
     }()
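
Note that this api file uses time.NewTimer directly rather than the new helper, presumably because the api client package avoids importing Nomad-internal packages such as helper; since heartbeatInterval is a fixed positive constant, the panic guard is not needed here.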


@@ -527,6 +527,9 @@ func (tr *TaskRunner) Run() {
         return
     }
 
+    timer, stop := helper.NewSafeTimer(0) // timer duration calculated JIT
+    defer stop()
+
 MAIN:
     for !tr.shouldShutdown() {
         select {
@@ -612,9 +615,11 @@ MAIN:
             break MAIN
         }
 
+        timer.Reset(restartDelay)
+
         // Actually restart by sleeping and also watching for destroy events
         select {
-        case <-time.After(restartDelay):
+        case <-timer.C:
         case <-tr.killCtx.Done():
             tr.logger.Trace("task killed between restarts", "delay", restartDelay)
             break MAIN


@@ -7,6 +7,7 @@ import (
     log "github.com/hashicorp/go-hclog"
     version "github.com/hashicorp/go-version"
+    "github.com/hashicorp/nomad/helper"
 )
 
 // checkConsulTLSSkipVerify logs if Consul does not support TLSSkipVerify on
@@ -20,6 +21,10 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age
     defer close(done)
 
     i := uint64(0)
+
+    timer, stop := helper.NewSafeTimer(limit)
+    defer stop()
+
     for {
         self, err := client.Self()
         if err == nil {
@@ -39,10 +44,12 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age
             i++
         }
 
+        timer.Reset(backoff)
+
         select {
         case <-ctx.Done():
             return
-        case <-time.After(backoff):
+        case <-timer.C:
         }
     }
 }


@@ -6,6 +6,7 @@ import (
     "time"
 
     log "github.com/hashicorp/go-hclog"
+    "github.com/hashicorp/nomad/helper"
 )
 
 // Monitor provides a mechanism to stream logs using go-hclog
@@ -107,12 +108,17 @@ func (d *monitor) Start() <-chan []byte {
     // dropped messages and makes room on the logCh
     // to add a dropped message count warning
     go func() {
+        timer, stop := helper.NewSafeTimer(d.droppedDuration)
+        defer stop()
+
         // loop and check for dropped messages
         for {
+            timer.Reset(d.droppedDuration)
+
             select {
             case <-d.doneCh:
                 return
-            case <-time.After(d.droppedDuration):
+            case <-timer.C:
                 d.Lock()
 
                 // Check if there have been any dropped messages.


@@ -10,6 +10,7 @@ import (
     docker "github.com/fsouza/go-dockerclient"
     cstructs "github.com/hashicorp/nomad/client/structs"
     "github.com/hashicorp/nomad/drivers/docker/util"
+    "github.com/hashicorp/nomad/helper"
     nstructs "github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -91,19 +92,27 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
     defer destCh.close()
 
     // backoff and retry used if the docker stats API returns an error
-    var backoff time.Duration
+    var backoff time.Duration = 0
     var retry int
+
+    // create an interval timer
+    timer, stop := helper.NewSafeTimer(backoff)
+    defer stop()
+
     // loops until doneCh is closed
     for {
+        timer.Reset(backoff)
+
         if backoff > 0 {
             select {
-            case <-time.After(backoff):
+            case <-timer.C:
             case <-ctx.Done():
                 return
             case <-h.doneCh:
                 return
             }
         }
+
         // make a channel for docker stats structs and start a collector to
         // receive stats from docker and emit nomad stats
         // statsCh will always be closed by docker client.


@@ -6,6 +6,7 @@ import (
     "time"
 
     hclog "github.com/hashicorp/go-hclog"
+    "github.com/hashicorp/nomad/helper"
     "github.com/hashicorp/nomad/plugins/drivers"
 )
 
@@ -62,14 +63,19 @@ func NewEventer(ctx context.Context, logger hclog.Logger) *Eventer {
 // eventLoop is the main logic which pulls events from the channel and broadcasts
 // them to all consumers
 func (e *Eventer) eventLoop() {
+    timer, stop := helper.NewSafeTimer(ConsumerGCInterval)
+    defer stop()
+
     for {
+        timer.Reset(ConsumerGCInterval)
+
         select {
         case <-e.ctx.Done():
             e.logger.Trace("task event loop shutdown")
             return
         case event := <-e.events:
             e.iterateConsumers(event)
-        case <-time.After(ConsumerGCInterval):
+        case <-timer.C:
             e.gcConsumers()
         }
     }


@@ -572,3 +572,30 @@ func PathEscapesSandbox(sandboxDir, path string) bool {
     }
     return false
 }
+
+// StopFunc is used to stop a time.Timer created with NewSafeTimer
+type StopFunc func()
+
+// NewSafeTimer creates a time.Timer but does not panic if duration is <= 0.
+//
+// Using a time.Timer is recommended instead of time.After when it is necessary
+// to avoid leaking goroutines (e.g. in a select inside a loop).
+//
+// Returns the time.Timer and also a StopFunc, forcing the caller to deal
+// with stopping the time.Timer to avoid leaking a goroutine.
+func NewSafeTimer(duration time.Duration) (*time.Timer, StopFunc) {
+    if duration <= 0 {
+        // Avoid panic by using the smallest positive value. This is close enough
+        // to the behavior of time.After(0), which this helper is intended to
+        // replace.
+        // https://go.dev/play/p/EIkm9MsPbHY
+        duration = 1
+    }
+
+    t := time.NewTimer(duration)
+    cancel := func() {
+        t.Stop()
+    }
+
+    return t, cancel
+}
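
For reference, the calling convention the call sites in this diff adopt: create the timer once, defer the StopFunc, and Reset at the top of each loop iteration so every select waits a full interval. A minimal sketch assuming the helper above (pollLoop and doWork are illustrative names):

```go
package sketch

import (
	"context"
	"time"

	"github.com/hashicorp/nomad/helper"
)

// pollLoop runs doWork every interval until ctx is canceled, reusing a
// single timer instead of allocating one per iteration via time.After.
func pollLoop(ctx context.Context, interval time.Duration, doWork func()) {
	timer, stop := helper.NewSafeTimer(interval)
	defer stop()

	for {
		// Rearm the timer so this iteration waits a full interval.
		timer.Reset(interval)

		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			doWork()
		}
	}
}
```

Resetting at the top of the loop, rather than only in the timer branch, guarantees a freshly armed timer on every pass even when another select branch consumed the previous iteration.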


@@ -431,3 +431,17 @@ func TestPathEscapesSandbox(t *testing.T) {
         })
     }
 }
+
+func Test_NewSafeTimer(t *testing.T) {
+    t.Run("zero", func(t *testing.T) {
+        timer, stop := NewSafeTimer(0)
+        defer stop()
+        <-timer.C
+    })
+
+    t.Run("positive", func(t *testing.T) {
+        timer, stop := NewSafeTimer(1)
+        defer stop()
+        <-timer.C
+    })
+}
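
Both subtests return promptly: NewSafeTimer coerces the zero duration to one nanosecond, and the positive case passes 1 (also one nanosecond), so each timer fires almost immediately and neither receive on timer.C blocks.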


@@ -706,9 +706,14 @@ func (b *BlockedEvals) Stats() *BlockedStats {
 // EmitStats is used to export metrics about the blocked eval tracker while enabled
 func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
+    timer, stop := helper.NewSafeTimer(period)
+    defer stop()
+
     for {
+        timer.Reset(period)
+
         select {
-        case <-time.After(period):
+        case <-timer.C:
             stats := b.Stats()
             metrics.SetGauge([]string{"nomad", "blocked_evals", "total_quota_limit"}, float32(stats.TotalQuotaLimit))
             metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked))


@@ -4,7 +4,6 @@ import (
     "context"
     "fmt"
     "sync"
-    "time"
 
     log "github.com/hashicorp/go-hclog"
     memdb "github.com/hashicorp/go-memdb"
@@ -140,10 +139,17 @@ func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) {
 
 // watch is the long lived watching routine that detects job drain changes.
 func (w *drainingJobWatcher) watch() {
+    timer, stop := helper.NewSafeTimer(stateReadErrorDelay)
+    defer stop()
+
     waitIndex := uint64(1)
+
     for {
+        timer.Reset(stateReadErrorDelay)
+
         w.logger.Trace("getting job allocs at index", "index", waitIndex)
         jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), waitIndex)
+
         if err != nil {
             if err == context.Canceled {
                 // Determine if it is a cancel or a shutdown
@@ -164,7 +170,7 @@ func (w *drainingJobWatcher) watch() {
             case <-w.ctx.Done():
                 w.logger.Trace("shutting down")
                 return
-            case <-time.After(stateReadErrorDelay):
+            case <-timer.C:
                 continue
             }
         }


@@ -2,10 +2,10 @@ package drainer
 
 import (
     "context"
-    "time"
 
     log "github.com/hashicorp/go-hclog"
     memdb "github.com/hashicorp/go-memdb"
+    "github.com/hashicorp/nomad/helper"
     "github.com/hashicorp/nomad/nomad/state"
     "github.com/hashicorp/nomad/nomad/structs"
@@ -148,8 +148,13 @@ func NewNodeDrainWatcher(ctx context.Context, limiter *rate.Limiter, state *stat
 
 // watch is the long lived watching routine that detects node changes.
 func (w *nodeDrainWatcher) watch() {
+    timer, stop := helper.NewSafeTimer(stateReadErrorDelay)
+    defer stop()
+
     nindex := uint64(1)
+
     for {
+        timer.Reset(stateReadErrorDelay)
         nodes, index, err := w.getNodes(nindex)
         if err != nil {
             if err == context.Canceled {
@@ -160,7 +165,7 @@ func (w *nodeDrainWatcher) watch() {
         select {
         case <-w.ctx.Done():
             return
-        case <-time.After(stateReadErrorDelay):
+        case <-timer.C:
             continue
         }
     }
}


@@ -2,15 +2,15 @@ package nomad
 
 import (
     "container/heap"
+    "context"
     "errors"
     "fmt"
     "math/rand"
     "sync"
     "time"
-
-    "context"
 
     metrics "github.com/armon/go-metrics"
+    "github.com/hashicorp/nomad/helper"
     "github.com/hashicorp/nomad/helper/uuid"
     "github.com/hashicorp/nomad/lib/delayheap"
     "github.com/hashicorp/nomad/nomad/structs"
@@ -835,9 +835,14 @@ func (b *EvalBroker) Stats() *BrokerStats {
 // EmitStats is used to export metrics about the broker while enabled
 func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
+    timer, stop := helper.NewSafeTimer(period)
+    defer stop()
+
     for {
+        timer.Reset(period)
+
         select {
-        case <-time.After(period):
+        case <-timer.C:
             stats := b.Stats()
             metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady))
             metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))


@@ -7,6 +7,7 @@ import (
     "time"
 
     metrics "github.com/armon/go-metrics"
+    "github.com/hashicorp/nomad/helper"
     "github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -196,12 +197,14 @@ func (q *PlanQueue) Stats() *QueueStats {
 // EmitStats is used to export metrics about the broker while enabled
 func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) {
+    timer, stop := helper.NewSafeTimer(period)
+    defer stop()
     for {
         select {
-        case <-time.After(period):
+        case <-timer.C:
             stats := q.Stats()
             metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth))
         case <-stopCh:
             return
         }


@@ -27,6 +27,7 @@ import (
     multierror "github.com/hashicorp/go-multierror"
     lru "github.com/hashicorp/golang-lru"
     "github.com/hashicorp/nomad/command/agent/consul"
+    "github.com/hashicorp/nomad/helper"
     "github.com/hashicorp/nomad/helper/codec"
     "github.com/hashicorp/nomad/helper/pool"
     "github.com/hashicorp/nomad/helper/stats"
@@ -1855,9 +1856,14 @@ func (s *Server) Stats() map[string]map[string]string {
 // EmitRaftStats is used to export metrics about raft indexes and state store snapshot index
 func (s *Server) EmitRaftStats(period time.Duration, stopCh <-chan struct{}) {
+    timer, stop := helper.NewSafeTimer(period)
+    defer stop()
+
     for {
+        timer.Reset(period)
+
         select {
-        case <-time.After(period):
+        case <-timer.C:
             lastIndex := s.raft.LastIndex()
             metrics.SetGauge([]string{"raft", "lastIndex"}, float32(lastIndex))
             appliedIndex := s.raft.AppliedIndex()


@@ -11,6 +11,7 @@ import (
     "sync/atomic"
     "time"
 
+    "github.com/hashicorp/nomad/helper"
     tomb "gopkg.in/tomb.v2"
 
     metrics "github.com/armon/go-metrics"
@@ -1423,9 +1424,14 @@ func (v *vaultClient) stats() *VaultStats {
 // EmitStats is used to export metrics about the blocked eval tracker while enabled
 func (v *vaultClient) EmitStats(period time.Duration, stopCh <-chan struct{}) {
+    timer, stop := helper.NewSafeTimer(period)
+    defer stop()
+
     for {
+        timer.Reset(period)
+
         select {
-        case <-time.After(period):
+        case <-timer.C:
             stats := v.stats()
             metrics.SetGauge([]string{"nomad", "vault", "distributed_tokens_revoking"}, float32(stats.TrackedForRevoke))
             metrics.SetGauge([]string{"nomad", "vault", "token_ttl"}, float32(stats.TokenTTL/time.Millisecond))