Update alias checks on local add and remove
This commit is contained in:
parent
484e0efaf2
commit
1538a738f2
|
@ -548,6 +548,9 @@ func (l *State) removeCheckLocked(id types.CheckID) error {
|
||||||
return fmt.Errorf("Check %q does not exist", id)
|
return fmt.Errorf("Check %q does not exist", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If this is a check for an aliased service, then notify the waiters.
|
||||||
|
l.notifyIfAliased(c.Check.ServiceID)
|
||||||
|
|
||||||
// To remove the check on the server we need the token.
|
// To remove the check on the server we need the token.
|
||||||
// Therefore, we mark the service as deleted and keep the
|
// Therefore, we mark the service as deleted and keep the
|
||||||
// entry around until it is actually removed.
|
// entry around until it is actually removed.
|
||||||
|
@ -616,18 +619,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is a check for an aliased service, then notify the waiters.
|
// If this is a check for an aliased service, then notify the waiters.
|
||||||
if aliases, ok := l.checkAliases[c.Check.ServiceID]; ok && len(aliases) > 0 {
|
l.notifyIfAliased(c.Check.ServiceID)
|
||||||
for _, notifyCh := range aliases {
|
|
||||||
// Do not block. All notify channels should be buffered to at
|
|
||||||
// least 1 in which case not-blocking does not result in loss
|
|
||||||
// of data because a failed send means a notification is
|
|
||||||
// already queued. This must be called with the lock held.
|
|
||||||
select {
|
|
||||||
case notifyCh <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update status and mark out of sync
|
// Update status and mark out of sync
|
||||||
c.Check.Status = status
|
c.Check.Status = status
|
||||||
|
@ -685,6 +677,10 @@ func (l *State) SetCheckState(c *CheckState) {
|
||||||
|
|
||||||
func (l *State) setCheckStateLocked(c *CheckState) {
|
func (l *State) setCheckStateLocked(c *CheckState) {
|
||||||
l.checks[c.Check.CheckID] = c
|
l.checks[c.Check.CheckID] = c
|
||||||
|
|
||||||
|
// If this is a check for an aliased service, then notify the waiters.
|
||||||
|
l.notifyIfAliased(c.Check.ServiceID)
|
||||||
|
|
||||||
l.TriggerSyncChanges()
|
l.TriggerSyncChanges()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1474,3 +1470,19 @@ func (l *State) syncNodeInfo() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// notifyIfAliased will notify waiters if this is a check for an aliased service
|
||||||
|
func (l *State) notifyIfAliased(serviceID string) {
|
||||||
|
if aliases, ok := l.checkAliases[serviceID]; ok && len(aliases) > 0 {
|
||||||
|
for _, notifyCh := range aliases {
|
||||||
|
// Do not block. All notify channels should be buffered to at
|
||||||
|
// least 1 in which case not-blocking does not result in loss
|
||||||
|
// of data because a failed send means a notification is
|
||||||
|
// already queued. This must be called with the lock held.
|
||||||
|
select {
|
||||||
|
case notifyCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2211,6 +2211,90 @@ func TestStateProxyRestore(t *testing.T) {
|
||||||
assert.Equal(p1.ProxyService.Port, p2.ProxyService.Port)
|
assert.Equal(p1.ProxyService.Port, p2.ProxyService.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that alias check is updated after AddCheck, UpdateCheck, and RemoveCheck for the same service id
|
||||||
|
func TestAliasNotifications_local(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
a := agent.NewTestAgent(t, t.Name(), "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register service with a failing TCP check
|
||||||
|
svcID := "socat"
|
||||||
|
srv := &structs.NodeService{
|
||||||
|
ID: svcID,
|
||||||
|
Service: "echo",
|
||||||
|
Tags: []string{},
|
||||||
|
Address: "127.0.0.10",
|
||||||
|
Port: 8080,
|
||||||
|
}
|
||||||
|
a.State.AddService(srv, "")
|
||||||
|
|
||||||
|
scID := "socat-sidecar-proxy"
|
||||||
|
sc := &structs.NodeService{
|
||||||
|
ID: scID,
|
||||||
|
Service: scID,
|
||||||
|
Tags: []string{},
|
||||||
|
Address: "127.0.0.10",
|
||||||
|
Port: 9090,
|
||||||
|
}
|
||||||
|
a.State.AddService(sc, "")
|
||||||
|
|
||||||
|
tcpID := types.CheckID("service:socat-tcp")
|
||||||
|
chk0 := &structs.HealthCheck{
|
||||||
|
Node: "",
|
||||||
|
CheckID: tcpID,
|
||||||
|
Name: "tcp check",
|
||||||
|
Status: api.HealthPassing,
|
||||||
|
ServiceID: svcID,
|
||||||
|
}
|
||||||
|
a.State.AddCheck(chk0, "")
|
||||||
|
|
||||||
|
// Register an alias for the service
|
||||||
|
proxyID := types.CheckID("service:socat-sidecar-proxy:2")
|
||||||
|
chk1 := &structs.HealthCheck{
|
||||||
|
Node: "",
|
||||||
|
CheckID: proxyID,
|
||||||
|
Name: "Connect Sidecar Aliasing socat",
|
||||||
|
Status: api.HealthPassing,
|
||||||
|
ServiceID: scID,
|
||||||
|
}
|
||||||
|
chkt := &structs.CheckType{
|
||||||
|
AliasService: svcID,
|
||||||
|
}
|
||||||
|
require.NoError(t, a.AddCheck(chk1, chkt, true, "", agent.ConfigSourceLocal))
|
||||||
|
|
||||||
|
// Add a failing check to the same service ID, alias should also fail
|
||||||
|
maintID := types.CheckID("service:socat-maintenance")
|
||||||
|
chk2 := &structs.HealthCheck{
|
||||||
|
Node: "",
|
||||||
|
CheckID: maintID,
|
||||||
|
Name: "socat:Service Maintenance Mode",
|
||||||
|
Status: api.HealthCritical,
|
||||||
|
ServiceID: svcID,
|
||||||
|
}
|
||||||
|
a.State.AddCheck(chk2, "")
|
||||||
|
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
require.Equal(r, api.HealthCritical, a.State.Check(proxyID).Status)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Remove the failing check, alias should pass
|
||||||
|
a.State.RemoveCheck(maintID)
|
||||||
|
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
require.Equal(r, api.HealthPassing, a.State.Check(proxyID).Status)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Update TCP check to failing, alias should fail
|
||||||
|
a.State.UpdateCheck(tcpID, api.HealthCritical, "")
|
||||||
|
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
require.Equal(r, api.HealthCritical, a.State.Check(proxyID).Status)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// drainCh drains a channel by reading messages until it would block.
|
// drainCh drains a channel by reading messages until it would block.
|
||||||
func drainCh(ch chan struct{}) {
|
func drainCh(ch chan struct{}) {
|
||||||
for {
|
for {
|
||||||
|
|
Loading…
Reference in New Issue