open-nomad/plugins/device/server.go
Tim Gross eb06c25d5f
deps: remove deprecated net/context (#13932)
The `golang.org/x/net/context` package was merged into the stdlib as of go
1.7. Update the imports to use the identical stdlib version. Clean up import
blocks for the impacted files to remove unnecessary package aliasing.
2022-07-28 14:46:56 -04:00

121 lines
2.4 KiB
Go

package device
import (
"context"
"fmt"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/plugins/device/proto"
)
// devicePluginServer wraps a device plugin and exposes it via gRPC.
//
// Each RPC handler on this type delegates to the matching method on impl and
// converts the plugin's results into their protobuf representation.
type devicePluginServer struct {
// broker is the go-plugin gRPC broker handed to this server. It is not
// referenced by the handlers in this file.
broker *plugin.GRPCBroker
// impl is the concrete device plugin whose Fingerprint, Reserve and Stats
// methods back the corresponding RPCs.
impl DevicePlugin
}
// Fingerprint starts fingerprinting on the wrapped plugin and relays every
// result to the client over the gRPC stream.
//
// The call returns nil when the stream's context is done or when the plugin
// closes its output channel. It returns a non-nil error if the plugin fails to
// start, if a result carries an error, or if sending on the stream fails.
func (d *devicePluginServer) Fingerprint(req *proto.FingerprintRequest, stream proto.DevicePlugin_FingerprintServer) error {
	streamCtx := stream.Context()

	results, err := d.impl.Fingerprint(streamCtx)
	if err != nil {
		return err
	}

	for {
		select {
		case <-streamCtx.Done():
			return nil

		case update, open := <-results:
			switch {
			case !open:
				// The plugin closed its channel; nothing more to stream.
				return nil
			case update.Error != nil:
				// Surface the plugin's error to the client and stop.
				return update.Error
			}

			// Translate the detected devices into their protobuf form and
			// forward them to the client.
			msg := &proto.FingerprintResponse{
				DeviceGroup: convertStructDeviceGroups(update.Devices),
			}
			if sendErr := stream.Send(msg); sendErr != nil {
				return sendErr
			}
		}
	}
}
// Reserve asks the wrapped plugin to reserve the devices named in the request
// and returns the resulting container reservation in protobuf form.
//
// Any error from the plugin is returned unchanged with a nil response. The
// ctx argument is not consulted by this handler.
func (d *devicePluginServer) Reserve(ctx context.Context, req *proto.ReserveRequest) (*proto.ReserveResponse, error) {
	reservation, err := d.impl.Reserve(req.GetDeviceIds())
	if err != nil {
		return nil, err
	}

	return &proto.ReserveResponse{
		ContainerRes: convertStructContainerReservation(reservation),
	}, nil
}
// Stats starts statistics collection on the wrapped plugin and relays every
// result to the client over the gRPC stream.
//
// The collection interval comes from req.CollectionInterval. If it cannot be
// converted to a time.Duration the call fails; if it is zero or negative it is
// replaced with one second.
//
// The call returns nil when the stream's context is done or when the plugin
// closes its output channel. It returns a non-nil error if the interval cannot
// be parsed, if the plugin fails to start, if a result carries an error, or if
// sending on the stream fails.
func (d *devicePluginServer) Stats(req *proto.StatsRequest, stream proto.DevicePlugin_StatsServer) error {
	ctx := stream.Context()

	// Retrieve the collection interval
	interval, err := ptypes.Duration(req.CollectionInterval)
	if err != nil {
		return fmt.Errorf("failed to parse collection interval: %w", err)
	}

	// Default the duration if we get an invalid one. Negative values are
	// treated the same as zero: a non-positive period is never meaningful, and
	// a plugin that hands it to something like time.NewTicker would panic.
	if interval <= 0 {
		interval = time.Second
	}

	outCh, err := d.impl.Stats(ctx, interval)
	if err != nil {
		return err
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case resp, ok := <-outCh:
			// The output channel has been closed, end the stream
			if !ok {
				return nil
			}

			// Surface the plugin's error to the client and stop
			if resp.Error != nil {
				return resp.Error
			}

			// Convert the stats to their protobuf form and send them
			presp := &proto.StatsResponse{
				Groups: convertStructDeviceGroupsStats(resp.Groups),
			}
			if err := stream.Send(presp); err != nil {
				return err
			}
		}
	}
}