client: review comments and fixup/skip tests
This commit is contained in:
parent
64e9fc3798
commit
3183b33d24
|
@ -76,6 +76,7 @@ func TestAllocations_GarbageCollectAll_ACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAllocations_GarbageCollect(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
client := TestClient(t, func(c *config.Config) {
|
||||
|
@ -174,6 +175,7 @@ func TestAllocations_GarbageCollect_ACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAllocations_Stats(t *testing.T) {
|
||||
t.Skip("missing exec driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
client := TestClient(t, nil)
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
// TestPrevAlloc_StreamAllocDir_TLS asserts ephemeral disk migrations still
|
||||
// work when TLS is enabled.
|
||||
func TestPrevAlloc_StreamAllocDir_TLS(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
const (
|
||||
caFn = "../helper/tlsutil/testdata/global-ca.pem"
|
||||
serverCertFn = "../helper/tlsutil/testdata/global-server.pem"
|
||||
|
|
|
@ -80,9 +80,6 @@ type allocRunner struct {
|
|||
// and if necessary migrate its alloc dir.
|
||||
prevAllocWatcher allocwatcher.PrevAllocWatcher
|
||||
|
||||
// pluginLoader is used to load plugins.
|
||||
pluginLoader loader.PluginCatalog
|
||||
|
||||
// pluginSingletonLoader is a plugin loader that will returns singleton
|
||||
// instances of the plugins.
|
||||
pluginSingletonLoader loader.PluginCatalog
|
||||
|
@ -109,7 +106,6 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
|
|||
stateUpdater: config.StateUpdater,
|
||||
allocBroadcaster: cstructs.NewAllocBroadcaster(),
|
||||
prevAllocWatcher: config.PrevAllocWatcher,
|
||||
pluginLoader: config.PluginLoader,
|
||||
pluginSingletonLoader: config.PluginSingletonLoader,
|
||||
}
|
||||
|
||||
|
@ -143,7 +139,6 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
|
|||
StateUpdater: ar,
|
||||
Consul: ar.consulClient,
|
||||
VaultClient: ar.vaultClient,
|
||||
PluginLoader: ar.pluginLoader,
|
||||
PluginSingletonLoader: ar.pluginSingletonLoader,
|
||||
}
|
||||
|
||||
|
|
|
@ -7,28 +7,35 @@ import (
|
|||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/plugins/shared/catalog"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestAllocRunner_AllocState_Initialized asserts that getting TaskStates via
|
||||
// AllocState() are initialized even before the AllocRunner has run.
|
||||
func TestAllocRunner_AllocState_Initialized(t *testing.T) {
|
||||
t.Skip("missing exec driver plugin implementation")
|
||||
t.Parallel()
|
||||
|
||||
alloc := mock.Alloc()
|
||||
logger := testlog.HCLogger(t)
|
||||
|
||||
conf := &Config{
|
||||
Alloc: alloc,
|
||||
Logger: logger,
|
||||
ClientConfig: config.TestClientConfig(),
|
||||
StateDB: state.NoopDB{},
|
||||
Consul: nil,
|
||||
Vault: nil,
|
||||
StateUpdater: nil,
|
||||
PrevAllocWatcher: nil,
|
||||
Alloc: alloc,
|
||||
Logger: logger,
|
||||
ClientConfig: config.TestClientConfig(),
|
||||
StateDB: state.NoopDB{},
|
||||
Consul: nil,
|
||||
Vault: nil,
|
||||
StateUpdater: nil,
|
||||
PrevAllocWatcher: nil,
|
||||
PluginSingletonLoader: &loader.MockCatalog{},
|
||||
}
|
||||
|
||||
pluginLoader := catalog.TestPluginLoader(t)
|
||||
conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, pluginLoader)
|
||||
ar, err := NewAllocRunner(conf)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
@ -4,14 +4,14 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
)
|
||||
|
||||
func NewDriverHandle(driver drivers.DriverPlugin, taskID string, task *structs.Task, net *cstructs.DriverNetwork) interfaces.DriverHandle {
|
||||
return &driverHandleImpl{
|
||||
// NewDriverHandle returns a handle for task operations on a specific task
|
||||
func NewDriverHandle(driver drivers.DriverPlugin, taskID string, task *structs.Task, net *cstructs.DriverNetwork) *DriverHandle {
|
||||
return &DriverHandle{
|
||||
driver: driver,
|
||||
net: net,
|
||||
taskID: taskID,
|
||||
|
@ -19,38 +19,40 @@ func NewDriverHandle(driver drivers.DriverPlugin, taskID string, task *structs.T
|
|||
}
|
||||
}
|
||||
|
||||
type driverHandleImpl struct {
|
||||
// DriverHandle encapsulates a driver plugin client and task identifier and exposes
|
||||
// an api to perform driver operations on the task
|
||||
type DriverHandle struct {
|
||||
driver drivers.DriverPlugin
|
||||
net *cstructs.DriverNetwork
|
||||
task *structs.Task
|
||||
taskID string
|
||||
}
|
||||
|
||||
func (h *driverHandleImpl) ID() string {
|
||||
func (h *DriverHandle) ID() string {
|
||||
return h.taskID
|
||||
}
|
||||
|
||||
func (h *driverHandleImpl) WaitCh(ctx context.Context) (<-chan *drivers.ExitResult, error) {
|
||||
func (h *DriverHandle) WaitCh(ctx context.Context) (<-chan *drivers.ExitResult, error) {
|
||||
return h.driver.WaitTask(ctx, h.taskID)
|
||||
}
|
||||
|
||||
func (h *driverHandleImpl) Update(task *structs.Task) error {
|
||||
func (h *DriverHandle) Update(task *structs.Task) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *driverHandleImpl) Kill() error {
|
||||
func (h *DriverHandle) Kill() error {
|
||||
return h.driver.StopTask(h.taskID, h.task.KillTimeout, h.task.KillSignal)
|
||||
}
|
||||
|
||||
func (h *driverHandleImpl) Stats() (*cstructs.TaskResourceUsage, error) {
|
||||
func (h *DriverHandle) Stats() (*cstructs.TaskResourceUsage, error) {
|
||||
return h.driver.TaskStats(h.taskID)
|
||||
}
|
||||
|
||||
func (h *driverHandleImpl) Signal(s string) error {
|
||||
func (h *DriverHandle) Signal(s string) error {
|
||||
return h.driver.SignalTask(h.taskID, s)
|
||||
}
|
||||
|
||||
func (h *driverHandleImpl) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
|
||||
func (h *DriverHandle) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
|
||||
command := append([]string{cmd}, args...)
|
||||
res, err := h.driver.ExecTask(h.taskID, command, timeout)
|
||||
if err != nil {
|
||||
|
@ -59,6 +61,6 @@ func (h *driverHandleImpl) Exec(timeout time.Duration, cmd string, args []string
|
|||
return res.Stdout, res.ExitResult.ExitCode, res.ExitResult.Err
|
||||
}
|
||||
|
||||
func (h *driverHandleImpl) Network() *cstructs.DriverNetwork {
|
||||
func (h *DriverHandle) Network() *cstructs.DriverNetwork {
|
||||
return h.net
|
||||
}
|
||||
|
|
|
@ -52,7 +52,6 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error {
|
|||
tr.EmitEvent(event)
|
||||
|
||||
// Send the signal
|
||||
|
||||
return handle.Signal(s)
|
||||
}
|
||||
|
||||
|
@ -90,23 +89,24 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error
|
|||
// Block until task has exited.
|
||||
waitCh, err := handle.WaitCh(ctx)
|
||||
|
||||
// The task may have already been cleaned up
|
||||
// The error should be nil or TaskNotFound, if it's something else then a
|
||||
// failure in the driver or transport layer occured
|
||||
if err != nil && err != drivers.ErrTaskNotFound {
|
||||
tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if waitCh != nil {
|
||||
if err == nil {
|
||||
<-waitCh
|
||||
}
|
||||
|
||||
// Store that the task has been destroyed and any associated error.
|
||||
tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr))
|
||||
// Store that the task has been destroyed and any associated error.
|
||||
tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr))
|
||||
|
||||
if destroyErr != nil {
|
||||
return destroyErr
|
||||
} else if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
if destroyErr != nil {
|
||||
return destroyErr
|
||||
} else if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
"github.com/hashicorp/nomad/plugins/shared"
|
||||
"github.com/hashicorp/nomad/plugins/shared/hclspec"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
"github.com/zclconf/go-cty/cty"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -77,10 +76,6 @@ type TaskRunner struct {
|
|||
// stateDB is for persisting localState and taskState
|
||||
stateDB cstate.StateDB
|
||||
|
||||
// persistedHash is the hash of the last persisted state for skipping
|
||||
// unnecessary writes
|
||||
persistedHash []byte
|
||||
|
||||
// ctx is the task runner's context representing the tasks's lifecycle.
|
||||
// Canceling the context will cause the task to be destroyed.
|
||||
ctx context.Context
|
||||
|
@ -114,7 +109,7 @@ type TaskRunner struct {
|
|||
handleLock sync.Mutex
|
||||
|
||||
// handle to the running driver
|
||||
handle tinterfaces.DriverHandle
|
||||
handle *DriverHandle
|
||||
|
||||
// network is the configuration for the driver network if one was created
|
||||
network *cstructs.DriverNetwork
|
||||
|
@ -161,9 +156,6 @@ type TaskRunner struct {
|
|||
resourceUsage *cstructs.TaskResourceUsage
|
||||
resourceUsageLock sync.Mutex
|
||||
|
||||
// PluginLoader is used to load plugins.
|
||||
pluginLoader loader.PluginCatalog
|
||||
|
||||
// PluginSingletonLoader is a plugin loader that will returns singleton
|
||||
// instances of the plugins.
|
||||
pluginSingletonLoader loader.PluginCatalog
|
||||
|
@ -189,9 +181,6 @@ type Config struct {
|
|||
// StateUpdater is used to emit updated task state
|
||||
StateUpdater interfaces.TaskStateHandler
|
||||
|
||||
// PluginLoader is used to load plugins.
|
||||
PluginLoader loader.PluginCatalog
|
||||
|
||||
// PluginSingletonLoader is a plugin loader that will returns singleton
|
||||
// instances of the plugins.
|
||||
PluginSingletonLoader loader.PluginCatalog
|
||||
|
@ -227,7 +216,6 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
|
|||
ctxCancel: trCancel,
|
||||
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
|
||||
waitCh: make(chan struct{}),
|
||||
pluginLoader: config.PluginLoader,
|
||||
pluginSingletonLoader: config.PluginSingletonLoader,
|
||||
}
|
||||
|
||||
|
@ -437,16 +425,15 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
|
|||
// runDriver runs the driver and waits for it to exit
|
||||
func (tr *TaskRunner) runDriver() error {
|
||||
|
||||
// TODO(nickethier): make sure this uses alloc.AllocatedResources once #4750 is rebased
|
||||
taskConfig := drivers.NewTaskConfig(tr.task, tr.taskDir, tr.envBuilder.Build())
|
||||
taskConfig.ID = tr.buildID()
|
||||
taskConfig.StdoutPath = tr.logmonHookConfig.stdoutFifo
|
||||
taskConfig.StderrPath = tr.logmonHookConfig.stderrFifo
|
||||
|
||||
// TODO: load variables
|
||||
evalCtx := &hcl.EvalContext{
|
||||
Functions: shared.GetStdlibFuncs(),
|
||||
Variables: map[string]cty.Value{
|
||||
"NOMAD_ENV_bin": cty.StringVal("/bin/consul"),
|
||||
},
|
||||
}
|
||||
|
||||
val, diag := shared.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx)
|
||||
|
@ -487,12 +474,7 @@ func (tr *TaskRunner) runDriver() error {
|
|||
func (tr *TaskRunner) updateDriverHandle(taskID string) {
|
||||
tr.handleLock.Lock()
|
||||
defer tr.handleLock.Unlock()
|
||||
tr.handle = &driverHandleImpl{
|
||||
driver: tr.driver,
|
||||
net: tr.network,
|
||||
taskID: taskID,
|
||||
task: tr.Task(),
|
||||
}
|
||||
tr.handle = NewDriverHandle(tr.driver, taskID, tr.Task(), tr.network)
|
||||
}
|
||||
|
||||
// initDriver creates the driver for the task
|
||||
|
|
|
@ -58,7 +58,7 @@ func (tr *TaskRunner) getDriverHandle() interfaces.DriverHandle {
|
|||
}
|
||||
|
||||
// setDriverHanlde sets the driver handle and creates a new result proxy.
|
||||
func (tr *TaskRunner) setDriverHandle(handle interfaces.DriverHandle) {
|
||||
func (tr *TaskRunner) setDriverHandle(handle *DriverHandle) {
|
||||
tr.handleLock.Lock()
|
||||
defer tr.handleLock.Unlock()
|
||||
tr.handle = handle
|
||||
|
|
|
@ -1039,64 +1039,64 @@ func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintRespons
|
|||
func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) *structs.Node {
|
||||
c.configLock.Lock()
|
||||
defer c.configLock.Unlock()
|
||||
if info == nil {
|
||||
return c.configCopy.Node
|
||||
}
|
||||
|
||||
var hasChanged bool
|
||||
|
||||
hadDriver := c.config.Node.Drivers[name] != nil
|
||||
if info != nil {
|
||||
if !hadDriver {
|
||||
// If the driver info has not yet been set, do that here
|
||||
if !hadDriver {
|
||||
// If the driver info has not yet been set, do that here
|
||||
hasChanged = true
|
||||
c.config.Node.Drivers[name] = info
|
||||
for attrName, newVal := range info.Attributes {
|
||||
c.config.Node.Attributes[attrName] = newVal
|
||||
}
|
||||
} else {
|
||||
oldVal := c.config.Node.Drivers[name]
|
||||
// The driver info has already been set, fix it up
|
||||
if oldVal.Detected != info.Detected {
|
||||
hasChanged = true
|
||||
c.config.Node.Drivers[name] = info
|
||||
for attrName, newVal := range info.Attributes {
|
||||
c.config.Node.Drivers[name].Detected = info.Detected
|
||||
}
|
||||
|
||||
if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription {
|
||||
hasChanged = true
|
||||
if info.HealthDescription != "" {
|
||||
event := &structs.NodeEvent{
|
||||
Subsystem: "Driver",
|
||||
Message: info.HealthDescription,
|
||||
Timestamp: time.Now(),
|
||||
Details: map[string]string{"driver": name},
|
||||
}
|
||||
c.triggerNodeEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
for attrName, newVal := range info.Attributes {
|
||||
oldVal := c.config.Node.Drivers[name].Attributes[attrName]
|
||||
if oldVal == newVal {
|
||||
continue
|
||||
}
|
||||
|
||||
hasChanged = true
|
||||
if newVal == "" {
|
||||
delete(c.config.Node.Attributes, attrName)
|
||||
} else {
|
||||
c.config.Node.Attributes[attrName] = newVal
|
||||
}
|
||||
} else {
|
||||
oldVal := c.config.Node.Drivers[name]
|
||||
// The driver info has already been set, fix it up
|
||||
if oldVal.Detected != info.Detected {
|
||||
hasChanged = true
|
||||
c.config.Node.Drivers[name].Detected = info.Detected
|
||||
}
|
||||
|
||||
if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription {
|
||||
hasChanged = true
|
||||
if info.HealthDescription != "" {
|
||||
event := &structs.NodeEvent{
|
||||
Subsystem: "Driver",
|
||||
Message: info.HealthDescription,
|
||||
Timestamp: time.Now(),
|
||||
Details: map[string]string{"driver": name},
|
||||
}
|
||||
c.triggerNodeEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
for attrName, newVal := range info.Attributes {
|
||||
oldVal := c.config.Node.Drivers[name].Attributes[attrName]
|
||||
if oldVal == newVal {
|
||||
continue
|
||||
}
|
||||
|
||||
hasChanged = true
|
||||
if newVal == "" {
|
||||
delete(c.config.Node.Attributes, attrName)
|
||||
} else {
|
||||
c.config.Node.Attributes[attrName] = newVal
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// COMPAT Remove in Nomad 0.10
|
||||
// We maintain the driver enabled attribute until all drivers expose
|
||||
// their attributes as DriverInfo
|
||||
driverName := fmt.Sprintf("driver.%s", name)
|
||||
if info.Detected {
|
||||
c.config.Node.Attributes[driverName] = "1"
|
||||
} else {
|
||||
delete(c.config.Node.Attributes, driverName)
|
||||
}
|
||||
}
|
||||
|
||||
// COMPAT Remove in Nomad 0.10
|
||||
// We maintain the driver enabled attribute until all drivers expose
|
||||
// their attributes as DriverInfo
|
||||
driverName := fmt.Sprintf("driver.%s", name)
|
||||
if info.Detected {
|
||||
c.config.Node.Attributes[driverName] = "1"
|
||||
} else {
|
||||
delete(c.config.Node.Attributes, driverName)
|
||||
}
|
||||
|
||||
if hasChanged {
|
||||
|
|
|
@ -159,6 +159,7 @@ func TestClient_Fingerprint(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestClient_Fingerprint_Periodic(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
|
||||
c1 := TestClient(t, func(c *config.Config) {
|
||||
|
@ -399,6 +400,7 @@ func TestClient_Heartbeat(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestClient_UpdateAllocStatus(t *testing.T) {
|
||||
t.Skip("missing exec driver plugin implementation")
|
||||
t.Parallel()
|
||||
s1, _ := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
|
@ -662,6 +664,7 @@ func TestClient_Init(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestClient_BlockedAllocations(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
s1, _ := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
|
|
|
@ -118,13 +118,9 @@ func (fp *FingerprintManager) Run() error {
|
|||
|
||||
var availDrivers []string
|
||||
var skippedDrivers []string
|
||||
var registeredDrivers []string
|
||||
|
||||
for _, pl := range fp.singletonLoader.Catalog()[base.PluginTypeDriver] {
|
||||
registeredDrivers = append(registeredDrivers, pl.Name)
|
||||
}
|
||||
|
||||
for _, name := range registeredDrivers {
|
||||
name := pl.Name
|
||||
// Skip fingerprinting drivers that are not in the whitelist if it is
|
||||
// enabled.
|
||||
if _, ok := whitelistDrivers[name]; whitelistDriversEnabled && !ok {
|
||||
|
@ -197,7 +193,7 @@ func (fm *FingerprintManager) setupDrivers(driverNames []string) error {
|
|||
|
||||
driver, ok := plug.Plugin().(drivers.DriverPlugin)
|
||||
if !ok {
|
||||
return fmt.Errorf("registered driver plugin %q does not implement DriverPlugin interface")
|
||||
return fmt.Errorf("registered driver plugin %q does not implement DriverPlugin interface", name)
|
||||
}
|
||||
|
||||
// Pass true for whether the health check is periodic here, so that the
|
||||
|
@ -209,6 +205,7 @@ func (fm *FingerprintManager) setupDrivers(driverNames []string) error {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
fingerCh, err := driver.Fingerprint(ctx)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -218,10 +215,14 @@ func (fm *FingerprintManager) setupDrivers(driverNames []string) error {
|
|||
// attributes.
|
||||
go fm.watchDriverFingerprint(fingerCh, name, cancel)
|
||||
|
||||
if fm.logger.IsTrace() {
|
||||
fm.logger.Trace("initial driver fingerprint", "driver", name, "fingerprint", finger)
|
||||
}
|
||||
// Log the fingerprinters which have been applied
|
||||
if finger.Health != drivers.HealthStateUndetected {
|
||||
availDrivers = append(availDrivers, name)
|
||||
}
|
||||
fm.processDriverFingerprint(finger, name)
|
||||
}
|
||||
|
||||
fm.logger.Debug("detected drivers", "drivers", availDrivers)
|
||||
|
@ -282,20 +283,69 @@ func (fm *FingerprintManager) watchDriverFingerprint(fpChan <-chan *drivers.Fing
|
|||
case <-fm.shutdownCh:
|
||||
cancel()
|
||||
return
|
||||
case fp := <-fpChan:
|
||||
di := &structs.DriverInfo{
|
||||
Attributes: fp.Attributes,
|
||||
Detected: fp.Health != drivers.HealthStateUndetected,
|
||||
Healthy: fp.Health == drivers.HealthStateHealthy,
|
||||
HealthDescription: fp.HealthDescription,
|
||||
UpdateTime: time.Now(),
|
||||
case fp, ok := <-fpChan:
|
||||
// if the channel is closed attempt to open a new one
|
||||
if !ok {
|
||||
newFpChan, newCancel, err := fm.retryDriverFingerprint(name)
|
||||
if err != nil {
|
||||
fm.logger.Warn("failed to fingerprint driver, retrying in 30s", "error", err)
|
||||
fm.nodeLock.Lock()
|
||||
n := fm.updateNodeFromDriver(name, &structs.DriverInfo{
|
||||
Healthy: false,
|
||||
HealthDescription: "failed to fingerprint driver",
|
||||
UpdateTime: time.Now(),
|
||||
})
|
||||
if n != nil {
|
||||
fm.node = n
|
||||
}
|
||||
fm.nodeLock.Unlock()
|
||||
time.Sleep(30 * time.Second)
|
||||
} else {
|
||||
cancel()
|
||||
fpChan = newFpChan
|
||||
cancel = newCancel
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
fm.processDriverFingerprint(fp, name)
|
||||
}
|
||||
fm.nodeLock.Lock()
|
||||
n := fm.updateNodeFromDriver(name, di)
|
||||
if n != nil {
|
||||
fm.node = n
|
||||
}
|
||||
fm.nodeLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fm *FingerprintManager) processDriverFingerprint(fp *drivers.Fingerprint, driverName string) {
|
||||
di := &structs.DriverInfo{
|
||||
Attributes: fp.Attributes,
|
||||
Detected: fp.Health != drivers.HealthStateUndetected,
|
||||
Healthy: fp.Health == drivers.HealthStateHealthy,
|
||||
HealthDescription: fp.HealthDescription,
|
||||
UpdateTime: time.Now(),
|
||||
}
|
||||
fm.nodeLock.Lock()
|
||||
n := fm.updateNodeFromDriver(driverName, di)
|
||||
if n != nil {
|
||||
fm.node = n
|
||||
}
|
||||
fm.nodeLock.Unlock()
|
||||
}
|
||||
|
||||
func (fm *FingerprintManager) retryDriverFingerprint(driverName string) (<-chan *drivers.Fingerprint, context.CancelFunc, error) {
|
||||
plug, err := fm.singletonLoader.Dispense(driverName, base.PluginTypeDriver, fm.logger)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
driver, ok := plug.Plugin().(drivers.DriverPlugin)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("registered driver plugin %q does not implement DriverPlugin interface", driverName)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
fingerCh, err := driver.Fingerprint(ctx)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return fingerCh, cancel, nil
|
||||
}
|
||||
|
|
|
@ -8,9 +8,13 @@ import (
|
|||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
// registering raw_exec driver plugin used in testing
|
||||
_ "github.com/hashicorp/nomad/drivers/rawexec"
|
||||
)
|
||||
|
||||
func TestFingerprintManager_Run_MockDriver(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient := TestClient(t, nil)
|
||||
|
@ -19,6 +23,7 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -46,6 +51,7 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -67,12 +73,17 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) {
|
|||
func TestFingerprintManager_Fingerprint_Run(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient := TestClient(t, nil)
|
||||
testClient := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "true",
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testlog.HCLogger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -92,6 +103,7 @@ func TestFingerprintManager_Fingerprint_Run(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient := TestClient(t, func(c *config.Config) {
|
||||
|
@ -105,6 +117,7 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -153,6 +166,7 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) {
|
|||
// This is a temporary measure to check that a driver has both attributes on a
|
||||
// node set as well as DriverInfo.
|
||||
func TestFingerprintManager_HealthCheck_Driver(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient := TestClient(t, func(c *config.Config) {
|
||||
|
@ -167,6 +181,7 @@ func TestFingerprintManager_HealthCheck_Driver(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -254,6 +269,7 @@ func TestFingerprintManager_HealthCheck_Driver(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient := TestClient(t, func(c *config.Config) {
|
||||
|
@ -267,6 +283,7 @@ func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -349,6 +366,7 @@ func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFimgerprintManager_Run_InWhitelist(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
|
@ -363,6 +381,7 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -393,6 +412,7 @@ func TestFingerprintManager_Run_InBlacklist(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -425,6 +445,7 @@ func TestFingerprintManager_Run_Combination(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -458,6 +479,7 @@ func TestFingerprintManager_Run_WhitelistDrivers(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -480,6 +502,7 @@ func TestFingerprintManager_Run_AllDriversBlacklisted(t *testing.T) {
|
|||
|
||||
testClient := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "1",
|
||||
"driver.whitelist": " foo,bar,baz ",
|
||||
}
|
||||
})
|
||||
|
@ -488,6 +511,7 @@ func TestFingerprintManager_Run_AllDriversBlacklisted(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -502,8 +526,8 @@ func TestFingerprintManager_Run_AllDriversBlacklisted(t *testing.T) {
|
|||
node := testClient.config.Node
|
||||
|
||||
require.NotContains(node.Attributes, "driver.raw_exec")
|
||||
require.NotContains(node.Attributes, "driver.exec")
|
||||
require.NotContains(node.Attributes, "driver.docker")
|
||||
//require.NotContains(node.Attributes, "driver.exec")
|
||||
//require.NotContains(node.Attributes, "driver.docker")
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing.T) {
|
||||
|
@ -522,6 +546,7 @@ func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing.
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
@ -555,6 +580,7 @@ func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) {
|
|||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
|
|
|
@ -74,6 +74,7 @@ func TestFS_Stat_NoAlloc(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFS_Stat(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
|
@ -212,6 +213,7 @@ func TestFS_List_NoAlloc(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFS_List(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
|
@ -523,6 +525,7 @@ func TestFS_Stream_ACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFS_Stream(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
|
@ -633,6 +636,7 @@ func (r *ReadWriteCloseChecker) Close() error {
|
|||
}
|
||||
|
||||
func TestFS_Stream_Follow(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
|
@ -730,6 +734,7 @@ OUTER:
|
|||
}
|
||||
|
||||
func TestFS_Stream_Limit(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
|
@ -902,6 +907,7 @@ OUTER:
|
|||
// TestFS_Logs_TaskPending asserts that trying to stream logs for tasks which
|
||||
// have not started returns a 404 error.
|
||||
func TestFS_Logs_TaskPending(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
|
@ -1130,6 +1136,7 @@ func TestFS_Logs_ACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFS_Logs(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
|
@ -1231,6 +1238,7 @@ OUTER:
|
|||
}
|
||||
|
||||
func TestFS_Logs_Follow(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
|
|
|
@ -25,14 +25,15 @@ func TestClient(t testing.T, cb func(c *config.Config)) *Client {
|
|||
logger := testlog.HCLogger(t)
|
||||
conf.Logger = logger
|
||||
|
||||
// Set the plugin loaders
|
||||
conf.PluginLoader = catalog.TestPluginLoader(t)
|
||||
conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, conf.PluginLoader)
|
||||
|
||||
if cb != nil {
|
||||
cb(conf)
|
||||
}
|
||||
|
||||
// Set the plugin loaders
|
||||
if conf.PluginLoader == nil {
|
||||
conf.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", conf.Options, nil)
|
||||
conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, conf.PluginLoader)
|
||||
}
|
||||
catalog := consul.NewMockCatalog(logger)
|
||||
mockService := consulApi.NewMockConsulServiceClient(t, logger)
|
||||
client, err := NewClient(conf, catalog, mockService)
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
|
||||
// Each internal plugin has an init func which registers itself with the
|
||||
// plugin catalog. Since the plugin implementations are not imported by the
|
||||
// client or server they must be referenced here so plugin registration
|
||||
// occures.
|
||||
|
||||
// raw_exec driver
|
||||
_ "github.com/hashicorp/nomad/drivers/rawexec"
|
||||
)
|
|
@ -6,8 +6,6 @@ import (
|
|||
"github.com/hashicorp/nomad/plugins/shared/catalog"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
||||
|
||||
_ "github.com/hashicorp/nomad/drivers/rawexec"
|
||||
)
|
||||
|
||||
// setupPlugins is used to setup the plugin loaders.
|
||||
|
|
|
@ -23,6 +23,8 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// When the package is loaded the driver is registered as an internal plugin
|
||||
// with the plugin catalog
|
||||
func init() {
|
||||
catalog.RegisterDeferredConfig(loader.PluginID{
|
||||
Name: pluginName,
|
||||
|
@ -32,15 +34,13 @@ func init() {
|
|||
Factory: func(l hclog.Logger) interface{} { return NewRawExecDriver(l) },
|
||||
},
|
||||
func(opts map[string]string) (map[string]interface{}, error) {
|
||||
fmt.Println(opts)
|
||||
conf := map[string]interface{}{}
|
||||
if v, ok := opts["driver.raw_exec.enable"]; ok && v == "true" {
|
||||
conf["enabled"] = true
|
||||
if v, err := strconv.ParseBool(opts["driver.raw_exec.enable"]); err == nil {
|
||||
conf["enabled"] = v
|
||||
}
|
||||
if v, ok := opts["driver.raw_exec.no_cgroups"]; ok && v == "true" {
|
||||
conf["no_cgroups"] = true
|
||||
if v, err := strconv.ParseBool(opts["driver.raw_exec.no_cgroups"]); err == nil {
|
||||
conf["no_cgroups"] = v
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
},
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue