state: use an enum for tracking node changes
This commit is contained in:
parent
7c3c627028
commit
668b98bcce
|
@ -68,20 +68,19 @@ type nodeServiceTuple struct {
|
||||||
func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
||||||
var events []stream.Event
|
var events []stream.Event
|
||||||
|
|
||||||
var nodeChanges map[string]*memdb.Change
|
var nodeChanges map[string]changeType
|
||||||
var serviceChanges map[nodeServiceTuple]*memdb.Change
|
var serviceChanges map[nodeServiceTuple]*memdb.Change
|
||||||
|
|
||||||
markNode := func(node string, nodeChange *memdb.Change) {
|
markNode := func(node string, typ changeType) {
|
||||||
if nodeChanges == nil {
|
if nodeChanges == nil {
|
||||||
nodeChanges = make(map[string]*memdb.Change)
|
nodeChanges = make(map[string]changeType)
|
||||||
}
|
}
|
||||||
// If the caller has an actual node mutation ensure we store it even if the
|
// If the caller has an actual node mutation ensure we store it even if the
|
||||||
// node is already marked. If the caller is just marking the node dirty
|
// node is already marked. If the caller is just marking the node dirty
|
||||||
// without an node change, don't overwrite any existing node change we know
|
// without an node change, don't overwrite any existing node change we know
|
||||||
// about.
|
// about.
|
||||||
ch := nodeChanges[node]
|
if nodeChanges[node] == changeIndirect {
|
||||||
if ch == nil {
|
nodeChanges[node] = typ
|
||||||
nodeChanges[node] = nodeChange
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
markService := func(node, service string, entMeta structs.EnterpriseMeta, svcChange *memdb.Change) {
|
markService := func(node, service string, entMeta structs.EnterpriseMeta, svcChange *memdb.Change) {
|
||||||
|
@ -111,20 +110,11 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
// we mark it anyway because if it _is_ a delete then we need to know that
|
// we mark it anyway because if it _is_ a delete then we need to know that
|
||||||
// later to avoid trying to deliver events when node level checks mark the
|
// later to avoid trying to deliver events when node level checks mark the
|
||||||
// node as "changed".
|
// node as "changed".
|
||||||
nRaw := change.After
|
n := changeObject(change).(*structs.Node)
|
||||||
if change.After == nil {
|
markNode(n.Node, changeTypeFromChange(change))
|
||||||
nRaw = change.Before
|
|
||||||
}
|
|
||||||
n := nRaw.(*structs.Node)
|
|
||||||
changeCopy := change
|
|
||||||
markNode(n.Node, &changeCopy)
|
|
||||||
|
|
||||||
case "services":
|
case "services":
|
||||||
snRaw := change.After
|
sn := changeObject(change).(*structs.ServiceNode)
|
||||||
if change.After == nil {
|
|
||||||
snRaw = change.Before
|
|
||||||
}
|
|
||||||
sn := snRaw.(*structs.ServiceNode)
|
|
||||||
changeCopy := change
|
changeCopy := change
|
||||||
markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy)
|
markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy)
|
||||||
|
|
||||||
|
@ -140,7 +130,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
after := change.After.(*structs.HealthCheck)
|
after := change.After.(*structs.HealthCheck)
|
||||||
if after.ServiceID == "" || before.ServiceID == "" {
|
if after.ServiceID == "" || before.ServiceID == "" {
|
||||||
// Either changed from or to being node-scoped
|
// Either changed from or to being node-scoped
|
||||||
markNode(after.Node, nil)
|
markNode(after.Node, changeIndirect)
|
||||||
} else {
|
} else {
|
||||||
// Check changed which means we just need to emit for the linked
|
// Check changed which means we just need to emit for the linked
|
||||||
// service.
|
// service.
|
||||||
|
@ -159,7 +149,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
before := change.Before.(*structs.HealthCheck)
|
before := change.Before.(*structs.HealthCheck)
|
||||||
if before.ServiceID == "" {
|
if before.ServiceID == "" {
|
||||||
// Node level check
|
// Node level check
|
||||||
markNode(before.Node, nil)
|
markNode(before.Node, changeIndirect)
|
||||||
} else {
|
} else {
|
||||||
markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil)
|
markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil)
|
||||||
}
|
}
|
||||||
|
@ -168,7 +158,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
after := change.After.(*structs.HealthCheck)
|
after := change.After.(*structs.HealthCheck)
|
||||||
if after.ServiceID == "" {
|
if after.ServiceID == "" {
|
||||||
// Node level check
|
// Node level check
|
||||||
markNode(after.Node, nil)
|
markNode(after.Node, changeIndirect)
|
||||||
} else {
|
} else {
|
||||||
markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil)
|
markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil)
|
||||||
}
|
}
|
||||||
|
@ -177,11 +167,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now act on those marked nodes/services
|
// Now act on those marked nodes/services
|
||||||
for node, change := range nodeChanges {
|
for node, changeType := range nodeChanges {
|
||||||
// change may be nil if there was a change that _affected_ the node
|
if changeType == changeDelete {
|
||||||
// like a change to checks but it didn't actually change the node
|
|
||||||
// record itself.
|
|
||||||
if change != nil && change.Deleted() {
|
|
||||||
// Node deletions are a no-op here since the state store transaction will
|
// Node deletions are a no-op here since the state store transaction will
|
||||||
// have also removed all the service instances which will be handled in
|
// have also removed all the service instances which will be handled in
|
||||||
// the loop below.
|
// the loop below.
|
||||||
|
@ -269,6 +256,28 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type changeType uint8
|
||||||
|
|
||||||
|
const (
|
||||||
|
// changeIndirect indicates some other object changed which has implications
|
||||||
|
// for the target object.
|
||||||
|
changeIndirect changeType = iota
|
||||||
|
changeDelete
|
||||||
|
changeCreate
|
||||||
|
changeUpdate
|
||||||
|
)
|
||||||
|
|
||||||
|
func changeTypeFromChange(change memdb.Change) changeType {
|
||||||
|
switch {
|
||||||
|
case change.Deleted():
|
||||||
|
return changeDelete
|
||||||
|
case change.Created():
|
||||||
|
return changeCreate
|
||||||
|
default:
|
||||||
|
return changeUpdate
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// serviceHealthToConnectEvents converts already formatted service health
|
// serviceHealthToConnectEvents converts already formatted service health
|
||||||
// registration events into the ones needed to publish to the Connect topic.
|
// registration events into the ones needed to publish to the Connect topic.
|
||||||
// This essentially means filtering out any instances that are not Connect
|
// This essentially means filtering out any instances that are not Connect
|
||||||
|
|
Loading…
Reference in New Issue