Merge pull request #2259 from hashicorp/f-retire-muxado

Removes support for muxado and protocol version 1.
James Phillips 2016-08-09 18:25:17 -07:00 committed by GitHub
commit 4b35a19a83
34 changed files with 14 additions and 2606 deletions

View file

@@ -14,27 +14,14 @@ import (
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado"
)
// muxSession is used to provide an interface for either muxado or yamux
// muxSession is used to provide an interface for a stream multiplexer.
type muxSession interface {
Open() (net.Conn, error)
Close() error
}
type muxadoWrapper struct {
m muxado.Session
}
func (w *muxadoWrapper) Open() (net.Conn, error) {
return w.m.Open()
}
func (w *muxadoWrapper) Close() error {
return w.m.Close()
}
// streamClient is used to wrap a stream with an RPC client
type StreamClient struct {
stream net.Conn
@@ -295,15 +282,8 @@ func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, err
// Switch the multiplexing based on version
var session muxSession
if version < 2 {
// Write the Consul multiplex byte to set the mode
if _, err := conn.Write([]byte{byte(rpcMultiplex)}); err != nil {
conn.Close()
return nil, err
}
// Create a multiplexed session
session = &muxadoWrapper{muxado.Client(conn)}
conn.Close()
return nil, fmt.Errorf("cannot make client connection, unsupported protocol version %d", version)
} else {
// Write the Consul multiplex byte to set the mode
if _, err := conn.Write([]byte{byte(rpcMultiplexV2)}); err != nil {
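With the muxado wrapper removed, the only multiplexer left is hashicorp/yamux, and *yamux.Session already provides Open() (net.Conn, error) and Close() error, so it satisfies muxSession without an adapter. A minimal sketch of the surviving client path (the real getNewConn body is truncated above; newMuxClient is a hypothetical helper and the yamux configuration is simplified):

// Sketch only: how a client connection negotiates yamux after this change.
// rpcMultiplexV2 and muxSession are the identifiers shown in the diff above.
func newMuxClient(conn net.Conn) (muxSession, error) {
    // Write the Consul multiplex byte to put the server into yamux mode.
    if _, err := conn.Write([]byte{byte(rpcMultiplexV2)}); err != nil {
        conn.Close()
        return nil, err
    }
    // *yamux.Session satisfies muxSession (Open/Close), so no wrapper is needed.
    session, err := yamux.Client(conn, yamux.DefaultConfig())
    if err != nil {
        conn.Close()
        return nil, err
    }
    return session, nil
}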

View file

@@ -17,7 +17,6 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado"
)
type RPCType byte
@@ -25,7 +24,7 @@ type RPCType byte
const (
rpcConsul RPCType = iota
rpcRaft
rpcMultiplex
rpcMultiplex // Old Muxado byte, no longer supported.
rpcTLS
rpcMultiplexV2
)
@@ -108,9 +107,6 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
metrics.IncrCounter([]string{"consul", "rpc", "raft_handoff"}, 1)
s.raftLayer.Handoff(conn)
case rpcMultiplex:
s.handleMultiplex(conn)
case rpcTLS:
if s.rpcTLS == nil {
s.logger.Printf("[WARN] consul.rpc: TLS connection attempted, server not configured for TLS %s", logConn(conn))
@@ -130,23 +126,6 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
}
}
// handleMultiplex is used to multiplex a single incoming connection
// using the Muxado multiplexer
func (s *Server) handleMultiplex(conn net.Conn) {
defer conn.Close()
server := muxado.Server(conn)
for {
sub, err := server.Accept()
if err != nil {
if !strings.Contains(err.Error(), "closed") {
s.logger.Printf("[ERR] consul.rpc: multiplex conn accept failed: %v %s", err, logConn(conn))
}
return
}
go s.handleConsulConn(sub)
}
}
// handleMultiplexV2 is used to multiplex a single incoming connection
// using the Yamux multiplexer
func (s *Server) handleMultiplexV2(conn net.Conn) {
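The body of handleMultiplexV2 is cut off by the diff, but a yamux-based accept loop generally mirrors the removed muxado one. An illustrative sketch, not Consul's actual implementation; handleYamuxConn and the handle callback are hypothetical stand-ins for handleMultiplexV2 and s.handleConsulConn:

// Sketch of a yamux-based accept loop, analogous to the removed muxado loop.
func handleYamuxConn(conn net.Conn, handle func(net.Conn)) {
    defer conn.Close()
    server, err := yamux.Server(conn, yamux.DefaultConfig())
    if err != nil {
        return
    }
    for {
        sub, err := server.Accept()
        if err != nil {
            return // session closed or failed
        }
        go handle(sub)
    }
}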

View file

@@ -30,7 +30,7 @@ import (
// Consul-level protocol versions, that are used to configure the Serf
// protocol versions.
const (
ProtocolVersionMin uint8 = 1
ProtocolVersionMin uint8 = 2
// Version 3 added support for network coordinates but we kept the
// default protocol version at 2 to ease the transition to this new

View file

@@ -1,13 +0,0 @@
Copyright 2013 Alan Shreve
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View file

@@ -1,122 +0,0 @@
# muxado - Stream multiplexing for Go
## What is stream multiplexing?
Imagine you have a single stream (a bi-directional stream of bytes) like a TCP connection. Stream multiplexing
is a method for enabling the transmission of multiple simultaneous streams over the one underlying transport stream.
## What is muxado?
muxado is an implementation of a stream multiplexing library in Go that can be layered on top of a net.Conn to multiplex that stream.
muxado's protocol is not currently documented explicitly, but it is very nearly an implementation of the HTTP2
framing layer with all of the HTTP-specific bits removed. It is heavily inspired by HTTP2, SPDY, and WebMUX.
## How does it work?
Simplifying, muxado chunks data sent over each multiplexed stream and transmits each piece
as a "frame" over the transport stream. It then sends these frames,
often interleaving data for multiple streams, to the remote side.
The remote endpoint then reassembles the frames into distinct streams
of data which are presented to the application layer.
## What good is it anyways?
A stream multiplexing library is a powerful tool for an application developer's toolbox which solves a number of problems:
- It allows developers to implement asynchronous/pipelined protocols with ease. Instead of matching requests with responses in your protocols, just open a new stream for each request and communicate over that.
- muxado can do application-level keep-alives and dead-session detection so that you don't have to write heartbeat code ever again.
- You never need to build connection pools for services running your protocol. You can open as many independent, concurrent streams as you need without incurring any round-trip latency costs.
- muxado allows the server to initiate new streams to clients which is normally very difficult without NAT-busting trickery.
## Show me the code!
As much as possible, the muxado library strives to look and feel just like the standard library's net package. Here's how you initiate a new client session:
sess, err := muxado.DialTLS("tcp", "example.com:1234", tlsConfig)
And a server:
l, err := muxado.ListenTLS("tcp", ":1234", tlsConfig)
for {
sess, err := l.Accept()
go handleSession(sess)
}
Once you have a session, you can open new streams on it:
stream, err := sess.Open()
And accept streams opened by the remote side:
stream, err := sess.Accept()
Streams satisfy the net.Conn interface, so they're very familiar to work with:
n, err := stream.Write(buf)
n, err = stream.Read(buf)
muxado sessions and streams implement the net.Listener and net.Conn interfaces (with a small shim), so you can use them with existing golang libraries!
sess, err := muxado.DialTLS("tcp", "example.com:1234", tlsConfig)
http.Serve(sess.NetListener(), handler)
## A more extensive muxado client
// open a new session to a remote endpoint
sess, err := muxado.Dial("tcp", "example.com:1234")
if err != nil {
panic(err)
}
// handle streams initiated by the server
go func() {
for {
stream, err := sess.Accept()
if err != nil {
panic(err)
}
go handleStream(stream)
}
}()
// open new streams for application requests
for req := range requests {
str, err := sess.Open()
if err != nil {
panic(err)
}
go func(stream muxado.Stream) {
defer stream.Close()
// send request
if _, err := stream.Write(req.serialize()); err != nil {
panic(err)
}
// read response
buf, err := ioutil.ReadAll(stream)
if err != nil {
panic(err)
}
handleResponse(buf)
}(str)
}
## How did you build it?
muxado is a modified implementation of the HTTP2 framing protocol with all of the HTTP-specific bits removed. It aims
for simplicity in the protocol by removing everything that is not core to multiplexing streams. The muxado code
is also built with the intention that its performance should be moderately good within the bounds of working in Go. As a result,
muxado does contain some unidiomatic code.
## API documentation
API documentation is available on godoc.org:
[muxado API documentation](https://godoc.org/github.com/inconshreveable/muxado)
## What are its biggest drawbacks?
Any stream-multiplexing library over TCP will suffer from head-of-line blocking if the next packet to service gets dropped.
muxado is also a poor choice when sending large payloads and speed is a priority.
It shines best when the application workload needs to quickly open a large number of small-payload streams.
## Status
Most of muxado's features are implemented (and tested!), but there are many that are still rough or could be improved. See the TODO file for suggestions on what needs to improve.
## License
Apache

View file

@@ -1,35 +0,0 @@
improve the formatting of the docs to look nice for godoc
use a better example in the docs first before showing the clever integration with the net.Listener/net.Conn APIs
Make all errors support Temporary() API so applications can better decide what to do
Handle case of running out of stream ids + test
writeFrame errors should kill the session, but only if it's not a timeout + test
Short read should cause an error + test
Decrement() in outBuffer needs to have deadline support
Extensions:
Heartbeat extension needs tests
Make extensions a public API instead of a private API
Document how extensions work
Don't include any extensions by default
heartbeat test
Finish writing buffer tests
Write stress test
Write multi-frame write test
More session tests
More stream tests
Write frame/transport tests - verify read correct type, verify unknown type causes error, verify ioerror is propagated
Write frame/syn tests
Write frame/goaway tests
### Low priority:
- Add the ability to differentiate stream errors which allow you to safely retry
- Decide what to do if the application isn't handling its accepted streams fast enough. Refuse stream? Wait and block reading more frames?
- Figure out whether to die with/without lock - in GoAway/OpenStream
- Add priority APIs to stream
- Add priority extension
- Add Reset() stream API
- Eliminate unlikely race on s.remoteDebug between handleFrame() and die()
- Should writeFrame calls for rst/wndinc set the write deadline?
- don't send reset if the stream is fully closed
- include muxado pun somewhere in the docs

View file

@@ -1,54 +0,0 @@
package muxado
import (
"github.com/inconshreveable/muxado/proto"
"github.com/inconshreveable/muxado/proto/frame"
)
// streamAdaptor recasts the types of some function calls by the proto/Stream implementation
// so that it satisfies the public interface
type streamAdaptor struct {
proto.IStream
}
func (a *streamAdaptor) Id() StreamId {
return StreamId(a.IStream.Id())
}
func (a *streamAdaptor) StreamType() StreamType {
return StreamType(a.IStream.StreamType())
}
func (a *streamAdaptor) Session() Session {
return &sessionAdaptor{a.IStream.Session()}
}
// sessionAdaptor recasts the types of some function calls by the proto/Session implementation
// so that it satisfies the public interface
type sessionAdaptor struct {
proto.ISession
}
func (a *sessionAdaptor) Accept() (Stream, error) {
str, err := a.ISession.Accept()
return &streamAdaptor{str}, err
}
func (a *sessionAdaptor) Open() (Stream, error) {
str, err := a.ISession.Open()
return &streamAdaptor{str}, err
}
func (a *sessionAdaptor) OpenStream(priority StreamPriority, streamType StreamType, fin bool) (Stream, error) {
str, err := a.ISession.OpenStream(frame.StreamPriority(priority), frame.StreamType(streamType), fin)
return &streamAdaptor{str}, err
}
func (a *sessionAdaptor) GoAway(code ErrorCode, debug []byte) error {
return a.ISession.GoAway(frame.ErrorCode(code), debug)
}
func (a *sessionAdaptor) Wait() (ErrorCode, error, []byte) {
code, err, debug := a.ISession.Wait()
return ErrorCode(code), err, debug
}

View file

@@ -1,32 +0,0 @@
package muxado
import (
"crypto/tls"
"github.com/inconshreveable/muxado/proto"
"github.com/inconshreveable/muxado/proto/ext"
"net"
)
// Client returns a new muxado client-side connection using conn as the transport.
func Client(conn net.Conn) Session {
return &sessionAdaptor{proto.NewSession(conn, proto.NewStream, true, []proto.Extension{ext.NewDefaultHeartbeat()})}
}
// Dial opens a new connection to the given network/address and then begins a muxado client session on it.
func Dial(network, addr string) (sess Session, err error) {
conn, err := net.Dial(network, addr)
if err != nil {
return
}
return Client(conn), nil
}
// DialTLS opens a new TLS encrypted connection with the given configuration
// to the network/address and then begins a muxado client session on it.
func DialTLS(network, addr string, tlsConfig *tls.Config) (sess Session, err error) {
conn, err := tls.Dial(network, addr, tlsConfig)
if err != nil {
return
}
return Client(conn), nil
}

View file

@@ -1,57 +0,0 @@
// muxado is an implementation of a general-purpose stream-multiplexing protocol.
//
// muxado allows client applications to multiplex a single stream-oriented connection,
// like a TCP connection, and communicate over many streams on top of it. muxado accomplishes
// this by chunking data sent over each stream into frames and then reassembling the
// frames and buffering the data before being passed up to the application
// layer on the other side.
//
// muxado is very nearly an exact implementation of the HTTP2 framing layer while leaving out all
// the HTTP-specific parts. It is heavily inspired by HTTP2/SPDY/WebMUX.
//
// muxado's documentation uses the following terms consistently for easier communication:
// - "a transport" is an underlying stream (typically TCP) over which frames are sent between
// endpoints
// - "a stream" is any of the full-duplex byte-streams multiplexed over the transport
// - "a session" refers to an instance of the muxado protocol running over a transport between
// two endpoints
//
// Perhaps the best part of muxado is the interface exposed to client libraries. Since new
// streams may be initiated by both sides at any time, a muxado.Session implements the net.Listener
// interface (almost! Go unfortunately doesn't support covariant interface satisfaction so there's
// a shim). Each muxado stream implements the net.Conn interface. This allows you to integrate
// muxado into existing code which works with these interfaces (which is most Golang networking code)
// with very little difficulty. Consider the following toy example. Here we'll initiate a new secure
// connection to a server, and then ask it which application it wants via an HTTP request over a muxado stream
// and then serve an entire HTTP application *to the server*.
//
//
// sess, err := muxado.DialTLS("tcp", "example.com:1234", new(tls.Config))
// client := &http.Client{Transport: &http.Transport{Dial: sess.NetDial}}
// resp, err := client.Get("http://example.com/appchoice")
// switch getChoice(resp.Body) {
// case "foo":
// http.Serve(sess.NetListener(), fooHandler)
// case "bar":
// http.Serve(sess.NetListener(), barHandler)
// }
//
//
// In addition to enabling multiple streams over a single connection, muxado enables other
// behaviors which can be useful to the application layer:
// - Both sides of a muxado session may initiate new streams
// - muxado can transparently run application-level heartbeats and timeout dead sessions
// - When connections fail, muxado indicates to the application which streams may be safely retried
// - muxado supports prioritizing streams to maximize useful throughput when bandwidth-constrained
//
// A few examples of what these capabilities might make muxado useful for:
// - eliminating custom async/pipelining code for your protocols
// - eliminating connection pools in your protocols
// - eliminating custom NAT traversal logic for enabling server-initiated streams
//
// muxado has been tuned to be very performant within the limits of what you can expect of pure-Go code.
// Some of muxado's code looks unidiomatic in the quest for better performance. (Locks over channels, never allocating
// from the heap, etc). muxado will typically outperform TCP connections when rapidly initiating many new
// streams with small payloads. When sending a large payload over a single stream, muxado's worst case, it can
// be 2-3x slower and does not parallelize well.
package muxado

View file

@@ -1,115 +0,0 @@
package muxado
import (
"github.com/inconshreveable/muxado/proto/frame"
"net"
"time"
)
type StreamId frame.StreamId
type StreamPriority frame.StreamPriority
type StreamType frame.StreamType
type ErrorCode frame.ErrorCode
// Stream is a full duplex stream-oriented connection that is multiplexed over a Session.
// Stream implements the net.Conn interface.
type Stream interface {
// Write writes the bytes in the given buffer to the stream
Write([]byte) (int, error)
// Read reads the next bytes on the stream into the given buffer
Read([]byte) (int, error)
// Close closes the stream. It attempts to behave as Close does for a TCP conn in that it
// half-closes the stream for sending, and it will send an RST if any more data is received
// from the remote side.
Close() error
// SetDeadline sets a time after which future Read and Write operations will fail.
SetDeadline(time.Time) error
// SetReadDeadline sets a time after which future Read operations will fail.
SetReadDeadline(time.Time) error
// SetWriteDeadline sets a time after which future Write operations will fail.
SetWriteDeadline(time.Time) error
// HalfClose sends a data frame with a fin flag set to half-close the stream from the local side.
HalfClose([]byte) (int, error)
// Id returns the stream's id.
Id() StreamId
// StreamType returns the stream's type
StreamType() StreamType
// Session returns the session object this stream is running on.
Session() Session
// RemoteAddr returns the session transport's remote address.
RemoteAddr() net.Addr
// LocalAddr returns the session transport's local address.
LocalAddr() net.Addr
}
// Session multiplexes many Streams over a single underlying stream transport.
// Both sides of a muxado session can open new Streams. Sessions can also accept
// new streams from the remote side.
//
// A muxado Session implements the net.Listener interface, returning new Streams from the remote side.
type Session interface {
// Open initiates a new stream on the session. It is equivalent to OpenStream(0, 0, false)
Open() (Stream, error)
// OpenStream initiates a new stream on the session. A caller can specify a stream's priority and an opaque stream type.
// Setting fin to true will cause the stream to be half-closed from the local side immediately upon creation.
OpenStream(priority StreamPriority, streamType StreamType, fin bool) (Stream, error)
// Accept returns the next stream initiated by the remote side
Accept() (Stream, error)
// Kill closes the underlying transport stream immediately.
//
// You SHOULD always prefer to call Close() instead so that the connection
// closes cleanly by sending a GoAway frame.
Kill() error
// Close instructs the session to close cleanly, sending a GoAway frame if one hasn't already been sent.
//
// This implementation does not "linger". Pending writes on streams may fail.
//
// You MAY call Close() more than once. Each time after
// the first, Close() will return an error.
Close() error
// GoAway instructs the other side of the connection to stop
// initiating new streams by sending a GoAway frame. Most clients
// will just call Close(), but you may want explicit control of this
// in order to facilitate clean shutdowns.
//
// You MAY call GoAway() more than once. Each time after the first,
// GoAway() will return an error.
GoAway(ErrorCode, []byte) error
// LocalAddr returns the local address of the transport stream over which the session is running.
LocalAddr() net.Addr
// RemoteAddr returns the address of the remote side of the transport stream over which the session is running.
RemoteAddr() net.Addr
// Wait blocks until the session has shutdown and returns the error code for session termination. It also
// returns the error that caused the session to terminate as well as any debug information sent in the GoAway
// frame by the remote side.
Wait() (code ErrorCode, err error, debug []byte)
// NetListener returns an adaptor object which allows this Session to be used as a net.Listener. The returned
// net.Listener returns new streams initiated by the remote side as net.Conn's when calling Accept().
NetListener() net.Listener
// NetDial is a function that implements the same API as net.Dial and can be used in place of it. Users should keep
// in mind that it is the same as a call to Open(). It ignores both arguments passed to it and always initiates a new stream
// to the remote side.
NetDial(_, _ string) (net.Conn, error)
}

View file

@@ -1,87 +0,0 @@
package buffer
import (
"errors"
"io"
)
var (
FullError = errors.New("Buffer is full")
)
// readInto reads from rd into p until p is full or an error occurs
func readInto(rd io.Reader, p []byte) (n int, err error) {
var nr int
for n < len(p) {
nr, err = rd.Read(p[n:])
n += nr
if err != nil {
return
}
}
return
}
// A circular buffer on top of a byte-array
// NOTE: It does not implement the Write() method, it implements ReadFrom()
// to avoid copies
type Circular struct {
buf []byte // the bytes
size int // == len(buf)
head int // index of the next byte to read
tail int // index of the last byte available to read
}
// Returns a new circular buffer of the given size
func NewCircular(size int) *Circular {
return &Circular{
buf: make([]byte, size+1),
size: size + 1,
}
}
// Copy data from the given reader into the buffer
// Any errors encountered while reading are returned EXCEPT io.EOF.
// If the reader fills the buffer, it returns buffer.FullError
func (c *Circular) ReadFrom(rd io.Reader) (n int, err error) {
// IF:
// [---H+++T--]
if c.tail >= c.head {
n, err = readInto(rd, c.buf[c.tail:])
c.tail = (c.tail + n) % c.size
if err == io.EOF {
return n, nil
} else if err != nil {
return
}
}
// NOW:
// [T---H++++] or [++T--H+++]
n2, err := readInto(rd, c.buf[c.tail:c.head])
n += n2
c.tail += n2
if err == nil {
err = FullError
} else if err == io.EOF {
err = nil
}
return
}
// Read data out of the buffer. This never fails but may
// return n==0 if there is no data to be read
func (c *Circular) Read(p []byte) (n int, err error) {
if c.head > c.tail {
n = copy(p, c.buf[c.head:])
c.head = (c.head + n) % c.size
if c.head != 0 {
return
}
}
n2 := copy(p[n:], c.buf[c.head:c.tail])
n += n2
c.head += n2
return
}
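ReadFrom on the circular buffer swallows io.EOF and reports FullError only when the incoming data overruns the window; Read never fails and may return zero bytes. A small illustration, assuming the deleted package were still importable:

package main

import (
    "fmt"
    "strings"

    "github.com/inconshreveable/muxado/proto/buffer" // assumed path; package is removed in this commit
)

func main() {
    c := buffer.NewCircular(8) // 8 usable bytes of window

    // io.EOF from the reader is swallowed; the data is simply buffered.
    n, err := c.ReadFrom(strings.NewReader("hello"))
    fmt.Println(n, err) // 5 <nil>

    // Drain it back out.
    p := make([]byte, 8)
    n, _ = c.Read(p)
    fmt.Println(string(p[:n])) // hello

    // Overrun the 8-byte window: ReadFrom reports buffer.FullError.
    _, err = c.ReadFrom(strings.NewReader("0123456789"))
    fmt.Println(err == buffer.FullError) // true
}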

View file

@@ -1,160 +0,0 @@
package buffer
import (
"errors"
"io"
"io/ioutil"
"sync"
"time"
)
var (
AlreadyClosed = errors.New("Buffer already closed")
)
// A specialized concurrent circular buffer intended to buffer a stream's inbound data with the following properties:
// - Minimizes copies by skipping the buffer if a write occurs while a reader is waiting
// - Provides a mechanism to time out reads after a deadline
// - Provides a mechanism to set an 'error' that will fail reads when the buffer is empty
type waitingReader struct {
buf []byte
n int
}
type Inbound struct {
*Circular
*sync.Cond
err error
waitingReader
deadline time.Time
timer *time.Timer
}
func NewInbound(size int) *Inbound {
return &Inbound{
Circular: NewCircular(size),
Cond: sync.NewCond(new(sync.Mutex)),
}
}
func (b *Inbound) SetDeadline(t time.Time) {
b.L.Lock()
// set the deadline
b.deadline = t
// how long until the deadline
delay := t.Sub(time.Now())
if b.timer != nil {
b.timer.Stop()
}
// after the delay, wake up waiters
b.timer = time.AfterFunc(delay, func() {
b.Broadcast()
})
b.L.Unlock()
}
func (b *Inbound) SetError(err error) {
b.L.Lock()
b.err = err
b.Broadcast()
b.L.Unlock()
}
func (b *Inbound) GetError() (err error) {
b.L.Lock()
err = b.err
b.L.Unlock()
return
}
func (b *Inbound) ReadFrom(rd io.Reader) (n int, err error) {
b.L.Lock()
if b.err != nil {
b.L.Unlock()
if _, err = ioutil.ReadAll(rd); err != nil {
return
}
return 0, AlreadyClosed
}
// write directly to a reader's buffer, if possible
if b.waitingReader.buf != nil {
b.waitingReader.n, err = readInto(rd, b.waitingReader.buf)
n += b.waitingReader.n
b.waitingReader.buf = nil
if err != nil {
if err == io.EOF {
// EOF is not an error
err = nil
}
b.Broadcast()
b.L.Unlock()
return
}
}
// write the rest to buffer
var writeN int
writeN, err = b.Circular.ReadFrom(rd)
n += writeN
b.Broadcast()
b.L.Unlock()
return
}
func (b *Inbound) Read(p []byte) (n int, err error) {
b.L.Lock()
var wait *waitingReader
for {
// we got a direct write to our buffer
if wait != nil && wait.n != 0 {
n = wait.n
break
}
// check for timeout
if !b.deadline.IsZero() {
if time.Now().After(b.deadline) {
err = errors.New("Read timeout")
break
}
}
// try to read from the buffer
n, _ = b.Circular.Read(p)
// successfully read some data
if n != 0 {
break
}
// there's an error
if b.err != nil {
err = b.err
break
}
// register for a direct write
if b.waitingReader.buf == nil {
wait = &b.waitingReader
wait.buf = p
wait.n = 0
}
// no data, wait
b.Wait()
}
b.L.Unlock()
return
}
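Inbound layers a condition variable, a read deadline, and a sticky error on top of Circular, so a reader blocks until data arrives (written straight into its buffer when possible) or the error is set. A sketch of that handoff under the same import-path assumption:

package main

import (
    "fmt"
    "io"
    "strings"

    "github.com/inconshreveable/muxado/proto/buffer" // assumed path; package is removed in this commit
)

func main() {
    in := buffer.NewInbound(64)

    // Simulate the session delivering one DATA frame's payload.
    go in.ReadFrom(strings.NewReader("frame payload"))

    p := make([]byte, 64)
    n, err := in.Read(p) // blocks until the payload arrives
    fmt.Println(string(p[:n]), err) // frame payload <nil>

    // Simulate a FIN: once the buffer is drained, further reads return io.EOF.
    in.SetError(io.EOF)
    _, err = in.Read(p)
    fmt.Println(err) // EOF
}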

View file

@@ -1,59 +0,0 @@
package buffer
import (
"sync"
)
type Outbound struct {
val int
err error
*sync.Cond
}
func NewOutbound(size int) *Outbound {
return &Outbound{val: size, Cond: sync.NewCond(new(sync.Mutex))}
}
func (b *Outbound) Increment(inc int) {
b.L.Lock()
b.val += inc
b.Broadcast()
b.L.Unlock()
}
func (b *Outbound) SetError(err error) {
b.L.Lock()
b.err = err
b.Broadcast()
b.L.Unlock()
}
func (b *Outbound) Decrement(dec int) (ret int, err error) {
if dec == 0 {
return
}
b.L.Lock()
for {
if b.err != nil {
err = b.err
break
}
if b.val > 0 {
if dec > b.val {
ret = b.val
b.val = 0
break
} else {
b.val -= dec
ret = dec
break
}
} else {
b.Wait()
}
}
b.L.Unlock()
return
}
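Outbound is the send-window counter: Decrement grants up to the requested number of bytes and blocks at zero, while Increment tops the window back up when a WND_INC frame arrives. For example, under the same assumption:

package main

import (
    "fmt"

    "github.com/inconshreveable/muxado/proto/buffer" // assumed path; package is removed in this commit
)

func main() {
    // Pretend the remote advertised a 16-byte send window.
    out := buffer.NewOutbound(16)

    n, _ := out.Decrement(10) // writer asks for 10 bytes: full amount granted
    fmt.Println(n)            // 10

    n, _ = out.Decrement(10) // only 6 bytes of window remain
    fmt.Println(n)           // 6

    out.Increment(4) // a WND_INC frame arrives, topping the window back up
    n, _ = out.Decrement(3)
    fmt.Println(n) // 3
}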

View file

@@ -1,10 +0,0 @@
package buffer
func BothClosed(in *Inbound, out *Outbound) (closed bool) {
in.L.Lock()
out.L.Lock()
closed = (in.err != nil && out.err != nil)
out.L.Unlock()
in.L.Unlock()
return
}

View file

@@ -1,9 +0,0 @@
package ext
import (
"github.com/inconshreveable/muxado/proto"
)
const (
heartbeatExtensionType = proto.MinExtensionType + iota
)

View file

@@ -1,125 +0,0 @@
package ext
// XXX: There's no logging around heartbeats - how can we do this in a way that is useful
// as a library?
//
// XXX: When we close the session because of a lost heartbeat or because of an error in the
// heartbeating, there is no way to tell that; a Session will just appear to stop working
import (
"encoding/binary"
proto "github.com/inconshreveable/muxado/proto"
"github.com/inconshreveable/muxado/proto/frame"
"io"
"time"
)
const (
defaultHeartbeatInterval = 3 * time.Second
defaultHeartbeatTolerance = 10 * time.Second
)
type Heartbeat struct {
sess proto.ISession
accept proto.ExtAccept
mark chan int
interval time.Duration
tolerance time.Duration
}
func NewDefaultHeartbeat() *Heartbeat {
return NewHeartbeat(defaultHeartbeatInterval, defaultHeartbeatTolerance)
}
func NewHeartbeat(interval, tolerance time.Duration) *Heartbeat {
return &Heartbeat{
mark: make(chan int),
interval: interval,
tolerance: tolerance,
}
}
func (h *Heartbeat) Start(sess proto.ISession, accept proto.ExtAccept) frame.StreamType {
h.sess = sess
h.accept = accept
go h.respond()
go h.request()
go h.check()
return heartbeatExtensionType
}
func (h *Heartbeat) check() {
t := time.NewTimer(h.interval + h.tolerance)
for {
select {
case <-t.C:
// time out waiting for a response!
h.sess.Close()
return
case <-h.mark:
t.Reset(h.interval + h.tolerance)
}
}
}
func (h *Heartbeat) respond() {
// close the session on any errors
defer h.sess.Close()
stream, err := h.accept()
if err != nil {
return
}
// read the next heartbeat id and respond
buf := make([]byte, 4)
for {
_, err := io.ReadFull(stream, buf)
if err != nil {
return
}
_, err = stream.Write(buf)
if err != nil {
return
}
}
}
func (h *Heartbeat) request() {
// close the session on any errors
defer h.sess.Close()
// request highest possible priority for heartbeats
priority := frame.StreamPriority(0x7FFFFFFF)
stream, err := h.sess.OpenStream(priority, heartbeatExtensionType, false)
if err != nil {
return
}
// send heartbeats and then check that we got them back
var id uint32
for {
time.Sleep(h.interval)
if err := binary.Write(stream, binary.BigEndian, id); err != nil {
return
}
var respId uint32
if err := binary.Read(stream, binary.BigEndian, &respId); err != nil {
return
}
if id != respId {
return
}
// record the time
h.mark <- 1
}
}
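The heartbeat is wired into a session as an Extension; client.go above does this via ext.NewDefaultHeartbeat() (3-second interval, 10-second tolerance, per the constants above). A sketch of the same wiring with custom timing, assuming the deleted import paths; the dial address is a placeholder:

package main

import (
    "log"
    "net"
    "time"

    // assumed import paths; these vendored packages are removed in this commit
    "github.com/inconshreveable/muxado/proto"
    "github.com/inconshreveable/muxado/proto/ext"
)

func main() {
    conn, err := net.Dial("tcp", "example.com:1234") // placeholder address
    if err != nil {
        log.Fatal(err)
    }

    // Ping every 5s and close the session after 15s of silence.
    hb := ext.NewHeartbeat(5*time.Second, 15*time.Second)
    sess := proto.NewSession(conn, proto.NewStream, true, []proto.Extension{hb})
    defer sess.Close()

    // ... open streams with sess.Open() / sess.OpenStream() as usual ...
}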

View file

@@ -1,68 +0,0 @@
package frame
import (
"io"
)
const (
// data frames are actually longer, but they are variable length
dataFrameSize = headerSize
)
type RStreamData struct {
Header
fixed [dataFrameSize]byte
toRead io.LimitedReader // when reading, the underlying connection's io.Reader is handed up
}
func (f *RStreamData) Reader() io.Reader {
return &f.toRead
}
func (f *RStreamData) readFrom(d deserializer) (err error) {
// not using io.LimitReader to avoid a heap memory allocation in the hot path
f.toRead.R = d
f.toRead.N = int64(f.Length())
return
}
// WStreamData is a StreamData frame that you can write
// It delivers opaque data on a stream to the application layer
type WStreamData struct {
Header
fixed [dataFrameSize]byte
toWrite []byte // when writing, you just pass a byte slice to write
}
func (f *WStreamData) writeTo(s serializer) (err error) {
if _, err = s.Write(f.fixed[:]); err != nil {
return err
}
if _, err = s.Write(f.toWrite); err != nil {
return err
}
return
}
func (f *WStreamData) Set(streamId StreamId, data []byte, fin bool) (err error) {
var flags flagsType
if fin {
flags.Set(flagFin)
}
if err = f.Header.SetAll(TypeStreamData, len(data), streamId, flags); err != nil {
return
}
f.toWrite = data
return
}
func NewWStreamData() (f *WStreamData) {
f = new(WStreamData)
f.Header = f.fixed[:headerSize]
return
}

View file

@@ -1,37 +0,0 @@
package frame
import "fmt"
import "io"
type DebugTransport struct {
prefix string
*BasicTransport
}
func (t *DebugTransport) Write(buf []byte) (int, error) {
fmt.Printf("%v writes %d bytes: %x\n", t.prefix, len(buf), buf)
return t.BasicTransport.Write(buf)
}
func (t *DebugTransport) WriteFrame(frame WFrame) (err error) {
// each frame knows how to write itself to the framer
return frame.writeTo(t)
}
func (t *DebugTransport) ReadFrame() (f RFrame, err error) {
f, err = t.BasicTransport.ReadFrame()
fmt.Printf("%v reads Header length: %v\n", t.prefix, t.Header.Length())
fmt.Printf("%v reads Header type: %v\n", t.prefix, t.Header.Type())
fmt.Printf("%v reads Header stream id: %v\n", t.prefix, t.Header.StreamId())
fmt.Printf("%v reads Header fin: %v\n", t.prefix, t.Header.Fin())
return
}
func NewDebugTransport(rwc io.ReadWriteCloser, prefix string) *DebugTransport {
trans := &DebugTransport{
prefix: prefix,
BasicTransport: &BasicTransport{ReadWriteCloser: rwc, Header: make([]byte, headerSize)},
}
return trans
}

View file

@@ -1,25 +0,0 @@
package frame
import (
"fmt"
)
const (
NoError = iota
ProtocolError
InternalError
FlowControlError
StreamClosed
FrameSizeError
RefusedStream
Cancel
NoSuchError
)
type FramingError struct {
error
}
func protoError(fmtstr string, args ...interface{}) FramingError {
return FramingError{fmt.Errorf(fmtstr, args...)}
}

View file

@@ -1,79 +0,0 @@
package frame
import "io"
const (
goAwayBodySize = 8
goAwayFrameSize = headerSize + goAwayBodySize
)
// Instruct the remote side not to initiate new streams
type RGoAway struct {
Header
body [goAwayBodySize]byte
debug []byte
}
func (f *RGoAway) LastStreamId() StreamId {
return StreamId(order.Uint32(f.body[0:]) & streamMask)
}
func (f *RGoAway) ErrorCode() ErrorCode {
return ErrorCode(order.Uint32(f.body[4:]))
}
func (f *RGoAway) Debug() []byte {
return f.debug
}
func (f *RGoAway) readFrom(d deserializer) (err error) {
if _, err = io.ReadFull(d, f.body[:]); err != nil {
return
}
f.debug = make([]byte, f.Length()-goAwayBodySize)
if _, err = io.ReadFull(d, f.debug); err != nil {
return
}
return
}
type WGoAway struct {
Header
data [goAwayFrameSize]byte
debug []byte
}
func (f *WGoAway) writeTo(s serializer) (err error) {
if _, err = s.Write(f.data[:]); err != nil {
return
}
if _, err = s.Write(f.debug); err != nil {
return
}
return
}
func (f *WGoAway) Set(lastStreamId StreamId, errorCode ErrorCode, debug []byte) (err error) {
if err = f.Header.SetAll(TypeGoAway, goAwayBodySize+len(debug), 0, 0); err != nil {
return
}
if lastStreamId > streamMask {
err = protoError("Related stream id %d is out of range", lastStreamId)
return
}
order.PutUint32(f.data[headerSize:], uint32(lastStreamId))
order.PutUint32(f.data[headerSize+4:], uint32(errorCode))
return
}
func NewWGoAway() (f *WGoAway) {
f = new(WGoAway)
f.Header = Header(f.data[:headerSize])
return
}

View file

@@ -1,85 +0,0 @@
package frame
import "io"
const (
headerSize = 8
)
type Header []byte
func newHeader() Header {
return Header(make([]byte, headerSize))
}
func (b Header) readFrom(d deserializer) (err error) {
// read the header
if _, err = io.ReadFull(d, []byte(b)); err != nil {
return err
}
return
}
func (b Header) Length() uint16 {
return order.Uint16(b[:2]) & lengthMask
}
func (b Header) SetLength(length int) (err error) {
if length > lengthMask || length < 0 {
return protoError("Frame length %d out of range", length)
}
order.PutUint16(b[:2], uint16(length))
return
}
func (b Header) Type() FrameType {
return FrameType((b[3]) & typeMask)
}
func (b Header) SetType(t FrameType) (err error) {
b[3] = byte(t & typeMask)
return
}
func (b Header) StreamId() StreamId {
return StreamId(order.Uint32(b[4:]) & streamMask)
}
func (b Header) SetStreamId(streamId StreamId) (err error) {
if streamId > streamMask {
return protoError("Stream id %d out of range", streamId)
}
order.PutUint32(b[4:], uint32(streamId))
return
}
func (b Header) Flags() flagsType {
return flagsType(b[2])
}
func (b Header) SetFlags(fl flagsType) (err error) {
b[2] = byte(fl)
return
}
func (b Header) Fin() bool {
return b.Flags().IsSet(flagFin)
}
func (b Header) SetAll(ftype FrameType, length int, streamId StreamId, flags flagsType) (err error) {
if err = b.SetType(ftype); err != nil {
return
}
if err = b.SetLength(length); err != nil {
return
}
if err = b.SetStreamId(streamId); err != nil {
return
}
if err = b.SetFlags(flags); err != nil {
return
}
return
}

View file

@@ -1,25 +0,0 @@
package frame
import (
"io"
)
type Transport interface {
WriteFrame(WFrame) error
ReadFrame() (RFrame, error)
Close() error
}
// A frame can read and write itself to a serializer/deserializer
type RFrame interface {
StreamId() StreamId
Type() FrameType
readFrom(deserializer) error
}
type WFrame interface {
writeTo(serializer) error
}
type deserializer io.Reader
type serializer io.Writer

View file

@@ -1,67 +0,0 @@
package frame
import "io"
const (
rstBodySize = 4
rstFrameSize = headerSize + rstBodySize
)
// RStreamRst is a STREAM_RST frame that is read from a transport
type RStreamRst struct {
Header
body [rstBodySize]byte
}
func (f *RStreamRst) readFrom(d deserializer) (err error) {
if f.Length() != rstBodySize {
return protoError("STREAM_RST length must be %d, got %d", rstBodySize, f.Length())
}
if _, err = io.ReadFull(d, f.body[:]); err != nil {
return
}
return
}
func (f *RStreamRst) ErrorCode() ErrorCode {
return ErrorCode(order.Uint32(f.body[0:]))
}
// WStreamRst is a STREAM_RST frame that can be written; it terminates a stream ungracefully
type WStreamRst struct {
Header
all [rstFrameSize]byte
}
func NewWStreamRst() (f *WStreamRst) {
f = new(WStreamRst)
f.Header = Header(f.all[:headerSize])
return
}
func (f *WStreamRst) writeTo(s serializer) (err error) {
_, err = s.Write(f.all[:])
return
}
func (f *WStreamRst) Set(streamId StreamId, errorCode ErrorCode) (err error) {
if err = f.Header.SetAll(TypeStreamRst, rstBodySize, streamId, 0); err != nil {
return
}
if err = validRstErrorCode(errorCode); err != nil {
return
}
order.PutUint32(f.all[headerSize:], uint32(errorCode))
return
}
func validRstErrorCode(errorCode ErrorCode) error {
if errorCode >= NoSuchError {
return protoError("Invalid error code %d for STREAM_RST", errorCode)
}
return nil
}

View file

@@ -1,120 +0,0 @@
package frame
import (
"fmt"
"io"
)
const (
maxSynBodySize = 8
maxSynFrameSize = headerSize + maxSynBodySize
)
type RStreamSyn struct {
Header
body [maxSynBodySize]byte
streamPriority StreamPriority
streamType StreamType
}
// StreamType returns the stream's defined type as specified by
// the remote endpoint
func (f *RStreamSyn) StreamType() StreamType {
return f.streamType
}
// StreamPriority returns the stream priority set on this frame
func (f *RStreamSyn) StreamPriority() StreamPriority {
return f.streamPriority
}
func (f *RStreamSyn) parseFields() error {
var length uint16 = 0
flags := f.Flags()
if flags.IsSet(flagStreamPriority) {
f.streamPriority = StreamPriority(order.Uint32(f.body[length : length+4]))
length += 4
} else {
f.streamPriority = 0
}
if flags.IsSet(flagStreamType) {
f.streamType = StreamType(order.Uint32(f.body[length : length+4]))
length += 4
} else {
f.streamType = 0
}
if length != f.Length() {
return fmt.Errorf("Expected length %d for flags %v, but got %v", length, flags, f.Length())
}
return nil
}
func (f *RStreamSyn) readFrom(d deserializer) (err error) {
if _, err = io.ReadFull(d, f.body[:f.Length()]); err != nil {
return
}
if err = f.parseFields(); err != nil {
return
}
return
}
type WStreamSyn struct {
Header
data [maxSynFrameSize]byte
length int
}
func (f *WStreamSyn) writeTo(s serializer) (err error) {
_, err = s.Write(f.data[:headerSize+f.Length()])
return
}
func (f *WStreamSyn) Set(streamId StreamId, streamPriority StreamPriority, streamType StreamType, fin bool) (err error) {
var (
flags flagsType
length int = 0
)
// set fin bit
if fin {
flags.Set(flagFin)
}
if streamPriority != 0 {
if streamPriority > priorityMask {
err = protoError("Priority %d is out of range", streamPriority)
return
}
flags.Set(flagStreamPriority)
start := headerSize + length
order.PutUint32(f.data[start:start+4], uint32(streamPriority))
length += 4
}
if streamType != 0 {
flags.Set(flagStreamType)
start := headerSize + length
order.PutUint32(f.data[start:start+4], uint32(streamType))
length += 4
}
// make the frame
if err = f.Header.SetAll(TypeStreamSyn, length, streamId, flags); err != nil {
return
}
return
}
func NewWStreamSyn() (f *WStreamSyn) {
f = new(WStreamSyn)
f.Header = Header(f.data[:headerSize])
return
}

View file

@@ -1,79 +0,0 @@
package frame
import (
"encoding/binary"
"io"
)
var (
order = binary.BigEndian
)
// BasicTransport can serialize/deserialize frames on an underlying
// net.Conn to implement the muxado protocol.
type BasicTransport struct {
io.ReadWriteCloser
Header
RStreamSyn
RStreamRst
RStreamData
RStreamWndInc
RGoAway
}
// WriteFrame writes the given frame to the underlying transport
func (t *BasicTransport) WriteFrame(frame WFrame) (err error) {
// each frame knows how to write itself to the framer
err = frame.writeTo(t)
return
}
// ReadFrame reads the next frame from the underlying transport
func (t *BasicTransport) ReadFrame() (f RFrame, err error) {
// read the header
if _, err = io.ReadFull(t, []byte(t.Header)); err != nil {
return nil, err
}
switch t.Header.Type() {
case TypeStreamSyn:
frame := &t.RStreamSyn
frame.Header = t.Header
err = frame.readFrom(t)
return frame, err
case TypeStreamRst:
frame := &t.RStreamRst
frame.Header = t.Header
err = frame.readFrom(t)
return frame, err
case TypeStreamData:
frame := &t.RStreamData
frame.Header = t.Header
err = frame.readFrom(t)
return frame, err
case TypeStreamWndInc:
frame := &t.RStreamWndInc
frame.Header = t.Header
err = frame.readFrom(t)
return frame, err
case TypeGoAway:
frame := &t.RGoAway
frame.Header = t.Header
err = frame.readFrom(t)
return frame, err
default:
return nil, protoError("Illegal frame type: %d", t.Header.Type())
}
return
}
func NewBasicTransport(rwc io.ReadWriteCloser) *BasicTransport {
trans := &BasicTransport{ReadWriteCloser: rwc, Header: make([]byte, headerSize)}
return trans
}
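BasicTransport frames and deframes an io.ReadWriteCloser; note that the payload of a DATA frame is not consumed by ReadFrame but read afterwards through the frame's limited Reader(). A small round-trip over net.Pipe, assuming the deleted package were still importable:

package main

import (
    "fmt"
    "io/ioutil"
    "net"

    "github.com/inconshreveable/muxado/proto/frame" // assumed path; package is removed in this commit
)

func main() {
    a, b := net.Pipe()
    writer := frame.NewBasicTransport(a)
    reader := frame.NewBasicTransport(b)

    // Send a 4-byte DATA frame on stream 1.
    go func() {
        wf := frame.NewWStreamData()
        if err := wf.Set(1, []byte("ping"), false); err != nil {
            panic(err)
        }
        if err := writer.WriteFrame(wf); err != nil {
            panic(err)
        }
    }()

    // Read the header, then drain the payload via the frame's limited reader.
    rf, err := reader.ReadFrame()
    if err != nil {
        panic(err)
    }
    if d, ok := rf.(*frame.RStreamData); ok {
        payload, _ := ioutil.ReadAll(d.Reader())
        fmt.Printf("stream %d: %s\n", d.StreamId(), payload) // stream 1: ping
    }
}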

View file

@@ -1,61 +0,0 @@
package frame
const (
// offsets for packing/unpacking frames
lengthOffset = 32 + 16
flagsOffset = 32 + 8
typeOffset = 32 + 3
// masks for packing/unpacking frames
lengthMask = 0x3FFF
streamMask = 0x7FFFFFFF
flagsMask = 0xFF
typeMask = 0x1F
wndIncMask = 0x7FFFFFFF
priorityMask = 0x7FFFFFFF
)
// a FrameType is a 5-bit integer in the frame header that identifies the type of frame
type FrameType uint8
const (
TypeStreamSyn = 0x1
TypeStreamRst = 0x2
TypeStreamData = 0x3
TypeStreamWndInc = 0x4
TypeStreamPri = 0x5
TypeGoAway = 0x6
)
// a flagsType is an 8-bit integer containing frame-specific flag bits in the frame header
type flagsType uint8
const (
flagFin = 0x1
flagStreamPriority = 0x2
flagStreamType = 0x4
)
func (ft flagsType) IsSet(f flagsType) bool {
return (ft & f) != 0
}
func (ft *flagsType) Set(f flagsType) {
*ft |= f
}
func (ft *flagsType) Unset(f flagsType) {
*ft = *ft &^ f
}
// StreamId is a 31-bit integer uniquely identifying a stream within a session
type StreamId uint32
// StreamPriority is a 31-bit integer specifying a stream's priority
type StreamPriority uint32
// StreamType is a 32-bit integer specifying a stream's type
type StreamType uint32
// ErrorCode is a 32-bit integer indicating an error condition included in rst/goaway frames
type ErrorCode uint32
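For reference, the 8-byte header these masks describe carries a 14-bit big-endian length in bytes 0-1, the flags byte at offset 2, the 5-bit type at offset 3, and a 31-bit stream id in bytes 4-7 (see header.go above). A standalone re-packing of that layout, independent of the package's unexported helpers:

package main

import (
    "encoding/binary"
    "fmt"
)

// packHeader rebuilds the 8-byte muxado frame header layout described above:
// 14-bit length (bytes 0-1), flags (byte 2), 5-bit type (byte 3),
// 31-bit stream id (bytes 4-7), all big-endian.
func packHeader(frameType byte, length uint16, streamID uint32, flags byte) [8]byte {
    var h [8]byte
    binary.BigEndian.PutUint16(h[0:2], length&0x3FFF)       // lengthMask
    h[2] = flags                                            // e.g. 0x1 == FIN
    h[3] = frameType & 0x1F                                 // typeMask
    binary.BigEndian.PutUint32(h[4:8], streamID&0x7FFFFFFF) // streamMask
    return h
}

func main() {
    // A 5-byte STREAM_DATA (type 0x3) frame on stream 3 with FIN set.
    h := packHeader(0x3, 5, 3, 0x1)
    fmt.Printf("% x\n", h) // 00 05 01 03 00 00 00 03
}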

View file

@@ -1,57 +0,0 @@
package frame
import "io"
const (
wndIncBodySize = 4
wndIncFrameSize = headerSize + wndIncBodySize
)
// Increase a stream's flow control window size
type RStreamWndInc struct {
Header
body [wndIncBodySize]byte
}
func (f *RStreamWndInc) WindowIncrement() (inc uint32) {
return order.Uint32(f.body[:]) & wndIncMask
}
func (f *RStreamWndInc) readFrom(d deserializer) (err error) {
if f.Length() != wndIncBodySize {
return protoError("WND_INC length must be %d, got %d", wndIncBodySize, f.Length())
}
_, err = io.ReadFull(d, f.body[:])
return
}
type WStreamWndInc struct {
Header
data [wndIncFrameSize]byte
}
func (f *WStreamWndInc) writeTo(s serializer) (err error) {
_, err = s.Write(f.data[:])
return
}
func (f *WStreamWndInc) Set(streamId StreamId, inc uint32) (err error) {
if inc > wndIncMask {
return protoError("Window increment %d out of range", inc)
}
order.PutUint32(f.data[headerSize:], inc)
if err = f.Header.SetAll(TypeStreamWndInc, wndIncBodySize, streamId, 0); err != nil {
return
}
return
}
func NewWStreamWndInc() (f *WStreamWndInc) {
f = new(WStreamWndInc)
f.Header = Header(f.data[:headerSize])
return
}

View file

@@ -1,36 +0,0 @@
package proto
import (
"github.com/inconshreveable/muxado/proto/frame"
"net"
"time"
)
type IStream interface {
Write([]byte) (int, error)
Read([]byte) (int, error)
Close() error
SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
HalfClose([]byte) (int, error)
Id() frame.StreamId
StreamType() frame.StreamType
Session() ISession
RemoteAddr() net.Addr
LocalAddr() net.Addr
}
type ISession interface {
Open() (IStream, error)
OpenStream(frame.StreamPriority, frame.StreamType, bool) (IStream, error)
Accept() (IStream, error)
Kill() error
GoAway(frame.ErrorCode, []byte) error
LocalAddr() net.Addr
RemoteAddr() net.Addr
Close() error
Wait() (frame.ErrorCode, error, []byte)
NetListener() net.Listener
NetDial(_, _ string) (net.Conn, error)
}

View file

@@ -1,474 +0,0 @@
package proto
import (
"fmt"
"github.com/inconshreveable/muxado/proto/frame"
"io"
"net"
"reflect"
"sync"
"sync/atomic"
"time"
)
const (
defaultWindowSize = 0x10000 // 64KB
defaultAcceptQueueDepth = 100
MinExtensionType = 0xFFFFFFFF - 0x100 // 512 extensions
)
// private interface for Sessions to call Streams
type stream interface {
IStream
handleStreamData(*frame.RStreamData)
handleStreamWndInc(*frame.RStreamWndInc)
handleStreamRst(*frame.RStreamRst)
closeWith(error)
}
// for extensions
type ExtAccept func() (IStream, error)
type Extension interface {
Start(ISession, ExtAccept) frame.StreamType
}
type deadReason struct {
errorCode frame.ErrorCode
err error
remoteDebug []byte
}
// factory function that creates new streams
type streamFactory func(id frame.StreamId, priority frame.StreamPriority, streamType frame.StreamType, finLocal bool, finRemote bool, windowSize uint32, sess session) stream
// checks the parity of a stream id (local vs remote, client vs server)
type parityFn func(frame.StreamId) bool
// state for each half of the session (remote and local)
type halfState struct {
goneAway int32 // true if that half of the stream has gone away
lastId uint32 // last id used/seen from one half of the session
}
// Session implements a simple streaming session manager. It has the following characteristics:
//
// - When closing the Session, it does not linger; all pending write operations will fail immediately.
// - It completely ignores stream priority when processing and writing frames
// - It offers no customization of settings like window size/ping time
type Session struct {
conn net.Conn // connection the transport is running over
transport frame.Transport // transport
streams StreamMap // all active streams
local halfState // client state
remote halfState // server state
syn *frame.WStreamSyn // STREAM_SYN frame for opens
wr sync.Mutex // synchronization when writing frames
accept chan stream // new streams opened by the remote
diebit int32 // true if we're dying
remoteDebug []byte // debugging data sent in the remote's GoAway frame
defaultWindowSize uint32 // window size when creating new streams
newStream streamFactory // factory function to make new streams
dead chan deadReason // carries the reason the session died
isLocal parityFn // determines if a stream id is local or remote
exts map[frame.StreamType]chan stream // map of extension stream type -> accept channel for the extension
}
func NewSession(conn net.Conn, newStream streamFactory, isClient bool, exts []Extension) ISession {
sess := &Session{
conn: conn,
transport: frame.NewBasicTransport(conn),
streams: NewConcurrentStreamMap(),
local: halfState{lastId: 0},
remote: halfState{lastId: 0},
syn: frame.NewWStreamSyn(),
diebit: 0,
defaultWindowSize: defaultWindowSize,
accept: make(chan stream, defaultAcceptQueueDepth),
newStream: newStream,
dead: make(chan deadReason, 1), // don't block die() if there is no Wait call
exts: make(map[frame.StreamType]chan stream),
}
if isClient {
sess.isLocal = sess.isClient
sess.local.lastId += 1
} else {
sess.isLocal = sess.isServer
sess.remote.lastId += 1
}
for _, ext := range exts {
sess.startExtension(ext)
}
go sess.reader()
return sess
}
////////////////////////////////
// public interface
////////////////////////////////
func (s *Session) Open() (IStream, error) {
return s.OpenStream(0, 0, false)
}
func (s *Session) OpenStream(priority frame.StreamPriority, streamType frame.StreamType, fin bool) (ret IStream, err error) {
// check if the remote has gone away
if atomic.LoadInt32(&s.remote.goneAway) == 1 {
return nil, fmt.Errorf("Failed to create stream, remote has gone away.")
}
// this lock prevents the following race:
// goroutine1 goroutine2
// - inc stream id
// - inc stream id
// - send streamsyn
// - send streamsyn
s.wr.Lock()
// get the next id we can use
nextId := frame.StreamId(atomic.AddUint32(&s.local.lastId, 2))
// make the stream
str := s.newStream(nextId, priority, streamType, fin, false, s.defaultWindowSize, s)
// add to to the stream map
s.streams.Set(nextId, str)
// write the frame
if err = s.syn.Set(nextId, priority, streamType, fin); err != nil {
s.wr.Unlock()
s.die(frame.InternalError, err)
return
}
if err = s.transport.WriteFrame(s.syn); err != nil {
s.wr.Unlock()
s.die(frame.InternalError, err)
return
}
s.wr.Unlock()
return str, nil
}
func (s *Session) Accept() (str IStream, err error) {
var ok bool
if str, ok = <-s.accept; !ok {
return nil, fmt.Errorf("Session closed")
}
return
}
func (s *Session) Kill() error {
return s.transport.Close()
}
func (s *Session) Close() error {
return s.die(frame.NoError, fmt.Errorf("Session Close()"))
}
func (s *Session) GoAway(errorCode frame.ErrorCode, debug []byte) (err error) {
if !atomic.CompareAndSwapInt32(&s.local.goneAway, 0, 1) {
return fmt.Errorf("Already sent GoAway!")
}
s.wr.Lock()
f := frame.NewWGoAway()
remoteId := frame.StreamId(atomic.LoadUint32(&s.remote.lastId))
if err = f.Set(remoteId, errorCode, debug); err != nil {
s.wr.Unlock()
s.die(frame.InternalError, err)
return
}
if err = s.transport.WriteFrame(f); err != nil {
s.wr.Unlock()
s.die(frame.InternalError, err)
return
}
s.wr.Unlock()
return
}
func (s *Session) LocalAddr() net.Addr {
return s.conn.LocalAddr()
}
func (s *Session) RemoteAddr() net.Addr {
return s.conn.RemoteAddr()
}
func (s *Session) Wait() (frame.ErrorCode, error, []byte) {
reason := <-s.dead
return reason.errorCode, reason.err, reason.remoteDebug
}
////////////////////////////////
// private interface for streams
////////////////////////////////
// removeStream removes a stream from this session's stream registry
//
// It does not error if the stream is not present
func (s *Session) removeStream(id frame.StreamId) {
s.streams.Delete(id)
return
}
// writeFrame writes the given frame to the transport and returns the error from the write operation
func (s *Session) writeFrame(f frame.WFrame, dl time.Time) (err error) {
s.wr.Lock()
s.conn.SetWriteDeadline(dl)
err = s.transport.WriteFrame(f)
s.wr.Unlock()
return
}
// die closes the session cleanly with the given error and protocol error code
func (s *Session) die(errorCode frame.ErrorCode, err error) error {
// only one shutdown ever happens
if !atomic.CompareAndSwapInt32(&s.diebit, 0, 1) {
return fmt.Errorf("Shutdown already in progress")
}
// send a go away frame
s.GoAway(errorCode, []byte(err.Error()))
// now we're safe to stop accepting incoming connections
close(s.accept)
// we cleaned up as best as possible, close the transport
s.transport.Close()
// notify all of the streams that we're closing
s.streams.Each(func(id frame.StreamId, str stream) {
str.closeWith(fmt.Errorf("Session closed"))
})
s.dead <- deadReason{errorCode, err, s.remoteDebug}
return nil
}
////////////////////////////////
// internal methods
////////////////////////////////
// reader() reads frames from the underlying transport and passes them to handleFrame
func (s *Session) reader() {
defer s.recoverPanic("reader()")
// close all of the extension accept channels when we're done
// we do this here instead of in die() since otherwise it wouldn't
// be safe to access s.exts
defer func() {
for _, extAccept := range s.exts {
close(extAccept)
}
}()
for {
f, err := s.transport.ReadFrame()
if err != nil {
// if we fail to read a frame, terminate the session
_, ok := err.(*frame.FramingError)
if ok {
s.die(frame.ProtocolError, err)
} else {
s.die(frame.InternalError, err)
}
return
}
s.handleFrame(f)
}
}
func (s *Session) handleFrame(rf frame.RFrame) {
switch f := rf.(type) {
case *frame.RStreamSyn:
// if we're going away, refuse new streams
if atomic.LoadInt32(&s.local.goneAway) == 1 {
rstF := frame.NewWStreamRst()
rstF.Set(f.StreamId(), frame.RefusedStream)
go s.writeFrame(rstF, time.Time{})
return
}
if f.StreamId() <= frame.StreamId(atomic.LoadUint32(&s.remote.lastId)) {
s.die(frame.ProtocolError, fmt.Errorf("Stream id %d is less than last remote id.", f.StreamId()))
return
}
if s.isLocal(f.StreamId()) {
s.die(frame.ProtocolError, fmt.Errorf("Stream id has wrong parity for remote endpoint: %d", f.StreamId()))
return
}
// update last remote id
atomic.StoreUint32(&s.remote.lastId, uint32(f.StreamId()))
// make the new stream
str := s.newStream(f.StreamId(), f.StreamPriority(), f.StreamType(), false, f.Fin(), s.defaultWindowSize, s)
// add it to the stream map
s.streams.Set(f.StreamId(), str)
// check if this is an extension stream
if f.StreamType() >= MinExtensionType {
extAccept, ok := s.exts[f.StreamType()]
if !ok {
// Extension type of stream not registered
fRst := frame.NewWStreamRst()
if err := fRst.Set(f.StreamId(), frame.StreamClosed); err != nil {
s.die(frame.InternalError, err)
}
s.wr.Lock()
defer s.wr.Unlock()
s.transport.WriteFrame(fRst)
} else {
extAccept <- str
}
return
}
// put the new stream on the accept channel
s.accept <- str
case *frame.RStreamData:
if str := s.getStream(f.StreamId()); str != nil {
str.handleStreamData(f)
} else {
// if we get a data frame on a non-existent connection, we still
// need to read out the frame body so that the stream stays in a
// good state. read the payload into a throwaway buffer
discard := make([]byte, f.Length())
io.ReadFull(f.Reader(), discard)
// DATA frames on closed connections are just stream-level errors
fRst := frame.NewWStreamRst()
if err := fRst.Set(f.StreamId(), frame.StreamClosed); err != nil {
s.die(frame.InternalError, err)
}
s.wr.Lock()
defer s.wr.Unlock()
s.transport.WriteFrame(fRst)
return
}
case *frame.RStreamRst:
// delegate to the stream to handle these frames
if str := s.getStream(f.StreamId()); str != nil {
str.handleStreamRst(f)
}
case *frame.RStreamWndInc:
// delegate to the stream to handle these frames
if str := s.getStream(f.StreamId()); str != nil {
str.handleStreamWndInc(f)
}
case *frame.RGoAway:
atomic.StoreInt32(&s.remote.goneAway, 1)
s.remoteDebug = f.Debug()
lastId := f.LastStreamId()
s.streams.Each(func(id frame.StreamId, str stream) {
// close all streams that we opened above the last handled id
if s.isLocal(str.Id()) && str.Id() > lastId {
str.closeWith(fmt.Errorf("Remote is going away"))
}
})
default:
s.die(frame.ProtocolError, fmt.Errorf("Unrecognized frame type: %v", reflect.TypeOf(f)))
return
}
}
func (s *Session) recoverPanic(prefix string) {
if r := recover(); r != nil {
s.die(frame.InternalError, fmt.Errorf("%s panic: %v", prefix, r))
}
}
func (s *Session) getStream(id frame.StreamId) (str stream) {
// decide if this id is in the "idle" state (i.e. greater than any we've seen for that parity)
var lastId *uint32
if s.isLocal(id) {
lastId = &s.local.lastId
} else {
lastId = &s.remote.lastId
}
if uint32(id) > atomic.LoadUint32(lastId) {
s.die(frame.ProtocolError, fmt.Errorf("%d is an invalid, unassigned stream id", id))
}
// find the stream in the stream map
var ok bool
if str, ok = s.streams.Get(id); !ok {
return nil
}
return
}
// check if a stream id is for a client stream. client streams are odd
func (s *Session) isClient(id frame.StreamId) bool {
return uint32(id)&1 == 1
}
func (s *Session) isServer(id frame.StreamId) bool {
return !s.isClient(id)
}
//////////////////////////////////////////////
// session extensions
//////////////////////////////////////////////
func (s *Session) startExtension(ext Extension) {
accept := make(chan stream)
extAccept := func() (IStream, error) {
s, ok := <-accept
if !ok {
return nil, fmt.Errorf("Failed to accept connection, shutting down")
}
return s, nil
}
extType := ext.Start(s, extAccept)
s.exts[extType] = accept
}
//////////////////////////////////////////////
// net adaptors
//////////////////////////////////////////////
func (s *Session) NetDial(_, _ string) (net.Conn, error) {
str, err := s.Open()
return net.Conn(str), err
}
func (s *Session) NetListener() net.Listener {
return &netListenerAdaptor{s}
}
type netListenerAdaptor struct {
*Session
}
func (a *netListenerAdaptor) Addr() net.Addr {
return a.LocalAddr()
}
func (a *netListenerAdaptor) Accept() (net.Conn, error) {
str, err := a.Session.Accept()
return net.Conn(str), err
}
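Since NewSession only needs a net.Conn and a stream factory, two sessions can be joined back-to-back over net.Pipe to exercise the SYN/DATA/FIN flow described above end to end. A sketch with the deleted import path assumed and error handling kept minimal:

package main

import (
    "fmt"
    "io/ioutil"
    "net"

    "github.com/inconshreveable/muxado/proto" // assumed path; package is removed in this commit
)

func main() {
    a, b := net.Pipe()
    client := proto.NewSession(a, proto.NewStream, true, nil)
    server := proto.NewSession(b, proto.NewStream, false, nil)

    // Client opens a stream, writes a payload, and closes it (sends FIN).
    go func() {
        str, err := client.Open()
        if err != nil {
            panic(err)
        }
        str.Write([]byte("hello"))
        str.Close()
    }()

    // Server accepts the stream and reads until the FIN surfaces as io.EOF.
    str, err := server.Accept()
    if err != nil {
        panic(err)
    }
    payload, _ := ioutil.ReadAll(str)
    fmt.Println(string(payload)) // hello

    client.Close()
    server.Close()
}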

View file

@@ -1,319 +0,0 @@
package proto
import (
"fmt"
"github.com/inconshreveable/muxado/proto/buffer"
"github.com/inconshreveable/muxado/proto/frame"
"io"
"net"
"sync"
"sync/atomic"
"time"
)
var (
zeroTime time.Time
resetRemoveDelay = 10 * time.Second
closeError = fmt.Errorf("Stream closed")
)
type Stream struct {
id frame.StreamId // stream id (const)
streamType frame.StreamType // stream type (const)
session session // the parent session (const)
inBuffer *buffer.Inbound // buffer for data coming in from the remote side
outBuffer *buffer.Outbound // manages size of the outbound window
sentRst uint32 // == 1 only if we sent a reset to close this connection
writer sync.Mutex // only one writer at a time
wdata *frame.WStreamData // the frame this stream is currently writing
winc *frame.WStreamWndInc // window increment currently being written
readDeadline time.Time // deadline for reads (protected by buffer mutex)
writeDeadline time.Time // deadline for writes (protected by writer mutex)
}
// private interface for Streams to call Sessions
type session interface {
ISession
writeFrame(frame.WFrame, time.Time) error
die(frame.ErrorCode, error) error
removeStream(frame.StreamId)
}
////////////////////////////////
// public interface
////////////////////////////////
func NewStream(id frame.StreamId, priority frame.StreamPriority, streamType frame.StreamType, finLocal bool, finRemote bool, windowSize uint32, sess session) stream {
str := &Stream{
id: id,
inBuffer: buffer.NewInbound(int(windowSize)),
outBuffer: buffer.NewOutbound(int(windowSize)),
streamType: streamType,
session: sess,
wdata: frame.NewWStreamData(),
winc: frame.NewWStreamWndInc(),
}
if finLocal {
str.inBuffer.SetError(io.EOF)
}
if finRemote {
str.outBuffer.SetError(fmt.Errorf("Stream closed"))
}
return str
}
func (s *Stream) Write(buf []byte) (n int, err error) {
return s.write(buf, false)
}
func (s *Stream) Read(buf []byte) (n int, err error) {
// read from the buffer
n, err = s.inBuffer.Read(buf)
// if we read more than zero, we send a window update
if n > 0 {
errWnd := s.sendWindowUpdate(uint32(n))
if errWnd != nil {
err = errWnd
s.die(frame.InternalError, err)
}
}
return
}
// Close closes the stream in a manner that attempts to emulate a net.Conn's Close():
// - It calls HalfClose() with an empty buffer to half-close the stream on the remote side
// - It calls closeWith() so that all future Read/Write operations will fail
// - If the stream receives another STREAM_DATA frame from the remote side, it will send a STREAM_RST with a CANCELED error code
func (s *Stream) Close() error {
s.HalfClose([]byte{})
s.closeWith(closeError)
return nil
}
func (s *Stream) SetDeadline(deadline time.Time) (err error) {
if err = s.SetReadDeadline(deadline); err != nil {
return
}
if err = s.SetWriteDeadline(deadline); err != nil {
return
}
return
}
func (s *Stream) SetReadDeadline(dl time.Time) error {
s.inBuffer.SetDeadline(dl)
return nil
}
func (s *Stream) SetWriteDeadline(dl time.Time) error {
s.writer.Lock()
s.writeDeadline = dl
s.writer.Unlock()
return nil
}
func (s *Stream) HalfClose(buf []byte) (n int, err error) {
return s.write(buf, true)
}
func (s *Stream) Id() frame.StreamId {
return s.id
}
func (s *Stream) StreamType() frame.StreamType {
return s.streamType
}
func (s *Stream) Session() ISession {
return s.session
}
func (s *Stream) LocalAddr() net.Addr {
return s.session.LocalAddr()
}
func (s *Stream) RemoteAddr() net.Addr {
return s.session.RemoteAddr()
}
/////////////////////////////////////
// session's stream interface
/////////////////////////////////////
func (s *Stream) handleStreamData(f *frame.RStreamData) {
// skip writing for zero-length frames (typically for sending FIN)
if f.Length() > 0 {
// write the data into the buffer
if _, err := s.inBuffer.ReadFrom(f.Reader()); err != nil {
if err == buffer.FullError {
s.resetWith(frame.FlowControlError, fmt.Errorf("Flow control buffer overflowed"))
} else if err == closeError {
// We're trying to emulate net.Conn's Close() behavior where we close our side of the connection,
// and if we get any more frames from the other side, we RST it.
s.resetWith(frame.Cancel, fmt.Errorf("Stream closed"))
} else if err == buffer.AlreadyClosed {
// there was already an error set
s.resetWith(frame.StreamClosed, err)
} else {
// the transport returned some sort of IO error
s.die(frame.ProtocolError, err)
}
return
}
}
if f.Fin() {
s.inBuffer.SetError(io.EOF)
s.maybeRemove()
}
}
func (s *Stream) handleStreamRst(f *frame.RStreamRst) {
s.closeWith(fmt.Errorf("Stream reset by peer with error %d", f.ErrorCode()))
}
func (s *Stream) handleStreamWndInc(f *frame.RStreamWndInc) {
s.outBuffer.Increment(int(f.WindowIncrement()))
}
func (s *Stream) closeWith(err error) {
s.outBuffer.SetError(err)
s.inBuffer.SetError(err)
s.session.removeStream(s.id)
}
////////////////////////////////
// internal methods
////////////////////////////////
func (s *Stream) closeWithAndRemoveLater(err error) {
s.outBuffer.SetError(err)
s.inBuffer.SetError(err)
time.AfterFunc(resetRemoveDelay, func() {
s.session.removeStream(s.id)
})
}
func (s *Stream) maybeRemove() {
if buffer.BothClosed(s.inBuffer, s.outBuffer) {
s.session.removeStream(s.id)
}
}
func (s *Stream) resetWith(errorCode frame.ErrorCode, resetErr error) {
// only ever send one reset
if !atomic.CompareAndSwapUint32(&s.sentRst, 0, 1) {
return
}
// close the stream
s.closeWithAndRemoveLater(resetErr)
// make the reset frame
rst := frame.NewWStreamRst()
if err := rst.Set(s.id, errorCode); err != nil {
s.die(frame.InternalError, err)
}
// need write lock to make sure no data frames get sent after we send the reset
s.writer.Lock()
// send it
if err := s.session.writeFrame(rst, zeroTime); err != nil {
s.writer.Unlock()
s.die(frame.InternalError, err)
}
s.writer.Unlock()
}
func (s *Stream) write(buf []byte, fin bool) (n int, err error) {
// a write call can pass a buffer larger than we can send in a single frame
// only allow one writer at a time to prevent interleaving frames from concurrent writes
s.writer.Lock()
bufSize := len(buf)
bytesRemaining := bufSize
for bytesRemaining > 0 || fin {
// figure out the most we can write in a single frame
writeReqSize := min(0x3FFF, bytesRemaining)
// and then reduce that to however much is available in the window
// this blocks until window is available and may not return all that we asked for
var writeSize int
if writeSize, err = s.outBuffer.Decrement(writeReqSize); err != nil {
s.writer.Unlock()
return
}
// calculate the slice of the buffer we'll write
start, end := n, n+writeSize
// only send fin for the last frame
finBit := fin && end == bufSize
// make the frame
if err = s.wdata.Set(s.id, buf[start:end], finBit); err != nil {
s.writer.Unlock()
s.die(frame.InternalError, err)
return
}
// write the frame
if err = s.session.writeFrame(s.wdata, s.writeDeadline); err != nil {
s.writer.Unlock()
return
}
// update our counts
n += writeSize
bytesRemaining -= writeSize
if finBit {
s.outBuffer.SetError(fmt.Errorf("Stream closed"))
s.maybeRemove()
// handles the case of an empty buffer with fin set
fin = false
}
}
s.writer.Unlock()
return
}
// sendWindowUpdate sends a window increment frame
// with the given increment
func (s *Stream) sendWindowUpdate(inc uint32) (err error) {
// send a window update
if err = s.winc.Set(s.id, inc); err != nil {
return
}
// XXX: write this async? We can only write one at
// a time if we're not allocating new ones from the heap
if err = s.session.writeFrame(s.winc, zeroTime); err != nil {
return
}
return
}
// die is called when a protocol error occurs and the entire
// session must be destroyed.
func (s *Stream) die(errorCode frame.ErrorCode, err error) {
s.closeWith(fmt.Errorf("Stream closed on error: %v", err))
s.session.die(errorCode, err)
}
func min(n1, n2 int) int {
if n1 > n2 {
return n2
} else {
return n1
}
}


@ -1,59 +0,0 @@
package proto
import (
"github.com/inconshreveable/muxado/proto/frame"
"sync"
)
const (
initMapCapacity = 128 // preallocate some capacity to avoid rehashing without wasting much memory
)
type StreamMap interface {
Get(frame.StreamId) (stream, bool)
Set(frame.StreamId, stream)
Delete(frame.StreamId)
Each(func(frame.StreamId, stream))
}
// ConcurrentStreamMap is a map of stream ids -> streams guarded by a read/write lock
type ConcurrentStreamMap struct {
sync.RWMutex
table map[frame.StreamId]stream
}
func (m *ConcurrentStreamMap) Get(id frame.StreamId) (s stream, ok bool) {
m.RLock()
s, ok = m.table[id]
m.RUnlock()
return
}
func (m *ConcurrentStreamMap) Set(id frame.StreamId, str stream) {
m.Lock()
m.table[id] = str
m.Unlock()
}
func (m *ConcurrentStreamMap) Delete(id frame.StreamId) {
m.Lock()
delete(m.table, id)
m.Unlock()
}
func (m *ConcurrentStreamMap) Each(fn func(frame.StreamId, stream)) {
m.Lock()
streams := make(map[frame.StreamId]stream, len(m.table))
for k, v := range m.table {
streams[k] = v
}
m.Unlock()
for id, str := range streams {
fn(id, str)
}
}
func NewConcurrentStreamMap() *ConcurrentStreamMap {
return &ConcurrentStreamMap{table: make(map[frame.StreamId]stream, initMapCapacity)}
}
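Each above copies the table while holding the lock and only runs the callback after releasing it, which is what lets a callback safely call methods that re-acquire the same lock, such as Delete. A self-contained toy illustrating that discipline (not muxado code; plain ints stand in for stream ids):

package main

import (
	"fmt"
	"sync"
)

// toyMap mirrors ConcurrentStreamMap's locking discipline with plain values.
type toyMap struct {
	sync.RWMutex
	table map[int]string
}

func (m *toyMap) Delete(id int) {
	m.Lock()
	delete(m.table, id)
	m.Unlock()
}

// Each snapshots the table under the lock, then iterates outside it, so the
// callback may call Delete (which takes the same lock) without deadlocking.
func (m *toyMap) Each(fn func(int, string)) {
	m.Lock()
	snapshot := make(map[int]string, len(m.table))
	for k, v := range m.table {
		snapshot[k] = v
	}
	m.Unlock()
	for id, v := range snapshot {
		fn(id, v)
	}
}

func main() {
	m := &toyMap{table: map[int]string{1: "a", 3: "b"}}
	m.Each(func(id int, _ string) { m.Delete(id) })
	fmt.Println(len(m.table)) // prints 0; no deadlock because Each iterates a copy
}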


@ -1,71 +0,0 @@
package muxado
import (
"crypto/tls"
"github.com/inconshreveable/muxado/proto"
"github.com/inconshreveable/muxado/proto/ext"
"net"
)
// A Listener accepts new connections from its net.Listener
// and starts muxado server sessions on them.
//
// Its API is very similar to a net.Listener's, but it returns
// muxado.Sessions instead of net.Conns.
type Listener struct {
wrapped net.Listener
}
// Accept the next connection from the listener and begin
// a muxado session on it.
func (l *Listener) Accept() (Session, error) {
conn, err := l.wrapped.Accept()
if err != nil {
return nil, err
}
return Server(conn), nil
}
// Addr returns the bound address of the wrapped net.Listener
func (l *Listener) Addr() net.Addr {
return l.wrapped.Addr()
}
// Close closes the wrapped net.Listener
func (l *Listener) Close() error {
return l.wrapped.Close()
}
// Server returns a muxado server session using conn as the transport.
func Server(conn net.Conn) Session {
return &sessionAdaptor{proto.NewSession(conn, proto.NewStream, false, []proto.Extension{ext.NewDefaultHeartbeat()})}
}
// Listen binds to a network address and returns a Listener which accepts
// new connections and starts muxado server sessions on them.
func Listen(network, addr string) (*Listener, error) {
l, err := net.Listen(network, addr)
if err != nil {
return nil, err
}
return &Listener{l}, nil
}
// ListenTLS binds to a network address and accepts new TLS-encrypted connections.
// It returns a Listener which starts new muxado server sessions on the connections.
func ListenTLS(network, addr string, tlsConfig *tls.Config) (*Listener, error) {
l, err := tls.Listen(network, addr, tlsConfig)
if err != nil {
return nil, err
}
return &Listener{l}, nil
}
// NewListener creates a new muxado listener which creates new muxado server sessions
// by accepting connections from the given net.Listener
func NewListener(l net.Listener) *Listener {
return &Listener{l}
}
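For reference, a brief sketch of how this listener API was typically consumed (the echoServer helper, address, and echo behavior are illustrative; the io import is assumed, and Session.Accept refers to the session interface defined elsewhere in this package). Each accepted connection carries one muxado session, and each session multiplexes many independent streams:

// echoServer is an illustrative accept loop, not part of muxado.
func echoServer() error {
	l, err := Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	for {
		sess, err := l.Accept() // one muxado session per underlying TCP connection
		if err != nil {
			return err
		}
		go func() {
			for {
				stream, err := sess.Accept() // one logical stream within the session
				if err != nil {
					return
				}
				go io.Copy(stream, stream) // echo bytes back until the stream closes
			}
		}()
	}
}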

vendor/vendor.json

@ -399,26 +399,6 @@
"revision": "d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd",
"revisionTime": "2016-07-20T23:31:40Z"
},
{
"path": "github.com/inconshreveable/muxado",
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
},
{
"path": "github.com/inconshreveable/muxado/proto",
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
},
{
"path": "github.com/inconshreveable/muxado/proto/buffer",
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
},
{
"path": "github.com/inconshreveable/muxado/proto/ext",
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
},
{
"path": "github.com/inconshreveable/muxado/proto/frame",
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
},
{
"checksumSHA1": "xZuhljnmBysJPta/lMyYmJdujCg=",
"path": "github.com/mattn/go-isatty",


@ -16,6 +16,15 @@ standard upgrade flow.
## Consul 0.7
#### Dropped Support for Protocol Version 1
Consul version 0.7 dropped support for protocol version 1, which means it
is no longer compatible with versions of Consul prior to 0.3. You will need
to upgrade all agents to a newer version of Consul before upgrading to Consul
0.7.
#### Prepared Query Changes
Consul version 0.7 adds a feature which allows prepared queries to store a
["Near" parameter](/docs/agent/http/query.html#near) in the query definition
itself. This feature enables using the distance sorting features of prepared