state: Add Change processor and snapshotter for service health
Co-authored-by: Paul Banks <banks@banksco.de>
This commit is contained in:
parent
6a1a43721d
commit
fdfe176deb
|
@ -0,0 +1,494 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ServiceHealthSnapshot is a stream.SnapFn that provides a streaming snapshot
|
||||||
|
// of stream.Events that describe the current state of a service health query.
|
||||||
|
func (s *Store) ServiceHealthSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
// TODO(namespace-streaming): plumb entMeta through from SubscribeRequest
|
||||||
|
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = checkServiceNodesToServiceHealth(idx, nodes, buf, TopicServiceHealth)
|
||||||
|
return idx, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceHealthSnapshot is a stream.SnapFn that provides a streaming snapshot
|
||||||
|
// of stream.Events that describe the current state of a service connect health
|
||||||
|
// query.
|
||||||
|
func (s *Store) ServiceHealthConnectSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
// TODO(namespace-streaming): plumb entMeta through from SubscribeRequest
|
||||||
|
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, true, nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = checkServiceNodesToServiceHealth(idx, nodes, buf, TopicServiceHealthConnect)
|
||||||
|
return idx, err
|
||||||
|
}
|
||||||
|
|
||||||
|
type changeOp int
|
||||||
|
|
||||||
|
const (
|
||||||
|
OpDelete changeOp = iota
|
||||||
|
OpCreate
|
||||||
|
OpUpdate
|
||||||
|
)
|
||||||
|
|
||||||
|
type eventPayload struct {
|
||||||
|
Op changeOp
|
||||||
|
Obj interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkServiceNodesToServiceHealth converts a list of structs.CheckServiceNodes
|
||||||
|
// to stream.ServiceHealth events for streaming. If a non-nil event buffer is
|
||||||
|
// passed, events are appended to the buffer one at a time and an nil slice is
|
||||||
|
// returned to avoid keeping a full copy in memory.
|
||||||
|
func checkServiceNodesToServiceHealth(idx uint64, nodes structs.CheckServiceNodes,
|
||||||
|
buf stream.SnapshotAppender, topic topic) ([]stream.Event, error) {
|
||||||
|
var events []stream.Event
|
||||||
|
for _, n := range nodes {
|
||||||
|
event := stream.Event{
|
||||||
|
Index: idx,
|
||||||
|
Topic: topic,
|
||||||
|
Payload: eventPayload{
|
||||||
|
Op: OpCreate,
|
||||||
|
Obj: &n,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if n.Service != nil {
|
||||||
|
event.Key = n.Service.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: always called with a non-nil buf?
|
||||||
|
if buf != nil {
|
||||||
|
buf.Append([]stream.Event{event})
|
||||||
|
} else {
|
||||||
|
events = append(events, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type nodeServiceTuple struct {
|
||||||
|
Node string
|
||||||
|
ServiceID string
|
||||||
|
EntMeta structs.EnterpriseMeta
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceHealthEventsFromChanges returns all the service and Connect health
|
||||||
|
// events that should be emitted given a set of changes to the state store.
|
||||||
|
func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
||||||
|
var events []stream.Event
|
||||||
|
|
||||||
|
var nodeChanges map[string]*memdb.Change
|
||||||
|
var serviceChanges map[nodeServiceTuple]*memdb.Change
|
||||||
|
|
||||||
|
markNode := func(node string, nodeChange *memdb.Change) {
|
||||||
|
if nodeChanges == nil {
|
||||||
|
nodeChanges = make(map[string]*memdb.Change)
|
||||||
|
}
|
||||||
|
// If the caller has an actual node mutation ensure we store it even if the
|
||||||
|
// node is already marked. If the caller is just marking the node dirty
|
||||||
|
// without an node change, don't overwrite any existing node change we know
|
||||||
|
// about.
|
||||||
|
ch := nodeChanges[node]
|
||||||
|
if ch == nil {
|
||||||
|
nodeChanges[node] = nodeChange
|
||||||
|
}
|
||||||
|
}
|
||||||
|
markService := func(node, service string, entMeta structs.EnterpriseMeta, svcChange *memdb.Change) {
|
||||||
|
if serviceChanges == nil {
|
||||||
|
serviceChanges = make(map[nodeServiceTuple]*memdb.Change)
|
||||||
|
}
|
||||||
|
k := nodeServiceTuple{
|
||||||
|
Node: node,
|
||||||
|
ServiceID: service,
|
||||||
|
EntMeta: entMeta,
|
||||||
|
}
|
||||||
|
// If the caller has an actual service mutation ensure we store it even if
|
||||||
|
// the service is already marked. If the caller is just marking the service
|
||||||
|
// dirty without an node change, don't overwrite any existing node change we
|
||||||
|
// know about.
|
||||||
|
ch := serviceChanges[k]
|
||||||
|
if ch == nil {
|
||||||
|
serviceChanges[k] = svcChange
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, change := range changes.Changes {
|
||||||
|
switch change.Table {
|
||||||
|
case "nodes":
|
||||||
|
// Node changed in some way, if it's not a delete, we'll need to
|
||||||
|
// re-deliver CheckServiceNode results for all services on that node but
|
||||||
|
// we mark it anyway because if it _is_ a delete then we need to know that
|
||||||
|
// later to avoid trying to deliver events when node level checks mark the
|
||||||
|
// node as "changed".
|
||||||
|
nRaw := change.After
|
||||||
|
if change.After == nil {
|
||||||
|
nRaw = change.Before
|
||||||
|
}
|
||||||
|
n := nRaw.(*structs.Node)
|
||||||
|
changeCopy := change
|
||||||
|
markNode(n.Node, &changeCopy)
|
||||||
|
|
||||||
|
case "services":
|
||||||
|
snRaw := change.After
|
||||||
|
if change.After == nil {
|
||||||
|
snRaw = change.Before
|
||||||
|
}
|
||||||
|
sn := snRaw.(*structs.ServiceNode)
|
||||||
|
changeCopy := change
|
||||||
|
markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy)
|
||||||
|
|
||||||
|
case "checks":
|
||||||
|
// For health we only care about the scope for now to know if it's just
|
||||||
|
// affecting a single service or every service on a node. There is a
|
||||||
|
// subtle edge case where the check with same ID changes from being node
|
||||||
|
// scoped to service scoped or vice versa, in either case we need to treat
|
||||||
|
// it as affecting all services on the node.
|
||||||
|
switch {
|
||||||
|
case change.Updated():
|
||||||
|
before := change.Before.(*structs.HealthCheck)
|
||||||
|
after := change.After.(*structs.HealthCheck)
|
||||||
|
if after.ServiceID == "" || before.ServiceID == "" {
|
||||||
|
// Either changed from or to being node-scoped
|
||||||
|
markNode(after.Node, nil)
|
||||||
|
} else {
|
||||||
|
// Check changed which means we just need to emit for the linked
|
||||||
|
// service.
|
||||||
|
markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil)
|
||||||
|
|
||||||
|
// Edge case - if the check with same ID was updated to link to a
|
||||||
|
// different service ID but the old service with old ID still exists,
|
||||||
|
// then the old service instance needs updating too as it has one
|
||||||
|
// fewer checks now.
|
||||||
|
if before.ServiceID != after.ServiceID {
|
||||||
|
markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case change.Deleted():
|
||||||
|
before := change.Before.(*structs.HealthCheck)
|
||||||
|
if before.ServiceID == "" {
|
||||||
|
// Node level check
|
||||||
|
markNode(before.Node, nil)
|
||||||
|
} else {
|
||||||
|
markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
case change.Created():
|
||||||
|
after := change.After.(*structs.HealthCheck)
|
||||||
|
if after.ServiceID == "" {
|
||||||
|
// Node level check
|
||||||
|
markNode(after.Node, nil)
|
||||||
|
} else {
|
||||||
|
markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now act on those marked nodes/services
|
||||||
|
for node, change := range nodeChanges {
|
||||||
|
// change may be nil if there was a change that _affected_ the node
|
||||||
|
// like a change to checks but it didn't actually change the node
|
||||||
|
// record itself.
|
||||||
|
if change != nil && change.Deleted() {
|
||||||
|
// Node deletions are a no-op here since the state store transaction will
|
||||||
|
// have also removed all the service instances which will be handled in
|
||||||
|
// the loop below.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Rebuild events for all services on this node
|
||||||
|
es, err := serviceHealthEventsForNode(tx, changes.Index, node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
events = append(events, es...)
|
||||||
|
}
|
||||||
|
|
||||||
|
for tuple, change := range serviceChanges {
|
||||||
|
// change may be nil if there was a change that _affected_ the service
|
||||||
|
// like a change to checks but it didn't actually change the service
|
||||||
|
// record itself.
|
||||||
|
if change != nil && change.Deleted() {
|
||||||
|
// Generate delete event for the service instance and append it
|
||||||
|
sn := change.Before.(*structs.ServiceNode)
|
||||||
|
es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, sn, &tuple.EntMeta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
events = append(events, es...)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this was a service mutation that changed it's name which
|
||||||
|
// requires special handling even if node changed and new events were
|
||||||
|
// already published.
|
||||||
|
if change != nil && change.Updated() {
|
||||||
|
before := change.Before.(*structs.ServiceNode)
|
||||||
|
after := change.After.(*structs.ServiceNode)
|
||||||
|
|
||||||
|
if before.ServiceName != after.ServiceName {
|
||||||
|
// Service was renamed, the code below will ensure the new registrations
|
||||||
|
// go out to subscribers to the new service name topic key, but we need
|
||||||
|
// to fix up subscribers that were watching the old name by sending
|
||||||
|
// deregistrations.
|
||||||
|
es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, before, &tuple.EntMeta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
events = append(events, es...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if before.ServiceKind == structs.ServiceKindConnectProxy &&
|
||||||
|
before.ServiceProxy.DestinationServiceName != after.ServiceProxy.DestinationServiceName {
|
||||||
|
// Connect proxy changed the service it is representing, need to issue a
|
||||||
|
// dereg for the old service on the Connect topic. We don't actually need
|
||||||
|
// to deregister this sidecar service though as it still exists and
|
||||||
|
// didn't change its name (or if it did that was caught just above). But
|
||||||
|
// our mechanism for connect events is to convert them so we generate
|
||||||
|
// the regular one, convert it to Connect topic and then discar the
|
||||||
|
// original.
|
||||||
|
es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, before, &tuple.EntMeta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Don't append es per comment above, but convert it to connect topic
|
||||||
|
// events.
|
||||||
|
es = serviceHealthToConnectEvents(es)
|
||||||
|
events = append(events, es...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := nodeChanges[tuple.Node]; ok {
|
||||||
|
// We already rebuilt events for everything on this node, no need to send
|
||||||
|
// a duplicate.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Build service event and append it
|
||||||
|
es, err := serviceHealthEventsForServiceInstance(tx, changes.Index, tuple)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
events = append(events, es...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Duplicate any events that affected connect-enabled instances (proxies or
|
||||||
|
// native apps) to the relevant Connect topic.
|
||||||
|
events = append(events, serviceHealthToConnectEvents(events)...)
|
||||||
|
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// serviceHealthToConnectEvents converts already formatted service health
|
||||||
|
// registration events into the ones needed to publish to the Connect topic.
|
||||||
|
// This essentially means filtering out any instances that are not Connect
|
||||||
|
// enabled and so of no interest to those subscribers but also involves
|
||||||
|
// switching connection details to be the proxy instead of the actual instance
|
||||||
|
// in case of a sidecar.
|
||||||
|
func serviceHealthToConnectEvents(events []stream.Event) []stream.Event {
|
||||||
|
serviceHealthConnectEvents := make([]stream.Event, 0, len(events))
|
||||||
|
for _, event := range events {
|
||||||
|
if event.Topic != TopicServiceHealth {
|
||||||
|
// Skip non-health or any events already emitted to Connect topic
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
node := getPayloadCheckServiceNode(event.Payload)
|
||||||
|
if node.Service == nil ||
|
||||||
|
(node.Service.Kind != structs.ServiceKindConnectProxy && !node.Service.Connect.Native) {
|
||||||
|
// Event is not a service instance (i.e. just a node registration)
|
||||||
|
// or is not a service that is not connect-enabled in some way.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
connectEvent := event
|
||||||
|
connectEvent.Topic = TopicServiceHealthConnect
|
||||||
|
|
||||||
|
// If this is a proxy, set the key to the destination service name.
|
||||||
|
if node.Service.Kind == structs.ServiceKindConnectProxy {
|
||||||
|
connectEvent.Key = node.Service.Proxy.DestinationServiceName
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceHealthConnectEvents = append(serviceHealthConnectEvents, connectEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
return serviceHealthConnectEvents
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {
|
||||||
|
ep, ok := payload.(eventPayload)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
csn, ok := ep.Obj.(*structs.CheckServiceNode)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return csn
|
||||||
|
}
|
||||||
|
|
||||||
|
// serviceHealthEventsForNode returns health events for all services on the
|
||||||
|
// given node. This mirrors some of the the logic in the oddly-named
|
||||||
|
// parseCheckServiceNodes but is more efficient since we know they are all on
|
||||||
|
// the same node.
|
||||||
|
func serviceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) {
|
||||||
|
// TODO(namespace-streaming): figure out the right EntMeta and mystery arg.
|
||||||
|
services, err := catalogServiceListByNode(tx, node, nil, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
n, nodeChecks, svcChecks, err := getNodeAndChecks(tx, node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var events []stream.Event
|
||||||
|
for service := services.Next(); service != nil; service = services.Next() {
|
||||||
|
sn := service.(*structs.ServiceNode)
|
||||||
|
|
||||||
|
es, err := serviceHealthEventsForServiceNodeInternal(idx, n, sn, nodeChecks, svcChecks)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append to the results.
|
||||||
|
events = append(events, es...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getNodeAndNodeChecks returns a specific node and ALL checks on that node
|
||||||
|
// (both node specific and service-specific). node-level Checks are returned as
|
||||||
|
// a slice, service-specific checks as a map of slices with the service id as
|
||||||
|
// the map key.
|
||||||
|
func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node,
|
||||||
|
structs.HealthChecks, map[string]structs.HealthChecks, error) {
|
||||||
|
// Fetch the node
|
||||||
|
nodeRaw, err := tx.First("nodes", "id", node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
if nodeRaw == nil {
|
||||||
|
return nil, nil, nil, ErrMissingNode
|
||||||
|
}
|
||||||
|
n := nodeRaw.(*structs.Node)
|
||||||
|
|
||||||
|
// TODO(namespace-streaming): work out what EntMeta is needed here, wildcard?
|
||||||
|
iter, err := catalogListChecksByNode(tx, node, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var nodeChecks structs.HealthChecks
|
||||||
|
var svcChecks map[string]structs.HealthChecks
|
||||||
|
|
||||||
|
for check := iter.Next(); check != nil; check = iter.Next() {
|
||||||
|
check := check.(*structs.HealthCheck)
|
||||||
|
if check.ServiceID == "" {
|
||||||
|
nodeChecks = append(nodeChecks, check)
|
||||||
|
} else {
|
||||||
|
if svcChecks == nil {
|
||||||
|
svcChecks = make(map[string]structs.HealthChecks)
|
||||||
|
}
|
||||||
|
svcChecks[check.ServiceID] = append(svcChecks[check.ServiceID], check)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n, nodeChecks, svcChecks, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func serviceHealthEventsForServiceInstance(tx ReadTxn, idx uint64, tuple nodeServiceTuple) ([]stream.Event, error) {
|
||||||
|
n, nodeChecks, svcChecks, err := getNodeAndChecks(tx, tuple.Node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
sn := svc.Next()
|
||||||
|
if sn == nil {
|
||||||
|
return nil, ErrMissingService
|
||||||
|
}
|
||||||
|
|
||||||
|
return serviceHealthEventsForServiceNodeInternal(idx, n, sn.(*structs.ServiceNode), nodeChecks, svcChecks)
|
||||||
|
}
|
||||||
|
|
||||||
|
func serviceHealthEventsForServiceNodeInternal(idx uint64,
|
||||||
|
node *structs.Node,
|
||||||
|
sn *structs.ServiceNode,
|
||||||
|
nodeChecks structs.HealthChecks,
|
||||||
|
svcChecks map[string]structs.HealthChecks) ([]stream.Event, error) {
|
||||||
|
|
||||||
|
// Start with a copy of the node checks.
|
||||||
|
checks := nodeChecks
|
||||||
|
for _, check := range svcChecks[sn.ServiceID] {
|
||||||
|
checks = append(checks, check)
|
||||||
|
}
|
||||||
|
|
||||||
|
csn := &structs.CheckServiceNode{
|
||||||
|
Node: node,
|
||||||
|
Service: sn.ToNodeService(),
|
||||||
|
Checks: checks,
|
||||||
|
}
|
||||||
|
e := stream.Event{
|
||||||
|
Topic: TopicServiceHealth,
|
||||||
|
Key: sn.ServiceName,
|
||||||
|
Index: idx,
|
||||||
|
Payload: eventPayload{
|
||||||
|
Op: OpCreate,
|
||||||
|
Obj: csn,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// See if we also need to emit a connect event (i.e. if this instance is a
|
||||||
|
// connect proxy or connect native app).
|
||||||
|
|
||||||
|
return []stream.Event{e}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func serviceHealthDeregEventsForServiceInstance(idx uint64,
|
||||||
|
sn *structs.ServiceNode, entMeta *structs.EnterpriseMeta) ([]stream.Event, error) {
|
||||||
|
|
||||||
|
// We actually only need the node name populated in the node part as it's only
|
||||||
|
// used as a key to know which service was deregistered so don't bother looking
|
||||||
|
// up the node in the DB. Note that while the ServiceNode does have NodeID
|
||||||
|
// etc. fields, they are never populated in memdb per the comment on that
|
||||||
|
// struct and only filled in when we return copies of the result to users.
|
||||||
|
// This is also important because if the service was deleted as part of a
|
||||||
|
// whole node deregistering then the node record won't actually exist now
|
||||||
|
// anyway and we'd have to plumb it through from the changeset above.
|
||||||
|
csn := &structs.CheckServiceNode{
|
||||||
|
Node: &structs.Node{
|
||||||
|
Node: sn.Node,
|
||||||
|
},
|
||||||
|
Service: sn.ToNodeService(),
|
||||||
|
}
|
||||||
|
|
||||||
|
e := stream.Event{
|
||||||
|
Topic: TopicServiceHealth,
|
||||||
|
Key: sn.ServiceName,
|
||||||
|
Index: idx,
|
||||||
|
Payload: eventPayload{
|
||||||
|
Op: OpDelete,
|
||||||
|
Obj: csn,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return []stream.Event{e}, nil
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -165,11 +165,34 @@ func (t topic) String() string {
|
||||||
return string(t)
|
return string(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
// TopicServiceHealth contains events for all registered service instances.
|
||||||
|
TopicServiceHealth topic = "topic-service-health"
|
||||||
|
// TopicServiceHealthConnect contains events for connect-enabled service instances.
|
||||||
|
TopicServiceHealthConnect topic = "topic-service-health-connect"
|
||||||
|
)
|
||||||
|
|
||||||
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
||||||
// TODO: add other table handlers here.
|
var events []stream.Event
|
||||||
return aclChangeUnsubscribeEvent(tx, changes)
|
fns := []func(tx ReadTxn, changes Changes) ([]stream.Event, error){
|
||||||
|
aclChangeUnsubscribeEvent,
|
||||||
|
ServiceHealthEventsFromChanges,
|
||||||
|
// TODO: add other table handlers here.
|
||||||
|
}
|
||||||
|
for _, fn := range fns {
|
||||||
|
e, err := fn(tx, changes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
events = append(events, e...)
|
||||||
|
}
|
||||||
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSnapshotHandlers() stream.SnapshotHandlers {
|
// TODO: could accept a ReadTxner instead of a Store
|
||||||
return stream.SnapshotHandlers{}
|
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
|
||||||
|
return stream.SnapshotHandlers{
|
||||||
|
TopicServiceHealth: s.ServiceHealthSnapshot,
|
||||||
|
TopicServiceHealthConnect: s.ServiceHealthConnectSnapshot,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,17 +162,17 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
s := &Store{
|
s := &Store{
|
||||||
schema: schema,
|
schema: schema,
|
||||||
abandonCh: make(chan struct{}),
|
abandonCh: make(chan struct{}),
|
||||||
kvsGraveyard: NewGraveyard(gc),
|
kvsGraveyard: NewGraveyard(gc),
|
||||||
lockDelay: NewDelay(),
|
lockDelay: NewDelay(),
|
||||||
db: &changeTrackerDB{
|
|
||||||
db: db,
|
|
||||||
publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(), 10*time.Second),
|
|
||||||
processChanges: processDBChanges,
|
|
||||||
},
|
|
||||||
stopEventPublisher: cancel,
|
stopEventPublisher: cancel,
|
||||||
}
|
}
|
||||||
|
s.db = &changeTrackerDB{
|
||||||
|
db: db,
|
||||||
|
publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second),
|
||||||
|
processChanges: processDBChanges,
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue