VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
package fairshare
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"reflect"
|
|
|
|
"sync"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestFairshare_newDispatcher(t *testing.T) {
|
|
|
|
testCases := []struct {
|
|
|
|
name string
|
|
|
|
numWorkers int
|
|
|
|
expectedNumWorkers int
|
|
|
|
}{
|
|
|
|
{
|
|
|
|
name: "",
|
|
|
|
numWorkers: 0,
|
|
|
|
expectedNumWorkers: 1,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "",
|
|
|
|
numWorkers: 10,
|
|
|
|
expectedNumWorkers: 10,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "test-dispatcher",
|
|
|
|
numWorkers: 10,
|
|
|
|
expectedNumWorkers: 10,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
l := newTestLogger("workerpool-test")
|
|
|
|
for tcNum, tc := range testCases {
|
|
|
|
d := newDispatcher(tc.name, tc.numWorkers, l)
|
|
|
|
|
|
|
|
if tc.name != "" && d.name != tc.name {
|
|
|
|
t.Errorf("tc %d: expected name %s, got %s", tcNum, tc.name, d.name)
|
|
|
|
}
|
|
|
|
if len(d.workers) != tc.expectedNumWorkers {
|
|
|
|
t.Errorf("tc %d: expected %d workers, got %d", tcNum, tc.expectedNumWorkers, len(d.workers))
|
|
|
|
}
|
|
|
|
if d.jobCh == nil {
|
|
|
|
t.Errorf("tc %d: work channel not set up properly", tcNum)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_createDispatcher(t *testing.T) {
|
|
|
|
testCases := []struct {
|
|
|
|
name string
|
|
|
|
numWorkers int
|
|
|
|
expectedNumWorkers int
|
|
|
|
}{
|
|
|
|
{
|
|
|
|
name: "",
|
|
|
|
numWorkers: -1,
|
|
|
|
expectedNumWorkers: 1,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "",
|
|
|
|
numWorkers: 0,
|
|
|
|
expectedNumWorkers: 1,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "",
|
|
|
|
numWorkers: 10,
|
|
|
|
expectedNumWorkers: 10,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "",
|
|
|
|
numWorkers: 10,
|
|
|
|
expectedNumWorkers: 10,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "test-dispatcher",
|
|
|
|
numWorkers: 10,
|
|
|
|
expectedNumWorkers: 10,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
l := newTestLogger("workerpool-test")
|
|
|
|
for tcNum, tc := range testCases {
|
|
|
|
d := createDispatcher(tc.name, tc.numWorkers, l)
|
|
|
|
if d == nil {
|
|
|
|
t.Fatalf("tc %d: expected non-nil object", tcNum)
|
|
|
|
}
|
|
|
|
|
|
|
|
if tc.name != "" && d.name != tc.name {
|
|
|
|
t.Errorf("tc %d: expected name %s, got %s", tcNum, tc.name, d.name)
|
|
|
|
}
|
|
|
|
if len(d.name) == 0 {
|
|
|
|
t.Errorf("tc %d: expected name to be set", tcNum)
|
|
|
|
}
|
|
|
|
if d.numWorkers != tc.expectedNumWorkers {
|
|
|
|
t.Errorf("tc %d: expected %d workers, got %d", tcNum, tc.expectedNumWorkers, d.numWorkers)
|
|
|
|
}
|
|
|
|
if d.workers == nil {
|
|
|
|
t.Errorf("tc %d: expected non-nil workers", tcNum)
|
|
|
|
}
|
|
|
|
if d.jobCh == nil {
|
|
|
|
t.Errorf("tc %d: work channel not set up properly", tcNum)
|
|
|
|
}
|
|
|
|
if d.quit == nil {
|
|
|
|
t.Errorf("tc %d: expected non-nil quit channel", tcNum)
|
|
|
|
}
|
|
|
|
if d.logger == nil {
|
|
|
|
t.Errorf("tc %d: expected non-nil logger", tcNum)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_initDispatcher(t *testing.T) {
|
|
|
|
testCases := []struct {
|
|
|
|
numWorkers int
|
|
|
|
}{
|
|
|
|
{
|
|
|
|
numWorkers: 1,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
numWorkers: 10,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
numWorkers: 100,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
numWorkers: 1000,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
l := newTestLogger("workerpool-test")
|
|
|
|
for tcNum, tc := range testCases {
|
|
|
|
d := createDispatcher("", tc.numWorkers, l)
|
|
|
|
|
|
|
|
d.init()
|
|
|
|
if len(d.workers) != tc.numWorkers {
|
|
|
|
t.Fatalf("tc %d: expected %d workers, got %d", tcNum, tc.numWorkers, len(d.workers))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_initializeWorker(t *testing.T) {
|
|
|
|
numWorkers := 3
|
|
|
|
|
|
|
|
d := createDispatcher("", numWorkers, newTestLogger("workerpool-test"))
|
|
|
|
|
|
|
|
for workerNum := 0; workerNum < numWorkers; workerNum++ {
|
|
|
|
d.initializeWorker()
|
|
|
|
|
|
|
|
w := d.workers[workerNum]
|
|
|
|
expectedName := fmt.Sprint("worker-", workerNum)
|
|
|
|
if w.name != expectedName {
|
|
|
|
t.Errorf("tc %d: expected name %s, got %s", workerNum, expectedName, w.name)
|
|
|
|
}
|
|
|
|
if w.jobCh != d.jobCh {
|
|
|
|
t.Errorf("tc %d: work channel not set up properly", workerNum)
|
|
|
|
}
|
|
|
|
if w.quit == nil || w.quit != d.quit {
|
|
|
|
t.Errorf("tc %d: quit channel not set up properly", workerNum)
|
|
|
|
}
|
|
|
|
if w.logger == nil || w.logger != d.logger {
|
|
|
|
t.Errorf("tc %d: logger not set up properly", workerNum)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_startWorker(t *testing.T) {
|
|
|
|
d := newDispatcher("", 1, newTestLogger("workerpool-test"))
|
|
|
|
|
|
|
|
d.workers[0].start()
|
|
|
|
defer d.stop()
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
ex := func(_ string) error {
|
|
|
|
wg.Done()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
onFail := func(_ error) {}
|
|
|
|
|
|
|
|
job := newTestJob(t, "test job", ex, onFail)
|
|
|
|
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
timeout := time.After(5 * time.Second)
|
|
|
|
|
|
|
|
wg.Add(1)
|
2021-06-25 20:06:49 +00:00
|
|
|
d.dispatch(&job, nil, nil)
|
VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
go func() {
|
|
|
|
wg.Wait()
|
|
|
|
doneCh <- struct{}{}
|
|
|
|
}()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-doneCh:
|
|
|
|
break
|
|
|
|
case <-timeout:
|
|
|
|
t.Fatal("timed out")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_start(t *testing.T) {
|
|
|
|
numJobs := 10
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
ex := func(_ string) error {
|
|
|
|
wg.Done()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
onFail := func(_ error) {}
|
|
|
|
|
|
|
|
wg.Add(numJobs)
|
|
|
|
d := newDispatcher("", 3, newTestLogger("workerpool-test"))
|
|
|
|
|
|
|
|
d.start()
|
|
|
|
defer d.stop()
|
|
|
|
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
timeout := time.After(5 * time.Second)
|
|
|
|
go func() {
|
|
|
|
wg.Wait()
|
|
|
|
doneCh <- struct{}{}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for i := 0; i < numJobs; i++ {
|
|
|
|
job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail)
|
2021-06-25 20:06:49 +00:00
|
|
|
d.dispatch(&job, nil, nil)
|
VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-doneCh:
|
|
|
|
break
|
|
|
|
case <-timeout:
|
|
|
|
t.Fatal("timed out")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_stop(t *testing.T) {
|
|
|
|
d := newDispatcher("", 5, newTestLogger("workerpool-test"))
|
|
|
|
|
|
|
|
d.start()
|
|
|
|
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
timeout := time.After(5 * time.Second)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
d.stop()
|
2021-02-25 21:33:02 +00:00
|
|
|
d.wg.Wait()
|
VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
doneCh <- struct{}{}
|
|
|
|
}()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-doneCh:
|
|
|
|
break
|
|
|
|
case <-timeout:
|
|
|
|
t.Fatal("timed out")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_stopMultiple(t *testing.T) {
|
|
|
|
d := newDispatcher("", 5, newTestLogger("workerpool-test"))
|
|
|
|
|
|
|
|
d.start()
|
|
|
|
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
timeout := time.After(5 * time.Second)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
d.stop()
|
2021-02-25 21:33:02 +00:00
|
|
|
d.wg.Wait()
|
VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
doneCh <- struct{}{}
|
|
|
|
}()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-doneCh:
|
|
|
|
break
|
|
|
|
case <-timeout:
|
|
|
|
t.Fatal("timed out")
|
|
|
|
}
|
|
|
|
|
|
|
|
// essentially, we don't want to panic here
|
|
|
|
var r interface{}
|
|
|
|
go func() {
|
|
|
|
t.Helper()
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
r = recover()
|
|
|
|
doneCh <- struct{}{}
|
|
|
|
}()
|
|
|
|
|
|
|
|
d.stop()
|
2021-02-25 21:33:02 +00:00
|
|
|
d.wg.Wait()
|
VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
}()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-doneCh:
|
|
|
|
break
|
|
|
|
case <-timeout:
|
|
|
|
t.Fatal("timed out")
|
|
|
|
}
|
|
|
|
|
|
|
|
if r != nil {
|
|
|
|
t.Fatalf("panic during second stop: %v", r)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_dispatch(t *testing.T) {
|
|
|
|
d := newDispatcher("", 1, newTestLogger("workerpool-test"))
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
accumulatedIDs := make([]string, 0)
|
|
|
|
ex := func(id string) error {
|
|
|
|
accumulatedIDs = append(accumulatedIDs, id)
|
|
|
|
wg.Done()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
onFail := func(_ error) {}
|
|
|
|
|
|
|
|
expectedIDs := []string{"job-1", "job-2", "job-3", "job-4"}
|
|
|
|
go func() {
|
|
|
|
for _, id := range expectedIDs {
|
|
|
|
job := newTestJob(t, id, ex, onFail)
|
2021-06-25 20:06:49 +00:00
|
|
|
d.dispatch(&job, nil, nil)
|
VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
wg.Add(len(expectedIDs))
|
|
|
|
d.start()
|
|
|
|
defer d.stop()
|
|
|
|
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
go func() {
|
|
|
|
wg.Wait()
|
|
|
|
doneCh <- struct{}{}
|
|
|
|
}()
|
|
|
|
|
|
|
|
timeout := time.After(5 * time.Second)
|
|
|
|
select {
|
|
|
|
case <-doneCh:
|
|
|
|
break
|
|
|
|
case <-timeout:
|
|
|
|
t.Fatal("timed out")
|
|
|
|
}
|
|
|
|
|
|
|
|
if !reflect.DeepEqual(accumulatedIDs, expectedIDs) {
|
|
|
|
t.Fatalf("bad job ids. expected %v, got %v", expectedIDs, accumulatedIDs)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_jobFailure(t *testing.T) {
|
|
|
|
numJobs := 10
|
|
|
|
testErr := fmt.Errorf("test error")
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
|
|
ex := func(_ string) error {
|
|
|
|
return testErr
|
|
|
|
}
|
|
|
|
onFail := func(err error) {
|
|
|
|
if err != testErr {
|
|
|
|
t.Errorf("got unexpected error. expected %v, got %v", testErr, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Done()
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Add(numJobs)
|
|
|
|
d := newDispatcher("", 3, newTestLogger("workerpool-test"))
|
|
|
|
|
|
|
|
d.start()
|
|
|
|
defer d.stop()
|
|
|
|
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
timeout := time.After(5 * time.Second)
|
|
|
|
go func() {
|
|
|
|
wg.Wait()
|
|
|
|
doneCh <- struct{}{}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for i := 0; i < numJobs; i++ {
|
|
|
|
job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail)
|
2021-06-25 20:06:49 +00:00
|
|
|
d.dispatch(&job, nil, nil)
|
VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing
* refactor a bit for testing
* workFunc, start/stop safety, testing
* cleanup function for worker quit, more tests
* redo public/private members
* improve tests, export types, switch uuid package
* fix loop capture bug, cleanup
* cleanup tests
* update worker pool file name, other improvements
* add job manager prototype
* remove remnants
* add functions to wait for job manager and worker pool to stop, other fixes
* test job manager functionality, fix bugs
* encapsulate how jobs are distributed to workers
* make worker job channel read only
* add job interface, more testing, fixes
* set name for dispatcher
* fix test races
* dispatcher and job manager constructors don't return errors
* logger now dependency injected
* make some members private, test fcn to get worker pool size
* make GetNumWorkers public
* Update helper/fairshare/jobmanager_test.go
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
* make workerpool private
* remove custom worker names
* concurrency improvements
* remove worker pool cleanup function
* remove cleanup func from job manager, remove non blocking stop from fairshare
* stop fairshare when started in tests
* stop leaking job manager goroutine
* prototype channel for waking up to assign work
* fix typo/bug and add tests
* improve job manager wake up, fix test typo
* put channel drain back
* better start/pause test for job manager
* go mod vendor
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 21:51:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-doneCh:
|
|
|
|
break
|
|
|
|
case <-timeout:
|
|
|
|
t.Fatal("timed out")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFairshare_nilLoggerDispatcher(t *testing.T) {
|
|
|
|
d := newDispatcher("test-job-mgr", 1, nil)
|
|
|
|
if d.logger == nil {
|
|
|
|
t.Error("logger not set up properly")
|
|
|
|
}
|
|
|
|
}
|