From 98a67938a0047abc68e9fc6cdfa1ca7434097476 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 13 Aug 2018 10:29:29 -0700 Subject: [PATCH 1/2] initial device go-plugin --- plugins/base/client.go | 18 +- plugins/base/mock.go | 18 - plugins/base/plugin.go | 9 +- plugins/base/plugin_test.go | 62 +--- plugins/base/testing.go | 60 ++++ plugins/device/client.go | 84 +++++ plugins/device/device.go | 114 ++++++ plugins/device/mock.go | 23 ++ plugins/device/plugin.go | 35 ++ plugins/device/plugin_test.go | 445 ++++++++++++++++++++++++ plugins/device/{ => proto}/device.pb.go | 321 +++++++++++------ plugins/device/{ => proto}/device.proto | 42 ++- plugins/device/server.go | 66 ++++ plugins/device/util.go | 275 +++++++++++++++ 14 files changed, 1376 insertions(+), 196 deletions(-) delete mode 100644 plugins/base/mock.go create mode 100644 plugins/base/testing.go create mode 100644 plugins/device/client.go create mode 100644 plugins/device/device.go create mode 100644 plugins/device/mock.go create mode 100644 plugins/device/plugin.go create mode 100644 plugins/device/plugin_test.go rename plugins/device/{ => proto}/device.pb.go (59%) rename plugins/device/{ => proto}/device.proto (71%) create mode 100644 plugins/device/server.go create mode 100644 plugins/device/util.go diff --git a/plugins/base/client.go b/plugins/base/client.go index 58d2e57d5..7d6ac5eba 100644 --- a/plugins/base/client.go +++ b/plugins/base/client.go @@ -8,14 +8,14 @@ import ( "github.com/hashicorp/nomad/plugins/shared/hclspec" ) -// basePluginClient implements the client side of a remote base plugin, using +// BasePluginClient implements the client side of a remote base plugin, using // gRPC to communicate to the remote plugin. -type basePluginClient struct { - client proto.BasePluginClient +type BasePluginClient struct { + Client proto.BasePluginClient } -func (b *basePluginClient) PluginInfo() (*PluginInfoResponse, error) { - presp, err := b.client.PluginInfo(context.Background(), &proto.PluginInfoRequest{}) +func (b *BasePluginClient) PluginInfo() (*PluginInfoResponse, error) { + presp, err := b.Client.PluginInfo(context.Background(), &proto.PluginInfoRequest{}) if err != nil { return nil, err } @@ -40,8 +40,8 @@ func (b *basePluginClient) PluginInfo() (*PluginInfoResponse, error) { return resp, nil } -func (b *basePluginClient) ConfigSchema() (*hclspec.Spec, error) { - presp, err := b.client.ConfigSchema(context.Background(), &proto.ConfigSchemaRequest{}) +func (b *BasePluginClient) ConfigSchema() (*hclspec.Spec, error) { + presp, err := b.Client.ConfigSchema(context.Background(), &proto.ConfigSchemaRequest{}) if err != nil { return nil, err } @@ -49,9 +49,9 @@ func (b *basePluginClient) ConfigSchema() (*hclspec.Spec, error) { return presp.GetSpec(), nil } -func (b *basePluginClient) SetConfig(data []byte) error { +func (b *BasePluginClient) SetConfig(data []byte) error { // Send the config - _, err := b.client.SetConfig(context.Background(), &proto.SetConfigRequest{ + _, err := b.Client.SetConfig(context.Background(), &proto.SetConfigRequest{ MsgpackConfig: data, }) diff --git a/plugins/base/mock.go b/plugins/base/mock.go deleted file mode 100644 index 7a3c5c77d..000000000 --- a/plugins/base/mock.go +++ /dev/null @@ -1,18 +0,0 @@ -package base - -import ( - "github.com/hashicorp/nomad/plugins/shared/hclspec" -) - -// MockPlugin is used for testing. -// Each function can be set as a closure to make assertions about how data -// is passed through the base plugin layer. -type MockPlugin struct { - PluginInfoF func() (*PluginInfoResponse, error) - ConfigSchemaF func() (*hclspec.Spec, error) - SetConfigF func([]byte) error -} - -func (p *MockPlugin) PluginInfo() (*PluginInfoResponse, error) { return p.PluginInfoF() } -func (p *MockPlugin) ConfigSchema() (*hclspec.Spec, error) { return p.ConfigSchemaF() } -func (p *MockPlugin) SetConfig(data []byte) error { return p.SetConfigF(data) } diff --git a/plugins/base/plugin.go b/plugins/base/plugin.go index 7f0219b29..74d97a4b7 100644 --- a/plugins/base/plugin.go +++ b/plugins/base/plugin.go @@ -9,6 +9,9 @@ import ( ) const ( + // PluginTypeBase implements the base plugin interface + PluginTypeBase = "base" + // PluginTypeDriver implements the driver plugin interface PluginTypeDriver = "driver" @@ -29,17 +32,17 @@ var ( // interface to expose the interface over gRPC. type PluginBase struct { plugin.NetRPCUnsupportedPlugin - impl BasePlugin + Impl BasePlugin } func (p *PluginBase) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { proto.RegisterBasePluginServer(s, &basePluginServer{ - impl: p.impl, + impl: p.Impl, broker: broker, }) return nil } func (p *PluginBase) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { - return &basePluginClient{client: proto.NewBasePluginClient(c)}, nil + return &BasePluginClient{Client: proto.NewBasePluginClient(c)}, nil } diff --git a/plugins/base/plugin_test.go b/plugins/base/plugin_test.go index 2e7d1c19f..33083afc5 100644 --- a/plugins/base/plugin_test.go +++ b/plugins/base/plugin_test.go @@ -12,48 +12,6 @@ import ( "github.com/zclconf/go-cty/cty/msgpack" ) -var ( - // testSpec is an hcl Spec for testing - testSpec = &hclspec.Spec{ - Block: &hclspec.Spec_Object{ - Object: &hclspec.Object{ - Attributes: map[string]*hclspec.Spec{ - "foo": { - Block: &hclspec.Spec_Attr{ - Attr: &hclspec.Attr{ - Type: "string", - Required: false, - }, - }, - }, - "bar": { - Block: &hclspec.Spec_Attr{ - Attr: &hclspec.Attr{ - Type: "number", - Required: true, - }, - }, - }, - "baz": { - Block: &hclspec.Spec_Attr{ - Attr: &hclspec.Attr{ - Type: "bool", - }, - }, - }, - }, - }, - }, - } -) - -// testConfig is used to decode a config from the testSpec -type testConfig struct { - Foo string `cty:"foo" codec:"foo"` - Bar int64 `cty:"bar" codec:"bar"` - Baz bool `cty:"baz" codec:"baz"` -} - func TestBasePlugin_PluginInfo_GRPC(t *testing.T) { t.Parallel() require := require.New(t) @@ -88,12 +46,12 @@ func TestBasePlugin_PluginInfo_GRPC(t *testing.T) { } client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ - "base": &PluginBase{impl: mock}, + PluginTypeBase: &PluginBase{Impl: mock}, }) defer server.Stop() defer client.Close() - raw, err := client.Dispense("base") + raw, err := client.Dispense(PluginTypeBase) if err != nil { t.Fatalf("err: %s", err) } @@ -123,17 +81,17 @@ func TestBasePlugin_ConfigSchema(t *testing.T) { mock := &MockPlugin{ ConfigSchemaF: func() (*hclspec.Spec, error) { - return testSpec, nil + return TestSpec, nil }, } client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ - "base": &PluginBase{impl: mock}, + PluginTypeBase: &PluginBase{Impl: mock}, }) defer server.Stop() defer client.Close() - raw, err := client.Dispense("base") + raw, err := client.Dispense(PluginTypeBase) if err != nil { t.Fatalf("err: %s", err) } @@ -145,7 +103,7 @@ func TestBasePlugin_ConfigSchema(t *testing.T) { specOut, err := impl.ConfigSchema() require.NoError(err) - require.True(pb.Equal(testSpec, specOut)) + require.True(pb.Equal(TestSpec, specOut)) } func TestBasePlugin_SetConfig(t *testing.T) { @@ -155,7 +113,7 @@ func TestBasePlugin_SetConfig(t *testing.T) { var receivedData []byte mock := &MockPlugin{ ConfigSchemaF: func() (*hclspec.Spec, error) { - return testSpec, nil + return TestSpec, nil }, SetConfigF: func(data []byte) error { receivedData = data @@ -164,12 +122,12 @@ func TestBasePlugin_SetConfig(t *testing.T) { } client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ - "base": &PluginBase{impl: mock}, + PluginTypeBase: &PluginBase{Impl: mock}, }) defer server.Stop() defer client.Close() - raw, err := client.Dispense("base") + raw, err := client.Dispense(PluginTypeBase) if err != nil { t.Fatalf("err: %s", err) } @@ -190,7 +148,7 @@ func TestBasePlugin_SetConfig(t *testing.T) { require.Equal(cdata, receivedData) // Decode the value back - var actual testConfig + var actual TestConfig require.NoError(structs.Decode(receivedData, &actual)) require.Equal("v1", actual.Foo) require.EqualValues(1337, actual.Bar) diff --git a/plugins/base/testing.go b/plugins/base/testing.go new file mode 100644 index 000000000..26247e19a --- /dev/null +++ b/plugins/base/testing.go @@ -0,0 +1,60 @@ +package base + +import ( + "github.com/hashicorp/nomad/plugins/shared/hclspec" +) + +var ( + // TestSpec is an hcl Spec for testing + TestSpec = &hclspec.Spec{ + Block: &hclspec.Spec_Object{ + Object: &hclspec.Object{ + Attributes: map[string]*hclspec.Spec{ + "foo": { + Block: &hclspec.Spec_Attr{ + Attr: &hclspec.Attr{ + Type: "string", + Required: false, + }, + }, + }, + "bar": { + Block: &hclspec.Spec_Attr{ + Attr: &hclspec.Attr{ + Type: "number", + Required: true, + }, + }, + }, + "baz": { + Block: &hclspec.Spec_Attr{ + Attr: &hclspec.Attr{ + Type: "bool", + }, + }, + }, + }, + }, + }, + } +) + +// TestConfig is used to decode a config from the TestSpec +type TestConfig struct { + Foo string `cty:"foo" codec:"foo"` + Bar int64 `cty:"bar" codec:"bar"` + Baz bool `cty:"baz" codec:"baz"` +} + +// MockPlugin is used for testing. +// Each function can be set as a closure to make assertions about how data +// is passed through the base plugin layer. +type MockPlugin struct { + PluginInfoF func() (*PluginInfoResponse, error) + ConfigSchemaF func() (*hclspec.Spec, error) + SetConfigF func([]byte) error +} + +func (p *MockPlugin) PluginInfo() (*PluginInfoResponse, error) { return p.PluginInfoF() } +func (p *MockPlugin) ConfigSchema() (*hclspec.Spec, error) { return p.ConfigSchemaF() } +func (p *MockPlugin) SetConfig(data []byte) error { return p.SetConfigF(data) } diff --git a/plugins/device/client.go b/plugins/device/client.go new file mode 100644 index 000000000..d0763088e --- /dev/null +++ b/plugins/device/client.go @@ -0,0 +1,84 @@ +package device + +import ( + "context" + "io" + + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/device/proto" + netctx "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// devicePluginClient implements the client side of a remote device plugin, using +// gRPC to communicate to the remote plugin. +type devicePluginClient struct { + // basePluginClient is embedded to give access to the base plugin methods. + *base.BasePluginClient + + client proto.DevicePluginClient +} + +func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) { + var req proto.FingerprintRequest + stream, err := d.client.Fingerprint(ctx, &req) + if err != nil { + return nil, err + } + + out := make(chan *FingerprintResponse, 1) + go d.handleFingerprint(ctx, stream, out) + return out, nil +} + +// handleFingerprint should be launched in a goroutine and handles converting +// the gRPC stream to a channel. Exits either when context is cancelled or the +// stream has an error. +func (d *devicePluginClient) handleFingerprint( + ctx netctx.Context, + stream proto.DevicePlugin_FingerprintClient, + out chan *FingerprintResponse) { + + for { + resp, err := stream.Recv() + if err != nil { + // Handle a non-graceful stream error + if err != io.EOF { + if errStatus := status.FromContextError(ctx.Err()); errStatus.Code() == codes.Canceled { + err = context.Canceled + } + + out <- &FingerprintResponse{ + Error: err, + } + } + + // End the stream + close(out) + return + } + + // Send the response + out <- &FingerprintResponse{ + Devices: convertProtoDeviceGroups(resp.GetDeviceGroup()), + } + } +} + +func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation, error) { + // Build the request + req := &proto.ReserveRequest{ + DeviceIds: deviceIDs, + } + + // Make the request + resp, err := d.client.Reserve(context.Background(), req) + if err != nil { + return nil, err + } + + // Convert the response + out := convertProtoContainerReservation(resp.GetContainerRes()) + return out, nil +} diff --git a/plugins/device/device.go b/plugins/device/device.go new file mode 100644 index 000000000..236c00611 --- /dev/null +++ b/plugins/device/device.go @@ -0,0 +1,114 @@ +package device + +import ( + "context" + + "github.com/hashicorp/nomad/plugins/base" +) + +const ( + // DeviceTypeGPU is a canonical device type for a GPU. + DeviceTypeGPU = "gpu" +) + +// DevicePlugin is the interface for a plugin that can expose detected devices +// to Nomad and inform it how to mount them. +type DevicePlugin interface { + base.BasePlugin + + // Fingerprint returns a stream of devices that are detected. + Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) + + // Reserve is used to reserve a set of devices and retrieve mount + // instructions. + Reserve(deviceIDs []string) (*ContainerReservation, error) +} + +// FingerprintResponse includes a set of detected devices or an error in the +// process of fingerprinting. +type FingerprintResponse struct { + // Devices is a set of devices that have been detected. + Devices []*DeviceGroup + + // Error is populated when fingerprinting has failed. + Error error +} + +// DeviceGroup is a grouping of devices that share a common vendor, device type +// and name. +type DeviceGroup struct { + // Vendor is the vendor providing the device (nvidia, intel, etc). + Vendor string + + // Type is the type of the device (gpu, fpga, etc). + Type string + + // Name is the devices model name. + Name string + + // Devices is the set of device instances. + Devices []*Device + + // Attributes are a set of attributes shared for all the devices. + Attributes map[string]string +} + +// Device is an instance of a particular device. +type Device struct { + // ID is the identifier for the device. + ID string + + // Healthy marks whether the device is healthy and can be used for + // scheduling. + Healthy bool + + // HealthDesc describes why the device may be unhealthy. + HealthDesc string + + // HwLocality captures hardware locality information for the device. + HwLocality *DeviceLocality +} + +// DeviceLocality captures hardware locality information for a device. +type DeviceLocality struct { + // PciBusID is the PCI bus ID of the device. + PciBusID string +} + +// ContainerReservation describes how to mount a device into a container. A +// container is an isolated environment that shares the host's OS. +type ContainerReservation struct { + // Envs are a set of environment variables to set for the task. + Envs map[string]string + + // Mounts are used to mount host volumes into a container that may include + // libraries, etc. + Mounts []*Mount + + // Devices are the set of devices to mount into the container. + Devices []*DeviceSpec +} + +// Mount is used to mount a host directory into a container. +type Mount struct { + // TaskPath is the location in the task's file system to mount. + TaskPath string + + // HostPath is the host directory path to mount. + HostPath string + + // ReadOnly defines whether the mount should be read only to the task. + ReadOnly bool +} + +// DeviceSpec captures how to mount a device into a container. +type DeviceSpec struct { + // TaskPath is the location to mount the device in the task's file system. + TaskPath string + + // HostPath is the host location of the device. + HostPath string + + // CgroupPerms defines the permissions to use when mounting the device. + CgroupPerms string +} diff --git a/plugins/device/mock.go b/plugins/device/mock.go new file mode 100644 index 000000000..39afeb33d --- /dev/null +++ b/plugins/device/mock.go @@ -0,0 +1,23 @@ +package device + +import ( + "context" + + "github.com/hashicorp/nomad/plugins/base" +) + +// MockDevicePlugin is used for testing. +// Each function can be set as a closure to make assertions about how data +// is passed through the base plugin layer. +type MockDevicePlugin struct { + *base.MockPlugin + FingerprintF func(context.Context) (<-chan *FingerprintResponse, error) + ReserveF func([]string) (*ContainerReservation, error) +} + +func (p *MockDevicePlugin) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) { + return p.FingerprintF(ctx) +} +func (p *MockDevicePlugin) Reserve(devices []string) (*ContainerReservation, error) { + return p.ReserveF(devices) +} diff --git a/plugins/device/plugin.go b/plugins/device/plugin.go new file mode 100644 index 000000000..e9218593b --- /dev/null +++ b/plugins/device/plugin.go @@ -0,0 +1,35 @@ +package device + +import ( + "context" + + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/plugins/base" + bproto "github.com/hashicorp/nomad/plugins/base/proto" + "github.com/hashicorp/nomad/plugins/device/proto" + "google.golang.org/grpc" +) + +// PluginDevice is wraps a DevicePlugin and implements go-plugins GRPCPlugin +// interface to expose the interface over gRPC. +type PluginDevice struct { + plugin.NetRPCUnsupportedPlugin + Impl DevicePlugin +} + +func (p *PluginDevice) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + proto.RegisterDevicePluginServer(s, &devicePluginServer{ + impl: p.Impl, + broker: broker, + }) + return nil +} + +func (p *PluginDevice) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &devicePluginClient{ + client: proto.NewDevicePluginClient(c), + BasePluginClient: &base.BasePluginClient{ + Client: bproto.NewBasePluginClient(c), + }, + }, nil +} diff --git a/plugins/device/plugin_test.go b/plugins/device/plugin_test.go new file mode 100644 index 000000000..8ead11fd1 --- /dev/null +++ b/plugins/device/plugin_test.go @@ -0,0 +1,445 @@ +package device + +import ( + "context" + "fmt" + "testing" + "time" + + pb "github.com/golang/protobuf/proto" + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" + "github.com/zclconf/go-cty/cty" + "github.com/zclconf/go-cty/cty/msgpack" + "google.golang.org/grpc/status" +) + +func TestDevicePlugin_PluginInfo(t *testing.T) { + t.Parallel() + require := require.New(t) + + const ( + apiVersion = "v0.1.0" + pluginVersion = "v0.2.1" + pluginName = "mock" + ) + + knownType := func() (*base.PluginInfoResponse, error) { + info := &base.PluginInfoResponse{ + Type: base.PluginTypeDevice, + PluginApiVersion: apiVersion, + PluginVersion: pluginVersion, + Name: pluginName, + } + return info, nil + } + unknownType := func() (*base.PluginInfoResponse, error) { + info := &base.PluginInfoResponse{ + Type: "bad", + PluginApiVersion: apiVersion, + PluginVersion: pluginVersion, + Name: pluginName, + } + return info, nil + } + + mock := &MockDevicePlugin{ + MockPlugin: &base.MockPlugin{ + PluginInfoF: knownType, + }, + } + + client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ + base.PluginTypeBase: &base.PluginBase{Impl: mock}, + base.PluginTypeDevice: &PluginDevice{Impl: mock}, + }) + defer server.Stop() + defer client.Close() + + raw, err := client.Dispense(base.PluginTypeDevice) + if err != nil { + t.Fatalf("err: %s", err) + } + + impl, ok := raw.(DevicePlugin) + if !ok { + t.Fatalf("bad: %#v", raw) + } + + resp, err := impl.PluginInfo() + require.NoError(err) + require.Equal(apiVersion, resp.PluginApiVersion) + require.Equal(pluginVersion, resp.PluginVersion) + require.Equal(pluginName, resp.Name) + require.Equal(base.PluginTypeDevice, resp.Type) + + // Swap the implementation to return an unknown type + mock.PluginInfoF = unknownType + _, err = impl.PluginInfo() + require.Error(err) + require.Contains(err.Error(), "unknown type") +} + +func TestDevicePlugin_ConfigSchema(t *testing.T) { + t.Parallel() + require := require.New(t) + + mock := &MockDevicePlugin{ + MockPlugin: &base.MockPlugin{ + ConfigSchemaF: func() (*hclspec.Spec, error) { + return base.TestSpec, nil + }, + }, + } + + client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ + base.PluginTypeBase: &base.PluginBase{Impl: mock}, + base.PluginTypeDevice: &PluginDevice{Impl: mock}, + }) + defer server.Stop() + defer client.Close() + + raw, err := client.Dispense(base.PluginTypeDevice) + if err != nil { + t.Fatalf("err: %s", err) + } + + impl, ok := raw.(DevicePlugin) + if !ok { + t.Fatalf("bad: %#v", raw) + } + + specOut, err := impl.ConfigSchema() + require.NoError(err) + require.True(pb.Equal(base.TestSpec, specOut)) +} + +func TestDevicePlugin_SetConfig(t *testing.T) { + t.Parallel() + require := require.New(t) + + var receivedData []byte + mock := &MockDevicePlugin{ + MockPlugin: &base.MockPlugin{ + ConfigSchemaF: func() (*hclspec.Spec, error) { + return base.TestSpec, nil + }, + SetConfigF: func(data []byte) error { + receivedData = data + return nil + }, + }, + } + + client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ + base.PluginTypeBase: &base.PluginBase{Impl: mock}, + base.PluginTypeDevice: &PluginDevice{Impl: mock}, + }) + defer server.Stop() + defer client.Close() + + raw, err := client.Dispense(base.PluginTypeDevice) + if err != nil { + t.Fatalf("err: %s", err) + } + + impl, ok := raw.(DevicePlugin) + if !ok { + t.Fatalf("bad: %#v", raw) + } + + config := cty.ObjectVal(map[string]cty.Value{ + "foo": cty.StringVal("v1"), + "bar": cty.NumberIntVal(1337), + "baz": cty.BoolVal(true), + }) + cdata, err := msgpack.Marshal(config, config.Type()) + require.NoError(err) + require.NoError(impl.SetConfig(cdata)) + require.Equal(cdata, receivedData) + + // Decode the value back + var actual base.TestConfig + require.NoError(structs.Decode(receivedData, &actual)) + require.Equal("v1", actual.Foo) + require.EqualValues(1337, actual.Bar) + require.True(actual.Baz) +} + +func TestDevicePlugin_Fingerprint(t *testing.T) { + t.Parallel() + require := require.New(t) + + devices1 := []*DeviceGroup{ + { + Vendor: "nvidia", + Type: DeviceTypeGPU, + Name: "foo", + }, + } + devices2 := []*DeviceGroup{ + { + Vendor: "nvidia", + Type: DeviceTypeGPU, + Name: "foo", + }, + { + Vendor: "nvidia", + Type: DeviceTypeGPU, + Name: "bar", + }, + } + + mock := &MockDevicePlugin{ + FingerprintF: func(ctx context.Context) (<-chan *FingerprintResponse, error) { + outCh := make(chan *FingerprintResponse, 1) + go func() { + // Send two messages + for _, devs := range [][]*DeviceGroup{devices1, devices2} { + select { + case <-ctx.Done(): + return + case outCh <- &FingerprintResponse{Devices: devs}: + } + } + close(outCh) + return + }() + return outCh, nil + }, + } + + client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ + base.PluginTypeBase: &base.PluginBase{Impl: mock}, + base.PluginTypeDevice: &PluginDevice{Impl: mock}, + }) + defer server.Stop() + defer client.Close() + + raw, err := client.Dispense(base.PluginTypeDevice) + if err != nil { + t.Fatalf("err: %s", err) + } + + impl, ok := raw.(DevicePlugin) + if !ok { + t.Fatalf("bad: %#v", raw) + } + + // Create a context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Get the stream + stream, err := impl.Fingerprint(ctx) + require.NoError(err) + + // Get the first message + var first *FingerprintResponse + select { + case <-time.After(1 * time.Second): + t.Fatal("timeout") + case first = <-stream: + } + + require.NoError(first.Error) + require.EqualValues(devices1, first.Devices) + + // Get the second message + var second *FingerprintResponse + select { + case <-time.After(1 * time.Second): + t.Fatal("timeout") + case second = <-stream: + } + + require.NoError(second.Error) + require.EqualValues(devices2, second.Devices) + + select { + case _, ok := <-stream: + require.False(ok) + case <-time.After(1 * time.Second): + t.Fatal("stream should be closed") + } +} + +func TestDevicePlugin_Fingerprint_StreamErr(t *testing.T) { + t.Parallel() + require := require.New(t) + + ferr := fmt.Errorf("mock fingerprinting failed") + mock := &MockDevicePlugin{ + FingerprintF: func(ctx context.Context) (<-chan *FingerprintResponse, error) { + outCh := make(chan *FingerprintResponse, 1) + go func() { + // Send the error + select { + case <-ctx.Done(): + return + case outCh <- &FingerprintResponse{Error: ferr}: + } + + close(outCh) + return + }() + return outCh, nil + }, + } + + client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ + base.PluginTypeBase: &base.PluginBase{Impl: mock}, + base.PluginTypeDevice: &PluginDevice{Impl: mock}, + }) + defer server.Stop() + defer client.Close() + + raw, err := client.Dispense(base.PluginTypeDevice) + if err != nil { + t.Fatalf("err: %s", err) + } + + impl, ok := raw.(DevicePlugin) + if !ok { + t.Fatalf("bad: %#v", raw) + } + + // Create a context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Get the stream + stream, err := impl.Fingerprint(ctx) + require.NoError(err) + + // Get the first message + var first *FingerprintResponse + select { + case <-time.After(1 * time.Second): + t.Fatal("timeout") + case first = <-stream: + } + + errStatus := status.Convert(ferr) + require.EqualError(first.Error, errStatus.Err().Error()) +} + +func TestDevicePlugin_Fingerprint_CancelCtx(t *testing.T) { + t.Parallel() + require := require.New(t) + + mock := &MockDevicePlugin{ + FingerprintF: func(ctx context.Context) (<-chan *FingerprintResponse, error) { + outCh := make(chan *FingerprintResponse, 1) + go func() { + <-ctx.Done() + close(outCh) + return + }() + return outCh, nil + }, + } + + client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ + base.PluginTypeBase: &base.PluginBase{Impl: mock}, + base.PluginTypeDevice: &PluginDevice{Impl: mock}, + }) + defer server.Stop() + defer client.Close() + + raw, err := client.Dispense(base.PluginTypeDevice) + if err != nil { + t.Fatalf("err: %s", err) + } + + impl, ok := raw.(DevicePlugin) + if !ok { + t.Fatalf("bad: %#v", raw) + } + + // Create a context + ctx, cancel := context.WithCancel(context.Background()) + + // Get the stream + stream, err := impl.Fingerprint(ctx) + require.NoError(err) + + // Get the first message + select { + case <-time.After(testutil.Timeout(10 * time.Millisecond)): + case _ = <-stream: + t.Fatal("bad value") + } + + // Cancel the context + cancel() + + // Make sure we are done + select { + case <-time.After(100 * time.Millisecond): + t.Fatalf("timeout") + case v := <-stream: + require.Error(v.Error) + require.EqualError(v.Error, context.Canceled.Error()) + } +} + +func TestDevicePlugin_Reserve(t *testing.T) { + t.Parallel() + require := require.New(t) + + reservation := &ContainerReservation{ + Envs: map[string]string{ + "foo": "bar", + }, + Mounts: []*Mount{ + { + TaskPath: "foo", + HostPath: "bar", + ReadOnly: true, + }, + }, + Devices: []*DeviceSpec{ + { + TaskPath: "foo", + HostPath: "bar", + CgroupPerms: "rx", + }, + }, + } + + var received []string + mock := &MockDevicePlugin{ + ReserveF: func(devices []string) (*ContainerReservation, error) { + received = devices + return reservation, nil + }, + } + + client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ + base.PluginTypeBase: &base.PluginBase{Impl: mock}, + base.PluginTypeDevice: &PluginDevice{Impl: mock}, + }) + defer server.Stop() + defer client.Close() + + raw, err := client.Dispense(base.PluginTypeDevice) + if err != nil { + t.Fatalf("err: %s", err) + } + + impl, ok := raw.(DevicePlugin) + if !ok { + t.Fatalf("bad: %#v", raw) + } + + req := []string{"a", "b"} + containerRes, err := impl.Reserve(req) + require.NoError(err) + require.EqualValues(req, received) + require.EqualValues(reservation, containerRes) +} diff --git a/plugins/device/device.pb.go b/plugins/device/proto/device.pb.go similarity index 59% rename from plugins/device/device.pb.go rename to plugins/device/proto/device.pb.go index 1c5828cd9..d2a75d31c 100644 --- a/plugins/device/device.pb.go +++ b/plugins/device/proto/device.pb.go @@ -1,12 +1,11 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: github.com/hashicorp/nomad/plugins/device/device.proto +// source: github.com/hashicorp/nomad/plugins/device/proto/device.proto -package device +package proto import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" -import empty "github.com/golang/protobuf/ptypes/empty" import ( context "golang.org/x/net/context" @@ -24,9 +23,81 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package -// DetectedDevices is the set of devices that the device plugin has -// detected and is exposing -type DetectedDevices struct { +// FingerprintRequest is used to request for devices to be fingerprinted. +type FingerprintRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FingerprintRequest) Reset() { *m = FingerprintRequest{} } +func (m *FingerprintRequest) String() string { return proto.CompactTextString(m) } +func (*FingerprintRequest) ProtoMessage() {} +func (*FingerprintRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_device_7496b084f8b5ea81, []int{0} +} +func (m *FingerprintRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FingerprintRequest.Unmarshal(m, b) +} +func (m *FingerprintRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FingerprintRequest.Marshal(b, m, deterministic) +} +func (dst *FingerprintRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_FingerprintRequest.Merge(dst, src) +} +func (m *FingerprintRequest) XXX_Size() int { + return xxx_messageInfo_FingerprintRequest.Size(m) +} +func (m *FingerprintRequest) XXX_DiscardUnknown() { + xxx_messageInfo_FingerprintRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_FingerprintRequest proto.InternalMessageInfo + +// FingerprintResponse returns a set of detected devices. +type FingerprintResponse struct { + // device_group is a group of devices that share a vendor, device_type, and + // device_name. This is returned as a set so that a single plugin could + // potentially detect several device types and models. + DeviceGroup []*DeviceGroup `protobuf:"bytes,1,rep,name=device_group,json=deviceGroup,proto3" json:"device_group,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FingerprintResponse) Reset() { *m = FingerprintResponse{} } +func (m *FingerprintResponse) String() string { return proto.CompactTextString(m) } +func (*FingerprintResponse) ProtoMessage() {} +func (*FingerprintResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_device_7496b084f8b5ea81, []int{1} +} +func (m *FingerprintResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FingerprintResponse.Unmarshal(m, b) +} +func (m *FingerprintResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FingerprintResponse.Marshal(b, m, deterministic) +} +func (dst *FingerprintResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_FingerprintResponse.Merge(dst, src) +} +func (m *FingerprintResponse) XXX_Size() int { + return xxx_messageInfo_FingerprintResponse.Size(m) +} +func (m *FingerprintResponse) XXX_DiscardUnknown() { + xxx_messageInfo_FingerprintResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_FingerprintResponse proto.InternalMessageInfo + +func (m *FingerprintResponse) GetDeviceGroup() []*DeviceGroup { + if m != nil { + return m.DeviceGroup + } + return nil +} + +// DeviceGroup is a group of devices that share a vendor, device type and name. +type DeviceGroup struct { // vendor is the name of the vendor of the device Vendor string `protobuf:"bytes,1,opt,name=vendor,proto3" json:"vendor,omitempty"` // device_type is the type of the device (gpu, fpga, etc). @@ -35,96 +106,96 @@ type DetectedDevices struct { DeviceName string `protobuf:"bytes,3,opt,name=device_name,json=deviceName,proto3" json:"device_name,omitempty"` // devices is the set of devices detected by the plugin. Devices []*DetectedDevice `protobuf:"bytes,4,rep,name=devices,proto3" json:"devices,omitempty"` - // node_attributes allows adding node attributes to be used for - // constraints or affinities. - NodeAttributes map[string]string `protobuf:"bytes,5,rep,name=node_attributes,json=nodeAttributes,proto3" json:"node_attributes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // attributes allows adding attributes to be used for constraints or + // affinities. + Attributes map[string]string `protobuf:"bytes,5,rep,name=attributes,proto3" json:"attributes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *DetectedDevices) Reset() { *m = DetectedDevices{} } -func (m *DetectedDevices) String() string { return proto.CompactTextString(m) } -func (*DetectedDevices) ProtoMessage() {} -func (*DetectedDevices) Descriptor() ([]byte, []int) { - return fileDescriptor_device_13acb8ec0117c3b0, []int{0} +func (m *DeviceGroup) Reset() { *m = DeviceGroup{} } +func (m *DeviceGroup) String() string { return proto.CompactTextString(m) } +func (*DeviceGroup) ProtoMessage() {} +func (*DeviceGroup) Descriptor() ([]byte, []int) { + return fileDescriptor_device_7496b084f8b5ea81, []int{2} } -func (m *DetectedDevices) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_DetectedDevices.Unmarshal(m, b) +func (m *DeviceGroup) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DeviceGroup.Unmarshal(m, b) } -func (m *DetectedDevices) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_DetectedDevices.Marshal(b, m, deterministic) +func (m *DeviceGroup) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DeviceGroup.Marshal(b, m, deterministic) } -func (dst *DetectedDevices) XXX_Merge(src proto.Message) { - xxx_messageInfo_DetectedDevices.Merge(dst, src) +func (dst *DeviceGroup) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeviceGroup.Merge(dst, src) } -func (m *DetectedDevices) XXX_Size() int { - return xxx_messageInfo_DetectedDevices.Size(m) +func (m *DeviceGroup) XXX_Size() int { + return xxx_messageInfo_DeviceGroup.Size(m) } -func (m *DetectedDevices) XXX_DiscardUnknown() { - xxx_messageInfo_DetectedDevices.DiscardUnknown(m) +func (m *DeviceGroup) XXX_DiscardUnknown() { + xxx_messageInfo_DeviceGroup.DiscardUnknown(m) } -var xxx_messageInfo_DetectedDevices proto.InternalMessageInfo +var xxx_messageInfo_DeviceGroup proto.InternalMessageInfo -func (m *DetectedDevices) GetVendor() string { +func (m *DeviceGroup) GetVendor() string { if m != nil { return m.Vendor } return "" } -func (m *DetectedDevices) GetDeviceType() string { +func (m *DeviceGroup) GetDeviceType() string { if m != nil { return m.DeviceType } return "" } -func (m *DetectedDevices) GetDeviceName() string { +func (m *DeviceGroup) GetDeviceName() string { if m != nil { return m.DeviceName } return "" } -func (m *DetectedDevices) GetDevices() []*DetectedDevice { +func (m *DeviceGroup) GetDevices() []*DetectedDevice { if m != nil { return m.Devices } return nil } -func (m *DetectedDevices) GetNodeAttributes() map[string]string { +func (m *DeviceGroup) GetAttributes() map[string]string { if m != nil { - return m.NodeAttributes + return m.Attributes } return nil } // DetectedDevice is a single detected device. type DetectedDevice struct { - // ID is the ID of the device. This ID is used during - // allocation. + // ID is the ID of the device. This ID is used during allocation and must be + // stable across restarts of the device driver. ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"` // Health of the device. Healthy bool `protobuf:"varint,2,opt,name=healthy,proto3" json:"healthy,omitempty"` // health_description allows the device plugin to optionally // annotate the health field with a human readable reason. HealthDescription string `protobuf:"bytes,3,opt,name=health_description,json=healthDescription,proto3" json:"health_description,omitempty"` - // pci_bus_id is the PCI bus ID for the device. If reported, it - // allows Nomad to make NUMA aware optimizations. - PciBusId string `protobuf:"bytes,4,opt,name=pci_bus_id,json=pciBusId,proto3" json:"pci_bus_id,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + // hw_locality is optionally set to expose hardware locality information for + // more optimal placement decisions. + HwLocality *DeviceLocality `protobuf:"bytes,4,opt,name=hw_locality,json=hwLocality,proto3" json:"hw_locality,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *DetectedDevice) Reset() { *m = DetectedDevice{} } func (m *DetectedDevice) String() string { return proto.CompactTextString(m) } func (*DetectedDevice) ProtoMessage() {} func (*DetectedDevice) Descriptor() ([]byte, []int) { - return fileDescriptor_device_13acb8ec0117c3b0, []int{1} + return fileDescriptor_device_7496b084f8b5ea81, []int{3} } func (m *DetectedDevice) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DetectedDevice.Unmarshal(m, b) @@ -165,7 +236,48 @@ func (m *DetectedDevice) GetHealthDescription() string { return "" } -func (m *DetectedDevice) GetPciBusId() string { +func (m *DetectedDevice) GetHwLocality() *DeviceLocality { + if m != nil { + return m.HwLocality + } + return nil +} + +// DeviceLocality is used to expose HW locality information about a device. +type DeviceLocality struct { + // pci_bus_id is the PCI bus ID for the device. If reported, it + // allows Nomad to make NUMA aware optimizations. + PciBusId string `protobuf:"bytes,1,opt,name=pci_bus_id,json=pciBusId,proto3" json:"pci_bus_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DeviceLocality) Reset() { *m = DeviceLocality{} } +func (m *DeviceLocality) String() string { return proto.CompactTextString(m) } +func (*DeviceLocality) ProtoMessage() {} +func (*DeviceLocality) Descriptor() ([]byte, []int) { + return fileDescriptor_device_7496b084f8b5ea81, []int{4} +} +func (m *DeviceLocality) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DeviceLocality.Unmarshal(m, b) +} +func (m *DeviceLocality) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DeviceLocality.Marshal(b, m, deterministic) +} +func (dst *DeviceLocality) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeviceLocality.Merge(dst, src) +} +func (m *DeviceLocality) XXX_Size() int { + return xxx_messageInfo_DeviceLocality.Size(m) +} +func (m *DeviceLocality) XXX_DiscardUnknown() { + xxx_messageInfo_DeviceLocality.DiscardUnknown(m) +} + +var xxx_messageInfo_DeviceLocality proto.InternalMessageInfo + +func (m *DeviceLocality) GetPciBusId() string { if m != nil { return m.PciBusId } @@ -186,7 +298,7 @@ func (m *ReserveRequest) Reset() { *m = ReserveRequest{} } func (m *ReserveRequest) String() string { return proto.CompactTextString(m) } func (*ReserveRequest) ProtoMessage() {} func (*ReserveRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_device_13acb8ec0117c3b0, []int{2} + return fileDescriptor_device_7496b084f8b5ea81, []int{5} } func (m *ReserveRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReserveRequest.Unmarshal(m, b) @@ -229,7 +341,7 @@ func (m *ReserveResponse) Reset() { *m = ReserveResponse{} } func (m *ReserveResponse) String() string { return proto.CompactTextString(m) } func (*ReserveResponse) ProtoMessage() {} func (*ReserveResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_device_13acb8ec0117c3b0, []int{3} + return fileDescriptor_device_7496b084f8b5ea81, []int{6} } func (m *ReserveResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReserveResponse.Unmarshal(m, b) @@ -274,7 +386,7 @@ func (m *ContainerReservation) Reset() { *m = ContainerReservation{} } func (m *ContainerReservation) String() string { return proto.CompactTextString(m) } func (*ContainerReservation) ProtoMessage() {} func (*ContainerReservation) Descriptor() ([]byte, []int) { - return fileDescriptor_device_13acb8ec0117c3b0, []int{4} + return fileDescriptor_device_7496b084f8b5ea81, []int{7} } func (m *ContainerReservation) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ContainerReservation.Unmarshal(m, b) @@ -333,7 +445,7 @@ func (m *Mount) Reset() { *m = Mount{} } func (m *Mount) String() string { return proto.CompactTextString(m) } func (*Mount) ProtoMessage() {} func (*Mount) Descriptor() ([]byte, []int) { - return fileDescriptor_device_13acb8ec0117c3b0, []int{5} + return fileDescriptor_device_7496b084f8b5ea81, []int{8} } func (m *Mount) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Mount.Unmarshal(m, b) @@ -394,7 +506,7 @@ func (m *DeviceSpec) Reset() { *m = DeviceSpec{} } func (m *DeviceSpec) String() string { return proto.CompactTextString(m) } func (*DeviceSpec) ProtoMessage() {} func (*DeviceSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_device_13acb8ec0117c3b0, []int{6} + return fileDescriptor_device_7496b084f8b5ea81, []int{9} } func (m *DeviceSpec) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DeviceSpec.Unmarshal(m, b) @@ -436,9 +548,12 @@ func (m *DeviceSpec) GetPermissions() string { } func init() { - proto.RegisterType((*DetectedDevices)(nil), "hashicorp.nomad.plugins.device.DetectedDevices") - proto.RegisterMapType((map[string]string)(nil), "hashicorp.nomad.plugins.device.DetectedDevices.NodeAttributesEntry") + proto.RegisterType((*FingerprintRequest)(nil), "hashicorp.nomad.plugins.device.FingerprintRequest") + proto.RegisterType((*FingerprintResponse)(nil), "hashicorp.nomad.plugins.device.FingerprintResponse") + proto.RegisterType((*DeviceGroup)(nil), "hashicorp.nomad.plugins.device.DeviceGroup") + proto.RegisterMapType((map[string]string)(nil), "hashicorp.nomad.plugins.device.DeviceGroup.AttributesEntry") proto.RegisterType((*DetectedDevice)(nil), "hashicorp.nomad.plugins.device.DetectedDevice") + proto.RegisterType((*DeviceLocality)(nil), "hashicorp.nomad.plugins.device.DeviceLocality") proto.RegisterType((*ReserveRequest)(nil), "hashicorp.nomad.plugins.device.ReserveRequest") proto.RegisterType((*ReserveResponse)(nil), "hashicorp.nomad.plugins.device.ReserveResponse") proto.RegisterType((*ContainerReservation)(nil), "hashicorp.nomad.plugins.device.ContainerReservation") @@ -462,7 +577,7 @@ type DevicePluginClient interface { // Fingerprint allows the device plugin to return a set of // detected devices and provide a mechanism to update the state of // the device. - Fingerprint(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (DevicePlugin_FingerprintClient, error) + Fingerprint(ctx context.Context, in *FingerprintRequest, opts ...grpc.CallOption) (DevicePlugin_FingerprintClient, error) // Reserve is called by the client before starting an allocation // that requires access to the plugin’s devices. The plugin can use // this to run any setup steps and provides the mounting details to @@ -478,7 +593,7 @@ func NewDevicePluginClient(cc *grpc.ClientConn) DevicePluginClient { return &devicePluginClient{cc} } -func (c *devicePluginClient) Fingerprint(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (DevicePlugin_FingerprintClient, error) { +func (c *devicePluginClient) Fingerprint(ctx context.Context, in *FingerprintRequest, opts ...grpc.CallOption) (DevicePlugin_FingerprintClient, error) { stream, err := c.cc.NewStream(ctx, &_DevicePlugin_serviceDesc.Streams[0], "/hashicorp.nomad.plugins.device.DevicePlugin/Fingerprint", opts...) if err != nil { return nil, err @@ -494,7 +609,7 @@ func (c *devicePluginClient) Fingerprint(ctx context.Context, in *empty.Empty, o } type DevicePlugin_FingerprintClient interface { - Recv() (*DetectedDevices, error) + Recv() (*FingerprintResponse, error) grpc.ClientStream } @@ -502,8 +617,8 @@ type devicePluginFingerprintClient struct { grpc.ClientStream } -func (x *devicePluginFingerprintClient) Recv() (*DetectedDevices, error) { - m := new(DetectedDevices) +func (x *devicePluginFingerprintClient) Recv() (*FingerprintResponse, error) { + m := new(FingerprintResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } @@ -524,7 +639,7 @@ type DevicePluginServer interface { // Fingerprint allows the device plugin to return a set of // detected devices and provide a mechanism to update the state of // the device. - Fingerprint(*empty.Empty, DevicePlugin_FingerprintServer) error + Fingerprint(*FingerprintRequest, DevicePlugin_FingerprintServer) error // Reserve is called by the client before starting an allocation // that requires access to the plugin’s devices. The plugin can use // this to run any setup steps and provides the mounting details to @@ -537,7 +652,7 @@ func RegisterDevicePluginServer(s *grpc.Server, srv DevicePluginServer) { } func _DevicePlugin_Fingerprint_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(empty.Empty) + m := new(FingerprintRequest) if err := stream.RecvMsg(m); err != nil { return err } @@ -545,7 +660,7 @@ func _DevicePlugin_Fingerprint_Handler(srv interface{}, stream grpc.ServerStream } type DevicePlugin_FingerprintServer interface { - Send(*DetectedDevices) error + Send(*FingerprintResponse) error grpc.ServerStream } @@ -553,7 +668,7 @@ type devicePluginFingerprintServer struct { grpc.ServerStream } -func (x *devicePluginFingerprintServer) Send(m *DetectedDevices) error { +func (x *devicePluginFingerprintServer) Send(m *FingerprintResponse) error { return x.ServerStream.SendMsg(m) } @@ -591,54 +706,56 @@ var _DevicePlugin_serviceDesc = grpc.ServiceDesc{ ServerStreams: true, }, }, - Metadata: "github.com/hashicorp/nomad/plugins/device/device.proto", + Metadata: "github.com/hashicorp/nomad/plugins/device/proto/device.proto", } func init() { - proto.RegisterFile("github.com/hashicorp/nomad/plugins/device/device.proto", fileDescriptor_device_13acb8ec0117c3b0) + proto.RegisterFile("github.com/hashicorp/nomad/plugins/device/proto/device.proto", fileDescriptor_device_7496b084f8b5ea81) } -var fileDescriptor_device_13acb8ec0117c3b0 = []byte{ - // 645 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xdd, 0x6e, 0xd3, 0x4c, - 0x10, 0x6d, 0x92, 0x36, 0x75, 0x26, 0xfd, 0xd2, 0x8f, 0xa5, 0xaa, 0xac, 0x94, 0x9f, 0xc8, 0x12, - 0x52, 0x85, 0x84, 0x8d, 0x02, 0x02, 0x84, 0x04, 0x52, 0xdb, 0x14, 0x91, 0x0b, 0x4a, 0x65, 0xb8, - 0xa1, 0x17, 0x58, 0x8e, 0x3d, 0xc4, 0xab, 0xda, 0xbb, 0x8b, 0x77, 0x1d, 0xc9, 0x4f, 0xc0, 0xab, - 0xf0, 0x48, 0x3c, 0x01, 0xcf, 0x81, 0xbc, 0xeb, 0xa4, 0x09, 0xaa, 0x88, 0x0a, 0x57, 0xf6, 0xce, - 0x9c, 0x73, 0xe6, 0x78, 0x3c, 0x3b, 0xf0, 0x6c, 0x4a, 0x55, 0x52, 0x4c, 0xdc, 0x88, 0x67, 0x5e, - 0x12, 0xca, 0x84, 0x46, 0x3c, 0x17, 0x1e, 0xe3, 0x59, 0x18, 0x7b, 0x22, 0x2d, 0xa6, 0x94, 0x49, - 0x2f, 0xc6, 0x19, 0x8d, 0xb0, 0x7e, 0xb8, 0x22, 0xe7, 0x8a, 0x93, 0x7b, 0x0b, 0xb0, 0xab, 0xc1, - 0x6e, 0x0d, 0x76, 0x0d, 0xaa, 0x7f, 0x30, 0xe5, 0x7c, 0x9a, 0xa2, 0xa7, 0xd1, 0x93, 0xe2, 0x8b, - 0x87, 0x99, 0x50, 0xa5, 0x21, 0x3b, 0x3f, 0x9b, 0xb0, 0x3b, 0x42, 0x85, 0x91, 0xc2, 0x78, 0xa4, - 0xf1, 0x92, 0xec, 0x43, 0x7b, 0x86, 0x2c, 0xe6, 0xb9, 0xdd, 0x18, 0x34, 0x0e, 0x3b, 0x7e, 0x7d, - 0x22, 0xf7, 0xa1, 0x6b, 0x24, 0x03, 0x55, 0x0a, 0xb4, 0x9b, 0x3a, 0x09, 0x26, 0xf4, 0xb1, 0x14, - 0xb8, 0x04, 0x60, 0x61, 0x86, 0x76, 0x6b, 0x19, 0x70, 0x16, 0x66, 0x48, 0xde, 0xc2, 0xb6, 0x39, - 0x49, 0x7b, 0x73, 0xd0, 0x3a, 0xec, 0x0e, 0x5d, 0xf7, 0xcf, 0xe6, 0xdd, 0x55, 0x6f, 0xfe, 0x9c, - 0x4e, 0x52, 0xd8, 0x65, 0x3c, 0xc6, 0x20, 0x54, 0x2a, 0xa7, 0x93, 0x42, 0xa1, 0xb4, 0xb7, 0xb4, - 0xe2, 0xc9, 0xcd, 0x14, 0xa5, 0x7b, 0xc6, 0x63, 0x3c, 0x5a, 0xa8, 0x9c, 0x32, 0x95, 0x97, 0x7e, - 0x8f, 0xad, 0x04, 0xfb, 0x47, 0x70, 0xfb, 0x1a, 0x18, 0xf9, 0x1f, 0x5a, 0x97, 0x58, 0xd6, 0x5d, - 0xaa, 0x5e, 0xc9, 0x1e, 0x6c, 0xcd, 0xc2, 0xb4, 0x98, 0x37, 0xc7, 0x1c, 0x5e, 0x36, 0x5f, 0x34, - 0x9c, 0x6f, 0x0d, 0xe8, 0xad, 0x96, 0x26, 0x3d, 0x68, 0x8e, 0x47, 0x35, 0xbb, 0x39, 0x1e, 0x11, - 0x1b, 0xb6, 0x13, 0x0c, 0x53, 0x95, 0x94, 0x9a, 0x6e, 0xf9, 0xf3, 0x23, 0x79, 0x04, 0xc4, 0xbc, - 0x06, 0x31, 0xca, 0x28, 0xa7, 0x42, 0x51, 0xce, 0xea, 0xfe, 0xde, 0x32, 0x99, 0xd1, 0x55, 0x82, - 0xdc, 0x01, 0x10, 0x11, 0x0d, 0x26, 0x85, 0x0c, 0x68, 0x6c, 0x6f, 0x6a, 0x98, 0x25, 0x22, 0x7a, - 0x5c, 0xc8, 0x71, 0xec, 0x78, 0xd0, 0xf3, 0x51, 0x62, 0x3e, 0x43, 0x1f, 0xbf, 0x16, 0x28, 0x15, - 0xb9, 0x0b, 0xf5, 0x4f, 0x0a, 0x68, 0x2c, 0xed, 0xc6, 0xa0, 0x75, 0xd8, 0xf1, 0x3b, 0x26, 0x32, - 0x8e, 0xa5, 0x93, 0xc2, 0xee, 0x82, 0x20, 0x05, 0x67, 0x12, 0xc9, 0x27, 0xf8, 0x2f, 0xe2, 0x4c, - 0x85, 0x94, 0x61, 0x1e, 0xe4, 0x28, 0xf5, 0x57, 0x74, 0x87, 0x4f, 0xd7, 0x35, 0xff, 0x64, 0x4e, - 0x32, 0x82, 0x61, 0x65, 0xd7, 0xdf, 0x89, 0x96, 0xa2, 0xce, 0xf7, 0x26, 0xec, 0x5d, 0x07, 0x23, - 0x3e, 0x6c, 0x22, 0x9b, 0x19, 0x7f, 0xdd, 0xe1, 0xeb, 0xbf, 0x29, 0xe5, 0x9e, 0xb2, 0x59, 0xfd, - 0x8b, 0xb5, 0x16, 0x79, 0x05, 0xed, 0x8c, 0x17, 0x4c, 0x49, 0xbb, 0xa9, 0x55, 0x1f, 0xac, 0x53, - 0x7d, 0x57, 0xa1, 0xfd, 0x9a, 0x44, 0x46, 0x57, 0xf3, 0xdc, 0xd2, 0xfc, 0x87, 0xeb, 0xa7, 0xaf, - 0x7a, 0x7c, 0x10, 0x18, 0x2d, 0x66, 0xb9, 0xff, 0x1c, 0x3a, 0x0b, 0x5f, 0x37, 0x9a, 0xa9, 0xcf, - 0xb0, 0xa5, 0xfd, 0x90, 0x03, 0xe8, 0xa8, 0x50, 0x5e, 0x06, 0x22, 0x54, 0x49, 0x4d, 0xb5, 0xaa, - 0xc0, 0x79, 0xa8, 0x92, 0x2a, 0x99, 0x70, 0xa9, 0x4c, 0xd2, 0x68, 0x58, 0x55, 0x60, 0x9e, 0xcc, - 0x31, 0x8c, 0x03, 0xce, 0xd2, 0x52, 0x0f, 0x94, 0xe5, 0x5b, 0x55, 0xe0, 0x3d, 0x4b, 0x4b, 0x27, - 0x01, 0xb8, 0xf2, 0xfb, 0x0f, 0x45, 0x06, 0xd0, 0x15, 0x98, 0x67, 0x54, 0x4a, 0xca, 0x99, 0xac, - 0xe7, 0x76, 0x39, 0x34, 0xfc, 0xd1, 0x80, 0x1d, 0x53, 0xea, 0x5c, 0xf7, 0x8b, 0x5c, 0x40, 0xf7, - 0x0d, 0x65, 0x53, 0xcc, 0x45, 0x4e, 0x99, 0x22, 0xfb, 0xae, 0x59, 0x62, 0xee, 0x7c, 0x89, 0xb9, - 0xa7, 0xd5, 0x12, 0xeb, 0x7b, 0x37, 0xbc, 0xed, 0xce, 0xc6, 0xe3, 0x06, 0x49, 0x61, 0xbb, 0x9e, - 0x67, 0xb2, 0x76, 0xff, 0xac, 0xde, 0x94, 0xf5, 0xf5, 0x7e, 0xbb, 0x28, 0xce, 0xc6, 0xb1, 0x75, - 0xd1, 0x36, 0xb9, 0x49, 0x5b, 0x9b, 0x7f, 0xf2, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x15, 0x03, 0xfe, - 0x52, 0xe9, 0x05, 0x00, 0x00, +var fileDescriptor_device_7496b084f8b5ea81 = []byte{ + // 682 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xdd, 0x6e, 0xd3, 0x4a, + 0x10, 0x3e, 0x4e, 0x9a, 0x26, 0x99, 0xf4, 0xb4, 0xe7, 0xec, 0xa9, 0x8e, 0xac, 0xf0, 0x17, 0x59, + 0x42, 0xaa, 0x40, 0xd8, 0x28, 0x45, 0x02, 0x01, 0x45, 0xa2, 0xa4, 0x40, 0x24, 0x68, 0x2b, 0xc3, + 0x0d, 0x20, 0x61, 0x39, 0xf6, 0x2a, 0x5e, 0xd5, 0xd9, 0x35, 0xbb, 0xeb, 0x54, 0xe6, 0x89, 0x78, + 0x06, 0xde, 0x81, 0xf7, 0xe1, 0x12, 0x79, 0x77, 0x9d, 0xb8, 0x80, 0x48, 0x0b, 0x57, 0xbb, 0x33, + 0xdf, 0x7c, 0x33, 0x93, 0xd9, 0x6f, 0x1c, 0x78, 0x38, 0x25, 0x32, 0xc9, 0x27, 0x6e, 0xc4, 0x66, + 0x5e, 0x12, 0x8a, 0x84, 0x44, 0x8c, 0x67, 0x1e, 0x65, 0xb3, 0x30, 0xf6, 0xb2, 0x34, 0x9f, 0x12, + 0x2a, 0xbc, 0x18, 0xcf, 0x49, 0x84, 0xbd, 0x8c, 0x33, 0xc9, 0x8c, 0xe1, 0x2a, 0x03, 0x5d, 0x5d, + 0x50, 0x5c, 0x45, 0x71, 0x0d, 0xc5, 0xd5, 0x51, 0xce, 0x36, 0xa0, 0xa7, 0x84, 0x4e, 0x31, 0xcf, + 0x38, 0xa1, 0xd2, 0xc7, 0x1f, 0x72, 0x2c, 0xa4, 0x83, 0xe1, 0xbf, 0x33, 0x5e, 0x91, 0x31, 0x2a, + 0x30, 0x3a, 0x84, 0x0d, 0x4d, 0x0b, 0xa6, 0x9c, 0xe5, 0x99, 0x6d, 0x0d, 0x9a, 0x3b, 0xbd, 0xe1, + 0x4d, 0xf7, 0xd7, 0x35, 0xdc, 0x91, 0x3a, 0x9e, 0x95, 0x14, 0xbf, 0x17, 0x2f, 0x0d, 0xe7, 0x4b, + 0x03, 0x7a, 0x35, 0x10, 0xfd, 0x0f, 0xeb, 0x73, 0x4c, 0x63, 0xc6, 0x6d, 0x6b, 0x60, 0xed, 0x74, + 0x7d, 0x63, 0xa1, 0x6b, 0x60, 0x68, 0x81, 0x2c, 0x32, 0x6c, 0x37, 0x14, 0x08, 0xda, 0xf5, 0xba, + 0xc8, 0x70, 0x2d, 0x80, 0x86, 0x33, 0x6c, 0x37, 0xeb, 0x01, 0x87, 0xe1, 0x0c, 0xa3, 0xe7, 0xd0, + 0xd6, 0x96, 0xb0, 0xd7, 0x54, 0xd3, 0xee, 0xea, 0xa6, 0x25, 0x8e, 0x24, 0x8e, 0x75, 0x7f, 0x7e, + 0x45, 0x47, 0xef, 0x00, 0x42, 0x29, 0x39, 0x99, 0xe4, 0x12, 0x0b, 0xbb, 0xa5, 0x92, 0x3d, 0xb8, + 0xc0, 0x04, 0xdc, 0xc7, 0x0b, 0xf6, 0x01, 0x95, 0xbc, 0xf0, 0x6b, 0xe9, 0xfa, 0x7b, 0xb0, 0xf5, + 0x1d, 0x8c, 0xfe, 0x81, 0xe6, 0x09, 0x2e, 0xcc, 0x40, 0xca, 0x2b, 0xda, 0x86, 0xd6, 0x3c, 0x4c, + 0xf3, 0x6a, 0x0e, 0xda, 0xb8, 0xdf, 0xb8, 0x67, 0x39, 0x9f, 0x2d, 0xd8, 0x3c, 0xdb, 0x37, 0xda, + 0x84, 0xc6, 0x78, 0x64, 0xd8, 0x8d, 0xf1, 0x08, 0xd9, 0xd0, 0x4e, 0x70, 0x98, 0xca, 0xa4, 0x50, + 0xf4, 0x8e, 0x5f, 0x99, 0xe8, 0x16, 0x20, 0x7d, 0x0d, 0x62, 0x2c, 0x22, 0x4e, 0x32, 0x49, 0x18, + 0x35, 0xa3, 0xfc, 0x57, 0x23, 0xa3, 0x25, 0x80, 0x8e, 0xa0, 0x97, 0x9c, 0x06, 0x29, 0x8b, 0xc2, + 0x94, 0xc8, 0xc2, 0x5e, 0x1b, 0x58, 0xe7, 0x9b, 0x6a, 0x79, 0xbc, 0x30, 0x2c, 0x1f, 0x92, 0xd3, + 0xea, 0xee, 0xb8, 0x65, 0xef, 0x75, 0x14, 0x5d, 0x06, 0xc8, 0x22, 0x12, 0x4c, 0x72, 0x11, 0x90, + 0xd8, 0xfc, 0x86, 0x4e, 0x16, 0x91, 0xfd, 0x5c, 0x8c, 0x63, 0xc7, 0x83, 0x4d, 0x1f, 0x0b, 0xcc, + 0xe7, 0xd8, 0xa8, 0x16, 0x5d, 0x01, 0xf3, 0xe4, 0x01, 0x89, 0x85, 0x12, 0x67, 0xd7, 0xef, 0x6a, + 0xcf, 0x38, 0x16, 0x4e, 0x0a, 0x5b, 0x0b, 0x82, 0x11, 0xf4, 0x1b, 0xf8, 0x3b, 0x62, 0x54, 0x86, + 0x84, 0x62, 0x1e, 0x70, 0x2c, 0x54, 0x91, 0xde, 0xf0, 0xce, 0xaa, 0x9f, 0xf1, 0xa4, 0x22, 0xe9, + 0x84, 0x61, 0x39, 0x11, 0x7f, 0x23, 0xaa, 0x79, 0x9d, 0x4f, 0x0d, 0xd8, 0xfe, 0x59, 0x18, 0xf2, + 0x61, 0x0d, 0xd3, 0xb9, 0x30, 0xcb, 0xf3, 0xe8, 0x77, 0x4a, 0xb9, 0x07, 0x74, 0x6e, 0xd4, 0xa3, + 0x72, 0xa1, 0x3d, 0x58, 0x9f, 0xb1, 0x9c, 0x4a, 0x61, 0x37, 0x54, 0xd6, 0xeb, 0xab, 0xb2, 0xbe, + 0x2c, 0xa3, 0x7d, 0x43, 0x42, 0xa3, 0xe5, 0x76, 0x34, 0x15, 0xff, 0xc6, 0xf9, 0xde, 0xf1, 0x55, + 0x86, 0xa3, 0xc5, 0x66, 0xf4, 0xef, 0x42, 0x77, 0xd1, 0xd7, 0x85, 0x64, 0xfb, 0x1e, 0x5a, 0xaa, + 0x1f, 0x74, 0x09, 0xba, 0x32, 0x14, 0x27, 0x41, 0x16, 0xca, 0xa4, 0x7a, 0xef, 0xd2, 0x71, 0x1c, + 0xca, 0xa4, 0x04, 0x13, 0x26, 0xa4, 0x06, 0x75, 0x8e, 0x4e, 0xe9, 0xa8, 0x40, 0x8e, 0xc3, 0x38, + 0x60, 0x34, 0x2d, 0x94, 0x66, 0x3b, 0x7e, 0xa7, 0x74, 0x1c, 0xd1, 0xb4, 0x70, 0x12, 0x80, 0x65, + 0xbf, 0x7f, 0x50, 0x64, 0x00, 0xbd, 0x0c, 0xf3, 0x19, 0x11, 0x82, 0x30, 0x2a, 0xcc, 0x6a, 0xd4, + 0x5d, 0xc3, 0xaf, 0x16, 0x6c, 0xe8, 0x52, 0xc7, 0x6a, 0x5e, 0xe8, 0x23, 0xf4, 0x6a, 0x1f, 0x52, + 0x34, 0x5c, 0x35, 0xd7, 0x1f, 0xbf, 0xc5, 0xfd, 0xdd, 0x0b, 0x71, 0xb4, 0xb0, 0x9d, 0xbf, 0x6e, + 0x5b, 0x28, 0x85, 0xb6, 0xd1, 0x3b, 0x5a, 0xb9, 0x97, 0x67, 0x37, 0xa9, 0xef, 0x9d, 0x3b, 0xbe, + 0xaa, 0xb7, 0xdf, 0x7e, 0xdb, 0x52, 0xff, 0x38, 0x93, 0x75, 0x75, 0xec, 0x7e, 0x0b, 0x00, 0x00, + 0xff, 0xff, 0x5f, 0xeb, 0xc2, 0x59, 0xb8, 0x06, 0x00, 0x00, } diff --git a/plugins/device/device.proto b/plugins/device/proto/device.proto similarity index 71% rename from plugins/device/device.proto rename to plugins/device/proto/device.proto index 3bfffd4a2..d1b68dc07 100644 --- a/plugins/device/device.proto +++ b/plugins/device/proto/device.proto @@ -1,14 +1,13 @@ syntax = "proto3"; package hashicorp.nomad.plugins.device; -option go_package = "device"; -import "google/protobuf/empty.proto"; +option go_package = "proto"; // DevicePlugin is the API exposed by device plugins service DevicePlugin { // Fingerprint allows the device plugin to return a set of // detected devices and provide a mechanism to update the state of // the device. - rpc Fingerprint(google.protobuf.Empty) returns (stream DetectedDevices) {} + rpc Fingerprint(FingerprintRequest) returns (stream FingerprintResponse) {} // Reserve is called by the client before starting an allocation // that requires access to the plugin’s devices. The plugin can use @@ -17,9 +16,19 @@ service DevicePlugin { rpc Reserve(ReserveRequest) returns (ReserveResponse) {} } -// DetectedDevices is the set of devices that the device plugin has -// detected and is exposing -message DetectedDevices { +// FingerprintRequest is used to request for devices to be fingerprinted. +message FingerprintRequest {} + +// FingerprintResponse returns a set of detected devices. +message FingerprintResponse { + // device_group is a group of devices that share a vendor, device_type, and + // device_name. This is returned as a set so that a single plugin could + // potentially detect several device types and models. + repeated DeviceGroup device_group = 1; +} + +// DeviceGroup is a group of devices that share a vendor, device type and name. +message DeviceGroup { // vendor is the name of the vendor of the device string vendor = 1; @@ -32,15 +41,15 @@ message DetectedDevices { // devices is the set of devices detected by the plugin. repeated DetectedDevice devices = 4; - // node_attributes allows adding node attributes to be used for - // constraints or affinities. - map node_attributes = 5; + // attributes allows adding attributes to be used for constraints or + // affinities. + map attributes = 5; } // DetectedDevice is a single detected device. message DetectedDevice { - // ID is the ID of the device. This ID is used during - // allocation. + // ID is the ID of the device. This ID is used during allocation and must be + // stable across restarts of the device driver. string ID = 1; // Health of the device. @@ -50,11 +59,19 @@ message DetectedDevice { // annotate the health field with a human readable reason. string health_description = 3; + // hw_locality is optionally set to expose hardware locality information for + // more optimal placement decisions. + DeviceLocality hw_locality = 4; +} + +// DeviceLocality is used to expose HW locality information about a device. +message DeviceLocality { // pci_bus_id is the PCI bus ID for the device. If reported, it // allows Nomad to make NUMA aware optimizations. - string pci_bus_id = 4; + string pci_bus_id = 1; } + // ReserveRequest is used to ask the device driver for information on // how to allocate the requested devices. message ReserveRequest { @@ -111,3 +128,4 @@ message DeviceSpec { // * m - allows task to create device files that do not yet exist string permissions = 3; } + diff --git a/plugins/device/server.go b/plugins/device/server.go new file mode 100644 index 000000000..12f2554a6 --- /dev/null +++ b/plugins/device/server.go @@ -0,0 +1,66 @@ +package device + +import ( + context "golang.org/x/net/context" + + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/plugins/device/proto" +) + +// devicePluginServer wraps a device plugin and exposes it via gRPC. +type devicePluginServer struct { + broker *plugin.GRPCBroker + impl DevicePlugin +} + +func (d *devicePluginServer) Fingerprint(req *proto.FingerprintRequest, stream proto.DevicePlugin_FingerprintServer) error { + ctx := stream.Context() + outCh, err := d.impl.Fingerprint(ctx) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return nil + case resp, ok := <-outCh: + // The output channel has been closed, end the stream + if !ok { + return nil + } + + // Handle any error + if resp.Error != nil { + return resp.Error + } + + // Convert the devices + out := convertStructDeviceGroups(resp.Devices) + + // Build the response + presp := &proto.FingerprintResponse{ + DeviceGroup: out, + } + + // Send the devices + if err := stream.Send(presp); err != nil { + return err + } + } + } +} + +func (d *devicePluginServer) Reserve(ctx context.Context, req *proto.ReserveRequest) (*proto.ReserveResponse, error) { + resp, err := d.impl.Reserve(req.GetDeviceIds()) + if err != nil { + return nil, err + } + + // Make the response + presp := &proto.ReserveResponse{ + ContainerRes: convertStructContainerReservation(resp), + } + + return presp, nil +} diff --git a/plugins/device/util.go b/plugins/device/util.go new file mode 100644 index 000000000..92338931d --- /dev/null +++ b/plugins/device/util.go @@ -0,0 +1,275 @@ +package device + +import "github.com/hashicorp/nomad/plugins/device/proto" + +// convertProtoDeviceGroups converts between a list of proto and structs DeviceGroup +func convertProtoDeviceGroups(in []*proto.DeviceGroup) []*DeviceGroup { + if in == nil { + return nil + } + + out := make([]*DeviceGroup, len(in)) + for i, group := range in { + out[i] = convertProtoDeviceGroup(group) + } + + return out +} + +// convertProtoDeviceGroup converts between a proto and structs DeviceGroup +func convertProtoDeviceGroup(in *proto.DeviceGroup) *DeviceGroup { + if in == nil { + return nil + } + + return &DeviceGroup{ + Vendor: in.GetVendor(), + Type: in.GetDeviceType(), + Name: in.GetDeviceName(), + Devices: convertProtoDevices(in.GetDevices()), + Attributes: in.GetAttributes(), + } +} + +// convertProtoDevices converts between a list of proto and structs Device +func convertProtoDevices(in []*proto.DetectedDevice) []*Device { + if in == nil { + return nil + } + + out := make([]*Device, len(in)) + for i, d := range in { + out[i] = convertProtoDevice(d) + } + + return out +} + +// convertProtoDevice converts between a proto and structs Device +func convertProtoDevice(in *proto.DetectedDevice) *Device { + if in == nil { + return nil + } + + return &Device{ + ID: in.GetID(), + Healthy: in.GetHealthy(), + HealthDesc: in.GetHealthDescription(), + HwLocality: convertProtoDeviceLocality(in.GetHwLocality()), + } +} + +// convertProtoDeviceLocality converts between a proto and structs DeviceLocality +func convertProtoDeviceLocality(in *proto.DeviceLocality) *DeviceLocality { + if in == nil { + return nil + } + + return &DeviceLocality{ + PciBusID: in.GetPciBusId(), + } +} + +// convertProtoContainerReservation is used to convert between a proto and struct +// ContainerReservation +func convertProtoContainerReservation(in *proto.ContainerReservation) *ContainerReservation { + if in == nil { + return nil + } + + return &ContainerReservation{ + Envs: in.GetEnvs(), + Mounts: convertProtoMounts(in.GetMounts()), + Devices: convertProtoDeviceSpecs(in.GetDevices()), + } +} + +// convertProtoMount converts between a list of proto and structs Mount +func convertProtoMounts(in []*proto.Mount) []*Mount { + if in == nil { + return nil + } + + out := make([]*Mount, len(in)) + for i, d := range in { + out[i] = convertProtoMount(d) + } + + return out +} + +// convertProtoMount converts between a proto and structs Mount +func convertProtoMount(in *proto.Mount) *Mount { + if in == nil { + return nil + } + + return &Mount{ + TaskPath: in.GetTaskPath(), + HostPath: in.GetHostPath(), + ReadOnly: in.GetReadOnly(), + } +} + +// convertProtoDeviceSpecs converts between a list of proto and structs DeviceSpecs +func convertProtoDeviceSpecs(in []*proto.DeviceSpec) []*DeviceSpec { + if in == nil { + return nil + } + + out := make([]*DeviceSpec, len(in)) + for i, d := range in { + out[i] = convertProtoDeviceSpec(d) + } + + return out +} + +// convertProtoDeviceSpec converts between a proto and structs DeviceSpec +func convertProtoDeviceSpec(in *proto.DeviceSpec) *DeviceSpec { + if in == nil { + return nil + } + + return &DeviceSpec{ + TaskPath: in.GetTaskPath(), + HostPath: in.GetHostPath(), + CgroupPerms: in.GetPermissions(), + } +} + +// convertStructDeviceGroup converts between a list of struct and proto DeviceGroup +func convertStructDeviceGroups(in []*DeviceGroup) []*proto.DeviceGroup { + if in == nil { + return nil + } + + out := make([]*proto.DeviceGroup, len(in)) + for i, g := range in { + out[i] = convertStructDeviceGroup(g) + } + + return out +} + +// convertStructDeviceGroup converts between a struct and proto DeviceGroup +func convertStructDeviceGroup(in *DeviceGroup) *proto.DeviceGroup { + if in == nil { + return nil + } + + return &proto.DeviceGroup{ + Vendor: in.Vendor, + DeviceType: in.Type, + DeviceName: in.Name, + Devices: convertStructDevices(in.Devices), + Attributes: in.Attributes, + } +} + +// convertStructDevices converts between a list of struct and proto Device +func convertStructDevices(in []*Device) []*proto.DetectedDevice { + if in == nil { + return nil + } + + out := make([]*proto.DetectedDevice, len(in)) + for i, d := range in { + out[i] = convertStructDevice(d) + } + + return out +} + +// convertStructDevice converts between a struct and proto Device +func convertStructDevice(in *Device) *proto.DetectedDevice { + if in == nil { + return nil + } + + return &proto.DetectedDevice{ + ID: in.ID, + Healthy: in.Healthy, + HealthDescription: in.HealthDesc, + HwLocality: convertStructDeviceLocality(in.HwLocality), + } +} + +// convertStructDeviceLocality converts between a struct and proto DeviceLocality +func convertStructDeviceLocality(in *DeviceLocality) *proto.DeviceLocality { + if in == nil { + return nil + } + + return &proto.DeviceLocality{ + PciBusId: in.PciBusID, + } +} + +// convertStructContainerReservation is used to convert between a struct and +// proto ContainerReservation +func convertStructContainerReservation(in *ContainerReservation) *proto.ContainerReservation { + if in == nil { + return nil + } + + return &proto.ContainerReservation{ + Envs: in.Envs, + Mounts: convertStructMounts(in.Mounts), + Devices: convertStructDeviceSpecs(in.Devices), + } +} + +// convertStructMount converts between a list of structs and proto Mount +func convertStructMounts(in []*Mount) []*proto.Mount { + if in == nil { + return nil + } + + out := make([]*proto.Mount, len(in)) + for i, m := range in { + out[i] = convertStructMount(m) + } + + return out +} + +// convertStructMount converts between a struct and proto Mount +func convertStructMount(in *Mount) *proto.Mount { + if in == nil { + return nil + } + + return &proto.Mount{ + TaskPath: in.TaskPath, + HostPath: in.HostPath, + ReadOnly: in.ReadOnly, + } +} + +// convertStructDeviceSpecs converts between a list of struct and proto DeviceSpecs +func convertStructDeviceSpecs(in []*DeviceSpec) []*proto.DeviceSpec { + if in == nil { + return nil + } + + out := make([]*proto.DeviceSpec, len(in)) + for i, d := range in { + out[i] = convertStructDeviceSpec(d) + } + + return out +} + +// convertStructDeviceSpec converts between a struct and proto DeviceSpec +func convertStructDeviceSpec(in *DeviceSpec) *proto.DeviceSpec { + if in == nil { + return nil + } + + return &proto.DeviceSpec{ + TaskPath: in.TaskPath, + HostPath: in.HostPath, + Permissions: in.CgroupPerms, + } +} From 9ac7dcd1eea402b6c8f98910316e99ec36364c98 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 20 Aug 2018 15:19:08 -0700 Subject: [PATCH 2/2] Describe public interface of Fingerprint --- plugins/device/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/device/client.go b/plugins/device/client.go index d0763088e..8bf6f27ff 100644 --- a/plugins/device/client.go +++ b/plugins/device/client.go @@ -20,6 +20,10 @@ type devicePluginClient struct { client proto.DevicePluginClient } +// Fingerprint is used to retrieve the set of devices and their health from the +// device plugin. An error may be immediately returned if the fingerprint call +// could not be made or as part of the streaming response. If the context is +// cancelled, the error will be propogated. func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) { var req proto.FingerprintRequest stream, err := d.client.Fingerprint(ctx, &req)