kdnotify/watcher/watcher.go

74 lines
1.6 KiB
Go

/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
package watcher
import (
"fmt"
"git.st8l.com/luxolus/kdnotify/config"
"git.st8l.com/luxolus/kdnotify/fifo"
n "git.st8l.com/luxolus/kdnotify/schema/notify"
)
// MsgHandler is implemented by consumers of the VRRP messages that an
// EventWatcher reads from its event channel.
type MsgHandler interface {
// ProcessVrrp handles a single VRRP message. When it returns a non-nil
// error the watch loop logs a warning and carries on with the next message.
ProcessVrrp(n.VrrpMessage) error
}
// EventWatcher pairs an event channel with the library context used while
// watching it.
type EventWatcher struct {
// Events is the channel the watch loop reads messages from.
Events fifo.EventChannel
// Cxt supplies the logger the watch loop writes to.
Cxt *config.LibCxt
}
// NewWatcherFifo builds an EventWatcher whose events come from the channel
// that fifo.NewFifoChannel creates for path.
//
// On failure it returns a zero EventWatcher and an error that names the path
// and wraps the underlying cause, so callers can still inspect the original
// error with errors.Is or errors.As.
func NewWatcherFifo(cxt *config.LibCxt, path string) (EventWatcher, error) {
	events, err := fifo.NewFifoChannel(cxt, path)
	if err != nil {
		// %w (rather than %s) keeps the error chain intact; the rendered
		// message is unchanged.
		return EventWatcher{}, fmt.Errorf("unable to create watch fifo channel for '%s': %w", path, err)
	}

	// The channel is handed over by pointer, matching what NewWatcher is
	// given as its fifo.EventChannel argument.
	return NewWatcher(cxt, &events), nil
}
// NewWatcher builds an EventWatcher that reads from events.
//
// The supplied cxt is not stored as-is: the watcher gets a fresh
// config.LibCxt that carries over cxt.Context and uses
// cxt.Logger.Named("watch") as its logger.
func NewWatcher(cxt *config.LibCxt, events fifo.EventChannel) EventWatcher {
	var watcher EventWatcher
	watcher.Events = events
	watcher.Cxt = &config.LibCxt{
		Context: cxt.Context,
		Logger:  cxt.Logger.Named("watch"),
	}
	return watcher
}
// Watch runs the watch loop over w.Events, passing each message to h.
// It returns only once the loop has exited, and the returned error is
// always nil.
func (w *EventWatcher) Watch(h MsgHandler) error {
	source := w.Events
	w.watchLoop(source, h)
	return nil
}
// watchLoop pulls messages from events and passes each one to h until either
// events.Done() reports true or events.Next() returns an error.
//
// A Next error equal to fifo.EClosed ends the loop quietly; any other Next
// error is logged as a warning before the loop ends. Errors returned by the
// handler are logged as warnings and do not stop the loop.
func (w *EventWatcher) watchLoop(events fifo.EventChannel, h MsgHandler) {
	logger := w.Cxt.Logger.Sugar()
	logger.Debug("starting watch")

	for {
		if events.Done() {
			break
		}

		msg, nextErr := events.Next()
		if nextErr != nil {
			// Only report closes that were not the expected sentinel.
			if abnormal := nextErr != fifo.EClosed; abnormal {
				logger.Warnw("event channel closed abnormally", "error", nextErr)
			}
			break
		}

		logger.Debugw("incoming VRRP message", "instance", msg.Instance)
		if procErr := h.ProcessVrrp(msg); procErr != nil {
			logger.Warnw("handler failed to process VRRP message", "error", procErr)
		}
	}

	logger.Debug("stopping watch")
}