test stream framer
This commit is contained in:
parent
ddd67f5f11
commit
14f57024b7
|
@ -1,33 +1,24 @@
|
|||
package framer
|
||||
|
||||
import (
|
||||
"io"
|
||||
"bytes"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
)
|
||||
|
||||
type WriteCloseChecker struct {
|
||||
io.WriteCloser
|
||||
Closed bool
|
||||
}
|
||||
|
||||
func (w *WriteCloseChecker) Close() error {
|
||||
w.Closed = true
|
||||
return w.WriteCloser.Close()
|
||||
}
|
||||
|
||||
/*
|
||||
// This test checks, that even if the frame size has not been hit, a flush will
|
||||
// periodically occur.
|
||||
func TestStreamFramer_Flush(t *testing.T) {
|
||||
// Create the stream framer
|
||||
r, w := io.Pipe()
|
||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||
frames := make(chan *StreamFrame, 10)
|
||||
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
|
||||
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
|
||||
sf := NewStreamFramer(frames, hRate, bWindow, 100)
|
||||
sf.Run()
|
||||
|
||||
// Create a decoder
|
||||
dec := codec.NewDecoder(r, structs.JsonHandle)
|
||||
|
||||
f := "foo"
|
||||
fe := "bar"
|
||||
d := []byte{0xa}
|
||||
|
@ -37,10 +28,7 @@ func TestStreamFramer_Flush(t *testing.T) {
|
|||
resultCh := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
var frame StreamFrame
|
||||
if err := dec.Decode(&frame); err != nil {
|
||||
t.Fatalf("failed to decode")
|
||||
}
|
||||
frame := <-frames
|
||||
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
|
@ -65,10 +53,8 @@ func TestStreamFramer_Flush(t *testing.T) {
|
|||
t.Fatalf("failed to flush")
|
||||
}
|
||||
|
||||
// Close the reader and wait. This should cause the runner to exit
|
||||
if err := r.Close(); err != nil {
|
||||
t.Fatalf("failed to close reader")
|
||||
}
|
||||
// Shutdown
|
||||
sf.Destroy()
|
||||
|
||||
select {
|
||||
case <-sf.ExitCh():
|
||||
|
@ -76,25 +62,21 @@ func TestStreamFramer_Flush(t *testing.T) {
|
|||
t.Fatalf("exit channel should close")
|
||||
}
|
||||
|
||||
sf.Destroy()
|
||||
if !wrappedW.Closed {
|
||||
t.Fatalf("writer not closed")
|
||||
if _, ok := <-frames; ok {
|
||||
t.Fatal("out channel should be closed")
|
||||
}
|
||||
}
|
||||
|
||||
// This test checks that frames will be batched till the frame size is hit (in
|
||||
// the case that is before the flush).
|
||||
func TestStreamFramer_Batch(t *testing.T) {
|
||||
// Create the stream framer
|
||||
r, w := io.Pipe()
|
||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||
// Ensure the batch window doesn't get hit
|
||||
hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond
|
||||
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3)
|
||||
sf.Run()
|
||||
|
||||
// Create a decoder
|
||||
dec := codec.NewDecoder(r, structs.JsonHandle)
|
||||
// Create the stream framer
|
||||
frames := make(chan *StreamFrame, 10)
|
||||
sf := NewStreamFramer(frames, hRate, bWindow, 3)
|
||||
sf.Run()
|
||||
|
||||
f := "foo"
|
||||
fe := "bar"
|
||||
|
@ -105,11 +87,7 @@ func TestStreamFramer_Batch(t *testing.T) {
|
|||
resultCh := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
var frame StreamFrame
|
||||
if err := dec.Decode(&frame); err != nil {
|
||||
t.Fatalf("failed to decode")
|
||||
}
|
||||
|
||||
frame := <-frames
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
@ -145,10 +123,8 @@ func TestStreamFramer_Batch(t *testing.T) {
|
|||
t.Fatalf("Did not receive data after batch size reached")
|
||||
}
|
||||
|
||||
// Close the reader and wait. This should cause the runner to exit
|
||||
if err := r.Close(); err != nil {
|
||||
t.Fatalf("failed to close reader")
|
||||
}
|
||||
// Shutdown
|
||||
sf.Destroy()
|
||||
|
||||
select {
|
||||
case <-sf.ExitCh():
|
||||
|
@ -156,32 +132,23 @@ func TestStreamFramer_Batch(t *testing.T) {
|
|||
t.Fatalf("exit channel should close")
|
||||
}
|
||||
|
||||
sf.Destroy()
|
||||
if !wrappedW.Closed {
|
||||
t.Fatalf("writer not closed")
|
||||
if _, ok := <-frames; ok {
|
||||
t.Fatal("out channel should be closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFramer_Heartbeat(t *testing.T) {
|
||||
// Create the stream framer
|
||||
r, w := io.Pipe()
|
||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||
frames := make(chan *StreamFrame, 10)
|
||||
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
|
||||
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
|
||||
sf := NewStreamFramer(frames, hRate, bWindow, 100)
|
||||
sf.Run()
|
||||
|
||||
// Create a decoder
|
||||
dec := codec.NewDecoder(r, structs.JsonHandle)
|
||||
|
||||
// Start the reader
|
||||
resultCh := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
var frame StreamFrame
|
||||
if err := dec.Decode(&frame); err != nil {
|
||||
t.Fatalf("failed to decode")
|
||||
}
|
||||
|
||||
frame := <-frames
|
||||
if frame.IsHeartbeat() {
|
||||
resultCh <- struct{}{}
|
||||
return
|
||||
|
@ -195,10 +162,8 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
|
|||
t.Fatalf("failed to heartbeat")
|
||||
}
|
||||
|
||||
// Close the reader and wait. This should cause the runner to exit
|
||||
if err := r.Close(); err != nil {
|
||||
t.Fatalf("failed to close reader")
|
||||
}
|
||||
// Shutdown
|
||||
sf.Destroy()
|
||||
|
||||
select {
|
||||
case <-sf.ExitCh():
|
||||
|
@ -206,25 +171,20 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
|
|||
t.Fatalf("exit channel should close")
|
||||
}
|
||||
|
||||
sf.Destroy()
|
||||
if !wrappedW.Closed {
|
||||
t.Fatalf("writer not closed")
|
||||
if _, ok := <-frames; ok {
|
||||
t.Fatal("out channel should be closed")
|
||||
}
|
||||
}
|
||||
|
||||
// This test checks that frames are received in order
|
||||
func TestStreamFramer_Order(t *testing.T) {
|
||||
// Create the stream framer
|
||||
r, w := io.Pipe()
|
||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||
// Ensure the batch window doesn't get hit
|
||||
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
|
||||
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10)
|
||||
// Create the stream framer
|
||||
frames := make(chan *StreamFrame, 10)
|
||||
sf := NewStreamFramer(frames, hRate, bWindow, 10)
|
||||
sf.Run()
|
||||
|
||||
// Create a decoder
|
||||
dec := codec.NewDecoder(r, structs.JsonHandle)
|
||||
|
||||
files := []string{"1", "2", "3", "4", "5"}
|
||||
input := bytes.NewBuffer(make([]byte, 0, 100000))
|
||||
for i := 0; i <= 1000; i++ {
|
||||
|
@ -242,11 +202,7 @@ func TestStreamFramer_Order(t *testing.T) {
|
|||
resultCh := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
var frame StreamFrame
|
||||
if err := dec.Decode(&frame); err != nil {
|
||||
t.Fatalf("failed to decode")
|
||||
}
|
||||
|
||||
frame := <-frames
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
@ -288,10 +244,8 @@ func TestStreamFramer_Order(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Close the reader and wait. This should cause the runner to exit
|
||||
if err := r.Close(); err != nil {
|
||||
t.Fatalf("failed to close reader")
|
||||
}
|
||||
// Shutdown
|
||||
sf.Destroy()
|
||||
|
||||
select {
|
||||
case <-sf.ExitCh():
|
||||
|
@ -299,105 +253,7 @@ func TestStreamFramer_Order(t *testing.T) {
|
|||
t.Fatalf("exit channel should close")
|
||||
}
|
||||
|
||||
sf.Destroy()
|
||||
if !wrappedW.Closed {
|
||||
t.Fatalf("writer not closed")
|
||||
if _, ok := <-frames; ok {
|
||||
t.Fatal("out channel should be closed")
|
||||
}
|
||||
}
|
||||
|
||||
// This test checks that frames are received in order
|
||||
func TestStreamFramer_Order_PlainText(t *testing.T) {
|
||||
// Create the stream framer
|
||||
r, w := io.Pipe()
|
||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||
// Ensure the batch window doesn't get hit
|
||||
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
|
||||
sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10)
|
||||
sf.Run()
|
||||
|
||||
files := []string{"1", "2", "3", "4", "5"}
|
||||
input := bytes.NewBuffer(make([]byte, 0, 100000))
|
||||
for i := 0; i <= 1000; i++ {
|
||||
str := strconv.Itoa(i) + ","
|
||||
input.WriteString(str)
|
||||
}
|
||||
|
||||
expected := bytes.NewBuffer(make([]byte, 0, 100000))
|
||||
for range files {
|
||||
expected.Write(input.Bytes())
|
||||
}
|
||||
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))
|
||||
|
||||
// Start the reader
|
||||
resultCh := make(chan struct{})
|
||||
go func() {
|
||||
OUTER:
|
||||
for {
|
||||
if _, err := receivedBuf.ReadFrom(r); err != nil {
|
||||
if strings.Contains(err.Error(), "closed pipe") {
|
||||
resultCh <- struct{}{}
|
||||
return
|
||||
}
|
||||
t.Fatalf("bad read: %v", err)
|
||||
}
|
||||
|
||||
if expected.Len() != receivedBuf.Len() {
|
||||
continue
|
||||
}
|
||||
expectedBytes := expected.Bytes()
|
||||
actualBytes := receivedBuf.Bytes()
|
||||
for i, e := range expectedBytes {
|
||||
if a := actualBytes[i]; a != e {
|
||||
continue OUTER
|
||||
}
|
||||
}
|
||||
resultCh <- struct{}{}
|
||||
return
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
// Send the data
|
||||
b := input.Bytes()
|
||||
shards := 10
|
||||
each := len(b) / shards
|
||||
for _, f := range files {
|
||||
for i := 0; i < shards; i++ {
|
||||
l, r := each*i, each*(i+1)
|
||||
if i == shards-1 {
|
||||
r = len(b)
|
||||
}
|
||||
|
||||
if err := sf.Send(f, "", b[l:r], 0); err != nil {
|
||||
t.Fatalf("Send() failed %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure we get data
|
||||
select {
|
||||
case <-resultCh:
|
||||
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
|
||||
if expected.Len() != receivedBuf.Len() {
|
||||
t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len())
|
||||
}
|
||||
expectedBytes := expected.Bytes()
|
||||
actualBytes := receivedBuf.Bytes()
|
||||
for i, e := range expectedBytes {
|
||||
if a := actualBytes[i]; a != e {
|
||||
t.Fatalf("Index %d; Got %q; want %q", i, a, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close the reader and wait. This should cause the runner to exit
|
||||
if err := r.Close(); err != nil {
|
||||
t.Fatalf("failed to close reader")
|
||||
}
|
||||
|
||||
sf.Destroy()
|
||||
if !wrappedW.Closed {
|
||||
t.Fatalf("writer not closed")
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
|
Loading…
Reference in a new issue