agent/checks: use local state for local services

This commit is contained in:
Mitchell Hashimoto 2018-06-30 07:25:57 -07:00
parent 3177d1719d
commit 175e74972d
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
2 changed files with 110 additions and 38 deletions

View File

@ -28,13 +28,24 @@ type CheckAlias struct {
CheckID types.CheckID // ID of this check
RPC RPC // Used to query remote server if necessary
RPCReq structs.NodeSpecificRequest // Base request
Notify CheckNotifier // For updating the check state
Notify AliasNotifier // For updating the check state
stop bool
stopCh chan struct{}
stopLock sync.Mutex
}
// AliasNotifier is a CheckNotifier specifically for the Alias check.
// This requires additional methods that are satisfied by the agent
// local state.
type AliasNotifier interface {
CheckNotifier
AddAliasCheck(types.CheckID, string, chan<- struct{}) error
RemoveAliasCheck(types.CheckID, string)
Checks() map[types.CheckID]*structs.HealthCheck
}
// Start is used to start a check ttl, runs until Stop() func (c *CheckAlias) Start() {
func (c *CheckAlias) Start() {
c.stopLock.Lock()
@ -56,6 +67,41 @@ func (c *CheckAlias) Stop() {
// run is invoked in a goroutine until Stop() is called.
func (c *CheckAlias) run(stopCh chan struct{}) {
// If we have a specific node set, then use a blocking query
if c.Node != "" {
c.runQuery(stopCh)
return
}
// Use the local state to match the service.
c.runLocal(stopCh)
}
func (c *CheckAlias) runLocal(stopCh chan struct{}) {
// Very important this is buffered as 1 so that we do not lose any
// queued updates. This only has to be exactly 1 since the existence
// of any update triggers us to load the full health check state.
notifyCh := make(chan struct{}, 1)
c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh)
defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID)
for {
select {
case <-notifyCh:
checks := c.Notify.Checks()
checksList := make([]*structs.HealthCheck, 0, len(checks))
for _, chk := range checks {
checksList = append(checksList, chk)
}
c.processChecks(checksList)
case <-stopCh:
return
}
}
}
func (c *CheckAlias) runQuery(stopCh chan struct{}) {
args := c.RPCReq
args.Node = c.Node
args.AllowStale = true
@ -107,36 +153,41 @@ func (c *CheckAlias) run(stopCh chan struct{}) {
args.MinQueryIndex = 1
}
health := api.HealthPassing
msg := "All checks passing."
if len(out.HealthChecks) == 0 {
// No health checks means we're healthy by default
msg = "No checks found."
}
for _, chk := range out.HealthChecks {
if chk.Node != c.Node {
continue
}
// We allow ServiceID == "" so that we also check node checks
if chk.ServiceID != "" && chk.ServiceID != c.ServiceID {
continue
}
if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning {
health = chk.Status
msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output)
// Critical checks exit the for loop immediately since we
// know that this is the health state. Warnings do not since
// there may still be a critical check.
if chk.Status == api.HealthCritical {
break
}
}
}
// Update our check value
c.Notify.UpdateCheck(c.CheckID, health, msg)
c.processChecks(out.HealthChecks)
}
}
// processChecks is a common helper for taking a set of health checks and
// using them to update our alias. This is abstracted since the checks can
// come from both the remote server as well as local state.
func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) {
health := api.HealthPassing
msg := "No checks found."
for _, chk := range checks {
if c.Node != "" && chk.Node != c.Node {
continue
}
// We allow ServiceID == "" so that we also check node checks
if chk.ServiceID != "" && chk.ServiceID != c.ServiceID {
continue
}
if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning {
health = chk.Status
msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output)
// Critical checks exit the for loop immediately since we
// know that this is the health state. Warnings do not since
// there may still be a critical check.
if chk.Status == api.HealthCritical {
break
}
}
msg = "All checks passing."
}
// Update our check value
c.Notify.UpdateCheck(c.CheckID, health, msg)
}

View File

@ -19,7 +19,7 @@ import (
func TestCheckAlias_remoteErrBackoff(t *testing.T) {
t.Parallel()
notify := mock.NewNotify()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
@ -45,7 +45,7 @@ func TestCheckAlias_remoteErrBackoff(t *testing.T) {
func TestCheckAlias_remoteNoChecks(t *testing.T) {
t.Parallel()
notify := mock.NewNotify()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
@ -71,7 +71,7 @@ func TestCheckAlias_remoteNoChecks(t *testing.T) {
func TestCheckAlias_remoteNodeFailure(t *testing.T) {
t.Parallel()
notify := mock.NewNotify()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
@ -120,7 +120,7 @@ func TestCheckAlias_remoteNodeFailure(t *testing.T) {
func TestCheckAlias_remotePassing(t *testing.T) {
t.Parallel()
notify := mock.NewNotify()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
@ -169,7 +169,7 @@ func TestCheckAlias_remotePassing(t *testing.T) {
func TestCheckAlias_remoteCritical(t *testing.T) {
t.Parallel()
notify := mock.NewNotify()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
@ -224,7 +224,7 @@ func TestCheckAlias_remoteCritical(t *testing.T) {
func TestCheckAlias_remoteWarning(t *testing.T) {
t.Parallel()
notify := mock.NewNotify()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
@ -275,6 +275,27 @@ func TestCheckAlias_remoteWarning(t *testing.T) {
})
}
type mockAliasNotify struct {
*mock.Notify
}
func newMockAliasNotify() *mockAliasNotify {
return &mockAliasNotify{
Notify: mock.NewNotify(),
}
}
func (m *mockAliasNotify) AddAliasCheck(chkID types.CheckID, serviceID string, ch chan<- struct{}) error {
return nil
}
func (m *mockAliasNotify) RemoveAliasCheck(chkID types.CheckID, serviceID string) {
}
func (m *mockAliasNotify) Checks() map[types.CheckID]*structs.HealthCheck {
return nil
}
// mockRPC is an implementation of RPC that can be used for tests. The
// atomic.Value fields can be set concurrently and will reflect on the next
// RPC call.