open-consul/vendor/github.com/inconshreveable/muxado/proto/session.go

475 lines
13 KiB
Go

package proto
import (
"fmt"
"github.com/inconshreveable/muxado/proto/frame"
"io"
"net"
"reflect"
"sync"
"sync/atomic"
"time"
)
const (
defaultWindowSize = 0x10000 // 64KB
defaultAcceptQueueDepth = 100
MinExtensionType = 0xFFFFFFFF - 0x100 // 512 extensions
)
// private interface for Sessions to call Streams
type stream interface {
IStream
handleStreamData(*frame.RStreamData)
handleStreamWndInc(*frame.RStreamWndInc)
handleStreamRst(*frame.RStreamRst)
closeWith(error)
}
// for extensions
type ExtAccept func() (IStream, error)
type Extension interface {
Start(ISession, ExtAccept) frame.StreamType
}
type deadReason struct {
errorCode frame.ErrorCode
err error
remoteDebug []byte
}
// factory function that creates new streams
type streamFactory func(id frame.StreamId, priority frame.StreamPriority, streamType frame.StreamType, finLocal bool, finRemote bool, windowSize uint32, sess session) stream
// checks the parity of a stream id (local vs remote, client vs server)
type parityFn func(frame.StreamId) bool
// state for each half of the session (remote and local)
type halfState struct {
goneAway int32 // true if that half of the stream has gone away
lastId uint32 // last id used/seen from one half of the session
}
// Session implements a simple streaming session manager. It has the following characteristics:
//
// - When closing the Session, it does not linger, all pending write operations will fail immediately.
// - It completely ignores stream priority when processing and writing frames
// - It offers no customization of settings like window size/ping time
type Session struct {
conn net.Conn // connection the transport is running over
transport frame.Transport // transport
streams StreamMap // all active streams
local halfState // client state
remote halfState // server state
syn *frame.WStreamSyn // STREAM_SYN frame for opens
wr sync.Mutex // synchronization when writing frames
accept chan stream // new streams opened by the remote
diebit int32 // true if we're dying
remoteDebug []byte // debugging data sent in the remote's GoAway frame
defaultWindowSize uint32 // window size when creating new streams
newStream streamFactory // factory function to make new streams
dead chan deadReason // dead
isLocal parityFn // determines if a stream id is local or remote
exts map[frame.StreamType]chan stream // map of extension stream type -> accept channel for the extension
}
func NewSession(conn net.Conn, newStream streamFactory, isClient bool, exts []Extension) ISession {
sess := &Session{
conn: conn,
transport: frame.NewBasicTransport(conn),
streams: NewConcurrentStreamMap(),
local: halfState{lastId: 0},
remote: halfState{lastId: 0},
syn: frame.NewWStreamSyn(),
diebit: 0,
defaultWindowSize: defaultWindowSize,
accept: make(chan stream, defaultAcceptQueueDepth),
newStream: newStream,
dead: make(chan deadReason, 1), // don't block die() if there is no Wait call
exts: make(map[frame.StreamType]chan stream),
}
if isClient {
sess.isLocal = sess.isClient
sess.local.lastId += 1
} else {
sess.isLocal = sess.isServer
sess.remote.lastId += 1
}
for _, ext := range exts {
sess.startExtension(ext)
}
go sess.reader()
return sess
}
////////////////////////////////
// public interface
////////////////////////////////
func (s *Session) Open() (IStream, error) {
return s.OpenStream(0, 0, false)
}
func (s *Session) OpenStream(priority frame.StreamPriority, streamType frame.StreamType, fin bool) (ret IStream, err error) {
// check if the remote has gone away
if atomic.LoadInt32(&s.remote.goneAway) == 1 {
return nil, fmt.Errorf("Failed to create stream, remote has gone away.")
}
// this lock prevents the following race:
// goroutine1 goroutine2
// - inc stream id
// - inc stream id
// - send streamsyn
// - send streamsyn
s.wr.Lock()
// get the next id we can use
nextId := frame.StreamId(atomic.AddUint32(&s.local.lastId, 2))
// make the stream
str := s.newStream(nextId, priority, streamType, fin, false, s.defaultWindowSize, s)
// add to to the stream map
s.streams.Set(nextId, str)
// write the frame
if err = s.syn.Set(nextId, priority, streamType, fin); err != nil {
s.wr.Unlock()
s.die(frame.InternalError, err)
return
}
if err = s.transport.WriteFrame(s.syn); err != nil {
s.wr.Unlock()
s.die(frame.InternalError, err)
return
}
s.wr.Unlock()
return str, nil
}
func (s *Session) Accept() (str IStream, err error) {
var ok bool
if str, ok = <-s.accept; !ok {
return nil, fmt.Errorf("Session closed")
}
return
}
func (s *Session) Kill() error {
return s.transport.Close()
}
func (s *Session) Close() error {
return s.die(frame.NoError, fmt.Errorf("Session Close()"))
}
func (s *Session) GoAway(errorCode frame.ErrorCode, debug []byte) (err error) {
if !atomic.CompareAndSwapInt32(&s.local.goneAway, 0, 1) {
return fmt.Errorf("Already sent GoAway!")
}
s.wr.Lock()
f := frame.NewWGoAway()
remoteId := frame.StreamId(atomic.LoadUint32(&s.remote.lastId))
if err = f.Set(remoteId, errorCode, debug); err != nil {
s.wr.Unlock()
s.die(frame.InternalError, err)
return
}
if err = s.transport.WriteFrame(f); err != nil {
s.wr.Unlock()
s.die(frame.InternalError, err)
return
}
s.wr.Unlock()
return
}
func (s *Session) LocalAddr() net.Addr {
return s.conn.LocalAddr()
}
func (s *Session) RemoteAddr() net.Addr {
return s.conn.RemoteAddr()
}
func (s *Session) Wait() (frame.ErrorCode, error, []byte) {
reason := <-s.dead
return reason.errorCode, reason.err, reason.remoteDebug
}
////////////////////////////////
// private interface for streams
////////////////////////////////
// removeStream removes a stream from this session's stream registry
//
// It does not error if the stream is not present
func (s *Session) removeStream(id frame.StreamId) {
s.streams.Delete(id)
return
}
// writeFrame writes the given frame to the transport and returns the error from the write operation
func (s *Session) writeFrame(f frame.WFrame, dl time.Time) (err error) {
s.wr.Lock()
s.conn.SetWriteDeadline(dl)
err = s.transport.WriteFrame(f)
s.wr.Unlock()
return
}
// die closes the session cleanly with the given error and protocol error code
func (s *Session) die(errorCode frame.ErrorCode, err error) error {
// only one shutdown ever happens
if !atomic.CompareAndSwapInt32(&s.diebit, 0, 1) {
return fmt.Errorf("Shutdown already in progress")
}
// send a go away frame
s.GoAway(errorCode, []byte(err.Error()))
// now we're safe to stop accepting incoming connections
close(s.accept)
// we cleaned up as best as possible, close the transport
s.transport.Close()
// notify all of the streams that we're closing
s.streams.Each(func(id frame.StreamId, str stream) {
str.closeWith(fmt.Errorf("Session closed"))
})
s.dead <- deadReason{errorCode, err, s.remoteDebug}
return nil
}
////////////////////////////////
// internal methods
////////////////////////////////
// reader() reads frames from the underlying transport and handles passes them to handleFrame
func (s *Session) reader() {
defer s.recoverPanic("reader()")
// close all of the extension accept channels when we're done
// we do this here instead of in die() since otherwise it wouldn't
// be safe to access s.exts
defer func() {
for _, extAccept := range s.exts {
close(extAccept)
}
}()
for {
f, err := s.transport.ReadFrame()
if err != nil {
// if we fail to read a frame, terminate the session
_, ok := err.(*frame.FramingError)
if ok {
s.die(frame.ProtocolError, err)
} else {
s.die(frame.InternalError, err)
}
return
}
s.handleFrame(f)
}
}
func (s *Session) handleFrame(rf frame.RFrame) {
switch f := rf.(type) {
case *frame.RStreamSyn:
// if we're going away, refuse new streams
if atomic.LoadInt32(&s.local.goneAway) == 1 {
rstF := frame.NewWStreamRst()
rstF.Set(f.StreamId(), frame.RefusedStream)
go s.writeFrame(rstF, time.Time{})
return
}
if f.StreamId() <= frame.StreamId(atomic.LoadUint32(&s.remote.lastId)) {
s.die(frame.ProtocolError, fmt.Errorf("Stream id %d is less than last remote id.", f.StreamId()))
return
}
if s.isLocal(f.StreamId()) {
s.die(frame.ProtocolError, fmt.Errorf("Stream id has wrong parity for remote endpoint: %d", f.StreamId()))
return
}
// update last remote id
atomic.StoreUint32(&s.remote.lastId, uint32(f.StreamId()))
// make the new stream
str := s.newStream(f.StreamId(), f.StreamPriority(), f.StreamType(), false, f.Fin(), s.defaultWindowSize, s)
// add it to the stream map
s.streams.Set(f.StreamId(), str)
// check if this is an extension stream
if f.StreamType() >= MinExtensionType {
extAccept, ok := s.exts[f.StreamType()]
if !ok {
// Extension type of stream not registered
fRst := frame.NewWStreamRst()
if err := fRst.Set(f.StreamId(), frame.StreamClosed); err != nil {
s.die(frame.InternalError, err)
}
s.wr.Lock()
defer s.wr.Unlock()
s.transport.WriteFrame(fRst)
} else {
extAccept <- str
}
return
}
// put the new stream on the accept channel
s.accept <- str
case *frame.RStreamData:
if str := s.getStream(f.StreamId()); str != nil {
str.handleStreamData(f)
} else {
// if we get a data frame on a non-existent connection, we still
// need to read out the frame body so that the stream stays in a
// good state. read the payload into a throwaway buffer
discard := make([]byte, f.Length())
io.ReadFull(f.Reader(), discard)
// DATA frames on closed connections are just stream-level errors
fRst := frame.NewWStreamRst()
if err := fRst.Set(f.StreamId(), frame.StreamClosed); err != nil {
s.die(frame.InternalError, err)
}
s.wr.Lock()
defer s.wr.Unlock()
s.transport.WriteFrame(fRst)
return
}
case *frame.RStreamRst:
// delegate to the stream to handle these frames
if str := s.getStream(f.StreamId()); str != nil {
str.handleStreamRst(f)
}
case *frame.RStreamWndInc:
// delegate to the stream to handle these frames
if str := s.getStream(f.StreamId()); str != nil {
str.handleStreamWndInc(f)
}
case *frame.RGoAway:
atomic.StoreInt32(&s.remote.goneAway, 1)
s.remoteDebug = f.Debug()
lastId := f.LastStreamId()
s.streams.Each(func(id frame.StreamId, str stream) {
// close all streams that we opened above the last handled id
if s.isLocal(str.Id()) && str.Id() > lastId {
str.closeWith(fmt.Errorf("Remote is going away"))
}
})
default:
s.die(frame.ProtocolError, fmt.Errorf("Unrecognized frame type: %v", reflect.TypeOf(f)))
return
}
}
func (s *Session) recoverPanic(prefix string) {
if r := recover(); r != nil {
s.die(frame.InternalError, fmt.Errorf("%s panic: %v", prefix, r))
}
}
func (s *Session) getStream(id frame.StreamId) (str stream) {
// decide if this id is in the "idle" state (i.e. greater than any we've seen for that parity)
var lastId *uint32
if s.isLocal(id) {
lastId = &s.local.lastId
} else {
lastId = &s.remote.lastId
}
if uint32(id) > atomic.LoadUint32(lastId) {
s.die(frame.ProtocolError, fmt.Errorf("%d is an invalid, unassigned stream id", id))
}
// find the stream in the stream map
var ok bool
if str, ok = s.streams.Get(id); !ok {
return nil
}
return
}
// check if a stream id is for a client stream. client streams are odd
func (s *Session) isClient(id frame.StreamId) bool {
return uint32(id)&1 == 1
}
func (s *Session) isServer(id frame.StreamId) bool {
return !s.isClient(id)
}
//////////////////////////////////////////////
// session extensions
//////////////////////////////////////////////
func (s *Session) startExtension(ext Extension) {
accept := make(chan stream)
extAccept := func() (IStream, error) {
s, ok := <-accept
if !ok {
return nil, fmt.Errorf("Failed to accept connection, shutting down")
}
return s, nil
}
extType := ext.Start(s, extAccept)
s.exts[extType] = accept
}
//////////////////////////////////////////////
// net adaptors
//////////////////////////////////////////////
func (s *Session) NetDial(_, _ string) (net.Conn, error) {
str, err := s.Open()
return net.Conn(str), err
}
func (s *Session) NetListener() net.Listener {
return &netListenerAdaptor{s}
}
type netListenerAdaptor struct {
*Session
}
func (a *netListenerAdaptor) Addr() net.Addr {
return a.LocalAddr()
}
func (a *netListenerAdaptor) Accept() (net.Conn, error) {
str, err := a.Session.Accept()
return net.Conn(str), err
}