ff26294d63
Some global variables are patched to shorter values in these tests. But the goroutines that read them can outlive the test because nothing waited for them to exit. This commit adds a Wait() method to the routine manager, so that tests can wait for the goroutines to exit. This prevents the data race because the 'reset to original value' can happen after all other goroutines have stopped.
162 lines
3 KiB
Go
162 lines
3 KiB
Go
package routine
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
)
|
|
|
|
type Routine func(ctx context.Context) error
|
|
|
|
type routineTracker struct {
|
|
cancel context.CancelFunc
|
|
stoppedCh chan struct{} // closed when no longer running
|
|
}
|
|
|
|
func (r *routineTracker) running() bool {
|
|
select {
|
|
case <-r.stoppedCh:
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
func (r *routineTracker) wait() {
|
|
<-r.stoppedCh
|
|
}
|
|
|
|
type Manager struct {
|
|
lock sync.RWMutex
|
|
logger hclog.Logger
|
|
|
|
routines map[string]*routineTracker
|
|
}
|
|
|
|
func NewManager(logger hclog.Logger) *Manager {
|
|
if logger == nil {
|
|
logger = hclog.New(&hclog.LoggerOptions{
|
|
Output: os.Stderr,
|
|
})
|
|
}
|
|
|
|
return &Manager{
|
|
logger: logger,
|
|
routines: make(map[string]*routineTracker),
|
|
}
|
|
}
|
|
|
|
func (m *Manager) IsRunning(name string) bool {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
if routine, ok := m.routines[name]; ok {
|
|
return routine.running()
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (m *Manager) Start(ctx context.Context, name string, routine Routine) error {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
if instance, ok := m.routines[name]; ok && instance.running() {
|
|
return nil
|
|
}
|
|
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
rtCtx, cancel := context.WithCancel(ctx)
|
|
instance := &routineTracker{
|
|
cancel: cancel,
|
|
stoppedCh: make(chan struct{}),
|
|
}
|
|
|
|
go m.execute(rtCtx, name, routine, instance.stoppedCh)
|
|
|
|
m.routines[name] = instance
|
|
m.logger.Info("started routine", "routine", name)
|
|
return nil
|
|
}
|
|
|
|
// execute will run the given routine in the foreground and close the given channel when its done executing
|
|
func (m *Manager) execute(ctx context.Context, name string, routine Routine, done chan struct{}) {
|
|
defer func() {
|
|
close(done)
|
|
}()
|
|
|
|
err := routine(ctx)
|
|
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
|
m.logger.Error("routine exited with error",
|
|
"routine", name,
|
|
"error", err,
|
|
)
|
|
} else {
|
|
m.logger.Debug("stopped routine", "routine", name)
|
|
}
|
|
}
|
|
|
|
func (m *Manager) Stop(name string) <-chan struct{} {
|
|
instance := m.stopInstance(name)
|
|
if instance == nil {
|
|
// Fabricate a closed channel so it won't block forever.
|
|
ch := make(chan struct{})
|
|
close(ch)
|
|
return ch
|
|
}
|
|
|
|
return instance.stoppedCh
|
|
}
|
|
|
|
func (m *Manager) stopInstance(name string) *routineTracker {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
instance, ok := m.routines[name]
|
|
if !ok {
|
|
// no running instance
|
|
return nil
|
|
}
|
|
|
|
if !instance.running() {
|
|
return instance
|
|
}
|
|
|
|
m.logger.Debug("stopping routine", "routine", name)
|
|
instance.cancel()
|
|
|
|
delete(m.routines, name)
|
|
|
|
return instance
|
|
}
|
|
|
|
// StopAll goroutines. Once StopAll is called, it is no longer safe to add no
|
|
// goroutines to the Manager.
|
|
func (m *Manager) StopAll() {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
for name, routine := range m.routines {
|
|
if !routine.running() {
|
|
continue
|
|
}
|
|
m.logger.Debug("stopping routine", "routine", name)
|
|
routine.cancel()
|
|
}
|
|
}
|
|
|
|
// Wait for all goroutines to stop after StopAll is called.
|
|
func (m *Manager) Wait() {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
for _, routine := range m.routines {
|
|
routine.wait()
|
|
}
|
|
}
|