diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go new file mode 100644 index 000000000..0214b7ef7 --- /dev/null +++ b/agent/consul/state/catalog_events.go @@ -0,0 +1,494 @@ +package state + +import ( + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +// ServiceHealthSnapshot is a stream.SnapFn that provides a streaming snapshot +// of stream.Events that describe the current state of a service health query. +func (s *Store) ServiceHealthSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + tx := s.db.Txn(false) + defer tx.Abort() + // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest + idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, false, nil) + if err != nil { + return 0, err + } + + _, err = checkServiceNodesToServiceHealth(idx, nodes, buf, TopicServiceHealth) + return idx, err +} + +// ServiceHealthSnapshot is a stream.SnapFn that provides a streaming snapshot +// of stream.Events that describe the current state of a service connect health +// query. +func (s *Store) ServiceHealthConnectSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + tx := s.db.Txn(false) + defer tx.Abort() + // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest + idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, true, nil) + if err != nil { + return 0, err + } + + _, err = checkServiceNodesToServiceHealth(idx, nodes, buf, TopicServiceHealthConnect) + return idx, err +} + +type changeOp int + +const ( + OpDelete changeOp = iota + OpCreate + OpUpdate +) + +type eventPayload struct { + Op changeOp + Obj interface{} +} + +// checkServiceNodesToServiceHealth converts a list of structs.CheckServiceNodes +// to stream.ServiceHealth events for streaming. If a non-nil event buffer is +// passed, events are appended to the buffer one at a time and an nil slice is +// returned to avoid keeping a full copy in memory. +func checkServiceNodesToServiceHealth(idx uint64, nodes structs.CheckServiceNodes, + buf stream.SnapshotAppender, topic topic) ([]stream.Event, error) { + var events []stream.Event + for _, n := range nodes { + event := stream.Event{ + Index: idx, + Topic: topic, + Payload: eventPayload{ + Op: OpCreate, + Obj: &n, + }, + } + + if n.Service != nil { + event.Key = n.Service.Service + } + + // TODO: always called with a non-nil buf? + if buf != nil { + buf.Append([]stream.Event{event}) + } else { + events = append(events, event) + } + } + return events, nil +} + +type nodeServiceTuple struct { + Node string + ServiceID string + EntMeta structs.EnterpriseMeta +} + +// ServiceHealthEventsFromChanges returns all the service and Connect health +// events that should be emitted given a set of changes to the state store. +func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + + var nodeChanges map[string]*memdb.Change + var serviceChanges map[nodeServiceTuple]*memdb.Change + + markNode := func(node string, nodeChange *memdb.Change) { + if nodeChanges == nil { + nodeChanges = make(map[string]*memdb.Change) + } + // If the caller has an actual node mutation ensure we store it even if the + // node is already marked. If the caller is just marking the node dirty + // without an node change, don't overwrite any existing node change we know + // about. + ch := nodeChanges[node] + if ch == nil { + nodeChanges[node] = nodeChange + } + } + markService := func(node, service string, entMeta structs.EnterpriseMeta, svcChange *memdb.Change) { + if serviceChanges == nil { + serviceChanges = make(map[nodeServiceTuple]*memdb.Change) + } + k := nodeServiceTuple{ + Node: node, + ServiceID: service, + EntMeta: entMeta, + } + // If the caller has an actual service mutation ensure we store it even if + // the service is already marked. If the caller is just marking the service + // dirty without an node change, don't overwrite any existing node change we + // know about. + ch := serviceChanges[k] + if ch == nil { + serviceChanges[k] = svcChange + } + } + + for _, change := range changes.Changes { + switch change.Table { + case "nodes": + // Node changed in some way, if it's not a delete, we'll need to + // re-deliver CheckServiceNode results for all services on that node but + // we mark it anyway because if it _is_ a delete then we need to know that + // later to avoid trying to deliver events when node level checks mark the + // node as "changed". + nRaw := change.After + if change.After == nil { + nRaw = change.Before + } + n := nRaw.(*structs.Node) + changeCopy := change + markNode(n.Node, &changeCopy) + + case "services": + snRaw := change.After + if change.After == nil { + snRaw = change.Before + } + sn := snRaw.(*structs.ServiceNode) + changeCopy := change + markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy) + + case "checks": + // For health we only care about the scope for now to know if it's just + // affecting a single service or every service on a node. There is a + // subtle edge case where the check with same ID changes from being node + // scoped to service scoped or vice versa, in either case we need to treat + // it as affecting all services on the node. + switch { + case change.Updated(): + before := change.Before.(*structs.HealthCheck) + after := change.After.(*structs.HealthCheck) + if after.ServiceID == "" || before.ServiceID == "" { + // Either changed from or to being node-scoped + markNode(after.Node, nil) + } else { + // Check changed which means we just need to emit for the linked + // service. + markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil) + + // Edge case - if the check with same ID was updated to link to a + // different service ID but the old service with old ID still exists, + // then the old service instance needs updating too as it has one + // fewer checks now. + if before.ServiceID != after.ServiceID { + markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil) + } + } + + case change.Deleted(): + before := change.Before.(*structs.HealthCheck) + if before.ServiceID == "" { + // Node level check + markNode(before.Node, nil) + } else { + markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil) + } + + case change.Created(): + after := change.After.(*structs.HealthCheck) + if after.ServiceID == "" { + // Node level check + markNode(after.Node, nil) + } else { + markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil) + } + } + } + } + + // Now act on those marked nodes/services + for node, change := range nodeChanges { + // change may be nil if there was a change that _affected_ the node + // like a change to checks but it didn't actually change the node + // record itself. + if change != nil && change.Deleted() { + // Node deletions are a no-op here since the state store transaction will + // have also removed all the service instances which will be handled in + // the loop below. + continue + } + // Rebuild events for all services on this node + es, err := serviceHealthEventsForNode(tx, changes.Index, node) + if err != nil { + return nil, err + } + events = append(events, es...) + } + + for tuple, change := range serviceChanges { + // change may be nil if there was a change that _affected_ the service + // like a change to checks but it didn't actually change the service + // record itself. + if change != nil && change.Deleted() { + // Generate delete event for the service instance and append it + sn := change.Before.(*structs.ServiceNode) + es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, sn, &tuple.EntMeta) + if err != nil { + return nil, err + } + events = append(events, es...) + continue + } + + // Check if this was a service mutation that changed it's name which + // requires special handling even if node changed and new events were + // already published. + if change != nil && change.Updated() { + before := change.Before.(*structs.ServiceNode) + after := change.After.(*structs.ServiceNode) + + if before.ServiceName != after.ServiceName { + // Service was renamed, the code below will ensure the new registrations + // go out to subscribers to the new service name topic key, but we need + // to fix up subscribers that were watching the old name by sending + // deregistrations. + es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, before, &tuple.EntMeta) + if err != nil { + return nil, err + } + events = append(events, es...) + } + + if before.ServiceKind == structs.ServiceKindConnectProxy && + before.ServiceProxy.DestinationServiceName != after.ServiceProxy.DestinationServiceName { + // Connect proxy changed the service it is representing, need to issue a + // dereg for the old service on the Connect topic. We don't actually need + // to deregister this sidecar service though as it still exists and + // didn't change its name (or if it did that was caught just above). But + // our mechanism for connect events is to convert them so we generate + // the regular one, convert it to Connect topic and then discar the + // original. + es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, before, &tuple.EntMeta) + if err != nil { + return nil, err + } + // Don't append es per comment above, but convert it to connect topic + // events. + es = serviceHealthToConnectEvents(es) + events = append(events, es...) + } + } + + if _, ok := nodeChanges[tuple.Node]; ok { + // We already rebuilt events for everything on this node, no need to send + // a duplicate. + continue + } + // Build service event and append it + es, err := serviceHealthEventsForServiceInstance(tx, changes.Index, tuple) + if err != nil { + return nil, err + } + events = append(events, es...) + } + + // Duplicate any events that affected connect-enabled instances (proxies or + // native apps) to the relevant Connect topic. + events = append(events, serviceHealthToConnectEvents(events)...) + + return events, nil +} + +// serviceHealthToConnectEvents converts already formatted service health +// registration events into the ones needed to publish to the Connect topic. +// This essentially means filtering out any instances that are not Connect +// enabled and so of no interest to those subscribers but also involves +// switching connection details to be the proxy instead of the actual instance +// in case of a sidecar. +func serviceHealthToConnectEvents(events []stream.Event) []stream.Event { + serviceHealthConnectEvents := make([]stream.Event, 0, len(events)) + for _, event := range events { + if event.Topic != TopicServiceHealth { + // Skip non-health or any events already emitted to Connect topic + continue + } + node := getPayloadCheckServiceNode(event.Payload) + if node.Service == nil || + (node.Service.Kind != structs.ServiceKindConnectProxy && !node.Service.Connect.Native) { + // Event is not a service instance (i.e. just a node registration) + // or is not a service that is not connect-enabled in some way. + continue + } + + connectEvent := event + connectEvent.Topic = TopicServiceHealthConnect + + // If this is a proxy, set the key to the destination service name. + if node.Service.Kind == structs.ServiceKindConnectProxy { + connectEvent.Key = node.Service.Proxy.DestinationServiceName + } + + serviceHealthConnectEvents = append(serviceHealthConnectEvents, connectEvent) + } + + return serviceHealthConnectEvents +} + +func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode { + ep, ok := payload.(eventPayload) + if !ok { + return nil + } + csn, ok := ep.Obj.(*structs.CheckServiceNode) + if !ok { + return nil + } + return csn +} + +// serviceHealthEventsForNode returns health events for all services on the +// given node. This mirrors some of the the logic in the oddly-named +// parseCheckServiceNodes but is more efficient since we know they are all on +// the same node. +func serviceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) { + // TODO(namespace-streaming): figure out the right EntMeta and mystery arg. + services, err := catalogServiceListByNode(tx, node, nil, false) + if err != nil { + return nil, err + } + + n, nodeChecks, svcChecks, err := getNodeAndChecks(tx, node) + if err != nil { + return nil, err + } + + var events []stream.Event + for service := services.Next(); service != nil; service = services.Next() { + sn := service.(*structs.ServiceNode) + + es, err := serviceHealthEventsForServiceNodeInternal(idx, n, sn, nodeChecks, svcChecks) + if err != nil { + return nil, err + } + + // Append to the results. + events = append(events, es...) + } + + return events, nil +} + +// getNodeAndNodeChecks returns a specific node and ALL checks on that node +// (both node specific and service-specific). node-level Checks are returned as +// a slice, service-specific checks as a map of slices with the service id as +// the map key. +func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, + structs.HealthChecks, map[string]structs.HealthChecks, error) { + // Fetch the node + nodeRaw, err := tx.First("nodes", "id", node) + if err != nil { + return nil, nil, nil, err + } + if nodeRaw == nil { + return nil, nil, nil, ErrMissingNode + } + n := nodeRaw.(*structs.Node) + + // TODO(namespace-streaming): work out what EntMeta is needed here, wildcard? + iter, err := catalogListChecksByNode(tx, node, nil) + if err != nil { + return nil, nil, nil, err + } + + var nodeChecks structs.HealthChecks + var svcChecks map[string]structs.HealthChecks + + for check := iter.Next(); check != nil; check = iter.Next() { + check := check.(*structs.HealthCheck) + if check.ServiceID == "" { + nodeChecks = append(nodeChecks, check) + } else { + if svcChecks == nil { + svcChecks = make(map[string]structs.HealthChecks) + } + svcChecks[check.ServiceID] = append(svcChecks[check.ServiceID], check) + } + } + return n, nodeChecks, svcChecks, nil +} + +func serviceHealthEventsForServiceInstance(tx ReadTxn, idx uint64, tuple nodeServiceTuple) ([]stream.Event, error) { + n, nodeChecks, svcChecks, err := getNodeAndChecks(tx, tuple.Node) + if err != nil { + return nil, err + } + + svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID) + if err != nil { + return nil, err + } + + sn := svc.Next() + if sn == nil { + return nil, ErrMissingService + } + + return serviceHealthEventsForServiceNodeInternal(idx, n, sn.(*structs.ServiceNode), nodeChecks, svcChecks) +} + +func serviceHealthEventsForServiceNodeInternal(idx uint64, + node *structs.Node, + sn *structs.ServiceNode, + nodeChecks structs.HealthChecks, + svcChecks map[string]structs.HealthChecks) ([]stream.Event, error) { + + // Start with a copy of the node checks. + checks := nodeChecks + for _, check := range svcChecks[sn.ServiceID] { + checks = append(checks, check) + } + + csn := &structs.CheckServiceNode{ + Node: node, + Service: sn.ToNodeService(), + Checks: checks, + } + e := stream.Event{ + Topic: TopicServiceHealth, + Key: sn.ServiceName, + Index: idx, + Payload: eventPayload{ + Op: OpCreate, + Obj: csn, + }, + } + + // See if we also need to emit a connect event (i.e. if this instance is a + // connect proxy or connect native app). + + return []stream.Event{e}, nil +} + +func serviceHealthDeregEventsForServiceInstance(idx uint64, + sn *structs.ServiceNode, entMeta *structs.EnterpriseMeta) ([]stream.Event, error) { + + // We actually only need the node name populated in the node part as it's only + // used as a key to know which service was deregistered so don't bother looking + // up the node in the DB. Note that while the ServiceNode does have NodeID + // etc. fields, they are never populated in memdb per the comment on that + // struct and only filled in when we return copies of the result to users. + // This is also important because if the service was deleted as part of a + // whole node deregistering then the node record won't actually exist now + // anyway and we'd have to plumb it through from the changeset above. + csn := &structs.CheckServiceNode{ + Node: &structs.Node{ + Node: sn.Node, + }, + Service: sn.ToNodeService(), + } + + e := stream.Event{ + Topic: TopicServiceHealth, + Key: sn.ServiceName, + Index: idx, + Payload: eventPayload{ + Op: OpDelete, + Obj: csn, + }, + } + return []stream.Event{e}, nil +} diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go new file mode 100644 index 000000000..5cf610604 --- /dev/null +++ b/agent/consul/state/catalog_events_test.go @@ -0,0 +1,1492 @@ +package state + +import ( + "fmt" + "testing" + + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" + "github.com/stretchr/testify/require" +) + +func TestServiceHealthEventsFromChanges(t *testing.T) { + cases := []struct { + Name string + Setup func(s *Store, tx *txn) error + Mutate func(s *Store, tx *txn) error + WantEvents []stream.Event + WantErr bool + }{ + { + Name: "irrelevant events", + Mutate: func(s *Store, tx *txn) error { + return kvsSetTxn(tx, tx.Index, &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + }, false) + }, + WantEvents: nil, + WantErr: false, + }, + { + Name: "service reg, new node", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "web"), + }, + WantErr: false, + }, + { + Name: "service reg, existing node", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")) + }, + WantEvents: []stream.Event{ + // Should only publish new service + testServiceHealthEvent(t, "web", evNodeUnchanged), + }, + WantErr: false, + }, + { + Name: "service dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) + }, + WantEvents: []stream.Event{ + // Should only publish deregistration for that service + testServiceHealthDeregistrationEvent(t, "web"), + }, + WantErr: false, + }, + { + Name: "node dereg", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web")); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteNodeTxn(tx, tx.Index, "node1") + }, + WantEvents: []stream.Event{ + // Should publish deregistration events for all services + testServiceHealthDeregistrationEvent(t, "db"), + testServiceHealthDeregistrationEvent(t, "web"), + }, + WantErr: false, + }, + { + Name: "connect native reg, new node", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative)) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event as well as a connect + // one. + testServiceHealthEvent(t, "web", evConnectNative), + testServiceHealthEvent(t, "web", evConnectNative, evConnectTopic), + }, + WantErr: false, + }, + { + Name: "connect native reg, existing node", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db")) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative)) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event as well as a connect + // one. + testServiceHealthEvent(t, "web", + evNodeUnchanged, + evConnectNative), + testServiceHealthEvent(t, "web", + evNodeUnchanged, + evConnectNative, + evConnectTopic), + }, + WantErr: false, + }, + { + Name: "connect native dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db")); err != nil { + return err + } + + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative)) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) + }, + WantEvents: []stream.Event{ + // We should see both a regular service dereg event and a connect one + testServiceHealthDeregistrationEvent(t, "web", evConnectNative), + testServiceHealthDeregistrationEvent(t, "web", evConnectNative, evConnectTopic), + }, + WantErr: false, + }, + { + Name: "connect sidecar reg, new node", + Mutate: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regSidecar)) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event for the web service + // another for the sidecar service and a connect event for web. + testServiceHealthEvent(t, "web"), + testServiceHealthEvent(t, "web", evSidecar), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }, + { + Name: "connect sidecar reg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event for the proxy + // service and a connect one for the target service. + testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), + }, + WantErr: false, + }, + { + Name: "connect sidecar dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Delete only the sidecar + return s.deleteServiceTxn(tx, tx.Index, "node1", "web_sidecar_proxy", nil) + }, + WantEvents: []stream.Event{ + // We should see both a regular service dereg event and a connect one + testServiceHealthDeregistrationEvent(t, "web", evSidecar), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }, + { + Name: "connect sidecar mutate svc", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Change port of the target service instance + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regMutatePort)) + }, + WantEvents: []stream.Event{ + // We should see the service topic update but not connect since proxy + // details didn't change. + testServiceHealthEvent(t, "web", + evMutatePort, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "connect sidecar mutate sidecar", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Change port of the sidecar service instance + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regMutatePort)) + }, + WantEvents: []stream.Event{ + // We should see the proxy service topic update and a connect update + testServiceHealthEvent(t, "web", + evSidecar, + evMutatePort, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evMutatePort, + evServiceMutated, + evChecksUnchanged), + }, + WantErr: false, + }, + { + Name: "connect sidecar rename service", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Change service name but not ID, update proxy too + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regRenameService)); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameService)) + }, + WantEvents: []stream.Event{ + // We should see events to deregister the old service instance and the + // old connect instance since we changed topic key for both. Then new + // service and connect registrations. The proxy instance should also + // change since it's not proxying a different service. + testServiceHealthDeregistrationEvent(t, "web"), + testServiceHealthEvent(t, "web", + evRenameService, + evServiceMutated, + evNodeUnchanged, + evChecksMutated, + ), + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evSidecar, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameService, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evRenameService, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "connect sidecar change destination service", + Setup: func(s *Store, tx *txn) error { + // Register a web_changed service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web_changed")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // And a sidecar initially for web, will be moved to target web_changed + // in Mutate. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the destination service of the proxy without a service + // rename or deleting and recreating the proxy. This is far fetched but + // still valid. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameService)) + }, + WantEvents: []stream.Event{ + // We should only see service health events for the sidecar service + // since the actual target services didn't change. But also should see + // Connect topic dereg for the old name to update existing subscribers + // for Connect/web. + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evSidecar, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameService, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evRenameService, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node update", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node meta. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testNodeRegistration(t, regNodeMeta)) + }, + WantEvents: []stream.Event{ + // We should see updates for all services and a connect update for the + // sidecar's destination. + testServiceHealthEvent(t, "db", + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node rename", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node NAME but not it's ID. We do it for every service + // though since this is effectively what client agent anti-entropy would + // do on a node rename. If we only rename the node it will have no + // services registered afterwards. + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db", regRenameNode)); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regRenameNode)); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameNode)); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Node rename is implemented internally as a node delete and new node + // insert after some renaming validation. So we should see full set of + // new events for health, then the deletions of old services, then the + // connect update and delete pair. + testServiceHealthEvent(t, "db", + evRenameNode, + // Although we delete and re-insert, we do maintain the CreatedIndex + // of the node record from the old one. + evNodeMutated, + ), + testServiceHealthEvent(t, "web", + evRenameNode, + evNodeMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameNode, + evNodeMutated, + ), + // dereg events for old node name services + testServiceHealthDeregistrationEvent(t, "db"), + testServiceHealthDeregistrationEvent(t, "web"), + testServiceHealthDeregistrationEvent(t, "web", evSidecar), + // Connect topic updates are last due to the way we add them + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evRenameNode, + evNodeMutated, + ), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }, + { + Name: "multi-service node check failure", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node-level check status + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regNodeCheckFail)); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "db", + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + // Only the node check changed. This needs to come after evNodeUnchanged + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node service check failure", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change the service-level check status + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regServiceCheckFail)); err != nil { + return err + } + // Also change the service-level check status for the proxy. This is + // analogous to what would happen with an alias check on the client side + // - the proxies check would get updated at roughly the same time as the + // target service check updates. + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regServiceCheckFail)); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Should only see the events for that one service change, the sidecar + // service and hence the connect topic for that service. + testServiceHealthEvent(t, "web", + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evChecksMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evChecksMutated, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evChecksMutated, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node node-level check delete", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete only the node-level check + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "serf-health", nil); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "db", + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node service check delete", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete the service-level check for the main service + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web", nil); err != nil { + return err + } + // Also delete for a proxy + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web_sidecar_proxy", nil); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Should only see the events for that one service change, the sidecar + // service and hence the connect topic for that service. + testServiceHealthEvent(t, "web", + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + }, + WantErr: false, + }, + { + Name: "many services on many nodes in one TX", + Setup: func(s *Store, tx *txn) error { + // Node1 + + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + + // Node2 + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regNode2)); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regNode2)); err != nil { + return err + } + + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // In one transaction the operator moves the web service and it's + // sidecar from node2 back to node1 and deletes them from node2 + + if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web", nil); err != nil { + return err + } + if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web_sidecar_proxy", nil); err != nil { + return err + } + + // Register those on node1 + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + + // And for good measure, add a new connect-native service to node2 + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "api", regConnectNative, regNode2)); err != nil { + return err + } + + return nil + }, + WantEvents: []stream.Event{ + // We should see: + // - service dereg for web and proxy on node2 + // - connect dereg for web on node2 + // - service reg for web and proxy on node1 + // - connect reg for web on node1 + // - service reg for api on node2 + // - connect reg for api on node2 + testServiceHealthDeregistrationEvent(t, "web", evNode2), + testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar), + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evNode2, + evSidecar, + ), + + testServiceHealthEvent(t, "web", evNodeUnchanged), + testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), + + testServiceHealthEvent(t, "api", + evNode2, + evConnectNative, + evNodeUnchanged, + ), + testServiceHealthEvent(t, "api", + evNode2, + evConnectTopic, + evConnectNative, + evNodeUnchanged, + ), + }, + WantErr: false, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + s := testStateStore(t) + + if tc.Setup != nil { + // Bypass the publish mechanism for this test or we get into odd + // recursive stuff... + setupTx := s.db.WriteTxn(10) + require.NoError(t, tc.Setup(s, setupTx)) + // Commit the underlying transaction without using wrapped Commit so we + // avoid the whole event publishing system for setup here. It _should_ + // work but it makes debugging test hard as it will call the function + // under test for the setup data... + setupTx.Txn.Commit() + } + + tx := s.db.WriteTxn(100) + require.NoError(t, tc.Mutate(s, tx)) + + // Note we call the func under test directly rather than publishChanges so + // we can test this in isolation. + got, err := ServiceHealthEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100}) + if tc.WantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + // Make sure we have the right events, only taking ordering into account + // where it matters to account for non-determinism. + requireEventsInCorrectPartialOrder(t, tc.WantEvents, got, func(e stream.Event) string { + // We need events affecting unique registrations to be ordered, within a topic + csn := getPayloadCheckServiceNode(e.Payload) + return fmt.Sprintf("%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service) + }) + }) + } +} + +type regOption func(req *structs.RegisterRequest) error + +func testNodeRegistration(t *testing.T, opts ...regOption) *structs.RegisterRequest { + r := &structs.RegisterRequest{ + Datacenter: "dc1", + ID: "11111111-2222-3333-4444-555555555555", + Node: "node1", + Address: "10.10.10.10", + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + CheckID: "serf-health", + Name: "serf-health", + Node: "node1", + Status: api.HealthPassing, + }, + }, + } + for _, opt := range opts { + err := opt(r) + require.NoError(t, err) + } + return r +} + +func testServiceRegistration(t *testing.T, svc string, opts ...regOption) *structs.RegisterRequest { + // note: don't pass opts or they might get applied twice! + r := testNodeRegistration(t) + r.Service = &structs.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + } + r.Checks = append(r.Checks, + &structs.HealthCheck{ + CheckID: types.CheckID("service:" + svc), + Name: "service:" + svc, + Node: "node1", + ServiceID: svc, + ServiceName: svc, + Type: "ttl", + Status: api.HealthPassing, + }) + for _, opt := range opts { + err := opt(r) + require.NoError(t, err) + } + return r +} + +type eventOption func(e *stream.Event) error + +func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) stream.Event { + e := newTestEventServiceHealthRegister(100, 1, svc) + + // Normalize a few things that are different in the generic event which was + // based on original code here but made more general. This means we don't have + // to change all the test loads... + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.ID = "11111111-2222-3333-4444-555555555555" + csn.Node.Address = "10.10.10.10" + + for _, opt := range opts { + err := opt(&e) + require.NoError(t, err) + } + return e +} + +func testServiceHealthDeregistrationEvent(t *testing.T, svc string, opts ...eventOption) stream.Event { + e := newTestEventServiceHealthDeregister(100, 1, svc) + for _, opt := range opts { + err := opt(&e) + require.NoError(t, err) + } + return e +} + +// regConnectNative option converts the base registration into a Connect-native +// one. +func regConnectNative(req *structs.RegisterRequest) error { + if req.Service == nil { + return nil + } + req.Service.Connect.Native = true + return nil +} + +// regSidecar option converts the base registration request +// into the registration for it's sidecar service. +func regSidecar(req *structs.RegisterRequest) error { + if req.Service == nil { + return nil + } + svc := req.Service.Service + + req.Service.Kind = structs.ServiceKindConnectProxy + req.Service.ID = svc + "_sidecar_proxy" + req.Service.Service = svc + "_sidecar_proxy" + req.Service.Port = 20000 + req.Service.Port + + req.Service.Proxy.DestinationServiceName = svc + req.Service.Proxy.DestinationServiceID = svc + + // Convert the check to point to the right ID now. This isn't totally + // realistic - sidecars should have alias checks etc but this is good enough + // to test this code path. + if len(req.Checks) >= 2 { + req.Checks[1].CheckID = types.CheckID("service:" + svc + "_sidecar_proxy") + req.Checks[1].ServiceID = svc + "_sidecar_proxy" + } + + return nil +} + +// regNodeCheckFail option converts the base registration request +// into a registration with the node-level health check failing +func regNodeCheckFail(req *structs.RegisterRequest) error { + req.Checks[0].Status = api.HealthCritical + return nil +} + +// regServiceCheckFail option converts the base registration request +// into a registration with the service-level health check failing +func regServiceCheckFail(req *structs.RegisterRequest) error { + req.Checks[1].Status = api.HealthCritical + return nil +} + +// regMutatePort option alters the base registration service port by a relative +// amount to simulate a service change. Can be used with regSidecar since it's a +// relative change (+10). +func regMutatePort(req *structs.RegisterRequest) error { + if req.Service == nil { + return nil + } + req.Service.Port += 10 + return nil +} + +// regRenameService option alters the base registration service name but not +// it's ID simulating a service being renamed while it's ID is maintained +// separately e.g. by a scheduler. This is an edge case but an important one as +// it changes which topic key events propagate. +func regRenameService(req *structs.RegisterRequest) error { + if req.Service == nil { + return nil + } + isSidecar := req.Service.Kind == structs.ServiceKindConnectProxy + + if !isSidecar { + req.Service.Service += "_changed" + // Update service checks + if len(req.Checks) >= 2 { + req.Checks[1].ServiceName += "_changed" + } + return nil + } + // This is a sidecar, it's not really realistic but lets only update the + // fields necessary to make it work again with the new service name to be sure + // we get the right result. This is certainly possible if not likely so a + // valid case. + + // We don't need to update out own details, only the name of the destination + req.Service.Proxy.DestinationServiceName += "_changed" + + return nil +} + +// regRenameNode option alters the base registration node name by adding the +// _changed suffix. +func regRenameNode(req *structs.RegisterRequest) error { + req.Node += "_changed" + for i := range req.Checks { + req.Checks[i].Node = req.Node + } + return nil +} + +// regNode2 option alters the base registration to be on a different node. +func regNode2(req *structs.RegisterRequest) error { + req.Node = "node2" + req.ID = "22222222-2222-3333-4444-555555555555" + for i := range req.Checks { + req.Checks[i].Node = req.Node + } + return nil +} + +// regNodeMeta option alters the base registration node to add some meta data. +func regNodeMeta(req *structs.RegisterRequest) error { + req.NodeMeta = map[string]string{"foo": "bar"} + return nil +} + +// evNodeUnchanged option converts the event to reset the node and node check +// raft indexes to the original value where we expect the node not to have been +// changed in the mutation. +func evNodeUnchanged(e *stream.Event) error { + // If the node wasn't touched, its modified index and check's modified + // indexes should be the original ones. + csn := getPayloadCheckServiceNode(e.Payload) + + // Check this isn't a dereg event with made up/placeholder node info + if csn.Node.CreateIndex == 0 { + return nil + } + csn.Node.CreateIndex = 10 + csn.Node.ModifyIndex = 10 + csn.Checks[0].CreateIndex = 10 + csn.Checks[0].ModifyIndex = 10 + return nil +} + +// evServiceUnchanged option converts the event to reset the service and service +// check raft indexes to the original value where we expect the service record +// not to have been changed in the mutation. +func evServiceUnchanged(e *stream.Event) error { + // If the node wasn't touched, its modified index and check's modified + // indexes should be the original ones. + csn := getPayloadCheckServiceNode(e.Payload) + + csn.Service.CreateIndex = 10 + csn.Service.ModifyIndex = 10 + if len(csn.Checks) > 1 { + csn.Checks[1].CreateIndex = 10 + csn.Checks[1].ModifyIndex = 10 + } + return nil +} + +// evConnectNative option converts the base event to represent a connect-native +// service instance. +func evConnectNative(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Service.Connect.Native = true + return nil +} + +// evConnectTopic option converts the base event to the equivalent event that +// should be published to the connect topic. When needed it should be applied +// first as several other options (notable evSidecar) change behavior subtly +// depending on which topic they are published to and they determin this from +// the event. +func evConnectTopic(e *stream.Event) error { + e.Topic = TopicServiceHealthConnect + return nil +} + +// evSidecar option converts the base event to the health (not connect) event +// expected from the sidecar proxy registration for that service instead. When +// needed it should be applied after any option that changes topic (e.g. +// evConnectTopic) but before other options that might change behavior subtly +// depending on whether it's a sidecar or regular service event (e.g. +// evRenameService). +func evSidecar(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + + svc := csn.Service.Service + + csn.Service.Kind = structs.ServiceKindConnectProxy + csn.Service.ID = svc + "_sidecar_proxy" + csn.Service.Service = svc + "_sidecar_proxy" + csn.Service.Port = 20000 + csn.Service.Port + + csn.Service.Proxy.DestinationServiceName = svc + csn.Service.Proxy.DestinationServiceID = svc + + // Convert the check to point to the right ID now. This isn't totally + // realistic - sidecars should have alias checks etc but this is good enough + // to test this code path. + if len(csn.Checks) >= 2 { + csn.Checks[1].CheckID = types.CheckID("service:" + svc + "_sidecar_proxy") + csn.Checks[1].ServiceID = svc + "_sidecar_proxy" + csn.Checks[1].ServiceName = svc + "_sidecar_proxy" + } + + // Update event key to be the proxy service name, but only if this is not + // already in the connect topic + if e.Topic != TopicServiceHealthConnect { + e.Key = csn.Service.Service + } + return nil +} + +// evMutatePort option alters the base event service port by a relative +// amount to simulate a service change. Can be used with evSidecar since it's a +// relative change (+10). +func evMutatePort(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Service.Port += 10 + return nil +} + +// evNodeMutated option alters the base event node to set it's CreateIndex +// (but not modify index) to the setup index. This expresses that we expect the +// node record originally created in setup to have been mutated during the +// update. +func evNodeMutated(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Node.CreateIndex = 10 + return nil +} + +// evServiceMutated option alters the base event service to set it's CreateIndex +// (but not modify index) to the setup index. This expresses that we expect the +// service record originally created in setup to have been mutated during the +// update. +func evServiceMutated(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Service.CreateIndex = 10 + return nil +} + +// evChecksMutated option alters the base event service check to set it's +// CreateIndex (but not modify index) to the setup index. This expresses that we +// expect the service check records originally created in setup to have been +// mutated during the update. NOTE: this must be sequenced after +// evServiceUnchanged if both are used. +func evChecksMutated(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Checks[1].CreateIndex = 10 + getPayloadCheckServiceNode(e.Payload).Checks[1].ModifyIndex = 100 + return nil +} + +// evNodeChecksMutated option alters the base event node check to set it's +// CreateIndex (but not modify index) to the setup index. This expresses that we +// expect the node check records originally created in setup to have been +// mutated during the update. NOTE: this must be sequenced after evNodeUnchanged +// if both are used. +func evNodeChecksMutated(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Checks[0].CreateIndex = 10 + getPayloadCheckServiceNode(e.Payload).Checks[0].ModifyIndex = 100 + return nil +} + +// evChecksUnchanged option alters the base event service to set all check raft +// indexes to the setup index. This expresses that we expect none of the checks +// to have changed in the update. +func evChecksUnchanged(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + for i := range csn.Checks { + csn.Checks[i].CreateIndex = 10 + csn.Checks[i].ModifyIndex = 10 + } + return nil +} + +// evRenameService option alters the base event service to change the service +// name but not ID simulating an in-place service rename. +func evRenameService(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + isSidecar := csn.Service.Kind == structs.ServiceKindConnectProxy + + if !isSidecar { + csn.Service.Service += "_changed" + // Update service checks + if len(csn.Checks) >= 2 { + csn.Checks[1].ServiceName += "_changed" + } + e.Key += "_changed" + return nil + } + // This is a sidecar, it's not really realistic but lets only update the + // fields necessary to make it work again with the new service name to be sure + // we get the right result. This is certainly possible if not likely so a + // valid case. + + // We don't need to update out own details, only the name of the destination + csn.Service.Proxy.DestinationServiceName += "_changed" + + // If this is the connect topic we need to change the key too + if e.Topic == TopicServiceHealthConnect { + e.Key += "_changed" + } + return nil +} + +// evNodeMeta option alters the base event node to add some meta data. +func evNodeMeta(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.Meta = map[string]string{"foo": "bar"} + return nil +} + +// evRenameNode option alters the base event node name. +func evRenameNode(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.Node += "_changed" + for i := range csn.Checks { + csn.Checks[i].Node = csn.Node.Node + } + return nil +} + +// evNode2 option alters the base event to refer to a different node +func evNode2(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.Node = "node2" + // Only change ID if it's set (e.g. it's not in a deregistration event) + if csn.Node.ID != "" { + csn.Node.ID = "22222222-2222-3333-4444-555555555555" + } + for i := range csn.Checks { + csn.Checks[i].Node = csn.Node.Node + } + return nil +} + +// evNodeCheckFail option alters the base event to set the node-level health +// check to be failing +func evNodeCheckFail(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Checks[0].Status = api.HealthCritical + return nil +} + +// evNodeCheckDelete option alters the base event to remove the node-level +// health check +func evNodeCheckDelete(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + // Ensure this is idempotent as we sometimes get called multiple times.. + if len(csn.Checks) > 0 && csn.Checks[0].ServiceID == "" { + csn.Checks = csn.Checks[1:] + } + return nil +} + +// evServiceCheckFail option alters the base event to set the service-level health +// check to be failing +func evServiceCheckFail(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Checks[1].Status = api.HealthCritical + return nil +} + +// evServiceCheckDelete option alters the base event to remove the service-level +// health check +func evServiceCheckDelete(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + // Ensure this is idempotent as we sometimes get called multiple times.. + if len(csn.Checks) > 1 && csn.Checks[1].ServiceID != "" { + csn.Checks = csn.Checks[0:1] + } + return nil +} + +// requireEventsInCorrectPartialOrder compares that the expected set of events +// was emitted. It allows for _independent_ events to be emitted in any order - +// this can be important because even though the transaction processing is all +// strictly ordered up until the processing func, grouping multiple updates that +// affect the same logical entity may be necessary and may impose random +// ordering changes on the eventual events if a map is used. We only care that +// events _affecting the same topic and key_ are ordered correctly with respect +// to the "expected" set of events so this helper asserts that. +// +// The caller provides a func that can return a partition key for the given +// event types and we assert that all events with the same partition key are +// deliveries in the same order. Note that this is not necessarily the same as +// topic/key since for example in Catalog only events about a specific service +// _instance_ need to be ordered while topic and key are more general. +func requireEventsInCorrectPartialOrder(t *testing.T, want, got []stream.Event, + partKey func(stream.Event) string) { + t.Helper() + + // Partion both arrays by topic/key + wantParts := make(map[string][]stream.Event) + gotParts := make(map[string][]stream.Event) + + for _, e := range want { + k := partKey(e) + wantParts[k] = append(wantParts[k], e) + } + for _, e := range got { + k := partKey(e) + gotParts[k] = append(gotParts[k], e) + } + + for k, want := range wantParts { + require.Equal(t, want, gotParts[k], "got incorrect events for partition: %s", k) + } + + for k, got := range gotParts { + if _, ok := wantParts[k]; !ok { + require.Equal(t, nil, got, "got unwanted events for partition: %s", k) + } + } +} + +// newTestEventServiceHealthRegister returns a realistically populated service +// health registration event. The nodeNum is a +// logical node and is used to create the node name ("node%d") but also change +// the node ID and IP address to make it a little more realistic for cases that +// need that. nodeNum should be less than 64k to make the IP address look +// realistic. Any other changes can be made on the returned event to avoid +// adding too many options to callers. +func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) stream.Event { + node := fmt.Sprintf("node%d", nodeNum) + nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) + addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) + + return stream.Event{ + Topic: TopicServiceHealth, + Key: svc, + Index: index, + Payload: eventPayload{ + Op: OpCreate, + Obj: &structs.CheckServiceNode{ + Node: &structs.Node{ + ID: nodeID, + Node: node, + Address: addr, + Datacenter: "dc1", + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Service: &structs.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Checks: []*structs.HealthCheck{ + { + Node: node, + CheckID: "serf-health", + Name: "serf-health", + Status: "passing", + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + { + Node: node, + CheckID: types.CheckID("service:" + svc), + Name: "service:" + svc, + ServiceID: svc, + ServiceName: svc, + Type: "ttl", + Status: "passing", + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + }, + }, + }, + } +} + +// TestEventServiceHealthDeregister returns a realistically populated service +// health deregistration event. The nodeNum is a +// logical node and is used to create the node name ("node%d") but also change +// the node ID and IP address to make it a little more realistic for cases that +// need that. nodeNum should be less than 64k to make the IP address look +// realistic. Any other changes can be made on the returned event to avoid +// adding too many options to callers. +func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event { + return stream.Event{ + Topic: TopicServiceHealth, + Key: svc, + Index: index, + Payload: eventPayload{ + Op: OpDelete, + Obj: &structs.CheckServiceNode{ + Node: &structs.Node{ + Node: fmt.Sprintf("node%d", nodeNum), + }, + Service: &structs.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + RaftIndex: structs.RaftIndex{ + // The original insertion index since a delete doesn't update + // this. This magic value came from state store tests where we + // setup at index 10 and then mutate at index 100. It can be + // modified by the caller later and makes it easier than having + // yet another argument in the common case. + CreateIndex: 10, + ModifyIndex: 10, + }, + }, + }, + }, + } +} diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 1aaa45dd8..8ac0cf57a 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -165,11 +165,34 @@ func (t topic) String() string { return string(t) } +var ( + // TopicServiceHealth contains events for all registered service instances. + TopicServiceHealth topic = "topic-service-health" + // TopicServiceHealthConnect contains events for connect-enabled service instances. + TopicServiceHealthConnect topic = "topic-service-health-connect" +) + func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { - // TODO: add other table handlers here. - return aclChangeUnsubscribeEvent(tx, changes) + var events []stream.Event + fns := []func(tx ReadTxn, changes Changes) ([]stream.Event, error){ + aclChangeUnsubscribeEvent, + ServiceHealthEventsFromChanges, + // TODO: add other table handlers here. + } + for _, fn := range fns { + e, err := fn(tx, changes) + if err != nil { + return nil, err + } + events = append(events, e...) + } + return events, nil } -func newSnapshotHandlers() stream.SnapshotHandlers { - return stream.SnapshotHandlers{} +// TODO: could accept a ReadTxner instead of a Store +func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { + return stream.SnapshotHandlers{ + TopicServiceHealth: s.ServiceHealthSnapshot, + TopicServiceHealthConnect: s.ServiceHealthConnectSnapshot, + } } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index d19922eec..3a7229607 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -162,17 +162,17 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { ctx, cancel := context.WithCancel(context.TODO()) s := &Store{ - schema: schema, - abandonCh: make(chan struct{}), - kvsGraveyard: NewGraveyard(gc), - lockDelay: NewDelay(), - db: &changeTrackerDB{ - db: db, - publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(), 10*time.Second), - processChanges: processDBChanges, - }, + schema: schema, + abandonCh: make(chan struct{}), + kvsGraveyard: NewGraveyard(gc), + lockDelay: NewDelay(), stopEventPublisher: cancel, } + s.db = &changeTrackerDB{ + db: db, + publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second), + processChanges: processDBChanges, + } return s, nil }