* routine: fix that acl stops replicating after regaining leadership (#12295) * routine: add TestManager_StopBlocking (#12295) * routine: update TestManager_StopBlocking (#12295)
This commit is contained in:
parent
e3d2b91e34
commit
e1b013d835
|
@ -10,15 +10,22 @@ import (
|
|||
|
||||
type Routine func(ctx context.Context) error
|
||||
|
||||
// cancelCh is the ctx.Done()
|
||||
// When cancel() is called, if the routine is running a blocking call (e.g. some ACL replication RPCs),
|
||||
// stoppedCh won't be closed till the blocking call returns, while cancelCh will be closed immediately.
|
||||
// cancelCh is used to properly detect routine running status between cancel() and close(stoppedCh)
|
||||
type routineTracker struct {
|
||||
cancel context.CancelFunc
|
||||
stoppedCh chan struct{} // closed when no longer running
|
||||
cancelCh <-chan struct{} // closed when ctx is done
|
||||
stoppedCh chan struct{} // closed when no longer running
|
||||
}
|
||||
|
||||
func (r *routineTracker) running() bool {
|
||||
select {
|
||||
case <-r.stoppedCh:
|
||||
return false
|
||||
case <-r.cancelCh:
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
|
@ -74,6 +81,7 @@ func (m *Manager) Start(ctx context.Context, name string, routine Routine) error
|
|||
rtCtx, cancel := context.WithCancel(ctx)
|
||||
instance := &routineTracker{
|
||||
cancel: cancel,
|
||||
cancelCh: ctx.Done(),
|
||||
stoppedCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
@ -97,10 +105,14 @@ func (m *Manager) execute(ctx context.Context, name string, routine Routine, don
|
|||
"error", err,
|
||||
)
|
||||
} else {
|
||||
m.logger.Debug("stopped routine", "routine", name)
|
||||
m.logger.Info("stopped routine", "routine", name)
|
||||
}
|
||||
}
|
||||
|
||||
// Caveat: The returned stoppedCh indicates that the routine is completed
|
||||
// It's possible that ctx is canceled, but stoppedCh not yet closed
|
||||
// Use mgr.IsRunning(name) than this stoppedCh to tell whether the
|
||||
// instance is still running (not cancelled or completed).
|
||||
func (m *Manager) Stop(name string) <-chan struct{} {
|
||||
instance := m.stopInstance(name)
|
||||
if instance == nil {
|
||||
|
@ -127,7 +139,7 @@ func (m *Manager) stopInstance(name string) *routineTracker {
|
|||
return instance
|
||||
}
|
||||
|
||||
m.logger.Debug("stopping routine", "routine", name)
|
||||
m.logger.Info("stopping routine", "routine", name)
|
||||
instance.cancel()
|
||||
|
||||
delete(m.routines, name)
|
||||
|
@ -145,7 +157,7 @@ func (m *Manager) StopAll() {
|
|||
if !routine.running() {
|
||||
continue
|
||||
}
|
||||
m.logger.Debug("stopping routine", "routine", name)
|
||||
m.logger.Info("stopping routine", "routine", name)
|
||||
routine.cancel()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -99,3 +99,58 @@ func TestManager_StopAll(t *testing.T) {
|
|||
require.False(r, mgr.IsRunning("run2"))
|
||||
})
|
||||
}
|
||||
|
||||
// Test IsRunning when routine is a blocking call that does not
|
||||
// immediately return when cancelled
|
||||
func TestManager_StopBlocking(t *testing.T) {
|
||||
t.Parallel()
|
||||
var runs uint32
|
||||
var running uint32
|
||||
unblock := make(chan struct{}) // To simulate a blocking call
|
||||
mgr := NewManager(testutil.Logger(t))
|
||||
|
||||
// A routine that will be still running for a while after cancelled
|
||||
run := func(ctx context.Context) error {
|
||||
atomic.StoreUint32(&running, 1)
|
||||
defer atomic.StoreUint32(&running, 0)
|
||||
atomic.AddUint32(&runs, 1)
|
||||
<-ctx.Done()
|
||||
<-unblock
|
||||
return nil
|
||||
}
|
||||
|
||||
require.NoError(t, mgr.Start(context.Background(), "blocking", run))
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.True(r, mgr.IsRunning("blocking"))
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
||||
})
|
||||
|
||||
doneCh := mgr.Stop("blocking")
|
||||
|
||||
// IsRunning should return false, however &running is still 1
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.False(r, mgr.IsRunning("blocking"))
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
||||
})
|
||||
|
||||
// New routine should be able to replace old "cancelled but running" routine.
|
||||
require.NoError(t, mgr.Start(context.Background(), "blocking", func(ctx context.Context) error {
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}))
|
||||
defer mgr.Stop("blocking")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.True(r, mgr.IsRunning("blocking")) // New routine
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&running)) // Old routine
|
||||
})
|
||||
|
||||
// Complete the blocking routine
|
||||
close(unblock)
|
||||
<-doneCh
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue