open-nomad/drivers/shared/executor/client.go
Michael Schurter 38821954b7 plugins: squelch context Canceled error logs
As far as I can tell this is the most straightforward and resilient way
to skip error logging on context cancellation with grpc streams. You
cannot compare the error against context.Canceled directly as it is of
type `*status.statusError`. The next best solution I found was:

```go
resp, err := stream.Recv()
if code, ok := err.(interface{ Code() code.Code }); ok {
	if code.Code == code.Canceled {
		return
	}
}
```

However I think checking ctx.Err() directly makes the code much easier
to read and is resilient against grpc API changes.
2019-02-21 15:32:18 -08:00

184 lines
4.3 KiB
Go

package executor
import (
"context"
"fmt"
"io"
"os"
"syscall"
"time"
"github.com/LK4D4/joincontext"
"github.com/golang/protobuf/ptypes"
hclog "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor/proto"
"github.com/hashicorp/nomad/plugins/drivers"
)
var _ Executor = (*grpcExecutorClient)(nil)
type grpcExecutorClient struct {
client proto.ExecutorClient
logger hclog.Logger
// doneCtx is close when the plugin exits
doneCtx context.Context
}
func (c *grpcExecutorClient) Launch(cmd *ExecCommand) (*ProcessState, error) {
ctx := context.Background()
req := &proto.LaunchRequest{
Cmd: cmd.Cmd,
Args: cmd.Args,
Resources: drivers.ResourcesToProto(cmd.Resources),
StdoutPath: cmd.StdoutPath,
StderrPath: cmd.StderrPath,
Env: cmd.Env,
User: cmd.User,
TaskDir: cmd.TaskDir,
ResourceLimits: cmd.ResourceLimits,
BasicProcessCgroup: cmd.BasicProcessCgroup,
Mounts: drivers.MountsToProto(cmd.Mounts),
Devices: drivers.DevicesToProto(cmd.Devices),
}
resp, err := c.client.Launch(ctx, req)
if err != nil {
return nil, err
}
ps, err := processStateFromProto(resp.Process)
if err != nil {
return nil, err
}
return ps, nil
}
func (c *grpcExecutorClient) Wait(ctx context.Context) (*ProcessState, error) {
// Join the passed context and the shutdown context
ctx, _ = joincontext.Join(ctx, c.doneCtx)
resp, err := c.client.Wait(ctx, &proto.WaitRequest{})
if err != nil {
return nil, err
}
ps, err := processStateFromProto(resp.Process)
if err != nil {
return nil, err
}
return ps, nil
}
func (c *grpcExecutorClient) Shutdown(signal string, gracePeriod time.Duration) error {
ctx := context.Background()
req := &proto.ShutdownRequest{
Signal: signal,
GracePeriod: gracePeriod.Nanoseconds(),
}
if _, err := c.client.Shutdown(ctx, req); err != nil {
return err
}
return nil
}
func (c *grpcExecutorClient) UpdateResources(r *drivers.Resources) error {
ctx := context.Background()
req := &proto.UpdateResourcesRequest{Resources: drivers.ResourcesToProto(r)}
if _, err := c.client.UpdateResources(ctx, req); err != nil {
return err
}
return nil
}
func (c *grpcExecutorClient) Version() (*ExecutorVersion, error) {
ctx := context.Background()
resp, err := c.client.Version(ctx, &proto.VersionRequest{})
if err != nil {
return nil, err
}
return &ExecutorVersion{Version: resp.Version}, nil
}
func (c *grpcExecutorClient) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
stream, err := c.client.Stats(ctx, &proto.StatsRequest{})
if err != nil {
return nil, err
}
ch := make(chan *cstructs.TaskResourceUsage)
go c.handleStats(ctx, stream, ch)
return ch, nil
}
func (c *grpcExecutorClient) handleStats(ctx context.Context, stream proto.Executor_StatsClient, ch chan<- *cstructs.TaskResourceUsage) {
defer close(ch)
for {
resp, err := stream.Recv()
if ctx.Err() != nil {
// Context canceled; exit gracefully
return
}
if err != nil {
if err != io.EOF {
c.logger.Error("error receiving stream from Stats executor RPC, closing stream", "error", err)
}
// End stream
return
}
stats, err := drivers.TaskStatsFromProto(resp.Stats)
if err != nil {
c.logger.Error("failed to decode stats from RPC", "error", err, "stats", resp.Stats)
continue
}
select {
case ch <- stats:
case <-ctx.Done():
return
}
}
}
func (c *grpcExecutorClient) Signal(s os.Signal) error {
ctx := context.Background()
sig, ok := s.(syscall.Signal)
if !ok {
return fmt.Errorf("unsupported signal type: %q", s.String())
}
req := &proto.SignalRequest{
Signal: int32(sig),
}
if _, err := c.client.Signal(ctx, req); err != nil {
return err
}
return nil
}
func (c *grpcExecutorClient) Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) {
ctx := context.Background()
pbDeadline, err := ptypes.TimestampProto(deadline)
if err != nil {
return nil, 0, err
}
req := &proto.ExecRequest{
Deadline: pbDeadline,
Cmd: cmd,
Args: args,
}
resp, err := c.client.Exec(ctx, req)
if err != nil {
return nil, 0, err
}
return resp.Output, int(resp.ExitCode), nil
}