From 0b5dfee00acbca775668ab7f65248d31860a01e5 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 15 Mar 2021 17:35:42 -0400 Subject: [PATCH] state: use runCase pattern for large test The TestServiceHealthEventsFromChanges function was over 1400 lines. Attempting to debug test failures in test functions this large is difficult. It requires scrolling to the line which defines the testcase because the failure message only includes the line number of the assertion, not the line number of the test case. This is an excellent example of where test tables stop working well, and start being a problem. To mitigate this problem, the runCase pattern can be used. When one of these tests fails, a failure message will print the line number of both the test case and the assertion. This allows a developer to quickly jump to both of the relevant lines, signficanting reducing the time it takes to debug test failures. For example, one such failure could look like this: catalog_events_test.go:1610: case: service reg, new node catalog_events_test.go:1605: assertion failed: values are not equal --- agent/consul/state/catalog_events_oss_test.go | 7 - agent/consul/state/catalog_events_test.go | 2792 +++++++++-------- 2 files changed, 1401 insertions(+), 1398 deletions(-) delete mode 100644 agent/consul/state/catalog_events_oss_test.go diff --git a/agent/consul/state/catalog_events_oss_test.go b/agent/consul/state/catalog_events_oss_test.go deleted file mode 100644 index bad6fa817..000000000 --- a/agent/consul/state/catalog_events_oss_test.go +++ /dev/null @@ -1,7 +0,0 @@ -// +build !consulent - -package state - -func withServiceHealthEnterpriseCases(cases []serviceHealthTestCase) []serviceHealthTestCase { - return cases -} diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index de7a266f0..2d901388f 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -196,7 +196,7 @@ func newIndexCounter() *indexCounter { var _ stream.SnapshotAppender = (*snapshotAppender)(nil) -type serviceHealthTestCase struct { +type eventsTestCase struct { Name string Setup func(s *Store, tx *txn) error Mutate func(s *Store, tx *txn) error @@ -205,1403 +205,1413 @@ type serviceHealthTestCase struct { } func TestServiceHealthEventsFromChanges(t *testing.T) { + setupIndex := uint64(10) + + run := func(t *testing.T, tc eventsTestCase) { + t.Helper() + runCase(t, tc.Name, tc.run) + } + + run(t, eventsTestCase{ + Name: "irrelevant events", + Mutate: func(s *Store, tx *txn) error { + return kvsSetTxn(tx, tx.Index, &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + }, false) + }, + WantEvents: nil, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "service reg, new node", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "web"), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "service reg, existing node", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false) + }, + WantEvents: []stream.Event{ + // Should only publish new service + testServiceHealthEvent(t, "web", evNodeUnchanged), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "service dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) + }, + WantEvents: []stream.Event{ + // Should only publish deregistration for that service + testServiceHealthDeregistrationEvent(t, "web"), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "node dereg", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web"), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteNodeTxn(tx, tx.Index, "node1") + }, + WantEvents: []stream.Event{ + // Should publish deregistration events for all services + testServiceHealthDeregistrationEvent(t, "db"), + testServiceHealthDeregistrationEvent(t, "web"), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect native reg, new node", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event as well as a connect + // one. + testServiceHealthEvent(t, "web", evConnectNative), + testServiceHealthEvent(t, "web", evConnectNative, evConnectTopic), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect native reg, existing node", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event as well as a connect + // one. + testServiceHealthEvent(t, "web", + evNodeUnchanged, + evConnectNative), + testServiceHealthEvent(t, "web", + evNodeUnchanged, + evConnectNative, + evConnectTopic), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect native dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false); err != nil { + return err + } + + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) + }, + WantEvents: []stream.Event{ + // We should see both a regular service dereg event and a connect one + testServiceHealthDeregistrationEvent(t, "web", evConnectNative), + testServiceHealthDeregistrationEvent(t, "web", evConnectNative, evConnectTopic), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar reg, new node", + Mutate: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regSidecar), false) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event for the web service + // another for the sidecar service and a connect event for web. + testServiceHealthEvent(t, "web"), + testServiceHealthEvent(t, "web", evSidecar), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar reg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event for the proxy + // service and a connect one for the target service. + testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Delete only the sidecar + return s.deleteServiceTxn(tx, tx.Index, "node1", "web_sidecar_proxy", nil) + }, + WantEvents: []stream.Event{ + // We should see both a regular service dereg event and a connect one + testServiceHealthDeregistrationEvent(t, "web", evSidecar), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar mutate svc", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Change port of the target service instance + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regMutatePort), false) + }, + WantEvents: []stream.Event{ + // We should see the service topic update but not connect since proxy + // details didn't change. + testServiceHealthEvent(t, "web", + evMutatePort, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar mutate sidecar", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Change port of the sidecar service instance + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regMutatePort), false) + }, + WantEvents: []stream.Event{ + // We should see the proxy service topic update and a connect update + testServiceHealthEvent(t, "web", + evSidecar, + evMutatePort, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evMutatePort, + evServiceMutated, + evChecksUnchanged), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar rename service", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Change service name but not ID, update proxy too + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regRenameService), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameService), false) + }, + WantEvents: []stream.Event{ + // We should see events to deregister the old service instance and the + // old connect instance since we changed topic key for both. Then new + // service and connect registrations. The proxy instance should also + // change since it's not proxying a different service. + testServiceHealthDeregistrationEvent(t, "web"), + testServiceHealthEvent(t, "web", + evRenameService, + evServiceMutated, + evNodeUnchanged, + evServiceChecksMutated, + ), + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evSidecar, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameService, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evRenameService, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar change destination service", + Setup: func(s *Store, tx *txn) error { + // Register a web_changed service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web_changed"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // And a sidecar initially for web, will be moved to target web_changed + // in Mutate. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the destination service of the proxy without a service + // rename or deleting and recreating the proxy. This is far fetched but + // still valid. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameService), false) + }, + WantEvents: []stream.Event{ + // We should only see service health events for the sidecar service + // since the actual target services didn't change. But also should see + // Connect topic dereg for the old name to update existing subscribers + // for Connect/web. + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evSidecar, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameService, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evRenameService, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node update", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node meta. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testNodeRegistration(t, regNodeMeta), false) + }, + WantEvents: []stream.Event{ + // We should see updates for all services and a connect update for the + // sidecar's destination. + testServiceHealthEvent(t, "db", + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node rename", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node NAME but not it's ID. We do it for every service + // though since this is effectively what client agent anti-entropy would + // do on a node rename. If we only rename the node it will have no + // services registered afterwards. + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db", regRenameNode), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regRenameNode), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameNode), false); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Node rename is implemented internally as a node delete and new node + // insert after some renaming validation. So we should see full set of + // new events for health, then the deletions of old services, then the + // connect update and delete pair. + testServiceHealthEvent(t, "db", + evRenameNode, + // Although we delete and re-insert, we do maintain the CreatedIndex + // of the node record from the old one. + evNodeMutated, + ), + testServiceHealthEvent(t, "web", + evRenameNode, + evNodeMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameNode, + evNodeMutated, + ), + // dereg events for old node name services + testServiceHealthDeregistrationEvent(t, "db"), + testServiceHealthDeregistrationEvent(t, "web"), + testServiceHealthDeregistrationEvent(t, "web", evSidecar), + // Connect topic updates are last due to the way we add them + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evRenameNode, + evNodeMutated, + ), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node check failure", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node-level check status + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regNodeCheckFail), false); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "db", + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + // Only the node check changed. This needs to come after evNodeUnchanged + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node service check failure", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change the service-level check status + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regServiceCheckFail), false); err != nil { + return err + } + // Also change the service-level check status for the proxy. This is + // analogous to what would happen with an alias check on the client side + // - the proxies check would get updated at roughly the same time as the + // target service check updates. + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regServiceCheckFail), false); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Should only see the events for that one service change, the sidecar + // service and hence the connect topic for that service. + testServiceHealthEvent(t, "web", + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceChecksMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceChecksMutated, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceChecksMutated, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node node-level check delete", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete only the node-level check + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "serf-health", nil); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "db", + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node service check delete", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete the service-level check for the main service + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web", nil); err != nil { + return err + } + // Also delete for a proxy + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web_sidecar_proxy", nil); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Should only see the events for that one service change, the sidecar + // service and hence the connect topic for that service. + testServiceHealthEvent(t, "web", + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "many services on many nodes in one TX", + Setup: func(s *Store, tx *txn) error { + // Node1 + + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + + // Node2 + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regNode2), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regNode2), false); err != nil { + return err + } + + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // In one transaction the operator moves the web service and it's + // sidecar from node2 back to node1 and deletes them from node2 + + if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web", nil); err != nil { + return err + } + if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web_sidecar_proxy", nil); err != nil { + return err + } + + // Register those on node1 + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + + // And for good measure, add a new connect-native service to node2 + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "api", regConnectNative, regNode2), false); err != nil { + return err + } + + return nil + }, + + WantEvents: []stream.Event{ + // We should see: + // - service dereg for web and proxy on node2 + // - connect dereg for web on node2 + // - service reg for web and proxy on node1 + // - connect reg for web on node1 + // - service reg for api on node2 + // - connect reg for api on node2 + testServiceHealthDeregistrationEvent(t, "web", evNode2), + testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evNode2, evSidecar), + + testServiceHealthEvent(t, "web", evNodeUnchanged), + testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), + + testServiceHealthEvent(t, "api", evNode2, evConnectNative, evNodeUnchanged), + testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged), + }, + }) + run(t, eventsTestCase{ + Name: "terminating gateway registered with no config entry", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1")), + }, + }) + run(t, eventsTestCase{ + Name: "config entry created with no terminating gateway instance", + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{}, + }) + run(t, eventsTestCase{ + Name: "terminating gateway registered after config entry exists", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + Mutate: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false, + ); err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2")), + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1"), + evNode2), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNode2), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evNode2), + }, + }) + run(t, eventsTestCase{ + Name: "terminating gateway updated after config entry exists", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNodeCheckFail), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1"), + evNodeCheckFail, + evNodeUnchanged, + evNodeChecksMutated, + evServiceUnchanged), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNodeCheckFail, + evNodeUnchanged, + evNodeChecksMutated, + evServiceUnchanged), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evNodeCheckFail, + evNodeUnchanged, + evNodeChecksMutated, + evServiceUnchanged), + }, + }) + run(t, eventsTestCase{ + Name: "terminating gateway config entry created after gateway exists", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evServiceIndex(setupIndex)), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evServiceIndex(setupIndex)), + }, + }) + run(t, eventsTestCase{ + Name: "change the terminating gateway config entry to add a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evServiceIndex(setupIndex)), + }, + }) + run(t, eventsTestCase{ + Name: "change the terminating gateway config entry to remove a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + }, + }) + run(t, eventsTestCase{ + Name: "update a linked service within a terminating gateway config entry", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + CAFile: "foo.crt", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evServiceIndex(setupIndex)), + }, + }) + run(t, eventsTestCase{ + Name: "delete a terminating gateway config entry with a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + err = s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + if err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) + }, + Mutate: func(s *Store, tx *txn) error { + return deleteConfigEntryTxn(tx, tx.Index, structs.TerminatingGateway, "tgate1", structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNode2), + }, + }) + run(t, eventsTestCase{ + Name: "create an instance of a linked service in a terminating gateway", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "srv1", evNodeUnchanged), + }, + }) + run(t, eventsTestCase{ + Name: "delete an instance of a linked service in a terminating gateway", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + err = s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "srv1", nil) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, "srv1"), + }, + }) + run(t, eventsTestCase{ + Name: "rename a terminating gateway instance", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + configEntry = &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate2", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err = ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + rename := func(req *structs.RegisterRequest) error { + req.Service.Service = "tgate2" + req.Checks[1].ServiceName = "tgate2" + return nil + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, rename), false) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evServiceTermingGateway("tgate1")), + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway(""), + evNodeUnchanged, + evServiceMutated, + evServiceChecksMutated, + evTerminatingGatewayRenamed("tgate2")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNodeUnchanged, + evServiceMutated, + evServiceChecksMutated, + evTerminatingGatewayRenamed("tgate2")), + }, + }) + run(t, eventsTestCase{ + Name: "delete a terminating gateway instance", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "tgate1", structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evServiceTermingGateway("")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2")), + }, + }) +} + +func (tc eventsTestCase) run(t *testing.T) { + s := NewStateStore(nil) + setupIndex := uint64(10) mutateIndex := uint64(100) - cases := []serviceHealthTestCase{ - { - Name: "irrelevant events", - Mutate: func(s *Store, tx *txn) error { - return kvsSetTxn(tx, tx.Index, &structs.DirEntry{ - Key: "foo", - Value: []byte("bar"), - }, false) - }, - WantEvents: nil, - WantErr: false, - }, - { - Name: "service reg, new node", - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, "web"), - }, - WantErr: false, - }, - { - Name: "service reg, existing node", - Setup: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false) - }, - WantEvents: []stream.Event{ - // Should only publish new service - testServiceHealthEvent(t, "web", evNodeUnchanged), - }, - WantErr: false, - }, - { - Name: "service dereg, existing node", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) - }, - WantEvents: []stream.Event{ - // Should only publish deregistration for that service - testServiceHealthDeregistrationEvent(t, "web"), - }, - WantErr: false, - }, - { - Name: "node dereg", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web"), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteNodeTxn(tx, tx.Index, "node1") - }, - WantEvents: []stream.Event{ - // Should publish deregistration events for all services - testServiceHealthDeregistrationEvent(t, "db"), - testServiceHealthDeregistrationEvent(t, "web"), - }, - WantErr: false, - }, - { - Name: "connect native reg, new node", - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) - }, - WantEvents: []stream.Event{ - // We should see both a regular service health event as well as a connect - // one. - testServiceHealthEvent(t, "web", evConnectNative), - testServiceHealthEvent(t, "web", evConnectNative, evConnectTopic), - }, - WantErr: false, - }, - { - Name: "connect native reg, existing node", - Setup: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) - }, - WantEvents: []stream.Event{ - // We should see both a regular service health event as well as a connect - // one. - testServiceHealthEvent(t, "web", - evNodeUnchanged, - evConnectNative), - testServiceHealthEvent(t, "web", - evNodeUnchanged, - evConnectNative, - evConnectTopic), - }, - WantErr: false, - }, - { - Name: "connect native dereg, existing node", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false); err != nil { - return err - } - - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) - }, - WantEvents: []stream.Event{ - // We should see both a regular service dereg event and a connect one - testServiceHealthDeregistrationEvent(t, "web", evConnectNative), - testServiceHealthDeregistrationEvent(t, "web", evConnectNative, evConnectTopic), - }, - WantErr: false, - }, - { - Name: "connect sidecar reg, new node", - Mutate: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regSidecar), false) - }, - WantEvents: []stream.Event{ - // We should see both a regular service health event for the web service - // another for the sidecar service and a connect event for web. - testServiceHealthEvent(t, "web"), - testServiceHealthEvent(t, "web", evSidecar), - testServiceHealthEvent(t, "web", evConnectTopic, evSidecar), - }, - WantErr: false, - }, - { - Name: "connect sidecar reg, existing node", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - WantEvents: []stream.Event{ - // We should see both a regular service health event for the proxy - // service and a connect one for the target service. - testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), - testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), - }, - WantErr: false, - }, - { - Name: "connect sidecar dereg, existing node", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Delete only the sidecar - return s.deleteServiceTxn(tx, tx.Index, "node1", "web_sidecar_proxy", nil) - }, - WantEvents: []stream.Event{ - // We should see both a regular service dereg event and a connect one - testServiceHealthDeregistrationEvent(t, "web", evSidecar), - testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), - }, - WantErr: false, - }, - { - Name: "connect sidecar mutate svc", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Change port of the target service instance - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regMutatePort), false) - }, - WantEvents: []stream.Event{ - // We should see the service topic update but not connect since proxy - // details didn't change. - testServiceHealthEvent(t, "web", - evMutatePort, - evNodeUnchanged, - evServiceMutated, - evChecksUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "connect sidecar mutate sidecar", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Change port of the sidecar service instance - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regMutatePort), false) - }, - WantEvents: []stream.Event{ - // We should see the proxy service topic update and a connect update - testServiceHealthEvent(t, "web", - evSidecar, - evMutatePort, - evNodeUnchanged, - evServiceMutated, - evChecksUnchanged), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeUnchanged, - evMutatePort, - evServiceMutated, - evChecksUnchanged), - }, - WantErr: false, - }, - { - Name: "connect sidecar rename service", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Change service name but not ID, update proxy too - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regRenameService), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regRenameService), false) - }, - WantEvents: []stream.Event{ - // We should see events to deregister the old service instance and the - // old connect instance since we changed topic key for both. Then new - // service and connect registrations. The proxy instance should also - // change since it's not proxying a different service. - testServiceHealthDeregistrationEvent(t, "web"), - testServiceHealthEvent(t, "web", - evRenameService, - evServiceMutated, - evNodeUnchanged, - evServiceChecksMutated, - ), - testServiceHealthDeregistrationEvent(t, "web", - evConnectTopic, - evSidecar, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evRenameService, - evNodeUnchanged, - evServiceMutated, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeUnchanged, - evRenameService, - evServiceMutated, - evChecksUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "connect sidecar change destination service", - Setup: func(s *Store, tx *txn) error { - // Register a web_changed service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web_changed"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // And a sidecar initially for web, will be moved to target web_changed - // in Mutate. - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Change only the destination service of the proxy without a service - // rename or deleting and recreating the proxy. This is far fetched but - // still valid. - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regRenameService), false) - }, - WantEvents: []stream.Event{ - // We should only see service health events for the sidecar service - // since the actual target services didn't change. But also should see - // Connect topic dereg for the old name to update existing subscribers - // for Connect/web. - testServiceHealthDeregistrationEvent(t, "web", - evConnectTopic, - evSidecar, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evRenameService, - evNodeUnchanged, - evServiceMutated, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeUnchanged, - evRenameService, - evServiceMutated, - evChecksUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node update", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Change only the node meta. - return s.ensureRegistrationTxn(tx, tx.Index, false, - testNodeRegistration(t, regNodeMeta), false) - }, - WantEvents: []stream.Event{ - // We should see updates for all services and a connect update for the - // sidecar's destination. - testServiceHealthEvent(t, "db", - evNodeMeta, - evNodeMutated, - evServiceUnchanged, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evNodeMeta, - evNodeMutated, - evServiceUnchanged, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evNodeMeta, - evNodeMutated, - evServiceUnchanged, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeMeta, - evNodeMutated, - evServiceUnchanged, - evChecksUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node rename", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Change only the node NAME but not it's ID. We do it for every service - // though since this is effectively what client agent anti-entropy would - // do on a node rename. If we only rename the node it will have no - // services registered afterwards. - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db", regRenameNode), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regRenameNode), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regRenameNode), false); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - // Node rename is implemented internally as a node delete and new node - // insert after some renaming validation. So we should see full set of - // new events for health, then the deletions of old services, then the - // connect update and delete pair. - testServiceHealthEvent(t, "db", - evRenameNode, - // Although we delete and re-insert, we do maintain the CreatedIndex - // of the node record from the old one. - evNodeMutated, - ), - testServiceHealthEvent(t, "web", - evRenameNode, - evNodeMutated, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evRenameNode, - evNodeMutated, - ), - // dereg events for old node name services - testServiceHealthDeregistrationEvent(t, "db"), - testServiceHealthDeregistrationEvent(t, "web"), - testServiceHealthDeregistrationEvent(t, "web", evSidecar), - // Connect topic updates are last due to the way we add them - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evRenameNode, - evNodeMutated, - ), - testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), - }, - WantErr: false, - }, - { - Name: "multi-service node check failure", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Change only the node-level check status - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regNodeCheckFail), false); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, "db", - evNodeCheckFail, - evNodeUnchanged, - evServiceUnchanged, - // Only the node check changed. This needs to come after evNodeUnchanged - evNodeChecksMutated, - ), - testServiceHealthEvent(t, "web", - evNodeCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evNodeChecksMutated, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evNodeCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evNodeChecksMutated, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evNodeChecksMutated, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node service check failure", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Change the service-level check status - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regServiceCheckFail), false); err != nil { - return err - } - // Also change the service-level check status for the proxy. This is - // analogous to what would happen with an alias check on the client side - // - the proxies check would get updated at roughly the same time as the - // target service check updates. - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regServiceCheckFail), false); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - // Should only see the events for that one service change, the sidecar - // service and hence the connect topic for that service. - testServiceHealthEvent(t, "web", - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceChecksMutated, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceChecksMutated, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceChecksMutated, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node node-level check delete", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Delete only the node-level check - if err := s.deleteCheckTxn(tx, tx.Index, "node1", "serf-health", nil); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, "db", - evNodeCheckDelete, - evNodeUnchanged, - evServiceUnchanged, - ), - testServiceHealthEvent(t, "web", - evNodeCheckDelete, - evNodeUnchanged, - evServiceUnchanged, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evNodeCheckDelete, - evNodeUnchanged, - evServiceUnchanged, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeCheckDelete, - evNodeUnchanged, - evServiceUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node service check delete", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Delete the service-level check for the main service - if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web", nil); err != nil { - return err - } - // Also delete for a proxy - if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web_sidecar_proxy", nil); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - // Should only see the events for that one service change, the sidecar - // service and hence the connect topic for that service. - testServiceHealthEvent(t, "web", - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceCheckDelete, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceCheckDelete, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceCheckDelete, - ), - }, - WantErr: false, - }, - { - Name: "many services on many nodes in one TX", - Setup: func(s *Store, tx *txn) error { - // Node1 - - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - - // Node2 - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regNode2), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regNode2), false); err != nil { - return err - } - - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // In one transaction the operator moves the web service and it's - // sidecar from node2 back to node1 and deletes them from node2 - - if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web", nil); err != nil { - return err - } - if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web_sidecar_proxy", nil); err != nil { - return err - } - - // Register those on node1 - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - - // And for good measure, add a new connect-native service to node2 - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "api", regConnectNative, regNode2), false); err != nil { - return err - } - - return nil - }, - - WantEvents: []stream.Event{ - // We should see: - // - service dereg for web and proxy on node2 - // - connect dereg for web on node2 - // - service reg for web and proxy on node1 - // - connect reg for web on node1 - // - service reg for api on node2 - // - connect reg for api on node2 - testServiceHealthDeregistrationEvent(t, "web", evNode2), - testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar), - testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evNode2, evSidecar), - - testServiceHealthEvent(t, "web", evNodeUnchanged), - testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), - testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), - - testServiceHealthEvent(t, "api", evNode2, evConnectNative, evNodeUnchanged), - testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged), - }, - }, - { - Name: "terminating gateway registered with no config entry", - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway("tgate1")), - }, - }, - { - Name: "config entry created with no terminating gateway instance", - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{}, - }, - { - Name: "terminating gateway registered after config entry exists", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - Mutate: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false, - ); err != nil { - return err - } - return s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway("tgate1")), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2")), - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway("tgate1"), - evNode2), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evNode2), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2"), - evNode2), - }, - }, - { - Name: "terminating gateway updated after config entry exists", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway, regNodeCheckFail), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway("tgate1"), - evNodeCheckFail, - evNodeUnchanged, - evNodeChecksMutated, - evServiceUnchanged), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evNodeCheckFail, - evNodeUnchanged, - evNodeChecksMutated, - evServiceUnchanged), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2"), - evNodeCheckFail, - evNodeUnchanged, - evNodeChecksMutated, - evServiceUnchanged), - }, - }, - { - Name: "terminating gateway config entry created after gateway exists", - Setup: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evServiceIndex(setupIndex)), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2"), - evServiceIndex(setupIndex)), - }, - }, - { - Name: "change the terminating gateway config entry to add a linked service", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2"), - evServiceIndex(setupIndex)), - }, - }, - { - Name: "change the terminating gateway config entry to remove a linked service", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - }, - }, - { - Name: "update a linked service within a terminating gateway config entry", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - CAFile: "foo.crt", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evServiceIndex(setupIndex)), - }, - }, - { - Name: "delete a terminating gateway config entry with a linked service", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - err = s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - if err != nil { - return err - } - return s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) - }, - Mutate: func(s *Store, tx *txn) error { - return deleteConfigEntryTxn(tx, tx.Index, structs.TerminatingGateway, "tgate1", structs.DefaultEnterpriseMeta()) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evNode2), - }, - }, - { - Name: "create an instance of a linked service in a terminating gateway", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, "srv1", evNodeUnchanged), - }, - }, - { - Name: "delete an instance of a linked service in a terminating gateway", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - err = s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteServiceTxn(tx, tx.Index, "node1", "srv1", nil) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, "srv1"), - }, - }, - { - Name: "rename a terminating gateway instance", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - configEntry = &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate2", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err = ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - rename := func(req *structs.RegisterRequest) error { - req.Service.Service = "tgate2" - req.Checks[1].ServiceName = "tgate2" - return nil - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway, rename), false) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evServiceTermingGateway("tgate1")), - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway(""), - evNodeUnchanged, - evServiceMutated, - evServiceChecksMutated, - evTerminatingGatewayRenamed("tgate2")), - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evNodeUnchanged, - evServiceMutated, - evServiceChecksMutated, - evTerminatingGatewayRenamed("tgate2")), - }, - }, - { - Name: "delete a terminating gateway instance", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteServiceTxn(tx, tx.Index, "node1", "tgate1", structs.DefaultEnterpriseMeta()) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evServiceTermingGateway("")), - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2")), - }, - }, + if tc.Setup != nil { + // Bypass the publish mechanism for this test or we get into odd + // recursive stuff... + setupTx := s.db.WriteTxn(setupIndex) + require.NoError(t, tc.Setup(s, setupTx)) + // Commit the underlying transaction without using wrapped Commit so we + // avoid the whole event publishing system for setup here. It _should_ + // work but it makes debugging test hard as it will call the function + // under test for the setup data... + setupTx.Txn.Commit() } - cases = withServiceHealthEnterpriseCases(cases) - for _, tc := range cases { - tc := tc - t.Run(tc.Name, func(t *testing.T) { - s := testStateStore(t) + tx := s.db.WriteTxn(mutateIndex) + require.NoError(t, tc.Mutate(s, tx)) - if tc.Setup != nil { - // Bypass the publish mechanism for this test or we get into odd - // recursive stuff... - setupTx := s.db.WriteTxn(setupIndex) - require.NoError(t, tc.Setup(s, setupTx)) - // Commit the underlying transaction without using wrapped Commit so we - // avoid the whole event publishing system for setup here. It _should_ - // work but it makes debugging test hard as it will call the function - // under test for the setup data... - setupTx.Txn.Commit() - } - - tx := s.db.WriteTxn(mutateIndex) - require.NoError(t, tc.Mutate(s, tx)) - - // Note we call the func under test directly rather than publishChanges so - // we can test this in isolation. - got, err := ServiceHealthEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100}) - if tc.WantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - - assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents, cmpopts.EquateEmpty()) - }) + // Note we call the func under test directly rather than publishChanges so + // we can test this in isolation. + got, err := ServiceHealthEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100}) + if tc.WantErr { + require.Error(t, err) + return } + require.NoError(t, err) + + assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents, cmpopts.EquateEmpty()) +} + +func runCase(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + t.Run(name, func(t *testing.T) { + t.Helper() + t.Log("case:", name) + fn(t) + }) } func regTerminatingGateway(req *structs.RegisterRequest) error {