kdnotify/fifo/events.go

113 lines
2.1 KiB
Go
Raw Normal View History

/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
package fifo
import (
"bufio"
"errors"
"io"
"go.uber.org/zap"
"git.st8l.com/luxolus/kdnotify/config"
n "git.st8l.com/luxolus/kdnotify/schema/notify"
)
var (
EClosed = errors.New("channel is closed")
)
type EventChannel interface {
Next() (n.VrrpMessage, error)
Done() bool
}
func NewFifoChannel(cxt *config.LibCxt, path string) (FifoChannel, error) {
fifo, err := MkPipe(path)
if err != nil {
return FifoChannel{}, err
}
echan := make(chan n.VrrpMessage, 1)
cxt = &config.LibCxt{
Logger: cxt.Logger.Named("fifo").With(zap.String("path", path)),
Context: cxt.Context,
}
go fifoLoop(cxt, fifo, echan)
return FifoChannel{
events: echan,
done: false,
}, nil
}
type FifoChannel struct {
events <-chan n.VrrpMessage
done bool
}
func (c *FifoChannel) Next() (n.VrrpMessage, error) {
if c.done {
return n.VrrpMessage{}, EClosed
}
msg, ok := <-c.events
if !ok {
c.done = true
return n.VrrpMessage{}, EClosed
}
return msg, nil
}
func (c *FifoChannel) Done() bool {
return c.done
}
func fifoLoop(cxt *config.LibCxt, fifo io.ReadCloser, echan chan<- n.VrrpMessage) {
var count, failed uint64
log := cxt.Logger.Sugar()
buffer := bufio.NewReaderSize(fifo, 1024*4)
defer func(fifo io.ReadCloser) {
err := fifo.Close()
if err != nil {
log.Warnw("error while closing fifo channel", "error", err)
}
}(fifo)
log.Debug("open fifo")
for {
bytes, err := buffer.ReadBytes('\n')
if err != nil {
if err != io.EOF {
log.Warnw("io error, closing fifo", "error", err)
}
break
}
count++
msg, err := n.ParseVrrp(string(bytes))
if err != nil {
log.Infow("failed to parse VRRP message, skipping", "error", err, "line", string(bytes))
failed++
continue
}
log.Debugw("new VRRP message from fifo",
"instance", msg.Instance,
"type", msg.Type.String(),
"state", msg.State.String())
echan <- msg
}
log.Infow("close fifo", "messages", count, "failed", failed)
close(echan)
}