452 lines
14 KiB
Go
452 lines
14 KiB
Go
package consul
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
gogrpc "google.golang.org/grpc"
|
|
|
|
grpc "github.com/hashicorp/consul/agent/grpc"
|
|
"github.com/hashicorp/consul/agent/grpc/resolver"
|
|
"github.com/hashicorp/consul/agent/router"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/proto/pbservice"
|
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
|
"github.com/hashicorp/consul/testrpc"
|
|
)
|
|
|
|
func TestSubscribeBackend_IntegrationWithServer_TLSEnabled(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
_, conf1 := testServerConfig(t)
|
|
conf1.VerifyIncoming = true
|
|
conf1.VerifyOutgoing = true
|
|
conf1.RPCConfig.EnableStreaming = true
|
|
configureTLS(conf1)
|
|
server, err := newServer(t, conf1)
|
|
require.NoError(t, err)
|
|
defer server.Shutdown()
|
|
|
|
client, builder := newClientWithGRPCResolver(t, configureTLS, clientConfigVerifyOutgoing)
|
|
|
|
// Try to join
|
|
testrpc.WaitForLeader(t, server.RPC, "dc1")
|
|
joinLAN(t, client, server)
|
|
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
|
|
|
|
// Register a dummy node with our service on it.
|
|
{
|
|
req := &structs.RegisterRequest{
|
|
Node: "node1",
|
|
Address: "3.4.5.6",
|
|
Datacenter: "dc1",
|
|
Service: &structs.NodeService{
|
|
ID: "redis1",
|
|
Service: "redis",
|
|
Address: "3.4.5.6",
|
|
Port: 8080,
|
|
},
|
|
}
|
|
var out struct{}
|
|
require.NoError(t, server.RPC("Catalog.Register", &req, &out))
|
|
}
|
|
|
|
// Start a Subscribe call to our streaming endpoint from the client.
|
|
{
|
|
pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
|
|
conn, err := pool.ClientConn("dc1")
|
|
require.NoError(t, err)
|
|
|
|
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
|
|
streamHandle, err := streamClient.Subscribe(ctx, req)
|
|
require.NoError(t, err)
|
|
|
|
// Start a goroutine to read updates off the pbsubscribe.
|
|
eventCh := make(chan *pbsubscribe.Event, 0)
|
|
go receiveSubscribeEvents(t, eventCh, streamHandle)
|
|
|
|
var snapshotEvents []*pbsubscribe.Event
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case event := <-eventCh:
|
|
snapshotEvents = append(snapshotEvents, event)
|
|
case <-time.After(3 * time.Second):
|
|
t.Fatalf("did not receive events past %d", len(snapshotEvents))
|
|
}
|
|
}
|
|
|
|
// Make sure the snapshot events come back with no issues.
|
|
require.Len(t, snapshotEvents, 2)
|
|
}
|
|
|
|
// Start a Subscribe call to our streaming endpoint from the server's loopback client.
|
|
{
|
|
|
|
pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
|
|
conn, err := pool.ClientConn("dc1")
|
|
require.NoError(t, err)
|
|
|
|
retryFailedConn(t, conn)
|
|
|
|
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
|
|
streamHandle, err := streamClient.Subscribe(ctx, req)
|
|
require.NoError(t, err)
|
|
|
|
// Start a goroutine to read updates off the pbsubscribe.
|
|
eventCh := make(chan *pbsubscribe.Event, 0)
|
|
go receiveSubscribeEvents(t, eventCh, streamHandle)
|
|
|
|
var snapshotEvents []*pbsubscribe.Event
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case event := <-eventCh:
|
|
snapshotEvents = append(snapshotEvents, event)
|
|
case <-time.After(3 * time.Second):
|
|
t.Fatalf("did not receive events past %d", len(snapshotEvents))
|
|
}
|
|
}
|
|
|
|
// Make sure the snapshot events come back with no issues.
|
|
require.Len(t, snapshotEvents, 2)
|
|
}
|
|
}
|
|
|
|
// receiveSubscribeEvents and send them to the channel.
|
|
func receiveSubscribeEvents(t *testing.T, ch chan *pbsubscribe.Event, handle pbsubscribe.StateChangeSubscription_SubscribeClient) {
|
|
for {
|
|
event, err := handle.Recv()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "context deadline exceeded") ||
|
|
strings.Contains(err.Error(), "context canceled") {
|
|
break
|
|
}
|
|
t.Log(err)
|
|
}
|
|
ch <- event
|
|
}
|
|
}
|
|
|
|
func TestSubscribeBackend_IntegrationWithServer_TLSReload(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Set up a server with initially bad certificates.
|
|
_, conf1 := testServerConfig(t)
|
|
conf1.VerifyIncoming = true
|
|
conf1.VerifyOutgoing = true
|
|
conf1.CAFile = "../../test/ca/root.cer"
|
|
conf1.CertFile = "../../test/key/ssl-cert-snakeoil.pem"
|
|
conf1.KeyFile = "../../test/key/ssl-cert-snakeoil.key"
|
|
conf1.RPCConfig.EnableStreaming = true
|
|
|
|
server, err := newServer(t, conf1)
|
|
require.NoError(t, err)
|
|
defer server.Shutdown()
|
|
|
|
// Set up a client with valid certs and verify_outgoing = true
|
|
client, builder := newClientWithGRPCResolver(t, configureTLS, clientConfigVerifyOutgoing)
|
|
|
|
testrpc.WaitForLeader(t, server.RPC, "dc1")
|
|
|
|
// Subscribe calls should fail initially
|
|
joinLAN(t, client, server)
|
|
|
|
pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
|
|
conn, err := pool.ClientConn("dc1")
|
|
require.NoError(t, err)
|
|
|
|
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
|
|
_, err = streamClient.Subscribe(ctx, req)
|
|
require.Error(t, err)
|
|
|
|
// Reload the server with valid certs
|
|
newConf := server.config.ToTLSUtilConfig()
|
|
newConf.CertFile = "../../test/key/ourdomain.cer"
|
|
newConf.KeyFile = "../../test/key/ourdomain.key"
|
|
server.tlsConfigurator.Update(newConf)
|
|
|
|
// Try the subscribe call again
|
|
retryFailedConn(t, conn)
|
|
|
|
streamClient = pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
|
_, err = streamClient.Subscribe(ctx, req)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func clientConfigVerifyOutgoing(config *Config) {
|
|
config.VerifyOutgoing = true
|
|
}
|
|
|
|
// retryFailedConn forces the ClientConn to reset its backoff timer and retry the connection,
|
|
// to simulate the client eventually retrying after the initial failure. This is used both to simulate
|
|
// retrying after an expected failure as well as to avoid flakiness when running many tests in parallel.
|
|
func retryFailedConn(t *testing.T, conn *gogrpc.ClientConn) {
|
|
state := conn.GetState()
|
|
if state.String() != "TRANSIENT_FAILURE" {
|
|
return
|
|
}
|
|
|
|
// If the connection has failed, retry and wait for a state change.
|
|
conn.ResetConnectBackoff()
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
require.True(t, conn.WaitForStateChange(ctx, state))
|
|
}
|
|
|
|
func TestSubscribeBackend_IntegrationWithServer_DeliversAllMessages(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for -short run")
|
|
}
|
|
// This is a fuzz/probabilistic test to try to provoke streaming into dropping
|
|
// messages. There is a bug in the initial implementation that should make
|
|
// this fail. While we can't be certain a pass means it's correct, it is
|
|
// useful for finding bugs in our concurrency design.
|
|
|
|
// The issue is that when updates are coming in fast such that updates occur
|
|
// in between us making the snapshot and beginning the stream updates, we
|
|
// shouldn't miss anything.
|
|
|
|
// To test this, we will run a background goroutine that will write updates as
|
|
// fast as possible while we then try to stream the results and ensure that we
|
|
// see every change. We'll make the updates monotonically increasing so we can
|
|
// easily tell if we missed one.
|
|
|
|
_, server := testServerWithConfig(t, func(c *Config) {
|
|
c.Datacenter = "dc1"
|
|
c.Bootstrap = true
|
|
c.RPCConfig.EnableStreaming = true
|
|
})
|
|
defer server.Shutdown()
|
|
codec := rpcClient(t, server)
|
|
defer codec.Close()
|
|
|
|
client, builder := newClientWithGRPCResolver(t)
|
|
|
|
// Try to join
|
|
testrpc.WaitForLeader(t, server.RPC, "dc1")
|
|
joinLAN(t, client, server)
|
|
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
|
|
|
|
// Register a whole bunch of service instances so that the initial snapshot on
|
|
// subscribe is big enough to take a bit of time to load giving more
|
|
// opportunity for missed updates if there is a bug.
|
|
for i := 0; i < 1000; i++ {
|
|
req := &structs.RegisterRequest{
|
|
Node: fmt.Sprintf("node-redis-%03d", i),
|
|
Address: "3.4.5.6",
|
|
Datacenter: "dc1",
|
|
Service: &structs.NodeService{
|
|
ID: fmt.Sprintf("redis-%03d", i),
|
|
Service: "redis",
|
|
Port: 11211,
|
|
},
|
|
}
|
|
var out struct{}
|
|
require.NoError(t, server.RPC("Catalog.Register", &req, &out))
|
|
}
|
|
|
|
// Start background writer
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
go func() {
|
|
// Update the registration with a monotonically increasing port as fast as
|
|
// we can.
|
|
req := &structs.RegisterRequest{
|
|
Node: "node1",
|
|
Address: "3.4.5.6",
|
|
Datacenter: "dc1",
|
|
Service: &structs.NodeService{
|
|
ID: "redis-canary",
|
|
Service: "redis",
|
|
Port: 0,
|
|
},
|
|
}
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
var out struct{}
|
|
require.NoError(t, server.RPC("Catalog.Register", &req, &out))
|
|
req.Service.Port++
|
|
if req.Service.Port > 100 {
|
|
return
|
|
}
|
|
time.Sleep(1 * time.Millisecond)
|
|
}
|
|
}()
|
|
|
|
pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
|
|
conn, err := pool.ClientConn("dc1")
|
|
require.NoError(t, err)
|
|
|
|
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
|
|
|
// Now start a whole bunch of streamers in parallel to maximise chance of
|
|
// catching a race.
|
|
n := 5
|
|
var wg sync.WaitGroup
|
|
var updateCount uint64
|
|
// Buffered error chan so that workers can exit and terminate wg without
|
|
// blocking on send. We collect errors this way since t isn't thread safe.
|
|
errCh := make(chan error, n)
|
|
for i := 0; i < n; i++ {
|
|
i := i
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
verifyMonotonicStreamUpdates(ctx, t, streamClient, i, &updateCount, errCh)
|
|
}()
|
|
}
|
|
|
|
// Wait until all subscribers have verified the first bunch of updates all got
|
|
// delivered.
|
|
wg.Wait()
|
|
|
|
close(errCh)
|
|
|
|
// Require that none of them errored. Since we closed the chan above this loop
|
|
// should terminate immediately if no errors were buffered.
|
|
for err := range errCh {
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Sanity check that at least some non-snapshot messages were delivered. We
|
|
// can't know exactly how many because it's timing dependent based on when
|
|
// each subscribers snapshot occurs.
|
|
require.True(t, atomic.LoadUint64(&updateCount) > 0,
|
|
"at least some of the subscribers should have received non-snapshot updates")
|
|
}
|
|
|
|
func newClientWithGRPCResolver(t *testing.T, ops ...func(*Config)) (*Client, *resolver.ServerResolverBuilder) {
|
|
builder := resolver.NewServerResolverBuilder(resolver.Config{Authority: t.Name()})
|
|
resolver.Register(builder)
|
|
t.Cleanup(func() {
|
|
resolver.Deregister(builder.Authority())
|
|
})
|
|
|
|
_, config := testClientConfig(t)
|
|
for _, op := range ops {
|
|
op(config)
|
|
}
|
|
|
|
deps := newDefaultDeps(t, config)
|
|
deps.Router = router.NewRouter(
|
|
deps.Logger,
|
|
config.Datacenter,
|
|
fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter),
|
|
builder)
|
|
|
|
client, err := NewClient(config, deps)
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
client.Shutdown()
|
|
})
|
|
return client, builder
|
|
}
|
|
|
|
type testLogger interface {
|
|
Logf(format string, args ...interface{})
|
|
}
|
|
|
|
func verifyMonotonicStreamUpdates(ctx context.Context, logger testLogger, client pbsubscribe.StateChangeSubscriptionClient, i int, updateCount *uint64, errCh chan<- error) {
|
|
req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
|
|
streamHandle, err := client.Subscribe(ctx, req)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "context deadline exceeded") ||
|
|
strings.Contains(err.Error(), "context canceled") {
|
|
logger.Logf("subscriber %05d: context cancelled before loop")
|
|
return
|
|
}
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
snapshotDone := false
|
|
expectPort := int32(0)
|
|
for {
|
|
event, err := streamHandle.Recv()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "context deadline exceeded") ||
|
|
strings.Contains(err.Error(), "context canceled") {
|
|
break
|
|
}
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
switch {
|
|
case event.GetEndOfSnapshot():
|
|
snapshotDone = true
|
|
logger.Logf("subscriber %05d: snapshot done, expect next port to be %d", i, expectPort)
|
|
case snapshotDone:
|
|
// Verify we get all updates in order
|
|
svc, err := svcOrErr(event)
|
|
if err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
if expectPort != svc.Port {
|
|
errCh <- fmt.Errorf("subscriber %05d: missed %d update(s)!", i, svc.Port-expectPort)
|
|
return
|
|
}
|
|
atomic.AddUint64(updateCount, 1)
|
|
logger.Logf("subscriber %05d: got event with correct port=%d", i, expectPort)
|
|
expectPort++
|
|
default:
|
|
// This is a snapshot update. Check if it's an update for the canary
|
|
// instance that got applied before our snapshot was sent (likely)
|
|
svc, err := svcOrErr(event)
|
|
if err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
if svc.ID == "redis-canary" {
|
|
// Update the expected port we see in the next update to be one more
|
|
// than the port in the snapshot.
|
|
expectPort = svc.Port + 1
|
|
logger.Logf("subscriber %05d: saw canary in snapshot with port %d", i, svc.Port)
|
|
}
|
|
}
|
|
if expectPort > 100 {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func svcOrErr(event *pbsubscribe.Event) (*pbservice.NodeService, error) {
|
|
health := event.GetServiceHealth()
|
|
if health == nil {
|
|
return nil, fmt.Errorf("not a health event: %#v", event)
|
|
}
|
|
csn := health.CheckServiceNode
|
|
if csn == nil {
|
|
return nil, fmt.Errorf("nil CSN: %#v", event)
|
|
}
|
|
if csn.Service == nil {
|
|
return nil, fmt.Errorf("nil service: %#v", event)
|
|
}
|
|
return csn.Service, nil
|
|
}
|