Revert grpc back a version (they introduced a panic) and clean up a bunch of old request forwarding stuff
commit 9d4801b1e8
parent af0d347766
@@ -564,7 +564,7 @@ CLUSTER_SYNTHESIS_COMPLETE:
	// This needs to happen before we first unseal, so before we trigger dev
	// mode if it's set
	core.SetClusterListenerAddrs(clusterAddrs)
	core.SetClusterSetupFuncs(vault.WrapHandlerForClustering(handler, c.logger))
	core.SetClusterHandler(handler)

	// If we're in Dev mode, then initialize the core
	if dev {
@@ -17,11 +17,8 @@ import (
	"net/http"
	"time"

	log "github.com/mgutz/logxi/v1"

	"github.com/hashicorp/errwrap"
	"github.com/hashicorp/go-uuid"
	"github.com/hashicorp/vault/helper/forwarding"
	"github.com/hashicorp/vault/helper/jsonutil"
)

@@ -285,21 +282,11 @@ func (c *Core) setupCluster() error {
	return nil
}

// SetClusterSetupFuncs sets the handler setup func
func (c *Core) SetClusterSetupFuncs(handler func() (http.Handler, http.Handler)) {
	c.clusterHandlerSetupFunc = handler
}

// startClusterListener starts cluster request listeners during postunseal. It
// is assumed that the state lock is held while this is run. Right now this
// only starts forwarding listeners; it's TBD whether other request types will
// be built in the same mechanism or started independently.
func (c *Core) startClusterListener() error {
	if c.clusterHandlerSetupFunc == nil {
		c.logger.Error("core: cluster handler setup function has not been set when trying to start listeners")
		return fmt.Errorf("cluster handler setup function has not been set")
	}

	if c.clusterAddr == "" {
		c.logger.Info("core: clustering disabled, not starting listeners")
		return nil
@@ -475,50 +462,6 @@ func (c *Core) SetClusterListenerAddrs(addrs []*net.TCPAddr) {
	c.clusterListenerAddrs = addrs
}

// WrapHandlerForClustering takes in Vault's HTTP handler and returns a setup
// function that returns both the original handler and one wrapped with cluster
// methods
func WrapHandlerForClustering(handler http.Handler, logger log.Logger) func() (http.Handler, http.Handler) {
	return func() (http.Handler, http.Handler) {
		// This mux handles cluster functions (right now, only forwarded requests)
		mux := http.NewServeMux()
		mux.HandleFunc("/cluster/local/forwarded-request", func(w http.ResponseWriter, req *http.Request) {
			//logger.Trace("forwarding: serving h2 forwarded request")
			freq, err := forwarding.ParseForwardedHTTPRequest(req)
			if err != nil {
				if logger != nil {
					logger.Error("http/forwarded-request-server: error parsing forwarded request", "error", err)
				}

				w.Header().Add("Content-Type", "application/json")

				// The response writer here is different from
				// the one set in Vault's HTTP handler.
				// Hence, set the Cache-Control explicitly.
				w.Header().Set("Cache-Control", "no-store")

				w.WriteHeader(http.StatusInternalServerError)

				type errorResponse struct {
					Errors []string
				}
				resp := &errorResponse{
					Errors: []string{
						err.Error(),
					},
				}

				enc := json.NewEncoder(w)
				enc.Encode(resp)
				return
			}

			// To avoid the risk of a forward loop in some pathological condition,
			// set the no-forward header
			freq.Header.Set(IntNoForwardingHeaderName, "true")
			handler.ServeHTTP(w, freq)
		})

		return handler, mux
	}
func (c *Core) SetClusterHandler(handler http.Handler) {
	c.clusterHandler = handler
}
@@ -294,8 +294,8 @@ type Core struct {
	localClusterParsedCert *x509.Certificate
	// The TCP addresses we should use for clustering
	clusterListenerAddrs []*net.TCPAddr
	// The setup function that gives us the handler to use
	clusterHandlerSetupFunc func() (http.Handler, http.Handler)
	// The handler to use for request forwarding
	clusterHandler http.Handler
	// Tracks whether cluster listeners are running, e.g. it's safe to send a
	// shutdown down the channel
	clusterListenersRunning bool
@@ -34,10 +34,6 @@ func (c *Core) startForwarding() error {
	// Resolve locally to avoid races
	ha := c.ha != nil

	// Get our base handler (for our RPC server) and our wrapped handler (for
	// straight HTTP/2 forwarding)
	baseHandler, wrappedHandler := c.clusterHandlerSetupFunc()

	// Get our TLS config
	tlsConfig, err := c.ClusterTLSConfig()
	if err != nil {

@@ -58,10 +54,10 @@ func (c *Core) startForwarding() error {

	c.rpcServer = grpc.NewServer()

	if ha {
	if ha && c.clusterHandler != nil {
		RegisterRequestForwardingServer(c.rpcServer, &forwardedRequestRPCServer{
			core:    c,
			handler: baseHandler,
			handler: c.clusterHandler,
		})
	}
	c.clusterParamsLock.Unlock()
@@ -155,12 +155,12 @@ func testCoreConfig(t testing.TB, physicalBackend physical.Backend, logger log.L
// TestCoreInit initializes the core with a single key, and returns
// the key that must be used to unseal the core and a root token.
func TestCoreInit(t testing.TB, core *Core) ([][]byte, string) {
	return TestCoreInitClusterWrapperSetup(t, core, nil, func() (http.Handler, http.Handler) { return nil, nil })
	return TestCoreInitClusterWrapperSetup(t, core, nil, nil)
}

func TestCoreInitClusterWrapperSetup(t testing.TB, core *Core, clusterAddrs []*net.TCPAddr, handlerSetupFunc func() (http.Handler, http.Handler)) ([][]byte, string) {
func TestCoreInitClusterWrapperSetup(t testing.TB, core *Core, clusterAddrs []*net.TCPAddr, handler http.Handler) ([][]byte, string) {
	core.SetClusterListenerAddrs(clusterAddrs)
	core.SetClusterSetupFuncs(handlerSetupFunc)
	core.SetClusterHandler(handler)
	result, err := core.Initialize(&InitParams{
		BarrierConfig: &SealConfig{
			SecretShares: 3,

@@ -178,7 +178,6 @@ func TestCoreInitClusterWrapperSetup(t testing.TB, core *Core, clusterAddrs []*n
}

func TestCoreUnseal(core *Core, key []byte) (bool, error) {
	core.SetClusterSetupFuncs(func() (http.Handler, http.Handler) { return nil, nil })
	return core.Unseal(key)
}

@@ -842,10 +841,10 @@ func TestCluster(t testing.TB, handlers []http.Handler, base *CoreConfig, unseal
	}

	c2.SetClusterListenerAddrs(clusterAddrGen(c2lns))
	c2.SetClusterSetupFuncs(WrapHandlerForClustering(handlers[1], logger))
	c2.SetClusterHandler(handlers[1])
	c3.SetClusterListenerAddrs(clusterAddrGen(c3lns))
	c3.SetClusterSetupFuncs(WrapHandlerForClustering(handlers[2], logger))
	keys, root := TestCoreInitClusterWrapperSetup(t, c1, clusterAddrGen(c1lns), WrapHandlerForClustering(handlers[0], logger))
	c3.SetClusterHandler(handlers[2])
	keys, root := TestCoreInitClusterWrapperSetup(t, c1, clusterAddrGen(c1lns), handlers[0])
	for _, key := range keys {
		if _, err := c1.Unseal(TestKeyCopy(key)); err != nil {
			t.Fatalf("unseal err: %s", err)
@@ -278,7 +278,7 @@ type parser struct {
// that the underlying io.Reader must not return an incompatible
// error.
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
	if _, err := p.r.Read(p.header[:]); err != nil {
	if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
		return 0, nil, err
	}

@@ -294,7 +294,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
	// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
	// of making it for each message:
	msg = make([]byte, int(length))
	if _, err := p.r.Read(msg); err != nil {
	if _, err := io.ReadFull(p.r, msg); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
@@ -58,8 +58,6 @@ const (
	defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
	defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
	defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
	// max window limit set by HTTP2 Specs.
	maxWindowSize = math.MaxInt32
)

// The following defines various control items which could flow through

@@ -169,40 +167,6 @@ type inFlow struct {
	// The amount of data the application has consumed but grpc has not sent
	// window update for them. Used to reduce window update frequency.
	pendingUpdate uint32
	// delta is the extra window update given by receiver when an application
	// is reading data bigger in size than the inFlow limit.
	delta uint32
}

func (f *inFlow) maybeAdjust(n uint32) uint32 {
	if n > uint32(math.MaxInt32) {
		n = uint32(math.MaxInt32)
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	// estSenderQuota is the receiver's view of the maximum number of bytes the sender
	// can send without a window update.
	estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
	// estUntransmittedData is the maximum number of bytes the sends might not have put
	// on the wire yet. A value of 0 or less means that we have already received all or
	// more bytes than the application is requesting to read.
	estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
	// This implies that unless we send a window update, the sender won't be able to send all the bytes
	// for this message. Therefore we must send an update over the limit since there's an active read
	// request from the application.
	if estUntransmittedData > estSenderQuota {
		// Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
		if f.limit+n > maxWindowSize {
			f.delta = maxWindowSize - f.limit
		} else {
			// Send a window update for the whole message and not just the difference between
			// estUntransmittedData and estSenderQuota. This will be helpful in case the message
			// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
			f.delta = n
		}
		return f.delta
	}
	return 0
}

// onData is invoked when some data frame is received. It updates pendingData.
@@ -210,7 +174,7 @@ func (f *inFlow) onData(n uint32) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.pendingData += n
	if f.pendingData+f.pendingUpdate > f.limit+f.delta {
	if f.pendingData+f.pendingUpdate > f.limit {
		return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
	}
	return nil

@@ -225,13 +189,6 @@ func (f *inFlow) onRead(n uint32) uint32 {
		return 0
	}
	f.pendingData -= n
	if n > f.delta {
		n -= f.delta
		f.delta = 0
	} else {
		f.delta -= n
		n = 0
	}
	f.pendingUpdate += n
	if f.pendingUpdate >= f.limit/4 {
		wu := f.pendingUpdate
@@ -316,12 +316,13 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
	req := ht.req

	s := &Stream{
		id:           0, // irrelevant
		cancel:       cancel,
		buf:          newRecvBuffer(),
		st:           ht,
		method:       req.URL.Path,
		recvCompress: req.Header.Get("grpc-encoding"),
		id:            0, // irrelevant
		windowHandler: func(int) {}, // nothing
		cancel:        cancel,
		buf:           newRecvBuffer(),
		st:            ht,
		method:        req.URL.Path,
		recvCompress:  req.Header.Get("grpc-encoding"),
	}
	pr := &peer.Peer{
		Addr: ht.RemoteAddr(),

@@ -332,7 +333,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
	ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
	ctx = peer.NewContext(ctx, pr)
	s.ctx = newContextWithStream(ctx, s)
	s.trReader = &recvBufferReader{ctx: s.ctx, recv: s.buf}
	s.dec = &recvBufferReader{ctx: s.ctx, recv: s.buf}

	// readerDone is closed when the Body.Read-ing goroutine exits.
	readerDone := make(chan struct{})
@@ -173,9 +173,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
	conn, err := dial(ctx, opts.Dialer, addr.Addr)
	if err != nil {
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
			return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
		return nil, connectionErrorf(true, err, "transport: %v", err)
	}
	// Any further errors will close the underlying connection
	defer func(conn net.Conn) {

@@ -194,7 +194,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
			// Credentials handshake errors are typically considered permanent
			// to avoid retrying on e.g. bad certificates.
			temp := isTemporary(err)
			return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err)
			return nil, connectionErrorf(temp, err, "transport: %v", err)
		}
		isSecure = true
	}

@@ -269,7 +269,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
		return nil, connectionErrorf(true, err, "transport: %v", err)
	}
	if n != len(clientPreface) {
		t.Close()

@@ -285,13 +285,13 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
	}
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
		return nil, connectionErrorf(true, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
			t.Close()
			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
			return nil, connectionErrorf(true, err, "transport: %v", err)
		}
	}
	go t.controller()
@@ -316,24 +316,18 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
		headerChan: make(chan struct{}),
	}
	t.nextID += 2
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	s.windowHandler = func(n int) {
		t.updateWindow(s, uint32(n))
	}
	// The client side stream context should have exactly the same life cycle with the user provided context.
	// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
	// So we use the original context here instead of creating a copy.
	s.ctx = ctx
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:    s.ctx,
			goAway: s.goAway,
			recv:   s.buf,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	s.dec = &recvBufferReader{
		ctx:    s.ctx,
		goAway: s.goAway,
		recv:   s.buf,
	}

	return s
}

@@ -808,20 +802,6 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
	return s, ok
}

// adjustWindow sends out extra window update over the initial window size
// of stream if the application is requesting data larger in size than
// the window.
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state == streamDone {
		return
	}
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&windowUpdate{s.id, w})
	}
}

// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
@@ -274,14 +274,10 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
	if len(state.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
	}
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:  s.ctx,
			recv: s.buf,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	s.dec = &recvBufferReader{
		ctx:  s.ctx,
		recv: s.buf,
	}
	s.recvCompress = state.encoding
	s.method = state.method

@@ -320,8 +316,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	s.windowHandler = func(n int) {
		t.updateWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	if t.stats != nil {

@@ -365,7 +361,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
			return
		}
		if err != nil {
			grpclog.Printf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
			grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
			t.Close()
			return
		}

@@ -439,20 +435,6 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
	return s, true
}

// adjustWindow sends out extra window update over the initial window size
// of stream if the application is requesting data larger in size than
// the window.
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state == streamDone {
		return
	}
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&windowUpdate{s.id, w})
	}
}

// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
@@ -185,17 +185,14 @@ type Stream struct {
	recvCompress string
	sendCompress string
	buf *recvBuffer
	trReader io.Reader
	dec io.Reader
	fc *inFlow
	recvQuota uint32

	// TODO: Remote this unused variable.
	// The accumulated inbound quota pending for window update.
	updateQuota uint32

	// Callback to state application's intentions to read data. This
	// is used to adjust flow control, if need be.
	requestRead func(int)
	// The handler to control the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler func(int)

	sendQuotaPool *quotaPool
	// Close headerChan to indicate the end of reception of header metadata.
@@ -323,35 +320,16 @@ func (s *Stream) write(m recvMsg) {
	s.buf.put(&m)
}

// Read reads all p bytes from the wire for this stream.
func (s *Stream) Read(p []byte) (n int, err error) {
	// Don't request a read if there was an error earlier
	if er := s.trReader.(*transportReader).er; er != nil {
		return 0, er
	}
	s.requestRead(len(p))
	return io.ReadFull(s.trReader, p)
}

// tranportReader reads all the data available for this Stream from the transport and
// Read reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
type transportReader struct {
	reader io.Reader
	// The handler to control the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler func(int)
	er error
}

func (t *transportReader) Read(p []byte) (n int, err error) {
	n, err = t.reader.Read(p)
func (s *Stream) Read(p []byte) (n int, err error) {
	n, err = s.dec.Read(p)
	if err != nil {
		t.er = err
		return
	}
	t.windowHandler(n)
	s.windowHandler(n)
	return
}
@@ -1567,94 +1567,94 @@
			"revisionTime": "2017-05-23T04:36:04Z"
		},
		{
			"checksumSHA1": "/rSFejOYzXTDxhpcW/vN7EFU2Qs=",
			"checksumSHA1": "ipFn6sHiVFEnpb9EAXVy5q4QdTY=",
			"path": "google.golang.org/grpc",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "xDFNvjW6gMsDQ+WRZ0t+8S/e4Qk=",
			"path": "google.golang.org/grpc/codes",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "ABO3qOTUiOZOk1UCq47NbQ64yWk=",
			"path": "google.golang.org/grpc/credentials",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "kkBRqB0xCvCiZhsA9yNQFwbntfo=",
			"path": "google.golang.org/grpc/credentials/oauth",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "d0cyferoJguQhL6d2K6g2oC0mVM=",
			"path": "google.golang.org/grpc/grpclb/grpc_lb_v1",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "3Lt5hNAG8qJAYSsNghR5uA1zQns=",
			"path": "google.golang.org/grpc/grpclog",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "T3Q0p8kzvXFnRkMaK/G8mCv6mc0=",
			"path": "google.golang.org/grpc/internal",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "TY6NrgLRPSer6a5dBYOo/7o/ghk=",
			"path": "google.golang.org/grpc/keepalive",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "89fjWaU6NKVpmWI+0EoDION0dpE=",
			"path": "google.golang.org/grpc/metadata",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "4GSUFhOQ0kdFlBH4D5OTeKy78z0=",
			"path": "google.golang.org/grpc/naming",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "wUSBvomJRJhYf4ELuP0bSkFrzgc=",
			"path": "google.golang.org/grpc/peer",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "e7eoENPNFnm2QddUE5epm8UmFX8=",
			"path": "google.golang.org/grpc/stats",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "0tlQhEkF3hex/+tjcygLxeweuiY=",
			"path": "google.golang.org/grpc/status",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "N0TftT6/CyWqp6VRi2DqDx60+Fo=",
			"path": "google.golang.org/grpc/tap",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "kBBBWr92WhtHLtKWWa79rwzopTg=",
			"checksumSHA1": "kZdlvLQTf65/tXDwjVseffdHRWo=",
			"path": "google.golang.org/grpc/transport",
			"revision": "6dff7c5f333b5331089ccc6451fc842664f3be6a",
			"revisionTime": "2017-05-23T18:39:15Z"
			"revision": "79f73d62e5082ee8d6b86a2487fd6267c1495fcd",
			"revisionTime": "2017-05-22T21:45:37Z"
		},
		{
			"checksumSHA1": "xsaHqy6/sonLV6xIxTNh4FfkWbU=",