checks: when a service does not exists in an alias, consider it failing (#7384)

In current implementation of Consul, check alias cannot determine
if a service exists or not. Because a service without any check
is semantically considered as passing, so when no healthchecks
are found for an agent, the check was considered as passing.

But this make little sense as the current implementation does not
make any difference between:
 * a non-existing service (passing)
 * a service without any check (passing as well)

In order to make it work, we have to ensure that when a check did
not find any healthcheck, the service does indeed exists. If it
does not, lets consider the check as failing.
This commit is contained in:
Pierre Souchay 2020-06-04 14:50:52 +02:00 committed by GitHub
parent dd8cd9bc24
commit 7cd5477c3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 408 additions and 36 deletions

View File

@ -7,6 +7,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand"
"net" "net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -583,7 +584,13 @@ func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL st
services := make([]*structs.ServiceDefinition, numServices) services := make([]*structs.ServiceDefinition, numServices)
checkIDs := make([]types.CheckID, numServices) checkIDs := make([]types.CheckID, numServices)
for i := 0; i < numServices; i++ { services[0] = &structs.ServiceDefinition{
ID: "fake",
Name: "fake",
Port: 8080,
Checks: []*structs.CheckType{},
}
for i := 1; i < numServices; i++ {
name := fmt.Sprintf("web-%d", i) name := fmt.Sprintf("web-%d", i)
services[i] = &structs.ServiceDefinition{ services[i] = &structs.ServiceDefinition{
@ -620,6 +627,152 @@ func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL st
}) })
} }
func test_createAlias(t *testing.T, agent *TestAgent, chk *structs.CheckType, expectedResult string) func(r *retry.R) {
t.Helper()
serviceNum := rand.Int()
srv := &structs.NodeService{
Service: fmt.Sprintf("serviceAlias-%d", serviceNum),
Tags: []string{"tag1"},
Port: 8900 + serviceNum,
}
if srv.ID == "" {
srv.ID = fmt.Sprintf("serviceAlias-%d", serviceNum)
}
chk.Status = api.HealthWarning
if chk.CheckID == "" {
chk.CheckID = types.CheckID(fmt.Sprintf("check-%d", serviceNum))
}
err := agent.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal)
assert.NoError(t, err)
return func(r *retry.R) {
t.Helper()
found := false
for _, c := range agent.State.CheckStates(structs.WildcardEnterpriseMeta()) {
if c.Check.CheckID == chk.CheckID {
found = true
assert.Equal(t, expectedResult, c.Check.Status, "Check state should be %s, was %s in %#v", expectedResult, c.Check.Status, c.Check)
var srvID structs.ServiceID
srvID.Init(srv.ID, structs.WildcardEnterpriseMeta())
if err := agent.Agent.State.RemoveService(structs.ServiceID(srvID)); err != nil {
fmt.Println("[DEBUG] Fail to remove service", srvID, ", err:=", err)
}
fmt.Println("[DEBUG] Service Removed", srvID, ", err:=", err)
break
}
}
assert.True(t, found)
}
}
// TestAgent_CheckAliasRPC test the Alias Check to be properly sync remotely
// and locally.
// It contains a few hacks such as unlockIndexOnNode because watch performed
// in CheckAlias.runQuery() waits for 1 min, so Shutdoww the agent might take time
// So, we ensure the agent will update regularilly the index
func TestAgent_CheckAliasRPC(t *testing.T) {
t.Helper()
a := NewTestAgent(t, `
node_name = "node1"
`)
srv := &structs.NodeService{
ID: "svcid1",
Service: "svcname1",
Tags: []string{"tag1"},
Port: 8100,
}
unlockIndexOnNode := func() {
// We ensure to not block and update Agent's index
srv.Tags = []string{fmt.Sprintf("tag-%s", time.Now())}
assert.NoError(t, a.waitForUp())
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal)
assert.NoError(t, err)
}
shutdownAgent := func() {
// This is to be sure Alias Checks on remote won't be blocked during 1 min
unlockIndexOnNode()
fmt.Println("[DEBUG] STARTING shutdown for TestAgent_CheckAliasRPC", time.Now())
go a.Shutdown()
unlockIndexOnNode()
fmt.Println("[DEBUG] DONE shutdown for TestAgent_CheckAliasRPC", time.Now())
}
defer shutdownAgent()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
assert.NoError(t, a.waitForUp())
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal)
assert.NoError(t, err)
retry.Run(t, func(r *retry.R) {
t.Helper()
var args structs.NodeSpecificRequest
args.Datacenter = "dc1"
args.Node = "node1"
args.AllowStale = true
var out structs.IndexedNodeServices
err := a.RPC("Catalog.NodeServices", &args, &out)
assert.NoError(r, err)
foundService := false
var lookup structs.ServiceID
lookup.Init("svcid1", structs.WildcardEnterpriseMeta())
for _, srv := range out.NodeServices.Services {
sid := srv.CompoundServiceID()
if lookup.Matches(&sid) {
foundService = true
}
}
assert.True(r, foundService, "could not find svcid1 in %#v", out.NodeServices.Services)
})
checks := make([](func(*retry.R)), 0)
checks = append(checks, test_createAlias(t, a, &structs.CheckType{
Name: "Check_Local_Ok",
AliasService: "svcid1",
}, api.HealthPassing))
checks = append(checks, test_createAlias(t, a, &structs.CheckType{
Name: "Check_Local_Fail",
AliasService: "svcidNoExistingID",
}, api.HealthCritical))
checks = append(checks, test_createAlias(t, a, &structs.CheckType{
Name: "Check_Remote_Host_Ok",
AliasNode: "node1",
AliasService: "svcid1",
}, api.HealthPassing))
checks = append(checks, test_createAlias(t, a, &structs.CheckType{
Name: "Check_Remote_Host_Non_Existing_Service",
AliasNode: "node1",
AliasService: "svcidNoExistingID",
}, api.HealthCritical))
// We wait for max 5s for all checks to be in sync
{
for i := 0; i < 50; i++ {
unlockIndexOnNode()
allNonWarning := true
for _, chk := range a.State.Checks(structs.WildcardEnterpriseMeta()) {
if chk.Status == api.HealthWarning {
allNonWarning = false
}
}
if allNonWarning {
break
} else {
time.Sleep(100 * time.Millisecond)
}
}
}
for _, toRun := range checks {
unlockIndexOnNode()
retry.Run(t, toRun)
}
}
func TestAgent_AddServiceNoExec(t *testing.T) { func TestAgent_AddServiceNoExec(t *testing.T) {
t.Run("normal", func(t *testing.T) { t.Run("normal", func(t *testing.T) {
t.Parallel() t.Parallel()

View File

@ -114,8 +114,9 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
for _, chk := range checks { for _, chk := range checks {
checksList = append(checksList, chk) checksList = append(checksList, chk)
} }
c.processChecks(checksList, func(serviceID *structs.ServiceID) bool {
c.processChecks(checksList) return c.Notify.ServiceExists(*serviceID)
})
extendRefreshTimer() extendRefreshTimer()
} }
@ -134,12 +135,44 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
} }
} }
// CheckIfServiceIDExists is used to determine if a service exists
type CheckIfServiceIDExists func(*structs.ServiceID) bool
func (c *CheckAlias) checkServiceExistsOnRemoteServer(serviceID *structs.ServiceID) (bool, error) {
args := c.RPCReq
args.Node = c.Node
args.AllowStale = true
args.EnterpriseMeta = c.EnterpriseMeta
// We are late at maximum of 15s compared to leader
args.MaxStaleDuration = time.Duration(15 * time.Second)
attempts := 0
RETRY_CALL:
var out structs.IndexedNodeServices
attempts++
if err := c.RPC.RPC("Catalog.NodeServices", &args, &out); err != nil {
if attempts <= 3 {
time.Sleep(time.Duration(attempts) * time.Second)
goto RETRY_CALL
}
return false, err
}
for _, srv := range out.NodeServices.Services {
sid := srv.CompoundServiceID()
if serviceID.Matches(&sid) {
return true, nil
}
}
return false, nil
}
func (c *CheckAlias) runQuery(stopCh chan struct{}) { func (c *CheckAlias) runQuery(stopCh chan struct{}) {
args := c.RPCReq args := c.RPCReq
args.Node = c.Node args.Node = c.Node
args.AllowStale = true args.AllowStale = true
args.MaxQueryTime = 1 * time.Minute args.MaxQueryTime = 1 * time.Minute
args.EnterpriseMeta = c.EnterpriseMeta args.EnterpriseMeta = c.EnterpriseMeta
// We are late at maximum of 15s compared to leader
args.MaxStaleDuration = time.Duration(15 * time.Second)
var attempt uint var attempt uint
for { for {
@ -173,6 +206,7 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) {
// but for blocking queries isn't that much more efficient since the checks // but for blocking queries isn't that much more efficient since the checks
// index is global to the cluster. // index is global to the cluster.
var out structs.IndexedHealthChecks var out structs.IndexedHealthChecks
if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil { if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil {
attempt++ attempt++
if attempt > 1 { if attempt > 1 {
@ -195,29 +229,37 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) {
if args.MinQueryIndex < 1 { if args.MinQueryIndex < 1 {
args.MinQueryIndex = 1 args.MinQueryIndex = 1
} }
c.processChecks(out.HealthChecks, func(serviceID *structs.ServiceID) bool {
c.processChecks(out.HealthChecks) ret, err := c.checkServiceExistsOnRemoteServer(serviceID)
if err != nil {
// We cannot determine if node has the check, let's assume it exists
return true
}
return ret
})
} }
} }
// processChecks is a common helper for taking a set of health checks and // processChecks is a common helper for taking a set of health checks and
// using them to update our alias. This is abstracted since the checks can // using them to update our alias. This is abstracted since the checks can
// come from both the remote server as well as local state. // come from both the remote server as well as local state.
func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) { func (c *CheckAlias) processChecks(checks []*structs.HealthCheck, CheckIfServiceIDExists CheckIfServiceIDExists) {
health := api.HealthPassing health := api.HealthPassing
msg := "No checks found." msg := "No checks found."
serviceFound := false
for _, chk := range checks { for _, chk := range checks {
if c.Node != "" && chk.Node != c.Node { if c.Node != "" && c.Node != chk.Node {
continue continue
} }
// We allow ServiceID == "" so that we also check node checks
sid := chk.CompoundServiceID() sid := chk.CompoundServiceID()
serviceMatch := c.ServiceID.Matches(&sid)
if chk.ServiceID != "" && !c.ServiceID.Matches(&sid) { if chk.ServiceID != "" && !serviceMatch {
continue continue
} }
// We have at least one healthcheck for this service
if serviceMatch {
serviceFound = true
}
if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning { if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning {
health = chk.Status health = chk.Status
msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output) msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output)
@ -228,13 +270,18 @@ func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) {
if chk.Status == api.HealthCritical { if chk.Status == api.HealthCritical {
break break
} }
} else {
// if current health is warning, don't overwrite it
if health == api.HealthPassing {
msg = "All checks passing."
}
}
}
if !serviceFound {
if !CheckIfServiceIDExists(&c.ServiceID) {
msg = fmt.Sprintf("Service %s could not be found on node %s", c.ServiceID.ID, c.Node)
health = api.HealthCritical
} }
msg = "All checks passing."
} }
// TODO(rb): if no matching checks found should this default to critical?
// Update our check value
c.Notify.UpdateCheck(c.CheckID, health, msg) c.Notify.UpdateCheck(c.CheckID, health, msg)
} }

View File

@ -30,7 +30,7 @@ func TestCheckAlias_remoteErrBackoff(t *testing.T) {
RPC: rpc, RPC: rpc,
} }
rpc.Reply.Store(fmt.Errorf("failure")) rpc.AddReply("Health.NodeChecks", fmt.Errorf("failure"))
chk.Start() chk.Start()
defer chk.Stop() defer chk.Stop()
@ -62,7 +62,7 @@ func TestCheckAlias_remoteNoChecks(t *testing.T) {
RPC: rpc, RPC: rpc,
} }
rpc.Reply.Store(structs.IndexedHealthChecks{}) rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{})
chk.Start() chk.Start()
defer chk.Stop() defer chk.Stop()
@ -88,7 +88,7 @@ func TestCheckAlias_remoteNodeFailure(t *testing.T) {
RPC: rpc, RPC: rpc,
} }
rpc.Reply.Store(structs.IndexedHealthChecks{ rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{ HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node // Should ignore non-matching node
&structs.HealthCheck{ &structs.HealthCheck{
@ -137,7 +137,7 @@ func TestCheckAlias_remotePassing(t *testing.T) {
RPC: rpc, RPC: rpc,
} }
rpc.Reply.Store(structs.IndexedHealthChecks{ rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{ HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node // Should ignore non-matching node
&structs.HealthCheck{ &structs.HealthCheck{
@ -171,6 +171,116 @@ func TestCheckAlias_remotePassing(t *testing.T) {
}) })
} }
// Remote service has no healtchecks, but service exists on remote host
func TestCheckAlias_remotePassingWithoutChecksButWithService(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := structs.NewCheckID("foo", nil)
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
ServiceID: structs.ServiceID{ID: "web"},
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node
&structs.HealthCheck{
Node: "A",
ServiceID: "web",
Status: api.HealthCritical,
},
// Should ignore non-matching service
&structs.HealthCheck{
Node: "remote",
ServiceID: "db",
Status: api.HealthCritical,
},
},
})
injected := structs.IndexedNodeServices{
NodeServices: &structs.NodeServices{
Node: &structs.Node{
Node: "remote",
},
Services: make(map[string]*structs.NodeService),
},
QueryMeta: structs.QueryMeta{},
}
injected.NodeServices.Services["web"] = &structs.NodeService{
Service: "web",
ID: "web",
}
rpc.AddReply("Catalog.NodeServices", injected)
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthPassing; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
// Remote service has no healtchecks, service does not exists on remote host
func TestCheckAlias_remotePassingWithoutChecksAndWithoutService(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := structs.NewCheckID("foo", nil)
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
ServiceID: structs.ServiceID{ID: "web"},
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node
&structs.HealthCheck{
Node: "A",
ServiceID: "web",
Status: api.HealthCritical,
},
// Should ignore non-matching service
&structs.HealthCheck{
Node: "remote",
ServiceID: "db",
Status: api.HealthCritical,
},
},
})
injected := structs.IndexedNodeServices{
NodeServices: &structs.NodeServices{
Node: &structs.Node{
Node: "remote",
},
Services: make(map[string]*structs.NodeService),
},
QueryMeta: structs.QueryMeta{},
}
rpc.AddReply("Catalog.NodeServices", injected)
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
// If any checks are critical, it should be critical // If any checks are critical, it should be critical
func TestCheckAlias_remoteCritical(t *testing.T) { func TestCheckAlias_remoteCritical(t *testing.T) {
t.Parallel() t.Parallel()
@ -186,7 +296,7 @@ func TestCheckAlias_remoteCritical(t *testing.T) {
RPC: rpc, RPC: rpc,
} }
rpc.Reply.Store(structs.IndexedHealthChecks{ rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{ HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node // Should ignore non-matching node
&structs.HealthCheck{ &structs.HealthCheck{
@ -241,7 +351,7 @@ func TestCheckAlias_remoteWarning(t *testing.T) {
RPC: rpc, RPC: rpc,
} }
rpc.Reply.Store(structs.IndexedHealthChecks{ rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{ HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node // Should ignore non-matching node
&structs.HealthCheck{ &structs.HealthCheck{
@ -295,7 +405,7 @@ func TestCheckAlias_remoteNodeOnlyPassing(t *testing.T) {
RPC: rpc, RPC: rpc,
} }
rpc.Reply.Store(structs.IndexedHealthChecks{ rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{ HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node // Should ignore non-matching node
&structs.HealthCheck{ &structs.HealthCheck{
@ -342,7 +452,7 @@ func TestCheckAlias_remoteNodeOnlyCritical(t *testing.T) {
RPC: rpc, RPC: rpc,
} }
rpc.Reply.Store(structs.IndexedHealthChecks{ rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{ HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node // Should ignore non-matching node
&structs.HealthCheck{ &structs.HealthCheck{
@ -403,9 +513,19 @@ type mockRPC struct {
Calls uint32 // Read-only, number of RPC calls Calls uint32 // Read-only, number of RPC calls
Args atomic.Value // Read-only, the last args sent Args atomic.Value // Read-only, the last args sent
// Write-only, the reply to send. If of type "error" then an error will // Write-only, the replies to send, indexed per method. If of type "error" then an error will
// be returned from the RPC call. // be returned from the RPC call.
Reply atomic.Value Replies map[string]*atomic.Value
}
func (m *mockRPC) AddReply(method string, reply interface{}) {
if m.Replies == nil {
m.Replies = make(map[string]*atomic.Value)
}
val := &atomic.Value{}
val.Store(reply)
m.Replies[method] = val
} }
func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error { func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error {
@ -424,12 +544,15 @@ func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error
} }
replyv = replyv.Elem() // Get pointer value replyv = replyv.Elem() // Get pointer value
replyv.Set(reflect.Zero(replyv.Type())) // Reset to zero value replyv.Set(reflect.Zero(replyv.Type())) // Reset to zero value
if v := m.Reply.Load(); v != nil { repl := m.Replies[method]
if repl == nil {
return fmt.Errorf("No Such Method: %s", method)
}
if v := m.Replies[method].Load(); v != nil {
// Return an error if the reply is an error type // Return an error if the reply is an error type
if err, ok := v.(error); ok { if err, ok := v.(error); ok {
return err return err
} }
replyv.Set(reflect.ValueOf(v)) // Set to reply value if non-nil replyv.Set(reflect.ValueOf(v)) // Set to reply value if non-nil
} }
@ -442,6 +565,8 @@ func TestCheckAlias_localInitialStatus(t *testing.T) {
t.Parallel() t.Parallel()
notify := newMockAliasNotify() notify := newMockAliasNotify()
// We fake a local service web to ensure check if passing works
notify.Notify.AddServiceID(structs.ServiceID{ID: "web"})
chkID := structs.NewCheckID(types.CheckID("foo"), nil) chkID := structs.NewCheckID(types.CheckID("foo"), nil)
rpc := &mockRPC{} rpc := &mockRPC{}
chk := &CheckAlias{ chk := &CheckAlias{
@ -463,3 +588,27 @@ func TestCheckAlias_localInitialStatus(t *testing.T) {
} }
}) })
} }
// Local check on non-existing service
func TestCheckAlias_localInitialStatusShouldFailBecauseNoService(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := structs.NewCheckID(types.CheckID("foo"), nil)
rpc := &mockRPC{}
chk := &CheckAlias{
ServiceID: structs.ServiceID{ID: "web"},
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}

View File

@ -52,6 +52,8 @@ type RPC interface {
// should take care to be idempotent. // should take care to be idempotent.
type CheckNotifier interface { type CheckNotifier interface {
UpdateCheck(checkID structs.CheckID, status, output string) UpdateCheck(checkID structs.CheckID, status, output string)
// ServiceExists return true if the given service does exists
ServiceExists(serviceID structs.ServiceID) bool
} }
// CheckMonitor is used to periodically invoke a script to // CheckMonitor is used to periodically invoke a script to

View File

@ -485,6 +485,13 @@ func (l *State) AddAliasCheck(checkID structs.CheckID, srcServiceID structs.Serv
return nil return nil
} }
// ServiceExists return true if the given service does exists
func (l *State) ServiceExists(serviceID structs.ServiceID) bool {
l.Lock()
defer l.Unlock()
return l.services[serviceID] != nil
}
// RemoveAliasCheck removes the mapping for the alias check. // RemoveAliasCheck removes the mapping for the alias check.
func (l *State) RemoveAliasCheck(checkID structs.CheckID, srcServiceID structs.ServiceID) { func (l *State) RemoveAliasCheck(checkID structs.CheckID, srcServiceID structs.ServiceID) {
l.Lock() l.Lock()

View File

@ -48,7 +48,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}, },
EnterpriseMeta: *structs.DefaultEnterpriseMeta(), EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
} }
assert.False(t, a.State.ServiceExists(structs.ServiceID{ID: srv1.ID}))
a.State.AddService(srv1, "") a.State.AddService(srv1, "")
assert.True(t, a.State.ServiceExists(structs.ServiceID{ID: srv1.ID}))
args.Service = srv1 args.Service = srv1
if err := a.RPC("Catalog.Register", args, &out); err != nil { if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)

View File

@ -14,19 +14,31 @@ type Notify struct {
// of the notification mock in order to prevent panics // of the notification mock in order to prevent panics
// raised by the race conditions detector. // raised by the race conditions detector.
sync.RWMutex sync.RWMutex
state map[structs.CheckID]string state map[structs.CheckID]string
updates map[structs.CheckID]int updates map[structs.CheckID]int
output map[structs.CheckID]string output map[structs.CheckID]string
serviceIDs map[structs.ServiceID]bool
} }
func NewNotify() *Notify { func NewNotify() *Notify {
return &Notify{ return &Notify{
state: make(map[structs.CheckID]string), state: make(map[structs.CheckID]string),
updates: make(map[structs.CheckID]int), updates: make(map[structs.CheckID]int),
output: make(map[structs.CheckID]string), output: make(map[structs.CheckID]string),
serviceIDs: make(map[structs.ServiceID]bool),
} }
} }
// ServiceExists mock
func (c *Notify) ServiceExists(serviceID structs.ServiceID) bool {
return c.serviceIDs[serviceID]
}
// AddServiceID will mock a service being present locally
func (c *Notify) AddServiceID(serviceID structs.ServiceID) {
c.serviceIDs[serviceID] = true
}
func NewNotifyChan() (*Notify, chan int) { func NewNotifyChan() (*Notify, chan int) {
n := &Notify{ n := &Notify{
updated: make(chan int), updated: make(chan int),