809344a6f5
- Add endpoints related to peering: read, list, generate token, initiate peering - Update node/service/check table indexing to account for peers - Foundational changes for pushing service updates to a peer - Plumb peer name through Health.ServiceNodes path see: ENT-1765, ENT-1280, ENT-1283, ENT-1283, ENT-1756, ENT-1739, ENT-1750, ENT-1679, ENT-1709, ENT-1704, ENT-1690, ENT-1689, ENT-1702, ENT-1701, ENT-1683, ENT-1663, ENT-1650, ENT-1678, ENT-1628, ENT-1658, ENT-1640, ENT-1637, ENT-1597, ENT-1634, ENT-1613, ENT-1616, ENT-1617, ENT-1591, ENT-1588, ENT-1596, ENT-1572, ENT-1555 Co-authored-by: R.B. Boyer <rb@hashicorp.com> Co-authored-by: freddygv <freddy@hashicorp.com> Co-authored-by: Chris S. Kim <ckim@hashicorp.com> Co-authored-by: Evan Culver <eculver@hashicorp.com> Co-authored-by: Nitya Dhanushkodi <nitya@hashicorp.com>
200 lines
5.3 KiB
Go
200 lines
5.3 KiB
Go
package peering
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/proto/pbpeering"
|
|
)
|
|
|
|
// same certificate that appears in our connect tests
|
|
var validCA = `
|
|
-----BEGIN CERTIFICATE-----
|
|
MIICmDCCAj6gAwIBAgIBBzAKBggqhkjOPQQDAjAWMRQwEgYDVQQDEwtDb25zdWwg
|
|
Q0EgNzAeFw0xODA1MjExNjMzMjhaFw0yODA1MTgxNjMzMjhaMBYxFDASBgNVBAMT
|
|
C0NvbnN1bCBDQSA3MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAER0qlxjnRcMEr
|
|
iSGlH7G7dYU7lzBEmLUSMZkyBbClmyV8+e8WANemjn+PLnCr40If9cmpr7RnC9Qk
|
|
GTaLnLiF16OCAXswggF3MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTADAQH/
|
|
MGgGA1UdDgRhBF8xZjo5MTpjYTo0MTo4ZjphYzo2NzpiZjo1OTpjMjpmYTo0ZTo3
|
|
NTo1YzpkODpmMDo1NTpkZTpiZTo3NTpiODozMzozMTpkNToyNDpiMDowNDpiMzpl
|
|
ODo5Nzo1Yjo3ZTBqBgNVHSMEYzBhgF8xZjo5MTpjYTo0MTo4ZjphYzo2NzpiZjo1
|
|
OTpjMjpmYTo0ZTo3NTo1YzpkODpmMDo1NTpkZTpiZTo3NTpiODozMzozMTpkNToy
|
|
NDpiMDowNDpiMzplODo5Nzo1Yjo3ZTA/BgNVHREEODA2hjRzcGlmZmU6Ly8xMjRk
|
|
ZjVhMC05ODIwLTc2YzMtOWFhOS02ZjYyMTY0YmExYzIuY29uc3VsMD0GA1UdHgEB
|
|
/wQzMDGgLzAtgisxMjRkZjVhMC05ODIwLTc2YzMtOWFhOS02ZjYyMTY0YmExYzIu
|
|
Y29uc3VsMAoGCCqGSM49BAMCA0gAMEUCIQDzkkI7R+0U12a+zq2EQhP/n2mHmta+
|
|
fs2hBxWIELGwTAIgLdO7RRw+z9nnxCIA6kNl//mIQb+PGItespiHZKAz74Q=
|
|
-----END CERTIFICATE-----
|
|
`
|
|
var invalidCA = `
|
|
-----BEGIN CERTIFICATE-----
|
|
not valid
|
|
-----END CERTIFICATE-----
|
|
`
|
|
|
|
var validAddress = "1.2.3.4:80"
|
|
|
|
var validServerName = "server.consul"
|
|
|
|
var validPeerID = "peer1"
|
|
|
|
// TODO(peering): the test methods below are exposed to prevent duplication,
|
|
// these should be removed at same time tests in peering_test get refactored.
|
|
// XXX: we can't put the existing tests in service_test.go into the peering
|
|
// package because it causes an import cycle by importing the top-level consul
|
|
// package (which correctly imports the agent/rpc/peering package)
|
|
|
|
// TestPeering is a test utility for generating a pbpeering.Peering with valid
|
|
// data along with the peerName, state and index.
|
|
func TestPeering(peerName string, state pbpeering.PeeringState) *pbpeering.Peering {
|
|
return &pbpeering.Peering{
|
|
Name: peerName,
|
|
PeerCAPems: []string{validCA},
|
|
PeerServerAddresses: []string{validAddress},
|
|
PeerServerName: validServerName,
|
|
State: state,
|
|
// uncomment once #1613 lands
|
|
// PeerID: validPeerID
|
|
}
|
|
}
|
|
|
|
// TestPeeringToken is a test utility for generating a valid peering token
|
|
// with the given peerID for use in test cases
|
|
func TestPeeringToken(peerID string) structs.PeeringToken {
|
|
return structs.PeeringToken{
|
|
CA: []string{validCA},
|
|
ServerAddresses: []string{validAddress},
|
|
ServerName: validServerName,
|
|
PeerID: peerID,
|
|
}
|
|
}
|
|
|
|
type mockClient struct {
|
|
mu sync.Mutex
|
|
errCh chan error
|
|
|
|
replicationStream *mockStream
|
|
}
|
|
|
|
func (c *mockClient) Send(r *pbpeering.ReplicationMessage) error {
|
|
c.replicationStream.recvCh <- r
|
|
return nil
|
|
}
|
|
|
|
func (c *mockClient) Recv() (*pbpeering.ReplicationMessage, error) {
|
|
select {
|
|
case err := <-c.errCh:
|
|
return nil, err
|
|
case r := <-c.replicationStream.sendCh:
|
|
return r, nil
|
|
case <-time.After(10 * time.Millisecond):
|
|
return nil, io.EOF
|
|
}
|
|
}
|
|
|
|
func (c *mockClient) RecvWithTimeout(dur time.Duration) (*pbpeering.ReplicationMessage, error) {
|
|
select {
|
|
case err := <-c.errCh:
|
|
return nil, err
|
|
case r := <-c.replicationStream.sendCh:
|
|
return r, nil
|
|
case <-time.After(dur):
|
|
return nil, io.EOF
|
|
}
|
|
}
|
|
|
|
func (c *mockClient) Close() {
|
|
close(c.replicationStream.recvCh)
|
|
}
|
|
|
|
func newMockClient(ctx context.Context) *mockClient {
|
|
return &mockClient{
|
|
replicationStream: newTestReplicationStream(ctx),
|
|
}
|
|
}
|
|
|
|
// mockStream mocks peering.PeeringService_StreamResourcesServer
|
|
type mockStream struct {
|
|
sendCh chan *pbpeering.ReplicationMessage
|
|
recvCh chan *pbpeering.ReplicationMessage
|
|
|
|
ctx context.Context
|
|
mu sync.Mutex
|
|
}
|
|
|
|
var _ pbpeering.PeeringService_StreamResourcesServer = (*mockStream)(nil)
|
|
|
|
func newTestReplicationStream(ctx context.Context) *mockStream {
|
|
return &mockStream{
|
|
sendCh: make(chan *pbpeering.ReplicationMessage, 1),
|
|
recvCh: make(chan *pbpeering.ReplicationMessage, 1),
|
|
ctx: ctx,
|
|
}
|
|
}
|
|
|
|
// Send implements pbpeering.PeeringService_StreamResourcesServer
|
|
func (s *mockStream) Send(r *pbpeering.ReplicationMessage) error {
|
|
s.sendCh <- r
|
|
return nil
|
|
}
|
|
|
|
// Recv implements pbpeering.PeeringService_StreamResourcesServer
|
|
func (s *mockStream) Recv() (*pbpeering.ReplicationMessage, error) {
|
|
r := <-s.recvCh
|
|
if r == nil {
|
|
return nil, io.EOF
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
// Context implements grpc.ServerStream and grpc.ClientStream
|
|
func (s *mockStream) Context() context.Context {
|
|
return s.ctx
|
|
}
|
|
|
|
// SendMsg implements grpc.ServerStream and grpc.ClientStream
|
|
func (s *mockStream) SendMsg(m interface{}) error {
|
|
return nil
|
|
}
|
|
|
|
// RecvMsg implements grpc.ServerStream and grpc.ClientStream
|
|
func (s *mockStream) RecvMsg(m interface{}) error {
|
|
return nil
|
|
}
|
|
|
|
// SetHeader implements grpc.ServerStream
|
|
func (s *mockStream) SetHeader(metadata.MD) error {
|
|
return nil
|
|
}
|
|
|
|
// SendHeader implements grpc.ServerStream
|
|
func (s *mockStream) SendHeader(metadata.MD) error {
|
|
return nil
|
|
}
|
|
|
|
// SetTrailer implements grpc.ServerStream
|
|
func (s *mockStream) SetTrailer(metadata.MD) {}
|
|
|
|
type incrementalTime struct {
|
|
base time.Time
|
|
next uint64
|
|
}
|
|
|
|
func (t *incrementalTime) Now() time.Time {
|
|
t.next++
|
|
return t.base.Add(time.Duration(t.next) * time.Second)
|
|
}
|
|
|
|
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
|
t.Helper()
|
|
if !t.Run(name, fn) {
|
|
t.FailNow()
|
|
}
|
|
}
|