state: handle terminating gateways in service health events

This commit is contained in:
Daniel Nephin 2020-08-07 14:18:24 -04:00
parent b241debee7
commit c17a5b0628
1 changed files with 32 additions and 23 deletions

View File

@ -218,17 +218,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
events = append(events, e)
}
if before.ServiceKind == structs.ServiceKindConnectProxy &&
before.ServiceProxy.DestinationServiceName != after.ServiceProxy.DestinationServiceName {
// Connect proxy changed the service it is representing, need to issue a
// dereg for the old service on the Connect topic. We don't actually need
// to deregister this sidecar service though as it still exists and
// didn't change its name (or if it did that was caught just above). But
// our mechanism for connect events is to convert them so we generate
// the regular one, convert it to Connect topic and then discard the
// original.
e := newServiceHealthEventDeregister(changes.Index, before)
events = append(events, serviceHealthToConnectEvents(e)...)
if e, ok := isConnectProxyDestinationServiceChange(changes.Index, before, after); ok {
events = append(events, e)
}
}
@ -252,6 +243,22 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
return events, nil
}
// isConnectProxyDestinationServiceChange handles the case where a Connect proxy changed
// the service it is proxying. We need to issue a de-registration for the old
// service on the Connect topic. We don't actually need to deregister this sidecar
// service though as it still exists and didn't change its name.
func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.ServiceNode) (stream.Event, bool) {
if before.ServiceKind != structs.ServiceKindConnectProxy ||
before.ServiceProxy.DestinationServiceName == after.ServiceProxy.DestinationServiceName {
return stream.Event{}, false
}
e := newServiceHealthEventDeregister(idx, before)
e.Topic = TopicServiceHealthConnect
e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName
return e, true
}
type changeType uint8
const (
@ -281,33 +288,35 @@ func changeTypeFromChange(change memdb.Change) changeType {
// switching connection details to be the proxy instead of the actual instance
// in case of a sidecar.
func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
var serviceHealthConnectEvents []stream.Event
var result []stream.Event
for _, event := range events {
if event.Topic != TopicServiceHealth {
// Skip non-health or any events already emitted to Connect topic
continue
}
node := getPayloadCheckServiceNode(event.Payload)
// TODO: do we need to handle gateways here as well?
if node.Service == nil ||
(node.Service.Kind != structs.ServiceKindConnectProxy && !node.Service.Connect.Native) {
// Event is not a service instance (i.e. just a node registration)
// or is not a service that is not connect-enabled in some way.
if node.Service == nil {
continue
}
connectEvent := event
connectEvent.Topic = TopicServiceHealthConnect
// If this is a proxy, set the key to the destination service name.
if node.Service.Kind == structs.ServiceKindConnectProxy {
connectEvent.Key = node.Service.Proxy.DestinationServiceName
}
switch {
case node.Service.Connect.Native:
result = append(result, connectEvent)
serviceHealthConnectEvents = append(serviceHealthConnectEvents, connectEvent)
case node.Service.Kind == structs.ServiceKindConnectProxy:
connectEvent.Key = node.Service.Proxy.DestinationServiceName
result = append(result, connectEvent)
default:
// ServiceKindTerminatingGateway changes are handled separately.
// All other cases are not relevant to the connect topic
}
}
return serviceHealthConnectEvents
return result
}
func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {