From 65660e32866685b2f080fec81f899a52b19d7a86 Mon Sep 17 00:00:00 2001 From: Bazaah Date: Fri, 9 Dec 2022 12:04:54 +0000 Subject: [PATCH] fifo: EventChannel, FifoChannel This commit adds the fifo.EventChannel interface for consuming (VRRP) events from keepalived. This is what package users should accept when expecting a handle to the event stream. fifo.FifoChannel is an implementation of this against an IPC FIFO channel opened via MkFifo(), and can be considered the primary "producer" of fifo.EventChannel events, but importantly we can create other producers, notably for testing. --- fifo/events.go | 112 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 fifo/events.go diff --git a/fifo/events.go b/fifo/events.go new file mode 100644 index 0000000..bf6c7d5 --- /dev/null +++ b/fifo/events.go @@ -0,0 +1,112 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +package fifo + +import ( + "bufio" + "errors" + "io" + + "go.uber.org/zap" + + "git.st8l.com/luxolus/kdnotify/config" + n "git.st8l.com/luxolus/kdnotify/schema/notify" +) + +var ( + EClosed = errors.New("channel is closed") +) + +type EventChannel interface { + Next() (n.VrrpMessage, error) + Done() bool +} + +func NewFifoChannel(cxt *config.LibCxt, path string) (FifoChannel, error) { + fifo, err := MkPipe(path) + if err != nil { + return FifoChannel{}, err + } + + echan := make(chan n.VrrpMessage, 1) + cxt = &config.LibCxt{ + Logger: cxt.Logger.Named("fifo").With(zap.String("path", path)), + Context: cxt.Context, + } + + go fifoLoop(cxt, fifo, echan) + + return FifoChannel{ + events: echan, + done: false, + }, nil +} + +type FifoChannel struct { + events <-chan n.VrrpMessage + done bool +} + +func (c *FifoChannel) Next() (n.VrrpMessage, error) { + if c.done { + return n.VrrpMessage{}, EClosed + } + + msg, ok := <-c.events + if !ok { + c.done = true + + return n.VrrpMessage{}, EClosed + } + + return msg, nil +} + +func (c *FifoChannel) Done() bool { + return c.done +} + +func fifoLoop(cxt *config.LibCxt, fifo io.ReadCloser, echan chan<- n.VrrpMessage) { + var count, failed uint64 + log := cxt.Logger.Sugar() + buffer := bufio.NewReaderSize(fifo, 1024*4) + + defer func(fifo io.ReadCloser) { + err := fifo.Close() + if err != nil { + log.Warnw("error while closing fifo channel", "error", err) + } + }(fifo) + + log.Debug("open fifo") + for { + bytes, err := buffer.ReadBytes('\n') + if err != nil { + if err != io.EOF { + log.Warnw("io error, closing fifo", "error", err) + } + break + } + count++ + + msg, err := n.ParseVrrp(string(bytes)) + if err != nil { + log.Infow("failed to parse VRRP message, skipping", "error", err, "line", string(bytes)) + failed++ + continue + } + + log.Debugw("new VRRP message from fifo", + "instance", msg.Instance, + "type", msg.Type.String(), + "state", msg.State.String()) + + echan <- msg + } + log.Infow("close fifo", "messages", count, "failed", failed) + close(echan) +}