/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ package fifo import ( "bufio" "errors" "io" "go.uber.org/zap" "git.st8l.com/luxolus/kdnotify/config" n "git.st8l.com/luxolus/kdnotify/schema/notify" ) var ( EClosed = errors.New("channel is closed") ) type EventChannel interface { Next() (n.VrrpMessage, error) Done() bool } func NewFifoChannel(cxt *config.LibCxt, path string) (FifoChannel, error) { fifo, err := MkPipe(path) if err != nil { return FifoChannel{}, err } echan := make(chan n.VrrpMessage, 1) cxt = &config.LibCxt{ Logger: cxt.Logger.Named("fifo").With(zap.String("path", path)), Context: cxt.Context, } go fifoLoop(cxt, fifo, echan) return FifoChannel{ events: echan, done: false, }, nil } type FifoChannel struct { events <-chan n.VrrpMessage done bool } func (c *FifoChannel) Next() (n.VrrpMessage, error) { if c.done { return n.VrrpMessage{}, EClosed } msg, ok := <-c.events if !ok { c.done = true return n.VrrpMessage{}, EClosed } return msg, nil } func (c *FifoChannel) Done() bool { return c.done } func fifoLoop(cxt *config.LibCxt, fifo io.ReadCloser, echan chan<- n.VrrpMessage) { var count, failed uint64 log := cxt.Logger.Sugar() buffer := bufio.NewReaderSize(fifo, 1024*4) defer func(fifo io.ReadCloser) { err := fifo.Close() if err != nil { log.Warnw("error while closing fifo channel", "error", err) } }(fifo) log.Debug("open fifo") for { bytes, err := buffer.ReadBytes('\n') if err != nil { if err != io.EOF { log.Warnw("io error, closing fifo", "error", err) } break } count++ msg, err := n.ParseVrrp(string(bytes)) if err != nil { log.Infow("failed to parse VRRP message, skipping", "error", err, "line", string(bytes)) failed++ continue } log.Debugw("new VRRP message from fifo", "instance", msg.Instance, "type", msg.Type.String(), "state", msg.State.String()) echan <- msg } log.Infow("close fifo", "messages", count, "failed", failed) close(echan) }