27bb03bbc0
* adding copyright header * fix fmt and a test
359 lines
9.6 KiB
Go
359 lines
9.6 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package fairshare
|
|
|
|
import (
|
|
"container/list"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
log "github.com/hashicorp/go-hclog"
|
|
uuid "github.com/hashicorp/go-uuid"
|
|
"github.com/hashicorp/vault/helper/metricsutil"
|
|
"github.com/hashicorp/vault/sdk/helper/logging"
|
|
)
|
|
|
|
type JobManager struct {
|
|
name string
|
|
queues map[string]*list.List
|
|
|
|
quit chan struct{}
|
|
newWork chan struct{} // must be buffered
|
|
|
|
workerPool *dispatcher
|
|
workerCount map[string]int
|
|
|
|
onceStart sync.Once
|
|
onceStop sync.Once
|
|
|
|
logger log.Logger
|
|
|
|
totalJobs int
|
|
metricSink *metricsutil.ClusterMetricSink
|
|
|
|
// waitgroup for testing stop functionality
|
|
wg sync.WaitGroup
|
|
|
|
// protects `queues`, `workerCount`, `queuesIndex`, `lastQueueAccessed`
|
|
l sync.RWMutex
|
|
|
|
// track queues by index for round robin worker assignment
|
|
queuesIndex []string
|
|
lastQueueAccessed int
|
|
}
|
|
|
|
// NewJobManager creates a job manager, with an optional name
|
|
func NewJobManager(name string, numWorkers int, l log.Logger, metricSink *metricsutil.ClusterMetricSink) *JobManager {
|
|
if l == nil {
|
|
l = logging.NewVaultLoggerWithWriter(ioutil.Discard, log.NoLevel)
|
|
}
|
|
if name == "" {
|
|
guid, err := uuid.GenerateUUID()
|
|
if err != nil {
|
|
l.Warn("uuid generator failed, using 'no-uuid'", "err", err)
|
|
guid = "no-uuid"
|
|
}
|
|
|
|
name = fmt.Sprintf("jobmanager-%s", guid)
|
|
}
|
|
|
|
wp := newDispatcher(fmt.Sprintf("%s-dispatcher", name), numWorkers, l)
|
|
|
|
j := JobManager{
|
|
name: name,
|
|
queues: make(map[string]*list.List),
|
|
quit: make(chan struct{}),
|
|
newWork: make(chan struct{}, 1),
|
|
workerPool: wp,
|
|
workerCount: make(map[string]int),
|
|
logger: l,
|
|
metricSink: metricSink,
|
|
queuesIndex: make([]string, 0),
|
|
lastQueueAccessed: -1,
|
|
}
|
|
|
|
j.logger.Trace("created job manager", "name", name, "pool_size", numWorkers)
|
|
return &j
|
|
}
|
|
|
|
// Start starts the job manager
|
|
// note: a given job manager cannot be restarted after it has been stopped
|
|
func (j *JobManager) Start() {
|
|
j.onceStart.Do(func() {
|
|
j.logger.Trace("starting job manager", "name", j.name)
|
|
j.workerPool.start()
|
|
j.assignWork()
|
|
})
|
|
}
|
|
|
|
// Stop stops the job manager asynchronously
|
|
func (j *JobManager) Stop() {
|
|
j.onceStop.Do(func() {
|
|
j.logger.Trace("terminating job manager...")
|
|
close(j.quit)
|
|
j.workerPool.stop()
|
|
})
|
|
}
|
|
|
|
// AddJob adds a job to the given queue, creating the queue if it doesn't exist
|
|
func (j *JobManager) AddJob(job Job, queueID string) {
|
|
j.l.Lock()
|
|
if len(j.queues) == 0 {
|
|
defer func() {
|
|
// newWork must be buffered to avoid deadlocks if work is added
|
|
// before the job manager is started
|
|
j.newWork <- struct{}{}
|
|
}()
|
|
}
|
|
defer j.l.Unlock()
|
|
|
|
if _, ok := j.queues[queueID]; !ok {
|
|
j.addQueue(queueID)
|
|
}
|
|
|
|
j.queues[queueID].PushBack(job)
|
|
j.totalJobs++
|
|
|
|
if j.metricSink != nil {
|
|
j.metricSink.AddSampleWithLabels([]string{j.name, "job_manager", "queue_length"}, float32(j.queues[queueID].Len()), []metrics.Label{{"queue_id", queueID}})
|
|
j.metricSink.AddSample([]string{j.name, "job_manager", "total_jobs"}, float32(j.totalJobs))
|
|
}
|
|
}
|
|
|
|
// GetCurrentJobCount returns the total number of pending jobs in the job manager
|
|
func (j *JobManager) GetPendingJobCount() int {
|
|
j.l.RLock()
|
|
defer j.l.RUnlock()
|
|
|
|
cnt := 0
|
|
for _, q := range j.queues {
|
|
cnt += q.Len()
|
|
}
|
|
|
|
return cnt
|
|
}
|
|
|
|
// GetWorkerCounts() returns a map of queue ID to number of active workers
|
|
func (j *JobManager) GetWorkerCounts() map[string]int {
|
|
j.l.RLock()
|
|
defer j.l.RUnlock()
|
|
return j.workerCount
|
|
}
|
|
|
|
// GetWorkQueueLengths() returns a map of queue ID to number of jobs in the queue
|
|
func (j *JobManager) GetWorkQueueLengths() map[string]int {
|
|
out := make(map[string]int)
|
|
|
|
j.l.RLock()
|
|
defer j.l.RUnlock()
|
|
|
|
for k, v := range j.queues {
|
|
out[k] = v.Len()
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
// getNextJob pops the next job to be processed and prunes empty queues
|
|
// it also returns the ID of the queue the job is associated with
|
|
func (j *JobManager) getNextJob() (Job, string) {
|
|
j.l.Lock()
|
|
defer j.l.Unlock()
|
|
|
|
if len(j.queues) == 0 {
|
|
return nil, ""
|
|
}
|
|
|
|
queueID, canAssignWorker := j.getNextQueue()
|
|
if !canAssignWorker {
|
|
return nil, ""
|
|
}
|
|
|
|
jobElement := j.queues[queueID].Front()
|
|
jobRaw := j.queues[queueID].Remove(jobElement)
|
|
|
|
j.totalJobs--
|
|
|
|
if j.metricSink != nil {
|
|
j.metricSink.AddSampleWithLabels([]string{j.name, "job_manager", "queue_length"}, float32(j.queues[queueID].Len()), []metrics.Label{{"queue_id", queueID}})
|
|
j.metricSink.AddSample([]string{j.name, "job_manager", "total_jobs"}, float32(j.totalJobs))
|
|
}
|
|
|
|
if j.queues[queueID].Len() == 0 {
|
|
// we remove the empty queue, but we don't remove the worker count
|
|
// in case we are still working on previous jobs from this queue.
|
|
// worker count cleanup is handled in j.decrementWorkerCount
|
|
j.removeLastQueueAccessed()
|
|
}
|
|
|
|
return jobRaw.(Job), queueID
|
|
}
|
|
|
|
// returns the next queue to assign work from, and a bool if there is a queue
|
|
// that can have a worker assigned. if there is work to be assigned,
|
|
// j.lastQueueAccessed will be updated to that queue.
|
|
// note: this must be called with j.l held
|
|
func (j *JobManager) getNextQueue() (string, bool) {
|
|
var nextQueue string
|
|
var canAssignWorker bool
|
|
|
|
// ensure we loop through all existing queues until we find an eligible
|
|
// queue, if one exists.
|
|
queueIdx := j.nextQueueIndex(j.lastQueueAccessed)
|
|
for i := 0; i < len(j.queuesIndex); i++ {
|
|
potentialQueueID := j.queuesIndex[queueIdx]
|
|
|
|
if !j.queueWorkersSaturated(potentialQueueID) {
|
|
nextQueue = potentialQueueID
|
|
canAssignWorker = true
|
|
j.lastQueueAccessed = queueIdx
|
|
break
|
|
}
|
|
|
|
queueIdx = j.nextQueueIndex(queueIdx)
|
|
}
|
|
|
|
return nextQueue, canAssignWorker
|
|
}
|
|
|
|
// get the index of the next queue in round-robin order
|
|
// note: this must be called with j.l held
|
|
func (j *JobManager) nextQueueIndex(currentIdx int) int {
|
|
return (currentIdx + 1) % len(j.queuesIndex)
|
|
}
|
|
|
|
// returns true if there are already too many workers on this queue
|
|
// note: this must be called with j.l held (at least for read).
|
|
// note: we may want to eventually factor in queue length relative to num queues
|
|
func (j *JobManager) queueWorkersSaturated(queueID string) bool {
|
|
numActiveQueues := float64(len(j.queues))
|
|
numTotalWorkers := float64(j.workerPool.numWorkers)
|
|
maxWorkersPerQueue := math.Ceil(0.9 * numTotalWorkers / numActiveQueues)
|
|
|
|
numWorkersPerQueue := j.workerCount
|
|
|
|
return numWorkersPerQueue[queueID] >= int(maxWorkersPerQueue)
|
|
}
|
|
|
|
// increment the worker count for this queue
|
|
func (j *JobManager) incrementWorkerCount(queueID string) {
|
|
j.l.Lock()
|
|
defer j.l.Unlock()
|
|
|
|
j.workerCount[queueID]++
|
|
}
|
|
|
|
// decrement the worker count for this queue
|
|
// this also removes worker tracking for this queue if needed
|
|
func (j *JobManager) decrementWorkerCount(queueID string) {
|
|
j.l.Lock()
|
|
defer j.l.Unlock()
|
|
|
|
j.workerCount[queueID]--
|
|
|
|
_, queueExists := j.queues[queueID]
|
|
if !queueExists && j.workerCount[queueID] < 1 {
|
|
delete(j.workerCount, queueID)
|
|
}
|
|
}
|
|
|
|
// assignWork continually loops checks for new jobs and dispatches them to the
|
|
// worker pool
|
|
func (j *JobManager) assignWork() {
|
|
j.wg.Add(1)
|
|
|
|
go func() {
|
|
// ticker is used to prevent memory leak of using time.After in
|
|
// for - select pattern.
|
|
ticker := time.NewTicker(50 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
for {
|
|
for {
|
|
// assign work while there are jobs to distribute
|
|
select {
|
|
case <-j.quit:
|
|
j.wg.Done()
|
|
return
|
|
case <-j.newWork:
|
|
// keep the channel empty since we're already processing work
|
|
default:
|
|
}
|
|
|
|
job, queueID := j.getNextJob()
|
|
if job != nil {
|
|
j.workerPool.dispatch(job,
|
|
func() {
|
|
j.incrementWorkerCount(queueID)
|
|
},
|
|
func() {
|
|
j.decrementWorkerCount(queueID)
|
|
})
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
ticker.Reset(50 * time.Millisecond)
|
|
select {
|
|
case <-j.quit:
|
|
j.wg.Done()
|
|
return
|
|
case <-j.newWork:
|
|
// listen for wake-up when an empty job manager has been given work
|
|
case <-ticker.C:
|
|
// periodically check if new workers can be assigned. with the
|
|
// fairsharing worker distribution it can be the case that there
|
|
// is work waiting, but no queues are eligible for another worker
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// addQueue generates a new queue if a queue for `queueID` doesn't exist
|
|
// it also starts tracking workers on that queue, if not already tracked
|
|
// note: this must be called with j.l held for write
|
|
func (j *JobManager) addQueue(queueID string) {
|
|
if _, ok := j.queues[queueID]; !ok {
|
|
j.queues[queueID] = list.New()
|
|
j.queuesIndex = append(j.queuesIndex, queueID)
|
|
}
|
|
|
|
// it's possible the queue ran out of work and was pruned, but there were
|
|
// still workers operating on data formerly in that queue, which were still
|
|
// being tracked. if that is the case, we don't want to wipe out that worker
|
|
// count when the queue is re-initialized.
|
|
if _, ok := j.workerCount[queueID]; !ok {
|
|
j.workerCount[queueID] = 0
|
|
}
|
|
}
|
|
|
|
// removes the queue and index tracker for the last queue accessed.
|
|
// it is to be used when the last queue accessed has emptied.
|
|
// note: this must be called with j.l held.
|
|
func (j *JobManager) removeLastQueueAccessed() {
|
|
if j.lastQueueAccessed == -1 || j.lastQueueAccessed > len(j.queuesIndex)-1 {
|
|
j.logger.Warn("call to remove queue out of bounds", "idx", j.lastQueueAccessed)
|
|
return
|
|
}
|
|
|
|
queueID := j.queuesIndex[j.lastQueueAccessed]
|
|
|
|
// remove the queue
|
|
delete(j.queues, queueID)
|
|
|
|
// remove the index for the queue
|
|
j.queuesIndex = append(j.queuesIndex[:j.lastQueueAccessed], j.queuesIndex[j.lastQueueAccessed+1:]...)
|
|
|
|
// correct the last queue accessed for round robining
|
|
if j.lastQueueAccessed > 0 {
|
|
j.lastQueueAccessed--
|
|
} else {
|
|
j.lastQueueAccessed = len(j.queuesIndex) - 1
|
|
}
|
|
}
|