2016-01-12 23:03:53 +00:00
|
|
|
package agent
|
|
|
|
|
|
|
|
import (
|
2016-07-06 00:08:58 +00:00
|
|
|
"encoding/base64"
|
2016-01-12 23:03:53 +00:00
|
|
|
"fmt"
|
2016-01-14 21:35:42 +00:00
|
|
|
"io"
|
2016-01-12 23:03:53 +00:00
|
|
|
"net/http"
|
2016-01-13 06:06:42 +00:00
|
|
|
"strconv"
|
2016-01-12 23:03:53 +00:00
|
|
|
"strings"
|
2016-07-06 00:08:58 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"gopkg.in/tomb.v1"
|
|
|
|
|
2016-07-06 03:48:25 +00:00
|
|
|
"github.com/docker/docker/pkg/ioutils"
|
2016-07-06 00:08:58 +00:00
|
|
|
"github.com/ugorji/go/codec"
|
2016-01-12 23:03:53 +00:00
|
|
|
)
|
|
|
|
|
2016-01-13 19:49:39 +00:00
|
|
|
// Errors returned by the filesystem endpoints for requests that are malformed
// or that this agent cannot serve.
var (
	// allocIDNotPresentErr is returned when nothing follows the endpoint
	// prefix in the request path, i.e. no allocation ID was supplied.
	allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")

	// fileNameNotPresentErr is returned when the "path" query parameter is
	// required but empty.
	fileNameNotPresentErr = fmt.Errorf("must provide a file name")

	// clientNotRunning is returned when the agent has no client.
	clientNotRunning = fmt.Errorf("node is not running a Nomad Client")

	// invalidOrigin is returned when the "origin" query parameter is set to
	// something other than "start" or "end".
	invalidOrigin = fmt.Errorf("origin must be start or end")
)
|
|
|
|
|
|
|
|
const (
	// frameSize is the maximum number of bytes to send in a single frame
	// (64 KiB).
	frameSize = 64 << 10

	// streamHeartbeatRate is the rate at which a heartbeat will occur to
	// detect a closed connection without sending any additional data.
	streamHeartbeatRate = 10 * time.Second

	// deleteEvent is the FileEvent value of the frame sent when the streamed
	// file is deleted.
	deleteEvent = "file deleted"

	// truncateEvent is the FileEvent value attached to the first frame sent
	// after the streamed file is truncated.
	truncateEvent = "file truncated"
)
|
|
|
|
|
2016-03-07 19:26:54 +00:00
|
|
|
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
|
|
if s.agent.client == nil {
|
|
|
|
return nil, clientNotRunning
|
|
|
|
}
|
|
|
|
|
|
|
|
path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/")
|
|
|
|
switch {
|
|
|
|
case strings.HasPrefix(path, "ls/"):
|
|
|
|
return s.DirectoryListRequest(resp, req)
|
|
|
|
case strings.HasPrefix(path, "stat/"):
|
|
|
|
return s.FileStatRequest(resp, req)
|
|
|
|
case strings.HasPrefix(path, "readat/"):
|
|
|
|
return s.FileReadAtRequest(resp, req)
|
2016-03-28 18:06:22 +00:00
|
|
|
case strings.HasPrefix(path, "cat/"):
|
|
|
|
return s.FileCatRequest(resp, req)
|
2016-07-06 00:08:58 +00:00
|
|
|
case strings.HasPrefix(path, "stream/"):
|
|
|
|
return s.Stream(resp, req)
|
2016-03-07 19:26:54 +00:00
|
|
|
default:
|
|
|
|
return nil, CodedError(404, ErrInvalidMethod)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-01-12 23:03:53 +00:00
|
|
|
func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
2016-01-13 06:25:12 +00:00
|
|
|
var allocID, path string
|
|
|
|
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/ls/"); allocID == "" {
|
2016-01-13 19:49:39 +00:00
|
|
|
return nil, allocIDNotPresentErr
|
2016-01-12 23:03:53 +00:00
|
|
|
}
|
2016-01-13 06:25:12 +00:00
|
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
|
|
|
path = "/"
|
2016-01-12 23:03:53 +00:00
|
|
|
}
|
2016-01-14 21:35:42 +00:00
|
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return fs.List(path)
|
2016-01-12 23:03:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
2016-01-13 06:25:12 +00:00
|
|
|
var allocID, path string
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stat/"); allocID == "" {
|
2016-01-13 19:49:39 +00:00
|
|
|
return nil, allocIDNotPresentErr
|
2016-01-12 23:25:51 +00:00
|
|
|
}
|
2016-01-13 21:21:03 +00:00
|
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
2016-01-13 19:49:39 +00:00
|
|
|
return nil, fileNameNotPresentErr
|
2016-01-12 23:25:51 +00:00
|
|
|
}
|
2016-01-14 21:35:42 +00:00
|
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return fs.Stat(path)
|
2016-01-12 23:03:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
2016-01-13 06:25:12 +00:00
|
|
|
var allocID, path string
|
|
|
|
var offset, limit int64
|
|
|
|
var err error
|
2016-01-13 06:06:42 +00:00
|
|
|
|
2016-01-13 06:25:12 +00:00
|
|
|
q := req.URL.Query()
|
|
|
|
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/readat/"); allocID == "" {
|
2016-01-13 19:49:39 +00:00
|
|
|
return nil, allocIDNotPresentErr
|
2016-01-13 06:06:42 +00:00
|
|
|
}
|
2016-01-13 06:25:12 +00:00
|
|
|
if path = q.Get("path"); path == "" {
|
2016-01-13 19:49:39 +00:00
|
|
|
return nil, fileNameNotPresentErr
|
2016-01-13 06:06:42 +00:00
|
|
|
}
|
|
|
|
|
2016-01-13 06:25:12 +00:00
|
|
|
if offset, err = strconv.ParseInt(q.Get("offset"), 10, 64); err != nil {
|
|
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
2016-01-13 06:06:42 +00:00
|
|
|
}
|
2016-01-13 06:25:12 +00:00
|
|
|
if limit, err = strconv.ParseInt(q.Get("limit"), 10, 64); err != nil {
|
|
|
|
return nil, fmt.Errorf("error parsing limit: %v", err)
|
2016-01-13 06:06:42 +00:00
|
|
|
}
|
2016-01-14 21:35:42 +00:00
|
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-07-06 00:08:58 +00:00
|
|
|
r, err := fs.LimitReadAt(path, offset, limit)
|
2016-01-14 21:35:42 +00:00
|
|
|
if err != nil {
|
2016-01-13 06:06:42 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
2016-01-14 21:35:42 +00:00
|
|
|
io.Copy(resp, r)
|
2016-01-12 23:03:53 +00:00
|
|
|
return nil, nil
|
|
|
|
}
|
2016-03-28 18:06:22 +00:00
|
|
|
|
|
|
|
func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
|
|
var allocID, path string
|
|
|
|
var err error
|
|
|
|
|
|
|
|
q := req.URL.Query()
|
|
|
|
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/cat/"); allocID == "" {
|
|
|
|
return nil, allocIDNotPresentErr
|
|
|
|
}
|
|
|
|
if path = q.Get("path"); path == "" {
|
|
|
|
return nil, fileNameNotPresentErr
|
|
|
|
}
|
|
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
fileInfo, err := fs.Stat(path)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if fileInfo.IsDir {
|
2016-04-19 01:53:05 +00:00
|
|
|
return nil, fmt.Errorf("file %q is a directory", path)
|
2016-03-28 18:06:22 +00:00
|
|
|
}
|
|
|
|
|
2016-07-06 00:08:58 +00:00
|
|
|
r, err := fs.ReadAt(path, int64(0))
|
2016-06-16 21:32:07 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-03-28 18:06:22 +00:00
|
|
|
io.Copy(resp, r)
|
|
|
|
return nil, nil
|
|
|
|
}
|
2016-07-06 00:08:58 +00:00
|
|
|
|
|
|
|
// StreamFrame is a single message emitted by the Stream endpoint. A frame
// either carries a chunk of file data, announces a file event, or acts as a
// heartbeat (no data and no event).
type StreamFrame struct {
	// Offset is the position in the file at which this frame's data starts;
	// for frames without data it is the current read position.
	Offset int64

	// Data is the chunk of file content, Base64 (standard encoding) encoded.
	Data string

	// File is the path of the file being streamed, as given in the request.
	File string

	// FileEvent is empty for plain data and heartbeat frames, and otherwise
	// names the event that occurred on the file (deletion or truncation).
	FileEvent string
}
|
|
|
|
|
|
|
|
func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
|
|
var allocID, path string
|
|
|
|
var err error
|
|
|
|
|
|
|
|
q := req.URL.Query()
|
|
|
|
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stream/"); allocID == "" {
|
|
|
|
return nil, allocIDNotPresentErr
|
|
|
|
}
|
|
|
|
|
|
|
|
if path = q.Get("path"); path == "" {
|
|
|
|
return nil, fileNameNotPresentErr
|
|
|
|
}
|
|
|
|
|
|
|
|
var offset int64
|
|
|
|
offsetString := q.Get("offset")
|
|
|
|
if offsetString != "" {
|
|
|
|
var err error
|
|
|
|
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
|
|
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
origin := q.Get("origin")
|
|
|
|
switch origin {
|
|
|
|
case "start", "end":
|
|
|
|
case "":
|
|
|
|
origin = "start"
|
|
|
|
default:
|
|
|
|
return nil, invalidOrigin
|
|
|
|
}
|
|
|
|
|
|
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
fileInfo, err := fs.Stat(path)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if fileInfo.IsDir {
|
|
|
|
return nil, fmt.Errorf("file %q is a directory", path)
|
|
|
|
}
|
|
|
|
|
|
|
|
// If offsetting from the end subtract from the size
|
|
|
|
if origin == "end" {
|
|
|
|
offset = fileInfo.Size - offset
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2016-07-06 03:48:25 +00:00
|
|
|
// Create an output that gets flushed on every write
|
|
|
|
output := ioutils.NewWriteFlusher(resp)
|
|
|
|
|
2016-07-06 00:08:58 +00:00
|
|
|
// Create a JSON encoder
|
2016-07-06 03:48:25 +00:00
|
|
|
enc := codec.NewEncoder(output, jsonHandle)
|
2016-07-06 00:08:58 +00:00
|
|
|
|
|
|
|
// Get the reader
|
|
|
|
f, err := fs.ReadAt(path, offset)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
|
|
|
|
// Create a tomb to cancel watch events
|
|
|
|
t := tomb.Tomb{}
|
|
|
|
defer t.Done()
|
|
|
|
|
|
|
|
// Create the heartbeat timer
|
|
|
|
ticker := time.NewTimer(streamHeartbeatRate)
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
// Create a variable to allow setting the last event
|
|
|
|
var lastEvent string
|
|
|
|
|
|
|
|
// Start streaming the data
|
|
|
|
OUTER:
|
|
|
|
for {
|
|
|
|
// Create a frame
|
|
|
|
frame := StreamFrame{
|
|
|
|
Offset: offset,
|
|
|
|
File: path,
|
|
|
|
}
|
|
|
|
data := make([]byte, frameSize)
|
|
|
|
|
|
|
|
if lastEvent != "" {
|
|
|
|
frame.FileEvent = lastEvent
|
|
|
|
lastEvent = ""
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read up to the max frame size
|
|
|
|
n, err := f.Read(data)
|
|
|
|
|
|
|
|
// Update the offset
|
|
|
|
offset += int64(n)
|
|
|
|
|
|
|
|
// Convert the data to Base64
|
|
|
|
frame.Data = base64.StdEncoding.EncodeToString(data[:n])
|
|
|
|
|
|
|
|
// Return non-EOF errors
|
|
|
|
if err != nil && err != io.EOF {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Send the frame
|
|
|
|
if err := enc.Encode(&frame); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Just keep reading
|
|
|
|
if err == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// If EOF is hit, wait for a change to the file but periodically
|
|
|
|
// heartbeat to ensure the socket is not closed
|
|
|
|
changes, err := fs.ChangeEvents(path, offset, &t)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reset the heartbeat timer as we just started waiting
|
|
|
|
ticker.Reset(streamHeartbeatRate)
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-changes.Modified:
|
|
|
|
continue OUTER
|
|
|
|
case <-changes.Deleted:
|
|
|
|
// Send a heartbeat frame with the delete
|
|
|
|
hFrame := StreamFrame{
|
|
|
|
Offset: offset,
|
|
|
|
File: path,
|
|
|
|
FileEvent: deleteEvent,
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := enc.Encode(&hFrame); err != nil {
|
|
|
|
// The defer on the tomb will stop the watch
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, nil
|
|
|
|
case <-changes.Truncated:
|
|
|
|
// Close the current reader
|
|
|
|
if err := f.Close(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get a new reader at offset zero
|
|
|
|
offset = 0
|
|
|
|
var err error
|
|
|
|
f, err = fs.ReadAt(path, offset)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
|
|
|
|
// Store the last event
|
|
|
|
lastEvent = truncateEvent
|
|
|
|
continue OUTER
|
|
|
|
case <-t.Dying():
|
|
|
|
return nil, nil
|
|
|
|
case <-ticker.C:
|
|
|
|
// Send a heartbeat frame
|
|
|
|
hFrame := StreamFrame{
|
|
|
|
Offset: offset,
|
|
|
|
File: path,
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := enc.Encode(&hFrame); err != nil {
|
|
|
|
// The defer on the tomb will stop the watch
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
ticker.Reset(streamHeartbeatRate)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, nil
|
|
|
|
}
|