diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 48adb310e..4fa0f6362 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -289,11 +289,7 @@ func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWind enc := codec.NewEncoder(out, jsonHandle) // Create the heartbeat and flush ticker - var heartbeat *time.Ticker - if !plainTxt { - heartbeat = time.NewTicker(heartbeatRate) - } - + heartbeat := time.NewTicker(heartbeatRate) flusher := time.NewTicker(batchWindow) return &StreamFramer{ @@ -313,9 +309,7 @@ func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWind func (s *StreamFramer) Destroy() { s.l.Lock() close(s.shutdownCh) - if s.heartbeat != nil { - s.heartbeat.Stop() - } + s.heartbeat.Stop() s.flusher.Stop() running := s.running s.l.Unlock() @@ -357,11 +351,6 @@ func (s *StreamFramer) run() { s.l.Unlock() }() - var heartbeat <-chan time.Time - if s.heartbeat != nil { - heartbeat = s.heartbeat.C - } - OUTER: for { select { @@ -383,7 +372,7 @@ OUTER: if err != nil { return } - case <-heartbeat: + case <-s.heartbeat.C: // Send a heartbeat frame if err = s.send(HeartbeatStreamFrame); err != nil { return diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index abb9a974b..0e9b73254 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -14,6 +14,7 @@ import ( "reflect" "runtime" "strconv" + "strings" "testing" "time" @@ -118,7 +119,7 @@ func TestStreamFramer_Flush(t *testing.T) { r, w := io.Pipe() wrappedW := &WriteCloseChecker{WriteCloser: w} hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100) sf.Run() // Create a decoder @@ -186,7 +187,7 @@ func TestStreamFramer_Batch(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} // Ensure the batch window doesn't get hit hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 3) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3) sf.Run() // Create a decoder @@ -263,7 +264,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { r, w := io.Pipe() wrappedW := &WriteCloseChecker{WriteCloser: w} hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100) sf.Run() // Create a decoder @@ -315,7 +316,7 @@ func TestStreamFramer_Order(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} // Ensure the batch window doesn't get hit hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 10) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10) sf.Run() // Create a decoder @@ -401,6 +402,102 @@ func TestStreamFramer_Order(t *testing.T) { } } +// This test checks that frames are received in order +func TestStreamFramer_Order_PlainText(t *testing.T) { + // Create the stream framer + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + // Ensure the batch window doesn't get hit + hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond + sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10) + sf.Run() + + files := []string{"1", "2", "3", "4", "5"} + input := bytes.NewBuffer(make([]byte, 0, 100000)) + for i := 0; i <= 1000; i++ { + str := strconv.Itoa(i) + "," + input.WriteString(str) + } + + expected := bytes.NewBuffer(make([]byte, 0, 100000)) + for _, _ = range files { + expected.Write(input.Bytes()) + } + receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000)) + + // Start the reader + resultCh := make(chan struct{}) + go func() { + OUTER: + for { + if _, err := receivedBuf.ReadFrom(r); err != nil { + if strings.Contains(err.Error(), "closed pipe") { + resultCh <- struct{}{} + return + } + t.Fatalf("bad read: %v", err) + } + + if expected.Len() != receivedBuf.Len() { + continue + } + expectedBytes := expected.Bytes() + actualBytes := receivedBuf.Bytes() + for i, e := range expectedBytes { + if a := actualBytes[i]; a != e { + continue OUTER + } + } + resultCh <- struct{}{} + return + + } + }() + + // Send the data + b := input.Bytes() + shards := 10 + each := len(b) / shards + for _, f := range files { + for i := 0; i < shards; i++ { + l, r := each*i, each*(i+1) + if i == shards-1 { + r = len(b) + } + + if err := sf.Send(f, "", b[l:r], 0); err != nil { + t.Fatalf("Send() failed %v", err) + } + } + } + + // Ensure we get data + select { + case <-resultCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow): + if expected.Len() != receivedBuf.Len() { + t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len()) + } + expectedBytes := expected.Bytes() + actualBytes := receivedBuf.Bytes() + for i, e := range expectedBytes { + if a := actualBytes[i]; a != e { + t.Fatalf("Index %d; Got %q; want %q", i, a, e) + } + } + } + + // Close the reader and wait. This should cause the runner to exit + if err := r.Close(); err != nil { + t.Fatalf("failed to close reader") + } + + sf.Destroy() + if !wrappedW.Closed { + t.Fatalf("writer not closed") + } +} + func TestHTTP_Stream_MissingParams(t *testing.T) { httpTest(t, nil, func(s *TestServer) { req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil) @@ -467,7 +564,7 @@ func TestHTTP_Stream_NoFile(t *testing.T) { ad := tempAllocDir(t) defer os.RemoveAll(ad.AllocDir) - framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -526,7 +623,7 @@ func TestHTTP_Stream_Modify(t *testing.T) { t.Fatalf("write failed: %v", err) } - framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -607,7 +704,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) { t.Fatalf("write failed: %v", err) } - framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -710,7 +807,7 @@ func TestHTTP_Stream_Delete(t *testing.T) { t.Fatalf("write failed: %v", err) } - framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(wrappedW, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() // Start streaming @@ -804,7 +901,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) { // Start streaming logs go func() { - if err := s.Server.logs(false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + if err := s.Server.logs(false, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }() @@ -893,7 +990,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { // Start streaming logs go func() { - if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }() @@ -1006,7 +1103,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) { // Start streaming logs go func() { - if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }()