Running script checks periodically

This commit is contained in:
Diptanu Choudhury 2016-03-24 13:05:08 -07:00
parent 46bd7323f4
commit d6588013f8
4 changed files with 176 additions and 123 deletions

View File

@ -1,6 +1,10 @@
package consul
import (
"container/heap"
"fmt"
"time"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
@ -8,3 +12,120 @@ type Check interface {
Run() *cstructs.CheckResult
ID() string
}
// consulCheck is a heap entry that pairs a Check with the time it is next due.
type consulCheck struct {
// check is the check carried by this entry.
check Check
// next is the time used to order entries in the heap; a zero value sorts
// after every non-zero time.
next time.Time
// index is the entry's current position in the heap slice. It is kept up to
// date by the checksHeapImp Swap/Push methods and set to -1 once popped.
index int
}
// checkHeap is a priority queue of checks ordered by the time each one is
// next due. A map keyed by check ID sits next to the heap so that an entry can
// be looked up, rescheduled or removed by ID.
type checkHeap struct {
	index map[string]*consulCheck
	heap  checksHeapImp
}

// NewConsulChecksHeap returns an empty checkHeap that is ready for use.
func NewConsulChecksHeap() *checkHeap {
	h := new(checkHeap)
	h.index = map[string]*consulCheck{}
	h.heap = checksHeapImp{}
	return h
}

// Push schedules check to run at next. It returns an error if a check with
// the same ID is already in the heap.
func (c *checkHeap) Push(check Check, next time.Time) error {
	if c.Contains(check) {
		return fmt.Errorf("check %v already exists", check.ID())
	}

	entry := &consulCheck{check: check, next: next}
	c.index[check.ID()] = entry
	heap.Push(&c.heap, entry)
	return nil
}

// Pop removes and returns the entry with the earliest next time, or nil when
// the heap is empty.
func (c *checkHeap) Pop() *consulCheck {
	if c.Len() == 0 {
		return nil
	}

	entry := heap.Pop(&c.heap).(*consulCheck)
	delete(c.index, entry.check.ID())
	return entry
}

// Peek returns the entry with the earliest next time without removing it, or
// nil when the heap is empty.
func (c *checkHeap) Peek() *consulCheck {
	if len(c.heap) > 0 {
		return c.heap[0]
	}
	return nil
}

// Contains reports whether a check with the same ID as check is in the heap.
func (c *checkHeap) Contains(check Check) bool {
	_, found := c.index[check.ID()]
	return found
}

// Update replaces the stored check that shares check's ID and reschedules it
// to next. It returns an error if no such check is in the heap.
func (c *checkHeap) Update(check Check, next time.Time) error {
	entry, found := c.index[check.ID()]
	if !found {
		return fmt.Errorf("heap doesn't contain check %v", check.ID())
	}

	entry.check = check
	entry.next = next
	// The ordering key changed, so restore the heap invariant at this slot.
	heap.Fix(&c.heap, entry.index)
	return nil
}

// Remove deletes the check with the given ID from the heap. It returns an
// error if no such check is in the heap.
func (c *checkHeap) Remove(id string) error {
	entry, found := c.index[id]
	if !found {
		return fmt.Errorf("heap doesn't contain check %v", id)
	}

	heap.Remove(&c.heap, entry.index)
	delete(c.index, id)
	return nil
}

// Len returns the number of checks in the heap.
func (c *checkHeap) Len() int { return len(c.heap) }
// checksHeapImp is the slice that backs checkHeap. It implements
// heap.Interface and keeps every entry's index field equal to its position in
// the slice so that an entry can later be fixed or removed in place.
type checksHeapImp []*consulCheck

// Len returns the number of entries in the heap.
func (h checksHeapImp) Len() int { return len(h) }

// Less orders entries by ascending next time. An entry whose next time is
// zero sorts after every entry with a non-zero time, and two zero-time entries
// compare as equal (Less is false in both directions).
func (h checksHeapImp) Less(i, j int) bool {
	switch {
	case h[i].next.IsZero():
		// Zero is never "less", whether or not j is zero as well.
		return false
	case h[j].next.IsZero():
		return true
	default:
		return h[i].next.Before(h[j].next)
	}
}

// Swap exchanges the entries at i and j and records their new positions.
func (h checksHeapImp) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}

// Push appends x, which must be a *consulCheck, and records its position.
// It is meant to be called through heap.Push rather than directly.
func (h *checksHeapImp) Push(x interface{}) {
	check := x.(*consulCheck)
	check.index = len(*h)
	*h = append(*h, check)
}

// Pop removes and returns the last entry of the slice. It is meant to be
// called through heap.Pop, which first moves the minimum entry to the end.
func (h *checksHeapImp) Pop() interface{} {
	old := *h
	last := len(old) - 1
	check := old[last]
	// Clear the vacated slot so the backing array no longer references the
	// popped entry and it can be garbage collected.
	old[last] = nil
	check.index = -1 // for safety
	*h = old[:last]
	return check
}

View File

@ -24,13 +24,15 @@ type ConsulService struct {
task *structs.Task
allocID string
delegateChecks map[string]struct{}
createCheck func(structs.ServiceCheck, string) (Check, error)
createCheck func(*structs.ServiceCheck, string) (Check, error)
trackedServices map[string]*consul.AgentService
trackedChecks map[string]*structs.ServiceCheck
execChecks *checkHeap
logger *log.Logger
updateCh chan struct{}
shutdownCh chan struct{}
shutdown bool
shutdownLock sync.Mutex
@ -95,13 +97,15 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string)
logger: logger,
trackedServices: make(map[string]*consul.AgentService),
trackedChecks: make(map[string]*structs.ServiceCheck),
execChecks: NewConsulChecksHeap(),
shutdownCh: make(chan struct{}),
updateCh: make(chan struct{}),
}
return &consulService, nil
}
func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, createCheck func(structs.ServiceCheck, string) (Check, error)) *ConsulService {
func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, createCheck func(*structs.ServiceCheck, string) (Check, error)) *ConsulService {
c.delegateChecks = delegateChecks
c.createCheck = createCheck
return c
@ -236,7 +240,20 @@ func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *cons
case structs.ServiceCheckTCP:
chkReg.TCP = fmt.Sprintf("%s:%d", service.Address, service.Port)
case structs.ServiceCheckScript:
chkReg.TTL = check.Interval.String()
chkReg.TTL = (check.Interval + 30*time.Second).String()
}
if _, ok := c.delegateChecks[check.Type]; !ok {
chk, err := c.createCheck(check, chkReg.ID)
if err != nil {
return err
}
if err := c.execChecks.Push(chk, time.Now().Add(check.Interval)); err != nil {
c.logger.Printf("[ERR] consulservice: unable to add check %q to heap", chk.ID())
}
select {
case c.updateCh <- struct{}{}:
default:
}
}
return c.client.Agent().CheckRegister(&chkReg)
}
@ -280,20 +297,31 @@ func (c *ConsulService) deregisterService(ID string) error {
// deregisterCheck drops the check with the given ID from the local heap of
// scheduled checks and then de-registers it from Consul. A failure to remove
// it from the heap is only logged; the result of the Consul de-registration is
// what gets returned.
func (c *ConsulService) deregisterCheck(ID string) error {
	removeErr := c.execChecks.Remove(ID)
	if removeErr != nil {
		c.logger.Printf("[DEBUG] consulservice: unable to remove check with ID %q from heap", ID)
	}

	agent := c.client.Agent()
	return agent.CheckDeregister(ID)
}
// PeriodicSync triggers periodic syncing of services and checks with Consul.
// This is a long lived go-routine which is stopped during shutdown
func (c *ConsulService) PeriodicSync() {
var runCheck <-chan time.Time
sync := time.After(syncInterval)
for {
runCheck = c.sleepBeforeRunningCheck()
select {
case <-sync:
if err := c.performSync(); err != nil {
c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err)
}
sync = time.After(syncInterval)
case <-c.updateCh:
continue
case <-runCheck:
chk := c.execChecks.heap.Pop().(consulCheck)
runCheck = c.sleepBeforeRunningCheck()
c.runCheck(chk.check)
case <-c.shutdownCh:
c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.task.Name)
return
@ -301,6 +329,13 @@ func (c *ConsulService) PeriodicSync() {
}
}
// sleepBeforeRunningCheck returns a channel that fires when the check at the
// top of the heap is due. A check that is already overdue yields a channel
// that fires immediately. When the heap is empty it returns a nil channel,
// which never becomes ready in a select.
func (c *ConsulService) sleepBeforeRunningCheck() <-chan time.Time {
	next := c.execChecks.Peek()
	if next == nil {
		return nil
	}
	// Wait for the time remaining until the check is due (due time minus now).
	// The reverse subtraction would fire future checks immediately and delay
	// overdue ones.
	return time.After(next.next.Sub(time.Now()))
}
// performSync syncs the services and checks we are tracking with Consul.
func (c *ConsulService) performSync() error {
var mErr multierror.Error
@ -357,7 +392,23 @@ func (c *ConsulService) filterConsulChecks(chks map[string]*consul.AgentCheck) m
return nomadChecks
}
// consulPresent reports whether the Consul agent answers a Self request
// without returning an error.
func (c *ConsulService) consulPresent() bool {
	if _, err := c.client.Agent().Self(); err != nil {
		return false
	}
	return true
}
// runCheck runs a check and reports its outcome to the corresponding TTL
// check in Consul. Exactly one status is sent per run:
//   - critical when the check returned an error,
//   - passing when it exited with code 0,
//   - warning when it exited with code 1,
//   - critical for any other exit code.
//
// A failure to update Consul is logged rather than returned.
func (c *ConsulService) runCheck(check Check) {
	res := check.Run()

	// Start from critical so an execution error or an unexpected exit code
	// needs no dedicated branch.
	state := consul.HealthCritical
	switch {
	case res.Err != nil:
		// An execution error wins over whatever the exit code says.
	case res.ExitCode == 0:
		state = consul.HealthPassing
	case res.ExitCode == 1:
		state = consul.HealthWarning
	}

	if err := c.client.Agent().UpdateTTL(check.ID(), res.Output, state); err != nil {
		c.logger.Printf("[DEBUG] consul: error updating ttl check %q: %v", check.ID(), err)
	}
}

View File

@ -1,7 +1,6 @@
package executor
import (
"container/heap"
"fmt"
"log"
"os/exec"
@ -11,7 +10,6 @@ import (
"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
@ -114,120 +112,3 @@ func (e *ExecScriptCheck) Run() *cstructs.CheckResult {
func (e *ExecScriptCheck) ID() string {
return e.id
}
// consulCheck is a heap entry that pairs a check with the time it is next due.
type consulCheck struct {
// check is the check carried by this entry.
check consul.Check
// next is the time used to order entries in the heap; a zero value sorts
// after every non-zero time.
next time.Time
// index is the entry's current position in the heap slice. It is kept up to
// date by the checksHeapImp Swap/Push methods and set to -1 once popped.
index int
}
// checkHeap is a priority queue of checks ordered by the time each one is
// next due. A map keyed by check ID sits next to the heap so that an entry can
// be looked up, rescheduled or removed by ID.
type checkHeap struct {
	index map[string]*consulCheck
	heap  checksHeapImp
}

// NewConsulChecksHeap returns an empty checkHeap that is ready for use.
func NewConsulChecksHeap() *checkHeap {
	h := new(checkHeap)
	h.index = map[string]*consulCheck{}
	h.heap = checksHeapImp{}
	return h
}

// Push schedules check to run at next. It returns an error if a check with
// the same ID is already in the heap.
func (c *checkHeap) Push(check consul.Check, next time.Time) error {
	if c.Contains(check) {
		return fmt.Errorf("check %v already exists", check.ID())
	}

	entry := &consulCheck{check: check, next: next}
	c.index[check.ID()] = entry
	heap.Push(&c.heap, entry)
	return nil
}

// Pop removes and returns the entry with the earliest next time, or nil when
// the heap is empty.
func (c *checkHeap) Pop() *consulCheck {
	if c.Len() == 0 {
		return nil
	}

	entry := heap.Pop(&c.heap).(*consulCheck)
	delete(c.index, entry.check.ID())
	return entry
}

// Peek returns the entry with the earliest next time without removing it, or
// nil when the heap is empty.
func (c *checkHeap) Peek() *consulCheck {
	if len(c.heap) > 0 {
		return c.heap[0]
	}
	return nil
}

// Contains reports whether a check with the same ID as check is in the heap.
func (c *checkHeap) Contains(check consul.Check) bool {
	_, found := c.index[check.ID()]
	return found
}

// Update replaces the stored check that shares check's ID and reschedules it
// to next. It returns an error if no such check is in the heap.
func (c *checkHeap) Update(check consul.Check, next time.Time) error {
	entry, found := c.index[check.ID()]
	if !found {
		return fmt.Errorf("heap doesn't contain check %v", check.ID())
	}

	entry.check = check
	entry.next = next
	// The ordering key changed, so restore the heap invariant at this slot.
	heap.Fix(&c.heap, entry.index)
	return nil
}

// Remove deletes the entry that shares check's ID from the heap. It returns
// an error if no such check is in the heap.
func (c *checkHeap) Remove(check consul.Check) error {
	entry, found := c.index[check.ID()]
	if !found {
		return fmt.Errorf("heap doesn't contain check %v", check.ID())
	}

	heap.Remove(&c.heap, entry.index)
	delete(c.index, check.ID())
	return nil
}

// Len returns the number of checks in the heap.
func (c *checkHeap) Len() int { return len(c.heap) }
// checksHeapImp is the slice that backs checkHeap. It implements
// heap.Interface and keeps every entry's index field equal to its position in
// the slice so that an entry can later be fixed or removed in place.
type checksHeapImp []*consulCheck

// Len returns the number of entries in the heap.
func (h checksHeapImp) Len() int { return len(h) }

// Less orders entries by ascending next time. An entry whose next time is
// zero sorts after every entry with a non-zero time, and two zero-time entries
// compare as equal (Less is false in both directions).
func (h checksHeapImp) Less(i, j int) bool {
	switch {
	case h[i].next.IsZero():
		// Zero is never "less", whether or not j is zero as well.
		return false
	case h[j].next.IsZero():
		return true
	default:
		return h[i].next.Before(h[j].next)
	}
}

// Swap exchanges the entries at i and j and records their new positions.
func (h checksHeapImp) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}

// Push appends x, which must be a *consulCheck, and records its position.
// It is meant to be called through heap.Push rather than directly.
func (h *checksHeapImp) Push(x interface{}) {
	check := x.(*consulCheck)
	check.index = len(*h)
	*h = append(*h, check)
}

// Pop removes and returns the last entry of the slice. It is meant to be
// called through heap.Pop, which first moves the minimum entry to the end.
func (h *checksHeapImp) Pop() interface{} {
	old := *h
	last := len(old) - 1
	check := old[last]
	// Clear the vacated slot so the backing array no longer references the
	// popped entry and it can be garbage collected.
	old[last] = nil
	check.index = -1 // for safety
	*h = old[:last]
	return check
}

View File

@ -493,7 +493,7 @@ func (e *UniversalExecutor) createCheckMap() map[string]struct{} {
return checks
}
func (e *UniversalExecutor) createCheck(check structs.ServiceCheck, checkID string) (consul.Check, error) {
func (e *UniversalExecutor) createCheck(check *structs.ServiceCheck, checkID string) (consul.Check, error) {
if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "docker" {
return &DockerScriptCheck{
id: checkID,