Merge pull request #1934 from hashicorp/b-check-output

Syncs a check's output with the catalog when output rate limiting isn't in effect.
This commit is contained in:
James Phillips 2016-04-11 00:37:50 -07:00
commit c1c8a0b70c
4 changed files with 160 additions and 8 deletions

View File

@ -434,11 +434,26 @@ func (l *localState) setSyncState() error {
if l.config.CheckUpdateInterval == 0 {
equal = existing.IsSame(check)
} else {
eCopy := new(structs.HealthCheck)
*eCopy = *existing
eCopy.Output = ""
check.Output = ""
equal = eCopy.IsSame(check)
// Copy the existing check before potentially modifying
// it before the compare operation.
eCopy := existing.Clone()
// Copy the server's check before modifying, otherwise
// in-memory RPC-based unit tests will have side effects.
cCopy := check.Clone()
// If there's a defer timer active then we've got a
// potentially spammy check so we don't sync the output
// during this sweep since the timer will mark the check
// out of sync for us. Otherwise, it is safe to sync the
// output now. This is especially important for checks
// that don't change state after they are created, in
// which case we'd never see their output synced back ever.
if _, ok := l.deferCheck[id]; ok {
eCopy.Output = ""
cCopy.Output = ""
}
equal = eCopy.IsSame(cCopy)
}
// Update the status
@ -499,6 +514,8 @@ func (l *localState) syncChanges() error {
if err := l.syncNodeInfo(); err != nil {
return err
}
} else {
l.logger.Printf("[DEBUG] agent: Node info in sync")
}
return nil

View File

@ -654,7 +654,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
conf := nextConfig()
conf.CheckUpdateInterval = 100 * time.Millisecond
conf.CheckUpdateInterval = 500 * time.Millisecond
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
@ -693,8 +693,8 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
// Update the check output! Should be deferred
agent.state.UpdateCheck("web", structs.HealthPassing, "output")
// Should not update for 100 milliseconds
time.Sleep(50 * time.Millisecond)
// Should not update for 500 milliseconds
time.Sleep(250 * time.Millisecond)
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
@ -729,6 +729,112 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %s", err)
})
// Change the output in the catalog to force it out of sync.
eCopy := check.Clone()
eCopy.Output = "changed"
reg := structs.RegisterRequest{
Datacenter: agent.config.Datacenter,
Node: agent.config.NodeName,
Address: agent.config.AdvertiseAddr,
TaggedAddresses: agent.config.TaggedAddresses,
Check: eCopy,
WriteRequest: structs.WriteRequest{},
}
var out struct{}
if err := agent.RPC("Catalog.Register", &reg, &out); err != nil {
t.Fatalf("err: %s", err)
}
// Verify that the output is out of sync.
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "changed" {
t.Fatalf("unexpected update: %v", chk)
}
}
}
// Trigger anti-entropy run and wait.
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that the output was synced back to the agent's value.
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "output" {
t.Fatalf("missed update: %v", chk)
}
}
}
// Reset the catalog again.
if err := agent.RPC("Catalog.Register", &reg, &out); err != nil {
t.Fatalf("err: %s", err)
}
// Verify that the output is out of sync.
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "changed" {
t.Fatalf("unexpected update: %v", chk)
}
}
}
// Now make an update that should be deferred.
agent.state.UpdateCheck("web", structs.HealthPassing, "deferred")
// Trigger anti-entropy run and wait.
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that the output is still out of sync since there's a deferred
// update pending.
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "changed" {
t.Fatalf("unexpected update: %v", chk)
}
}
}
// Wait for the deferred update.
testutil.WaitForResult(func() (bool, error) {
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
return false, err
}
// Verify updated
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "deferred" {
return false, fmt.Errorf("no update: %v", chk)
}
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestAgentAntiEntropy_NodeInfo(t *testing.T) {

View File

@ -396,6 +396,13 @@ func (c *HealthCheck) IsSame(other *HealthCheck) bool {
return true
}
// Clone returns a distinct clone of the HealthCheck.
func (c *HealthCheck) Clone() *HealthCheck {
clone := new(HealthCheck)
*clone = *c
return clone
}
type HealthChecks []*HealthCheck
// CheckServiceNode is used to provide the node, its service

View File

@ -211,6 +211,28 @@ func TestStructs_HealthCheck_IsSame(t *testing.T) {
check(&other.ServiceName)
}
func TestStructs_HealthCheck_Clone(t *testing.T) {
hc := &HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "thecheck",
Status: HealthPassing,
Notes: "it's all good",
Output: "lgtm",
ServiceID: "service1",
ServiceName: "theservice",
}
clone := hc.Clone()
if !hc.IsSame(clone) {
t.Fatalf("should be equal to its clone")
}
clone.Output = "different"
if hc.IsSame(clone) {
t.Fatalf("should not longer be equal to its clone")
}
}
func TestStructs_CheckServiceNodes_Shuffle(t *testing.T) {
// Make a huge list of nodes.
var nodes CheckServiceNodes