fifo: Use plain fifo file in Unix

This PR switches to using plain fifo files instead of golang structs
managed by containerd/fifo library.

The library main benefit is management of opening fifo files.  In Linux,
a reader `open()` request would block until a writer opens the file (and
vice-versa).  The library uses goroutines so that it's the first IO
operation that blocks.

This benefit isn't really useful for us: Given that logmon simply
streams output in a separate process, blocking of opening or first read
is effectively the same.

The library additionally makes further complications for managing state
and tracking read/write permission that seems overhead for our use,
compared to using a file directly.

Looking here, I made the following incidental changes:
* document that we do handle if fifo files are already created, as we
rely on that behavior for logmon restarts
* use type system to lock read vs write: currently, fifo library returns
`io.ReadWriteCloser` even if fifo is opened for writing only!
This commit is contained in:
Mahmood Ali 2019-03-31 12:50:06 -04:00
parent b1666e6c8a
commit 967452a3f0
4 changed files with 76 additions and 30 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
)
// TestFIFO tests basic behavior, and that reader closes when writer closes
func TestFIFO(t *testing.T) {
require := require.New(t)
var path string
@ -29,9 +30,11 @@ func TestFIFO(t *testing.T) {
path = filepath.Join(dir, "fifo")
}
reader, err := New(path)
readerOpenFn, err := New(path)
require.NoError(err)
var reader io.ReadCloser
toWrite := [][]byte{
[]byte("abc\n"),
[]byte(""),
@ -45,7 +48,12 @@ func TestFIFO(t *testing.T) {
wait.Add(1)
go func() {
defer wait.Done()
io.Copy(&readBuf, reader)
reader, err = readerOpenFn()
require.NoError(err)
_, err = io.Copy(&readBuf, reader)
require.NoError(err)
}()
writer, err := Open(path)
@ -57,9 +65,9 @@ func TestFIFO(t *testing.T) {
}
require.NoError(writer.Close())
time.Sleep(500 * time.Millisecond)
require.NoError(reader.Close())
wait.Wait()
require.NoError(reader.Close())
expected := "abc\ndef\nnomad\n"
require.Equal(expected, readBuf.String())
@ -67,6 +75,7 @@ func TestFIFO(t *testing.T) {
require.NoError(Remove(path))
}
// TestWriteClose asserts that when writer closes, subsequent Write() fails
func TestWriteClose(t *testing.T) {
require := require.New(t)
var path string
@ -81,15 +90,21 @@ func TestWriteClose(t *testing.T) {
path = filepath.Join(dir, "fifo")
}
reader, err := New(path)
readerOpenFn, err := New(path)
require.NoError(err)
var reader io.ReadCloser
var readBuf bytes.Buffer
var wait sync.WaitGroup
wait.Add(1)
go func() {
defer wait.Done()
io.Copy(&readBuf, reader)
reader, err = readerOpenFn()
require.NoError(err)
_, err = io.Copy(&readBuf, reader)
require.NoError(err)
}()
writer, err := Open(path)

View File

@ -3,23 +3,34 @@
package fifo
import (
"context"
"fmt"
"io"
"os"
"syscall"
cfifo "github.com/containerd/fifo"
"golang.org/x/sys/unix"
)
// New creates a fifo at the given path and returns an io.ReadWriteCloser for it
// The fifo must not already exist
func New(path string) (io.ReadWriteCloser, error) {
return cfifo.OpenFifo(context.Background(), path, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
// New creates a fifo at the given path, and returns an open function for reading.
// The fifo must not exist already, or that it's already a fifo file
//
// It returns a reader open function that may block until a writer opens
// so it's advised to run it in a goroutine different from reader goroutine
func New(path string) (func() (io.ReadCloser, error), error) {
// create first
if err := mkfifo(path, 0600); err != nil && !os.IsExist(err) {
return nil, fmt.Errorf("error creating fifo %v: %v", path, err)
}
openFn := func() (io.ReadCloser, error) {
return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe)
}
return openFn, nil
}
// Open opens a fifo that already exists and returns an io.ReadWriteCloser for it
func Open(path string) (io.ReadWriteCloser, error) {
return cfifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0600)
// Open opens a fifo file for reading, assuming it already exists, returns io.WriteCloser
func Open(path string) (io.WriteCloser, error) {
return os.OpenFile(path, unix.O_WRONLY, os.ModeNamedPipe)
}
// Remove a fifo that already exists at a given path
@ -34,3 +45,7 @@ func IsClosedErr(err error) bool {
}
return false
}
func mkfifo(path string, mode uint32) (err error) {
return unix.Mkfifo(path, mode)
}

View File

@ -67,9 +67,9 @@ func (f *winFIFO) Close() error {
return f.listener.Close()
}
// New creates a fifo at the given path and returns an io.ReadWriteCloser for it. The fifo
// must not already exist
func New(path string) (io.ReadWriteCloser, error) {
// New creates a fifo at the given path and returns an io.ReadCloser open for it.
// The fifo must not already exist
func New(path string) (func() (io.ReadCloser, error), error) {
l, err := winio.ListenPipe(path, &winio.PipeConfig{
InputBufferSize: PipeBufferSize,
OutputBufferSize: PipeBufferSize,
@ -78,13 +78,17 @@ func New(path string) (io.ReadWriteCloser, error) {
return nil, err
}
return &winFIFO{
listener: l,
}, nil
openFn := func() (io.ReadCloser, error) {
return &winFIFO{
listener: l,
}, nil
}
return openFn, nil
}
// OpenWriter opens a fifo that already exists and returns an io.ReadWriteCloser for it
func Open(path string) (io.ReadWriteCloser, error) {
// OpenWriter opens a fifo that already exists and returns an io.WriteCloser for it
func Open(path string) (io.WriteCloser, error) {
return winio.DialPipe(path, nil)
}

View File

@ -177,10 +177,12 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) {
// data will be copied from the reader to the rotator.
type logRotatorWrapper struct {
fifoPath string
processOutReader io.ReadCloser
rotatorWriter *logging.FileRotator
hasFinishedCopied chan struct{}
logger hclog.Logger
processOutReader io.ReadCloser
opened chan struct{}
}
// isRunning will return true until the reader is closed
@ -197,29 +199,38 @@ func (l *logRotatorWrapper) isRunning() bool {
// processOutWriter to attach to the stdout or stderr of a process.
func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) {
logger.Info("opening fifo", "path", path)
f, err := fifo.New(path)
fifoOpenFn, err := fifo.New(path)
if err != nil {
return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err)
}
wrap := &logRotatorWrapper{
fifoPath: path,
processOutReader: f,
rotatorWriter: rotator,
hasFinishedCopied: make(chan struct{}),
opened: make(chan struct{}),
logger: logger,
}
wrap.start()
wrap.start(fifoOpenFn)
return wrap, nil
}
// start starts a goroutine that copies from the pipe into the rotator. This is
// called by the constructor and not the user of the wrapper.
func (l *logRotatorWrapper) start() {
func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) {
go func() {
defer close(l.hasFinishedCopied)
_, err := io.Copy(l.rotatorWriter, l.processOutReader)
reader, err := readerOpenFn()
if err != nil {
return
}
l.processOutReader = reader
close(l.opened)
_, err = io.Copy(l.rotatorWriter, reader)
if err != nil {
l.logger.Error("copying got an error", "error", err)
// Close reader to propagate io error across pipe.
// Note that this may block until the process exits on
// Windows due to
@ -227,7 +238,7 @@ func (l *logRotatorWrapper) start() {
// or similar issues. Since this is already running in
// a goroutine its safe to block until the process is
// force-killed.
l.processOutReader.Close()
reader.Close()
}
}()
return
@ -249,6 +260,7 @@ func (l *logRotatorWrapper) Close() {
closeDone := make(chan struct{})
go func() {
defer close(closeDone)
<-l.opened
err := l.processOutReader.Close()
if err != nil && !strings.Contains(err.Error(), "file already closed") {
l.logger.Warn("error closing read-side of process output pipe", "err", err)