diff --git a/agent/consul/state/catalog_events_oss_test.go b/agent/consul/state/catalog_events_oss_test.go deleted file mode 100644 index bad6fa817..000000000 --- a/agent/consul/state/catalog_events_oss_test.go +++ /dev/null @@ -1,7 +0,0 @@ -// +build !consulent - -package state - -func withServiceHealthEnterpriseCases(cases []serviceHealthTestCase) []serviceHealthTestCase { - return cases -} diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index de7a266f0..2d901388f 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -196,7 +196,7 @@ func newIndexCounter() *indexCounter { var _ stream.SnapshotAppender = (*snapshotAppender)(nil) -type serviceHealthTestCase struct { +type eventsTestCase struct { Name string Setup func(s *Store, tx *txn) error Mutate func(s *Store, tx *txn) error @@ -205,1403 +205,1413 @@ type serviceHealthTestCase struct { } func TestServiceHealthEventsFromChanges(t *testing.T) { + setupIndex := uint64(10) + + run := func(t *testing.T, tc eventsTestCase) { + t.Helper() + runCase(t, tc.Name, tc.run) + } + + run(t, eventsTestCase{ + Name: "irrelevant events", + Mutate: func(s *Store, tx *txn) error { + return kvsSetTxn(tx, tx.Index, &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + }, false) + }, + WantEvents: nil, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "service reg, new node", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "web"), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "service reg, existing node", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false) + }, + WantEvents: []stream.Event{ + // Should only publish new service + testServiceHealthEvent(t, "web", evNodeUnchanged), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "service dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) + }, + WantEvents: []stream.Event{ + // Should only publish deregistration for that service + testServiceHealthDeregistrationEvent(t, "web"), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "node dereg", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web"), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteNodeTxn(tx, tx.Index, "node1") + }, + WantEvents: []stream.Event{ + // Should publish deregistration events for all services + testServiceHealthDeregistrationEvent(t, "db"), + testServiceHealthDeregistrationEvent(t, "web"), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect native reg, new node", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event as well as a connect + // one. + testServiceHealthEvent(t, "web", evConnectNative), + testServiceHealthEvent(t, "web", evConnectNative, evConnectTopic), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect native reg, existing node", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event as well as a connect + // one. + testServiceHealthEvent(t, "web", + evNodeUnchanged, + evConnectNative), + testServiceHealthEvent(t, "web", + evNodeUnchanged, + evConnectNative, + evConnectTopic), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect native dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false); err != nil { + return err + } + + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) + }, + WantEvents: []stream.Event{ + // We should see both a regular service dereg event and a connect one + testServiceHealthDeregistrationEvent(t, "web", evConnectNative), + testServiceHealthDeregistrationEvent(t, "web", evConnectNative, evConnectTopic), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar reg, new node", + Mutate: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regSidecar), false) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event for the web service + // another for the sidecar service and a connect event for web. + testServiceHealthEvent(t, "web"), + testServiceHealthEvent(t, "web", evSidecar), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar reg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event for the proxy + // service and a connect one for the target service. + testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Delete only the sidecar + return s.deleteServiceTxn(tx, tx.Index, "node1", "web_sidecar_proxy", nil) + }, + WantEvents: []stream.Event{ + // We should see both a regular service dereg event and a connect one + testServiceHealthDeregistrationEvent(t, "web", evSidecar), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar mutate svc", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Change port of the target service instance + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regMutatePort), false) + }, + WantEvents: []stream.Event{ + // We should see the service topic update but not connect since proxy + // details didn't change. + testServiceHealthEvent(t, "web", + evMutatePort, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar mutate sidecar", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Change port of the sidecar service instance + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regMutatePort), false) + }, + WantEvents: []stream.Event{ + // We should see the proxy service topic update and a connect update + testServiceHealthEvent(t, "web", + evSidecar, + evMutatePort, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evMutatePort, + evServiceMutated, + evChecksUnchanged), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar rename service", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Change service name but not ID, update proxy too + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regRenameService), false); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameService), false) + }, + WantEvents: []stream.Event{ + // We should see events to deregister the old service instance and the + // old connect instance since we changed topic key for both. Then new + // service and connect registrations. The proxy instance should also + // change since it's not proxying a different service. + testServiceHealthDeregistrationEvent(t, "web"), + testServiceHealthEvent(t, "web", + evRenameService, + evServiceMutated, + evNodeUnchanged, + evServiceChecksMutated, + ), + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evSidecar, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameService, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evRenameService, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "connect sidecar change destination service", + Setup: func(s *Store, tx *txn) error { + // Register a web_changed service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web_changed"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // And a sidecar initially for web, will be moved to target web_changed + // in Mutate. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false) + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the destination service of the proxy without a service + // rename or deleting and recreating the proxy. This is far fetched but + // still valid. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameService), false) + }, + WantEvents: []stream.Event{ + // We should only see service health events for the sidecar service + // since the actual target services didn't change. But also should see + // Connect topic dereg for the old name to update existing subscribers + // for Connect/web. + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evSidecar, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameService, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evRenameService, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node update", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node meta. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testNodeRegistration(t, regNodeMeta), false) + }, + WantEvents: []stream.Event{ + // We should see updates for all services and a connect update for the + // sidecar's destination. + testServiceHealthEvent(t, "db", + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node rename", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node NAME but not it's ID. We do it for every service + // though since this is effectively what client agent anti-entropy would + // do on a node rename. If we only rename the node it will have no + // services registered afterwards. + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db", regRenameNode), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regRenameNode), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameNode), false); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Node rename is implemented internally as a node delete and new node + // insert after some renaming validation. So we should see full set of + // new events for health, then the deletions of old services, then the + // connect update and delete pair. + testServiceHealthEvent(t, "db", + evRenameNode, + // Although we delete and re-insert, we do maintain the CreatedIndex + // of the node record from the old one. + evNodeMutated, + ), + testServiceHealthEvent(t, "web", + evRenameNode, + evNodeMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameNode, + evNodeMutated, + ), + // dereg events for old node name services + testServiceHealthDeregistrationEvent(t, "db"), + testServiceHealthDeregistrationEvent(t, "web"), + testServiceHealthDeregistrationEvent(t, "web", evSidecar), + // Connect topic updates are last due to the way we add them + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evRenameNode, + evNodeMutated, + ), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node check failure", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node-level check status + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regNodeCheckFail), false); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "db", + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + // Only the node check changed. This needs to come after evNodeUnchanged + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node service check failure", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change the service-level check status + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regServiceCheckFail), false); err != nil { + return err + } + // Also change the service-level check status for the proxy. This is + // analogous to what would happen with an alias check on the client side + // - the proxies check would get updated at roughly the same time as the + // target service check updates. + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regServiceCheckFail), false); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Should only see the events for that one service change, the sidecar + // service and hence the connect topic for that service. + testServiceHealthEvent(t, "web", + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceChecksMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceChecksMutated, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceChecksMutated, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node node-level check delete", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete only the node-level check + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "serf-health", nil); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "db", + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "multi-service node service check delete", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete the service-level check for the main service + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web", nil); err != nil { + return err + } + // Also delete for a proxy + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web_sidecar_proxy", nil); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Should only see the events for that one service change, the sidecar + // service and hence the connect topic for that service. + testServiceHealthEvent(t, "web", + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + }, + WantErr: false, + }) + run(t, eventsTestCase{ + Name: "many services on many nodes in one TX", + Setup: func(s *Store, tx *txn) error { + // Node1 + + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db"), false); err != nil { + return err + } + + // Node2 + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regNode2), false); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regNode2), false); err != nil { + return err + } + + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // In one transaction the operator moves the web service and it's + // sidecar from node2 back to node1 and deletes them from node2 + + if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web", nil); err != nil { + return err + } + if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web_sidecar_proxy", nil); err != nil { + return err + } + + // Register those on node1 + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web"), false); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar), false); err != nil { + return err + } + + // And for good measure, add a new connect-native service to node2 + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "api", regConnectNative, regNode2), false); err != nil { + return err + } + + return nil + }, + + WantEvents: []stream.Event{ + // We should see: + // - service dereg for web and proxy on node2 + // - connect dereg for web on node2 + // - service reg for web and proxy on node1 + // - connect reg for web on node1 + // - service reg for api on node2 + // - connect reg for api on node2 + testServiceHealthDeregistrationEvent(t, "web", evNode2), + testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evNode2, evSidecar), + + testServiceHealthEvent(t, "web", evNodeUnchanged), + testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), + + testServiceHealthEvent(t, "api", evNode2, evConnectNative, evNodeUnchanged), + testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged), + }, + }) + run(t, eventsTestCase{ + Name: "terminating gateway registered with no config entry", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1")), + }, + }) + run(t, eventsTestCase{ + Name: "config entry created with no terminating gateway instance", + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{}, + }) + run(t, eventsTestCase{ + Name: "terminating gateway registered after config entry exists", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + Mutate: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false, + ); err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2")), + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1"), + evNode2), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNode2), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evNode2), + }, + }) + run(t, eventsTestCase{ + Name: "terminating gateway updated after config entry exists", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNodeCheckFail), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1"), + evNodeCheckFail, + evNodeUnchanged, + evNodeChecksMutated, + evServiceUnchanged), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNodeCheckFail, + evNodeUnchanged, + evNodeChecksMutated, + evServiceUnchanged), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evNodeCheckFail, + evNodeUnchanged, + evNodeChecksMutated, + evServiceUnchanged), + }, + }) + run(t, eventsTestCase{ + Name: "terminating gateway config entry created after gateway exists", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evServiceIndex(setupIndex)), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evServiceIndex(setupIndex)), + }, + }) + run(t, eventsTestCase{ + Name: "change the terminating gateway config entry to add a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evServiceIndex(setupIndex)), + }, + }) + run(t, eventsTestCase{ + Name: "change the terminating gateway config entry to remove a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + }, + }) + run(t, eventsTestCase{ + Name: "update a linked service within a terminating gateway config entry", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + CAFile: "foo.crt", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evServiceIndex(setupIndex)), + }, + }) + run(t, eventsTestCase{ + Name: "delete a terminating gateway config entry with a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + err = s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + if err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) + }, + Mutate: func(s *Store, tx *txn) error { + return deleteConfigEntryTxn(tx, tx.Index, structs.TerminatingGateway, "tgate1", structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNode2), + }, + }) + run(t, eventsTestCase{ + Name: "create an instance of a linked service in a terminating gateway", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "srv1", evNodeUnchanged), + }, + }) + run(t, eventsTestCase{ + Name: "delete an instance of a linked service in a terminating gateway", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + err = s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "srv1", nil) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, "srv1"), + }, + }) + run(t, eventsTestCase{ + Name: "rename a terminating gateway instance", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + configEntry = &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate2", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err = ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + rename := func(req *structs.RegisterRequest) error { + req.Service.Service = "tgate2" + req.Checks[1].ServiceName = "tgate2" + return nil + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, rename), false) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evServiceTermingGateway("tgate1")), + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway(""), + evNodeUnchanged, + evServiceMutated, + evServiceChecksMutated, + evTerminatingGatewayRenamed("tgate2")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNodeUnchanged, + evServiceMutated, + evServiceChecksMutated, + evTerminatingGatewayRenamed("tgate2")), + }, + }) + run(t, eventsTestCase{ + Name: "delete a terminating gateway instance", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "tgate1", structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evServiceTermingGateway("")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2")), + }, + }) +} + +func (tc eventsTestCase) run(t *testing.T) { + s := NewStateStore(nil) + setupIndex := uint64(10) mutateIndex := uint64(100) - cases := []serviceHealthTestCase{ - { - Name: "irrelevant events", - Mutate: func(s *Store, tx *txn) error { - return kvsSetTxn(tx, tx.Index, &structs.DirEntry{ - Key: "foo", - Value: []byte("bar"), - }, false) - }, - WantEvents: nil, - WantErr: false, - }, - { - Name: "service reg, new node", - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, "web"), - }, - WantErr: false, - }, - { - Name: "service reg, existing node", - Setup: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false) - }, - WantEvents: []stream.Event{ - // Should only publish new service - testServiceHealthEvent(t, "web", evNodeUnchanged), - }, - WantErr: false, - }, - { - Name: "service dereg, existing node", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) - }, - WantEvents: []stream.Event{ - // Should only publish deregistration for that service - testServiceHealthDeregistrationEvent(t, "web"), - }, - WantErr: false, - }, - { - Name: "node dereg", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web"), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteNodeTxn(tx, tx.Index, "node1") - }, - WantEvents: []stream.Event{ - // Should publish deregistration events for all services - testServiceHealthDeregistrationEvent(t, "db"), - testServiceHealthDeregistrationEvent(t, "web"), - }, - WantErr: false, - }, - { - Name: "connect native reg, new node", - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) - }, - WantEvents: []stream.Event{ - // We should see both a regular service health event as well as a connect - // one. - testServiceHealthEvent(t, "web", evConnectNative), - testServiceHealthEvent(t, "web", evConnectNative, evConnectTopic), - }, - WantErr: false, - }, - { - Name: "connect native reg, existing node", - Setup: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) - }, - WantEvents: []stream.Event{ - // We should see both a regular service health event as well as a connect - // one. - testServiceHealthEvent(t, "web", - evNodeUnchanged, - evConnectNative), - testServiceHealthEvent(t, "web", - evNodeUnchanged, - evConnectNative, - evConnectTopic), - }, - WantErr: false, - }, - { - Name: "connect native dereg, existing node", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db"), false); err != nil { - return err - } - - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) - }, - WantEvents: []stream.Event{ - // We should see both a regular service dereg event and a connect one - testServiceHealthDeregistrationEvent(t, "web", evConnectNative), - testServiceHealthDeregistrationEvent(t, "web", evConnectNative, evConnectTopic), - }, - WantErr: false, - }, - { - Name: "connect sidecar reg, new node", - Mutate: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regSidecar), false) - }, - WantEvents: []stream.Event{ - // We should see both a regular service health event for the web service - // another for the sidecar service and a connect event for web. - testServiceHealthEvent(t, "web"), - testServiceHealthEvent(t, "web", evSidecar), - testServiceHealthEvent(t, "web", evConnectTopic, evSidecar), - }, - WantErr: false, - }, - { - Name: "connect sidecar reg, existing node", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - WantEvents: []stream.Event{ - // We should see both a regular service health event for the proxy - // service and a connect one for the target service. - testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), - testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), - }, - WantErr: false, - }, - { - Name: "connect sidecar dereg, existing node", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Delete only the sidecar - return s.deleteServiceTxn(tx, tx.Index, "node1", "web_sidecar_proxy", nil) - }, - WantEvents: []stream.Event{ - // We should see both a regular service dereg event and a connect one - testServiceHealthDeregistrationEvent(t, "web", evSidecar), - testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), - }, - WantErr: false, - }, - { - Name: "connect sidecar mutate svc", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Change port of the target service instance - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regMutatePort), false) - }, - WantEvents: []stream.Event{ - // We should see the service topic update but not connect since proxy - // details didn't change. - testServiceHealthEvent(t, "web", - evMutatePort, - evNodeUnchanged, - evServiceMutated, - evChecksUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "connect sidecar mutate sidecar", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Change port of the sidecar service instance - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regMutatePort), false) - }, - WantEvents: []stream.Event{ - // We should see the proxy service topic update and a connect update - testServiceHealthEvent(t, "web", - evSidecar, - evMutatePort, - evNodeUnchanged, - evServiceMutated, - evChecksUnchanged), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeUnchanged, - evMutatePort, - evServiceMutated, - evChecksUnchanged), - }, - WantErr: false, - }, - { - Name: "connect sidecar rename service", - Setup: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Change service name but not ID, update proxy too - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regRenameService), false); err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regRenameService), false) - }, - WantEvents: []stream.Event{ - // We should see events to deregister the old service instance and the - // old connect instance since we changed topic key for both. Then new - // service and connect registrations. The proxy instance should also - // change since it's not proxying a different service. - testServiceHealthDeregistrationEvent(t, "web"), - testServiceHealthEvent(t, "web", - evRenameService, - evServiceMutated, - evNodeUnchanged, - evServiceChecksMutated, - ), - testServiceHealthDeregistrationEvent(t, "web", - evConnectTopic, - evSidecar, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evRenameService, - evNodeUnchanged, - evServiceMutated, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeUnchanged, - evRenameService, - evServiceMutated, - evChecksUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "connect sidecar change destination service", - Setup: func(s *Store, tx *txn) error { - // Register a web_changed service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web_changed"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // And a sidecar initially for web, will be moved to target web_changed - // in Mutate. - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false) - }, - Mutate: func(s *Store, tx *txn) error { - // Change only the destination service of the proxy without a service - // rename or deleting and recreating the proxy. This is far fetched but - // still valid. - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regRenameService), false) - }, - WantEvents: []stream.Event{ - // We should only see service health events for the sidecar service - // since the actual target services didn't change. But also should see - // Connect topic dereg for the old name to update existing subscribers - // for Connect/web. - testServiceHealthDeregistrationEvent(t, "web", - evConnectTopic, - evSidecar, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evRenameService, - evNodeUnchanged, - evServiceMutated, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeUnchanged, - evRenameService, - evServiceMutated, - evChecksUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node update", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Change only the node meta. - return s.ensureRegistrationTxn(tx, tx.Index, false, - testNodeRegistration(t, regNodeMeta), false) - }, - WantEvents: []stream.Event{ - // We should see updates for all services and a connect update for the - // sidecar's destination. - testServiceHealthEvent(t, "db", - evNodeMeta, - evNodeMutated, - evServiceUnchanged, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evNodeMeta, - evNodeMutated, - evServiceUnchanged, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evNodeMeta, - evNodeMutated, - evServiceUnchanged, - evChecksUnchanged, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeMeta, - evNodeMutated, - evServiceUnchanged, - evChecksUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node rename", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Change only the node NAME but not it's ID. We do it for every service - // though since this is effectively what client agent anti-entropy would - // do on a node rename. If we only rename the node it will have no - // services registered afterwards. - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db", regRenameNode), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regRenameNode), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regRenameNode), false); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - // Node rename is implemented internally as a node delete and new node - // insert after some renaming validation. So we should see full set of - // new events for health, then the deletions of old services, then the - // connect update and delete pair. - testServiceHealthEvent(t, "db", - evRenameNode, - // Although we delete and re-insert, we do maintain the CreatedIndex - // of the node record from the old one. - evNodeMutated, - ), - testServiceHealthEvent(t, "web", - evRenameNode, - evNodeMutated, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evRenameNode, - evNodeMutated, - ), - // dereg events for old node name services - testServiceHealthDeregistrationEvent(t, "db"), - testServiceHealthDeregistrationEvent(t, "web"), - testServiceHealthDeregistrationEvent(t, "web", evSidecar), - // Connect topic updates are last due to the way we add them - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evRenameNode, - evNodeMutated, - ), - testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), - }, - WantErr: false, - }, - { - Name: "multi-service node check failure", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Change only the node-level check status - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regNodeCheckFail), false); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, "db", - evNodeCheckFail, - evNodeUnchanged, - evServiceUnchanged, - // Only the node check changed. This needs to come after evNodeUnchanged - evNodeChecksMutated, - ), - testServiceHealthEvent(t, "web", - evNodeCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evNodeChecksMutated, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evNodeCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evNodeChecksMutated, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evNodeChecksMutated, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node service check failure", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Change the service-level check status - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regServiceCheckFail), false); err != nil { - return err - } - // Also change the service-level check status for the proxy. This is - // analogous to what would happen with an alias check on the client side - // - the proxies check would get updated at roughly the same time as the - // target service check updates. - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regServiceCheckFail), false); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - // Should only see the events for that one service change, the sidecar - // service and hence the connect topic for that service. - testServiceHealthEvent(t, "web", - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceChecksMutated, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceChecksMutated, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceChecksMutated, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node node-level check delete", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Delete only the node-level check - if err := s.deleteCheckTxn(tx, tx.Index, "node1", "serf-health", nil); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, "db", - evNodeCheckDelete, - evNodeUnchanged, - evServiceUnchanged, - ), - testServiceHealthEvent(t, "web", - evNodeCheckDelete, - evNodeUnchanged, - evServiceUnchanged, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evNodeCheckDelete, - evNodeUnchanged, - evServiceUnchanged, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evNodeCheckDelete, - evNodeUnchanged, - evServiceUnchanged, - ), - }, - WantErr: false, - }, - { - Name: "multi-service node service check delete", - Setup: func(s *Store, tx *txn) error { - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // Delete the service-level check for the main service - if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web", nil); err != nil { - return err - } - // Also delete for a proxy - if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web_sidecar_proxy", nil); err != nil { - return err - } - return nil - }, - WantEvents: []stream.Event{ - // Should only see the events for that one service change, the sidecar - // service and hence the connect topic for that service. - testServiceHealthEvent(t, "web", - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceCheckDelete, - ), - testServiceHealthEvent(t, "web", - evSidecar, - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceCheckDelete, - ), - testServiceHealthEvent(t, "web", - evConnectTopic, - evSidecar, - evServiceCheckFail, - evNodeUnchanged, - evServiceUnchanged, - evServiceCheckDelete, - ), - }, - WantErr: false, - }, - { - Name: "many services on many nodes in one TX", - Setup: func(s *Store, tx *txn) error { - // Node1 - - // Register a db service - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "db"), false); err != nil { - return err - } - - // Node2 - // Also a web - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regNode2), false); err != nil { - return err - } - // With a connect sidecar - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar, regNode2), false); err != nil { - return err - } - - return nil - }, - Mutate: func(s *Store, tx *txn) error { - // In one transaction the operator moves the web service and it's - // sidecar from node2 back to node1 and deletes them from node2 - - if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web", nil); err != nil { - return err - } - if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web_sidecar_proxy", nil); err != nil { - return err - } - - // Register those on node1 - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web"), false); err != nil { - return err - } - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "web", regSidecar), false); err != nil { - return err - } - - // And for good measure, add a new connect-native service to node2 - if err := s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "api", regConnectNative, regNode2), false); err != nil { - return err - } - - return nil - }, - - WantEvents: []stream.Event{ - // We should see: - // - service dereg for web and proxy on node2 - // - connect dereg for web on node2 - // - service reg for web and proxy on node1 - // - connect reg for web on node1 - // - service reg for api on node2 - // - connect reg for api on node2 - testServiceHealthDeregistrationEvent(t, "web", evNode2), - testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar), - testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evNode2, evSidecar), - - testServiceHealthEvent(t, "web", evNodeUnchanged), - testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), - testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), - - testServiceHealthEvent(t, "api", evNode2, evConnectNative, evNodeUnchanged), - testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged), - }, - }, - { - Name: "terminating gateway registered with no config entry", - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway("tgate1")), - }, - }, - { - Name: "config entry created with no terminating gateway instance", - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{}, - }, - { - Name: "terminating gateway registered after config entry exists", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - Mutate: func(s *Store, tx *txn) error { - if err := s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false, - ); err != nil { - return err - } - return s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway("tgate1")), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2")), - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway("tgate1"), - evNode2), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evNode2), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2"), - evNode2), - }, - }, - { - Name: "terminating gateway updated after config entry exists", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway, regNodeCheckFail), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway("tgate1"), - evNodeCheckFail, - evNodeUnchanged, - evNodeChecksMutated, - evServiceUnchanged), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evNodeCheckFail, - evNodeUnchanged, - evNodeChecksMutated, - evServiceUnchanged), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2"), - evNodeCheckFail, - evNodeUnchanged, - evNodeChecksMutated, - evServiceUnchanged), - }, - }, - { - Name: "terminating gateway config entry created after gateway exists", - Setup: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evServiceIndex(setupIndex)), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2"), - evServiceIndex(setupIndex)), - }, - }, - { - Name: "change the terminating gateway config entry to add a linked service", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2"), - evServiceIndex(setupIndex)), - }, - }, - { - Name: "change the terminating gateway config entry to remove a linked service", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - }, - }, - { - Name: "update a linked service within a terminating gateway config entry", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - CAFile: "foo.crt", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - return ensureConfigEntryTxn(tx, tx.Index, configEntry) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evServiceIndex(setupIndex)), - }, - }, - { - Name: "delete a terminating gateway config entry with a linked service", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - err = s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - if err != nil { - return err - } - return s.ensureRegistrationTxn( - tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) - }, - Mutate: func(s *Store, tx *txn) error { - return deleteConfigEntryTxn(tx, tx.Index, structs.TerminatingGateway, "tgate1", structs.DefaultEnterpriseMeta()) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evNode2), - }, - }, - { - Name: "create an instance of a linked service in a terminating gateway", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) - }, - WantEvents: []stream.Event{ - testServiceHealthEvent(t, "srv1", evNodeUnchanged), - }, - }, - { - Name: "delete an instance of a linked service in a terminating gateway", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - err = s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteServiceTxn(tx, tx.Index, "node1", "srv1", nil) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, "srv1"), - }, - }, - { - Name: "rename a terminating gateway instance", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - configEntry = &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate2", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err = ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - rename := func(req *structs.RegisterRequest) error { - req.Service.Service = "tgate2" - req.Checks[1].ServiceName = "tgate2" - return nil - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway, rename), false) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evServiceTermingGateway("tgate1")), - testServiceHealthEvent(t, - "tgate1", - evServiceTermingGateway(""), - evNodeUnchanged, - evServiceMutated, - evServiceChecksMutated, - evTerminatingGatewayRenamed("tgate2")), - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1"), - evNodeUnchanged, - evServiceMutated, - evServiceChecksMutated, - evTerminatingGatewayRenamed("tgate2")), - }, - }, - { - Name: "delete a terminating gateway instance", - Setup: func(s *Store, tx *txn) error { - configEntry := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "tgate1", - Services: []structs.LinkedService{ - { - Name: "srv1", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - { - Name: "srv2", - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), - } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) - }, - Mutate: func(s *Store, tx *txn) error { - return s.deleteServiceTxn(tx, tx.Index, "node1", "tgate1", structs.DefaultEnterpriseMeta()) - }, - WantEvents: []stream.Event{ - testServiceHealthDeregistrationEvent(t, - "tgate1", - evServiceTermingGateway("")), - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv1")), - testServiceHealthDeregistrationEvent(t, - "tgate1", - evConnectTopic, - evServiceTermingGateway("srv2")), - }, - }, + if tc.Setup != nil { + // Bypass the publish mechanism for this test or we get into odd + // recursive stuff... + setupTx := s.db.WriteTxn(setupIndex) + require.NoError(t, tc.Setup(s, setupTx)) + // Commit the underlying transaction without using wrapped Commit so we + // avoid the whole event publishing system for setup here. It _should_ + // work but it makes debugging test hard as it will call the function + // under test for the setup data... + setupTx.Txn.Commit() } - cases = withServiceHealthEnterpriseCases(cases) - for _, tc := range cases { - tc := tc - t.Run(tc.Name, func(t *testing.T) { - s := testStateStore(t) + tx := s.db.WriteTxn(mutateIndex) + require.NoError(t, tc.Mutate(s, tx)) - if tc.Setup != nil { - // Bypass the publish mechanism for this test or we get into odd - // recursive stuff... - setupTx := s.db.WriteTxn(setupIndex) - require.NoError(t, tc.Setup(s, setupTx)) - // Commit the underlying transaction without using wrapped Commit so we - // avoid the whole event publishing system for setup here. It _should_ - // work but it makes debugging test hard as it will call the function - // under test for the setup data... - setupTx.Txn.Commit() - } - - tx := s.db.WriteTxn(mutateIndex) - require.NoError(t, tc.Mutate(s, tx)) - - // Note we call the func under test directly rather than publishChanges so - // we can test this in isolation. - got, err := ServiceHealthEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100}) - if tc.WantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - - assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents, cmpopts.EquateEmpty()) - }) + // Note we call the func under test directly rather than publishChanges so + // we can test this in isolation. + got, err := ServiceHealthEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100}) + if tc.WantErr { + require.Error(t, err) + return } + require.NoError(t, err) + + assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents, cmpopts.EquateEmpty()) +} + +func runCase(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + t.Run(name, func(t *testing.T) { + t.Helper() + t.Log("case:", name) + fn(t) + }) } func regTerminatingGateway(req *structs.RegisterRequest) error {