Merge pull request #4591 from hashicorp/f-device-plugin

Initial device go-plugin
This commit is contained in:
Alex Dadgar 2018-08-20 15:19:26 -07:00 committed by GitHub
commit 49c56b1d79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1380 additions and 196 deletions

View File

@ -8,14 +8,14 @@ import (
"github.com/hashicorp/nomad/plugins/shared/hclspec" "github.com/hashicorp/nomad/plugins/shared/hclspec"
) )
// basePluginClient implements the client side of a remote base plugin, using // BasePluginClient implements the client side of a remote base plugin, using
// gRPC to communicate to the remote plugin. // gRPC to communicate to the remote plugin.
type basePluginClient struct { type BasePluginClient struct {
client proto.BasePluginClient Client proto.BasePluginClient
} }
func (b *basePluginClient) PluginInfo() (*PluginInfoResponse, error) { func (b *BasePluginClient) PluginInfo() (*PluginInfoResponse, error) {
presp, err := b.client.PluginInfo(context.Background(), &proto.PluginInfoRequest{}) presp, err := b.Client.PluginInfo(context.Background(), &proto.PluginInfoRequest{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -40,8 +40,8 @@ func (b *basePluginClient) PluginInfo() (*PluginInfoResponse, error) {
return resp, nil return resp, nil
} }
func (b *basePluginClient) ConfigSchema() (*hclspec.Spec, error) { func (b *BasePluginClient) ConfigSchema() (*hclspec.Spec, error) {
presp, err := b.client.ConfigSchema(context.Background(), &proto.ConfigSchemaRequest{}) presp, err := b.Client.ConfigSchema(context.Background(), &proto.ConfigSchemaRequest{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -49,9 +49,9 @@ func (b *basePluginClient) ConfigSchema() (*hclspec.Spec, error) {
return presp.GetSpec(), nil return presp.GetSpec(), nil
} }
func (b *basePluginClient) SetConfig(data []byte) error { func (b *BasePluginClient) SetConfig(data []byte) error {
// Send the config // Send the config
_, err := b.client.SetConfig(context.Background(), &proto.SetConfigRequest{ _, err := b.Client.SetConfig(context.Background(), &proto.SetConfigRequest{
MsgpackConfig: data, MsgpackConfig: data,
}) })

View File

@ -1,18 +0,0 @@
package base
import (
"github.com/hashicorp/nomad/plugins/shared/hclspec"
)
// MockPlugin is used for testing.
// Each function can be set as a closure to make assertions about how data
// is passed through the base plugin layer.
type MockPlugin struct {
PluginInfoF func() (*PluginInfoResponse, error)
ConfigSchemaF func() (*hclspec.Spec, error)
SetConfigF func([]byte) error
}
func (p *MockPlugin) PluginInfo() (*PluginInfoResponse, error) { return p.PluginInfoF() }
func (p *MockPlugin) ConfigSchema() (*hclspec.Spec, error) { return p.ConfigSchemaF() }
func (p *MockPlugin) SetConfig(data []byte) error { return p.SetConfigF(data) }

View File

@ -9,6 +9,9 @@ import (
) )
const ( const (
// PluginTypeBase implements the base plugin interface
PluginTypeBase = "base"
// PluginTypeDriver implements the driver plugin interface // PluginTypeDriver implements the driver plugin interface
PluginTypeDriver = "driver" PluginTypeDriver = "driver"
@ -29,17 +32,17 @@ var (
// interface to expose the interface over gRPC. // interface to expose the interface over gRPC.
type PluginBase struct { type PluginBase struct {
plugin.NetRPCUnsupportedPlugin plugin.NetRPCUnsupportedPlugin
impl BasePlugin Impl BasePlugin
} }
func (p *PluginBase) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { func (p *PluginBase) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
proto.RegisterBasePluginServer(s, &basePluginServer{ proto.RegisterBasePluginServer(s, &basePluginServer{
impl: p.impl, impl: p.Impl,
broker: broker, broker: broker,
}) })
return nil return nil
} }
func (p *PluginBase) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { func (p *PluginBase) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &basePluginClient{client: proto.NewBasePluginClient(c)}, nil return &BasePluginClient{Client: proto.NewBasePluginClient(c)}, nil
} }

View File

@ -12,48 +12,6 @@ import (
"github.com/zclconf/go-cty/cty/msgpack" "github.com/zclconf/go-cty/cty/msgpack"
) )
var (
// testSpec is an hcl Spec for testing
testSpec = &hclspec.Spec{
Block: &hclspec.Spec_Object{
Object: &hclspec.Object{
Attributes: map[string]*hclspec.Spec{
"foo": {
Block: &hclspec.Spec_Attr{
Attr: &hclspec.Attr{
Type: "string",
Required: false,
},
},
},
"bar": {
Block: &hclspec.Spec_Attr{
Attr: &hclspec.Attr{
Type: "number",
Required: true,
},
},
},
"baz": {
Block: &hclspec.Spec_Attr{
Attr: &hclspec.Attr{
Type: "bool",
},
},
},
},
},
},
}
)
// testConfig is used to decode a config from the testSpec
type testConfig struct {
Foo string `cty:"foo" codec:"foo"`
Bar int64 `cty:"bar" codec:"bar"`
Baz bool `cty:"baz" codec:"baz"`
}
func TestBasePlugin_PluginInfo_GRPC(t *testing.T) { func TestBasePlugin_PluginInfo_GRPC(t *testing.T) {
t.Parallel() t.Parallel()
require := require.New(t) require := require.New(t)
@ -88,12 +46,12 @@ func TestBasePlugin_PluginInfo_GRPC(t *testing.T) {
} }
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
"base": &PluginBase{impl: mock}, PluginTypeBase: &PluginBase{Impl: mock},
}) })
defer server.Stop() defer server.Stop()
defer client.Close() defer client.Close()
raw, err := client.Dispense("base") raw, err := client.Dispense(PluginTypeBase)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -123,17 +81,17 @@ func TestBasePlugin_ConfigSchema(t *testing.T) {
mock := &MockPlugin{ mock := &MockPlugin{
ConfigSchemaF: func() (*hclspec.Spec, error) { ConfigSchemaF: func() (*hclspec.Spec, error) {
return testSpec, nil return TestSpec, nil
}, },
} }
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
"base": &PluginBase{impl: mock}, PluginTypeBase: &PluginBase{Impl: mock},
}) })
defer server.Stop() defer server.Stop()
defer client.Close() defer client.Close()
raw, err := client.Dispense("base") raw, err := client.Dispense(PluginTypeBase)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -145,7 +103,7 @@ func TestBasePlugin_ConfigSchema(t *testing.T) {
specOut, err := impl.ConfigSchema() specOut, err := impl.ConfigSchema()
require.NoError(err) require.NoError(err)
require.True(pb.Equal(testSpec, specOut)) require.True(pb.Equal(TestSpec, specOut))
} }
func TestBasePlugin_SetConfig(t *testing.T) { func TestBasePlugin_SetConfig(t *testing.T) {
@ -155,7 +113,7 @@ func TestBasePlugin_SetConfig(t *testing.T) {
var receivedData []byte var receivedData []byte
mock := &MockPlugin{ mock := &MockPlugin{
ConfigSchemaF: func() (*hclspec.Spec, error) { ConfigSchemaF: func() (*hclspec.Spec, error) {
return testSpec, nil return TestSpec, nil
}, },
SetConfigF: func(data []byte) error { SetConfigF: func(data []byte) error {
receivedData = data receivedData = data
@ -164,12 +122,12 @@ func TestBasePlugin_SetConfig(t *testing.T) {
} }
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
"base": &PluginBase{impl: mock}, PluginTypeBase: &PluginBase{Impl: mock},
}) })
defer server.Stop() defer server.Stop()
defer client.Close() defer client.Close()
raw, err := client.Dispense("base") raw, err := client.Dispense(PluginTypeBase)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -190,7 +148,7 @@ func TestBasePlugin_SetConfig(t *testing.T) {
require.Equal(cdata, receivedData) require.Equal(cdata, receivedData)
// Decode the value back // Decode the value back
var actual testConfig var actual TestConfig
require.NoError(structs.Decode(receivedData, &actual)) require.NoError(structs.Decode(receivedData, &actual))
require.Equal("v1", actual.Foo) require.Equal("v1", actual.Foo)
require.EqualValues(1337, actual.Bar) require.EqualValues(1337, actual.Bar)

60
plugins/base/testing.go Normal file
View File

@ -0,0 +1,60 @@
package base
import (
"github.com/hashicorp/nomad/plugins/shared/hclspec"
)
var (
// TestSpec is an hcl Spec for testing
TestSpec = &hclspec.Spec{
Block: &hclspec.Spec_Object{
Object: &hclspec.Object{
Attributes: map[string]*hclspec.Spec{
"foo": {
Block: &hclspec.Spec_Attr{
Attr: &hclspec.Attr{
Type: "string",
Required: false,
},
},
},
"bar": {
Block: &hclspec.Spec_Attr{
Attr: &hclspec.Attr{
Type: "number",
Required: true,
},
},
},
"baz": {
Block: &hclspec.Spec_Attr{
Attr: &hclspec.Attr{
Type: "bool",
},
},
},
},
},
},
}
)
// TestConfig is used to decode a config from the TestSpec
type TestConfig struct {
Foo string `cty:"foo" codec:"foo"`
Bar int64 `cty:"bar" codec:"bar"`
Baz bool `cty:"baz" codec:"baz"`
}
// MockPlugin is used for testing.
// Each function can be set as a closure to make assertions about how data
// is passed through the base plugin layer.
type MockPlugin struct {
PluginInfoF func() (*PluginInfoResponse, error)
ConfigSchemaF func() (*hclspec.Spec, error)
SetConfigF func([]byte) error
}
func (p *MockPlugin) PluginInfo() (*PluginInfoResponse, error) { return p.PluginInfoF() }
func (p *MockPlugin) ConfigSchema() (*hclspec.Spec, error) { return p.ConfigSchemaF() }
func (p *MockPlugin) SetConfig(data []byte) error { return p.SetConfigF(data) }

88
plugins/device/client.go Normal file
View File

@ -0,0 +1,88 @@
package device
import (
"context"
"io"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/device/proto"
netctx "golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// devicePluginClient implements the client side of a remote device plugin, using
// gRPC to communicate to the remote plugin.
type devicePluginClient struct {
// basePluginClient is embedded to give access to the base plugin methods.
*base.BasePluginClient
client proto.DevicePluginClient
}
// Fingerprint is used to retrieve the set of devices and their health from the
// device plugin. An error may be immediately returned if the fingerprint call
// could not be made or as part of the streaming response. If the context is
// cancelled, the error will be propogated.
func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) {
var req proto.FingerprintRequest
stream, err := d.client.Fingerprint(ctx, &req)
if err != nil {
return nil, err
}
out := make(chan *FingerprintResponse, 1)
go d.handleFingerprint(ctx, stream, out)
return out, nil
}
// handleFingerprint should be launched in a goroutine and handles converting
// the gRPC stream to a channel. Exits either when context is cancelled or the
// stream has an error.
func (d *devicePluginClient) handleFingerprint(
ctx netctx.Context,
stream proto.DevicePlugin_FingerprintClient,
out chan *FingerprintResponse) {
for {
resp, err := stream.Recv()
if err != nil {
// Handle a non-graceful stream error
if err != io.EOF {
if errStatus := status.FromContextError(ctx.Err()); errStatus.Code() == codes.Canceled {
err = context.Canceled
}
out <- &FingerprintResponse{
Error: err,
}
}
// End the stream
close(out)
return
}
// Send the response
out <- &FingerprintResponse{
Devices: convertProtoDeviceGroups(resp.GetDeviceGroup()),
}
}
}
func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation, error) {
// Build the request
req := &proto.ReserveRequest{
DeviceIds: deviceIDs,
}
// Make the request
resp, err := d.client.Reserve(context.Background(), req)
if err != nil {
return nil, err
}
// Convert the response
out := convertProtoContainerReservation(resp.GetContainerRes())
return out, nil
}

114
plugins/device/device.go Normal file
View File

@ -0,0 +1,114 @@
package device
import (
"context"
"github.com/hashicorp/nomad/plugins/base"
)
const (
// DeviceTypeGPU is a canonical device type for a GPU.
DeviceTypeGPU = "gpu"
)
// DevicePlugin is the interface for a plugin that can expose detected devices
// to Nomad and inform it how to mount them.
type DevicePlugin interface {
base.BasePlugin
// Fingerprint returns a stream of devices that are detected.
Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error)
// Reserve is used to reserve a set of devices and retrieve mount
// instructions.
Reserve(deviceIDs []string) (*ContainerReservation, error)
}
// FingerprintResponse includes a set of detected devices or an error in the
// process of fingerprinting.
type FingerprintResponse struct {
// Devices is a set of devices that have been detected.
Devices []*DeviceGroup
// Error is populated when fingerprinting has failed.
Error error
}
// DeviceGroup is a grouping of devices that share a common vendor, device type
// and name.
type DeviceGroup struct {
// Vendor is the vendor providing the device (nvidia, intel, etc).
Vendor string
// Type is the type of the device (gpu, fpga, etc).
Type string
// Name is the devices model name.
Name string
// Devices is the set of device instances.
Devices []*Device
// Attributes are a set of attributes shared for all the devices.
Attributes map[string]string
}
// Device is an instance of a particular device.
type Device struct {
// ID is the identifier for the device.
ID string
// Healthy marks whether the device is healthy and can be used for
// scheduling.
Healthy bool
// HealthDesc describes why the device may be unhealthy.
HealthDesc string
// HwLocality captures hardware locality information for the device.
HwLocality *DeviceLocality
}
// DeviceLocality captures hardware locality information for a device.
type DeviceLocality struct {
// PciBusID is the PCI bus ID of the device.
PciBusID string
}
// ContainerReservation describes how to mount a device into a container. A
// container is an isolated environment that shares the host's OS.
type ContainerReservation struct {
// Envs are a set of environment variables to set for the task.
Envs map[string]string
// Mounts are used to mount host volumes into a container that may include
// libraries, etc.
Mounts []*Mount
// Devices are the set of devices to mount into the container.
Devices []*DeviceSpec
}
// Mount is used to mount a host directory into a container.
type Mount struct {
// TaskPath is the location in the task's file system to mount.
TaskPath string
// HostPath is the host directory path to mount.
HostPath string
// ReadOnly defines whether the mount should be read only to the task.
ReadOnly bool
}
// DeviceSpec captures how to mount a device into a container.
type DeviceSpec struct {
// TaskPath is the location to mount the device in the task's file system.
TaskPath string
// HostPath is the host location of the device.
HostPath string
// CgroupPerms defines the permissions to use when mounting the device.
CgroupPerms string
}

23
plugins/device/mock.go Normal file
View File

@ -0,0 +1,23 @@
package device
import (
"context"
"github.com/hashicorp/nomad/plugins/base"
)
// MockDevicePlugin is used for testing.
// Each function can be set as a closure to make assertions about how data
// is passed through the base plugin layer.
type MockDevicePlugin struct {
*base.MockPlugin
FingerprintF func(context.Context) (<-chan *FingerprintResponse, error)
ReserveF func([]string) (*ContainerReservation, error)
}
func (p *MockDevicePlugin) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) {
return p.FingerprintF(ctx)
}
func (p *MockDevicePlugin) Reserve(devices []string) (*ContainerReservation, error) {
return p.ReserveF(devices)
}

35
plugins/device/plugin.go Normal file
View File

@ -0,0 +1,35 @@
package device
import (
"context"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/plugins/base"
bproto "github.com/hashicorp/nomad/plugins/base/proto"
"github.com/hashicorp/nomad/plugins/device/proto"
"google.golang.org/grpc"
)
// PluginDevice is wraps a DevicePlugin and implements go-plugins GRPCPlugin
// interface to expose the interface over gRPC.
type PluginDevice struct {
plugin.NetRPCUnsupportedPlugin
Impl DevicePlugin
}
func (p *PluginDevice) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
proto.RegisterDevicePluginServer(s, &devicePluginServer{
impl: p.Impl,
broker: broker,
})
return nil
}
func (p *PluginDevice) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &devicePluginClient{
client: proto.NewDevicePluginClient(c),
BasePluginClient: &base.BasePluginClient{
Client: bproto.NewBasePluginClient(c),
},
}, nil
}

View File

@ -0,0 +1,445 @@
package device
import (
"context"
"fmt"
"testing"
"time"
pb "github.com/golang/protobuf/proto"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"github.com/zclconf/go-cty/cty"
"github.com/zclconf/go-cty/cty/msgpack"
"google.golang.org/grpc/status"
)
func TestDevicePlugin_PluginInfo(t *testing.T) {
t.Parallel()
require := require.New(t)
const (
apiVersion = "v0.1.0"
pluginVersion = "v0.2.1"
pluginName = "mock"
)
knownType := func() (*base.PluginInfoResponse, error) {
info := &base.PluginInfoResponse{
Type: base.PluginTypeDevice,
PluginApiVersion: apiVersion,
PluginVersion: pluginVersion,
Name: pluginName,
}
return info, nil
}
unknownType := func() (*base.PluginInfoResponse, error) {
info := &base.PluginInfoResponse{
Type: "bad",
PluginApiVersion: apiVersion,
PluginVersion: pluginVersion,
Name: pluginName,
}
return info, nil
}
mock := &MockDevicePlugin{
MockPlugin: &base.MockPlugin{
PluginInfoF: knownType,
},
}
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
base.PluginTypeBase: &base.PluginBase{Impl: mock},
base.PluginTypeDevice: &PluginDevice{Impl: mock},
})
defer server.Stop()
defer client.Close()
raw, err := client.Dispense(base.PluginTypeDevice)
if err != nil {
t.Fatalf("err: %s", err)
}
impl, ok := raw.(DevicePlugin)
if !ok {
t.Fatalf("bad: %#v", raw)
}
resp, err := impl.PluginInfo()
require.NoError(err)
require.Equal(apiVersion, resp.PluginApiVersion)
require.Equal(pluginVersion, resp.PluginVersion)
require.Equal(pluginName, resp.Name)
require.Equal(base.PluginTypeDevice, resp.Type)
// Swap the implementation to return an unknown type
mock.PluginInfoF = unknownType
_, err = impl.PluginInfo()
require.Error(err)
require.Contains(err.Error(), "unknown type")
}
func TestDevicePlugin_ConfigSchema(t *testing.T) {
t.Parallel()
require := require.New(t)
mock := &MockDevicePlugin{
MockPlugin: &base.MockPlugin{
ConfigSchemaF: func() (*hclspec.Spec, error) {
return base.TestSpec, nil
},
},
}
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
base.PluginTypeBase: &base.PluginBase{Impl: mock},
base.PluginTypeDevice: &PluginDevice{Impl: mock},
})
defer server.Stop()
defer client.Close()
raw, err := client.Dispense(base.PluginTypeDevice)
if err != nil {
t.Fatalf("err: %s", err)
}
impl, ok := raw.(DevicePlugin)
if !ok {
t.Fatalf("bad: %#v", raw)
}
specOut, err := impl.ConfigSchema()
require.NoError(err)
require.True(pb.Equal(base.TestSpec, specOut))
}
func TestDevicePlugin_SetConfig(t *testing.T) {
t.Parallel()
require := require.New(t)
var receivedData []byte
mock := &MockDevicePlugin{
MockPlugin: &base.MockPlugin{
ConfigSchemaF: func() (*hclspec.Spec, error) {
return base.TestSpec, nil
},
SetConfigF: func(data []byte) error {
receivedData = data
return nil
},
},
}
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
base.PluginTypeBase: &base.PluginBase{Impl: mock},
base.PluginTypeDevice: &PluginDevice{Impl: mock},
})
defer server.Stop()
defer client.Close()
raw, err := client.Dispense(base.PluginTypeDevice)
if err != nil {
t.Fatalf("err: %s", err)
}
impl, ok := raw.(DevicePlugin)
if !ok {
t.Fatalf("bad: %#v", raw)
}
config := cty.ObjectVal(map[string]cty.Value{
"foo": cty.StringVal("v1"),
"bar": cty.NumberIntVal(1337),
"baz": cty.BoolVal(true),
})
cdata, err := msgpack.Marshal(config, config.Type())
require.NoError(err)
require.NoError(impl.SetConfig(cdata))
require.Equal(cdata, receivedData)
// Decode the value back
var actual base.TestConfig
require.NoError(structs.Decode(receivedData, &actual))
require.Equal("v1", actual.Foo)
require.EqualValues(1337, actual.Bar)
require.True(actual.Baz)
}
func TestDevicePlugin_Fingerprint(t *testing.T) {
t.Parallel()
require := require.New(t)
devices1 := []*DeviceGroup{
{
Vendor: "nvidia",
Type: DeviceTypeGPU,
Name: "foo",
},
}
devices2 := []*DeviceGroup{
{
Vendor: "nvidia",
Type: DeviceTypeGPU,
Name: "foo",
},
{
Vendor: "nvidia",
Type: DeviceTypeGPU,
Name: "bar",
},
}
mock := &MockDevicePlugin{
FingerprintF: func(ctx context.Context) (<-chan *FingerprintResponse, error) {
outCh := make(chan *FingerprintResponse, 1)
go func() {
// Send two messages
for _, devs := range [][]*DeviceGroup{devices1, devices2} {
select {
case <-ctx.Done():
return
case outCh <- &FingerprintResponse{Devices: devs}:
}
}
close(outCh)
return
}()
return outCh, nil
},
}
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
base.PluginTypeBase: &base.PluginBase{Impl: mock},
base.PluginTypeDevice: &PluginDevice{Impl: mock},
})
defer server.Stop()
defer client.Close()
raw, err := client.Dispense(base.PluginTypeDevice)
if err != nil {
t.Fatalf("err: %s", err)
}
impl, ok := raw.(DevicePlugin)
if !ok {
t.Fatalf("bad: %#v", raw)
}
// Create a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Get the stream
stream, err := impl.Fingerprint(ctx)
require.NoError(err)
// Get the first message
var first *FingerprintResponse
select {
case <-time.After(1 * time.Second):
t.Fatal("timeout")
case first = <-stream:
}
require.NoError(first.Error)
require.EqualValues(devices1, first.Devices)
// Get the second message
var second *FingerprintResponse
select {
case <-time.After(1 * time.Second):
t.Fatal("timeout")
case second = <-stream:
}
require.NoError(second.Error)
require.EqualValues(devices2, second.Devices)
select {
case _, ok := <-stream:
require.False(ok)
case <-time.After(1 * time.Second):
t.Fatal("stream should be closed")
}
}
func TestDevicePlugin_Fingerprint_StreamErr(t *testing.T) {
t.Parallel()
require := require.New(t)
ferr := fmt.Errorf("mock fingerprinting failed")
mock := &MockDevicePlugin{
FingerprintF: func(ctx context.Context) (<-chan *FingerprintResponse, error) {
outCh := make(chan *FingerprintResponse, 1)
go func() {
// Send the error
select {
case <-ctx.Done():
return
case outCh <- &FingerprintResponse{Error: ferr}:
}
close(outCh)
return
}()
return outCh, nil
},
}
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
base.PluginTypeBase: &base.PluginBase{Impl: mock},
base.PluginTypeDevice: &PluginDevice{Impl: mock},
})
defer server.Stop()
defer client.Close()
raw, err := client.Dispense(base.PluginTypeDevice)
if err != nil {
t.Fatalf("err: %s", err)
}
impl, ok := raw.(DevicePlugin)
if !ok {
t.Fatalf("bad: %#v", raw)
}
// Create a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Get the stream
stream, err := impl.Fingerprint(ctx)
require.NoError(err)
// Get the first message
var first *FingerprintResponse
select {
case <-time.After(1 * time.Second):
t.Fatal("timeout")
case first = <-stream:
}
errStatus := status.Convert(ferr)
require.EqualError(first.Error, errStatus.Err().Error())
}
func TestDevicePlugin_Fingerprint_CancelCtx(t *testing.T) {
t.Parallel()
require := require.New(t)
mock := &MockDevicePlugin{
FingerprintF: func(ctx context.Context) (<-chan *FingerprintResponse, error) {
outCh := make(chan *FingerprintResponse, 1)
go func() {
<-ctx.Done()
close(outCh)
return
}()
return outCh, nil
},
}
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
base.PluginTypeBase: &base.PluginBase{Impl: mock},
base.PluginTypeDevice: &PluginDevice{Impl: mock},
})
defer server.Stop()
defer client.Close()
raw, err := client.Dispense(base.PluginTypeDevice)
if err != nil {
t.Fatalf("err: %s", err)
}
impl, ok := raw.(DevicePlugin)
if !ok {
t.Fatalf("bad: %#v", raw)
}
// Create a context
ctx, cancel := context.WithCancel(context.Background())
// Get the stream
stream, err := impl.Fingerprint(ctx)
require.NoError(err)
// Get the first message
select {
case <-time.After(testutil.Timeout(10 * time.Millisecond)):
case _ = <-stream:
t.Fatal("bad value")
}
// Cancel the context
cancel()
// Make sure we are done
select {
case <-time.After(100 * time.Millisecond):
t.Fatalf("timeout")
case v := <-stream:
require.Error(v.Error)
require.EqualError(v.Error, context.Canceled.Error())
}
}
func TestDevicePlugin_Reserve(t *testing.T) {
t.Parallel()
require := require.New(t)
reservation := &ContainerReservation{
Envs: map[string]string{
"foo": "bar",
},
Mounts: []*Mount{
{
TaskPath: "foo",
HostPath: "bar",
ReadOnly: true,
},
},
Devices: []*DeviceSpec{
{
TaskPath: "foo",
HostPath: "bar",
CgroupPerms: "rx",
},
},
}
var received []string
mock := &MockDevicePlugin{
ReserveF: func(devices []string) (*ContainerReservation, error) {
received = devices
return reservation, nil
},
}
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
base.PluginTypeBase: &base.PluginBase{Impl: mock},
base.PluginTypeDevice: &PluginDevice{Impl: mock},
})
defer server.Stop()
defer client.Close()
raw, err := client.Dispense(base.PluginTypeDevice)
if err != nil {
t.Fatalf("err: %s", err)
}
impl, ok := raw.(DevicePlugin)
if !ok {
t.Fatalf("bad: %#v", raw)
}
req := []string{"a", "b"}
containerRes, err := impl.Reserve(req)
require.NoError(err)
require.EqualValues(req, received)
require.EqualValues(reservation, containerRes)
}

View File

@ -1,12 +1,11 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// source: github.com/hashicorp/nomad/plugins/device/device.proto // source: github.com/hashicorp/nomad/plugins/device/proto/device.proto
package device package proto
import proto "github.com/golang/protobuf/proto" import proto "github.com/golang/protobuf/proto"
import fmt "fmt" import fmt "fmt"
import math "math" import math "math"
import empty "github.com/golang/protobuf/ptypes/empty"
import ( import (
context "golang.org/x/net/context" context "golang.org/x/net/context"
@ -24,9 +23,81 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// DetectedDevices is the set of devices that the device plugin has // FingerprintRequest is used to request for devices to be fingerprinted.
// detected and is exposing type FingerprintRequest struct {
type DetectedDevices struct { XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FingerprintRequest) Reset() { *m = FingerprintRequest{} }
func (m *FingerprintRequest) String() string { return proto.CompactTextString(m) }
func (*FingerprintRequest) ProtoMessage() {}
func (*FingerprintRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_device_7496b084f8b5ea81, []int{0}
}
func (m *FingerprintRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FingerprintRequest.Unmarshal(m, b)
}
func (m *FingerprintRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FingerprintRequest.Marshal(b, m, deterministic)
}
func (dst *FingerprintRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_FingerprintRequest.Merge(dst, src)
}
func (m *FingerprintRequest) XXX_Size() int {
return xxx_messageInfo_FingerprintRequest.Size(m)
}
func (m *FingerprintRequest) XXX_DiscardUnknown() {
xxx_messageInfo_FingerprintRequest.DiscardUnknown(m)
}
var xxx_messageInfo_FingerprintRequest proto.InternalMessageInfo
// FingerprintResponse returns a set of detected devices.
type FingerprintResponse struct {
// device_group is a group of devices that share a vendor, device_type, and
// device_name. This is returned as a set so that a single plugin could
// potentially detect several device types and models.
DeviceGroup []*DeviceGroup `protobuf:"bytes,1,rep,name=device_group,json=deviceGroup,proto3" json:"device_group,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FingerprintResponse) Reset() { *m = FingerprintResponse{} }
func (m *FingerprintResponse) String() string { return proto.CompactTextString(m) }
func (*FingerprintResponse) ProtoMessage() {}
func (*FingerprintResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_device_7496b084f8b5ea81, []int{1}
}
func (m *FingerprintResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FingerprintResponse.Unmarshal(m, b)
}
func (m *FingerprintResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FingerprintResponse.Marshal(b, m, deterministic)
}
func (dst *FingerprintResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_FingerprintResponse.Merge(dst, src)
}
func (m *FingerprintResponse) XXX_Size() int {
return xxx_messageInfo_FingerprintResponse.Size(m)
}
func (m *FingerprintResponse) XXX_DiscardUnknown() {
xxx_messageInfo_FingerprintResponse.DiscardUnknown(m)
}
var xxx_messageInfo_FingerprintResponse proto.InternalMessageInfo
func (m *FingerprintResponse) GetDeviceGroup() []*DeviceGroup {
if m != nil {
return m.DeviceGroup
}
return nil
}
// DeviceGroup is a group of devices that share a vendor, device type and name.
type DeviceGroup struct {
// vendor is the name of the vendor of the device // vendor is the name of the vendor of the device
Vendor string `protobuf:"bytes,1,opt,name=vendor,proto3" json:"vendor,omitempty"` Vendor string `protobuf:"bytes,1,opt,name=vendor,proto3" json:"vendor,omitempty"`
// device_type is the type of the device (gpu, fpga, etc). // device_type is the type of the device (gpu, fpga, etc).
@ -35,96 +106,96 @@ type DetectedDevices struct {
DeviceName string `protobuf:"bytes,3,opt,name=device_name,json=deviceName,proto3" json:"device_name,omitempty"` DeviceName string `protobuf:"bytes,3,opt,name=device_name,json=deviceName,proto3" json:"device_name,omitempty"`
// devices is the set of devices detected by the plugin. // devices is the set of devices detected by the plugin.
Devices []*DetectedDevice `protobuf:"bytes,4,rep,name=devices,proto3" json:"devices,omitempty"` Devices []*DetectedDevice `protobuf:"bytes,4,rep,name=devices,proto3" json:"devices,omitempty"`
// node_attributes allows adding node attributes to be used for // attributes allows adding attributes to be used for constraints or
// constraints or affinities. // affinities.
NodeAttributes map[string]string `protobuf:"bytes,5,rep,name=node_attributes,json=nodeAttributes,proto3" json:"node_attributes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Attributes map[string]string `protobuf:"bytes,5,rep,name=attributes,proto3" json:"attributes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *DetectedDevices) Reset() { *m = DetectedDevices{} } func (m *DeviceGroup) Reset() { *m = DeviceGroup{} }
func (m *DetectedDevices) String() string { return proto.CompactTextString(m) } func (m *DeviceGroup) String() string { return proto.CompactTextString(m) }
func (*DetectedDevices) ProtoMessage() {} func (*DeviceGroup) ProtoMessage() {}
func (*DetectedDevices) Descriptor() ([]byte, []int) { func (*DeviceGroup) Descriptor() ([]byte, []int) {
return fileDescriptor_device_13acb8ec0117c3b0, []int{0} return fileDescriptor_device_7496b084f8b5ea81, []int{2}
} }
func (m *DetectedDevices) XXX_Unmarshal(b []byte) error { func (m *DeviceGroup) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DetectedDevices.Unmarshal(m, b) return xxx_messageInfo_DeviceGroup.Unmarshal(m, b)
} }
func (m *DetectedDevices) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *DeviceGroup) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DetectedDevices.Marshal(b, m, deterministic) return xxx_messageInfo_DeviceGroup.Marshal(b, m, deterministic)
} }
func (dst *DetectedDevices) XXX_Merge(src proto.Message) { func (dst *DeviceGroup) XXX_Merge(src proto.Message) {
xxx_messageInfo_DetectedDevices.Merge(dst, src) xxx_messageInfo_DeviceGroup.Merge(dst, src)
} }
func (m *DetectedDevices) XXX_Size() int { func (m *DeviceGroup) XXX_Size() int {
return xxx_messageInfo_DetectedDevices.Size(m) return xxx_messageInfo_DeviceGroup.Size(m)
} }
func (m *DetectedDevices) XXX_DiscardUnknown() { func (m *DeviceGroup) XXX_DiscardUnknown() {
xxx_messageInfo_DetectedDevices.DiscardUnknown(m) xxx_messageInfo_DeviceGroup.DiscardUnknown(m)
} }
var xxx_messageInfo_DetectedDevices proto.InternalMessageInfo var xxx_messageInfo_DeviceGroup proto.InternalMessageInfo
func (m *DetectedDevices) GetVendor() string { func (m *DeviceGroup) GetVendor() string {
if m != nil { if m != nil {
return m.Vendor return m.Vendor
} }
return "" return ""
} }
func (m *DetectedDevices) GetDeviceType() string { func (m *DeviceGroup) GetDeviceType() string {
if m != nil { if m != nil {
return m.DeviceType return m.DeviceType
} }
return "" return ""
} }
func (m *DetectedDevices) GetDeviceName() string { func (m *DeviceGroup) GetDeviceName() string {
if m != nil { if m != nil {
return m.DeviceName return m.DeviceName
} }
return "" return ""
} }
func (m *DetectedDevices) GetDevices() []*DetectedDevice { func (m *DeviceGroup) GetDevices() []*DetectedDevice {
if m != nil { if m != nil {
return m.Devices return m.Devices
} }
return nil return nil
} }
func (m *DetectedDevices) GetNodeAttributes() map[string]string { func (m *DeviceGroup) GetAttributes() map[string]string {
if m != nil { if m != nil {
return m.NodeAttributes return m.Attributes
} }
return nil return nil
} }
// DetectedDevice is a single detected device. // DetectedDevice is a single detected device.
type DetectedDevice struct { type DetectedDevice struct {
// ID is the ID of the device. This ID is used during // ID is the ID of the device. This ID is used during allocation and must be
// allocation. // stable across restarts of the device driver.
ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"` ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"`
// Health of the device. // Health of the device.
Healthy bool `protobuf:"varint,2,opt,name=healthy,proto3" json:"healthy,omitempty"` Healthy bool `protobuf:"varint,2,opt,name=healthy,proto3" json:"healthy,omitempty"`
// health_description allows the device plugin to optionally // health_description allows the device plugin to optionally
// annotate the health field with a human readable reason. // annotate the health field with a human readable reason.
HealthDescription string `protobuf:"bytes,3,opt,name=health_description,json=healthDescription,proto3" json:"health_description,omitempty"` HealthDescription string `protobuf:"bytes,3,opt,name=health_description,json=healthDescription,proto3" json:"health_description,omitempty"`
// pci_bus_id is the PCI bus ID for the device. If reported, it // hw_locality is optionally set to expose hardware locality information for
// allows Nomad to make NUMA aware optimizations. // more optimal placement decisions.
PciBusId string `protobuf:"bytes,4,opt,name=pci_bus_id,json=pciBusId,proto3" json:"pci_bus_id,omitempty"` HwLocality *DeviceLocality `protobuf:"bytes,4,opt,name=hw_locality,json=hwLocality,proto3" json:"hw_locality,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *DetectedDevice) Reset() { *m = DetectedDevice{} } func (m *DetectedDevice) Reset() { *m = DetectedDevice{} }
func (m *DetectedDevice) String() string { return proto.CompactTextString(m) } func (m *DetectedDevice) String() string { return proto.CompactTextString(m) }
func (*DetectedDevice) ProtoMessage() {} func (*DetectedDevice) ProtoMessage() {}
func (*DetectedDevice) Descriptor() ([]byte, []int) { func (*DetectedDevice) Descriptor() ([]byte, []int) {
return fileDescriptor_device_13acb8ec0117c3b0, []int{1} return fileDescriptor_device_7496b084f8b5ea81, []int{3}
} }
func (m *DetectedDevice) XXX_Unmarshal(b []byte) error { func (m *DetectedDevice) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DetectedDevice.Unmarshal(m, b) return xxx_messageInfo_DetectedDevice.Unmarshal(m, b)
@ -165,7 +236,48 @@ func (m *DetectedDevice) GetHealthDescription() string {
return "" return ""
} }
func (m *DetectedDevice) GetPciBusId() string { func (m *DetectedDevice) GetHwLocality() *DeviceLocality {
if m != nil {
return m.HwLocality
}
return nil
}
// DeviceLocality is used to expose HW locality information about a device.
type DeviceLocality struct {
// pci_bus_id is the PCI bus ID for the device. If reported, it
// allows Nomad to make NUMA aware optimizations.
PciBusId string `protobuf:"bytes,1,opt,name=pci_bus_id,json=pciBusId,proto3" json:"pci_bus_id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DeviceLocality) Reset() { *m = DeviceLocality{} }
func (m *DeviceLocality) String() string { return proto.CompactTextString(m) }
func (*DeviceLocality) ProtoMessage() {}
func (*DeviceLocality) Descriptor() ([]byte, []int) {
return fileDescriptor_device_7496b084f8b5ea81, []int{4}
}
func (m *DeviceLocality) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DeviceLocality.Unmarshal(m, b)
}
func (m *DeviceLocality) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DeviceLocality.Marshal(b, m, deterministic)
}
func (dst *DeviceLocality) XXX_Merge(src proto.Message) {
xxx_messageInfo_DeviceLocality.Merge(dst, src)
}
func (m *DeviceLocality) XXX_Size() int {
return xxx_messageInfo_DeviceLocality.Size(m)
}
func (m *DeviceLocality) XXX_DiscardUnknown() {
xxx_messageInfo_DeviceLocality.DiscardUnknown(m)
}
var xxx_messageInfo_DeviceLocality proto.InternalMessageInfo
func (m *DeviceLocality) GetPciBusId() string {
if m != nil { if m != nil {
return m.PciBusId return m.PciBusId
} }
@ -186,7 +298,7 @@ func (m *ReserveRequest) Reset() { *m = ReserveRequest{} }
func (m *ReserveRequest) String() string { return proto.CompactTextString(m) } func (m *ReserveRequest) String() string { return proto.CompactTextString(m) }
func (*ReserveRequest) ProtoMessage() {} func (*ReserveRequest) ProtoMessage() {}
func (*ReserveRequest) Descriptor() ([]byte, []int) { func (*ReserveRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_device_13acb8ec0117c3b0, []int{2} return fileDescriptor_device_7496b084f8b5ea81, []int{5}
} }
func (m *ReserveRequest) XXX_Unmarshal(b []byte) error { func (m *ReserveRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReserveRequest.Unmarshal(m, b) return xxx_messageInfo_ReserveRequest.Unmarshal(m, b)
@ -229,7 +341,7 @@ func (m *ReserveResponse) Reset() { *m = ReserveResponse{} }
func (m *ReserveResponse) String() string { return proto.CompactTextString(m) } func (m *ReserveResponse) String() string { return proto.CompactTextString(m) }
func (*ReserveResponse) ProtoMessage() {} func (*ReserveResponse) ProtoMessage() {}
func (*ReserveResponse) Descriptor() ([]byte, []int) { func (*ReserveResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_device_13acb8ec0117c3b0, []int{3} return fileDescriptor_device_7496b084f8b5ea81, []int{6}
} }
func (m *ReserveResponse) XXX_Unmarshal(b []byte) error { func (m *ReserveResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReserveResponse.Unmarshal(m, b) return xxx_messageInfo_ReserveResponse.Unmarshal(m, b)
@ -274,7 +386,7 @@ func (m *ContainerReservation) Reset() { *m = ContainerReservation{} }
func (m *ContainerReservation) String() string { return proto.CompactTextString(m) } func (m *ContainerReservation) String() string { return proto.CompactTextString(m) }
func (*ContainerReservation) ProtoMessage() {} func (*ContainerReservation) ProtoMessage() {}
func (*ContainerReservation) Descriptor() ([]byte, []int) { func (*ContainerReservation) Descriptor() ([]byte, []int) {
return fileDescriptor_device_13acb8ec0117c3b0, []int{4} return fileDescriptor_device_7496b084f8b5ea81, []int{7}
} }
func (m *ContainerReservation) XXX_Unmarshal(b []byte) error { func (m *ContainerReservation) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ContainerReservation.Unmarshal(m, b) return xxx_messageInfo_ContainerReservation.Unmarshal(m, b)
@ -333,7 +445,7 @@ func (m *Mount) Reset() { *m = Mount{} }
func (m *Mount) String() string { return proto.CompactTextString(m) } func (m *Mount) String() string { return proto.CompactTextString(m) }
func (*Mount) ProtoMessage() {} func (*Mount) ProtoMessage() {}
func (*Mount) Descriptor() ([]byte, []int) { func (*Mount) Descriptor() ([]byte, []int) {
return fileDescriptor_device_13acb8ec0117c3b0, []int{5} return fileDescriptor_device_7496b084f8b5ea81, []int{8}
} }
func (m *Mount) XXX_Unmarshal(b []byte) error { func (m *Mount) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Mount.Unmarshal(m, b) return xxx_messageInfo_Mount.Unmarshal(m, b)
@ -394,7 +506,7 @@ func (m *DeviceSpec) Reset() { *m = DeviceSpec{} }
func (m *DeviceSpec) String() string { return proto.CompactTextString(m) } func (m *DeviceSpec) String() string { return proto.CompactTextString(m) }
func (*DeviceSpec) ProtoMessage() {} func (*DeviceSpec) ProtoMessage() {}
func (*DeviceSpec) Descriptor() ([]byte, []int) { func (*DeviceSpec) Descriptor() ([]byte, []int) {
return fileDescriptor_device_13acb8ec0117c3b0, []int{6} return fileDescriptor_device_7496b084f8b5ea81, []int{9}
} }
func (m *DeviceSpec) XXX_Unmarshal(b []byte) error { func (m *DeviceSpec) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DeviceSpec.Unmarshal(m, b) return xxx_messageInfo_DeviceSpec.Unmarshal(m, b)
@ -436,9 +548,12 @@ func (m *DeviceSpec) GetPermissions() string {
} }
func init() { func init() {
proto.RegisterType((*DetectedDevices)(nil), "hashicorp.nomad.plugins.device.DetectedDevices") proto.RegisterType((*FingerprintRequest)(nil), "hashicorp.nomad.plugins.device.FingerprintRequest")
proto.RegisterMapType((map[string]string)(nil), "hashicorp.nomad.plugins.device.DetectedDevices.NodeAttributesEntry") proto.RegisterType((*FingerprintResponse)(nil), "hashicorp.nomad.plugins.device.FingerprintResponse")
proto.RegisterType((*DeviceGroup)(nil), "hashicorp.nomad.plugins.device.DeviceGroup")
proto.RegisterMapType((map[string]string)(nil), "hashicorp.nomad.plugins.device.DeviceGroup.AttributesEntry")
proto.RegisterType((*DetectedDevice)(nil), "hashicorp.nomad.plugins.device.DetectedDevice") proto.RegisterType((*DetectedDevice)(nil), "hashicorp.nomad.plugins.device.DetectedDevice")
proto.RegisterType((*DeviceLocality)(nil), "hashicorp.nomad.plugins.device.DeviceLocality")
proto.RegisterType((*ReserveRequest)(nil), "hashicorp.nomad.plugins.device.ReserveRequest") proto.RegisterType((*ReserveRequest)(nil), "hashicorp.nomad.plugins.device.ReserveRequest")
proto.RegisterType((*ReserveResponse)(nil), "hashicorp.nomad.plugins.device.ReserveResponse") proto.RegisterType((*ReserveResponse)(nil), "hashicorp.nomad.plugins.device.ReserveResponse")
proto.RegisterType((*ContainerReservation)(nil), "hashicorp.nomad.plugins.device.ContainerReservation") proto.RegisterType((*ContainerReservation)(nil), "hashicorp.nomad.plugins.device.ContainerReservation")
@ -462,7 +577,7 @@ type DevicePluginClient interface {
// Fingerprint allows the device plugin to return a set of // Fingerprint allows the device plugin to return a set of
// detected devices and provide a mechanism to update the state of // detected devices and provide a mechanism to update the state of
// the device. // the device.
Fingerprint(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (DevicePlugin_FingerprintClient, error) Fingerprint(ctx context.Context, in *FingerprintRequest, opts ...grpc.CallOption) (DevicePlugin_FingerprintClient, error)
// Reserve is called by the client before starting an allocation // Reserve is called by the client before starting an allocation
// that requires access to the plugins devices. The plugin can use // that requires access to the plugins devices. The plugin can use
// this to run any setup steps and provides the mounting details to // this to run any setup steps and provides the mounting details to
@ -478,7 +593,7 @@ func NewDevicePluginClient(cc *grpc.ClientConn) DevicePluginClient {
return &devicePluginClient{cc} return &devicePluginClient{cc}
} }
func (c *devicePluginClient) Fingerprint(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (DevicePlugin_FingerprintClient, error) { func (c *devicePluginClient) Fingerprint(ctx context.Context, in *FingerprintRequest, opts ...grpc.CallOption) (DevicePlugin_FingerprintClient, error) {
stream, err := c.cc.NewStream(ctx, &_DevicePlugin_serviceDesc.Streams[0], "/hashicorp.nomad.plugins.device.DevicePlugin/Fingerprint", opts...) stream, err := c.cc.NewStream(ctx, &_DevicePlugin_serviceDesc.Streams[0], "/hashicorp.nomad.plugins.device.DevicePlugin/Fingerprint", opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -494,7 +609,7 @@ func (c *devicePluginClient) Fingerprint(ctx context.Context, in *empty.Empty, o
} }
type DevicePlugin_FingerprintClient interface { type DevicePlugin_FingerprintClient interface {
Recv() (*DetectedDevices, error) Recv() (*FingerprintResponse, error)
grpc.ClientStream grpc.ClientStream
} }
@ -502,8 +617,8 @@ type devicePluginFingerprintClient struct {
grpc.ClientStream grpc.ClientStream
} }
func (x *devicePluginFingerprintClient) Recv() (*DetectedDevices, error) { func (x *devicePluginFingerprintClient) Recv() (*FingerprintResponse, error) {
m := new(DetectedDevices) m := new(FingerprintResponse)
if err := x.ClientStream.RecvMsg(m); err != nil { if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err return nil, err
} }
@ -524,7 +639,7 @@ type DevicePluginServer interface {
// Fingerprint allows the device plugin to return a set of // Fingerprint allows the device plugin to return a set of
// detected devices and provide a mechanism to update the state of // detected devices and provide a mechanism to update the state of
// the device. // the device.
Fingerprint(*empty.Empty, DevicePlugin_FingerprintServer) error Fingerprint(*FingerprintRequest, DevicePlugin_FingerprintServer) error
// Reserve is called by the client before starting an allocation // Reserve is called by the client before starting an allocation
// that requires access to the plugins devices. The plugin can use // that requires access to the plugins devices. The plugin can use
// this to run any setup steps and provides the mounting details to // this to run any setup steps and provides the mounting details to
@ -537,7 +652,7 @@ func RegisterDevicePluginServer(s *grpc.Server, srv DevicePluginServer) {
} }
func _DevicePlugin_Fingerprint_Handler(srv interface{}, stream grpc.ServerStream) error { func _DevicePlugin_Fingerprint_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(empty.Empty) m := new(FingerprintRequest)
if err := stream.RecvMsg(m); err != nil { if err := stream.RecvMsg(m); err != nil {
return err return err
} }
@ -545,7 +660,7 @@ func _DevicePlugin_Fingerprint_Handler(srv interface{}, stream grpc.ServerStream
} }
type DevicePlugin_FingerprintServer interface { type DevicePlugin_FingerprintServer interface {
Send(*DetectedDevices) error Send(*FingerprintResponse) error
grpc.ServerStream grpc.ServerStream
} }
@ -553,7 +668,7 @@ type devicePluginFingerprintServer struct {
grpc.ServerStream grpc.ServerStream
} }
func (x *devicePluginFingerprintServer) Send(m *DetectedDevices) error { func (x *devicePluginFingerprintServer) Send(m *FingerprintResponse) error {
return x.ServerStream.SendMsg(m) return x.ServerStream.SendMsg(m)
} }
@ -591,54 +706,56 @@ var _DevicePlugin_serviceDesc = grpc.ServiceDesc{
ServerStreams: true, ServerStreams: true,
}, },
}, },
Metadata: "github.com/hashicorp/nomad/plugins/device/device.proto", Metadata: "github.com/hashicorp/nomad/plugins/device/proto/device.proto",
} }
func init() { func init() {
proto.RegisterFile("github.com/hashicorp/nomad/plugins/device/device.proto", fileDescriptor_device_13acb8ec0117c3b0) proto.RegisterFile("github.com/hashicorp/nomad/plugins/device/proto/device.proto", fileDescriptor_device_7496b084f8b5ea81)
} }
var fileDescriptor_device_13acb8ec0117c3b0 = []byte{ var fileDescriptor_device_7496b084f8b5ea81 = []byte{
// 645 bytes of a gzipped FileDescriptorProto // 682 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xdd, 0x6e, 0xd3, 0x4c, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xdd, 0x6e, 0xd3, 0x4a,
0x10, 0x6d, 0x92, 0x36, 0x75, 0x26, 0xfd, 0xd2, 0x8f, 0xa5, 0xaa, 0xac, 0x94, 0x9f, 0xc8, 0x12, 0x10, 0x3e, 0x4e, 0x9a, 0x26, 0x99, 0xf4, 0xb4, 0xe7, 0xec, 0xa9, 0x8e, 0xac, 0xf0, 0x17, 0x59,
0x52, 0x85, 0x84, 0x8d, 0x02, 0x02, 0x84, 0x04, 0x52, 0xdb, 0x14, 0x91, 0x0b, 0x4a, 0x65, 0xb8, 0x42, 0xaa, 0x40, 0xd8, 0x28, 0x45, 0x02, 0x01, 0x45, 0xa2, 0xa4, 0x40, 0x24, 0x68, 0x2b, 0xc3,
0xa1, 0x17, 0x58, 0x8e, 0x3d, 0xc4, 0xab, 0xda, 0xbb, 0x8b, 0x77, 0x1d, 0xc9, 0x4f, 0xc0, 0xab, 0x0d, 0x20, 0x61, 0x39, 0xf6, 0x2a, 0x5e, 0xd5, 0xd9, 0x35, 0xbb, 0xeb, 0x54, 0xe6, 0x89, 0x78,
0xf0, 0x48, 0x3c, 0x01, 0xcf, 0x81, 0xbc, 0xeb, 0xa4, 0x09, 0xaa, 0x88, 0x0a, 0x57, 0xf6, 0xce, 0x06, 0xde, 0x81, 0xf7, 0xe1, 0x12, 0x79, 0x77, 0x9d, 0xb8, 0x80, 0x48, 0x0b, 0x57, 0xbb, 0x33,
0x9c, 0x73, 0xe6, 0x78, 0x3c, 0x3b, 0xf0, 0x6c, 0x4a, 0x55, 0x52, 0x4c, 0xdc, 0x88, 0x67, 0x5e, 0xdf, 0x7c, 0x33, 0x93, 0xd9, 0x6f, 0x1c, 0x78, 0x38, 0x25, 0x32, 0xc9, 0x27, 0x6e, 0xc4, 0x66,
0x12, 0xca, 0x84, 0x46, 0x3c, 0x17, 0x1e, 0xe3, 0x59, 0x18, 0x7b, 0x22, 0x2d, 0xa6, 0x94, 0x49, 0x5e, 0x12, 0x8a, 0x84, 0x44, 0x8c, 0x67, 0x1e, 0x65, 0xb3, 0x30, 0xf6, 0xb2, 0x34, 0x9f, 0x12,
0x2f, 0xc6, 0x19, 0x8d, 0xb0, 0x7e, 0xb8, 0x22, 0xe7, 0x8a, 0x93, 0x7b, 0x0b, 0xb0, 0xab, 0xc1, 0x2a, 0xbc, 0x18, 0xcf, 0x49, 0x84, 0xbd, 0x8c, 0x33, 0xc9, 0x8c, 0xe1, 0x2a, 0x03, 0x5d, 0x5d,
0x6e, 0x0d, 0x76, 0x0d, 0xaa, 0x7f, 0x30, 0xe5, 0x7c, 0x9a, 0xa2, 0xa7, 0xd1, 0x93, 0xe2, 0x8b, 0x50, 0x5c, 0x45, 0x71, 0x0d, 0xc5, 0xd5, 0x51, 0xce, 0x36, 0xa0, 0xa7, 0x84, 0x4e, 0x31, 0xcf,
0x87, 0x99, 0x50, 0xa5, 0x21, 0x3b, 0x3f, 0x9b, 0xb0, 0x3b, 0x42, 0x85, 0x91, 0xc2, 0x78, 0xa4, 0x38, 0xa1, 0xd2, 0xc7, 0x1f, 0x72, 0x2c, 0xa4, 0x83, 0xe1, 0xbf, 0x33, 0x5e, 0x91, 0x31, 0x2a,
0xf1, 0x92, 0xec, 0x43, 0x7b, 0x86, 0x2c, 0xe6, 0xb9, 0xdd, 0x18, 0x34, 0x0e, 0x3b, 0x7e, 0x7d, 0x30, 0x3a, 0x84, 0x0d, 0x4d, 0x0b, 0xa6, 0x9c, 0xe5, 0x99, 0x6d, 0x0d, 0x9a, 0x3b, 0xbd, 0xe1,
0x22, 0xf7, 0xa1, 0x6b, 0x24, 0x03, 0x55, 0x0a, 0xb4, 0x9b, 0x3a, 0x09, 0x26, 0xf4, 0xb1, 0x14, 0x4d, 0xf7, 0xd7, 0x35, 0xdc, 0x91, 0x3a, 0x9e, 0x95, 0x14, 0xbf, 0x17, 0x2f, 0x0d, 0xe7, 0x4b,
0xb8, 0x04, 0x60, 0x61, 0x86, 0x76, 0x6b, 0x19, 0x70, 0x16, 0x66, 0x48, 0xde, 0xc2, 0xb6, 0x39, 0x03, 0x7a, 0x35, 0x10, 0xfd, 0x0f, 0xeb, 0x73, 0x4c, 0x63, 0xc6, 0x6d, 0x6b, 0x60, 0xed, 0x74,
0x49, 0x7b, 0x73, 0xd0, 0x3a, 0xec, 0x0e, 0x5d, 0xf7, 0xcf, 0xe6, 0xdd, 0x55, 0x6f, 0xfe, 0x9c, 0x7d, 0x63, 0xa1, 0x6b, 0x60, 0x68, 0x81, 0x2c, 0x32, 0x6c, 0x37, 0x14, 0x08, 0xda, 0xf5, 0xba,
0x4e, 0x52, 0xd8, 0x65, 0x3c, 0xc6, 0x20, 0x54, 0x2a, 0xa7, 0x93, 0x42, 0xa1, 0xb4, 0xb7, 0xb4, 0xc8, 0x70, 0x2d, 0x80, 0x86, 0x33, 0x6c, 0x37, 0xeb, 0x01, 0x87, 0xe1, 0x0c, 0xa3, 0xe7, 0xd0,
0xe2, 0xc9, 0xcd, 0x14, 0xa5, 0x7b, 0xc6, 0x63, 0x3c, 0x5a, 0xa8, 0x9c, 0x32, 0x95, 0x97, 0x7e, 0xd6, 0x96, 0xb0, 0xd7, 0x54, 0xd3, 0xee, 0xea, 0xa6, 0x25, 0x8e, 0x24, 0x8e, 0x75, 0x7f, 0x7e,
0x8f, 0xad, 0x04, 0xfb, 0x47, 0x70, 0xfb, 0x1a, 0x18, 0xf9, 0x1f, 0x5a, 0x97, 0x58, 0xd6, 0x5d, 0x45, 0x47, 0xef, 0x00, 0x42, 0x29, 0x39, 0x99, 0xe4, 0x12, 0x0b, 0xbb, 0xa5, 0x92, 0x3d, 0xb8,
0xaa, 0x5e, 0xc9, 0x1e, 0x6c, 0xcd, 0xc2, 0xb4, 0x98, 0x37, 0xc7, 0x1c, 0x5e, 0x36, 0x5f, 0x34, 0xc0, 0x04, 0xdc, 0xc7, 0x0b, 0xf6, 0x01, 0x95, 0xbc, 0xf0, 0x6b, 0xe9, 0xfa, 0x7b, 0xb0, 0xf5,
0x9c, 0x6f, 0x0d, 0xe8, 0xad, 0x96, 0x26, 0x3d, 0x68, 0x8e, 0x47, 0x35, 0xbb, 0x39, 0x1e, 0x11, 0x1d, 0x8c, 0xfe, 0x81, 0xe6, 0x09, 0x2e, 0xcc, 0x40, 0xca, 0x2b, 0xda, 0x86, 0xd6, 0x3c, 0x4c,
0x1b, 0xb6, 0x13, 0x0c, 0x53, 0x95, 0x94, 0x9a, 0x6e, 0xf9, 0xf3, 0x23, 0x79, 0x04, 0xc4, 0xbc, 0xf3, 0x6a, 0x0e, 0xda, 0xb8, 0xdf, 0xb8, 0x67, 0x39, 0x9f, 0x2d, 0xd8, 0x3c, 0xdb, 0x37, 0xda,
0x06, 0x31, 0xca, 0x28, 0xa7, 0x42, 0x51, 0xce, 0xea, 0xfe, 0xde, 0x32, 0x99, 0xd1, 0x55, 0x82, 0x84, 0xc6, 0x78, 0x64, 0xd8, 0x8d, 0xf1, 0x08, 0xd9, 0xd0, 0x4e, 0x70, 0x98, 0xca, 0xa4, 0x50,
0xdc, 0x01, 0x10, 0x11, 0x0d, 0x26, 0x85, 0x0c, 0x68, 0x6c, 0x6f, 0x6a, 0x98, 0x25, 0x22, 0x7a, 0xf4, 0x8e, 0x5f, 0x99, 0xe8, 0x16, 0x20, 0x7d, 0x0d, 0x62, 0x2c, 0x22, 0x4e, 0x32, 0x49, 0x18,
0x5c, 0xc8, 0x71, 0xec, 0x78, 0xd0, 0xf3, 0x51, 0x62, 0x3e, 0x43, 0x1f, 0xbf, 0x16, 0x28, 0x15, 0x35, 0xa3, 0xfc, 0x57, 0x23, 0xa3, 0x25, 0x80, 0x8e, 0xa0, 0x97, 0x9c, 0x06, 0x29, 0x8b, 0xc2,
0xb9, 0x0b, 0xf5, 0x4f, 0x0a, 0x68, 0x2c, 0xed, 0xc6, 0xa0, 0x75, 0xd8, 0xf1, 0x3b, 0x26, 0x32, 0x94, 0xc8, 0xc2, 0x5e, 0x1b, 0x58, 0xe7, 0x9b, 0x6a, 0x79, 0xbc, 0x30, 0x2c, 0x1f, 0x92, 0xd3,
0x8e, 0xa5, 0x93, 0xc2, 0xee, 0x82, 0x20, 0x05, 0x67, 0x12, 0xc9, 0x27, 0xf8, 0x2f, 0xe2, 0x4c, 0xea, 0xee, 0xb8, 0x65, 0xef, 0x75, 0x14, 0x5d, 0x06, 0xc8, 0x22, 0x12, 0x4c, 0x72, 0x11, 0x90,
0x85, 0x94, 0x61, 0x1e, 0xe4, 0x28, 0xf5, 0x57, 0x74, 0x87, 0x4f, 0xd7, 0x35, 0xff, 0x64, 0x4e, 0xd8, 0xfc, 0x86, 0x4e, 0x16, 0x91, 0xfd, 0x5c, 0x8c, 0x63, 0xc7, 0x83, 0x4d, 0x1f, 0x0b, 0xcc,
0x32, 0x82, 0x61, 0x65, 0xd7, 0xdf, 0x89, 0x96, 0xa2, 0xce, 0xf7, 0x26, 0xec, 0x5d, 0x07, 0x23, 0xe7, 0xd8, 0xa8, 0x16, 0x5d, 0x01, 0xf3, 0xe4, 0x01, 0x89, 0x85, 0x12, 0x67, 0xd7, 0xef, 0x6a,
0x3e, 0x6c, 0x22, 0x9b, 0x19, 0x7f, 0xdd, 0xe1, 0xeb, 0xbf, 0x29, 0xe5, 0x9e, 0xb2, 0x59, 0xfd, 0xcf, 0x38, 0x16, 0x4e, 0x0a, 0x5b, 0x0b, 0x82, 0x11, 0xf4, 0x1b, 0xf8, 0x3b, 0x62, 0x54, 0x86,
0x8b, 0xb5, 0x16, 0x79, 0x05, 0xed, 0x8c, 0x17, 0x4c, 0x49, 0xbb, 0xa9, 0x55, 0x1f, 0xac, 0x53, 0x84, 0x62, 0x1e, 0x70, 0x2c, 0x54, 0x91, 0xde, 0xf0, 0xce, 0xaa, 0x9f, 0xf1, 0xa4, 0x22, 0xe9,
0x7d, 0x57, 0xa1, 0xfd, 0x9a, 0x44, 0x46, 0x57, 0xf3, 0xdc, 0xd2, 0xfc, 0x87, 0xeb, 0xa7, 0xaf, 0x84, 0x61, 0x39, 0x11, 0x7f, 0x23, 0xaa, 0x79, 0x9d, 0x4f, 0x0d, 0xd8, 0xfe, 0x59, 0x18, 0xf2,
0x7a, 0x7c, 0x10, 0x18, 0x2d, 0x66, 0xb9, 0xff, 0x1c, 0x3a, 0x0b, 0x5f, 0x37, 0x9a, 0xa9, 0xcf, 0x61, 0x0d, 0xd3, 0xb9, 0x30, 0xcb, 0xf3, 0xe8, 0x77, 0x4a, 0xb9, 0x07, 0x74, 0x6e, 0xd4, 0xa3,
0xb0, 0xa5, 0xfd, 0x90, 0x03, 0xe8, 0xa8, 0x50, 0x5e, 0x06, 0x22, 0x54, 0x49, 0x4d, 0xb5, 0xaa, 0x72, 0xa1, 0x3d, 0x58, 0x9f, 0xb1, 0x9c, 0x4a, 0x61, 0x37, 0x54, 0xd6, 0xeb, 0xab, 0xb2, 0xbe,
0xc0, 0x79, 0xa8, 0x92, 0x2a, 0x99, 0x70, 0xa9, 0x4c, 0xd2, 0x68, 0x58, 0x55, 0x60, 0x9e, 0xcc, 0x2c, 0xa3, 0x7d, 0x43, 0x42, 0xa3, 0xe5, 0x76, 0x34, 0x15, 0xff, 0xc6, 0xf9, 0xde, 0xf1, 0x55,
0x31, 0x8c, 0x03, 0xce, 0xd2, 0x52, 0x0f, 0x94, 0xe5, 0x5b, 0x55, 0xe0, 0x3d, 0x4b, 0x4b, 0x27, 0x86, 0xa3, 0xc5, 0x66, 0xf4, 0xef, 0x42, 0x77, 0xd1, 0xd7, 0x85, 0x64, 0xfb, 0x1e, 0x5a, 0xaa,
0x01, 0xb8, 0xf2, 0xfb, 0x0f, 0x45, 0x06, 0xd0, 0x15, 0x98, 0x67, 0x54, 0x4a, 0xca, 0x99, 0xac, 0x1f, 0x74, 0x09, 0xba, 0x32, 0x14, 0x27, 0x41, 0x16, 0xca, 0xa4, 0x7a, 0xef, 0xd2, 0x71, 0x1c,
0xe7, 0x76, 0x39, 0x34, 0xfc, 0xd1, 0x80, 0x1d, 0x53, 0xea, 0x5c, 0xf7, 0x8b, 0x5c, 0x40, 0xf7, 0xca, 0xa4, 0x04, 0x13, 0x26, 0xa4, 0x06, 0x75, 0x8e, 0x4e, 0xe9, 0xa8, 0x40, 0x8e, 0xc3, 0x38,
0x0d, 0x65, 0x53, 0xcc, 0x45, 0x4e, 0x99, 0x22, 0xfb, 0xae, 0x59, 0x62, 0xee, 0x7c, 0x89, 0xb9, 0x60, 0x34, 0x2d, 0x94, 0x66, 0x3b, 0x7e, 0xa7, 0x74, 0x1c, 0xd1, 0xb4, 0x70, 0x12, 0x80, 0x65,
0xa7, 0xd5, 0x12, 0xeb, 0x7b, 0x37, 0xbc, 0xed, 0xce, 0xc6, 0xe3, 0x06, 0x49, 0x61, 0xbb, 0x9e, 0xbf, 0x7f, 0x50, 0x64, 0x00, 0xbd, 0x0c, 0xf3, 0x19, 0x11, 0x82, 0x30, 0x2a, 0xcc, 0x6a, 0xd4,
0x67, 0xb2, 0x76, 0xff, 0xac, 0xde, 0x94, 0xf5, 0xf5, 0x7e, 0xbb, 0x28, 0xce, 0xc6, 0xb1, 0x75, 0x5d, 0xc3, 0xaf, 0x16, 0x6c, 0xe8, 0x52, 0xc7, 0x6a, 0x5e, 0xe8, 0x23, 0xf4, 0x6a, 0x1f, 0x52,
0xd1, 0x36, 0xb9, 0x49, 0x5b, 0x9b, 0x7f, 0xf2, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x15, 0x03, 0xfe, 0x34, 0x5c, 0x35, 0xd7, 0x1f, 0xbf, 0xc5, 0xfd, 0xdd, 0x0b, 0x71, 0xb4, 0xb0, 0x9d, 0xbf, 0x6e,
0x52, 0xe9, 0x05, 0x00, 0x00, 0x5b, 0x28, 0x85, 0xb6, 0xd1, 0x3b, 0x5a, 0xb9, 0x97, 0x67, 0x37, 0xa9, 0xef, 0x9d, 0x3b, 0xbe,
0xaa, 0xb7, 0xdf, 0x7e, 0xdb, 0x52, 0xff, 0x38, 0x93, 0x75, 0x75, 0xec, 0x7e, 0x0b, 0x00, 0x00,
0xff, 0xff, 0x5f, 0xeb, 0xc2, 0x59, 0xb8, 0x06, 0x00, 0x00,
} }

View File

@ -1,14 +1,13 @@
syntax = "proto3"; syntax = "proto3";
package hashicorp.nomad.plugins.device; package hashicorp.nomad.plugins.device;
option go_package = "device"; option go_package = "proto";
import "google/protobuf/empty.proto";
// DevicePlugin is the API exposed by device plugins // DevicePlugin is the API exposed by device plugins
service DevicePlugin { service DevicePlugin {
// Fingerprint allows the device plugin to return a set of // Fingerprint allows the device plugin to return a set of
// detected devices and provide a mechanism to update the state of // detected devices and provide a mechanism to update the state of
// the device. // the device.
rpc Fingerprint(google.protobuf.Empty) returns (stream DetectedDevices) {} rpc Fingerprint(FingerprintRequest) returns (stream FingerprintResponse) {}
// Reserve is called by the client before starting an allocation // Reserve is called by the client before starting an allocation
// that requires access to the plugins devices. The plugin can use // that requires access to the plugins devices. The plugin can use
@ -17,9 +16,19 @@ service DevicePlugin {
rpc Reserve(ReserveRequest) returns (ReserveResponse) {} rpc Reserve(ReserveRequest) returns (ReserveResponse) {}
} }
// DetectedDevices is the set of devices that the device plugin has // FingerprintRequest is used to request for devices to be fingerprinted.
// detected and is exposing message FingerprintRequest {}
message DetectedDevices {
// FingerprintResponse returns a set of detected devices.
message FingerprintResponse {
// device_group is a group of devices that share a vendor, device_type, and
// device_name. This is returned as a set so that a single plugin could
// potentially detect several device types and models.
repeated DeviceGroup device_group = 1;
}
// DeviceGroup is a group of devices that share a vendor, device type and name.
message DeviceGroup {
// vendor is the name of the vendor of the device // vendor is the name of the vendor of the device
string vendor = 1; string vendor = 1;
@ -32,15 +41,15 @@ message DetectedDevices {
// devices is the set of devices detected by the plugin. // devices is the set of devices detected by the plugin.
repeated DetectedDevice devices = 4; repeated DetectedDevice devices = 4;
// node_attributes allows adding node attributes to be used for // attributes allows adding attributes to be used for constraints or
// constraints or affinities. // affinities.
map<string, string> node_attributes = 5; map<string, string> attributes = 5;
} }
// DetectedDevice is a single detected device. // DetectedDevice is a single detected device.
message DetectedDevice { message DetectedDevice {
// ID is the ID of the device. This ID is used during // ID is the ID of the device. This ID is used during allocation and must be
// allocation. // stable across restarts of the device driver.
string ID = 1; string ID = 1;
// Health of the device. // Health of the device.
@ -50,11 +59,19 @@ message DetectedDevice {
// annotate the health field with a human readable reason. // annotate the health field with a human readable reason.
string health_description = 3; string health_description = 3;
// hw_locality is optionally set to expose hardware locality information for
// more optimal placement decisions.
DeviceLocality hw_locality = 4;
}
// DeviceLocality is used to expose HW locality information about a device.
message DeviceLocality {
// pci_bus_id is the PCI bus ID for the device. If reported, it // pci_bus_id is the PCI bus ID for the device. If reported, it
// allows Nomad to make NUMA aware optimizations. // allows Nomad to make NUMA aware optimizations.
string pci_bus_id = 4; string pci_bus_id = 1;
} }
// ReserveRequest is used to ask the device driver for information on // ReserveRequest is used to ask the device driver for information on
// how to allocate the requested devices. // how to allocate the requested devices.
message ReserveRequest { message ReserveRequest {
@ -111,3 +128,4 @@ message DeviceSpec {
// * m - allows task to create device files that do not yet exist // * m - allows task to create device files that do not yet exist
string permissions = 3; string permissions = 3;
} }

66
plugins/device/server.go Normal file
View File

@ -0,0 +1,66 @@
package device
import (
context "golang.org/x/net/context"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/plugins/device/proto"
)
// devicePluginServer wraps a device plugin and exposes it via gRPC.
type devicePluginServer struct {
broker *plugin.GRPCBroker
impl DevicePlugin
}
func (d *devicePluginServer) Fingerprint(req *proto.FingerprintRequest, stream proto.DevicePlugin_FingerprintServer) error {
ctx := stream.Context()
outCh, err := d.impl.Fingerprint(ctx)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case resp, ok := <-outCh:
// The output channel has been closed, end the stream
if !ok {
return nil
}
// Handle any error
if resp.Error != nil {
return resp.Error
}
// Convert the devices
out := convertStructDeviceGroups(resp.Devices)
// Build the response
presp := &proto.FingerprintResponse{
DeviceGroup: out,
}
// Send the devices
if err := stream.Send(presp); err != nil {
return err
}
}
}
}
func (d *devicePluginServer) Reserve(ctx context.Context, req *proto.ReserveRequest) (*proto.ReserveResponse, error) {
resp, err := d.impl.Reserve(req.GetDeviceIds())
if err != nil {
return nil, err
}
// Make the response
presp := &proto.ReserveResponse{
ContainerRes: convertStructContainerReservation(resp),
}
return presp, nil
}

275
plugins/device/util.go Normal file
View File

@ -0,0 +1,275 @@
package device
import "github.com/hashicorp/nomad/plugins/device/proto"
// convertProtoDeviceGroups converts between a list of proto and structs DeviceGroup
func convertProtoDeviceGroups(in []*proto.DeviceGroup) []*DeviceGroup {
if in == nil {
return nil
}
out := make([]*DeviceGroup, len(in))
for i, group := range in {
out[i] = convertProtoDeviceGroup(group)
}
return out
}
// convertProtoDeviceGroup converts between a proto and structs DeviceGroup
func convertProtoDeviceGroup(in *proto.DeviceGroup) *DeviceGroup {
if in == nil {
return nil
}
return &DeviceGroup{
Vendor: in.GetVendor(),
Type: in.GetDeviceType(),
Name: in.GetDeviceName(),
Devices: convertProtoDevices(in.GetDevices()),
Attributes: in.GetAttributes(),
}
}
// convertProtoDevices converts between a list of proto and structs Device
func convertProtoDevices(in []*proto.DetectedDevice) []*Device {
if in == nil {
return nil
}
out := make([]*Device, len(in))
for i, d := range in {
out[i] = convertProtoDevice(d)
}
return out
}
// convertProtoDevice converts between a proto and structs Device
func convertProtoDevice(in *proto.DetectedDevice) *Device {
if in == nil {
return nil
}
return &Device{
ID: in.GetID(),
Healthy: in.GetHealthy(),
HealthDesc: in.GetHealthDescription(),
HwLocality: convertProtoDeviceLocality(in.GetHwLocality()),
}
}
// convertProtoDeviceLocality converts between a proto and structs DeviceLocality
func convertProtoDeviceLocality(in *proto.DeviceLocality) *DeviceLocality {
if in == nil {
return nil
}
return &DeviceLocality{
PciBusID: in.GetPciBusId(),
}
}
// convertProtoContainerReservation is used to convert between a proto and struct
// ContainerReservation
func convertProtoContainerReservation(in *proto.ContainerReservation) *ContainerReservation {
if in == nil {
return nil
}
return &ContainerReservation{
Envs: in.GetEnvs(),
Mounts: convertProtoMounts(in.GetMounts()),
Devices: convertProtoDeviceSpecs(in.GetDevices()),
}
}
// convertProtoMount converts between a list of proto and structs Mount
func convertProtoMounts(in []*proto.Mount) []*Mount {
if in == nil {
return nil
}
out := make([]*Mount, len(in))
for i, d := range in {
out[i] = convertProtoMount(d)
}
return out
}
// convertProtoMount converts between a proto and structs Mount
func convertProtoMount(in *proto.Mount) *Mount {
if in == nil {
return nil
}
return &Mount{
TaskPath: in.GetTaskPath(),
HostPath: in.GetHostPath(),
ReadOnly: in.GetReadOnly(),
}
}
// convertProtoDeviceSpecs converts between a list of proto and structs DeviceSpecs
func convertProtoDeviceSpecs(in []*proto.DeviceSpec) []*DeviceSpec {
if in == nil {
return nil
}
out := make([]*DeviceSpec, len(in))
for i, d := range in {
out[i] = convertProtoDeviceSpec(d)
}
return out
}
// convertProtoDeviceSpec converts between a proto and structs DeviceSpec
func convertProtoDeviceSpec(in *proto.DeviceSpec) *DeviceSpec {
if in == nil {
return nil
}
return &DeviceSpec{
TaskPath: in.GetTaskPath(),
HostPath: in.GetHostPath(),
CgroupPerms: in.GetPermissions(),
}
}
// convertStructDeviceGroup converts between a list of struct and proto DeviceGroup
func convertStructDeviceGroups(in []*DeviceGroup) []*proto.DeviceGroup {
if in == nil {
return nil
}
out := make([]*proto.DeviceGroup, len(in))
for i, g := range in {
out[i] = convertStructDeviceGroup(g)
}
return out
}
// convertStructDeviceGroup converts between a struct and proto DeviceGroup
func convertStructDeviceGroup(in *DeviceGroup) *proto.DeviceGroup {
if in == nil {
return nil
}
return &proto.DeviceGroup{
Vendor: in.Vendor,
DeviceType: in.Type,
DeviceName: in.Name,
Devices: convertStructDevices(in.Devices),
Attributes: in.Attributes,
}
}
// convertStructDevices converts between a list of struct and proto Device
func convertStructDevices(in []*Device) []*proto.DetectedDevice {
if in == nil {
return nil
}
out := make([]*proto.DetectedDevice, len(in))
for i, d := range in {
out[i] = convertStructDevice(d)
}
return out
}
// convertStructDevice converts between a struct and proto Device
func convertStructDevice(in *Device) *proto.DetectedDevice {
if in == nil {
return nil
}
return &proto.DetectedDevice{
ID: in.ID,
Healthy: in.Healthy,
HealthDescription: in.HealthDesc,
HwLocality: convertStructDeviceLocality(in.HwLocality),
}
}
// convertStructDeviceLocality converts between a struct and proto DeviceLocality
func convertStructDeviceLocality(in *DeviceLocality) *proto.DeviceLocality {
if in == nil {
return nil
}
return &proto.DeviceLocality{
PciBusId: in.PciBusID,
}
}
// convertStructContainerReservation is used to convert between a struct and
// proto ContainerReservation
func convertStructContainerReservation(in *ContainerReservation) *proto.ContainerReservation {
if in == nil {
return nil
}
return &proto.ContainerReservation{
Envs: in.Envs,
Mounts: convertStructMounts(in.Mounts),
Devices: convertStructDeviceSpecs(in.Devices),
}
}
// convertStructMount converts between a list of structs and proto Mount
func convertStructMounts(in []*Mount) []*proto.Mount {
if in == nil {
return nil
}
out := make([]*proto.Mount, len(in))
for i, m := range in {
out[i] = convertStructMount(m)
}
return out
}
// convertStructMount converts between a struct and proto Mount
func convertStructMount(in *Mount) *proto.Mount {
if in == nil {
return nil
}
return &proto.Mount{
TaskPath: in.TaskPath,
HostPath: in.HostPath,
ReadOnly: in.ReadOnly,
}
}
// convertStructDeviceSpecs converts between a list of struct and proto DeviceSpecs
func convertStructDeviceSpecs(in []*DeviceSpec) []*proto.DeviceSpec {
if in == nil {
return nil
}
out := make([]*proto.DeviceSpec, len(in))
for i, d := range in {
out[i] = convertStructDeviceSpec(d)
}
return out
}
// convertStructDeviceSpec converts between a struct and proto DeviceSpec
func convertStructDeviceSpec(in *DeviceSpec) *proto.DeviceSpec {
if in == nil {
return nil
}
return &proto.DeviceSpec{
TaskPath: in.TaskPath,
HostPath: in.HostPath,
Permissions: in.CgroupPerms,
}
}