subscribe: add a stateless subscribe service for the gRPC server
With a Backend that provides access to the necessary dependencies.
This commit is contained in:
parent
2f6c98ee0e
commit
371ec2d70a
|
@ -1,6 +1,7 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
|
@ -14,4 +15,5 @@ type Deps struct {
|
|||
Tokens *token.Store
|
||||
Router *router.Router
|
||||
ConnPool *pool.ConnPool
|
||||
GRPCConnPool *grpc.ClientConnPool
|
||||
}
|
||||
|
|
|
@ -26,14 +26,16 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||
"github.com/hashicorp/consul/agent/grpc"
|
||||
agentgrpc "github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/subscribe"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
connlimit "github.com/hashicorp/go-connlimit"
|
||||
|
@ -44,6 +46,7 @@ import (
|
|||
raftboltdb "github.com/hashicorp/raft-boltdb"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// These are the protocol versions that Consul can _understand_. These are
|
||||
|
@ -577,7 +580,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
|||
}
|
||||
go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
s.grpcHandler = newGRPCHandlerFromConfig(logger, config)
|
||||
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
|
||||
|
||||
// Initialize Autopilot. This must happen before starting leadership monitoring
|
||||
// as establishing leadership could attempt to use autopilot and cause a panic.
|
||||
|
@ -606,12 +609,18 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
|||
return s, nil
|
||||
}
|
||||
|
||||
func newGRPCHandlerFromConfig(logger hclog.Logger, config *Config) connHandler {
|
||||
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
|
||||
if !config.EnableGRPCServer {
|
||||
return grpc.NoOpHandler{Logger: logger}
|
||||
return agentgrpc.NoOpHandler{Logger: deps.Logger}
|
||||
}
|
||||
|
||||
return grpc.NewHandler(config.RPCAddr)
|
||||
register := func(srv *grpc.Server) {
|
||||
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, &subscribe.Server{
|
||||
Backend: &subscribeBackend{srv: s, connPool: deps.GRPCConnPool},
|
||||
Logger: deps.Logger.Named("grpc-api.subscription"),
|
||||
})
|
||||
}
|
||||
return agentgrpc.NewHandler(config.RPCAddr, register)
|
||||
}
|
||||
|
||||
func (s *Server) connectCARootsMonitor(ctx context.Context) {
|
||||
|
|
|
@ -19,7 +19,7 @@ const (
|
|||
|
||||
// ErrSubscriptionClosed is a error signalling the subscription has been
|
||||
// closed. The client should Unsubscribe, then re-Subscribe.
|
||||
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
|
||||
var ErrSubscriptionClosed = errors.New("subscription closed by server, client must reset state and resubscribe")
|
||||
|
||||
// Subscription provides events on a Topic. Events may be filtered by Key.
|
||||
// Events are returned by Next(), and may start with a Snapshot of events.
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
agentgrpc "github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/subscribe"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type subscribeBackend struct {
|
||||
srv *Server
|
||||
connPool *agentgrpc.ClientConnPool
|
||||
}
|
||||
|
||||
// TODO: refactor Resolve methods to an ACLBackend that can be used by all
|
||||
// the endpoints.
|
||||
func (s subscribeBackend) ResolveToken(token string) (acl.Authorizer, error) {
|
||||
return s.srv.ResolveToken(token)
|
||||
}
|
||||
|
||||
var _ subscribe.Backend = (*subscribeBackend)(nil)
|
||||
|
||||
// Forward requests to a remote datacenter by calling f if the target dc does not
|
||||
// match the config. Does nothing but return handled=false if dc is not specified,
|
||||
// or if it matches the Datacenter in config.
|
||||
//
|
||||
// TODO: extract this so that it can be used with other grpc services.
|
||||
func (s subscribeBackend) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) {
|
||||
if dc == "" || dc == s.srv.config.Datacenter {
|
||||
return false, nil
|
||||
}
|
||||
conn, err := s.connPool.ClientConn(dc)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, f(conn)
|
||||
}
|
||||
|
||||
func (s subscribeBackend) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) {
|
||||
return s.srv.fsm.State().EventPublisher().Subscribe(req)
|
||||
}
|
|
@ -11,15 +11,16 @@ import (
|
|||
)
|
||||
|
||||
// NewHandler returns a gRPC server that accepts connections from Handle(conn).
|
||||
func NewHandler(addr net.Addr) *Handler {
|
||||
// The register function will be called with the grpc.Server to register
|
||||
// gRPC services with the server.
|
||||
func NewHandler(addr net.Addr, register func(server *grpc.Server)) *Handler {
|
||||
// We don't need to pass tls.Config to the server since it's multiplexed
|
||||
// behind the RPC listener, which already has TLS configured.
|
||||
srv := grpc.NewServer(
|
||||
grpc.StatsHandler(newStatsHandler()),
|
||||
grpc.StreamInterceptor((&activeStreamCounter{}).Intercept),
|
||||
)
|
||||
|
||||
// TODO(streaming): add gRPC services to srv here
|
||||
register(srv)
|
||||
|
||||
lis := &chanListener{addr: addr, conns: make(chan net.Conn)}
|
||||
return &Handler{srv: srv, listener: lis}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type testServer struct {
|
||||
|
@ -28,9 +29,9 @@ func (s testServer) Metadata() *metadata.Server {
|
|||
|
||||
func newTestServer(t *testing.T, name string, dc string) testServer {
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
handler := NewHandler(addr)
|
||||
|
||||
testservice.RegisterSimpleServer(handler.srv, &simple{name: name, dc: dc})
|
||||
handler := NewHandler(addr, func(server *grpc.Server) {
|
||||
testservice.RegisterSimpleServer(server, &simple{name: name, dc: dc})
|
||||
})
|
||||
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -14,11 +14,14 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func noopRegister(*grpc.Server) {}
|
||||
|
||||
func TestHandler_EmitsStats(t *testing.T) {
|
||||
sink := patchGlobalMetrics(t)
|
||||
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
handler := NewHandler(addr)
|
||||
handler := NewHandler(addr, noopRegister)
|
||||
|
||||
testservice.RegisterSimpleServer(handler.srv, &simple{})
|
||||
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/grpc/resolver"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
|
@ -86,6 +87,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
|||
// TODO(streaming): setConfig.Scheme name for tests
|
||||
builder := resolver.NewServerResolverBuilder(resolver.Config{})
|
||||
resolver.RegisterWithGRPC(builder)
|
||||
d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()))
|
||||
|
||||
d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder)
|
||||
|
||||
acConf := autoconf.Config{
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
package subscribe
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
)
|
||||
|
||||
// EnforceACL takes an acl.Authorizer and returns the decision for whether the
|
||||
// event is allowed to be sent to this client or not.
|
||||
func enforceACL(authz acl.Authorizer, e stream.Event) acl.EnforcementDecision {
|
||||
switch {
|
||||
case e.IsEndOfSnapshot(), e.IsEndOfEmptySnapshot():
|
||||
return acl.Allow
|
||||
}
|
||||
|
||||
switch p := e.Payload.(type) {
|
||||
case state.EventPayloadCheckServiceNode:
|
||||
csn := p.Value
|
||||
if csn.Node == nil || csn.Service == nil || csn.Node.Node == "" || csn.Service.Service == "" {
|
||||
return acl.Deny
|
||||
}
|
||||
|
||||
// TODO: what about acl.Default?
|
||||
// TODO(streaming): we need the AuthorizerContext for ent
|
||||
if dec := authz.NodeRead(csn.Node.Node, nil); dec != acl.Allow {
|
||||
return acl.Deny
|
||||
}
|
||||
|
||||
// TODO(streaming): we need the AuthorizerContext for ent
|
||||
// Enterprise support for streaming events - they don't have enough data to
|
||||
// populate it yet.
|
||||
if dec := authz.ServiceRead(csn.Service.Service, nil); dec != acl.Allow {
|
||||
return acl.Deny
|
||||
}
|
||||
return acl.Allow
|
||||
}
|
||||
|
||||
return acl.Deny
|
||||
}
|
|
@ -1,45 +1,66 @@
|
|||
package subscribe
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/proto/pbevent"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
)
|
||||
|
||||
// Server implements a StateChangeSubscriptionServer for accepting SubscribeRequests,
|
||||
// and sending events to the subscription topic.
|
||||
type Server struct {
|
||||
srv *Server
|
||||
logger hclog.Logger
|
||||
Backend Backend
|
||||
Logger Logger
|
||||
}
|
||||
|
||||
var _ pbevent.StateChangeSubscriptionServer = (*Server)(nil)
|
||||
type Logger interface {
|
||||
IsTrace() bool
|
||||
Trace(msg string, args ...interface{})
|
||||
}
|
||||
|
||||
func (h *Server) Subscribe(req *pbevent.SubscribeRequest, serverStream pbevent.StateChangeSubscription_SubscribeServer) error {
|
||||
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
|
||||
|
||||
type Backend interface {
|
||||
ResolveToken(token string) (acl.Authorizer, error)
|
||||
Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error)
|
||||
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
|
||||
}
|
||||
|
||||
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
|
||||
// streamID is just used for message correlation in trace logs and not
|
||||
// populated normally.
|
||||
var streamID string
|
||||
var err error
|
||||
|
||||
if h.logger.IsTrace() {
|
||||
if h.Logger.IsTrace() {
|
||||
// TODO(banks) it might be nice one day to replace this with OpenTracing ID
|
||||
// if one is set etc. but probably pointless until we support that properly
|
||||
// in other places so it's actually propagated properly. For now this just
|
||||
// makes lifetime of a stream more traceable in our regular server logs for
|
||||
// debugging/dev.
|
||||
var err error
|
||||
streamID, err = uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Forward the request to a remote DC if applicable.
|
||||
if req.Datacenter != "" && req.Datacenter != h.srv.config.Datacenter {
|
||||
return h.forwardAndProxy(req, serverStream, streamID)
|
||||
// TODO: add fields to logger and pass logger around instead of streamID
|
||||
handled, err := h.Backend.Forward(req.Datacenter, h.forwardToDC(req, serverStream, streamID))
|
||||
if handled || err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h.srv.logger.Trace("new subscription",
|
||||
h.Logger.Trace("new subscription",
|
||||
"topic", req.Topic.String(),
|
||||
"key", req.Key,
|
||||
"index", req.Index,
|
||||
|
@ -47,141 +68,180 @@ func (h *Server) Subscribe(req *pbevent.SubscribeRequest, serverStream pbevent.S
|
|||
)
|
||||
|
||||
var sentCount uint64
|
||||
defer h.srv.logger.Trace("subscription closed", "stream_id", streamID)
|
||||
defer h.Logger.Trace("subscription closed", "stream_id", streamID)
|
||||
|
||||
// Resolve the token and create the ACL filter.
|
||||
// TODO: handle token expiry gracefully...
|
||||
authz, err := h.srv.ResolveToken(req.Token)
|
||||
authz, err := h.Backend.ResolveToken(req.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aclFilter := newACLFilter(authz, h.srv.logger, h.srv.config.ACLEnforceVersion8)
|
||||
|
||||
state := h.srv.fsm.State()
|
||||
|
||||
// Register a subscription on this topic/key with the FSM.
|
||||
sub, err := state.Subscribe(serverStream.Context(), req)
|
||||
sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer state.Unsubscribe(req)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
// Deliver the events
|
||||
ctx := serverStream.Context()
|
||||
snapshotDone := false
|
||||
for {
|
||||
events, err := sub.Next()
|
||||
if err == stream.ErrSubscriptionReload {
|
||||
event := pbevent.Event{
|
||||
Payload: &pbevent.Event_ResetStream{ResetStream: true},
|
||||
}
|
||||
if err := serverStream.Send(&event); err != nil {
|
||||
return err
|
||||
}
|
||||
h.srv.logger.Trace("subscription reloaded",
|
||||
"stream_id", streamID,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
events, err := sub.Next(ctx)
|
||||
switch {
|
||||
// TODO: test case
|
||||
case errors.Is(err, stream.ErrSubscriptionClosed):
|
||||
h.Logger.Trace("subscription reset by server", "stream_id", streamID)
|
||||
return status.Error(codes.Aborted, err.Error())
|
||||
case err != nil:
|
||||
return err
|
||||
}
|
||||
|
||||
aclFilter.filterStreamEvents(&events)
|
||||
events = filterStreamEvents(authz, events)
|
||||
if len(events) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
snapshotDone := false
|
||||
if len(events) == 1 {
|
||||
if events[0].GetEndOfSnapshot() {
|
||||
snapshotDone = true
|
||||
h.srv.logger.Trace("snapshot complete",
|
||||
"index", events[0].Index,
|
||||
"sent", sentCount,
|
||||
"stream_id", streamID,
|
||||
)
|
||||
} else if events[0].GetResumeStream() {
|
||||
snapshotDone = true
|
||||
h.srv.logger.Trace("resuming stream",
|
||||
"index", events[0].Index,
|
||||
"sent", sentCount,
|
||||
"stream_id", streamID,
|
||||
)
|
||||
} else if snapshotDone {
|
||||
// Count this event too in the normal case as "sent" the above cases
|
||||
// only show the number of events sent _before_ the snapshot ended.
|
||||
h.srv.logger.Trace("sending events",
|
||||
"index", events[0].Index,
|
||||
"sent", sentCount,
|
||||
"batch_size", 1,
|
||||
"stream_id", streamID,
|
||||
)
|
||||
}
|
||||
sentCount++
|
||||
if err := serverStream.Send(&events[0]); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if len(events) > 1 {
|
||||
e := &pbevent.Event{
|
||||
Topic: req.Topic,
|
||||
Key: req.Key,
|
||||
Index: events[0].Index,
|
||||
Payload: &pbevent.Event_EventBatch{
|
||||
EventBatch: &pbevent.EventBatch{
|
||||
Events: pbevent.EventBatchEventsFromEventSlice(events),
|
||||
},
|
||||
},
|
||||
}
|
||||
sentCount += uint64(len(events))
|
||||
h.srv.logger.Trace("sending events",
|
||||
"index", events[0].Index,
|
||||
first := events[0]
|
||||
switch {
|
||||
case first.IsEndOfSnapshot() || first.IsEndOfEmptySnapshot():
|
||||
snapshotDone = true
|
||||
h.Logger.Trace("snapshot complete",
|
||||
"index", first.Index, "sent", sentCount, "stream_id", streamID)
|
||||
case snapshotDone:
|
||||
h.Logger.Trace("sending events",
|
||||
"index", first.Index,
|
||||
"sent", sentCount,
|
||||
"batch_size", len(events),
|
||||
"stream_id", streamID,
|
||||
)
|
||||
if err := serverStream.Send(e); err != nil {
|
||||
}
|
||||
|
||||
sentCount += uint64(len(events))
|
||||
e := newEventFromStreamEvents(req, events)
|
||||
if err := serverStream.Send(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: can be replaced by mog conversion
|
||||
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.SubscribeRequest {
|
||||
return &stream.SubscribeRequest{
|
||||
Topic: req.Topic,
|
||||
Key: req.Key,
|
||||
Token: req.Token,
|
||||
Index: req.Index,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Server) forwardToDC(
|
||||
req *pbsubscribe.SubscribeRequest,
|
||||
serverStream pbsubscribe.StateChangeSubscription_SubscribeServer,
|
||||
streamID string,
|
||||
) func(conn *grpc.ClientConn) error {
|
||||
return func(conn *grpc.ClientConn) error {
|
||||
h.Logger.Trace("forwarding to another DC",
|
||||
"dc", req.Datacenter,
|
||||
"topic", req.Topic.String(),
|
||||
"key", req.Key,
|
||||
"index", req.Index,
|
||||
"stream_id", streamID,
|
||||
)
|
||||
|
||||
defer func() {
|
||||
h.Logger.Trace("forwarded stream closed",
|
||||
"dc", req.Datacenter,
|
||||
"stream_id", streamID,
|
||||
)
|
||||
}()
|
||||
|
||||
client := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
streamHandle, err := client.Subscribe(serverStream.Context(), req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
event, err := streamHandle.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := serverStream.Send(event); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Server) forwardAndProxy(
|
||||
req *pbevent.SubscribeRequest,
|
||||
serverStream pbevent.StateChangeSubscription_SubscribeServer,
|
||||
streamID string) error {
|
||||
|
||||
conn, err := h.srv.grpcClient.GRPCConn(req.Datacenter)
|
||||
if err != nil {
|
||||
return err
|
||||
// filterStreamEvents to only those allowed by the acl token.
|
||||
func filterStreamEvents(authz acl.Authorizer, events []stream.Event) []stream.Event {
|
||||
// TODO: when is authz nil?
|
||||
if authz == nil || len(events) == 0 {
|
||||
return events
|
||||
}
|
||||
|
||||
h.logger.Trace("forwarding to another DC",
|
||||
"dc", req.Datacenter,
|
||||
"topic", req.Topic.String(),
|
||||
"key", req.Key,
|
||||
"index", req.Index,
|
||||
"stream_id", streamID,
|
||||
)
|
||||
|
||||
defer func() {
|
||||
h.logger.Trace("forwarded stream closed",
|
||||
"dc", req.Datacenter,
|
||||
"stream_id", streamID,
|
||||
)
|
||||
}()
|
||||
|
||||
// Open a Subscribe call to the remote DC.
|
||||
client := pbevent.NewConsulClient(conn)
|
||||
streamHandle, err := client.Subscribe(serverStream.Context(), req)
|
||||
if err != nil {
|
||||
return err
|
||||
// Fast path for the common case of only 1 event since we can avoid slice
|
||||
// allocation in the hot path of every single update event delivered in vast
|
||||
// majority of cases with this. Note that this is called _per event/item_ when
|
||||
// sending snapshots which is a lot worse than being called once on regular
|
||||
// result.
|
||||
if len(events) == 1 {
|
||||
if enforceACL(authz, events[0]) == acl.Allow {
|
||||
return events
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Relay the events back to the client.
|
||||
for {
|
||||
event, err := streamHandle.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
var filtered []stream.Event
|
||||
for idx := range events {
|
||||
event := events[idx]
|
||||
if enforceACL(authz, event) == acl.Allow {
|
||||
filtered = append(filtered, event)
|
||||
}
|
||||
if err := serverStream.Send(event); err != nil {
|
||||
return err
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
func newEventFromStreamEvents(req *pbsubscribe.SubscribeRequest, events []stream.Event) *pbsubscribe.Event {
|
||||
e := &pbsubscribe.Event{
|
||||
Topic: req.Topic,
|
||||
Key: req.Key,
|
||||
Index: events[0].Index,
|
||||
}
|
||||
if len(events) == 1 {
|
||||
setPayload(e, events[0].Payload)
|
||||
return e
|
||||
}
|
||||
|
||||
e.Payload = &pbsubscribe.Event_EventBatch{
|
||||
EventBatch: &pbsubscribe.EventBatch{
|
||||
Events: batchEventsFromEventSlice(events),
|
||||
},
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
func setPayload(e *pbsubscribe.Event, payload interface{}) {
|
||||
switch p := payload.(type) {
|
||||
case state.EventPayloadCheckServiceNode:
|
||||
e.Payload = &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: p.Op,
|
||||
// TODO: this could be cached
|
||||
CheckServiceNode: pbservice.NewCheckServiceNodeFromStructs(p.Value),
|
||||
},
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected payload: %T: %#v", p, p))
|
||||
}
|
||||
}
|
||||
|
||||
func batchEventsFromEventSlice(events []stream.Event) []*pbsubscribe.Event {
|
||||
result := make([]*pbsubscribe.Event, len(events))
|
||||
for i := range events {
|
||||
event := events[i]
|
||||
result[i] = &pbsubscribe.Event{Key: event.Key, Index: event.Index}
|
||||
setPayload(result[i], event.Payload)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue