open-nomad/plugins/device/client.go

151 lines
4 KiB
Go
Raw Normal View History

2018-08-13 17:29:29 +00:00
package device
import (
"context"
"io"
"time"
2018-08-13 17:29:29 +00:00
"github.com/LK4D4/joincontext"
"github.com/golang/protobuf/ptypes"
2019-01-23 14:27:14 +00:00
"github.com/hashicorp/nomad/helper/pluginutils/grpcutils"
2018-08-13 17:29:29 +00:00
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/device/proto"
)
// devicePluginClient implements the client side of a remote device plugin, using
// gRPC to communicate to the remote plugin.
type devicePluginClient struct {
// basePluginClient is embedded to give access to the base plugin methods.
*base.BasePluginClient
client proto.DevicePluginClient
// doneCtx is closed when the plugin exits
doneCtx context.Context
2018-08-13 17:29:29 +00:00
}
// Fingerprint is used to retrieve the set of devices and their health from the
// device plugin. An error may be immediately returned if the fingerprint call
// could not be made or as part of the streaming response. If the context is
2019-01-23 15:53:07 +00:00
// cancelled, the error will be propagated.
2018-08-13 17:29:29 +00:00
func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) {
// Join the passed context and the shutdown context
2018-12-21 19:23:21 +00:00
joinedCtx, _ := joincontext.Join(ctx, d.doneCtx)
2018-08-13 17:29:29 +00:00
var req proto.FingerprintRequest
2018-12-21 19:23:21 +00:00
stream, err := d.client.Fingerprint(joinedCtx, &req)
2018-08-13 17:29:29 +00:00
if err != nil {
2018-12-21 19:23:21 +00:00
return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx)
2018-08-13 17:29:29 +00:00
}
out := make(chan *FingerprintResponse, 1)
go d.handleFingerprint(ctx, stream, out)
return out, nil
}
// handleFingerprint should be launched in a goroutine and handles converting
// the gRPC stream to a channel. Exits either when context is cancelled or the
// stream has an error.
func (d *devicePluginClient) handleFingerprint(
2018-12-21 19:23:21 +00:00
reqCtx context.Context,
2018-08-13 17:29:29 +00:00
stream proto.DevicePlugin_FingerprintClient,
out chan *FingerprintResponse) {
defer close(out)
2018-08-13 17:29:29 +00:00
for {
resp, err := stream.Recv()
if err != nil {
if err != io.EOF {
out <- &FingerprintResponse{
2018-12-21 19:23:21 +00:00
Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx),
2018-08-13 17:29:29 +00:00
}
}
// End the stream
return
}
// Send the response
f := &FingerprintResponse{
2018-08-13 17:29:29 +00:00
Devices: convertProtoDeviceGroups(resp.GetDeviceGroup()),
}
select {
2018-12-21 19:23:21 +00:00
case <-reqCtx.Done():
return
case out <- f:
}
2018-08-13 17:29:29 +00:00
}
}
func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation, error) {
// Build the request
req := &proto.ReserveRequest{
DeviceIds: deviceIDs,
}
// Make the request
2018-11-01 23:23:04 +00:00
resp, err := d.client.Reserve(d.doneCtx, req)
2018-08-13 17:29:29 +00:00
if err != nil {
2018-12-21 19:23:21 +00:00
return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
2018-08-13 17:29:29 +00:00
}
// Convert the response
out := convertProtoContainerReservation(resp.GetContainerRes())
return out, nil
}
2018-08-27 23:11:07 +00:00
// Stats is used to retrieve device statistics from the device plugin. An error
// may be immediately returned if the stats call could not be made or as part of
// the streaming response. If the context is cancelled, the error will be
2019-01-23 15:53:07 +00:00
// propagated.
func (d *devicePluginClient) Stats(ctx context.Context, interval time.Duration) (<-chan *StatsResponse, error) {
// Join the passed context and the shutdown context
2018-12-21 19:23:21 +00:00
joinedCtx, _ := joincontext.Join(ctx, d.doneCtx)
req := proto.StatsRequest{
CollectionInterval: ptypes.DurationProto(interval),
}
2018-12-21 19:23:21 +00:00
stream, err := d.client.Stats(joinedCtx, &req)
2018-08-27 23:11:07 +00:00
if err != nil {
2018-12-21 19:23:21 +00:00
return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx)
2018-08-27 23:11:07 +00:00
}
out := make(chan *StatsResponse, 1)
go d.handleStats(ctx, stream, out)
return out, nil
}
// handleStats should be launched in a goroutine and handles converting
// the gRPC stream to a channel. Exits either when context is cancelled or the
// stream has an error.
func (d *devicePluginClient) handleStats(
2018-12-21 19:23:21 +00:00
reqCtx context.Context,
2018-08-27 23:11:07 +00:00
stream proto.DevicePlugin_StatsClient,
out chan *StatsResponse) {
defer close(out)
2018-08-27 23:11:07 +00:00
for {
resp, err := stream.Recv()
if err != nil {
if err != io.EOF {
out <- &StatsResponse{
2018-12-21 19:23:21 +00:00
Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx),
2018-08-27 23:11:07 +00:00
}
}
// End the stream
return
}
// Send the response
s := &StatsResponse{
2018-08-27 23:11:07 +00:00
Groups: convertProtoDeviceGroupsStats(resp.GetGroups()),
}
select {
2018-12-21 19:23:21 +00:00
case <-reqCtx.Done():
return
case out <- s:
}
}
}