Merge pull request #4598 from hashicorp/f-device-plugin-example
Example device plugin and helpers
This commit is contained in:
commit
63ad69ab91
|
@ -1,10 +1,13 @@
|
|||
package base
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"reflect"
|
||||
|
||||
plugin "github.com/hashicorp/go-plugin"
|
||||
"github.com/hashicorp/nomad/plugins/base/proto"
|
||||
"github.com/ugorji/go/codec"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
|
@ -46,3 +49,15 @@ func (p *PluginBase) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error
|
|||
func (p *PluginBase) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
|
||||
return &BasePluginClient{Client: proto.NewBasePluginClient(c)}, nil
|
||||
}
|
||||
|
||||
// MsgpackHandle is a shared handle for encoding/decoding of structs
|
||||
var MsgpackHandle = func() *codec.MsgpackHandle {
|
||||
h := &codec.MsgpackHandle{RawToString: true}
|
||||
h.MapType = reflect.TypeOf(map[string]interface{}(nil))
|
||||
return h
|
||||
}()
|
||||
|
||||
// MsgPackDecode is used to decode a MsgPack encoded object
|
||||
func MsgPackDecode(buf []byte, out interface{}) error {
|
||||
return codec.NewDecoder(bytes.NewReader(buf), MsgpackHandle).Decode(out)
|
||||
}
|
||||
|
|
2
plugins/device/cmd/example/README.md
Normal file
2
plugins/device/cmd/example/README.md
Normal file
|
@ -0,0 +1,2 @@
|
|||
This package provides an example implementation of a device plugin for
|
||||
reference.
|
18
plugins/device/cmd/example/cmd/main.go
Normal file
18
plugins/device/cmd/example/cmd/main.go
Normal file
|
@ -0,0 +1,18 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/nomad/plugins"
|
||||
"github.com/hashicorp/nomad/plugins/device/cmd/example"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Serve the plugin
|
||||
plugins.Serve(factory)
|
||||
}
|
||||
|
||||
// factory returns a new instance of our example device plugin
|
||||
func factory(log log.Logger) interface{} {
|
||||
return example.NewExampleDevice(log)
|
||||
}
|
384
plugins/device/cmd/example/device.go
Normal file
384
plugins/device/cmd/example/device.go
Normal file
|
@ -0,0 +1,384 @@
|
|||
package example
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/kr/pretty"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/device"
|
||||
"github.com/hashicorp/nomad/plugins/shared/hclspec"
|
||||
)
|
||||
|
||||
const (
|
||||
// pluginName is the name of the plugin
|
||||
pluginName = "example-fs-device"
|
||||
|
||||
// vendor is the vendor providing the devices
|
||||
vendor = "nomad"
|
||||
|
||||
// deviceType is the type of device being returned
|
||||
deviceType = "file"
|
||||
|
||||
// deviceName is the name of the devices being exposed
|
||||
deviceName = "mock"
|
||||
)
|
||||
|
||||
var (
|
||||
// pluginInfo describes the plugin
|
||||
pluginInfo = &base.PluginInfoResponse{
|
||||
Type: base.PluginTypeDevice,
|
||||
PluginApiVersion: "0.0.1", // XXX This should be an array and should be consts
|
||||
PluginVersion: "0.1.0",
|
||||
Name: pluginName,
|
||||
}
|
||||
|
||||
// configSpec is the specification of the plugin's configuration
|
||||
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
|
||||
"dir": hclspec.NewDefault(
|
||||
hclspec.NewAttr("dir", "string", false),
|
||||
hclspec.NewLiteral("\".\""),
|
||||
),
|
||||
"list_period": hclspec.NewDefault(
|
||||
hclspec.NewAttr("list_period", "string", false),
|
||||
hclspec.NewLiteral("\"5s\""),
|
||||
),
|
||||
"unhealthy_perm": hclspec.NewDefault(
|
||||
hclspec.NewAttr("unhealthy_perm", "string", false),
|
||||
hclspec.NewLiteral("\"-rwxrwxrwx\""),
|
||||
),
|
||||
"stats_period": hclspec.NewDefault(
|
||||
hclspec.NewAttr("list_period", "string", false),
|
||||
hclspec.NewLiteral("\"5s\""),
|
||||
),
|
||||
})
|
||||
)
|
||||
|
||||
// Config contains configuration information for the plugin.
|
||||
type Config struct {
|
||||
Dir string `codec:"dir"`
|
||||
ListPeriod string `codec:"list_period"`
|
||||
StatsPeriod string `codec:"stats_period"`
|
||||
UnhealthyPerm string `codec:"unhealthy_perm"`
|
||||
}
|
||||
|
||||
// FsDevice is an example device plugin. The device plugin exposes files as
|
||||
// devices and periodically polls the directory for new files. If a file has a
|
||||
// given file permission, it is considered unhealthy. This device plugin is
|
||||
// purely for use as an example.
|
||||
type FsDevice struct {
|
||||
logger log.Logger
|
||||
|
||||
// deviceDir is the directory we expose as devices
|
||||
deviceDir string
|
||||
|
||||
// unhealthyPerm is the permissions on a file we consider unhealthy
|
||||
unhealthyPerm string
|
||||
|
||||
// listPeriod is how often we should list the device directory to detect new
|
||||
// devices
|
||||
listPeriod time.Duration
|
||||
|
||||
// statsPeriod is how often we should collect statistics for fingerprinted
|
||||
// devices.
|
||||
statsPeriod time.Duration
|
||||
|
||||
// devices is the set of detected devices and maps whether they are healthy
|
||||
devices map[string]bool
|
||||
deviceLock sync.RWMutex
|
||||
}
|
||||
|
||||
// NewExampleDevice returns a new example device plugin.
|
||||
func NewExampleDevice(log log.Logger) *FsDevice {
|
||||
return &FsDevice{
|
||||
logger: log.Named(pluginName),
|
||||
devices: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
// PluginInfo returns information describing the plugin.
|
||||
func (d *FsDevice) PluginInfo() (*base.PluginInfoResponse, error) {
|
||||
return pluginInfo, nil
|
||||
}
|
||||
|
||||
// ConfigSchema returns the plugins configuration schema.
|
||||
func (d *FsDevice) ConfigSchema() (*hclspec.Spec, error) {
|
||||
return configSpec, nil
|
||||
}
|
||||
|
||||
// SetConfig is used to set the configuration of the plugin.
|
||||
func (d *FsDevice) SetConfig(data []byte) error {
|
||||
var config Config
|
||||
if err := base.MsgPackDecode(data, &config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Save the device directory and the unhealthy permissions
|
||||
d.deviceDir = config.Dir
|
||||
d.unhealthyPerm = config.UnhealthyPerm
|
||||
|
||||
// Convert the poll period
|
||||
period, err := time.ParseDuration(config.ListPeriod)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse list period %q: %v", config.ListPeriod, err)
|
||||
}
|
||||
d.listPeriod = period
|
||||
|
||||
// Convert the stats period
|
||||
speriod, err := time.ParseDuration(config.StatsPeriod)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse list period %q: %v", config.StatsPeriod, err)
|
||||
}
|
||||
d.statsPeriod = speriod
|
||||
|
||||
d.logger.Debug("test debug")
|
||||
d.logger.Info("config set", "config", log.Fmt("% #v", pretty.Formatter(config)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fingerprint streams detected devices. If device changes are detected or the
|
||||
// devices health changes, messages will be emitted.
|
||||
func (d *FsDevice) Fingerprint(ctx context.Context) (<-chan *device.FingerprintResponse, error) {
|
||||
if d.deviceDir == "" {
|
||||
return nil, status.New(codes.Internal, "device directory not set in config").Err()
|
||||
}
|
||||
|
||||
outCh := make(chan *device.FingerprintResponse)
|
||||
go d.fingerprint(ctx, outCh)
|
||||
return outCh, nil
|
||||
}
|
||||
|
||||
// fingerprint is the long running goroutine that detects hardware
|
||||
func (d *FsDevice) fingerprint(ctx context.Context, devices chan *device.FingerprintResponse) {
|
||||
defer close(devices)
|
||||
|
||||
// Create a timer that will fire immediately for the first detection
|
||||
ticker := time.NewTimer(0)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
ticker.Reset(d.listPeriod)
|
||||
}
|
||||
|
||||
d.logger.Trace("scanning for changes")
|
||||
|
||||
files, err := ioutil.ReadDir(d.deviceDir)
|
||||
if err != nil {
|
||||
d.logger.Error("failed to list device directory", "error", err)
|
||||
devices <- device.NewFingerprintError(err)
|
||||
return
|
||||
}
|
||||
|
||||
detected := d.diffFiles(files)
|
||||
if len(detected) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
devices <- device.NewFingerprint(getDeviceGroup(detected))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (d *FsDevice) diffFiles(files []os.FileInfo) []*device.Device {
|
||||
d.deviceLock.Lock()
|
||||
defer d.deviceLock.Unlock()
|
||||
|
||||
// Build an unhealthy message
|
||||
unhealthyDesc := fmt.Sprintf("Device has bad permissions %q", d.unhealthyPerm)
|
||||
|
||||
var changes bool
|
||||
fnames := make(map[string]struct{})
|
||||
for _, f := range files {
|
||||
name := f.Name()
|
||||
fnames[name] = struct{}{}
|
||||
if f.IsDir() {
|
||||
d.logger.Trace("skipping directory", "directory", name)
|
||||
continue
|
||||
}
|
||||
|
||||
// Determine the health
|
||||
perms := f.Mode().Perm().String()
|
||||
healthy := perms != d.unhealthyPerm
|
||||
d.logger.Trace("checking health", "file perm", perms, "unhealthy perms", d.unhealthyPerm, "healthy", healthy)
|
||||
|
||||
// See if we alreay have the device
|
||||
oldHealth, ok := d.devices[name]
|
||||
if ok && oldHealth == healthy {
|
||||
continue
|
||||
}
|
||||
|
||||
// Health has changed or we have a new object
|
||||
changes = true
|
||||
d.devices[name] = healthy
|
||||
}
|
||||
|
||||
for id := range d.devices {
|
||||
if _, ok := fnames[id]; !ok {
|
||||
delete(d.devices, id)
|
||||
changes = true
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing to do
|
||||
if !changes {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build the devices
|
||||
detected := make([]*device.Device, 0, len(d.devices))
|
||||
for name, healthy := range d.devices {
|
||||
var desc string
|
||||
if !healthy {
|
||||
desc = unhealthyDesc
|
||||
}
|
||||
|
||||
detected = append(detected, &device.Device{
|
||||
ID: name,
|
||||
Healthy: healthy,
|
||||
HealthDesc: desc,
|
||||
})
|
||||
}
|
||||
|
||||
return detected
|
||||
}
|
||||
|
||||
// getDeviceGroup is a helper to build the DeviceGroup given a set of devices.
|
||||
func getDeviceGroup(devices []*device.Device) *device.DeviceGroup {
|
||||
return &device.DeviceGroup{
|
||||
Vendor: vendor,
|
||||
Type: deviceType,
|
||||
Name: deviceName,
|
||||
Devices: devices,
|
||||
}
|
||||
}
|
||||
|
||||
// Reserve returns information on how to mount the given devices.
|
||||
func (d *FsDevice) Reserve(deviceIDs []string) (*device.ContainerReservation, error) {
|
||||
if len(deviceIDs) == 0 {
|
||||
return nil, status.New(codes.InvalidArgument, "no device ids given").Err()
|
||||
}
|
||||
|
||||
resp := &device.ContainerReservation{}
|
||||
|
||||
for _, id := range deviceIDs {
|
||||
// Check if the device is known
|
||||
if _, ok := d.devices[id]; !ok {
|
||||
return nil, status.Newf(codes.InvalidArgument, "unknown device %q", id).Err()
|
||||
}
|
||||
|
||||
// Add a mount
|
||||
resp.Devices = append(resp.Devices, &device.DeviceSpec{
|
||||
TaskPath: fmt.Sprintf("/dev/%s", id),
|
||||
HostPath: filepath.Join(d.deviceDir, id),
|
||||
CgroupPerms: "rw",
|
||||
})
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Stats streams statistics for the detected devices.
|
||||
func (d *FsDevice) Stats(ctx context.Context) (<-chan *device.StatsResponse, error) {
|
||||
outCh := make(chan *device.StatsResponse)
|
||||
go d.stats(ctx, outCh)
|
||||
return outCh, nil
|
||||
}
|
||||
|
||||
// stats is the long running goroutine that streams device statistics
|
||||
func (d *FsDevice) stats(ctx context.Context, stats chan *device.StatsResponse) {
|
||||
defer close(stats)
|
||||
|
||||
// Create a timer that will fire immediately for the first detection
|
||||
ticker := time.NewTimer(0)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
ticker.Reset(d.listPeriod)
|
||||
}
|
||||
|
||||
deviceStats, err := d.collectStats()
|
||||
if err != nil {
|
||||
stats <- &device.StatsResponse{
|
||||
Error: err,
|
||||
}
|
||||
return
|
||||
}
|
||||
if deviceStats == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
stats <- &device.StatsResponse{
|
||||
Groups: []*device.DeviceGroupStats{deviceStats},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *FsDevice) collectStats() (*device.DeviceGroupStats, error) {
|
||||
d.deviceLock.RLock()
|
||||
defer d.deviceLock.RUnlock()
|
||||
l := len(d.devices)
|
||||
if l == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
group := &device.DeviceGroupStats{
|
||||
Vendor: vendor,
|
||||
Type: deviceType,
|
||||
Name: deviceName,
|
||||
InstanceStats: make(map[string]*device.DeviceStats, l),
|
||||
}
|
||||
|
||||
for k := range d.devices {
|
||||
p := filepath.Join(d.deviceDir, k)
|
||||
f, err := os.Stat(p)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to stat %q: %v", p, err)
|
||||
}
|
||||
|
||||
s := &device.DeviceStats{
|
||||
Summary: &device.StatValue{
|
||||
IntNumeratorVal: f.Size(),
|
||||
Unit: "bytes",
|
||||
Desc: "Filesize in bytes",
|
||||
},
|
||||
Stats: &device.StatObject{
|
||||
Attributes: map[string]*device.StatValue{
|
||||
"size": {
|
||||
IntNumeratorVal: f.Size(),
|
||||
Unit: "bytes",
|
||||
Desc: "Filesize in bytes",
|
||||
},
|
||||
"modify_time": {
|
||||
StringVal: f.ModTime().String(),
|
||||
Desc: "Last modified",
|
||||
},
|
||||
"mode": {
|
||||
StringVal: f.Mode().String(),
|
||||
Desc: "File mode",
|
||||
},
|
||||
},
|
||||
},
|
||||
Timestamp: now,
|
||||
}
|
||||
|
||||
group.InstanceStats[k] = s
|
||||
}
|
||||
|
||||
return group, nil
|
||||
}
|
|
@ -38,6 +38,21 @@ type FingerprintResponse struct {
|
|||
Error error
|
||||
}
|
||||
|
||||
// NewFingerprint takes a set of device groups and returns a fingerprint
|
||||
// response
|
||||
func NewFingerprint(devices ...*DeviceGroup) *FingerprintResponse {
|
||||
return &FingerprintResponse{
|
||||
Devices: devices,
|
||||
}
|
||||
}
|
||||
|
||||
// NewFingerprintError takes an error and returns a fingerprint response
|
||||
func NewFingerprintError(err error) *FingerprintResponse {
|
||||
return &FingerprintResponse{
|
||||
Error: err,
|
||||
}
|
||||
}
|
||||
|
||||
// DeviceGroup is a grouping of devices that share a common vendor, device type
|
||||
// and name.
|
||||
type DeviceGroup struct {
|
||||
|
|
|
@ -3,6 +3,7 @@ package device
|
|||
import (
|
||||
"context"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
plugin "github.com/hashicorp/go-plugin"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
bproto "github.com/hashicorp/nomad/plugins/base/proto"
|
||||
|
@ -33,3 +34,16 @@ func (p *PluginDevice) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker
|
|||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Serve is used to serve a device plugin
|
||||
func Serve(dev DevicePlugin, logger log.Logger) {
|
||||
plugin.Serve(&plugin.ServeConfig{
|
||||
HandshakeConfig: base.Handshake,
|
||||
Plugins: map[string]plugin.Plugin{
|
||||
base.PluginTypeBase: &base.PluginBase{Impl: dev},
|
||||
base.PluginTypeDevice: &PluginDevice{Impl: dev},
|
||||
},
|
||||
GRPCServer: plugin.DefaultGRPCServer,
|
||||
Logger: logger,
|
||||
})
|
||||
}
|
||||
|
|
27
plugins/serve.go
Normal file
27
plugins/serve.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/plugins/device"
|
||||
)
|
||||
|
||||
// PluginFactory returns a new plugin instance
|
||||
type PluginFactory func(log log.Logger) interface{}
|
||||
|
||||
// Serve is used to serve a new Nomad plugin
|
||||
func Serve(f PluginFactory) {
|
||||
logger := log.New(&log.LoggerOptions{
|
||||
Level: log.Trace,
|
||||
JSONFormat: true,
|
||||
})
|
||||
|
||||
plugin := f(logger)
|
||||
switch p := plugin.(type) {
|
||||
case device.DevicePlugin:
|
||||
device.Serve(p, logger)
|
||||
default:
|
||||
fmt.Println("Unsupported plugin type")
|
||||
}
|
||||
}
|
3
plugins/shared/cmd/launcher/README.md
Normal file
3
plugins/shared/cmd/launcher/README.md
Normal file
|
@ -0,0 +1,3 @@
|
|||
This command allows plugin developers to interact with a plugin directly. The
|
||||
command has subcommands for each plugin type. See the subcommands for usage
|
||||
information.
|
364
plugins/shared/cmd/launcher/command/device.go
Normal file
364
plugins/shared/cmd/launcher/command/device.go
Normal file
|
@ -0,0 +1,364 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
plugin "github.com/hashicorp/go-plugin"
|
||||
"github.com/hashicorp/hcl"
|
||||
"github.com/hashicorp/hcl/hcl/ast"
|
||||
hcl2 "github.com/hashicorp/hcl2/hcl"
|
||||
"github.com/hashicorp/hcl2/hcldec"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/device"
|
||||
"github.com/hashicorp/nomad/plugins/shared"
|
||||
"github.com/hashicorp/nomad/plugins/shared/hclspec"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/zclconf/go-cty/cty/msgpack"
|
||||
)
|
||||
|
||||
func DeviceCommandFactory(meta Meta) cli.CommandFactory {
|
||||
return func() (cli.Command, error) {
|
||||
return &Device{Meta: meta}, nil
|
||||
}
|
||||
}
|
||||
|
||||
type Device struct {
|
||||
Meta
|
||||
|
||||
// dev is the plugin device
|
||||
dev device.DevicePlugin
|
||||
|
||||
// spec is the returned and parsed spec.
|
||||
spec hcldec.Spec
|
||||
}
|
||||
|
||||
func (c *Device) Help() string {
|
||||
helpText := `
|
||||
Usage: nomad-plugin-launcher device <device-binary> <config_file>
|
||||
|
||||
Device launches the given device binary and provides a REPL for interacting
|
||||
with it.
|
||||
|
||||
General Options:
|
||||
|
||||
` + generalOptionsUsage() + `
|
||||
|
||||
Device Options:
|
||||
|
||||
-trace
|
||||
Enable trace level log output.
|
||||
`
|
||||
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
func (c *Device) Synopsis() string {
|
||||
return "REPL for interacting with device plugins"
|
||||
}
|
||||
|
||||
func (c *Device) Run(args []string) int {
|
||||
var trace bool
|
||||
cmdFlags := c.FlagSet("device")
|
||||
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
|
||||
cmdFlags.BoolVar(&trace, "trace", false, "")
|
||||
|
||||
if err := cmdFlags.Parse(args); err != nil {
|
||||
c.logger.Error("failed to parse flags:", "error", err)
|
||||
return 1
|
||||
}
|
||||
if trace {
|
||||
c.logger.SetLevel(hclog.Trace)
|
||||
} else if c.verbose {
|
||||
c.logger.SetLevel(hclog.Debug)
|
||||
}
|
||||
|
||||
args = cmdFlags.Args()
|
||||
numArgs := len(args)
|
||||
if numArgs < 1 {
|
||||
c.logger.Error("expected at least 1 args (device binary)", "args", args)
|
||||
return 1
|
||||
} else if numArgs > 2 {
|
||||
c.logger.Error("expected at most 2 args (device binary and config file)", "args", args)
|
||||
return 1
|
||||
}
|
||||
|
||||
binary := args[0]
|
||||
var config []byte
|
||||
if numArgs == 2 {
|
||||
var err error
|
||||
config, err = ioutil.ReadFile(args[1])
|
||||
if err != nil {
|
||||
c.logger.Error("failed to read config file", "error", err)
|
||||
return 1
|
||||
}
|
||||
|
||||
c.logger.Trace("read config", "config", string(config))
|
||||
}
|
||||
|
||||
// Get the plugin
|
||||
dev, cleanup, err := c.getDevicePlugin(binary)
|
||||
if err != nil {
|
||||
c.logger.Error("failed to launch device plugin", "error", err)
|
||||
return 1
|
||||
}
|
||||
defer cleanup()
|
||||
c.dev = dev
|
||||
|
||||
spec, err := c.getSpec()
|
||||
if err != nil {
|
||||
c.logger.Error("failed to get config spec", "error", err)
|
||||
return 1
|
||||
}
|
||||
c.spec = spec
|
||||
|
||||
if err := c.setConfig(spec, config); err != nil {
|
||||
c.logger.Error("failed to set config", "error", err)
|
||||
return 1
|
||||
}
|
||||
|
||||
if err := c.startRepl(); err != nil {
|
||||
c.logger.Error("error interacting with plugin", "error", err)
|
||||
return 1
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *Device) getDevicePlugin(binary string) (device.DevicePlugin, func(), error) {
|
||||
// Launch the plugin
|
||||
client := plugin.NewClient(&plugin.ClientConfig{
|
||||
HandshakeConfig: base.Handshake,
|
||||
Plugins: map[string]plugin.Plugin{
|
||||
base.PluginTypeBase: &base.PluginBase{},
|
||||
base.PluginTypeDevice: &device.PluginDevice{},
|
||||
},
|
||||
Cmd: exec.Command(binary),
|
||||
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
|
||||
Logger: c.logger,
|
||||
})
|
||||
|
||||
// Connect via RPC
|
||||
rpcClient, err := client.Client()
|
||||
if err != nil {
|
||||
client.Kill()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Request the plugin
|
||||
raw, err := rpcClient.Dispense(base.PluginTypeDevice)
|
||||
if err != nil {
|
||||
client.Kill()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// We should have a KV store now! This feels like a normal interface
|
||||
// implementation but is in fact over an RPC connection.
|
||||
dev := raw.(device.DevicePlugin)
|
||||
return dev, func() { client.Kill() }, nil
|
||||
}
|
||||
|
||||
func (c *Device) getSpec() (hcldec.Spec, error) {
|
||||
// Get the schema so we can parse the config
|
||||
spec, err := c.dev.ConfigSchema()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get config schema: %v", err)
|
||||
}
|
||||
|
||||
c.logger.Trace("device spec", "spec", hclog.Fmt("% #v", pretty.Formatter(spec)))
|
||||
|
||||
// Convert the schema
|
||||
schema, diag := hclspec.Convert(spec)
|
||||
if diag.HasErrors() {
|
||||
errStr := "failed to convert HCL schema: "
|
||||
for _, err := range diag.Errs() {
|
||||
errStr = fmt.Sprintf("%s\n* %s", errStr, err.Error())
|
||||
}
|
||||
return nil, errors.New(errStr)
|
||||
}
|
||||
|
||||
return schema, nil
|
||||
}
|
||||
|
||||
func (c *Device) setConfig(spec hcldec.Spec, config []byte) error {
|
||||
// Parse the config into hcl
|
||||
configVal, err := hclConfigToInterface(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.logger.Trace("raw hcl config", "config", hclog.Fmt("% #v", pretty.Formatter(configVal)))
|
||||
|
||||
ctx := &hcl2.EvalContext{
|
||||
Functions: shared.GetStdlibFuncs(),
|
||||
}
|
||||
|
||||
val, diag := shared.ParseHclInterface(configVal, spec, ctx)
|
||||
if diag.HasErrors() {
|
||||
errStr := "failed to parse config"
|
||||
for _, err := range diag.Errs() {
|
||||
errStr = fmt.Sprintf("%s\n* %s", errStr, err.Error())
|
||||
}
|
||||
return errors.New(errStr)
|
||||
}
|
||||
c.logger.Trace("parsed hcl config", "config", hclog.Fmt("% #v", pretty.Formatter(val)))
|
||||
|
||||
cdata, err := msgpack.Marshal(val, val.Type())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.logger.Trace("msgpack config", "config", string(cdata))
|
||||
if err := c.dev.SetConfig(cdata); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func hclConfigToInterface(config []byte) (interface{}, error) {
|
||||
if len(config) == 0 {
|
||||
return map[string]interface{}{}, nil
|
||||
}
|
||||
|
||||
// Parse as we do in the jobspec parser
|
||||
root, err := hcl.Parse(string(config))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to hcl parse the config: %v", err)
|
||||
}
|
||||
|
||||
// Top-level item should be a list
|
||||
list, ok := root.Node.(*ast.ObjectList)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("root should be an object")
|
||||
}
|
||||
|
||||
var m map[string]interface{}
|
||||
if err := hcl.DecodeObject(&m, list.Items[0]); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode object: %v", err)
|
||||
}
|
||||
|
||||
return m["config"], nil
|
||||
}
|
||||
|
||||
func (c *Device) startRepl() error {
|
||||
// Start the output goroutine
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
fingerprint := make(chan context.Context)
|
||||
stats := make(chan context.Context)
|
||||
reserve := make(chan []string)
|
||||
go c.replOutput(ctx, fingerprint, stats, reserve)
|
||||
|
||||
c.Ui.Output("> Availabile commands are: exit(), fingerprint(), stop_fingerprint(), stats(), stop_stats(), reserve(id1, id2, ...)")
|
||||
var fingerprintCtx, statsCtx context.Context
|
||||
var fingerprintCancel, statsCancel context.CancelFunc
|
||||
for {
|
||||
in, err := c.Ui.Ask("> ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch {
|
||||
case in == "exit()":
|
||||
return nil
|
||||
case in == "fingerprint()":
|
||||
if fingerprintCtx != nil {
|
||||
continue
|
||||
}
|
||||
fingerprintCtx, fingerprintCancel = context.WithCancel(ctx)
|
||||
fingerprint <- fingerprintCtx
|
||||
case in == "stop_fingerprint()":
|
||||
if fingerprintCtx == nil {
|
||||
continue
|
||||
}
|
||||
fingerprintCancel()
|
||||
fingerprintCtx = nil
|
||||
case in == "stats()":
|
||||
if statsCtx != nil {
|
||||
continue
|
||||
}
|
||||
statsCtx, statsCancel = context.WithCancel(ctx)
|
||||
stats <- statsCtx
|
||||
case in == "stop_stats()":
|
||||
if statsCtx == nil {
|
||||
continue
|
||||
}
|
||||
statsCancel()
|
||||
statsCtx = nil
|
||||
case strings.HasPrefix(in, "reserve(") && strings.HasSuffix(in, ")"):
|
||||
listString := strings.TrimSuffix(strings.TrimPrefix(in, "reserve("), ")")
|
||||
ids := strings.Split(strings.TrimSpace(listString), ",")
|
||||
reserve <- ids
|
||||
default:
|
||||
c.Ui.Error(fmt.Sprintf("> Unknown command %q", in))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Device) replOutput(ctx context.Context, startFingerprint, startStats <-chan context.Context, reserve <-chan []string) {
|
||||
var fingerprint <-chan *device.FingerprintResponse
|
||||
var stats <-chan *device.StatsResponse
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case ctx := <-startFingerprint:
|
||||
var err error
|
||||
fingerprint, err = c.dev.Fingerprint(ctx)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("fingerprint: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
case resp, ok := <-fingerprint:
|
||||
if !ok {
|
||||
c.Ui.Output("> fingerprint: fingerprint output closed")
|
||||
fingerprint = nil
|
||||
continue
|
||||
}
|
||||
|
||||
if resp == nil {
|
||||
c.Ui.Warn("> fingerprint: received nil result")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
c.Ui.Output(fmt.Sprintf("> fingerprint: % #v", pretty.Formatter(resp)))
|
||||
case ctx := <-startStats:
|
||||
var err error
|
||||
stats, err = c.dev.Stats(ctx)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("stats: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
case resp, ok := <-stats:
|
||||
if !ok {
|
||||
c.Ui.Output("> stats: stats output closed")
|
||||
stats = nil
|
||||
continue
|
||||
}
|
||||
|
||||
if resp == nil {
|
||||
c.Ui.Warn("> stats: received nil result")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
c.Ui.Output(fmt.Sprintf("> stats: % #v", pretty.Formatter(resp)))
|
||||
case ids := <-reserve:
|
||||
resp, err := c.dev.Reserve(ids)
|
||||
if err != nil {
|
||||
c.Ui.Warn(fmt.Sprintf("> reserve(%s): %v", strings.Join(ids, ", "), err))
|
||||
} else {
|
||||
c.Ui.Output(fmt.Sprintf("> reserve(%s): % #v", strings.Join(ids, ", "), pretty.Formatter(resp)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
39
plugins/shared/cmd/launcher/command/meta.go
Normal file
39
plugins/shared/cmd/launcher/command/meta.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"strings"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
type Meta struct {
|
||||
Ui cli.Ui
|
||||
logger hclog.Logger
|
||||
|
||||
verbose bool
|
||||
}
|
||||
|
||||
func NewMeta(ui cli.Ui, logger hclog.Logger) Meta {
|
||||
return Meta{
|
||||
Ui: ui,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Meta) FlagSet(n string) *flag.FlagSet {
|
||||
f := flag.NewFlagSet(n, flag.ContinueOnError)
|
||||
|
||||
f.BoolVar(&m.verbose, "verbose", false, "Toggle verbose output")
|
||||
return f
|
||||
}
|
||||
|
||||
// generalOptionsUsage return the help string for the global options
|
||||
func generalOptionsUsage() string {
|
||||
helpText := `
|
||||
-verbose
|
||||
Enables verbose logging.
|
||||
`
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
41
plugins/shared/cmd/launcher/main.go
Normal file
41
plugins/shared/cmd/launcher/main.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/plugins/shared/cmd/launcher/command"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
const (
|
||||
NomadPluginLauncherCli = "nomad-plugin-launcher"
|
||||
NomadPluginLauncherCliVersion = "0.0.1"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ui := &cli.BasicUi{
|
||||
Reader: os.Stdin,
|
||||
Writer: os.Stdout,
|
||||
ErrorWriter: os.Stderr,
|
||||
}
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Name: NomadPluginLauncherCli,
|
||||
Output: &cli.UiWriter{Ui: ui},
|
||||
})
|
||||
|
||||
c := cli.NewCLI(NomadPluginLauncherCli, NomadPluginLauncherCliVersion)
|
||||
c.Args = os.Args[1:]
|
||||
|
||||
meta := command.NewMeta(ui, logger)
|
||||
c.Commands = map[string]cli.CommandFactory{
|
||||
"device": command.DeviceCommandFactory(meta),
|
||||
}
|
||||
|
||||
exitStatus, err := c.Run()
|
||||
if err != nil {
|
||||
logger.Error("command exited with non-zero status", "status", exitStatus, "error", err)
|
||||
}
|
||||
os.Exit(exitStatus)
|
||||
}
|
170
plugins/shared/hclspec/spec.go
Normal file
170
plugins/shared/hclspec/spec.go
Normal file
|
@ -0,0 +1,170 @@
|
|||
package hclspec
|
||||
|
||||
// ObjectSpec wraps the object and returns a spec.
|
||||
func ObjectSpec(obj *Object) *Spec {
|
||||
return &Spec{
|
||||
Block: &Spec_Object{
|
||||
Object: obj,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ArraySpec wraps the array and returns a spec.
|
||||
func ArraySpec(array *Array) *Spec {
|
||||
return &Spec{
|
||||
Block: &Spec_Array{
|
||||
Array: array,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// AttrSpec wraps the attr and returns a spec.
|
||||
func AttrSpec(attr *Attr) *Spec {
|
||||
return &Spec{
|
||||
Block: &Spec_Attr{
|
||||
Attr: attr,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// BlockSpec wraps the block and returns a spec.
|
||||
func BlockSpec(block *Block) *Spec {
|
||||
return &Spec{
|
||||
Block: &Spec_BlockValue{
|
||||
BlockValue: block,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// BlockListSpec wraps the block list and returns a spec.
|
||||
func BlockListSpec(blockList *BlockList) *Spec {
|
||||
return &Spec{
|
||||
Block: &Spec_BlockList{
|
||||
BlockList: blockList,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// BlockSetSpec wraps the block set and returns a spec.
|
||||
func BlockSetSpec(blockSet *BlockSet) *Spec {
|
||||
return &Spec{
|
||||
Block: &Spec_BlockSet{
|
||||
BlockSet: blockSet,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// BlockMapSpec wraps the block map and returns a spec.
|
||||
func BlockMapSpec(blockMap *BlockMap) *Spec {
|
||||
return &Spec{
|
||||
Block: &Spec_BlockMap{
|
||||
BlockMap: blockMap,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultSpec wraps the default and returns a spec.
|
||||
func DefaultSpec(d *Default) *Spec {
|
||||
return &Spec{
|
||||
Block: &Spec_Default{
|
||||
Default: d,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// LiteralSpec wraps the literal and returns a spec.
|
||||
func LiteralSpec(l *Literal) *Spec {
|
||||
return &Spec{
|
||||
Block: &Spec_Literal{
|
||||
Literal: l,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NewObject returns a new object spec.
|
||||
func NewObject(attrs map[string]*Spec) *Spec {
|
||||
return ObjectSpec(&Object{
|
||||
Attributes: attrs,
|
||||
})
|
||||
}
|
||||
|
||||
// NewAttr returns a new attribute spec.
|
||||
func NewAttr(name, attrType string, required bool) *Spec {
|
||||
return AttrSpec(&Attr{
|
||||
Name: name,
|
||||
Type: attrType,
|
||||
Required: required,
|
||||
})
|
||||
}
|
||||
|
||||
// NewBlock returns a new block spec.
|
||||
func NewBlock(name string, required bool, nested *Spec) *Spec {
|
||||
return BlockSpec(&Block{
|
||||
Name: name,
|
||||
Required: required,
|
||||
Nested: nested,
|
||||
})
|
||||
}
|
||||
|
||||
// NewBlockList returns a new block list spec that has no limits.
|
||||
func NewBlockList(name string, nested *Spec) *Spec {
|
||||
return NewBlockListLimited(name, 0, 0, nested)
|
||||
}
|
||||
|
||||
// NewBlockListLimited returns a new block list spec that limits the number of
|
||||
// blocks.
|
||||
func NewBlockListLimited(name string, min, max uint64, nested *Spec) *Spec {
|
||||
return BlockListSpec(&BlockList{
|
||||
Name: name,
|
||||
MinItems: min,
|
||||
MaxItems: max,
|
||||
Nested: nested,
|
||||
})
|
||||
}
|
||||
|
||||
// NewBlockSet returns a new block set spec that has no limits.
|
||||
func NewBlockSet(name string, nested *Spec) *Spec {
|
||||
return NewBlockSetLimited(name, 0, 0, nested)
|
||||
}
|
||||
|
||||
// NewBlockSetLimited returns a new block set spec that limits the number of
|
||||
// blocks.
|
||||
func NewBlockSetLimited(name string, min, max uint64, nested *Spec) *Spec {
|
||||
return BlockSetSpec(&BlockSet{
|
||||
Name: name,
|
||||
MinItems: min,
|
||||
MaxItems: max,
|
||||
Nested: nested,
|
||||
})
|
||||
}
|
||||
|
||||
// NewBlockMap returns a new block map spec.
|
||||
func NewBlockMap(name string, labels []string, nested *Spec) *Spec {
|
||||
return BlockMapSpec(&BlockMap{
|
||||
Name: name,
|
||||
Labels: labels,
|
||||
Nested: nested,
|
||||
})
|
||||
}
|
||||
|
||||
// NewLiteral returns a new literal spec.
|
||||
func NewLiteral(value string) *Spec {
|
||||
return LiteralSpec(&Literal{
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
|
||||
// NewDefault returns a new default spec.
|
||||
func NewDefault(primary, defaultValue *Spec) *Spec {
|
||||
return DefaultSpec(&Default{
|
||||
Primary: primary,
|
||||
Default: defaultValue,
|
||||
})
|
||||
}
|
||||
|
||||
// NewArray returns a new array spec.
|
||||
func NewArray(values []*Spec) *Spec {
|
||||
return ArraySpec(&Array{
|
||||
Values: values,
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue