open-nomad/client/lib/streamframer/framer.go
2018-02-15 13:59:02 -08:00

310 lines
6.4 KiB
Go

package framer
import (
"bytes"
"fmt"
"sync"
"time"
)
var (
// HeartbeatStreamFrame is the StreamFrame to send as a heartbeat, avoiding
// creating many instances of the empty StreamFrame
HeartbeatStreamFrame = &StreamFrame{}
)
// StreamFrame is used to frame data of a file when streaming
type StreamFrame struct {
// Offset is the offset the data was read from
Offset int64 `json:",omitempty"`
// Data is the read data
Data []byte `json:",omitempty"`
// File is the file that the data was read from
File string `json:",omitempty"`
// FileEvent is the last file event that occurred that could cause the
// streams position to change or end
FileEvent string `json:",omitempty"`
}
// IsHeartbeat returns if the frame is a heartbeat frame
func (s *StreamFrame) IsHeartbeat() bool {
return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == ""
}
func (s *StreamFrame) Clear() {
s.Offset = 0
s.Data = nil
s.File = ""
s.FileEvent = ""
}
func (s *StreamFrame) IsCleared() bool {
if s.Offset != 0 {
return false
} else if s.Data != nil {
return false
} else if s.File != "" {
return false
} else if s.FileEvent != "" {
return false
} else {
return true
}
}
// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
out chan<- *StreamFrame
frameSize int
heartbeat *time.Ticker
flusher *time.Ticker
shutdown bool
shutdownCh chan struct{}
exitCh chan struct{}
// The mutex protects everything below
l sync.Mutex
// The current working frame
f StreamFrame
data *bytes.Buffer
// Captures whether the framer is running and any error that occurred to
// cause it to stop.
running bool
err error
}
// NewStreamFramer creates a new stream framer that will output StreamFrames to
// the passed output channel.
func NewStreamFramer(out chan<- *StreamFrame,
heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {
// Create the heartbeat and flush ticker
heartbeat := time.NewTicker(heartbeatRate)
flusher := time.NewTicker(batchWindow)
return &StreamFramer{
out: out,
frameSize: frameSize,
heartbeat: heartbeat,
flusher: flusher,
data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)),
shutdownCh: make(chan struct{}),
exitCh: make(chan struct{}),
}
}
// Destroy is used to cleanup the StreamFramer and flush any pending frames
func (s *StreamFramer) Destroy() {
s.l.Lock()
wasShutdown := s.shutdown
s.shutdown = true
if !wasShutdown {
close(s.shutdownCh)
}
s.heartbeat.Stop()
s.flusher.Stop()
running := s.running
s.l.Unlock()
// Ensure things were flushed
if running {
<-s.exitCh
}
if !wasShutdown {
close(s.out)
}
}
// Run starts a long lived goroutine that handles sending data as well as
// heartbeating
func (s *StreamFramer) Run() {
s.l.Lock()
defer s.l.Unlock()
if s.running {
return
}
s.running = true
go s.run()
}
// ExitCh returns a channel that will be closed when the run loop terminates.
func (s *StreamFramer) ExitCh() <-chan struct{} {
return s.exitCh
}
// Err returns the error that caused the StreamFramer to exit
func (s *StreamFramer) Err() error {
s.l.Lock()
defer s.l.Unlock()
return s.err
}
// run is the internal run method. It exits if Destroy is called or an error
// occurs, in which case the exit channel is closed.
func (s *StreamFramer) run() {
var err error
defer func() {
s.l.Lock()
s.running = false
s.err = err
s.l.Unlock()
close(s.exitCh)
}()
OUTER:
for {
select {
case <-s.shutdownCh:
break OUTER
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
if s.f.IsCleared() {
s.l.Unlock()
continue
}
// Read the data for the frame, and send it
s.f.Data = s.readData()
err = s.send(&s.f)
s.f.Clear()
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
if err = s.send(HeartbeatStreamFrame); err != nil {
return
}
}
}
s.l.Lock()
if !s.f.IsCleared() {
s.f.Data = s.readData()
err = s.send(&s.f)
s.f.Clear()
}
s.l.Unlock()
}
// send takes a StreamFrame, encodes and sends it
func (s *StreamFramer) send(f *StreamFrame) error {
sending := *f
f.Data = nil
select {
case s.out <- &sending:
return nil
case <-s.exitCh:
return nil
}
}
// readData is a helper which reads the buffered data returning up to the frame
// size of data. Must be called with the lock held. The returned value is
// invalid on the next read or write into the StreamFramer buffer
func (s *StreamFramer) readData() []byte {
// Compute the amount to read from the buffer
size := s.data.Len()
if size > s.frameSize {
size = s.frameSize
}
if size == 0 {
return nil
}
d := s.data.Next(size)
return d
}
// Send creates and sends a StreamFrame based on the passed parameters. An error
// is returned if the run routine hasn't run or encountered an error. Send is
// asynchronous and does not block for the data to be transferred.
func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error {
s.l.Lock()
defer s.l.Unlock()
// If we are not running, return the error that caused us to not run or
// indicated that it was never started.
if !s.running {
if s.err != nil {
return s.err
}
return fmt.Errorf("StreamFramer not running")
}
// Check if not mergeable
if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
err := s.send(&s.f)
s.f.Clear()
if err != nil {
return err
}
}
// Store the new data as the current frame.
if s.f.IsCleared() {
s.f.Offset = offset
s.f.File = file
s.f.FileEvent = fileEvent
}
// Write the data to the buffer
s.data.Write(data)
// Handle the delete case in which there is no data
force := s.data.Len() == 0 && s.f.FileEvent != ""
// Flush till we are under the max frame size
for s.data.Len() >= s.frameSize || force {
// Clear since are flushing the frame and capturing the file event.
// Subsequent data frames will be flushed based on the data size alone
// since they share the same fileevent.
if force {
force = false
}
// Create a new frame to send it
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
if err := s.send(&s.f); err != nil {
return err
}
// Update the offset
s.f.Offset += int64(len(s.f.Data))
}
if s.data.Len() == 0 {
s.f.Clear()
}
return nil
}