Merge branch 'master' of github.com:hashicorp/nomad

This commit is contained in:
Diptanu Choudhury 2016-12-09 17:53:56 -08:00
commit 76c6103d3d
10 changed files with 207 additions and 82 deletions

View file

@ -5,6 +5,8 @@ IMPROVEMENTS:
BUG FIXES: BUG FIXES:
* agent/config: Fix use of IPv6 addresses [GH-2036] * agent/config: Fix use of IPv6 addresses [GH-2036]
* api: Fix file descriptor leak and high CPU usage when using the logs
endpoint [GH-2079]
* cli: Improve parsing error when a job without a name is specified [GH-2030] * cli: Improve parsing error when a job without a name is specified [GH-2030]
* client: Fixed permissions of migrated allocation directory [GH-2061] * client: Fixed permissions of migrated allocation directory [GH-2061]
* client: Ensuring allocations are not blocked more than once [GH-2040] * client: Ensuring allocations are not blocked more than once [GH-2040]

View file

@ -1,11 +1,12 @@
package agent package agent
//go:generate codecgen -o fs_endpoint.generated.go fs_endpoint.go
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"math" "math"
"net"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
@ -204,6 +205,12 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request)
return nil, r.Close() return nil, r.Close()
} }
var (
// HeartbeatStreamFrame is the StreamFrame to send as a heartbeat, avoiding
// creating many instances of the empty StreamFrame
HeartbeatStreamFrame = &StreamFrame{}
)
// StreamFrame is used to frame data of a file when streaming // StreamFrame is used to frame data of a file when streaming
type StreamFrame struct { type StreamFrame struct {
// Offset is the offset the data was read from // Offset is the offset the data was read from
@ -225,6 +232,27 @@ func (s *StreamFrame) IsHeartbeat() bool {
return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == "" return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == ""
} }
func (s *StreamFrame) Clear() {
s.Offset = 0
s.Data = nil
s.File = ""
s.FileEvent = ""
}
func (s *StreamFrame) IsCleared() bool {
if s.Offset != 0 {
return false
} else if s.Data != nil {
return false
} else if s.File != "" {
return false
} else if s.FileEvent != "" {
return false
} else {
return true
}
}
// StreamFramer is used to buffer and send frames as well as heartbeat. // StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct { type StreamFramer struct {
out io.WriteCloser out io.WriteCloser
@ -243,7 +271,7 @@ type StreamFramer struct {
l sync.Mutex l sync.Mutex
// The current working frame // The current working frame
f *StreamFrame f StreamFrame
data *bytes.Buffer data *bytes.Buffer
// Captures whether the framer is running and any error that occurred to // Captures whether the framer is running and any error that occurred to
@ -328,32 +356,32 @@ OUTER:
case <-s.flusher.C: case <-s.flusher.C:
// Skip if there is nothing to flush // Skip if there is nothing to flush
s.l.Lock() s.l.Lock()
if s.f == nil { if s.f.IsCleared() {
s.l.Unlock() s.l.Unlock()
continue continue
} }
// Read the data for the frame, and send it // Read the data for the frame, and send it
s.f.Data = s.readData() s.f.Data = s.readData()
err = s.send(s.f) err = s.send(&s.f)
s.f = nil s.f.Clear()
s.l.Unlock() s.l.Unlock()
if err != nil { if err != nil {
return return
} }
case <-s.heartbeat.C: case <-s.heartbeat.C:
// Send a heartbeat frame // Send a heartbeat frame
if err = s.send(&StreamFrame{}); err != nil { if err = s.send(HeartbeatStreamFrame); err != nil {
return return
} }
} }
} }
s.l.Lock() s.l.Lock()
if s.f != nil { if !s.f.IsCleared() {
s.f.Data = s.readData() s.f.Data = s.readData()
err = s.send(s.f) err = s.send(&s.f)
s.f = nil s.f.Clear()
} }
s.l.Unlock() s.l.Unlock()
} }
@ -378,9 +406,7 @@ func (s *StreamFramer) readData() []byte {
return nil return nil
} }
d := s.data.Next(size) d := s.data.Next(size)
b := make([]byte, size) return d
copy(b, d)
return b
} }
// Send creates and sends a StreamFrame based on the passed parameters. An error // Send creates and sends a StreamFrame based on the passed parameters. An error
@ -401,75 +427,62 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
} }
// Check if not mergeable // Check if not mergeable
if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) { if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame // Flush the old frame
f := *s.f s.f.Data = s.readData()
f.Data = s.readData()
select { select {
case <-s.exitCh: case <-s.exitCh:
return nil return nil
default: default:
} }
err := s.send(&f) err := s.send(&s.f)
s.f = nil s.f.Clear()
if err != nil { if err != nil {
return err return err
} }
} }
// Store the new data as the current frame. // Store the new data as the current frame.
if s.f == nil { if s.f.IsCleared() {
s.f = &StreamFrame{ s.f.Offset = offset
Offset: offset, s.f.File = file
File: file, s.f.FileEvent = fileEvent
FileEvent: fileEvent,
}
} }
// Write the data to the buffer // Write the data to the buffer
s.data.Write(data) s.data.Write(data)
// Handle the delete case in which there is no data // Handle the delete case in which there is no data
force := false
if s.data.Len() == 0 && s.f.FileEvent != "" { if s.data.Len() == 0 && s.f.FileEvent != "" {
select { force = true
case <-s.exitCh:
return nil
default:
}
f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
}
if err := s.send(f); err != nil {
return err
}
} }
// Flush till we are under the max frame size // Flush till we are under the max frame size
for s.data.Len() >= s.frameSize { for s.data.Len() >= s.frameSize || force {
// Clear
if force {
force = false
}
// Create a new frame to send it // Create a new frame to send it
d := s.readData() s.f.Data = s.readData()
select { select {
case <-s.exitCh: case <-s.exitCh:
return nil return nil
default: default:
} }
f := &StreamFrame{ if err := s.send(&s.f); err != nil {
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
Data: d,
}
if err := s.send(f); err != nil {
return err return err
} }
// Update the offset
s.f.Offset += int64(len(s.f.Data))
} }
if s.data.Len() == 0 { if s.data.Len() == 0 {
s.f = nil s.f.Clear()
} }
return nil return nil
@ -569,6 +582,26 @@ func (s *HTTPServer) stream(offset int64, path string,
t.Done() t.Done()
}() }()
// parseFramerErr takes an error and returns an error. The error will
// potentially change if it was caused by the connection being closed.
parseFramerErr := func(e error) error {
if e == nil {
return nil
}
if strings.Contains(e.Error(), io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}
// The connection was closed by our peer
if strings.Contains(e.Error(), syscall.EPIPE.Error()) || strings.Contains(e.Error(), syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
return err
}
// Create a variable to allow setting the last event // Create a variable to allow setting the last event
var lastEvent string var lastEvent string
@ -594,23 +627,7 @@ OUTER:
// Send the frame // Send the frame
if n != 0 { if n != 0 {
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil { if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
return parseFramerErr(err)
// Check if the connection has been closed
if err == io.ErrClosedPipe {
// The pipe check is for tests
return syscall.EPIPE
}
operr, ok := err.(*net.OpError)
if ok {
// The connection was closed by our peer
e := operr.Err.Error()
if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
}
return err
} }
} }
@ -637,7 +654,7 @@ OUTER:
case <-changes.Modified: case <-changes.Modified:
continue OUTER continue OUTER
case <-changes.Deleted: case <-changes.Deleted:
return framer.Send(path, deleteEvent, nil, offset) return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
case <-changes.Truncated: case <-changes.Truncated:
// Close the current reader // Close the current reader
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
@ -657,7 +674,7 @@ OUTER:
lastEvent = truncateEvent lastEvent = truncateEvent
continue OUTER continue OUTER
case <-framer.ExitCh(): case <-framer.ExitCh():
return nil return parseFramerErr(framer.Err)
case err, ok := <-eofCancelCh: case err, ok := <-eofCancelCh:
if !ok { if !ok {
return nil return nil
@ -850,7 +867,9 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT
return return
} }
scanCh := time.Tick(nextLogCheckRate) ticker := time.NewTicker(nextLogCheckRate)
defer ticker.Stop()
scanCh := ticker.C
for { for {
select { select {
case <-t.Dead(): case <-t.Dead():

View file

@ -11,6 +11,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
"runtime"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -438,7 +439,7 @@ func TestHTTP_Stream_MissingParams(t *testing.T) {
// tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller // tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller
// should destroy the temp dir. // should destroy the temp dir.
func tempAllocDir(t *testing.T) *allocdir.AllocDir { func tempAllocDir(t testing.TB) *allocdir.AllocDir {
dir, err := ioutil.TempDir("", "") dir, err := ioutil.TempDir("", "")
if err != nil { if err != nil {
t.Fatalf("TempDir() failed: %v", err) t.Fatalf("TempDir() failed: %v", err)
@ -925,6 +926,112 @@ func TestHTTP_Logs_Follow(t *testing.T) {
}) })
} }
func BenchmarkHTTP_Logs_Follow(t *testing.B) {
runtime.MemProfileRate = 1
s := makeHTTPServer(t, nil)
defer s.Cleanup()
testutil.WaitForLeader(t, s.Agent.RPC)
// Get a temp alloc dir and create the log dir
ad := tempAllocDir(t)
s.Agent.logger.Printf("ALEX: LOG DIR: %q", ad.SharedDir)
//defer os.RemoveAll(ad.AllocDir)
logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
if err := os.MkdirAll(logDir, 0777); err != nil {
t.Fatalf("Failed to make log dir: %v", err)
}
// Create a series of log files in the temp dir
task := "foo"
logType := "stdout"
expected := make([]byte, 1024*1024*100)
initialWrites := 3
writeToFile := func(index int, data []byte) {
logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
logFilePath := filepath.Join(logDir, logFile)
err := ioutil.WriteFile(logFilePath, data, 777)
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}
}
part := (len(expected) / 3) - 50
goodEnough := (8 * len(expected)) / 10
for i := 0; i < initialWrites; i++ {
writeToFile(i, expected[i*part:(i+1)*part])
}
t.ResetTimer()
for i := 0; i < t.N; i++ {
s.Agent.logger.Printf("BENCHMARK %d", i)
// Create a decoder
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
defer r.Close()
defer w.Close()
dec := codec.NewDecoder(r, jsonHandle)
var received []byte
// Start the reader
fullResultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
if err == io.EOF {
t.Logf("EOF")
return
}
t.Fatalf("failed to decode: %v", err)
}
if frame.IsHeartbeat() {
continue
}
received = append(received, frame.Data...)
if len(received) > goodEnough {
close(fullResultCh)
return
}
}
}()
// Start streaming logs
go func() {
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
select {
case <-fullResultCh:
case <-time.After(time.Duration(60 * time.Second)):
t.Fatalf("did not receive data: %d < %d", len(received), goodEnough)
}
s.Agent.logger.Printf("ALEX: CLOSING")
// Close the reader
r.Close()
s.Agent.logger.Printf("ALEX: CLOSED")
s.Agent.logger.Printf("ALEX: WAITING FOR WRITER TO CLOSE")
testutil.WaitForResult(func() (bool, error) {
return wrappedW.Closed, nil
}, func(err error) {
t.Fatalf("connection not closed")
})
s.Agent.logger.Printf("ALEX: WRITER CLOSED")
}
}
func TestLogs_findClosest(t *testing.T) { func TestLogs_findClosest(t *testing.T) {
task := "foo" task := "foo"
entries := []*allocdir.AllocFileInfo{ entries := []*allocdir.AllocFileInfo{

View file

@ -381,7 +381,7 @@ func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 {
return uint64(val) return uint64(val)
} }
func httpTest(t *testing.T, cb func(c *Config), f func(srv *TestServer)) { func httpTest(t testing.TB, cb func(c *Config), f func(srv *TestServer)) {
s := makeHTTPServer(t, cb) s := makeHTTPServer(t, cb)
defer s.Cleanup() defer s.Cleanup()
testutil.WaitForLeader(t, s.Agent.RPC) testutil.WaitForLeader(t, s.Agent.RPC)

View file

@ -54,7 +54,7 @@ func IsTravis() bool {
type rpcFn func(string, interface{}, interface{}) error type rpcFn func(string, interface{}, interface{}) error
func WaitForLeader(t *testing.T, rpc rpcFn) { func WaitForLeader(t testing.TB, rpc rpcFn) {
WaitForResult(func() (bool, error) { WaitForResult(func() (bool, error) {
args := &structs.GenericRequest{} args := &structs.GenericRequest{}
var leader string var leader string

View file

@ -46,7 +46,7 @@ client {
job is allowed to wait to exit. Individual jobs may customize their own kill job is allowed to wait to exit. Individual jobs may customize their own kill
timeout, but it may not exceed this value. timeout, but it may not exceed this value.
- `meta` <code>([Meta][]: nil)</code> - Specifies a key-value map that annotates - `meta` `(map[string]string: nil)` - Specifies a key-value map that annotates
with user-defined metadata. with user-defined metadata.
- `network_interface` `(string: "lo | lo0")` - Specifies the name of the - `network_interface` `(string: "lo | lo0")` - Specifies the name of the
@ -299,5 +299,3 @@ client {
} }
} }
``` ```
[meta]: /docs/job-specification/meta.html "Nomad meta Job Specification"

View file

@ -24,9 +24,9 @@ description: |-
</table> </table>
The `constraint` allows restricting the set of eligible nodes. Constraints may The `constraint` allows restricting the set of eligible nodes. Constraints may
filter on [attributes][interpolation] or [metadata][meta]. Additionally filter on [attributes][interpolation] or [client metadata][client-meta].
constraints may be specified at the [job][job], [group][group], or [task][task] Additionally constraints may be specified at the [job][job], [group][group], or
levels for ultimate flexibility. [task][task] levels for ultimate flexibility.
```hcl ```hcl
job "docs" { job "docs" {
@ -46,7 +46,7 @@ job "docs" {
task "server" { task "server" {
# All tasks must run where "my_custom_value" is greater than 3. # All tasks must run where "my_custom_value" is greater than 3.
constraint { constraint {
attribute = "${meta.my_custom_value}" attribute = "${node.meta.my_custom_value}"
operator = ">" operator = ">"
value = "3" value = "3"
} }
@ -217,6 +217,6 @@ constraint {
[job]: /docs/job-specification/job.html "Nomad job Job Specification" [job]: /docs/job-specification/job.html "Nomad job Job Specification"
[group]: /docs/job-specification/group.html "Nomad group Job Specification" [group]: /docs/job-specification/group.html "Nomad group Job Specification"
[meta]: /docs/job-specification/meta.html "Nomad meta Job Specification" [client-meta]: /docs/agent/configuration/client.html#meta "Nomad meta Job Specification"
[task]: /docs/job-specification/task.html "Nomad task Job Specification" [task]: /docs/job-specification/task.html "Nomad task Job Specification"
[interpolation]: /docs/runtime/interpolation.html "Nomad interpolation" [interpolation]: /docs/runtime/interpolation.html "Nomad interpolation"

View file

@ -219,5 +219,4 @@ $ VAULT_TOKEN="..." nomad run example.nomad
[task]: /docs/job-specification/task.html "Nomad task Job Specification" [task]: /docs/job-specification/task.html "Nomad task Job Specification"
[update]: /docs/job-specification/update.html "Nomad update Job Specification" [update]: /docs/job-specification/update.html "Nomad update Job Specification"
[vault]: /docs/job-specification/vault.html "Nomad vault Job Specification" [vault]: /docs/job-specification/vault.html "Nomad vault Job Specification"
[meta]: /docs/job-specification/meta.html "Nomad meta Job Specification"
[scheduler]: /docs/runtime/schedulers.html "Nomad Scheduler Types" [scheduler]: /docs/runtime/schedulers.html "Nomad Scheduler Types"

View file

@ -92,12 +92,12 @@ driver.
<td><tt>linux-64bit</tt></td> <td><tt>linux-64bit</tt></td>
</tr> </tr>
<tr> <tr>
<td><tt>${attr.&lt;property&gt;}</tt></td> <td><tt>${node.attr.&lt;property&gt;}</tt></td>
<td>Property given by <tt>property</tt> on the client</td> <td>Property given by <tt>property</tt> on the client</td>
<td><tt>${attr.arch} => amd64</tt></td> <td><tt>${attr.arch} => amd64</tt></td>
</tr> </tr>
<tr> <tr>
<td><tt>${meta.&lt;key&gt;}</tt></td> <td><tt>${node.meta.&lt;key&gt;}</tt></td>
<td>Metadata value given by <tt>key</tt> on the client</td> <td>Metadata value given by <tt>key</tt> on the client</td>
<td><tt>${meta.foo} => bar</tt></td> <td><tt>${meta.foo} => bar</tt></td>
</tr> </tr>

View file

@ -76,7 +76,7 @@ cache 0 0 1 0 0 0
Allocations Allocations
ID Eval ID Node ID Task Group Desired Status Created At ID Eval ID Node ID Task Group Desired Status Created At
dadcdb81 61b0b423 72687b1a cache run running 06/23/16 01:41:13 UTC 8ba85cef 26cfc69e 171a583b cache run running 06/23/16 01:41:13 UTC
``` ```
Here we can see that the result of our evaluation was the creation of an Here we can see that the result of our evaluation was the creation of an