Code review feedback

This commit is contained in:
Alex Dadgar 2018-02-13 14:54:27 -08:00
parent a9c4f8a4c8
commit e685211892
10 changed files with 157 additions and 99 deletions

View File

@ -537,7 +537,7 @@ func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) {
defer c.allocLock.RUnlock()
ar, ok := c.allocs[allocID]
if !ok {
return nil, fmt.Errorf("unknown allocation ID %q", allocID)
return nil, structs.NewErrUnknownAllocation(allocID)
}
return ar.StatsReporter(), nil
}
@ -565,7 +565,7 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
ar, ok := c.allocs[allocID]
if !ok {
return nil, fmt.Errorf("unknown allocation ID %q", allocID)
return nil, structs.NewErrUnknownAllocation(allocID)
}
return ar.GetAllocDir(), nil
}
@ -575,7 +575,7 @@ func (c *Client) GetClientAlloc(allocID string) (*structs.Allocation, error) {
all := c.allAllocs()
alloc, ok := all[allocID]
if !ok {
return nil, fmt.Errorf("unknown allocation ID %q", allocID)
return nil, structs.NewErrUnknownAllocation(allocID)
}
return alloc, nil
}

View File

@ -73,9 +73,11 @@ type FileSystem struct {
c *Client
}
func (f *FileSystem) register() {
func NewFileSystemEndpoint(c *Client) *FileSystem {
f := &FileSystem{c}
f.c.streamingRpcs.Register("FileSystem.Logs", f.logs)
f.c.streamingRpcs.Register("FileSystem.Stream", f.stream)
return f
}
// handleStreamResultError is a helper for sending an error with a potential
@ -185,11 +187,9 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
fs, err := f.c.GetAllocFS(req.AllocID)
if err != nil {
var code *int64
if strings.Contains(err.Error(), "unknown allocation") {
code := helper.Int64ToPtr(500)
if structs.IsErrUnknownAllocation(err) {
code = helper.Int64ToPtr(404)
} else {
code = helper.Int64ToPtr(500)
}
f.handleStreamResultError(err, code, encoder)
@ -359,11 +359,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
fs, err := f.c.GetAllocFS(req.AllocID)
if err != nil {
var code *int64
if strings.Contains(err.Error(), "unknown allocation") {
code := helper.Int64ToPtr(500)
if structs.IsErrUnknownAllocation(err) {
code = helper.Int64ToPtr(404)
} else {
code = helper.Int64ToPtr(500)
}
f.handleStreamResultError(err, code, encoder)
@ -372,11 +370,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
alloc, err := f.c.GetClientAlloc(req.AllocID)
if err != nil {
var code *int64
if strings.Contains(err.Error(), "unknown allocation") {
code := helper.Int64ToPtr(500)
if structs.IsErrUnknownAllocation(err) {
code = helper.Int64ToPtr(404)
} else {
code = helper.Int64ToPtr(500)
}
f.handleStreamResultError(err, code, encoder)
@ -628,7 +624,11 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string,
var changes *watch.FileChanges
// Start streaming the data
data := make([]byte, streamFrameSize)
bufSize := int64(streamFrameSize)
if limit > 0 && limit < streamFrameSize {
bufSize = limit
}
data := make([]byte, bufSize)
OUTER:
for {
// Read up to the max frame size

View File

@ -70,7 +70,7 @@ func TestFS_Stat_NoAlloc(t *testing.T) {
var resp cstructs.FsStatResponse
err := c.ClientRPC("FileSystem.Stat", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), "unknown")
require.True(structs.IsErrUnknownAllocation(err))
}
func TestFS_Stat(t *testing.T) {
@ -147,12 +147,12 @@ func TestFS_Stat_ACL(t *testing.T) {
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}
@ -195,7 +195,7 @@ func TestFS_List_NoAlloc(t *testing.T) {
var resp cstructs.FsListResponse
err := c.ClientRPC("FileSystem.List", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), "unknown")
require.True(structs.IsErrUnknownAllocation(err))
}
func TestFS_List(t *testing.T) {
@ -272,12 +272,12 @@ func TestFS_List_ACL(t *testing.T) {
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}
@ -368,7 +368,7 @@ OUTER:
continue
}
if strings.Contains(msg.Error.Error(), "unknown alloc") {
if structs.IsErrUnknownAllocation(msg.Error) {
break OUTER
} else {
t.Fatalf("bad error: %v", err)
@ -413,12 +413,12 @@ func TestFS_Stream_ACL(t *testing.T) {
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}
@ -1005,7 +1005,7 @@ OUTER:
continue
}
if strings.Contains(msg.Error.Error(), "unknown alloc") {
if structs.IsErrUnknownAllocation(msg.Error) {
break OUTER
} else {
t.Fatalf("bad error: %v", err)
@ -1050,12 +1050,12 @@ func TestFS_Logs_ACL(t *testing.T) {
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}

View File

@ -204,10 +204,7 @@ func (c *Client) streamingRpcConn(server *servers.Server, method string) (net.Co
func (c *Client) setupClientRpc() {
// Initialize the RPC handlers
c.endpoints.ClientStats = &ClientStats{c}
c.endpoints.FileSystem = &FileSystem{c}
// Initialize the streaming RPC handlers.
c.endpoints.FileSystem.register()
c.endpoints.FileSystem = NewFileSystemEndpoint(c)
// Create the RPC Server
c.rpcServer = rpc.NewServer()

View File

@ -18,7 +18,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestHTTP_AllocsList(t *testing.T) {
@ -77,9 +77,9 @@ func TestHTTP_AllocsList(t *testing.T) {
}
expectedMsg := "Task's sibling failed"
displayMsg1 := allocs[0].TaskStates["test"].Events[0].DisplayMessage
assert.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set")
require.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set")
displayMsg2 := allocs[0].TaskStates["test"].Events[0].DisplayMessage
assert.Equal(t, expectedMsg, displayMsg2, "DisplayMessage should be set")
require.Equal(t, expectedMsg, displayMsg2, "DisplayMessage should be set")
})
}
@ -150,7 +150,7 @@ func TestHTTP_AllocsPrefixList(t *testing.T) {
}
expectedMsg := "Task's sibling failed"
displayMsg1 := n[0].TaskStates["test"].Events[0].DisplayMessage
assert.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set")
require.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set")
})
}
@ -279,7 +279,7 @@ func TestHTTP_AllocStats(t *testing.T) {
func TestHTTP_AllocStats_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
@ -294,8 +294,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
{
respW := httptest.NewRecorder()
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@ -304,8 +304,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@ -316,8 +316,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy)
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Contains(err.Error(), "unknown allocation ID")
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
// Try request with a management token
@ -326,8 +326,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Contains(err.Error(), "unknown allocation ID")
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
})
}
@ -352,35 +352,35 @@ func TestHTTP_AllocSnapshot(t *testing.T) {
func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
// Request without a token fails
req, err := http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil)
assert.Nil(err)
require.Nil(err)
// Make the unauthorized request
respW := httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.EqualError(err, structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.EqualError(err, structs.ErrPermissionDenied.Error())
// Create an allocation
alloc := mock.Alloc()
validMigrateToken, err := structs.GenerateMigrateToken(alloc.ID, s.Agent.Client().Node().SecretID)
assert.Nil(err)
require.Nil(err)
// Request with a token succeeds
url := fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID)
req, err = http.NewRequest("GET", url, nil)
assert.Nil(err)
require.Nil(err)
req.Header.Set("X-Nomad-Token", validMigrateToken)
// Make the unauthorized request
respW = httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
assert.NotContains(err.Error(), structs.ErrPermissionDenied.Error())
require.NotContains(err.Error(), structs.ErrPermissionDenied.Error())
})
}
@ -426,7 +426,7 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
// Remove the task dir to break Snapshot
os.RemoveAll(allocDir.TaskDirs["web"].LocalDir)
// Assert Snapshot fails
// require Snapshot fails
if err := allocDir.Snapshot(ioutil.Discard); err != nil {
s.logger.Printf("[DEBUG] agent.test: snapshot returned error: %v", err)
} else {
@ -510,7 +510,7 @@ func TestHTTP_AllocGC(t *testing.T) {
func TestHTTP_AllocGC_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
@ -525,8 +525,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
{
respW := httptest.NewRecorder()
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@ -535,8 +535,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@ -547,8 +547,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy)
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Contains(err.Error(), "not present")
require.NotNil(err)
require.Contains(err.Error(), "not present")
}
// Try request with a management token
@ -557,8 +557,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Contains(err.Error(), "not present")
require.NotNil(err)
require.Contains(err.Error(), "not present")
}
})
}
@ -584,20 +584,20 @@ func TestHTTP_AllocAllGC(t *testing.T) {
func TestHTTP_AllocAllGC_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/client/gc", nil)
assert.Nil(err)
require.Nil(err)
// Try request without a token and expect failure
{
respW := httptest.NewRecorder()
_, err := s.Server.ClientGCRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@ -606,8 +606,8 @@ func TestHTTP_AllocAllGC_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyRead))
setToken(req, token)
_, err := s.Server.ClientGCRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@ -616,8 +616,8 @@ func TestHTTP_AllocAllGC_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.NodePolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.ClientGCRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
}
// Try request with a management token
@ -625,8 +625,8 @@ func TestHTTP_AllocAllGC_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
_, err := s.Server.ClientGCRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
}
})

View File

@ -102,9 +102,7 @@ func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Re
}
if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
} else if strings.Contains(rpcErr.Error(), "unknown allocation") {
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}
@ -144,9 +142,7 @@ func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request
}
if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
} else if strings.Contains(rpcErr.Error(), "unknown allocation") {
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}

View File

@ -2,7 +2,6 @@ package nomad
import (
"errors"
"fmt"
"io"
"net"
"strings"
@ -55,7 +54,7 @@ func (f *FileSystem) findNodeConnAndForward(snap *state.StateSnapshot,
}
if node == nil {
return fmt.Errorf("Unknown node %q", nodeID)
return structs.NewErrUnknownNode(nodeID)
}
// Determine the Server that has a connection to the node.
@ -88,7 +87,7 @@ func (f *FileSystem) forwardRegionStreamingRpc(conn io.ReadWriteCloser,
}
if allocResp.Alloc == nil {
f.handleStreamResultError(fmt.Errorf("unknown allocation %q", allocID), nil, encoder)
f.handleStreamResultError(structs.NewErrUnknownAllocation(allocID), helper.Int64ToPtr(404), encoder)
return
}
@ -153,7 +152,7 @@ func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListRe
return err
}
if alloc == nil {
return fmt.Errorf("unknown allocation %q", args.AllocID)
return structs.NewErrUnknownAllocation(args.AllocID)
}
// Get the connection to the client
@ -202,7 +201,7 @@ func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatRe
return err
}
if alloc == nil {
return fmt.Errorf("unknown allocation %q", args.AllocID)
return structs.NewErrUnknownAllocation(args.AllocID)
}
// Get the connection to the client
@ -266,7 +265,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
return
}
if alloc == nil {
f.handleStreamResultError(fmt.Errorf("unknown alloc ID %q", args.AllocID), helper.Int64ToPtr(404), encoder)
f.handleStreamResultError(structs.NewErrUnknownAllocation(args.AllocID), helper.Int64ToPtr(404), encoder)
return
}
nodeID := alloc.NodeID
@ -366,7 +365,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
return
}
if alloc == nil {
f.handleStreamResultError(fmt.Errorf("unknown alloc ID %q", args.AllocID), helper.Int64ToPtr(404), encoder)
f.handleStreamResultError(structs.NewErrUnknownAllocation(args.AllocID), helper.Int64ToPtr(404), encoder)
return
}
nodeID := alloc.NodeID

View File

@ -135,12 +135,12 @@ func TestClientFS_List_ACL(t *testing.T) {
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}
@ -373,12 +373,12 @@ func TestClientFS_Stat_ACL(t *testing.T) {
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown allocation",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}
@ -561,7 +561,7 @@ OUTER:
continue
}
if strings.Contains(msg.Error.Error(), "unknown alloc") {
if structs.IsErrUnknownAllocation(msg.Error) {
break OUTER
}
}
@ -598,12 +598,12 @@ func TestClientFS_Streaming_ACL(t *testing.T) {
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown alloc ID",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown alloc ID",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}
@ -1303,7 +1303,7 @@ OUTER:
continue
}
if strings.Contains(msg.Error.Error(), "unknown alloc") {
if structs.IsErrUnknownAllocation(msg.Error) {
break OUTER
}
}
@ -1340,12 +1340,12 @@ func TestClientFS_Logs_ACL(t *testing.T) {
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown alloc ID",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown alloc ID",
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}

View File

@ -2,6 +2,7 @@ package structs
import (
"errors"
"fmt"
"strings"
)
@ -11,7 +12,15 @@ const (
errTokenNotFound = "ACL token not found"
errPermissionDenied = "Permission denied"
errNoNodeConn = "No path to node"
errUnknownMethod = "unknown rpc method"
errUnknownMethod = "Unknown rpc method"
// Prefix based errors that are used to check if the error is of a given
// type. These errors should be created with the associated constructor.
ErrUnknownAllocationPrefix = "Unknown allocation"
ErrUnknownNodePrefix = "Unknown node"
ErrUnknownJobPrefix = "Unknown job"
ErrUnknownEvaluationPrefix = "Unknown evaluation"
ErrUnknownDeploymentPrefix = "Unknown deployment"
)
var (
@ -57,3 +66,61 @@ func IsErrNoNodeConn(err error) bool {
func IsErrUnknownMethod(err error) bool {
return err != nil && strings.Contains(err.Error(), errUnknownMethod)
}
// NewErrUnknownAllocation returns a new error caused by the allocation being
// unknown.
func NewErrUnknownAllocation(allocID string) error {
return fmt.Errorf("%s %q", ErrUnknownAllocationPrefix, allocID)
}
// NewErrUnknownNode returns a new error caused by the node being unknown.
func NewErrUnknownNode(nodeID string) error {
return fmt.Errorf("%s %q", ErrUnknownNodePrefix, nodeID)
}
// NewErrUnknownJob returns a new error caused by the job being unknown.
func NewErrUnknownJob(jobID string) error {
return fmt.Errorf("%s %q", ErrUnknownJobPrefix, jobID)
}
// NewErrUnknownEvaluation returns a new error caused by the evaluation being
// unknown.
func NewErrUnknownEvaluation(evaluationID string) error {
return fmt.Errorf("%s %q", ErrUnknownEvaluationPrefix, evaluationID)
}
// NewErrUnknownDeployment returns a new error caused by the deployment being
// unknown.
func NewErrUnknownDeployment(deploymentID string) error {
return fmt.Errorf("%s %q", ErrUnknownDeploymentPrefix, deploymentID)
}
// IsErrUnknownAllocation returns whether the error is due to an unknown
// allocation.
func IsErrUnknownAllocation(err error) bool {
return err != nil && strings.Contains(err.Error(), ErrUnknownAllocationPrefix)
}
// IsErrUnknownNode returns whether the error is due to an unknown
// node.
func IsErrUnknownNode(err error) bool {
return err != nil && strings.Contains(err.Error(), ErrUnknownNodePrefix)
}
// IsErrUnknownJob returns whether the error is due to an unknown
// job.
func IsErrUnknownJob(err error) bool {
return err != nil && strings.Contains(err.Error(), ErrUnknownJobPrefix)
}
// IsErrUnknownEvaluation returns whether the error is due to an unknown
// evaluation.
func IsErrUnknownEvaluation(err error) bool {
return err != nil && strings.Contains(err.Error(), ErrUnknownEvaluationPrefix)
}
// IsErrUnknownDeployment returns whether the error is due to an unknown
// deployment.
func IsErrUnknownDeployment(err error) bool {
return err != nil && strings.Contains(err.Error(), ErrUnknownDeploymentPrefix)
}

View File

@ -53,7 +53,7 @@ func (s *StreamingRpcRegistery) GetHandler(method string) (StreamingRpcHandler,
}
// Bridge is used to just link two connections together and copy traffic
func Bridge(a, b io.ReadWriteCloser) error {
func Bridge(a, b io.ReadWriteCloser) {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
@ -69,5 +69,4 @@ func Bridge(a, b io.ReadWriteCloser) error {
b.Close()
}()
wg.Wait()
return nil
}