open-nomad/client/structs/broadcaster.go

138 lines
3.3 KiB
Go
Raw Normal View History

2017-07-03 04:49:56 +00:00
package structs
import (
"errors"
2017-07-03 04:49:56 +00:00
"sync"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// listenerCap is the capacity of the listener chans. Must be exactly 1
// to prevent Sends from blocking and allows them to pop old pending
// updates from the chan before enqueueing the latest update.
listenerCap = 1
)
var ErrAllocBroadcasterClosed = errors.New("alloc broadcaster closed")
// AllocBroadcaster implements an allocation broadcast channel where each
// listener receives allocation updates. Pending updates are dropped and
// replaced by newer allocation updates, so listeners may not receive every
// allocation update. However this ensures Sends never block and listeners only
// receive the latest allocation update -- never a stale version.
2017-07-03 04:49:56 +00:00
type AllocBroadcaster struct {
m sync.Mutex
listeners map[int]chan *structs.Allocation // lazy init
2017-07-03 04:49:56 +00:00
nextId int
closed bool
}
// NewAllocBroadcaster returns a new AllocBroadcaster.
func NewAllocBroadcaster() *AllocBroadcaster {
return &AllocBroadcaster{}
2017-07-03 04:49:56 +00:00
}
// Send broadcasts an allocation update. Any pending updates are replaced with
// this version of the allocation to prevent blocking on slow receivers.
// Returns ErrAllocBroadcasterClosed if called after broadcaster is closed.
func (b *AllocBroadcaster) Send(v *structs.Allocation) error {
2017-07-03 04:49:56 +00:00
b.m.Lock()
defer b.m.Unlock()
if b.closed {
return ErrAllocBroadcasterClosed
2017-07-03 04:49:56 +00:00
}
// Send alloc to already created listeners
2017-07-03 04:49:56 +00:00
for _, l := range b.listeners {
select {
case l <- v:
case <-l:
// Pop pending update and replace with new update
l <- v
2017-07-03 04:49:56 +00:00
}
}
2017-07-18 18:05:38 +00:00
return nil
2017-07-03 04:49:56 +00:00
}
// Close closes the channel, disabling the sending of further allocation
// updates. Pending updates are still received by listeners. Safe to call
// concurrently and more than once.
2017-07-03 04:49:56 +00:00
func (b *AllocBroadcaster) Close() {
b.m.Lock()
defer b.m.Unlock()
if b.closed {
return
}
// Close all listener chans
2017-07-03 04:49:56 +00:00
for _, l := range b.listeners {
close(l)
}
// Clear all references and mark broadcaster as closed
b.listeners = nil
b.closed = true
}
// stop an individual listener
func (b *AllocBroadcaster) stop(id int) {
b.m.Lock()
defer b.m.Unlock()
// If broadcaster has been closed there's nothing more to do.
if b.closed {
return
}
l, ok := b.listeners[id]
if !ok {
// If this listener has been stopped already there's nothing
// more to do.
return
}
close(l)
delete(b.listeners, id)
2017-07-03 04:49:56 +00:00
}
// Listen returns a Listener for the broadcast channel.
func (b *AllocBroadcaster) Listen() *AllocListener {
b.m.Lock()
defer b.m.Unlock()
if b.listeners == nil {
b.listeners = make(map[int]chan *structs.Allocation)
2017-07-03 04:49:56 +00:00
}
2017-07-03 04:49:56 +00:00
for b.listeners[b.nextId] != nil {
b.nextId++
}
ch := make(chan *structs.Allocation, listenerCap)
// Broadcaster is already closed, close this listener
2017-07-03 04:49:56 +00:00
if b.closed {
close(ch)
}
2017-07-03 04:49:56 +00:00
b.listeners[b.nextId] = ch
2017-07-03 04:49:56 +00:00
return &AllocListener{ch, b, b.nextId}
}
// AllocListener implements a listening endpoint for an allocation broadcast
// channel.
type AllocListener struct {
// Ch receives the broadcast messages.
Ch <-chan *structs.Allocation
b *AllocBroadcaster
id int
}
// Close closes the Listener, disabling the receival of further messages. Safe
// to call more than once and concurrently with receiving on Ch.
2017-07-03 04:49:56 +00:00
func (l *AllocListener) Close() {
l.b.stop(l.id)
2017-07-03 04:49:56 +00:00
}