diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index f13f0128c..20bc3b12a 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -11,6 +11,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeerstream" @@ -256,9 +257,10 @@ func (s *Server) handleUpsert( } // handleUpdateService handles both deletion and upsert events for a service. -// On an UPSERT event: -// - All nodes, services, checks in the input pbNodes are re-applied through Raft. -// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted. +// +// On an UPSERT event: +// - All nodes, services, checks in the input pbNodes are re-applied through Raft. +// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted. // // On a DELETE event: // - A reconciliation against nil or empty input pbNodes leads to deleting all stored catalog resources @@ -470,6 +472,33 @@ func (s *Server) handleUpsertRoots( return s.Backend.PeeringTrustBundleWrite(req) } +func (s *Server) handleUpsertServerAddrs( + peerName string, + partition string, + addrs *pbpeering.PeeringServerAddresses, +) error { + q := state.Query{ + Value: peerName, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(partition), + } + _, existing, err := s.GetStore().PeeringRead(nil, q) + if err != nil { + return fmt.Errorf("failed to read peering: %w", err) + } + if existing == nil || !existing.IsActive() { + return fmt.Errorf("peering does not exist or has been marked for deletion") + } + + // Clone to avoid mutating the existing data + p := proto.Clone(existing).(*pbpeering.Peering) + p.PeerServerAddresses = addrs.GetAddresses() + + req := &pbpeering.PeeringWriteRequest{ + Peering: p, + } + return s.Backend.PeeringWrite(req) +} + func (s *Server) handleDelete( peerName string, partition string, diff --git a/agent/grpc-external/services/peerstream/server.go b/agent/grpc-external/services/peerstream/server.go index 7254c60c7..9565b6b76 100644 --- a/agent/grpc-external/services/peerstream/server.go +++ b/agent/grpc-external/services/peerstream/server.go @@ -104,6 +104,7 @@ type Backend interface { PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error CatalogRegister(req *structs.RegisterRequest) error CatalogDeregister(req *structs.DeregisterRequest) error + PeeringWrite(req *pbpeering.PeeringWriteRequest) error } // StateStore provides a read-only interface for querying Peering data. diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 1c4706c44..33b88d962 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -126,7 +126,7 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { // Receive a subscription from a peer. This message arrives while the // server is a leader and should work. - testutil.RunStep(t, "send subscription request to leader and consume its two requests", func(t *testing.T) { + testutil.RunStep(t, "send subscription request to leader and consume its three requests", func(t *testing.T) { sub := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Open_{ Open: &pbpeerstream.ReplicationMessage_Open{ @@ -145,6 +145,10 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { msg2, err := client.Recv() require.NoError(t, err) require.NotEmpty(t, msg2) + + msg3, err := client.Recv() + require.NoError(t, err) + require.NotEmpty(t, msg3) }) // The ACK will be a new request but at this point the server is not the @@ -1314,7 +1318,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) { // makeClient sets up a *MockClient with the initial subscription // message handshake. -func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID string) *MockClient { +func makeClient(t *testing.T, srv *testServer, peerID string) *MockClient { t.Helper() client := NewMockClient(context.Background()) @@ -1326,7 +1330,7 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv(). // This matches gRPC's behavior when an error is returned by a server. if err := srv.StreamResources(client.ReplicationStream); err != nil { - errCh <- srv.StreamResources(client.ReplicationStream) + errCh <- err } }() @@ -1345,8 +1349,15 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s require.NoError(t, err) receivedSub2, err := client.Recv() require.NoError(t, err) + receivedSub3, err := client.Recv() + require.NoError(t, err) - // Issue a services and roots subscription pair to server + // This is required when the client subscribes to server address replication messages. + // We assert for the handler to be called at least once but the data doesn't matter. + srv.mockSnapshotHandler.expect("", 0, 0, nil) + + // Issue services, roots, and server address subscription to server. + // Note that server address may not come as an initial message for _, resourceURL := range []string{ pbpeerstream.TypeURLExportedService, pbpeerstream.TypeURLPeeringTrustBundle, @@ -1390,6 +1401,7 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s got := []*pbpeerstream.ReplicationMessage{ receivedSub1, receivedSub2, + receivedSub3, } prototest.AssertElementsMatch(t, expect, got) @@ -1446,6 +1458,10 @@ func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.SecretsWriteReque return b.store.PeeringSecretsWrite(1, req) } +func (b *testStreamBackend) PeeringWrite(req *pbpeering.PeeringWriteRequest) error { + return b.store.PeeringWrite(1, req) +} + // CatalogRegister mocks catalog registrations through Raft by copying the logic of FSM.applyRegister. func (b *testStreamBackend) CatalogRegister(req *structs.RegisterRequest) error { return b.store.EnsureRegistration(1, req) @@ -2734,11 +2750,16 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes) type testServer struct { *Server + + // mockSnapshotHandler is solely used for handling autopilot events + // which don't come from the state store. + mockSnapshotHandler *mockSnapshotHandler } func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.Store) { + t.Helper() publisher := stream.NewEventPublisher(10 * time.Second) - store := newStateStore(t, publisher) + store, handler := newStateStore(t, publisher) ports := freeport.GetN(t, 1) // {grpc} @@ -2775,7 +2796,8 @@ func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state. t.Cleanup(grpcServer.Stop) return &testServer{ - Server: srv, + Server: srv, + mockSnapshotHandler: handler, }, store }