client: add test for driverfailure during fingerprinting
This commit is contained in:
parent
15e8477c4e
commit
c7ac1186c9
|
@ -276,13 +276,12 @@ func (fm *FingerprintManager) watchDriverFingerprint(fpChan <-chan *drivers.Fing
|
|||
if backoff > 0 {
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-fm.shutdownCh:
|
||||
cancel()
|
||||
return
|
||||
case fp, ok := <-fpChan:
|
||||
if ok {
|
||||
if ok && fp.Err == nil {
|
||||
fm.processDriverFingerprint(fp, name)
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -4,8 +4,11 @@ import (
|
|||
"fmt"
|
||||
"testing"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
|
@ -565,6 +568,60 @@ func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing.
|
|||
require.NotContains(node.Drivers, "exec")
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Run_DriverFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
testClient := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "1",
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testlog.HCLogger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
singLoader := testClient.config.PluginSingletonLoader
|
||||
|
||||
dispenseCalls := 0
|
||||
loader := &loader.MockCatalog{
|
||||
DispenseF: func(name, pluginType string, logger log.Logger) (loader.PluginInstance, error) {
|
||||
if pluginType == base.PluginTypeDriver && name == "raw_exec" {
|
||||
dispenseCalls++
|
||||
}
|
||||
return singLoader.Dispense(name, pluginType, logger)
|
||||
},
|
||||
ReattachF: singLoader.Reattach,
|
||||
CatalogF: singLoader.Catalog,
|
||||
}
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
loader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
fpChan, cancel, err := fm.dispenseDriverFingerprint("raw_exec")
|
||||
require.NoError(err)
|
||||
require.Equal(1, dispenseCalls)
|
||||
|
||||
cancel()
|
||||
go fm.watchDriverFingerprint(fpChan, "raw_exec", cancel)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if 2 != dispenseCalls {
|
||||
return false, fmt.Errorf("expected dispenseCalls to be 2 but was %d", dispenseCalls)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
|
|
@ -196,6 +196,7 @@ func (r *RawExecDriver) Fingerprint(ctx context.Context) (<-chan *drivers.Finger
|
|||
}
|
||||
|
||||
func (r *RawExecDriver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
|
||||
defer close(ch)
|
||||
ticker := time.NewTimer(0)
|
||||
for {
|
||||
select {
|
||||
|
|
|
@ -67,37 +67,44 @@ func (d *driverPluginClient) Capabilities() (*Capabilities, error) {
|
|||
func (d *driverPluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerprint, error) {
|
||||
req := &proto.FingerprintRequest{}
|
||||
|
||||
stream, err := d.client.Fingerprint(context.Background(), req)
|
||||
stream, err := d.client.Fingerprint(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ch := make(chan *Fingerprint)
|
||||
go d.handleFingerprint(ch, stream)
|
||||
go d.handleFingerprint(ctx, ch, stream)
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func (d *driverPluginClient) handleFingerprint(ch chan *Fingerprint, stream proto.Driver_FingerprintClient) {
|
||||
func (d *driverPluginClient) handleFingerprint(ctx context.Context, ch chan *Fingerprint, stream proto.Driver_FingerprintClient) {
|
||||
defer close(ch)
|
||||
for {
|
||||
pb, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
d.logger.Error("error receiving stream from Fingerprint driver RPC", "error", err)
|
||||
ch <- &Fingerprint{Err: fmt.Errorf("error from RPC stream: %v", err)}
|
||||
break
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case ch <- &Fingerprint{Err: fmt.Errorf("error from RPC stream: %v", err)}:
|
||||
}
|
||||
return
|
||||
}
|
||||
f := &Fingerprint{
|
||||
Attributes: pb.Attributes,
|
||||
Health: healthStateFromProto(pb.Health),
|
||||
HealthDescription: pb.HealthDescription,
|
||||
}
|
||||
ch <- f
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case ch <- f:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RecoverTask does internal state recovery to be able to control the task of
|
||||
|
|
Loading…
Reference in New Issue