VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
package fairshare
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"testing"
|
|
|
|
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
|
|
uuid "github.com/hashicorp/go-uuid"
|
|
|
|
)
|
|
|
|
|
|
|
|
type testJob struct {
|
|
|
|
id string
|
|
|
|
ex func(id string) error
|
|
|
|
onFail func(error)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (t *testJob) Execute() error {
|
2021-02-22 22:00:24 +00:00
|
|
|
return t.ex(t.id)
|
VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (t *testJob) OnFailure(err error) {
|
|
|
|
t.onFail(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newTestJob(t *testing.T, id string, ex func(string) error, onFail func(error)) testJob {
|
|
|
|
t.Helper()
|
|
|
|
if ex == nil {
|
|
|
|
t.Errorf("ex cannot be nil")
|
|
|
|
}
|
|
|
|
if onFail == nil {
|
|
|
|
t.Errorf("onFail cannot be nil")
|
|
|
|
}
|
|
|
|
|
|
|
|
return testJob{
|
|
|
|
id: id,
|
|
|
|
ex: ex,
|
|
|
|
onFail: onFail,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func newDefaultTestJob(t *testing.T, id string) testJob {
|
|
|
|
ex := func(_ string) error { return nil }
|
|
|
|
onFail := func(_ error) {}
|
|
|
|
return newTestJob(t, id, ex, onFail)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newTestLogger(name string) log.Logger {
|
|
|
|
guid, err := uuid.GenerateUUID()
|
|
|
|
if err != nil {
|
|
|
|
guid = "no-guid"
|
|
|
|
}
|
|
|
|
return log.New(&log.LoggerOptions{
|
|
|
|
Name: fmt.Sprintf("%s-%s", name, guid),
|
|
|
|
Level: log.LevelFromString("TRACE"),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func GetNumWorkers(j *JobManager) int {
|
|
|
|
return j.workerPool.numWorkers
|
|
|
|
}
|