diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index cfb92bc24..8960a2e9d 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -218,9 +218,9 @@ func (j *Job) List(args *structs.JobListRequest, // Setup the blocking query opts := blockingOptions{ - queryOpts: &args.QueryOptions, - queryMeta: &reply.QueryMeta, - jobsWatch: true, + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + watchTables: []string{"jobs"}, run: func() error { // Capture all the jobs snap, err := j.srv.fsm.State().Snapshot() diff --git a/nomad/rpc.go b/nomad/rpc.go index a6b6595f3..dcb120cc8 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -268,11 +268,11 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) { // blockingOptions is used to parameterize blockingRPC type blockingOptions struct { - queryOpts *structs.QueryOptions - queryMeta *structs.QueryMeta - allocWatch string - jobsWatch bool - run func() error + queryOpts *structs.QueryOptions + queryMeta *structs.QueryMeta + allocWatch string + watchTables []string + run func() error } // blockingRPC is used for queries that need to wait for a @@ -310,9 +310,7 @@ func (s *Server) blockingRPC(opts *blockingOptions) error { if opts.allocWatch != "" { state.StopWatchAllocs(opts.allocWatch, notifyCh) } - if opts.jobsWatch { - state.StopWatchJobs(notifyCh) - } + state.StopWatchTables(notifyCh, opts.watchTables...) }() REGISTER_NOTIFY: @@ -321,9 +319,7 @@ REGISTER_NOTIFY: if opts.allocWatch != "" { state.WatchAllocs(opts.allocWatch, notifyCh) } - if opts.jobsWatch { - state.WatchJobs(notifyCh) - } + state.WatchTables(notifyCh, opts.watchTables...) RUN_QUERY: // Update the query meta data diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index a24fe9195..ac16b2ead 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -62,8 +62,47 @@ type stateWatch struct { allocs map[string]*NotifyGroup allocLock sync.Mutex - // Full table job watches - jobs *NotifyGroup + // Full table watches + tables map[string]*NotifyGroup + tableLock sync.Mutex +} + +// watchTable is used to subscribe a channel to a full table watch. +func (w *stateWatch) watchTable(table string, ch chan struct{}) { + w.tableLock.Lock() + defer w.tableLock.Unlock() + + tw, ok := w.tables[table] + if !ok { + tw = new(NotifyGroup) + w.tables[table] = tw + } + tw.Wait(ch) +} + +// stopWatchTable is used to unsubscribe a channel from a table watch. +func (w *stateWatch) stopWatchTable(table string, ch chan struct{}) { + w.tableLock.Lock() + defer w.tableLock.Unlock() + + if tw, ok := w.tables[table]; ok { + tw.Clear(ch) + if tw.Empty() { + delete(w.tables, table) + } + } +} + +// notifyTables is used to notify watchers of the given tables. +func (w *stateWatch) notifyTables(tables ...string) { + w.tableLock.Lock() + defer w.tableLock.Unlock() + + for _, table := range tables { + if tw, ok := w.tables[table]; ok { + tw.Notify() + } + } } // NewStateStore is used to create a new state store @@ -77,7 +116,7 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) { // Create the watch entry watch := &stateWatch{ allocs: make(map[string]*NotifyGroup), - jobs: &NotifyGroup{}, + tables: make(map[string]*NotifyGroup), } // Create the state store @@ -160,14 +199,18 @@ func (w *stateWatch) notifyAllocs(nodes map[string]struct{}) { } } -// WatchJobs is used to start watching the jobs view for changes. -func (s *StateStore) WatchJobs(notify chan struct{}) { - s.watch.jobs.Wait(notify) +// WatchTables is used to subscribe a channel to a set of tables. +func (s *StateStore) WatchTables(notify chan struct{}, tables ...string) { + for _, table := range tables { + s.watch.watchTable(table, notify) + } } -// StopWatchJobs is used to cancel notification on the given channel. -func (s *StateStore) StopWatchJobs(notify chan struct{}) { - s.watch.jobs.Clear(notify) +// StopWatchTables is used to unsubscribe a channel from table watches. +func (s *StateStore) StopWatchTables(notify chan struct{}, tables ...string) { + for _, table := range tables { + s.watch.stopWatchTable(table, notify) + } } // UpsertNode is used to register a node or update a node definition @@ -357,7 +400,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.jobs.Notify() }) + txn.Defer(func() { s.watch.notifyTables("jobs") }) txn.Commit() return nil } @@ -384,7 +427,7 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.jobs.Notify() }) + txn.Defer(func() { s.watch.notifyTables("jobs") }) txn.Commit() return nil }