diff --git a/watcher/watcher.go b/watcher/watcher.go new file mode 100644 index 0000000..27f32bb --- /dev/null +++ b/watcher/watcher.go @@ -0,0 +1,73 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +package watcher + +import ( + "fmt" + + "git.st8l.com/luxolus/kdnotify/config" + "git.st8l.com/luxolus/kdnotify/fifo" + n "git.st8l.com/luxolus/kdnotify/schema/notify" +) + +type MsgHandler interface { + ProcessVrrp(n.VrrpMessage) error +} + +type EventWatcher struct { + Events fifo.EventChannel + Cxt *config.LibCxt +} + +func NewWatcherFifo(cxt *config.LibCxt, path string) (EventWatcher, error) { + events, err := fifo.NewFifoChannel(cxt, path) + if err != nil { + return EventWatcher{}, fmt.Errorf("unable to create watch fifo channel for '%s': %s", path, err) + } + + return NewWatcher(cxt, &events), nil +} + +func NewWatcher(cxt *config.LibCxt, events fifo.EventChannel) EventWatcher { + cxt = &config.LibCxt{ + Context: cxt.Context, + Logger: cxt.Logger.Named("watch"), + } + + return EventWatcher{ + Events: events, + Cxt: cxt, + } +} + +func (w *EventWatcher) Watch(h MsgHandler) error { + w.watchLoop(w.Events, h) + + return nil +} + +func (w *EventWatcher) watchLoop(events fifo.EventChannel, h MsgHandler) { + log := w.Cxt.Logger.Sugar() + + log.Debug("starting watch") + for !events.Done() { + msg, err := events.Next() + if err != nil { + if err != fifo.EClosed { + log.Warnw("event channel closed abnormally", "error", err) + } + break + } + log.Debugw("incoming VRRP message", "instance", msg.Instance) + + err = h.ProcessVrrp(msg) + if err != nil { + log.Warnw("handler failed to process VRRP message", "error", err) + } + } + log.Debug("stopping watch") +}