open-nomad/api/fs.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

385 lines
9.3 KiB
Go
Raw Normal View History

2016-01-26 22:31:52 +00:00
package api
import (
"encoding/json"
"fmt"
2016-01-27 00:07:59 +00:00
"io"
"net"
2016-01-27 00:07:59 +00:00
"strconv"
2016-07-22 22:07:11 +00:00
"sync"
2016-01-27 22:20:10 +00:00
"time"
2016-01-26 22:31:52 +00:00
)
2016-07-07 18:51:40 +00:00
const (
	// OriginStart is the origin argument value that makes a streaming
	// offset count from the start of the file.
	OriginStart = "start"

	// OriginEnd is the origin argument value that makes a streaming
	// offset count from the end of the file.
	OriginEnd = "end"
)
// AllocFileInfo describes a single file or directory inside the AllocDir.
type AllocFileInfo struct {
	// Name is the name of the entry.
	Name string

	// IsDir reports whether the entry is a directory.
	IsDir bool

	// Size is the size of the entry (presumably in bytes — confirm against
	// the agent that produces it).
	Size int64

	// FileMode is the entry's mode in string form.
	FileMode string

	// ModTime is the entry's modification time.
	ModTime time.Time

	// ContentType is the content type reported for the entry (presumably a
	// MIME type — confirm against the agent that produces it).
	ContentType string
}
2016-07-07 18:51:40 +00:00
// StreamFrame is used to frame data of a file when streaming.
//
// Every field is tagged omitempty, so a frame whose fields are all zero
// encodes as an empty JSON object.
type StreamFrame struct {
	// Offset is the offset associated with the frame (presumably a byte
	// offset into the streamed file — confirm against the sender).
	Offset int64 `json:",omitempty"`

	// Data is the payload carried by the frame.
	Data []byte `json:",omitempty"`

	// File names the file the frame refers to (semantics are defined by the
	// sender — confirm).
	File string `json:",omitempty"`

	// FileEvent carries an event description for File, if any (semantics
	// are defined by the sender — confirm).
	FileEvent string `json:",omitempty"`
}
2016-07-10 22:56:13 +00:00
// IsHeartbeat returns if the frame is a heartbeat frame
2016-07-07 18:51:40 +00:00
func (s *StreamFrame) IsHeartbeat() bool {
2016-07-10 22:56:13 +00:00
return len(s.Data) == 0 && s.FileEvent == "" && s.File == "" && s.Offset == 0
2016-07-07 18:51:40 +00:00
}
2016-01-26 22:44:33 +00:00
// AllocFS is used to introspect an allocation directory on a Nomad client
2016-01-26 22:31:52 +00:00
type AllocFS struct {
client *Client
}
2016-01-26 22:44:33 +00:00
// AllocFS returns an handle to the AllocFS endpoints
2016-01-26 22:31:52 +00:00
func (c *Client) AllocFS() *AllocFS {
return &AllocFS{client: c}
}
// List is used to list the files at a given path of an allocation directory
func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*AllocFileInfo, *QueryMeta, error) {
if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
q.Params["path"] = path
var resp []*AllocFileInfo
qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
if err != nil {
2016-01-26 22:31:52 +00:00
return nil, nil, err
}
return resp, qm, nil
2016-01-26 22:31:52 +00:00
}
2016-01-26 23:03:26 +00:00
// Stat is used to stat a file at a given path of an allocation directory
func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocFileInfo, *QueryMeta, error) {
if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
q.Params["path"] = path
var resp AllocFileInfo
qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
if err != nil {
2016-01-26 23:03:26 +00:00
return nil, nil, err
}
return &resp, qm, nil
2016-01-26 23:03:26 +00:00
}
2016-01-27 00:07:59 +00:00
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
return queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)
})
2016-01-27 00:07:59 +00:00
}
2016-03-28 18:06:22 +00:00
// Cat reads the contents of the file at the given path in an allocation
// directory. The path is sent as the "path" query parameter, and the reader
// produced by queryClientNode is returned unchanged.
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
	endpoint := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
	withPath := func(opts *QueryOptions) {
		opts.Params["path"] = path
	}
	return queryClientNode(a.client, alloc, endpoint, q, withPath)
}
2016-07-07 18:51:40 +00:00
2016-07-10 22:56:13 +00:00
// Stream streams the content of a file blocking on EOF.
// The parameters are:
// * path: path to file to stream.
// * offset: The offset to start streaming data at.
// * origin: Either "start" or "end" and defines from where the offset is applied.
2016-07-18 18:39:38 +00:00
// * cancel: A channel that when closed, streaming will end.
2016-07-10 22:56:13 +00:00
//
// The return value is a channel that will emit StreamFrames as they are read.
2016-07-07 18:51:40 +00:00
func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
2017-09-19 12:59:05 +00:00
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
2016-07-07 18:51:40 +00:00
2017-09-19 12:59:05 +00:00
errCh := make(chan error, 1)
reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin
})
2016-07-07 18:51:40 +00:00
if err != nil {
errCh <- err
return nil, errCh
2016-07-07 18:51:40 +00:00
}
// Create the output channel
frames := make(chan *StreamFrame, 10)
2016-07-18 18:39:38 +00:00
go func() {
// Close the body
defer r.Close()
2016-07-18 18:39:38 +00:00
// Create a decoder
dec := json.NewDecoder(r)
2016-07-18 18:39:38 +00:00
for {
// Check if we have been cancelled
select {
case <-cancel:
return
default:
}
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
2017-09-19 12:59:05 +00:00
errCh <- err
2016-07-18 18:39:38 +00:00
close(frames)
return
}
// Discard heartbeat frames
if frame.IsHeartbeat() {
continue
}
frames <- &frame
}
}()
2017-09-19 12:59:05 +00:00
return frames, errCh
2016-07-18 18:39:38 +00:00
}
// queryClientNode issues a raw query for reqPath, preferring a direct
// connection to the node running alloc and falling back to c itself.
//
// q is defaulted to an empty QueryOptions with a non-nil Params map and then
// handed to customizeQ (when non-nil) so the caller can add request
// parameters. Note that the node client lookup receives q as passed in, i.e.
// before that defaulting.
//
// The direct result is returned when the node client yields a reader. A
// non-network error from the node client is returned immediately; when no
// node client is available, or it yields no reader (for instance after a
// net.Error), the query is retried through c.
func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) {
	// The lookup error is dropped on purpose: a missing node client simply
	// means the request goes through c below.
	direct, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)

	if q == nil {
		q = new(QueryOptions)
	}
	if q.Params == nil {
		q.Params = map[string]string{}
	}
	if customizeQ != nil {
		customizeQ(q)
	}

	// No direct route to the node: query through c.
	if direct == nil {
		return c.rawQuery(reqPath, q)
	}

	body, err := direct.rawQuery(reqPath, q)
	if err != nil {
		if _, isNetErr := err.(net.Error); !isNetErr {
			// found a non networking error talking to client directly
			return nil, err
		}
	}
	if body != nil {
		return body, err
	}

	// network error when talking to the client directly, retry through c
	return c.rawQuery(reqPath, q)
}
2016-07-18 18:39:38 +00:00
// Logs streams the content of a tasks logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
2016-07-20 17:18:05 +00:00
// * follow: Whether the logs should be followed.
2016-07-18 18:39:38 +00:00
// * task: the tasks name to stream logs for.
// * logType: Either "stdout" or "stderr"
// * origin: Either "start" or "end" and defines from where the offset is applied.
2016-07-20 17:18:05 +00:00
// * offset: The offset to start streaming data at.
2016-07-18 18:39:38 +00:00
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
// The chan will be closed when follow=false and the end of the file is
// reached.
//
// Unexpected (non-EOF) errors will be sent on the error chan.
2016-07-20 17:18:05 +00:00
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
2017-09-19 12:59:05 +00:00
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
2016-07-18 18:39:38 +00:00
2017-09-19 12:59:05 +00:00
errCh := make(chan error, 1)
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)
})
if err != nil {
2017-09-19 12:59:05 +00:00
errCh <- err
return nil, errCh
2016-07-18 18:39:38 +00:00
}
2016-07-18 18:39:38 +00:00
// Create the output channel
frames := make(chan *StreamFrame, 10)
2016-07-07 18:51:40 +00:00
go func() {
// Close the body
defer r.Close()
2016-07-07 18:51:40 +00:00
// Create a decoder
dec := json.NewDecoder(r)
2016-07-07 18:51:40 +00:00
for {
// Check if we have been cancelled
select {
case <-cancel:
close(frames)
2016-07-07 18:51:40 +00:00
return
default:
}
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
close(frames)
} else {
errCh <- err
}
2016-07-07 18:51:40 +00:00
return
}
// Discard heartbeat frames
if frame.IsHeartbeat() {
continue
}
frames <- &frame
}
}()
2017-09-19 12:59:05 +00:00
return frames, errCh
2016-07-07 18:51:40 +00:00
}
2016-07-12 23:29:18 +00:00
// FrameReader is used to convert a stream of frames into a read closer.
type FrameReader struct {
frames <-chan *StreamFrame
2017-09-19 12:59:05 +00:00
errCh <-chan error
2016-07-12 23:29:18 +00:00
cancelCh chan struct{}
2016-07-22 22:07:11 +00:00
closedLock sync.Mutex
closed bool
2016-07-12 23:29:18 +00:00
unblockTime time.Duration
2016-07-12 23:29:18 +00:00
frame *StreamFrame
frameOffset int
byteOffset int
}
// NewFrameReader takes a channel of frames and returns a FrameReader which
// implements io.ReadCloser
2017-09-19 12:59:05 +00:00
func NewFrameReader(frames <-chan *StreamFrame, errCh <-chan error, cancelCh chan struct{}) *FrameReader {
2016-07-12 23:29:18 +00:00
return &FrameReader{
frames: frames,
2017-09-19 12:59:05 +00:00
errCh: errCh,
2016-07-12 23:29:18 +00:00
cancelCh: cancelCh,
}
}
// SetUnblockTime configures how long a Read may wait for a frame before it
// gives up and reports zero bytes read. A duration that is unset, zero or
// negative makes Read block until data is read.
func (f *FrameReader) SetUnblockTime(d time.Duration) {
	f.unblockTime = d
}
2016-07-12 23:29:18 +00:00
// Offset reports the reader's offset into the stream, as recorded from the
// frames read so far.
func (f *FrameReader) Offset() int {
	return f.byteOffset
}
// Read reads the data of the incoming frames into the bytes buffer. Returns EOF
// when there are no more frames.
func (f *FrameReader) Read(p []byte) (n int, err error) {
2016-07-22 22:07:11 +00:00
f.closedLock.Lock()
closed := f.closed
f.closedLock.Unlock()
if closed {
return 0, io.EOF
}
2016-07-12 23:29:18 +00:00
if f.frame == nil {
var unblock <-chan time.Time
if f.unblockTime.Nanoseconds() > 0 {
unblock = time.After(f.unblockTime)
2016-07-12 23:29:18 +00:00
}
select {
case frame, ok := <-f.frames:
if !ok {
return 0, io.EOF
}
f.frame = frame
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
case <-unblock:
return 0, nil
2017-09-19 12:59:05 +00:00
case err := <-f.errCh:
return 0, err
2016-07-22 22:07:11 +00:00
case <-f.cancelCh:
return 0, io.EOF
}
2016-07-12 23:29:18 +00:00
}
// Copy the data out of the frame and update our offset
n = copy(p, f.frame.Data[f.frameOffset:])
f.frameOffset += n
// Clear the frame and its offset once we have read everything
if len(f.frame.Data) == f.frameOffset {
f.frame = nil
f.frameOffset = 0
}
return n, nil
}
// Close cancels the stream of frames
func (f *FrameReader) Close() error {
2016-07-22 22:07:11 +00:00
f.closedLock.Lock()
defer f.closedLock.Unlock()
if f.closed {
return nil
}
2016-07-12 23:29:18 +00:00
close(f.cancelCh)
f.closed = true
2016-07-12 23:29:18 +00:00
return nil
}