open-nomad/nomad/state/events.go
Drew Bailey f9f5fe8236
Events switch on memdb change table instead of type to prevent duplicates (#9486)
* prevent duplicate job events

When a job is updated, the job_version table is also updated with a structs.Job. This produced multiple job events, because we were switching on the type of the changed object and not on the table it belongs to.

* test length

* add table value to tests
2020-12-01 15:14:05 -05:00

203 lines
5.5 KiB
Go

package state
import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
// MsgTypeEvents maps a structs.MessageType to the event type string that is
// stamped on every event generated from changes carrying that message type.
// Message types that are absent from this map produce no events (see
// eventsFromChanges).
var MsgTypeEvents = map[structs.MessageType]string{
	// Node lifecycle, status and drain.
	structs.NodeRegisterRequestType:          structs.TypeNodeRegistration,
	structs.NodeDeregisterRequestType:        structs.TypeNodeDeregistration,
	structs.UpsertNodeEventsType:             structs.TypeNodeEvent,
	structs.NodeUpdateStatusRequestType:      structs.TypeNodeEvent,
	structs.NodeUpdateEligibilityRequestType: structs.TypeNodeDrain,
	structs.NodeUpdateDrainRequestType:       structs.TypeNodeDrain,
	structs.BatchNodeUpdateDrainRequestType:  structs.TypeNodeDrain,

	// Evaluations and plan results.
	structs.EvalUpdateRequestType:       structs.TypeEvalUpdated,
	structs.ApplyPlanResultsRequestType: structs.TypePlanResult,

	// Allocations.
	structs.AllocClientUpdateRequestType:            structs.TypeAllocUpdated,
	structs.AllocUpdateRequestType:                  structs.TypeAllocUpdated,
	structs.AllocUpdateDesiredTransitionRequestType: structs.TypeAllocUpdateDesiredStatus,

	// Jobs.
	structs.JobRegisterRequestType:        structs.TypeJobRegistered,
	structs.JobDeregisterRequestType:      structs.TypeJobDeregistered,
	structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered,

	// Deployments.
	structs.DeploymentStatusUpdateRequestType: structs.TypeDeploymentUpdate,
	structs.DeploymentPromoteRequestType:      structs.TypeDeploymentPromotion,
	structs.DeploymentAllocHealthRequestType:  structs.TypeDeploymentAllocHealth,

	// ACL tokens and policies.
	structs.ACLTokenDeleteRequestType:  structs.TypeACLTokenDeleted,
	structs.ACLTokenUpsertRequestType:  structs.TypeACLTokenUpserted,
	structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted,
	structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted,
}
// eventsFromChanges builds the set of stream events for one batch of state
// changes.
//
// It returns nil when changes.MsgType has no entry in MsgTypeEvents. Otherwise
// it returns a *structs.Events whose Index is changes.Index and whose Events
// holds one entry for every change that eventFromChange could convert, each
// stamped with the mapped event type and changes.Index. The Events slice is
// left nil when no change produced an event.
//
// tx is not used by this function.
func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
	typ, known := MsgTypeEvents[changes.MsgType]
	if !known {
		return nil
	}

	var out []structs.Event
	for _, c := range changes.Changes {
		ev, converted := eventFromChange(c)
		if !converted {
			// Not every table/row produces an event; skip those.
			continue
		}
		ev.Type = typ
		ev.Index = changes.Index
		out = append(out, ev)
	}

	result := &structs.Events{
		Index:  changes.Index,
		Events: out,
	}
	return result
}
// eventFromChange converts a single memdb change into a stream event.
//
// The conversion is keyed on change.Table rather than on the Go type of the
// changed row, so a row type that is stored in more than one table only
// yields an event for the table listed here.
//
// For a deletion (change.Deleted()), only the "acl_token", "acl_policy" and
// "nodes" tables produce an event, built from change.Before. For any other
// change the event is built from change.After, and the "evals", "allocs",
// "jobs" and "deployment" tables are handled as well.
//
// The boolean result is false, and the event is the zero value, when the
// table is not handled or the row does not have the type expected for that
// table. The returned event has no Type or Index set; eventsFromChanges
// fills those in.
func eventFromChange(change memdb.Change) (structs.Event, bool) {
	// By default an event describes the row as it is after the change.
	row := change.After

	if change.Deleted() {
		switch change.Table {
		case "acl_token", "acl_policy", "nodes":
			// A deletion is described by the row as it was before removal.
			row = change.Before
		default:
			// Deletions in every other table are not published.
			return structs.Event{}, false
		}
	}

	// Each case returns on a successful type assertion; a failed assertion
	// leaves the switch and reaches the final "no event" return.
	switch change.Table {
	case "acl_token":
		if token, ok := row.(*structs.ACLToken); ok {
			return structs.Event{
				Topic:   structs.TopicACLToken,
				Key:     token.AccessorID,
				Payload: structs.ACLTokenEvent{ACLToken: token},
			}, true
		}

	case "acl_policy":
		if policy, ok := row.(*structs.ACLPolicy); ok {
			return structs.Event{
				Topic:   structs.TopicACLPolicy,
				Key:     policy.Name,
				Payload: structs.ACLPolicyEvent{ACLPolicy: policy},
			}, true
		}

	case "nodes":
		if node, ok := row.(*structs.Node); ok {
			return structs.Event{
				Topic:   structs.TopicNode,
				Key:     node.ID,
				Payload: &structs.NodeStreamEvent{Node: node},
			}, true
		}

	case "evals":
		if eval, ok := row.(*structs.Evaluation); ok {
			return structs.Event{
				Topic:      structs.TopicEval,
				Key:        eval.ID,
				FilterKeys: []string{eval.JobID, eval.DeploymentID},
				Namespace:  eval.Namespace,
				Payload:    &structs.EvalEvent{Eval: eval},
			}, true
		}

	case "allocs":
		if stored, ok := row.(*structs.Allocation); ok {
			// Clear Job on a copy rather than on the row from the change.
			slim := stored.Copy()
			keys := []string{slim.JobID, slim.DeploymentID}

			// remove job info to help keep size of alloc event down
			slim.Job = nil

			return structs.Event{
				Topic:      structs.TopicAlloc,
				Key:        stored.ID,
				FilterKeys: keys,
				Namespace:  stored.Namespace,
				Payload:    &structs.AllocEvent{Alloc: slim},
			}, true
		}

	case "jobs":
		if job, ok := row.(*structs.Job); ok {
			return structs.Event{
				Topic:     structs.TopicJob,
				Key:       job.ID,
				Namespace: job.Namespace,
				Payload:   &structs.JobEvent{Job: job},
			}, true
		}

	case "deployment":
		if deploy, ok := row.(*structs.Deployment); ok {
			return structs.Event{
				Topic:      structs.TopicDeployment,
				Key:        deploy.ID,
				Namespace:  deploy.Namespace,
				FilterKeys: []string{deploy.JobID},
				Payload:    &structs.DeploymentEvent{Deployment: deploy},
			}, true
		}
	}

	return structs.Event{}, false
}