From 03271093005a4ec1a4c9b4842f39d516dd962ee9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 18 Jul 2016 18:41:21 -0700 Subject: [PATCH] tests --- command/agent/fs_endpoint.go | 12 +-- command/agent/fs_endpoint_test.go | 118 +++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 9 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 35f15be16..2a338da48 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -261,7 +261,6 @@ func (s *StreamFramer) Destroy() { s.l.Lock() wasRunning := s.running s.running = false - s.f = nil close(s.shutdownCh) s.heartbeat.Stop() s.flusher.Stop() @@ -271,6 +270,7 @@ func (s *StreamFramer) Destroy() { if wasRunning { <-s.exitCh } + s.out.Close() } // Run starts a long lived goroutine that handles sending data as well as @@ -299,7 +299,6 @@ func (s *StreamFramer) run() { defer func() { s.l.Lock() s.err = err - s.out.Close() close(s.exitCh) close(s.outbound) s.l.Unlock() @@ -322,8 +321,11 @@ func (s *StreamFramer) run() { // Read the data for the frame, and send it s.f.Data = s.readData() - s.outbound <- s.f - s.f = nil + select { + case s.outbound <- s.f: + s.f = nil + default: + } s.l.Unlock() case <-s.heartbeat.C: @@ -339,7 +341,7 @@ OUTER: case <-s.shutdownCh: break OUTER case o := <-s.outbound: - // Send the frame and then clear the current working frame + // Send the frame if err = s.enc.Encode(o); err != nil { return } diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index aaf1f8b4d..a6e839829 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -1,6 +1,7 @@ package agent import ( + "bytes" "io" "io/ioutil" "net/http" @@ -8,6 +9,7 @@ import ( "os" "path/filepath" "reflect" + "strconv" "testing" "time" @@ -302,6 +304,98 @@ func TestStreamFramer_Heartbeat(t *testing.T) { } } +// This test checks that frames are received in order +func TestStreamFramer_Order(t *testing.T) { + // Create the stream framer + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + // Ensure the batch window doesn't get hit + hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond + sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf.Run() + + // Create a decoder + dec := codec.NewDecoder(r, jsonHandle) + + //files := []string{"1", "2", "3", "4", "5"} + files := []string{"1"} + input := bytes.NewBuffer(make([]byte, 100000)) + for i := 0; i <= 2000; i++ { + str := strconv.Itoa(i) + "\n" + input.WriteString(str) + } + + expected := bytes.NewBuffer(make([]byte, 100000)) + for _, _ = range files { + expected.Write(input.Bytes()) + } + receivedBuf := bytes.NewBuffer(make([]byte, 100000)) + + // Start the reader + resultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + t.Fatalf("failed to decode") + } + + if frame.IsHeartbeat() { + continue + } + + receivedBuf.Write(frame.Data) + + if reflect.DeepEqual(expected, receivedBuf) { + resultCh <- struct{}{} + return + } + } + }() + + // Send the data + b := input.Bytes() + shards := 10 + each := len(b) / shards + for _, f := range files { + for i := 0; i < shards; i++ { + l, r := each*i, each*(i+1) + if i == shards-1 { + r = len(b) + } + + if err := sf.Send(f, "", b[l:r], 0); err != nil { + t.Fatalf("Send() failed %v", err) + } + } + } + + // Ensure we get data + select { + case <-resultCh: + case <-time.After(4 * bWindow): + got := receivedBuf.String() + want := expected.String() + t.Fatalf("Did not receive data in sorted order\nGot:%v\nWant:%v\n", got, want) + } + + // Close the reader and wait. This should cause the runner to exit + if err := r.Close(); err != nil { + t.Fatalf("failed to close reader") + } + + select { + case <-sf.ExitCh(): + case <-time.After(2 * hRate): + t.Fatalf("exit channel should close") + } + + sf.Destroy() + if !wrappedW.Closed { + t.Fatalf("writer not closed") + } +} + func TestHTTP_Stream_MissingParams(t *testing.T) { httpTest(t, nil, func(s *TestServer) { req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil) @@ -364,7 +458,11 @@ func TestHTTP_Stream_NoFile(t *testing.T) { ad := tempAllocDir(t) defer os.RemoveAll(ad.AllocDir) - if err := s.Server.stream(0, "foo", ad, nopWriteCloser{ioutil.Discard}); err == nil { + framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + if err := s.Server.stream(0, "foo", ad, framer, nil); err == nil { t.Fatalf("expected an error when streaming unknown file") } }) @@ -419,9 +517,13 @@ func TestHTTP_Stream_Modify(t *testing.T) { t.Fatalf("write failed: %v", err) } + framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + // Start streaming go func() { - if err := s.Server.stream(0, streamFile, ad, w); err != nil { + if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -496,9 +598,13 @@ func TestHTTP_Stream_Truncate(t *testing.T) { t.Fatalf("write failed: %v", err) } + framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + // Start streaming go func() { - if err := s.Server.stream(0, streamFile, ad, w); err != nil { + if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -595,9 +701,12 @@ func TestHTTP_Stream_Delete(t *testing.T) { t.Fatalf("write failed: %v", err) } + framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + // Start streaming go func() { - if err := s.Server.stream(0, streamFile, ad, wrappedW); err != nil { + if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -615,6 +724,7 @@ func TestHTTP_Stream_Delete(t *testing.T) { t.Fatalf("did not receive delete") } + framer.Destroy() testutil.WaitForResult(func() (bool, error) { return wrappedW.Closed, nil }, func(err error) {