peering: replicate expected SNI, SPIFFE, and service protocol to peers (#13218)

The importing peer will need to know what SNI and SPIFFE name
corresponds to each exported service. Additionally it will need to know
at a high level the protocol in use (L4/L7) to generate the appropriate
connection pool and local metrics.

For replicated connect synthetic entities we edit the `Connect{}` part
of a `NodeService` to have a new section:

    {
      "PeerMeta": {
        "SNI": [
          "web.default.default.owt.external.183150d5-1033-3672-c426-c29205a576b8.consul"
        ],
        "SpiffeID": [
          "spiffe://183150d5-1033-3672-c426-c29205a576b8.consul/ns/default/dc/dc1/svc/web"
        ],
        "Protocol": "tcp"
      }
    }

This data is then replicated and saved as-is at the importing side. Both
SNI and SpiffeID are slices for now until I can be sure we don't need
them for how mesh gateways will ultimately work.
This commit is contained in:
R.B. Boyer 2022-05-25 12:37:44 -05:00 committed by GitHub
parent a4fb0b6ece
commit bc10055edc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1014 additions and 443 deletions

View File

@ -11,6 +11,7 @@ const (
internal = "internal" internal = "internal"
version = "v1" version = "v1"
internalVersion = internal + "-" + version internalVersion = internal + "-" + version
external = "external"
) )
func UpstreamSNI(u *structs.Upstream, subset string, dc string, trustDomain string) string { func UpstreamSNI(u *structs.Upstream, subset string, dc string, trustDomain string) string {
@ -64,6 +65,21 @@ func ServiceSNI(service string, subset string, namespace string, partition strin
} }
} }
func PeeredServiceSNI(service, namespace, partition, peerName, trustDomain string) string {
if peerName == "" {
panic("peer name is a requirement for this function and does not make sense without it")
}
if namespace == "" {
namespace = structs.IntentionDefaultNamespace
}
if partition == "" {
// TODO(partitions) Make default available in OSS as a constant for uses like this one
partition = "default"
}
return dotJoin(service, namespace, partition, peerName, external, trustDomain)
}
func dotJoin(parts ...string) string { func dotJoin(parts ...string) string {
return strings.Join(parts, ".") return strings.Join(parts, ".")
} }

View File

@ -3,8 +3,9 @@ package connect
import ( import (
"testing" "testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
) )
const ( const (
@ -164,6 +165,11 @@ func TestServiceSNI(t *testing.T) {
ServiceSNI("api", "canary", "neighbor", "part1", "foo", testTrustDomain2)) ServiceSNI("api", "canary", "neighbor", "part1", "foo", testTrustDomain2))
} }
func TestPeeredServiceSNI(t *testing.T) {
require.Equal(t, "api.billing.default.webstuff.external."+testTrustDomainSuffix1,
PeeredServiceSNI("api", "billing", "", "webstuff", testTrustDomainSuffix1))
}
func TestQuerySNI(t *testing.T) { func TestQuerySNI(t *testing.T) {
require.Equal(t, "magicquery.default.foo.query."+testTrustDomain1, require.Equal(t, "magicquery.default.foo.query."+testTrustDomain1,
QuerySNI("magicquery", "foo", testTrustDomain1)) QuerySNI("magicquery", "foo", testTrustDomain1))

View File

@ -23,6 +23,7 @@ import (
// //
// NOTE: this is duplicated in the api package as testClusterID // NOTE: this is duplicated in the api package as testClusterID
const TestClusterID = "11111111-2222-3333-4444-555555555555" const TestClusterID = "11111111-2222-3333-4444-555555555555"
const TestTrustDomain = TestClusterID + ".consul"
// testCACounter is just an atomically incremented counter for creating // testCACounter is just an atomically incremented counter for creating
// unique names for the CA certs. // unique names for the CA certs.

View File

@ -760,6 +760,10 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler { func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
p := peering.NewService( p := peering.NewService(
deps.Logger.Named("grpc-api.peering"), deps.Logger.Named("grpc-api.peering"),
peering.Config{
Datacenter: config.Datacenter,
ConnectEnabled: config.ConnectEnabled,
},
NewPeeringBackend(s, deps.GRPCConnPool), NewPeeringBackend(s, deps.GRPCConnPool),
) )
s.peeringService = p s.peeringService = p

View File

@ -398,10 +398,42 @@ func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peerin
structs.ServiceList(normal).Sort() structs.ServiceList(normal).Sort()
structs.ServiceList(disco).Sort() structs.ServiceList(disco).Sort()
return maxIdx, &structs.ExportedServiceList{ serviceProtocols := make(map[structs.ServiceName]string)
Services: normal, populateProtocol := func(svc structs.ServiceName) error {
DiscoChains: disco, if _, ok := serviceProtocols[svc]; ok {
}, nil return nil // already processed
}
idx, protocol, err := protocolForService(tx, ws, svc)
if err != nil {
return fmt.Errorf("failed to get protocol for service: %w", err)
}
if idx > maxIdx {
maxIdx = idx
}
serviceProtocols[svc] = protocol
return nil
}
for _, svc := range normal {
if err := populateProtocol(svc); err != nil {
return 0, nil, err
}
}
for _, svc := range disco {
if err := populateProtocol(svc); err != nil {
return 0, nil, err
}
}
list := &structs.ExportedServiceList{
Services: normal,
DiscoChains: disco,
ConnectProtocol: serviceProtocols,
}
return maxIdx, list, nil
} }
// PeeringsForService returns the list of peerings that are associated with the service name provided in the query. // PeeringsForService returns the list of peerings that are associated with the service name provided in the query.

View File

@ -644,6 +644,10 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
defaultEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition() defaultEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
newSN := func(name string) structs.ServiceName {
return structs.NewServiceName(name, defaultEntMeta)
}
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
ensureConfigEntry := func(t *testing.T, entry structs.ConfigEntry) { ensureConfigEntry := func(t *testing.T, entry structs.ConfigEntry) {
@ -704,6 +708,10 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta, EnterpriseMeta: *defaultEntMeta,
}, },
}, },
ConnectProtocol: map[structs.ServiceName]string{
newSN("mysql"): "tcp",
newSN("redis"): "tcp",
},
} }
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id)
@ -746,6 +754,9 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta, EnterpriseMeta: *defaultEntMeta,
}, },
}, },
ConnectProtocol: map[structs.ServiceName]string{
newSN("billing"): "tcp",
},
} }
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err) require.NoError(t, err)
@ -826,6 +837,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta, EnterpriseMeta: *defaultEntMeta,
}, },
}, },
ConnectProtocol: map[structs.ServiceName]string{
newSN("billing"): "http",
newSN("payments"): "http",
newSN("resolver"): "http",
newSN("router"): "http",
newSN("splitter"): "http",
},
} }
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err) require.NoError(t, err)
@ -861,6 +879,11 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta, EnterpriseMeta: *defaultEntMeta,
}, },
}, },
ConnectProtocol: map[structs.ServiceName]string{
newSN("payments"): "http",
newSN("resolver"): "http",
newSN("router"): "http",
},
} }
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err) require.NoError(t, err)

View File

@ -19,6 +19,7 @@ import (
grpcstatus "google.golang.org/grpc/status" grpcstatus "google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/dns" "github.com/hashicorp/consul/agent/dns"
@ -44,25 +45,30 @@ func (e *errPeeringInvalidServerAddress) Error() string {
return fmt.Sprintf("%s is not a valid peering server address", e.addr) return fmt.Sprintf("%s is not a valid peering server address", e.addr)
} }
type Config struct {
Datacenter string
ConnectEnabled bool
// TODO(peering): remove this when we're ready
DisableMeshGatewayMode bool
}
// Service implements pbpeering.PeeringService to provide RPC operations for // Service implements pbpeering.PeeringService to provide RPC operations for
// managing peering relationships. // managing peering relationships.
type Service struct { type Service struct {
Backend Backend Backend Backend
logger hclog.Logger logger hclog.Logger
config Config
streams *streamTracker streams *streamTracker
// TODO(peering): remove this when we're ready
DisableMeshGatewayMode bool
} }
func NewService(logger hclog.Logger, backend Backend) *Service { func NewService(logger hclog.Logger, cfg Config, backend Backend) *Service {
srv := &Service{ cfg.DisableMeshGatewayMode = true
return &Service{
Backend: backend, Backend: backend,
logger: logger, logger: logger,
config: cfg,
streams: newStreamTracker(), streams: newStreamTracker(),
} }
srv.DisableMeshGatewayMode = true
return srv
} }
var _ pbpeering.PeeringServiceServer = (*Service)(nil) var _ pbpeering.PeeringServiceServer = (*Service)(nil)
@ -112,6 +118,7 @@ type Store interface {
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error)
PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CAConfig(memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
AbandonCh() <-chan struct{} AbandonCh() <-chan struct{}
} }
@ -521,9 +528,24 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
// TODO(peering) Also need to clear subscriptions associated with the peer // TODO(peering) Also need to clear subscriptions associated with the peer
defer s.streams.disconnected(req.LocalID) defer s.streams.disconnected(req.LocalID)
mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend) var trustDomain string
mgr.DisableMeshGatewayMode = s.DisableMeshGatewayMode if s.config.ConnectEnabled {
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.Partition) // Read the TrustDomain up front - we do not allow users to change the ClusterID
// so reading it once at the beginning of the stream is sufficient.
trustDomain, err = getTrustDomain(s.Backend.Store(), logger)
if err != nil {
return err
}
}
mgr := newSubscriptionManager(
req.Stream.Context(),
logger,
s.config,
trustDomain,
s.Backend,
)
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.PeerName, req.Partition)
sub := &pbpeering.ReplicationMessage{ sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{ Payload: &pbpeering.ReplicationMessage_Request_{
@ -674,6 +696,19 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
} }
} }
func getTrustDomain(store Store, logger hclog.Logger) (string, error) {
_, cfg, err := store.CAConfig(nil)
switch {
case err != nil:
logger.Error("failed to read Connect CA Config", "error", err)
return "", grpcstatus.Error(codes.Internal, "failed to read Connect CA Config")
case cfg == nil:
logger.Warn("cannot begin stream because Connect CA is not yet initialized")
return "", grpcstatus.Error(codes.FailedPrecondition, "Connect CA is not yet initialized")
}
return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
}
func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) { func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) {
return s.streams.streamStatus(peer) return s.streams.streamStatus(peer)
} }

View File

@ -679,8 +679,16 @@ func Test_StreamHandler_UpsertServices(t *testing.T) {
s := newTestServer(t, nil) s := newTestServer(t, nil)
testrpc.WaitForLeader(t, s.Server.RPC, "dc1") testrpc.WaitForLeader(t, s.Server.RPC, "dc1")
testrpc.WaitForActiveCARoot(t, s.Server.RPC, "dc1", nil)
srv := peering.NewService(testutil.Logger(t), consul.NewPeeringBackend(s.Server, nil)) srv := peering.NewService(
testutil.Logger(t),
peering.Config{
Datacenter: "dc1",
ConnectEnabled: true,
},
consul.NewPeeringBackend(s.Server, nil),
)
require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{ require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{
Name: "my-peer", Name: "my-peer",
@ -1038,6 +1046,9 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
conf.SerfWANConfig.MemberlistConfig.BindPort = ports[2] conf.SerfWANConfig.MemberlistConfig.BindPort = ports[2]
conf.SerfWANConfig.MemberlistConfig.AdvertisePort = ports[2] conf.SerfWANConfig.MemberlistConfig.AdvertisePort = ports[2]
conf.PrimaryDatacenter = "dc1"
conf.ConnectEnabled = true
nodeID, err := uuid.GenerateUUID() nodeID, err := uuid.GenerateUUID()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -34,13 +35,19 @@ func TestStreamResources_Server_Follower(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second) publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher) store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{ srv := NewService(
store: store, testutil.Logger(t),
pub: publisher, Config{
leader: func() bool { Datacenter: "dc1",
return false ConnectEnabled: true,
}, },
}) &testStreamBackend{
store: store,
pub: publisher,
leader: func() bool {
return false
},
})
client := NewMockClient(context.Background()) client := NewMockClient(context.Background())
@ -76,11 +83,18 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
} }
return false return false
} }
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store, srv := NewService(
pub: publisher, testutil.Logger(t),
leader: leaderFunc, Config{
}) Datacenter: "dc1",
ConnectEnabled: true,
},
&testStreamBackend{
store: store,
pub: publisher,
leader: leaderFunc,
})
client := NewMockClient(context.Background()) client := NewMockClient(context.Background())
@ -97,6 +111,9 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
p := writeInitiatedPeering(t, store, 1, "my-peer") p := writeInitiatedPeering(t, store, 1, "my-peer")
peerID := p.ID peerID := p.ID
// Set the initial roots and CA configuration.
_ = writeInitialRootsAndCA(t, store)
// Receive a subscription from a peer // Receive a subscription from a peer
sub := &pbpeering.ReplicationMessage{ sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{ Payload: &pbpeering.ReplicationMessage_Request_{
@ -142,10 +159,15 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second) publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher) store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{ srv := NewService(
store: store, testutil.Logger(t),
pub: publisher, Config{
}) Datacenter: "dc1",
ConnectEnabled: true,
}, &testStreamBackend{
store: store,
pub: publisher,
})
client := NewMockClient(context.Background()) client := NewMockClient(context.Background())
@ -243,10 +265,15 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second) publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher) store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{ srv := NewService(
store: store, testutil.Logger(t),
pub: publisher, Config{
}) Datacenter: "dc1",
ConnectEnabled: true,
}, &testStreamBackend{
store: store,
pub: publisher,
})
it := incrementalTime{ it := incrementalTime{
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
@ -272,6 +299,9 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
remotePeerID = p.PeerID // for Recv remotePeerID = p.PeerID // for Recv
) )
// Set the initial roots and CA configuration.
_ = writeInitialRootsAndCA(t, store)
// Receive a subscription from a peer // Receive a subscription from a peer
sub := &pbpeering.ReplicationMessage{ sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{ Payload: &pbpeering.ReplicationMessage_Request_{
@ -330,10 +360,15 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second) publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher) store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{ srv := NewService(
store: store, testutil.Logger(t),
pub: publisher, Config{
}) Datacenter: "dc1",
ConnectEnabled: true,
}, &testStreamBackend{
store: store,
pub: publisher,
})
it := incrementalTime{ it := incrementalTime{
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
@ -353,6 +388,9 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
remotePeerID = p.PeerID // for Recv remotePeerID = p.PeerID // for Recv
) )
// Set the initial roots and CA configuration.
_ = writeInitialRootsAndCA(t, store)
// Receive a subscription from a peer // Receive a subscription from a peer
sub := &pbpeering.ReplicationMessage{ sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{ Payload: &pbpeering.ReplicationMessage_Request_{
@ -602,200 +640,12 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
} }
func TestStreamResources_Server_ServiceUpdates(t *testing.T) { func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second) testStreamResources_Server_ServiceUpdates(t, true)
store := newStateStore(t, publisher)
// Create a peering
var lastIdx uint64 = 1
p := writeInitiatedPeering(t, store, lastIdx, "my-peering")
var (
peerID = p.ID // for Send
remotePeerID = p.PeerID // for Recv
)
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store,
pub: publisher,
})
srv.DisableMeshGatewayMode = false
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.ErrCh = errCh
go func() {
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
if err := srv.StreamResources(client.ReplicationStream); err != nil {
errCh <- err
}
}()
// Issue a services subscription to server
init := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeering.TypeURLService,
},
},
}
require.NoError(t, client.Send(init))
// Receive a services subscription from server
receivedSub, err := client.Recv()
require.NoError(t, err)
expect := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
ResourceURL: pbpeering.TypeURLService,
PeerID: remotePeerID,
},
},
}
prototest.AssertDeepEqual(t, expect, receivedSub)
// Register a service that is not yet exported
mysql := &structs.CheckServiceNode{
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
}
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
var (
mongoSN = structs.NewServiceName("mongo", nil).String()
mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String()
mysqlSN = structs.NewServiceName("mysql", nil).String()
mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String()
)
testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "mysql",
Consumers: []structs.ServiceConsumer{
{PeerName: "my-peering"},
},
},
{
// Mongo does not get pushed because it does not have instances registered.
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{PeerName: "my-peering"},
},
},
},
}
lastIdx++
require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(t, nodes.Nodes, 1)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
)
})
mongo := &structs.CheckServiceNode{
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
}
testutil.RunStep(t, "registering mongo instance leads to an UPSERT event", func(t *testing.T) {
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service))
retry.Run(t, func(r *retry.R) {
msg, err := client.RecvWithTimeout(100 * time.Millisecond)
require.NoError(r, err)
require.Equal(r, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation)
require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(r, nodes.Nodes, 1)
})
})
testutil.RunStep(t, "un-exporting mysql leads to a DELETE event for mysql", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peering",
},
},
},
},
}
lastIdx++
err = store.EnsureConfigEntry(lastIdx, entry)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
msg, err := client.RecvWithTimeout(100 * time.Millisecond)
require.NoError(r, err)
require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Equal(r, mysql.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
require.Nil(r, msg.GetResponse().Resource)
})
})
testutil.RunStep(t, "deleting the config entry leads to a DELETE event for mongo", func(t *testing.T) {
lastIdx++
err = store.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", nil)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
msg, err := client.RecvWithTimeout(100 * time.Millisecond)
require.NoError(r, err)
require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
require.Nil(r, msg.GetResponse().Resource)
})
})
} }
func TestStreamResources_Server_ServiceUpdates_EnableMeshGateways(t *testing.T) {
func TestStreamResources_Server_ServiceUpdates_DisableMeshGatewayMode(t *testing.T) { testStreamResources_Server_ServiceUpdates(t, false)
}
func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways bool) {
publisher := stream.NewEventPublisher(10 * time.Second) publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher) store := newStateStore(t, publisher)
@ -807,11 +657,19 @@ func TestStreamResources_Server_ServiceUpdates_DisableMeshGatewayMode(t *testing
remotePeerID = p.PeerID // for Recv remotePeerID = p.PeerID // for Recv
) )
srv := NewService(testutil.Logger(t), &testStreamBackend{ // Set the initial roots and CA configuration.
store: store, _ = writeInitialRootsAndCA(t, store)
pub: publisher,
}) srv := NewService(
srv.DisableMeshGatewayMode = true testutil.Logger(t),
Config{
Datacenter: "dc1",
ConnectEnabled: true,
DisableMeshGatewayMode: disableMeshGateways,
}, &testStreamBackend{
store: store,
pub: publisher,
})
client := NewMockClient(context.Background()) client := NewMockClient(context.Background())
@ -1052,10 +910,16 @@ func Test_processResponse_Validation(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second) publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher) store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store, srv := NewService(
pub: publisher, testutil.Logger(t),
}) Config{
Datacenter: "dc1",
ConnectEnabled: true,
}, &testStreamBackend{
store: store,
pub: publisher,
})
run := func(t *testing.T, tc testCase) { run := func(t *testing.T, tc testCase) {
reply, err := srv.processResponse("", "", tc.in) reply, err := srv.processResponse("", "", tc.in)
@ -1194,6 +1058,19 @@ func writeInitiatedPeering(t *testing.T, store *state.Store, idx uint64, peerNam
return p return p
} }
func writeInitialRootsAndCA(t *testing.T, store *state.Store) string {
const clusterID = connect.TestClusterID
rootA := connect.TestCA(t, nil)
_, err := store.CARootSetCAS(1, 0, structs.CARoots{rootA})
require.NoError(t, err)
err = store.CASetConfig(0, &structs.CAConfiguration{ClusterID: clusterID})
require.NoError(t, err)
return clusterID
}
func makeAnyPB(t *testing.T, pb proto.Message) *any.Any { func makeAnyPB(t *testing.T, pb proto.Message) *any.Any {
any, err := ptypes.MarshalAny(pb) any, err := ptypes.MarshalAny(pb)
require.NoError(t, err) require.NoError(t, err)

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
@ -29,41 +30,48 @@ type SubscriptionBackend interface {
// subscriptionManager handlers requests to subscribe to events from an events publisher. // subscriptionManager handlers requests to subscribe to events from an events publisher.
type subscriptionManager struct { type subscriptionManager struct {
logger hclog.Logger logger hclog.Logger
viewStore MaterializedViewStore config Config
backend SubscriptionBackend trustDomain string
viewStore MaterializedViewStore
// TODO(peering): remove this when we're ready backend SubscriptionBackend
DisableMeshGatewayMode bool
} }
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering. // TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
func newSubscriptionManager(ctx context.Context, logger hclog.Logger, backend SubscriptionBackend) *subscriptionManager { func newSubscriptionManager(
ctx context.Context,
logger hclog.Logger,
config Config,
trustDomain string,
backend SubscriptionBackend,
) *subscriptionManager {
logger = logger.Named("subscriptions") logger = logger.Named("subscriptions")
store := submatview.NewStore(logger.Named("viewstore")) store := submatview.NewStore(logger.Named("viewstore"))
go store.Run(ctx) go store.Run(ctx)
return &subscriptionManager{ return &subscriptionManager{
logger: logger, logger: logger,
viewStore: store, config: config,
backend: backend, trustDomain: trustDomain,
viewStore: store,
backend: backend,
} }
} }
// subscribe returns a channel that will contain updates to exported service instances for a given peer. // subscribe returns a channel that will contain updates to exported service instances for a given peer.
func (m *subscriptionManager) subscribe(ctx context.Context, peerID, partition string) <-chan cache.UpdateEvent { func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, partition string) <-chan cache.UpdateEvent {
var ( var (
updateCh = make(chan cache.UpdateEvent, 1) updateCh = make(chan cache.UpdateEvent, 1)
publicUpdateCh = make(chan cache.UpdateEvent, 1) publicUpdateCh = make(chan cache.UpdateEvent, 1)
) )
state := newSubscriptionState(partition) state := newSubscriptionState(peerName, partition)
state.publicUpdateCh = publicUpdateCh state.publicUpdateCh = publicUpdateCh
state.updateCh = updateCh state.updateCh = updateCh
// Wrap our bare state store queries in goroutines that emit events. // Wrap our bare state store queries in goroutines that emit events.
go m.notifyExportedServicesForPeerID(ctx, state, peerID) go m.notifyExportedServicesForPeerID(ctx, state, peerID)
if !m.DisableMeshGatewayMode { if !m.config.DisableMeshGatewayMode && m.config.ConnectEnabled {
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition) go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
} }
@ -112,10 +120,12 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
pending := &pendingPayload{} pending := &pendingPayload{}
m.syncNormalServices(ctx, state, pending, evt.Services) m.syncNormalServices(ctx, state, pending, evt.Services)
if m.DisableMeshGatewayMode { if m.config.DisableMeshGatewayMode {
m.syncProxyServices(ctx, state, pending, evt.Services) m.syncProxyServices(ctx, state, pending, evt.Services)
} else { } else {
m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains()) if m.config.ConnectEnabled {
m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
}
} }
state.sendPendingEvents(ctx, m.logger, pending) state.sendPendingEvents(ctx, m.logger, pending)
@ -133,7 +143,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
// Clear this raft index before exporting. // Clear this raft index before exporting.
csn.Index = 0 csn.Index = 0
if !m.DisableMeshGatewayMode { if !m.config.DisableMeshGatewayMode {
// Ensure that connect things are scrubbed so we don't mix-and-match // Ensure that connect things are scrubbed so we don't mix-and-match
// with the synthetic entries that point to mesh gateways. // with the synthetic entries that point to mesh gateways.
filterConnectReferences(csn) filterConnectReferences(csn)
@ -150,6 +160,18 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
} }
} }
// Scrub raft indexes
for _, instance := range csn.Nodes {
instance.Node.RaftIndex = nil
instance.Service.RaftIndex = nil
if m.config.DisableMeshGatewayMode {
for _, chk := range instance.Checks {
chk.RaftIndex = nil
}
}
// skip checks since we just generated one from scratch
}
id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService) id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)
// Just ferry this one directly along to the destination. // Just ferry this one directly along to the destination.
@ -165,7 +187,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
return fmt.Errorf("invalid type for response: %T", u.Result) return fmt.Errorf("invalid type for response: %T", u.Result)
} }
if !m.DisableMeshGatewayMode { if !m.config.DisableMeshGatewayMode {
return nil // ignore event return nil // ignore event
} }
@ -183,6 +205,18 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
// ) // )
// } // }
// Scrub raft indexes
for _, instance := range csn.Nodes {
instance.Node.RaftIndex = nil
instance.Service.RaftIndex = nil
if m.config.DisableMeshGatewayMode {
for _, chk := range instance.Checks {
chk.RaftIndex = nil
}
}
// skip checks since we just generated one from scratch
}
id := proxyServicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedProxyService) id := proxyServicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedProxyService)
// Just ferry this one directly along to the destination. // Just ferry this one directly along to the destination.
@ -200,7 +234,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway) partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway)
if m.DisableMeshGatewayMode { if m.config.DisableMeshGatewayMode || !m.config.ConnectEnabled {
return nil // ignore event return nil // ignore event
} }
@ -211,6 +245,30 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
// Clear this raft index before exporting. // Clear this raft index before exporting.
csn.Index = 0 csn.Index = 0
// Flatten health checks
for _, instance := range csn.Nodes {
instance.Checks = flattenChecks(
instance.Node.Node,
instance.Service.ID,
instance.Service.Service,
instance.Service.EnterpriseMeta,
instance.Checks,
)
}
// Scrub raft indexes
for _, instance := range csn.Nodes {
instance.Node.RaftIndex = nil
instance.Service.RaftIndex = nil
// skip checks since we just generated one from scratch
// Remove connect things like native mode.
if instance.Service.Connect != nil || instance.Service.Proxy != nil {
instance.Service.Connect = nil
instance.Service.Proxy = nil
}
}
state.meshGateway = csn state.meshGateway = csn
pending := &pendingPayload{} pending := &pendingPayload{}
@ -223,8 +281,8 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
if state.exportList != nil { if state.exportList != nil {
// Trigger public events for all synthetic discovery chain replies. // Trigger public events for all synthetic discovery chain replies.
for chainName := range state.connectServices { for chainName, protocol := range state.connectServices {
m.emitEventForDiscoveryChain(ctx, state, pending, chainName) m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol)
} }
} }
@ -369,17 +427,17 @@ func (m *subscriptionManager) syncDiscoveryChains(
ctx context.Context, ctx context.Context,
state *subscriptionState, state *subscriptionState,
pending *pendingPayload, pending *pendingPayload,
chainsByName map[structs.ServiceName]struct{}, chainsByName map[structs.ServiceName]string, // TODO(peering):rename variable
) { ) {
// if it was newly added, then try to emit an UPDATE event // if it was newly added, then try to emit an UPDATE event
for chainName := range chainsByName { for chainName, protocol := range chainsByName {
if _, ok := state.connectServices[chainName]; ok { if oldProtocol, ok := state.connectServices[chainName]; ok && protocol == oldProtocol {
continue continue
} }
state.connectServices[chainName] = struct{}{} state.connectServices[chainName] = protocol
m.emitEventForDiscoveryChain(ctx, state, pending, chainName) m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol)
} }
// if it was dropped, try to emit an DELETE event // if it was dropped, try to emit an DELETE event
@ -411,6 +469,7 @@ func (m *subscriptionManager) emitEventForDiscoveryChain(
state *subscriptionState, state *subscriptionState,
pending *pendingPayload, pending *pendingPayload,
chainName structs.ServiceName, chainName structs.ServiceName,
protocol string,
) { ) {
if _, ok := state.connectServices[chainName]; !ok { if _, ok := state.connectServices[chainName]; !ok {
return // not found return // not found
@ -427,7 +486,11 @@ func (m *subscriptionManager) emitEventForDiscoveryChain(
discoveryChainPayloadIDPrefix+chainName.String(), discoveryChainPayloadIDPrefix+chainName.String(),
subExportedService+proxyName.String(), subExportedService+proxyName.String(),
createDiscoChainHealth( createDiscoChainHealth(
state.peerName,
m.config.Datacenter,
m.trustDomain,
chainName, chainName,
protocol,
state.meshGateway, state.meshGateway,
), ),
) )
@ -436,9 +499,43 @@ func (m *subscriptionManager) emitEventForDiscoveryChain(
} }
} }
func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckServiceNodes) *pbservice.IndexedCheckServiceNodes { func createDiscoChainHealth(
peerName string,
datacenter, trustDomain string,
sn structs.ServiceName,
protocol string,
pb *pbservice.IndexedCheckServiceNodes,
) *pbservice.IndexedCheckServiceNodes {
fakeProxyName := sn.Name + syntheticProxyNameSuffix fakeProxyName := sn.Name + syntheticProxyNameSuffix
var peerMeta *pbservice.PeeringServiceMeta
{
spiffeID := connect.SpiffeIDService{
Host: trustDomain,
Partition: sn.PartitionOrDefault(),
Namespace: sn.NamespaceOrDefault(),
Datacenter: datacenter,
Service: sn.Name,
}
sni := connect.PeeredServiceSNI(
sn.Name,
sn.NamespaceOrDefault(),
sn.PartitionOrDefault(),
peerName,
trustDomain,
)
// Create common peer meta.
//
// TODO(peering): should this be replicated by service and not by instance?
peerMeta = &pbservice.PeeringServiceMeta{
SNI: []string{sni},
SpiffeID: []string{spiffeID.URI().String()},
Protocol: protocol,
}
}
newNodes := make([]*pbservice.CheckServiceNode, 0, len(pb.Nodes)) newNodes := make([]*pbservice.CheckServiceNode, 0, len(pb.Nodes))
for i := range pb.Nodes { for i := range pb.Nodes {
gwNode := pb.Nodes[i].Node gwNode := pb.Nodes[i].Node
@ -448,10 +545,12 @@ func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckSe
pbEntMeta := pbcommon.NewEnterpriseMetaFromStructs(sn.EnterpriseMeta) pbEntMeta := pbcommon.NewEnterpriseMetaFromStructs(sn.EnterpriseMeta)
fakeProxyID := fakeProxyName fakeProxyID := fakeProxyName
destServiceID := sn.Name
if gwService.ID != "" { if gwService.ID != "" {
// This is only going to be relevant if multiple mesh gateways are // This is only going to be relevant if multiple mesh gateways are
// on the same exporting node. // on the same exporting node.
fakeProxyID = fmt.Sprintf("%s-instance-%d", fakeProxyName, i) fakeProxyID = fmt.Sprintf("%s-instance-%d", fakeProxyName, i)
destServiceID = fmt.Sprintf("%s-instance-%d", sn.Name, i)
} }
csn := &pbservice.CheckServiceNode{ csn := &pbservice.CheckServiceNode{
@ -464,7 +563,7 @@ func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckSe
PeerName: structs.DefaultPeerKeyword, PeerName: structs.DefaultPeerKeyword,
Proxy: &pbservice.ConnectProxyConfig{ Proxy: &pbservice.ConnectProxyConfig{
DestinationServiceName: sn.Name, DestinationServiceName: sn.Name,
DestinationServiceID: sn.Name, DestinationServiceID: destServiceID,
}, },
// direct // direct
Address: gwService.Address, Address: gwService.Address,
@ -472,6 +571,9 @@ func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckSe
Port: gwService.Port, Port: gwService.Port,
SocketPath: gwService.SocketPath, SocketPath: gwService.SocketPath,
Weights: gwService.Weights, Weights: gwService.Weights,
Connect: &pbservice.ServiceConnect{
PeerMeta: peerMeta,
},
}, },
Checks: flattenChecks(gwNode.Node, fakeProxyID, fakeProxyName, pbEntMeta, gwChecks), Checks: flattenChecks(gwNode.Node, fakeProxyID, fakeProxyName, pbEntMeta, gwChecks),
} }

View File

@ -7,18 +7,28 @@ import (
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
) )
func TestSubscriptionManager_RegisterDeregister(t *testing.T) { func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
testSubscriptionManager_RegisterDeregister(t, true)
}
func TestSubscriptionManager_RegisterDeregister_EnableMeshGateways(t *testing.T) {
testSubscriptionManager_RegisterDeregister(t, false)
}
func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateways bool) {
backend := newTestSubscriptionBackend(t) backend := newTestSubscriptionBackend(t)
// initialCatalogIdx := backend.lastIdx // initialCatalogIdx := backend.lastIdx
@ -29,8 +39,12 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
_, id := backend.ensurePeering(t, "my-peering") _, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend) mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
subCh := mgr.subscribe(ctx, id, partition) Datacenter: "dc1",
ConnectEnabled: true,
DisableMeshGatewayMode: disableMeshGateways,
}, connect.TestTrustDomain, backend)
subCh := mgr.subscribe(ctx, id, "my-peering", partition)
var ( var (
gatewayCorrID = subMeshGateway + partition gatewayCorrID = subMeshGateway + partition
@ -38,12 +52,18 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String() mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String() mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
mysqlProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mysql", nil).String()
) )
// Expect just the empty mesh gateway event to replicate. if disableMeshGateways {
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { expectEvents(t, subCh)
checkEvent(t, got, gatewayCorrID, 0) } else {
}) // Expect just the empty mesh gateway event to replicate.
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 0)
})
}
testutil.RunStep(t, "initial export syncs empty instance lists", func(t *testing.T) { testutil.RunStep(t, "initial export syncs empty instance lists", func(t *testing.T) {
backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{ backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{
@ -64,14 +84,25 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
}, },
}) })
expectEvents(t, subCh, if disableMeshGateways {
func(t *testing.T, got cache.UpdateEvent) { expectEvents(t, subCh,
checkEvent(t, got, mysqlCorrID, 0) func(t *testing.T, got cache.UpdateEvent) {
}, checkEvent(t, got, mysqlProxyCorrID_temp, 0)
func(t *testing.T, got cache.UpdateEvent) { },
checkEvent(t, got, mysqlProxyCorrID, 0) func(t *testing.T, got cache.UpdateEvent) {
}, checkEvent(t, got, mysqlCorrID, 0)
) },
)
} else {
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlCorrID, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlProxyCorrID, 0)
},
)
}
}) })
mysql1 := &structs.CheckServiceNode{ mysql1 := &structs.CheckServiceNode{
@ -93,12 +124,18 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
require.Equal(t, uint64(0), res.Index) require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1) require.Len(t, res.Nodes, 1)
node := res.Nodes[0]
require.NotNil(t, node.Node) if disableMeshGateways {
require.Equal(t, "foo", node.Node.Node) prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
require.NotNil(t, node.Service) Node: pbNode("foo", "10.0.0.1", partition),
require.Equal(t, "mysql-1", node.Service.ID) Service: pbService_temp("", "mysql-1", "mysql", 5000, nil),
require.Len(t, node.Checks, 0) }, res.Nodes[0])
} else {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService("", "mysql-1", "mysql", 5000, nil),
}, res.Nodes[0])
}
}) })
backend.ensureCheck(t, mysql1.Checks[0]) backend.ensureCheck(t, mysql1.Checks[0])
@ -110,13 +147,24 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
require.Equal(t, uint64(0), res.Index) require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1) require.Len(t, res.Nodes, 1)
node := res.Nodes[0]
require.NotNil(t, node.Node) if disableMeshGateways {
require.Equal(t, "foo", node.Node.Node) prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
require.NotNil(t, node.Service) Node: pbNode("foo", "10.0.0.1", partition),
require.Equal(t, "mysql-1", node.Service.ID) Service: pbService_temp("", "mysql-1", "mysql", 5000, nil),
require.Len(t, node.Checks, 1) Checks: []*pbservice.HealthCheck{
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID) pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil),
},
}, res.Nodes[0])
} else {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService("", "mysql-1", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck("foo", "mysql-1", "mysql", "critical", nil),
},
}, res.Nodes[0])
}
}) })
}) })
@ -139,21 +187,31 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
require.Equal(t, uint64(0), res.Index) require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 2) require.Len(t, res.Nodes, 2)
{
node := res.Nodes[0] if disableMeshGateways {
require.NotNil(t, node.Node) prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
require.Equal(t, "bar", node.Node.Node) Node: pbNode("bar", "10.0.0.2", partition),
require.NotNil(t, node.Service) Service: pbService_temp("", "mysql-2", "mysql", 5000, nil),
require.Equal(t, "mysql-2", node.Service.ID) }, res.Nodes[0])
require.Len(t, node.Checks, 0) prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
} Node: pbNode("foo", "10.0.0.1", partition),
{ Service: pbService_temp("", "mysql-1", "mysql", 5000, nil),
node := res.Nodes[1] Checks: []*pbservice.HealthCheck{
require.NotNil(t, node.Node) pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil),
require.Equal(t, "foo", node.Node.Node) },
require.NotNil(t, node.Service) }, res.Nodes[1])
require.Len(t, node.Checks, 1) } else {
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID) prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("bar", "10.0.0.2", partition),
Service: pbService("", "mysql-2", "mysql", 5000, nil),
}, res.Nodes[0])
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService("", "mysql-1", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck("foo", "mysql-1", "mysql", "critical", nil),
},
}, res.Nodes[1])
} }
}) })
@ -166,22 +224,36 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
require.Equal(t, uint64(0), res.Index) require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 2) require.Len(t, res.Nodes, 2)
{ if disableMeshGateways {
node := res.Nodes[0] prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
require.NotNil(t, node.Node) Node: pbNode("bar", "10.0.0.2", partition),
require.Equal(t, "bar", node.Node.Node) Service: pbService_temp("", "mysql-2", "mysql", 5000, nil),
require.NotNil(t, node.Service) Checks: []*pbservice.HealthCheck{
require.Equal(t, "mysql-2", node.Service.ID) pbCheck_temp("bar", "mysql-2", "mysql", "mysql-2-check", "critical", nil),
require.Len(t, node.Checks, 1) },
require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID) }, res.Nodes[0])
} prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
{ Node: pbNode("foo", "10.0.0.1", partition),
node := res.Nodes[1] Service: pbService_temp("", "mysql-1", "mysql", 5000, nil),
require.NotNil(t, node.Node) Checks: []*pbservice.HealthCheck{
require.Equal(t, "foo", node.Node.Node) pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil),
require.NotNil(t, node.Service) },
require.Len(t, node.Checks, 1) }, res.Nodes[1])
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID) } else {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("bar", "10.0.0.2", partition),
Service: pbService("", "mysql-2", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck("bar", "mysql-2", "mysql", "critical", nil),
},
}, res.Nodes[0])
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService("", "mysql-1", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck("foo", "mysql-1", "mysql", "critical", nil),
},
}, res.Nodes[1])
} }
}) })
}) })
@ -212,17 +284,90 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
require.Equal(t, uint64(0), res.Index) require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1) require.Len(t, res.Nodes, 1)
if disableMeshGateways {
node := res.Nodes[0] prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
require.NotNil(t, node.Node) Node: pbNode("bar", "10.0.0.2", partition),
require.Equal(t, "bar", node.Node.Node) Service: pbService_temp("", "mysql-2", "mysql", 5000, nil),
require.NotNil(t, node.Service) Checks: []*pbservice.HealthCheck{
require.Equal(t, "mysql-2", node.Service.ID) pbCheck_temp("bar", "mysql-2", "mysql", "mysql-2-check", "critical", nil),
require.Len(t, node.Checks, 1) },
require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID) }, res.Nodes[0])
} else {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("bar", "10.0.0.2", partition),
Service: pbService("", "mysql-2", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck("bar", "mysql-2", "mysql", "critical", nil),
},
}, res.Nodes[0])
}
}) })
}) })
testutil.RunStep(t, "register mesh gateway to send proxy updates", func(t *testing.T) {
if disableMeshGateways {
t.Skip()
return
}
gateway := &structs.CheckServiceNode{
Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"},
Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443},
// TODO: checks
}
backend.ensureNode(t, gateway.Node)
backend.ensureService(t, "mgw", gateway.Service)
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlProxyCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("mgw", "10.1.1.1", partition),
Service: &pbservice.NodeService{
Kind: "connect-proxy",
ID: "mysql-sidecar-proxy-instance-0",
Service: "mysql-sidecar-proxy",
Port: 8443,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
Proxy: &pbservice.ConnectProxyConfig{
DestinationServiceID: "mysql-instance-0",
DestinationServiceName: "mysql",
},
Connect: &pbservice.ServiceConnect{
PeerMeta: &pbservice.PeeringServiceMeta{
SNI: []string{
"mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul",
},
SpiffeID: []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql",
},
Protocol: "tcp",
},
},
},
}, res.Nodes[0])
},
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, gatewayCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("mgw", "10.1.1.1", partition),
Service: pbService("mesh-gateway", "gateway-1", "gateway", 8443, nil),
}, res.Nodes[0])
},
)
})
testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) { testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) {
backend.deleteService(t, "bar", mysql2.Service.ID) backend.deleteService(t, "bar", mysql2.Service.ID)
@ -234,9 +379,40 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
require.Len(t, res.Nodes, 0) require.Len(t, res.Nodes, 0)
}) })
}) })
testutil.RunStep(t, "deregister mesh gateway to send proxy removals", func(t *testing.T) {
if disableMeshGateways {
t.Skip()
return
}
backend.deleteService(t, "mgw", "gateway-1")
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlProxyCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, gatewayCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 0)
},
)
})
} }
func TestSubscriptionManager_InitialSnapshot(t *testing.T) { func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
testSubscriptionManager_InitialSnapshot(t, true)
}
func TestSubscriptionManager_InitialSnapshot_EnableMeshGateways(t *testing.T) {
testSubscriptionManager_InitialSnapshot(t, false)
}
func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways bool) {
backend := newTestSubscriptionBackend(t) backend := newTestSubscriptionBackend(t)
// initialCatalogIdx := backend.lastIdx // initialCatalogIdx := backend.lastIdx
@ -247,8 +423,12 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
_, id := backend.ensurePeering(t, "my-peering") _, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend) mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
subCh := mgr.subscribe(ctx, id, partition) Datacenter: "dc1",
ConnectEnabled: true,
DisableMeshGatewayMode: disableMeshGateways,
}, connect.TestTrustDomain, backend)
subCh := mgr.subscribe(ctx, id, "my-peering", partition)
// Register two services that are not yet exported // Register two services that are not yet exported
mysql := &structs.CheckServiceNode{ mysql := &structs.CheckServiceNode{
@ -275,12 +455,20 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String() mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
mongoProxyCorrID = subExportedService + structs.NewServiceName("mongo-sidecar-proxy", nil).String() mongoProxyCorrID = subExportedService + structs.NewServiceName("mongo-sidecar-proxy", nil).String()
chainProxyCorrID = subExportedService + structs.NewServiceName("chain-sidecar-proxy", nil).String() chainProxyCorrID = subExportedService + structs.NewServiceName("chain-sidecar-proxy", nil).String()
mysqlProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mysql", nil).String()
mongoProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mongo", nil).String()
chainProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("chain", nil).String()
) )
// Expect just the empty mesh gateway event to replicate. if disableMeshGateways {
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { expectEvents(t, subCh)
checkEvent(t, got, gatewayCorrID, 0) } else {
}) // Expect just the empty mesh gateway event to replicate.
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 0)
})
}
// At this point in time we'll have a mesh-gateway notification with no // At this point in time we'll have a mesh-gateway notification with no
// content stored and handled. // content stored and handled.
@ -309,29 +497,56 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
}, },
}) })
expectEvents(t, subCh, if disableMeshGateways {
func(t *testing.T, got cache.UpdateEvent) { expectEvents(t, subCh,
checkEvent(t, got, chainCorrID, 0) func(t *testing.T, got cache.UpdateEvent) {
}, checkEvent(t, got, chainProxyCorrID_temp, 0)
func(t *testing.T, got cache.UpdateEvent) { },
checkEvent(t, got, chainProxyCorrID, 0) func(t *testing.T, got cache.UpdateEvent) {
}, checkEvent(t, got, mongoProxyCorrID_temp, 0)
func(t *testing.T, got cache.UpdateEvent) { },
checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical)) func(t *testing.T, got cache.UpdateEvent) {
}, checkEvent(t, got, mysqlProxyCorrID_temp, 0)
func(t *testing.T, got cache.UpdateEvent) { },
checkEvent(t, got, mongoProxyCorrID, 0) func(t *testing.T, got cache.UpdateEvent) {
}, checkEvent(t, got, chainCorrID, 0)
func(t *testing.T, got cache.UpdateEvent) { },
checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical)) func(t *testing.T, got cache.UpdateEvent) {
}, checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical))
func(t *testing.T, got cache.UpdateEvent) { },
checkEvent(t, got, mysqlProxyCorrID, 0) func(t *testing.T, got cache.UpdateEvent) {
}, checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical))
) },
)
} else {
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, chainCorrID, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, chainProxyCorrID, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical))
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mongoProxyCorrID, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical))
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlProxyCorrID, 0)
},
)
}
}) })
testutil.RunStep(t, "registering a mesh gateway triggers connect replies", func(t *testing.T) { testutil.RunStep(t, "registering a mesh gateway triggers connect replies", func(t *testing.T) {
if disableMeshGateways {
t.Skip()
return
}
gateway := &structs.CheckServiceNode{ gateway := &structs.CheckServiceNode{
Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"}, Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"},
Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443}, Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443},
@ -535,3 +750,82 @@ func checkEvent(
} }
} }
} }
func pbNode(node, addr, partition string) *pbservice.Node {
return &pbservice.Node{Node: node, Partition: partition, Address: addr}
}
func pbService(kind, id, name string, port int32, entMeta *pbcommon.EnterpriseMeta) *pbservice.NodeService {
if entMeta == nil {
entMeta = pbcommon.DefaultEnterpriseMeta
}
return &pbservice.NodeService{
ID: id,
Kind: kind,
Service: name,
Port: port,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: entMeta,
}
}
func pbService_temp(kind, id, name string, port int32, entMeta *pbcommon.EnterpriseMeta) *pbservice.NodeService {
if entMeta == nil {
entMeta = pbcommon.DefaultEnterpriseMeta
}
return &pbservice.NodeService{
ID: id,
Kind: kind,
Service: name,
Port: port,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: entMeta,
Connect: &pbservice.ServiceConnect{},
Proxy: &pbservice.ConnectProxyConfig{
MeshGateway: &pbservice.MeshGatewayConfig{},
Expose: &pbservice.ExposeConfig{},
TransparentProxy: &pbservice.TransparentProxyConfig{},
},
}
}
func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMeta) *pbservice.HealthCheck {
if entMeta == nil {
entMeta = pbcommon.DefaultEnterpriseMeta
}
return &pbservice.HealthCheck{
Node: node,
CheckID: svcID + ":overall-check",
Name: "overall-check",
Status: status,
ServiceID: svcID,
ServiceName: svcName,
EnterpriseMeta: entMeta,
}
}
func pbCheck_temp(node, svcID, svcName, checkID, status string, entMeta *pbcommon.EnterpriseMeta) *pbservice.HealthCheck {
if entMeta == nil {
entMeta = pbcommon.DefaultEnterpriseMeta
}
return &pbservice.HealthCheck{
Node: node,
CheckID: checkID,
Status: status,
ServiceID: svcID,
ServiceName: svcName,
EnterpriseMeta: entMeta,
Definition: &pbservice.HealthCheckDefinition{
DeregisterCriticalServiceAfter: durationpb.New(0),
Interval: durationpb.New(0),
TTL: durationpb.New(0),
Timeout: durationpb.New(0),
},
}
}

View File

@ -17,6 +17,8 @@ import (
// subscriptionState is a collection of working state tied to a peerID subscription. // subscriptionState is a collection of working state tied to a peerID subscription.
type subscriptionState struct { type subscriptionState struct {
// peerName is immutable and is the LOCAL name for the peering
peerName string
// partition is immutable // partition is immutable
partition string partition string
@ -25,7 +27,7 @@ type subscriptionState struct {
watchedServices map[structs.ServiceName]context.CancelFunc watchedServices map[structs.ServiceName]context.CancelFunc
watchedProxyServices map[structs.ServiceName]context.CancelFunc // TODO(peering): remove watchedProxyServices map[structs.ServiceName]context.CancelFunc // TODO(peering): remove
connectServices map[structs.ServiceName]struct{} connectServices map[structs.ServiceName]string // value:protocol
// eventVersions is a duplicate event suppression system keyed by the "id" // eventVersions is a duplicate event suppression system keyed by the "id"
// not the "correlationID" // not the "correlationID"
@ -42,12 +44,13 @@ type subscriptionState struct {
publicUpdateCh chan<- cache.UpdateEvent publicUpdateCh chan<- cache.UpdateEvent
} }
func newSubscriptionState(partition string) *subscriptionState { func newSubscriptionState(peerName, partition string) *subscriptionState {
return &subscriptionState{ return &subscriptionState{
peerName: peerName,
partition: partition, partition: partition,
watchedServices: make(map[structs.ServiceName]context.CancelFunc), watchedServices: make(map[structs.ServiceName]context.CancelFunc),
watchedProxyServices: make(map[structs.ServiceName]context.CancelFunc), watchedProxyServices: make(map[structs.ServiceName]context.CancelFunc),
connectServices: make(map[structs.ServiceName]struct{}), connectServices: make(map[structs.ServiceName]string),
eventVersions: make(map[string]string), eventVersions: make(map[string]string),
} }
} }

View File

@ -21,7 +21,7 @@ func TestSubscriptionState_Events(t *testing.T) {
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
state := newSubscriptionState(partition) state := newSubscriptionState("my-peering", partition)
testutil.RunStep(t, "empty", func(t *testing.T) { testutil.RunStep(t, "empty", func(t *testing.T) {
pending := &pendingPayload{} pending := &pendingPayload{}
@ -193,7 +193,7 @@ func testNewSubscriptionState(partition string) (
publicUpdateCh = make(chan cache.UpdateEvent, 1) publicUpdateCh = make(chan cache.UpdateEvent, 1)
) )
state := newSubscriptionState(partition) state := newSubscriptionState("my-peering", partition)
state.publicUpdateCh = publicUpdateCh state.publicUpdateCh = publicUpdateCh
return state, publicUpdateCh return state, publicUpdateCh

View File

@ -22,21 +22,24 @@ type ExportedServiceList struct {
// DiscoChains is a list of exported service that ONLY apply to service mesh. // DiscoChains is a list of exported service that ONLY apply to service mesh.
DiscoChains []ServiceName DiscoChains []ServiceName
// TODO(peering): reduce duplication here in the response
ConnectProtocol map[ServiceName]string
} }
// ListAllDiscoveryChains returns all discovery chains (union of Services and // ListAllDiscoveryChains returns all discovery chains (union of Services and
// DiscoChains). // DiscoChains).
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]struct{} { func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]string {
chainsByName := make(map[ServiceName]struct{}) chainsByName := make(map[ServiceName]string)
if list == nil { if list == nil {
return chainsByName return chainsByName
} }
for _, svc := range list.Services { for _, svc := range list.Services {
chainsByName[svc] = struct{}{} chainsByName[svc] = list.ConnectProtocol[svc]
} }
for _, chainName := range list.DiscoChains { for _, chainName := range list.DiscoChains {
chainsByName[chainName] = struct{}{} chainsByName[chainName] = list.ConnectProtocol[chainName]
} }
return chainsByName return chainsByName
} }

View File

@ -1227,6 +1227,13 @@ type NodeService struct {
RaftIndex `bexpr:"-"` RaftIndex `bexpr:"-"`
} }
// PeeringServiceMeta is read-only information provided from an exported peer.
type PeeringServiceMeta struct {
SNI []string `json:",omitempty"`
SpiffeID []string `json:",omitempty"`
Protocol string `json:",omitempty"`
}
func (ns *NodeService) BestAddress(wan bool) (string, int) { func (ns *NodeService) BestAddress(wan bool) (string, int) {
addr := ns.Address addr := ns.Address
port := ns.Port port := ns.Port
@ -1297,6 +1304,8 @@ type ServiceConnect struct {
// result is identical to just making a second service registration via any // result is identical to just making a second service registration via any
// other means. // other means.
SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"` SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"`
PeerMeta *PeeringServiceMeta `json:",omitempty" bexpr:"-"`
} }
func (t *ServiceConnect) UnmarshalJSON(data []byte) (err error) { func (t *ServiceConnect) UnmarshalJSON(data []byte) (err error) {

View File

@ -100,12 +100,33 @@ func MeshGatewayConfigFromStructs(t *structs.MeshGatewayConfig, s *MeshGatewayCo
} }
s.Mode = string(t.Mode) s.Mode = string(t.Mode)
} }
func PeeringServiceMetaToStructs(s *PeeringServiceMeta, t *structs.PeeringServiceMeta) {
if s == nil {
return
}
t.SNI = s.SNI
t.SpiffeID = s.SpiffeID
t.Protocol = s.Protocol
}
func PeeringServiceMetaFromStructs(t *structs.PeeringServiceMeta, s *PeeringServiceMeta) {
if s == nil {
return
}
s.SNI = t.SNI
s.SpiffeID = t.SpiffeID
s.Protocol = t.Protocol
}
func ServiceConnectToStructs(s *ServiceConnect, t *structs.ServiceConnect) { func ServiceConnectToStructs(s *ServiceConnect, t *structs.ServiceConnect) {
if s == nil { if s == nil {
return return
} }
t.Native = s.Native t.Native = s.Native
t.SidecarService = ServiceDefinitionPtrToStructs(s.SidecarService) t.SidecarService = ServiceDefinitionPtrToStructs(s.SidecarService)
if s.PeerMeta != nil {
var x structs.PeeringServiceMeta
PeeringServiceMetaToStructs(s.PeerMeta, &x)
t.PeerMeta = &x
}
} }
func ServiceConnectFromStructs(t *structs.ServiceConnect, s *ServiceConnect) { func ServiceConnectFromStructs(t *structs.ServiceConnect, s *ServiceConnect) {
if s == nil { if s == nil {
@ -113,6 +134,11 @@ func ServiceConnectFromStructs(t *structs.ServiceConnect, s *ServiceConnect) {
} }
s.Native = t.Native s.Native = t.Native
s.SidecarService = NewServiceDefinitionPtrFromStructs(t.SidecarService) s.SidecarService = NewServiceDefinitionPtrFromStructs(t.SidecarService)
if t.PeerMeta != nil {
var x PeeringServiceMeta
PeeringServiceMetaFromStructs(t.PeerMeta, &x)
s.PeerMeta = &x
}
} }
func ServiceDefinitionToStructs(s *ServiceDefinition, t *structs.ServiceDefinition) { func ServiceDefinitionToStructs(s *ServiceDefinition, t *structs.ServiceDefinition) {
if s == nil { if s == nil {

View File

@ -37,6 +37,16 @@ func (msg *ServiceConnect) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg) return proto.Unmarshal(b, msg)
} }
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringServiceMeta) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *PeeringServiceMeta) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler // MarshalBinary implements encoding.BinaryMarshaler
func (msg *ExposeConfig) MarshalBinary() ([]byte, error) { func (msg *ExposeConfig) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg) return proto.Marshal(msg)

View File

@ -389,7 +389,8 @@ type ServiceConnect struct {
// result is identical to just making a second service registration via any // result is identical to just making a second service registration via any
// other means. // other means.
// mog: func-to=ServiceDefinitionPtrToStructs func-from=NewServiceDefinitionPtrFromStructs // mog: func-to=ServiceDefinitionPtrToStructs func-from=NewServiceDefinitionPtrFromStructs
SidecarService *ServiceDefinition `protobuf:"bytes,3,opt,name=SidecarService,proto3" json:"SidecarService,omitempty"` SidecarService *ServiceDefinition `protobuf:"bytes,3,opt,name=SidecarService,proto3" json:"SidecarService,omitempty"`
PeerMeta *PeeringServiceMeta `protobuf:"bytes,4,opt,name=PeerMeta,proto3" json:"PeerMeta,omitempty"`
} }
func (x *ServiceConnect) Reset() { func (x *ServiceConnect) Reset() {
@ -438,6 +439,83 @@ func (x *ServiceConnect) GetSidecarService() *ServiceDefinition {
return nil return nil
} }
func (x *ServiceConnect) GetPeerMeta() *PeeringServiceMeta {
if x != nil {
return x.PeerMeta
}
return nil
}
// PeeringServiceMeta is read-only information provided from an exported peer.
//
// mog annotation:
//
// target=github.com/hashicorp/consul/agent/structs.PeeringServiceMeta
// output=service.gen.go
// name=Structs
type PeeringServiceMeta struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
SNI []string `protobuf:"bytes,1,rep,name=SNI,proto3" json:"SNI,omitempty"`
SpiffeID []string `protobuf:"bytes,2,rep,name=SpiffeID,proto3" json:"SpiffeID,omitempty"`
Protocol string `protobuf:"bytes,3,opt,name=Protocol,proto3" json:"Protocol,omitempty"`
}
func (x *PeeringServiceMeta) Reset() {
*x = PeeringServiceMeta{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbservice_service_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PeeringServiceMeta) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PeeringServiceMeta) ProtoMessage() {}
func (x *PeeringServiceMeta) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbservice_service_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PeeringServiceMeta.ProtoReflect.Descriptor instead.
func (*PeeringServiceMeta) Descriptor() ([]byte, []int) {
return file_proto_pbservice_service_proto_rawDescGZIP(), []int{3}
}
func (x *PeeringServiceMeta) GetSNI() []string {
if x != nil {
return x.SNI
}
return nil
}
func (x *PeeringServiceMeta) GetSpiffeID() []string {
if x != nil {
return x.SpiffeID
}
return nil
}
func (x *PeeringServiceMeta) GetProtocol() string {
if x != nil {
return x.Protocol
}
return ""
}
// ExposeConfig describes HTTP paths to expose through Envoy outside of Connect. // ExposeConfig describes HTTP paths to expose through Envoy outside of Connect.
// Users can expose individual paths and/or all HTTP/GRPC paths for checks. // Users can expose individual paths and/or all HTTP/GRPC paths for checks.
// //
@ -462,7 +540,7 @@ type ExposeConfig struct {
func (x *ExposeConfig) Reset() { func (x *ExposeConfig) Reset() {
*x = ExposeConfig{} *x = ExposeConfig{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbservice_service_proto_msgTypes[3] mi := &file_proto_pbservice_service_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -475,7 +553,7 @@ func (x *ExposeConfig) String() string {
func (*ExposeConfig) ProtoMessage() {} func (*ExposeConfig) ProtoMessage() {}
func (x *ExposeConfig) ProtoReflect() protoreflect.Message { func (x *ExposeConfig) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbservice_service_proto_msgTypes[3] mi := &file_proto_pbservice_service_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -488,7 +566,7 @@ func (x *ExposeConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use ExposeConfig.ProtoReflect.Descriptor instead. // Deprecated: Use ExposeConfig.ProtoReflect.Descriptor instead.
func (*ExposeConfig) Descriptor() ([]byte, []int) { func (*ExposeConfig) Descriptor() ([]byte, []int) {
return file_proto_pbservice_service_proto_rawDescGZIP(), []int{3} return file_proto_pbservice_service_proto_rawDescGZIP(), []int{4}
} }
func (x *ExposeConfig) GetChecks() bool { func (x *ExposeConfig) GetChecks() bool {
@ -533,7 +611,7 @@ type ExposePath struct {
func (x *ExposePath) Reset() { func (x *ExposePath) Reset() {
*x = ExposePath{} *x = ExposePath{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbservice_service_proto_msgTypes[4] mi := &file_proto_pbservice_service_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -546,7 +624,7 @@ func (x *ExposePath) String() string {
func (*ExposePath) ProtoMessage() {} func (*ExposePath) ProtoMessage() {}
func (x *ExposePath) ProtoReflect() protoreflect.Message { func (x *ExposePath) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbservice_service_proto_msgTypes[4] mi := &file_proto_pbservice_service_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -559,7 +637,7 @@ func (x *ExposePath) ProtoReflect() protoreflect.Message {
// Deprecated: Use ExposePath.ProtoReflect.Descriptor instead. // Deprecated: Use ExposePath.ProtoReflect.Descriptor instead.
func (*ExposePath) Descriptor() ([]byte, []int) { func (*ExposePath) Descriptor() ([]byte, []int) {
return file_proto_pbservice_service_proto_rawDescGZIP(), []int{4} return file_proto_pbservice_service_proto_rawDescGZIP(), []int{5}
} }
func (x *ExposePath) GetListenerPort() int32 { func (x *ExposePath) GetListenerPort() int32 {
@ -614,7 +692,7 @@ type MeshGatewayConfig struct {
func (x *MeshGatewayConfig) Reset() { func (x *MeshGatewayConfig) Reset() {
*x = MeshGatewayConfig{} *x = MeshGatewayConfig{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbservice_service_proto_msgTypes[5] mi := &file_proto_pbservice_service_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -627,7 +705,7 @@ func (x *MeshGatewayConfig) String() string {
func (*MeshGatewayConfig) ProtoMessage() {} func (*MeshGatewayConfig) ProtoMessage() {}
func (x *MeshGatewayConfig) ProtoReflect() protoreflect.Message { func (x *MeshGatewayConfig) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbservice_service_proto_msgTypes[5] mi := &file_proto_pbservice_service_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -640,7 +718,7 @@ func (x *MeshGatewayConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use MeshGatewayConfig.ProtoReflect.Descriptor instead. // Deprecated: Use MeshGatewayConfig.ProtoReflect.Descriptor instead.
func (*MeshGatewayConfig) Descriptor() ([]byte, []int) { func (*MeshGatewayConfig) Descriptor() ([]byte, []int) {
return file_proto_pbservice_service_proto_rawDescGZIP(), []int{5} return file_proto_pbservice_service_proto_rawDescGZIP(), []int{6}
} }
func (x *MeshGatewayConfig) GetMode() string { func (x *MeshGatewayConfig) GetMode() string {
@ -671,7 +749,7 @@ type TransparentProxyConfig struct {
func (x *TransparentProxyConfig) Reset() { func (x *TransparentProxyConfig) Reset() {
*x = TransparentProxyConfig{} *x = TransparentProxyConfig{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbservice_service_proto_msgTypes[6] mi := &file_proto_pbservice_service_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -684,7 +762,7 @@ func (x *TransparentProxyConfig) String() string {
func (*TransparentProxyConfig) ProtoMessage() {} func (*TransparentProxyConfig) ProtoMessage() {}
func (x *TransparentProxyConfig) ProtoReflect() protoreflect.Message { func (x *TransparentProxyConfig) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbservice_service_proto_msgTypes[6] mi := &file_proto_pbservice_service_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -697,7 +775,7 @@ func (x *TransparentProxyConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use TransparentProxyConfig.ProtoReflect.Descriptor instead. // Deprecated: Use TransparentProxyConfig.ProtoReflect.Descriptor instead.
func (*TransparentProxyConfig) Descriptor() ([]byte, []int) { func (*TransparentProxyConfig) Descriptor() ([]byte, []int) {
return file_proto_pbservice_service_proto_rawDescGZIP(), []int{6} return file_proto_pbservice_service_proto_rawDescGZIP(), []int{7}
} }
func (x *TransparentProxyConfig) GetOutboundListenerPort() int32 { func (x *TransparentProxyConfig) GetOutboundListenerPort() int32 {
@ -768,7 +846,7 @@ type ServiceDefinition struct {
func (x *ServiceDefinition) Reset() { func (x *ServiceDefinition) Reset() {
*x = ServiceDefinition{} *x = ServiceDefinition{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbservice_service_proto_msgTypes[7] mi := &file_proto_pbservice_service_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -781,7 +859,7 @@ func (x *ServiceDefinition) String() string {
func (*ServiceDefinition) ProtoMessage() {} func (*ServiceDefinition) ProtoMessage() {}
func (x *ServiceDefinition) ProtoReflect() protoreflect.Message { func (x *ServiceDefinition) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbservice_service_proto_msgTypes[7] mi := &file_proto_pbservice_service_proto_msgTypes[8]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -794,7 +872,7 @@ func (x *ServiceDefinition) ProtoReflect() protoreflect.Message {
// Deprecated: Use ServiceDefinition.ProtoReflect.Descriptor instead. // Deprecated: Use ServiceDefinition.ProtoReflect.Descriptor instead.
func (*ServiceDefinition) Descriptor() ([]byte, []int) { func (*ServiceDefinition) Descriptor() ([]byte, []int) {
return file_proto_pbservice_service_proto_rawDescGZIP(), []int{7} return file_proto_pbservice_service_proto_rawDescGZIP(), []int{8}
} }
func (x *ServiceDefinition) GetKind() string { func (x *ServiceDefinition) GetKind() string {
@ -930,7 +1008,7 @@ type ServiceAddress struct {
func (x *ServiceAddress) Reset() { func (x *ServiceAddress) Reset() {
*x = ServiceAddress{} *x = ServiceAddress{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbservice_service_proto_msgTypes[8] mi := &file_proto_pbservice_service_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -943,7 +1021,7 @@ func (x *ServiceAddress) String() string {
func (*ServiceAddress) ProtoMessage() {} func (*ServiceAddress) ProtoMessage() {}
func (x *ServiceAddress) ProtoReflect() protoreflect.Message { func (x *ServiceAddress) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbservice_service_proto_msgTypes[8] mi := &file_proto_pbservice_service_proto_msgTypes[9]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -956,7 +1034,7 @@ func (x *ServiceAddress) ProtoReflect() protoreflect.Message {
// Deprecated: Use ServiceAddress.ProtoReflect.Descriptor instead. // Deprecated: Use ServiceAddress.ProtoReflect.Descriptor instead.
func (*ServiceAddress) Descriptor() ([]byte, []int) { func (*ServiceAddress) Descriptor() ([]byte, []int) {
return file_proto_pbservice_service_proto_rawDescGZIP(), []int{8} return file_proto_pbservice_service_proto_rawDescGZIP(), []int{9}
} }
func (x *ServiceAddress) GetAddress() string { func (x *ServiceAddress) GetAddress() string {
@ -988,7 +1066,7 @@ type Weights struct {
func (x *Weights) Reset() { func (x *Weights) Reset() {
*x = Weights{} *x = Weights{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbservice_service_proto_msgTypes[9] mi := &file_proto_pbservice_service_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -1001,7 +1079,7 @@ func (x *Weights) String() string {
func (*Weights) ProtoMessage() {} func (*Weights) ProtoMessage() {}
func (x *Weights) ProtoReflect() protoreflect.Message { func (x *Weights) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbservice_service_proto_msgTypes[9] mi := &file_proto_pbservice_service_proto_msgTypes[10]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -1014,7 +1092,7 @@ func (x *Weights) ProtoReflect() protoreflect.Message {
// Deprecated: Use Weights.ProtoReflect.Descriptor instead. // Deprecated: Use Weights.ProtoReflect.Descriptor instead.
func (*Weights) Descriptor() ([]byte, []int) { func (*Weights) Descriptor() ([]byte, []int) {
return file_proto_pbservice_service_proto_rawDescGZIP(), []int{9} return file_proto_pbservice_service_proto_rawDescGZIP(), []int{10}
} }
func (x *Weights) GetPassing() int32 { func (x *Weights) GetPassing() int32 {
@ -1117,14 +1195,24 @@ var file_proto_pbservice_service_proto_rawDesc = []byte{
0x6b, 0x65, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x30, 0x0a, 0x13, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x6b, 0x65, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x30, 0x0a, 0x13, 0x4c, 0x6f, 0x63, 0x61, 0x6c,
0x42, 0x69, 0x6e, 0x64, 0x53, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x18, 0x0b, 0x42, 0x69, 0x6e, 0x64, 0x53, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x18, 0x0b,
0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x42, 0x69, 0x6e, 0x64, 0x53, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x42, 0x69, 0x6e, 0x64, 0x53,
0x6f, 0x63, 0x6b, 0x65, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x22, 0x6c, 0x0a, 0x0e, 0x53, 0x65, 0x72, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x22, 0xab, 0x01, 0x0a, 0x0e, 0x53, 0x65,
0x76, 0x69, 0x63, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x72, 0x76, 0x69, 0x63, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x16, 0x0a, 0x06,
0x61, 0x74, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x4e, 0x61, 0x74, 0x4e, 0x61, 0x74, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x4e, 0x61,
0x69, 0x76, 0x65, 0x12, 0x42, 0x0a, 0x0e, 0x53, 0x69, 0x64, 0x65, 0x63, 0x61, 0x72, 0x53, 0x65, 0x74, 0x69, 0x76, 0x65, 0x12, 0x42, 0x0a, 0x0e, 0x53, 0x69, 0x64, 0x65, 0x63, 0x61, 0x72, 0x53,
0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x73, 0x65, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x73,
0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, 0x65, 0x66, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, 0x65,
0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0e, 0x53, 0x69, 0x64, 0x65, 0x63, 0x61, 0x72, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0e, 0x53, 0x69, 0x64, 0x65, 0x63, 0x61,
0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0x51, 0x0a, 0x0c, 0x45, 0x78, 0x70, 0x6f, 0x73, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x08, 0x50, 0x65, 0x65, 0x72,
0x4d, 0x65, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x53, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4d, 0x65, 0x74,
0x61, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0x5e, 0x0a, 0x12, 0x50, 0x65, 0x65, 0x72, 0x69,
0x6e, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x10, 0x0a,
0x03, 0x53, 0x4e, 0x49, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x03, 0x53, 0x4e, 0x49, 0x12,
0x1a, 0x0a, 0x08, 0x53, 0x70, 0x69, 0x66, 0x66, 0x65, 0x49, 0x44, 0x18, 0x02, 0x20, 0x03, 0x28,
0x09, 0x52, 0x08, 0x53, 0x70, 0x69, 0x66, 0x66, 0x65, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x50,
0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x50,
0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0x51, 0x0a, 0x0c, 0x45, 0x78, 0x70, 0x6f, 0x73,
0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x43, 0x68, 0x65, 0x63, 0x6b,
0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x12, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x12,
0x29, 0x0a, 0x05, 0x50, 0x61, 0x74, 0x68, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x29, 0x0a, 0x05, 0x50, 0x61, 0x74, 0x68, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13,
@ -1235,48 +1323,50 @@ func file_proto_pbservice_service_proto_rawDescGZIP() []byte {
return file_proto_pbservice_service_proto_rawDescData return file_proto_pbservice_service_proto_rawDescData
} }
var file_proto_pbservice_service_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_proto_pbservice_service_proto_msgTypes = make([]protoimpl.MessageInfo, 13)
var file_proto_pbservice_service_proto_goTypes = []interface{}{ var file_proto_pbservice_service_proto_goTypes = []interface{}{
(*ConnectProxyConfig)(nil), // 0: service.ConnectProxyConfig (*ConnectProxyConfig)(nil), // 0: service.ConnectProxyConfig
(*Upstream)(nil), // 1: service.Upstream (*Upstream)(nil), // 1: service.Upstream
(*ServiceConnect)(nil), // 2: service.ServiceConnect (*ServiceConnect)(nil), // 2: service.ServiceConnect
(*ExposeConfig)(nil), // 3: service.ExposeConfig (*PeeringServiceMeta)(nil), // 3: service.PeeringServiceMeta
(*ExposePath)(nil), // 4: service.ExposePath (*ExposeConfig)(nil), // 4: service.ExposeConfig
(*MeshGatewayConfig)(nil), // 5: service.MeshGatewayConfig (*ExposePath)(nil), // 5: service.ExposePath
(*TransparentProxyConfig)(nil), // 6: service.TransparentProxyConfig (*MeshGatewayConfig)(nil), // 6: service.MeshGatewayConfig
(*ServiceDefinition)(nil), // 7: service.ServiceDefinition (*TransparentProxyConfig)(nil), // 7: service.TransparentProxyConfig
(*ServiceAddress)(nil), // 8: service.ServiceAddress (*ServiceDefinition)(nil), // 8: service.ServiceDefinition
(*Weights)(nil), // 9: service.Weights (*ServiceAddress)(nil), // 9: service.ServiceAddress
nil, // 10: service.ServiceDefinition.TaggedAddressesEntry (*Weights)(nil), // 10: service.Weights
nil, // 11: service.ServiceDefinition.MetaEntry nil, // 11: service.ServiceDefinition.TaggedAddressesEntry
(*structpb.Struct)(nil), // 12: google.protobuf.Struct nil, // 12: service.ServiceDefinition.MetaEntry
(*CheckType)(nil), // 13: service.CheckType (*structpb.Struct)(nil), // 13: google.protobuf.Struct
(*pbcommon.EnterpriseMeta)(nil), // 14: common.EnterpriseMeta (*CheckType)(nil), // 14: service.CheckType
(*pbcommon.EnterpriseMeta)(nil), // 15: common.EnterpriseMeta
} }
var file_proto_pbservice_service_proto_depIdxs = []int32{ var file_proto_pbservice_service_proto_depIdxs = []int32{
12, // 0: service.ConnectProxyConfig.Config:type_name -> google.protobuf.Struct 13, // 0: service.ConnectProxyConfig.Config:type_name -> google.protobuf.Struct
1, // 1: service.ConnectProxyConfig.Upstreams:type_name -> service.Upstream 1, // 1: service.ConnectProxyConfig.Upstreams:type_name -> service.Upstream
5, // 2: service.ConnectProxyConfig.MeshGateway:type_name -> service.MeshGatewayConfig 6, // 2: service.ConnectProxyConfig.MeshGateway:type_name -> service.MeshGatewayConfig
3, // 3: service.ConnectProxyConfig.Expose:type_name -> service.ExposeConfig 4, // 3: service.ConnectProxyConfig.Expose:type_name -> service.ExposeConfig
6, // 4: service.ConnectProxyConfig.TransparentProxy:type_name -> service.TransparentProxyConfig 7, // 4: service.ConnectProxyConfig.TransparentProxy:type_name -> service.TransparentProxyConfig
12, // 5: service.Upstream.Config:type_name -> google.protobuf.Struct 13, // 5: service.Upstream.Config:type_name -> google.protobuf.Struct
5, // 6: service.Upstream.MeshGateway:type_name -> service.MeshGatewayConfig 6, // 6: service.Upstream.MeshGateway:type_name -> service.MeshGatewayConfig
7, // 7: service.ServiceConnect.SidecarService:type_name -> service.ServiceDefinition 8, // 7: service.ServiceConnect.SidecarService:type_name -> service.ServiceDefinition
4, // 8: service.ExposeConfig.Paths:type_name -> service.ExposePath 3, // 8: service.ServiceConnect.PeerMeta:type_name -> service.PeeringServiceMeta
10, // 9: service.ServiceDefinition.TaggedAddresses:type_name -> service.ServiceDefinition.TaggedAddressesEntry 5, // 9: service.ExposeConfig.Paths:type_name -> service.ExposePath
11, // 10: service.ServiceDefinition.Meta:type_name -> service.ServiceDefinition.MetaEntry 11, // 10: service.ServiceDefinition.TaggedAddresses:type_name -> service.ServiceDefinition.TaggedAddressesEntry
13, // 11: service.ServiceDefinition.Check:type_name -> service.CheckType 12, // 11: service.ServiceDefinition.Meta:type_name -> service.ServiceDefinition.MetaEntry
13, // 12: service.ServiceDefinition.Checks:type_name -> service.CheckType 14, // 12: service.ServiceDefinition.Check:type_name -> service.CheckType
9, // 13: service.ServiceDefinition.Weights:type_name -> service.Weights 14, // 13: service.ServiceDefinition.Checks:type_name -> service.CheckType
0, // 14: service.ServiceDefinition.Proxy:type_name -> service.ConnectProxyConfig 10, // 14: service.ServiceDefinition.Weights:type_name -> service.Weights
14, // 15: service.ServiceDefinition.EnterpriseMeta:type_name -> common.EnterpriseMeta 0, // 15: service.ServiceDefinition.Proxy:type_name -> service.ConnectProxyConfig
2, // 16: service.ServiceDefinition.Connect:type_name -> service.ServiceConnect 15, // 16: service.ServiceDefinition.EnterpriseMeta:type_name -> common.EnterpriseMeta
8, // 17: service.ServiceDefinition.TaggedAddressesEntry.value:type_name -> service.ServiceAddress 2, // 17: service.ServiceDefinition.Connect:type_name -> service.ServiceConnect
18, // [18:18] is the sub-list for method output_type 9, // 18: service.ServiceDefinition.TaggedAddressesEntry.value:type_name -> service.ServiceAddress
18, // [18:18] is the sub-list for method input_type 19, // [19:19] is the sub-list for method output_type
18, // [18:18] is the sub-list for extension type_name 19, // [19:19] is the sub-list for method input_type
18, // [18:18] is the sub-list for extension extendee 19, // [19:19] is the sub-list for extension type_name
0, // [0:18] is the sub-list for field type_name 19, // [19:19] is the sub-list for extension extendee
0, // [0:19] is the sub-list for field type_name
} }
func init() { file_proto_pbservice_service_proto_init() } func init() { file_proto_pbservice_service_proto_init() }
@ -1323,7 +1413,7 @@ func file_proto_pbservice_service_proto_init() {
} }
} }
file_proto_pbservice_service_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { file_proto_pbservice_service_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ExposeConfig); i { switch v := v.(*PeeringServiceMeta); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@ -1335,7 +1425,7 @@ func file_proto_pbservice_service_proto_init() {
} }
} }
file_proto_pbservice_service_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { file_proto_pbservice_service_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ExposePath); i { switch v := v.(*ExposeConfig); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@ -1347,7 +1437,7 @@ func file_proto_pbservice_service_proto_init() {
} }
} }
file_proto_pbservice_service_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { file_proto_pbservice_service_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*MeshGatewayConfig); i { switch v := v.(*ExposePath); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@ -1359,7 +1449,7 @@ func file_proto_pbservice_service_proto_init() {
} }
} }
file_proto_pbservice_service_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { file_proto_pbservice_service_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TransparentProxyConfig); i { switch v := v.(*MeshGatewayConfig); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@ -1371,7 +1461,7 @@ func file_proto_pbservice_service_proto_init() {
} }
} }
file_proto_pbservice_service_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { file_proto_pbservice_service_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ServiceDefinition); i { switch v := v.(*TransparentProxyConfig); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@ -1383,7 +1473,7 @@ func file_proto_pbservice_service_proto_init() {
} }
} }
file_proto_pbservice_service_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { file_proto_pbservice_service_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ServiceAddress); i { switch v := v.(*ServiceDefinition); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@ -1395,6 +1485,18 @@ func file_proto_pbservice_service_proto_init() {
} }
} }
file_proto_pbservice_service_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { file_proto_pbservice_service_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ServiceAddress); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_pbservice_service_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Weights); i { switch v := v.(*Weights); i {
case 0: case 0:
return &v.state return &v.state
@ -1413,7 +1515,7 @@ func file_proto_pbservice_service_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_pbservice_service_proto_rawDesc, RawDescriptor: file_proto_pbservice_service_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 12, NumMessages: 13,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },

View File

@ -139,6 +139,8 @@ message ServiceConnect {
// Native is true when this service can natively understand Connect. // Native is true when this service can natively understand Connect.
bool Native = 1; bool Native = 1;
reserved 2;
// SidecarService is a nested Service Definition to register at the same time. // SidecarService is a nested Service Definition to register at the same time.
// It's purely a convenience mechanism to allow specifying a sidecar service // It's purely a convenience mechanism to allow specifying a sidecar service
// along with the application service definition. It's nested nature allows // along with the application service definition. It's nested nature allows
@ -148,6 +150,21 @@ message ServiceConnect {
// other means. // other means.
// mog: func-to=ServiceDefinitionPtrToStructs func-from=NewServiceDefinitionPtrFromStructs // mog: func-to=ServiceDefinitionPtrToStructs func-from=NewServiceDefinitionPtrFromStructs
ServiceDefinition SidecarService = 3; ServiceDefinition SidecarService = 3;
PeeringServiceMeta PeerMeta = 4;
}
// PeeringServiceMeta is read-only information provided from an exported peer.
//
// mog annotation:
//
// target=github.com/hashicorp/consul/agent/structs.PeeringServiceMeta
// output=service.gen.go
// name=Structs
message PeeringServiceMeta {
repeated string SNI = 1;
repeated string SpiffeID = 2;
string Protocol = 3;
} }
// ExposeConfig describes HTTP paths to expose through Envoy outside of Connect. // ExposeConfig describes HTTP paths to expose through Envoy outside of Connect.

View File

@ -3,14 +3,14 @@ package prototest
import ( import (
"testing" "testing"
"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
) )
func AssertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { func AssertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper() t.Helper()
opts = append(opts, cmp.Comparer(proto.Equal)) opts = append(opts, protocmp.Transform())
if diff := cmp.Diff(x, y, opts...); diff != "" { if diff := cmp.Diff(x, y, opts...); diff != "" {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)