Updating yamux

This commit is contained in:
Diptanu Choudhury 2016-05-20 22:06:22 -07:00
parent 83bbe9df60
commit 6eaa1ef0d5
5 changed files with 35 additions and 34 deletions

View file

@ -1,23 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test

View file

@ -46,8 +46,11 @@ type Session struct {
pingID uint32
pingLock sync.Mutex
// streams maps a stream id to a stream
// streams maps a stream id to a stream, and inflight has an entry
// for any outgoing stream that has not yet been established. Both are
// protected by streamLock.
streams map[uint32]*Stream
inflight map[uint32]struct{}
streamLock sync.Mutex
// synCh acts like a semaphore. It is sized to the AcceptBacklog which
@ -90,6 +93,7 @@ func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
bufRead: bufio.NewReader(conn),
pings: make(map[uint32]chan struct{}),
streams: make(map[uint32]*Stream),
inflight: make(map[uint32]struct{}),
synCh: make(chan struct{}, config.AcceptBacklog),
acceptCh: make(chan *Stream, config.AcceptBacklog),
sendCh: make(chan sendReady, 64),
@ -153,7 +157,7 @@ func (s *Session) OpenStream() (*Stream, error) {
}
GET_ID:
// Get and ID, and check for stream exhaustion
// Get an ID, and check for stream exhaustion
id := atomic.LoadUint32(&s.nextStreamID)
if id >= math.MaxUint32-1 {
return nil, ErrStreamsExhausted
@ -166,10 +170,16 @@ GET_ID:
stream := newStream(s, id, streamInit)
s.streamLock.Lock()
s.streams[id] = stream
s.inflight[id] = struct{}{}
s.streamLock.Unlock()
// Send the window update to create
if err := stream.sendWindowUpdate(); err != nil {
select {
case <-s.synCh:
default:
s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore")
}
return nil, err
}
return stream, nil
@ -580,19 +590,34 @@ func (s *Session) incomingStream(id uint32) error {
}
// closeStream is used to close a stream once both sides have
// issued a close.
// issued a close. If there was an in-flight SYN and the stream
// was not yet established, then this will give the credit back.
func (s *Session) closeStream(id uint32) {
s.streamLock.Lock()
if _, ok := s.inflight[id]; ok {
select {
case <-s.synCh:
default:
s.logger.Printf("[ERR] yamux: SYN tracking out of sync")
}
}
delete(s.streams, id)
s.streamLock.Unlock()
}
// establishStream is used to mark a stream that was in the
// SYN Sent state as established.
func (s *Session) establishStream() {
func (s *Session) establishStream(id uint32) {
s.streamLock.Lock()
if _, ok := s.inflight[id]; ok {
delete(s.inflight, id)
} else {
s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)")
}
select {
case <-s.synCh:
default:
panic("established stream without inflight syn")
s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)")
}
s.streamLock.Unlock()
}

View file

@ -96,7 +96,7 @@ Because we are relying on the reliable stream underneath, a connection
can begin sending data once the SYN flag is sent. The corresponding
ACK does not need to be received. This is particularly well suited
for an RPC system where a client wants to open a stream and immediately
fire a request without wiating for the RTT of the ACK.
fire a request without waiting for the RTT of the ACK.
This does introduce the possibility of a connection being rejected
after data has been sent already. This is a slight semantic difference

View file

@ -327,7 +327,7 @@ func (s *Stream) processFlags(flags uint16) error {
if s.state == streamSYNSent {
s.state = streamEstablished
}
s.session.establishStream()
s.session.establishStream(s.id)
}
if flags&flagFIN == flagFIN {
switch s.state {
@ -348,9 +348,6 @@ func (s *Stream) processFlags(flags uint16) error {
}
}
if flags&flagRST == flagRST {
if s.state == streamSYNSent {
s.session.establishStream()
}
s.state = streamReset
closeStream = true
s.notifyWaiting()

4
vendor/vendor.json vendored
View file

@ -459,8 +459,10 @@
"revision": "c4c55f16bae1aed9b355ad655d3ebf0215734461"
},
{
"checksumSHA1": "xvxetwF2G1XHScrmo8EM3yisjBc=",
"path": "github.com/hashicorp/yamux",
"revision": "df949784da9ed028ee76df44652e42d37a09d7e4"
"revision": "172cde3b6ca5c154ff4e6e2ef96b7451332a9946",
"revisionTime": "2016-05-19T16:00:42Z"
},
{
"comment": "0.2.2-2-gc01cf91",