Switch ServiceClient to synchronizing state

Previously it applied a stream of operations. Reconciling state is less
complex and error prone at the cost of slightly higher CPU/memory usage.
This commit is contained in:
Michael Schurter 2017-04-07 17:10:26 -07:00
parent 244251490a
commit 87b28cfb75
4 changed files with 350 additions and 310 deletions

View File

@ -23,8 +23,12 @@ const (
// services
nomadServicePrefix = "_nomad"
// The periodic time interval for syncing services and checks with Consul
defaultSyncInterval = 6 * time.Second
// defaultRetryInterval is how quickly to retry syncing services and
// checks to Consul when an error occurs. Will backoff up to a max.
defaultRetryInterval = time.Second
// defaultMaxRetryInterval is the default max retry interval.
defaultMaxRetryInterval = 30 * time.Second
// ttlCheckBuffer is the time interval that Nomad can take to report Consul
// the check result
@ -62,6 +66,8 @@ type CatalogAPI interface {
// AgentAPI is the consul/api.Agent API used by Nomad.
type AgentAPI interface {
Services() (map[string]*api.AgentService, error)
Checks() (map[string]*api.AgentCheck, error)
CheckRegister(check *api.AgentCheckRegistration) error
CheckDeregister(checkID string) error
ServiceRegister(service *api.AgentServiceRegistration) error
@ -69,14 +75,30 @@ type AgentAPI interface {
UpdateTTL(id, output, status string) error
}
// addrParser is usually the Task.FindHostAndPortFor method for turning a
// portLabel into an address and port.
type addrParser func(portLabel string) (string, int)
// operations are submitted to the main loop via commit() for synchronizing
// with Consul.
type operations struct {
regServices []*api.AgentServiceRegistration
regChecks []*api.AgentCheckRegistration
scripts []*scriptCheck
deregServices []string
deregChecks []string
}
// ServiceClient handles task and agent service registration with Consul.
type ServiceClient struct {
client AgentAPI
logger *log.Logger
retryInterval time.Duration
client AgentAPI
logger *log.Logger
retryInterval time.Duration
maxRetryInterval time.Duration
// runningCh is closed when the main Run loop exits
runningCh chan struct{}
// exitCh is closed when the main Run loop exits
exitCh chan struct{}
// shutdownCh is closed when the client should shutdown
shutdownCh chan struct{}
@ -85,156 +107,210 @@ type ServiceClient struct {
// sync() to finish. Defaults to defaultShutdownWait
shutdownWait time.Duration
// syncCh triggers a sync in the main Run loop
syncCh chan struct{}
opCh chan *operations
// pending service and check operations
pending *consulOps
opsLock sync.Mutex
// script check cancel funcs to be called before their corresponding
// check is removed. Only accessed in sync() so not covered by regLock
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
scripts map[string]*scriptCheck
runningScripts map[string]*scriptHandle
// regLock must be held while accessing reg and dereg maps
regLock sync.Mutex
// Registered agent services and checks
// agent services and checks record entries for the agent itself which
// should be removed on shutdown
agentServices map[string]struct{}
agentChecks map[string]struct{}
// agentLock must be held while accessing agent maps
agentLock sync.Mutex
agentLock sync.Mutex
}
// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
// Client and logger.
func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient {
return &ServiceClient{
client: consulClient,
logger: logger,
retryInterval: defaultSyncInterval,
runningCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
syncCh: make(chan struct{}, 1),
pending: newConsulOps(),
runningScripts: make(map[string]*scriptHandle),
agentServices: make(map[string]struct{}, 8),
agentChecks: make(map[string]struct{}, 8),
client: consulClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
}
}
// Run the Consul main loop which retries operations against Consul. It should
// be called exactly once.
func (c *ServiceClient) Run() {
defer close(c.runningCh)
timer := time.NewTimer(0)
defer timer.Stop()
// Drain the initial tick so we don't sync until instructed
<-timer.C
lastOk := true
defer close(c.exitCh)
retryTimer := time.NewTimer(0)
<-retryTimer.C // disabled by default
failures := 0
for {
select {
case <-c.syncCh:
timer.Reset(0)
case <-timer.C:
if err := c.sync(); err != nil {
if lastOk {
lastOk = false
c.logger.Printf("[WARN] consul: failed to update services in Consul: %v", err)
}
timer.Reset(c.retryInterval)
} else {
if !lastOk {
c.logger.Printf("[INFO] consul: successfully updated services in Consul")
lastOk = true
}
}
case <-retryTimer.C:
case <-c.shutdownCh:
return
case ops := <-c.opCh:
c.merge(ops)
}
if err := c.sync(); err != nil {
if failures == 0 {
c.logger.Printf("[WARN] consul: failed to update services in Consul: %v", err)
}
failures++
if !retryTimer.Stop() {
<-retryTimer.C
}
backoff := c.retryInterval * time.Duration(failures)
if backoff > c.maxRetryInterval {
backoff = c.maxRetryInterval
}
retryTimer.Reset(backoff)
} else {
if failures > 0 {
c.logger.Printf("[INFO] consul: successfully updated services in Consul")
failures = 0
}
}
select {
case <-c.shutdownCh:
// Exit only after sync'ing all outstanding operations
if len(c.opCh) > 0 {
for len(c.opCh) > 0 {
c.merge(<-c.opCh)
}
continue
}
return
default:
}
}
}
// forceSync asynchronously causes a sync to happen. Any operations enqueued
// prior to calling forceSync will be synced.
func (c *ServiceClient) forceSync() {
// commit operations and returns false if shutdown signalled before committing.
func (c *ServiceClient) commit(ops *operations) bool {
select {
case c.syncCh <- mark:
default:
case c.opCh <- ops:
return true
case <-c.shutdownCh:
return false
}
}
// merge registrations into state map prior to sync'ing with Consul
func (c *ServiceClient) merge(ops *operations) {
for _, s := range ops.regServices {
c.services[s.ID] = s
}
for _, check := range ops.regChecks {
c.checks[check.ID] = check
}
for _, s := range ops.scripts {
c.scripts[s.id] = s
}
for _, sid := range ops.deregServices {
delete(c.services, sid)
}
for _, cid := range ops.deregChecks {
if script, ok := c.runningScripts[cid]; ok {
script.cancel()
delete(c.scripts, cid)
}
delete(c.checks, cid)
}
}
// sync enqueued operations.
func (c *ServiceClient) sync() error {
c.opsLock.Lock()
ops := c.pending
c.pending = newConsulOps()
c.opsLock.Unlock()
sreg, creg, sdereg, cdereg := 0, 0, 0, 0
var err error
msg := ops.String()
// Register Services
for id, service := range ops.regServices {
if err = c.client.ServiceRegister(service); err != nil {
goto ERROR
}
delete(ops.regServices, id)
consulServices, err := c.client.Services()
if err != nil {
return fmt.Errorf("error querying Consul services: %v", err)
}
// Register Checks
for id, check := range ops.regChecks {
if err = c.client.CheckRegister(check); err != nil {
goto ERROR
}
delete(ops.regChecks, id)
consulChecks, err := c.client.Checks()
if err != nil {
return fmt.Errorf("error querying Consul checks: %v", err)
}
// Run the script for this check if one exists
if script, ok := ops.regScripts[id]; ok {
// This check is a script check; run it
// Remove Nomad services in Consul but unknown locally
for id := range consulServices {
if _, ok := c.services[id]; ok {
// Known service, skip
continue
}
if !isNomadService(id) {
// Not managed by Nomad, skip
continue
}
// Unknown Nomad managed service; kill
if err := c.client.ServiceDeregister(id); err != nil {
return err
}
sdereg++
}
// Add Nomad services missing from Consul
for id, service := range c.services {
if _, ok := consulServices[id]; ok {
// Already in Consul; skipping
continue
}
if err = c.client.ServiceRegister(service); err != nil {
return err
}
sreg++
}
// Remove Nomad checks in Consul but unknown locally
for id, check := range consulChecks {
if _, ok := c.checks[id]; ok {
// Known check, skip
continue
}
if !isNomadService(check.ServiceID) {
// Not managed by Nomad, skip
continue
}
// Unknown Nomad managed check; kill
if err := c.client.CheckDeregister(id); err != nil {
return err
}
cdereg++
}
// Add Nomad checks missing from Consul
for id, check := range c.checks {
if _, ok := consulChecks[id]; ok {
// Already in Consul; skipping
continue
}
if err := c.client.CheckRegister(check); err != nil {
return err
}
creg++
// Handle starting scripts
if script, ok := c.scripts[id]; ok {
// If it's already running, don't run it again
if _, running := c.runningScripts[id]; running {
continue
}
// Not running, start and store the handle
c.runningScripts[id] = script.run()
}
}
// Deregister Checks
for id := range ops.deregChecks {
if h, ok := c.runningScripts[id]; ok {
// This check is a script check; stop it
h.cancel()
delete(c.runningScripts, id)
}
if err = c.client.CheckDeregister(id); err != nil {
goto ERROR
}
delete(ops.deregChecks, id)
}
// Deregister Services
for id := range ops.deregServices {
if err = c.client.ServiceDeregister(id); err != nil {
goto ERROR
}
delete(ops.deregServices, id)
}
c.logger.Printf("[DEBUG] consul: %s", msg)
c.logger.Printf("[DEBUG] consul.sync: registered %d services, %d checks; deregistered %d services, %d checks",
sreg, creg, sdereg, cdereg)
return nil
//TODO Labels and gotos are nasty; move to a function?
ERROR:
// An error occurred, repopulate the operation maps but give
// precendence to new ops
c.opsLock.Lock()
ops.merge(c.pending)
c.pending = ops
c.opsLock.Unlock()
return err
}
// RegisterAgent registers Nomad agents (client or server). Script checks are
@ -242,7 +318,7 @@ ERROR:
//
// Agents will be deregistered when Shutdown is called.
func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) error {
ops := newConsulOps()
ops := operations{}
for _, service := range services {
id := makeAgentServiceID(role, service)
@ -261,7 +337,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
Address: host,
Port: port,
}
ops.regServices[id] = serviceReg
ops.regServices = append(ops.regServices, serviceReg)
for _, check := range service.Checks {
checkID := createCheckID(id, check)
@ -284,32 +360,30 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
if err != nil {
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
ops.regChecks[checkID] = checkReg
ops.regChecks = append(ops.regChecks, checkReg)
}
}
// Now add them to the registration queue
c.opsLock.Lock()
c.pending.merge(ops)
c.opsLock.Unlock()
if ok := c.commit(&ops); !ok {
// shutting down, exit
return nil
}
// Record IDs for deregistering on shutdown
c.agentLock.Lock()
for id := range ops.regServices {
c.agentServices[id] = mark
for _, id := range ops.regServices {
c.agentServices[id.ID] = mark
}
for id := range ops.regChecks {
c.agentChecks[id] = mark
for _, id := range ops.regChecks {
c.agentChecks[id.ID] = mark
}
c.agentLock.Unlock()
c.forceSync()
return nil
}
type addrParser func(portLabel string) (string, int)
// makeCheckReg adds a check reg to operations.
func (c *ServiceClient) makeCheckReg(ops *consulOps, check *structs.ServiceCheck,
func (c *ServiceClient) makeCheckReg(ops *operations, check *structs.ServiceCheck,
service *api.AgentServiceRegistration, exec ScriptExecutor, parseAddr addrParser) error {
checkID := createCheckID(service.ID, check)
@ -317,8 +391,9 @@ func (c *ServiceClient) makeCheckReg(ops *consulOps, check *structs.ServiceCheck
if exec == nil {
return fmt.Errorf("driver doesn't support script checks")
}
ops.regScripts[checkID] = newScriptCheck(
checkID, check, exec, c.client, c.logger, c.shutdownCh)
ops.scripts = append(ops.scripts, newScriptCheck(
checkID, check, exec, c.client, c.logger, c.shutdownCh))
}
host, port := service.Address, service.Port
if check.PortLabel != "" {
@ -328,13 +403,13 @@ func (c *ServiceClient) makeCheckReg(ops *consulOps, check *structs.ServiceCheck
if err != nil {
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
ops.regChecks[checkID] = checkReg
ops.regChecks = append(ops.regChecks, checkReg)
return nil
}
// serviceRegs creates service registrations, check registrations, and script
// checks from a service.
func (c *ServiceClient) serviceRegs(ops *consulOps, allocID string, service *structs.Service,
func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *structs.Service,
exec ScriptExecutor, task *structs.Task) error {
id := makeTaskServiceID(allocID, task.Name, service)
@ -349,7 +424,7 @@ func (c *ServiceClient) serviceRegs(ops *consulOps, allocID string, service *str
// copy isn't strictly necessary but can avoid bugs especially
// with tests that may reuse Tasks
copy(serviceReg.Tags, service.Tags)
ops.regServices[id] = serviceReg
ops.regServices = append(ops.regServices, serviceReg)
for _, check := range service.Checks {
err := c.makeCheckReg(ops, check, serviceReg, exec, task.FindHostAndPortFor)
@ -365,35 +440,28 @@ func (c *ServiceClient) serviceRegs(ops *consulOps, allocID string, service *str
//
// Actual communication with Consul is done asynchrously (see Run).
func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec ScriptExecutor) error {
ops := newConsulOps()
ops := &operations{}
for _, service := range task.Services {
if err := c.serviceRegs(ops, allocID, service, exec, task); err != nil {
return err
}
}
// Now add them to the registration queue
c.opsLock.Lock()
c.pending.merge(ops)
c.opsLock.Unlock()
c.forceSync()
c.commit(ops)
return nil
}
// UpdateTask in Consul. Does not alter the service if only checks have
// changed.
func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec ScriptExecutor) error {
ops := newConsulOps()
ops := &operations{}
existingIDs := make(map[string]*structs.Service, len(existing.Services))
for _, s := range existing.Services {
existingIDs[makeTaskServiceID(allocID, existing.Name, s)] = s
c.logger.Printf("[XXX] EXISTING: %s", makeTaskServiceID(allocID, existing.Name, s))
}
newIDs := make(map[string]*structs.Service, len(newTask.Services))
for _, s := range newTask.Services {
newIDs[makeTaskServiceID(allocID, newTask.Name, s)] = s
c.logger.Printf("[XXX] UPDATED : %s", makeTaskServiceID(allocID, newTask.Name, s))
}
parseAddr := newTask.FindHostAndPortFor
@ -404,29 +472,38 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
newSvc, ok := newIDs[existingID]
if !ok {
// Existing sevice entry removed
ops.deregServices[existingID] = mark
ops.deregServices = append(ops.deregServices, existingID)
for _, check := range existingSvc.Checks {
ops.deregChecks[createCheckID(existingID, check)] = mark
ops.deregChecks = append(ops.deregChecks, createCheckID(existingID, check))
}
continue
}
// Manipulating checks is cheap and easy, so just remove old and add new
// Service exists and wasn't updated, don't add it later
delete(newIDs, existingID)
// Check to see what checks were updated
existingChecks := make(map[string]struct{}, len(existingSvc.Checks))
for _, check := range existingSvc.Checks {
ops.deregChecks[createCheckID(existingID, check)] = mark
existingChecks[createCheckID(existingID, check)] = mark
}
// Register new checks
for _, check := range newSvc.Checks {
checkID := createCheckID(existingID, check)
// Don't deregister this check if it hasn't changed
delete(ops.deregChecks, checkID)
if _, exists := existingChecks[checkID]; exists {
// Check already exists; skip it
delete(existingChecks, checkID)
continue
}
// New check, register it
if check.Type == structs.ServiceCheckScript {
if exec == nil {
return fmt.Errorf("driver doesn't support script checks")
}
ops.regScripts[checkID] = newScriptCheck(
checkID, check, exec, c.client, c.logger, c.shutdownCh)
ops.scripts = append(ops.scripts, newScriptCheck(
checkID, check, exec, c.client, c.logger, c.shutdownCh))
}
host, port := parseAddr(existingSvc.PortLabel)
if check.PortLabel != "" {
@ -436,12 +513,13 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
if err != nil {
return err
}
ops.regChecks[checkID] = checkReg
ops.regChecks = append(ops.regChecks, checkReg)
}
// Service hasn't changed and checks are updated so don't
// process this service again later
delete(newIDs, existingID)
// Remove existing checks not in updated service
for cid := range existingChecks {
ops.deregChecks = append(ops.deregChecks, cid)
}
}
// Any remaining services should just be enqueued directly
@ -452,12 +530,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
}
}
// Finally enqueue the updates and force sync
c.opsLock.Lock()
c.pending.merge(ops)
c.opsLock.Unlock()
c.forceSync()
c.commit(ops)
return nil
}
@ -465,23 +538,19 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
//
// Actual communication with Consul is done asynchrously (see Run).
func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) {
ops := newConsulOps()
ops := operations{}
for _, service := range task.Services {
id := makeTaskServiceID(allocID, task.Name, service)
ops.deregServices[id] = mark
ops.deregServices = append(ops.deregServices, id)
for _, check := range service.Checks {
ops.deregChecks[createCheckID(id, check)] = mark
ops.deregChecks = append(ops.deregChecks, createCheckID(id, check))
}
}
// Now add them to the deregistration fields; main Run loop will update
c.regLock.Lock()
c.pending.merge(ops)
c.regLock.Unlock()
c.forceSync()
c.commit(&ops)
}
// Shutdown the Consul client. Update running task registations and deregister
@ -516,13 +585,9 @@ func (c *ServiceClient) Shutdown() error {
}
c.agentLock.Unlock()
// Wait for Run to finish any outstanding sync() calls and exit
// Wait for Run to finish any outstanding operations and exit
select {
case <-c.runningCh:
// sync one last time to ensure all enqueued operations are applied
if err := c.sync(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
case <-c.exitCh:
case <-deadline:
// Don't wait forever though
mErr.Errors = append(mErr.Errors, fmt.Errorf("timed out waiting for Consul operations to complete"))
@ -621,3 +686,9 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
}
return &chkReg, nil
}
// isNomadService returns true if the ID matches the pattern of a Nomad managed
// service.
func isNomadService(id string) bool {
return strings.HasPrefix(id, nomadServicePrefix)
}

View File

@ -1,61 +0,0 @@
package consul
import (
"fmt"
"github.com/hashicorp/consul/api"
)
type consulOps struct {
// services and checks to be registered
regServices map[string]*api.AgentServiceRegistration
regChecks map[string]*api.AgentCheckRegistration
// services and checks to be unregisterd
deregServices map[string]struct{}
deregChecks map[string]struct{}
// script checks to be run() after their corresponding check is
// registered
regScripts map[string]*scriptCheck
}
func newConsulOps() *consulOps {
return &consulOps{
regServices: make(map[string]*api.AgentServiceRegistration),
regChecks: make(map[string]*api.AgentCheckRegistration),
deregServices: make(map[string]struct{}),
deregChecks: make(map[string]struct{}),
regScripts: make(map[string]*scriptCheck),
}
}
// merge newer operations. New operations registrations override existing
// deregistrations.
func (c *consulOps) merge(newer *consulOps) {
for id, service := range newer.regServices {
delete(c.deregServices, id)
c.regServices[id] = service
}
for id, check := range newer.regChecks {
delete(c.deregChecks, id)
c.regChecks[id] = check
}
for id, script := range newer.regScripts {
c.regScripts[id] = script
}
for id, _ := range newer.deregServices {
delete(c.regServices, id)
c.deregServices[id] = mark
}
for id, _ := range newer.deregChecks {
delete(c.regChecks, id)
delete(c.regScripts, id)
c.deregChecks[id] = mark
}
}
func (c *consulOps) String() string {
return fmt.Sprintf("registered %d services / %d checks; deregisterd %d services / %d checks",
len(c.regServices), len(c.regChecks), len(c.deregServices), len(c.deregChecks))
}

View File

@ -27,10 +27,11 @@ func (s *scriptHandle) wait() <-chan struct{} {
}
type scriptCheck struct {
id string
check *structs.ServiceCheck
exec ScriptExecutor
agent heartbeater
id string
check *structs.ServiceCheck
exec ScriptExecutor
agent heartbeater
running bool
// lastCheckOk is true if the last check was ok; otherwise false
lastCheckOk bool
@ -132,5 +133,6 @@ func (s *scriptCheck) run() *scriptHandle {
}
}
}()
s.running = true
return &scriptHandle{cancel: cancel, done: done}
}

View File

@ -62,14 +62,24 @@ func (t *testFakeCtx) Exec(ctx context.Context, cmd string, args []string) ([]by
return t.ExecFunc(ctx, cmd, args)
}
var errNoOps = fmt.Errorf("testing error: no pending operations")
// syncOps simulates one iteration of the ServiceClient.Run loop and returns
// any errors returned by sync() or errNoOps if no pending operations.
func (t *testFakeCtx) syncOnce() error {
select {
case ops := <-t.ServiceClient.opCh:
t.ServiceClient.merge(ops)
return t.ServiceClient.sync()
default:
return errNoOps
}
}
// setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul.
// A test Task is also provided.
func setupFake() *testFakeCtx {
fc := &fakeConsul{
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
checkTTLs: make(map[string]int),
}
fc := newFakeConsul()
return &testFakeCtx{
ServiceClient: NewServiceClient(fc, testLogger()),
FakeConsul: fc,
@ -86,6 +96,55 @@ type fakeConsul struct {
// when UpdateTTL is called the check ID will have its counter inc'd
checkTTLs map[string]int
// What check status to return from Checks()
checkStatus string
}
func newFakeConsul() *fakeConsul {
return &fakeConsul{
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
checkTTLs: make(map[string]int),
checkStatus: api.HealthPassing,
}
}
func (c *fakeConsul) Services() (map[string]*api.AgentService, error) {
c.mu.Lock()
defer c.mu.Unlock()
r := make(map[string]*api.AgentService, len(c.services))
for k, v := range c.services {
r[k] = &api.AgentService{
ID: v.ID,
Service: v.Name,
Tags: make([]string, len(v.Tags)),
Port: v.Port,
Address: v.Address,
EnableTagOverride: v.EnableTagOverride,
}
copy(r[k].Tags, v.Tags)
}
return r, nil
}
func (c *fakeConsul) Checks() (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
r := make(map[string]*api.AgentCheck, len(c.checks))
for k, v := range c.checks {
r[k] = &api.AgentCheck{
CheckID: v.ID,
Name: v.Name,
Status: c.checkStatus,
Notes: v.Notes,
ServiceID: v.ServiceID,
ServiceName: c.services[v.ServiceID].Name,
}
}
return r, nil
}
func (c *fakeConsul) CheckRegister(check *api.AgentCheckRegistration) error {
@ -137,8 +196,7 @@ func TestConsul_ChangeTags(t *testing.T) {
t.Fatalf("unexpected error registering task: %v", err)
}
// Manually call sync() since Run() isn't running
if err := ctx.ServiceClient.sync(); err != nil {
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
@ -157,13 +215,13 @@ func TestConsul_ChangeTags(t *testing.T) {
}
}
// Changing a tag removes old entry before adding new one
ctx.ServiceClient.RemoveTask("allocid", ctx.Task)
origTask := ctx.Task
ctx.Task = testTask()
ctx.Task.Services[0].Tags[0] = "newtag"
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, nil); err != nil {
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.ServiceClient.sync(); err != nil {
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
@ -193,8 +251,7 @@ func TestConsul_RegServices(t *testing.T) {
t.Fatalf("unexpected error registering task: %v", err)
}
// Manually call sync() since Run() isn't running
if err := ctx.ServiceClient.sync(); err != nil {
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
@ -232,7 +289,7 @@ func TestConsul_RegServices(t *testing.T) {
}
// Now sync() and re-check for the applied updates
if err := ctx.ServiceClient.sync(); err != nil {
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 2 {
@ -256,7 +313,7 @@ func TestConsul_RegServices(t *testing.T) {
// Remove the new task
ctx.ServiceClient.RemoveTask("allocid", ctx.Task)
if err := ctx.ServiceClient.sync(); err != nil {
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
@ -287,11 +344,7 @@ func TestConsul_ShutdownOK(t *testing.T) {
},
}
hasShutdown := make(chan struct{})
go func() {
ctx.ServiceClient.Run()
close(hasShutdown)
}()
go ctx.ServiceClient.Run()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil {
@ -309,26 +362,11 @@ func TestConsul_ShutdownOK(t *testing.T) {
t.Fatalf("unexpected error registering agent: %v", err)
}
// Shutdown should block until all enqueued operations finish.
// Shutdown should block until scripts finish
if err := ctx.ServiceClient.Shutdown(); err != nil {
t.Errorf("unexpected error shutting down client: %v", err)
}
// assert Run() exits in a timely fashion after Shutdown() exits
select {
case <-hasShutdown:
// ok! Run() exited as expected
case <-time.After(10 * time.Second):
t.Fatalf("expected Run() to exit, but it did not")
}
// Nothing should be enqueued anymore
enqueued := (len(ctx.ServiceClient.pending.regServices) + len(ctx.ServiceClient.pending.deregServices) +
len(ctx.ServiceClient.pending.regChecks) + len(ctx.ServiceClient.pending.deregChecks))
if enqueued > 0 {
t.Errorf("%d operations still enqueued", enqueued)
}
// UpdateTTL should have been called once for the script check
if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
t.Fatalf("expected 1 checkTTL entry but found: %d", n)
@ -365,7 +403,13 @@ func TestConsul_ShutdownSlow(t *testing.T) {
}
// Make Exec slow, but not too slow
waiter := make(chan struct{})
ctx.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
select {
case <-waiter:
default:
close(waiter)
}
time.Sleep(time.Second)
return []byte{}, 0, nil
}
@ -373,31 +417,22 @@ func TestConsul_ShutdownSlow(t *testing.T) {
// Make shutdown wait time just a bit longer than ctx.Exec takes
ctx.ServiceClient.shutdownWait = 3 * time.Second
hasShutdown := make(chan struct{})
go func() {
ctx.ServiceClient.Run()
close(hasShutdown)
}()
go ctx.ServiceClient.Run()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// wait for Exec to get called before shutting down
<-waiter
// Shutdown should block until all enqueued operations finish.
preShutdown := time.Now()
if err := ctx.ServiceClient.Shutdown(); err != nil {
t.Errorf("unexpected error shutting down client: %v", err)
}
// assert Run() exits in a timely fashion after Shutdown() exits
select {
case <-hasShutdown:
// ok! Run() exited as expected
case <-time.After(10 * time.Second):
t.Fatalf("expected Run() to exit, but it did not")
}
// Shutdown time should have taken: 1s <= shutdown <= 3s
shutdownTime := time.Now().Sub(preShutdown)
if shutdownTime < time.Second || shutdownTime > ctx.ServiceClient.shutdownWait {
@ -442,8 +477,10 @@ func TestConsul_ShutdownBlocked(t *testing.T) {
block := make(chan struct{})
defer close(block) // cleanup after test
// Make Exec slow, but not too slow
// Make Exec block forever
waiter := make(chan struct{})
ctx.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
close(waiter)
<-block
return []byte{}, 0, nil
}
@ -451,17 +488,16 @@ func TestConsul_ShutdownBlocked(t *testing.T) {
// Use a short shutdown deadline since we're intentionally blocking forever
ctx.ServiceClient.shutdownWait = time.Second
hasShutdown := make(chan struct{})
go func() {
ctx.ServiceClient.Run()
close(hasShutdown)
}()
go ctx.ServiceClient.Run()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// Wait for exec to be called
<-waiter
// Shutdown should block until all enqueued operations finish.
preShutdown := time.Now()
err := ctx.ServiceClient.Shutdown()
@ -469,18 +505,10 @@ func TestConsul_ShutdownBlocked(t *testing.T) {
t.Errorf("expected a timed out error from shutdown")
}
// assert Run() exits in a timely fashion after Shutdown() exits
maxWait := 10 * time.Second
select {
case <-hasShutdown:
// ok! Run() exited as expected
case <-time.After(maxWait):
t.Fatalf("expected Run() to exit, but it did not")
}
// Shutdown time should have taken 1s; to avoid timing related errors
// simply test for 1s <= shutdown <= 10s
// Shutdown time should have taken shutdownWait; to avoid timing
// related errors simply test for wait <= shutdown <= wait+3s
shutdownTime := time.Now().Sub(preShutdown)
maxWait := ctx.ServiceClient.shutdownWait + (3 * time.Second)
if shutdownTime < ctx.ServiceClient.shutdownWait || shutdownTime > maxWait {
t.Errorf("expected shutdown to take >%s and <%s but took: %s", ctx.ServiceClient.shutdownWait, maxWait, shutdownTime)
}