Merge pull request #850 from hashicorp/f-ae-missing

Anti-entropy sync services/checks missing entirely from Consul
This commit is contained in:
Ryan Uber 2015-04-10 11:17:25 -07:00
commit 3477c0a0d7
2 changed files with 71 additions and 20 deletions

View File

@ -310,24 +310,47 @@ func (l *localState) setSyncState() error {
if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil { if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil {
return err return err
} }
services := out1.NodeServices
checks := out2.HealthChecks checks := out2.HealthChecks
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
if services != nil { services := make(map[string]*structs.NodeService)
for id, service := range services.Services { if out1.NodeServices != nil {
// If we don't have the service locally, deregister it services = out1.NodeServices.Services
existing, ok := l.services[id] }
if !ok {
l.serviceStatus[id] = syncStatus{remoteDelete: true}
continue
}
// If our definition is different, we need to update it for id, _ := range l.services {
equal := reflect.DeepEqual(existing, service) // If the local service doesn't exist remotely, then sync it
l.serviceStatus[id] = syncStatus{inSync: equal} if _, ok := services[id]; !ok {
l.serviceStatus[id] = syncStatus{inSync: false}
}
}
for id, service := range services {
// If we don't have the service locally, deregister it
existing, ok := l.services[id]
if !ok {
l.serviceStatus[id] = syncStatus{remoteDelete: true}
continue
}
// If our definition is different, we need to update it
equal := reflect.DeepEqual(existing, service)
l.serviceStatus[id] = syncStatus{inSync: equal}
}
for id, _ := range l.checks {
// Sync any check which doesn't exist on the remote side
found := false
for _, check := range checks {
if check.CheckID == id {
found = true
break
}
}
if !found {
l.checkStatus[id] = syncStatus{inSync: false}
} }
} }

View File

@ -88,6 +88,16 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
} }
agent.state.AddService(srv5) agent.state.AddService(srv5)
// Exists local, in sync, remote missing (create)
srv6 := &structs.NodeService{
ID: "cache",
Service: "cache",
Tags: []string{},
Port: 11211,
}
agent.state.AddService(srv6)
agent.state.serviceStatus["cache"] = syncStatus{inSync: true}
srv5_mod := new(structs.NodeService) srv5_mod := new(structs.NodeService)
*srv5_mod = *srv5 *srv5_mod = *srv5
srv5_mod.Address = "127.0.0.1" srv5_mod.Address = "127.0.0.1"
@ -110,8 +120,8 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// We should have 5 services (consul included) // We should have 6 services (consul included)
if len(services.NodeServices.Services) != 5 { if len(services.NodeServices.Services) != 6 {
t.Fatalf("bad: %v", services.NodeServices.Services) t.Fatalf("bad: %v", services.NodeServices.Services)
} }
@ -134,6 +144,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
if !reflect.DeepEqual(serv, srv5) { if !reflect.DeepEqual(serv, srv5) {
t.Fatalf("bad: %v %v", serv, srv5) t.Fatalf("bad: %v %v", serv, srv5)
} }
case "cache":
if !reflect.DeepEqual(serv, srv6) {
t.Fatalf("bad: %v %v", serv, srv6)
}
case "consul": case "consul":
// ignore // ignore
default: default:
@ -142,10 +156,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
} }
// Check the local state // Check the local state
if len(agent.state.services) != 5 { if len(agent.state.services) != 6 {
t.Fatalf("bad: %v", agent.state.services) t.Fatalf("bad: %v", agent.state.services)
} }
if len(agent.state.serviceStatus) != 5 { if len(agent.state.serviceStatus) != 6 {
t.Fatalf("bad: %v", agent.state.serviceStatus) t.Fatalf("bad: %v", agent.state.serviceStatus)
} }
for name, status := range agent.state.serviceStatus { for name, status := range agent.state.serviceStatus {
@ -439,6 +453,16 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Exists local, in sync, remote missing (create)
chk5 := &structs.HealthCheck{
Node: agent.config.NodeName,
CheckID: "cache",
Name: "cache",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk5)
agent.state.checkStatus["cache"] = syncStatus{inSync: true}
// Trigger anti-entropy run and wait // Trigger anti-entropy run and wait
agent.StartSync() agent.StartSync()
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
@ -453,8 +477,8 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// We should have 4 services (serf included) // We should have 5 checks (serf included)
if len(checks.HealthChecks) != 4 { if len(checks.HealthChecks) != 5 {
t.Fatalf("bad: %v", checks) t.Fatalf("bad: %v", checks)
} }
@ -473,6 +497,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
if !reflect.DeepEqual(chk, chk3) { if !reflect.DeepEqual(chk, chk3) {
t.Fatalf("bad: %v %v", chk, chk3) t.Fatalf("bad: %v %v", chk, chk3)
} }
case "cache":
if !reflect.DeepEqual(chk, chk5) {
t.Fatalf("bad: %v %v", chk, chk5)
}
case "serfHealth": case "serfHealth":
// ignore // ignore
default: default:
@ -481,10 +509,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
} }
// Check the local state // Check the local state
if len(agent.state.checks) != 3 { if len(agent.state.checks) != 4 {
t.Fatalf("bad: %v", agent.state.checks) t.Fatalf("bad: %v", agent.state.checks)
} }
if len(agent.state.checkStatus) != 3 { if len(agent.state.checkStatus) != 4 {
t.Fatalf("bad: %v", agent.state.checkStatus) t.Fatalf("bad: %v", agent.state.checkStatus)
} }
for name, status := range agent.state.checkStatus { for name, status := range agent.state.checkStatus {