open-nomad/drivers/shared/eventer/eventer.go

166 lines
4.2 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package eventer
import (
"context"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/plugins/drivers"
)
var (
// DefaultSendEventTimeout is the timeout used when publishing events to consumers
DefaultSendEventTimeout = 2 * time.Second
// ConsumerGCInterval is the interval at which garbage collection of consumers
// occures
ConsumerGCInterval = time.Minute
)
// Eventer is a utility to control broadcast of TaskEvents to multiple consumers.
// It also implements the TaskEvents func in the DriverPlugin interface so that
// it can be embedded in a implementing driver struct.
type Eventer struct {
// events is a channel were events to be broadcasted are sent
// This channel is never closed, because it's lifetime is tied to the
// life of the driver and closing creates some subtile race conditions
// between closing it and emitting events.
events chan *drivers.TaskEvent
// consumers is a slice of eventConsumers to broadcast events to.
// access is gaurded by consumersLock RWMutex
consumers []*eventConsumer
consumersLock sync.RWMutex
// ctx to allow control of event loop shutdown
ctx context.Context
logger hclog.Logger
}
type eventConsumer struct {
timeout time.Duration
ctx context.Context
ch chan *drivers.TaskEvent
logger hclog.Logger
}
// NewEventer returns an Eventer with a running event loop that can be stopped
// by closing the given stop channel
func NewEventer(ctx context.Context, logger hclog.Logger) *Eventer {
e := &Eventer{
events: make(chan *drivers.TaskEvent),
ctx: ctx,
logger: logger,
}
go e.eventLoop()
return e
}
// eventLoop is the main logic which pulls events from the channel and broadcasts
// them to all consumers
func (e *Eventer) eventLoop() {
timer, stop := helper.NewSafeTimer(ConsumerGCInterval)
defer stop()
for {
timer.Reset(ConsumerGCInterval)
select {
case <-e.ctx.Done():
e.logger.Trace("task event loop shutdown")
return
case event := <-e.events:
e.iterateConsumers(event)
case <-timer.C:
e.gcConsumers()
}
}
}
// iterateConsumers will iterate through all consumers and broadcast the event,
// cleaning up any consumers that have closed their context
func (e *Eventer) iterateConsumers(event *drivers.TaskEvent) {
e.consumersLock.Lock()
filtered := e.consumers[:0]
for _, consumer := range e.consumers {
// prioritize checking if context is cancelled prior
// to attempting to forwarding events
// golang select evaluations aren't predictable
if consumer.ctx.Err() != nil {
close(consumer.ch)
continue
}
select {
case <-time.After(consumer.timeout):
filtered = append(filtered, consumer)
e.logger.Warn("timeout sending event", "task_id", event.TaskID, "message", event.Message)
case <-consumer.ctx.Done():
// consumer context finished, filtering it out of loop
close(consumer.ch)
case consumer.ch <- event:
filtered = append(filtered, consumer)
}
}
e.consumers = filtered
e.consumersLock.Unlock()
}
func (e *Eventer) gcConsumers() {
e.consumersLock.Lock()
filtered := e.consumers[:0]
for _, consumer := range e.consumers {
select {
case <-consumer.ctx.Done():
// consumer context finished, filtering it out of loop
default:
filtered = append(filtered, consumer)
}
}
e.consumers = filtered
e.consumersLock.Unlock()
}
func (e *Eventer) newConsumer(ctx context.Context) *eventConsumer {
e.consumersLock.Lock()
defer e.consumersLock.Unlock()
consumer := &eventConsumer{
ch: make(chan *drivers.TaskEvent),
ctx: ctx,
timeout: DefaultSendEventTimeout,
logger: e.logger,
}
e.consumers = append(e.consumers, consumer)
return consumer
}
// TaskEvents is an implementation of the DriverPlugin.TaskEvents function
func (e *Eventer) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
consumer := e.newConsumer(ctx)
return consumer.ch, nil
}
// EmitEvent can be used to broadcast a new event
func (e *Eventer) EmitEvent(event *drivers.TaskEvent) error {
select {
case <-e.ctx.Done():
return e.ctx.Err()
case e.events <- event:
if e.logger.IsTrace() {
e.logger.Trace("emitting event", "event", event)
}
}
return nil
}