diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 0a8adc0d0..60af402d3 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -69,6 +69,7 @@ func (s *StateStore) Restore() (*StateRestore, error) { r := &StateRestore{ txn: txn, watch: s.watch, + items: make(watchItems), } return r, nil } @@ -705,7 +706,7 @@ type StateSnapshot struct { type StateRestore struct { txn *memdb.Txn watch *stateWatch - items []watchItem + items watchItems } // Abort is used to abort the restore operation @@ -715,13 +716,13 @@ func (s *StateRestore) Abort() { // Commit is used to commit the restore operation func (s *StateRestore) Commit() { - s.txn.Defer(func() { s.watch.notify(s.items...) }) + s.txn.Defer(func() { s.watch.notify(s.items.items()...) }) s.txn.Commit() } // NodeRestore is used to restore a node func (r *StateRestore) NodeRestore(node *structs.Node) error { - r.items = append(r.items, watchItem{table: "nodes"}) + r.items.add(watchItem{table: "nodes"}) if err := r.txn.Insert("nodes", node); err != nil { return fmt.Errorf("node insert failed: %v", err) } @@ -730,7 +731,7 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error { // JobRestore is used to restore a job func (r *StateRestore) JobRestore(job *structs.Job) error { - r.items = append(r.items, watchItem{table: "jobs"}) + r.items.add(watchItem{table: "jobs"}) if err := r.txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) } @@ -739,7 +740,7 @@ func (r *StateRestore) JobRestore(job *structs.Job) error { // EvalRestore is used to restore an evaluation func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error { - r.items = append(r.items, watchItem{table: "evals"}) + r.items.add(watchItem{table: "evals"}) if err := r.txn.Insert("evals", eval); err != nil { return fmt.Errorf("eval insert failed: %v", err) } @@ -748,8 +749,8 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error { // AllocRestore is used to restore an allocation func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error { - r.items = append(r.items, watchItem{table: "allocs"}) - r.items = append(r.items, watchItem{allocNode: alloc.NodeID}) + r.items.add(watchItem{table: "allocs"}) + r.items.add(watchItem{allocNode: alloc.NodeID}) if err := r.txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } @@ -775,6 +776,24 @@ type watchItem struct { table string } +// watchItems is a helper used to construct a set of watchItems. It deduplicates +// the items as they are added using map keys. +type watchItems map[watchItem]struct{} + +// add adds an item to the watch set. +func (w watchItems) add(wi watchItem) { + w[wi] = struct{}{} +} + +// items returns the items as a slice. +func (w watchItems) items() []watchItem { + items := make([]watchItem, 0, len(w)) + for wi, _ := range w { + items = append(items, wi) + } + return items +} + // stateWatch holds shared state for watching updates. This is // outside of StateStore so it can be shared with snapshots. type stateWatch struct {