Remove setters

This commit is contained in:
Alex Dadgar 2017-07-03 11:26:45 -07:00
parent c189948ad2
commit 7154e4e08f
3 changed files with 25 additions and 45 deletions

View File

@ -110,39 +110,23 @@ type Watcher struct {
// NewDeploymentsWatcher returns a deployments watcher that is used to watch
// deployments and trigger the scheduler as needed.
func NewDeploymentsWatcher(logger *log.Logger, stateQueriesPerSecond float64,
func NewDeploymentsWatcher(logger *log.Logger, watchers DeploymentStateWatchers,
raft DeploymentRaftEndpoints, stateQueriesPerSecond float64,
evalBatchDuration time.Duration) *Watcher {
return &Watcher{
stateWatchers: watchers,
raft: raft,
queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
evalBatchDuration: evalBatchDuration,
logger: logger,
}
}
// SetStateWatchers sets the interface for accessing state watchers
func (w *Watcher) SetStateWatchers(watchers DeploymentStateWatchers) {
w.l.Lock()
defer w.l.Unlock()
w.stateWatchers = watchers
}
// SetRaftEndpoints sets the interface for writing to Raft
func (w *Watcher) SetRaftEndpoints(raft DeploymentRaftEndpoints) {
w.l.Lock()
defer w.l.Unlock()
w.raft = raft
}
// SetEnabled is used to control if the watcher is enabled. The watcher
// should only be enabled on the active leader.
func (w *Watcher) SetEnabled(enabled bool) error {
w.l.Lock()
// Ensure our state is correct
if w.stateWatchers == nil || w.raft == nil {
return fmt.Errorf("State watchers and Raft endpoints must be set before starting")
}
wasEnabled := w.enabled
w.enabled = enabled
w.l.Unlock()

View File

@ -16,9 +16,7 @@ import (
func testDeploymentWatcher(t *testing.T, qps float64, batchDur time.Duration) (*Watcher, *mockBackend) {
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), qps, batchDur)
w.SetStateWatchers(m)
w.SetRaftEndpoints(m)
w := NewDeploymentsWatcher(testLogger(), m, m, qps, batchDur)
return w, m
}

View File

@ -224,28 +224,22 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
incomingTLS = itls
}
// Create the deployment watcher
watcher := deploymentwatcher.NewDeploymentsWatcher(logger,
deploymentwatcher.LimitStateQueriesPerSecond,
deploymentwatcher.EvalBatchDuration)
// Create the server
s := &Server{
config: config,
consulCatalog: consulCatalog,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
rpcServer: rpc.NewServer(),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: blockedEvals,
deploymentWatcher: watcher,
planQueue: planQueue,
rpcTLS: incomingTLS,
shutdownCh: make(chan struct{}),
config: config,
consulCatalog: consulCatalog,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
rpcServer: rpc.NewServer(),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: blockedEvals,
planQueue: planQueue,
rpcTLS: incomingTLS,
shutdownCh: make(chan struct{}),
}
// Create the periodic dispatcher for launching periodic jobs.
@ -698,8 +692,12 @@ func (s *Server) setupDeploymentWatcher() error {
apply: s.raftApply,
}
s.deploymentWatcher.SetStateWatchers(stateShim)
s.deploymentWatcher.SetRaftEndpoints(raftShim)
// Create the deployment watcher
s.deploymentWatcher = deploymentwatcher.NewDeploymentsWatcher(
s.logger, stateShim, raftShim,
deploymentwatcher.LimitStateQueriesPerSecond,
deploymentwatcher.EvalBatchDuration)
return nil
}