519429a2de
This PR changes Nomad's wrapper around the Consul NamespaceAPI so that it detects whether the Consul Namespaces feature is enabled before making a request to the Namespaces API. Namespaces are not enabled in Consul OSS, and require a suitable license to be used with Consul ENT. Previously, Nomad checked for a 404 status code when making a request to the Namespaces API to "detect" whether Consul OSS was being used. This does not work for Consul ENT with Namespaces disabled, which returns a 500. Now we avoid requesting the Namespaces API altogether if Consul is detected to be the OSS SKU, or if the Namespaces feature is not licensed. Since Consul can be upgraded from OSS to ENT, or a new license applied, we cache the value for 1 minute, refreshing on demand if expired.

Fixes https://github.com/hashicorp/nomad-enterprise/issues/575

Note that the ticket originally describes using attributes from https://github.com/hashicorp/nomad/issues/10688. This turns out not to be possible due to a chicken-and-egg situation between bootstrapping the agent and setting up the Consul client. Also fun: the Consul fingerprinter creates its own Consul client, because there is currently no way to pass the agent's client through the fingerprint factory.
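To make the caching behavior described above concrete, the sketch below shows the general shape of a feature-detection client that remembers its answer for a minute and re-checks on demand once the entry expires. It is a minimal illustration only, not the code this PR adds: the AgentAPI interface, its NamespacesEnabled method, the cacheTTL constant, and the namespaceFeatureCache type are hypothetical names invented for the sketch. The real NewNamespacesClient constructed in the tests below takes both a namespaces client and an agent client.

package sketch

import (
	"sync"
	"time"
)

// AgentAPI is a hypothetical stand-in for the part of the Consul agent API
// that can report whether the Namespaces feature is enabled and licensed.
type AgentAPI interface {
	NamespacesEnabled() (bool, error)
}

// namespaceFeatureCache caches the feature-detection result so the Namespaces
// API is only queried when the feature is known to be usable.
type namespaceFeatureCache struct {
	agent AgentAPI

	mu        sync.Mutex
	enabled   bool
	expiresAt time.Time
}

// cacheTTL mirrors the 1 minute refresh window described in the PR text.
const cacheTTL = 1 * time.Minute

func newNamespaceFeatureCache(agent AgentAPI) *namespaceFeatureCache {
	return &namespaceFeatureCache{agent: agent}
}

// allowed reports whether namespace-aware requests should be made, refreshing
// the cached answer once it is older than cacheTTL. On a lookup error the
// previous value is kept so a transient failure does not flip behavior.
func (c *namespaceFeatureCache) allowed(now time.Time) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	if now.Before(c.expiresAt) {
		return c.enabled
	}
	enabled, err := c.agent.NamespacesEnabled()
	if err != nil {
		return c.enabled
	}
	c.enabled = enabled
	c.expiresAt = now.Add(cacheTTL)
	return c.enabled
}

A caller would consult allowed(time.Now()) before touching the Namespaces API and fall back to the default namespace otherwise.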
438 lines · 13 KiB · Go
package consul

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
	"github.com/stretchr/testify/require"
)

// checkRestartRecord is used by a testFakeCtx to record when restarts occur
// due to a watched check.
type checkRestartRecord struct {
	timestamp time.Time
	source    string
	reason    string
	failure   bool
}

// fakeCheckRestarter is a test implementation of TaskRestarter.
type fakeCheckRestarter struct {
	// restarts is a slice of all of the restarts triggered by the checkWatcher
	restarts []checkRestartRecord

	// need the checkWatcher to re-Watch restarted tasks like TaskRunner
	watcher *checkWatcher

	// check to re-Watch on restarts
	check     *structs.ServiceCheck
	allocID   string
	taskName  string
	checkName string

	mu sync.Mutex
}

// newFakeCheckRestarter creates a new TaskRestarter. It needs all of the
// parameters checkWatcher.Watch expects.
func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string, c *structs.ServiceCheck) *fakeCheckRestarter {
	return &fakeCheckRestarter{
		watcher:   w,
		check:     c,
		allocID:   allocID,
		taskName:  taskName,
		checkName: checkName,
	}
}

// Restart implements part of the TaskRestarter interface needed for check
// watching and is normally fulfilled by a TaskRunner.
//
// Restarts are recorded in the restarts field and the check is re-Watched.
func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	restart := checkRestartRecord{
		timestamp: time.Now(),
		source:    event.Type,
		reason:    event.DisplayMessage,
		failure:   failure,
	}
	c.restarts = append(c.restarts, restart)

	// Re-Watch the check just like TaskRunner
	c.watcher.Watch(c.allocID, c.taskName, c.checkName, c.check, c)
	return nil
}

// String for debugging
func (c *fakeCheckRestarter) String() string {
	c.mu.Lock()
	defer c.mu.Unlock()

	s := fmt.Sprintf("%s %s %s restarts:\n", c.allocID, c.taskName, c.checkName)
	for _, r := range c.restarts {
		s += fmt.Sprintf("%s - %s: %s (failure: %t)\n", r.timestamp, r.source, r.reason, r.failure)
	}
	return s
}

// GetRestarts for testing in a threadsafe way
func (c *fakeCheckRestarter) GetRestarts() []checkRestartRecord {
	c.mu.Lock()
	defer c.mu.Unlock()

	o := make([]checkRestartRecord, len(c.restarts))
	copy(o, c.restarts)
	return o
}

// checkResponse is a response returned by the fakeChecksAPI after the given
// time.
type checkResponse struct {
	at     time.Time
	id     string
	status string
}

// fakeChecksAPI implements the Checks() method for testing Consul.
type fakeChecksAPI struct {
	// responses is a map of check ids to their status at a particular
	// time. checkResponses must be in chronological order.
	responses map[string][]checkResponse

	mu sync.Mutex
}

func newFakeChecksAPI() *fakeChecksAPI {
	return &fakeChecksAPI{responses: make(map[string][]checkResponse)}
}

// add a new check status to Consul at the given time.
func (c *fakeChecksAPI) add(id, status string, at time.Time) {
	c.mu.Lock()
	c.responses[id] = append(c.responses[id], checkResponse{at, id, status})
	c.mu.Unlock()
}

func (c *fakeChecksAPI) ChecksWithFilterOpts(filter string, opts *api.QueryOptions) (map[string]*api.AgentCheck, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := time.Now()
	result := make(map[string]*api.AgentCheck, len(c.responses))

	// Use the latest response for each check
	for k, vs := range c.responses {
		for _, v := range vs {
			if v.at.After(now) {
				break
			}
			result[k] = &api.AgentCheck{
				CheckID: k,
				Name:    k,
				Status:  v.status,
			}
		}
	}

	return result, nil
}

// testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test
// logger and faster poll frequency.
func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) {
	logger := testlog.HCLogger(t)
	checksAPI := newFakeChecksAPI()
	namespacesClient := NewNamespacesClient(NewMockNamespaces(nil), NewMockAgent(ossFeatures))
	cw := newCheckWatcher(logger, checksAPI, namespacesClient)
	cw.pollFreq = 10 * time.Millisecond
	return checksAPI, cw
}

func testCheck() *structs.ServiceCheck {
	return &structs.ServiceCheck{
		Name:     "testcheck",
		Interval: 100 * time.Millisecond,
		Timeout:  100 * time.Millisecond,
		CheckRestart: &structs.CheckRestart{
			Limit:          3,
			Grace:          100 * time.Millisecond,
			IgnoreWarnings: false,
		},
	}
}

// TestCheckWatcher_Skip asserts unwatched checks are ignored.
func TestCheckWatcher_Skip(t *testing.T) {
	t.Parallel()

	// Create a check with restarting disabled
	check := testCheck()
	check.CheckRestart = nil

	logger := testlog.HCLogger(t)
	checksAPI := newFakeChecksAPI()
	namespacesClient := NewNamespacesClient(NewMockNamespaces(nil), NewMockAgent(ossFeatures))

	cw := newCheckWatcher(logger, checksAPI, namespacesClient)
	restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
	cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1)

	// Check should have been dropped as it's not watched
	if n := len(cw.checkUpdateCh); n != 0 {
		t.Fatalf("expected 0 checks to be enqueued for watching but found %d", n)
	}
}

// TestCheckWatcher_Healthy asserts healthy tasks are not restarted.
func TestCheckWatcher_Healthy(t *testing.T) {
	t.Parallel()

	fakeAPI, cw := testWatcherSetup(t)

	check1 := testCheck()
	restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
	cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)

	check2 := testCheck()
	check2.CheckRestart.Limit = 1
	check2.CheckRestart.Grace = 0
	restarter2 := newFakeCheckRestarter(cw, "testalloc2", "testtask2", "testcheck2", check2)
	cw.Watch("testalloc2", "testtask2", "testcheck2", check2, restarter2)

	// Make both checks healthy from the beginning
	fakeAPI.add("testcheck1", "passing", time.Time{})
	fakeAPI.add("testcheck2", "passing", time.Time{})

	// Run
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	cw.Run(ctx)

	// Ensure restart was never called
	if n := len(restarter1.restarts); n > 0 {
		t.Errorf("expected check 1 to not be restarted but found %d:\n%s", n, restarter1)
	}
	if n := len(restarter2.restarts); n > 0 {
		t.Errorf("expected check 2 to not be restarted but found %d:\n%s", n, restarter2)
	}
}

// TestCheckWatcher_Unhealthy asserts unhealthy tasks are restarted exactly once.
func TestCheckWatcher_Unhealthy(t *testing.T) {
	t.Parallel()

	fakeAPI, cw := testWatcherSetup(t)

	check1 := testCheck()
	restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
	cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)

	// Check has always been failing
	fakeAPI.add("testcheck1", "critical", time.Time{})

	// Run
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	cw.Run(ctx)

	// Ensure restart was called exactly once
	require.Len(t, restarter1.restarts, 1)
}

// TestCheckWatcher_HealthyWarning asserts checks in warning with
// ignore_warnings=true do not restart tasks.
func TestCheckWatcher_HealthyWarning(t *testing.T) {
	t.Parallel()

	fakeAPI, cw := testWatcherSetup(t)

	check1 := testCheck()
	check1.CheckRestart.Limit = 1
	check1.CheckRestart.Grace = 0
	check1.CheckRestart.IgnoreWarnings = true
	restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
	cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)

	// Check is always in warning but that's ok
	fakeAPI.add("testcheck1", "warning", time.Time{})

	// Run
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	cw.Run(ctx)

	// Ensure restart was never called on check 1
	if n := len(restarter1.restarts); n > 0 {
		t.Errorf("expected check 1 to not be restarted but found %d", n)
	}
}

// TestCheckWatcher_Flapping asserts checks that flap from healthy to unhealthy
// before the unhealthy limit is reached do not restart tasks.
func TestCheckWatcher_Flapping(t *testing.T) {
	t.Parallel()

	fakeAPI, cw := testWatcherSetup(t)

	check1 := testCheck()
	check1.CheckRestart.Grace = 0
	restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
	cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)

	// Check flaps and is never failing for the full 200ms needed to restart
	now := time.Now()
	fakeAPI.add("testcheck1", "passing", now)
	fakeAPI.add("testcheck1", "critical", now.Add(100*time.Millisecond))
	fakeAPI.add("testcheck1", "passing", now.Add(250*time.Millisecond))
	fakeAPI.add("testcheck1", "critical", now.Add(300*time.Millisecond))
	fakeAPI.add("testcheck1", "passing", now.Add(450*time.Millisecond))

	ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond)
	defer cancel()
	cw.Run(ctx)

	// Ensure restart was never called on check 1
	if n := len(restarter1.restarts); n > 0 {
		t.Errorf("expected check 1 to not be restarted but found %d\n%s", n, restarter1)
	}
}

// TestCheckWatcher_Unwatch asserts unwatching checks prevents restarts.
func TestCheckWatcher_Unwatch(t *testing.T) {
	t.Parallel()

	fakeAPI, cw := testWatcherSetup(t)

	// Unwatch immediately
	check1 := testCheck()
	check1.CheckRestart.Limit = 1
	check1.CheckRestart.Grace = 100 * time.Millisecond
	restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
	cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
	cw.Unwatch("testcheck1")

	// Always failing
	fakeAPI.add("testcheck1", "critical", time.Time{})

	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	cw.Run(ctx)

	// Ensure restart was never called on check 1
	if n := len(restarter1.restarts); n > 0 {
		t.Errorf("expected check 1 to not be restarted but found %d\n%s", n, restarter1)
	}
}

// TestCheckWatcher_MultipleChecks asserts that when there are multiple checks
// for a single task, all checks should be removed when any of them restart the
// task to avoid multiple restarts.
func TestCheckWatcher_MultipleChecks(t *testing.T) {
	t.Parallel()

	fakeAPI, cw := testWatcherSetup(t)

	check1 := testCheck()
	check1.CheckRestart.Limit = 1
	restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
	cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)

	check2 := testCheck()
	check2.CheckRestart.Limit = 1
	restarter2 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck2", check2)
	cw.Watch("testalloc1", "testtask1", "testcheck2", check2, restarter2)

	check3 := testCheck()
	check3.CheckRestart.Limit = 1
	restarter3 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck3", check3)
	cw.Watch("testalloc1", "testtask1", "testcheck3", check3, restarter3)

	// check 1 & 2 fail long enough to cause 1 restart, but only 1 should restart
	now := time.Now()
	fakeAPI.add("testcheck1", "critical", now)
	fakeAPI.add("testcheck1", "passing", now.Add(150*time.Millisecond))
	fakeAPI.add("testcheck2", "critical", now)
	fakeAPI.add("testcheck2", "passing", now.Add(150*time.Millisecond))
	fakeAPI.add("testcheck3", "passing", time.Time{})

	// Run
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	cw.Run(ctx)

	// Ensure that restart was only called once on check 1 or 2. Since
	// checks are in a map it's random which check triggers the restart
	// first.
	if n := len(restarter1.restarts) + len(restarter2.restarts); n != 1 {
		t.Errorf("expected check 1 & 2 to be restarted 1 time but found %d\ncheck 1:\n%s\ncheck 2:%s",
			n, restarter1, restarter2)
	}

	if n := len(restarter3.restarts); n != 0 {
		t.Errorf("expected check 3 to not be restarted but found %d:\n%s", n, restarter3)
	}
}

// TestCheckWatcher_Deadlock asserts that check watcher will not deadlock when
// attempting to restart a task even if its update queue is full.
// https://github.com/hashicorp/nomad/issues/5395
func TestCheckWatcher_Deadlock(t *testing.T) {
	t.Parallel()

	fakeAPI, cw := testWatcherSetup(t)

	// If TR.Restart blocks, restarting len(checkUpdateCh)+1 checks causes
	// a deadlock due to checkWatcher.Run being blocked in
	// checkRestart.apply and unable to process updates from the chan!
	n := cap(cw.checkUpdateCh) + 1
	checks := make([]*structs.ServiceCheck, n)
	restarters := make([]*fakeCheckRestarter, n)
	for i := 0; i < n; i++ {
		c := testCheck()
		r := newFakeCheckRestarter(cw,
			fmt.Sprintf("alloc%d", i),
			fmt.Sprintf("task%d", i),
			fmt.Sprintf("check%d", i),
			c,
		)
		checks[i] = c
		restarters[i] = r
	}

	// Run
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go cw.Run(ctx)

	// Watch
	for _, r := range restarters {
		cw.Watch(r.allocID, r.taskName, r.checkName, r.check, r)
	}

	// Make them all fail
	for _, r := range restarters {
		fakeAPI.add(r.checkName, "critical", time.Time{})
	}

	// Ensure that restart was called exactly once on all checks
	testutil.WaitForResult(func() (bool, error) {
		for _, r := range restarters {
			if n := len(r.GetRestarts()); n != 1 {
				return false, fmt.Errorf("expected 1 restart but found %d", n)
			}
		}
		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})
}