state: use changeType in serviceChanges
To be a little more explicit, instead of nil implying an indirect change
This commit is contained in:
parent
68682e7e83
commit
870823e8ed
|
@ -63,13 +63,36 @@ type nodeServiceTuple struct {
|
||||||
EntMeta structs.EnterpriseMeta
|
EntMeta structs.EnterpriseMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newNodeServiceTupleFromServiceNode(sn *structs.ServiceNode) nodeServiceTuple {
|
||||||
|
return nodeServiceTuple{
|
||||||
|
Node: sn.Node,
|
||||||
|
ServiceID: sn.ServiceID,
|
||||||
|
EntMeta: sn.EnterpriseMeta,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newNodeServiceTupleFromServiceHealthCheck(hc *structs.HealthCheck) nodeServiceTuple {
|
||||||
|
return nodeServiceTuple{
|
||||||
|
Node: hc.Node,
|
||||||
|
ServiceID: hc.ServiceID,
|
||||||
|
EntMeta: hc.EnterpriseMeta,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type serviceChange struct {
|
||||||
|
changeType changeType
|
||||||
|
change memdb.Change
|
||||||
|
}
|
||||||
|
|
||||||
|
var serviceChangeIndirect = serviceChange{changeType: changeIndirect}
|
||||||
|
|
||||||
// ServiceHealthEventsFromChanges returns all the service and Connect health
|
// ServiceHealthEventsFromChanges returns all the service and Connect health
|
||||||
// events that should be emitted given a set of changes to the state store.
|
// events that should be emitted given a set of changes to the state store.
|
||||||
func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
||||||
var events []stream.Event
|
var events []stream.Event
|
||||||
|
|
||||||
var nodeChanges map[string]changeType
|
var nodeChanges map[string]changeType
|
||||||
var serviceChanges map[nodeServiceTuple]*memdb.Change
|
var serviceChanges map[nodeServiceTuple]serviceChange
|
||||||
|
|
||||||
markNode := func(node string, typ changeType) {
|
markNode := func(node string, typ changeType) {
|
||||||
if nodeChanges == nil {
|
if nodeChanges == nil {
|
||||||
|
@ -83,21 +106,16 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
nodeChanges[node] = typ
|
nodeChanges[node] = typ
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
markService := func(node, service string, entMeta structs.EnterpriseMeta, svcChange *memdb.Change) {
|
markService := func(key nodeServiceTuple, svcChange serviceChange) {
|
||||||
if serviceChanges == nil {
|
if serviceChanges == nil {
|
||||||
serviceChanges = make(map[nodeServiceTuple]*memdb.Change)
|
serviceChanges = make(map[nodeServiceTuple]serviceChange)
|
||||||
}
|
|
||||||
k := nodeServiceTuple{
|
|
||||||
Node: node,
|
|
||||||
ServiceID: service,
|
|
||||||
EntMeta: entMeta,
|
|
||||||
}
|
}
|
||||||
// If the caller has an actual service mutation ensure we store it even if
|
// If the caller has an actual service mutation ensure we store it even if
|
||||||
// the service is already marked. If the caller is just marking the service
|
// the service is already marked. If the caller is just marking the service
|
||||||
// dirty without an node change, don't overwrite any existing node change we
|
// dirty without an node change, don't overwrite any existing node change we
|
||||||
// know about.
|
// know about.
|
||||||
if serviceChanges[k] == nil {
|
if serviceChanges[key].changeType == changeIndirect {
|
||||||
serviceChanges[k] = svcChange
|
serviceChanges[key] = svcChange
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,8 +132,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
|
|
||||||
case "services":
|
case "services":
|
||||||
sn := changeObject(change).(*structs.ServiceNode)
|
sn := changeObject(change).(*structs.ServiceNode)
|
||||||
changeCopy := change // TODO: why does the change need to be copied?
|
srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
|
||||||
markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy)
|
markService(newNodeServiceTupleFromServiceNode(sn), srvChange)
|
||||||
|
|
||||||
case "checks":
|
case "checks":
|
||||||
// For health we only care about the scope for now to know if it's just
|
// For health we only care about the scope for now to know if it's just
|
||||||
|
@ -133,14 +151,14 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
} else {
|
} else {
|
||||||
// Check changed which means we just need to emit for the linked
|
// Check changed which means we just need to emit for the linked
|
||||||
// service.
|
// service.
|
||||||
markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil)
|
markService(newNodeServiceTupleFromServiceHealthCheck(after), serviceChangeIndirect)
|
||||||
|
|
||||||
// Edge case - if the check with same ID was updated to link to a
|
// Edge case - if the check with same ID was updated to link to a
|
||||||
// different service ID but the old service with old ID still exists,
|
// different service ID but the old service with old ID still exists,
|
||||||
// then the old service instance needs updating too as it has one
|
// then the old service instance needs updating too as it has one
|
||||||
// fewer checks now.
|
// fewer checks now.
|
||||||
if before.ServiceID != after.ServiceID {
|
if before.ServiceID != after.ServiceID {
|
||||||
markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil)
|
markService(newNodeServiceTupleFromServiceHealthCheck(before), serviceChangeIndirect)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,7 +168,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
// Node level check
|
// Node level check
|
||||||
markNode(obj.Node, changeIndirect)
|
markNode(obj.Node, changeIndirect)
|
||||||
} else {
|
} else {
|
||||||
markService(obj.Node, obj.ServiceID, obj.EnterpriseMeta, nil)
|
markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -172,12 +190,12 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
events = append(events, es...)
|
events = append(events, es...)
|
||||||
}
|
}
|
||||||
|
|
||||||
for tuple, change := range serviceChanges {
|
for tuple, srvChange := range serviceChanges {
|
||||||
// change may be nil if there was a change that _affected_ the service
|
// change may be nil if there was a change that _affected_ the service
|
||||||
// like a change to checks but it didn't actually change the service
|
// like a change to checks but it didn't actually change the service
|
||||||
// record itself.
|
// record itself.
|
||||||
if change != nil && change.Deleted() {
|
if srvChange.changeType == changeDelete {
|
||||||
sn := change.Before.(*structs.ServiceNode)
|
sn := srvChange.change.Before.(*structs.ServiceNode)
|
||||||
e := newServiceHealthEventDeregister(changes.Index, sn)
|
e := newServiceHealthEventDeregister(changes.Index, sn)
|
||||||
events = append(events, e)
|
events = append(events, e)
|
||||||
continue
|
continue
|
||||||
|
@ -186,9 +204,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
// Check if this was a service mutation that changed it's name which
|
// Check if this was a service mutation that changed it's name which
|
||||||
// requires special handling even if node changed and new events were
|
// requires special handling even if node changed and new events were
|
||||||
// already published.
|
// already published.
|
||||||
if change != nil && change.Updated() {
|
if srvChange.changeType == changeUpdate {
|
||||||
before := change.Before.(*structs.ServiceNode)
|
before := srvChange.change.Before.(*structs.ServiceNode)
|
||||||
after := change.After.(*structs.ServiceNode)
|
after := srvChange.change.After.(*structs.ServiceNode)
|
||||||
|
|
||||||
if before.ServiceName != after.ServiceName {
|
if before.ServiceName != after.ServiceName {
|
||||||
// Service was renamed, the code below will ensure the new registrations
|
// Service was renamed, the code below will ensure the new registrations
|
||||||
|
|
Loading…
Reference in New Issue