diff --git a/CHANGELOG.md b/CHANGELOG.md
index 655373ce3..4bc44f675 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,8 @@ IMPROVEMENTS:
BUG FIXES:
* agent/config: Fix use of IPv6 addresses [GH-2036]
+ * api: Fix file descriptor leak and high CPU usage when using the logs
+ endpoint [GH-2079]
* cli: Improve parsing error when a job without a name is specified [GH-2030]
* client: Fixed permissions of migrated allocation directory [GH-2061]
* client: Ensuring allocations are not blocked more than once [GH-2040]
diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go
index 48cc5a16e..d84411e2b 100644
--- a/command/agent/fs_endpoint.go
+++ b/command/agent/fs_endpoint.go
@@ -1,11 +1,12 @@
package agent
+//go:generate codecgen -o fs_endpoint.generated.go fs_endpoint.go
+
import (
"bytes"
"fmt"
"io"
"math"
- "net"
"net/http"
"os"
"path/filepath"
@@ -204,6 +205,12 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request)
return nil, r.Close()
}
+var (
+ // HeartbeatStreamFrame is the StreamFrame to send as a heartbeat, avoiding
+ // creating many instances of the empty StreamFrame
+ HeartbeatStreamFrame = &StreamFrame{}
+)
+
// StreamFrame is used to frame data of a file when streaming
type StreamFrame struct {
// Offset is the offset the data was read from
@@ -225,6 +232,27 @@ func (s *StreamFrame) IsHeartbeat() bool {
return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == ""
}
+func (s *StreamFrame) Clear() {
+ s.Offset = 0
+ s.Data = nil
+ s.File = ""
+ s.FileEvent = ""
+}
+
+func (s *StreamFrame) IsCleared() bool {
+ if s.Offset != 0 {
+ return false
+ } else if s.Data != nil {
+ return false
+ } else if s.File != "" {
+ return false
+ } else if s.FileEvent != "" {
+ return false
+ } else {
+ return true
+ }
+}
+
// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
out io.WriteCloser
@@ -243,7 +271,7 @@ type StreamFramer struct {
l sync.Mutex
// The current working frame
- f *StreamFrame
+ f StreamFrame
data *bytes.Buffer
// Captures whether the framer is running and any error that occurred to
@@ -328,32 +356,32 @@ OUTER:
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
- if s.f == nil {
+ if s.f.IsCleared() {
s.l.Unlock()
continue
}
// Read the data for the frame, and send it
s.f.Data = s.readData()
- err = s.send(s.f)
- s.f = nil
+ err = s.send(&s.f)
+ s.f.Clear()
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
- if err = s.send(&StreamFrame{}); err != nil {
+ if err = s.send(HeartbeatStreamFrame); err != nil {
return
}
}
}
s.l.Lock()
- if s.f != nil {
+ if !s.f.IsCleared() {
s.f.Data = s.readData()
- err = s.send(s.f)
- s.f = nil
+ err = s.send(&s.f)
+ s.f.Clear()
}
s.l.Unlock()
}
@@ -378,9 +406,7 @@ func (s *StreamFramer) readData() []byte {
return nil
}
d := s.data.Next(size)
- b := make([]byte, size)
- copy(b, d)
- return b
+ return d
}
// Send creates and sends a StreamFrame based on the passed parameters. An error
@@ -401,75 +427,62 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
}
// Check if not mergeable
- if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) {
+ if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame
- f := *s.f
- f.Data = s.readData()
+ s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
- err := s.send(&f)
- s.f = nil
+ err := s.send(&s.f)
+ s.f.Clear()
if err != nil {
return err
}
}
// Store the new data as the current frame.
- if s.f == nil {
- s.f = &StreamFrame{
- Offset: offset,
- File: file,
- FileEvent: fileEvent,
- }
+ if s.f.IsCleared() {
+ s.f.Offset = offset
+ s.f.File = file
+ s.f.FileEvent = fileEvent
}
// Write the data to the buffer
s.data.Write(data)
// Handle the delete case in which there is no data
+ force := false
if s.data.Len() == 0 && s.f.FileEvent != "" {
- select {
- case <-s.exitCh:
- return nil
- default:
- }
-
- f := &StreamFrame{
- Offset: s.f.Offset,
- File: s.f.File,
- FileEvent: s.f.FileEvent,
- }
- if err := s.send(f); err != nil {
- return err
- }
+ force = true
}
// Flush till we are under the max frame size
- for s.data.Len() >= s.frameSize {
+ for s.data.Len() >= s.frameSize || force {
+ // Clear
+ if force {
+ force = false
+ }
+
// Create a new frame to send it
- d := s.readData()
+ s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
- f := &StreamFrame{
- Offset: s.f.Offset,
- File: s.f.File,
- FileEvent: s.f.FileEvent,
- Data: d,
- }
- if err := s.send(f); err != nil {
+ if err := s.send(&s.f); err != nil {
return err
}
+
+ // Update the offset
+ s.f.Offset += int64(len(s.f.Data))
}
if s.data.Len() == 0 {
- s.f = nil
+ s.f.Clear()
}
return nil
@@ -569,6 +582,26 @@ func (s *HTTPServer) stream(offset int64, path string,
t.Done()
}()
+ // parseFramerErr takes an error and returns an error. The error will
+ // potentially change if it was caused by the connection being closed.
+ parseFramerErr := func(e error) error {
+ if e == nil {
+ return nil
+ }
+
+ if strings.Contains(e.Error(), io.ErrClosedPipe.Error()) {
+ // The pipe check is for tests
+ return syscall.EPIPE
+ }
+
+ // The connection was closed by our peer
+ if strings.Contains(e.Error(), syscall.EPIPE.Error()) || strings.Contains(e.Error(), syscall.ECONNRESET.Error()) {
+ return syscall.EPIPE
+ }
+
+ return e
+ }
+
// Create a variable to allow setting the last event
var lastEvent string
@@ -594,23 +627,7 @@ OUTER:
// Send the frame
if n != 0 {
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
-
- // Check if the connection has been closed
- if err == io.ErrClosedPipe {
- // The pipe check is for tests
- return syscall.EPIPE
- }
-
- operr, ok := err.(*net.OpError)
- if ok {
- // The connection was closed by our peer
- e := operr.Err.Error()
- if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) {
- return syscall.EPIPE
- }
- }
-
- return err
+ return parseFramerErr(err)
}
}
@@ -637,7 +654,7 @@ OUTER:
case <-changes.Modified:
continue OUTER
case <-changes.Deleted:
- return framer.Send(path, deleteEvent, nil, offset)
+ return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
case <-changes.Truncated:
// Close the current reader
if err := f.Close(); err != nil {
@@ -657,7 +674,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
- return nil
+ return parseFramerErr(framer.Err)
case err, ok := <-eofCancelCh:
if !ok {
return nil
@@ -850,7 +867,9 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT
return
}
- scanCh := time.Tick(nextLogCheckRate)
+ ticker := time.NewTicker(nextLogCheckRate)
+ defer ticker.Stop()
+ scanCh := ticker.C
for {
select {
case <-t.Dead():
diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go
index 152832e99..bf01c7794 100644
--- a/command/agent/fs_endpoint_test.go
+++ b/command/agent/fs_endpoint_test.go
@@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"reflect"
+ "runtime"
"strconv"
"testing"
"time"
@@ -438,7 +439,7 @@ func TestHTTP_Stream_MissingParams(t *testing.T) {
// tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller
// should destroy the temp dir.
-func tempAllocDir(t *testing.T) *allocdir.AllocDir {
+func tempAllocDir(t testing.TB) *allocdir.AllocDir {
dir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("TempDir() failed: %v", err)
@@ -925,6 +926,112 @@ func TestHTTP_Logs_Follow(t *testing.T) {
})
}
+func BenchmarkHTTP_Logs_Follow(t *testing.B) {
+ runtime.MemProfileRate = 1
+
+ s := makeHTTPServer(t, nil)
+ defer s.Cleanup()
+ testutil.WaitForLeader(t, s.Agent.RPC)
+
+ // Get a temp alloc dir and create the log dir
+ ad := tempAllocDir(t)
+ s.Agent.logger.Printf("ALEX: LOG DIR: %q", ad.SharedDir)
+ //defer os.RemoveAll(ad.AllocDir)
+
+ logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
+ if err := os.MkdirAll(logDir, 0777); err != nil {
+ t.Fatalf("Failed to make log dir: %v", err)
+ }
+
+ // Create a series of log files in the temp dir
+ task := "foo"
+ logType := "stdout"
+ expected := make([]byte, 1024*1024*100)
+ initialWrites := 3
+
+ writeToFile := func(index int, data []byte) {
+ logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
+ logFilePath := filepath.Join(logDir, logFile)
+ err := ioutil.WriteFile(logFilePath, data, 0777)
+ if err != nil {
+ t.Fatalf("Failed to create file: %v", err)
+ }
+ }
+
+ part := (len(expected) / 3) - 50
+ goodEnough := (8 * len(expected)) / 10
+ for i := 0; i < initialWrites; i++ {
+ writeToFile(i, expected[i*part:(i+1)*part])
+ }
+
+ t.ResetTimer()
+ for i := 0; i < t.N; i++ {
+ s.Agent.logger.Printf("BENCHMARK %d", i)
+
+ // Create a decoder
+ r, w := io.Pipe()
+ wrappedW := &WriteCloseChecker{WriteCloser: w}
+ defer r.Close()
+ defer w.Close()
+ dec := codec.NewDecoder(r, jsonHandle)
+
+ var received []byte
+
+ // Start the reader
+ fullResultCh := make(chan struct{})
+ go func() {
+ for {
+ var frame StreamFrame
+ if err := dec.Decode(&frame); err != nil {
+ if err == io.EOF {
+ t.Logf("EOF")
+ return
+ }
+
+ t.Errorf("failed to decode: %v", err); return
+ }
+
+ if frame.IsHeartbeat() {
+ continue
+ }
+
+ received = append(received, frame.Data...)
+ if len(received) > goodEnough {
+ close(fullResultCh)
+ return
+ }
+ }
+ }()
+
+ // Start streaming logs
+ go func() {
+ if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
+ t.Fatalf("logs() failed: %v", err)
+ }
+ }()
+
+ select {
+ case <-fullResultCh:
+ case <-time.After(60 * time.Second):
+ t.Fatalf("did not receive data: %d < %d", len(received), goodEnough)
+ }
+
+ s.Agent.logger.Printf("ALEX: CLOSING")
+
+ // Close the reader
+ r.Close()
+ s.Agent.logger.Printf("ALEX: CLOSED")
+
+ s.Agent.logger.Printf("ALEX: WAITING FOR WRITER TO CLOSE")
+ testutil.WaitForResult(func() (bool, error) {
+ return wrappedW.Closed, nil
+ }, func(err error) {
+ t.Fatalf("connection not closed")
+ })
+ s.Agent.logger.Printf("ALEX: WRITER CLOSED")
+ }
+}
+
func TestLogs_findClosest(t *testing.T) {
task := "foo"
entries := []*allocdir.AllocFileInfo{
diff --git a/command/agent/http_test.go b/command/agent/http_test.go
index aa71add72..34f37dcdf 100644
--- a/command/agent/http_test.go
+++ b/command/agent/http_test.go
@@ -381,7 +381,7 @@ func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 {
return uint64(val)
}
-func httpTest(t *testing.T, cb func(c *Config), f func(srv *TestServer)) {
+func httpTest(t testing.TB, cb func(c *Config), f func(srv *TestServer)) {
s := makeHTTPServer(t, cb)
defer s.Cleanup()
testutil.WaitForLeader(t, s.Agent.RPC)
diff --git a/testutil/wait.go b/testutil/wait.go
index 23c88c497..bdac812d0 100644
--- a/testutil/wait.go
+++ b/testutil/wait.go
@@ -54,7 +54,7 @@ func IsTravis() bool {
type rpcFn func(string, interface{}, interface{}) error
-func WaitForLeader(t *testing.T, rpc rpcFn) {
+func WaitForLeader(t testing.TB, rpc rpcFn) {
WaitForResult(func() (bool, error) {
args := &structs.GenericRequest{}
var leader string
diff --git a/website/source/docs/agent/configuration/client.html.md b/website/source/docs/agent/configuration/client.html.md
index 601a255c6..e06b05aab 100644
--- a/website/source/docs/agent/configuration/client.html.md
+++ b/website/source/docs/agent/configuration/client.html.md
@@ -46,7 +46,7 @@ client {
job is allowed to wait to exit. Individual jobs may customize their own kill
timeout, but it may not exceed this value.
-- `meta` ([Meta][]: nil)
- Specifies a key-value map that annotates
+- `meta` `(map[string]string: nil)` - Specifies a key-value map that annotates the node
with user-defined metadata.
- `network_interface` `(string: "lo | lo0")` - Specifies the name of the
@@ -299,5 +299,3 @@ client {
}
}
```
-
-[meta]: /docs/job-specification/meta.html "Nomad meta Job Specification"
diff --git a/website/source/docs/job-specification/constraint.html.md b/website/source/docs/job-specification/constraint.html.md
index eb7c87014..bbafbb7d6 100644
--- a/website/source/docs/job-specification/constraint.html.md
+++ b/website/source/docs/job-specification/constraint.html.md
@@ -24,9 +24,9 @@ description: |-
The `constraint` allows restricting the set of eligible nodes. Constraints may
-filter on [attributes][interpolation] or [metadata][meta]. Additionally
-constraints may be specified at the [job][job], [group][group], or [task][task]
-levels for ultimate flexibility.
+filter on [attributes][interpolation] or [client metadata][client-meta].
+Additionally constraints may be specified at the [job][job], [group][group], or
+[task][task] levels for ultimate flexibility.
```hcl
job "docs" {
@@ -46,7 +46,7 @@ job "docs" {
task "server" {
# All tasks must run where "my_custom_value" is greater than 3.
constraint {
- attribute = "${meta.my_custom_value}"
+ attribute = "${node.meta.my_custom_value}"
operator = ">"
value = "3"
}
@@ -217,6 +217,6 @@ constraint {
[job]: /docs/job-specification/job.html "Nomad job Job Specification"
[group]: /docs/job-specification/group.html "Nomad group Job Specification"
-[meta]: /docs/job-specification/meta.html "Nomad meta Job Specification"
+[client-meta]: /docs/agent/configuration/client.html#meta "Nomad Client Configuration"
[task]: /docs/job-specification/task.html "Nomad task Job Specification"
[interpolation]: /docs/runtime/interpolation.html "Nomad interpolation"
diff --git a/website/source/docs/job-specification/job.html.md b/website/source/docs/job-specification/job.html.md
index e703aed1c..844dba133 100644
--- a/website/source/docs/job-specification/job.html.md
+++ b/website/source/docs/job-specification/job.html.md
@@ -219,5 +219,4 @@ $ VAULT_TOKEN="..." nomad run example.nomad
[task]: /docs/job-specification/task.html "Nomad task Job Specification"
[update]: /docs/job-specification/update.html "Nomad update Job Specification"
[vault]: /docs/job-specification/vault.html "Nomad vault Job Specification"
-[meta]: /docs/job-specification/meta.html "Nomad meta Job Specification"
[scheduler]: /docs/runtime/schedulers.html "Nomad Scheduler Types"
diff --git a/website/source/docs/runtime/interpolation.html.md b/website/source/docs/runtime/interpolation.html.md
index e8da3a22c..531dcb92a 100644
--- a/website/source/docs/runtime/interpolation.html.md
+++ b/website/source/docs/runtime/interpolation.html.md
@@ -92,12 +92,12 @@ driver.