510 lines
12 KiB
Go
510 lines
12 KiB
Go
// Copyright 2011 Google Inc. All rights reserved.
|
|
// Use of this source code is governed by the Apache 2.0
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// +build !appengine
|
|
|
|
package internal
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
netcontext "golang.org/x/net/context"
|
|
|
|
remotepb "google.golang.org/appengine/internal/remote_api"
|
|
)
|
|
|
|
const (
|
|
apiPath = "/rpc_http"
|
|
)
|
|
|
|
var (
|
|
// Incoming headers.
|
|
ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
|
|
dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
|
|
traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
|
|
curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
|
|
userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP")
|
|
remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
|
|
|
|
// Outgoing headers.
|
|
apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
|
|
apiEndpointHeaderValue = []string{"app-engine-apis"}
|
|
apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
|
|
apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"}
|
|
apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
|
|
apiContentType = http.CanonicalHeaderKey("Content-Type")
|
|
apiContentTypeValue = []string{"application/octet-stream"}
|
|
|
|
apiHTTPClient = &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
Dial: limitDial,
|
|
},
|
|
}
|
|
)
|
|
|
|
func apiURL() *url.URL {
|
|
host, port := "appengine.googleapis.internal", "10001"
|
|
if h := os.Getenv("API_HOST"); h != "" {
|
|
host = h
|
|
}
|
|
if p := os.Getenv("API_PORT"); p != "" {
|
|
port = p
|
|
}
|
|
return &url.URL{
|
|
Scheme: "http",
|
|
Host: host + ":" + port,
|
|
Path: apiPath,
|
|
}
|
|
}
|
|
|
|
func handleHTTP(w http.ResponseWriter, r *http.Request) {
|
|
c := &context{
|
|
req: r,
|
|
outHeader: w.Header(),
|
|
apiURL: apiURL(),
|
|
logger: globalLogger(),
|
|
}
|
|
|
|
ctxs.Lock()
|
|
ctxs.m[r] = c
|
|
ctxs.Unlock()
|
|
defer func() {
|
|
ctxs.Lock()
|
|
delete(ctxs.m, r)
|
|
ctxs.Unlock()
|
|
}()
|
|
|
|
// Patch up RemoteAddr so it looks reasonable.
|
|
if addr := r.Header.Get(userIPHeader); addr != "" {
|
|
r.RemoteAddr = addr
|
|
} else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
|
|
r.RemoteAddr = addr
|
|
} else {
|
|
// Should not normally reach here, but pick a sensible default anyway.
|
|
r.RemoteAddr = "127.0.0.1"
|
|
}
|
|
// The address in the headers will most likely be of these forms:
|
|
// 123.123.123.123
|
|
// 2001:db8::1
|
|
// net/http.Request.RemoteAddr is specified to be in "IP:port" form.
|
|
if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
|
|
// Assume the remote address is only a host; add a default port.
|
|
r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
|
|
}
|
|
|
|
executeRequestSafely(c, r)
|
|
c.outHeader = nil // make sure header changes aren't respected any more
|
|
|
|
// Avoid nil Write call if c.Write is never called.
|
|
if c.outCode != 0 {
|
|
w.WriteHeader(c.outCode)
|
|
}
|
|
if c.outBody != nil {
|
|
w.Write(c.outBody)
|
|
}
|
|
}
|
|
|
|
func executeRequestSafely(c *context, r *http.Request) {
|
|
defer func() {
|
|
if x := recover(); x != nil {
|
|
logf(c, 4, "%s", renderPanic(x)) // 4 == critical
|
|
c.outCode = 500
|
|
}
|
|
}()
|
|
|
|
http.DefaultServeMux.ServeHTTP(c, r)
|
|
}
|
|
|
|
func renderPanic(x interface{}) string {
|
|
buf := make([]byte, 16<<10) // 16 KB should be plenty
|
|
buf = buf[:runtime.Stack(buf, false)]
|
|
|
|
// Remove the first few stack frames:
|
|
// this func
|
|
// the recover closure in the caller
|
|
// That will root the stack trace at the site of the panic.
|
|
const (
|
|
skipStart = "internal.renderPanic"
|
|
skipFrames = 2
|
|
)
|
|
start := bytes.Index(buf, []byte(skipStart))
|
|
p := start
|
|
for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
|
|
p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
|
|
if p < 0 {
|
|
break
|
|
}
|
|
}
|
|
if p >= 0 {
|
|
// buf[start:p+1] is the block to remove.
|
|
// Copy buf[p+1:] over buf[start:] and shrink buf.
|
|
copy(buf[start:], buf[p+1:])
|
|
buf = buf[:len(buf)-(p+1-start)]
|
|
}
|
|
|
|
// Add panic heading.
|
|
head := fmt.Sprintf("panic: %v\n\n", x)
|
|
if len(head) > len(buf) {
|
|
// Extremely unlikely to happen.
|
|
return head
|
|
}
|
|
copy(buf[len(head):], buf)
|
|
copy(buf, head)
|
|
|
|
return string(buf)
|
|
}
|
|
|
|
var ctxs = struct {
|
|
sync.Mutex
|
|
m map[*http.Request]*context
|
|
bg *context // background context, lazily initialized
|
|
// dec is used by tests to decorate the netcontext.Context returned
|
|
// for a given request. This allows tests to add overrides (such as
|
|
// WithAppIDOverride) to the context. The map is nil outside tests.
|
|
dec map[*http.Request]func(netcontext.Context) netcontext.Context
|
|
}{
|
|
m: make(map[*http.Request]*context),
|
|
}
|
|
|
|
// context represents the context of an in-flight HTTP request.
|
|
// It implements the appengine.Context and http.ResponseWriter interfaces.
|
|
type context struct {
|
|
req *http.Request
|
|
logger *jsonLogger
|
|
|
|
outCode int
|
|
outHeader http.Header
|
|
outBody []byte
|
|
|
|
apiURL *url.URL
|
|
}
|
|
|
|
var contextKey = "holds a *context"
|
|
|
|
func fromContext(ctx netcontext.Context) *context {
|
|
c, _ := ctx.Value(&contextKey).(*context)
|
|
return c
|
|
}
|
|
|
|
func withContext(parent netcontext.Context, c *context) netcontext.Context {
|
|
ctx := netcontext.WithValue(parent, &contextKey, c)
|
|
if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
|
|
ctx = withNamespace(ctx, ns)
|
|
}
|
|
return ctx
|
|
}
|
|
|
|
func toContext(c *context) netcontext.Context {
|
|
return withContext(netcontext.Background(), c)
|
|
}
|
|
|
|
func IncomingHeaders(ctx netcontext.Context) http.Header {
|
|
if c := fromContext(ctx); c != nil {
|
|
return c.req.Header
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
|
|
ctxs.Lock()
|
|
c := ctxs.m[req]
|
|
d := ctxs.dec[req]
|
|
ctxs.Unlock()
|
|
|
|
if d != nil {
|
|
parent = d(parent)
|
|
}
|
|
|
|
if c == nil {
|
|
// Someone passed in an http.Request that is not in-flight.
|
|
// We panic here rather than panicking at a later point
|
|
// so that stack traces will be more sensible.
|
|
log.Panic("appengine: NewContext passed an unknown http.Request")
|
|
}
|
|
return withContext(parent, c)
|
|
}
|
|
|
|
func BackgroundContext() netcontext.Context {
|
|
ctxs.Lock()
|
|
defer ctxs.Unlock()
|
|
|
|
if ctxs.bg != nil {
|
|
return toContext(ctxs.bg)
|
|
}
|
|
|
|
// Compute background security ticket.
|
|
appID := partitionlessAppID()
|
|
escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
|
|
majVersion := VersionID(nil)
|
|
if i := strings.Index(majVersion, "."); i > 0 {
|
|
majVersion = majVersion[:i]
|
|
}
|
|
ticket := fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
|
|
|
|
ctxs.bg = &context{
|
|
req: &http.Request{
|
|
Header: http.Header{
|
|
ticketHeader: []string{ticket},
|
|
},
|
|
},
|
|
apiURL: apiURL(),
|
|
logger: globalLogger(),
|
|
}
|
|
|
|
return toContext(ctxs.bg)
|
|
}
|
|
|
|
// RegisterTestRequest registers the HTTP request req for testing, such that
|
|
// any API calls are sent to the provided URL. It returns a closure to delete
|
|
// the registration.
|
|
// It should only be used by aetest package.
|
|
func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) func() {
|
|
c := &context{
|
|
req: req,
|
|
apiURL: apiURL,
|
|
logger: globalLogger(),
|
|
}
|
|
ctxs.Lock()
|
|
defer ctxs.Unlock()
|
|
if _, ok := ctxs.m[req]; ok {
|
|
log.Panic("req already associated with context")
|
|
}
|
|
if _, ok := ctxs.dec[req]; ok {
|
|
log.Panic("req already associated with context")
|
|
}
|
|
if ctxs.dec == nil {
|
|
ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context)
|
|
}
|
|
ctxs.m[req] = c
|
|
ctxs.dec[req] = decorate
|
|
|
|
return func() {
|
|
ctxs.Lock()
|
|
delete(ctxs.m, req)
|
|
delete(ctxs.dec, req)
|
|
ctxs.Unlock()
|
|
}
|
|
}
|
|
|
|
var errTimeout = &CallError{
|
|
Detail: "Deadline exceeded",
|
|
Code: int32(remotepb.RpcError_CANCELLED),
|
|
Timeout: true,
|
|
}
|
|
|
|
func (c *context) Header() http.Header { return c.outHeader }
|
|
|
|
// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
|
|
// codes do not permit a response body (nor response entity headers such as
|
|
// Content-Length, Content-Type, etc).
|
|
func bodyAllowedForStatus(status int) bool {
|
|
switch {
|
|
case status >= 100 && status <= 199:
|
|
return false
|
|
case status == 204:
|
|
return false
|
|
case status == 304:
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *context) Write(b []byte) (int, error) {
|
|
if c.outCode == 0 {
|
|
c.WriteHeader(http.StatusOK)
|
|
}
|
|
if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
|
|
return 0, http.ErrBodyNotAllowed
|
|
}
|
|
c.outBody = append(c.outBody, b...)
|
|
return len(b), nil
|
|
}
|
|
|
|
func (c *context) WriteHeader(code int) {
|
|
if c.outCode != 0 {
|
|
logf(c, 3, "WriteHeader called multiple times on request.") // error level
|
|
return
|
|
}
|
|
c.outCode = code
|
|
}
|
|
|
|
func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
|
|
hreq := &http.Request{
|
|
Method: "POST",
|
|
URL: c.apiURL,
|
|
Header: http.Header{
|
|
apiEndpointHeader: apiEndpointHeaderValue,
|
|
apiMethodHeader: apiMethodHeaderValue,
|
|
apiContentType: apiContentTypeValue,
|
|
apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
|
|
},
|
|
Body: ioutil.NopCloser(bytes.NewReader(body)),
|
|
ContentLength: int64(len(body)),
|
|
Host: c.apiURL.Host,
|
|
}
|
|
if info := c.req.Header.Get(dapperHeader); info != "" {
|
|
hreq.Header.Set(dapperHeader, info)
|
|
}
|
|
if info := c.req.Header.Get(traceHeader); info != "" {
|
|
hreq.Header.Set(traceHeader, info)
|
|
}
|
|
|
|
tr := apiHTTPClient.Transport.(*http.Transport)
|
|
|
|
var timedOut int32 // atomic; set to 1 if timed out
|
|
t := time.AfterFunc(timeout, func() {
|
|
atomic.StoreInt32(&timedOut, 1)
|
|
tr.CancelRequest(hreq)
|
|
})
|
|
defer t.Stop()
|
|
defer func() {
|
|
// Check if timeout was exceeded.
|
|
if atomic.LoadInt32(&timedOut) != 0 {
|
|
err = errTimeout
|
|
}
|
|
}()
|
|
|
|
hresp, err := apiHTTPClient.Do(hreq)
|
|
if err != nil {
|
|
return nil, &CallError{
|
|
Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
|
|
Code: int32(remotepb.RpcError_UNKNOWN),
|
|
}
|
|
}
|
|
defer hresp.Body.Close()
|
|
hrespBody, err := ioutil.ReadAll(hresp.Body)
|
|
if hresp.StatusCode != 200 {
|
|
return nil, &CallError{
|
|
Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
|
|
Code: int32(remotepb.RpcError_UNKNOWN),
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, &CallError{
|
|
Detail: fmt.Sprintf("service bridge response bad: %v", err),
|
|
Code: int32(remotepb.RpcError_UNKNOWN),
|
|
}
|
|
}
|
|
return hrespBody, nil
|
|
}
|
|
|
|
func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
|
|
if ns := NamespaceFromContext(ctx); ns != "" {
|
|
if fn, ok := NamespaceMods[service]; ok {
|
|
fn(in, ns)
|
|
}
|
|
}
|
|
|
|
if f, ctx, ok := callOverrideFromContext(ctx); ok {
|
|
return f(ctx, service, method, in, out)
|
|
}
|
|
|
|
// Handle already-done contexts quickly.
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
c := fromContext(ctx)
|
|
if c == nil {
|
|
// Give a good error message rather than a panic lower down.
|
|
return errors.New("not an App Engine context")
|
|
}
|
|
|
|
// Apply transaction modifications if we're in a transaction.
|
|
if t := transactionFromContext(ctx); t != nil {
|
|
if t.finished {
|
|
return errors.New("transaction context has expired")
|
|
}
|
|
applyTransaction(in, &t.transaction)
|
|
}
|
|
|
|
// Default RPC timeout is 60s.
|
|
timeout := 60 * time.Second
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
timeout = deadline.Sub(time.Now())
|
|
}
|
|
|
|
data, err := proto.Marshal(in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ticket := c.req.Header.Get(ticketHeader)
|
|
req := &remotepb.Request{
|
|
ServiceName: &service,
|
|
Method: &method,
|
|
Request: data,
|
|
RequestId: &ticket,
|
|
}
|
|
hreqBody, err := proto.Marshal(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hrespBody, err := c.post(hreqBody, timeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
res := &remotepb.Response{}
|
|
if err := proto.Unmarshal(hrespBody, res); err != nil {
|
|
return err
|
|
}
|
|
if res.RpcError != nil {
|
|
ce := &CallError{
|
|
Detail: res.RpcError.GetDetail(),
|
|
Code: *res.RpcError.Code,
|
|
}
|
|
switch remotepb.RpcError_ErrorCode(ce.Code) {
|
|
case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
|
|
ce.Timeout = true
|
|
}
|
|
return ce
|
|
}
|
|
if res.ApplicationError != nil {
|
|
return &APIError{
|
|
Service: *req.ServiceName,
|
|
Detail: res.ApplicationError.GetDetail(),
|
|
Code: *res.ApplicationError.Code,
|
|
}
|
|
}
|
|
if res.Exception != nil || res.JavaException != nil {
|
|
// This shouldn't happen, but let's be defensive.
|
|
return &CallError{
|
|
Detail: "service bridge returned exception",
|
|
Code: int32(remotepb.RpcError_UNKNOWN),
|
|
}
|
|
}
|
|
return proto.Unmarshal(res.Response, out)
|
|
}
|
|
|
|
func (c *context) Request() *http.Request {
|
|
return c.req
|
|
}
|
|
|
|
func ContextForTesting(req *http.Request) netcontext.Context {
|
|
return toContext(&context{
|
|
req: req,
|
|
logger: testLogger,
|
|
})
|
|
}
|