From 011109a6f633c9dca44183c069b63960ce1cbbb2 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 28 Sep 2020 18:52:31 -0400 Subject: [PATCH] subscribe: extract streamID and logging from Subscribe By extracting all of the tracing logic the core logic of the Subscribe endpoint is much easier to read. --- agent/consul/server.go | 26 ++++----- agent/subscribe/logger.go | 71 +++++++++++++++++++++++++ agent/subscribe/subscribe.go | 87 +++++++------------------------ agent/subscribe/subscribe_test.go | 10 ++-- 4 files changed, 109 insertions(+), 85 deletions(-) create mode 100644 agent/subscribe/logger.go diff --git a/agent/consul/server.go b/agent/consul/server.go index a8bbfde21..dbfa5b461 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -18,6 +18,16 @@ import ( "time" metrics "github.com/armon/go-metrics" + connlimit "github.com/hashicorp/go-connlimit" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/memberlist" + "github.com/hashicorp/raft" + raftboltdb "github.com/hashicorp/raft-boltdb" + "github.com/hashicorp/serf/serf" + "golang.org/x/time/rate" + "google.golang.org/grpc" + "github.com/hashicorp/consul/acl" ca "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/authmethod" @@ -38,15 +48,6 @@ import ( "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" - connlimit "github.com/hashicorp/go-connlimit" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" - "github.com/hashicorp/memberlist" - "github.com/hashicorp/raft" - raftboltdb "github.com/hashicorp/raft-boltdb" - "github.com/hashicorp/serf/serf" - "golang.org/x/time/rate" - "google.golang.org/grpc" ) // These are the protocol versions that Consul can _understand_. 
These are @@ -615,10 +616,9 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler } register := func(srv *grpc.Server) { - pbsubscribe.RegisterStateChangeSubscriptionServer(srv, &subscribe.Server{ - Backend: &subscribeBackend{srv: s, connPool: deps.GRPCConnPool}, - Logger: deps.Logger.Named("grpc-api.subscription"), - }) + pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer( + &subscribeBackend{srv: s, connPool: deps.GRPCConnPool}, + deps.Logger.Named("grpc-api.subscription"))) } return agentgrpc.NewHandler(config.RPCAddr, register) } diff --git a/agent/subscribe/logger.go b/agent/subscribe/logger.go new file mode 100644 index 000000000..b1a32a6cd --- /dev/null +++ b/agent/subscribe/logger.go @@ -0,0 +1,71 @@ +package subscribe + +import ( + "sync" + "time" + + "github.com/hashicorp/go-uuid" + + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/proto/pbsubscribe" +) + +// streamID is used in logs as a unique identifier for a subscription. The value +// is created lazily on the first call to String() so that we do not call it +// if trace logging is disabled. +// If a random UUID can not be created, defaults to the current time formatted +// as RFC3339Nano. +// +// TODO(banks) it might be nice one day to replace this with OpenTracing ID +// if one is set etc. but probably pointless until we support that properly +// in other places so it's actually propagated properly. For now this just +// makes lifetime of a stream more traceable in our regular server logs for +// debugging/dev. 
+type streamID struct { + once sync.Once + id string +} + +func (s *streamID) String() string { + s.once.Do(func() { + var err error + s.id, err = uuid.GenerateUUID() + if err != nil { + s.id = time.Now().Format(time.RFC3339Nano) + } + }) + return s.id +} + +func (h *Server) newLoggerForRequest(req *pbsubscribe.SubscribeRequest) Logger { + return h.Logger.With( + "topic", req.Topic.String(), + "dc", req.Datacenter, + "key", req.Key, + "index", req.Index, + "stream_id", &streamID{}) +} + +type eventLogger struct { + logger Logger + snapshotDone bool + count uint64 +} + +func (l *eventLogger) Trace(e []stream.Event) { + if len(e) == 0 { + return + } + + first := e[0] + switch { + case first.IsEndOfSnapshot() || first.IsEndOfEmptySnapshot(): + l.snapshotDone = true + l.logger.Trace("snapshot complete", "index", first.Index, "sent", l.count) + + case l.snapshotDone: + l.logger.Trace("sending events", "index", first.Index, "sent", l.count, "batch_size", len(e)) + } + + l.count += uint64(len(e)) +} diff --git a/agent/subscribe/subscribe.go b/agent/subscribe/subscribe.go index 0b1d0a6a9..191638f29 100644 --- a/agent/subscribe/subscribe.go +++ b/agent/subscribe/subscribe.go @@ -4,7 +4,7 @@ import ( "errors" "fmt" - "github.com/hashicorp/go-uuid" + "github.com/hashicorp/go-hclog" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -23,9 +23,13 @@ type Server struct { Logger Logger } +func NewServer(backend Backend, logger Logger) *Server { + return &Server{Backend: backend, Logger: logger} +} + type Logger interface { - IsTrace() bool Trace(msg string, args ...interface{}) + With(args ...interface{}) hclog.Logger } var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil) @@ -37,41 +41,17 @@ type Backend interface { } func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error { - // streamID is just used for message correlation in trace logs and not - // 
populated normally. - var streamID string - - if h.Logger.IsTrace() { - // TODO(banks) it might be nice one day to replace this with OpenTracing ID - // if one is set etc. but probably pointless until we support that properly - // in other places so it's actually propagated properly. For now this just - // makes lifetime of a stream more traceable in our regular server logs for - // debugging/dev. - var err error - streamID, err = uuid.GenerateUUID() - if err != nil { - return err - } - } - - // TODO: add fields to logger and pass logger around instead of streamID - handled, err := h.Backend.Forward(req.Datacenter, h.forwardToDC(req, serverStream, streamID)) + logger := h.newLoggerForRequest(req) + handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger)) if handled || err != nil { return err } - h.Logger.Trace("new subscription", - "topic", req.Topic.String(), - "key", req.Key, - "index", req.Index, - "stream_id", streamID, - ) - - var sentCount uint64 - defer h.Logger.Trace("subscription closed", "stream_id", streamID) + logger.Trace("new subscription") + defer logger.Trace("subscription closed") // Resolve the token and create the ACL filter. - // TODO: handle token expiry gracefully... + // TODO(streaming): handle token expiry gracefully... 
authz, err := h.Backend.ResolveToken(req.Token) if err != nil { return err @@ -84,12 +64,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub defer sub.Unsubscribe() ctx := serverStream.Context() - snapshotDone := false + + elog := &eventLogger{logger: logger} for { events, err := sub.Next(ctx) switch { case errors.Is(err, stream.ErrSubscriptionClosed): - h.Logger.Trace("subscription reset by server", "stream_id", streamID) + logger.Trace("subscription reset by server") return status.Error(codes.Aborted, err.Error()) case err != nil: return err @@ -100,23 +81,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub continue } - first := events[0] - switch { - case first.IsEndOfSnapshot() || first.IsEndOfEmptySnapshot(): - snapshotDone = true - h.Logger.Trace("snapshot complete", - "index", first.Index, "sent", sentCount, "stream_id", streamID) - - case snapshotDone: - h.Logger.Trace("sending events", - "index", first.Index, - "sent", sentCount, - "batch_size", len(events), - "stream_id", streamID, - ) - } - - sentCount += uint64(len(events)) + elog.Trace(events) e := newEventFromStreamEvents(req, events) if err := serverStream.Send(e); err != nil { return err @@ -134,26 +99,14 @@ func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.Subscri } } -func (h *Server) forwardToDC( +func forwardToDC( req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer, - streamID string, + logger Logger, ) func(conn *grpc.ClientConn) error { return func(conn *grpc.ClientConn) error { - h.Logger.Trace("forwarding to another DC", - "dc", req.Datacenter, - "topic", req.Topic.String(), - "key", req.Key, - "index", req.Index, - "stream_id", streamID, - ) - - defer func() { - h.Logger.Trace("forwarded stream closed", - "dc", req.Datacenter, - "stream_id", streamID, - ) - }() + logger.Trace("forwarding to another DC") + defer logger.Trace("forwarded stream closed") 
client := pbsubscribe.NewStateChangeSubscriptionClient(conn) streamHandle, err := client.Subscribe(serverStream.Context(), req) @@ -175,7 +128,7 @@ func (h *Server) forwardToDC( // filterStreamEvents to only those allowed by the acl token. func filterStreamEvents(authz acl.Authorizer, events []stream.Event) []stream.Event { - // TODO: when is authz nil? + // authz will be nil when ACLs are disabled if authz == nil || len(events) == 0 { return events } diff --git a/agent/subscribe/subscribe_test.go b/agent/subscribe/subscribe_test.go index a005f6eee..82f1ea6f2 100644 --- a/agent/subscribe/subscribe_test.go +++ b/agent/subscribe/subscribe_test.go @@ -32,7 +32,7 @@ import ( func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { backend, err := newTestBackend() require.NoError(t, err) - srv := &Server{Backend: backend, Logger: hclog.New(nil)} + srv := NewServer(backend, hclog.New(nil)) addr := newTestServer(t, srv) ids := newCounter() @@ -373,11 +373,11 @@ func raftIndex(ids *counter, created, modified string) pbcommon.RaftIndex { func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { backendLocal, err := newTestBackend() require.NoError(t, err) - addrLocal := newTestServer(t, &Server{Backend: backendLocal, Logger: hclog.New(nil)}) + addrLocal := newTestServer(t, NewServer(backendLocal, hclog.New(nil))) backendRemoteDC, err := newTestBackend() require.NoError(t, err) - srvRemoteDC := &Server{Backend: backendRemoteDC, Logger: hclog.New(nil)} + srvRemoteDC := NewServer(backendRemoteDC, hclog.New(nil)) addrRemoteDC := newTestServer(t, srvRemoteDC) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -592,7 +592,7 @@ func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testi backend, err := newTestBackend() require.NoError(t, err) - srv := &Server{Backend: backend, Logger: hclog.New(nil)} + srv := NewServer(backend, hclog.New(nil)) addr := newTestServer(t, srv) // Create a policy for the test 
token. @@ -796,7 +796,7 @@ node "node1" { func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) { backend, err := newTestBackend() require.NoError(t, err) - srv := &Server{Backend: backend, Logger: hclog.New(nil)} + srv := NewServer(backend, hclog.New(nil)) addr := newTestServer(t, srv) rules := `