open-nomad/helper/gated-writer/writer_test.go
Kris Hicks 438717500d
gatedwriter: Fix race condition (#9791)
If one thread calls `Flush()` on a gatedwriter while another thread attempts to
`Write()` new data to it, strange things will happen.

The test I wrote shows that at the very least you can write _while_ flushing,
and the call to `Write()` will happen during the internal writes of the
buffered data, which is maybe not what is expected. (i.e. the `Write()`'d data
will be inserted somewhere in the middle of the data being `Flush()'d`)

It's also the case that, because `Write()` only has a read lock, if you had
multiple threads trying to write ("read") at the same time you might have data
loss because the `w.buf` that was read would not necessarily be up-to-date by
the time `p2` was appended to it and it was re-assigned to `w.buf`. You can see
this if you run the new gatedwriter tests with `-race` against the old implementation:

```
WARNING: DATA RACE
Read at 0x00c0000c0420 by goroutine 11:
  runtime.growslice()
      /usr/lib/go/src/runtime/slice.go:125 +0x0
  github.com/hashicorp/nomad/helper/gated-writer.(*Writer).Write()
      /home/hicks/workspace/nomad/helper/gated-writer/writer.go:41 +0x2b6
  github.com/hashicorp/nomad/helper/gated-writer.TestWriter_WithMultipleWriters.func1()
      /home/hicks/workspace/nomad/helper/gated-writer/writer_test.go:90 +0xea
```

This race condition is fixed in this change.
2021-01-14 12:43:14 -08:00

127 lines
2 KiB
Go

package gatedwriter
import (
"bytes"
"io"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestWriter_impl(t *testing.T) {
var _ io.Writer = new(Writer)
}
type slowTestWriter struct {
buf *bytes.Buffer
called chan struct{}
callCount int
}
func (w *slowTestWriter) Write(p []byte) (int, error) {
if w.callCount == 0 {
defer close(w.called)
}
w.callCount++
time.Sleep(time.Millisecond)
return w.buf.Write(p)
}
func TestWriter_WithSlowWriter(t *testing.T) {
buf := new(bytes.Buffer)
called := make(chan struct{})
w := &slowTestWriter{
buf: buf,
called: called,
}
writer := &Writer{Writer: w}
writer.Write([]byte("foo\n"))
writer.Write([]byte("bar\n"))
writer.Write([]byte("baz\n"))
flushed := make(chan struct{})
go func() {
writer.Flush()
close(flushed)
}()
// wait for the flush to call Write on slowTestWriter
<-called
// write to the now-flushing writer, which is no longer buffering
writer.Write([]byte("quux\n"))
// wait for the flush to finish to assert
<-flushed
require.Equal(t, "foo\nbar\nbaz\nquux\n", buf.String())
}
func TestWriter(t *testing.T) {
buf := new(bytes.Buffer)
w := &Writer{Writer: buf}
w.Write([]byte("foo\n"))
w.Write([]byte("bar\n"))
if buf.String() != "" {
t.Fatalf("bad: %s", buf.String())
}
w.Flush()
if buf.String() != "foo\nbar\n" {
t.Fatalf("bad: %s", buf.String())
}
w.Write([]byte("baz\n"))
if buf.String() != "foo\nbar\nbaz\n" {
t.Fatalf("bad: %s", buf.String())
}
}
func TestWriter_WithMultipleWriters(t *testing.T) {
buf := new(bytes.Buffer)
writer := &Writer{Writer: buf}
strs := []string{
"foo\n",
"bar\n",
"baz\n",
"quux\n",
}
waitCh := make(chan struct{})
wg := &sync.WaitGroup{}
for _, str := range strs {
str := str
wg.Add(1)
go func() {
defer wg.Done()
<-waitCh
writer.Write([]byte(str))
}()
}
// synchronize calls to Write() as closely as possible
close(waitCh)
wg.Wait()
writer.Flush()
require.Equal(t, strings.Count(buf.String(), "\n"), len(strs))
}