open-vault/helper/fairshare/workerpool_test.go
swayne275 3994579603
VAULT-1401 and 1402 - preliminary fair sharing (#1701) (#10917)
* basic pool and start testing

* refactor a bit for testing

* workFunc, start/stop safety, testing

* cleanup function for worker quit, more tests

* redo public/private members

* improve tests, export types, switch uuid package

* fix loop capture bug, cleanup

* cleanup tests

* update worker pool file name, other improvements

* add job manager prototype

* remove remnants

* add functions to wait for job manager and worker pool to stop, other fixes

* test job manager functionality, fix bugs

* encapsulate how jobs are distributed to workers

* make worker job channel read only

* add job interface, more testing, fixes

* set name for dispatcher

* fix test races

* dispatcher and job manager constructors don't return errors

* logger now dependency injected

* make some members private, test fcn to get worker pool size

* make GetNumWorkers public

* Update helper/fairshare/jobmanager_test.go

Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>

* make workerpool private

* remove custom worker names

* concurrency improvements

* remove worker pool cleanup function

* remove cleanup func from job manager, remove non blocking stop from fairshare

* stop fairshare when started in tests

* stop leaking job manager goroutine

* prototype channel for waking up to assign work

* fix typo/bug and add tests

* improve job manager wake up, fix test typo

* put channel drain back

* better start/pause test for job manager

* go mod vendor

Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>

Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
2021-02-12 14:51:52 -07:00

393 lines
7.4 KiB
Go

package fairshare
import (
"fmt"
"reflect"
"sync"
"testing"
"time"
)
func TestFairshare_newDispatcher(t *testing.T) {
testCases := []struct {
name string
numWorkers int
expectedNumWorkers int
}{
{
name: "",
numWorkers: 0,
expectedNumWorkers: 1,
},
{
name: "",
numWorkers: 10,
expectedNumWorkers: 10,
},
{
name: "test-dispatcher",
numWorkers: 10,
expectedNumWorkers: 10,
},
}
l := newTestLogger("workerpool-test")
for tcNum, tc := range testCases {
d := newDispatcher(tc.name, tc.numWorkers, l)
if tc.name != "" && d.name != tc.name {
t.Errorf("tc %d: expected name %s, got %s", tcNum, tc.name, d.name)
}
if len(d.workers) != tc.expectedNumWorkers {
t.Errorf("tc %d: expected %d workers, got %d", tcNum, tc.expectedNumWorkers, len(d.workers))
}
if d.jobCh == nil {
t.Errorf("tc %d: work channel not set up properly", tcNum)
}
}
}
func TestFairshare_createDispatcher(t *testing.T) {
testCases := []struct {
name string
numWorkers int
expectedNumWorkers int
}{
{
name: "",
numWorkers: -1,
expectedNumWorkers: 1,
},
{
name: "",
numWorkers: 0,
expectedNumWorkers: 1,
},
{
name: "",
numWorkers: 10,
expectedNumWorkers: 10,
},
{
name: "",
numWorkers: 10,
expectedNumWorkers: 10,
},
{
name: "test-dispatcher",
numWorkers: 10,
expectedNumWorkers: 10,
},
}
l := newTestLogger("workerpool-test")
for tcNum, tc := range testCases {
d := createDispatcher(tc.name, tc.numWorkers, l)
if d == nil {
t.Fatalf("tc %d: expected non-nil object", tcNum)
}
if tc.name != "" && d.name != tc.name {
t.Errorf("tc %d: expected name %s, got %s", tcNum, tc.name, d.name)
}
if len(d.name) == 0 {
t.Errorf("tc %d: expected name to be set", tcNum)
}
if d.numWorkers != tc.expectedNumWorkers {
t.Errorf("tc %d: expected %d workers, got %d", tcNum, tc.expectedNumWorkers, d.numWorkers)
}
if d.workers == nil {
t.Errorf("tc %d: expected non-nil workers", tcNum)
}
if d.jobCh == nil {
t.Errorf("tc %d: work channel not set up properly", tcNum)
}
if d.quit == nil {
t.Errorf("tc %d: expected non-nil quit channel", tcNum)
}
if d.logger == nil {
t.Errorf("tc %d: expected non-nil logger", tcNum)
}
}
}
func TestFairshare_initDispatcher(t *testing.T) {
testCases := []struct {
numWorkers int
}{
{
numWorkers: 1,
},
{
numWorkers: 10,
},
{
numWorkers: 100,
},
{
numWorkers: 1000,
},
}
l := newTestLogger("workerpool-test")
for tcNum, tc := range testCases {
d := createDispatcher("", tc.numWorkers, l)
d.init()
if len(d.workers) != tc.numWorkers {
t.Fatalf("tc %d: expected %d workers, got %d", tcNum, tc.numWorkers, len(d.workers))
}
}
}
func TestFairshare_initializeWorker(t *testing.T) {
numWorkers := 3
d := createDispatcher("", numWorkers, newTestLogger("workerpool-test"))
for workerNum := 0; workerNum < numWorkers; workerNum++ {
d.initializeWorker()
w := d.workers[workerNum]
expectedName := fmt.Sprint("worker-", workerNum)
if w.name != expectedName {
t.Errorf("tc %d: expected name %s, got %s", workerNum, expectedName, w.name)
}
if w.jobCh != d.jobCh {
t.Errorf("tc %d: work channel not set up properly", workerNum)
}
if w.quit == nil || w.quit != d.quit {
t.Errorf("tc %d: quit channel not set up properly", workerNum)
}
if w.logger == nil || w.logger != d.logger {
t.Errorf("tc %d: logger not set up properly", workerNum)
}
}
}
func TestFairshare_startWorker(t *testing.T) {
d := newDispatcher("", 1, newTestLogger("workerpool-test"))
d.workers[0].start()
defer d.stop()
var wg sync.WaitGroup
ex := func(_ string) error {
wg.Done()
return nil
}
onFail := func(_ error) {}
job := newTestJob(t, "test job", ex, onFail)
doneCh := make(chan struct{})
timeout := time.After(5 * time.Second)
wg.Add(1)
d.dispatch(&job)
go func() {
wg.Wait()
doneCh <- struct{}{}
}()
select {
case <-doneCh:
break
case <-timeout:
t.Fatal("timed out")
}
}
func TestFairshare_start(t *testing.T) {
numJobs := 10
var wg sync.WaitGroup
ex := func(_ string) error {
wg.Done()
return nil
}
onFail := func(_ error) {}
wg.Add(numJobs)
d := newDispatcher("", 3, newTestLogger("workerpool-test"))
d.start()
defer d.stop()
doneCh := make(chan struct{})
timeout := time.After(5 * time.Second)
go func() {
wg.Wait()
doneCh <- struct{}{}
}()
for i := 0; i < numJobs; i++ {
job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail)
d.dispatch(&job)
}
select {
case <-doneCh:
break
case <-timeout:
t.Fatal("timed out")
}
}
func TestFairshare_stop(t *testing.T) {
d := newDispatcher("", 5, newTestLogger("workerpool-test"))
d.start()
doneCh := make(chan struct{})
timeout := time.After(5 * time.Second)
go func() {
d.stop()
doneCh <- struct{}{}
}()
select {
case <-doneCh:
break
case <-timeout:
t.Fatal("timed out")
}
}
func TestFairshare_stopMultiple(t *testing.T) {
d := newDispatcher("", 5, newTestLogger("workerpool-test"))
d.start()
doneCh := make(chan struct{})
timeout := time.After(5 * time.Second)
go func() {
d.stop()
doneCh <- struct{}{}
}()
select {
case <-doneCh:
break
case <-timeout:
t.Fatal("timed out")
}
// essentially, we don't want to panic here
var r interface{}
go func() {
t.Helper()
defer func() {
r = recover()
doneCh <- struct{}{}
}()
d.stop()
}()
select {
case <-doneCh:
break
case <-timeout:
t.Fatal("timed out")
}
if r != nil {
t.Fatalf("panic during second stop: %v", r)
}
}
func TestFairshare_dispatch(t *testing.T) {
d := newDispatcher("", 1, newTestLogger("workerpool-test"))
var wg sync.WaitGroup
accumulatedIDs := make([]string, 0)
ex := func(id string) error {
accumulatedIDs = append(accumulatedIDs, id)
wg.Done()
return nil
}
onFail := func(_ error) {}
expectedIDs := []string{"job-1", "job-2", "job-3", "job-4"}
go func() {
for _, id := range expectedIDs {
job := newTestJob(t, id, ex, onFail)
d.dispatch(&job)
}
}()
wg.Add(len(expectedIDs))
d.start()
defer d.stop()
doneCh := make(chan struct{})
go func() {
wg.Wait()
doneCh <- struct{}{}
}()
timeout := time.After(5 * time.Second)
select {
case <-doneCh:
break
case <-timeout:
t.Fatal("timed out")
}
if !reflect.DeepEqual(accumulatedIDs, expectedIDs) {
t.Fatalf("bad job ids. expected %v, got %v", expectedIDs, accumulatedIDs)
}
}
func TestFairshare_jobFailure(t *testing.T) {
numJobs := 10
testErr := fmt.Errorf("test error")
var wg sync.WaitGroup
ex := func(_ string) error {
return testErr
}
onFail := func(err error) {
if err != testErr {
t.Errorf("got unexpected error. expected %v, got %v", testErr, err)
}
wg.Done()
}
wg.Add(numJobs)
d := newDispatcher("", 3, newTestLogger("workerpool-test"))
d.start()
defer d.stop()
doneCh := make(chan struct{})
timeout := time.After(5 * time.Second)
go func() {
wg.Wait()
doneCh <- struct{}{}
}()
for i := 0; i < numJobs; i++ {
job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail)
d.dispatch(&job)
}
select {
case <-doneCh:
break
case <-timeout:
t.Fatal("timed out")
}
}
func TestFairshare_nilLoggerDispatcher(t *testing.T) {
d := newDispatcher("test-job-mgr", 1, nil)
if d.logger == nil {
t.Error("logger not set up properly")
}
}