subscirbe: extract streamID and logging from Subscribe
By extracting all of the tracing logic the core logic of the Subscribe endpoint is much easier to read.
This commit is contained in:
parent
4c4441997a
commit
011109a6f6
|
@ -18,6 +18,16 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
metrics "github.com/armon/go-metrics"
|
metrics "github.com/armon/go-metrics"
|
||||||
|
connlimit "github.com/hashicorp/go-connlimit"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
"github.com/hashicorp/memberlist"
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
raftboltdb "github.com/hashicorp/raft-boltdb"
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
ca "github.com/hashicorp/consul/agent/connect/ca"
|
ca "github.com/hashicorp/consul/agent/connect/ca"
|
||||||
"github.com/hashicorp/consul/agent/consul/authmethod"
|
"github.com/hashicorp/consul/agent/consul/authmethod"
|
||||||
|
@ -38,15 +48,6 @@ import (
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
connlimit "github.com/hashicorp/go-connlimit"
|
|
||||||
"github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/go-memdb"
|
|
||||||
"github.com/hashicorp/memberlist"
|
|
||||||
"github.com/hashicorp/raft"
|
|
||||||
raftboltdb "github.com/hashicorp/raft-boltdb"
|
|
||||||
"github.com/hashicorp/serf/serf"
|
|
||||||
"golang.org/x/time/rate"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// These are the protocol versions that Consul can _understand_. These are
|
// These are the protocol versions that Consul can _understand_. These are
|
||||||
|
@ -615,10 +616,9 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
register := func(srv *grpc.Server) {
|
register := func(srv *grpc.Server) {
|
||||||
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, &subscribe.Server{
|
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer(
|
||||||
Backend: &subscribeBackend{srv: s, connPool: deps.GRPCConnPool},
|
&subscribeBackend{srv: s, connPool: deps.GRPCConnPool},
|
||||||
Logger: deps.Logger.Named("grpc-api.subscription"),
|
deps.Logger.Named("grpc-api.subscription")))
|
||||||
})
|
|
||||||
}
|
}
|
||||||
return agentgrpc.NewHandler(config.RPCAddr, register)
|
return agentgrpc.NewHandler(config.RPCAddr, register)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
package subscribe
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-uuid"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// streamID is used in logs as a unique identifier for a subscription. The value
|
||||||
|
// is created lazily on the first call to String() so that we do not call it
|
||||||
|
// if trace logging is disabled.
|
||||||
|
// If a random UUID can not be created, defaults to the current time formatted
|
||||||
|
// as RFC3339Nano.
|
||||||
|
//
|
||||||
|
// TODO(banks) it might be nice one day to replace this with OpenTracing ID
|
||||||
|
// if one is set etc. but probably pointless until we support that properly
|
||||||
|
// in other places so it's actually propagated properly. For now this just
|
||||||
|
// makes lifetime of a stream more traceable in our regular server logs for
|
||||||
|
// debugging/dev.
|
||||||
|
type streamID struct {
|
||||||
|
once sync.Once
|
||||||
|
id string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamID) String() string {
|
||||||
|
s.once.Do(func() {
|
||||||
|
var err error
|
||||||
|
s.id, err = uuid.GenerateUUID()
|
||||||
|
if err != nil {
|
||||||
|
s.id = time.Now().Format(time.RFC3339Nano)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return s.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Server) newLoggerForRequest(req *pbsubscribe.SubscribeRequest) Logger {
|
||||||
|
return h.Logger.With(
|
||||||
|
"topic", req.Topic.String(),
|
||||||
|
"dc", req.Datacenter,
|
||||||
|
"key", req.Key,
|
||||||
|
"index", req.Index,
|
||||||
|
"stream_id", &streamID{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventLogger struct {
|
||||||
|
logger Logger
|
||||||
|
snapshotDone bool
|
||||||
|
count uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *eventLogger) Trace(e []stream.Event) {
|
||||||
|
if len(e) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
first := e[0]
|
||||||
|
switch {
|
||||||
|
case first.IsEndOfSnapshot() || first.IsEndOfEmptySnapshot():
|
||||||
|
l.snapshotDone = true
|
||||||
|
l.logger.Trace("snapshot complete", "index", first.Index, "sent", l.count)
|
||||||
|
|
||||||
|
case l.snapshotDone:
|
||||||
|
l.logger.Trace("sending events", "index", first.Index, "sent", l.count, "batch_size", len(e))
|
||||||
|
}
|
||||||
|
|
||||||
|
l.count += uint64(len(e))
|
||||||
|
}
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/go-uuid"
|
"github.com/hashicorp/go-hclog"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
@ -23,9 +23,13 @@ type Server struct {
|
||||||
Logger Logger
|
Logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewServer(backend Backend, logger Logger) *Server {
|
||||||
|
return &Server{Backend: backend, Logger: logger}
|
||||||
|
}
|
||||||
|
|
||||||
type Logger interface {
|
type Logger interface {
|
||||||
IsTrace() bool
|
|
||||||
Trace(msg string, args ...interface{})
|
Trace(msg string, args ...interface{})
|
||||||
|
With(args ...interface{}) hclog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
|
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
|
||||||
|
@ -37,41 +41,17 @@ type Backend interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
|
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
|
||||||
// streamID is just used for message correlation in trace logs and not
|
logger := h.newLoggerForRequest(req)
|
||||||
// populated normally.
|
handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger))
|
||||||
var streamID string
|
|
||||||
|
|
||||||
if h.Logger.IsTrace() {
|
|
||||||
// TODO(banks) it might be nice one day to replace this with OpenTracing ID
|
|
||||||
// if one is set etc. but probably pointless until we support that properly
|
|
||||||
// in other places so it's actually propagated properly. For now this just
|
|
||||||
// makes lifetime of a stream more traceable in our regular server logs for
|
|
||||||
// debugging/dev.
|
|
||||||
var err error
|
|
||||||
streamID, err = uuid.GenerateUUID()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: add fields to logger and pass logger around instead of streamID
|
|
||||||
handled, err := h.Backend.Forward(req.Datacenter, h.forwardToDC(req, serverStream, streamID))
|
|
||||||
if handled || err != nil {
|
if handled || err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
h.Logger.Trace("new subscription",
|
logger.Trace("new subscription")
|
||||||
"topic", req.Topic.String(),
|
defer logger.Trace("subscription closed")
|
||||||
"key", req.Key,
|
|
||||||
"index", req.Index,
|
|
||||||
"stream_id", streamID,
|
|
||||||
)
|
|
||||||
|
|
||||||
var sentCount uint64
|
|
||||||
defer h.Logger.Trace("subscription closed", "stream_id", streamID)
|
|
||||||
|
|
||||||
// Resolve the token and create the ACL filter.
|
// Resolve the token and create the ACL filter.
|
||||||
// TODO: handle token expiry gracefully...
|
// TODO(streaming): handle token expiry gracefully...
|
||||||
authz, err := h.Backend.ResolveToken(req.Token)
|
authz, err := h.Backend.ResolveToken(req.Token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -84,12 +64,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
|
||||||
defer sub.Unsubscribe()
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
ctx := serverStream.Context()
|
ctx := serverStream.Context()
|
||||||
snapshotDone := false
|
|
||||||
|
elog := &eventLogger{logger: logger}
|
||||||
for {
|
for {
|
||||||
events, err := sub.Next(ctx)
|
events, err := sub.Next(ctx)
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, stream.ErrSubscriptionClosed):
|
case errors.Is(err, stream.ErrSubscriptionClosed):
|
||||||
h.Logger.Trace("subscription reset by server", "stream_id", streamID)
|
logger.Trace("subscription reset by server")
|
||||||
return status.Error(codes.Aborted, err.Error())
|
return status.Error(codes.Aborted, err.Error())
|
||||||
case err != nil:
|
case err != nil:
|
||||||
return err
|
return err
|
||||||
|
@ -100,23 +81,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
first := events[0]
|
elog.Trace(events)
|
||||||
switch {
|
|
||||||
case first.IsEndOfSnapshot() || first.IsEndOfEmptySnapshot():
|
|
||||||
snapshotDone = true
|
|
||||||
h.Logger.Trace("snapshot complete",
|
|
||||||
"index", first.Index, "sent", sentCount, "stream_id", streamID)
|
|
||||||
|
|
||||||
case snapshotDone:
|
|
||||||
h.Logger.Trace("sending events",
|
|
||||||
"index", first.Index,
|
|
||||||
"sent", sentCount,
|
|
||||||
"batch_size", len(events),
|
|
||||||
"stream_id", streamID,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
sentCount += uint64(len(events))
|
|
||||||
e := newEventFromStreamEvents(req, events)
|
e := newEventFromStreamEvents(req, events)
|
||||||
if err := serverStream.Send(e); err != nil {
|
if err := serverStream.Send(e); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -134,26 +99,14 @@ func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.Subscri
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Server) forwardToDC(
|
func forwardToDC(
|
||||||
req *pbsubscribe.SubscribeRequest,
|
req *pbsubscribe.SubscribeRequest,
|
||||||
serverStream pbsubscribe.StateChangeSubscription_SubscribeServer,
|
serverStream pbsubscribe.StateChangeSubscription_SubscribeServer,
|
||||||
streamID string,
|
logger Logger,
|
||||||
) func(conn *grpc.ClientConn) error {
|
) func(conn *grpc.ClientConn) error {
|
||||||
return func(conn *grpc.ClientConn) error {
|
return func(conn *grpc.ClientConn) error {
|
||||||
h.Logger.Trace("forwarding to another DC",
|
logger.Trace("forwarding to another DC")
|
||||||
"dc", req.Datacenter,
|
defer logger.Trace("forwarded stream closed")
|
||||||
"topic", req.Topic.String(),
|
|
||||||
"key", req.Key,
|
|
||||||
"index", req.Index,
|
|
||||||
"stream_id", streamID,
|
|
||||||
)
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
h.Logger.Trace("forwarded stream closed",
|
|
||||||
"dc", req.Datacenter,
|
|
||||||
"stream_id", streamID,
|
|
||||||
)
|
|
||||||
}()
|
|
||||||
|
|
||||||
client := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
client := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||||
streamHandle, err := client.Subscribe(serverStream.Context(), req)
|
streamHandle, err := client.Subscribe(serverStream.Context(), req)
|
||||||
|
@ -175,7 +128,7 @@ func (h *Server) forwardToDC(
|
||||||
|
|
||||||
// filterStreamEvents to only those allowed by the acl token.
|
// filterStreamEvents to only those allowed by the acl token.
|
||||||
func filterStreamEvents(authz acl.Authorizer, events []stream.Event) []stream.Event {
|
func filterStreamEvents(authz acl.Authorizer, events []stream.Event) []stream.Event {
|
||||||
// TODO: when is authz nil?
|
// authz will be nil when ACLs are disabled
|
||||||
if authz == nil || len(events) == 0 {
|
if authz == nil || len(events) == 0 {
|
||||||
return events
|
return events
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ import (
|
||||||
func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
||||||
backend, err := newTestBackend()
|
backend, err := newTestBackend()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
srv := &Server{Backend: backend, Logger: hclog.New(nil)}
|
srv := NewServer(backend, hclog.New(nil))
|
||||||
addr := newTestServer(t, srv)
|
addr := newTestServer(t, srv)
|
||||||
ids := newCounter()
|
ids := newCounter()
|
||||||
|
|
||||||
|
@ -373,11 +373,11 @@ func raftIndex(ids *counter, created, modified string) pbcommon.RaftIndex {
|
||||||
func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
||||||
backendLocal, err := newTestBackend()
|
backendLocal, err := newTestBackend()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
addrLocal := newTestServer(t, &Server{Backend: backendLocal, Logger: hclog.New(nil)})
|
addrLocal := newTestServer(t, NewServer(backendLocal, hclog.New(nil)))
|
||||||
|
|
||||||
backendRemoteDC, err := newTestBackend()
|
backendRemoteDC, err := newTestBackend()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
srvRemoteDC := &Server{Backend: backendRemoteDC, Logger: hclog.New(nil)}
|
srvRemoteDC := NewServer(backendRemoteDC, hclog.New(nil))
|
||||||
addrRemoteDC := newTestServer(t, srvRemoteDC)
|
addrRemoteDC := newTestServer(t, srvRemoteDC)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
@ -592,7 +592,7 @@ func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testi
|
||||||
|
|
||||||
backend, err := newTestBackend()
|
backend, err := newTestBackend()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
srv := &Server{Backend: backend, Logger: hclog.New(nil)}
|
srv := NewServer(backend, hclog.New(nil))
|
||||||
addr := newTestServer(t, srv)
|
addr := newTestServer(t, srv)
|
||||||
|
|
||||||
// Create a policy for the test token.
|
// Create a policy for the test token.
|
||||||
|
@ -796,7 +796,7 @@ node "node1" {
|
||||||
func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) {
|
func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) {
|
||||||
backend, err := newTestBackend()
|
backend, err := newTestBackend()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
srv := &Server{Backend: backend, Logger: hclog.New(nil)}
|
srv := NewServer(backend, hclog.New(nil))
|
||||||
addr := newTestServer(t, srv)
|
addr := newTestServer(t, srv)
|
||||||
|
|
||||||
rules := `
|
rules := `
|
||||||
|
|
Loading…
Reference in New Issue