cli: recover from client ACL lookup failures
This fixes a bug in the CLI handling of node lookup failures when querying allocation and FS endpoints. Allocation and FS endpoints are handled by the client; one can query the relevant client directly, or query a server to have the request forwarded transparently to the relevant client. Querying the client directly is beneficial because it avoids loading servers with IO. As an optimization, the CLI attempts to query the client directly, but then falls back to the server forwarding path if it encounters network or connection errors (e.g. clients are locked down or in a separate inaccessible network). Here, we fix a bug where, if the CLI fails to look up the client details because it lacks the ACL capability or for other unexpected reasons, the CLI does not go through the fallback path.
This commit is contained in:
parent
5da134d074
commit
b77fd8654b
|
@ -210,12 +210,7 @@ func (a *Allocations) Exec(ctx context.Context,
|
|||
|
||||
func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task string, tty bool, command []string,
|
||||
errCh chan<- error, q *QueryOptions) (sendFn func(*ExecStreamingInput) error, output <-chan *ExecStreamingOutput) {
|
||||
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, nil
|
||||
}
|
||||
nodeClient, _ := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
|
||||
if q == nil {
|
||||
q = &QueryOptions{}
|
||||
|
@ -236,15 +231,17 @@ func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task st
|
|||
|
||||
reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", alloc.ID)
|
||||
|
||||
conn, _, err := nodeClient.websocket(reqPath, q)
|
||||
if err != nil {
|
||||
// There was an error talking directly to the client. Non-network
|
||||
// errors are fatal, but network errors can attempt to route via RPC.
|
||||
if _, ok := err.(net.Error); !ok {
|
||||
var conn *websocket.Conn
|
||||
|
||||
if nodeClient != nil {
|
||||
conn, _, err = nodeClient.websocket(reqPath, q)
|
||||
if _, ok := err.(net.Error); err != nil && !ok {
|
||||
errCh <- err
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
if conn == nil {
|
||||
conn, _, err = a.client.websocket(reqPath, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
|
|
179
api/fs.go
179
api/fs.go
|
@ -92,72 +92,24 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
|
|||
// ReadAt is used to read bytes at a given offset until limit at the given path
|
||||
// in an allocation directory. If limit is <= 0, there is no limit.
|
||||
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if q == nil {
|
||||
q = &QueryOptions{}
|
||||
}
|
||||
if q.Params == nil {
|
||||
q.Params = make(map[string]string)
|
||||
}
|
||||
|
||||
q.Params["path"] = path
|
||||
q.Params["offset"] = strconv.FormatInt(offset, 10)
|
||||
q.Params["limit"] = strconv.FormatInt(limit, 10)
|
||||
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
|
||||
r, err := nodeClient.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
// There was a networking error when talking directly to the client.
|
||||
if _, ok := err.(net.Error); !ok {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Try via the server
|
||||
r, err = a.client.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return r, nil
|
||||
return queryClientNode(a.client, alloc, reqPath, q,
|
||||
func(q *QueryOptions) {
|
||||
q.Params["path"] = path
|
||||
q.Params["offset"] = strconv.FormatInt(offset, 10)
|
||||
q.Params["limit"] = strconv.FormatInt(limit, 10)
|
||||
})
|
||||
}
|
||||
|
||||
// Cat is used to read contents of a file at the given path in an allocation
|
||||
// directory
|
||||
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if q == nil {
|
||||
q = &QueryOptions{}
|
||||
}
|
||||
if q.Params == nil {
|
||||
q.Params = make(map[string]string)
|
||||
}
|
||||
|
||||
q.Params["path"] = path
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
|
||||
r, err := nodeClient.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
// There was a networking error when talking directly to the client.
|
||||
if _, ok := err.(net.Error); !ok {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Try via the server
|
||||
r, err = a.client.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return r, nil
|
||||
return queryClientNode(a.client, alloc, reqPath, q,
|
||||
func(q *QueryOptions) {
|
||||
q.Params["path"] = path
|
||||
})
|
||||
}
|
||||
|
||||
// Stream streams the content of a file blocking on EOF.
|
||||
|
@ -172,40 +124,19 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
|
|||
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
|
||||
r, err := queryClientNode(a.client, alloc, reqPath, q,
|
||||
func(q *QueryOptions) {
|
||||
q.Params["path"] = path
|
||||
q.Params["offset"] = strconv.FormatInt(offset, 10)
|
||||
q.Params["origin"] = origin
|
||||
})
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
|
||||
if q == nil {
|
||||
q = &QueryOptions{}
|
||||
}
|
||||
if q.Params == nil {
|
||||
q.Params = make(map[string]string)
|
||||
}
|
||||
|
||||
q.Params["path"] = path
|
||||
q.Params["offset"] = strconv.FormatInt(offset, 10)
|
||||
q.Params["origin"] = origin
|
||||
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
|
||||
r, err := nodeClient.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
// There was a networking error when talking directly to the client.
|
||||
if _, ok := err.(net.Error); !ok {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
|
||||
// Try via the server
|
||||
r, err = a.client.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
}
|
||||
|
||||
// Create the output channel
|
||||
frames := make(chan *StreamFrame, 10)
|
||||
|
||||
|
@ -244,6 +175,40 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
|
|||
return frames, errCh
|
||||
}
|
||||
|
||||
func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) {
|
||||
nodeClient, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
|
||||
if q == nil {
|
||||
q = &QueryOptions{}
|
||||
}
|
||||
if q.Params == nil {
|
||||
q.Params = make(map[string]string)
|
||||
}
|
||||
if customizeQ != nil {
|
||||
customizeQ(q)
|
||||
}
|
||||
|
||||
var r io.ReadCloser
|
||||
var err error
|
||||
|
||||
if nodeClient != nil {
|
||||
r, err = nodeClient.rawQuery(reqPath, q)
|
||||
if _, ok := err.(net.Error); err != nil && !ok {
|
||||
// found a non networking error talking to client directly
|
||||
return nil, err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// failed to query node, access through server directly
|
||||
// or network error when talking to the client directly
|
||||
if r == nil {
|
||||
return c.rawQuery(reqPath, q)
|
||||
}
|
||||
|
||||
return r, err
|
||||
}
|
||||
|
||||
// Logs streams the content of a tasks logs blocking on EOF.
|
||||
// The parameters are:
|
||||
// * allocation: the allocation to stream from.
|
||||
|
@ -264,42 +229,20 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
|
|||
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
|
||||
r, err := queryClientNode(a.client, alloc, reqPath, q,
|
||||
func(q *QueryOptions) {
|
||||
q.Params["follow"] = strconv.FormatBool(follow)
|
||||
q.Params["task"] = task
|
||||
q.Params["type"] = logType
|
||||
q.Params["origin"] = origin
|
||||
q.Params["offset"] = strconv.FormatInt(offset, 10)
|
||||
})
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
|
||||
if q == nil {
|
||||
q = &QueryOptions{}
|
||||
}
|
||||
if q.Params == nil {
|
||||
q.Params = make(map[string]string)
|
||||
}
|
||||
|
||||
q.Params["follow"] = strconv.FormatBool(follow)
|
||||
q.Params["task"] = task
|
||||
q.Params["type"] = logType
|
||||
q.Params["origin"] = origin
|
||||
q.Params["offset"] = strconv.FormatInt(offset, 10)
|
||||
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
|
||||
r, err := nodeClient.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
// There was a networking error when talking directly to the client.
|
||||
if _, ok := err.(net.Error); !ok {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
|
||||
// Try via the server
|
||||
r, err = a.client.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
}
|
||||
|
||||
// Create the output channel
|
||||
frames := make(chan *StreamFrame, 10)
|
||||
|
||||
|
|
Loading…
Reference in New Issue