Merge pull request #10895 from bigmikes/serve-panic-recovery

grpc, xds: recovery middleware to return and log error in case of panic
This commit is contained in:
Daniel Nephin 2021-12-07 18:34:40 -05:00 committed by GitHub
commit e16e6e66c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 172 additions and 18 deletions

3
.changelog/10895.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
grpc, xds: improved reliability of grpc and xds servers by adding recovery-middleware to return and log error in case of panic.
```

View File

@ -654,7 +654,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
s.registerEnterpriseGRPCServices(deps, srv)
}
return agentgrpc.NewHandler(config.RPCAddr, register)
return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register)
}
func (s *Server) connectCARootsMonitor(ctx context.Context) {

View File

@ -151,7 +151,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
}, hclog.New(nil))
require.NoError(t, err)
srv := newTestServer(t, "server-1", "dc1", tlsConf)
srv := newSimpleTestServer(t, "server-1", "dc1", tlsConf)
md := srv.Metadata()
res.AddServer(types.AreaWAN, md)
@ -199,7 +199,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)
}, hclog.New(nil))
require.NoError(t, err)
srv := newTestServer(t, "bob", "dc1", tlsConf)
srv := newSimpleTestServer(t, "bob", "dc1", tlsConf)
// Send all of the traffic to dc1's server
var p tcpproxy.Proxy
@ -266,7 +266,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1", nil)
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}
@ -302,7 +302,7 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
var servers []testServer
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1", nil)
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
servers = append(servers, srv)
t.Cleanup(srv.shutdown)
@ -352,7 +352,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1", nil)
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}
@ -406,7 +406,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
for _, dc := range dcs {
name := "server-0-" + dc
srv := newTestServer(t, name, dc, nil)
srv := newSimpleTestServer(t, name, dc, nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}

View File

@ -9,29 +9,73 @@ import (
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/hashicorp/go-hclog"
)
// NewHandler returns a gRPC server that accepts connections from Handle(conn).
// The register function will be called with the grpc.Server to register
// gRPC services with the server.
func NewHandler(addr net.Addr, register func(server *grpc.Server)) *Handler {
func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)) *Handler {
metrics := defaultMetrics()
// We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured.
srv := grpc.NewServer(
recoveryOpts := PanicHandlerMiddlewareOpts(logger)
opts := []grpc.ServerOption{
grpc.StatsHandler(newStatsHandler(metrics)),
grpc.StreamInterceptor((&activeStreamCounter{metrics: metrics}).Intercept),
middleware.WithUnaryServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
(&activeStreamCounter{metrics: metrics}).Intercept,
),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 15 * time.Second,
}),
)
}
// We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured.
srv := grpc.NewServer(opts...)
register(srv)
lis := &chanListener{addr: addr, conns: make(chan net.Conn), done: make(chan struct{})}
return &Handler{srv: srv, listener: lis}
}
// PanicHandlerMiddlewareOpts returns the []recovery.Option containing
// recovery handler function.
func PanicHandlerMiddlewareOpts(logger Logger) []recovery.Option {
return []recovery.Option{
recovery.WithRecoveryHandler(NewPanicHandler(logger)),
}
}
// NewPanicHandler returns a recovery.RecoveryHandlerFunc closure function
// to handle panic in GRPC server's handlers.
func NewPanicHandler(logger Logger) recovery.RecoveryHandlerFunc {
return func(p interface{}) (err error) {
// Log the panic and the stack trace of the Goroutine that caused the panic.
stacktrace := hclog.Stacktrace()
logger.Error("panic serving grpc request",
"panic", p,
"stack", stacktrace,
)
return status.Errorf(codes.Internal, "grpc: panic serving request")
}
}
// Handler implements a handler for the rpc server listener, and the
// agent.Component interface for managing the lifecycle of the grpc.Server.
type Handler struct {

View File

@ -0,0 +1,61 @@
package grpc
import (
"bytes"
"context"
"testing"
"time"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/agent/grpc/resolver"
)
func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
// Prepare a logger with output to a buffer
// so we can check what it writes.
var buf bytes.Buffer
logger := hclog.New(&hclog.LoggerOptions{
Output: &buf,
})
res := resolver.NewServerResolverBuilder(newConfig(t))
registerWithGRPC(t, res)
srv := newPanicTestServer(t, logger, "server-1", "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
pool := NewClientConnPool(ClientConnPoolConfig{
Servers: res,
UseTLSForDC: useTLSForDcAlwaysTrue,
DialingFromServer: true,
DialingFromDatacenter: "dc1",
})
conn, err := pool.ClientConn("dc1")
require.NoError(t, err)
client := testservice.NewSimpleClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
resp, err := client.Something(ctx, &testservice.Req{})
expectedErr := status.Errorf(codes.Internal, "grpc: panic serving request")
require.Equal(t, expectedErr, err)
require.Nil(t, resp)
// Read the log
strLog := buf.String()
// Checking the entire stack trace is not possible, let's
// make sure that it contains a couple of expected strings.
require.Contains(t, strLog, `[ERROR] panic serving grpc request: panic="panic from Something`)
require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc.(*simplePanic).Something`)
}

View File

@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
)
type testServer struct {
@ -39,11 +40,22 @@ func (s testServer) Metadata() *metadata.Server {
}
}
func newTestServer(t *testing.T, name string, dc string, tlsConf *tlsutil.Configurator) testServer {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(addr, func(server *grpc.Server) {
func newSimpleTestServer(t *testing.T, name, dc string, tlsConf *tlsutil.Configurator) testServer {
return newTestServer(t, hclog.Default(), name, dc, tlsConf, func(server *grpc.Server) {
testservice.RegisterSimpleServer(server, &simple{name: name, dc: dc})
})
}
// newPanicTestServer sets up a simple server with handlers that panic.
func newPanicTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator) testServer {
return newTestServer(t, logger, name, dc, tlsConf, func(server *grpc.Server) {
testservice.RegisterSimpleServer(server, &simplePanic{name: name, dc: dc})
})
}
func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator, register func(server *grpc.Server)) testServer {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(logger, addr, register)
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
@ -103,6 +115,23 @@ func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.
return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil
}
type simplePanic struct {
name, dc string
}
func (s *simplePanic) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
for flow.Context().Err() == nil {
time.Sleep(time.Millisecond)
panic("panic from Flow")
}
return nil
}
func (s *simplePanic) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
time.Sleep(time.Millisecond)
panic("panic from Something")
}
// fakeRPCListener mimics agent/consul.Server.listen to handle the RPCType byte.
// In the future we should be able to refactor Server and extract this RPC
// handling logic so that we don't need to use a fake.

View File

@ -15,6 +15,7 @@ import (
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/go-hclog"
)
func noopRegister(*grpc.Server) {}
@ -23,7 +24,7 @@ func TestHandler_EmitsStats(t *testing.T) {
sink, reset := patchGlobalMetrics(t)
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(addr, noopRegister)
handler := NewHandler(hclog.Default(), addr, noopRegister)
reset()
testservice.RegisterSimpleServer(handler.srv, &simple{})

View File

@ -320,7 +320,7 @@ var _ Backend = (*testBackend)(nil)
func runTestServer(t *testing.T, server *Server) net.Addr {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
var grpcServer *gogrpc.Server
handler := grpc.NewHandler(addr, func(srv *gogrpc.Server) {
handler := grpc.NewHandler(hclog.New(nil), addr, func(srv *gogrpc.Server) {
grpcServer = srv
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server)
})

View File

@ -7,6 +7,8 @@ import (
"time"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
@ -18,6 +20,7 @@ import (
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
agentgrpc "github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/tlsutil"
@ -219,8 +222,18 @@ func tokenFromContext(ctx context.Context) string {
// NewGRPCServer creates a grpc.Server, registers the Server, and then returns
// the grpc.Server.
func NewGRPCServer(s *Server, tlsConfigurator *tlsutil.Configurator) *grpc.Server {
recoveryOpts := agentgrpc.PanicHandlerMiddlewareOpts(s.Logger)
opts := []grpc.ServerOption{
grpc.MaxConcurrentStreams(2048),
middleware.WithUnaryServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
),
}
if tlsConfigurator != nil {
if tlsConfigurator.Cert() != nil {

3
go.mod
View File

@ -29,6 +29,7 @@ require (
github.com/google/gofuzz v1.2.0
github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22
github.com/google/tcpproxy v0.0.0-20180808230851-dfa16c61dad2
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/hashicorp/consul/api v1.11.0
github.com/hashicorp/consul/sdk v0.8.0
github.com/hashicorp/go-bexpr v0.1.2
@ -73,7 +74,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
github.com/prometheus/client_golang v1.4.0
github.com/rboyer/safeio v0.2.1

4
go.sum
View File

@ -214,6 +214,7 @@ github.com/gophercloud/gophercloud v0.1.0 h1:P/nh25+rzXouhytV2pUHBb65fnds26Ghl8/
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
@ -426,8 +427,9 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=