StreamFramer tests

This commit is contained in:
Alex Dadgar 2016-07-10 13:55:52 -04:00
parent 1fe435a130
commit 281c1fbf39
5 changed files with 437 additions and 13 deletions

View file

@ -27,8 +27,8 @@ var (
) )
const ( const (
// frameSize is the maximum number of bytes to send in a single frame // streamFrameSize is the maximum number of bytes to send in a single frame
frameSize = 64 * 1024 streamFrameSize = 64 * 1024
// streamHeartbeatRate is the rate at which a heartbeat will occur to detect // streamHeartbeatRate is the rate at which a heartbeat will occur to detect
// a closed connection without sending any additional data // a closed connection without sending any additional data
@ -190,10 +190,16 @@ type StreamFrame struct {
FileEvent string FileEvent string
} }
// IsHeartbeat returns if the frame is a heartbeat frame
func (s *StreamFrame) IsHeartbeat() bool {
return s.Offset == 0 && s.Data == "" && s.File == "" && s.FileEvent == ""
}
// StreamFramer is used to buffer and send frames as well as heartbeat. // StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct { type StreamFramer struct {
out io.WriteCloser out io.WriteCloser
enc *codec.Encoder enc *codec.Encoder
frameSize int
heartbeat *time.Ticker heartbeat *time.Ticker
flusher *time.Ticker flusher *time.Ticker
shutdown chan struct{} shutdown chan struct{}
@ -216,17 +222,18 @@ type StreamFramer struct {
// NewStreamFramer creates a new stream framer that will output StreamFrames to // NewStreamFramer creates a new stream framer that will output StreamFrames to
// the passed output. // the passed output.
func NewStreamFramer(out io.WriteCloser) *StreamFramer { func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {
// Create a JSON encoder // Create a JSON encoder
enc := codec.NewEncoder(out, jsonHandle) enc := codec.NewEncoder(out, jsonHandle)
// Create the heartbeat and flush ticker // Create the heartbeat and flush ticker
heartbeat := time.NewTicker(streamHeartbeatRate) heartbeat := time.NewTicker(heartbeatRate)
flusher := time.NewTicker(streamBatchWindow) flusher := time.NewTicker(batchWindow)
return &StreamFramer{ return &StreamFramer{
out: out, out: out,
enc: enc, enc: enc,
frameSize: frameSize,
heartbeat: heartbeat, heartbeat: heartbeat,
flusher: flusher, flusher: flusher,
outbound: make(chan *StreamFrame), outbound: make(chan *StreamFrame),
@ -328,8 +335,8 @@ func (s *StreamFramer) run() {
func (s *StreamFramer) readData() string { func (s *StreamFramer) readData() string {
// Compute the amount to read from the buffer // Compute the amount to read from the buffer
size := s.data.Len() size := s.data.Len()
if size > frameSize { if size > s.frameSize {
size = frameSize size = s.frameSize
} }
return base64.StdEncoding.EncodeToString(s.data.Next(size)) return base64.StdEncoding.EncodeToString(s.data.Next(size))
} }
@ -375,7 +382,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
s.data.Write(data) s.data.Write(data)
// Flush till we are under the max frame size // Flush till we are under the max frame size
for s.data.Len() >= frameSize { for s.data.Len() >= s.frameSize {
// Create a new frame to send it // Create a new frame to send it
s.outbound <- &StreamFrame{ s.outbound <- &StreamFrame{
Offset: s.f.Offset, Offset: s.f.Offset,
@ -471,7 +478,7 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o
}() }()
// Create the framer // Create the framer
framer := NewStreamFramer(output) framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run() framer.Run()
defer framer.Destroy() defer framer.Destroy()
@ -483,7 +490,7 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o
var changes *watch.FileChanges var changes *watch.FileChanges
// Start streaming the data // Start streaming the data
data := make([]byte, frameSize) data := make([]byte, streamFrameSize)
OUTER: OUTER:
for { for {
// Read up to the max frame size // Read up to the max frame size

View file

@ -1,9 +1,14 @@
package agent package agent
import ( import (
"encoding/base64"
"io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time"
"github.com/ugorji/go/codec"
) )
func TestAllocDirFS_List_MissingParams(t *testing.T) { func TestAllocDirFS_List_MissingParams(t *testing.T) {
@ -85,7 +90,210 @@ func TestAllocDirFS_ReadAt_MissingParams(t *testing.T) {
}) })
} }
func TestHTTP_FsStream_EOF_Modify(t *testing.T) { type WriteCloseChecker struct {
httpTest(t, nil, func(s *TestServer) { io.WriteCloser
}) Closed bool
}
func (w *WriteCloseChecker) Close() error {
w.Closed = true
return w.WriteCloser.Close()
}
// This test checks, that even if the frame size has not been hit, a flush will
// periodically occur.
func TestStreamFramer_Flush(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 100)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, jsonHandle)
f := "foo"
fe := "bar"
d := []byte{0xa}
expected := base64.StdEncoding.EncodeToString(d)
o := int64(10)
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
continue
}
if frame.Data == expected && frame.Offset == o && frame.File == f && frame.FileEvent == fe {
resultCh <- struct{}{}
return
}
}
}()
// Write only 1 byte so we do not hit the frame size
if err := sf.Send(f, fe, d, o); err != nil {
t.Fatalf("Send() failed %v", err)
}
select {
case <-resultCh:
case <-time.After(2 * bWindow):
t.Fatalf("failed to flush")
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(2 * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
// This test checks that frames will be batched till the frame size is hit (in
// the case that is before the flush).
func TestStreamFramer_Batch(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 3)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, jsonHandle)
f := "foo"
fe := "bar"
d := []byte{0xa, 0xb, 0xc}
expected := base64.StdEncoding.EncodeToString(d)
o := int64(10)
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
continue
}
if frame.Data == expected && frame.Offset == o && frame.File == f && frame.FileEvent == fe {
resultCh <- struct{}{}
return
}
}
}()
// Write only 1 byte so we do not hit the frame size
if err := sf.Send(f, fe, d[:1], o); err != nil {
t.Fatalf("Send() failed %v", err)
}
// Ensure we didn't get any data
select {
case <-resultCh:
t.Fatalf("Got data before frame size reached")
case <-time.After(bWindow / 2):
}
// Write the rest so we hit the frame size
if err := sf.Send(f, fe, d[1:], o); err != nil {
t.Fatalf("Send() failed %v", err)
}
// Ensure we get data
select {
case <-resultCh:
case <-time.After(2 * bWindow):
t.Fatalf("Did not receive data after batch size reached")
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(2 * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
func TestStreamFramer_Heartbeat(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 100)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, jsonHandle)
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
resultCh <- struct{}{}
return
}
}
}()
select {
case <-resultCh:
case <-time.After(2 * hRate):
t.Fatalf("failed to heartbeat")
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(2 * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
} }

29
vendor/gopkg.in/tomb.v1/LICENSE generated vendored Normal file
View file

@ -0,0 +1,29 @@
tomb - support for clean goroutine termination in Go.
Copyright (c) 2010-2011 - Gustavo Niemeyer <gustavo@niemeyer.net>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

4
vendor/gopkg.in/tomb.v1/README.md generated vendored Normal file
View file

@ -0,0 +1,4 @@
Installation and usage
----------------------
See [gopkg.in/tomb.v1](https://gopkg.in/tomb.v1) for documentation and usage details.

176
vendor/gopkg.in/tomb.v1/tomb.go generated vendored Normal file
View file

@ -0,0 +1,176 @@
// Copyright (c) 2011 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
// * Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// The tomb package offers a conventional API for clean goroutine termination.
//
// A Tomb tracks the lifecycle of a goroutine as alive, dying or dead,
// and the reason for its death.
//
// The zero value of a Tomb assumes that a goroutine is about to be
// created or already alive. Once Kill or Killf is called with an
// argument that informs the reason for death, the goroutine is in
// a dying state and is expected to terminate soon. Right before the
// goroutine function or method returns, Done must be called to inform
// that the goroutine is indeed dead and about to stop running.
//
// A Tomb exposes Dying and Dead channels. These channels are closed
// when the Tomb state changes in the respective way. They enable
// explicit blocking until the state changes, and also to selectively
// unblock select statements accordingly.
//
// When the tomb state changes to dying and there's still logic going
// on within the goroutine, nested functions and methods may choose to
// return ErrDying as their error value, as this error won't alter the
// tomb state if provided to the Kill method. This is a convenient way to
// follow standard Go practices in the context of a dying tomb.
//
// For background and a detailed example, see the following blog post:
//
// http://blog.labix.org/2011/10/09/death-of-goroutines-under-control
//
// For a more complex code snippet demonstrating the use of multiple
// goroutines with a single Tomb, see:
//
// http://play.golang.org/p/Xh7qWsDPZP
//
package tomb
import (
"errors"
"fmt"
"sync"
)
// A Tomb tracks the lifecycle of a goroutine as alive, dying or dead,
// and the reason for its death.
//
// See the package documentation for details.
type Tomb struct {
m sync.Mutex
dying chan struct{}
dead chan struct{}
reason error
}
var (
ErrStillAlive = errors.New("tomb: still alive")
ErrDying = errors.New("tomb: dying")
)
func (t *Tomb) init() {
t.m.Lock()
if t.dead == nil {
t.dead = make(chan struct{})
t.dying = make(chan struct{})
t.reason = ErrStillAlive
}
t.m.Unlock()
}
// Dead returns the channel that can be used to wait
// until t.Done has been called.
func (t *Tomb) Dead() <-chan struct{} {
t.init()
return t.dead
}
// Dying returns the channel that can be used to wait
// until t.Kill or t.Done has been called.
func (t *Tomb) Dying() <-chan struct{} {
t.init()
return t.dying
}
// Wait blocks until the goroutine is in a dead state and returns the
// reason for its death.
func (t *Tomb) Wait() error {
t.init()
<-t.dead
t.m.Lock()
reason := t.reason
t.m.Unlock()
return reason
}
// Done flags the goroutine as dead, and should be called a single time
// right before the goroutine function or method returns.
// If the goroutine was not already in a dying state before Done is
// called, it will be flagged as dying and dead at once with no
// error.
func (t *Tomb) Done() {
t.Kill(nil)
close(t.dead)
}
// Kill flags the goroutine as dying for the given reason.
// Kill may be called multiple times, but only the first
// non-nil error is recorded as the reason for termination.
//
// If reason is ErrDying, the previous reason isn't replaced
// even if it is nil. It's a runtime error to call Kill with
// ErrDying if t is not in a dying state.
func (t *Tomb) Kill(reason error) {
t.init()
t.m.Lock()
defer t.m.Unlock()
if reason == ErrDying {
if t.reason == ErrStillAlive {
panic("tomb: Kill with ErrDying while still alive")
}
return
}
if t.reason == nil || t.reason == ErrStillAlive {
t.reason = reason
}
// If the receive on t.dying succeeds, then
// it can only be because we have already closed it.
// If it blocks, then we know that it needs to be closed.
select {
case <-t.dying:
default:
close(t.dying)
}
}
// Killf works like Kill, but builds the reason providing the received
// arguments to fmt.Errorf. The generated error is also returned.
func (t *Tomb) Killf(f string, a ...interface{}) error {
err := fmt.Errorf(f, a...)
t.Kill(err)
return err
}
// Err returns the reason for the goroutine death provided via Kill
// or Killf, or ErrStillAlive when the goroutine is still alive.
func (t *Tomb) Err() (reason error) {
t.init()
t.m.Lock()
reason = t.reason
t.m.Unlock()
return
}