package agent import ( "bytes" "fmt" "io" "math" "net/http" "os" "path/filepath" "sort" "strconv" "strings" "sync" "time" "gopkg.in/tomb.v1" "github.com/docker/docker/pkg/ioutils" "github.com/hashicorp/nomad/client/allocdir" "github.com/hpcloud/tail/watch" "github.com/ugorji/go/codec" ) var ( allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id") fileNameNotPresentErr = fmt.Errorf("must provide a file name") taskNotPresentErr = fmt.Errorf("must provide task name") logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)") clientNotRunning = fmt.Errorf("node is not running a Nomad Client") invalidOrigin = fmt.Errorf("origin must be start or end") ) const ( // streamFrameSize is the maximum number of bytes to send in a single frame streamFrameSize = 64 * 1024 // streamHeartbeatRate is the rate at which a heartbeat will occur to detect // a closed connection without sending any additional data streamHeartbeatRate = 1 * time.Second // streamBatchWindow is the window in which file content is batched before // being flushed if the frame size has not been hit. streamBatchWindow = 200 * time.Millisecond deleteEvent = "file deleted" truncateEvent = "file truncated" ) func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { if s.agent.client == nil { return nil, clientNotRunning } path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/") switch { case strings.HasPrefix(path, "ls/"): return s.DirectoryListRequest(resp, req) case strings.HasPrefix(path, "stat/"): return s.FileStatRequest(resp, req) case strings.HasPrefix(path, "readat/"): return s.FileReadAtRequest(resp, req) case strings.HasPrefix(path, "cat/"): return s.FileCatRequest(resp, req) case strings.HasPrefix(path, "stream/"): return s.Stream(resp, req) case strings.HasPrefix(path, "logs/"): return s.Logs(resp, req) default: return nil, CodedError(404, ErrInvalidMethod) } } func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, path string if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/ls/"); allocID == "" { return nil, allocIDNotPresentErr } if path = req.URL.Query().Get("path"); path == "" { path = "/" } fs, err := s.agent.client.GetAllocFS(allocID) if err != nil { return nil, err } return fs.List(path) } func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, path string if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stat/"); allocID == "" { return nil, allocIDNotPresentErr } if path = req.URL.Query().Get("path"); path == "" { return nil, fileNameNotPresentErr } fs, err := s.agent.client.GetAllocFS(allocID) if err != nil { return nil, err } return fs.Stat(path) } func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, path string var offset, limit int64 var err error q := req.URL.Query() if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/readat/"); allocID == "" { return nil, allocIDNotPresentErr } if path = q.Get("path"); path == "" { return nil, fileNameNotPresentErr } if offset, err = strconv.ParseInt(q.Get("offset"), 10, 64); err != nil { return nil, fmt.Errorf("error parsing offset: %v", err) } // Parse the limit if limitStr := q.Get("limit"); limitStr != "" { if limit, err = strconv.ParseInt(limitStr, 10, 64); err != nil { return nil, fmt.Errorf("error parsing limit: %v", err) } } fs, err := s.agent.client.GetAllocFS(allocID) if err != nil { return nil, err } rc, err := fs.ReadAt(path, offset) if limit > 0 { rc = &ReadCloserWrapper{ Reader: io.LimitReader(rc, limit), Closer: rc, } } if err != nil { return nil, err } io.Copy(resp, rc) return nil, rc.Close() } // ReadCloserWrapper wraps a LimitReader so that a file is closed once it has been // read type ReadCloserWrapper struct { io.Reader io.Closer } func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, path string var err error q := req.URL.Query() if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/cat/"); allocID == "" { return nil, allocIDNotPresentErr } if path = q.Get("path"); path == "" { return nil, fileNameNotPresentErr } fs, err := s.agent.client.GetAllocFS(allocID) if err != nil { return nil, err } fileInfo, err := fs.Stat(path) if err != nil { return nil, err } if fileInfo.IsDir { return nil, fmt.Errorf("file %q is a directory", path) } r, err := fs.ReadAt(path, int64(0)) if err != nil { return nil, err } io.Copy(resp, r) return nil, r.Close() } // StreamFrame is used to frame data of a file when streaming type StreamFrame struct { // Offset is the offset the data was read from Offset int64 `json:",omitempty"` // Data is the read data Data []byte `json:",omitempty"` // File is the file that the data was read from File string `json:",omitempty"` // FileEvent is the last file event that occurred that could cause the // streams position to change or end FileEvent string `json:",omitempty"` } // IsHeartbeat returns if the frame is a heartbeat frame func (s *StreamFrame) IsHeartbeat() bool { return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == "" } // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { out io.WriteCloser enc *codec.Encoder frameSize int heartbeat *time.Ticker flusher *time.Ticker shutdownCh chan struct{} exitCh chan struct{} outbound chan *StreamFrame // The mutex protects everything below l sync.Mutex // The current working frame f *StreamFrame data *bytes.Buffer // Captures whether the framer is running and any error that occurred to // cause it to stop. running bool Err error } // NewStreamFramer creates a new stream framer that will output StreamFrames to // the passed output. func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { // Create a JSON encoder enc := codec.NewEncoder(out, jsonHandle) // Create the heartbeat and flush ticker heartbeat := time.NewTicker(heartbeatRate) flusher := time.NewTicker(batchWindow) return &StreamFramer{ out: out, enc: enc, frameSize: frameSize, heartbeat: heartbeat, flusher: flusher, outbound: make(chan *StreamFrame), data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)), shutdownCh: make(chan struct{}), exitCh: make(chan struct{}), } } // Destroy is used to cleanup the StreamFramer and flush any pending frames func (s *StreamFramer) Destroy() { s.l.Lock() close(s.shutdownCh) s.heartbeat.Stop() s.flusher.Stop() s.l.Unlock() // Ensure things were flushed if s.running { <-s.exitCh } s.out.Close() } // Run starts a long lived goroutine that handles sending data as well as // heartbeating func (s *StreamFramer) Run() { s.l.Lock() defer s.l.Unlock() if s.running { return } s.running = true go s.run() } // ExitCh returns a channel that will be closed when the run loop terminates. func (s *StreamFramer) ExitCh() <-chan struct{} { return s.exitCh } // run is the internal run method. It exits if Destroy is called or an error // occurs, in which case the exit channel is closed. func (s *StreamFramer) run() { // Store any error and mark it as not running var err error defer func() { s.l.Lock() s.Err = err close(s.exitCh) close(s.outbound) s.running = false s.l.Unlock() }() // Start a heartbeat/flusher go-routine. This is done seprately to avoid blocking // the outbound channel. go func() { for { select { case <-s.exitCh: return case <-s.shutdownCh: return case <-s.flusher.C: // Skip if there is nothing to flush s.l.Lock() if s.f == nil { s.l.Unlock() continue } // Read the data for the frame, and send it s.f.Data = s.readData() s.outbound <- s.f s.f = nil s.l.Unlock() case <-s.heartbeat.C: // Send a heartbeat frame select { case s.outbound <- &StreamFrame{}: default: } } } }() OUTER: for { select { case <-s.shutdownCh: break OUTER case o := <-s.outbound: // Send the frame if err = s.enc.Encode(o); err != nil { return } } } // Flush any existing frames s.l.Lock() select { case o := <-s.outbound: // Send the frame and then clear the current working frame if err = s.enc.Encode(o); err != nil { s.l.Unlock() return } default: } if s.f != nil { s.f.Data = s.readData() s.enc.Encode(s.f) } s.l.Unlock() } // readData is a helper which reads the buffered data returning up to the frame // size of data. Must be called with the lock held. The returned value is // invalid on the next read or write into the StreamFramer buffer func (s *StreamFramer) readData() []byte { // Compute the amount to read from the buffer size := s.data.Len() if size > s.frameSize { size = s.frameSize } if size == 0 { return nil } d := s.data.Next(size) b := make([]byte, size) copy(b, d) return b } // Send creates and sends a StreamFrame based on the passed parameters. An error // is returned if the run routine hasn't run or encountered an error. Send is // asyncronous and does not block for the data to be transferred. func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error { s.l.Lock() defer s.l.Unlock() // If we are not running, return the error that caused us to not run or // indicated that it was never started. if !s.running { if s.Err != nil { return s.Err } return fmt.Errorf("StreamFramer not running") } // Check if not mergeable if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) { // Flush the old frame f := *s.f f.Data = s.readData() select { case <-s.exitCh: return nil case s.outbound <- &f: s.f = nil } } // Store the new data as the current frame. if s.f == nil { s.f = &StreamFrame{ Offset: offset, File: file, FileEvent: fileEvent, } } // Write the data to the buffer s.data.Write(data) // Handle the delete case in which there is no data if s.data.Len() == 0 && s.f.FileEvent != "" { select { case <-s.exitCh: return nil case s.outbound <- &StreamFrame{ Offset: s.f.Offset, File: s.f.File, FileEvent: s.f.FileEvent, }: } } // Flush till we are under the max frame size for s.data.Len() >= s.frameSize { // Create a new frame to send it d := s.readData() select { case <-s.exitCh: return nil case s.outbound <- &StreamFrame{ Offset: s.f.Offset, File: s.f.File, FileEvent: s.f.FileEvent, Data: d, }: } } if s.data.Len() == 0 { s.f = nil } return nil } // Stream streams the content of a file blocking on EOF. // The parameters are: // * path: path to file to stream. // * offset: The offset to start streaming data at, defaults to zero. // * origin: Either "start" or "end" and defines from where the offset is // applied. Defaults to "start". func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, path string var err error q := req.URL.Query() if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stream/"); allocID == "" { return nil, allocIDNotPresentErr } if path = q.Get("path"); path == "" { return nil, fileNameNotPresentErr } var offset int64 offsetString := q.Get("offset") if offsetString != "" { var err error if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil { return nil, fmt.Errorf("error parsing offset: %v", err) } } origin := q.Get("origin") switch origin { case "start", "end": case "": origin = "start" default: return nil, invalidOrigin } fs, err := s.agent.client.GetAllocFS(allocID) if err != nil { return nil, err } fileInfo, err := fs.Stat(path) if err != nil { return nil, err } if fileInfo.IsDir { return nil, fmt.Errorf("file %q is a directory", path) } // If offsetting from the end subtract from the size if origin == "end" { offset = fileInfo.Size - offset } // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) // Create the framer framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() return nil, s.stream(offset, path, fs, framer, nil) } func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, framer *StreamFramer, eofCancelCh chan error) error { // Get the reader f, err := fs.ReadAt(path, offset) if err != nil { return err } defer f.Close() // Create a tomb to cancel watch events t := tomb.Tomb{} defer func() { t.Kill(nil) t.Done() }() // Create a variable to allow setting the last event var lastEvent string // Only create the file change watcher once. But we need to do it after we // read and reach EOF. var changes *watch.FileChanges // Start streaming the data data := make([]byte, streamFrameSize) OUTER: for { // Read up to the max frame size n, readErr := f.Read(data) // Update the offset offset += int64(n) // Return non-EOF errors if readErr != nil && readErr != io.EOF { return readErr } // Send the frame if n != 0 { if err := framer.Send(path, lastEvent, data[:n], offset); err != nil { return err } } // Clear the last event if lastEvent != "" { lastEvent = "" } // Just keep reading if readErr == nil { continue } // If EOF is hit, wait for a change to the file if changes == nil { changes, err = fs.ChangeEvents(path, offset, &t) if err != nil { return err } } for { select { case <-changes.Modified: continue OUTER case <-changes.Deleted: return framer.Send(path, deleteEvent, nil, offset) case <-changes.Truncated: // Close the current reader if err := f.Close(); err != nil { return err } // Get a new reader at offset zero offset = 0 var err error f, err = fs.ReadAt(path, offset) if err != nil { return err } defer f.Close() // Store the last event lastEvent = truncateEvent continue OUTER case <-framer.ExitCh(): return nil case err := <-eofCancelCh: return err } } } return nil } // Logs streams the content of a log blocking on EOF. The parameters are: // * task: task name to stream logs for. // * type: stdout/stderr to stream. // * offset: The offset to start streaming data at, defaults to zero. // * origin: Either "start" or "end" and defines from where the offset is // applied. Defaults to "start". func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, task, logType string var err error q := req.URL.Query() if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" { return nil, allocIDNotPresentErr } if task = q.Get("task"); task == "" { return nil, taskNotPresentErr } logType = q.Get("type") switch logType { case "stdout", "stderr": default: return nil, logTypeNotPresentErr } var offset int64 offsetString := q.Get("offset") if offsetString != "" { var err error if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil { return nil, fmt.Errorf("error parsing offset: %v", err) } } origin := q.Get("origin") switch origin { case "start", "end": case "": origin = "start" default: return nil, invalidOrigin } fs, err := s.agent.client.GetAllocFS(allocID) if err != nil { return nil, err } // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) return nil, s.logs(offset, origin, task, logType, fs, output) } func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error { // Create the framer framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() // Path to the logs logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName) // nextIdx is the next index to read logs from var nextIdx int64 switch origin { case "start": nextIdx = 0 case "end": nextIdx = math.MaxInt64 offset *= -1 default: return invalidOrigin } // Create a tomb to cancel watch events t := tomb.Tomb{} defer func() { t.Kill(nil) t.Done() }() for { // Logic for picking next file is: // 1) List log files // 2) Pick log file closest to desired index // 3) Open log file at correct offset // 3a) No error, read contents // 3b) If file doesn't exist, goto 1 as it may have been rotated out entries, err := fs.List(logPath) if err != nil { return fmt.Errorf("failed to list entries: %v", err) } logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType) if err != nil { return err } p := filepath.Join(logPath, logEntry.Name) nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) nextExists := fs.BlockUntilExists(nextPath, &t) err = s.stream(openOffset, p, fs, framer, nextExists) // Check if there was an error where the file does not exist. That means // it got rotated out from under us. if err != nil { if os.IsNotExist(err) { continue } return err } //Since we successfully streamed, update the overall offset/idx. offset = int64(0) nextIdx = idx + 1 } return nil } // indexTuple and indexTupleArray are used to find the correct log entry to // start streaming logs from type indexTuple struct { idx int64 entry *allocdir.AllocFileInfo } type indexTupleArray []indexTuple func (a indexTupleArray) Len() int { return len(a) } func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx } func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // findClosest takes a list of entries, the desired log index and desired log // offset (which can be negative, treated as offset from end), task name and log // type and returns the log entry, the log index, the offset to read from and a // potential error. func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64, task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) { // Build the matching indexes var indexes []indexTuple prefix := fmt.Sprintf("%s.%s.", task, logType) for _, entry := range entries { if entry.IsDir { continue } // If nothing was trimmed, then it is not a match idxStr := strings.TrimPrefix(entry.Name, prefix) if idxStr == entry.Name { continue } // Convert to an int idx, err := strconv.Atoi(idxStr) if err != nil { return nil, 0, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) } indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry}) } if len(indexes) == 0 { return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType) } // Binary search the indexes to get the desiredIdx sort.Sort(indexTupleArray(indexes)) i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx }) l := len(indexes) if i == l { // Use the last index if the number is bigger than all of them. i = l - 1 } // Get to the correct offset offset := desiredOffset idx := int64(i) for { s := indexes[idx].entry.Size // Base case if offset == 0 { break } else if offset < 0 { // Going backwards if newOffset := s + offset; newOffset >= 0 { // Current file works offset = newOffset break } else if idx == 0 { // Already at the end offset = 0 break } else { // Try the file before offset = newOffset idx -= 1 continue } } else { // Going forward if offset <= s { // Current file works break } else if idx == int64(l-1) { // Already at the end offset = s break } else { // Try the next file offset = offset - s idx += 1 continue } } } return indexes[idx].entry, indexes[idx].idx, offset, nil }