api.fs uses the api.Client rather than making raw requests

This commit is contained in:
Alex Dadgar 2016-07-28 14:24:01 -07:00
parent 419acaa440
commit ef47612606
4 changed files with 125 additions and 163 deletions

View File

@ -35,6 +35,9 @@ type QueryOptions struct {
// If set, used as prefix for resource list searches
Prefix string
// Set HTTP parameters on the query.
Params map[string]string
}
// WriteOptions are used to parameterize a write
@ -162,6 +165,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.Prefix != "" {
r.params.Set("prefix", q.Prefix)
}
for k, v := range q.Params {
r.params.Set(k, v)
}
}
// durToMsec converts a duration to a millisecond specified string
@ -301,6 +307,19 @@ func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
return diff, resp, err
}
// rawQuery makes a GET request to the specified endpoint but returns just the
// response body.
func (c *Client) rawQuery(endpoint string, q *QueryOptions) (io.ReadCloser, error) {
r := c.newRequest("GET", endpoint)
r.setQueryOptions(q)
_, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
return resp.Body, nil
}
// Query is used to do a GET request against an endpoint
// and deserialize the response into an interface using
// standard Nomad conventions.

258
api/fs.go
View File

@ -4,9 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"sync"
"time"
@ -52,42 +49,57 @@ func (c *Client) AllocFS() *AllocFS {
return &AllocFS{client: c}
}
// getNodeClient returns a Client that will dial the node. If the QueryOptions
// is set, the function will ensure that it is initalized and that the Params
// field is valid.
func (a *AllocFS) getNodeClient(nodeHTTPAddr, allocID string, q **QueryOptions) (*Client, error) {
if nodeHTTPAddr == "" {
return nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", allocID)
}
// Get an API client for the node
nodeClientConfig := &Config{
Address: fmt.Sprintf("http://%s", nodeHTTPAddr),
Region: a.client.config.Region,
}
nodeClient, err := NewClient(nodeClientConfig)
if err != nil {
return nil, err
}
// Set the query params
if q == nil {
return nodeClient, nil
}
if *q == nil {
*q = &QueryOptions{}
}
if actQ := *q; actQ.Params == nil {
actQ.Params = make(map[string]string)
}
return nodeClient, nil
}
// List is used to list the files at a given path of an allocation directory
func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*AllocFileInfo, *QueryMeta, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
if err != nil {
return nil, nil, err
}
if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID),
}
v := url.Values{}
v.Set("path", path)
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
}
c := http.Client{}
resp, err := c.Do(req)
nodeClient, err := a.getNodeClient(node.HTTPAddr, alloc.ID, &q)
if err != nil {
return nil, nil, err
}
if resp.StatusCode != 200 {
return nil, nil, a.getErrorMsg(resp)
}
decoder := json.NewDecoder(resp.Body)
var files []*AllocFileInfo
if err := decoder.Decode(&files); err != nil {
q.Params["path"] = path
var resp []*AllocFileInfo
qm, err := nodeClient.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
if err != nil {
return nil, nil, err
}
return files, nil, nil
return resp, qm, nil
}
// Stat is used to stat a file at a given path of an allocation directory
@ -96,108 +108,62 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
if err != nil {
return nil, nil, err
}
if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID),
}
v := url.Values{}
v.Set("path", path)
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
}
c := http.Client{}
resp, err := c.Do(req)
nodeClient, err := a.getNodeClient(node.HTTPAddr, alloc.ID, &q)
if err != nil {
return nil, nil, err
}
if resp.StatusCode != 200 {
return nil, nil, a.getErrorMsg(resp)
}
decoder := json.NewDecoder(resp.Body)
var file *AllocFileInfo
if err := decoder.Decode(&file); err != nil {
q.Params["path"] = path
var resp AllocFileInfo
qm, err := nodeClient.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
if err != nil {
return nil, nil, err
}
return file, nil, nil
return &resp, qm, nil
}
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
if err != nil {
return nil, nil, err
return nil, err
}
if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID),
}
v := url.Values{}
v.Set("path", path)
v.Set("offset", strconv.FormatInt(offset, 10))
v.Set("limit", strconv.FormatInt(limit, 10))
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
}
c := http.Client{}
resp, err := c.Do(req)
nodeClient, err := a.getNodeClient(node.HTTPAddr, alloc.ID, &q)
if err != nil {
return nil, nil, err
return nil, err
}
return resp.Body, nil, nil
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID), q)
if err != nil {
return nil, err
}
return r, nil
}
// Cat is used to read contents of a file at the given path in an allocation
// directory
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
if err != nil {
return nil, nil, err
return nil, err
}
if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID),
}
v := url.Values{}
v.Set("path", path)
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
}
c := http.Client{}
resp, err := c.Do(req)
nodeClient, err := a.getNodeClient(node.HTTPAddr, alloc.ID, &q)
if err != nil {
return nil, nil, err
return nil, err
}
return resp.Body, nil, nil
}
q.Params["path"] = path
func (a *AllocFS) getErrorMsg(resp *http.Response) error {
if errMsg, err := ioutil.ReadAll(resp.Body); err == nil {
return fmt.Errorf(string(errMsg))
} else {
return err
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID), q)
if err != nil {
return nil, err
}
return r, nil
}
// Stream streams the content of a file blocking on EOF.
@ -209,35 +175,24 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error {
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, nil, err
return nil, err
}
if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID),
}
v := url.Values{}
v.Set("path", path)
v.Set("origin", origin)
v.Set("offset", strconv.FormatInt(offset, 10))
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
Cancel: cancel,
}
c := http.Client{}
resp, err := c.Do(req)
nodeClient, err := a.getNodeClient(node.HTTPAddr, alloc.ID, &q)
if err != nil {
return nil, nil, err
return nil, err
}
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID), q)
if err != nil {
return nil, err
}
// Create the output channel
@ -245,10 +200,10 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
go func() {
// Close the body
defer resp.Body.Close()
defer r.Close()
// Create a decoder
dec := json.NewDecoder(resp.Body)
dec := json.NewDecoder(r)
for {
// Check if we have been cancelled
@ -274,7 +229,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
}
}()
return frames, nil, nil
return frames, nil
}
// Logs streams the content of a tasks logs blocking on EOF.
@ -289,37 +244,26 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, nil, err
return nil, err
}
if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID),
}
v := url.Values{}
v.Set("follow", strconv.FormatBool(follow))
v.Set("task", task)
v.Set("type", logType)
v.Set("origin", origin)
v.Set("offset", strconv.FormatInt(offset, 10))
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
Cancel: cancel,
}
c := http.Client{}
resp, err := c.Do(req)
nodeClient, err := a.getNodeClient(node.HTTPAddr, alloc.ID, &q)
if err != nil {
return nil, nil, err
return nil, err
}
q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), q)
if err != nil {
return nil, err
}
// Create the output channel
@ -327,10 +271,10 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
go func() {
// Close the body
defer resp.Body.Close()
defer r.Close()
// Create a decoder
dec := json.NewDecoder(resp.Body)
dec := json.NewDecoder(r)
for {
// Check if we have been cancelled
@ -356,7 +300,7 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
}
}()
return frames, nil, nil
return frames, nil
}
// FrameReader is used to convert a stream of frames into a read closer.

View File

@ -251,7 +251,7 @@ func (f *FSCommand) Run(args []string) int {
if follow {
r, readErr = f.followFile(client, alloc, path, api.OriginStart, 0, -1)
} else {
r, _, readErr = client.AllocFS().Cat(alloc, path, nil)
r, readErr = client.AllocFS().Cat(alloc, path, nil)
}
if readErr != nil {
@ -282,7 +282,7 @@ func (f *FSCommand) Run(args []string) int {
// This offset needs to be relative from the front versus the follow
// is relative to the end
offset = file.Size - offset
r, _, readErr = client.AllocFS().ReadAt(alloc, path, offset, -1, nil)
r, readErr = client.AllocFS().ReadAt(alloc, path, offset, -1, nil)
// If numLines is set, wrap the reader
if numLines != -1 {
@ -311,7 +311,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
path, origin string, offset, numLines int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, _, err := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil)
frames, err := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil)
if err != nil {
return nil, err
}

View File

@ -230,12 +230,12 @@ func (l *LogsCommand) Run(args []string) int {
}
}
defer r.Close()
if readErr != nil {
l.Ui.Error(readErr.Error())
return 1
}
defer r.Close()
io.Copy(os.Stdout, r)
return 0
}
@ -246,9 +246,8 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation,
follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, _, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil)
frames, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil)
if err != nil {
panic(err.Error())
return nil, err
}
signalCh := make(chan os.Signal, 1)