open-nomad/api/internal/testutil/server.go

408 lines
11 KiB
Go

package testutil
// TestServer is a test helper. It uses a fork/exec model to create
// a test Nomad server instance in the background and initialize it
// with some data and/or services. The test server can then be used
// to run a unit test, and offers an easy API to tear itself down
// when the test has completed. The only prerequisite is to have a nomad
// binary available on the $PATH.
//
// This package does not use Nomad's official API client. This is
// because we use TestServer to test the API client, which would
// otherwise cause an import cycle.
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"time"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/nomad/api/internal/testutil/discover"
testing "github.com/mitchellh/go-testing-interface"
"github.com/shoenig/test/must"
"github.com/shoenig/test/portal"
"github.com/shoenig/test/wait"
)
// TestServerConfig is the main server configuration struct. NewTestServer
// marshals it to JSON and hands the result to the nomad agent as its config
// file; fields tagged `json:"-"` are never written out and only steer the
// test harness itself.
type TestServerConfig struct {
	// NodeName is written under the "name" key; omitted when empty.
	NodeName string `json:"name,omitempty"`
	// DataDir is the agent's data directory. NewTestServer points it at a
	// fresh temporary directory before the callback runs, and Stop removes
	// whatever directory it names.
	DataDir string `json:"data_dir,omitempty"`
	// Region is written under the "region" key; omitted when empty.
	Region string `json:"region,omitempty"`
	// DisableCheckpoint is written under "disable_update_check". There is no
	// omitempty, so an explicit false is emitted as well.
	DisableCheckpoint bool `json:"disable_update_check"`
	// LogLevel is written under the "log_level" key; omitted when empty.
	LogLevel string `json:"log_level,omitempty"`
	// Consul is the "consul" section; omitted when nil.
	Consul *Consul `json:"consul,omitempty"`
	// AdvertiseAddrs is the "advertise" section; omitted when nil.
	AdvertiseAddrs *Advertise `json:"advertise,omitempty"`
	// Ports is the "ports" section. NewTestServer dereferences it to build
	// the HTTP and Serf addresses, so it must not be nil by then.
	Ports *PortsConfig `json:"ports,omitempty"`
	// Server is the "server" section. NewTestServer dereferences it to pick
	// which readiness wait to run, so it must not be nil by then.
	Server *ServerConfig `json:"server,omitempty"`
	// Client is the "client" section. In dev mode NewTestServer writes into
	// its Options map, so it must not be nil when DevMode is set.
	Client *ClientConfig `json:"client,omitempty"`
	// Vault is the "vault" section; omitted when nil.
	Vault *VaultConfig `json:"vault,omitempty"`
	// ACL is the "acl" section; omitted when nil.
	ACL *ACLConfig `json:"acl,omitempty"`
	// Telemetry is the "telemetry" section; omitted when nil.
	Telemetry *Telemetry `json:"telemetry,omitempty"`
	// DevMode is not serialized. When true, NewTestServer adds the -dev flag
	// to the agent command line and additionally waits for the client.
	DevMode bool `json:"-"`
	// Stdout and Stderr are not serialized. They receive the agent process's
	// output; when nil, NewTestServer falls back to os.Stdout / os.Stderr.
	Stdout, Stderr io.Writer `json:"-"`
}
// Consul is used to configure the communication with Consul. Every field is
// omitted from the JSON config when empty.
type Consul struct {
	// Address is written under the "address" key.
	Address string `json:"address,omitempty"`
	// Auth is written under the "auth" key.
	// NOTE(review): presumably HTTP basic-auth credentials — confirm against
	// the agent's consul configuration.
	Auth string `json:"auth,omitempty"`
	// Token is written under the "token" key.
	Token string `json:"token,omitempty"`
}
// Advertise is used to configure the addresses to advertise. There is one
// entry per agent endpoint; each is omitted from the JSON config when empty.
type Advertise struct {
	// HTTP is the advertised address for the HTTP endpoint.
	HTTP string `json:"http,omitempty"`
	// RPC is the advertised address for the RPC endpoint.
	RPC string `json:"rpc,omitempty"`
	// Serf is the advertised address for the Serf endpoint.
	Serf string `json:"serf,omitempty"`
}
// PortsConfig is used to configure the network ports we use. A zero port is
// omitted from the JSON config.
type PortsConfig struct {
	// HTTP is the HTTP API port; NewTestServer combines it with 127.0.0.1 to
	// form TestServer.HTTPAddr.
	HTTP int `json:"http,omitempty"`
	// RPC is the RPC port.
	RPC int `json:"rpc,omitempty"`
	// Serf is the Serf port; NewTestServer combines it with 127.0.0.1 to
	// form TestServer.SerfAddr.
	Serf int `json:"serf,omitempty"`
}
// ServerConfig is used to configure the nomad server.
type ServerConfig struct {
	// Enabled is always written to the config (no omitempty).
	Enabled bool `json:"enabled"`
	// BootstrapExpect is always written to the config (no omitempty).
	// NewTestServer waits on the leader endpoint only when Enabled is true
	// and this value is non-zero; otherwise it only waits for the HTTP API.
	BootstrapExpect int `json:"bootstrap_expect"`
	// RaftProtocol is written under "raft_protocol"; omitted when zero.
	RaftProtocol int `json:"raft_protocol,omitempty"`
}
// ClientConfig is used to configure the client
type ClientConfig struct {
	// Enabled is always written to the config (no omitempty).
	Enabled bool `json:"enabled"`
	// Options is a free-form string map written under "options"; omitted
	// when empty. In dev mode NewTestServer allocates it if nil and sets
	// "test.tighten_network_timeouts" to "true".
	Options map[string]string `json:"options,omitempty"`
}
// VaultConfig is used to configure Vault
type VaultConfig struct {
	// Enabled is always written to the config (no omitempty), so an explicit
	// false reaches the agent as well.
	Enabled bool `json:"enabled"`
}
// ACLConfig is used to configure ACLs
type ACLConfig struct {
	// Enabled is always written to the config (no omitempty), so an explicit
	// false reaches the agent as well.
	Enabled bool `json:"enabled"`
}
// Telemetry is used to configure the Nomad telemetry setup. The default
// server config leaves the telemetry section unset.
type Telemetry struct {
	// PrometheusMetrics is always written under "prometheus_metrics"
	// (no omitempty).
	PrometheusMetrics bool `json:"prometheus_metrics"`
}
// ServerConfigCallback is a function type which can be passed to
// NewTestServer to modify the server config. It receives the default
// configuration (with DataDir already set) and mutates it in place before
// the config is serialized and the agent is started.
type ServerConfigCallback func(c *TestServerConfig)
// defaultServerConfig builds the baseline TestServerConfig for an agent that
// runs as a single bootstrapping server.
//
// Three free ports are grabbed for the HTTP, RPC and Serf endpoints (in that
// order); the node name is derived from the HTTP port. The log level is
// "ERROR" unless the NOMAD_TEST_LOG_LEVEL environment variable is non-empty.
// The client, Vault and ACL sections are present but disabled.
func defaultServerConfig(t testing.T) *TestServerConfig {
	free := portal.New(t).Grab(3)
	httpPort, rpcPort, serfPort := free[0], free[1], free[2]

	level := os.Getenv("NOMAD_TEST_LOG_LEVEL")
	if level == "" {
		level = "ERROR"
	}

	cfg := &TestServerConfig{
		NodeName:          fmt.Sprintf("node-%d", httpPort),
		DisableCheckpoint: true,
		LogLevel:          level,
	}
	cfg.Ports = &PortsConfig{HTTP: httpPort, RPC: rpcPort, Serf: serfPort}
	cfg.Server = &ServerConfig{Enabled: true, BootstrapExpect: 1}
	cfg.Client = &ClientConfig{Enabled: false}
	cfg.Vault = &VaultConfig{Enabled: false}
	cfg.ACL = &ACLConfig{Enabled: false}
	return cfg
}
// TestServer is the main server wrapper struct. It ties a running nomad agent
// process to the configuration it was started with and to an HTTP client for
// talking to it.
type TestServer struct {
	// cmd is the agent process started by NewTestServer; Stop signals and
	// waits on it.
	cmd *exec.Cmd
	// Config is the configuration the agent was started with. Stop removes
	// the directory named by Config.DataDir.
	Config *TestServerConfig
	// t is the test handle used for assertions and log output.
	t testing.T
	// HTTPAddr is the agent's HTTP address, "127.0.0.1:<Config.Ports.HTTP>".
	HTTPAddr string
	// SerfAddr is the agent's Serf address, "127.0.0.1:<Config.Ports.Serf>".
	SerfAddr string
	// HTTPClient is the client used for all requests to the agent;
	// NewTestServer sets a 10 second timeout on it.
	HTTPClient *http.Client
}
// NewTestServer creates a new TestServer, and makes a call to
// an optional callback function to modify the configuration.
//
// The test is skipped when no nomad executable can be discovered. Otherwise a
// temporary data directory is created, the (possibly callback-modified)
// default configuration is written into it as JSON, and `nomad agent` is
// started against that file (with -dev when Config.DevMode is set). The
// function blocks until the agent answers on its HTTP API: on the leader
// endpoint when the server is enabled with a non-zero BootstrapExpect, on the
// metrics endpoint otherwise, and additionally on the nodes endpoint in dev
// mode.
//
// The callback must leave Ports and Server non-nil, and Client non-nil when
// DevMode is set, because those sections are dereferenced here.
//
// If any step fails, the test is aborted before a TestServer is handed back,
// so the caller never gets the chance to call Stop. In that case the agent
// process (if it was started) is killed and the temporary directory is
// removed here, so a failed start-up leaves nothing behind.
func NewTestServer(t testing.T, cb ServerConfigCallback) *TestServer {
	path, err := discover.NomadExecutable()
	if err != nil {
		t.Skipf("nomad not found, skipping: %v", err)
	}

	// Check that we are actually running nomad
	_, err = exec.Command(path, "-version").CombinedOutput()
	must.NoError(t, err)

	dataDir, err := os.MkdirTemp("", "nomad")
	must.NoError(t, err)

	// Roll back on any early exit. The must.* assertions abort the test
	// rather than returning, which still unwinds through deferred calls, so
	// this is the only place a failed start-up can be cleaned up.
	var cmd *exec.Cmd
	started := false
	defer func() {
		if started {
			return
		}
		if cmd != nil && cmd.Process != nil {
			_ = cmd.Process.Kill()
			// Reap the process, but never hang the test run on it.
			exited := make(chan struct{})
			go func() {
				defer close(exited)
				_ = cmd.Wait()
			}()
			select {
			case <-exited:
			case <-time.After(5 * time.Second):
			}
		}
		// Best effort: the test is already failing for another reason.
		_ = os.RemoveAll(dataDir)
	}()

	configFile, err := os.CreateTemp(dataDir, "nomad")
	must.NoError(t, err)

	nomadConfig := defaultServerConfig(t)
	nomadConfig.DataDir = dataDir

	if cb != nil {
		cb(nomadConfig)
	}

	if nomadConfig.DevMode {
		if nomadConfig.Client.Options == nil {
			nomadConfig.Client.Options = map[string]string{}
		}
		nomadConfig.Client.Options["test.tighten_network_timeouts"] = "true"
	}

	configContent, err := json.Marshal(nomadConfig)
	must.NoError(t, err)

	_, err = configFile.Write(configContent)
	must.NoError(t, err)
	must.NoError(t, configFile.Sync())
	must.NoError(t, configFile.Close())

	args := []string{"agent", "-config", configFile.Name()}
	if nomadConfig.DevMode {
		args = append(args, "-dev")
	}

	// Agent output goes to the test process's own streams unless the
	// configuration redirects it.
	stdout := io.Writer(os.Stdout)
	if nomadConfig.Stdout != nil {
		stdout = nomadConfig.Stdout
	}

	stderr := io.Writer(os.Stderr)
	if nomadConfig.Stderr != nil {
		stderr = nomadConfig.Stderr
	}

	// Start the server
	cmd = exec.Command(path, args...)
	cmd.Stdout = stdout
	cmd.Stderr = stderr
	must.NoError(t, cmd.Start())

	client := cleanhttp.DefaultClient()
	client.Timeout = 10 * time.Second

	server := &TestServer{
		Config: nomadConfig,
		cmd:    cmd,
		t:      t,

		HTTPAddr:   fmt.Sprintf("127.0.0.1:%d", nomadConfig.Ports.HTTP),
		SerfAddr:   fmt.Sprintf("127.0.0.1:%d", nomadConfig.Ports.Serf),
		HTTPClient: client,
	}

	// Wait for the server to be ready
	if nomadConfig.Server.Enabled && nomadConfig.Server.BootstrapExpect != 0 {
		server.waitForLeader()
	} else {
		server.waitForAPI()
	}

	// Wait for the client to be ready
	if nomadConfig.DevMode {
		server.waitForClient()
	}

	// From here on the caller owns the process and data dir via Stop.
	started = true
	return server
}
// Stop shuts the test Nomad agent down and deletes its data directory.
//
// The process is first sent an interrupt and given five seconds to exit on
// its own. If it is still running after that it is killed and given another
// five seconds. Timeouts are logged, not treated as failures; an error from
// delivering the interrupt or the kill fails the test. The data directory is
// removed on every exit path.
func (s *TestServer) Stop() {
	const grace = 5 * time.Second

	// Runs last, i.e. after the process has (hopefully) gone away. The path
	// is read at that point, not when the defer is registered.
	defer func() {
		_ = os.RemoveAll(s.Config.DataDir)
	}()

	// Reap the process in the background so its exit can be observed with a
	// timeout; waiting for the exit also makes sure the data dir can be
	// deleted on all platforms.
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		_ = s.cmd.Wait()
	}()

	// Ask nicely first.
	must.NoError(s.t, s.cmd.Process.Signal(os.Interrupt))

	select {
	case <-exited:
		return
	case <-time.After(grace):
	}
	s.t.Logf("timed out waiting for process to gracefully terminate")

	// Graceful shutdown did not happen in time; force it.
	killErr := s.cmd.Process.Kill()
	must.NoError(s.t, killErr, must.Sprint("failed to kill process"))

	select {
	case <-exited:
	case <-time.After(grace):
		s.t.Logf("timed out waiting for process to be killed")
	}
}
// waitForAPI blocks until the agent's HTTP endpoint starts responding, using
// GET /v1/metrics as the probe. A 200 response shows the agent has started;
// it will likely be seen before a leader is elected.
//
// The probe is retried once per second for up to ten seconds; if it never
// succeeds the test fails with "failed to wait for api".
func (s *TestServer) waitForAPI() {
	probe := func() error {
		resp, err := s.HTTPClient.Get(s.url("/v1/metrics"))
		if err != nil {
			return fmt.Errorf("failed to get metrics: %w", err)
		}
		defer func() { _ = resp.Body.Close() }()

		if statusErr := s.requireOK(resp); statusErr != nil {
			return fmt.Errorf("metrics response is not ok: %w", statusErr)
		}
		return nil
	}

	untilUp := wait.InitialSuccess(
		wait.ErrorFunc(probe),
		wait.Timeout(10*time.Second),
		wait.Gap(1*time.Second),
	)
	must.Wait(s.t, untilUp, must.Sprint("failed to wait for api"))
}
// waitForLeader blocks until GET /v1/status/leader on the Nomad server's HTTP
// API answers with status 200.
//
// The probe is retried once per second for up to ten seconds; if it never
// succeeds the test fails with "failed to wait for leader".
//
// NOTE(review): only the status code is inspected. The response body is not
// read, so this confirms the endpoint is serving but does not by itself
// verify which leader (if any) was reported — confirm that is sufficient.
func (s *TestServer) waitForLeader() {
	probe := func() error {
		// The leader status endpoint is used because it does not have
		// restricted access.
		resp, err := s.HTTPClient.Get(s.url("/v1/status/leader"))
		if err != nil {
			return fmt.Errorf("failed to get leader: %w", err)
		}
		defer func() { _ = resp.Body.Close() }()

		if statusErr := s.requireOK(resp); statusErr != nil {
			return fmt.Errorf("leader response is not ok: %w", statusErr)
		}
		return nil
	}

	untilServing := wait.InitialSuccess(
		wait.ErrorFunc(probe),
		wait.Timeout(10*time.Second),
		wait.Gap(1*time.Second),
	)
	must.Wait(s.t, untilServing, must.Sprint("failed to wait for leader"))
}
// waitForClient waits for the Nomad client to be ready: it polls
// GET /v1/nodes until the response lists exactly one node and that node
// reports the "ready" status. The function returns immediately if the server
// is not in dev mode.
//
// The probe is retried once per second for up to ten seconds; if it never
// succeeds the test fails with "failed to wait for client (node)".
func (s *TestServer) waitForClient() {
	if !s.Config.DevMode {
		return
	}

	f := func() error {
		resp, err := s.HTTPClient.Get(s.url("/v1/nodes"))
		if err != nil {
			return fmt.Errorf("failed to get nodes: %w", err)
		}
		defer func() { _ = resp.Body.Close() }()

		if err = s.requireOK(resp); err != nil {
			return fmt.Errorf("nodes response not ok: %w", err)
		}

		// Only the fields needed for the readiness decision are decoded.
		var decoded []struct {
			ID     string
			Status string
		}
		if err = json.NewDecoder(resp.Body).Decode(&decoded); err != nil {
			return fmt.Errorf("failed to decode nodes response: %w", err)
		}

		// A successful, well-formed response is not enough: the node list
		// stays empty until the client has registered, and a registered
		// node is not usable until it reports "ready".
		if len(decoded) != 1 || decoded[0].Status != "ready" {
			return fmt.Errorf("node not ready: %v", decoded)
		}
		return nil
	}

	must.Wait(s.t,
		wait.InitialSuccess(
			wait.ErrorFunc(f),
			wait.Timeout(10*time.Second),
			wait.Gap(1*time.Second),
		),
		must.Sprint("failed to wait for client (node)"),
	)
}
// url turns a path relative to the API root (for example "/v1/nodes") into
// an absolute http:// URL that targets this test server's HTTP address.
// The path is appended verbatim; no separator is inserted.
func (s *TestServer) url(path string) string {
	host := s.HTTPAddr
	full := fmt.Sprintf("http://%s%s", host, path)
	return full
}
// requireOK reports whether the response carries an acceptable status code.
// Only 200 is accepted; any other code yields an error that names it. The
// response body is neither read nor closed.
func (s *TestServer) requireOK(resp *http.Response) error {
	if resp.StatusCode == http.StatusOK {
		return nil
	}
	return fmt.Errorf("bad status code: %d", resp.StatusCode)
}
// put issues an HTTP PUT with the given body to path on the test server and
// returns the response. Failing to build or send the request, or receiving a
// status other than 200, fails the test; in the non-200 case the response
// body is closed first. On success the caller owns (and must close) the body.
func (s *TestServer) put(path string, body io.Reader) *http.Response {
	req, err := http.NewRequest(http.MethodPut, s.url(path), body)
	must.NoError(s.t, err)

	resp, err := s.HTTPClient.Do(req)
	must.NoError(s.t, err)

	if statusErr := s.requireOK(resp); statusErr != nil {
		// Release the connection before the assertion aborts the test.
		_ = resp.Body.Close()
		must.NoError(s.t, statusErr)
	}
	return resp
}
// get issues an HTTP GET for path on the test server and returns the
// response. A transport error or a status other than 200 fails the test; in
// the non-200 case the response body is closed first. On success the caller
// owns (and must close) the body.
func (s *TestServer) get(path string) *http.Response {
	target := s.url(path)

	resp, err := s.HTTPClient.Get(target)
	must.NoError(s.t, err)

	if statusErr := s.requireOK(resp); statusErr != nil {
		// Release the connection before the assertion aborts the test.
		_ = resp.Body.Close()
		must.NoError(s.t, statusErr)
	}
	return resp
}
// encodePayload JSON-encodes payload into an in-memory buffer and returns
// that buffer as an io.Reader, ready to be used as a request body. An
// encoding error fails the test.
func (s *TestServer) encodePayload(payload any) io.Reader {
	buf := new(bytes.Buffer)
	must.NoError(s.t, json.NewEncoder(buf).Encode(payload))
	return buf
}