/* * * Copyright 2014 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package grpc import ( "bytes" "compress/gzip" "encoding/binary" "io" "io/ioutil" "math" "strings" "sync" "time" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding/proto" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" "google.golang.org/grpc/transport" ) // Compressor defines the interface gRPC uses to compress a message. type Compressor interface { // Do compresses p into w. Do(w io.Writer, p []byte) error // Type returns the compression algorithm the Compressor uses. Type() string } type gzipCompressor struct { pool sync.Pool } // NewGZIPCompressor creates a Compressor based on GZIP. func NewGZIPCompressor() Compressor { return &gzipCompressor{ pool: sync.Pool{ New: func() interface{} { return gzip.NewWriter(ioutil.Discard) }, }, } } func (c *gzipCompressor) Do(w io.Writer, p []byte) error { z := c.pool.Get().(*gzip.Writer) defer c.pool.Put(z) z.Reset(w) if _, err := z.Write(p); err != nil { return err } return z.Close() } func (c *gzipCompressor) Type() string { return "gzip" } // Decompressor defines the interface gRPC uses to decompress a message. type Decompressor interface { // Do reads the data from r and uncompress them. Do(r io.Reader) ([]byte, error) // Type returns the compression algorithm the Decompressor uses. Type() string } type gzipDecompressor struct { pool sync.Pool } // NewGZIPDecompressor creates a Decompressor based on GZIP. func NewGZIPDecompressor() Decompressor { return &gzipDecompressor{} } func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) { var z *gzip.Reader switch maybeZ := d.pool.Get().(type) { case nil: newZ, err := gzip.NewReader(r) if err != nil { return nil, err } z = newZ case *gzip.Reader: z = maybeZ if err := z.Reset(r); err != nil { d.pool.Put(z) return nil, err } } defer func() { z.Close() d.pool.Put(z) }() return ioutil.ReadAll(z) } func (d *gzipDecompressor) Type() string { return "gzip" } // callInfo contains all related configuration and information about an RPC. type callInfo struct { compressorType string failFast bool stream *transport.Stream traceInfo traceInfo // in trace.go maxReceiveMessageSize *int maxSendMessageSize *int creds credentials.PerRPCCredentials contentSubtype string codec baseCodec } func defaultCallInfo() *callInfo { return &callInfo{failFast: true} } // CallOption configures a Call before it starts or extracts information from // a Call after it completes. type CallOption interface { // before is called before the call is sent to any server. If before // returns a non-nil error, the RPC fails with that error. before(*callInfo) error // after is called after the call has completed. after cannot return an // error, so any failures should be reported via output parameters. after(*callInfo) } // EmptyCallOption does not alter the Call configuration. // It can be embedded in another structure to carry satellite data for use // by interceptors. type EmptyCallOption struct{} func (EmptyCallOption) before(*callInfo) error { return nil } func (EmptyCallOption) after(*callInfo) {} type beforeCall func(c *callInfo) error func (o beforeCall) before(c *callInfo) error { return o(c) } func (o beforeCall) after(c *callInfo) {} type afterCall func(c *callInfo) func (o afterCall) before(c *callInfo) error { return nil } func (o afterCall) after(c *callInfo) { o(c) } // Header returns a CallOptions that retrieves the header metadata // for a unary RPC. func Header(md *metadata.MD) CallOption { return afterCall(func(c *callInfo) { if c.stream != nil { *md, _ = c.stream.Header() } }) } // Trailer returns a CallOptions that retrieves the trailer metadata // for a unary RPC. func Trailer(md *metadata.MD) CallOption { return afterCall(func(c *callInfo) { if c.stream != nil { *md = c.stream.Trailer() } }) } // Peer returns a CallOption that retrieves peer information for a // unary RPC. func Peer(p *peer.Peer) CallOption { return afterCall(func(c *callInfo) { if c.stream != nil { if x, ok := peer.FromContext(c.stream.Context()); ok { *p = *x } } }) } // FailFast configures the action to take when an RPC is attempted on broken // connections or unreachable servers. If failFast is true, the RPC will fail // immediately. Otherwise, the RPC client will block the call until a // connection is available (or the call is canceled or times out) and will // retry the call if it fails due to a transient error. gRPC will not retry if // data was written to the wire unless the server indicates it did not process // the data. Please refer to // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md. // // By default, RPCs are "Fail Fast". func FailFast(failFast bool) CallOption { return beforeCall(func(c *callInfo) error { c.failFast = failFast return nil }) } // MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive. func MaxCallRecvMsgSize(s int) CallOption { return beforeCall(func(o *callInfo) error { o.maxReceiveMessageSize = &s return nil }) } // MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send. func MaxCallSendMsgSize(s int) CallOption { return beforeCall(func(o *callInfo) error { o.maxSendMessageSize = &s return nil }) } // PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials // for a call. func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption { return beforeCall(func(c *callInfo) error { c.creds = creds return nil }) } // UseCompressor returns a CallOption which sets the compressor used when // sending the request. If WithCompressor is also set, UseCompressor has // higher priority. // // This API is EXPERIMENTAL. func UseCompressor(name string) CallOption { return beforeCall(func(c *callInfo) error { c.compressorType = name return nil }) } // CallContentSubtype returns a CallOption that will set the content-subtype // for a call. For example, if content-subtype is "json", the Content-Type over // the wire will be "application/grpc+json". The content-subtype is converted // to lowercase before being included in Content-Type. See Content-Type on // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for // more details. // // If CallCustomCodec is not also used, the content-subtype will be used to // look up the Codec to use in the registry controlled by RegisterCodec. See // the documention on RegisterCodec for details on registration. The lookup // of content-subtype is case-insensitive. If no such Codec is found, the call // will result in an error with code codes.Internal. // // If CallCustomCodec is also used, that Codec will be used for all request and // response messages, with the content-subtype set to the given contentSubtype // here for requests. func CallContentSubtype(contentSubtype string) CallOption { contentSubtype = strings.ToLower(contentSubtype) return beforeCall(func(c *callInfo) error { c.contentSubtype = contentSubtype return nil }) } // CallCustomCodec returns a CallOption that will set the given Codec to be // used for all request and response messages for a call. The result of calling // String() will be used as the content-subtype in a case-insensitive manner. // // See Content-Type on // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for // more details. Also see the documentation on RegisterCodec and // CallContentSubtype for more details on the interaction between Codec and // content-subtype. // // This function is provided for advanced users; prefer to use only // CallContentSubtype to select a registered codec instead. func CallCustomCodec(codec Codec) CallOption { return beforeCall(func(c *callInfo) error { c.codec = codec return nil }) } // The format of the payload: compressed or not? type payloadFormat uint8 const ( compressionNone payloadFormat = iota // no compression compressionMade ) // parser reads complete gRPC messages from the underlying reader. type parser struct { // r is the underlying reader. // See the comment on recvMsg for the permissible // error types. r io.Reader // The header of a gRPC message. Find more detail at // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md header [5]byte } // recvMsg reads a complete gRPC message from the stream. // // It returns the message and its payload (compression/encoding) // format. The caller owns the returned msg memory. // // If there is an error, possible values are: // * io.EOF, when no messages remain // * io.ErrUnexpectedEOF // * of type transport.ConnectionError // * of type transport.StreamError // No other error values or types must be returned, which also means // that the underlying io.Reader must not return an incompatible // error. func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) { if _, err := p.r.Read(p.header[:]); err != nil { return 0, nil, err } pf = payloadFormat(p.header[0]) length := binary.BigEndian.Uint32(p.header[1:]) if length == 0 { return pf, nil, nil } if int64(length) > int64(maxInt) { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt) } if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead // of making it for each message: msg = make([]byte, int(length)) if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF } return 0, nil, err } return pf, msg, nil } // encode serializes msg and returns a buffer of message header and a buffer of msg. // If msg is nil, it generates the message header and an empty msg buffer. // TODO(ddyihai): eliminate extra Compressor parameter. func encode(c baseCodec, msg interface{}, cp Compressor, outPayload *stats.OutPayload, compressor encoding.Compressor) ([]byte, []byte, error) { var ( b []byte cbuf *bytes.Buffer ) const ( payloadLen = 1 sizeLen = 4 ) if msg != nil { var err error b, err = c.Marshal(msg) if err != nil { return nil, nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error()) } if outPayload != nil { outPayload.Payload = msg // TODO truncate large payload. outPayload.Data = b outPayload.Length = len(b) } if compressor != nil || cp != nil { cbuf = new(bytes.Buffer) // Has compressor, check Compressor is set by UseCompressor first. if compressor != nil { z, _ := compressor.Compress(cbuf) if _, err := z.Write(b); err != nil { return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error()) } z.Close() } else { // If Compressor is not set by UseCompressor, use default Compressor if err := cp.Do(cbuf, b); err != nil { return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error()) } } b = cbuf.Bytes() } } if uint(len(b)) > math.MaxUint32 { return nil, nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b)) } bufHeader := make([]byte, payloadLen+sizeLen) if compressor != nil || cp != nil { bufHeader[0] = byte(compressionMade) } else { bufHeader[0] = byte(compressionNone) } // Write length of b into buf binary.BigEndian.PutUint32(bufHeader[payloadLen:], uint32(len(b))) if outPayload != nil { outPayload.WireLength = payloadLen + sizeLen + len(b) } return bufHeader, b, nil } func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool) *status.Status { switch pf { case compressionNone: case compressionMade: if recvCompress == "" || recvCompress == encoding.Identity { return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding") } if !haveCompressor { return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress) } default: return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf) } return nil } // For the two compressor parameters, both should not be set, but if they are, // dc takes precedence over compressor. // TODO(dfawley): wrap the old compressor/decompressor using the new API? func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error { pf, d, err := p.recvMsg(maxReceiveMessageSize) if err != nil { return err } if inPayload != nil { inPayload.WireLength = len(d) } if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil { return st.Err() } if pf == compressionMade { // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor, // use this decompressor as the default. if dc != nil { d, err = dc.Do(bytes.NewReader(d)) if err != nil { return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) } } else { dcReader, err := compressor.Decompress(bytes.NewReader(d)) if err != nil { return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) } d, err = ioutil.ReadAll(dcReader) if err != nil { return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) } } } if len(d) > maxReceiveMessageSize { // TODO: Revisit the error code. Currently keep it consistent with java // implementation. return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize) } if err := c.Unmarshal(d, m); err != nil { return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) } if inPayload != nil { inPayload.RecvTime = time.Now() inPayload.Payload = m // TODO truncate large payload. inPayload.Data = d inPayload.Length = len(d) } return nil } type rpcInfo struct { failfast bool } type rpcInfoContextKey struct{} func newContextWithRPCInfo(ctx context.Context, failfast bool) context.Context { return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{failfast: failfast}) } func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) { s, ok = ctx.Value(rpcInfoContextKey{}).(*rpcInfo) return } // Code returns the error code for err if it was produced by the rpc system. // Otherwise, it returns codes.Unknown. // // Deprecated: use status.FromError and Code method instead. func Code(err error) codes.Code { if s, ok := status.FromError(err); ok { return s.Code() } return codes.Unknown } // ErrorDesc returns the error description of err if it was produced by the rpc system. // Otherwise, it returns err.Error() or empty string when err is nil. // // Deprecated: use status.FromError and Message method instead. func ErrorDesc(err error) string { if s, ok := status.FromError(err); ok { return s.Message() } return err.Error() } // Errorf returns an error containing an error code and a description; // Errorf returns nil if c is OK. // // Deprecated: use status.Errorf instead. func Errorf(c codes.Code, format string, a ...interface{}) error { return status.Errorf(c, format, a...) } // setCallInfoCodec should only be called after CallOptions have been applied. func setCallInfoCodec(c *callInfo) error { if c.codec != nil { // codec was already set by a CallOption; use it. return nil } if c.contentSubtype == "" { // No codec specified in CallOptions; use proto by default. c.codec = encoding.GetCodec(proto.Name) return nil } // c.contentSubtype is already lowercased in CallContentSubtype c.codec = encoding.GetCodec(c.contentSubtype) if c.codec == nil { return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype) } return nil } // The SupportPackageIsVersion variables are referenced from generated protocol // buffer files to ensure compatibility with the gRPC version used. The latest // support package version is 5. // // Older versions are kept for compatibility. They may be removed if // compatibility cannot be maintained. // // These constants should not be referenced from any other code. const ( SupportPackageIsVersion3 = true SupportPackageIsVersion4 = true SupportPackageIsVersion5 = true ) // Version is the current grpc version. const Version = "1.10.0-dev" const grpcUA = "grpc-go/" + Version