nomad: deduplicate watch items with a helper
This commit is contained in:
parent
a4ee8929e3
commit
eaeec03e1e
|
@ -69,6 +69,7 @@ func (s *StateStore) Restore() (*StateRestore, error) {
|
|||
r := &StateRestore{
|
||||
txn: txn,
|
||||
watch: s.watch,
|
||||
items: make(watchItems),
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
@ -705,7 +706,7 @@ type StateSnapshot struct {
|
|||
type StateRestore struct {
|
||||
txn *memdb.Txn
|
||||
watch *stateWatch
|
||||
items []watchItem
|
||||
items watchItems
|
||||
}
|
||||
|
||||
// Abort is used to abort the restore operation
|
||||
|
@ -715,13 +716,13 @@ func (s *StateRestore) Abort() {
|
|||
|
||||
// Commit is used to commit the restore operation
|
||||
func (s *StateRestore) Commit() {
|
||||
s.txn.Defer(func() { s.watch.notify(s.items...) })
|
||||
s.txn.Defer(func() { s.watch.notify(s.items.items()...) })
|
||||
s.txn.Commit()
|
||||
}
|
||||
|
||||
// NodeRestore is used to restore a node
|
||||
func (r *StateRestore) NodeRestore(node *structs.Node) error {
|
||||
r.items = append(r.items, watchItem{table: "nodes"})
|
||||
r.items.add(watchItem{table: "nodes"})
|
||||
if err := r.txn.Insert("nodes", node); err != nil {
|
||||
return fmt.Errorf("node insert failed: %v", err)
|
||||
}
|
||||
|
@ -730,7 +731,7 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error {
|
|||
|
||||
// JobRestore is used to restore a job
|
||||
func (r *StateRestore) JobRestore(job *structs.Job) error {
|
||||
r.items = append(r.items, watchItem{table: "jobs"})
|
||||
r.items.add(watchItem{table: "jobs"})
|
||||
if err := r.txn.Insert("jobs", job); err != nil {
|
||||
return fmt.Errorf("job insert failed: %v", err)
|
||||
}
|
||||
|
@ -739,7 +740,7 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
|
|||
|
||||
// EvalRestore is used to restore an evaluation
|
||||
func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
|
||||
r.items = append(r.items, watchItem{table: "evals"})
|
||||
r.items.add(watchItem{table: "evals"})
|
||||
if err := r.txn.Insert("evals", eval); err != nil {
|
||||
return fmt.Errorf("eval insert failed: %v", err)
|
||||
}
|
||||
|
@ -748,8 +749,8 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
|
|||
|
||||
// AllocRestore is used to restore an allocation
|
||||
func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
|
||||
r.items = append(r.items, watchItem{table: "allocs"})
|
||||
r.items = append(r.items, watchItem{allocNode: alloc.NodeID})
|
||||
r.items.add(watchItem{table: "allocs"})
|
||||
r.items.add(watchItem{allocNode: alloc.NodeID})
|
||||
if err := r.txn.Insert("allocs", alloc); err != nil {
|
||||
return fmt.Errorf("alloc insert failed: %v", err)
|
||||
}
|
||||
|
@ -775,6 +776,24 @@ type watchItem struct {
|
|||
table string
|
||||
}
|
||||
|
||||
// watchItems is a helper used to construct a set of watchItems. It deduplicates
|
||||
// the items as they are added using map keys.
|
||||
type watchItems map[watchItem]struct{}
|
||||
|
||||
// add adds an item to the watch set.
|
||||
func (w watchItems) add(wi watchItem) {
|
||||
w[wi] = struct{}{}
|
||||
}
|
||||
|
||||
// items returns the items as a slice.
|
||||
func (w watchItems) items() []watchItem {
|
||||
items := make([]watchItem, 0, len(w))
|
||||
for wi, _ := range w {
|
||||
items = append(items, wi)
|
||||
}
|
||||
return items
|
||||
}
|
||||
|
||||
// stateWatch holds shared state for watching updates. This is
|
||||
// outside of StateStore so it can be shared with snapshots.
|
||||
type stateWatch struct {
|
||||
|
|
Loading…
Reference in New Issue