diff --git a/.changelog/12819.txt b/.changelog/12819.txt new file mode 100644 index 000000000..c98b59c3c --- /dev/null +++ b/.changelog/12819.txt @@ -0,0 +1,3 @@ +```release-note:improvement +grpc: Add a new ServerDiscovery.WatchServers gRPC endpoint for being notified when the set of ready servers has changed. +``` \ No newline at end of file diff --git a/agent/consul/grpc_integration_test.go b/agent/consul/grpc_integration_test.go index c243ebfee..d588e324d 100644 --- a/agent/consul/grpc_integration_test.go +++ b/agent/consul/grpc_integration_test.go @@ -2,16 +2,15 @@ package consul import ( "context" - "net" - "os" "testing" + "time" "github.com/stretchr/testify/require" - "google.golang.org/grpc" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/grpc/public" "github.com/hashicorp/consul/proto-public/pbconnectca" - "github.com/hashicorp/consul/testrpc" + "github.com/hashicorp/consul/proto-public/pbserverdiscovery" ) func TestGRPCIntegration_ConnectCA_Sign(t *testing.T) { @@ -19,8 +18,6 @@ func TestGRPCIntegration_ConnectCA_Sign(t *testing.T) { t.Skip("too slow for testing.Short") } - t.Parallel() - // The gRPC endpoint itself well-tested with mocks. This test checks we're // correctly wiring everything up in the server by: // @@ -28,42 +25,24 @@ func TestGRPCIntegration_ConnectCA_Sign(t *testing.T) { // * Making a request to a follower's public gRPC port. // * Ensuring that the request is correctly forwarded to the leader. // * Ensuring we get a valid certificate back (so it went through the CAManager). - dir1, server1 := testServerWithConfig(t, func(c *Config) { + server1, conn1 := testGRPCIntegrationServer(t, func(c *Config) { c.Bootstrap = false c.BootstrapExpect = 2 }) - defer os.RemoveAll(dir1) - defer server1.Shutdown() - dir2, server2 := testServerWithConfig(t, func(c *Config) { + server2, conn2 := testGRPCIntegrationServer(t, func(c *Config) { c.Bootstrap = false }) - defer os.RemoveAll(dir2) - defer server2.Shutdown() joinLAN(t, server2, server1) - testrpc.WaitForLeader(t, server1.RPC, "dc1") + waitForLeaderEstablishment(t, server1, server2) - var follower *Server - if server1.IsLeader() { - follower = server2 - } else { - follower = server1 + conn := conn2 + if server2.IsLeader() { + conn = conn1 } - // publicGRPCServer is bound to a listener by the wrapping agent code, so we - // need to do it ourselves here. - lis, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - go func() { - require.NoError(t, follower.publicGRPCServer.Serve(lis)) - }() - t.Cleanup(follower.publicGRPCServer.Stop) - - conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) - require.NoError(t, err) - client := pbconnectca.NewConnectCAServiceClient(conn) csr, _ := connect.TestCSR(t, &connect.SpiffeIDService{ @@ -73,8 +52,13 @@ func TestGRPCIntegration_ConnectCA_Sign(t *testing.T) { Service: "foo", }) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + ctx = public.ContextWithToken(ctx, TestDefaultInitialManagementToken) + // This would fail if it wasn't forwarded to the leader. - rsp, err := client.Sign(context.Background(), &pbconnectca.SignRequest{ + rsp, err := client.Sign(ctx, &pbconnectca.SignRequest{ Csr: csr, }) require.NoError(t, err) @@ -82,3 +66,52 @@ func TestGRPCIntegration_ConnectCA_Sign(t *testing.T) { _, err = connect.ParseCert(rsp.CertPem) require.NoError(t, err) } + +func TestGRPCIntegration_ServerDiscovery_WatchServers(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + // The gRPC endpoint itself well-tested with mocks. This test checks we're + // correctly wiring everything up in the server by: + // + // * Starting a server + // * Initiating the gRPC stream + // * Validating the snapshot + // * Adding another server + // * Validating another message is sent. + + server1, conn := testGRPCIntegrationServer(t, func(c *Config) { + c.Bootstrap = true + c.BootstrapExpect = 1 + }) + waitForLeaderEstablishment(t, server1) + + client := pbserverdiscovery.NewServerDiscoveryServiceClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + ctx = public.ContextWithToken(ctx, TestDefaultInitialManagementToken) + + serverStream, err := client.WatchServers(ctx, &pbserverdiscovery.WatchServersRequest{Wan: false}) + require.NoError(t, err) + + rsp, err := serverStream.Recv() + require.NoError(t, err) + require.NotNil(t, rsp) + require.Len(t, rsp.Servers, 1) + + _, server2, _ := testACLServerWithConfig(t, func(c *Config) { + c.Bootstrap = false + }, false) + + // join the new server to the leader + joinLAN(t, server2, server1) + + // now receive the event containing 2 servers + rsp, err = serverStream.Recv() + require.NoError(t, err) + require.NotNil(t, rsp) + require.Len(t, rsp.Servers, 2) +} diff --git a/agent/consul/server.go b/agent/consul/server.go index bfe0d65fe..163b1fe38 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -46,6 +46,7 @@ import ( "github.com/hashicorp/consul/agent/grpc/private/services/subscribe" "github.com/hashicorp/consul/agent/grpc/public/services/connectca" "github.com/hashicorp/consul/agent/grpc/public/services/dataplane" + "github.com/hashicorp/consul/agent/grpc/public/services/serverdiscovery" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" @@ -683,6 +684,12 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve Datacenter: s.config.Datacenter, }).Register(s.publicGRPCServer) + serverdiscovery.NewServer(serverdiscovery.Config{ + Publisher: s.publisher, + ACLResolver: plainACLResolver{s.ACLResolver}, + Logger: logger.Named("grpc-api.server-discovery"), + }).Register(s.publicGRPCServer) + // Initialize private gRPC server. // // Note: some "public" gRPC services are also exposed on the private gRPC server diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 0d6a4925b..8e693afab 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -257,6 +257,26 @@ func testACLServerWithConfig(t *testing.T, cb func(*Config), initReplicationToke return dir, srv, codec } +func testGRPCIntegrationServer(t *testing.T, cb func(*Config)) (*Server, *grpc.ClientConn) { + _, srv, _ := testACLServerWithConfig(t, cb, false) + + // Normally the gRPC server listener is created at the agent level and passed down into + // the Server creation. For our tests, we need to ensure + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + go func() { + _ = srv.publicGRPCServer.Serve(ln) + }() + t.Cleanup(srv.publicGRPCServer.Stop) + + conn, err := grpc.Dial(ln.Addr().String(), grpc.WithInsecure()) + require.NoError(t, err) + + t.Cleanup(func() { _ = conn.Close() }) + + return srv, conn +} + func newServer(t *testing.T, c *Config) (*Server, error) { return newServerWithDeps(t, c, newDefaultDeps(t, c)) } diff --git a/agent/grpc/public/forward.go b/agent/grpc/public/forward.go new file mode 100644 index 000000000..398d33d51 --- /dev/null +++ b/agent/grpc/public/forward.go @@ -0,0 +1,16 @@ +package public + +import ( + "context" + + "google.golang.org/grpc/metadata" +) + +func ForwardMetadataContext(ctx context.Context) context.Context { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return ctx + } + + return metadata.NewOutgoingContext(ctx, md) +} diff --git a/agent/grpc/public/services/connectca/server_test.go b/agent/grpc/public/services/connectca/server_test.go index def654bf8..b382f8823 100644 --- a/agent/grpc/public/services/connectca/server_test.go +++ b/agent/grpc/public/services/connectca/server_test.go @@ -2,17 +2,15 @@ package connectca import ( "context" - "net" - "sync" "testing" - "time" "github.com/stretchr/testify/require" "google.golang.org/grpc" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" - "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/grpc/public/testutils" + structs "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto-public/pbconnectca" ) @@ -20,68 +18,26 @@ func noopForwardRPC(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) return false, nil } -func testStateStore(t *testing.T, publisher state.EventPublisher) *state.Store { +func setupFSMAndPublisher(t *testing.T) (*testutils.FakeFSM, state.EventPublisher) { t.Helper() - gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) - require.NoError(t, err) + config := testutils.FakeFSMConfig{ + Register: func(fsm *testutils.FakeFSM, publisher *stream.EventPublisher) { + // register handlers + publisher.RegisterHandler(state.EventTopicCARoots, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return fsm.GetStore().CARootsSnapshot(req, buf) + }) + }, + Refresh: []stream.Topic{state.EventTopicCARoots}, + } - return state.NewStateStoreWithEventPublisher(gc, publisher) -} - -type FakeFSM struct { - lock sync.Mutex - store *state.Store - publisher *stream.EventPublisher -} - -func newFakeFSM(t *testing.T, publisher *stream.EventPublisher) *FakeFSM { - t.Helper() - - store := testStateStore(t, publisher) - - fsm := FakeFSM{store: store, publisher: publisher} - - // register handlers - publisher.RegisterHandler(state.EventTopicCARoots, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { - return fsm.GetStore().CARootsSnapshot(req, buf) - }) - - return &fsm -} - -func (f *FakeFSM) GetStore() *state.Store { - f.lock.Lock() - defer f.lock.Unlock() - return f.store -} - -func (f *FakeFSM) ReplaceStore(store *state.Store) { - f.lock.Lock() - defer f.lock.Unlock() - oldStore := f.store - f.store = store - oldStore.Abandon() - f.publisher.RefreshTopic(state.EventTopicCARoots) -} - -func setupFSMAndPublisher(t *testing.T) (*FakeFSM, state.EventPublisher) { - t.Helper() - publisher := stream.NewEventPublisher(10 * time.Second) - - fsm := newFakeFSM(t, publisher) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - go publisher.Run(ctx) - - return fsm, publisher + return testutils.SetupFSMAndPublisher(t, config) } func testClient(t *testing.T, server *Server) pbconnectca.ConnectCAServiceClient { t.Helper() - addr := runTestServer(t, server) + addr := testutils.RunTestServer(t, server) conn, err := grpc.DialContext(context.Background(), addr.String(), grpc.WithInsecure()) require.NoError(t, err) @@ -91,18 +47,3 @@ func testClient(t *testing.T, server *Server) pbconnectca.ConnectCAServiceClient return pbconnectca.NewConnectCAServiceClient(conn) } - -func runTestServer(t *testing.T, server *Server) net.Addr { - t.Helper() - - lis, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - - grpcServer := grpc.NewServer() - server.Register(grpcServer) - - go grpcServer.Serve(lis) - t.Cleanup(grpcServer.Stop) - - return lis.Addr() -} diff --git a/agent/grpc/public/services/connectca/sign.go b/agent/grpc/public/services/connectca/sign.go index d6a21d616..b3ace6d3d 100644 --- a/agent/grpc/public/services/connectca/sign.go +++ b/agent/grpc/public/services/connectca/sign.go @@ -22,7 +22,7 @@ func (s *Server) Sign(ctx context.Context, req *pbconnectca.SignRequest) (*pbcon return nil, err } - logger := s.Logger.Named("sign").With("request_id", traceID()) + logger := s.Logger.Named("sign").With("request_id", public.TraceID()) logger.Trace("request received") token := public.TokenFromContext(ctx) @@ -48,6 +48,7 @@ func (s *Server) Sign(ctx context.Context, req *pbconnectca.SignRequest) (*pbcon var rsp *pbconnectca.SignResponse handled, err := s.ForwardRPC(&rpcInfo, func(conn *grpc.ClientConn) error { logger.Trace("forwarding RPC") + ctx := public.ForwardMetadataContext(ctx) var err error rsp, err = pbconnectca.NewConnectCAServiceClient(conn).Sign(ctx, req) return err diff --git a/agent/grpc/public/services/connectca/sign_test.go b/agent/grpc/public/services/connectca/sign_test.go index 600b1056c..a4f891b8c 100644 --- a/agent/grpc/public/services/connectca/sign_test.go +++ b/agent/grpc/public/services/connectca/sign_test.go @@ -15,6 +15,7 @@ import ( acl "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/grpc/public/testutils" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto-public/pbconnectca" ) @@ -231,7 +232,7 @@ func TestSign_RPCForwarding(t *testing.T) { ForwardRPC: noopForwardRPC, ConnectEnabled: true, }) - leaderConn, err := grpc.Dial(runTestServer(t, leader).String(), grpc.WithInsecure()) + leaderConn, err := grpc.Dial(testutils.RunTestServer(t, leader).String(), grpc.WithInsecure()) require.NoError(t, err) follower := NewServer(Config{ diff --git a/agent/grpc/public/services/connectca/watch_roots.go b/agent/grpc/public/services/connectca/watch_roots.go index 1d458b558..cee37d7aa 100644 --- a/agent/grpc/public/services/connectca/watch_roots.go +++ b/agent/grpc/public/services/connectca/watch_roots.go @@ -11,7 +11,6 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-uuid" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" @@ -30,7 +29,7 @@ func (s *Server) WatchRoots(_ *emptypb.Empty, serverStream pbconnectca.ConnectCA return err } - logger := s.Logger.Named("watch-roots").With("stream_id", traceID()) + logger := s.Logger.Named("watch-roots").With("request_id", public.TraceID()) logger.Trace("starting stream") defer logger.Trace("stream closed") @@ -181,16 +180,6 @@ func (s *Server) authorize(token string) error { return nil } -// We tag logs with a unique identifier to ease debugging. In the future this -// should probably be a real Open Telemetry trace ID. -func traceID() string { - id, err := uuid.GenerateUUID() - if err != nil { - return "" - } - return id -} - func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) { _, cfg, err := store.CAConfig(nil) switch { diff --git a/agent/grpc/public/services/connectca/watch_roots_test.go b/agent/grpc/public/services/connectca/watch_roots_test.go index da4317c8c..acaa349f1 100644 --- a/agent/grpc/public/services/connectca/watch_roots_test.go +++ b/agent/grpc/public/services/connectca/watch_roots_test.go @@ -230,7 +230,7 @@ func TestWatchRoots_StateStoreAbandoned(t *testing.T) { mustGetRoots(t, rspCh) // Simulate a snapshot restore. - storeB := testStateStore(t, publisher) + storeB := testutils.TestStateStore(t, publisher) rootB := connect.TestCA(t, nil) _, err = storeB.CARootSetCAS(1, 0, structs.CARoots{rootB}) diff --git a/agent/grpc/public/services/dataplane/get_envoy_boostrap_params_test.go b/agent/grpc/public/services/dataplane/get_envoy_boostrap_params_test.go index 0db8f5f22..072068861 100644 --- a/agent/grpc/public/services/dataplane/get_envoy_boostrap_params_test.go +++ b/agent/grpc/public/services/dataplane/get_envoy_boostrap_params_test.go @@ -69,7 +69,7 @@ func TestGetEnvoyBootstrapParams_Success(t *testing.T) { } run := func(t *testing.T, tc testCase) { - store := testStateStore(t) + store := testutils.TestStateStore(t, nil) err := store.EnsureRegistration(1, tc.registerReq) require.NoError(t, err) @@ -148,7 +148,7 @@ func TestGetEnvoyBootstrapParams_Error(t *testing.T) { Return(testutils.TestAuthorizerServiceRead(t, proxyServiceID), nil) ctx := public.ContextWithToken(context.Background(), testToken) - store := testStateStore(t) + store := testutils.TestStateStore(t, nil) registerReq := testRegisterRequestProxy(t) err := store.EnsureRegistration(1, registerReq) require.NoError(t, err) @@ -217,7 +217,7 @@ func TestGetEnvoyBootstrapParams_Unauthenticated(t *testing.T) { aclResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). Return(nil, acl.ErrNotFound) ctx := public.ContextWithToken(context.Background(), testToken) - store := testStateStore(t) + store := testutils.TestStateStore(t, nil) server := NewServer(Config{ GetStore: func() StateStore { return store }, Logger: hclog.NewNullLogger(), @@ -236,7 +236,7 @@ func TestGetEnvoyBootstrapParams_PermissionDenied(t *testing.T) { aclResolver.On("ResolveTokenAndDefaultMeta", testToken, mock.Anything, mock.Anything). Return(acl.DenyAll(), nil) ctx := public.ContextWithToken(context.Background(), testToken) - store := testStateStore(t) + store := testutils.TestStateStore(t, nil) registerReq := structs.TestRegisterRequestProxy(t) proxyServiceID := "web-sidecar-proxy" registerReq.Service.ID = proxyServiceID diff --git a/agent/grpc/public/services/dataplane/server_test.go b/agent/grpc/public/services/dataplane/server_test.go index 95aea273a..fa0a24b91 100644 --- a/agent/grpc/public/services/dataplane/server_test.go +++ b/agent/grpc/public/services/dataplane/server_test.go @@ -2,29 +2,18 @@ package dataplane import ( "context" - "net" "testing" - "time" - "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/grpc/public/testutils" "github.com/hashicorp/consul/proto-public/pbdataplane" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) -func testStateStore(t *testing.T) *state.Store { - t.Helper() - - gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) - require.NoError(t, err) - - return state.NewStateStore(gc) -} - func testClient(t *testing.T, server *Server) pbdataplane.DataplaneServiceClient { t.Helper() - addr := RunTestServer(t, server) + addr := testutils.RunTestServer(t, server) conn, err := grpc.DialContext(context.Background(), addr.String(), grpc.WithInsecure()) require.NoError(t, err) @@ -34,18 +23,3 @@ func testClient(t *testing.T, server *Server) pbdataplane.DataplaneServiceClient return pbdataplane.NewDataplaneServiceClient(conn) } - -func RunTestServer(t *testing.T, server *Server) net.Addr { - t.Helper() - - lis, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - - grpcServer := grpc.NewServer() - server.Register(grpcServer) - - go grpcServer.Serve(lis) - t.Cleanup(grpcServer.Stop) - - return lis.Addr() -} diff --git a/agent/grpc/public/services/serverdiscovery/mock_ACLResolver.go b/agent/grpc/public/services/serverdiscovery/mock_ACLResolver.go new file mode 100644 index 000000000..909e9c617 --- /dev/null +++ b/agent/grpc/public/services/serverdiscovery/mock_ACLResolver.go @@ -0,0 +1,36 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package serverdiscovery + +import ( + acl "github.com/hashicorp/consul/acl" + mock "github.com/stretchr/testify/mock" +) + +// MockACLResolver is an autogenerated mock type for the ACLResolver type +type MockACLResolver struct { + mock.Mock +} + +// ResolveTokenAndDefaultMeta provides a mock function with given fields: _a0, _a1, _a2 +func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.EnterpriseMeta, _a2 *acl.AuthorizerContext) (acl.Authorizer, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 acl.Authorizer + if rf, ok := ret.Get(0).(func(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) acl.Authorizer); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(acl.Authorizer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/agent/grpc/public/services/serverdiscovery/server.go b/agent/grpc/public/services/serverdiscovery/server.go new file mode 100644 index 000000000..ec82b47fa --- /dev/null +++ b/agent/grpc/public/services/serverdiscovery/server.go @@ -0,0 +1,38 @@ +package serverdiscovery + +import ( + "google.golang.org/grpc" + + "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/proto-public/pbserverdiscovery" +) + +type Server struct { + Config +} + +type Config struct { + Publisher EventPublisher + Logger hclog.Logger + ACLResolver ACLResolver +} + +type EventPublisher interface { + Subscribe(*stream.SubscribeRequest) (*stream.Subscription, error) +} + +//go:generate mockery -name ACLResolver -inpkg +type ACLResolver interface { + ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (acl.Authorizer, error) +} + +func NewServer(cfg Config) *Server { + return &Server{cfg} +} + +func (s *Server) Register(grpcServer *grpc.Server) { + pbserverdiscovery.RegisterServerDiscoveryServiceServer(grpcServer, s) +} diff --git a/agent/grpc/public/services/serverdiscovery/server_test.go b/agent/grpc/public/services/serverdiscovery/server_test.go new file mode 100644 index 000000000..2c26f2a1c --- /dev/null +++ b/agent/grpc/public/services/serverdiscovery/server_test.go @@ -0,0 +1,89 @@ +package serverdiscovery + +import ( + "context" + "testing" + "time" + + mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/hashicorp/consul/agent/consul/autopilotevents" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/grpc/public/testutils" + "github.com/hashicorp/consul/proto-public/pbserverdiscovery" +) + +type mockSnapshotHandler struct { + mock.Mock +} + +func newMockSnapshotHandler(t *testing.T) *mockSnapshotHandler { + handler := &mockSnapshotHandler{} + t.Cleanup(func() { + handler.AssertExpectations(t) + }) + return handler +} + +func (m *mockSnapshotHandler) handle(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + ret := m.Called(req, buf) + return ret.Get(0).(uint64), ret.Error(1) +} + +func (m *mockSnapshotHandler) expect(token string, requestIndex uint64, eventIndex uint64, payload autopilotevents.EventPayloadReadyServers) { + m.On("handle", stream.SubscribeRequest{ + Topic: autopilotevents.EventTopicReadyServers, + Subject: stream.SubjectNone, + Token: token, + Index: requestIndex, + }, mock.Anything).Once().Run(func(args mock.Arguments) { + buf := args.Get(1).(stream.SnapshotAppender) + buf.Append([]stream.Event{ + { + Topic: autopilotevents.EventTopicReadyServers, + Index: eventIndex, + Payload: payload, + }, + }) + }).Return(eventIndex, nil) +} + +func newMockACLResolver(t *testing.T) *MockACLResolver { + t.Helper() + m := &MockACLResolver{} + t.Cleanup(func() { m.AssertExpectations(t) }) + return m +} + +func setupPublisher(t *testing.T) (*mockSnapshotHandler, state.EventPublisher) { + t.Helper() + + handler := newMockSnapshotHandler(t) + + publisher := stream.NewEventPublisher(10 * time.Second) + publisher.RegisterHandler(autopilotevents.EventTopicReadyServers, handler.handle) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go publisher.Run(ctx) + + return handler, publisher + +} + +func testClient(t *testing.T, server *Server) pbserverdiscovery.ServerDiscoveryServiceClient { + t.Helper() + + addr := testutils.RunTestServer(t, server) + + conn, err := grpc.DialContext(context.Background(), addr.String(), grpc.WithInsecure()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, conn.Close()) + }) + + return pbserverdiscovery.NewServerDiscoveryServiceClient(conn) +} diff --git a/agent/grpc/public/services/serverdiscovery/watch_servers.go b/agent/grpc/public/services/serverdiscovery/watch_servers.go new file mode 100644 index 000000000..6ceda83ff --- /dev/null +++ b/agent/grpc/public/services/serverdiscovery/watch_servers.go @@ -0,0 +1,146 @@ +package serverdiscovery + +import ( + "context" + "errors" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/autopilotevents" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/grpc/public" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto-public/pbserverdiscovery" + "github.com/hashicorp/go-hclog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// WatchServers provides a stream on which you can receive the list of servers +// that are ready to receive incoming requests including stale queries. The +// current set of ready servers are sent immediately at the start of the +// stream and new updates will be sent whenver the set of ready servers changes. +func (s *Server) WatchServers(req *pbserverdiscovery.WatchServersRequest, serverStream pbserverdiscovery.ServerDiscoveryService_WatchServersServer) error { + logger := s.Logger.Named("watch-servers").With("request_id", public.TraceID()) + + logger.Debug("starting stream") + defer logger.Trace("stream closed") + + token := public.TokenFromContext(serverStream.Context()) + + // Serve the ready servers from an EventPublisher subscription. If the subscription is + // closed due to an ACL change, we'll attempt to re-authorize and resume it to + // prevent unnecessarily terminating the stream. + var idx uint64 + for { + var err error + idx, err = s.serveReadyServers(token, idx, req, serverStream, logger) + if errors.Is(err, stream.ErrSubForceClosed) { + logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") + } else { + return err + } + } +} + +func (s *Server) serveReadyServers(token string, index uint64, req *pbserverdiscovery.WatchServersRequest, serverStream pbserverdiscovery.ServerDiscoveryService_WatchServersServer, logger hclog.Logger) (uint64, error) { + if err := s.authorize(token); err != nil { + return 0, err + } + + // Start the subscription. + sub, err := s.Publisher.Subscribe(&stream.SubscribeRequest{ + Topic: autopilotevents.EventTopicReadyServers, + Subject: stream.SubjectNone, + Token: token, + Index: index, + }) + if err != nil { + logger.Error("failed to subscribe to server discovery events", "error", err) + return 0, status.Error(codes.Internal, "failed to subscribe to server discovery events") + } + defer sub.Unsubscribe() + + for { + event, err := sub.Next(serverStream.Context()) + switch { + case errors.Is(err, stream.ErrSubForceClosed): + return index, err + case errors.Is(err, context.Canceled): + return 0, nil + case err != nil: + logger.Error("failed to read next event", "error", err) + return index, status.Error(codes.Internal, err.Error()) + } + + // We do not send framing events (e.g. EndOfSnapshot, NewSnapshotToFollow) + // because we send a full list of ready servers on every event, rather than expecting + // clients to maintain a state-machine in the way they do for service health. + if event.IsFramingEvent() { + continue + } + + // Note: this check isn't strictly necessary because the event publishing + // machinery will ensure the index increases monotonically, but it can be + // tricky to faithfully reproduce this in tests (e.g. the EventPublisher + // garbage collects topic buffers and snapshots aggressively when streams + // disconnect) so this avoids a bunch of confusing setup code. + if event.Index <= index { + continue + } + + index = event.Index + + rsp, err := eventToResponse(req, event) + if err != nil { + logger.Error("failed to convert event to response", "error", err) + return index, status.Error(codes.Internal, err.Error()) + } + if err := serverStream.Send(rsp); err != nil { + logger.Error("failed to send response", "error", err) + return index, err + } + } +} + +func (s *Server) authorize(token string) error { + // Require the given ACL token to have `service:write` on any service (in any + // partition and namespace). + var authzContext acl.AuthorizerContext + entMeta := structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier) + authz, err := s.ACLResolver.ResolveTokenAndDefaultMeta(token, entMeta, &authzContext) + if err != nil { + return status.Error(codes.Unauthenticated, err.Error()) + } + if err := authz.ToAllowAuthorizer().ServiceWriteAnyAllowed(&authzContext); err != nil { + return status.Error(codes.PermissionDenied, err.Error()) + } + return nil +} + +func eventToResponse(req *pbserverdiscovery.WatchServersRequest, event stream.Event) (*pbserverdiscovery.WatchServersResponse, error) { + readyServers, err := autopilotevents.ExtractEventPayload(event) + if err != nil { + return nil, err + } + + var servers []*pbserverdiscovery.Server + + for _, srv := range readyServers { + addr := srv.Address + + wanAddr, ok := srv.TaggedAddresses[structs.TaggedAddressWAN] + if req.Wan && ok { + addr = wanAddr + } + + servers = append(servers, &pbserverdiscovery.Server{ + Id: srv.ID, + Version: srv.Version, + Address: addr, + }) + } + + return &pbserverdiscovery.WatchServersResponse{ + Servers: servers, + }, nil +} diff --git a/agent/grpc/public/services/serverdiscovery/watch_servers_test.go b/agent/grpc/public/services/serverdiscovery/watch_servers_test.go new file mode 100644 index 000000000..1409431d9 --- /dev/null +++ b/agent/grpc/public/services/serverdiscovery/watch_servers_test.go @@ -0,0 +1,302 @@ +package serverdiscovery + +import ( + "context" + "errors" + "io" + "testing" + "time" + + acl "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/autopilotevents" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/grpc/public" + "github.com/hashicorp/consul/agent/grpc/public/testutils" + "github.com/hashicorp/consul/proto-public/pbserverdiscovery" + "github.com/hashicorp/consul/proto/prototest" + "github.com/hashicorp/consul/sdk/testutil" + mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const testACLToken = "eb61f1ed-65a4-4da6-8d3d-0564bd16c965" + +func TestWatchServers_StreamLifeCycle(t *testing.T) { + // The flow for this test is roughly: + // + // 1. Open a WatchServers stream + // 2. Observe the snapshot message is sent back through + // the stream. + // 3. Publish an event that changes to 2 servers. + // 4. See the corresponding message sent back through the stream. + // 5. Send a NewCloseSubscriptionEvent for the token secret. + // 6. See that a new snapshot is taken and the corresponding message + // gets sent back. If there were multiple subscribers for the topic + // then this should not happen. However with the current EventPublisher + // implementation, whenever the last subscriber for a topic has its + // subscription closed then the publisher will delete the whole topic + // buffer. When that happens, resubscribing will see no snapshot + // cache, or latest event in the buffer and force creating a new snapshot. + // 7. Publish another event to move to 3 servers. + // 8. Ensure that the message gets sent through the stream. Also + // this will validate that no other 1 or 2 server event is + // seen after stream reinitialization. + + srv1 := autopilotevents.ReadyServerInfo{ + ID: "9aeb73f6-e83e-43c1-bdc9-ca5e43efe3e4", + Address: "198.18.0.1", + Version: "1.12.0", + } + srv2 := autopilotevents.ReadyServerInfo{ + ID: "eec8721f-c42b-48da-a5a5-07565158015e", + Address: "198.18.0.2", + Version: "1.12.3", + } + srv3 := autopilotevents.ReadyServerInfo{ + ID: "256796f2-3a38-4f80-8cef-375c3cb3aa1f", + Address: "198.18.0.3", + Version: "1.12.3", + } + + oneServerEventPayload := autopilotevents.EventPayloadReadyServers{srv1} + twoServerEventPayload := autopilotevents.EventPayloadReadyServers{srv1, srv2} + threeServerEventPayload := autopilotevents.EventPayloadReadyServers{srv1, srv2, srv3} + + oneServerResponse := &pbserverdiscovery.WatchServersResponse{ + Servers: []*pbserverdiscovery.Server{ + { + Id: srv1.ID, + Address: srv1.Address, + Version: srv1.Version, + }, + }, + } + + twoServerResponse := &pbserverdiscovery.WatchServersResponse{ + Servers: []*pbserverdiscovery.Server{ + { + Id: srv1.ID, + Address: srv1.Address, + Version: srv1.Version, + }, + { + Id: srv2.ID, + Address: srv2.Address, + Version: srv2.Version, + }, + }, + } + + threeServerResponse := &pbserverdiscovery.WatchServersResponse{ + Servers: []*pbserverdiscovery.Server{ + { + Id: srv1.ID, + Address: srv1.Address, + Version: srv1.Version, + }, + { + Id: srv2.ID, + Address: srv2.Address, + Version: srv2.Version, + }, + { + Id: srv3.ID, + Address: srv3.Address, + Version: srv3.Version, + }, + }, + } + + // setup the event publisher and snapshot handler + handler, publisher := setupPublisher(t) + // we only expect this to be called once. For the rest of the + // test we ought to be able to resume the stream. + handler.expect(testACLToken, 0, 1, oneServerEventPayload) + handler.expect(testACLToken, 2, 3, twoServerEventPayload) + + // setup the mock ACLResolver and its expectations + // 2 times authorization should succeed and the third should fail. + resolver := newMockACLResolver(t) + resolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). + Return(testutils.TestAuthorizerServiceWriteAny(t), nil).Twice() + + // add the token to the requests context + ctx := public.ContextWithToken(context.Background(), testACLToken) + + // setup the server + server := NewServer(Config{ + Publisher: publisher, + Logger: testutil.Logger(t), + ACLResolver: resolver, + }) + + // Run the server and get a test client for it + client := testClient(t, server) + + // 1. Open the WatchServers stream + serverStream, err := client.WatchServers(ctx, &pbserverdiscovery.WatchServersRequest{Wan: false}) + require.NoError(t, err) + + rspCh := handleReadyServersStream(t, serverStream) + + // 2. Observe the snapshot message is sent back through the stream. + rsp := mustGetServers(t, rspCh) + require.NotNil(t, rsp) + prototest.AssertDeepEqual(t, oneServerResponse, rsp) + + // 3. Publish an event that changes to 2 servers. + publisher.Publish([]stream.Event{ + { + Topic: autopilotevents.EventTopicReadyServers, + Index: 2, + Payload: twoServerEventPayload, + }, + }) + + // 4. See the corresponding message sent back through the stream. + rsp = mustGetServers(t, rspCh) + require.NotNil(t, rsp) + prototest.AssertDeepEqual(t, twoServerResponse, rsp) + + // 5. Send a NewCloseSubscriptionEvent for the token secret. + publisher.Publish([]stream.Event{ + stream.NewCloseSubscriptionEvent([]string{testACLToken}), + }) + + // 6. Observe another snapshot message + rsp = mustGetServers(t, rspCh) + require.NotNil(t, rsp) + prototest.AssertDeepEqual(t, twoServerResponse, rsp) + + // 7. Publish another event to move to 3 servers. + publisher.Publish([]stream.Event{ + { + Topic: autopilotevents.EventTopicReadyServers, + Index: 4, + Payload: threeServerEventPayload, + }, + }) + + // 8. Ensure that the message gets sent through the stream. Also + // this will validate that no other 1 or 2 server event is + // seen after stream reinitialization. + rsp = mustGetServers(t, rspCh) + require.NotNil(t, rsp) + prototest.AssertDeepEqual(t, threeServerResponse, rsp) +} + +func TestWatchServers_ACLToken_PermissionDenied(t *testing.T) { + // setup the event publisher and snapshot handler + _, publisher := setupPublisher(t) + + resolver := newMockACLResolver(t) + resolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). + Return(acl.DenyAll(), nil).Once() + + // add the token to the requests context + ctx := public.ContextWithToken(context.Background(), testACLToken) + + // setup the server + server := NewServer(Config{ + Publisher: publisher, + Logger: testutil.Logger(t), + ACLResolver: resolver, + }) + + // Run the server and get a test client for it + client := testClient(t, server) + + // 1. Open the WatchServers stream + serverStream, err := client.WatchServers(ctx, &pbserverdiscovery.WatchServersRequest{Wan: false}) + require.NoError(t, err) + rspCh := handleReadyServersStream(t, serverStream) + + // Expect to get an Unauthenticated error immediately. + err = mustGetError(t, rspCh) + require.Equal(t, codes.PermissionDenied.String(), status.Code(err).String()) +} + +func TestWatchServers_ACLToken_Unauthenticated(t *testing.T) { + // setup the event publisher and snapshot handler + _, publisher := setupPublisher(t) + + resolver := newMockACLResolver(t) + resolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). + Return(nil, acl.ErrNotFound).Once() + + // add the token to the requests context + ctx := public.ContextWithToken(context.Background(), testACLToken) + + // setup the server + server := NewServer(Config{ + Publisher: publisher, + Logger: testutil.Logger(t), + ACLResolver: resolver, + }) + + // Run the server and get a test client for it + client := testClient(t, server) + + // 1. Open the WatchServers stream + serverStream, err := client.WatchServers(ctx, &pbserverdiscovery.WatchServersRequest{Wan: false}) + require.NoError(t, err) + rspCh := handleReadyServersStream(t, serverStream) + + // Expect to get an Unauthenticated error immediately. + err = mustGetError(t, rspCh) + require.Equal(t, codes.Unauthenticated.String(), status.Code(err).String()) +} + +func handleReadyServersStream(t *testing.T, stream pbserverdiscovery.ServerDiscoveryService_WatchServersClient) <-chan serversOrError { + t.Helper() + + rspCh := make(chan serversOrError) + go func() { + for { + rsp, err := stream.Recv() + if errors.Is(err, io.EOF) || + errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) { + return + } + rspCh <- serversOrError{ + rsp: rsp, + err: err, + } + } + }() + return rspCh +} + +func mustGetServers(t *testing.T, ch <-chan serversOrError) *pbserverdiscovery.WatchServersResponse { + t.Helper() + + select { + case rsp := <-ch: + require.NoError(t, rsp.err) + return rsp.rsp + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for WatchServersResponse") + return nil + } +} + +func mustGetError(t *testing.T, ch <-chan serversOrError) error { + t.Helper() + + select { + case rsp := <-ch: + require.Error(t, rsp.err) + return rsp.err + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for WatchServersResponse") + return nil + } +} + +type serversOrError struct { + rsp *pbserverdiscovery.WatchServersResponse + err error +} diff --git a/agent/grpc/public/testutils/fsm.go b/agent/grpc/public/testutils/fsm.go new file mode 100644 index 000000000..aea426a4e --- /dev/null +++ b/agent/grpc/public/testutils/fsm.go @@ -0,0 +1,81 @@ +package testutils + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/stretchr/testify/require" +) + +func TestStateStore(t *testing.T, publisher state.EventPublisher) *state.Store { + t.Helper() + + gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) + require.NoError(t, err) + + if publisher == nil { + publisher = stream.NoOpEventPublisher{} + } + + return state.NewStateStoreWithEventPublisher(gc, publisher) +} + +type Registrar func(*FakeFSM, *stream.EventPublisher) + +type FakeFSMConfig struct { + Register Registrar + Refresh []stream.Topic + publisher *stream.EventPublisher +} + +type FakeFSM struct { + config FakeFSMConfig + lock sync.Mutex + store *state.Store +} + +func newFakeFSM(t *testing.T, config FakeFSMConfig) *FakeFSM { + t.Helper() + + store := TestStateStore(t, config.publisher) + + fsm := &FakeFSM{store: store, config: config} + + config.Register(fsm, fsm.config.publisher) + + return fsm +} + +func (f *FakeFSM) GetStore() *state.Store { + f.lock.Lock() + defer f.lock.Unlock() + return f.store +} + +func (f *FakeFSM) ReplaceStore(store *state.Store) { + f.lock.Lock() + defer f.lock.Unlock() + oldStore := f.store + f.store = store + oldStore.Abandon() + for _, topic := range f.config.Refresh { + f.config.publisher.RefreshTopic(topic) + } +} + +func SetupFSMAndPublisher(t *testing.T, config FakeFSMConfig) (*FakeFSM, state.EventPublisher) { + t.Helper() + config.publisher = stream.NewEventPublisher(10 * time.Second) + + fsm := newFakeFSM(t, config) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go config.publisher.Run(ctx) + + return fsm, config.publisher +} diff --git a/agent/grpc/public/testutils/server.go b/agent/grpc/public/testutils/server.go new file mode 100644 index 000000000..53d779d91 --- /dev/null +++ b/agent/grpc/public/testutils/server.go @@ -0,0 +1,30 @@ +package testutils + +import ( + "net" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +type GRPCService interface { + Register(*grpc.Server) +} + +func RunTestServer(t *testing.T, services ...GRPCService) net.Addr { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + grpcServer := grpc.NewServer() + for _, svc := range services { + svc.Register(grpcServer) + } + + go grpcServer.Serve(lis) + t.Cleanup(grpcServer.Stop) + + return lis.Addr() +} diff --git a/proto-public/pbserverdiscovery/serverdiscovery.pb.binary.go b/proto-public/pbserverdiscovery/serverdiscovery.pb.binary.go new file mode 100644 index 000000000..a2e291967 --- /dev/null +++ b/proto-public/pbserverdiscovery/serverdiscovery.pb.binary.go @@ -0,0 +1,38 @@ +// Code generated by protoc-gen-go-binary. DO NOT EDIT. +// source: proto-public/pbserverdiscovery/serverdiscovery.proto + +package pbserverdiscovery + +import ( + "github.com/golang/protobuf/proto" +) + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *WatchServersRequest) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *WatchServersRequest) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *WatchServersResponse) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *WatchServersResponse) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *Server) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *Server) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} diff --git a/proto-public/pbserverdiscovery/serverdiscovery.pb.go b/proto-public/pbserverdiscovery/serverdiscovery.pb.go new file mode 100644 index 000000000..c6638e9fc --- /dev/null +++ b/proto-public/pbserverdiscovery/serverdiscovery.pb.go @@ -0,0 +1,437 @@ +// Package serverdiscovery provides a service on Consul servers to discover the set of servers +// currently able to handle incoming requests. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.23.0 +// protoc v3.15.8 +// source: proto-public/pbserverdiscovery/serverdiscovery.proto + +package pbserverdiscovery + +import ( + context "context" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type WatchServersRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Wan being set to true will cause WAN addresses to be sent in the response + // instead of the LAN addresses which are the default + Wan bool `protobuf:"varint,1,opt,name=wan,proto3" json:"wan,omitempty"` +} + +func (x *WatchServersRequest) Reset() { + *x = WatchServersRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WatchServersRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WatchServersRequest) ProtoMessage() {} + +func (x *WatchServersRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WatchServersRequest.ProtoReflect.Descriptor instead. +func (*WatchServersRequest) Descriptor() ([]byte, []int) { + return file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescGZIP(), []int{0} +} + +func (x *WatchServersRequest) GetWan() bool { + if x != nil { + return x.Wan + } + return false +} + +type WatchServersResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Servers is the list of server address information. + Servers []*Server `protobuf:"bytes,1,rep,name=servers,proto3" json:"servers,omitempty"` +} + +func (x *WatchServersResponse) Reset() { + *x = WatchServersResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WatchServersResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WatchServersResponse) ProtoMessage() {} + +func (x *WatchServersResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WatchServersResponse.ProtoReflect.Descriptor instead. +func (*WatchServersResponse) Descriptor() ([]byte, []int) { + return file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescGZIP(), []int{1} +} + +func (x *WatchServersResponse) GetServers() []*Server { + if x != nil { + return x.Servers + } + return nil +} + +type Server struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // id is the unique string identifying this server for all time. + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // address on the network of the server + Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` + // the consul version of the server + Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"` +} + +func (x *Server) Reset() { + *x = Server{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Server) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Server) ProtoMessage() {} + +func (x *Server) ProtoReflect() protoreflect.Message { + mi := &file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Server.ProtoReflect.Descriptor instead. +func (*Server) Descriptor() ([]byte, []int) { + return file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescGZIP(), []int{2} +} + +func (x *Server) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Server) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + +func (x *Server) GetVersion() string { + if x != nil { + return x.Version + } + return "" +} + +var File_proto_public_pbserverdiscovery_serverdiscovery_proto protoreflect.FileDescriptor + +var file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDesc = []byte{ + 0x0a, 0x34, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x2f, 0x70, + 0x62, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, + 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x64, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x22, 0x27, 0x0a, 0x13, 0x57, 0x61, 0x74, 0x63, 0x68, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, + 0x0a, 0x03, 0x77, 0x61, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x77, 0x61, 0x6e, + 0x22, 0x49, 0x0a, 0x14, 0x57, 0x61, 0x74, 0x63, 0x68, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x53, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x22, 0x4c, 0x0a, 0x06, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, + 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x32, 0x79, 0x0a, 0x16, 0x53, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x12, 0x5f, 0x0a, 0x0c, 0x57, 0x61, 0x74, 0x63, 0x68, 0x53, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x64, 0x69, 0x73, 0x63, + 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x53, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x57, 0x61, 0x74, 0x63, + 0x68, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x30, 0x01, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, + 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, + 0x2f, 0x70, 0x62, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, + 0x72, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescOnce sync.Once + file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescData = file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDesc +) + +func file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescGZIP() []byte { + file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescOnce.Do(func() { + file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescData) + }) + return file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDescData +} + +var file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_proto_public_pbserverdiscovery_serverdiscovery_proto_goTypes = []interface{}{ + (*WatchServersRequest)(nil), // 0: serverdiscovery.WatchServersRequest + (*WatchServersResponse)(nil), // 1: serverdiscovery.WatchServersResponse + (*Server)(nil), // 2: serverdiscovery.Server +} +var file_proto_public_pbserverdiscovery_serverdiscovery_proto_depIdxs = []int32{ + 2, // 0: serverdiscovery.WatchServersResponse.servers:type_name -> serverdiscovery.Server + 0, // 1: serverdiscovery.ServerDiscoveryService.WatchServers:input_type -> serverdiscovery.WatchServersRequest + 1, // 2: serverdiscovery.ServerDiscoveryService.WatchServers:output_type -> serverdiscovery.WatchServersResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_public_pbserverdiscovery_serverdiscovery_proto_init() } +func file_proto_public_pbserverdiscovery_serverdiscovery_proto_init() { + if File_proto_public_pbserverdiscovery_serverdiscovery_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WatchServersRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WatchServersResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Server); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_public_pbserverdiscovery_serverdiscovery_proto_goTypes, + DependencyIndexes: file_proto_public_pbserverdiscovery_serverdiscovery_proto_depIdxs, + MessageInfos: file_proto_public_pbserverdiscovery_serverdiscovery_proto_msgTypes, + }.Build() + File_proto_public_pbserverdiscovery_serverdiscovery_proto = out.File + file_proto_public_pbserverdiscovery_serverdiscovery_proto_rawDesc = nil + file_proto_public_pbserverdiscovery_serverdiscovery_proto_goTypes = nil + file_proto_public_pbserverdiscovery_serverdiscovery_proto_depIdxs = nil +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConnInterface + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion6 + +// ServerDiscoveryServiceClient is the client API for ServerDiscoveryService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type ServerDiscoveryServiceClient interface { + // WatchServers will stream back sets of ready servers as they change such as + // when new servers are added or older ones removed. A ready server is one that + // should be considered ready for sending general RPC requests towards that would + // catalog queries, xDS proxy configurations and similar services. + WatchServers(ctx context.Context, in *WatchServersRequest, opts ...grpc.CallOption) (ServerDiscoveryService_WatchServersClient, error) +} + +type serverDiscoveryServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewServerDiscoveryServiceClient(cc grpc.ClientConnInterface) ServerDiscoveryServiceClient { + return &serverDiscoveryServiceClient{cc} +} + +func (c *serverDiscoveryServiceClient) WatchServers(ctx context.Context, in *WatchServersRequest, opts ...grpc.CallOption) (ServerDiscoveryService_WatchServersClient, error) { + stream, err := c.cc.NewStream(ctx, &_ServerDiscoveryService_serviceDesc.Streams[0], "/serverdiscovery.ServerDiscoveryService/WatchServers", opts...) + if err != nil { + return nil, err + } + x := &serverDiscoveryServiceWatchServersClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ServerDiscoveryService_WatchServersClient interface { + Recv() (*WatchServersResponse, error) + grpc.ClientStream +} + +type serverDiscoveryServiceWatchServersClient struct { + grpc.ClientStream +} + +func (x *serverDiscoveryServiceWatchServersClient) Recv() (*WatchServersResponse, error) { + m := new(WatchServersResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// ServerDiscoveryServiceServer is the server API for ServerDiscoveryService service. +type ServerDiscoveryServiceServer interface { + // WatchServers will stream back sets of ready servers as they change such as + // when new servers are added or older ones removed. A ready server is one that + // should be considered ready for sending general RPC requests towards that would + // catalog queries, xDS proxy configurations and similar services. + WatchServers(*WatchServersRequest, ServerDiscoveryService_WatchServersServer) error +} + +// UnimplementedServerDiscoveryServiceServer can be embedded to have forward compatible implementations. +type UnimplementedServerDiscoveryServiceServer struct { +} + +func (*UnimplementedServerDiscoveryServiceServer) WatchServers(*WatchServersRequest, ServerDiscoveryService_WatchServersServer) error { + return status.Errorf(codes.Unimplemented, "method WatchServers not implemented") +} + +func RegisterServerDiscoveryServiceServer(s *grpc.Server, srv ServerDiscoveryServiceServer) { + s.RegisterService(&_ServerDiscoveryService_serviceDesc, srv) +} + +func _ServerDiscoveryService_WatchServers_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(WatchServersRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ServerDiscoveryServiceServer).WatchServers(m, &serverDiscoveryServiceWatchServersServer{stream}) +} + +type ServerDiscoveryService_WatchServersServer interface { + Send(*WatchServersResponse) error + grpc.ServerStream +} + +type serverDiscoveryServiceWatchServersServer struct { + grpc.ServerStream +} + +func (x *serverDiscoveryServiceWatchServersServer) Send(m *WatchServersResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _ServerDiscoveryService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "serverdiscovery.ServerDiscoveryService", + HandlerType: (*ServerDiscoveryServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "WatchServers", + Handler: _ServerDiscoveryService_WatchServers_Handler, + ServerStreams: true, + }, + }, + Metadata: "proto-public/pbserverdiscovery/serverdiscovery.proto", +} diff --git a/proto-public/pbserverdiscovery/serverdiscovery.proto b/proto-public/pbserverdiscovery/serverdiscovery.proto new file mode 100644 index 000000000..203b25903 --- /dev/null +++ b/proto-public/pbserverdiscovery/serverdiscovery.proto @@ -0,0 +1,37 @@ +// Package serverdiscovery provides a service on Consul servers to discover the set of servers +// currently able to handle incoming requests. + +syntax = "proto3"; + +package serverdiscovery; + +option go_package = "github.com/hashicorp/consul/proto-public/pbserverdiscovery"; + +service ServerDiscoveryService { + // WatchServers will stream back sets of ready servers as they change such as + // when new servers are added or older ones removed. A ready server is one that + // should be considered ready for sending general RPC requests towards that would + // catalog queries, xDS proxy configurations and similar services. + rpc WatchServers(WatchServersRequest) returns (stream WatchServersResponse) {}; +} + +message WatchServersRequest { + // Wan being set to true will cause WAN addresses to be sent in the response + // instead of the LAN addresses which are the default + bool wan = 1; +} + +message WatchServersResponse{ + // Servers is the list of server address information. + repeated Server servers = 1; +} + +message Server { + // id is the unique string identifying this server for all time. + string id = 1; + // address on the network of the server + string address = 2; + // the consul version of the server + string version = 3; +} +