watcher: MsgHandler, EventWatcher
This package is responsible for maintaining the lifecycle of a fifo.EventChannel. The EventWatcher type handles a single instance of an EventChannel, feeding messages it receives to a provided watcher.MsgHandler interface.
This commit is contained in:
parent
760bcac716
commit
c72d92bbd2
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package watcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.st8l.com/luxolus/kdnotify/config"
|
||||||
|
"git.st8l.com/luxolus/kdnotify/fifo"
|
||||||
|
n "git.st8l.com/luxolus/kdnotify/schema/notify"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MsgHandler interface {
|
||||||
|
ProcessVrrp(n.VrrpMessage) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventWatcher struct {
|
||||||
|
Events fifo.EventChannel
|
||||||
|
Cxt *config.LibCxt
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWatcherFifo(cxt *config.LibCxt, path string) (EventWatcher, error) {
|
||||||
|
events, err := fifo.NewFifoChannel(cxt, path)
|
||||||
|
if err != nil {
|
||||||
|
return EventWatcher{}, fmt.Errorf("unable to create watch fifo channel for '%s': %s", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewWatcher(cxt, &events), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWatcher(cxt *config.LibCxt, events fifo.EventChannel) EventWatcher {
|
||||||
|
cxt = &config.LibCxt{
|
||||||
|
Context: cxt.Context,
|
||||||
|
Logger: cxt.Logger.Named("watch"),
|
||||||
|
}
|
||||||
|
|
||||||
|
return EventWatcher{
|
||||||
|
Events: events,
|
||||||
|
Cxt: cxt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *EventWatcher) Watch(h MsgHandler) error {
|
||||||
|
w.watchLoop(w.Events, h)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *EventWatcher) watchLoop(events fifo.EventChannel, h MsgHandler) {
|
||||||
|
log := w.Cxt.Logger.Sugar()
|
||||||
|
|
||||||
|
log.Debug("starting watch")
|
||||||
|
for !events.Done() {
|
||||||
|
msg, err := events.Next()
|
||||||
|
if err != nil {
|
||||||
|
if err != fifo.EClosed {
|
||||||
|
log.Warnw("event channel closed abnormally", "error", err)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Debugw("incoming VRRP message", "instance", msg.Instance)
|
||||||
|
|
||||||
|
err = h.ProcessVrrp(msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnw("handler failed to process VRRP message", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Debug("stopping watch")
|
||||||
|
}
|
Loading…
Reference in New Issue