diff --git a/.changelog/11983.txt b/.changelog/11983.txt new file mode 100644 index 000000000..2bdc92ab2 --- /dev/null +++ b/.changelog/11983.txt @@ -0,0 +1,3 @@ +```release-note:bug +cleanup: prevent leaks from time.After +``` diff --git a/api/allocations_exec.go b/api/allocations_exec.go index 9f5e0e299..25f7955de 100644 --- a/api/allocations_exec.go +++ b/api/allocations_exec.go @@ -13,6 +13,12 @@ import ( "github.com/gorilla/websocket" ) +const ( + // heartbeatInterval is the amount of time to wait between sending heartbeats + // during an exec streaming operation + heartbeatInterval = 10 * time.Second +) + type execSession struct { client *Client alloc *Allocation @@ -177,15 +183,19 @@ func (s *execSession) startTransmit(ctx context.Context, conn *websocket.Conn) < // send a heartbeat every 10 seconds go func() { + t := time.NewTimer(heartbeatInterval) + defer t.Stop() + for { + t.Reset(heartbeatInterval) + select { case <-ctx.Done(): return - // heartbeat message - case <-time.After(10 * time.Second): + case <-t.C: + // heartbeat message send(&execStreamingInputHeartbeat) } - } }() diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index a5f53bc8c..3cf751cf1 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -527,6 +527,9 @@ func (tr *TaskRunner) Run() { return } + timer, stop := helper.NewSafeTimer(0) // timer duration calculated JIT + defer stop() + MAIN: for !tr.shouldShutdown() { select { @@ -612,9 +615,11 @@ MAIN: break MAIN } + timer.Reset(restartDelay) + // Actually restart by sleeping and also watching for destroy events select { - case <-time.After(restartDelay): + case <-timer.C: case <-tr.killCtx.Done(): tr.logger.Trace("task killed between restarts", "delay", restartDelay) break MAIN diff --git a/command/agent/consul/version_checker.go b/command/agent/consul/version_checker.go index 970958072..f31f016ea 100644 --- a/command/agent/consul/version_checker.go +++ b/command/agent/consul/version_checker.go @@ -7,6 +7,7 @@ import ( log "github.com/hashicorp/go-hclog" version "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/helper" ) // checkConsulTLSSkipVerify logs if Consul does not support TLSSkipVerify on @@ -20,6 +21,10 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age defer close(done) i := uint64(0) + + timer, stop := helper.NewSafeTimer(limit) + defer stop() + for { self, err := client.Self() if err == nil { @@ -39,10 +44,12 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age i++ } + timer.Reset(backoff) + select { case <-ctx.Done(): return - case <-time.After(backoff): + case <-timer.C: } } } diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go index d788110c1..2a9108662 100644 --- a/command/agent/monitor/monitor.go +++ b/command/agent/monitor/monitor.go @@ -6,6 +6,7 @@ import ( "time" log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/helper" ) // Monitor provides a mechanism to stream logs using go-hclog @@ -107,12 +108,17 @@ func (d *monitor) Start() <-chan []byte { // dropped messages and makes room on the logCh // to add a dropped message count warning go func() { + timer, stop := helper.NewSafeTimer(d.droppedDuration) + defer stop() + // loop and check for dropped messages for { + timer.Reset(d.droppedDuration) + select { case <-d.doneCh: return - case <-time.After(d.droppedDuration): + case <-timer.C: d.Lock() // Check if there have been any dropped messages. diff --git a/drivers/docker/stats.go b/drivers/docker/stats.go index cb7bd3ab0..13531c6fb 100644 --- a/drivers/docker/stats.go +++ b/drivers/docker/stats.go @@ -10,6 +10,7 @@ import ( docker "github.com/fsouza/go-dockerclient" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/drivers/docker/util" + "github.com/hashicorp/nomad/helper" nstructs "github.com/hashicorp/nomad/nomad/structs" ) @@ -91,19 +92,27 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte defer destCh.close() // backoff and retry used if the docker stats API returns an error - var backoff time.Duration + var backoff time.Duration = 0 var retry int + + // create an interval timer + timer, stop := helper.NewSafeTimer(backoff) + defer stop() + // loops until doneCh is closed for { + timer.Reset(backoff) + if backoff > 0 { select { - case <-time.After(backoff): + case <-timer.C: case <-ctx.Done(): return case <-h.doneCh: return } } + // make a channel for docker stats structs and start a collector to // receive stats from docker and emit nomad stats // statsCh will always be closed by docker client. diff --git a/drivers/shared/eventer/eventer.go b/drivers/shared/eventer/eventer.go index e98c896a2..925b97cc8 100644 --- a/drivers/shared/eventer/eventer.go +++ b/drivers/shared/eventer/eventer.go @@ -6,6 +6,7 @@ import ( "time" hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/plugins/drivers" ) @@ -62,14 +63,19 @@ func NewEventer(ctx context.Context, logger hclog.Logger) *Eventer { // eventLoop is the main logic which pulls events from the channel and broadcasts // them to all consumers func (e *Eventer) eventLoop() { + timer, stop := helper.NewSafeTimer(ConsumerGCInterval) + defer stop() + for { + timer.Reset(ConsumerGCInterval) + select { case <-e.ctx.Done(): e.logger.Trace("task event loop shutdown") return case event := <-e.events: e.iterateConsumers(event) - case <-time.After(ConsumerGCInterval): + case <-timer.C: e.gcConsumers() } } diff --git a/helper/funcs.go b/helper/funcs.go index f55266819..c2b85a0b8 100644 --- a/helper/funcs.go +++ b/helper/funcs.go @@ -572,3 +572,30 @@ func PathEscapesSandbox(sandboxDir, path string) bool { } return false } + +// StopFunc is used to stop a time.Timer created with NewSafeTimer +type StopFunc func() + +// NewSafeTimer creates a time.Timer but does not panic if duration is <= 0. +// +// Using a time.Timer is recommended instead of time.After when it is necessary +// to avoid leaking goroutines (e.g. in a select inside a loop). +// +// Returns the time.Timer and also a StopFunc, forcing the caller to deal +// with stopping the time.Timer to avoid leaking a goroutine. +func NewSafeTimer(duration time.Duration) (*time.Timer, StopFunc) { + if duration <= 0 { + // Avoid panic by using the smallest positive value. This is close enough + // to the behavior of time.After(0), which this helper is intended to + // replace. + // https://go.dev/play/p/EIkm9MsPbHY + duration = 1 + } + + t := time.NewTimer(duration) + cancel := func() { + t.Stop() + } + + return t, cancel +} diff --git a/helper/funcs_test.go b/helper/funcs_test.go index 265e3f479..0eaad19c4 100644 --- a/helper/funcs_test.go +++ b/helper/funcs_test.go @@ -431,3 +431,17 @@ func TestPathEscapesSandbox(t *testing.T) { }) } } + +func Test_NewSafeTimer(t *testing.T) { + t.Run("zero", func(t *testing.T) { + timer, stop := NewSafeTimer(0) + defer stop() + <-timer.C + }) + + t.Run("positive", func(t *testing.T) { + timer, stop := NewSafeTimer(1) + defer stop() + <-timer.C + }) +} diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index edcb8e08d..aac55e978 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -706,9 +706,14 @@ func (b *BlockedEvals) Stats() *BlockedStats { // EmitStats is used to export metrics about the blocked eval tracker while enabled func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) { + timer, stop := helper.NewSafeTimer(period) + defer stop() + for { + timer.Reset(period) + select { - case <-time.After(period): + case <-timer.C: stats := b.Stats() metrics.SetGauge([]string{"nomad", "blocked_evals", "total_quota_limit"}, float32(stats.TotalQuotaLimit)) metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked)) diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go index 51304f8e7..b3dbc842e 100644 --- a/nomad/drainer/watch_jobs.go +++ b/nomad/drainer/watch_jobs.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sync" - "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" @@ -140,10 +139,17 @@ func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) { // watch is the long lived watching routine that detects job drain changes. func (w *drainingJobWatcher) watch() { + timer, stop := helper.NewSafeTimer(stateReadErrorDelay) + defer stop() + waitIndex := uint64(1) + for { + timer.Reset(stateReadErrorDelay) + w.logger.Trace("getting job allocs at index", "index", waitIndex) jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), waitIndex) + if err != nil { if err == context.Canceled { // Determine if it is a cancel or a shutdown @@ -164,7 +170,7 @@ func (w *drainingJobWatcher) watch() { case <-w.ctx.Done(): w.logger.Trace("shutting down") return - case <-time.After(stateReadErrorDelay): + case <-timer.C: continue } } diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go index 0d91bba11..31ce4357e 100644 --- a/nomad/drainer/watch_nodes.go +++ b/nomad/drainer/watch_nodes.go @@ -2,10 +2,10 @@ package drainer import ( "context" - "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -148,8 +148,13 @@ func NewNodeDrainWatcher(ctx context.Context, limiter *rate.Limiter, state *stat // watch is the long lived watching routine that detects node changes. func (w *nodeDrainWatcher) watch() { + timer, stop := helper.NewSafeTimer(stateReadErrorDelay) + defer stop() + nindex := uint64(1) + for { + timer.Reset(stateReadErrorDelay) nodes, index, err := w.getNodes(nindex) if err != nil { if err == context.Canceled { @@ -160,7 +165,7 @@ func (w *nodeDrainWatcher) watch() { select { case <-w.ctx.Done(): return - case <-time.After(stateReadErrorDelay): + case <-timer.C: continue } } diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index bfab5b437..9aa7542dc 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -2,15 +2,15 @@ package nomad import ( "container/heap" + "context" "errors" "fmt" "math/rand" "sync" "time" - "context" - metrics "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/lib/delayheap" "github.com/hashicorp/nomad/nomad/structs" @@ -835,9 +835,14 @@ func (b *EvalBroker) Stats() *BrokerStats { // EmitStats is used to export metrics about the broker while enabled func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { + timer, stop := helper.NewSafeTimer(period) + defer stop() + for { + timer.Reset(period) + select { - case <-time.After(period): + case <-timer.C: stats := b.Stats() metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady)) metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked)) diff --git a/nomad/plan_queue.go b/nomad/plan_queue.go index 92d81d096..2ba35e068 100644 --- a/nomad/plan_queue.go +++ b/nomad/plan_queue.go @@ -7,6 +7,7 @@ import ( "time" metrics "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -196,12 +197,14 @@ func (q *PlanQueue) Stats() *QueueStats { // EmitStats is used to export metrics about the broker while enabled func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) { + timer, stop := helper.NewSafeTimer(period) + defer stop() + for { select { - case <-time.After(period): + case <-timer.C: stats := q.Stats() metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth)) - case <-stopCh: return } diff --git a/nomad/server.go b/nomad/server.go index 1dfbd6ef9..d9f607aaa 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -27,6 +27,7 @@ import ( multierror "github.com/hashicorp/go-multierror" lru "github.com/hashicorp/golang-lru" "github.com/hashicorp/nomad/command/agent/consul" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/codec" "github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/helper/stats" @@ -1855,9 +1856,14 @@ func (s *Server) Stats() map[string]map[string]string { // EmitRaftStats is used to export metrics about raft indexes and state store snapshot index func (s *Server) EmitRaftStats(period time.Duration, stopCh <-chan struct{}) { + timer, stop := helper.NewSafeTimer(period) + defer stop() + for { + timer.Reset(period) + select { - case <-time.After(period): + case <-timer.C: lastIndex := s.raft.LastIndex() metrics.SetGauge([]string{"raft", "lastIndex"}, float32(lastIndex)) appliedIndex := s.raft.AppliedIndex() diff --git a/nomad/vault.go b/nomad/vault.go index 4e8a1ff13..c5012fa9b 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/nomad/helper" tomb "gopkg.in/tomb.v2" metrics "github.com/armon/go-metrics" @@ -1423,9 +1424,14 @@ func (v *vaultClient) stats() *VaultStats { // EmitStats is used to export metrics about the blocked eval tracker while enabled func (v *vaultClient) EmitStats(period time.Duration, stopCh <-chan struct{}) { + timer, stop := helper.NewSafeTimer(period) + defer stop() + for { + timer.Reset(period) + select { - case <-time.After(period): + case <-timer.C: stats := v.stats() metrics.SetGauge([]string{"nomad", "vault", "distributed_tokens_revoking"}, float32(stats.TrackedForRevoke)) metrics.SetGauge([]string{"nomad", "vault", "token_ttl"}, float32(stats.TokenTTL/time.Millisecond))