consul: periodically reconcile services/checks

Periodically sync services and checks from Nomad to Consul. This is
mostly useful when testing with the Consul dev agent, which does not
persist state across restarts, but it is also a reasonable safety
measure to prevent skew between Consul's state and Nomad's services
and checks.

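The loop is the existing sync path driven by a timer rather than only by incoming register/deregister operations. A minimal sketch of the pattern, with illustrative names that do not match the real ServiceClient fields:

package main

import (
    "log"
    "time"
)

// reconciler is an illustrative stand-in for the ServiceClient run loop; the
// real field and method names in Nomad differ.
type reconciler struct {
    periodicInterval time.Duration
    shutdownCh       chan struct{}
    sync             func() error
}

func (r *reconciler) run() {
    // Fire immediately so state converges at startup, then keep reconciling
    // on a fixed interval even when no new operations arrive.
    timer := time.NewTimer(0)
    defer timer.Stop()
    for {
        select {
        case <-timer.C:
            if err := r.sync(); err != nil {
                log.Printf("[WARN] consul.sync: %v", err)
            }
            // Reset is safe here because timer.C was just drained above.
            timer.Reset(r.periodicInterval)
        case <-r.shutdownCh:
            return
        }
    }
}

func main() {
    r := &reconciler{
        periodicInterval: 200 * time.Millisecond,
        shutdownCh:       make(chan struct{}),
        sync:             func() error { log.Println("reconciled"); return nil },
    }
    go r.run()
    time.Sleep(time.Second) // observe a few periodic reconciles
    close(r.shutdownCh)
}
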
Also modernized the test suite a bit.
Michael Schurter 2018-04-17 12:36:50 -07:00
parent 361ff9e140
commit cfcbb9fa21
5 changed files with 109 additions and 46 deletions

View file

@ -33,6 +33,11 @@ type MockAgent struct {
// maps of what services and checks have been registered
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
// hits is the total number of times agent methods have been called
hits int
// mu guards above fields
mu sync.Mutex
// when UpdateTTL is called the check ID will have its counter inc'd
@ -52,6 +57,13 @@ func NewMockAgent() *MockAgent {
}
}
// getHits returns how many Consul Agent API calls have been made.
func (c *MockAgent) getHits() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.hits
}
// SetStatus sets the status that Checks() should return. Returns the old status value.
func (c *MockAgent) SetStatus(s string) string {
c.mu.Lock()
@ -62,6 +74,10 @@ func (c *MockAgent) SetStatus(s string) string {
}
func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
s := map[string]map[string]interface{}{
"Member": {
"Addr": "127.0.0.1",
@ -85,6 +101,7 @@ func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
r := make(map[string]*api.AgentService, len(c.services))
for k, v := range c.services {
@ -105,6 +122,7 @@ func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
r := make(map[string]*api.AgentCheck, len(c.checks))
for k, v := range c.checks {
@ -125,7 +143,6 @@ func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
c.mu.Lock()
defer c.mu.Unlock()
regs := make([]*api.AgentCheckRegistration, 0, len(c.checks))
for _, check := range c.checks {
regs = append(regs, check)
@ -136,6 +153,8 @@ func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
c.checks[check.ID] = check
// Be nice and make checks reachable-by-service
@ -147,6 +166,7 @@ func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
func (c *MockAgent) CheckDeregister(checkID string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
delete(c.checks, checkID)
delete(c.checkTTLs, checkID)
return nil
@ -155,6 +175,7 @@ func (c *MockAgent) CheckDeregister(checkID string) error {
func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
c.services[service.ID] = service
return nil
}
@ -162,6 +183,7 @@ func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error
func (c *MockAgent) ServiceDeregister(serviceID string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
delete(c.services, serviceID)
return nil
}
@ -169,6 +191,8 @@ func (c *MockAgent) ServiceDeregister(serviceID string) error {
func (c *MockAgent) UpdateTTL(id string, output string, status string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
check, ok := c.checks[id]
if !ok {
return fmt.Errorf("unknown check id: %q", id)

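The hits counter above is what the new periodic-sync test asserts against. As an aside, a test could also poll the counter with a deadline instead of sleeping for a fixed 10ms; a rough sketch using only the standard library (the helper name and threshold are illustrative, not part of this commit):

package consul

import (
    "testing"
    "time"
)

// waitForHits polls fn until it reports at least min calls or the deadline
// passes. fn stands in for something like MockAgent.getHits.
func waitForHits(t *testing.T, fn func() int, min int, deadline time.Duration) {
    t.Helper()
    stop := time.Now().Add(deadline)
    for time.Now().Before(stop) {
        if fn() >= min {
            return
        }
        time.Sleep(time.Millisecond)
    }
    t.Fatalf("expected at least %d hits, got %d", min, fn())
}
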
View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -113,9 +114,9 @@ func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {
// testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test
// logger and faster poll frequency.
func testWatcherSetup() (*fakeChecksAPI, *checkWatcher) {
func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) {
fakeAPI := newFakeChecksAPI()
cw := newCheckWatcher(testLogger(), fakeAPI)
cw := newCheckWatcher(testlog.Logger(t), fakeAPI)
cw.pollFreq = 10 * time.Millisecond
return fakeAPI, cw
}
@ -141,7 +142,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
check := testCheck()
check.CheckRestart = nil
cw := newCheckWatcher(testLogger(), newFakeChecksAPI())
cw := newCheckWatcher(testlog.Logger(t), newFakeChecksAPI())
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1)
@ -155,7 +156,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
func TestCheckWatcher_Healthy(t *testing.T) {
t.Parallel()
fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)
check1 := testCheck()
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
@ -190,7 +191,7 @@ func TestCheckWatcher_Healthy(t *testing.T) {
func TestCheckWatcher_HealthyWarning(t *testing.T) {
t.Parallel()
fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)
check1 := testCheck()
check1.CheckRestart.Limit = 1
@ -218,7 +219,7 @@ func TestCheckWatcher_HealthyWarning(t *testing.T) {
func TestCheckWatcher_Flapping(t *testing.T) {
t.Parallel()
fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)
check1 := testCheck()
check1.CheckRestart.Grace = 0
@ -247,7 +248,7 @@ func TestCheckWatcher_Flapping(t *testing.T) {
func TestCheckWatcher_Unwatch(t *testing.T) {
t.Parallel()
fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)
// Unwatch immediately
check1 := testCheck()
@ -276,7 +277,7 @@ func TestCheckWatcher_Unwatch(t *testing.T) {
func TestCheckWatcher_MultipleChecks(t *testing.T) {
t.Parallel()
fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)
check1 := testCheck()
check1.CheckRestart.Limit = 1

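These tests now take *testing.T so log output is scoped to the test via the testlog helper. A rough sketch of what such a helper looks like, assuming it simply adapts *testing.T to an io.Writer (the real helper/testlog package may differ):

package testlog

import (
    "log"
    "testing"
)

// writer adapts *testing.T so log output lands in the test's own log buffer
// and is only printed for failing or verbose runs.
type writer struct{ t *testing.T }

func (w writer) Write(p []byte) (int, error) {
    w.t.Logf("%s", p)
    return len(p), nil
}

// Logger returns a *log.Logger scoped to the given test.
func Logger(t *testing.T) *log.Logger {
    return log.New(writer{t}, "", log.LstdFlags)
}
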
View file

@ -36,6 +36,13 @@ const (
// defaultMaxRetryInterval is the default max retry interval.
defaultMaxRetryInterval = 30 * time.Second
// defaultPeriodicInterval is the interval at which the service
// client reconciles state between the desired services and checks and
// what's actually registered in Consul. This is done at an interval,
// rather than being purely edge triggered, to handle the case that the
// Consul agent's state may change underneath us.
defaultPeriodicInterval = 30 * time.Second
// ttlCheckBuffer is the time interval that Nomad can take to report
// the check result to Consul
ttlCheckBuffer = 31 * time.Second
@ -190,6 +197,7 @@ type ServiceClient struct {
logger *log.Logger
retryInterval time.Duration
maxRetryInterval time.Duration
periodicInterval time.Duration
// exitCh is closed when the main Run loop exits
exitCh chan struct{}
@ -235,6 +243,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
@ -279,7 +288,6 @@ func (c *ServiceClient) Run() {
// Process operations while waiting for initial contact with Consul but
// do not sync until contact has been made.
hasOps := false
INIT:
for {
select {
@ -289,7 +297,6 @@ INIT:
case <-c.shutdownCh:
return
case ops := <-c.opCh:
hasOps = true
c.merge(ops)
}
}
@ -299,11 +306,8 @@ INIT:
// Start checkWatcher
go c.checkWatcher.Run(ctx)
// Always immediately sync to reconcile Nomad and Consul's state
retryTimer := time.NewTimer(0)
if !hasOps {
// No pending operations so don't immediately sync
<-retryTimer.C
}
failures := 0
for {
@ -345,6 +349,16 @@ INIT:
c.logger.Printf("[INFO] consul.sync: successfully updated services in Consul")
failures = 0
}
// Reset timer to periodic interval to periodically
// reconcile with Consul
if !retryTimer.Stop() {
select {
case <-retryTimer.C:
default:
}
}
retryTimer.Reset(c.periodicInterval)
}
select {

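The Stop/drain/Reset sequence above is the standard time.Timer idiom: Stop returns false once the timer has fired, and any value left in the channel has to be drained before Reset, otherwise a later receive would see the stale expiry. A standalone sketch of the idiom:

package main

import (
    "fmt"
    "time"
)

func main() {
    timer := time.NewTimer(10 * time.Millisecond)
    time.Sleep(20 * time.Millisecond) // timer has fired; its channel holds a value

    // Stop returns false because the timer already fired; drain the channel
    // (non-blocking, in case it was already received elsewhere) before Reset.
    if !timer.Stop() {
        select {
        case <-timer.C:
        default:
        }
    }
    timer.Reset(30 * time.Second)
    fmt.Println("timer reset cleanly")
}
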
View file

@ -9,6 +9,7 @@ import (
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -59,7 +60,7 @@ func TestConsulScript_Exec_Cancel(t *testing.T) {
exec := newBlockingScriptExec()
// pass nil for heartbeater as it shouldn't be called
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.Logger(t), nil)
handle := check.run()
// wait until Exec is called
@ -111,7 +112,7 @@ func TestConsulScript_Exec_Timeout(t *testing.T) {
exec := newBlockingScriptExec()
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
<-exec.running
@ -160,7 +161,7 @@ func TestConsulScript_Exec_TimeoutCritical(t *testing.T) {
Timeout: time.Nanosecond,
}
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.Logger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
@ -205,7 +206,7 @@ func TestConsulScript_Exec_Shutdown(t *testing.T) {
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(0, nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
@ -242,7 +243,7 @@ func TestConsulScript_Exec_Codes(t *testing.T) {
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(code, err)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
handle := check.run()
defer handle.cancel()

View file

@ -3,9 +3,6 @@ package consul
import (
"context"
"fmt"
"io/ioutil"
"log"
"os"
"reflect"
"strings"
"sync/atomic"
@ -14,6 +11,7 @@ import (
"github.com/hashicorp/consul/api"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
@ -27,13 +25,6 @@ const (
yPort = 1235
)
func testLogger() *log.Logger {
if testing.Verbose() {
return log.New(os.Stderr, "", log.LstdFlags)
}
return log.New(ioutil.Discard, "", 0)
}
func testTask() *structs.Task {
return &structs.Task{
Name: "taskname",
@ -112,10 +103,10 @@ func (t *testFakeCtx) syncOnce() error {
// setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul.
// A test Task is also provided.
func setupFake() *testFakeCtx {
func setupFake(t *testing.T) *testFakeCtx {
fc := NewMockAgent()
return &testFakeCtx{
ServiceClient: NewServiceClient(fc, testLogger()),
ServiceClient: NewServiceClient(fc, testlog.Logger(t)),
FakeConsul: fc,
Task: testTask(),
Restarter: &restartRecorder{},
@ -124,7 +115,7 @@ func setupFake() *testFakeCtx {
}
func TestConsul_ChangeTags(t *testing.T) {
ctx := setupFake()
ctx := setupFake(t)
allocID := "allocid"
if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, ctx.Restarter, nil, nil); err != nil {
@ -225,7 +216,7 @@ func TestConsul_ChangeTags(t *testing.T) {
// it in Consul. Pre-0.7.1 ports were not part of the service ID and this was a
// slightly different code path than changing tags.
func TestConsul_ChangePorts(t *testing.T) {
ctx := setupFake()
ctx := setupFake(t)
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
@ -406,7 +397,7 @@ func TestConsul_ChangePorts(t *testing.T) {
// TestConsul_ChangeChecks asserts that updating only the checks on a service
// properly syncs with Consul.
func TestConsul_ChangeChecks(t *testing.T) {
ctx := setupFake()
ctx := setupFake(t)
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
@ -641,7 +632,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
// TestConsul_RegServices tests basic service registration.
func TestConsul_RegServices(t *testing.T) {
ctx := setupFake()
ctx := setupFake(t)
// Add a check w/restarting
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
@ -778,7 +769,7 @@ func TestConsul_RegServices(t *testing.T) {
// ServiceClient.
func TestConsul_ShutdownOK(t *testing.T) {
require := require.New(t)
ctx := setupFake()
ctx := setupFake(t)
// Add a script check to make sure its TTL gets updated
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
@ -840,8 +831,8 @@ func TestConsul_ShutdownOK(t *testing.T) {
// TestConsul_ShutdownSlow tests the slow but ok path for the shutdown logic in
// ServiceClient.
func TestConsul_ShutdownSlow(t *testing.T) {
t.Parallel() // run the slow tests in parallel
ctx := setupFake()
t.Parallel()
ctx := setupFake(t)
// Add a script check to make sure its TTL gets updated
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
@ -912,8 +903,8 @@ func TestConsul_ShutdownSlow(t *testing.T) {
// TestConsul_ShutdownBlocked tests the blocked past deadline path for the
// shutdown logic in ServiceClient.
func TestConsul_ShutdownBlocked(t *testing.T) {
t.Parallel() // run the slow tests in parallel
ctx := setupFake()
t.Parallel()
ctx := setupFake(t)
// Add a script check to make sure its TTL gets updated
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
@ -981,7 +972,7 @@ func TestConsul_ShutdownBlocked(t *testing.T) {
// TestConsul_CancelScript asserts removing a script check removes all objects
// related to that check.
func TestConsul_CancelScript(t *testing.T) {
ctx := setupFake()
ctx := setupFake(t)
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "scriptcheckDel",
@ -1069,7 +1060,8 @@ func TestConsul_CancelScript(t *testing.T) {
// auto-use set then services should advertise it unless explicitly set to
// host. Checks should always use host.
func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
ctx := setupFake()
t.Parallel()
ctx := setupFake(t)
ctx.Task.Services = []*structs.Service{
{
@ -1195,7 +1187,8 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
// set auto-use only services which request the driver's network should
// advertise it.
func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
ctx := setupFake()
t.Parallel()
ctx := setupFake(t)
ctx.Task.Services = []*structs.Service{
{
@ -1268,7 +1261,8 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
// TestConsul_DriverNetwork_Change asserts that if a driver network is
// specified and a service updates its use, it is properly updated in Consul.
func TestConsul_DriverNetwork_Change(t *testing.T) {
ctx := setupFake()
t.Parallel()
ctx := setupFake(t)
ctx.Task.Services = []*structs.Service{
{
@ -1337,9 +1331,38 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
syncAndAssertPort(net.PortMap["x"])
}
// TestConsul_PeriodicSync asserts that Nomad periodically reconciles with
// Consul.
func TestConsul_PeriodicSync(t *testing.T) {
t.Parallel()
ctx := setupFake(t)
defer ctx.ServiceClient.Shutdown()
// Lower periodic sync interval to speed up test
ctx.ServiceClient.periodicInterval = 2 * time.Millisecond
// Run for 10ms and assert hits >= 5 because each sync() calls multiple
// Consul APIs
go ctx.ServiceClient.Run()
select {
case <-ctx.ServiceClient.exitCh:
t.Fatalf("exited unexpectedly")
case <-time.After(10 * time.Millisecond):
}
minHits := 5
if hits := ctx.FakeConsul.getHits(); hits < minHits {
t.Fatalf("expected at least %d hits but found %d", minHits, hits)
}
}
// TestIsNomadService asserts the isNomadService helper returns true for Nomad
// task IDs and false for unknown IDs and Nomad agent IDs (see #2827).
func TestIsNomadService(t *testing.T) {
t.Parallel()
tests := []struct {
id string
result bool