package cluster import ( "context" "encoding/json" "fmt" "io" "os" "path/filepath" "strconv" "time" goretry "github.com/avast/retry-go" dockercontainer "github.com/docker/docker/api/types/container" "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" ) const bootLogLine = "Consul agent running" const disableRYUKEnv = "TESTCONTAINERS_RYUK_DISABLED" // Exposed ports info const MaxEnvoyOnNode = 10 // the max number of Envoy sidecar can run along with the agent, base is 19000 const ServiceUpstreamLocalBindPort = 5000 // local bind Port of service's upstream // consulContainerNode implements the Agent interface by running a Consul agent // in a container. type consulContainerNode struct { ctx context.Context pod testcontainers.Container container testcontainers.Container serverMode bool datacenter string config Config podReq testcontainers.ContainerRequest consulReq testcontainers.ContainerRequest dataDir string network string id int name string terminateFuncs []func() error client *api.Client clientAddr string clientCACertFile string ip string nextAdminPortOffset int nextConnectPortOffset int info AgentInfo } func (c *consulContainerNode) GetPod() testcontainers.Container { return c.pod } func (c *consulContainerNode) ClaimAdminPort() (int, error) { if c.nextAdminPortOffset >= MaxEnvoyOnNode { return 0, fmt.Errorf("running out of envoy admin port, max %d, already claimed %d", MaxEnvoyOnNode, c.nextAdminPortOffset) } p := 19000 + c.nextAdminPortOffset c.nextAdminPortOffset++ return p, nil } // NewConsulContainer starts a Consul agent in a container with the given config. func NewConsulContainer(ctx context.Context, config Config, cluster *Cluster) (Agent, error) { network := cluster.NetworkName index := cluster.Index if config.ScratchDir == "" { return nil, fmt.Errorf("ScratchDir is required") } license, err := readLicense() if err != nil { return nil, err } pc, err := readSomeConfigFileFields(config.JSON) if err != nil { return nil, err } consulType := "client" if pc.Server { consulType = "server" } name := utils.RandName(fmt.Sprintf("%s-consul-%s-%d", pc.Datacenter, consulType, index)) // Inject new Agent name config.Cmd = append(config.Cmd, "-node", name) tmpDirData := filepath.Join(config.ScratchDir, "data") if err := os.MkdirAll(tmpDirData, 0777); err != nil { return nil, fmt.Errorf("error creating data directory %s: %w", tmpDirData, err) } if err := os.Chmod(tmpDirData, 0777); err != nil { return nil, fmt.Errorf("error chowning data directory %s: %w", tmpDirData, err) } var caCertFileForAPI string if config.CACert != "" { caCertFileForAPI = filepath.Join(config.ScratchDir, "ca.pem") if err := os.WriteFile(caCertFileForAPI, []byte(config.CACert), 0644); err != nil { return nil, fmt.Errorf("error writing out CA cert %s: %w", caCertFileForAPI, err) } } configFile, err := createConfigFile(config.ScratchDir, config.JSON) if err != nil { return nil, fmt.Errorf("error writing out config file %s: %w", configFile, err) } opts := containerOpts{ name: name, configFile: configFile, dataDir: tmpDirData, license: license, addtionalNetworks: []string{"bridge", network}, hostname: fmt.Sprintf("agent-%d", index), } podReq, consulReq := newContainerRequest(config, opts) // Do some trickery to ensure that partial completion is correctly torn // down, but successful execution is not. var deferClean utils.ResettableDefer defer deferClean.Execute() podContainer, err := startContainer(ctx, podReq) if err != nil { return nil, fmt.Errorf("error starting pod with image %q: %w", podReq.Image, err) } deferClean.Add(func() { _ = podContainer.Terminate(ctx) }) var ( httpPort = pc.Ports.HTTP httpsPort = pc.Ports.HTTPS clientAddr string clientCACertFile string info AgentInfo ) if httpPort > 0 { uri, err := podContainer.PortEndpoint(ctx, "8500", "http") if err != nil { return nil, err } clientAddr = uri } else if httpsPort > 0 { uri, err := podContainer.PortEndpoint(ctx, "8501", "https") if err != nil { return nil, err } clientAddr = uri clientCACertFile = caCertFileForAPI } else { if pc.Server { return nil, fmt.Errorf("server container does not expose HTTP or HTTPS") } } if caCertFileForAPI != "" { if config.UseAPIWithTLS { if pc.Ports.HTTPS > 0 { info.UseTLSForAPI = true } else { return nil, fmt.Errorf("UseAPIWithTLS is set but ports.https is not for this agent") } } if config.UseGRPCWithTLS { if pc.Ports.GRPCTLS > 0 { info.UseTLSForGRPC = true } else { return nil, fmt.Errorf("UseGRPCWithTLS is set but ports.grpc_tls is not for this agent") } } info.CACertFile = clientCACertFile } ip, err := podContainer.ContainerIP(ctx) if err != nil { return nil, err } consulContainer, err := startContainer(ctx, consulReq) if err != nil { return nil, fmt.Errorf("error starting main with image %q: %w", consulReq.Image, err) } deferClean.Add(func() { _ = consulContainer.Terminate(ctx) }) if utils.FollowLog { if err := consulContainer.StartLogProducer(ctx); err != nil { return nil, err } deferClean.Add(func() { _ = consulContainer.StopLogProducer() }) if config.LogConsumer != nil { consulContainer.FollowOutput(config.LogConsumer) } else { consulContainer.FollowOutput(&LogConsumer{ Prefix: opts.name, }) } } node := &consulContainerNode{ config: config, pod: podContainer, container: consulContainer, serverMode: pc.Server, datacenter: pc.Datacenter, ctx: ctx, podReq: podReq, consulReq: consulReq, dataDir: tmpDirData, network: network, id: index, name: name, ip: ip, info: info, } if httpPort > 0 || httpsPort > 0 { apiConfig := api.DefaultConfig() apiConfig.Address = clientAddr if clientCACertFile != "" { apiConfig.TLSConfig.CAFile = clientCACertFile } if cluster.TokenBootstrap != "" { apiConfig.Token = cluster.TokenBootstrap } apiClient, err := api.NewClient(apiConfig) if err != nil { return nil, err } node.client = apiClient node.clientAddr = clientAddr node.clientCACertFile = clientCACertFile } // Inject node token if ACL is enabled and the bootstrap token is generated if cluster.TokenBootstrap != "" && cluster.ACLEnabled { agentToken, err := cluster.CreateAgentToken(pc.Datacenter, name) if err != nil { return nil, err } cmd := []string{"consul", "acl", "set-agent-token", "-token", cluster.TokenBootstrap, "agent", agentToken} // retry in case agent has not fully initialized err = goretry.Do( func() error { _, err := node.Exec(context.Background(), cmd) if err != nil { return fmt.Errorf("error setting the agent token, error %s", err) } return nil }, goretry.Delay(time.Second*1), ) if err != nil { return nil, fmt.Errorf("error setting agent token: %s", err) } } // disable cleanup functions now that we have an object with a Terminate() function deferClean.Reset() return node, nil } func (c *consulContainerNode) GetName() string { if c.container == nil { return c.consulReq.Name // TODO: is this safe to do all the time? } name, err := c.container.Name(c.ctx) if err != nil { return "" } return name } func (c *consulContainerNode) GetAgentName() string { return c.name } func (c *consulContainerNode) GetConfig() Config { return c.config.Clone() } func (c *consulContainerNode) GetDatacenter() string { return c.datacenter } func (c *consulContainerNode) IsServer() bool { return c.serverMode } // GetClient returns an API client that can be used to communicate with the Agent. func (c *consulContainerNode) GetClient() *api.Client { return c.client } // NewClient returns an API client by making a new one based on the provided token // - updateDefault: if true update the default client func (c *consulContainerNode) NewClient(token string, updateDefault bool) (*api.Client, error) { apiConfig := api.DefaultConfig() apiConfig.Address = c.clientAddr if c.clientCACertFile != "" { apiConfig.TLSConfig.CAFile = c.clientCACertFile } if token != "" { apiConfig.Token = token } apiClient, err := api.NewClient(apiConfig) if err != nil { return nil, err } if updateDefault { c.client = apiClient } return apiClient, nil } func (c *consulContainerNode) GetAPIAddrInfo() (addr, caCert string) { return c.clientAddr, c.clientCACertFile } func (c *consulContainerNode) GetInfo() AgentInfo { return c.info } func (c *consulContainerNode) GetIP() string { return c.ip } func (c *consulContainerNode) RegisterTermination(f func() error) { c.terminateFuncs = append(c.terminateFuncs, f) } func (c *consulContainerNode) Exec(ctx context.Context, cmd []string) (string, error) { exitcode, reader, err := c.container.Exec(ctx, cmd) if exitcode != 0 { return "", fmt.Errorf("exec with exit code %d", exitcode) } if err != nil { return "", fmt.Errorf("exec with error %s", err) } buf, err := io.ReadAll(reader) if err != nil { return "", fmt.Errorf("error reading from exe output: %s", err) } return string(buf), err } func (c *consulContainerNode) Upgrade(ctx context.Context, config Config) error { if config.ScratchDir == "" { return fmt.Errorf("ScratchDir is required") } newConfigFile, err := createConfigFile(config.ScratchDir, config.JSON) if err != nil { return err } // We'll keep the same pod. opts := containerOpts{ name: c.consulReq.Name, configFile: newConfigFile, dataDir: c.dataDir, license: "", addtionalNetworks: []string{"bridge", c.network}, hostname: c.consulReq.Hostname, } _, consulReq2 := newContainerRequest(config, opts) consulReq2.Env = c.consulReq.Env // copy license // sanity check two fields if consulReq2.Name != c.consulReq.Name { return fmt.Errorf("new name %q should match old name %q", consulReq2.Name, c.consulReq.Name) } if consulReq2.Hostname != c.consulReq.Hostname { return fmt.Errorf("new hostname %q should match old hostname %q", consulReq2.Hostname, c.consulReq.Hostname) } if err := c.TerminateAndRetainPod(true); err != nil { return fmt.Errorf("error terminating running container during upgrade: %w", err) } c.consulReq = consulReq2 container, err := startContainer(ctx, c.consulReq) c.ctx = ctx c.container = container if err != nil { return err } if utils.FollowLog { if err := container.StartLogProducer(ctx); err != nil { return err } container.FollowOutput(&LogConsumer{ Prefix: opts.name, }) } return nil } // Terminate attempts to terminate the agent container. // This might also include running termination functions for containers associated with the agent. // On failure, an error will be returned and the reaper process (RYUK) will handle cleanup. func (c *consulContainerNode) Terminate() error { return c.terminate(false, false) } func (c *consulContainerNode) TerminateAndRetainPod(skipFuncs bool) error { return c.terminate(true, skipFuncs) } func (c *consulContainerNode) terminate(retainPod bool, skipFuncs bool) error { // Services might register a termination function that should also fire // when the "agent" is cleaned up. // If skipFuncs is tru, We skip the terminateFuncs of connect sidecar, e.g., // during upgrade if !skipFuncs { for _, f := range c.terminateFuncs { err := f() if err != nil { continue } } } var merr error if c.container != nil { if err := TerminateContainer(c.ctx, c.container, true); err != nil { merr = multierror.Append(merr, err) } c.container = nil } if !retainPod && c.pod != nil { if err := TerminateContainer(c.ctx, c.pod, false); err != nil { merr = multierror.Append(merr, err) } c.pod = nil } return merr } func (c *consulContainerNode) DataDir() string { return c.dataDir } func startContainer(ctx context.Context, req testcontainers.ContainerRequest) (testcontainers.Container, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*40) defer cancel() return testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: req, Started: true, }) } const pauseImage = "k8s.gcr.io/pause:3.3" type containerOpts struct { configFile string dataDir string hostname string index int license string name string addtionalNetworks []string } func newContainerRequest(config Config, opts containerOpts) (podRequest, consulRequest testcontainers.ContainerRequest) { skipReaper := isRYUKDisabled() pod := testcontainers.ContainerRequest{ Image: pauseImage, AutoRemove: false, Name: opts.name + "-pod", SkipReaper: skipReaper, ExposedPorts: []string{ "8500/tcp", // Consul HTTP API "8501/tcp", // Consul HTTPs API "8443/tcp", // Envoy Gateway Listener "8079/tcp", // Envoy App Listener "8080/tcp", // Envoy App Listener "9998/tcp", // Envoy App Listener "9999/tcp", // Envoy App Listener }, Hostname: opts.hostname, Networks: opts.addtionalNetworks, } // Envoy upstream listener pod.ExposedPorts = append(pod.ExposedPorts, fmt.Sprintf("%d/tcp", ServiceUpstreamLocalBindPort)) // Reserve the exposed ports for Envoy admin port, e.g., 19000 - 19009 basePort := 19000 for i := 0; i < MaxEnvoyOnNode; i++ { pod.ExposedPorts = append(pod.ExposedPorts, fmt.Sprintf("%d/tcp", basePort+i)) } // For handshakes like auto-encrypt, it can take 10's of seconds for the agent to become "ready". // If we only wait until the log stream starts, subsequent commands to agents will fail. // TODO: optimize the wait strategy app := testcontainers.ContainerRequest{ NetworkMode: dockercontainer.NetworkMode("container:" + opts.name + "-pod"), Image: config.DockerImage(), WaitingFor: wait.ForLog(bootLogLine).WithStartupTimeout(60 * time.Second), // See note above AutoRemove: false, Name: opts.name, Mounts: []testcontainers.ContainerMount{ { Source: testcontainers.DockerBindMountSource{HostPath: opts.configFile}, Target: "/consul/config/config.json", ReadOnly: true, }, { Source: testcontainers.DockerBindMountSource{HostPath: opts.dataDir}, Target: "/consul/data", }, }, Cmd: config.Cmd, SkipReaper: skipReaper, Env: map[string]string{"CONSUL_LICENSE": opts.license}, } if config.CertVolume != "" { app.Mounts = append(app.Mounts, testcontainers.ContainerMount{ Source: testcontainers.DockerVolumeMountSource{ Name: config.CertVolume, }, Target: "/consul/config/certs", ReadOnly: true, }) } // fmt.Printf("app: %s\n", utils.Dump(app)) return pod, app } // isRYUKDisabled returns whether the reaper process (RYUK) has been disabled // by an environment variable. // // https://github.com/testcontainers/moby-ryuk func isRYUKDisabled() bool { skipReaperStr := os.Getenv(disableRYUKEnv) skipReaper, err := strconv.ParseBool(skipReaperStr) if err != nil { return false } return skipReaper } func readLicense() (string, error) { if license := os.Getenv("CONSUL_LICENSE"); license != "" { return license, nil } licensePath := os.Getenv("CONSUL_LICENSE_PATH") if licensePath == "" { return "", nil } licenseBytes, err := os.ReadFile(licensePath) if err != nil { return "", err } return string(licenseBytes), nil } func createConfigFile(scratchDir string, JSON string) (string, error) { configDir := filepath.Join(scratchDir, "config") if err := os.MkdirAll(configDir, 0777); err != nil { return "", err } if err := os.Chmod(configDir, 0777); err != nil { return "", err } configFile := filepath.Join(configDir, "config.hcl") if err := os.WriteFile(configFile, []byte(JSON), 0644); err != nil { return "", err } return configFile, nil } type parsedConfig struct { Datacenter string `json:"datacenter"` Server bool `json:"server"` Ports parsedPorts `json:"ports"` } type parsedPorts struct { DNS int `json:"dns"` HTTP int `json:"http"` HTTPS int `json:"https"` GRPC int `json:"grpc"` GRPCTLS int `json:"grpc_tls"` SerfLAN int `json:"serf_lan"` SerfWAN int `json:"serf_wan"` Server int `json:"server"` } func readSomeConfigFileFields(JSON string) (parsedConfig, error) { var pc parsedConfig if err := json.Unmarshal([]byte(JSON), &pc); err != nil { return pc, errors.Wrap(err, "failed to parse config file") } if pc.Datacenter == "" { pc.Datacenter = "dc1" } return pc, nil }