fifo: EventChannel, FifoChannel

This commit adds the fifo.EventChannel interface for consuming (VRRP)
events from keepalived.

This is what package users should accept when expecting a handle to the
event stream.

fifo.FifoChannel is an implementation of this against an IPC FIFO channel
opened via MkFifo(), and can be considered the primary "producer" of
fifo.EventChannel events, but importantly we can create other
producers, notably for testing.
This commit is contained in:
Paul Stemmet 2022-12-09 12:04:54 +00:00
parent 69ceb4eab3
commit 65660e3286
Signed by: Paul Stemmet
GPG Key ID: EDEA539F594E7E75
1 changed files with 112 additions and 0 deletions

112
fifo/events.go Normal file
View File

@ -0,0 +1,112 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
package fifo
import (
"bufio"
"errors"
"io"
"go.uber.org/zap"
"git.st8l.com/luxolus/kdnotify/config"
n "git.st8l.com/luxolus/kdnotify/schema/notify"
)
var (
EClosed = errors.New("channel is closed")
)
type EventChannel interface {
Next() (n.VrrpMessage, error)
Done() bool
}
func NewFifoChannel(cxt *config.LibCxt, path string) (FifoChannel, error) {
fifo, err := MkPipe(path)
if err != nil {
return FifoChannel{}, err
}
echan := make(chan n.VrrpMessage, 1)
cxt = &config.LibCxt{
Logger: cxt.Logger.Named("fifo").With(zap.String("path", path)),
Context: cxt.Context,
}
go fifoLoop(cxt, fifo, echan)
return FifoChannel{
events: echan,
done: false,
}, nil
}
type FifoChannel struct {
events <-chan n.VrrpMessage
done bool
}
func (c *FifoChannel) Next() (n.VrrpMessage, error) {
if c.done {
return n.VrrpMessage{}, EClosed
}
msg, ok := <-c.events
if !ok {
c.done = true
return n.VrrpMessage{}, EClosed
}
return msg, nil
}
func (c *FifoChannel) Done() bool {
return c.done
}
func fifoLoop(cxt *config.LibCxt, fifo io.ReadCloser, echan chan<- n.VrrpMessage) {
var count, failed uint64
log := cxt.Logger.Sugar()
buffer := bufio.NewReaderSize(fifo, 1024*4)
defer func(fifo io.ReadCloser) {
err := fifo.Close()
if err != nil {
log.Warnw("error while closing fifo channel", "error", err)
}
}(fifo)
log.Debug("open fifo")
for {
bytes, err := buffer.ReadBytes('\n')
if err != nil {
if err != io.EOF {
log.Warnw("io error, closing fifo", "error", err)
}
break
}
count++
msg, err := n.ParseVrrp(string(bytes))
if err != nil {
log.Infow("failed to parse VRRP message, skipping", "error", err, "line", string(bytes))
failed++
continue
}
log.Debugw("new VRRP message from fifo",
"instance", msg.Instance,
"type", msg.Type.String(),
"state", msg.State.String())
echan <- msg
}
log.Infow("close fifo", "messages", count, "failed", failed)
close(echan)
}