Merge pull request #5152 from hashicorp/f-recover

Task runner recovers from external plugin exiting
Alex Dadgar 2019-01-07 15:27:33 -08:00 committed by GitHub
commit edf132758d
40 changed files with 824 additions and 307 deletions


@ -0,0 +1,152 @@
package taskrunner
import (
"context"
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
)
const (
// retrieveBackoffBaseline is the baseline time for exponential backoff while
// retrieving a handle.
retrieveBackoffBaseline = 250 * time.Millisecond
// retrieveBackoffLimit is the limit of the exponential backoff for
// retrieving a handle.
retrieveBackoffLimit = 5 * time.Second
// retrieveFailureLimit is how many times we will attempt to retrieve a
// new handle before giving up.
retrieveFailureLimit = 5
)
// retrieveHandleFn is used to retrieve the latest driver handle
type retrieveHandleFn func() *DriverHandle
// LazyHandle is used to front calls to a DriverHandle where it is expected the
// existing handle may no longer be valid because the backing plugin has
// shutdown. LazyHandle detects the plugin shutting down and retrieves a new
// handle so that the consumer does not need to worry whether the handle is to
// the latest driver instance.
type LazyHandle struct {
// retrieveHandle is used to retrieve the latest handle
retrieveHandle retrieveHandleFn
// h is the current handle and may be nil
h *DriverHandle
// shutdownCtx is used to cancel retries if the agent is shutting down
shutdownCtx context.Context
logger log.Logger
sync.Mutex
}
// NewLazyHandle takes the function used to retrieve the latest handle and a
// logger, and returns a LazyHandle
func NewLazyHandle(shutdownCtx context.Context, fn retrieveHandleFn, logger log.Logger) *LazyHandle {
return &LazyHandle{
retrieveHandle: fn,
h: fn(),
shutdownCtx: shutdownCtx,
logger: logger.Named("lazy_handle"),
}
}
// getHandle returns the current handle or retrieves a new one
func (l *LazyHandle) getHandle() (*DriverHandle, error) {
l.Lock()
defer l.Unlock()
if l.h != nil {
return l.h, nil
}
return l.refreshHandleLocked()
}
// refreshHandle retrieves a new handle
func (l *LazyHandle) refreshHandle() (*DriverHandle, error) {
l.Lock()
defer l.Unlock()
return l.refreshHandleLocked()
}
// refreshHandleLocked retrieves a new handle and should be called with the lock
// held. It will retry to give the client time to restart the driver and restore
// the handle.
func (l *LazyHandle) refreshHandleLocked() (*DriverHandle, error) {
for i := 0; i < retrieveFailureLimit; i++ {
l.h = l.retrieveHandle()
if l.h != nil {
return l.h, nil
}
// Calculate the new backoff
backoff := (1 << (2 * uint64(i))) * retrieveBackoffBaseline
if backoff > retrieveBackoffLimit {
backoff = retrieveBackoffLimit
}
l.logger.Debug("failed to retrieve handle", "backoff", backoff)
select {
case <-l.shutdownCtx.Done():
return nil, l.shutdownCtx.Err()
case <-time.After(backoff):
}
}
return nil, fmt.Errorf("no driver handle")
}
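For reference, the formula above quadruples the wait on each attempt and caps it at retrieveBackoffLimit, so the schedule is 250ms, 1s, 4s, and then 5s for the remaining attempts (roughly 15s of waiting across the five tries). A small illustrative helper, not part of this change, makes that concrete:
// backoffFor is an illustrative sketch only: it mirrors the calculation
// in refreshHandleLocked. Attempt 0 waits 250ms, attempt 1 waits 1s,
// attempt 2 waits 4s, and attempts 3-4 are capped at 5s.
func backoffFor(attempt int) time.Duration {
	backoff := (1 << (2 * uint64(attempt))) * retrieveBackoffBaseline
	if backoff > retrieveBackoffLimit {
		backoff = retrieveBackoffLimit
	}
	return backoff
}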
func (l *LazyHandle) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
h, err := l.getHandle()
if err != nil {
return nil, 0, err
}
// Only retry once
first := true
TRY:
out, c, err := h.Exec(timeout, cmd, args)
if err == bstructs.ErrPluginShutdown && first {
first = false
h, err = l.refreshHandle()
if err == nil {
goto TRY
}
}
return out, c, err
}
func (l *LazyHandle) Stats() (*cstructs.TaskResourceUsage, error) {
h, err := l.getHandle()
if err != nil {
return nil, err
}
// Only retry once
first := true
TRY:
out, err := h.Stats()
if err == bstructs.ErrPluginShutdown && first {
first = false
h, err = l.refreshHandle()
if err == nil {
goto TRY
}
}
return out, err
}
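To show how a caller is expected to use the type, here is a minimal, hypothetical sketch (checkScript is not part of this change): the consumer holds one LazyHandle and calls Exec on it; if the backing plugin exited since the handle was created, Exec refreshes the handle once and retries before surfacing the error.
// checkScript is a hypothetical caller sketch. It runs a command via the
// LazyHandle; a plugin restart between calls is handled transparently by
// the single refresh-and-retry in Exec above.
func checkScript(lh *LazyHandle) error {
	out, code, err := lh.Exec(5*time.Second, "/bin/sh", []string{"-c", "echo ok"})
	if err != nil {
		return err
	}
	if code != 0 {
		return fmt.Errorf("check failed (exit %d): %s", code, out)
	}
	return nil
}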


@ -2,13 +2,13 @@ package taskrunner
import (
"context"
"strings"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
cstructs "github.com/hashicorp/nomad/client/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
)
// StatsUpdater is the interface required by the StatsHook to update stats.
@ -99,11 +99,11 @@ func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, sto
return
}
//XXX This is a net/rpc specific error
// We do not log when the plugin is shutdown as this is simply a
// race between the stopCollection channel being closed and calling
// Stats on the handle.
if !strings.Contains(err.Error(), "connection is shut down") {
// We do not log when the plugin is shutdown since this is
// likely because the driver plugin has unexpectedly exited,
// in which case sleeping and trying again or returning based
// on the stop channel is the correct behavior
if err != bstructs.ErrPluginShutdown {
h.logger.Debug("error fetching stats of task", "error", err)
}


@ -28,9 +28,10 @@ import (
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
)
const (
@ -408,8 +409,10 @@ MAIN:
}
// Grab the result proxy and wait for task to exit
WAIT:
{
handle := tr.getDriverHandle()
result = nil
// Do *not* use tr.killCtx here as it would cause
// Wait() to unblock before the task exits when Kill()
@ -418,12 +421,15 @@ MAIN:
tr.logger.Error("wait task failed", "error", err)
} else {
select {
case result = <-resultCh:
// WaitCh returned a result
tr.handleTaskExitResult(result)
case <-tr.ctx.Done():
// TaskRunner was told to exit immediately
return
case result = <-resultCh:
}
// WaitCh returned a result
if retryWait := tr.handleTaskExitResult(result); retryWait {
goto WAIT
}
}
}
@ -467,9 +473,37 @@ MAIN:
tr.logger.Debug("task run loop exiting")
}
func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) {
// handleTaskExitResult handles the results returned by the task exiting. If
// retryWait is true, the caller should attempt to wait on the task again since
// it has not actually finished running. This can happen if the driver plugin
// has exited.
func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) (retryWait bool) {
if result == nil {
return
return false
}
if result.Err == bstructs.ErrPluginShutdown {
dn := tr.Task().Driver
tr.logger.Debug("driver plugin has shutdown; attempting to recover task", "driver", dn)
// Initialize a new driver handle
if err := tr.initDriver(); err != nil {
tr.logger.Error("failed to initialize driver after it exited unexpectedly", "error", err, "driver", dn)
return false
}
// Try to restore the handle
tr.stateLock.RLock()
h := tr.localState.TaskHandle
net := tr.localState.DriverNetwork
tr.stateLock.RUnlock()
if !tr.restoreHandle(h, net) {
tr.logger.Error("failed to restore handle on driver after it exited unexpectedly", "driver", dn)
return false
}
tr.logger.Debug("task successfully recovered on driver", "driver", dn)
return true
}
event := structs.NewTaskEvent(structs.TaskTerminated).
@ -483,6 +517,8 @@ func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) {
if result.OOMKilled && !tr.clientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "oom_killed"}, 1, tr.baseLabels)
}
return false
}
// handleUpdates runs update hooks when triggerUpdateCh is ticked and exits
@ -530,7 +566,6 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
// runDriver runs the driver and waits for it to exit
func (tr *TaskRunner) runDriver() error {
// TODO(nickethier): make sure this uses alloc.AllocatedResources once #4750 is rebased
taskConfig := tr.buildTaskConfig()
// Build hcl context variables
@ -556,10 +591,10 @@ func (tr *TaskRunner) runDriver() error {
evalCtx := &hcl.EvalContext{
Variables: vars,
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}
val, diag := shared.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx)
val, diag := hclutils.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx)
if diag.HasErrors() {
return multierror.Append(errors.New("failed to parse config"), diag.Errs()...)
}
@ -568,8 +603,6 @@ func (tr *TaskRunner) runDriver() error {
return fmt.Errorf("failed to encode driver config: %v", err)
}
//XXX Evaluate and encode driver config
// If there's already a task handle (eg from a Restore) there's nothing
// to do except update state.
if tr.getDriverHandle() != nil {
@ -586,7 +619,20 @@ func (tr *TaskRunner) runDriver() error {
// Start the job if there's no existing handle (or if RecoverTask failed)
handle, net, err := tr.driver.StartTask(taskConfig)
if err != nil {
return fmt.Errorf("driver start failed: %v", err)
// The plugin has died, try relaunching it
if err == bstructs.ErrPluginShutdown {
tr.logger.Info("failed to start task because plugin shutdown unexpectedly; attempting to recover")
if err := tr.initDriver(); err != nil {
return fmt.Errorf("failed to initialize driver after it exited unexpectedly: %v", err)
}
handle, net, err = tr.driver.StartTask(taskConfig)
if err != nil {
return fmt.Errorf("failed to start task after driver exited unexpectedly: %v", err)
}
} else {
return fmt.Errorf("driver start failed: %v", err)
}
}
tr.stateLock.Lock()
@ -735,16 +781,16 @@ func (tr *TaskRunner) Restore() error {
// restoreHandle ensures a TaskHandle is valid by calling Driver.RecoverTask
// and sets the driver handle. If the TaskHandle is not valid, DestroyTask is
// called.
func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) {
func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) (success bool) {
// Ensure handle is well-formed
if taskHandle.Config == nil {
return
return true
}
if err := tr.driver.RecoverTask(taskHandle); err != nil {
if tr.TaskState().State != structs.TaskStateRunning {
// RecoverTask should fail if the Task wasn't running
return
return true
}
tr.logger.Error("error recovering task; cleaning up",
@ -760,14 +806,15 @@ func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstruct
"error", err, "task_id", taskHandle.Config.ID)
}
return false
}
return
return true
}
// Update driver handle on task runner
tr.setDriverHandle(NewDriverHandle(tr.driver, taskHandle.Config.ID, tr.Task(), net))
return
return true
}
// UpdateState sets the task runners allocation state and triggers a server


@ -254,6 +254,10 @@ func (tr *TaskRunner) poststart() error {
handle := tr.getDriverHandle()
net := handle.Network()
// Pass the lazy handle to the hooks so even if the driver exits and we
// launch a new one (external plugin), the handle will refresh.
lazyHandle := NewLazyHandle(tr.ctx, tr.getDriverHandle, tr.logger)
var merr multierror.Error
for _, hook := range tr.runnerHooks {
post, ok := hook.(interfaces.TaskPoststartHook)
@ -269,9 +273,9 @@ func (tr *TaskRunner) poststart() error {
}
req := interfaces.TaskPoststartRequest{
DriverExec: handle,
DriverExec: lazyHandle,
DriverNetwork: net,
DriverStats: handle,
DriverStats: lazyHandle,
TaskEnv: tr.envBuilder.Build(),
}
var resp interfaces.TaskPoststartResponse
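On the hook side, the request now simply carries the lazy handle. A hypothetical helper (logInitialStats below is not part of this change, and its use of hclog is an assumption) shows why this matters: the Stats call still works even if the external driver plugin was relaunched between task start and the hook running.
// logInitialStats is a hypothetical sketch of a hook-side consumer.
// Because DriverStats is backed by the LazyHandle above, this call
// survives the driver plugin exiting and being relaunched.
func logInitialStats(req *interfaces.TaskPoststartRequest, logger hclog.Logger) {
	if req.DriverStats == nil {
		return
	}
	usage, err := req.DriverStats.Stats()
	if err != nil {
		logger.Debug("could not fetch initial task stats", "error", err)
		return
	}
	logger.Debug("initial task stats", "timestamp", usage.Timestamp)
}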


@ -337,3 +337,90 @@ func TestTaskRunner_Restore_HookEnv(t *testing.T) {
require.Contains(env, "mock_hook")
require.Equal("1", env["mock_hook"])
}
// This test asserts that we can recover from an "external" plugin exiting by
// retrieving a new instance of the driver and recovering the task.
func TestTaskRunner_RecoverFromDriverExiting(t *testing.T) {
t.Parallel()
require := require.New(t)
// Create an allocation using the mock driver that exits, simulating the
// driver crashing. We can then test that the task runner recovers from this.
alloc := mock.BatchAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"plugin_exit_after": "1s",
"run_for": "5s",
}
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
conf.StateDB = cstate.NewMemDB() // "persist" state between prestart calls
defer cleanup()
tr, err := NewTaskRunner(conf)
require.NoError(err)
start := time.Now()
go tr.Run()
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
// Wait for the task to be running
testWaitForTaskToStart(t, tr)
// Get the task ID
tr.stateLock.RLock()
l := tr.localState.TaskHandle
require.NotNil(l)
require.NotNil(l.Config)
require.NotEmpty(l.Config.ID)
id := l.Config.ID
tr.stateLock.RUnlock()
// Get the mock driver plugin
driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name)
require.NoError(err)
mockDriver := driverPlugin.(*mockdriver.Driver)
// Wait for the task to start
testutil.WaitForResult(func() (bool, error) {
// Get the handle and check that it was recovered
handle := mockDriver.GetHandle(id)
if handle == nil {
return false, fmt.Errorf("nil handle")
}
if !handle.Recovered {
return false, fmt.Errorf("handle not recovered")
}
return true, nil
}, func(err error) {
t.Fatal(err.Error())
})
// Wait for task to complete
select {
case <-tr.WaitCh():
case <-time.After(10 * time.Second):
}
// Ensure that we actually let the task complete
require.True(time.Now().Sub(start) > 5*time.Second)
// Check it finished successfully
state := tr.TaskState()
require.True(state.Successful())
}
// testWaitForTaskToStart waits for the task to start or fails the test
func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) {
// Wait for the task to start
testutil.WaitForResult(func() (bool, error) {
tr.stateLock.RLock()
started := !tr.state.StartedAt.IsZero()
tr.stateLock.RUnlock()
return started, nil
}, func(err error) {
t.Fatalf("not started")
})
}


@ -622,9 +622,6 @@ func (c *Client) Shutdown() error {
}
c.logger.Info("shutting down")
// Shutdown the plugin managers
c.pluginManagers.Shutdown()
// Stop renewing tokens and secrets
if c.vaultClient != nil {
c.vaultClient.Stop()
@ -649,6 +646,9 @@ func (c *Client) Shutdown() error {
}
arGroup.Wait()
// Shutdown the plugin managers
c.pluginManagers.Shutdown()
c.shutdown = true
close(c.shutdownCh)


@ -10,6 +10,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/shared/loader"
"github.com/hashicorp/nomad/plugins/shared/singleton"
@ -363,7 +364,7 @@ START:
// Handle any errors
if fresp.Error != nil {
if fresp.Error == base.ErrPluginShutdown {
if fresp.Error == bstructs.ErrPluginShutdown {
i.logger.Error("plugin exited unexpectedly")
goto START
}
@ -488,7 +489,7 @@ START:
// Handle any errors
if sresp.Error != nil {
if sresp.Error == base.ErrPluginShutdown {
if sresp.Error == bstructs.ErrPluginShutdown {
i.logger.Error("plugin exited unexpectedly")
goto START
}


@ -4,6 +4,7 @@ import (
"fmt"
"io"
"strings"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
@ -84,12 +85,22 @@ type TaskLogger struct {
}
func (tl *TaskLogger) Close() {
var wg sync.WaitGroup
if tl.lro != nil {
tl.lro.Close()
wg.Add(1)
go func() {
tl.lro.Close()
wg.Done()
}()
}
if tl.lre != nil {
tl.lre.Close()
wg.Add(1)
go func() {
tl.lre.Close()
wg.Done()
}()
}
wg.Wait()
}
func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) {


@ -15,7 +15,7 @@ import (
// LaunchLogMon an instance of logmon
// TODO: Integrate with base plugin loader
func LaunchLogMon(logger hclog.Logger) (LogMon, *plugin.Client, error) {
logger = logger.Named("logmon-launcher")
logger = logger.Named("logmon")
bin, err := discover.NomadExecutable()
if err != nil {
return nil, nil, err
@ -24,12 +24,13 @@ func LaunchLogMon(logger hclog.Logger) (LogMon, *plugin.Client, error) {
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: base.Handshake,
Plugins: map[string]plugin.Plugin{
"logmon": NewPlugin(NewLogMon(hclog.L().Named("logmon"))),
"logmon": &Plugin{},
},
Cmd: exec.Command(bin, "logmon"),
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolGRPC,
},
Logger: logger,
})
rpcClient, err := client.Client()


@ -9,6 +9,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared/loader"
"github.com/hashicorp/nomad/plugins/shared/singleton"
@ -447,6 +448,11 @@ func (i *instanceManager) handleEvents() {
// handleEvent looks up the event handler(s) for the event and runs them
func (i *instanceManager) handleEvent(ev *drivers.TaskEvent) {
// Do not emit events for the plugin shutting down
if ev.Err != nil && ev.Err == bstructs.ErrPluginShutdown {
return
}
if handler := i.eventHandlerFactory(ev.AllocID, ev.TaskName); handler != nil {
i.logger.Trace("task event received", "event", ev)
handler(ev)


@ -180,6 +180,10 @@ type MemoryStats struct {
}
func (ms *MemoryStats) Add(other *MemoryStats) {
if other == nil {
return
}
ms.RSS += other.RSS
ms.Cache += other.Cache
ms.Swap += other.Swap
@ -203,6 +207,10 @@ type CpuStats struct {
}
func (cs *CpuStats) Add(other *CpuStats) {
if other == nil {
return
}
cs.SystemMode += other.SystemMode
cs.UserMode += other.UserMode
cs.TotalTicks += other.TotalTicks
@ -229,7 +237,7 @@ func (ru *ResourceUsage) Add(other *ResourceUsage) {
// and the resource usage of the individual pids
type TaskResourceUsage struct {
ResourceUsage *ResourceUsage
Timestamp int64
Timestamp int64 // UnixNano
Pids map[string]*ResourceUsage
}


@ -6,7 +6,7 @@ import (
"strings"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/plugins/base"


@ -25,10 +25,15 @@ func (e *LogMonPluginCommand) Synopsis() string {
}
func (e *LogMonPluginCommand) Run(args []string) int {
logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
JSONFormat: true,
Name: "logmon",
})
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: base.Handshake,
Plugins: map[string]plugin.Plugin{
"logmon": logmon.NewPlugin(logmon.NewLogMon(hclog.Default().Named("logmon"))),
"logmon": logmon.NewPlugin(logmon.NewLogMon(logger)),
},
GRPCServer: plugin.DefaultGRPCServer,
})


@ -0,0 +1,44 @@
// This package provides a mechanism to build the Docker driver plugin as an
// external binary. The binary has two entry points: the docker driver and the
// docker plugin's logging child binary. An example of using this is `go build
// -o </nomad/plugin/dir/docker`. When the Nomad agent is then launched, the
// external docker plugin will be used.
package main
import (
"os"
log "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/drivers/docker"
"github.com/hashicorp/nomad/drivers/docker/docklog"
"github.com/hashicorp/nomad/plugins"
"github.com/hashicorp/nomad/plugins/base"
)
func main() {
if len(os.Args) > 1 {
// Detect if we are being launched as a docker logging plugin
switch os.Args[1] {
case docklog.PluginName:
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: base.Handshake,
Plugins: map[string]plugin.Plugin{
docklog.PluginName: docklog.NewPlugin(docklog.NewDockerLogger(log.Default().Named(docklog.PluginName))),
},
GRPCServer: plugin.DefaultGRPCServer,
})
return
}
}
// Serve the plugin
plugins.Serve(factory)
}
// factory returns a new instance of the docker driver plugin
func factory(log log.Logger) interface{} {
return docker.NewDockerDriver(log)
}


@ -21,8 +21,8 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
@ -606,11 +606,11 @@ touch: cannot touch '/tmp/task-path-ro/testfile-from-ro': Read-only file system`
func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) {
evalCtx := &hcl.EvalContext{
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}
spec, diag := hclspec.Convert(taskConfigSpec)
require.False(diag.HasErrors())
taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx)
require.False(diag.HasErrors())
err := task.EncodeDriverConfig(taskConfigCtyVal)
require.Nil(err)


@ -19,8 +19,8 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
@ -272,11 +272,11 @@ func encodeDriverHelper(t *testing.T, task *drivers.TaskConfig, taskConfig map[s
t.Helper()
evalCtx := &hcl.EvalContext{
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}
spec, diag := hclspec.Convert(taskConfigSpec)
require.False(t, diag.HasErrors())
taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx)
require.Empty(t, diag.Errs())
err := task.EncodeDriverConfig(taskConfigCtyVal)
require.Nil(t, err)


@ -19,8 +19,8 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
lxc "gopkg.in/lxc/go-lxc.v2"
@ -269,11 +269,11 @@ func requireLXC(t *testing.T) {
func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) {
evalCtx := &hcl.EvalContext{
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}
spec, diag := hclspec.Convert(taskConfigSpec)
require.False(diag.HasErrors())
taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx)
require.False(diag.HasErrors())
err := task.EncodeDriverConfig(taskConfigCtyVal)
require.Nil(err)


@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
@ -67,6 +68,7 @@ var (
"start_error_recoverable": hclspec.NewAttr("start_error_recoverable", "bool", false),
"start_block_for": hclspec.NewAttr("start_block_for", "string", false),
"kill_after": hclspec.NewAttr("kill_after", "string", false),
"plugin_exit_after": hclspec.NewAttr("plugin_exit_after", "string", false),
"run_for": hclspec.NewAttr("run_for", "string", false),
"exit_code": hclspec.NewAttr("exit_code", "number", false),
"exit_signal": hclspec.NewAttr("exit_signal", "number", false),
@ -153,6 +155,10 @@ type Config struct {
// TaskConfig is the driver configuration of a task within a job
type TaskConfig struct {
// PluginExitAfter is the duration after which the mock driver indicates the
// plugin has exited via the WaitTask call.
PluginExitAfter string `codec:"plugin_exit_after"`
pluginExitAfterDuration time.Duration
// StartErr specifies the error that should be returned when starting the
// mock driver.
@ -213,8 +219,7 @@ type TaskConfig struct {
}
type MockTaskState struct {
TaskConfig *drivers.TaskConfig
StartedAt time.Time
StartedAt time.Time
}
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
@ -289,42 +294,95 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
}
}
func (d *Driver) RecoverTask(h *drivers.TaskHandle) error {
if h == nil {
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if handle == nil {
return fmt.Errorf("handle cannot be nil")
}
if _, ok := d.tasks.Get(h.Config.ID); ok {
d.logger.Debug("nothing to recover; task already exists",
"task_id", h.Config.ID,
"task_name", h.Config.Name,
)
return nil
// Unmarshall the driver state and create a new handle
var taskState MockTaskState
if err := handle.GetDriverState(&taskState); err != nil {
d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to decode task state from handle: %v", err)
}
// Recovering a task requires the task to be running external to the
// plugin. Since the mock_driver runs all tasks in process it cannot
// recover tasks.
return fmt.Errorf("%s cannot recover tasks", pluginName)
driverCfg, err := parseDriverConfig(handle.Config)
if err != nil {
d.logger.Error("failed to parse driver config from handle", "error", err, "task_id", handle.Config.ID, "config", hclog.Fmt("%+v", handle.Config))
return fmt.Errorf("failed to parse driver config from handle: %v", err)
}
// Remove the plugin exit time if set
driverCfg.pluginExitAfterDuration = 0
// Correct the run_for time based on how long it has already been running
now := time.Now()
driverCfg.runForDuration = driverCfg.runForDuration - now.Sub(taskState.StartedAt)
h := newTaskHandle(handle.Config, driverCfg, d.logger)
h.Recovered = true
d.tasks.Set(handle.Config.ID, h)
go h.run()
return nil
}
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
func parseDriverConfig(cfg *drivers.TaskConfig) (*TaskConfig, error) {
var driverConfig TaskConfig
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
return nil, nil, err
return nil, err
}
var err error
if driverConfig.startBlockForDuration, err = parseDuration(driverConfig.StartBlockFor); err != nil {
return nil, nil, fmt.Errorf("start_block_for %v not a valid duration: %v", driverConfig.StartBlockFor, err)
return nil, fmt.Errorf("start_block_for %v not a valid duration: %v", driverConfig.StartBlockFor, err)
}
if driverConfig.runForDuration, err = parseDuration(driverConfig.RunFor); err != nil {
return nil, nil, fmt.Errorf("run_for %v not a valid duration: %v", driverConfig.RunFor, err)
return nil, fmt.Errorf("run_for %v not a valid duration: %v", driverConfig.RunFor, err)
}
if driverConfig.pluginExitAfterDuration, err = parseDuration(driverConfig.PluginExitAfter); err != nil {
return nil, fmt.Errorf("plugin_exit_after %v not a valid duration: %v", driverConfig.PluginExitAfter, err)
}
if driverConfig.stdoutRepeatDuration, err = parseDuration(driverConfig.StdoutRepeatDur); err != nil {
return nil, nil, fmt.Errorf("stdout_repeat_duration %v not a valid duration: %v", driverConfig.stdoutRepeatDuration, err)
return nil, fmt.Errorf("stdout_repeat_duration %v not a valid duration: %v", driverConfig.stdoutRepeatDuration, err)
}
return &driverConfig, nil
}
func newTaskHandle(cfg *drivers.TaskConfig, driverConfig *TaskConfig, logger hclog.Logger) *taskHandle {
killCtx, killCancel := context.WithCancel(context.Background())
h := &taskHandle{
taskConfig: cfg,
runFor: driverConfig.runForDuration,
pluginExitAfter: driverConfig.pluginExitAfterDuration,
killAfter: driverConfig.killAfterDuration,
exitCode: driverConfig.ExitCode,
exitSignal: driverConfig.ExitSignal,
stdoutString: driverConfig.StdoutString,
stdoutRepeat: driverConfig.StdoutRepeat,
stdoutRepeatDur: driverConfig.stdoutRepeatDuration,
logger: logger.With("task_name", cfg.Name),
waitCh: make(chan struct{}),
killCh: killCtx.Done(),
kill: killCancel,
startedAt: time.Now(),
}
if driverConfig.ExitErrMsg != "" {
h.exitErr = errors.New(driverConfig.ExitErrMsg)
}
if driverConfig.SignalErr != "" {
h.signalErr = fmt.Errorf(driverConfig.SignalErr)
}
return h
}
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
driverConfig, err := parseDriverConfig(cfg)
if err != nil {
return nil, nil, err
}
if driverConfig.startBlockForDuration != 0 {
@ -334,7 +392,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
// Store last configs
d.lastMu.Lock()
d.lastDriverTaskConfig = cfg
d.lastTaskConfig = &driverConfig
d.lastTaskConfig = driverConfig
d.lastMu.Unlock()
if driverConfig.StartErr != "" {
@ -358,32 +416,9 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
net.PortMap = map[string]int{parts[0]: port}
}
killCtx, killCancel := context.WithCancel(context.Background())
h := &taskHandle{
taskConfig: cfg,
runFor: driverConfig.runForDuration,
killAfter: driverConfig.killAfterDuration,
exitCode: driverConfig.ExitCode,
exitSignal: driverConfig.ExitSignal,
stdoutString: driverConfig.StdoutString,
stdoutRepeat: driverConfig.StdoutRepeat,
stdoutRepeatDur: driverConfig.stdoutRepeatDuration,
logger: d.logger.With("task_name", cfg.Name),
waitCh: make(chan struct{}),
killCh: killCtx.Done(),
kill: killCancel,
}
if driverConfig.ExitErrMsg != "" {
h.exitErr = errors.New(driverConfig.ExitErrMsg)
}
if driverConfig.SignalErr != "" {
h.signalErr = fmt.Errorf(driverConfig.SignalErr)
}
h := newTaskHandle(cfg, driverConfig, d.logger)
driverState := MockTaskState{
TaskConfig: cfg,
StartedAt: h.startedAt,
StartedAt: h.startedAt,
}
handle := drivers.NewTaskHandle(pluginName)
handle.Config = cfg
@ -461,8 +496,17 @@ func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
}
func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
//TODO return an error?
return nil, nil
// Generate random value for the memory usage
s := &cstructs.TaskResourceUsage{
ResourceUsage: &cstructs.ResourceUsage{
MemoryStats: &cstructs.MemoryStats{
RSS: rand.Uint64(),
Measured: []string{"RSS"},
},
},
Timestamp: time.Now().UTC().UnixNano(),
}
return s, nil
}
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
@ -499,3 +543,10 @@ func (d *Driver) GetTaskConfig() (*drivers.TaskConfig, *TaskConfig) {
defer d.lastMu.Unlock()
return d.lastDriverTaskConfig, d.lastTaskConfig
}
// GetHandle is unique to the mock driver and for testing purposes only. It
// returns the handle of the given task ID
func (d *Driver) GetHandle(taskID string) *taskHandle {
h, _ := d.tasks.Get(taskID)
return h
}


@ -8,6 +8,7 @@ import (
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/fifo"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
@ -16,6 +17,7 @@ type taskHandle struct {
logger hclog.Logger
runFor time.Duration
pluginExitAfter time.Duration
killAfter time.Duration
waitCh chan struct{}
exitCode int
@ -39,6 +41,9 @@ type taskHandle struct {
// Calling kill closes killCh if it is not already closed
kill context.CancelFunc
killCh <-chan struct{}
// Recovered is set to true if the handle was created while being recovered
Recovered bool
}
func (h *taskHandle) TaskStatus() *drivers.TaskStatus {
@ -79,18 +84,30 @@ func (h *taskHandle) run() {
errCh := make(chan error, 1)
// Setup logging output
if h.stdoutString != "" {
go h.handleLogging(errCh)
}
go h.handleLogging(errCh)
timer := time.NewTimer(h.runFor)
defer timer.Stop()
var pluginExitTimer <-chan time.Time
if h.pluginExitAfter != 0 {
timer := time.NewTimer(h.pluginExitAfter)
defer timer.Stop()
pluginExitTimer = timer.C
}
select {
case <-timer.C:
h.logger.Debug("run_for time elapsed; exiting", "run_for", h.runFor)
case <-h.killCh:
h.logger.Debug("killed; exiting")
case <-pluginExitTimer:
h.logger.Debug("exiting plugin")
h.exitResult = &drivers.ExitResult{
Err: bstructs.ErrPluginShutdown,
}
return
case err := <-errCh:
h.logger.Error("error running mock task; exiting", "error", err)
h.exitResult = &drivers.ExitResult{
@ -114,6 +131,18 @@ func (h *taskHandle) handleLogging(errCh chan<- error) {
errCh <- err
return
}
stderr, err := fifo.Open(h.taskConfig.StderrPath)
if err != nil {
h.logger.Error("failed to write to stderr", "error", err)
errCh <- err
return
}
defer stderr.Close()
if h.stdoutString == "" {
return
}
if _, err := io.WriteString(stdout, h.stdoutString); err != nil {
h.logger.Error("failed to write to stdout", "error", err)
errCh <- err


@ -17,8 +17,8 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
@ -203,11 +203,11 @@ func TestQemuDriver_GetMonitorPathNewQemu(t *testing.T) {
// encodeDriverHelper sets up the task config spec and encodes qemu-specific driver configuration
func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) {
evalCtx := &hcl.EvalContext{
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}
spec, diag := hclspec.Convert(taskConfigSpec)
require.False(diag.HasErrors(), diag.Error())
taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx)
require.False(diag.HasErrors(), diag.Error())
err := task.EncodeDriverConfig(taskConfigCtyVal)
require.Nil(err)


@ -20,8 +20,8 @@ import (
basePlug "github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
@ -500,11 +500,11 @@ func TestRawExecDriver_Exec(t *testing.T) {
func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) {
evalCtx := &hcl.EvalContext{
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}
spec, diag := hclspec.Convert(taskConfigSpec)
require.False(diag.HasErrors())
taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx)
require.False(diag.HasErrors())
err := task.EncodeDriverConfig(taskConfigCtyVal)
require.Nil(err)


@ -16,14 +16,15 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"
appcschema "github.com/appc/spec/schema"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-version"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
@ -193,6 +194,10 @@ type Driver struct {
// logger will log to the Nomad agent
logger hclog.Logger
// hasFingerprinted is used to store whether we have fingerprinted before
hasFingerprinted bool
fingerprintLock sync.Mutex
}
func NewRktDriver(logger hclog.Logger) drivers.DriverPlugin {
@ -261,7 +266,25 @@ func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerp
}
}
// setFingerprinted marks the driver as having fingerprinted once before
func (d *Driver) setFingerprinted() {
d.fingerprintLock.Lock()
d.hasFingerprinted = true
d.fingerprintLock.Unlock()
}
// fingerprinted returns whether the driver has fingerprinted before
func (d *Driver) fingerprinted() bool {
d.fingerprintLock.Lock()
defer d.fingerprintLock.Unlock()
return d.hasFingerprinted
}
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
defer func() {
d.setFingerprinted()
}()
fingerprint := &drivers.Fingerprint{
Attributes: map[string]*pstructs.Attribute{},
Health: drivers.HealthStateHealthy,
@ -270,6 +293,9 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
// Only enable if we are root
if syscall.Geteuid() != 0 {
if !d.fingerprinted() {
d.logger.Debug("must run as root user, disabling")
}
fingerprint.Health = drivers.HealthStateUndetected
fingerprint.HealthDescription = drivers.DriverRequiresRootMessage
return fingerprint
@ -297,6 +323,10 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
// Do not allow ancient rkt versions
fingerprint.Health = drivers.HealthStateUndetected
fingerprint.HealthDescription = fmt.Sprintf("Unsuported rkt version %s", currentVersion)
if !d.fingerprinted() {
d.logger.Warn("unsupported rkt version please upgrade to >= "+minVersion.String(),
"rkt_version", currentVersion)
}
return fingerprint
}


@ -22,8 +22,8 @@ import (
basePlug "github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
@ -874,11 +874,11 @@ func TestRktDriver_Stats(t *testing.T) {
func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) {
evalCtx := &hcl.EvalContext{
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}
spec, diag := hclspec.Convert(taskConfigSpec)
require.False(diag.HasErrors())
taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx)
if diag.HasErrors() {
fmt.Println("conversion error", diag.Error())
}


@ -5,7 +5,7 @@ import (
"net"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
plugin "github.com/hashicorp/go-plugin"
)
// ExecutorConfig is the config that Nomad passes to the executor


@ -28,7 +28,7 @@ import (
"github.com/hashicorp/consul/api"
hcodec "github.com/hashicorp/go-msgpack/codec"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/args"
@ -5743,17 +5743,7 @@ func (ts *TaskState) Copy() *TaskState {
// have meaning on a non-batch allocation because a service and system
// allocation should not finish.
func (ts *TaskState) Successful() bool {
l := len(ts.Events)
if ts.State != TaskStateDead || l == 0 {
return false
}
e := ts.Events[l-1]
if e.Type != TaskTerminated {
return false
}
return e.ExitCode == 0
return ts.State == TaskStateDead && !ts.Failed
}
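The new definition is purely state-based. A short, hypothetical example test (not part of this change) illustrates it: a dead task that was not marked failed counts as successful even if no TaskTerminated event was recorded, for instance when the driver plugin crashed and the task was recovered.
// ExampleTaskState_Successful is an illustrative sketch: a dead,
// non-failed task is successful regardless of its event history.
func ExampleTaskState_Successful() {
	ts := &TaskState{State: TaskStateDead, Failed: false}
	fmt.Println(ts.Successful())
	// Output: true
}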
const (


@ -5,6 +5,7 @@ import (
"fmt"
"github.com/hashicorp/nomad/plugins/base/proto"
"github.com/hashicorp/nomad/plugins/shared/grpcutils"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
)
@ -20,7 +21,7 @@ type BasePluginClient struct {
func (b *BasePluginClient) PluginInfo() (*PluginInfoResponse, error) {
presp, err := b.Client.PluginInfo(b.DoneCtx, &proto.PluginInfoRequest{})
if err != nil {
return nil, err
return nil, grpcutils.HandleGrpcErr(err, b.DoneCtx)
}
var ptype string
@ -46,7 +47,7 @@ func (b *BasePluginClient) PluginInfo() (*PluginInfoResponse, error) {
func (b *BasePluginClient) ConfigSchema() (*hclspec.Spec, error) {
presp, err := b.Client.ConfigSchema(b.DoneCtx, &proto.ConfigSchemaRequest{})
if err != nil {
return nil, err
return nil, grpcutils.HandleGrpcErr(err, b.DoneCtx)
}
return presp.GetSpec(), nil
@ -60,5 +61,5 @@ func (b *BasePluginClient) SetConfig(c *Config) error {
PluginApiVersion: c.ApiVersion,
})
return err
return grpcutils.HandleGrpcErr(err, b.DoneCtx)
}


@ -3,7 +3,6 @@ package base
import (
"bytes"
"context"
"errors"
"reflect"
plugin "github.com/hashicorp/go-plugin"
@ -30,9 +29,6 @@ var (
MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE",
MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255",
}
// ErrPluginShutdown is returned when the plugin has shutdown.
ErrPluginShutdown = errors.New("plugin is shut down")
)
// PluginBase wraps a BasePlugin and implements go-plugin's GRPCPlugin


@ -0,0 +1,12 @@
package structs
import "errors"
const (
errPluginShutdown = "plugin is shut down"
)
var (
// ErrPluginShutdown is returned when the plugin has shutdown.
ErrPluginShutdown = errors.New(errPluginShutdown)
)
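Callers throughout this change compare returned errors directly against the sentinel; a tiny hypothetical helper shows the pattern:
// isShutdown is an illustrative sketch, not part of this change:
// ErrPluginShutdown is a plain errors.New value, so an equality check is
// how callers detect it and switch to a recover-and-retry path.
func isShutdown(err error) bool {
	return err == ErrPluginShutdown
}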


@ -9,7 +9,7 @@ import (
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/device/proto"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/grpcutils"
)
// devicePluginClient implements the client side of a remote device plugin, using
@ -30,12 +30,12 @@ type devicePluginClient struct {
// cancelled, the error will be propagated.
func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) {
// Join the passed context and the shutdown context
ctx, _ = joincontext.Join(ctx, d.doneCtx)
joinedCtx, _ := joincontext.Join(ctx, d.doneCtx)
var req proto.FingerprintRequest
stream, err := d.client.Fingerprint(ctx, &req)
stream, err := d.client.Fingerprint(joinedCtx, &req)
if err != nil {
return nil, err
return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx)
}
out := make(chan *FingerprintResponse, 1)
@ -47,7 +47,7 @@ func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerpri
// the gRPC stream to a channel. Exits either when context is cancelled or the
// stream has an error.
func (d *devicePluginClient) handleFingerprint(
ctx context.Context,
reqCtx context.Context,
stream proto.DevicePlugin_FingerprintClient,
out chan *FingerprintResponse) {
@ -57,7 +57,7 @@ func (d *devicePluginClient) handleFingerprint(
if err != nil {
if err != io.EOF {
out <- &FingerprintResponse{
Error: shared.HandleStreamErr(err, ctx, d.doneCtx),
Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx),
}
}
@ -70,7 +70,7 @@ func (d *devicePluginClient) handleFingerprint(
Devices: convertProtoDeviceGroups(resp.GetDeviceGroup()),
}
select {
case <-ctx.Done():
case <-reqCtx.Done():
return
case out <- f:
}
@ -86,7 +86,7 @@ func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation,
// Make the request
resp, err := d.client.Reserve(d.doneCtx, req)
if err != nil {
return nil, err
return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
}
// Convert the response
@ -100,14 +100,14 @@ func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation,
// propagated.
func (d *devicePluginClient) Stats(ctx context.Context, interval time.Duration) (<-chan *StatsResponse, error) {
// Join the passed context and the shutdown context
ctx, _ = joincontext.Join(ctx, d.doneCtx)
joinedCtx, _ := joincontext.Join(ctx, d.doneCtx)
req := proto.StatsRequest{
CollectionInterval: ptypes.DurationProto(interval),
}
stream, err := d.client.Stats(ctx, &req)
stream, err := d.client.Stats(joinedCtx, &req)
if err != nil {
return nil, err
return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx)
}
out := make(chan *StatsResponse, 1)
@ -119,7 +119,7 @@ func (d *devicePluginClient) Stats(ctx context.Context, interval time.Duration)
// the gRPC stream to a channel. Exits either when context is cancelled or the
// stream has an error.
func (d *devicePluginClient) handleStats(
ctx context.Context,
reqCtx context.Context,
stream proto.DevicePlugin_StatsClient,
out chan *StatsResponse) {
@ -129,7 +129,7 @@ func (d *devicePluginClient) handleStats(
if err != nil {
if err != io.EOF {
out <- &StatsResponse{
Error: shared.HandleStreamErr(err, ctx, d.doneCtx),
Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx),
}
}
@ -142,7 +142,7 @@ func (d *devicePluginClient) handleStats(
Groups: convertProtoDeviceGroupsStats(resp.GetGroups()),
}
select {
case <-ctx.Done():
case <-reqCtx.Done():
return
case out <- s:
}


@ -8,12 +8,11 @@ import (
"github.com/LK4D4/joincontext"
"github.com/golang/protobuf/ptypes"
hclog "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers/proto"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/grpcutils"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
sproto "github.com/hashicorp/nomad/plugins/shared/structs/proto"
@ -26,7 +25,6 @@ type driverPluginClient struct {
*base.BasePluginClient
client proto.DriverClient
logger hclog.Logger
// doneCtx is closed when the plugin exits
doneCtx context.Context
@ -37,7 +35,7 @@ func (d *driverPluginClient) TaskConfigSchema() (*hclspec.Spec, error) {
resp, err := d.client.TaskConfigSchema(d.doneCtx, req)
if err != nil {
return nil, err
return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
}
return resp.Spec, nil
@ -48,7 +46,7 @@ func (d *driverPluginClient) Capabilities() (*Capabilities, error) {
resp, err := d.client.Capabilities(d.doneCtx, req)
if err != nil {
return nil, err
return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
}
caps := &Capabilities{}
@ -76,11 +74,11 @@ func (d *driverPluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerpri
req := &proto.FingerprintRequest{}
// Join the passed context and the shutdown context
ctx, _ = joincontext.Join(ctx, d.doneCtx)
joinedCtx, _ := joincontext.Join(ctx, d.doneCtx)
stream, err := d.client.Fingerprint(ctx, req)
stream, err := d.client.Fingerprint(joinedCtx, req)
if err != nil {
return nil, err
return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx)
}
ch := make(chan *Fingerprint, 1)
@ -89,15 +87,14 @@ func (d *driverPluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerpri
return ch, nil
}
func (d *driverPluginClient) handleFingerprint(ctx context.Context, ch chan *Fingerprint, stream proto.Driver_FingerprintClient) {
func (d *driverPluginClient) handleFingerprint(reqCtx context.Context, ch chan *Fingerprint, stream proto.Driver_FingerprintClient) {
defer close(ch)
for {
pb, err := stream.Recv()
if err != nil {
if err != io.EOF {
d.logger.Error("error receiving stream from Fingerprint driver RPC", "error", err)
ch <- &Fingerprint{
Err: shared.HandleStreamErr(err, ctx, d.doneCtx),
Err: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx),
}
}
@ -112,7 +109,7 @@ func (d *driverPluginClient) handleFingerprint(ctx context.Context, ch chan *Fin
}
select {
case <-ctx.Done():
case <-reqCtx.Done():
return
case ch <- f:
}
@ -125,7 +122,7 @@ func (d *driverPluginClient) RecoverTask(h *TaskHandle) error {
req := &proto.RecoverTaskRequest{Handle: taskHandleToProto(h)}
_, err := d.client.RecoverTask(d.doneCtx, req)
return err
return grpcutils.HandleGrpcErr(err, d.doneCtx)
}
// StartTask starts execution of a task with the given TaskConfig. A TaskHandle
@ -144,7 +141,7 @@ func (d *driverPluginClient) StartTask(c *TaskConfig) (*TaskHandle, *cstructs.Dr
return nil, nil, structs.NewRecoverableError(err, rec.Recoverable)
}
}
return nil, nil, err
return nil, nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
}
var net *cstructs.DriverNetwork
@ -168,10 +165,6 @@ func (d *driverPluginClient) StartTask(c *TaskConfig) (*TaskHandle, *cstructs.Dr
// the same task without issue.
func (d *driverPluginClient) WaitTask(ctx context.Context, id string) (<-chan *ExitResult, error) {
ch := make(chan *ExitResult)
// Join the passed context and the shutdown context
ctx, _ = joincontext.Join(ctx, d.doneCtx)
go d.handleWaitTask(ctx, id, ch)
return ch, nil
}
@ -183,9 +176,12 @@ func (d *driverPluginClient) handleWaitTask(ctx context.Context, id string, ch c
TaskId: id,
}
resp, err := d.client.WaitTask(ctx, req)
// Join the passed context and the shutdown context
joinedCtx, _ := joincontext.Join(ctx, d.doneCtx)
resp, err := d.client.WaitTask(joinedCtx, req)
if err != nil {
result.Err = err
result.Err = grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx)
} else {
result.ExitCode = int(resp.Result.ExitCode)
result.Signal = int(resp.Result.Signal)
@ -209,7 +205,7 @@ func (d *driverPluginClient) StopTask(taskID string, timeout time.Duration, sign
}
_, err := d.client.StopTask(d.doneCtx, req)
return err
return grpcutils.HandleGrpcErr(err, d.doneCtx)
}
// DestroyTask removes the task from the driver's in memory state. The task
@ -222,7 +218,7 @@ func (d *driverPluginClient) DestroyTask(taskID string, force bool) error {
}
_, err := d.client.DestroyTask(d.doneCtx, req)
return err
return grpcutils.HandleGrpcErr(err, d.doneCtx)
}
// InspectTask returns status information for a task
@ -231,7 +227,7 @@ func (d *driverPluginClient) InspectTask(taskID string) (*TaskStatus, error) {
resp, err := d.client.InspectTask(d.doneCtx, req)
if err != nil {
return nil, err
return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
}
status, err := taskStatusFromProto(resp.Task)
@ -262,7 +258,7 @@ func (d *driverPluginClient) TaskStats(taskID string) (*cstructs.TaskResourceUsa
resp, err := d.client.TaskStats(d.doneCtx, req)
if err != nil {
return nil, err
return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
}
stats, err := TaskStatsFromProto(resp.Stats)
@ -279,11 +275,11 @@ func (d *driverPluginClient) TaskEvents(ctx context.Context) (<-chan *TaskEvent,
req := &proto.TaskEventsRequest{}
// Join the passed context and the shutdown context
ctx, _ = joincontext.Join(ctx, d.doneCtx)
joinedCtx, _ := joincontext.Join(ctx, d.doneCtx)
stream, err := d.client.TaskEvents(ctx, req)
stream, err := d.client.TaskEvents(joinedCtx, req)
if err != nil {
return nil, err
return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx)
}
ch := make(chan *TaskEvent, 1)
@ -291,15 +287,14 @@ func (d *driverPluginClient) TaskEvents(ctx context.Context) (<-chan *TaskEvent,
return ch, nil
}
func (d *driverPluginClient) handleTaskEvents(ctx context.Context, ch chan *TaskEvent, stream proto.Driver_TaskEventsClient) {
func (d *driverPluginClient) handleTaskEvents(reqCtx context.Context, ch chan *TaskEvent, stream proto.Driver_TaskEventsClient) {
defer close(ch)
for {
ev, err := stream.Recv()
if err != nil {
if err != io.EOF {
d.logger.Error("error receiving stream from TaskEvents driver RPC", "error", err)
ch <- &TaskEvent{
Err: shared.HandleStreamErr(err, ctx, d.doneCtx),
Err: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx),
}
}
@ -317,7 +312,7 @@ func (d *driverPluginClient) handleTaskEvents(ctx context.Context, ch chan *Task
Timestamp: timestamp,
}
select {
case <-ctx.Done():
case <-reqCtx.Done():
return
case ch <- event:
}
@ -331,7 +326,7 @@ func (d *driverPluginClient) SignalTask(taskID string, signal string) error {
Signal: signal,
}
_, err := d.client.SignalTask(d.doneCtx, req)
return err
return grpcutils.HandleGrpcErr(err, d.doneCtx)
}
// ExecTask will run the given command within the execution context of the task.
@ -347,7 +342,7 @@ func (d *driverPluginClient) ExecTask(taskID string, cmd []string, timeout time.
resp, err := d.client.ExecTask(d.doneCtx, req)
if err != nil {
return nil, err
return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
}
result := &ExecTaskResult{


@ -19,10 +19,9 @@ type PluginDriver struct {
logger hclog.Logger
}
func NewDriverPlugin(d DriverPlugin, logger hclog.Logger) plugin.GRPCPlugin {
func NewDriverPlugin(d DriverPlugin) plugin.GRPCPlugin {
return &PluginDriver{
impl: d,
logger: logger.Named("driver_plugin"),
impl: d,
}
}
@ -42,7 +41,6 @@ func (p *PluginDriver) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker
Client: baseproto.NewBasePluginClient(c),
},
client: proto.NewDriverClient(c),
logger: p.logger,
doneCtx: ctx,
}, nil
}


@ -50,7 +50,7 @@ func (x TaskState) String() string {
return proto.EnumName(TaskState_name, int32(x))
}
func (TaskState) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{0}
return fileDescriptor_driver_66cfa35dd20ec741, []int{0}
}
type FingerprintResponse_HealthState int32
@ -76,7 +76,7 @@ func (x FingerprintResponse_HealthState) String() string {
return proto.EnumName(FingerprintResponse_HealthState_name, int32(x))
}
func (FingerprintResponse_HealthState) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{5, 0}
return fileDescriptor_driver_66cfa35dd20ec741, []int{5, 0}
}
type StartTaskResponse_Result int32
@ -102,7 +102,7 @@ func (x StartTaskResponse_Result) String() string {
return proto.EnumName(StartTaskResponse_Result_name, int32(x))
}
func (StartTaskResponse_Result) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{9, 0}
return fileDescriptor_driver_66cfa35dd20ec741, []int{9, 0}
}
type DriverCapabilities_FSIsolation int32
@ -128,7 +128,7 @@ func (x DriverCapabilities_FSIsolation) String() string {
return proto.EnumName(DriverCapabilities_FSIsolation_name, int32(x))
}
func (DriverCapabilities_FSIsolation) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{25, 0}
return fileDescriptor_driver_66cfa35dd20ec741, []int{25, 0}
}
type CPUUsage_Fields int32
@ -163,7 +163,7 @@ func (x CPUUsage_Fields) String() string {
return proto.EnumName(CPUUsage_Fields_name, int32(x))
}
func (CPUUsage_Fields) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{43, 0}
return fileDescriptor_driver_66cfa35dd20ec741, []int{43, 0}
}
type MemoryUsage_Fields int32
@ -195,7 +195,7 @@ func (x MemoryUsage_Fields) String() string {
return proto.EnumName(MemoryUsage_Fields_name, int32(x))
}
func (MemoryUsage_Fields) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{44, 0}
return fileDescriptor_driver_66cfa35dd20ec741, []int{44, 0}
}
type TaskConfigSchemaRequest struct {
@ -208,7 +208,7 @@ func (m *TaskConfigSchemaRequest) Reset() { *m = TaskConfigSchemaRequest
func (m *TaskConfigSchemaRequest) String() string { return proto.CompactTextString(m) }
func (*TaskConfigSchemaRequest) ProtoMessage() {}
func (*TaskConfigSchemaRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{0}
return fileDescriptor_driver_66cfa35dd20ec741, []int{0}
}
func (m *TaskConfigSchemaRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskConfigSchemaRequest.Unmarshal(m, b)
@ -240,7 +240,7 @@ func (m *TaskConfigSchemaResponse) Reset() { *m = TaskConfigSchemaRespon
func (m *TaskConfigSchemaResponse) String() string { return proto.CompactTextString(m) }
func (*TaskConfigSchemaResponse) ProtoMessage() {}
func (*TaskConfigSchemaResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{1}
return fileDescriptor_driver_66cfa35dd20ec741, []int{1}
}
func (m *TaskConfigSchemaResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskConfigSchemaResponse.Unmarshal(m, b)
@ -277,7 +277,7 @@ func (m *CapabilitiesRequest) Reset() { *m = CapabilitiesRequest{} }
func (m *CapabilitiesRequest) String() string { return proto.CompactTextString(m) }
func (*CapabilitiesRequest) ProtoMessage() {}
func (*CapabilitiesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{2}
return fileDescriptor_driver_66cfa35dd20ec741, []int{2}
}
func (m *CapabilitiesRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CapabilitiesRequest.Unmarshal(m, b)
@ -312,7 +312,7 @@ func (m *CapabilitiesResponse) Reset() { *m = CapabilitiesResponse{} }
func (m *CapabilitiesResponse) String() string { return proto.CompactTextString(m) }
func (*CapabilitiesResponse) ProtoMessage() {}
func (*CapabilitiesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{3}
return fileDescriptor_driver_66cfa35dd20ec741, []int{3}
}
func (m *CapabilitiesResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CapabilitiesResponse.Unmarshal(m, b)
@ -349,7 +349,7 @@ func (m *FingerprintRequest) Reset() { *m = FingerprintRequest{} }
func (m *FingerprintRequest) String() string { return proto.CompactTextString(m) }
func (*FingerprintRequest) ProtoMessage() {}
func (*FingerprintRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{4}
return fileDescriptor_driver_66cfa35dd20ec741, []int{4}
}
func (m *FingerprintRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FingerprintRequest.Unmarshal(m, b)
@ -392,7 +392,7 @@ func (m *FingerprintResponse) Reset() { *m = FingerprintResponse{} }
func (m *FingerprintResponse) String() string { return proto.CompactTextString(m) }
func (*FingerprintResponse) ProtoMessage() {}
func (*FingerprintResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{5}
return fileDescriptor_driver_66cfa35dd20ec741, []int{5}
}
func (m *FingerprintResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FingerprintResponse.Unmarshal(m, b)
@ -447,7 +447,7 @@ func (m *RecoverTaskRequest) Reset() { *m = RecoverTaskRequest{} }
func (m *RecoverTaskRequest) String() string { return proto.CompactTextString(m) }
func (*RecoverTaskRequest) ProtoMessage() {}
func (*RecoverTaskRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{6}
return fileDescriptor_driver_66cfa35dd20ec741, []int{6}
}
func (m *RecoverTaskRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RecoverTaskRequest.Unmarshal(m, b)
@ -491,7 +491,7 @@ func (m *RecoverTaskResponse) Reset() { *m = RecoverTaskResponse{} }
func (m *RecoverTaskResponse) String() string { return proto.CompactTextString(m) }
func (*RecoverTaskResponse) ProtoMessage() {}
func (*RecoverTaskResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{7}
return fileDescriptor_driver_66cfa35dd20ec741, []int{7}
}
func (m *RecoverTaskResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RecoverTaskResponse.Unmarshal(m, b)
@ -523,7 +523,7 @@ func (m *StartTaskRequest) Reset() { *m = StartTaskRequest{} }
func (m *StartTaskRequest) String() string { return proto.CompactTextString(m) }
func (*StartTaskRequest) ProtoMessage() {}
func (*StartTaskRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{8}
return fileDescriptor_driver_66cfa35dd20ec741, []int{8}
}
func (m *StartTaskRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StartTaskRequest.Unmarshal(m, b)
@ -577,7 +577,7 @@ func (m *StartTaskResponse) Reset() { *m = StartTaskResponse{} }
func (m *StartTaskResponse) String() string { return proto.CompactTextString(m) }
func (*StartTaskResponse) ProtoMessage() {}
func (*StartTaskResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{9}
return fileDescriptor_driver_66cfa35dd20ec741, []int{9}
}
func (m *StartTaskResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StartTaskResponse.Unmarshal(m, b)
@ -637,7 +637,7 @@ func (m *WaitTaskRequest) Reset() { *m = WaitTaskRequest{} }
func (m *WaitTaskRequest) String() string { return proto.CompactTextString(m) }
func (*WaitTaskRequest) ProtoMessage() {}
func (*WaitTaskRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{10}
return fileDescriptor_driver_66cfa35dd20ec741, []int{10}
}
func (m *WaitTaskRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_WaitTaskRequest.Unmarshal(m, b)
@ -678,7 +678,7 @@ func (m *WaitTaskResponse) Reset() { *m = WaitTaskResponse{} }
func (m *WaitTaskResponse) String() string { return proto.CompactTextString(m) }
func (*WaitTaskResponse) ProtoMessage() {}
func (*WaitTaskResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{11}
return fileDescriptor_driver_66cfa35dd20ec741, []int{11}
}
func (m *WaitTaskResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_WaitTaskResponse.Unmarshal(m, b)
@ -730,7 +730,7 @@ func (m *StopTaskRequest) Reset() { *m = StopTaskRequest{} }
func (m *StopTaskRequest) String() string { return proto.CompactTextString(m) }
func (*StopTaskRequest) ProtoMessage() {}
func (*StopTaskRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{12}
return fileDescriptor_driver_66cfa35dd20ec741, []int{12}
}
func (m *StopTaskRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StopTaskRequest.Unmarshal(m, b)
@ -781,7 +781,7 @@ func (m *StopTaskResponse) Reset() { *m = StopTaskResponse{} }
func (m *StopTaskResponse) String() string { return proto.CompactTextString(m) }
func (*StopTaskResponse) ProtoMessage() {}
func (*StopTaskResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{13}
return fileDescriptor_driver_66cfa35dd20ec741, []int{13}
}
func (m *StopTaskResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StopTaskResponse.Unmarshal(m, b)
@ -815,7 +815,7 @@ func (m *DestroyTaskRequest) Reset() { *m = DestroyTaskRequest{} }
func (m *DestroyTaskRequest) String() string { return proto.CompactTextString(m) }
func (*DestroyTaskRequest) ProtoMessage() {}
func (*DestroyTaskRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{14}
return fileDescriptor_driver_66cfa35dd20ec741, []int{14}
}
func (m *DestroyTaskRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroyTaskRequest.Unmarshal(m, b)
@ -859,7 +859,7 @@ func (m *DestroyTaskResponse) Reset() { *m = DestroyTaskResponse{} }
func (m *DestroyTaskResponse) String() string { return proto.CompactTextString(m) }
func (*DestroyTaskResponse) ProtoMessage() {}
func (*DestroyTaskResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{15}
return fileDescriptor_driver_66cfa35dd20ec741, []int{15}
}
func (m *DestroyTaskResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroyTaskResponse.Unmarshal(m, b)
@ -891,7 +891,7 @@ func (m *InspectTaskRequest) Reset() { *m = InspectTaskRequest{} }
func (m *InspectTaskRequest) String() string { return proto.CompactTextString(m) }
func (*InspectTaskRequest) ProtoMessage() {}
func (*InspectTaskRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{16}
return fileDescriptor_driver_66cfa35dd20ec741, []int{16}
}
func (m *InspectTaskRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InspectTaskRequest.Unmarshal(m, b)
@ -934,7 +934,7 @@ func (m *InspectTaskResponse) Reset() { *m = InspectTaskResponse{} }
func (m *InspectTaskResponse) String() string { return proto.CompactTextString(m) }
func (*InspectTaskResponse) ProtoMessage() {}
func (*InspectTaskResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{17}
return fileDescriptor_driver_66cfa35dd20ec741, []int{17}
}
func (m *InspectTaskResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InspectTaskResponse.Unmarshal(m, b)
@ -987,7 +987,7 @@ func (m *TaskStatsRequest) Reset() { *m = TaskStatsRequest{} }
func (m *TaskStatsRequest) String() string { return proto.CompactTextString(m) }
func (*TaskStatsRequest) ProtoMessage() {}
func (*TaskStatsRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{18}
return fileDescriptor_driver_66cfa35dd20ec741, []int{18}
}
func (m *TaskStatsRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskStatsRequest.Unmarshal(m, b)
@ -1026,7 +1026,7 @@ func (m *TaskStatsResponse) Reset() { *m = TaskStatsResponse{} }
func (m *TaskStatsResponse) String() string { return proto.CompactTextString(m) }
func (*TaskStatsResponse) ProtoMessage() {}
func (*TaskStatsResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{19}
return fileDescriptor_driver_66cfa35dd20ec741, []int{19}
}
func (m *TaskStatsResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskStatsResponse.Unmarshal(m, b)
@ -1063,7 +1063,7 @@ func (m *TaskEventsRequest) Reset() { *m = TaskEventsRequest{} }
func (m *TaskEventsRequest) String() string { return proto.CompactTextString(m) }
func (*TaskEventsRequest) ProtoMessage() {}
func (*TaskEventsRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{20}
return fileDescriptor_driver_66cfa35dd20ec741, []int{20}
}
func (m *TaskEventsRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskEventsRequest.Unmarshal(m, b)
@ -1097,7 +1097,7 @@ func (m *SignalTaskRequest) Reset() { *m = SignalTaskRequest{} }
func (m *SignalTaskRequest) String() string { return proto.CompactTextString(m) }
func (*SignalTaskRequest) ProtoMessage() {}
func (*SignalTaskRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{21}
return fileDescriptor_driver_66cfa35dd20ec741, []int{21}
}
func (m *SignalTaskRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SignalTaskRequest.Unmarshal(m, b)
@ -1141,7 +1141,7 @@ func (m *SignalTaskResponse) Reset() { *m = SignalTaskResponse{} }
func (m *SignalTaskResponse) String() string { return proto.CompactTextString(m) }
func (*SignalTaskResponse) ProtoMessage() {}
func (*SignalTaskResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{22}
return fileDescriptor_driver_66cfa35dd20ec741, []int{22}
}
func (m *SignalTaskResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SignalTaskResponse.Unmarshal(m, b)
@ -1178,7 +1178,7 @@ func (m *ExecTaskRequest) Reset() { *m = ExecTaskRequest{} }
func (m *ExecTaskRequest) String() string { return proto.CompactTextString(m) }
func (*ExecTaskRequest) ProtoMessage() {}
func (*ExecTaskRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{23}
return fileDescriptor_driver_66cfa35dd20ec741, []int{23}
}
func (m *ExecTaskRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ExecTaskRequest.Unmarshal(m, b)
@ -1235,7 +1235,7 @@ func (m *ExecTaskResponse) Reset() { *m = ExecTaskResponse{} }
func (m *ExecTaskResponse) String() string { return proto.CompactTextString(m) }
func (*ExecTaskResponse) ProtoMessage() {}
func (*ExecTaskResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{24}
return fileDescriptor_driver_66cfa35dd20ec741, []int{24}
}
func (m *ExecTaskResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ExecTaskResponse.Unmarshal(m, b)
@ -1294,7 +1294,7 @@ func (m *DriverCapabilities) Reset() { *m = DriverCapabilities{} }
func (m *DriverCapabilities) String() string { return proto.CompactTextString(m) }
func (*DriverCapabilities) ProtoMessage() {}
func (*DriverCapabilities) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{25}
return fileDescriptor_driver_66cfa35dd20ec741, []int{25}
}
func (m *DriverCapabilities) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DriverCapabilities.Unmarshal(m, b)
@ -1380,7 +1380,7 @@ func (m *TaskConfig) Reset() { *m = TaskConfig{} }
func (m *TaskConfig) String() string { return proto.CompactTextString(m) }
func (*TaskConfig) ProtoMessage() {}
func (*TaskConfig) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{26}
return fileDescriptor_driver_66cfa35dd20ec741, []int{26}
}
func (m *TaskConfig) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskConfig.Unmarshal(m, b)
@ -1519,7 +1519,7 @@ func (m *Resources) Reset() { *m = Resources{} }
func (m *Resources) String() string { return proto.CompactTextString(m) }
func (*Resources) ProtoMessage() {}
func (*Resources) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{27}
return fileDescriptor_driver_66cfa35dd20ec741, []int{27}
}
func (m *Resources) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Resources.Unmarshal(m, b)
@ -1566,7 +1566,7 @@ func (m *AllocatedTaskResources) Reset() { *m = AllocatedTaskResources{}
func (m *AllocatedTaskResources) String() string { return proto.CompactTextString(m) }
func (*AllocatedTaskResources) ProtoMessage() {}
func (*AllocatedTaskResources) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{28}
return fileDescriptor_driver_66cfa35dd20ec741, []int{28}
}
func (m *AllocatedTaskResources) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AllocatedTaskResources.Unmarshal(m, b)
@ -1618,7 +1618,7 @@ func (m *AllocatedCpuResources) Reset() { *m = AllocatedCpuResources{} }
func (m *AllocatedCpuResources) String() string { return proto.CompactTextString(m) }
func (*AllocatedCpuResources) ProtoMessage() {}
func (*AllocatedCpuResources) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{29}
return fileDescriptor_driver_66cfa35dd20ec741, []int{29}
}
func (m *AllocatedCpuResources) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AllocatedCpuResources.Unmarshal(m, b)
@ -1656,7 +1656,7 @@ func (m *AllocatedMemoryResources) Reset() { *m = AllocatedMemoryResourc
func (m *AllocatedMemoryResources) String() string { return proto.CompactTextString(m) }
func (*AllocatedMemoryResources) ProtoMessage() {}
func (*AllocatedMemoryResources) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{30}
return fileDescriptor_driver_66cfa35dd20ec741, []int{30}
}
func (m *AllocatedMemoryResources) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AllocatedMemoryResources.Unmarshal(m, b)
@ -1699,7 +1699,7 @@ func (m *NetworkResource) Reset() { *m = NetworkResource{} }
func (m *NetworkResource) String() string { return proto.CompactTextString(m) }
func (*NetworkResource) ProtoMessage() {}
func (*NetworkResource) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{31}
return fileDescriptor_driver_66cfa35dd20ec741, []int{31}
}
func (m *NetworkResource) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NetworkResource.Unmarshal(m, b)
@ -1773,7 +1773,7 @@ func (m *NetworkPort) Reset() { *m = NetworkPort{} }
func (m *NetworkPort) String() string { return proto.CompactTextString(m) }
func (*NetworkPort) ProtoMessage() {}
func (*NetworkPort) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{32}
return fileDescriptor_driver_66cfa35dd20ec741, []int{32}
}
func (m *NetworkPort) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NetworkPort.Unmarshal(m, b)
@ -1833,7 +1833,7 @@ func (m *LinuxResources) Reset() { *m = LinuxResources{} }
func (m *LinuxResources) String() string { return proto.CompactTextString(m) }
func (*LinuxResources) ProtoMessage() {}
func (*LinuxResources) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{33}
return fileDescriptor_driver_66cfa35dd20ec741, []int{33}
}
func (m *LinuxResources) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LinuxResources.Unmarshal(m, b)
@ -1925,7 +1925,7 @@ func (m *Mount) Reset() { *m = Mount{} }
func (m *Mount) String() string { return proto.CompactTextString(m) }
func (*Mount) ProtoMessage() {}
func (*Mount) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{34}
return fileDescriptor_driver_66cfa35dd20ec741, []int{34}
}
func (m *Mount) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Mount.Unmarshal(m, b)
@ -1988,7 +1988,7 @@ func (m *Device) Reset() { *m = Device{} }
func (m *Device) String() string { return proto.CompactTextString(m) }
func (*Device) ProtoMessage() {}
func (*Device) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{35}
return fileDescriptor_driver_66cfa35dd20ec741, []int{35}
}
func (m *Device) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Device.Unmarshal(m, b)
@ -2046,7 +2046,7 @@ func (m *TaskHandle) Reset() { *m = TaskHandle{} }
func (m *TaskHandle) String() string { return proto.CompactTextString(m) }
func (*TaskHandle) ProtoMessage() {}
func (*TaskHandle) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{36}
return fileDescriptor_driver_66cfa35dd20ec741, []int{36}
}
func (m *TaskHandle) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskHandle.Unmarshal(m, b)
@ -2106,7 +2106,7 @@ func (m *NetworkOverride) Reset() { *m = NetworkOverride{} }
func (m *NetworkOverride) String() string { return proto.CompactTextString(m) }
func (*NetworkOverride) ProtoMessage() {}
func (*NetworkOverride) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{37}
return fileDescriptor_driver_66cfa35dd20ec741, []int{37}
}
func (m *NetworkOverride) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NetworkOverride.Unmarshal(m, b)
@ -2164,7 +2164,7 @@ func (m *ExitResult) Reset() { *m = ExitResult{} }
func (m *ExitResult) String() string { return proto.CompactTextString(m) }
func (*ExitResult) ProtoMessage() {}
func (*ExitResult) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{38}
return fileDescriptor_driver_66cfa35dd20ec741, []int{38}
}
func (m *ExitResult) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ExitResult.Unmarshal(m, b)
@ -2227,7 +2227,7 @@ func (m *TaskStatus) Reset() { *m = TaskStatus{} }
func (m *TaskStatus) String() string { return proto.CompactTextString(m) }
func (*TaskStatus) ProtoMessage() {}
func (*TaskStatus) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{39}
return fileDescriptor_driver_66cfa35dd20ec741, []int{39}
}
func (m *TaskStatus) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskStatus.Unmarshal(m, b)
@ -2302,7 +2302,7 @@ func (m *TaskDriverStatus) Reset() { *m = TaskDriverStatus{} }
func (m *TaskDriverStatus) String() string { return proto.CompactTextString(m) }
func (*TaskDriverStatus) ProtoMessage() {}
func (*TaskDriverStatus) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{40}
return fileDescriptor_driver_66cfa35dd20ec741, []int{40}
}
func (m *TaskDriverStatus) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskDriverStatus.Unmarshal(m, b)
@ -2347,7 +2347,7 @@ func (m *TaskStats) Reset() { *m = TaskStats{} }
func (m *TaskStats) String() string { return proto.CompactTextString(m) }
func (*TaskStats) ProtoMessage() {}
func (*TaskStats) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{41}
return fileDescriptor_driver_66cfa35dd20ec741, []int{41}
}
func (m *TaskStats) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskStats.Unmarshal(m, b)
@ -2409,7 +2409,7 @@ func (m *TaskResourceUsage) Reset() { *m = TaskResourceUsage{} }
func (m *TaskResourceUsage) String() string { return proto.CompactTextString(m) }
func (*TaskResourceUsage) ProtoMessage() {}
func (*TaskResourceUsage) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{42}
return fileDescriptor_driver_66cfa35dd20ec741, []int{42}
}
func (m *TaskResourceUsage) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TaskResourceUsage.Unmarshal(m, b)
@ -2461,7 +2461,7 @@ func (m *CPUUsage) Reset() { *m = CPUUsage{} }
func (m *CPUUsage) String() string { return proto.CompactTextString(m) }
func (*CPUUsage) ProtoMessage() {}
func (*CPUUsage) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{43}
return fileDescriptor_driver_66cfa35dd20ec741, []int{43}
}
func (m *CPUUsage) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CPUUsage.Unmarshal(m, b)
@ -2547,7 +2547,7 @@ func (m *MemoryUsage) Reset() { *m = MemoryUsage{} }
func (m *MemoryUsage) String() string { return proto.CompactTextString(m) }
func (*MemoryUsage) ProtoMessage() {}
func (*MemoryUsage) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{44}
return fileDescriptor_driver_66cfa35dd20ec741, []int{44}
}
func (m *MemoryUsage) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MemoryUsage.Unmarshal(m, b)
@ -2631,7 +2631,7 @@ func (m *DriverTaskEvent) Reset() { *m = DriverTaskEvent{} }
func (m *DriverTaskEvent) String() string { return proto.CompactTextString(m) }
func (*DriverTaskEvent) ProtoMessage() {}
func (*DriverTaskEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_driver_de29bfae7a3376ed, []int{45}
return fileDescriptor_driver_66cfa35dd20ec741, []int{45}
}
func (m *DriverTaskEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DriverTaskEvent.Unmarshal(m, b)
@ -3339,10 +3339,10 @@ var _Driver_serviceDesc = grpc.ServiceDesc{
}
func init() {
proto.RegisterFile("plugins/drivers/proto/driver.proto", fileDescriptor_driver_de29bfae7a3376ed)
proto.RegisterFile("plugins/drivers/proto/driver.proto", fileDescriptor_driver_66cfa35dd20ec741)
}
var fileDescriptor_driver_de29bfae7a3376ed = []byte{
var fileDescriptor_driver_66cfa35dd20ec741 = []byte{
// 2940 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x59, 0xcb, 0x6f, 0x23, 0xc7,
0xd1, 0x17, 0x9f, 0x22, 0x8b, 0x12, 0x35, 0xdb, 0xbb, 0x6b, 0xd3, 0x34, 0xbe, 0xcf, 0xeb, 0x01,

View File

@ -43,7 +43,7 @@ func (d *DriverHarness) Impl() drivers.DriverPlugin {
func NewDriverHarness(t testing.T, d drivers.DriverPlugin) *DriverHarness {
logger := testlog.HCLogger(t).Named("driver_harness")
pd := drivers.NewDriverPlugin(d, logger).(*drivers.PluginDriver)
pd := drivers.NewDriverPlugin(d).(*drivers.PluginDriver)
client, server := plugin.TestPluginGRPCConn(t,
map[string]plugin.Plugin{

View File

@ -18,8 +18,8 @@ import (
"github.com/hashicorp/hcl2/hcldec"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
"github.com/kr/pretty"
"github.com/mitchellh/cli"
"github.com/zclconf/go-cty/cty/msgpack"
@ -198,10 +198,10 @@ func (c *Device) setConfig(spec hcldec.Spec, apiVersion string, config []byte, n
c.logger.Trace("raw hcl config", "config", hclog.Fmt("% #v", pretty.Formatter(configVal)))
ctx := &hcl2.EvalContext{
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}
val, diag := shared.ParseHclInterface(configVal, spec, ctx)
val, diag := hclutils.ParseHclInterface(configVal, spec, ctx)
if diag.HasErrors() {
errStr := "failed to parse config"
for _, err := range diag.Errs() {

View File

@ -1,61 +0,0 @@
package shared
import (
"context"
"time"
"github.com/hashicorp/nomad/plugins/base"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// HandleStreamErr is used to handle a non-io.EOF error in a stream. It
// detects whether the plugin has shut down via the passed pluginCtx. The
// parameters are:
// - err: the error returned from the streaming RPC
// - reqCtx: the context passed to the streaming request
// - pluginCtx: the plugin's done ctx, used to detect the plugin dying
//
// The return values are:
// - base.ErrPluginShutdown if the error is because the plugin shutdown
// - context.Canceled if the reqCtx is canceled
// - The original error
func HandleStreamErr(err error, reqCtx, pluginCtx context.Context) error {
if err == nil {
return nil
}
// Determine if the error is because the plugin shutdown
if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.Unavailable {
// Potentially wait a little before returning an error so we can detect
// the exit
select {
case <-pluginCtx.Done():
err = base.ErrPluginShutdown
case <-reqCtx.Done():
err = reqCtx.Err()
// There is no guarantee that the select will choose the
// doneCtx first so we have to double check
select {
case <-pluginCtx.Done():
err = base.ErrPluginShutdown
default:
}
case <-time.After(3 * time.Second):
// It's okay to wait a while since the connection isn't available and
// on localhost it is likely shutting down. It is not expected for
// this to ever take anywhere close to 3 seconds.
}
// It is an error we don't know how to handle, so return it
return err
}
// Context was cancelled
if errStatus := status.FromContextError(reqCtx.Err()); errStatus.Code() == codes.Canceled {
return context.Canceled
}
return err
}

View File

@ -0,0 +1,105 @@
package grpcutils
import (
"context"
"time"
"github.com/hashicorp/nomad/plugins/base/structs"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// HandleReqCtxGrpcErr is used to handle a non-io.EOF error in a gRPC request
// where a user-supplied context is used. It detects whether the plugin has
// shut down via the passed pluginCtx. The parameters are:
// - err: the error returned from the gRPC request
// - reqCtx: the user context passed to the request
// - pluginCtx: the plugin's done ctx, used to detect the plugin dying
//
// The return values are:
// - ErrPluginShutdown if the error is because the plugin shutdown
// - context.Canceled if the reqCtx is canceled
// - The original error
func HandleReqCtxGrpcErr(err error, reqCtx, pluginCtx context.Context) error {
if err == nil {
return nil
}
// Determine if the error is because the plugin shutdown
if errStatus, ok := status.FromError(err); ok &&
(errStatus.Code() == codes.Unavailable || errStatus.Code() == codes.Canceled) {
// Potentially wait a little before returning an error so we can detect
// the exit
select {
case <-pluginCtx.Done():
err = structs.ErrPluginShutdown
case <-reqCtx.Done():
err = reqCtx.Err()
// There is no guarantee that the select will choose the
// pluginCtx case first, so we have to double-check
select {
case <-pluginCtx.Done():
err = structs.ErrPluginShutdown
default:
}
case <-time.After(3 * time.Second):
// It's okay to wait a while since the connection isn't available and
// on localhost it is likely shutting down. It is not expected for
// this to ever take anywhere close to 3 seconds.
}
// It is an error we don't know how to handle, so return it
return err
}
// Context was cancelled
if errStatus := status.FromContextError(reqCtx.Err()); errStatus.Code() == codes.Canceled {
return context.Canceled
}
return err
}
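
As a usage illustration only (not part of this change): a minimal sketch of how a driver gRPC client method might funnel its error through HandleReqCtxGrpcErr. The exampleTaskClient wrapper, the stand-in stub interface, and the import path for the grpcutils package are assumptions made for the sketch.

package example

import (
	"context"

	// Import path assumed for illustration; use the actual location of the
	// grpcutils package in the tree.
	"github.com/hashicorp/nomad/plugins/shared/grpcutils"
)

// waitStub stands in for a generated gRPC client stub (hypothetical).
type waitStub interface {
	WaitTask(ctx context.Context, taskID string) error
}

// exampleTaskClient is a hypothetical wrapper around the raw stub.
type exampleTaskClient struct {
	stub waitStub
	// doneCtx is cancelled when the backing plugin process exits.
	doneCtx context.Context
}

// WaitTask issues the RPC with the caller's context and normalizes the error:
// a dead plugin surfaces as ErrPluginShutdown, a cancelled request as
// context.Canceled, and anything else passes through unchanged.
func (c *exampleTaskClient) WaitTask(ctx context.Context, taskID string) error {
	err := c.stub.WaitTask(ctx, taskID)
	return grpcutils.HandleReqCtxGrpcErr(err, ctx, c.doneCtx)
}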
// HandleGrpcErr is used to handle errors from calls made to a remote gRPC
// plugin. It detects whether the plugin has shut down via the passed
// pluginCtx. The parameters are:
// - err: the error returned from the RPC
// - pluginCtx: the plugin's done ctx, used to detect the plugin dying
//
// The return values are:
// - ErrPluginShutdown if the error is because the plugin shutdown
// - The original error
func HandleGrpcErr(err error, pluginCtx context.Context) error {
if err == nil {
return nil
}
if errStatus := status.FromContextError(pluginCtx.Err()); errStatus.Code() == codes.Canceled {
// See if the plugin shutdown
select {
case <-pluginCtx.Done():
err = structs.ErrPluginShutdown
default:
}
}
// Determine if the error is because the plugin shutdown
if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.Unavailable {
// Potentially wait a little before returning an error so we can detect
// the exit
select {
case <-pluginCtx.Done():
err = structs.ErrPluginShutdown
case <-time.After(3 * time.Second):
// It's okay to wait a while since the connection isn't available and
// on localhost it is likely shutting down. It is not expected for
// this to ever take anywhere close to 3 seconds.
}
// It is an error we don't know how to handle, so return it
return err
}
return err
}
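
A corresponding sketch, again illustrative with assumed names and import path, for calls that carry no caller-supplied context, where only the plugin's done context is consulted.

package example

import (
	"context"

	// Import path assumed for illustration.
	"github.com/hashicorp/nomad/plugins/shared/grpcutils"
)

// signalStub stands in for a generated gRPC client stub (hypothetical).
type signalStub interface {
	SignalTask(ctx context.Context, taskID, signal string) error
}

// signalTask has no user-supplied context, so only the plugin's done context
// decides whether an Unavailable error means the plugin shut down.
func signalTask(doneCtx context.Context, stub signalStub, taskID, signal string) error {
	err := stub.SignalTask(context.Background(), taskID, signal)
	return grpcutils.HandleGrpcErr(err, doneCtx)
}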

View File

@ -1,4 +1,4 @@
package shared
package hclutils
import (
"bytes"

View File

@ -1,4 +1,4 @@
package shared
package hclutils
import (
"testing"

View File

@ -13,8 +13,8 @@ import (
hcl2 "github.com/hashicorp/hcl2/hcl"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
"github.com/zclconf/go-cty/cty/msgpack"
)
@ -22,7 +22,7 @@ var (
// configParseCtx is the context used to parse a plugin's configuration
// stanza
configParseCtx = &hcl2.EvalContext{
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}
)
@ -467,7 +467,7 @@ func (l *PluginLoader) validePluginConfig(id PluginID, info *pluginInfo) error {
}
// Parse the config using the spec
val, diag := shared.ParseHclInterface(info.config, spec, configParseCtx)
val, diag := hclutils.ParseHclInterface(info.config, spec, configParseCtx)
if diag.HasErrors() {
multierror.Append(&mErr, diag.Errs()...)
return multierror.Prefix(&mErr, "failed parsing config:")

View File

@ -86,14 +86,19 @@ func TestServiceSched_JobRegister(t *testing.T) {
}
// Ensure different ports were used.
used := make(map[int]struct{})
used := make(map[int]map[string]struct{})
for _, alloc := range out {
for _, resource := range alloc.TaskResources {
for _, port := range resource.Networks[0].DynamicPorts {
if _, ok := used[port.Value]; ok {
t.Fatalf("Port collision %v", port.Value)
nodeMap, ok := used[port.Value]
if !ok {
nodeMap = make(map[string]struct{})
used[port.Value] = nodeMap
}
used[port.Value] = struct{}{}
if _, ok := nodeMap[alloc.NodeID]; ok {
t.Fatalf("Port collision on node %q %v", alloc.NodeID, port.Value)
}
nodeMap[alloc.NodeID] = struct{}{}
}
}
}
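
For context on the reworked assertion above (an illustrative sketch only; the helper name is invented): dynamic ports only need to be unique per node, so usage is tracked as port value → set of node IDs, and a collision is flagged only when the same port repeats on a single node.

// portCollision records a (node, port) pair in a per-node tracking map and
// reports true only when the same port value is seen twice on the same node.
func portCollision(used map[int]map[string]struct{}, nodeID string, port int) bool {
	nodes, ok := used[port]
	if !ok {
		nodes = make(map[string]struct{})
		used[port] = nodes
	}
	if _, dup := nodes[nodeID]; dup {
		return true
	}
	nodes[nodeID] = struct{}{}
	return false
}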