VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
package fairshare

import (
	"fmt"
	"io/ioutil"
	"sync"

	log "github.com/hashicorp/go-hclog"
	uuid "github.com/hashicorp/go-uuid"
	"github.com/hashicorp/vault/sdk/helper/logging"
)

// Job is an interface for jobs used with this job manager
type Job interface {
	// Execute performs the work.
	// It should be synchronous if a cleanupFn is provided.
	Execute() error

	// OnFailure handles the error resulting from a failed Execute().
	// It should be synchronous if a cleanupFn is provided.
	OnFailure(err error)
}

type (
	initFn    func()
	cleanupFn func()
)

type wrappedJob struct {
	job     Job
	init    initFn
	cleanup cleanupFn
}

// worker represents a single worker in a pool
type worker struct {
	name   string
	jobCh  <-chan wrappedJob
	quit   chan struct{}
	logger log.Logger

	// waitgroup for testing stop functionality
	wg *sync.WaitGroup
}

// start starts the worker listening and working until the quit channel is closed
func (w *worker) start() {
	w.wg.Add(1)

	go func() {
		for {
			select {
			case <-w.quit:
				w.wg.Done()
				return
			case wJob := <-w.jobCh:
				if wJob.init != nil {
					wJob.init()
				}

				err := wJob.job.Execute()
				if err != nil {
					wJob.job.OnFailure(err)
				}

				if wJob.cleanup != nil {
					wJob.cleanup()
				}
			}
		}
	}()
}

// dispatcher represents a worker pool
type dispatcher struct {
	name       string
	numWorkers int
	workers    []worker
	jobCh      chan wrappedJob
	onceStart  sync.Once
	onceStop   sync.Once
	quit       chan struct{}
	logger     log.Logger
	wg         *sync.WaitGroup
}

// newDispatcher generates a new worker dispatcher and populates it with workers
func newDispatcher(name string, numWorkers int, l log.Logger) *dispatcher {
	d := createDispatcher(name, numWorkers, l)

	d.init()
	return d
}

// dispatch dispatches a job to the worker pool, with optional initialization
// and cleanup functions (useful for tracking job progress)
func (d *dispatcher) dispatch(job Job, init initFn, cleanup cleanupFn) {
	wJob := wrappedJob{
		init:    init,
		job:     job,
		cleanup: cleanup,
	}

	select {
	case d.jobCh <- wJob:
	case <-d.quit:
		d.logger.Info("shutting down during dispatch")
	}
}

// start starts all the workers listening on the job channel
// this will only start the workers for this dispatcher once
func (d *dispatcher) start() {
	d.onceStart.Do(func() {
		d.logger.Trace("starting dispatcher")
		for _, w := range d.workers {
			worker := w
			worker.start()
		}
	})
}

// stop stops the worker pool asynchronously
func (d *dispatcher) stop() {
	d.onceStop.Do(func() {
		d.logger.Trace("terminating dispatcher")
		close(d.quit)
	})
}

// createDispatcher generates a new Dispatcher object, but does not initialize the
// worker pool
func createDispatcher(name string, numWorkers int, l log.Logger) *dispatcher {
	if l == nil {
		l = logging.NewVaultLoggerWithWriter(ioutil.Discard, log.NoLevel)
	}
	if numWorkers <= 0 {
		numWorkers = 1
		l.Warn("must have 1 or more workers. setting number of workers to 1")
	}

	if name == "" {
		guid, err := uuid.GenerateUUID()
		if err != nil {
			l.Warn("uuid generator failed, using 'no-uuid'", "err", err)
			guid = "no-uuid"
		}

		name = fmt.Sprintf("dispatcher-%s", guid)
	}

	var wg sync.WaitGroup
	d := dispatcher{
		name:       name,
		numWorkers: numWorkers,
		workers:    make([]worker, 0),
		jobCh:      make(chan wrappedJob),
		quit:       make(chan struct{}),
		logger:     l,
		wg:         &wg,
	}

	d.logger.Trace("created dispatcher", "name", d.name, "num_workers", d.numWorkers)
	return &d
}

// init adds workers to the dispatcher until it holds numWorkers of them
func (d *dispatcher) init() {
	for len(d.workers) < d.numWorkers {
		d.initializeWorker()
	}

	d.logger.Trace("initialized dispatcher", "num_workers", d.numWorkers)
}

// initializeWorker initializes and adds a new worker, named after its index
// in the pool
func (d *dispatcher) initializeWorker() {
	w := worker{
		name:   fmt.Sprint("worker-", len(d.workers)),
		jobCh:  d.jobCh,
		quit:   d.quit,
		logger: d.logger,
		wg:     d.wg,
	}

	d.workers = append(d.workers, w)
}