Handle server addresses update as client
This commit is contained in:
parent
205e873689
commit
98d102326f
|
@ -11,6 +11,7 @@ import (
|
|||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/proto/pbpeerstream"
|
||||
|
@ -256,6 +257,7 @@ func (s *Server) handleUpsert(
|
|||
}
|
||||
|
||||
// handleUpdateService handles both deletion and upsert events for a service.
|
||||
//
|
||||
// On an UPSERT event:
|
||||
// - All nodes, services, checks in the input pbNodes are re-applied through Raft.
|
||||
// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted.
|
||||
|
@ -470,6 +472,33 @@ func (s *Server) handleUpsertRoots(
|
|||
return s.Backend.PeeringTrustBundleWrite(req)
|
||||
}
|
||||
|
||||
func (s *Server) handleUpsertServerAddrs(
|
||||
peerName string,
|
||||
partition string,
|
||||
addrs *pbpeering.PeeringServerAddresses,
|
||||
) error {
|
||||
q := state.Query{
|
||||
Value: peerName,
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(partition),
|
||||
}
|
||||
_, existing, err := s.GetStore().PeeringRead(nil, q)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read peering: %w", err)
|
||||
}
|
||||
if existing == nil || !existing.IsActive() {
|
||||
return fmt.Errorf("peering does not exist or has been marked for deletion")
|
||||
}
|
||||
|
||||
// Clone to avoid mutating the existing data
|
||||
p := proto.Clone(existing).(*pbpeering.Peering)
|
||||
p.PeerServerAddresses = addrs.GetAddresses()
|
||||
|
||||
req := &pbpeering.PeeringWriteRequest{
|
||||
Peering: p,
|
||||
}
|
||||
return s.Backend.PeeringWrite(req)
|
||||
}
|
||||
|
||||
func (s *Server) handleDelete(
|
||||
peerName string,
|
||||
partition string,
|
||||
|
|
|
@ -104,6 +104,7 @@ type Backend interface {
|
|||
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
|
||||
CatalogRegister(req *structs.RegisterRequest) error
|
||||
CatalogDeregister(req *structs.DeregisterRequest) error
|
||||
PeeringWrite(req *pbpeering.PeeringWriteRequest) error
|
||||
}
|
||||
|
||||
// StateStore provides a read-only interface for querying Peering data.
|
||||
|
|
|
@ -126,7 +126,7 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
|
|||
|
||||
// Receive a subscription from a peer. This message arrives while the
|
||||
// server is a leader and should work.
|
||||
testutil.RunStep(t, "send subscription request to leader and consume its two requests", func(t *testing.T) {
|
||||
testutil.RunStep(t, "send subscription request to leader and consume its three requests", func(t *testing.T) {
|
||||
sub := &pbpeerstream.ReplicationMessage{
|
||||
Payload: &pbpeerstream.ReplicationMessage_Open_{
|
||||
Open: &pbpeerstream.ReplicationMessage_Open{
|
||||
|
@ -145,6 +145,10 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
|
|||
msg2, err := client.Recv()
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, msg2)
|
||||
|
||||
msg3, err := client.Recv()
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, msg3)
|
||||
})
|
||||
|
||||
// The ACK will be a new request but at this point the server is not the
|
||||
|
@ -1314,7 +1318,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) {
|
|||
|
||||
// makeClient sets up a *MockClient with the initial subscription
|
||||
// message handshake.
|
||||
func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID string) *MockClient {
|
||||
func makeClient(t *testing.T, srv *testServer, peerID string) *MockClient {
|
||||
t.Helper()
|
||||
|
||||
client := NewMockClient(context.Background())
|
||||
|
@ -1326,7 +1330,7 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
|
|||
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
|
||||
// This matches gRPC's behavior when an error is returned by a server.
|
||||
if err := srv.StreamResources(client.ReplicationStream); err != nil {
|
||||
errCh <- srv.StreamResources(client.ReplicationStream)
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -1345,8 +1349,15 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
|
|||
require.NoError(t, err)
|
||||
receivedSub2, err := client.Recv()
|
||||
require.NoError(t, err)
|
||||
receivedSub3, err := client.Recv()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Issue a services and roots subscription pair to server
|
||||
// This is required when the client subscribes to server address replication messages.
|
||||
// We assert for the handler to be called at least once but the data doesn't matter.
|
||||
srv.mockSnapshotHandler.expect("", 0, 0, nil)
|
||||
|
||||
// Issue services, roots, and server address subscription to server.
|
||||
// Note that server address may not come as an initial message
|
||||
for _, resourceURL := range []string{
|
||||
pbpeerstream.TypeURLExportedService,
|
||||
pbpeerstream.TypeURLPeeringTrustBundle,
|
||||
|
@ -1390,6 +1401,7 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
|
|||
got := []*pbpeerstream.ReplicationMessage{
|
||||
receivedSub1,
|
||||
receivedSub2,
|
||||
receivedSub3,
|
||||
}
|
||||
prototest.AssertElementsMatch(t, expect, got)
|
||||
|
||||
|
@ -1446,6 +1458,10 @@ func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.SecretsWriteReque
|
|||
return b.store.PeeringSecretsWrite(1, req)
|
||||
}
|
||||
|
||||
func (b *testStreamBackend) PeeringWrite(req *pbpeering.PeeringWriteRequest) error {
|
||||
return b.store.PeeringWrite(1, req)
|
||||
}
|
||||
|
||||
// CatalogRegister mocks catalog registrations through Raft by copying the logic of FSM.applyRegister.
|
||||
func (b *testStreamBackend) CatalogRegister(req *structs.RegisterRequest) error {
|
||||
return b.store.EnsureRegistration(1, req)
|
||||
|
@ -2734,11 +2750,16 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes)
|
|||
|
||||
type testServer struct {
|
||||
*Server
|
||||
|
||||
// mockSnapshotHandler is solely used for handling autopilot events
|
||||
// which don't come from the state store.
|
||||
mockSnapshotHandler *mockSnapshotHandler
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.Store) {
|
||||
t.Helper()
|
||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
||||
store := newStateStore(t, publisher)
|
||||
store, handler := newStateStore(t, publisher)
|
||||
|
||||
ports := freeport.GetN(t, 1) // {grpc}
|
||||
|
||||
|
@ -2776,6 +2797,7 @@ func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.
|
|||
|
||||
return &testServer{
|
||||
Server: srv,
|
||||
mockSnapshotHandler: handler,
|
||||
}, store
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue