diff --git a/fifo/events.go b/fifo/events.go new file mode 100644 index 0000000..bf6c7d5 --- /dev/null +++ b/fifo/events.go @@ -0,0 +1,112 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +package fifo + +import ( + "bufio" + "errors" + "io" + + "go.uber.org/zap" + + "git.st8l.com/luxolus/kdnotify/config" + n "git.st8l.com/luxolus/kdnotify/schema/notify" +) + +var ( + EClosed = errors.New("channel is closed") +) + +type EventChannel interface { + Next() (n.VrrpMessage, error) + Done() bool +} + +func NewFifoChannel(cxt *config.LibCxt, path string) (FifoChannel, error) { + fifo, err := MkPipe(path) + if err != nil { + return FifoChannel{}, err + } + + echan := make(chan n.VrrpMessage, 1) + cxt = &config.LibCxt{ + Logger: cxt.Logger.Named("fifo").With(zap.String("path", path)), + Context: cxt.Context, + } + + go fifoLoop(cxt, fifo, echan) + + return FifoChannel{ + events: echan, + done: false, + }, nil +} + +type FifoChannel struct { + events <-chan n.VrrpMessage + done bool +} + +func (c *FifoChannel) Next() (n.VrrpMessage, error) { + if c.done { + return n.VrrpMessage{}, EClosed + } + + msg, ok := <-c.events + if !ok { + c.done = true + + return n.VrrpMessage{}, EClosed + } + + return msg, nil +} + +func (c *FifoChannel) Done() bool { + return c.done +} + +func fifoLoop(cxt *config.LibCxt, fifo io.ReadCloser, echan chan<- n.VrrpMessage) { + var count, failed uint64 + log := cxt.Logger.Sugar() + buffer := bufio.NewReaderSize(fifo, 1024*4) + + defer func(fifo io.ReadCloser) { + err := fifo.Close() + if err != nil { + log.Warnw("error while closing fifo channel", "error", err) + } + }(fifo) + + log.Debug("open fifo") + for { + bytes, err := buffer.ReadBytes('\n') + if err != nil { + if err != io.EOF { + log.Warnw("io error, closing fifo", "error", err) + } + break + } + count++ + + msg, err := n.ParseVrrp(string(bytes)) + if err != nil { + log.Infow("failed to parse VRRP message, skipping", "error", err, "line", string(bytes)) + failed++ + continue + } + + log.Debugw("new VRRP message from fifo", + "instance", msg.Instance, + "type", msg.Type.String(), + "state", msg.State.String()) + + echan <- msg + } + log.Infow("close fifo", "messages", count, "failed", failed) + close(echan) +}