From 175e74972daeb67c655ae481c94d042f12d3d057 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 30 Jun 2018 07:25:57 -0700 Subject: [PATCH] agent/checks: use local state for local services --- agent/checks/alias.go | 115 ++++++++++++++++++++++++++----------- agent/checks/alias_test.go | 33 +++++++++-- 2 files changed, 110 insertions(+), 38 deletions(-) diff --git a/agent/checks/alias.go b/agent/checks/alias.go index 1b02842d9..72963adb5 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -28,13 +28,24 @@ type CheckAlias struct { CheckID types.CheckID // ID of this check RPC RPC // Used to query remote server if necessary RPCReq structs.NodeSpecificRequest // Base request - Notify CheckNotifier // For updating the check state + Notify AliasNotifier // For updating the check state stop bool stopCh chan struct{} stopLock sync.Mutex } +// AliasNotifier is a CheckNotifier specifically for the Alias check. +// This requires additional methods that are satisfied by the agent +// local state. +type AliasNotifier interface { + CheckNotifier + + AddAliasCheck(types.CheckID, string, chan<- struct{}) error + RemoveAliasCheck(types.CheckID, string) + Checks() map[types.CheckID]*structs.HealthCheck +} + // Start is used to start a check ttl, runs until Stop() func (c *CheckAlias) Start() { func (c *CheckAlias) Start() { c.stopLock.Lock() @@ -56,6 +67,41 @@ func (c *CheckAlias) Stop() { // run is invoked in a goroutine until Stop() is called. func (c *CheckAlias) run(stopCh chan struct{}) { + // If we have a specific node set, then use a blocking query + if c.Node != "" { + c.runQuery(stopCh) + return + } + + // Use the local state to match the service. + c.runLocal(stopCh) +} + +func (c *CheckAlias) runLocal(stopCh chan struct{}) { + // Very important this is buffered as 1 so that we do not lose any + // queued updates. This only has to be exactly 1 since the existence + // of any update triggers us to load the full health check state. + notifyCh := make(chan struct{}, 1) + c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh) + defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID) + + for { + select { + case <-notifyCh: + checks := c.Notify.Checks() + checksList := make([]*structs.HealthCheck, 0, len(checks)) + for _, chk := range checks { + checksList = append(checksList, chk) + } + c.processChecks(checksList) + + case <-stopCh: + return + } + } +} + +func (c *CheckAlias) runQuery(stopCh chan struct{}) { args := c.RPCReq args.Node = c.Node args.AllowStale = true @@ -107,36 +153,41 @@ func (c *CheckAlias) run(stopCh chan struct{}) { args.MinQueryIndex = 1 } - health := api.HealthPassing - msg := "All checks passing." - if len(out.HealthChecks) == 0 { - // No health checks means we're healthy by default - msg = "No checks found." - } - for _, chk := range out.HealthChecks { - if chk.Node != c.Node { - continue - } - - // We allow ServiceID == "" so that we also check node checks - if chk.ServiceID != "" && chk.ServiceID != c.ServiceID { - continue - } - - if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning { - health = chk.Status - msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output) - - // Critical checks exit the for loop immediately since we - // know that this is the health state. Warnings do not since - // there may still be a critical check. - if chk.Status == api.HealthCritical { - break - } - } - } - - // Update our check value - c.Notify.UpdateCheck(c.CheckID, health, msg) + c.processChecks(out.HealthChecks) } } + +// processChecks is a common helper for taking a set of health checks and +// using them to update our alias. This is abstracted since the checks can +// come from both the remote server as well as local state. +func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) { + health := api.HealthPassing + msg := "No checks found." + for _, chk := range checks { + if c.Node != "" && chk.Node != c.Node { + continue + } + + // We allow ServiceID == "" so that we also check node checks + if chk.ServiceID != "" && chk.ServiceID != c.ServiceID { + continue + } + + if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning { + health = chk.Status + msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output) + + // Critical checks exit the for loop immediately since we + // know that this is the health state. Warnings do not since + // there may still be a critical check. + if chk.Status == api.HealthCritical { + break + } + } + + msg = "All checks passing." + } + + // Update our check value + c.Notify.UpdateCheck(c.CheckID, health, msg) +} diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go index 431cf116a..e063451b3 100644 --- a/agent/checks/alias_test.go +++ b/agent/checks/alias_test.go @@ -19,7 +19,7 @@ import ( func TestCheckAlias_remoteErrBackoff(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -45,7 +45,7 @@ func TestCheckAlias_remoteErrBackoff(t *testing.T) { func TestCheckAlias_remoteNoChecks(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -71,7 +71,7 @@ func TestCheckAlias_remoteNoChecks(t *testing.T) { func TestCheckAlias_remoteNodeFailure(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -120,7 +120,7 @@ func TestCheckAlias_remoteNodeFailure(t *testing.T) { func TestCheckAlias_remotePassing(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -169,7 +169,7 @@ func TestCheckAlias_remotePassing(t *testing.T) { func TestCheckAlias_remoteCritical(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -224,7 +224,7 @@ func TestCheckAlias_remoteCritical(t *testing.T) { func TestCheckAlias_remoteWarning(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -275,6 +275,27 @@ func TestCheckAlias_remoteWarning(t *testing.T) { }) } +type mockAliasNotify struct { + *mock.Notify +} + +func newMockAliasNotify() *mockAliasNotify { + return &mockAliasNotify{ + Notify: mock.NewNotify(), + } +} + +func (m *mockAliasNotify) AddAliasCheck(chkID types.CheckID, serviceID string, ch chan<- struct{}) error { + return nil +} + +func (m *mockAliasNotify) RemoveAliasCheck(chkID types.CheckID, serviceID string) { +} + +func (m *mockAliasNotify) Checks() map[types.CheckID]*structs.HealthCheck { + return nil +} + // mockRPC is an implementation of RPC that can be used for tests. The // atomic.Value fields can be set concurrently and will reflect on the next // RPC call.