connect: task hook for bootstrapping envoy sidecar

Fixes #6041

Unlike all other Consul operations, bootstrapping requires Consul be
available. This PR tries Consul 3 times with a backoff to account for
the group services being asynchronously registered with Consul.
This commit is contained in:
Michael Schurter 2019-08-12 15:41:39 -07:00
parent 050cc32fde
commit 59e0b67c7f
9 changed files with 430 additions and 20 deletions

View File

@ -60,6 +60,10 @@ var (
// TaskDirs is the set of directories created in each tasks directory.
TaskDirs = map[string]os.FileMode{TmpDirName: os.ModeSticky | 0777}
// AllocGRPCSocket is the path relative to the task dir root for the
// unix socket connected to Consul's gRPC endpoint.
AllocGRPCSocket = filepath.Join(TmpDirName, "consul_grpc.sock")
)
// AllocDir allows creating, destroying, and accessing an allocation's

View File

@ -0,0 +1,151 @@
package taskrunner
import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
// Compile-time check that envoyBootstrapHook satisfies TaskPrestartHook.
var _ interfaces.TaskPrestartHook = (*envoyBootstrapHook)(nil)

// envoyBootstrapHook is a task prestart hook that writes the bootstrap
// configuration file for the Connect Envoy proxy sidecar.
type envoyBootstrapHook struct {
	// alloc is the allocation the hooked task belongs to; its job, task
	// group, and deployment status are read when building the service ID.
	alloc *structs.Allocation

	// logger is the hook's logger, named after the hook.
	logger log.Logger

	// consulHTTPAddr is the Consul HTTP address passed to the bootstrap
	// command. Generating the bootstrap.json config requires talking
	// directly to Consul, whereas runtime Envoy configuration is done via
	// Consul's gRPC endpoint.
	consulHTTPAddr string
}
// newEnvoyBootstrapHook returns an Envoy bootstrap hook for the given
// allocation that will contact Consul at consulHTTPAddr. The supplied logger
// is scoped to the hook's name.
func newEnvoyBootstrapHook(alloc *structs.Allocation, consulHTTPAddr string, logger log.Logger) *envoyBootstrapHook {
	hook := new(envoyBootstrapHook)
	hook.alloc = alloc
	hook.consulHTTPAddr = consulHTTPAddr
	hook.logger = logger.Named(hook.Name())
	return hook
}
// Name returns the hook's identifier, which is also used to name its logger.
func (envoyBootstrapHook) Name() string {
	const hookName = "envoy_bootstrap"
	return hookName
}
// Prestart generates the Envoy bootstrap configuration for a Connect proxy
// sidecar task by running `consul connect envoy -bootstrap` and redirecting
// its output to envoy_bootstrap.json in the task's secrets directory.
//
// Tasks that are not Connect proxy sidecars are marked Done immediately and
// nothing is written.
//
// Because the group services are registered with Consul asynchronously, a
// failed bootstrap command is retried up to 3 times (4 attempts in total) with
// a 2 second pause between attempts. If every attempt fails the bootstrap file
// is removed and an error is returned; the error is marked recoverable only
// when the command ran but exited unsuccessfully (*exec.ExitError).
//
// If ctx is cancelled while waiting to retry, nil is returned without setting
// resp.Done.
func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
	if !req.Task.Kind.IsConnectProxy() {
		// Not a Connect proxy sidecar
		resp.Done = true
		return nil
	}

	serviceName := req.Task.Kind.Value()
	if serviceName == "" {
		return fmt.Errorf("Connect proxy sidecar does not specify service name")
	}

	tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
	if tg == nil {
		// Guard against dereferencing a missing task group below.
		return fmt.Errorf("task group %q not found in job for Connect proxy sidecar", h.alloc.TaskGroup)
	}

	// Find the group service this sidecar is proxying for.
	var service *structs.Service
	for _, s := range tg.Services {
		if s.Name == serviceName {
			service = s
			break
		}
	}

	if service == nil {
		return fmt.Errorf("Connect proxy sidecar task exists but no services configured with a sidecar")
	}

	h.logger.Debug("bootstrapping Connect proxy sidecar", "task", req.Task.Name, "service", serviceName)

	//TODO(schmichael) relies on GRPCSocket being created
	//TODO(schmichael) unnecessary if the sidecar is running on the host netns
	grpcAddr := "unix://" + filepath.Join(allocdir.SharedAllocName, allocdir.AllocGRPCSocket)

	// Envoy bootstrap configuration may contain a Consul token, so write
	// it to the secrets directory like Vault tokens.
	fn := filepath.Join(req.TaskDir.SecretsDir, "envoy_bootstrap.json")

	canary := h.alloc.DeploymentStatus.IsCanary()
	id := agentconsul.MakeTaskServiceID(h.alloc.ID, "group-"+tg.Name, service, canary)

	h.logger.Debug("bootstrapping envoy", "sidecar_for", service.Name, "boostrap_file", fn, "sidecar_for_id", id, "grpc_addr", grpcAddr)

	// Since Consul services are registered asynchronously with this task
	// hook running, retry a small number of times with backoff. tries
	// counts the retries remaining after the current attempt.
	for tries := 3; ; tries-- {
		cmd := exec.CommandContext(ctx, "consul", "connect", "envoy",
			"-grpc-addr", grpcAddr,
			"-http-addr", h.consulHTTPAddr,
			"-bootstrap",
			"-sidecar-for", id,
		)

		// Redirect output to secrets/envoy_bootstrap.json. os.Create
		// truncates, so each attempt starts from an empty file.
		fd, err := os.Create(fn)
		if err != nil {
			return fmt.Errorf("error creating secrets/envoy_bootstrap.json for envoy: %v", err)
		}
		cmd.Stdout = fd

		// Capture stderr so it can be logged if bootstrapping fails.
		buf := bytes.NewBuffer(nil)
		cmd.Stderr = buf

		// Generate bootstrap
		err = cmd.Run()

		// Close bootstrap.json. A failed close after an otherwise
		// successful run means the file cannot be trusted, so treat
		// it like a failed attempt rather than ignoring it.
		if closeErr := fd.Close(); err == nil {
			err = closeErr
		}

		if err == nil {
			// Happy path! Bootstrap was created, exit.
			break
		}

		// Out of retries: report the failure.
		if tries == 0 {
			h.logger.Error("error creating bootstrap configuration for Connect proxy sidecar", "error", err, "stderr", buf.String())

			// Cleanup the bootstrap file. An error here is not
			// important as (a) we test to ensure the deletion
			// occurs, and (b) the file will either be rewritten on
			// retry or eventually garbage collected if the task
			// fails.
			os.Remove(fn)

			// ExitErrors are recoverable since they indicate the
			// command was runnable but exited with an unsuccessful
			// error code.
			_, recoverable := err.(*exec.ExitError)
			return structs.NewRecoverableError(
				fmt.Errorf("error creating bootstrap configuration for Connect proxy sidecar: %v", err),
				recoverable,
			)
		}

		// Sleep before retrying to give Consul services time to register
		select {
		case <-time.After(2 * time.Second):
		case <-ctx.Done():
			// Killed before bootstrap, exit without setting Done
			return nil
		}
	}

	// Bootstrap written. Mark as done and move on.
	resp.Done = true
	return nil
}

View File

@ -0,0 +1,247 @@
package taskrunner
import (
"context"
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"testing"
consulapi "github.com/hashicorp/consul/api"
consultest "github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/testutil"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
var _ interfaces.TaskPrestartHook = (*envoyBootstrapHook)(nil)
// TestTaskRunner_EnvoyBootstrapHook_Ok asserts the Envoy bootstrap hook writes
// a valid JSON envoy_bootstrap.json into the sidecar task's secrets directory
// once the group's Connect services have been registered with Consul.
func TestTaskRunner_EnvoyBootstrapHook_Ok(t *testing.T) {
	t.Parallel()
	testutil.RequireConsul(t)

	// Silence the test Consul server unless the tests run with -v.
	quietUnlessVerbose := func(cfg *consultest.TestServerConfig) {
		if testing.Verbose() {
			return
		}
		cfg.Stdout = ioutil.Discard
		cfg.Stderr = ioutil.Discard
	}
	consulSrv, err := consultest.NewTestServerConfig(quietUnlessVerbose)
	if err != nil {
		t.Fatalf("error starting test consul server: %v", err)
	}
	defer consulSrv.Stop()

	must := require.New(t)

	// Give the mock alloc a bridge network exposing the proxy port.
	alloc := mock.Alloc()
	proxyPort := structs.Port{
		Label: "connect-proxy-foo",
		Value: 9999,
		To:    9999,
	}
	bridgeNet := &structs.NetworkResource{
		Mode:         "bridge",
		IP:           "10.0.0.1",
		DynamicPorts: []structs.Port{proxyPort},
	}
	alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{bridgeNet}

	// Replace the group's services with a single Connect-enabled service
	// and add the sidecar task that proxies for it.
	group := alloc.Job.TaskGroups[0]
	fooService := &structs.Service{
		Name:      "foo",
		PortLabel: "9999", // Just need a valid port, nothing will bind to it
		Connect: &structs.ConsulConnect{
			SidecarService: &structs.ConsulSidecarService{},
		},
	}
	group.Services = []*structs.Service{fooService}

	sidecar := &structs.Task{
		Name: "sidecar",
		Kind: "connect-proxy:foo",
	}
	group.Tasks = append(group.Tasks, sidecar)

	logger := testlog.HCLogger(t)

	tmpRoot, err := ioutil.TempDir("", "EnvoyBootstrapHookTest")
	if err != nil {
		t.Fatalf("Couldn't create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpRoot)

	allocDir := allocdir.NewAllocDir(testlog.HCLogger(t), tmpRoot)
	defer allocDir.Destroy()

	// Register the group services with the test Consul server.
	apiConfig := consulapi.DefaultConfig()
	apiConfig.Address = consulSrv.HTTPAddr
	apiClient, err := consulapi.NewClient(apiConfig)
	must.NoError(err)

	serviceClient := agentconsul.NewServiceClient(apiClient.Agent(), logger, true)
	go serviceClient.Run()
	defer serviceClient.Shutdown()
	must.NoError(serviceClient.RegisterGroup(alloc))

	// Build the prestart request for the sidecar task.
	hook := newEnvoyBootstrapHook(alloc, consulSrv.HTTPAddr, logger)
	request := &interfaces.TaskPrestartRequest{
		Task:    sidecar,
		TaskDir: allocDir.NewTaskDir(sidecar.Name),
	}
	must.NoError(request.TaskDir.Build(false, nil))

	response := &interfaces.TaskPrestartResponse{}

	// Run the hook and expect it to complete.
	must.NoError(hook.Prestart(context.Background(), request, response))
	must.True(response.Done)

	// The bootstrap file must exist and contain valid JSON.
	bootstrapPath := filepath.Join(request.TaskDir.SecretsDir, "envoy_bootstrap.json")
	bootstrapFile, err := os.Open(bootstrapPath)
	must.NoError(err)
	defer bootstrapFile.Close()

	var decoded map[string]interface{}
	must.NoError(json.NewDecoder(bootstrapFile).Decode(&decoded))
}
// TestTaskRunner_EnvoyBootstrapHook_Noop asserts that, for a task that is not
// a Connect proxy sidecar, the Envoy bootstrap hook finishes as Done without
// writing a bootstrap file.
func TestTaskRunner_EnvoyBootstrapHook_Noop(t *testing.T) {
	t.Parallel()
	logger := testlog.HCLogger(t)

	tmpRoot, err := ioutil.TempDir("", "EnvoyBootstrapHookTest")
	if err != nil {
		t.Fatalf("Couldn't create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpRoot)

	allocDir := allocdir.NewAllocDir(testlog.HCLogger(t), tmpRoot)
	defer allocDir.Destroy()

	must := require.New(t)

	// Use the mock alloc's first task as-is.
	alloc := mock.Alloc()
	group := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	plainTask := group.Tasks[0]

	// Run Envoy bootstrap Hook. Use invalid Consul address as it should
	// not get hit.
	hook := newEnvoyBootstrapHook(alloc, "http://127.0.0.2:1", logger)
	request := &interfaces.TaskPrestartRequest{
		Task:    plainTask,
		TaskDir: allocDir.NewTaskDir(plainTask.Name),
	}
	must.NoError(request.TaskDir.Build(false, nil))

	response := &interfaces.TaskPrestartResponse{}

	// Run the hook and expect it to complete immediately.
	must.NoError(hook.Prestart(context.Background(), request, response))
	must.True(response.Done)

	// No bootstrap file may have been written.
	bootstrapPath := filepath.Join(request.TaskDir.SecretsDir, "envoy_bootstrap.json")
	_, err = os.Open(bootstrapPath)
	must.Error(err)
	must.True(os.IsNotExist(err))
}
// TestTaskRunner_EnvoyBootstrapHook_RecoverableError asserts that when the
// bootstrap command runs but fails (here because the group services were never
// registered with Consul) the hook returns a recoverable error, is not Done,
// and leaves no bootstrap file behind.
func TestTaskRunner_EnvoyBootstrapHook_RecoverableError(t *testing.T) {
	t.Parallel()
	testutil.RequireConsul(t)

	// Silence the test Consul server unless the tests run with -v.
	quietUnlessVerbose := func(cfg *consultest.TestServerConfig) {
		if testing.Verbose() {
			return
		}
		cfg.Stdout = ioutil.Discard
		cfg.Stderr = ioutil.Discard
	}
	consulSrv, err := consultest.NewTestServerConfig(quietUnlessVerbose)
	if err != nil {
		t.Fatalf("error starting test consul server: %v", err)
	}
	defer consulSrv.Stop()

	must := require.New(t)

	// Give the mock alloc a bridge network exposing the proxy port.
	alloc := mock.Alloc()
	proxyPort := structs.Port{
		Label: "connect-proxy-foo",
		Value: 9999,
		To:    9999,
	}
	bridgeNet := &structs.NetworkResource{
		Mode:         "bridge",
		IP:           "10.0.0.1",
		DynamicPorts: []structs.Port{proxyPort},
	}
	alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{bridgeNet}

	// Replace the group's services with a single Connect-enabled service
	// and add the sidecar task that proxies for it.
	group := alloc.Job.TaskGroups[0]
	fooService := &structs.Service{
		Name:      "foo",
		PortLabel: "9999", // Just need a valid port, nothing will bind to it
		Connect: &structs.ConsulConnect{
			SidecarService: &structs.ConsulSidecarService{},
		},
	}
	group.Services = []*structs.Service{fooService}

	sidecar := &structs.Task{
		Name: "sidecar",
		Kind: "connect-proxy:foo",
	}
	group.Tasks = append(group.Tasks, sidecar)

	logger := testlog.HCLogger(t)

	tmpRoot, err := ioutil.TempDir("", "EnvoyBootstrapHookTest")
	if err != nil {
		t.Fatalf("Couldn't create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpRoot)

	allocDir := allocdir.NewAllocDir(testlog.HCLogger(t), tmpRoot)
	defer allocDir.Destroy()

	// Unlike the successful test above, do NOT register the group services
	// yet. This should cause a recoverable error similar to if Consul was
	// not running.

	// Build the prestart request for the sidecar task.
	hook := newEnvoyBootstrapHook(alloc, consulSrv.HTTPAddr, logger)
	request := &interfaces.TaskPrestartRequest{
		Task:    sidecar,
		TaskDir: allocDir.NewTaskDir(sidecar.Name),
	}
	must.NoError(request.TaskDir.Build(false, nil))

	response := &interfaces.TaskPrestartResponse{}

	// Run the hook and expect a recoverable failure.
	err = hook.Prestart(context.Background(), request, response)
	must.Error(err)
	must.True(structs.IsRecoverable(err))

	// The hook must not report completion.
	must.False(response.Done)

	// No bootstrap file may be left behind.
	bootstrapPath := filepath.Join(request.TaskDir.SecretsDir, "envoy_bootstrap.json")
	_, err = os.Open(bootstrapPath)
	must.Error(err)
	must.True(os.IsNotExist(err))
}

View File

@ -56,15 +56,17 @@ func (tr *TaskRunner) initHooks() {
// Create the task directory hook. This is run first to ensure the
// directory path exists for other hooks.
alloc := tr.Alloc()
tr.runnerHooks = []interfaces.TaskHook{
newValidateHook(tr.clientConfig, hookLogger),
newTaskDirHook(tr, hookLogger),
newLogMonHook(tr.logmonHookConfig, hookLogger),
newDispatchHook(tr.Alloc(), hookLogger),
newDispatchHook(alloc, hookLogger),
newVolumeHook(tr, hookLogger),
newArtifactHook(tr, hookLogger),
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
newDeviceHook(tr.devicemanager, hookLogger),
newEnvoyBootstrapHook(alloc, tr.clientConfig.ConsulConfig.Addr, hookLogger),
}
// If Vault is enabled, add the hook

View File

@ -17,6 +17,14 @@ func RequireRoot(t *testing.T) {
}
}
// RequireConsul skips the calling test unless `consul version` can be run
// successfully, i.e. a working Consul binary is available on $PATH.
func RequireConsul(t *testing.T) {
	if _, err := exec.Command("consul", "version").CombinedOutput(); err != nil {
		t.Skipf("Test requires Consul: %v", err)
	}
}
func ExecCompatible(t *testing.T) {
if runtime.GOOS != "linux" || syscall.Geteuid() != 0 {
t.Skip("Test only available running as root on linux")

View File

@ -694,7 +694,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
*ServiceRegistration, error) {
// Get the services ID
id := makeTaskServiceID(task.AllocID, task.Name, service, task.Canary)
id := MakeTaskServiceID(task.AllocID, task.Name, service, task.Canary)
sreg := &ServiceRegistration{
serviceID: id,
checkIDs: make(map[string]struct{}, len(service.Checks)),
@ -959,7 +959,7 @@ func (c *ServiceClient) RegisterTask(task *TaskServices) error {
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range task.Services {
serviceID := makeTaskServiceID(task.AllocID, task.Name, service, task.Canary)
serviceID := MakeTaskServiceID(task.AllocID, task.Name, service, task.Canary)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := makeCheckID(serviceID, check)
@ -982,11 +982,11 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
existingIDs := make(map[string]*structs.Service, len(old.Services))
for _, s := range old.Services {
existingIDs[makeTaskServiceID(old.AllocID, old.Name, s, old.Canary)] = s
existingIDs[MakeTaskServiceID(old.AllocID, old.Name, s, old.Canary)] = s
}
newIDs := make(map[string]*structs.Service, len(newTask.Services))
for _, s := range newTask.Services {
newIDs[makeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s
newIDs[MakeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s
}
// Loop over existing Service IDs to see if they have been removed
@ -1083,7 +1083,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range newIDs {
serviceID := makeTaskServiceID(newTask.AllocID, newTask.Name, service, newTask.Canary)
serviceID := MakeTaskServiceID(newTask.AllocID, newTask.Name, service, newTask.Canary)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := makeCheckID(serviceID, check)
@ -1101,7 +1101,7 @@ func (c *ServiceClient) RemoveTask(task *TaskServices) {
ops := operations{}
for _, service := range task.Services {
id := makeTaskServiceID(task.AllocID, task.Name, service, task.Canary)
id := MakeTaskServiceID(task.AllocID, task.Name, service, task.Canary)
ops.deregServices = append(ops.deregServices, id)
for _, check := range service.Checks {
@ -1262,11 +1262,11 @@ func makeAgentServiceID(role string, service *structs.Service) string {
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false))
}
// makeTaskServiceID creates a unique ID for identifying a task service in
// MakeTaskServiceID creates a unique ID for identifying a task service in
// Consul.
//
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http
func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string {
func MakeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string {
	// The ID is the Nomad task prefix followed by the alloc ID, task name,
	// service name, and service port label, joined with dashes. The canary
	// argument is accepted but does not contribute to the ID.
	return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel)
}

View File

@ -71,7 +71,7 @@ func TestConsul_Connect(t *testing.T) {
require.NoError(t, err)
require.Len(t, services, 2)
serviceID := makeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0], false)
serviceID := MakeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0], false)
connectID := serviceID + "-sidecar-proxy"
require.Contains(t, services, serviceID)

View File

@ -1710,7 +1710,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID,
remainingTaskServiceID := MakeTaskServiceID(remainingTask.AllocID,
remainingTask.Name, remainingTask.Services[0], false)
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
@ -1733,7 +1733,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID,
explicitlyRemovedTaskServiceID := MakeTaskServiceID(explicitlyRemovedTask.AllocID,
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false)
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
@ -1758,7 +1758,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID,
outofbandTaskServiceID := MakeTaskServiceID(outofbandTask.AllocID,
outofbandTask.Name, outofbandTask.Services[0], false)
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
@ -1819,7 +1819,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID,
remainingTaskServiceID := MakeTaskServiceID(remainingTask.AllocID,
remainingTask.Name, remainingTask.Services[0], false)
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
@ -1842,7 +1842,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID,
explicitlyRemovedTaskServiceID := MakeTaskServiceID(explicitlyRemovedTask.AllocID,
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false)
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
@ -1867,7 +1867,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID,
outofbandTaskServiceID := MakeTaskServiceID(outofbandTask.AllocID,
outofbandTask.Name, outofbandTask.Services[0], false)
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))

View File

@ -5695,8 +5695,6 @@ const ConnectProxyPrefix = "connect-proxy"
// proxied by this task exists in the task group and contains
// valid Connect config.
func ValidateConnectProxyService(serviceName string, tgServices []*Service) error {
var mErr multierror.Error
found := false
for _, svc := range tgServices {
if svc.Name == serviceName && svc.Connect != nil && svc.Connect.SidecarService != nil {
@ -5706,10 +5704,10 @@ func ValidateConnectProxyService(serviceName string, tgServices []*Service) erro
}
if !found {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Connect proxy service name not found in services from task group"))
return fmt.Errorf("Connect proxy service name not found in services from task group")
}
return mErr.ErrorOrNil()
return nil
}
const (