6d76b137ba
For established xDS gRPC streams, recheck ACLs for each DiscoveryRequest or DiscoveryResponse. If more than 5 minutes have elapsed since the last ACL check, recheck even without an incoming DiscoveryRequest or DiscoveryResponse. ACL failures will terminate the stream.
483 lines
16 KiB
Go
483 lines
16 KiB
Go
package xds
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/status"
|
|
|
|
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
|
envoyauthz "github.com/envoyproxy/go-control-plane/envoy/service/auth/v2alpha"
|
|
envoydisco "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
|
|
"github.com/gogo/googleapis/google/rpc"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
"github.com/hashicorp/consul/agent/connect"
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
)
|
|
|
|
// ADSStream is a shorter way of referring to this thing...
|
|
type ADSStream = envoydisco.AggregatedDiscoveryService_StreamAggregatedResourcesServer
|
|
|
|
// Resource type URLs for xDS v2. These mirror
// envoyproxy/go-control-plane/pkg/cache/resource.go; they are copied rather
// than imported because nothing else from that package is needed here.
const (
	typePrefix = "type.googleapis.com/envoy.api.v2."

	// EndpointType is the TypeURL for Endpoint discovery responses.
	EndpointType = typePrefix + "ClusterLoadAssignment"
	// ClusterType is the TypeURL for Cluster discovery responses.
	ClusterType = typePrefix + "Cluster"
	// RouteType is the TypeURL for Route discovery responses.
	RouteType = typePrefix + "RouteConfiguration"
	// ListenerType is the TypeURL for Listener discovery responses.
	ListenerType = typePrefix + "Listener"
)

// Fixed names used for well-known entries in generated Envoy config.
const (
	// PublicListenerName is the name we give the public listener in Envoy config.
	PublicListenerName = "public_listener"
	// LocalAppClusterName is the name we give the local application "cluster"
	// in Envoy config.
	LocalAppClusterName = "local_app"
	// LocalAgentClusterName is the name we give the local agent "cluster" in
	// Envoy config.
	LocalAgentClusterName = "local_agent"
)

// DefaultAuthCheckFrequency is the default value for
// Server.AuthCheckFrequency to use when the zero value is provided.
const DefaultAuthCheckFrequency = 5 * time.Minute
|
|
|
|
// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far
|
|
// entirely agent-local and all uses private methods this allows a simple shim
|
|
// to be written in the agent package to allow resolving without tightly
|
|
// coupling this to the agent.
|
|
type ACLResolverFunc func(id string) (acl.Authorizer, error)
|
|
|
|
// ConnectAuthz is the interface the agent needs to expose to be able to re-use
|
|
// the authorization logic between both APIs.
|
|
type ConnectAuthz interface {
|
|
// ConnectAuthorize is implemented by Agent.ConnectAuthorize
|
|
ConnectAuthorize(token string, req *structs.ConnectAuthorizeRequest) (authz bool, reason string, m *cache.ResultMeta, err error)
|
|
}
|
|
|
|
// ConfigManager is the interface xds.Server requires to consume proxy config
|
|
// updates. It's satisfied normally by the agent's proxycfg.Manager, but allows
|
|
// easier testing without several layers of mocked cache, local state and
|
|
// proxycfg.Manager.
|
|
type ConfigManager interface {
|
|
Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc)
|
|
}
|
|
|
|
// Server represents a gRPC server that can handle both XDS and ext_authz
// requests from Envoy. All of its public members must be set before the gRPC
// server is started.
//
// A full description of the XDS protocol can be found at
// https://github.com/envoyproxy/data-plane-api/blob/master/XDS_PROTOCOL.md
type Server struct {
	// Logger receives debug output, such as the error that ended an ADS stream.
	Logger *log.Logger
	// CfgMgr supplies config snapshots for each proxy that connects over ADS.
	CfgMgr ConfigManager
	// Authz performs the Connect authorization behind the ext_authz Check
	// endpoint.
	Authz ConnectAuthz
	// ResolveToken resolves the ACL token presented on an ADS stream. A nil
	// Authorizer returned with a nil error skips the service write check.
	ResolveToken ACLResolverFunc
	// AuthCheckFrequency is how often we should re-check the credentials used
	// during a long-lived gRPC Stream after it has been initially established.
	// This is only used during idle periods of stream interactions (i.e. when
	// there has been no recent DiscoveryRequest). Initialize substitutes
	// DefaultAuthCheckFrequency when this is left at zero.
	AuthCheckFrequency time.Duration
}
|
|
|
|
// Initialize will finish configuring the Server for first use.
|
|
func (s *Server) Initialize() {
|
|
if s.AuthCheckFrequency == 0 {
|
|
s.AuthCheckFrequency = DefaultAuthCheckFrequency
|
|
}
|
|
}
|
|
|
|
// StreamAggregatedResources implements
|
|
// envoydisco.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is
|
|
// the only xDS API we directly support for now.
|
|
func (s *Server) StreamAggregatedResources(stream ADSStream) error {
|
|
// a channel for receiving incoming requests
|
|
reqCh := make(chan *envoy.DiscoveryRequest)
|
|
reqStop := int32(0)
|
|
go func() {
|
|
for {
|
|
req, err := stream.Recv()
|
|
if atomic.LoadInt32(&reqStop) != 0 {
|
|
return
|
|
}
|
|
if err != nil {
|
|
close(reqCh)
|
|
return
|
|
}
|
|
reqCh <- req
|
|
}
|
|
}()
|
|
|
|
err := s.process(stream, reqCh)
|
|
if err != nil {
|
|
s.Logger.Printf("[DEBUG] Error handling ADS stream: %s", err)
|
|
}
|
|
|
|
// prevents writing to a closed channel if send failed on blocked recv
|
|
atomic.StoreInt32(&reqStop, 1)
|
|
|
|
return err
|
|
}
|
|
|
|
// States of the per-stream state machine driven by process. A stream starts
// in stateInit. It moves to statePendingInitialConfig once the first request
// has named the proxy. It reaches stateRunning when the first config snapshot
// arrives.
const (
	stateInit                 int = 0
	statePendingInitialConfig int = 1
	stateRunning              int = 2
)
|
|
|
|
func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
|
|
// xDS requires a unique nonce to correlate response/request pairs
|
|
var nonce uint64
|
|
|
|
// xDS works with versions of configs. Internally we don't have a consistent
|
|
// version. We could just hash the config since versions don't have to be
|
|
// ordered as far as I can tell, but it's cheaper just to increment a counter
|
|
// every time we observe a new config since the upstream proxycfg package only
|
|
// delivers updates when there are actual changes.
|
|
var configVersion uint64
|
|
|
|
// Loop state
|
|
var cfgSnap *proxycfg.ConfigSnapshot
|
|
var req *envoy.DiscoveryRequest
|
|
var ok bool
|
|
var stateCh <-chan *proxycfg.ConfigSnapshot
|
|
var watchCancel func()
|
|
var proxyID string
|
|
|
|
// need to run a small state machine to get through initial authentication.
|
|
var state = stateInit
|
|
|
|
// Configure handlers for each type of request
|
|
handlers := map[string]*xDSType{
|
|
EndpointType: &xDSType{
|
|
typeURL: EndpointType,
|
|
resources: endpointsFromSnapshot,
|
|
stream: stream,
|
|
},
|
|
ClusterType: &xDSType{
|
|
typeURL: ClusterType,
|
|
resources: clustersFromSnapshot,
|
|
stream: stream,
|
|
},
|
|
RouteType: &xDSType{
|
|
typeURL: RouteType,
|
|
resources: routesFromSnapshot,
|
|
stream: stream,
|
|
},
|
|
ListenerType: &xDSType{
|
|
typeURL: ListenerType,
|
|
resources: listenersFromSnapshot,
|
|
stream: stream,
|
|
},
|
|
}
|
|
|
|
var authTimer <-chan time.Time
|
|
extendAuthTimer := func() {
|
|
authTimer = time.After(s.AuthCheckFrequency)
|
|
}
|
|
|
|
checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error {
|
|
if cfgSnap == nil {
|
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
|
|
}
|
|
|
|
token := tokenFromStream(stream)
|
|
rule, err := s.ResolveToken(token)
|
|
|
|
if acl.IsErrNotFound(err) {
|
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
|
|
} else if acl.IsErrPermissionDenied(err) {
|
|
return status.Errorf(codes.PermissionDenied, "permission denied: %v", err)
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
|
|
return status.Errorf(codes.PermissionDenied, "permission denied")
|
|
}
|
|
|
|
// Authed OK!
|
|
return nil
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-authTimer:
|
|
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
|
|
if err := checkStreamACLs(cfgSnap); err != nil {
|
|
return err
|
|
}
|
|
extendAuthTimer()
|
|
|
|
case req, ok = <-reqCh:
|
|
if !ok {
|
|
// reqCh is closed when stream.Recv errors which is how we detect client
|
|
// going away. AFAICT the stream.Context() is only cancelled once the
|
|
// RPC method returns which it can't until we return from this one so
|
|
// there's no point in blocking on that.
|
|
return nil
|
|
}
|
|
if req.TypeUrl == "" {
|
|
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
|
|
}
|
|
if handler, ok := handlers[req.TypeUrl]; ok {
|
|
handler.Recv(req)
|
|
}
|
|
case cfgSnap = <-stateCh:
|
|
// We got a new config, update the version counter
|
|
configVersion++
|
|
}
|
|
|
|
// Trigger state machine
|
|
switch state {
|
|
case stateInit:
|
|
if req == nil {
|
|
// This can't happen (tm) since stateCh is nil until after the first req
|
|
// is received but lets not panic about it.
|
|
continue
|
|
}
|
|
// Start authentication process, we need the proxyID
|
|
proxyID = req.Node.Id
|
|
|
|
// Start watching config for that proxy
|
|
stateCh, watchCancel = s.CfgMgr.Watch(proxyID)
|
|
// Note that in this case we _intend_ the defer to only be triggered when
|
|
// this whole process method ends (i.e. when streaming RPC aborts) not at
|
|
// the end of the current loop iteration. We have to do it in the loop
|
|
// here since we can't start watching until we get to this state in the
|
|
// state machine.
|
|
defer watchCancel()
|
|
|
|
// Now wait for the config so we can check ACL
|
|
state = statePendingInitialConfig
|
|
case statePendingInitialConfig:
|
|
if cfgSnap == nil {
|
|
// Nothing we can do until we get the initial config
|
|
continue
|
|
}
|
|
|
|
// Got config, try to authenticate next.
|
|
state = stateRunning
|
|
|
|
// Lets actually process the config we just got or we'll mis responding
|
|
fallthrough
|
|
case stateRunning:
|
|
// Check ACLs on every Discovery{Request,Response}.
|
|
if err := checkStreamACLs(cfgSnap); err != nil {
|
|
return err
|
|
}
|
|
// For the first time through the state machine, this is when the
|
|
// timer is first started.
|
|
extendAuthTimer()
|
|
|
|
// See if any handlers need to have the current (possibly new) config
|
|
// sent. Note the order here is actually significant so we can't just
|
|
// range the map which has no determined order. It's important because:
|
|
//
|
|
// 1. Envoy needs to see a consistent snapshot to avoid potentially
|
|
// dropping traffic due to inconsistencies. This is the
|
|
// main win of ADS after all - we get to control this order.
|
|
// 2. Non-determinsic order of complex protobuf responses which are
|
|
// compared for non-exact JSON equivalence makes the tests uber-messy
|
|
// to handle
|
|
for _, typeURL := range []string{ClusterType, EndpointType, RouteType, ListenerType} {
|
|
handler := handlers[typeURL]
|
|
if err := handler.SendIfNew(cfgSnap, configVersion, &nonce); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// xDSType tracks the request/response state of one xDS resource type
// (clusters, endpoints, routes or listeners) on a single ADS stream.
type xDSType struct {
	// typeURL is the xDS type URL this handler serves; it is passed to
	// createResponse for every response sent.
	typeURL string
	// stream is the ADS stream responses are sent on. Its context also carries
	// the ACL token handed to resources.
	stream ADSStream
	// req is the most recent request accepted by Recv, or nil until the proxy
	// first asks for this type.
	req *envoy.DiscoveryRequest
	// lastNonce is the nonce of the last response sent for this type, or ""
	// if nothing has been sent yet.
	lastNonce string
	// lastVersion is the version that was last sent to the proxy. It is needed
	// because we don't want to send the same version more than once.
	// req.VersionInfo may be an older version than the most recent one sent in
	// two cases: 1) if the ACK wasn't received yet and `req` still points to the
	// previous request we already responded to and 2) if the proxy rejected the
	// last version we sent with a Nack then req.VersionInfo will be the older
	// version it's hanging on to.
	lastVersion uint64
	// resources builds this type's resources from a config snapshot, given the
	// ACL token taken from the stream.
	resources func(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error)
}
|
|
|
|
func (t *xDSType) Recv(req *envoy.DiscoveryRequest) {
|
|
if t.lastNonce == "" || t.lastNonce == req.GetResponseNonce() {
|
|
t.req = req
|
|
}
|
|
}
|
|
|
|
func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, nonce *uint64) error {
|
|
if t.req == nil {
|
|
return nil
|
|
}
|
|
if t.lastVersion >= version {
|
|
// Already sent this version
|
|
return nil
|
|
}
|
|
resources, err := t.resources(cfgSnap, tokenFromStream(t.stream))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resources == nil || len(resources) == 0 {
|
|
// Nothing to send yet
|
|
return nil
|
|
}
|
|
|
|
// Note we only increment nonce when we actually send - not important for
|
|
// correctness but makes tests much simpler when we skip a type like Routes
|
|
// with nothing to send.
|
|
*nonce++
|
|
nonceStr := fmt.Sprintf("%08x", *nonce)
|
|
versionStr := fmt.Sprintf("%08x", version)
|
|
|
|
resp, err := createResponse(t.typeURL, versionStr, nonceStr, resources)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = t.stream.Send(resp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.lastVersion = version
|
|
t.lastNonce = nonceStr
|
|
return nil
|
|
}
|
|
|
|
func tokenFromStream(stream ADSStream) string {
|
|
return tokenFromContext(stream.Context())
|
|
}
|
|
|
|
func tokenFromContext(ctx context.Context) string {
|
|
md, ok := metadata.FromIncomingContext(ctx)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
toks, ok := md["x-consul-token"]
|
|
if ok && len(toks) > 0 {
|
|
return toks[0]
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// IncrementalAggregatedResources implements envoydisco.AggregatedDiscoveryServiceServer
|
|
func (s *Server) IncrementalAggregatedResources(_ envoydisco.AggregatedDiscoveryService_IncrementalAggregatedResourcesServer) error {
|
|
return errors.New("not implemented")
|
|
}
|
|
|
|
func deniedResponse(reason string) (*envoyauthz.CheckResponse, error) {
|
|
return &envoyauthz.CheckResponse{
|
|
Status: &rpc.Status{
|
|
Code: int32(rpc.PERMISSION_DENIED),
|
|
Message: "Denied: " + reason,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// Check implements envoyauthz.AuthorizationServer.
|
|
func (s *Server) Check(ctx context.Context, r *envoyauthz.CheckRequest) (*envoyauthz.CheckResponse, error) {
|
|
// Sanity checks
|
|
if r.Attributes == nil || r.Attributes.Source == nil || r.Attributes.Destination == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "source and destination attributes are required")
|
|
}
|
|
if r.Attributes.Source.Principal == "" || r.Attributes.Destination.Principal == "" {
|
|
return nil, status.Error(codes.InvalidArgument, "source and destination Principal are required")
|
|
}
|
|
|
|
// Parse destination to know the target service
|
|
dest, err := connect.ParseCertURIFromString(r.Attributes.Destination.Principal)
|
|
if err != nil {
|
|
// Treat this as an auth error since Envoy has sent something it considers
|
|
// valid, it's just not an identity we trust.
|
|
return deniedResponse("Destination Principal is not a valid Connect identity")
|
|
}
|
|
|
|
destID, ok := dest.(*connect.SpiffeIDService)
|
|
if !ok {
|
|
return deniedResponse("Destination Principal is not a valid Service identity")
|
|
}
|
|
|
|
// For now we don't validate the trust domain of the _destination_ at all -
|
|
// the HTTP Authorize endpoint just accepts a target _service_ and it's
|
|
// implicit that the request is for the correct cluster. We might want to
|
|
// reconsider this later but plumbing in additional machinery to check the
|
|
// clusterID here is not really necessary for now unless Envoys are badly
|
|
// configured. Our threat model _requires_ correctly configured and well
|
|
// behaved proxies given that they have ACLs to fetch certs and so can do
|
|
// whatever they want including not authorizing traffic at all or routing it
|
|
// do a different service than they auth'd against.
|
|
|
|
// Create an authz request
|
|
req := &structs.ConnectAuthorizeRequest{
|
|
Target: destID.Service,
|
|
ClientCertURI: r.Attributes.Source.Principal,
|
|
// TODO(banks): need Envoy to support sending cert serial/hash to enforce
|
|
// revocation later.
|
|
}
|
|
token := tokenFromContext(ctx)
|
|
authed, reason, _, err := s.Authz.ConnectAuthorize(token, req)
|
|
if err != nil {
|
|
if err == acl.ErrPermissionDenied {
|
|
return nil, status.Error(codes.PermissionDenied, err.Error())
|
|
}
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
if !authed {
|
|
return deniedResponse(reason)
|
|
}
|
|
|
|
return &envoyauthz.CheckResponse{
|
|
Status: &rpc.Status{
|
|
Code: int32(rpc.OK),
|
|
Message: "ALLOWED: " + reason,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// GRPCServer returns a server instance that can handle XDS and ext_authz
|
|
// requests.
|
|
func (s *Server) GRPCServer(certFile, keyFile string) (*grpc.Server, error) {
|
|
opts := []grpc.ServerOption{
|
|
grpc.MaxConcurrentStreams(2048),
|
|
}
|
|
if certFile != "" && keyFile != "" {
|
|
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
opts = append(opts, grpc.Creds(creds))
|
|
}
|
|
srv := grpc.NewServer(opts...)
|
|
envoydisco.RegisterAggregatedDiscoveryServiceServer(srv, s)
|
|
envoyauthz.RegisterAuthorizationServer(srv, s)
|
|
return srv, nil
|
|
}
|