diff --git a/client/client_test.go b/client/client_test.go index 2b203094c..a62914dd9 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -549,7 +549,7 @@ func waitTilNodeReady(client *Client, t *testing.T) { func TestClient_SaveRestoreState(t *testing.T) { t.Parallel() - ctestutil.ExecCompatible(t) + s1, _ := testServer(t, nil) defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) @@ -573,6 +573,7 @@ func TestClient_SaveRestoreState(t *testing.T) { alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ "run_for": "10s", } + alloc1.ClientStatus = structs.AllocClientStatusRunning state := s1.State() if err := state.UpsertJob(100, job); err != nil { diff --git a/drivers/shared/eventer/eventer.go b/drivers/shared/eventer/eventer.go index a68a20162..a76e84fb4 100644 --- a/drivers/shared/eventer/eventer.go +++ b/drivers/shared/eventer/eventer.go @@ -81,6 +81,15 @@ func (e *Eventer) iterateConsumers(event *drivers.TaskEvent) { e.consumersLock.Lock() filtered := e.consumers[:0] for _, consumer := range e.consumers { + + // prioritize checking if context is cancelled prior + // to attempting to forwarding events + // golang select evaluations aren't predictable + if consumer.ctx.Err() != nil { + close(consumer.ch) + continue + } + select { case <-time.After(consumer.timeout): filtered = append(filtered, consumer)