test
This commit is contained in:
parent
6590990263
commit
ab330b96aa
|
@ -289,11 +289,7 @@ func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWind
|
||||||
enc := codec.NewEncoder(out, jsonHandle)
|
enc := codec.NewEncoder(out, jsonHandle)
|
||||||
|
|
||||||
// Create the heartbeat and flush ticker
|
// Create the heartbeat and flush ticker
|
||||||
var heartbeat *time.Ticker
|
heartbeat := time.NewTicker(heartbeatRate)
|
||||||
if !plainTxt {
|
|
||||||
heartbeat = time.NewTicker(heartbeatRate)
|
|
||||||
}
|
|
||||||
|
|
||||||
flusher := time.NewTicker(batchWindow)
|
flusher := time.NewTicker(batchWindow)
|
||||||
|
|
||||||
return &StreamFramer{
|
return &StreamFramer{
|
||||||
|
@ -313,9 +309,7 @@ func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWind
|
||||||
func (s *StreamFramer) Destroy() {
|
func (s *StreamFramer) Destroy() {
|
||||||
s.l.Lock()
|
s.l.Lock()
|
||||||
close(s.shutdownCh)
|
close(s.shutdownCh)
|
||||||
if s.heartbeat != nil {
|
s.heartbeat.Stop()
|
||||||
s.heartbeat.Stop()
|
|
||||||
}
|
|
||||||
s.flusher.Stop()
|
s.flusher.Stop()
|
||||||
running := s.running
|
running := s.running
|
||||||
s.l.Unlock()
|
s.l.Unlock()
|
||||||
|
@ -357,11 +351,6 @@ func (s *StreamFramer) run() {
|
||||||
s.l.Unlock()
|
s.l.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var heartbeat <-chan time.Time
|
|
||||||
if s.heartbeat != nil {
|
|
||||||
heartbeat = s.heartbeat.C
|
|
||||||
}
|
|
||||||
|
|
||||||
OUTER:
|
OUTER:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -383,7 +372,7 @@ OUTER:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-heartbeat:
|
case <-s.heartbeat.C:
|
||||||
// Send a heartbeat frame
|
// Send a heartbeat frame
|
||||||
if err = s.send(HeartbeatStreamFrame); err != nil {
|
if err = s.send(HeartbeatStreamFrame); err != nil {
|
||||||
return
|
return
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -118,7 +119,7 @@ func TestStreamFramer_Flush(t *testing.T) {
|
||||||
r, w := io.Pipe()
|
r, w := io.Pipe()
|
||||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||||
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
|
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
|
||||||
sf := NewStreamFramer(wrappedW, hRate, bWindow, 100)
|
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
|
||||||
sf.Run()
|
sf.Run()
|
||||||
|
|
||||||
// Create a decoder
|
// Create a decoder
|
||||||
|
@ -186,7 +187,7 @@ func TestStreamFramer_Batch(t *testing.T) {
|
||||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||||
// Ensure the batch window doesn't get hit
|
// Ensure the batch window doesn't get hit
|
||||||
hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond
|
hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond
|
||||||
sf := NewStreamFramer(wrappedW, hRate, bWindow, 3)
|
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3)
|
||||||
sf.Run()
|
sf.Run()
|
||||||
|
|
||||||
// Create a decoder
|
// Create a decoder
|
||||||
|
@ -263,7 +264,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
|
||||||
r, w := io.Pipe()
|
r, w := io.Pipe()
|
||||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||||
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
|
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
|
||||||
sf := NewStreamFramer(wrappedW, hRate, bWindow, 100)
|
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
|
||||||
sf.Run()
|
sf.Run()
|
||||||
|
|
||||||
// Create a decoder
|
// Create a decoder
|
||||||
|
@ -315,7 +316,7 @@ func TestStreamFramer_Order(t *testing.T) {
|
||||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||||
// Ensure the batch window doesn't get hit
|
// Ensure the batch window doesn't get hit
|
||||||
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
|
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
|
||||||
sf := NewStreamFramer(wrappedW, hRate, bWindow, 10)
|
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10)
|
||||||
sf.Run()
|
sf.Run()
|
||||||
|
|
||||||
// Create a decoder
|
// Create a decoder
|
||||||
|
@ -401,6 +402,102 @@ func TestStreamFramer_Order(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This test checks that frames are received in order
|
||||||
|
func TestStreamFramer_Order_PlainText(t *testing.T) {
|
||||||
|
// Create the stream framer
|
||||||
|
r, w := io.Pipe()
|
||||||
|
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||||
|
// Ensure the batch window doesn't get hit
|
||||||
|
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
|
||||||
|
sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10)
|
||||||
|
sf.Run()
|
||||||
|
|
||||||
|
files := []string{"1", "2", "3", "4", "5"}
|
||||||
|
input := bytes.NewBuffer(make([]byte, 0, 100000))
|
||||||
|
for i := 0; i <= 1000; i++ {
|
||||||
|
str := strconv.Itoa(i) + ","
|
||||||
|
input.WriteString(str)
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := bytes.NewBuffer(make([]byte, 0, 100000))
|
||||||
|
for _, _ = range files {
|
||||||
|
expected.Write(input.Bytes())
|
||||||
|
}
|
||||||
|
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))
|
||||||
|
|
||||||
|
// Start the reader
|
||||||
|
resultCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
OUTER:
|
||||||
|
for {
|
||||||
|
if _, err := receivedBuf.ReadFrom(r); err != nil {
|
||||||
|
if strings.Contains(err.Error(), "closed pipe") {
|
||||||
|
resultCh <- struct{}{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Fatalf("bad read: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if expected.Len() != receivedBuf.Len() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
expectedBytes := expected.Bytes()
|
||||||
|
actualBytes := receivedBuf.Bytes()
|
||||||
|
for i, e := range expectedBytes {
|
||||||
|
if a := actualBytes[i]; a != e {
|
||||||
|
continue OUTER
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resultCh <- struct{}{}
|
||||||
|
return
|
||||||
|
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Send the data
|
||||||
|
b := input.Bytes()
|
||||||
|
shards := 10
|
||||||
|
each := len(b) / shards
|
||||||
|
for _, f := range files {
|
||||||
|
for i := 0; i < shards; i++ {
|
||||||
|
l, r := each*i, each*(i+1)
|
||||||
|
if i == shards-1 {
|
||||||
|
r = len(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := sf.Send(f, "", b[l:r], 0); err != nil {
|
||||||
|
t.Fatalf("Send() failed %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure we get data
|
||||||
|
select {
|
||||||
|
case <-resultCh:
|
||||||
|
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
|
||||||
|
if expected.Len() != receivedBuf.Len() {
|
||||||
|
t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len())
|
||||||
|
}
|
||||||
|
expectedBytes := expected.Bytes()
|
||||||
|
actualBytes := receivedBuf.Bytes()
|
||||||
|
for i, e := range expectedBytes {
|
||||||
|
if a := actualBytes[i]; a != e {
|
||||||
|
t.Fatalf("Index %d; Got %q; want %q", i, a, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the reader and wait. This should cause the runner to exit
|
||||||
|
if err := r.Close(); err != nil {
|
||||||
|
t.Fatalf("failed to close reader")
|
||||||
|
}
|
||||||
|
|
||||||
|
sf.Destroy()
|
||||||
|
if !wrappedW.Closed {
|
||||||
|
t.Fatalf("writer not closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestHTTP_Stream_MissingParams(t *testing.T) {
|
func TestHTTP_Stream_MissingParams(t *testing.T) {
|
||||||
httpTest(t, nil, func(s *TestServer) {
|
httpTest(t, nil, func(s *TestServer) {
|
||||||
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
|
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
|
||||||
|
@ -467,7 +564,7 @@ func TestHTTP_Stream_NoFile(t *testing.T) {
|
||||||
ad := tempAllocDir(t)
|
ad := tempAllocDir(t)
|
||||||
defer os.RemoveAll(ad.AllocDir)
|
defer os.RemoveAll(ad.AllocDir)
|
||||||
|
|
||||||
framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||||
framer.Run()
|
framer.Run()
|
||||||
defer framer.Destroy()
|
defer framer.Destroy()
|
||||||
|
|
||||||
|
@ -526,7 +623,7 @@ func TestHTTP_Stream_Modify(t *testing.T) {
|
||||||
t.Fatalf("write failed: %v", err)
|
t.Fatalf("write failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||||
framer.Run()
|
framer.Run()
|
||||||
defer framer.Destroy()
|
defer framer.Destroy()
|
||||||
|
|
||||||
|
@ -607,7 +704,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) {
|
||||||
t.Fatalf("write failed: %v", err)
|
t.Fatalf("write failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||||
framer.Run()
|
framer.Run()
|
||||||
defer framer.Destroy()
|
defer framer.Destroy()
|
||||||
|
|
||||||
|
@ -710,7 +807,7 @@ func TestHTTP_Stream_Delete(t *testing.T) {
|
||||||
t.Fatalf("write failed: %v", err)
|
t.Fatalf("write failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
framer := NewStreamFramer(wrappedW, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||||
framer.Run()
|
framer.Run()
|
||||||
|
|
||||||
// Start streaming
|
// Start streaming
|
||||||
|
@ -804,7 +901,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) {
|
||||||
|
|
||||||
// Start streaming logs
|
// Start streaming logs
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.Server.logs(false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
|
if err := s.Server.logs(false, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
|
||||||
t.Fatalf("logs() failed: %v", err)
|
t.Fatalf("logs() failed: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -893,7 +990,7 @@ func TestHTTP_Logs_Follow(t *testing.T) {
|
||||||
|
|
||||||
// Start streaming logs
|
// Start streaming logs
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
|
if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
|
||||||
t.Fatalf("logs() failed: %v", err)
|
t.Fatalf("logs() failed: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -1006,7 +1103,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) {
|
||||||
|
|
||||||
// Start streaming logs
|
// Start streaming logs
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
|
if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
|
||||||
t.Fatalf("logs() failed: %v", err)
|
t.Fatalf("logs() failed: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
Loading…
Reference in a new issue