open-consul/agent/consul/server_test.go

2059 lines
58 KiB
Go
Raw Normal View History

2013-12-06 23:43:07 +00:00
package consul
import (
"context"
2022-09-01 17:32:11 +00:00
"crypto/tls"
"crypto/x509"
2013-12-07 01:18:09 +00:00
"fmt"
2013-12-31 23:44:27 +00:00
"net"
2013-12-06 23:43:07 +00:00
"os"
"reflect"
"strings"
"sync"
2017-06-26 08:46:20 +00:00
"sync/atomic"
2013-12-06 23:43:07 +00:00
"testing"
"time"
"github.com/armon/go-metrics"
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
"github.com/google/tcpproxy"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/go-hclog"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/agent/connect"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
2017-02-22 20:53:32 +00:00
"github.com/hashicorp/consul/types"
2013-12-06 23:43:07 +00:00
)
const (
// TestDefaultInitialManagementToken is the fixed ACL initial management
// token used by the test helpers in this file: testServerACLConfig installs
// it as Config.ACLInitialManagementToken.
TestDefaultInitialManagementToken = "d9f05e83-a7ae-47ce-839e-c0d53a68c00a"
)
// testTLSCertificates creates a throwaway CA plus a certificate/key pair
// signed by it and hands all three back as PEM strings.
//
// serverName becomes the only DNS SAN on the issued certificate. The
// certificate is requested with a 365 day lifetime and with both the
// server-auth and client-auth extended key usages. On any failure the three
// strings are returned empty together with the error.
func testTLSCertificates(serverName string) (cert string, key string, cacert string, err error) {
	signer, _, err := tlsutil.GeneratePrivateKey()
	if err != nil {
		return "", "", "", err
	}

	caPEM, _, err := tlsutil.GenerateCA(tlsutil.CAOpts{Signer: signer})
	if err != nil {
		return "", "", "", err
	}

	// The same signer is used for the CA and for the certificate request.
	leafOpts := tlsutil.CertOpts{
		Signer:   signer,
		CA:       caPEM,
		Name:     "Test Cert Name",
		Days:     365,
		DNSNames: []string{serverName},
		ExtKeyUsage: []x509.ExtKeyUsage{
			x509.ExtKeyUsageServerAuth,
			x509.ExtKeyUsageClientAuth,
		},
	}
	certPEM, keyPEM, err := tlsutil.GenerateCert(leafOpts)
	if err != nil {
		return "", "", "", err
	}

	return certPEM, keyPEM, caPEM, nil
}
// testServerACLConfig is a config option that switches a test server to ACL
// mode: ACLs are enabled with a default policy of "deny", "dc1" is made the
// primary datacenter, and TestDefaultInitialManagementToken is installed as
// the initial management token.
func testServerACLConfig(c *Config) {
	c.ACLsEnabled = true
	c.ACLResolverSettings.ACLDefaultPolicy = "deny"
	c.ACLInitialManagementToken = TestDefaultInitialManagementToken
	c.PrimaryDatacenter = "dc1"
}
2014-04-07 21:36:32 +00:00
func configureTLS(config *Config) {
config.TLSConfig.InternalRPC.CAFile = "../../test/ca/root.cer"
config.TLSConfig.InternalRPC.CertFile = "../../test/key/ourdomain.cer"
config.TLSConfig.InternalRPC.KeyFile = "../../test/key/ourdomain.key"
2014-04-07 21:36:32 +00:00
}
// id is the package-level counter that uniqueNodeName bumps atomically so
// that every generated node name gets a distinct numeric suffix.
var id int64

// uniqueNodeName derives a node name from name: every "/" is replaced with
// "_" and "-node-<n>" is appended, where n is the next value of the id
// counter. The counter is advanced with an atomic add, so concurrent callers
// receive different suffixes.
func uniqueNodeName(name string) string {
	seq := atomic.AddInt64(&id, 1)
	sanitized := strings.ReplaceAll(name, "/", "_")
	return fmt.Sprintf("%s-node-%d", sanitized, seq)
}
// waitForLeaderEstablishment retries until at least one of the given servers
// reports itself as leader and every server that does so is also ready for
// consistent reads. Each attempt fails (and is retried by retry.Run) if no
// leader is found or a leader is not yet ready.
func waitForLeaderEstablishment(t *testing.T, servers ...*Server) {
	t.Helper()

	retry.Run(t, func(r *retry.R) {
		leaderSeen := false
		for _, s := range servers {
			if !s.IsLeader() {
				continue
			}
			leaderSeen = true
			require.True(r, s.isReadyForConsistentReads(), "Leader %s hasn't finished establishing leadership yet", s.config.NodeName)
		}
		require.True(r, leaderSeen, "Cluster has not elected a leader yet")
	})
}
func testServerConfig(t *testing.T) (string, *Config) {
dir := testutil.TempDir(t, "consul")
2013-12-07 01:18:09 +00:00
config := DefaultConfig()
ports := freeport.GetN(t, 4) // {server, serf_lan, serf_wan, grpc}
config.NodeName = uniqueNodeName(t.Name())
config.Bootstrap = true
config.Datacenter = "dc1"
config.PrimaryDatacenter = "dc1"
2013-12-07 01:18:09 +00:00
config.DataDir = dir
2017-06-25 19:36:03 +00:00
// bind the rpc server to a random port. config.RPCAdvertise will be
// set to the listen address unless it was set in the configuration.
// In that case get the address from srv.Listener.Addr().
New config parser, HCL support, multiple bind addrs (#3480) * new config parser for agent This patch implements a new config parser for the consul agent which makes the following changes to the previous implementation: * add HCL support * all configuration fragments in tests and for default config are expressed as HCL fragments * HCL fragments can be provided on the command line so that they can eventually replace the command line flags. * HCL/JSON fragments are parsed into a temporary Config structure which can be merged using reflection (all values are pointers). The existing merge logic of overwrite for values and append for slices has been preserved. * A single builder process generates a typed runtime configuration for the agent. The new implementation is more strict and fails in the builder process if no valid runtime configuration can be generated. Therefore, additional validations in other parts of the code should be removed. The builder also pre-computes all required network addresses so that no address/port magic should be required where the configuration is used and should therefore be removed. * Upgrade github.com/hashicorp/hcl to support int64 * improve error messages * fix directory permission test * Fix rtt test * Fix ForceLeave test * Skip performance test for now until we know what to do * Update github.com/hashicorp/memberlist to update log prefix * Make memberlist use the default logger * improve config error handling * do not fail on non-existing data-dir * experiment with non-uniform timeouts to get a handle on stalled leader elections * Run tests for packages separately to eliminate the spurious port conflicts * refactor private address detection and unify approach for ipv4 and ipv6. 
Fixes #2825 * do not allow unix sockets for DNS * improve bind and advertise addr error handling * go through builder using test coverage * minimal update to the docs * more coverage tests fixed * more tests * fix makefile * cleanup * fix port conflicts with external port server 'porter' * stop test server on error * do not run api test that change global ENV concurrently with the other tests * Run remaining api tests concurrently * no need for retry with the port number service * monkey patch race condition in go-sockaddr until we understand why that fails * monkey patch hcl decoder race condidtion until we understand why that fails * monkey patch spurious errors in strings.EqualFold from here * add test for hcl decoder race condition. Run with go test -parallel 128 * Increase timeout again * cleanup * don't log port allocations by default * use base command arg parsing to format help output properly * handle -dc deprecation case in Build * switch autopilot.max_trailing_logs to int * remove duplicate test case * remove unused methods * remove comments about flag/config value inconsistencies * switch got and want around since the error message was misleading. * Removes a stray debug log. * Removes a stray newline in imports. * Fixes TestACL_Version8. * Runs go fmt. * Adds a default case for unknown address types. * Reoders and reformats some imports. * Adds some comments and fixes typos. * Reorders imports. 
* add unix socket support for dns later * drop all deprecated flags and arguments * fix wrong field name * remove stray node-id file * drop unnecessary patch section in test * drop duplicate test * add test for LeaveOnTerm and SkipLeaveOnInt in client mode * drop "bla" and add clarifying comment for the test * split up tests to support enterprise/non-enterprise tests * drop raft multiplier and derive values during build phase * sanitize runtime config reflectively and add test * detect invalid config fields * fix tests with invalid config fields * use different values for wan sanitiziation test * drop recursor in favor of recursors * allow dns_config.udp_answer_limit to be zero * make sure tests run on machines with multiple ips * Fix failing tests in a few more places by providing a bind address in the test * Gets rid of skipped TestAgent_CheckPerformanceSettings and adds case for builder. * Add porter to server_test.go to make tests there less flaky * go fmt
2017-09-25 18:40:42 +00:00
config.RPCAddr = &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: ports[0]}
2017-06-25 19:36:03 +00:00
2017-02-22 20:53:32 +00:00
nodeID, err := uuid.GenerateUUID()
if err != nil {
t.Fatal(err)
}
config.NodeID = types.NodeID(nodeID)
2017-06-25 19:36:03 +00:00
// set the memberlist bind port to 0 to bind to a random port.
// memberlist will update the value of BindPort after bind
// to the actual value.
2013-12-07 01:18:09 +00:00
config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
New config parser, HCL support, multiple bind addrs (#3480) * new config parser for agent This patch implements a new config parser for the consul agent which makes the following changes to the previous implementation: * add HCL support * all configuration fragments in tests and for default config are expressed as HCL fragments * HCL fragments can be provided on the command line so that they can eventually replace the command line flags. * HCL/JSON fragments are parsed into a temporary Config structure which can be merged using reflection (all values are pointers). The existing merge logic of overwrite for values and append for slices has been preserved. * A single builder process generates a typed runtime configuration for the agent. The new implementation is more strict and fails in the builder process if no valid runtime configuration can be generated. Therefore, additional validations in other parts of the code should be removed. The builder also pre-computes all required network addresses so that no address/port magic should be required where the configuration is used and should therefore be removed. * Upgrade github.com/hashicorp/hcl to support int64 * improve error messages * fix directory permission test * Fix rtt test * Fix ForceLeave test * Skip performance test for now until we know what to do * Update github.com/hashicorp/memberlist to update log prefix * Make memberlist use the default logger * improve config error handling * do not fail on non-existing data-dir * experiment with non-uniform timeouts to get a handle on stalled leader elections * Run tests for packages separately to eliminate the spurious port conflicts * refactor private address detection and unify approach for ipv4 and ipv6. 
Fixes #2825 * do not allow unix sockets for DNS * improve bind and advertise addr error handling * go through builder using test coverage * minimal update to the docs * more coverage tests fixed * more tests * fix makefile * cleanup * fix port conflicts with external port server 'porter' * stop test server on error * do not run api test that change global ENV concurrently with the other tests * Run remaining api tests concurrently * no need for retry with the port number service * monkey patch race condition in go-sockaddr until we understand why that fails * monkey patch hcl decoder race condidtion until we understand why that fails * monkey patch spurious errors in strings.EqualFold from here * add test for hcl decoder race condition. Run with go test -parallel 128 * Increase timeout again * cleanup * don't log port allocations by default * use base command arg parsing to format help output properly * handle -dc deprecation case in Build * switch autopilot.max_trailing_logs to int * remove duplicate test case * remove unused methods * remove comments about flag/config value inconsistencies * switch got and want around since the error message was misleading. * Removes a stray debug log. * Removes a stray newline in imports. * Fixes TestACL_Version8. * Runs go fmt. * Adds a default case for unknown address types. * Reoders and reformats some imports. * Adds some comments and fixes typos. * Reorders imports. 
* add unix socket support for dns later * drop all deprecated flags and arguments * fix wrong field name * remove stray node-id file * drop unnecessary patch section in test * drop duplicate test * add test for LeaveOnTerm and SkipLeaveOnInt in client mode * drop "bla" and add clarifying comment for the test * split up tests to support enterprise/non-enterprise tests * drop raft multiplier and derive values during build phase * sanitize runtime config reflectively and add test * detect invalid config fields * fix tests with invalid config fields * use different values for wan sanitiziation test * drop recursor in favor of recursors * allow dns_config.udp_answer_limit to be zero * make sure tests run on machines with multiple ips * Fix failing tests in a few more places by providing a bind address in the test * Gets rid of skipped TestAgent_CheckPerformanceSettings and adds case for builder. * Add porter to server_test.go to make tests there less flaky * go fmt
2017-09-25 18:40:42 +00:00
config.SerfLANConfig.MemberlistConfig.BindPort = ports[1]
config.SerfLANConfig.MemberlistConfig.AdvertisePort = ports[1]
config.SerfLANConfig.MemberlistConfig.SuspicionMult = 2
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 50 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond
2013-12-11 22:57:40 +00:00
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
2019-05-15 18:59:33 +00:00
config.SerfLANConfig.MemberlistConfig.DeadNodeReclaimTime = 100 * time.Millisecond
2013-12-11 22:57:40 +00:00
2013-12-07 01:18:09 +00:00
config.SerfWANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
New config parser, HCL support, multiple bind addrs (#3480) * new config parser for agent This patch implements a new config parser for the consul agent which makes the following changes to the previous implementation: * add HCL support * all configuration fragments in tests and for default config are expressed as HCL fragments * HCL fragments can be provided on the command line so that they can eventually replace the command line flags. * HCL/JSON fragments are parsed into a temporary Config structure which can be merged using reflection (all values are pointers). The existing merge logic of overwrite for values and append for slices has been preserved. * A single builder process generates a typed runtime configuration for the agent. The new implementation is more strict and fails in the builder process if no valid runtime configuration can be generated. Therefore, additional validations in other parts of the code should be removed. The builder also pre-computes all required network addresses so that no address/port magic should be required where the configuration is used and should therefore be removed. * Upgrade github.com/hashicorp/hcl to support int64 * improve error messages * fix directory permission test * Fix rtt test * Fix ForceLeave test * Skip performance test for now until we know what to do * Update github.com/hashicorp/memberlist to update log prefix * Make memberlist use the default logger * improve config error handling * do not fail on non-existing data-dir * experiment with non-uniform timeouts to get a handle on stalled leader elections * Run tests for packages separately to eliminate the spurious port conflicts * refactor private address detection and unify approach for ipv4 and ipv6. 
Fixes #2825 * do not allow unix sockets for DNS * improve bind and advertise addr error handling * go through builder using test coverage * minimal update to the docs * more coverage tests fixed * more tests * fix makefile * cleanup * fix port conflicts with external port server 'porter' * stop test server on error * do not run api test that change global ENV concurrently with the other tests * Run remaining api tests concurrently * no need for retry with the port number service * monkey patch race condition in go-sockaddr until we understand why that fails * monkey patch hcl decoder race condidtion until we understand why that fails * monkey patch spurious errors in strings.EqualFold from here * add test for hcl decoder race condition. Run with go test -parallel 128 * Increase timeout again * cleanup * don't log port allocations by default * use base command arg parsing to format help output properly * handle -dc deprecation case in Build * switch autopilot.max_trailing_logs to int * remove duplicate test case * remove unused methods * remove comments about flag/config value inconsistencies * switch got and want around since the error message was misleading. * Removes a stray debug log. * Removes a stray newline in imports. * Fixes TestACL_Version8. * Runs go fmt. * Adds a default case for unknown address types. * Reoders and reformats some imports. * Adds some comments and fixes typos. * Reorders imports. 
* add unix socket support for dns later * drop all deprecated flags and arguments * fix wrong field name * remove stray node-id file * drop unnecessary patch section in test * drop duplicate test * add test for LeaveOnTerm and SkipLeaveOnInt in client mode * drop "bla" and add clarifying comment for the test * split up tests to support enterprise/non-enterprise tests * drop raft multiplier and derive values during build phase * sanitize runtime config reflectively and add test * detect invalid config fields * fix tests with invalid config fields * use different values for wan sanitiziation test * drop recursor in favor of recursors * allow dns_config.udp_answer_limit to be zero * make sure tests run on machines with multiple ips * Fix failing tests in a few more places by providing a bind address in the test * Gets rid of skipped TestAgent_CheckPerformanceSettings and adds case for builder. * Add porter to server_test.go to make tests there less flaky * go fmt
2017-09-25 18:40:42 +00:00
config.SerfWANConfig.MemberlistConfig.BindPort = ports[2]
config.SerfWANConfig.MemberlistConfig.AdvertisePort = ports[2]
config.SerfWANConfig.MemberlistConfig.SuspicionMult = 2
config.SerfWANConfig.MemberlistConfig.ProbeTimeout = 50 * time.Millisecond
config.SerfWANConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond
2013-12-11 22:57:40 +00:00
config.SerfWANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
2019-05-15 18:59:33 +00:00
config.SerfWANConfig.MemberlistConfig.DeadNodeReclaimTime = 100 * time.Millisecond
2013-12-11 22:57:40 +00:00
config.RaftConfig.LeaderLeaseTimeout = 100 * time.Millisecond
config.RaftConfig.HeartbeatTimeout = 200 * time.Millisecond
config.RaftConfig.ElectionTimeout = 200 * time.Millisecond
2013-12-07 01:18:09 +00:00
config.ReconcileInterval = 300 * time.Millisecond
2015-05-14 01:22:34 +00:00
2017-03-21 23:36:44 +00:00
config.AutopilotConfig.ServerStabilizationTime = 100 * time.Millisecond
config.ServerHealthInterval = 50 * time.Millisecond
config.AutopilotInterval = 100 * time.Millisecond
config.CoordinateUpdatePeriod = 100 * time.Millisecond
config.LeaveDrainTime = 1 * time.Millisecond
// TODO (slackpad) - We should be able to run all tests w/o this, but it
// looks like several depend on it.
config.RPCHoldTimeout = 10 * time.Second
config.GRPCPort = ports[3]
2018-04-27 06:02:18 +00:00
config.ConnectEnabled = true
config.CAConfig = &structs.CAConfiguration{
ClusterID: connect.TestClusterID,
Provider: structs.ConsulCAProvider,
Config: map[string]interface{}{
"PrivateKey": "",
"RootCert": "",
"LeafCertTTL": "72h",
"IntermediateCertTTL": "288h",
},
}
config.PeeringEnabled = true
return dir, config
}
2014-01-10 19:07:29 +00:00
// testServer starts a test server with the unmodified baseline configuration
// and returns its data directory and the server.
//
// Deprecated: use testServerWithConfig instead. It does the same thing and more.
func testServer(t *testing.T) (string, *Server) {
	dir, srv := testServerWithConfig(t)
	return dir, srv
}
// Deprecated: use testServerWithConfig
func testServerDC(t *testing.T, dc string) (string, *Server) {
2017-06-26 08:44:36 +00:00
return testServerWithConfig(t, func(c *Config) {
c.Datacenter = dc
c.Bootstrap = true
})
}
// Deprecated: use testServerWithConfig
func testServerDCBootstrap(t *testing.T, dc string, bootstrap bool) (string, *Server) {
2017-06-26 08:44:36 +00:00
return testServerWithConfig(t, func(c *Config) {
c.Datacenter = dc
c.PrimaryDatacenter = dc
2017-06-26 08:44:36 +00:00
c.Bootstrap = bootstrap
})
2013-12-07 01:18:09 +00:00
}
// Deprecated: use testServerWithConfig
func testServerDCExpect(t *testing.T, dc string, expect int) (string, *Server) {
2017-06-26 08:44:36 +00:00
return testServerWithConfig(t, func(c *Config) {
c.Datacenter = dc
c.Bootstrap = false
c.BootstrapExpect = expect
})
}
func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *Server) {
var dir string
2019-07-12 15:52:26 +00:00
var srv *Server
2022-09-01 17:32:11 +00:00
var config *Config
var deps Deps
2019-07-12 15:52:26 +00:00
// Retry added to avoid cases where bind addr is already in use
retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
dir, config = testServerConfig(t)
for _, fn := range configOpts {
fn(config)
}
// Apply config to copied fields because many tests only set the old
// values.
config.ACLResolverSettings.ACLsEnabled = config.ACLsEnabled
config.ACLResolverSettings.NodeName = config.NodeName
config.ACLResolverSettings.Datacenter = config.Datacenter
config.ACLResolverSettings.EnterpriseMeta = *config.AgentEnterpriseMeta()
2020-07-29 20:05:51 +00:00
var err error
2022-09-01 17:32:11 +00:00
deps = newDefaultDeps(t, config)
srv, err = newServerWithDeps(t, config, deps)
2019-07-12 15:52:26 +00:00
if err != nil {
r.Fatalf("err: %v", err)
}
})
t.Cleanup(func() { srv.Shutdown() })
for _, grpcPort := range []int{srv.config.GRPCPort, srv.config.GRPCTLSPort} {
if grpcPort == 0 {
continue
}
// Normally the gRPC server listener is created at the agent level and
// passed down into the Server creation.
ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort))
require.NoError(t, err)
2022-09-01 17:32:11 +00:00
if grpcPort == srv.config.GRPCTLSPort || deps.TLSConfigurator.GRPCServerUseTLS() {
// Set the internally managed server certificate. The cert manager is hooked to the Agent, so we need to bypass that here.
if srv.config.PeeringEnabled && srv.config.ConnectEnabled {
key, _ := srv.config.CAConfig.Config["PrivateKey"].(string)
cert, _ := srv.config.CAConfig.Config["RootCert"].(string)
if key != "" && cert != "" {
ca := &structs.CARoot{
SigningKey: key,
RootCert: cert,
}
require.NoError(t, deps.TLSConfigurator.UpdateAutoTLSCert(connect.TestServerLeaf(t, srv.config.Datacenter, ca)))
deps.TLSConfigurator.UpdateAutoTLSPeeringServerName(connect.PeeringServerSAN("dc1", connect.TestTrustDomain))
}
}
// Wrap the listener with TLS.
2022-09-01 17:32:11 +00:00
ln = tls.NewListener(ln, deps.TLSConfigurator.IncomingGRPCConfig())
}
go func() {
2022-09-01 17:32:11 +00:00
_ = srv.externalGRPCServer.Serve(ln)
}()
2022-09-01 17:32:11 +00:00
t.Cleanup(srv.externalGRPCServer.Stop)
}
2017-06-26 08:46:20 +00:00
return dir, srv
2014-08-11 21:01:45 +00:00
}
// testACLServerWithConfig starts an ACL-enabled test server and returns its
// data directory, the server, and an RPC client codec connected to it.
//
// cb, when non-nil, can alter the test server's configuration prior to the
// server starting; it runs after testServerACLConfig. When
// initReplicationToken is true the server's replication token is set to
// TestDefaultInitialManagementToken.
func testACLServerWithConfig(t *testing.T, cb func(*Config), initReplicationToken bool) (string, *Server, rpc.ClientCodec) {
	configOpts := make([]func(*Config), 0, 2)
	configOpts = append(configOpts, testServerACLConfig)
	if cb != nil {
		configOpts = append(configOpts, cb)
	}

	dir, srv := testServerWithConfig(t, configOpts...)

	if initReplicationToken {
		// setup some tokens here so we get less warnings in the logs
		srv.tokens.UpdateReplicationToken(TestDefaultInitialManagementToken, token.TokenSourceConfig)
	}

	return dir, srv, rpcClient(t, srv)
}
// testGRPCIntegrationServer starts an ACL-enabled test server (without
// initializing the replication token) and returns it together with an
// insecure gRPC client connection dialed at the server's plain gRPC port on
// 127.0.0.1 and an RPC client codec. The connection is closed via t.Cleanup.
//
// cb may be nil; otherwise it can adjust the config before the server starts.
func testGRPCIntegrationServer(t *testing.T, cb func(*Config)) (*Server, *grpc.ClientConn, rpc.ClientCodec) {
	_, srv, codec := testACLServerWithConfig(t, cb, false)

	conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort), grpc.WithInsecure())
	require.NoError(t, err)
	t.Cleanup(func() { _ = conn.Close() })

	return srv, conn, codec
}
2020-07-29 20:05:51 +00:00
// newServer creates a server from c using the default test dependencies
// produced by newDefaultDeps.
func newServer(t *testing.T, c *Config) (*Server, error) {
	deps := newDefaultDeps(t, c)
	return newServerWithDeps(t, c, deps)
}
func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
2017-06-25 19:36:03 +00:00
// chain server up notification
oldNotify := c.NotifyListen
up := make(chan struct{})
c.NotifyListen = func() {
close(up)
if oldNotify != nil {
oldNotify()
}
}
2022-09-01 17:32:11 +00:00
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"))
srv, err := NewServer(c, deps, grpcServer)
2017-06-25 19:36:03 +00:00
if err != nil {
return nil, err
}
t.Cleanup(func() { srv.Shutdown() })
2017-06-25 19:36:03 +00:00
// wait until after listen
<-up
// get the real address
//
// the server already sets the RPCAdvertise address
// if it wasn't configured since it needs it for
// some initialization
//
// todo(fs): setting RPCAddr should probably be guarded
// todo(fs): but for now it is a shortcut to avoid fixing
// todo(fs): tests which depend on that value. They should
// todo(fs): just get the listener address instead.
c.RPCAddr = srv.Listener.Addr().(*net.TCPAddr)
return srv, nil
}
2013-12-06 23:43:07 +00:00
func TestServer_StartStop(t *testing.T) {
t.Parallel()
2017-03-24 03:04:23 +00:00
// Start up a server and then stop it.
_, s1 := testServer(t)
2017-03-24 03:04:23 +00:00
if err := s1.Shutdown(); err != nil {
2013-12-06 23:43:07 +00:00
t.Fatalf("err: %v", err)
}
2017-03-24 03:04:23 +00:00
// Shut down again, which should be idempotent.
if err := s1.Shutdown(); err != nil {
2013-12-06 23:43:07 +00:00
t.Fatalf("err: %v", err)
}
}
2013-12-07 01:18:09 +00:00
// TestServer_fixupACLDatacenter starts one server in datacenter "aye" and one
// in datacenter "bee", both with ACLs enabled and "aye" configured as the
// primary datacenter, joins them over the WAN, waits for a leader in each
// datacenter, and then verifies the Datacenter and PrimaryDatacenter values
// held in each server's config.
func TestServer_fixupACLDatacenter(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	_, s1 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "aye"
		c.PrimaryDatacenter = "aye"
		c.ACLsEnabled = true
	})
	defer s1.Shutdown()

	_, s2 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "bee"
		c.PrimaryDatacenter = "aye"
		c.ACLsEnabled = true
	})
	defer s2.Shutdown()

	// Try to join
	joinWAN(t, s2, s1)
	retry.Run(t, func(r *retry.R) {
		if got, want := len(s1.WANMembers()), 2; got != want {
			r.Fatalf("got %d s1 WAN members want %d", got, want)
		}
		if got, want := len(s2.WANMembers()), 2; got != want {
			r.Fatalf("got %d s2 WAN members want %d", got, want)
		}
	})

	testrpc.WaitForLeader(t, s1.RPC, "aye")
	testrpc.WaitForLeader(t, s2.RPC, "bee")

	// Both servers must report "aye" as the primary datacenter while keeping
	// their own local datacenter name.
	require.Equal(t, "aye", s1.config.Datacenter)
	require.Equal(t, "aye", s1.config.PrimaryDatacenter)

	require.Equal(t, "bee", s2.config.Datacenter)
	require.Equal(t, "aye", s2.config.PrimaryDatacenter)
}
2013-12-07 01:18:09 +00:00
func TestServer_JoinLAN(t *testing.T) {
t.Parallel()
2013-12-07 01:18:09 +00:00
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d s1 LAN members want %d", got, want)
}
if got, want := len(s2.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d s2 LAN members want %d", got, want)
}
})
2013-12-07 01:18:09 +00:00
}
// TestServer_JoinLAN_SerfAllowedCIDRs tests that IPs can be blocked with
// Serf's allowed-CIDR configuration.
//
// To run properly, this test needs to be able to bind to and reach
// 127.0.1.1, which is the case for most Linux machines and Windows, so the
// unit test will run in CI.
//
// To run it on Mac OS, please run this command first, otherwise the test will
// be skipped: `sudo ifconfig lo0 alias 127.0.1.1 up`
func TestServer_JoinLAN_SerfAllowedCIDRs(t *testing.T) {
	t.Parallel()

	const targetAddr = "127.0.1.1"
	skipIfCannotBindToIP(t, targetAddr)

	// s1 only allows 127.0.0.1 on the LAN, so the client bound to targetAddr
	// falls outside its allowed LAN CIDRs.
	dir1, s1 := testServerWithConfig(t, func(cfg *Config) {
		cfg.BootstrapExpect = 1

		lanAllowed, err := memberlist.ParseCIDRs([]string{"127.0.0.1/32"})
		require.NoError(t, err)
		cfg.SerfLANConfig.MemberlistConfig.CIDRsAllowed = lanAllowed

		wanAllowed, err := memberlist.ParseCIDRs([]string{"127.0.0.0/24", "::1/128"})
		require.NoError(t, err)
		cfg.SerfWANConfig.MemberlistConfig.CIDRsAllowed = wanAllowed
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	dir2, a2 := testClientWithConfig(t, func(cfg *Config) {
		cfg.SerfLANConfig.MemberlistConfig.BindAddr = targetAddr
	})
	defer os.RemoveAll(dir2)
	defer a2.Shutdown()

	dir3, rs3 := testServerWithConfig(t, func(cfg *Config) {
		cfg.BootstrapExpect = 1
		cfg.Datacenter = "dc2"
	})
	defer os.RemoveAll(dir3)
	defer rs3.Shutdown()

	leaderAddr := joinAddrLAN(s1)
	_, joinErr := a2.JoinLAN([]string{leaderAddr}, nil)
	if joinErr != nil {
		t.Fatalf("Expected no error, had: %#v", joinErr)
	}

	// Try to join
	joinWAN(t, rs3, s1)

	retry.Run(t, func(r *retry.R) {
		// LAN is blocked, so s1 should only count itself.
		if got := len(s1.LANMembersInAgentPartition()); got != 1 {
			r.Fatalf("got %d s1 LAN members want %d", got, 1)
		}
		// LAN is blocked: a2 can see s1, but s1 cannot see a2.
		if got := len(a2.LANMembersInAgentPartition()); got != 2 {
			r.Fatalf("got %d a2 LAN members want %d", got, 2)
		}
		if got := len(s1.WANMembers()); got != 2 {
			r.Fatalf("got %d s1 WAN members want %d", got, 2)
		}
		if got := len(rs3.WANMembers()); got != 2 {
			r.Fatalf("got %d rs3 WAN members want %d", got, 2)
		}
	})
}
// TestServer_JoinWAN_SerfAllowedCIDRs tests that IPs can be blocked with
// Serf's allowed-CIDR configuration on the WAN.
//
// To run properly, this test needs to be able to bind to and reach
// 127.0.1.1, which is the case for most Linux machines and Windows, so the
// unit test will run in CI.
//
// To run it on Mac OS, please run this command first, otherwise the test will
// be skipped: `sudo ifconfig lo0 alias 127.0.1.1 up`
func TestServer_JoinWAN_SerfAllowedCIDRs(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	const targetAddr = "127.0.1.1"
	skipIfCannotBindToIP(t, targetAddr)

	// The primary only accepts WAN members from 127.0.0.1, while the
	// secondary binds its WAN gossip to targetAddr.
	primaryWANAllowed, err := memberlist.ParseCIDRs([]string{"127.0.0.1/32"})
	require.NoError(t, err)

	dir1, s1 := testServerWithConfig(t, func(cfg *Config) {
		cfg.Bootstrap = true
		cfg.BootstrapExpect = 1
		cfg.Datacenter = "dc1"
		cfg.SerfWANConfig.MemberlistConfig.CIDRsAllowed = primaryWANAllowed
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	waitForLeaderEstablishment(t, s1)
	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	dir2, s2 := testServerWithConfig(t, func(cfg *Config) {
		cfg.Bootstrap = true
		cfg.BootstrapExpect = 1
		cfg.PrimaryDatacenter = "dc1"
		cfg.Datacenter = "dc2"
		cfg.SerfWANConfig.MemberlistConfig.BindAddr = targetAddr
	})
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	waitForLeaderEstablishment(t, s2)
	testrpc.WaitForLeader(t, s2.RPC, "dc2")

	// Joining should be fine
	joinWANWithNoMembershipChecks(t, s2, s1)

	// But membership is blocked if you go and take a peek on the server.
	t.Run("LAN membership should only show each other", func(t *testing.T) {
		primaryLAN := s1.LANMembersInAgentPartition()
		require.Len(t, primaryLAN, 1)

		secondaryLAN := s2.LANMembersInAgentPartition()
		require.Len(t, secondaryLAN, 1)
	})
	t.Run("WAN membership in the primary should not show the secondary", func(t *testing.T) {
		primaryWAN := s1.WANMembers()
		require.Len(t, primaryWAN, 1)
	})
	t.Run("WAN membership in the secondary can show the primary", func(t *testing.T) {
		secondaryWAN := s2.WANMembers()
		require.Len(t, secondaryWAN, 2)
	})
}
// skipIfCannotBindToIP skips the calling test when a TCP listener cannot be
// opened on ip (using port "0"). When the bind succeeds the probe listener is
// closed again and the test continues.
func skipIfCannotBindToIP(t *testing.T, ip string) {
	// Mark this function as a helper so the skip is attributed to the calling
	// test's line instead of this function.
	t.Helper()

	l, err := net.Listen("tcp", net.JoinHostPort(ip, "0"))
	if err != nil {
		// Skipf stops the test goroutine, so the Close below is never reached
		// with a nil listener.
		t.Skipf("Cannot bind on %s, to run on Mac OS: `sudo ifconfig lo0 alias %s up`", ip, ip)
	}

	// Only the ability to bind matters; an error while closing the probe
	// listener is not actionable.
	_ = l.Close()
}
// TestServer_LANReap joins three servers of datacenter "dc1" over the LAN
// with short Serf reconnect, tombstone and reap intervals, shuts one of them
// down, and verifies that the remaining LAN membership and server lookup on
// s1 drop to two entries.
func TestServer_LANReap(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	// Use short Serf LAN timeouts so a stopped server is reaped within the
	// retry window.
	configureServer := func(c *Config) {
		c.SerfFloodInterval = 100 * time.Millisecond
		c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
		c.SerfLANConfig.TombstoneTimeout = 250 * time.Millisecond
		c.SerfLANConfig.ReapInterval = 300 * time.Millisecond
	}

	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = true
		configureServer(c)
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = false
		configureServer(c)
	})
	defer os.RemoveAll(dir2)
	// s2 is shut down explicitly further below; this deferred call makes sure
	// it is also stopped when the test fails before reaching that point.
	// TestServer_StartStop requires a repeated Shutdown to succeed.
	defer s2.Shutdown()

	dir3, s3 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = false
		configureServer(c)
	})
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()

	// Try to join
	joinLAN(t, s2, s1)
	joinLAN(t, s3, s1)

	testrpc.WaitForLeader(t, s1.RPC, "dc1")
	testrpc.WaitForLeader(t, s2.RPC, "dc1")
	testrpc.WaitForLeader(t, s3.RPC, "dc1")

	retry.Run(t, func(r *retry.R) {
		require.Len(r, s1.LANMembersInAgentPartition(), 3)
		require.Len(r, s2.LANMembersInAgentPartition(), 3)
		require.Len(r, s3.LANMembersInAgentPartition(), 3)
	})

	// Check the server lookup on every server has all three servers
	retry.Run(t, func(r *retry.R) {
		require.Len(r, s1.serverLookup.Servers(), 3)
		require.Len(r, s2.serverLookup.Servers(), 3)
		require.Len(r, s3.serverLookup.Servers(), 3)
	})

	// shutdown the second server
	s2.Shutdown()

	retry.Run(t, func(r *retry.R) {
		require.Len(r, s1.LANMembersInAgentPartition(), 2)
		servers := s1.serverLookup.Servers()
		require.Len(r, servers, 2)
		// require.Equal(r, s1.config.NodeName, servers[0].Name)
	})
}
2013-12-07 01:18:09 +00:00
func TestServer_JoinWAN(t *testing.T) {
t.Parallel()
2013-12-07 01:18:09 +00:00
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
2013-12-12 00:24:34 +00:00
dir2, s2 := testServerDC(t, "dc2")
2013-12-07 01:18:09 +00:00
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join
2017-05-05 10:29:49 +00:00
joinWAN(t, s2, s1)
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.WANMembers()), 2; got != want {
r.Fatalf("got %d s1 WAN members want %d", got, want)
}
if got, want := len(s2.WANMembers()), 2; got != want {
r.Fatalf("got %d s2 WAN members want %d", got, want)
}
})
2013-12-12 00:24:34 +00:00
// Check the router has both
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.router.GetDatacenters()), 2; got != want {
r.Fatalf("got %d routes want %d", got, want)
}
if got, want := len(s2.router.GetDatacenters()), 2; got != want {
r.Fatalf("got %d datacenters want %d", got, want)
}
})
2013-12-07 01:18:09 +00:00
}
// TestServer_WANReap joins a "dc1" server (configured with short Serf WAN
// reconnect, tombstone and reap intervals) with a "dc2" server over the WAN,
// shuts the "dc2" server down, and verifies that s1 ends up with a single WAN
// member and only "dc1" in its router.
func TestServer_WANReap(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = true
		c.SerfFloodInterval = 100 * time.Millisecond
		c.SerfWANConfig.ReconnectTimeout = 250 * time.Millisecond
		c.SerfWANConfig.TombstoneTimeout = 250 * time.Millisecond
		c.SerfWANConfig.ReapInterval = 500 * time.Millisecond
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	dir2, s2 := testServerDC(t, "dc2")
	defer os.RemoveAll(dir2)
	// s2 is shut down explicitly further below; this deferred call makes sure
	// it is also stopped when the test fails before reaching that point.
	// TestServer_StartStop requires a repeated Shutdown to succeed.
	defer s2.Shutdown()

	// Try to join
	joinWAN(t, s2, s1)
	retry.Run(t, func(r *retry.R) {
		require.Len(r, s1.WANMembers(), 2)
		require.Len(r, s2.WANMembers(), 2)
	})

	// Check the router has both
	retry.Run(t, func(r *retry.R) {
		require.Len(r, s1.router.GetDatacenters(), 2)
		require.Len(r, s2.router.GetDatacenters(), 2)
	})

	// shutdown the second dc
	s2.Shutdown()

	retry.Run(t, func(r *retry.R) {
		require.Len(r, s1.WANMembers(), 1)
		datacenters := s1.router.GetDatacenters()
		require.Len(r, datacenters, 1)
		require.Equal(r, "dc1", datacenters[0])
	})
}
2017-03-15 19:26:54 +00:00
func TestServer_JoinWAN_Flood(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
2017-03-15 19:26:54 +00:00
// Set up two servers in a WAN.
dir1, s1 := testServerDCBootstrap(t, "dc1", true)
2017-03-15 19:26:54 +00:00
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc2", true)
2017-03-15 19:26:54 +00:00
defer os.RemoveAll(dir2)
defer s2.Shutdown()
2017-05-05 10:29:49 +00:00
joinWAN(t, s2, s1)
2017-03-15 19:26:54 +00:00
for _, s := range []*Server{s1, s2} {
retry.Run(t, func(r *retry.R) {
if got, want := len(s.WANMembers()), 2; got != want {
r.Fatalf("got %d WAN members want %d", got, want)
}
})
2017-03-20 23:23:40 +00:00
}
2017-03-15 19:26:54 +00:00
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
2017-03-15 19:26:54 +00:00
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Do just a LAN join for the new server and make sure it
// shows up in the WAN.
2017-05-05 10:29:49 +00:00
joinLAN(t, s3, s1)
2017-03-15 19:26:54 +00:00
for _, s := range []*Server{s1, s2, s3} {
retry.Run(t, func(r *retry.R) {
if got, want := len(s.WANMembers()), 3; got != want {
r.Fatalf("got %d WAN members for %s want %d", got, s.config.NodeName, want)
}
})
2017-03-20 23:23:40 +00:00
}
2017-03-15 19:26:54 +00:00
}
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
// This is a mirror of a similar test in agent/agent_test.go
func TestServer_JoinWAN_viaMeshGateway(t *testing.T) {
// if this test is failing because of expired certificates
// use the procedure in test/CA-GENERATION.md
if testing.Short() {
t.Skip("too slow for testing.Short")
}
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
t.Parallel()
port := freeport.GetOne(t)
gwAddr := ipaddr.FormatAddressPort("127.0.0.1", port)
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.TLSConfig.Domain = "consul"
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
c.NodeName = "bob"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = true
// tls
c.TLSConfig.InternalRPC.CAFile = "../../test/hostname/CertAuth.crt"
c.TLSConfig.InternalRPC.CertFile = "../../test/hostname/Bob.crt"
c.TLSConfig.InternalRPC.KeyFile = "../../test/hostname/Bob.key"
c.TLSConfig.InternalRPC.VerifyIncoming = true
c.TLSConfig.InternalRPC.VerifyOutgoing = true
c.TLSConfig.InternalRPC.VerifyServerHostname = true
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
// wanfed
c.ConnectMeshGatewayWANFederationEnabled = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.TLSConfig.Domain = "consul"
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
c.NodeName = "betty"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = true
// tls
c.TLSConfig.InternalRPC.CAFile = "../../test/hostname/CertAuth.crt"
c.TLSConfig.InternalRPC.CertFile = "../../test/hostname/Betty.crt"
c.TLSConfig.InternalRPC.KeyFile = "../../test/hostname/Betty.key"
c.TLSConfig.InternalRPC.VerifyIncoming = true
c.TLSConfig.InternalRPC.VerifyOutgoing = true
c.TLSConfig.InternalRPC.VerifyServerHostname = true
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
// wanfed
c.ConnectMeshGatewayWANFederationEnabled = true
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.TLSConfig.Domain = "consul"
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
c.NodeName = "bonnie"
c.Datacenter = "dc3"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = true
// tls
c.TLSConfig.InternalRPC.CAFile = "../../test/hostname/CertAuth.crt"
c.TLSConfig.InternalRPC.CertFile = "../../test/hostname/Bonnie.crt"
c.TLSConfig.InternalRPC.KeyFile = "../../test/hostname/Bonnie.key"
c.TLSConfig.InternalRPC.VerifyIncoming = true
c.TLSConfig.InternalRPC.VerifyOutgoing = true
c.TLSConfig.InternalRPC.VerifyServerHostname = true
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
// wanfed
c.ConnectMeshGatewayWANFederationEnabled = true
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// We'll use the same gateway for all datacenters since it doesn't care.
var p tcpproxy.Proxy
p.AddSNIRoute(gwAddr, "bob.server.dc1.consul", tcpproxy.To(s1.config.RPCAddr.String()))
p.AddSNIRoute(gwAddr, "betty.server.dc2.consul", tcpproxy.To(s2.config.RPCAddr.String()))
p.AddSNIRoute(gwAddr, "bonnie.server.dc3.consul", tcpproxy.To(s3.config.RPCAddr.String()))
p.AddStopACMESearch(gwAddr)
require.NoError(t, p.Start())
defer func() {
p.Close()
p.Wait()
}()
t.Logf("routing %s => %s", "bob.server.dc1.consul", s1.config.RPCAddr.String())
t.Logf("routing %s => %s", "betty.server.dc2.consul", s2.config.RPCAddr.String())
t.Logf("routing %s => %s", "bonnie.server.dc3.consul", s3.config.RPCAddr.String())
// Register this into the catalog in dc1.
{
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "bob",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway",
Service: "mesh-gateway",
Meta: map[string]string{structs.MetaWANFederationKey: "1"},
Port: port,
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
},
}
var out struct{}
require.NoError(t, s1.RPC("Catalog.Register", &arg, &out))
}
// Wait for it to make it into the gateway locator.
retry.Run(t, func(r *retry.R) {
require.NotEmpty(r, s1.gatewayLocator.PickGateway("dc1"))
})
// Seed the secondaries with the address of the primary and wait for that to
// be in their locators.
s2.RefreshPrimaryGatewayFallbackAddresses([]string{gwAddr})
retry.Run(t, func(r *retry.R) {
require.NotEmpty(r, s2.gatewayLocator.PickGateway("dc1"))
})
s3.RefreshPrimaryGatewayFallbackAddresses([]string{gwAddr})
retry.Run(t, func(r *retry.R) {
require.NotEmpty(r, s3.gatewayLocator.PickGateway("dc1"))
})
// Try to join from secondary to primary. We can't use joinWAN() because we
// are simulating proper bootstrapping and if ACLs were on we would have to
// delay gateway registration in the secondary until after one directional
// join. So this way we explicitly join secondary-to-primary as a standalone
// operation and follow it up later with a full join.
_, err := s2.JoinWAN([]string{joinAddrWAN(s1)})
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
if got, want := len(s2.WANMembers()), 2; got != want {
r.Fatalf("got %d s2 WAN members want %d", got, want)
}
})
_, err = s3.JoinWAN([]string{joinAddrWAN(s1)})
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
if got, want := len(s3.WANMembers()), 3; got != want {
r.Fatalf("got %d s3 WAN members want %d", got, want)
}
})
// Now we can register this into the catalog in dc2 and dc3.
{
arg := structs.RegisterRequest{
Datacenter: "dc2",
Node: "betty",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway",
Service: "mesh-gateway",
Meta: map[string]string{structs.MetaWANFederationKey: "1"},
Port: port,
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
},
}
var out struct{}
require.NoError(t, s2.RPC("Catalog.Register", &arg, &out))
}
{
arg := structs.RegisterRequest{
Datacenter: "dc3",
Node: "bonnie",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway",
Service: "mesh-gateway",
Meta: map[string]string{structs.MetaWANFederationKey: "1"},
Port: port,
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
},
}
var out struct{}
require.NoError(t, s3.RPC("Catalog.Register", &arg, &out))
}
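// Wait for the dc2 and dc3 gateways to make it into their local gateway
// locators and then for AE to carry them back to the primary, until every
// server (s1, s2 and s3) can pick a gateway for both dc2 and dc3.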
retry.Run(t, func(r *retry.R) {
require.NotEmpty(r, s3.gatewayLocator.PickGateway("dc2"))
require.NotEmpty(r, s2.gatewayLocator.PickGateway("dc2"))
require.NotEmpty(r, s1.gatewayLocator.PickGateway("dc2"))
require.NotEmpty(r, s3.gatewayLocator.PickGateway("dc3"))
require.NotEmpty(r, s2.gatewayLocator.PickGateway("dc3"))
require.NotEmpty(r, s1.gatewayLocator.PickGateway("dc3"))
})
// Try to join again using the standard verification method now that
// all of the plumbing is in place.
joinWAN(t, s2, s1)
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.WANMembers()), 3; got != want {
r.Fatalf("got %d s1 WAN members want %d", got, want)
}
if got, want := len(s2.WANMembers()), 3; got != want {
r.Fatalf("got %d s2 WAN members want %d", got, want)
}
})
// Check the router has all of them
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.router.GetDatacenters()), 3; got != want {
r.Fatalf("got %d routes want %d", got, want)
}
if got, want := len(s2.router.GetDatacenters()), 3; got != want {
r.Fatalf("got %d datacenters want %d", got, want)
}
if got, want := len(s3.router.GetDatacenters()), 3; got != want {
r.Fatalf("got %d datacenters want %d", got, want)
}
})
// Ensure we can do some trivial RPC in all directions.
servers := map[string]*Server{"dc1": s1, "dc2": s2, "dc3": s3}
names := map[string]string{"dc1": "bob", "dc2": "betty", "dc3": "bonnie"}
for _, srcDC := range []string{"dc1", "dc2", "dc3"} {
srv := servers[srcDC]
for _, dstDC := range []string{"dc1", "dc2", "dc3"} {
if srcDC == dstDC {
continue
}
t.Run(srcDC+" to "+dstDC, func(t *testing.T) {
arg := structs.DCSpecificRequest{
Datacenter: dstDC,
}
var out structs.IndexedNodes
require.NoError(t, srv.RPC("Catalog.ListNodes", &arg, &out))
require.Len(t, out.Nodes, 1)
node := out.Nodes[0]
require.Equal(t, dstDC, node.Datacenter)
require.Equal(t, names[dstDC], node.Node)
})
}
}
}
func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = t.Name() + "-s1"
c.Datacenter = "dc1"
c.Bootstrap = true
c.SerfFloodInterval = 100 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
s2Name := t.Name() + "-s2"
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = s2Name
c.Datacenter = "dc2"
c.Bootstrap = false
// This wan address will be expected to be seen on s1
c.SerfWANConfig.MemberlistConfig.AdvertiseAddr = "127.0.0.2"
// This lan address will be expected to be seen on s3
c.SerfLANConfig.MemberlistConfig.AdvertiseAddr = "127.0.0.3"
c.SerfFloodInterval = 100 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.NodeName = t.Name() + "-s3"
c.Datacenter = "dc2"
c.Bootstrap = true
c.SerfFloodInterval = 100 * time.Millisecond
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Join s2 to s1 on wan
2017-05-05 10:29:49 +00:00
joinWAN(t, s2, s1)
// Join s3 to s2 on lan
2017-05-05 10:29:49 +00:00
joinLAN(t, s3, s2)
2017-07-05 04:38:42 +00:00
// We rely on flood joining to fill across the LAN, so we expect s3 to
// show up on the WAN as well, even though it's not explicitly joined.
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.WANMembers()), 3; got != want {
r.Fatalf("got %d s1 WAN members want %d", got, want)
}
if got, want := len(s2.WANMembers()), 3; got != want {
r.Fatalf("got %d s2 WAN members want %d", got, want)
}
if got, want := len(s2.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d s2 LAN members want %d", got, want)
}
if got, want := len(s3.LANMembersInAgentPartition()), 2; got != want {
2017-06-26 12:22:09 +00:00
r.Fatalf("got %d s3 LAN members want %d", got, want)
}
})
// Check the router has both
retry.Run(t, func(r *retry.R) {
if len(s1.router.GetDatacenters()) != 2 {
r.Fatalf("remote consul missing")
}
if len(s2.router.GetDatacenters()) != 2 {
r.Fatalf("remote consul missing")
}
2017-08-30 17:31:36 +00:00
if len(s2.serverLookup.Servers()) != 2 {
r.Fatalf("local consul fellow s3 for s2 missing")
}
})
// Get and check the wan address of s2 from s1
var s2WanAddr string
for _, member := range s1.WANMembers() {
if member.Name == s2Name+".dc2" {
s2WanAddr = member.Addr.String()
}
}
if s2WanAddr != "127.0.0.2" {
t.Fatalf("s1 sees s2 on a wrong address: %s, expecting: %s", s2WanAddr, "127.0.0.2")
}
// Get and check the lan address of s2 from s3
var s2LanAddr string
for _, lanmember := range s3.LANMembersInAgentPartition() {
if lanmember.Name == s2Name {
s2LanAddr = lanmember.Addr.String()
}
}
if s2LanAddr != "127.0.0.3" {
t.Fatalf("s3 sees s2 on a wrong address: %s, expecting: %s", s2LanAddr, "127.0.0.3")
}
}
func TestServer_LeaveLeader(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
2014-01-30 21:13:29 +00:00
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 3))
r.Check(wantPeers(s2, 3))
r.Check(wantPeers(s3, 3))
})
// Issue a leave to the leader
var leader *Server
switch {
case s1.IsLeader():
leader = s1
case s2.IsLeader():
leader = s2
case s3.IsLeader():
leader = s3
default:
t.Fatal("no leader")
}
if err := leader.Leave(); err != nil {
t.Fatal("leave failed: ", err)
}
// Should lose a peer
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 2))
r.Check(wantPeers(s2, 2))
r.Check(wantPeers(s3, 2))
})
}
func TestServer_Leave(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
// Second server not in bootstrap mode
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
// Issue a leave to the non-leader
var nonleader *Server
switch {
case s1.IsLeader():
nonleader = s2
case s2.IsLeader():
nonleader = s1
default:
t.Fatal("no leader")
}
if err := nonleader.Leave(); err != nil {
t.Fatal("leave failed: ", err)
}
// Should lose a peer
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 1))
r.Check(wantPeers(s2, 1))
})
}
// TestServer_RPC starts a single test server and checks that a Status.Ping
// RPC issued through it completes without error.
func TestServer_RPC(t *testing.T) {
	t.Parallel()

	dataDir, srv := testServer(t)
	defer os.RemoveAll(dataDir)
	defer srv.Shutdown()

	var reply struct{}
	err := srv.RPC("Status.Ping", struct{}{}, &reply)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
}
2014-04-07 21:36:32 +00:00
// TestServer_RPC_MetricsIntercept_Off proves that we can turn off net/rpc interceptors all together.
func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
storage := &sync.Map{} // string -> float32
keyMakingFunc := func(key []string, labels []metrics.Label) string {
allKey := strings.Join(key, "+")
for _, label := range labels {
if label.Name == "method" {
allKey = allKey + "+" + label.Value
}
}
return allKey
}
simpleRecorderFunc := func(key []string, val float32, labels []metrics.Label) {
storage.Store(keyMakingFunc(key, labels), val)
}
t.Run("test no net/rpc interceptor metric with nil func", func(t *testing.T) {
_, conf := testServerConfig(t)
deps := newDefaultDeps(t, conf)
// "disable" metrics net/rpc interceptor
deps.GetNetRPCInterceptorFunc = nil
// "hijack" the rpc recorder for asserts;
// note that there will be "internal" net/rpc calls made
// that will still show up; those don't go thru the net/rpc interceptor;
// see consul.agent.rpc.middleware.RPCTypeInternal for context
deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
// for the purposes of this test, we don't need isLeader or localDC
return &middleware.RequestRecorder{
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
RecorderFunc: simpleRecorderFunc,
}
}
2022-09-01 17:32:11 +00:00
s1, err := NewServer(conf, deps, grpc.NewServer())
if err != nil {
t.Fatalf("err: %v", err)
}
t.Cleanup(func() { s1.Shutdown() })
var out struct{}
if err := s1.RPC("Status.Ping", struct{}{}, &out); err != nil {
t.Fatalf("err: %v", err)
}
key := keyMakingFunc(middleware.OneTwelveRPCSummary[0].Name, []metrics.Label{{Name: "method", Value: "Status.Ping"}})
if _, ok := storage.Load(key); ok {
t.Fatalf("Did not expect to find key %s in the metrics log, ", key)
}
})
t.Run("test no net/rpc interceptor metric with func that gives nil", func(t *testing.T) {
_, conf := testServerConfig(t)
deps := newDefaultDeps(t, conf)
// "hijack" the rpc recorder for asserts;
// note that there will be "internal" net/rpc calls made
// that will still show up; those don't go thru the net/rpc interceptor;
// see consul.agent.rpc.middleware.RPCTypeInternal for context
deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
// for the purposes of this test, we don't need isLeader or localDC
return &middleware.RequestRecorder{
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
RecorderFunc: simpleRecorderFunc,
}
}
deps.GetNetRPCInterceptorFunc = func(recorder *middleware.RequestRecorder) rpc.ServerServiceCallInterceptor {
return nil
}
2022-09-01 17:32:11 +00:00
s2, err := NewServer(conf, deps, grpc.NewServer())
if err != nil {
t.Fatalf("err: %v", err)
}
t.Cleanup(func() { s2.Shutdown() })
if err != nil {
t.Fatalf("err: %v", err)
}
var out struct{}
if err := s2.RPC("Status.Ping", struct{}{}, &out); err != nil {
t.Fatalf("err: %v", err)
}
key := keyMakingFunc(middleware.OneTwelveRPCSummary[0].Name, []metrics.Label{{Name: "method", Value: "Status.Ping"}})
if _, ok := storage.Load(key); ok {
t.Fatalf("Did not expect to find key %s in the metrics log, ", key)
}
})
}
// TestServer_RPC_RequestRecorder proves that we cannot make a server without a valid RequestRecorder provider func
// or a non nil RequestRecorder.
func TestServer_RPC_RequestRecorder(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Run("test nil func provider", func(t *testing.T) {
_, conf := testServerConfig(t)
deps := newDefaultDeps(t, conf)
deps.NewRequestRecorderFunc = nil
2022-09-01 17:32:11 +00:00
s1, err := NewServer(conf, deps, grpc.NewServer())
require.Error(t, err, "need err when provider func is nil")
require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider")
t.Cleanup(func() {
if s1 != nil {
s1.Shutdown()
}
})
})
t.Run("test nil RequestRecorder", func(t *testing.T) {
_, conf := testServerConfig(t)
deps := newDefaultDeps(t, conf)
deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
return nil
}
2022-09-01 17:32:11 +00:00
s2, err := NewServer(conf, deps, grpc.NewServer())
require.Error(t, err, "need err when RequestRecorder is nil")
require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder")
t.Cleanup(func() {
if s2 != nil {
s2.Shutdown()
}
})
})
}
// TestServer_RPC_MetricsIntercept installs a recording RequestRecorder and a
// custom net/rpc interceptor, then asserts that a Status.Ping call shows up in
// the recorded observations under the expected label set.
func TestServer_RPC_MetricsIntercept(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	_, conf := testServerConfig(t)
	deps := newDefaultDeps(t, conf)

	// Observations are captured the same way interceptors_test.go does it.
	observed := &sync.Map{} // string -> float32

	// compoundKey joins the metric key and then every label value with "+".
	compoundKey := func(key []string, labels []metrics.Label) string {
		segments := make([]string, 0, len(labels)+1)
		segments = append(segments, strings.Join(key, "+"))
		for _, l := range labels {
			segments = append(segments, l.Value)
		}
		return strings.Join(segments, "+")
	}

	record := func(key []string, val float32, labels []metrics.Label) {
		observed.Store(compoundKey(key, labels), val)
	}

	deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
		// isLeader and localDC are not needed for this test
		rec := &middleware.RequestRecorder{
			Logger:       hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
			RecorderFunc: record,
		}
		return rec
	}

	deps.GetNetRPCInterceptorFunc = func(recorder *middleware.RequestRecorder) rpc.ServerServiceCallInterceptor {
		return func(reqServiceMethod string, argv, replyv reflect.Value, handler func() error) {
			started := time.Now()
			failed := handler() != nil

			recorder.Record(reqServiceMethod, "test", started, argv.Interface(), failed)
		}
	}

	s, err := newServerWithDeps(t, conf, deps)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s.Shutdown()
	testrpc.WaitForTestAgent(t, s.RPC, "dc1")

	t.Run("test happy path for metrics interceptor", func(t *testing.T) {
		var reply struct{}
		if err := s.RPC("Status.Ping", struct{}{}, &reply); err != nil {
			t.Fatalf("err: %v", err)
		}

		wantLabels := []metrics.Label{
			{Name: "method", Value: "Status.Ping"},
			{Name: "errored", Value: "false"},
			{Name: "request_type", Value: "read"},
			{Name: "rpc_type", Value: "test"},
			{Name: "server_role", Value: "unreported"},
		}

		// e.g. "rpc+server+call+Status.Ping+false+read+test+unreported"
		key := compoundKey(middleware.OneTwelveRPCSummary[0].Name, wantLabels)

		_, found := observed.Load(key)
		if !found {
			t.Fatalf("Did not find key %s in the metrics log, ", key)
		}
	})
}
2014-04-07 21:36:32 +00:00
func TestServer_JoinLAN_TLS(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
_, conf1 := testServerConfig(t)
conf1.TLSConfig.InternalRPC.VerifyIncoming = true
conf1.TLSConfig.InternalRPC.VerifyOutgoing = true
2014-04-07 21:36:32 +00:00
configureTLS(conf1)
2020-07-29 20:05:51 +00:00
s1, err := newServer(t, conf1)
2014-04-07 21:36:32 +00:00
if err != nil {
t.Fatalf("err: %v", err)
}
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
2014-04-07 21:36:32 +00:00
_, conf2 := testServerConfig(t)
2014-04-07 21:36:32 +00:00
conf2.Bootstrap = false
conf2.TLSConfig.InternalRPC.VerifyIncoming = true
conf2.TLSConfig.InternalRPC.VerifyOutgoing = true
2014-04-07 21:36:32 +00:00
configureTLS(conf2)
2020-07-29 20:05:51 +00:00
s2, err := newServer(t, conf2)
2014-04-07 21:36:32 +00:00
if err != nil {
t.Fatalf("err: %v", err)
}
defer s2.Shutdown()
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
testrpc.WaitForTestAgent(t, s2.RPC, "dc1")
// Verify Raft has established a peer
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft([]*Server{s1, s2}))
})
2014-04-07 21:36:32 +00:00
}
func TestServer_Expect(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
// All test servers should be in expect=3 mode, except for the 3rd one,
// but one with expect=0 can cause a bootstrap to occur from the other
// servers as currently implemented.
dir1, s1 := testServerDCExpect(t, "dc1", 3)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCExpect(t, "dc1", 3)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
2014-06-18 23:15:28 +00:00
dir3, s3 := testServerDCExpect(t, "dc1", 0)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
dir4, s4 := testServerDCExpect(t, "dc1", 3)
defer os.RemoveAll(dir4)
defer s4.Shutdown()
// Join the first two servers.
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
// Should have no peers yet since the bootstrap didn't occur.
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 0))
r.Check(wantPeers(s2, 0))
})
// Join the third node.
2017-05-05 10:29:49 +00:00
joinLAN(t, s3, s1)
// Now we have three servers so we should bootstrap.
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 3))
r.Check(wantPeers(s2, 3))
r.Check(wantPeers(s3, 3))
})
2019-07-12 15:52:26 +00:00
// Join the fourth node.
joinLAN(t, s4, s1)
// Wait for the new server to see itself added to the cluster.
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft([]*Server{s1, s2, s3, s4}))
})
}
// Should not trigger bootstrap and new election when s3 joins, since cluster exists
func TestServer_AvoidReBootstrap(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
2019-07-12 15:52:26 +00:00
dir1, s1 := testServerDCExpect(t, "dc1", 2)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCExpect(t, "dc1", 0)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCExpect(t, "dc1", 2)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Join the first two servers
joinLAN(t, s2, s1)
// Make sure a leader is elected, grab the current term and then add in
2019-07-12 15:52:26 +00:00
// the third server.
testrpc.WaitForLeader(t, s1.RPC, "dc1")
termBefore := s1.raft.Stats()["last_log_term"]
2019-07-12 15:52:26 +00:00
joinLAN(t, s3, s1)
// Wait for the new server to see itself added to the cluster.
retry.Run(t, func(r *retry.R) {
2019-07-12 15:52:26 +00:00
r.Check(wantRaft([]*Server{s1, s2, s3}))
})
// Make sure there's still a leader and that the term didn't change,
// so we know an election didn't occur.
testrpc.WaitForLeader(t, s1.RPC, "dc1")
termAfter := s1.raft.Stats()["last_log_term"]
if termAfter != termBefore {
t.Fatalf("looks like an election took place")
}
}
2018-09-20 00:41:36 +00:00
func TestServer_Expect_NonVoters(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
2018-09-20 00:41:36 +00:00
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 2
c.ReadReplica = true
})
2018-09-20 00:41:36 +00:00
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCExpect(t, "dc1", 2)
2018-09-20 00:41:36 +00:00
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCExpect(t, "dc1", 2)
2018-09-20 00:41:36 +00:00
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Join the first two servers.
2018-09-20 00:41:36 +00:00
joinLAN(t, s2, s1)
// Should have no peers yet since the bootstrap didn't occur.
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 0))
r.Check(wantPeers(s2, 0))
})
// Join the third node.
joinLAN(t, s3, s1)
2018-09-20 00:41:36 +00:00
// Now we have three servers so we should bootstrap.
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 2))
r.Check(wantPeers(s2, 2))
r.Check(wantPeers(s3, 2))
2018-09-20 00:41:36 +00:00
})
// Make sure a leader is elected
testrpc.WaitForLeader(t, s1.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft([]*Server{s1, s2, s3}))
2018-09-20 00:41:36 +00:00
})
}
func TestServer_BadExpect(t *testing.T) {
t.Parallel()
// this one is in expect=3 mode
dir1, s1 := testServerDCExpect(t, "dc1", 3)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
// this one is in expect=2 mode
dir2, s2 := testServerDCExpect(t, "dc1", 2)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// and this one is in expect=3 mode
dir3, s3 := testServerDCExpect(t, "dc1", 3)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
// should have no peers yet
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 0))
r.Check(wantPeers(s2, 0))
})
// join the third node
2017-05-05 10:29:49 +00:00
joinLAN(t, s3, s1)
// should still have no peers (because s2 is in expect=2 mode)
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 0))
r.Check(wantPeers(s2, 0))
r.Check(wantPeers(s3, 0))
})
}
// fakeGlobalResp is a stateless stand-in response type used by tests; it
// carries no data.
type fakeGlobalResp struct{}

// Add accepts any value and discards it.
func (*fakeGlobalResp) Add(_ interface{}) {}

// New returns an empty struct value wrapped in an interface.
func (*fakeGlobalResp) New() interface{} {
	var empty struct{}
	return empty
}
func TestServer_keyringRPCs(t *testing.T) {
t.Parallel()
dir1, s1 := testServerDC(t, "dc1")
defer os.RemoveAll(dir1)
defer s1.Shutdown()
retry.Run(t, func(r *retry.R) {
if len(s1.router.GetDatacenters()) != 1 {
r.Fatal(nil)
}
})
2014-12-05 05:32:59 +00:00
// Check that an error from a remote DC is returned
_, err := s1.keyringRPCs("Bad.Method", nil, []string{s1.config.Datacenter})
if err == nil {
t.Fatalf("should have errored")
}
if !strings.Contains(err.Error(), "Bad.Method") {
t.Fatalf("unexpected error: %s", err)
}
}
func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
joinLAN(t, s1, s2)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft([]*Server{s1, s2}))
})
// Have s2 make an RPC call to s1
var leader *metadata.Server
2017-08-30 17:31:36 +00:00
for _, server := range s2.serverLookup.Servers() {
if server.Name == s1.config.NodeName {
leader = server
}
}
2017-05-24 19:26:42 +00:00
if leader == nil {
t.Fatal("no leader")
}
return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr)
}
// TestServer_TLSToNoTLS checks that a server with TLS material configured
// (but no verification flags) can successfully ping a server that has no TLS
// configured at all.
func TestServer_TLSToNoTLS(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	// Plain server, no TLS.
	dir1, s1 := testServer(t)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	// Non-bootstrapped peer with CA, certificate and key configured.
	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Bootstrap = false

		tlsCfg := &c.TLSConfig.InternalRPC
		tlsCfg.CAFile = "../../test/client_certs/rootca.crt"
		tlsCfg.CertFile = "../../test/client_certs/server.crt"
		tlsCfg.KeyFile = "../../test/client_certs/server.key"
	})
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	ok, err := testVerifyRPC(s1, s2, t)
	if err != nil {
		t.Fatal(err)
	}
	if !ok {
		t.Fatalf("bad: %v", ok)
	}
}
// TestServer_TLSForceOutgoingToNoTLS checks that a server with VerifyOutgoing
// enabled cannot ping a server that has no TLS configured: the ping must fail
// with an error containing "remote error: tls".
func TestServer_TLSForceOutgoingToNoTLS(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	// Set up a server with no TLS configured
	dir1, s1 := testServer(t)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	// Add a second server with TLS and VerifyOutgoing set
	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Bootstrap = false
		c.TLSConfig.InternalRPC.CAFile = "../../test/client_certs/rootca.crt"
		c.TLSConfig.InternalRPC.CertFile = "../../test/client_certs/server.crt"
		c.TLSConfig.InternalRPC.KeyFile = "../../test/client_certs/server.key"
		c.TLSConfig.InternalRPC.VerifyOutgoing = true
	})
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	_, err := testVerifyRPC(s1, s2, t)
	if err == nil || !strings.Contains(err.Error(), "remote error: tls") {
		// Report what actually came back so a mismatch can be diagnosed.
		t.Fatalf("should fail with a TLS remote error, got: %v", err)
	}
}
// TestServer_TLSToFullVerify checks that two servers sharing the same CA,
// certificate and key can ping each other when the first one additionally
// has VerifyOutgoing enabled.
func TestServer_TLSToFullVerify(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	// First server: TLS material plus VerifyOutgoing.
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		tlsCfg := &c.TLSConfig.InternalRPC
		tlsCfg.CAFile = "../../test/client_certs/rootca.crt"
		tlsCfg.CertFile = "../../test/client_certs/server.crt"
		tlsCfg.KeyFile = "../../test/client_certs/server.key"
		tlsCfg.VerifyOutgoing = true
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	// Second server: same TLS material, not bootstrapped.
	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Bootstrap = false

		tlsCfg := &c.TLSConfig.InternalRPC
		tlsCfg.CAFile = "../../test/client_certs/rootca.crt"
		tlsCfg.CertFile = "../../test/client_certs/server.crt"
		tlsCfg.KeyFile = "../../test/client_certs/server.key"
	})
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	ok, err := testVerifyRPC(s1, s2, t)
	if err != nil {
		t.Fatal(err)
	}
	if !ok {
		t.Fatalf("bad: %v", ok)
	}
}
// TestServer_RevokeLeadershipIdempotent calls revokeLeadership twice in a row
// on a server that has a leader; the test passes as long as neither call
// panics or blocks.
func TestServer_RevokeLeadershipIdempotent(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	dataDir, srv := testServer(t)
	defer os.RemoveAll(dataDir)
	defer srv.Shutdown()

	testrpc.WaitForLeader(t, srv.RPC, "dc1")

	// Revoking a second time must be harmless.
	for attempt := 0; attempt < 2; attempt++ {
		srv.revokeLeadership()
	}
}
// TestServer_ReloadConfig starts a server with custom RPC rate limits and a
// custom raft TrailingLogs, reloads it with a new ReloadableConfig, and then
// verifies that the bootstrap config entry was stored, the rate limiter was
// replaced, and the raft reloadable settings reflect the new values (with
// unset fields back at their defaults).
func TestServer_ReloadConfig(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	// "bar" is spelled as an explicit int64 so that the require.Equal against
	// the stored entry below does not trip over a data type mismatch.
	proxyDefaults := &structs.ProxyConfigEntry{
		Kind: structs.ProxyDefaults,
		Name: structs.ProxyConfigGlobal,
		Config: map[string]interface{}{
			"foo": "bar",
			"bar": int64(1),
		},
	}

	dir1, s := testServerWithConfig(t, func(c *Config) {
		c.Build = "1.5.0"
		c.RPCRateLimit = 500
		c.RPCMaxBurst = 5000
		// Only one raft parameter starts out non-default.
		c.RaftConfig.TrailingLogs = 1234
	})
	defer os.RemoveAll(dir1)
	defer s.Shutdown()

	testrpc.WaitForTestAgent(t, s.RPC, "dc1")

	// currentLimiter fetches whatever limiter the server holds right now.
	currentLimiter := func() *rate.Limiter {
		return s.rpcLimiter.Load().(*rate.Limiter)
	}

	// Initial limiter reflects the startup config.
	initial := currentLimiter()
	require.Equal(t, rate.Limit(500), initial.Limit())
	require.Equal(t, 5000, initial.Burst())

	reload := ReloadableConfig{
		RPCRateLimit:         1000,
		RPCMaxBurst:          10000,
		ConfigEntryBootstrap: []structs.ConfigEntry{proxyDefaults},
		// Zero value: as if the custom setting had been removed from the
		// config file, so it should fall back to the default.
		RaftTrailingLogs: 0,
		// A different raft parameter now gets a custom value.
		RaftSnapshotThreshold: 4321,
		// Remaining raft fields stay at their defaults.
	}
	require.NoError(t, s.ReloadConfig(reload))

	// The bootstrap config entry must be present in the state store.
	_, entry, err := s.fsm.State().ConfigEntry(nil, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMetaInDefaultPartition())
	require.NoError(t, err)
	require.NotNil(t, entry)

	stored, isProxyEntry := entry.(*structs.ProxyConfigEntry)
	require.True(t, isProxyEntry)
	require.Equal(t, proxyDefaults.Kind, stored.Kind)
	require.Equal(t, proxyDefaults.Name, stored.Name)
	require.Equal(t, proxyDefaults.Config, stored.Config)

	// The rate limiter must carry the reloaded values.
	reloaded := currentLimiter()
	require.Equal(t, rate.Limit(1000), reloaded.Limit())
	require.Equal(t, 10000, reloaded.Burst())

	// The raft reloadable config must match the reload request.
	defaults := DefaultConfig()
	raftCfg := s.raft.ReloadableConfig()
	require.Equal(t, uint64(4321), raftCfg.SnapshotThreshold,
		"should have be reloaded to new value")
	require.Equal(t, defaults.RaftConfig.SnapshotInterval, raftCfg.SnapshotInterval,
		"should have remained the default interval")
	require.Equal(t, defaults.RaftConfig.TrailingLogs, raftCfg.TrailingLogs,
		"should have reloaded to default trailing_logs")

	// Updating each raft field on its own is not exercised by this test.
}
// TestServer_computeRaftReloadableConfig is a table-driven check that
// computeRaftReloadableConfig uses the value from ReloadableConfig when a field
// is set and falls back to the DefaultConfig raft value when it is left zero.
func TestServer_computeRaftReloadableConfig(t *testing.T) {
	defaults := DefaultConfig().RaftConfig

	// expect returns the all-defaults raft config with the given overrides
	// applied on top; a nil override leaves the defaults untouched.
	expect := func(override func(*raft.ReloadableConfig)) raft.ReloadableConfig {
		cfg := raft.ReloadableConfig{
			SnapshotThreshold: defaults.SnapshotThreshold,
			SnapshotInterval:  defaults.SnapshotInterval,
			TrailingLogs:      defaults.TrailingLogs,
			ElectionTimeout:   defaults.ElectionTimeout,
			HeartbeatTimeout:  defaults.HeartbeatTimeout,
		}
		if override != nil {
			override(&cfg)
		}
		return cfg
	}

	type testCase struct {
		name string
		rc   ReloadableConfig
		want raft.ReloadableConfig
	}

	cases := []testCase{
		{
			// The common path: reload receives a ReloadableConfig built from a
			// RuntimeConfig whose fields are all zero. Startup only lets
			// non-zero runtime fields override defaults, and reload has to
			// behave the same way.
			name: "Still defaults",
			rc:   ReloadableConfig{},
			want: expect(nil),
		},
		{
			name: "Threshold set",
			rc:   ReloadableConfig{RaftSnapshotThreshold: 123456},
			want: expect(func(c *raft.ReloadableConfig) {
				c.SnapshotThreshold = 123456
			}),
		},
		{
			name: "interval set",
			rc:   ReloadableConfig{RaftSnapshotInterval: 13 * time.Minute},
			want: expect(func(c *raft.ReloadableConfig) {
				c.SnapshotInterval = 13 * time.Minute
			}),
		},
		{
			name: "trailing logs set",
			rc:   ReloadableConfig{RaftTrailingLogs: 78910},
			want: expect(func(c *raft.ReloadableConfig) {
				c.TrailingLogs = 78910
			}),
		},
		{
			name: "all set",
			rc: ReloadableConfig{
				RaftSnapshotThreshold: 123456,
				RaftSnapshotInterval:  13 * time.Minute,
				RaftTrailingLogs:      78910,
				ElectionTimeout:       300 * time.Millisecond,
				HeartbeatTimeout:      400 * time.Millisecond,
			},
			want: expect(func(c *raft.ReloadableConfig) {
				c.SnapshotThreshold = 123456
				c.SnapshotInterval = 13 * time.Minute
				c.TrailingLogs = 78910
				c.ElectionTimeout = 300 * time.Millisecond
				c.HeartbeatTimeout = 400 * time.Millisecond
			}),
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.want, computeRaftReloadableConfig(tc.rc))
		})
	}
}
func TestServer_RPC_RateLimit(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
_, conf1 := testServerConfig(t)
conf1.RPCRateLimit = 2
conf1.RPCMaxBurst = 2
2020-07-29 20:05:51 +00:00
s1, err := newServer(t, conf1)
if err != nil {
t.Fatalf("err: %v", err)
}
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
var out struct{}
if err := s1.RPC("Status.Ping", struct{}{}, &out); err != structs.ErrRPCRateExceeded {
r.Fatalf("err: %v", err)
}
})
}
// TestServer_Peering_LeadershipCheck verifies that the peering backend of a
// follower reports the leader's RPC address through GetLeaderAddress.
func TestServer_Peering_LeadershipCheck(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	// Two servers: s1 is expected to lead, s2 (not bootstrapped) to follow.
	_, leaderConf := testServerConfig(t)
	s1, err := newServer(t, leaderConf)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	_, followerConf := testServerConfig(t)
	followerConf.Bootstrap = false
	s2, err := newServer(t, followerConf)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	joinLAN(t, s2, s1)

	// Wait for raft to list both servers.
	pair := []*Server{s1, s2}
	retry.Run(t, func(r *retry.R) {
		r.Check(wantRaft(pair))
	})

	for _, srv := range pair {
		testrpc.WaitForLeader(t, srv.RPC, "dc1")
	}
	waitForLeaderEstablishment(t, s1)

	// Once leadership is established, s2's peering backend should hand out
	// s1's RPC address as the leader address.
	got := s2.peeringBackend.GetLeaderAddress()
	require.Equal(t, s1.config.RPCAddr.String(), got)

	// Guard against setup mistakes: it must not be s2's own address.
	require.NotEqual(t, s2.config.RPCAddr.String(), got)
}
// TestServer_hcpManager starts a server wired to a mock HCP client and checks
// that the server creates an HCP manager and pushes a server status whose ID
// is the node ID and whose LAN address is the advertised RPC address.
func TestServer_hcpManager(t *testing.T) {
	_, conf1 := testServerConfig(t)
	conf1.BootstrapExpect = 1
	// Advertise a distinctive address so it can be recognised in the status.
	conf1.RPCAdvertise = &net.TCPAddr{IP: []byte{127, 0, 0, 2}, Port: conf1.RPCAddr.Port}

	hcp1 := hcp.NewMockClient(t)
	hcp1.EXPECT().PushServerStatus(mock.Anything, mock.MatchedBy(func(status *hcp.ServerStatus) bool {
		return status.ID == string(conf1.NodeID)
	})).Run(func(ctx context.Context, status *hcp.ServerStatus) {
		// Expected value first, so a failure labels the two sides correctly.
		require.Equal(t, "127.0.0.2", status.LanAddress)
	}).Call.Return(nil)

	deps1 := newDefaultDeps(t, conf1)
	deps1.HCP.Client = hcp1

	s1, err := newServerWithDeps(t, conf1, deps1)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	require.NotNil(t, s1.hcpManager)
	waitForLeaderEstablishment(t, s1)
	hcp1.AssertExpectations(t)
}