peering: accept replication stream of discovery chain information at the importing side (#13151)

R.B. Boyer 2022-05-19 16:37:52 -05:00 committed by GitHub
parent 68789effeb
commit 63a9175bd6
13 changed files with 566 additions and 166 deletions

View File

@ -77,6 +77,7 @@ func (s *HTTPHandlers) PeeringList(resp http.ResponseWriter, req *http.Request)
// PeeringGenerateToken handles POSTs to the /v1/peering/token endpoint. The request
// will always be forwarded via RPC to the local leader.
func (s *HTTPHandlers) PeeringGenerateToken(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// TODO(peering): decode into api type
args := pbpeering.GenerateTokenRequest{
Datacenter: s.agent.config.Datacenter,
}
@ -108,6 +109,7 @@ func (s *HTTPHandlers) PeeringGenerateToken(resp http.ResponseWriter, req *http.
// PeeringInitiate handles POSTs to the /v1/peering/initiate endpoint. The request
// will always be forwarded via RPC to the local leader.
func (s *HTTPHandlers) PeeringInitiate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// TODO(peering): decode into api type
args := pbpeering.InitiateRequest{
Datacenter: s.agent.config.Datacenter,
}
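For orientation, here is a minimal client-side sketch of driving these two endpoints end to end. The JSON field names (PeerName, PeeringToken) mirror the pbpeering.GenerateTokenRequest/InitiateRequest types, and the agent addresses are placeholders; treat the exact wire shapes as assumptions, not a documented contract.
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
func main() {
// On the exporting cluster: ask the local leader to mint a peering token.
reqBody, _ := json.Marshal(map[string]string{"PeerName": "my-peer"})
resp, err := http.Post("http://127.0.0.1:8500/v1/peering/token", "application/json", bytes.NewReader(reqBody))
if err != nil {
panic(err)
}
defer resp.Body.Close()
var tok struct{ PeeringToken string }
if err := json.NewDecoder(resp.Body).Decode(&tok); err != nil {
panic(err)
}
// On the importing cluster: hand the token to /v1/peering/initiate, which
// the handler above forwards via RPC to that cluster's leader.
reqBody, _ = json.Marshal(map[string]string{
"PeerName":     "my-peer",
"PeeringToken": tok.PeeringToken,
})
resp2, err := http.Post("http://127.0.0.2:8500/v1/peering/initiate", "application/json", bytes.NewReader(reqBody))
if err != nil {
panic(err)
}
resp2.Body.Close()
fmt.Println("initiate:", resp2.Status)
}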

View File

@ -0,0 +1,82 @@
package peering
import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/types"
)
// healthSnapshot represents a normalized view of a set of CheckServiceNodes
// meant for easy comparison to aid in differential synchronization
type healthSnapshot struct {
Nodes map[types.NodeID]*nodeSnapshot
}
type nodeSnapshot struct {
Node *structs.Node
Services map[structs.ServiceID]*serviceSnapshot
}
type serviceSnapshot struct {
Service *structs.NodeService
Checks map[types.CheckID]*structs.HealthCheck
}
func newHealthSnapshot(all []structs.CheckServiceNode, partition, peerName string) *healthSnapshot {
// For all nodes, services, and checks we override the peer name and
// partition to be the local partition and local name for the peer.
for _, instance := range all {
instance.Node.PeerName = peerName
instance.Node.OverridePartition(partition)
instance.Service.PeerName = peerName
instance.Service.OverridePartition(partition)
for _, chk := range instance.Checks {
chk.PeerName = peerName
chk.OverridePartition(partition)
}
}
snap := &healthSnapshot{
Nodes: make(map[types.NodeID]*nodeSnapshot),
}
for _, instance := range all {
if instance.Node.ID == "" {
panic("TODO(peering): data should always have a node ID")
}
nodeSnap, ok := snap.Nodes[instance.Node.ID]
if !ok {
nodeSnap = &nodeSnapshot{
Node: instance.Node,
Services: make(map[structs.ServiceID]*serviceSnapshot),
}
snap.Nodes[instance.Node.ID] = nodeSnap
}
if instance.Service.ID == "" {
panic("TODO(peering): data should always have a service ID")
}
sid := instance.Service.CompoundServiceID()
svcSnap, ok := nodeSnap.Services[sid]
if !ok {
svcSnap = &serviceSnapshot{
Service: instance.Service,
Checks: make(map[types.CheckID]*structs.HealthCheck),
}
nodeSnap.Services[sid] = svcSnap
}
for _, c := range instance.Checks {
if c.CheckID == "" {
panic("TODO(peering): data should always have a check ID")
}
svcSnap.Checks[c.CheckID] = c
}
}
return snap
}

View File

@ -0,0 +1,152 @@
package peering
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/types"
)
func TestHealthSnapshot(t *testing.T) {
type testcase struct {
name string
in []structs.CheckServiceNode
expect *healthSnapshot
}
entMeta := acl.DefaultEnterpriseMeta()
run := func(t *testing.T, tc testcase) {
snap := newHealthSnapshot(tc.in, entMeta.PartitionOrEmpty(), "my-peer")
require.Equal(t, tc.expect, snap)
}
newNode := func(id, name, peerName string) *structs.Node {
return &structs.Node{
ID: types.NodeID(id),
Node: name,
Partition: entMeta.PartitionOrEmpty(),
PeerName: peerName,
}
}
newService := func(id string, port int, peerName string) *structs.NodeService {
return &structs.NodeService{
ID: id,
Service: "xyz",
EnterpriseMeta: *entMeta,
PeerName: peerName,
Port: port,
}
}
newCheck := func(node, svcID, peerName string) *structs.HealthCheck {
return &structs.HealthCheck{
Node: node,
ServiceID: svcID,
ServiceName: "xyz",
CheckID: types.CheckID(svcID + ":check"),
Name: "check",
EnterpriseMeta: *entMeta,
PeerName: peerName,
Status: "passing",
}
}
cases := []testcase{
{
name: "single",
in: []structs.CheckServiceNode{
{
Node: newNode("abc-123", "abc", ""),
Service: newService("xyz-123", 8080, ""),
Checks: structs.HealthChecks{
newCheck("abc", "xyz-123", ""),
},
},
},
expect: &healthSnapshot{
Nodes: map[types.NodeID]*nodeSnapshot{
"abc-123": {
Node: newNode("abc-123", "abc", "my-peer"),
Services: map[structs.ServiceID]*serviceSnapshot{
structs.NewServiceID("xyz-123", nil): {
Service: newService("xyz-123", 8080, "my-peer"),
Checks: map[types.CheckID]*structs.HealthCheck{
"xyz-123:check": newCheck("abc", "xyz-123", "my-peer"),
},
},
},
},
},
},
},
{
name: "multiple",
in: []structs.CheckServiceNode{
{
Node: newNode("abc-123", "abc", ""),
Service: newService("xyz-123", 8080, ""),
Checks: structs.HealthChecks{
newCheck("abc", "xyz-123", ""),
},
},
{
Node: newNode("abc-123", "abc", ""),
Service: newService("xyz-789", 8181, ""),
Checks: structs.HealthChecks{
newCheck("abc", "xyz-789", ""),
},
},
{
Node: newNode("def-456", "def", ""),
Service: newService("xyz-456", 9090, ""),
Checks: structs.HealthChecks{
newCheck("def", "xyz-456", ""),
},
},
},
expect: &healthSnapshot{
Nodes: map[types.NodeID]*nodeSnapshot{
"abc-123": {
Node: newNode("abc-123", "abc", "my-peer"),
Services: map[structs.ServiceID]*serviceSnapshot{
structs.NewServiceID("xyz-123", nil): {
Service: newService("xyz-123", 8080, "my-peer"),
Checks: map[types.CheckID]*structs.HealthCheck{
"xyz-123:check": newCheck("abc", "xyz-123", "my-peer"),
},
},
structs.NewServiceID("xyz-789", nil): {
Service: newService("xyz-789", 8181, "my-peer"),
Checks: map[types.CheckID]*structs.HealthCheck{
"xyz-789:check": newCheck("abc", "xyz-789", "my-peer"),
},
},
},
},
"def-456": {
Node: newNode("def-456", "def", "my-peer"),
Services: map[structs.ServiceID]*serviceSnapshot{
structs.NewServiceID("xyz-456", nil): {
Service: newService("xyz-456", 9090, "my-peer"),
Checks: map[types.CheckID]*structs.HealthCheck{
"xyz-456:check": newCheck("def", "xyz-456", "my-peer"),
},
},
},
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}

View File

@ -3,7 +3,6 @@ package peering
import (
"errors"
"fmt"
"strconv"
"strings"
"github.com/golang/protobuf/ptypes"
@ -11,19 +10,34 @@ import (
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbstatus"
"github.com/hashicorp/consul/types"
)
/*
TODO(peering):
At the start of each peering stream establishment (not initiation, but the
thing that reconnects) we need to do a little bit of light differential
snapshot correction to initially synchronize the local state store.
Then if we ever fail to apply a replication message we should either tear
down the entire connection (and thus force a resync on reconnect) or
request a resync operation.
*/
// pushServiceResponse handles sending exported service instance updates to the peer cluster.
// Each cache.UpdateEvent will contain all instances for a service name.
// If there are no instances in the event, we consider that to be a de-registration.
func pushServiceResponse(
logger hclog.Logger,
stream BidirectionalStream,
status *lockableStreamStatus,
update cache.UpdateEvent,
) error {
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
logger.Error(fmt.Sprintf("invalid type for response: %T, expected *pbservice.IndexedCheckServiceNodes", update.Result))
@ -31,6 +45,7 @@ func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status
// Skip this update to avoid locking up peering due to a bad service update.
return nil
}
serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
// If no nodes are present then it's due to one of:
@ -88,139 +103,172 @@ func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status
}
func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) {
if resp.ResourceURL != pbpeering.TypeURLService {
err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
return makeReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INVALID_ARGUMENT,
err.Error(),
), err
}
switch resp.Operation {
case pbpeering.ReplicationMessage_Response_UPSERT:
if resp.Resource == nil {
err := fmt.Errorf("received upsert response with no content")
return makeReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INVALID_ARGUMENT,
err.Error(),
), err
}
if err := s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource); err != nil {
return makeReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INTERNAL,
fmt.Sprintf("upsert error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err),
), fmt.Errorf("upsert error: %w", err)
}
return makeReply(resp.ResourceURL, resp.Nonce, code.Code_OK, ""), nil
case pbpeering.ReplicationMessage_Response_DELETE:
if err := s.handleDelete(peerName, partition, resp.ResourceURL, resp.ResourceID); err != nil {
return makeReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INTERNAL,
fmt.Sprintf("delete error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err),
), fmt.Errorf("delete error: %w", err)
}
return makeReply(resp.ResourceURL, resp.Nonce, code.Code_OK, ""), nil
default:
var errMsg string
if op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)]; op != "" {
errMsg = fmt.Sprintf("unsupported operation: %q", op)
} else {
errMsg = fmt.Sprintf("unsupported operation: %d", resp.Operation)
}
return makeReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INVALID_ARGUMENT,
errMsg,
), errors.New(errMsg)
}
}
func (s *Service) handleUpsert(
peerName string,
partition string,
resourceURL string,
resourceID string,
resource *anypb.Any,
) error {
switch resourceURL {
case pbpeering.TypeURLService:
sn := structs.ServiceNameFromString(resourceID)
sn.OverridePartition(partition)
csn := &pbservice.IndexedCheckServiceNodes{}
if err := ptypes.UnmarshalAny(resource, csn); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err)
}
return s.handleUpsertService(peerName, partition, sn, csn)
default:
return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
}
}
func (s *Service) handleUpsertService(
peerName string,
partition string,
sn structs.ServiceName,
csn *pbservice.IndexedCheckServiceNodes,
) error {
if csn == nil || len(csn.Nodes) == 0 {
return s.handleDeleteService(peerName, partition, sn)
}
// Convert exported data into structs format.
structsNodes := make([]structs.CheckServiceNode, 0, len(csn.Nodes))
for _, pb := range csn.Nodes {
instance, err := pbservice.CheckServiceNodeToStructs(pb)
if err != nil {
return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
return fmt.Errorf("failed to convert instance: %w", err)
}
structsNodes = append(structsNodes, *instance)
}
// Normalize the data into a convenient form for operation.
snap := newHealthSnapshot(structsNodes, partition, peerName)
for _, nodeSnap := range snap.Nodes {
// First register the node
req := nodeSnap.Node.ToRegisterRequest()
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
return fmt.Errorf("failed to register node: %w", err)
}
// Then register all services on that node
for _, svcSnap := range nodeSnap.Services {
req.Service = svcSnap.Service
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
return fmt.Errorf("failed to register service: %w", err)
}
}
req.Service = nil
// Then register all checks on that node
var chks structs.HealthChecks
for _, svcSnap := range nodeSnap.Services {
for _, c := range svcSnap.Checks {
chks = append(chks, c)
}
}
req.Checks = chks
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
return fmt.Errorf("failed to register check: %w", err)
}
}
// TODO(peering): cleanup and deregister existing data that is now missing safely somehow
return nil
}
func (s *Service) handleDelete(
peerName string,
partition string,
resourceURL string,
resourceID string,
) error {
switch resourceURL {
case pbpeering.TypeURLService:
sn := structs.ServiceNameFromString(resourceID)
sn.OverridePartition(partition)
return s.handleDeleteService(peerName, partition, sn)
default:
return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
}
}
func (s *Service) handleDeleteService(
peerName string,
partition string,
sn structs.ServiceName,
) error {
// Deregister: a request with ServiceID set deletes that service AND its checks.
// Deregister: a request with ServiceID and CheckID both empty deletes the node.
// TODO(peering): implement
return nil
}
@ -234,7 +282,8 @@ func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbp
}
}
msg := &pbpeering.ReplicationMessage{
// TODO: shouldn't this be response?
return &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
ResourceURL: resourceURL,
@ -243,5 +292,4 @@ func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbp
},
},
}
}
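Because makeReply wraps acknowledgements in a Request payload (hence the TODO above), here is a sketch of the two reply shapes a receiver produces per processed response. It assumes the elided top of the function only attaches a pbstatus.Status for non-OK codes, as the expected test messages later in this commit suggest; replyShapes is a hypothetical in-package illustration, not part of this commit.
package peering
import (
"google.golang.org/genproto/googleapis/rpc/code"
"github.com/hashicorp/consul/proto/pbpeering"
)
func replyShapes() (ack, nack *pbpeering.ReplicationMessage) {
// ACK: the sender's nonce is echoed back with no error status attached.
ack = makeReply(pbpeering.TypeURLService, "21", code.Code_OK, "")
// NACK: the same nonce plus a status code and message for the sender to log.
nack = makeReply(pbpeering.TypeURLService, "21", code.Code_INTERNAL, "upsert error")
return ack, nack
}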

View File

@ -239,16 +239,18 @@ func (s *Service) Initiate(
}
// as soon as a peering is written with a list of ServerAddresses that is
// non-empty, the leader routine will see the peering and attempt to
// establish a connection with the remote peer.
//
// This peer now has a record of both the LocalPeerID(ID) and
// RemotePeerID(PeerID) but at this point the other peer does not.
writeReq := &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: req.PeerName,
PeerCAPems: tok.CA,
PeerServerAddresses: serverAddrs,
PeerServerName: tok.ServerName,
PeerID: tok.PeerID,
Meta: req.Meta,
},
}
@ -464,8 +466,8 @@ func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResource
// For server peers both of these ID values are the same, because we generated a token with a local ID,
// and the client peer dials using that same ID.
return s.HandleStream(HandleStreamRequest{
LocalID: p.ID,
RemoteID: p.PeerID,
PeerName: p.Name,
Partition: p.Partition,
Stream: stream,

View File

@ -2,17 +2,23 @@ package peering
import (
"context"
"fmt"
"io"
"sort"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
@ -159,17 +165,13 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
}
}()
p := writeInitiatedPeering(t, store, 1, "my-peer")
var (
peerID = p.ID // for Send
remotePeerID = p.PeerID // for Recv
)
// Receive a subscription from a peer
sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
@ -178,7 +180,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
},
},
}
err := client.Send(sub)
require.NoError(t, err)
testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
@ -197,7 +199,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
ResourceURL: pbpeering.TypeURLService,
PeerID: remotePeerID,
},
},
}
@ -244,16 +246,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
errCh <- srv.StreamResources(client.ReplicationStream)
}()
p := writeInitiatedPeering(t, store, 1, "my-peer")
var (
peerID = p.ID // for Send
remotePeerID = p.PeerID // for Recv
)
// Receive a subscription from a peer
sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
@ -262,7 +261,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
},
},
}
err := client.Send(sub)
require.NoError(t, err)
testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
@ -281,7 +280,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
ResourceURL: pbpeering.TypeURLService,
PeerID: remotePeerID,
Nonce: "",
},
},
@ -370,6 +369,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ResourceID: "api",
Nonce: "21",
Operation: pbpeering.ReplicationMessage_Response_UPSERT,
Resource: makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}),
},
},
}
@ -506,14 +506,11 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
// Create a peering
var lastIdx uint64 = 1
p := writeInitiatedPeering(t, store, lastIdx, "my-peering")
var (
peerID = p.ID // for Send
remotePeerID = p.PeerID // for Recv
)
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store,
@ -537,7 +534,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
init := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeering.TypeURLService,
},
},
@ -552,7 +549,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
ResourceURL: pbpeering.TypeURLService,
PeerID: remotePeerID,
},
},
}
@ -570,6 +567,13 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
var (
mongoSN = structs.NewServiceName("mongo", nil).String()
mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String()
mysqlSN = structs.NewServiceName("mysql", nil).String()
mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String()
)
testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{
Name: "default",
@ -577,36 +581,50 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
{
Name: "mysql",
Consumers: []structs.ServiceConsumer{
{PeerName: "my-peering"},
},
},
{
// Mongo does not get pushed because it does not have instances registered.
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{PeerName: "my-peering"},
},
},
},
}
lastIdx++
require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(t, nodes.Nodes, 1)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
)
})
mongo := &structs.CheckServiceNode{
@ -753,6 +771,7 @@ func Test_processResponse_Validation(t *testing.T) {
ResourceID: "api",
Nonce: "1",
Operation: pbpeering.ReplicationMessage_Response_UPSERT,
Resource: makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}),
},
expect: &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
@ -838,7 +857,7 @@ func Test_processResponse_Validation(t *testing.T) {
Nonce: "1",
Error: &pbstatus.Status{
Code: int32(code.Code_INVALID_ARGUMENT),
Message: `unsupported operation: 100000`,
},
},
},
@ -852,3 +871,100 @@ func Test_processResponse_Validation(t *testing.T) {
})
}
}
// writeInitiatedPeering creates a peering with the provided name and ensures
// the PeerID field is set to the ID of the remote peer.
func writeInitiatedPeering(t *testing.T, store *state.Store, idx uint64, peerName string) *pbpeering.Peering {
remotePeerID, err := uuid.GenerateUUID()
require.NoError(t, err)
peering := pbpeering.Peering{
Name: peerName,
PeerID: remotePeerID,
}
require.NoError(t, store.PeeringWrite(idx, &peering))
_, p, err := store.PeeringRead(nil, state.Query{Value: peerName})
require.NoError(t, err)
return p
}
func makeAnyPB(t *testing.T, pb proto.Message) *any.Any {
any, err := ptypes.MarshalAny(pb)
require.NoError(t, err)
return any
}
func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *testing.T, got *pbpeering.ReplicationMessage)) {
t.Helper()
num := len(checkFns)
if num == 0 {
// No updates should be received.
msg, err := client.RecvWithTimeout(100 * time.Millisecond)
if err == io.EOF && msg == nil {
return
} else if err != nil {
t.Fatalf("received unexpected update error: %v", err)
} else {
t.Fatalf("received unexpected update: %+v", msg)
}
}
var out []*pbpeering.ReplicationMessage
for len(out) < num {
msg, err := client.RecvWithTimeout(100 * time.Millisecond)
if err == io.EOF && msg == nil {
t.Fatalf("timed out with %d of %d events", len(out), num)
}
require.NoError(t, err)
out = append(out, msg)
}
if msg, err := client.RecvWithTimeout(100 * time.Millisecond); err != io.EOF || msg != nil {
t.Fatalf("expected only %d events but got more; prev %+v; next %+v", num, out, msg)
}
require.Len(t, out, num)
sort.SliceStable(out, func(i, j int) bool {
a, b := out[i], out[j]
typeA := fmt.Sprintf("%T", a.GetPayload())
typeB := fmt.Sprintf("%T", b.GetPayload())
if typeA != typeB {
return typeA < typeB
}
switch a.GetPayload().(type) {
case *pbpeering.ReplicationMessage_Request_:
reqA, reqB := a.GetRequest(), b.GetRequest()
if reqA.ResourceURL != reqB.ResourceURL {
return reqA.ResourceURL < reqB.ResourceURL
}
return reqA.Nonce < reqB.Nonce
case *pbpeering.ReplicationMessage_Response_:
respA, respB := a.GetResponse(), b.GetResponse()
if respA.ResourceURL != respB.ResourceURL {
return respA.ResourceURL < respB.ResourceURL
}
if respA.ResourceID != respB.ResourceID {
return respA.ResourceID < respB.ResourceID
}
return respA.Nonce < respB.Nonce
case *pbpeering.ReplicationMessage_Terminated_:
return false
default:
panic("unknown type")
}
})
for i := 0; i < num; i++ {
checkFns[i](t, out[i])
}
}

View File

@ -58,8 +58,7 @@ func TestPeering(peerName string, state pbpeering.PeeringState, meta map[string]
PeerServerAddresses: []string{validAddress},
PeerServerName: validServerName,
State: state,
PeerID: validPeerID,
Meta: meta,
}
}

View File

@ -3,13 +3,11 @@ package peering
import (
"fmt"
"net"
"net/netip"
"strconv"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
)
// validatePeeringToken ensures that the token has valid values.
@ -39,7 +37,7 @@ func validatePeeringToken(tok *structs.PeeringToken) error {
if port < 1 || port > 65535 {
return &errPeeringInvalidServerAddress{addr}
}
if _, err := netip.ParseAddr(host); err != nil {
return &errPeeringInvalidServerAddress{addr}
}
}
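With the inet.af/netaddr dependency dropped, the check relies on the go1.18 standard library alone. Below is a self-contained sketch of the same host:port validation, under the assumption that the surrounding function splits the address and range-checks the port as shown above.
package main
import (
"fmt"
"net"
"net/netip"
"strconv"
)
// validateAddr mirrors the check above: split host:port, bound the port,
// and require the host to parse as a bare IP address (no hostnames).
func validateAddr(addr string) error {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
return fmt.Errorf("invalid address %q: %w", addr, err)
}
port, err := strconv.Atoi(portStr)
if err != nil || port < 1 || port > 65535 {
return fmt.Errorf("invalid port in %q", addr)
}
if _, err := netip.ParseAddr(host); err != nil {
return fmt.Errorf("invalid host in %q: %w", addr, err)
}
return nil
}
func main() {
fmt.Println(validateAddr("10.0.0.1:8502"))    // <nil>
fmt.Println(validateAddr("example.com:8502")) // error: not a bare IP
}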

View File

@ -57,6 +57,10 @@ func NodeEnterpriseMetaInDefaultPartition() *acl.EnterpriseMeta {
// FillAuthzContext stub
func (_ *Node) FillAuthzContext(_ *acl.AuthorizerContext) {}
func (n *Node) OverridePartition(_ string) {
n.Partition = ""
}
func (_ *Coordinate) FillAuthzContext(_ *acl.AuthorizerContext) {}
func (_ *NodeInfo) FillAuthzContext(_ *acl.AuthorizerContext) {}

go.mod
View File

@ -85,7 +85,6 @@ require (
google.golang.org/protobuf v1.25.0
gopkg.in/square/go-jose.v2 v2.5.1
gotest.tools/v3 v3.0.3
inet.af/netaddr v0.0.0-20211027220019-c74959edd3b6
k8s.io/api v0.18.2
k8s.io/apimachinery v0.18.2
k8s.io/client-go v0.18.2
@ -174,8 +173,6 @@ require (
github.com/vmware/govmomi v0.18.0 // indirect
go.opencensus.io v0.22.3 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.6 // indirect

go.sum
View File

@ -159,7 +159,6 @@ github.com/docker/go-connections v0.3.0 h1:3lOnM9cSzgGwx8VfK/NGOW5fLQ0GjIlCkaktF
github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dvyukov/go-fuzz v0.0.0-20210103155950-6a8e9d1f2415/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
@ -634,10 +633,6 @@ go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 h1:UXLjNohABv4S58tHmeuIZDO6e3mHpW2Dx33gaNt03LE=
go4.org/intern v0.0.0-20211027215823-ae77deb06f29/go.mod h1:cS2ma+47FKrLPdXFpr7CuxiTW3eyJbWew4qx0qtQWDA=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37 h1:Tx9kY6yUkLge/pFG7IEMwDZy6CS2ajFc9TvQdPCW0uA=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@ -977,8 +972,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
inet.af/netaddr v0.0.0-20211027220019-c74959edd3b6 h1:acCzuUSQ79tGsM/O50VRFySfMm19IoMKL+sZztZkCxw=
inet.af/netaddr v0.0.0-20211027220019-c74959edd3b6/go.mod h1:y3MGhcFMlh0KZPMuXXow8mpjxxAk3yoDNsp4cQz54i8=
k8s.io/api v0.18.2 h1:wG5g5ZmSVgm5B+eHMIbI9EGATS2L8Z72rda19RIEgY8=
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
k8s.io/apimachinery v0.18.2 h1:44CmtbmkzVDAhCpRVSiP2R5PPrC2RtlIv/MoB8xpdRA=

View File

@ -79,3 +79,7 @@ func (msg *InitiateRequest) Timeout(rpcHoldTimeout time.Duration, maxQueryTime t
func (p *Peering) ShouldDial() bool {
return len(p.PeerServerAddresses) > 0 && p.State != PeeringState_TERMINATED
}
func (x ReplicationMessage_Response_Operation) GoString() string {
return x.String()
}
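A short note on why this helper exists: fmt's %#v verb consults the GoStringer interface, so defining GoString in terms of String makes test-failure dumps print the operation name instead of a bare integer. A minimal illustration, assuming this package path:
package main
import (
"fmt"
"github.com/hashicorp/consul/proto/pbpeering"
)
func main() {
op := pbpeering.ReplicationMessage_Response_UPSERT
// With GoString delegating to String, %#v renders "UPSERT"
// rather than the enum's underlying integer value.
fmt.Printf("%#v\n", op)
}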

View File

@ -10,6 +10,9 @@ type CheckIDType = types.CheckID
type NodeIDType = types.NodeID
func RaftIndexToStructs(s *pbcommon.RaftIndex) structs.RaftIndex {
if s == nil {
return structs.RaftIndex{}
}
return structs.RaftIndex{
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,