tests
This commit is contained in:
parent
b2df901158
commit
0327109300
|
@ -261,7 +261,6 @@ func (s *StreamFramer) Destroy() {
|
||||||
s.l.Lock()
|
s.l.Lock()
|
||||||
wasRunning := s.running
|
wasRunning := s.running
|
||||||
s.running = false
|
s.running = false
|
||||||
s.f = nil
|
|
||||||
close(s.shutdownCh)
|
close(s.shutdownCh)
|
||||||
s.heartbeat.Stop()
|
s.heartbeat.Stop()
|
||||||
s.flusher.Stop()
|
s.flusher.Stop()
|
||||||
|
@ -271,6 +270,7 @@ func (s *StreamFramer) Destroy() {
|
||||||
if wasRunning {
|
if wasRunning {
|
||||||
<-s.exitCh
|
<-s.exitCh
|
||||||
}
|
}
|
||||||
|
s.out.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts a long lived goroutine that handles sending data as well as
|
// Run starts a long lived goroutine that handles sending data as well as
|
||||||
|
@ -299,7 +299,6 @@ func (s *StreamFramer) run() {
|
||||||
defer func() {
|
defer func() {
|
||||||
s.l.Lock()
|
s.l.Lock()
|
||||||
s.err = err
|
s.err = err
|
||||||
s.out.Close()
|
|
||||||
close(s.exitCh)
|
close(s.exitCh)
|
||||||
close(s.outbound)
|
close(s.outbound)
|
||||||
s.l.Unlock()
|
s.l.Unlock()
|
||||||
|
@ -322,8 +321,11 @@ func (s *StreamFramer) run() {
|
||||||
|
|
||||||
// Read the data for the frame, and send it
|
// Read the data for the frame, and send it
|
||||||
s.f.Data = s.readData()
|
s.f.Data = s.readData()
|
||||||
s.outbound <- s.f
|
select {
|
||||||
s.f = nil
|
case s.outbound <- s.f:
|
||||||
|
s.f = nil
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
s.l.Unlock()
|
s.l.Unlock()
|
||||||
case <-s.heartbeat.C:
|
case <-s.heartbeat.C:
|
||||||
|
@ -339,7 +341,7 @@ OUTER:
|
||||||
case <-s.shutdownCh:
|
case <-s.shutdownCh:
|
||||||
break OUTER
|
break OUTER
|
||||||
case o := <-s.outbound:
|
case o := <-s.outbound:
|
||||||
// Send the frame and then clear the current working frame
|
// Send the frame
|
||||||
if err = s.enc.Encode(o); err != nil {
|
if err = s.enc.Encode(o); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -8,6 +9,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -302,6 +304,98 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This test checks that frames are received in order
|
||||||
|
func TestStreamFramer_Order(t *testing.T) {
|
||||||
|
// Create the stream framer
|
||||||
|
r, w := io.Pipe()
|
||||||
|
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||||
|
// Ensure the batch window doesn't get hit
|
||||||
|
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
|
||||||
|
sf := NewStreamFramer(wrappedW, hRate, bWindow, 100)
|
||||||
|
sf.Run()
|
||||||
|
|
||||||
|
// Create a decoder
|
||||||
|
dec := codec.NewDecoder(r, jsonHandle)
|
||||||
|
|
||||||
|
//files := []string{"1", "2", "3", "4", "5"}
|
||||||
|
files := []string{"1"}
|
||||||
|
input := bytes.NewBuffer(make([]byte, 100000))
|
||||||
|
for i := 0; i <= 2000; i++ {
|
||||||
|
str := strconv.Itoa(i) + "\n"
|
||||||
|
input.WriteString(str)
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := bytes.NewBuffer(make([]byte, 100000))
|
||||||
|
for _, _ = range files {
|
||||||
|
expected.Write(input.Bytes())
|
||||||
|
}
|
||||||
|
receivedBuf := bytes.NewBuffer(make([]byte, 100000))
|
||||||
|
|
||||||
|
// Start the reader
|
||||||
|
resultCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
var frame StreamFrame
|
||||||
|
if err := dec.Decode(&frame); err != nil {
|
||||||
|
t.Fatalf("failed to decode")
|
||||||
|
}
|
||||||
|
|
||||||
|
if frame.IsHeartbeat() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
receivedBuf.Write(frame.Data)
|
||||||
|
|
||||||
|
if reflect.DeepEqual(expected, receivedBuf) {
|
||||||
|
resultCh <- struct{}{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Send the data
|
||||||
|
b := input.Bytes()
|
||||||
|
shards := 10
|
||||||
|
each := len(b) / shards
|
||||||
|
for _, f := range files {
|
||||||
|
for i := 0; i < shards; i++ {
|
||||||
|
l, r := each*i, each*(i+1)
|
||||||
|
if i == shards-1 {
|
||||||
|
r = len(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := sf.Send(f, "", b[l:r], 0); err != nil {
|
||||||
|
t.Fatalf("Send() failed %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure we get data
|
||||||
|
select {
|
||||||
|
case <-resultCh:
|
||||||
|
case <-time.After(4 * bWindow):
|
||||||
|
got := receivedBuf.String()
|
||||||
|
want := expected.String()
|
||||||
|
t.Fatalf("Did not receive data in sorted order\nGot:%v\nWant:%v\n", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the reader and wait. This should cause the runner to exit
|
||||||
|
if err := r.Close(); err != nil {
|
||||||
|
t.Fatalf("failed to close reader")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-sf.ExitCh():
|
||||||
|
case <-time.After(2 * hRate):
|
||||||
|
t.Fatalf("exit channel should close")
|
||||||
|
}
|
||||||
|
|
||||||
|
sf.Destroy()
|
||||||
|
if !wrappedW.Closed {
|
||||||
|
t.Fatalf("writer not closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestHTTP_Stream_MissingParams(t *testing.T) {
|
func TestHTTP_Stream_MissingParams(t *testing.T) {
|
||||||
httpTest(t, nil, func(s *TestServer) {
|
httpTest(t, nil, func(s *TestServer) {
|
||||||
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
|
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
|
||||||
|
@ -364,7 +458,11 @@ func TestHTTP_Stream_NoFile(t *testing.T) {
|
||||||
ad := tempAllocDir(t)
|
ad := tempAllocDir(t)
|
||||||
defer os.RemoveAll(ad.AllocDir)
|
defer os.RemoveAll(ad.AllocDir)
|
||||||
|
|
||||||
if err := s.Server.stream(0, "foo", ad, nopWriteCloser{ioutil.Discard}); err == nil {
|
framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||||
|
framer.Run()
|
||||||
|
defer framer.Destroy()
|
||||||
|
|
||||||
|
if err := s.Server.stream(0, "foo", ad, framer, nil); err == nil {
|
||||||
t.Fatalf("expected an error when streaming unknown file")
|
t.Fatalf("expected an error when streaming unknown file")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -419,9 +517,13 @@ func TestHTTP_Stream_Modify(t *testing.T) {
|
||||||
t.Fatalf("write failed: %v", err)
|
t.Fatalf("write failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||||
|
framer.Run()
|
||||||
|
defer framer.Destroy()
|
||||||
|
|
||||||
// Start streaming
|
// Start streaming
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.Server.stream(0, streamFile, ad, w); err != nil {
|
if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil {
|
||||||
t.Fatalf("stream() failed: %v", err)
|
t.Fatalf("stream() failed: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -496,9 +598,13 @@ func TestHTTP_Stream_Truncate(t *testing.T) {
|
||||||
t.Fatalf("write failed: %v", err)
|
t.Fatalf("write failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||||
|
framer.Run()
|
||||||
|
defer framer.Destroy()
|
||||||
|
|
||||||
// Start streaming
|
// Start streaming
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.Server.stream(0, streamFile, ad, w); err != nil {
|
if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil {
|
||||||
t.Fatalf("stream() failed: %v", err)
|
t.Fatalf("stream() failed: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -595,9 +701,12 @@ func TestHTTP_Stream_Delete(t *testing.T) {
|
||||||
t.Fatalf("write failed: %v", err)
|
t.Fatalf("write failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||||
|
framer.Run()
|
||||||
|
|
||||||
// Start streaming
|
// Start streaming
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.Server.stream(0, streamFile, ad, wrappedW); err != nil {
|
if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil {
|
||||||
t.Fatalf("stream() failed: %v", err)
|
t.Fatalf("stream() failed: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -615,6 +724,7 @@ func TestHTTP_Stream_Delete(t *testing.T) {
|
||||||
t.Fatalf("did not receive delete")
|
t.Fatalf("did not receive delete")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
framer.Destroy()
|
||||||
testutil.WaitForResult(func() (bool, error) {
|
testutil.WaitForResult(func() (bool, error) {
|
||||||
return wrappedW.Closed, nil
|
return wrappedW.Closed, nil
|
||||||
}, func(err error) {
|
}, func(err error) {
|
||||||
|
|
Loading…
Reference in New Issue