a112161f60
* expiration: Add a few metrics to measure revoke queue lengths * Update the metric names * Add appropriate cluster labels * Add metrics to docs * Update jobmanager.go
260 lines
6.6 KiB
Go
260 lines
6.6 KiB
Go
package fairshare
|
|
|
|
import (
|
|
"container/list"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"sync"
|
|
|
|
"github.com/armon/go-metrics"
|
|
log "github.com/hashicorp/go-hclog"
|
|
uuid "github.com/hashicorp/go-uuid"
|
|
"github.com/hashicorp/vault/helper/metricsutil"
|
|
"github.com/hashicorp/vault/sdk/helper/logging"
|
|
)
|
|
|
|
/*
|
|
Future Work:
|
|
- track workers per queue. this will involve things like:
|
|
- somehow wrap the Execute/OnFailure functions to increment counter when
|
|
they start running, and decrement when they stop running
|
|
-- put a queue.IncrementCounter() call at the beginning
|
|
-- call the provided work function in the middle
|
|
-- put a queue.DecrementCounter() call at the end
|
|
- job has a queueID or reference to the queue
|
|
- queue only removed when empty AND no workers
|
|
*/
|
|
|
|
type JobManager struct {
|
|
name string
|
|
queues map[string]*list.List
|
|
queuesIndex []string
|
|
lastQueueAccessed int
|
|
quit chan struct{}
|
|
newWork chan struct{} // must be buffered
|
|
workerPool *dispatcher
|
|
onceStart sync.Once
|
|
onceStop sync.Once
|
|
logger log.Logger
|
|
totalJobs int
|
|
metricSink *metricsutil.ClusterMetricSink
|
|
|
|
// waitgroup for testing stop functionality
|
|
wg sync.WaitGroup
|
|
|
|
// protects `queues`, `queuesIndex`, `lastQueueAccessed`
|
|
l sync.RWMutex
|
|
}
|
|
|
|
// NewJobManager creates a job manager, with an optional name
|
|
func NewJobManager(name string, numWorkers int, l log.Logger, metricSink *metricsutil.ClusterMetricSink) *JobManager {
|
|
if l == nil {
|
|
l = logging.NewVaultLoggerWithWriter(ioutil.Discard, log.NoLevel)
|
|
}
|
|
if name == "" {
|
|
guid, err := uuid.GenerateUUID()
|
|
if err != nil {
|
|
l.Warn("uuid generator failed, using 'no-uuid'", "err", err)
|
|
guid = "no-uuid"
|
|
}
|
|
|
|
name = fmt.Sprintf("jobmanager-%s", guid)
|
|
}
|
|
|
|
wp := newDispatcher(fmt.Sprintf("%s-dispatcher", name), numWorkers, l)
|
|
|
|
j := JobManager{
|
|
name: name,
|
|
queues: make(map[string]*list.List),
|
|
queuesIndex: make([]string, 0),
|
|
lastQueueAccessed: -1,
|
|
quit: make(chan struct{}),
|
|
newWork: make(chan struct{}, 1),
|
|
workerPool: wp,
|
|
logger: l,
|
|
metricSink: metricSink,
|
|
}
|
|
|
|
j.logger.Trace("created job manager", "name", name, "pool_size", numWorkers)
|
|
return &j
|
|
}
|
|
|
|
// Start starts the job manager
|
|
// note: a given job manager cannot be restarted after it has been stopped
|
|
func (j *JobManager) Start() {
|
|
j.onceStart.Do(func() {
|
|
j.logger.Trace("starting job manager", "name", j.name)
|
|
j.workerPool.start()
|
|
j.assignWork()
|
|
})
|
|
}
|
|
|
|
// Stop stops the job manager asynchronously
|
|
func (j *JobManager) Stop() {
|
|
j.onceStop.Do(func() {
|
|
j.logger.Trace("terminating job manager...")
|
|
close(j.quit)
|
|
j.workerPool.stop()
|
|
})
|
|
}
|
|
|
|
// AddJob adds a job to the given queue, creating the queue if it doesn't exist
|
|
func (j *JobManager) AddJob(job Job, queueID string) {
|
|
j.l.Lock()
|
|
if len(j.queues) == 0 {
|
|
defer func() {
|
|
// newWork must be buffered to avoid deadlocks if work is added
|
|
// before the job manager is started
|
|
j.newWork <- struct{}{}
|
|
}()
|
|
}
|
|
defer j.l.Unlock()
|
|
|
|
if _, ok := j.queues[queueID]; !ok {
|
|
j.addQueue(queueID)
|
|
}
|
|
|
|
j.queues[queueID].PushBack(job)
|
|
j.totalJobs++
|
|
|
|
if j.metricSink != nil {
|
|
j.metricSink.AddSampleWithLabels([]string{j.name, "job_manager", "queue_length"}, float32(j.queues[queueID].Len()), []metrics.Label{{"queue_id", queueID}})
|
|
j.metricSink.AddSample([]string{j.name, "job_manager", "total_jobs"}, float32(j.totalJobs))
|
|
}
|
|
}
|
|
|
|
// GetCurrentJobCount returns the total number of pending jobs in the job manager
|
|
func (j *JobManager) GetPendingJobCount() int {
|
|
j.l.RLock()
|
|
defer j.l.RUnlock()
|
|
|
|
cnt := 0
|
|
for _, q := range j.queues {
|
|
cnt += q.Len()
|
|
}
|
|
|
|
return cnt
|
|
}
|
|
|
|
// GetWorkerCounts() returns a map of queue ID to number of active workers
|
|
func (j *JobManager) GetWorkerCounts() map[string]int {
|
|
// TODO implement with VLT-145
|
|
return nil
|
|
}
|
|
|
|
// GetWorkQueueLengths() returns a map of queue ID to number of active workers
|
|
func (j *JobManager) GetWorkQueueLengths() map[string]int {
|
|
out := make(map[string]int)
|
|
|
|
j.l.RLock()
|
|
defer j.l.RUnlock()
|
|
|
|
for k, v := range j.queues {
|
|
out[k] = v.Len()
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
// getNextJob grabs the next job to be processed and prunes empty queues
|
|
func (j *JobManager) getNextJob() Job {
|
|
j.l.Lock()
|
|
defer j.l.Unlock()
|
|
|
|
if len(j.queues) == 0 {
|
|
return nil
|
|
}
|
|
|
|
j.lastQueueAccessed = (j.lastQueueAccessed + 1) % len(j.queuesIndex)
|
|
queueID := j.queuesIndex[j.lastQueueAccessed]
|
|
|
|
jobElement := j.queues[queueID].Front()
|
|
out := j.queues[queueID].Remove(jobElement)
|
|
|
|
j.totalJobs--
|
|
|
|
if j.metricSink != nil {
|
|
j.metricSink.AddSampleWithLabels([]string{j.name, "job_manager", "queue_length"}, float32(j.queues[queueID].Len()), []metrics.Label{{"queue_id", queueID}})
|
|
j.metricSink.AddSample([]string{j.name, "job_manager", "total_jobs"}, float32(j.totalJobs))
|
|
}
|
|
|
|
if j.queues[queueID].Len() == 0 {
|
|
j.removeLastQueueAccessed()
|
|
}
|
|
|
|
return out.(Job)
|
|
}
|
|
|
|
// assignWork continually loops checks for new jobs and dispatches them to the
|
|
// worker pool
|
|
func (j *JobManager) assignWork() {
|
|
j.wg.Add(1)
|
|
|
|
go func() {
|
|
for {
|
|
for {
|
|
// assign work while there are jobs to distribute
|
|
select {
|
|
case <-j.quit:
|
|
j.wg.Done()
|
|
return
|
|
case <-j.newWork:
|
|
// keep the channel empty since we're already processing work
|
|
default:
|
|
}
|
|
|
|
job := j.getNextJob()
|
|
if job != nil {
|
|
j.workerPool.dispatch(job)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
// listen for a wake-up when an emtpy job manager has been given
|
|
// new work
|
|
select {
|
|
case <-j.quit:
|
|
j.wg.Done()
|
|
return
|
|
case <-j.newWork:
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// addQueue generates a new queue if a queue for `queueID` doesn't exist
|
|
// note: this must be called with l held for write
|
|
func (j *JobManager) addQueue(queueID string) {
|
|
if _, ok := j.queues[queueID]; !ok {
|
|
j.queues[queueID] = list.New()
|
|
j.queuesIndex = append(j.queuesIndex, queueID)
|
|
}
|
|
}
|
|
|
|
// removeLastQueueAccessed removes the queue and index map for the last queue
|
|
// accessed. It is to be used when the last queue accessed has emptied.
|
|
// note: this must be called with l held for write
|
|
func (j *JobManager) removeLastQueueAccessed() {
|
|
if j.lastQueueAccessed == -1 || j.lastQueueAccessed > len(j.queuesIndex)-1 {
|
|
j.logger.Warn("call to remove queue out of bounds", "idx", j.lastQueueAccessed)
|
|
return
|
|
}
|
|
|
|
queueID := j.queuesIndex[j.lastQueueAccessed]
|
|
|
|
// remove the queue
|
|
delete(j.queues, queueID)
|
|
|
|
// remove the index for the queue
|
|
j.queuesIndex = append(j.queuesIndex[:j.lastQueueAccessed], j.queuesIndex[j.lastQueueAccessed+1:]...)
|
|
|
|
// correct the last queue accessed for round robining
|
|
if j.lastQueueAccessed > 0 {
|
|
j.lastQueueAccessed--
|
|
} else {
|
|
j.lastQueueAccessed = len(j.queuesIndex) - 1
|
|
}
|
|
}
|