From e685211892c3c7001b77dfcb4e9ce7ba3fd8c961 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 13 Feb 2018 14:54:27 -0800 Subject: [PATCH] Code review feedback --- client/client.go | 6 +-- client/fs_endpoint.go | 28 +++++----- client/fs_endpoint_test.go | 24 ++++----- client/rpc.go | 5 +- command/agent/alloc_endpoint_test.go | 80 ++++++++++++++-------------- command/agent/fs_endpoint.go | 8 +-- nomad/client_fs_endpoint.go | 13 +++-- nomad/client_fs_endpoint_test.go | 20 +++---- nomad/structs/errors.go | 69 +++++++++++++++++++++++- nomad/structs/streaming_rpc.go | 3 +- 10 files changed, 157 insertions(+), 99 deletions(-) diff --git a/client/client.go b/client/client.go index 99ed1bd4a..a4a85c031 100644 --- a/client/client.go +++ b/client/client.go @@ -537,7 +537,7 @@ func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) { defer c.allocLock.RUnlock() ar, ok := c.allocs[allocID] if !ok { - return nil, fmt.Errorf("unknown allocation ID %q", allocID) + return nil, structs.NewErrUnknownAllocation(allocID) } return ar.StatsReporter(), nil } @@ -565,7 +565,7 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) { ar, ok := c.allocs[allocID] if !ok { - return nil, fmt.Errorf("unknown allocation ID %q", allocID) + return nil, structs.NewErrUnknownAllocation(allocID) } return ar.GetAllocDir(), nil } @@ -575,7 +575,7 @@ func (c *Client) GetClientAlloc(allocID string) (*structs.Allocation, error) { all := c.allAllocs() alloc, ok := all[allocID] if !ok { - return nil, fmt.Errorf("unknown allocation ID %q", allocID) + return nil, structs.NewErrUnknownAllocation(allocID) } return alloc, nil } diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index d812ed0f3..eaff009c7 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -73,9 +73,11 @@ type FileSystem struct { c *Client } -func (f *FileSystem) register() { +func NewFileSystemEndpoint(c *Client) *FileSystem { + f := &FileSystem{c} f.c.streamingRpcs.Register("FileSystem.Logs", f.logs) f.c.streamingRpcs.Register("FileSystem.Stream", f.stream) + return f } // handleStreamResultError is a helper for sending an error with a potential @@ -185,11 +187,9 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { fs, err := f.c.GetAllocFS(req.AllocID) if err != nil { - var code *int64 - if strings.Contains(err.Error(), "unknown allocation") { + code := helper.Int64ToPtr(500) + if structs.IsErrUnknownAllocation(err) { code = helper.Int64ToPtr(404) - } else { - code = helper.Int64ToPtr(500) } f.handleStreamResultError(err, code, encoder) @@ -359,11 +359,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { fs, err := f.c.GetAllocFS(req.AllocID) if err != nil { - var code *int64 - if strings.Contains(err.Error(), "unknown allocation") { + code := helper.Int64ToPtr(500) + if structs.IsErrUnknownAllocation(err) { code = helper.Int64ToPtr(404) - } else { - code = helper.Int64ToPtr(500) } f.handleStreamResultError(err, code, encoder) @@ -372,11 +370,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { alloc, err := f.c.GetClientAlloc(req.AllocID) if err != nil { - var code *int64 - if strings.Contains(err.Error(), "unknown allocation") { + code := helper.Int64ToPtr(500) + if structs.IsErrUnknownAllocation(err) { code = helper.Int64ToPtr(404) - } else { - code = helper.Int64ToPtr(500) } f.handleStreamResultError(err, code, encoder) @@ -628,7 +624,11 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, var changes *watch.FileChanges // Start streaming the data - data := make([]byte, streamFrameSize) + bufSize := int64(streamFrameSize) + if limit > 0 && limit < streamFrameSize { + bufSize = limit + } + data := make([]byte, bufSize) OUTER: for { // Read up to the max frame size diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 4256f16ce..4e90daf06 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -70,7 +70,7 @@ func TestFS_Stat_NoAlloc(t *testing.T) { var resp cstructs.FsStatResponse err := c.ClientRPC("FileSystem.Stat", req, &resp) require.NotNil(err) - require.Contains(err.Error(), "unknown") + require.True(structs.IsErrUnknownAllocation(err)) } func TestFS_Stat(t *testing.T) { @@ -147,12 +147,12 @@ func TestFS_Stat_ACL(t *testing.T) { { Name: "good token", Token: tokenGood.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, { Name: "root token", Token: root.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, } @@ -195,7 +195,7 @@ func TestFS_List_NoAlloc(t *testing.T) { var resp cstructs.FsListResponse err := c.ClientRPC("FileSystem.List", req, &resp) require.NotNil(err) - require.Contains(err.Error(), "unknown") + require.True(structs.IsErrUnknownAllocation(err)) } func TestFS_List(t *testing.T) { @@ -272,12 +272,12 @@ func TestFS_List_ACL(t *testing.T) { { Name: "good token", Token: tokenGood.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, { Name: "root token", Token: root.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, } @@ -368,7 +368,7 @@ OUTER: continue } - if strings.Contains(msg.Error.Error(), "unknown alloc") { + if structs.IsErrUnknownAllocation(msg.Error) { break OUTER } else { t.Fatalf("bad error: %v", err) @@ -413,12 +413,12 @@ func TestFS_Stream_ACL(t *testing.T) { { Name: "good token", Token: tokenGood.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, { Name: "root token", Token: root.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, } @@ -1005,7 +1005,7 @@ OUTER: continue } - if strings.Contains(msg.Error.Error(), "unknown alloc") { + if structs.IsErrUnknownAllocation(msg.Error) { break OUTER } else { t.Fatalf("bad error: %v", err) @@ -1050,12 +1050,12 @@ func TestFS_Logs_ACL(t *testing.T) { { Name: "good token", Token: tokenGood.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, { Name: "root token", Token: root.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, } diff --git a/client/rpc.go b/client/rpc.go index 9d646c278..0144c2bd2 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -204,10 +204,7 @@ func (c *Client) streamingRpcConn(server *servers.Server, method string) (net.Co func (c *Client) setupClientRpc() { // Initialize the RPC handlers c.endpoints.ClientStats = &ClientStats{c} - c.endpoints.FileSystem = &FileSystem{c} - - // Initialize the streaming RPC handlers. - c.endpoints.FileSystem.register() + c.endpoints.FileSystem = NewFileSystemEndpoint(c) // Create the RPC Server c.rpcServer = rpc.NewServer() diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 99b9109f4..0393defd6 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -18,7 +18,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestHTTP_AllocsList(t *testing.T) { @@ -77,9 +77,9 @@ func TestHTTP_AllocsList(t *testing.T) { } expectedMsg := "Task's sibling failed" displayMsg1 := allocs[0].TaskStates["test"].Events[0].DisplayMessage - assert.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set") + require.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set") displayMsg2 := allocs[0].TaskStates["test"].Events[0].DisplayMessage - assert.Equal(t, expectedMsg, displayMsg2, "DisplayMessage should be set") + require.Equal(t, expectedMsg, displayMsg2, "DisplayMessage should be set") }) } @@ -150,7 +150,7 @@ func TestHTTP_AllocsPrefixList(t *testing.T) { } expectedMsg := "Task's sibling failed" displayMsg1 := n[0].TaskStates["test"].Events[0].DisplayMessage - assert.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set") + require.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set") }) } @@ -279,7 +279,7 @@ func TestHTTP_AllocStats(t *testing.T) { func TestHTTP_AllocStats_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpACLTest(t, nil, func(s *TestAgent) { state := s.Agent.server.State() @@ -294,8 +294,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) { { respW := httptest.NewRecorder() _, err := s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with an invalid token and expect failure @@ -304,8 +304,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyWrite)) setToken(req, token) _, err := s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with a valid token @@ -316,8 +316,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy) setToken(req, token) _, err := s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.Contains(err.Error(), "unknown allocation ID") + require.NotNil(err) + require.True(structs.IsErrUnknownAllocation(err)) } // Try request with a management token @@ -326,8 +326,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) { respW := httptest.NewRecorder() setToken(req, s.RootToken) _, err := s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.Contains(err.Error(), "unknown allocation ID") + require.NotNil(err) + require.True(structs.IsErrUnknownAllocation(err)) } }) } @@ -352,35 +352,35 @@ func TestHTTP_AllocSnapshot(t *testing.T) { func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpACLTest(t, nil, func(s *TestAgent) { // Request without a token fails req, err := http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil) - assert.Nil(err) + require.Nil(err) // Make the unauthorized request respW := httptest.NewRecorder() _, err = s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.EqualError(err, structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.EqualError(err, structs.ErrPermissionDenied.Error()) // Create an allocation alloc := mock.Alloc() validMigrateToken, err := structs.GenerateMigrateToken(alloc.ID, s.Agent.Client().Node().SecretID) - assert.Nil(err) + require.Nil(err) // Request with a token succeeds url := fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID) req, err = http.NewRequest("GET", url, nil) - assert.Nil(err) + require.Nil(err) req.Header.Set("X-Nomad-Token", validMigrateToken) // Make the unauthorized request respW = httptest.NewRecorder() _, err = s.Server.ClientAllocRequest(respW, req) - assert.NotContains(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotContains(err.Error(), structs.ErrPermissionDenied.Error()) }) } @@ -426,7 +426,7 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { // Remove the task dir to break Snapshot os.RemoveAll(allocDir.TaskDirs["web"].LocalDir) - // Assert Snapshot fails + // require Snapshot fails if err := allocDir.Snapshot(ioutil.Discard); err != nil { s.logger.Printf("[DEBUG] agent.test: snapshot returned error: %v", err) } else { @@ -510,7 +510,7 @@ func TestHTTP_AllocGC(t *testing.T) { func TestHTTP_AllocGC_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpACLTest(t, nil, func(s *TestAgent) { state := s.Agent.server.State() @@ -525,8 +525,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) { { respW := httptest.NewRecorder() _, err := s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with an invalid token and expect failure @@ -535,8 +535,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyWrite)) setToken(req, token) _, err := s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with a valid token @@ -547,8 +547,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy) setToken(req, token) _, err := s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.Contains(err.Error(), "not present") + require.NotNil(err) + require.Contains(err.Error(), "not present") } // Try request with a management token @@ -557,8 +557,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) { respW := httptest.NewRecorder() setToken(req, s.RootToken) _, err := s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.Contains(err.Error(), "not present") + require.NotNil(err) + require.Contains(err.Error(), "not present") } }) } @@ -584,20 +584,20 @@ func TestHTTP_AllocAllGC(t *testing.T) { func TestHTTP_AllocAllGC_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpACLTest(t, nil, func(s *TestAgent) { state := s.Agent.server.State() // Make the HTTP request req, err := http.NewRequest("GET", "/v1/client/gc", nil) - assert.Nil(err) + require.Nil(err) // Try request without a token and expect failure { respW := httptest.NewRecorder() _, err := s.Server.ClientGCRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with an invalid token and expect failure @@ -606,8 +606,8 @@ func TestHTTP_AllocAllGC_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyRead)) setToken(req, token) _, err := s.Server.ClientGCRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with a valid token @@ -616,8 +616,8 @@ func TestHTTP_AllocAllGC_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.NodePolicy(acl.PolicyWrite)) setToken(req, token) _, err := s.Server.ClientGCRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) } // Try request with a management token @@ -625,8 +625,8 @@ func TestHTTP_AllocAllGC_ACL(t *testing.T) { respW := httptest.NewRecorder() setToken(req, s.RootToken) _, err := s.Server.ClientGCRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) } }) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 7a651d9b1..465451917 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -102,9 +102,7 @@ func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Re } if rpcErr != nil { - if structs.IsErrNoNodeConn(rpcErr) { - rpcErr = CodedError(404, rpcErr.Error()) - } else if strings.Contains(rpcErr.Error(), "unknown allocation") { + if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) { rpcErr = CodedError(404, rpcErr.Error()) } @@ -144,9 +142,7 @@ func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request } if rpcErr != nil { - if structs.IsErrNoNodeConn(rpcErr) { - rpcErr = CodedError(404, rpcErr.Error()) - } else if strings.Contains(rpcErr.Error(), "unknown allocation") { + if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) { rpcErr = CodedError(404, rpcErr.Error()) } diff --git a/nomad/client_fs_endpoint.go b/nomad/client_fs_endpoint.go index b1731bfe8..45a591eeb 100644 --- a/nomad/client_fs_endpoint.go +++ b/nomad/client_fs_endpoint.go @@ -2,7 +2,6 @@ package nomad import ( "errors" - "fmt" "io" "net" "strings" @@ -55,7 +54,7 @@ func (f *FileSystem) findNodeConnAndForward(snap *state.StateSnapshot, } if node == nil { - return fmt.Errorf("Unknown node %q", nodeID) + return structs.NewErrUnknownNode(nodeID) } // Determine the Server that has a connection to the node. @@ -88,7 +87,7 @@ func (f *FileSystem) forwardRegionStreamingRpc(conn io.ReadWriteCloser, } if allocResp.Alloc == nil { - f.handleStreamResultError(fmt.Errorf("unknown allocation %q", allocID), nil, encoder) + f.handleStreamResultError(structs.NewErrUnknownAllocation(allocID), helper.Int64ToPtr(404), encoder) return } @@ -153,7 +152,7 @@ func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListRe return err } if alloc == nil { - return fmt.Errorf("unknown allocation %q", args.AllocID) + return structs.NewErrUnknownAllocation(args.AllocID) } // Get the connection to the client @@ -202,7 +201,7 @@ func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatRe return err } if alloc == nil { - return fmt.Errorf("unknown allocation %q", args.AllocID) + return structs.NewErrUnknownAllocation(args.AllocID) } // Get the connection to the client @@ -266,7 +265,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { return } if alloc == nil { - f.handleStreamResultError(fmt.Errorf("unknown alloc ID %q", args.AllocID), helper.Int64ToPtr(404), encoder) + f.handleStreamResultError(structs.NewErrUnknownAllocation(args.AllocID), helper.Int64ToPtr(404), encoder) return } nodeID := alloc.NodeID @@ -366,7 +365,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { return } if alloc == nil { - f.handleStreamResultError(fmt.Errorf("unknown alloc ID %q", args.AllocID), helper.Int64ToPtr(404), encoder) + f.handleStreamResultError(structs.NewErrUnknownAllocation(args.AllocID), helper.Int64ToPtr(404), encoder) return } nodeID := alloc.NodeID diff --git a/nomad/client_fs_endpoint_test.go b/nomad/client_fs_endpoint_test.go index 8f8cfa11c..c396f833f 100644 --- a/nomad/client_fs_endpoint_test.go +++ b/nomad/client_fs_endpoint_test.go @@ -135,12 +135,12 @@ func TestClientFS_List_ACL(t *testing.T) { { Name: "good token", Token: tokenGood.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, { Name: "root token", Token: root.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, } @@ -373,12 +373,12 @@ func TestClientFS_Stat_ACL(t *testing.T) { { Name: "good token", Token: tokenGood.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, { Name: "root token", Token: root.SecretID, - ExpectedError: "unknown allocation", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, } @@ -561,7 +561,7 @@ OUTER: continue } - if strings.Contains(msg.Error.Error(), "unknown alloc") { + if structs.IsErrUnknownAllocation(msg.Error) { break OUTER } } @@ -598,12 +598,12 @@ func TestClientFS_Streaming_ACL(t *testing.T) { { Name: "good token", Token: tokenGood.SecretID, - ExpectedError: "unknown alloc ID", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, { Name: "root token", Token: root.SecretID, - ExpectedError: "unknown alloc ID", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, } @@ -1303,7 +1303,7 @@ OUTER: continue } - if strings.Contains(msg.Error.Error(), "unknown alloc") { + if structs.IsErrUnknownAllocation(msg.Error) { break OUTER } } @@ -1340,12 +1340,12 @@ func TestClientFS_Logs_ACL(t *testing.T) { { Name: "good token", Token: tokenGood.SecretID, - ExpectedError: "unknown alloc ID", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, { Name: "root token", Token: root.SecretID, - ExpectedError: "unknown alloc ID", + ExpectedError: structs.ErrUnknownAllocationPrefix, }, } diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 78d8fde5c..f1139cbd7 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -2,6 +2,7 @@ package structs import ( "errors" + "fmt" "strings" ) @@ -11,7 +12,15 @@ const ( errTokenNotFound = "ACL token not found" errPermissionDenied = "Permission denied" errNoNodeConn = "No path to node" - errUnknownMethod = "unknown rpc method" + errUnknownMethod = "Unknown rpc method" + + // Prefix based errors that are used to check if the error is of a given + // type. These errors should be created with the associated constructor. + ErrUnknownAllocationPrefix = "Unknown allocation" + ErrUnknownNodePrefix = "Unknown node" + ErrUnknownJobPrefix = "Unknown job" + ErrUnknownEvaluationPrefix = "Unknown evaluation" + ErrUnknownDeploymentPrefix = "Unknown deployment" ) var ( @@ -57,3 +66,61 @@ func IsErrNoNodeConn(err error) bool { func IsErrUnknownMethod(err error) bool { return err != nil && strings.Contains(err.Error(), errUnknownMethod) } + +// NewErrUnknownAllocation returns a new error caused by the allocation being +// unknown. +func NewErrUnknownAllocation(allocID string) error { + return fmt.Errorf("%s %q", ErrUnknownAllocationPrefix, allocID) +} + +// NewErrUnknownNode returns a new error caused by the node being unknown. +func NewErrUnknownNode(nodeID string) error { + return fmt.Errorf("%s %q", ErrUnknownNodePrefix, nodeID) +} + +// NewErrUnknownJob returns a new error caused by the job being unknown. +func NewErrUnknownJob(jobID string) error { + return fmt.Errorf("%s %q", ErrUnknownJobPrefix, jobID) +} + +// NewErrUnknownEvaluation returns a new error caused by the evaluation being +// unknown. +func NewErrUnknownEvaluation(evaluationID string) error { + return fmt.Errorf("%s %q", ErrUnknownEvaluationPrefix, evaluationID) +} + +// NewErrUnknownDeployment returns a new error caused by the deployment being +// unknown. +func NewErrUnknownDeployment(deploymentID string) error { + return fmt.Errorf("%s %q", ErrUnknownDeploymentPrefix, deploymentID) +} + +// IsErrUnknownAllocation returns whether the error is due to an unknown +// allocation. +func IsErrUnknownAllocation(err error) bool { + return err != nil && strings.Contains(err.Error(), ErrUnknownAllocationPrefix) +} + +// IsErrUnknownNode returns whether the error is due to an unknown +// node. +func IsErrUnknownNode(err error) bool { + return err != nil && strings.Contains(err.Error(), ErrUnknownNodePrefix) +} + +// IsErrUnknownJob returns whether the error is due to an unknown +// job. +func IsErrUnknownJob(err error) bool { + return err != nil && strings.Contains(err.Error(), ErrUnknownJobPrefix) +} + +// IsErrUnknownEvaluation returns whether the error is due to an unknown +// evaluation. +func IsErrUnknownEvaluation(err error) bool { + return err != nil && strings.Contains(err.Error(), ErrUnknownEvaluationPrefix) +} + +// IsErrUnknownDeployment returns whether the error is due to an unknown +// deployment. +func IsErrUnknownDeployment(err error) bool { + return err != nil && strings.Contains(err.Error(), ErrUnknownDeploymentPrefix) +} diff --git a/nomad/structs/streaming_rpc.go b/nomad/structs/streaming_rpc.go index 9979b276f..949a31e23 100644 --- a/nomad/structs/streaming_rpc.go +++ b/nomad/structs/streaming_rpc.go @@ -53,7 +53,7 @@ func (s *StreamingRpcRegistery) GetHandler(method string) (StreamingRpcHandler, } // Bridge is used to just link two connections together and copy traffic -func Bridge(a, b io.ReadWriteCloser) error { +func Bridge(a, b io.ReadWriteCloser) { wg := sync.WaitGroup{} wg.Add(2) go func() { @@ -69,5 +69,4 @@ func Bridge(a, b io.ReadWriteCloser) error { b.Close() }() wg.Wait() - return nil }