consul: add test for check watcher deadlock

parent 826d2503e6
commit 6d095b3b36
@@ -3,12 +3,14 @@ package consul
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
 	"github.com/stretchr/testify/require"
 )
 
@@ -34,6 +36,8 @@ type fakeCheckRestarter struct {
 	allocID   string
 	taskName  string
 	checkName string
+
+	mu sync.Mutex
 }
 
 // newFakeCheckRestart creates a new TaskRestarter. It needs all of the
@@ -54,6 +58,8 @@ func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string,
 // Restarts are recorded in the []restarts field and re-Watch the check.
 //func (c *fakeCheckRestarter) Restart(source, reason string, failure bool) {
 func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	restart := checkRestartRecord{
 		timestamp: time.Now(),
 		source:    event.Type,
@@ -69,6 +75,9 @@ func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEve
 
 // String for debugging
 func (c *fakeCheckRestarter) String() string {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	s := fmt.Sprintf("%s %s %s restarts:\n", c.allocID, c.taskName, c.checkName)
 	for _, r := range c.restarts {
 		s += fmt.Sprintf("%s - %s: %s (failure: %t)\n", r.timestamp, r.source, r.reason, r.failure)
@@ -76,6 +85,16 @@ func (c *fakeCheckRestarter) String() string {
 	return s
 }
 
+// GetRestarts for testing in a threadsafe way
+func (c *fakeCheckRestarter) GetRestarts() []checkRestartRecord {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	o := make([]checkRestartRecord, len(c.restarts))
+	copy(o, c.restarts)
+	return o
+}
+
 // checkResponse is a response returned by the fakeChecksAPI after the given
 // time.
 type checkResponse struct {
@@ -89,6 +108,8 @@ type fakeChecksAPI struct {
 	// responses is a map of check ids to their status at a particular
 	// time. checkResponses must be in chronological order.
 	responses map[string][]checkResponse
+
+	mu sync.Mutex
 }
 
 func newFakeChecksAPI() *fakeChecksAPI {
@@ -97,10 +118,14 @@ func newFakeChecksAPI() *fakeChecksAPI {
 
 // add a new check status to Consul at the given time.
 func (c *fakeChecksAPI) add(id, status string, at time.Time) {
+	c.mu.Lock()
 	c.responses[id] = append(c.responses[id], checkResponse{at, id, status})
+	c.mu.Unlock()
 }
 
 func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	now := time.Now()
 	result := make(map[string]*api.AgentCheck, len(c.responses))
 
@@ -350,3 +375,57 @@ func TestCheckWatcher_MultipleChecks(t *testing.T) {
 		t.Errorf("expected check 3 to not be restarted but found %d:\n%s", n, restarter3)
 	}
 }
+
+// TestCheckWatcher_Deadlock asserts that check watcher will not deadlock when
+// attempting to restart a task even if its update queue is full.
+// https://github.com/hashicorp/nomad/issues/5395
+func TestCheckWatcher_Deadlock(t *testing.T) {
+	t.Parallel()
+
+	fakeAPI, cw := testWatcherSetup(t)
+
+	// If TR.Restart blocks, restarting len(checkUpdateCh)+1 checks causes
+	// a deadlock due to checkWatcher.Run being blocked in
+	// checkRestart.apply and unable to process updates from the chan!
+	n := cap(cw.checkUpdateCh) + 1
+	checks := make([]*structs.ServiceCheck, n)
+	restarters := make([]*fakeCheckRestarter, n)
+	for i := 0; i < n; i++ {
+		c := testCheck()
+		r := newFakeCheckRestarter(cw,
+			fmt.Sprintf("alloc%d", i),
+			fmt.Sprintf("task%d", i),
+			fmt.Sprintf("check%d", i),
+			c,
+		)
+		checks[i] = c
+		restarters[i] = r
+	}
+
+	// Run
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go cw.Run(ctx)
+
+	// Watch
+	for _, r := range restarters {
+		cw.Watch(r.allocID, r.taskName, r.checkName, r.check, r)
+	}
+
+	// Make them all fail
+	for _, r := range restarters {
+		fakeAPI.add(r.checkName, "critical", time.Time{})
+	}
+
+	// Ensure that restart was called exactly once on all checks
+	testutil.WaitForResult(func() (bool, error) {
+		for _, r := range restarters {
+			if n := len(r.GetRestarts()); n != 1 {
+				return false, fmt.Errorf("expected 1 restart but found %d", n)
+			}
+		}
+		return true, nil
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+}
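
The scenario the new test exercises is a classic self-deadlock: checkWatcher.Run is the only consumer of the bounded checkUpdateCh, but handling a failed check ends up sending back into that same channel (the restarted check is re-Watched), so once the buffer is full the watcher blocks on its own queue. Below is a minimal, self-contained Go sketch of that pattern and one conventional escape hatch, a non-blocking select send. It is illustrative only; update, updateCh, and the buffer size are invented for the example and do not reproduce Nomad's actual checkWatcher.

// A minimal sketch (not Nomad code) of the self-deadlock the test guards
// against, with a non-blocking send as one way out.
package main

import "fmt"

type update struct{ id int }

func main() {
	// A small bounded queue, playing the role of checkUpdateCh.
	updateCh := make(chan update, 2)

	// Pre-fill the buffer so a plain blocking re-enqueue below would
	// stall forever: the goroutine doing the send is the queue's only
	// consumer, so nothing could ever drain it.
	updateCh <- update{id: 100}
	updateCh <- update{id: 101}

	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 3; i++ {
			u := <-updateCh
			fmt.Println("handled", u.id)

			// Handling one update produces follow-ups (the real
			// watcher re-Watches a check after restarting its
			// task). A blocking `updateCh <- ...` here is the
			// bug; select/default makes the send non-blocking.
			for _, f := range []int{u.id + 1000, u.id + 2000} {
				select {
				case updateCh <- update{id: f}:
				default:
					fmt.Println("queue full, dropped", f)
				}
			}
		}
	}()
	<-done
}

Other common fixes include performing the re-enqueue from a separate goroutine or selecting against a shutdown channel so the send can be abandoned; the property the test pins down is simply that a full queue must never wedge the consumer.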
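The sync.Mutex threaded through fakeCheckRestarter and fakeChecksAPI follows a standard pattern for test fakes shared between goroutines: every method that touches shared state locks, and accessors such as GetRestarts hand back a copy rather than the live slice, so the polling assertion in the test cannot race with appends from the watcher goroutine. A small stand-alone sketch of the same pattern (recorder, Add, and Snapshot are invented names for illustration):

package main

import (
	"fmt"
	"sync"
)

// recorder accumulates events from many goroutines. Snapshot returns a
// copy so callers can iterate without holding the lock and without
// racing against concurrent Add calls on the shared backing array.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) Add(e string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, e)
}

func (r *recorder) Snapshot() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]string, len(r.events))
	copy(out, r.events)
	return out
}

func main() {
	r := &recorder{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.Add(fmt.Sprintf("event-%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println("recorded", len(r.Snapshot()), "events")
}

Returning r.events directly would compile and often pass, but go test -race would flag the caller's iteration racing with a concurrent append; copying under the lock keeps the fake safe without exposing the mutex to the test body.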