438717500d
If one thread calls `Flush()` on a gatedwriter while another thread attempts to `Write()` new data to it, strange things will happen. The test I wrote shows that at the very least you can write _while_ flushing, and the call to `Write()` will happen during the internal writes of the buffered data, which is maybe not what is expected. (i.e. the `Write()`'d data will be inserted somewhere in the middle of the data being `Flush()'d`) It's also the case that, because `Write()` only has a read lock, if you had multiple threads trying to write ("read") at the same time you might have data loss because the `w.buf` that was read would not necessarily be up-to-date by the time `p2` was appended to it and it was re-assigned to `w.buf`. You can see this if you run the new gatedwriter tests with `-race` against the old implementation: ``` WARNING: DATA RACE Read at 0x00c0000c0420 by goroutine 11: runtime.growslice() /usr/lib/go/src/runtime/slice.go:125 +0x0 github.com/hashicorp/nomad/helper/gated-writer.(*Writer).Write() /home/hicks/workspace/nomad/helper/gated-writer/writer.go:41 +0x2b6 github.com/hashicorp/nomad/helper/gated-writer.TestWriter_WithMultipleWriters.func1() /home/hicks/workspace/nomad/helper/gated-writer/writer_test.go:90 +0xea ``` This race condition is fixed in this change.
46 lines
737 B
Go
46 lines
737 B
Go
package gatedwriter
|
|
|
|
import (
|
|
"io"
|
|
"sync"
|
|
)
|
|
|
|
// Writer is an io.Writer implementation that buffers all of its
|
|
// data into an internal buffer until it is told to let data through.
|
|
type Writer struct {
|
|
Writer io.Writer
|
|
|
|
buf [][]byte
|
|
flush bool
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// Flush tells the Writer to flush any buffered data and to stop
|
|
// buffering.
|
|
func (w *Writer) Flush() {
|
|
w.lock.Lock()
|
|
defer w.lock.Unlock()
|
|
|
|
w.flush = true
|
|
|
|
for _, p := range w.buf {
|
|
_, _ = w.Writer.Write(p)
|
|
}
|
|
|
|
w.buf = nil
|
|
}
|
|
|
|
func (w *Writer) Write(p []byte) (n int, err error) {
|
|
w.lock.Lock()
|
|
defer w.lock.Unlock()
|
|
|
|
if w.flush {
|
|
return w.Writer.Write(p)
|
|
}
|
|
|
|
p2 := make([]byte, len(p))
|
|
copy(p2, p)
|
|
w.buf = append(w.buf, p2)
|
|
return len(p), nil
|
|
}
|