808 lines
25 KiB
Go
808 lines
25 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package state
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/proto/private/pbcommon"
|
|
"github.com/hashicorp/consul/proto/private/pbservice"
|
|
"github.com/hashicorp/consul/proto/private/pbsubscribe"
|
|
)
|
|
|
|
// EventSubjectService is a stream.Subject used to route and receive events for
|
|
// a specific service.
|
|
type EventSubjectService struct {
|
|
Key string
|
|
EnterpriseMeta acl.EnterpriseMeta
|
|
PeerName string
|
|
|
|
overrideKey string
|
|
overrideNamespace string
|
|
overridePartition string
|
|
}
|
|
|
|
// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to
|
|
// indicates changes to a CheckServiceNode for service health.
|
|
//
|
|
// The stream.Payload methods implemented by EventPayloadCheckServiceNode are
|
|
// do not mutate the payload, making it safe to use in an Event sent to
|
|
// stream.EventPublisher.Publish.
|
|
type EventPayloadCheckServiceNode struct {
|
|
Op pbsubscribe.CatalogOp
|
|
Value *structs.CheckServiceNode
|
|
// key is used to override the key used to filter the payload. It is set for
|
|
// events in the connect topic to specify the name of the underlying service
|
|
// when the change event is for a sidecar or gateway.
|
|
overrideKey string
|
|
overrideNamespace string
|
|
overridePartition string
|
|
}
|
|
|
|
func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool {
|
|
return e.Value.CanRead(authz) == acl.Allow
|
|
}
|
|
|
|
func (e EventPayloadCheckServiceNode) Subject() stream.Subject {
|
|
return EventSubjectService{
|
|
Key: e.Value.Service.Service,
|
|
EnterpriseMeta: e.Value.Service.EnterpriseMeta,
|
|
PeerName: e.Value.Service.PeerName,
|
|
|
|
overrideKey: e.overrideKey,
|
|
overrideNamespace: e.overrideNamespace,
|
|
overridePartition: e.overridePartition,
|
|
}
|
|
}
|
|
|
|
func (e EventPayloadCheckServiceNode) ToSubscriptionEvent(idx uint64) *pbsubscribe.Event {
|
|
return &pbsubscribe.Event{
|
|
Index: idx,
|
|
Payload: &pbsubscribe.Event_ServiceHealth{
|
|
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
|
Op: e.Op,
|
|
CheckServiceNode: pbservice.NewCheckServiceNodeFromStructs(e.Value),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// EventPayloadServiceListUpdate is used as the Payload for a stream.Event when
|
|
// services (not service instances) are registered/deregistered. These events
|
|
// are used to materialize the list of services in a datacenter.
|
|
type EventPayloadServiceListUpdate struct {
|
|
Op pbsubscribe.CatalogOp
|
|
|
|
Name string
|
|
EnterpriseMeta acl.EnterpriseMeta
|
|
PeerName string
|
|
}
|
|
|
|
func (e *EventPayloadServiceListUpdate) ToSubscriptionEvent(idx uint64) *pbsubscribe.Event {
|
|
return &pbsubscribe.Event{
|
|
Index: idx,
|
|
Payload: &pbsubscribe.Event_Service{
|
|
Service: &pbsubscribe.ServiceListUpdate{
|
|
Op: e.Op,
|
|
Name: e.Name,
|
|
EnterpriseMeta: pbcommon.NewEnterpriseMetaFromStructs(e.EnterpriseMeta),
|
|
PeerName: e.PeerName,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (e *EventPayloadServiceListUpdate) Subject() stream.Subject { return stream.SubjectNone }
|
|
|
|
func (e *EventPayloadServiceListUpdate) HasReadPermission(authz acl.Authorizer) bool {
|
|
var authzContext acl.AuthorizerContext
|
|
e.EnterpriseMeta.FillAuthzContext(&authzContext)
|
|
return authz.ServiceRead(e.Name, &authzContext) == acl.Allow
|
|
}
|
|
|
|
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
|
// of stream.Events that describe the current state of a service health query.
|
|
func (s *Store) ServiceHealthSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
|
|
tx := s.db.ReadTxn()
|
|
defer tx.Abort()
|
|
|
|
connect := req.Topic == EventTopicServiceHealthConnect
|
|
|
|
subject, ok := req.Subject.(EventSubjectService)
|
|
if !ok {
|
|
return 0, fmt.Errorf("expected SubscribeRequest.Subject to be a: state.EventSubjectService, was a: %T", req.Subject)
|
|
}
|
|
|
|
idx, nodes, err := checkServiceNodesTxn(tx, nil, subject.Key, connect, &subject.EnterpriseMeta, subject.PeerName)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
for i := range nodes {
|
|
n := nodes[i]
|
|
event := stream.Event{
|
|
Index: idx,
|
|
Topic: req.Topic,
|
|
Payload: EventPayloadCheckServiceNode{
|
|
Op: pbsubscribe.CatalogOp_Register,
|
|
Value: &n,
|
|
},
|
|
}
|
|
|
|
if !connect {
|
|
// append each event as a separate item so that they can be serialized
|
|
// separately, to prevent the encoding of one massive message.
|
|
buf.Append([]stream.Event{event})
|
|
continue
|
|
}
|
|
|
|
events, err := connectEventsByServiceKind(tx, event)
|
|
if err != nil {
|
|
return idx, err
|
|
}
|
|
buf.Append(events)
|
|
}
|
|
|
|
return idx, err
|
|
}
|
|
|
|
// TODO: this could use NodeServiceQuery
|
|
type nodeServiceTuple struct {
|
|
Node string
|
|
ServiceID string
|
|
EntMeta acl.EnterpriseMeta
|
|
PeerName string
|
|
}
|
|
|
|
func newNodeServiceTupleFromServiceNode(sn *structs.ServiceNode) nodeServiceTuple {
|
|
return nodeServiceTuple{
|
|
Node: strings.ToLower(sn.Node),
|
|
ServiceID: sn.ServiceID,
|
|
EntMeta: sn.EnterpriseMeta,
|
|
PeerName: sn.PeerName,
|
|
}
|
|
}
|
|
|
|
func newNodeServiceTupleFromServiceHealthCheck(hc *structs.HealthCheck) nodeServiceTuple {
|
|
return nodeServiceTuple{
|
|
Node: strings.ToLower(hc.Node),
|
|
ServiceID: hc.ServiceID,
|
|
EntMeta: hc.EnterpriseMeta,
|
|
PeerName: hc.PeerName,
|
|
}
|
|
}
|
|
|
|
type serviceChange struct {
|
|
changeType changeType
|
|
change memdb.Change
|
|
}
|
|
|
|
type nodeTuple struct {
|
|
Node string
|
|
Partition string
|
|
PeerName string
|
|
}
|
|
|
|
var serviceChangeIndirect = serviceChange{changeType: changeIndirect}
|
|
|
|
// ServiceListUpdateEventsFromChanges returns events representing changes to
|
|
// the list of services from the given set of state store changes.
|
|
func ServiceListUpdateEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
|
var events []stream.Event
|
|
for _, change := range changes.Changes {
|
|
if change.Table != tableKindServiceNames {
|
|
continue
|
|
}
|
|
|
|
kindName := changeObject(change).(*KindServiceName)
|
|
|
|
// TODO(peering): make this peer-aware.
|
|
payload := &EventPayloadServiceListUpdate{
|
|
Name: kindName.Service.Name,
|
|
EnterpriseMeta: kindName.Service.EnterpriseMeta,
|
|
}
|
|
|
|
if change.Deleted() {
|
|
payload.Op = pbsubscribe.CatalogOp_Deregister
|
|
} else {
|
|
payload.Op = pbsubscribe.CatalogOp_Register
|
|
}
|
|
|
|
events = append(events, stream.Event{
|
|
Topic: EventTopicServiceList,
|
|
Index: changes.Index,
|
|
Payload: payload,
|
|
})
|
|
}
|
|
return events, nil
|
|
}
|
|
|
|
// ServiceListSnapshot is a stream.SnapshotFunc that returns a snapshot of
|
|
// all service names.
|
|
func (s *Store) ServiceListSnapshot(_ stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
index, names, err := s.ServiceNamesOfKind(nil, "")
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if l := len(names); l > 0 {
|
|
events := make([]stream.Event, l)
|
|
for idx, name := range names {
|
|
events[idx] = stream.Event{
|
|
Topic: EventTopicServiceList,
|
|
Index: index,
|
|
Payload: &EventPayloadServiceListUpdate{
|
|
Op: pbsubscribe.CatalogOp_Register,
|
|
Name: name.Service.Name,
|
|
EnterpriseMeta: name.Service.EnterpriseMeta,
|
|
},
|
|
}
|
|
}
|
|
buf.Append(events)
|
|
}
|
|
|
|
return index, nil
|
|
}
|
|
|
|
// ServiceHealthEventsFromChanges returns all the service and Connect health
|
|
// events that should be emitted given a set of changes to the state store.
|
|
func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
|
var events []stream.Event
|
|
|
|
var nodeChanges map[nodeTuple]changeType
|
|
var serviceChanges map[nodeServiceTuple]serviceChange
|
|
var termGatewayChanges map[structs.ServiceName]map[structs.ServiceName]serviceChange
|
|
|
|
markNode := func(node nodeTuple, typ changeType) {
|
|
if nodeChanges == nil {
|
|
nodeChanges = make(map[nodeTuple]changeType)
|
|
}
|
|
// If the caller has an actual node mutation ensure we store it even if the
|
|
// node is already marked. If the caller is just marking the node dirty
|
|
// without a node change, don't overwrite any existing node change we know
|
|
// about.
|
|
if nodeChanges[node] == changeIndirect {
|
|
nodeChanges[node] = typ
|
|
}
|
|
}
|
|
markService := func(key nodeServiceTuple, svcChange serviceChange) {
|
|
if serviceChanges == nil {
|
|
serviceChanges = make(map[nodeServiceTuple]serviceChange)
|
|
}
|
|
// If the caller has an actual service mutation ensure we store it even if
|
|
// the service is already marked. If the caller is just marking the service
|
|
// dirty without a service change, don't overwrite any existing service change we
|
|
// know about.
|
|
if serviceChanges[key].changeType == changeIndirect {
|
|
serviceChanges[key] = svcChange
|
|
}
|
|
}
|
|
|
|
for _, change := range changes.Changes {
|
|
switch change.Table {
|
|
case tableNodes:
|
|
// Node changed in some way, if it's not a delete, we'll need to
|
|
// re-deliver CheckServiceNode results for all services on that node but
|
|
// we mark it anyway because if it _is_ a delete then we need to know that
|
|
// later to avoid trying to deliver events when node level checks mark the
|
|
// node as "changed".
|
|
n := changeObject(change).(*structs.Node)
|
|
tuple := newNodeTupleFromNode(n)
|
|
markNode(tuple, changeTypeFromChange(change))
|
|
|
|
case tableServices:
|
|
sn := changeObject(change).(*structs.ServiceNode)
|
|
srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
|
|
markService(newNodeServiceTupleFromServiceNode(sn), srvChange)
|
|
|
|
case tableChecks:
|
|
// For health we only care about the scope for now to know if it's just
|
|
// affecting a single service or every service on a node. There is a
|
|
// subtle edge case where the check with same ID changes from being node
|
|
// scoped to service scoped or vice versa, in either case we need to treat
|
|
// it as affecting all services on the node.
|
|
switch {
|
|
case change.Updated():
|
|
before := change.Before.(*structs.HealthCheck)
|
|
after := change.After.(*structs.HealthCheck)
|
|
if after.ServiceID == "" || before.ServiceID == "" {
|
|
// check before and/or after is node-scoped
|
|
nt := newNodeTupleFromHealthCheck(after)
|
|
markNode(nt, changeIndirect)
|
|
} else {
|
|
// Check changed which means we just need to emit for the linked
|
|
// service.
|
|
markService(newNodeServiceTupleFromServiceHealthCheck(after), serviceChangeIndirect)
|
|
|
|
// Edge case - if the check with same ID was updated to link to a
|
|
// different service ID but the old service with old ID still exists,
|
|
// then the old service instance needs updating too as it has one
|
|
// fewer checks now.
|
|
if before.ServiceID != after.ServiceID {
|
|
markService(newNodeServiceTupleFromServiceHealthCheck(before), serviceChangeIndirect)
|
|
}
|
|
}
|
|
|
|
case change.Deleted(), change.Created():
|
|
obj := changeObject(change).(*structs.HealthCheck)
|
|
if obj.ServiceID == "" {
|
|
// Node level check
|
|
nt := newNodeTupleFromHealthCheck(obj)
|
|
markNode(nt, changeIndirect)
|
|
} else {
|
|
markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect)
|
|
}
|
|
}
|
|
case tableGatewayServices:
|
|
gs := changeObject(change).(*structs.GatewayService)
|
|
if gs.GatewayKind != structs.ServiceKindTerminatingGateway {
|
|
continue
|
|
}
|
|
|
|
gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
|
|
|
|
if termGatewayChanges == nil {
|
|
termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange)
|
|
}
|
|
|
|
_, ok := termGatewayChanges[gs.Gateway]
|
|
if !ok {
|
|
termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{}
|
|
}
|
|
|
|
switch gsChange.changeType {
|
|
case changeUpdate:
|
|
after := gsChange.change.After.(*structs.GatewayService)
|
|
if gsChange.change.Before.(*structs.GatewayService).IsSame(after) {
|
|
continue
|
|
}
|
|
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
|
|
case changeDelete, changeCreate:
|
|
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
|
|
}
|
|
}
|
|
}
|
|
|
|
// Now act on those marked nodes/services
|
|
for node, changeType := range nodeChanges {
|
|
if changeType == changeDelete {
|
|
// Node deletions are a no-op here since the state store transaction will
|
|
// have also removed all the service instances which will be handled in
|
|
// the loop below.
|
|
continue
|
|
}
|
|
// Rebuild events for all services on this node
|
|
es, err := newServiceHealthEventsForNode(tx, changes.Index, node.Node,
|
|
structs.WildcardEnterpriseMetaInPartition(node.Partition), node.PeerName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
events = append(events, es...)
|
|
}
|
|
|
|
for tuple, srvChange := range serviceChanges {
|
|
if srvChange.changeType == changeDelete {
|
|
sn := srvChange.change.Before.(*structs.ServiceNode)
|
|
e := newServiceHealthEventDeregister(changes.Index, sn)
|
|
events = append(events, e)
|
|
continue
|
|
}
|
|
|
|
// Check if this was a service mutation that changed it's name which
|
|
// requires special handling even if node changed and new events were
|
|
// already published.
|
|
if srvChange.changeType == changeUpdate {
|
|
before := srvChange.change.Before.(*structs.ServiceNode)
|
|
after := srvChange.change.After.(*structs.ServiceNode)
|
|
|
|
if before.ServiceName != after.ServiceName {
|
|
// Service was renamed, the code below will ensure the new registrations
|
|
// go out to subscribers to the new service name topic key, but we need
|
|
// to fix up subscribers that were watching the old name by sending
|
|
// deregistrations.
|
|
e := newServiceHealthEventDeregister(changes.Index, before)
|
|
events = append(events, e)
|
|
}
|
|
|
|
if e, ok := isConnectProxyDestinationServiceChange(changes.Index, before, after); ok {
|
|
events = append(events, e)
|
|
}
|
|
}
|
|
|
|
if _, ok := nodeChanges[tuple.nodeTuple()]; ok {
|
|
// We already rebuilt events for everything on this node, no need to send
|
|
// a duplicate.
|
|
continue
|
|
}
|
|
// Build service event and append it
|
|
e, err := newServiceHealthEventForService(tx, changes.Index, tuple)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
events = append(events, e)
|
|
}
|
|
|
|
for gatewayName, svcChanges := range termGatewayChanges {
|
|
for serviceName, gsChange := range svcChanges {
|
|
gs := changeObject(gsChange.change).(*structs.GatewayService)
|
|
|
|
q := Query{
|
|
Value: gs.Gateway.Name,
|
|
EnterpriseMeta: gatewayName.EnterpriseMeta,
|
|
PeerName: structs.TODOPeerKeyword,
|
|
}
|
|
_, nodes, err := serviceNodesTxn(tx, nil, indexService, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Always send deregister events for deletes/updates.
|
|
if gsChange.changeType != changeCreate {
|
|
for _, sn := range nodes {
|
|
e := newServiceHealthEventDeregister(changes.Index, sn)
|
|
|
|
e.Topic = EventTopicServiceHealthConnect
|
|
payload := e.Payload.(EventPayloadCheckServiceNode)
|
|
payload.overrideKey = serviceName.Name
|
|
if gatewayName.EnterpriseMeta.NamespaceOrDefault() != serviceName.EnterpriseMeta.NamespaceOrDefault() {
|
|
payload.overrideNamespace = serviceName.EnterpriseMeta.NamespaceOrDefault()
|
|
}
|
|
if gatewayName.EnterpriseMeta.PartitionOrDefault() != serviceName.EnterpriseMeta.PartitionOrDefault() {
|
|
payload.overridePartition = serviceName.EnterpriseMeta.PartitionOrDefault()
|
|
}
|
|
e.Payload = payload
|
|
|
|
events = append(events, e)
|
|
}
|
|
}
|
|
|
|
if gsChange.changeType == changeDelete {
|
|
continue
|
|
}
|
|
|
|
// Build service events and append them
|
|
for _, sn := range nodes {
|
|
tuple := newNodeServiceTupleFromServiceNode(sn)
|
|
|
|
// If we're already sending an event for the service, don't send another.
|
|
if _, ok := serviceChanges[tuple]; ok {
|
|
continue
|
|
}
|
|
|
|
e, err := newServiceHealthEventForService(tx, changes.Index, tuple)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
e.Topic = EventTopicServiceHealthConnect
|
|
payload := e.Payload.(EventPayloadCheckServiceNode)
|
|
payload.overrideKey = serviceName.Name
|
|
if gatewayName.EnterpriseMeta.NamespaceOrDefault() != serviceName.EnterpriseMeta.NamespaceOrDefault() {
|
|
payload.overrideNamespace = serviceName.EnterpriseMeta.NamespaceOrDefault()
|
|
}
|
|
if gatewayName.EnterpriseMeta.PartitionOrDefault() != serviceName.EnterpriseMeta.PartitionOrDefault() {
|
|
payload.overridePartition = serviceName.EnterpriseMeta.PartitionOrDefault()
|
|
}
|
|
e.Payload = payload
|
|
|
|
events = append(events, e)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Duplicate any events that affected connect-enabled instances (proxies or
|
|
// native apps) to the relevant Connect topic.
|
|
connectEvents, err := serviceHealthToConnectEvents(tx, events...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
events = append(events, connectEvents...)
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// isConnectProxyDestinationServiceChange handles the case where a Connect proxy changed
|
|
// the service it is proxying. We need to issue a de-registration for the old
|
|
// service on the Connect topic. We don't actually need to deregister this sidecar
|
|
// service though as it still exists and didn't change its name.
|
|
func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.ServiceNode) (stream.Event, bool) {
|
|
if before.ServiceKind != structs.ServiceKindConnectProxy ||
|
|
before.ServiceProxy.DestinationServiceName == after.ServiceProxy.DestinationServiceName {
|
|
return stream.Event{}, false
|
|
}
|
|
|
|
e := newServiceHealthEventDeregister(idx, before)
|
|
e.Topic = EventTopicServiceHealthConnect
|
|
payload := e.Payload.(EventPayloadCheckServiceNode)
|
|
payload.overrideKey = payload.Value.Service.Proxy.DestinationServiceName
|
|
e.Payload = payload
|
|
return e, true
|
|
}
|
|
|
|
type changeType uint8
|
|
|
|
const (
|
|
// changeIndirect indicates some other object changed which has implications
|
|
// for the target object.
|
|
changeIndirect changeType = iota
|
|
changeDelete
|
|
changeCreate
|
|
changeUpdate
|
|
)
|
|
|
|
func changeTypeFromChange(change memdb.Change) changeType {
|
|
switch {
|
|
case change.Deleted():
|
|
return changeDelete
|
|
case change.Created():
|
|
return changeCreate
|
|
default:
|
|
return changeUpdate
|
|
}
|
|
}
|
|
|
|
// serviceHealthToConnectEvents converts already formatted service health
|
|
// registration events into the ones needed to publish to the Connect topic.
|
|
// This essentially means filtering out any instances that are not Connect
|
|
// enabled and so of no interest to those subscribers but also involves
|
|
// switching connection details to be the proxy instead of the actual instance
|
|
// in case of a sidecar.
|
|
func serviceHealthToConnectEvents(
|
|
tx ReadTxn,
|
|
events ...stream.Event,
|
|
) ([]stream.Event, error) {
|
|
var result []stream.Event
|
|
for _, event := range events {
|
|
if event.Topic != EventTopicServiceHealth { // event.Topic == topicServiceHealthConnect
|
|
// Skip non-health or any events already emitted to Connect topic
|
|
continue
|
|
}
|
|
|
|
connectEvents, err := connectEventsByServiceKind(tx, event)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, connectEvents...)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func connectEventsByServiceKind(tx ReadTxn, origEvent stream.Event) ([]stream.Event, error) {
|
|
node := getPayloadCheckServiceNode(origEvent.Payload)
|
|
if node.Service == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
event := origEvent // shallow copy the event
|
|
event.Topic = EventTopicServiceHealthConnect
|
|
|
|
if node.Service.Connect.Native {
|
|
return []stream.Event{event}, nil
|
|
}
|
|
|
|
switch node.Service.Kind {
|
|
case structs.ServiceKindConnectProxy:
|
|
payload := event.Payload.(EventPayloadCheckServiceNode)
|
|
payload.overrideKey = node.Service.Proxy.DestinationServiceName
|
|
event.Payload = payload
|
|
return []stream.Event{event}, nil
|
|
|
|
case structs.ServiceKindTerminatingGateway:
|
|
var result []stream.Event
|
|
|
|
// TODO(peering): handle terminating gateways somehow
|
|
|
|
sn := structs.ServiceName{
|
|
Name: node.Service.Service,
|
|
EnterpriseMeta: node.Service.EnterpriseMeta,
|
|
}
|
|
iter, err := tx.Get(tableGatewayServices, indexGateway, sn)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// similar to checkServiceNodesTxn -> serviceGatewayNodes
|
|
for obj := iter.Next(); obj != nil; obj = iter.Next() {
|
|
result = append(result, copyEventForService(event, obj.(*structs.GatewayService).Service))
|
|
}
|
|
return result, nil
|
|
default:
|
|
// All other cases are not relevant to the connect topic
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func copyEventForService(event stream.Event, service structs.ServiceName) stream.Event {
|
|
event.Topic = EventTopicServiceHealthConnect
|
|
payload := event.Payload.(EventPayloadCheckServiceNode)
|
|
payload.overrideKey = service.Name
|
|
if payload.Value.Service.EnterpriseMeta.NamespaceOrDefault() != service.EnterpriseMeta.NamespaceOrDefault() {
|
|
payload.overrideNamespace = service.EnterpriseMeta.NamespaceOrDefault()
|
|
}
|
|
if payload.Value.Service.EnterpriseMeta.PartitionOrDefault() != service.EnterpriseMeta.PartitionOrDefault() {
|
|
payload.overridePartition = service.EnterpriseMeta.PartitionOrDefault()
|
|
}
|
|
|
|
event.Payload = payload
|
|
return event
|
|
}
|
|
|
|
func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNode {
|
|
ep, ok := payload.(EventPayloadCheckServiceNode)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return ep.Value
|
|
}
|
|
|
|
// newServiceHealthEventsForNode returns health events for all services on the
|
|
// given node. This mirrors some of the the logic in the oddly-named
|
|
// parseCheckServiceNodes but is more efficient since we know they are all on
|
|
// the same node.
|
|
func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string, entMeta *acl.EnterpriseMeta, peerName string) ([]stream.Event, error) {
|
|
services, err := tx.Get(tableServices, indexNode, Query{
|
|
Value: node,
|
|
EnterpriseMeta: *entMeta,
|
|
PeerName: peerName,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
n, checksFunc, err := getNodeAndChecks(tx, node, entMeta, peerName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var events []stream.Event
|
|
for service := services.Next(); service != nil; service = services.Next() {
|
|
sn := service.(*structs.ServiceNode)
|
|
|
|
event := newServiceHealthEventRegister(idx, n, sn, checksFunc(sn.ServiceID))
|
|
events = append(events, event)
|
|
}
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// getNodeAndNodeChecks returns a the node structure and a function that returns
|
|
// the full list of checks for a specific service on that node.
|
|
func getNodeAndChecks(tx ReadTxn, node string, entMeta *acl.EnterpriseMeta, peerName string) (*structs.Node, serviceChecksFunc, error) {
|
|
// Fetch the node
|
|
nodeRaw, err := tx.First(tableNodes, indexID, Query{
|
|
Value: node,
|
|
EnterpriseMeta: *entMeta,
|
|
PeerName: peerName,
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if nodeRaw == nil {
|
|
return nil, nil, ErrMissingNode
|
|
}
|
|
n := nodeRaw.(*structs.Node)
|
|
|
|
iter, err := tx.Get(tableChecks, indexNode, Query{
|
|
Value: node,
|
|
EnterpriseMeta: *entMeta,
|
|
PeerName: peerName,
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
var nodeChecks structs.HealthChecks
|
|
var svcChecks map[string]structs.HealthChecks
|
|
|
|
for check := iter.Next(); check != nil; check = iter.Next() {
|
|
check := check.(*structs.HealthCheck)
|
|
if check.ServiceID == "" {
|
|
nodeChecks = append(nodeChecks, check)
|
|
} else {
|
|
if svcChecks == nil {
|
|
svcChecks = make(map[string]structs.HealthChecks)
|
|
}
|
|
svcChecks[check.ServiceID] = append(svcChecks[check.ServiceID], check)
|
|
}
|
|
}
|
|
serviceChecks := func(serviceID string) structs.HealthChecks {
|
|
// Create a new slice so that append does not modify the array backing nodeChecks.
|
|
result := make(structs.HealthChecks, 0, len(nodeChecks))
|
|
result = append(result, nodeChecks...)
|
|
for _, check := range svcChecks[serviceID] {
|
|
result = append(result, check)
|
|
}
|
|
return result
|
|
}
|
|
return n, serviceChecks, nil
|
|
}
|
|
|
|
type serviceChecksFunc func(serviceID string) structs.HealthChecks
|
|
|
|
func newServiceHealthEventForService(tx ReadTxn, idx uint64, tuple nodeServiceTuple) (stream.Event, error) {
|
|
n, checksFunc, err := getNodeAndChecks(tx, tuple.Node, &tuple.EntMeta, tuple.PeerName)
|
|
if err != nil {
|
|
return stream.Event{}, err
|
|
}
|
|
|
|
svc, err := tx.Get(tableServices, indexID, NodeServiceQuery{
|
|
EnterpriseMeta: tuple.EntMeta,
|
|
Node: tuple.Node,
|
|
Service: tuple.ServiceID,
|
|
PeerName: tuple.PeerName,
|
|
})
|
|
if err != nil {
|
|
return stream.Event{}, err
|
|
}
|
|
|
|
raw := svc.Next()
|
|
if raw == nil {
|
|
return stream.Event{}, ErrMissingService
|
|
}
|
|
|
|
sn := raw.(*structs.ServiceNode)
|
|
return newServiceHealthEventRegister(idx, n, sn, checksFunc(sn.ServiceID)), nil
|
|
}
|
|
|
|
func newServiceHealthEventRegister(
|
|
idx uint64,
|
|
node *structs.Node,
|
|
sn *structs.ServiceNode,
|
|
checks structs.HealthChecks,
|
|
) stream.Event {
|
|
csn := &structs.CheckServiceNode{
|
|
Node: node,
|
|
Service: sn.ToNodeService(),
|
|
Checks: checks,
|
|
}
|
|
return stream.Event{
|
|
Topic: EventTopicServiceHealth,
|
|
Index: idx,
|
|
Payload: EventPayloadCheckServiceNode{
|
|
Op: pbsubscribe.CatalogOp_Register,
|
|
Value: csn,
|
|
},
|
|
}
|
|
}
|
|
|
|
func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream.Event {
|
|
// We actually only need the node name populated in the node part as it's only
|
|
// used as a key to know which service was deregistered so don't bother looking
|
|
// up the node in the DB. Note that while the ServiceNode does have NodeID
|
|
// etc. fields, they are never populated in memdb per the comment on that
|
|
// struct and only filled in when we return copies of the result to users.
|
|
// This is also important because if the service was deleted as part of a
|
|
// whole node deregistering then the node record won't actually exist now
|
|
// anyway and we'd have to plumb it through from the changeset above.
|
|
|
|
entMeta := sn.EnterpriseMeta
|
|
entMeta.Normalize()
|
|
|
|
csn := &structs.CheckServiceNode{
|
|
Node: &structs.Node{
|
|
Node: sn.Node,
|
|
Partition: entMeta.PartitionOrEmpty(),
|
|
PeerName: sn.PeerName,
|
|
},
|
|
Service: sn.ToNodeService(),
|
|
}
|
|
|
|
return stream.Event{
|
|
Topic: EventTopicServiceHealth,
|
|
Index: idx,
|
|
Payload: EventPayloadCheckServiceNode{
|
|
Op: pbsubscribe.CatalogOp_Deregister,
|
|
Value: csn,
|
|
},
|
|
}
|
|
}
|