open-nomad/command/agent/fs_endpoint.go

557 lines
13 KiB
Go
Raw Normal View History

package agent
import (
"bytes"
2016-07-06 00:08:58 +00:00
"encoding/base64"
"fmt"
2016-01-14 21:35:42 +00:00
"io"
"net/http"
2016-01-13 06:06:42 +00:00
"strconv"
"strings"
"sync"
2016-07-06 00:08:58 +00:00
"time"
"gopkg.in/tomb.v1"
2016-07-06 03:48:25 +00:00
"github.com/docker/docker/pkg/ioutils"
2016-07-07 15:15:22 +00:00
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hpcloud/tail/watch"
2016-07-06 00:08:58 +00:00
"github.com/ugorji/go/codec"
)
2016-01-13 19:49:39 +00:00
// Errors returned by the filesystem endpoints when a request cannot be served.
var (
	// allocIDNotPresentErr is returned when the URL path carries no alloc ID.
	allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")

	// fileNameNotPresentErr is returned when the "path" query parameter is
	// required but empty.
	fileNameNotPresentErr = fmt.Errorf("must provide a file name")

	// clientNotRunning is returned when the agent has no client.
	clientNotRunning = fmt.Errorf("node is not running a Nomad Client")

	// invalidOrigin is returned when the "origin" query parameter is set to
	// something other than "start" or "end".
	invalidOrigin = fmt.Errorf("origin must be start or end")
)
const (
	// frameSize is the maximum number of bytes to send in a single frame.
	frameSize = 64 * 1024

	// streamHeartbeatRate is how often a heartbeat frame is emitted so that a
	// closed connection is noticed even when there is no data to send.
	streamHeartbeatRate = 10 * time.Second

	// streamBatchWindow is how long file content may sit buffered before it
	// is flushed when the frame size has not been reached.
	streamBatchWindow = 200 * time.Millisecond

	// deleteEvent and truncateEvent are the file event strings attached to
	// frames when the streamed file is deleted or truncated.
	deleteEvent   = "file deleted"
	truncateEvent = "file truncated"
)
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if s.agent.client == nil {
return nil, clientNotRunning
}
path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/")
switch {
case strings.HasPrefix(path, "ls/"):
return s.DirectoryListRequest(resp, req)
case strings.HasPrefix(path, "stat/"):
return s.FileStatRequest(resp, req)
case strings.HasPrefix(path, "readat/"):
return s.FileReadAtRequest(resp, req)
2016-03-28 18:06:22 +00:00
case strings.HasPrefix(path, "cat/"):
return s.FileCatRequest(resp, req)
2016-07-06 00:08:58 +00:00
case strings.HasPrefix(path, "stream/"):
return s.Stream(resp, req)
default:
return nil, CodedError(404, ErrInvalidMethod)
}
}
func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2016-01-13 06:25:12 +00:00
var allocID, path string
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/ls/"); allocID == "" {
2016-01-13 19:49:39 +00:00
return nil, allocIDNotPresentErr
}
2016-01-13 06:25:12 +00:00
if path = req.URL.Query().Get("path"); path == "" {
path = "/"
}
2016-01-14 21:35:42 +00:00
fs, err := s.agent.client.GetAllocFS(allocID)
if err != nil {
return nil, err
}
return fs.List(path)
}
func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2016-01-13 06:25:12 +00:00
var allocID, path string
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stat/"); allocID == "" {
2016-01-13 19:49:39 +00:00
return nil, allocIDNotPresentErr
2016-01-12 23:25:51 +00:00
}
if path = req.URL.Query().Get("path"); path == "" {
2016-01-13 19:49:39 +00:00
return nil, fileNameNotPresentErr
2016-01-12 23:25:51 +00:00
}
2016-01-14 21:35:42 +00:00
fs, err := s.agent.client.GetAllocFS(allocID)
if err != nil {
return nil, err
}
return fs.Stat(path)
}
func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2016-01-13 06:25:12 +00:00
var allocID, path string
var offset, limit int64
var err error
2016-01-13 06:06:42 +00:00
2016-01-13 06:25:12 +00:00
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/readat/"); allocID == "" {
2016-01-13 19:49:39 +00:00
return nil, allocIDNotPresentErr
2016-01-13 06:06:42 +00:00
}
2016-01-13 06:25:12 +00:00
if path = q.Get("path"); path == "" {
2016-01-13 19:49:39 +00:00
return nil, fileNameNotPresentErr
2016-01-13 06:06:42 +00:00
}
2016-01-13 06:25:12 +00:00
if offset, err = strconv.ParseInt(q.Get("offset"), 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
2016-01-13 06:06:42 +00:00
}
// Parse the limit
if limitStr := q.Get("limit"); limitStr != "" {
if limit, err = strconv.ParseInt(limitStr, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing limit: %v", err)
}
2016-01-13 06:06:42 +00:00
}
2016-01-14 21:35:42 +00:00
fs, err := s.agent.client.GetAllocFS(allocID)
if err != nil {
return nil, err
}
var rc io.ReadCloser
if limit > 0 {
rc, err = fs.LimitReadAt(path, offset, limit)
} else {
rc, err = fs.ReadAt(path, offset)
}
2016-01-14 21:35:42 +00:00
if err != nil {
2016-01-13 06:06:42 +00:00
return nil, err
}
defer rc.Close()
io.Copy(resp, rc)
return nil, nil
}
2016-03-28 18:06:22 +00:00
func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, path string
var err error
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/cat/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if path = q.Get("path"); path == "" {
return nil, fileNameNotPresentErr
}
fs, err := s.agent.client.GetAllocFS(allocID)
if err != nil {
return nil, err
}
fileInfo, err := fs.Stat(path)
if err != nil {
return nil, err
}
if fileInfo.IsDir {
2016-04-19 01:53:05 +00:00
return nil, fmt.Errorf("file %q is a directory", path)
2016-03-28 18:06:22 +00:00
}
2016-07-06 00:08:58 +00:00
r, err := fs.ReadAt(path, int64(0))
2016-06-16 21:32:07 +00:00
if err != nil {
return nil, err
}
2016-03-28 18:06:22 +00:00
io.Copy(resp, r)
return nil, nil
}
2016-07-06 00:08:58 +00:00
2016-07-07 15:15:22 +00:00
// StreamFrame is a single unit of output when streaming a file. A frame with
// every field at its zero value is sent as a heartbeat.
type StreamFrame struct {
	// Offset is the offset the data was read from.
	Offset int64

	// Data is the data that was read, Base64 encoded.
	Data string

	// File is the file the data was read from.
	File string

	// FileEvent is the last file event that occurred that could cause the
	// stream's position to change or the stream to end.
	FileEvent string
}
// StreamFramer buffers file data into StreamFrames, sends them to an output
// and emits periodic heartbeat frames.
type StreamFramer struct {
	// out is the destination the encoded frames are written to.
	out io.WriteCloser

	// enc encodes frames onto out.
	enc *codec.Encoder

	// heartbeat ticks when a heartbeat frame should be sent.
	heartbeat *time.Ticker

	// flusher ticks when a partially filled frame should be flushed.
	flusher *time.Ticker

	// shutdown is closed by Destroy to stop the run loop.
	shutdown chan struct{}

	// exitCh is closed when the run loop terminates.
	exitCh chan struct{}

	// outbound carries frames to the run loop for encoding.
	outbound chan *StreamFrame

	// The mutex protects everything below
	l sync.Mutex

	// f is the current working frame and data holds its buffered bytes.
	f    *StreamFrame
	data *bytes.Buffer

	// running captures whether the framer is running and err any error that
	// occurred to cause it to stop.
	running bool
	err     error
}
// NewStreamFramer creates a new stream framer that will output StreamFrames to
// the passed output. The framer does nothing until Run is called, and Destroy
// must be called to stop the tickers it creates.
func NewStreamFramer(out io.WriteCloser) *StreamFramer {
	// Create a JSON encoder
	enc := codec.NewEncoder(out, jsonHandle)

	// Create the heartbeat and flush ticker
	heartbeat := time.NewTicker(streamHeartbeatRate)
	flusher := time.NewTicker(streamBatchWindow)

	// The buffer must start empty: bytes.NewBuffer treats the slice it is
	// given as initial content, so only capacity is reserved here. A non-zero
	// length would put zero bytes at the front of the stream.
	buf := bytes.NewBuffer(make([]byte, 0, 2*frameSize))

	return &StreamFramer{
		out:       out,
		enc:       enc,
		heartbeat: heartbeat,
		flusher:   flusher,
		outbound:  make(chan *StreamFrame),
		data:      buf,
		shutdown:  make(chan struct{}),
		exitCh:    make(chan struct{}),
	}
}
// Destroy tears the StreamFramer down: it encodes any pending frame, marks
// the framer as not running, signals shutdown, closes the output and stops
// both tickers.
func (s *StreamFramer) Destroy() {
	s.l.Lock()
	defer s.l.Unlock()

	// Write out whatever is still buffered for the working frame.
	if pending := s.f; pending != nil {
		pending.Data = s.readData()
		s.enc.Encode(pending)
	}
	s.f, s.running = nil, false

	close(s.shutdown)
	s.out.Close()

	s.heartbeat.Stop()
	s.flusher.Stop()
}
// Run marks the framer as running and launches the long-lived goroutine that
// sends frames and heartbeats.
func (s *StreamFramer) Run() {
	func() {
		s.l.Lock()
		defer s.l.Unlock()
		s.running = true
	}()

	go s.run()
}
// ExitCh exposes, as receive-only, the channel that is closed once the run
// loop has terminated.
func (s *StreamFramer) ExitCh() <-chan struct{} {
	var exited <-chan struct{} = s.exitCh
	return exited
}
// run is the internal run method. It exits if Destroy is called or an error
// occurs while encoding a frame. On exit the exit channel is closed, the error
// (if any) is stored and the framer is marked as not running so that later
// calls to Send fail instead of queueing data nobody will read.
func (s *StreamFramer) run() {
	// Store any error and mark it as not running
	var err error
	defer func() {
		// Close the exit channel before taking the lock: a goroutine that
		// is blocked sending on outbound while holding the lock needs this
		// signal to give up and release it.
		close(s.exitCh)

		s.l.Lock()
		s.err = err
		s.running = false
		s.l.Unlock()
	}()

	// Start a heartbeat/flusher go-routine. This is done separately to avoid
	// blocking the outbound channel. Every send also watches exitCh so the
	// goroutine cannot block forever once the loop below has returned.
	go func() {
		for {
			select {
			case <-s.shutdown:
				return
			case <-s.exitCh:
				return
			case <-s.flusher.C:
				// Skip if there is nothing to flush
				s.l.Lock()
				if s.f == nil {
					s.l.Unlock()
					continue
				}

				// Read the data for the frame, and send it
				s.f.Data = s.readData()
				select {
				case s.outbound <- s.f:
					s.f = nil
				case <-s.exitCh:
					s.l.Unlock()
					return
				}
				s.l.Unlock()
			case <-s.heartbeat.C:
				// Send a heartbeat frame
				select {
				case s.outbound <- &StreamFrame{}:
				case <-s.exitCh:
					return
				}
			}
		}
	}()

	for {
		select {
		case <-s.shutdown:
			return
		case o := <-s.outbound:
			// Encode the frame onto the output; a failure ends the loop and
			// is recorded by the deferred function above.
			if err = s.enc.Encode(o); err != nil {
				return
			}
		}
	}
}
// readData drains at most frameSize bytes from the buffer and returns them
// base64 encoded. Must be called with the lock held.
func (s *StreamFramer) readData() string {
	// Take everything that is buffered, capped at one frame.
	n := frameSize
	if buffered := s.data.Len(); buffered < n {
		n = buffered
	}

	chunk := s.data.Next(n)
	return base64.StdEncoding.EncodeToString(chunk)
}
// Send creates and sends a StreamFrame based on the passed parameters. An error
// is returned if the run routine hasn't run, has been stopped, or encountered
// an error. Send is asynchronous and does not block for the data to be
// transferred.
//
// Data for the same file and file event is merged into the current working
// frame. A change of file or event flushes the working frame first, and full
// frames are handed to the run loop as soon as frameSize bytes are buffered.
func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error {
	s.l.Lock()
	defer s.l.Unlock()

	// If we are not running, return the error that caused us to not run or
	// indicated that it was never started.
	if !s.running {
		if s.err != nil {
			return s.err
		}
		return fmt.Errorf("StreamFramer not running")
	}

	// enqueue hands a frame to the run loop. If the run loop has already
	// exited nobody receives from outbound, so give up instead of blocking
	// forever with the lock held.
	enqueue := func(f *StreamFrame) error {
		select {
		case s.outbound <- f:
			return nil
		case <-s.exitCh:
			if s.err != nil {
				return s.err
			}
			return fmt.Errorf("StreamFramer not running")
		}
	}

	// Check if not mergeable
	if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) {
		// Flush the old frame
		old := &StreamFrame{
			Offset:    s.f.Offset,
			File:      s.f.File,
			FileEvent: s.f.FileEvent,
			Data:      s.readData(),
		}
		if err := enqueue(old); err != nil {
			return err
		}
		s.f = nil
	}

	// Store the new data as the current frame.
	if s.f == nil {
		s.f = &StreamFrame{
			Offset:    offset,
			File:      file,
			FileEvent: fileEvent,
		}
	}

	// Write the data to the buffer
	s.data.Write(data)

	// Flush till we are under the max frame size
	for s.data.Len() >= frameSize {
		// Create a new frame to send it
		full := &StreamFrame{
			Offset:    s.f.Offset,
			File:      s.f.File,
			FileEvent: s.f.FileEvent,
			Data:      s.readData(),
		}
		if err := enqueue(full); err != nil {
			return err
		}
	}

	// Nothing left buffered means there is no working frame to flush later.
	if s.data.Len() == 0 {
		s.f = nil
	}

	return nil
}
2016-07-07 15:15:22 +00:00
// Stream streams the content of a file blocking on EOF.
// The parameters are:
// * path: path to file to stream.
// * offset: The offset to start streaming data at, defaults to zero.
// * origin: Either "start" or "end" and defines from where the offset is
// applied. Defaults to "start".
2016-07-06 00:08:58 +00:00
func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, path string
var err error
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stream/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if path = q.Get("path"); path == "" {
return nil, fileNameNotPresentErr
}
var offset int64
offsetString := q.Get("offset")
if offsetString != "" {
var err error
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
}
}
origin := q.Get("origin")
switch origin {
case "start", "end":
case "":
origin = "start"
default:
return nil, invalidOrigin
}
fs, err := s.agent.client.GetAllocFS(allocID)
if err != nil {
return nil, err
}
fileInfo, err := fs.Stat(path)
if err != nil {
return nil, err
}
if fileInfo.IsDir {
return nil, fmt.Errorf("file %q is a directory", path)
}
// If offsetting from the end subtract from the size
if origin == "end" {
offset = fileInfo.Size - offset
}
2016-07-06 03:48:25 +00:00
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
2016-07-07 15:15:22 +00:00
return nil, s.stream(offset, path, fs, output)
}
func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, output io.WriteCloser) error {
2016-07-06 00:08:58 +00:00
// Get the reader
f, err := fs.ReadAt(path, offset)
if err != nil {
2016-07-07 15:15:22 +00:00
return err
2016-07-06 00:08:58 +00:00
}
defer f.Close()
// Create a tomb to cancel watch events
t := tomb.Tomb{}
defer func() {
t.Kill(nil)
t.Done()
}()
2016-07-06 00:08:58 +00:00
// Create the framer
framer := NewStreamFramer(output)
framer.Run()
defer framer.Destroy()
2016-07-06 00:08:58 +00:00
// Create a variable to allow setting the last event
var lastEvent string
// Only create the file change watcher once. But we need to do it after we
// read and reach EOF.
var changes *watch.FileChanges
2016-07-06 00:08:58 +00:00
// Start streaming the data
data := make([]byte, frameSize)
2016-07-06 00:08:58 +00:00
OUTER:
for {
// Read up to the max frame size
n, readErr := f.Read(data)
2016-07-06 00:08:58 +00:00
// Update the offset
offset += int64(n)
// Return non-EOF errors
if readErr != nil && readErr != io.EOF {
return readErr
2016-07-06 00:08:58 +00:00
}
// Send the frame
if n != 0 {
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
return err
}
}
// Clear the last event
if lastEvent != "" {
lastEvent = ""
2016-07-06 00:08:58 +00:00
}
// Just keep reading
if readErr == nil {
2016-07-06 00:08:58 +00:00
continue
}
// If EOF is hit, wait for a change to the file
if changes == nil {
changes, err = fs.ChangeEvents(path, offset, &t)
if err != nil {
return err
}
2016-07-06 00:08:58 +00:00
}
for {
select {
case <-changes.Modified:
continue OUTER
case <-changes.Deleted:
return framer.Send(path, deleteEvent, nil, offset)
2016-07-06 00:08:58 +00:00
case <-changes.Truncated:
// Close the current reader
if err := f.Close(); err != nil {
2016-07-07 15:15:22 +00:00
return err
2016-07-06 00:08:58 +00:00
}
// Get a new reader at offset zero
offset = 0
var err error
f, err = fs.ReadAt(path, offset)
if err != nil {
2016-07-07 15:15:22 +00:00
return err
2016-07-06 00:08:58 +00:00
}
defer f.Close()
// Store the last event
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
2016-07-07 15:15:22 +00:00
return nil
2016-07-06 00:08:58 +00:00
}
}
}
2016-07-07 15:15:22 +00:00
return nil
2016-07-06 00:08:58 +00:00
}