sidecar_task override in connect admission controller (#6140)

* structs: use seperate SidecarTask struct for sidecar_task stanza and add merge

* nomad: merge SidecarTask into proxy task during connect Mutate hook
This commit is contained in:
Nick Ethier 2019-08-20 01:22:46 -04:00 committed by GitHub
parent 84880f5fbc
commit 24f5a4c276
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 456 additions and 27 deletions

View File

@ -135,7 +135,7 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
type ConsulConnect struct {
Native bool
SidecarService *ConsulSidecarService `mapstructure:"sidecar_service"`
SidecarTask *Task `mapstructure:"sidecar_task"`
SidecarTask *SidecarTask `mapstructure:"sidecar_task"`
}
// ConsulSidecarService represents a Consul Connect SidecarService jobspec
@ -145,6 +145,22 @@ type ConsulSidecarService struct {
Proxy *ConsulProxy
}
// SidecarTask represents a subset of Task fields that can be set to override
// the fields of the Task generated for the sidecar
type SidecarTask struct {
Name string
Driver string
User string
Config map[string]interface{}
Env map[string]string
Resources *Resources
Meta map[string]string
KillTimeout *time.Duration `mapstructure:"kill_timeout"`
LogConfig *LogConfig `mapstructure:"logs"`
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
KillSignal string `mapstructure:"kill_signal"`
}
// ConsulProxy represents a Consul Connect sidecar proxy jobspec stanza.
type ConsulProxy struct {
Upstreams []*ConsulUpstream

View File

@ -1082,7 +1082,31 @@ func ApiConsulConnectToStructs(in *api.ConsulConnect) *structs.ConsulConnect {
}
if in.SidecarTask != nil {
ApiTaskToStructsTask(in.SidecarTask, out.SidecarTask)
out.SidecarTask = &structs.SidecarTask{
Name: in.SidecarTask.Name,
Driver: in.SidecarTask.Driver,
Config: in.SidecarTask.Config,
User: in.SidecarTask.User,
Env: in.SidecarTask.Env,
Resources: ApiResourcesToStructs(in.SidecarTask.Resources),
Meta: in.SidecarTask.Meta,
LogConfig: &structs.LogConfig{},
ShutdownDelay: in.SidecarTask.ShutdownDelay,
KillSignal: in.SidecarTask.KillSignal,
}
if in.SidecarTask.KillTimeout != nil {
out.SidecarTask.KillTimeout = in.SidecarTask.KillTimeout
}
if in.SidecarTask.LogConfig != nil {
out.SidecarTask.LogConfig = &structs.LogConfig{}
if in.SidecarTask.LogConfig.MaxFiles != nil {
out.SidecarTask.LogConfig.MaxFiles = *in.SidecarTask.LogConfig.MaxFiles
}
if in.SidecarTask.LogConfig.MaxFileSizeMB != nil {
out.SidecarTask.LogConfig.MaxFileSizeMB = *in.SidecarTask.LogConfig.MaxFileSizeMB
}
}
}
return out

View File

@ -164,7 +164,7 @@ func parseConnect(co *ast.ObjectItem) (*api.ConsulConnect, error) {
return nil, fmt.Errorf("only one 'sidecar_task' block allowed per task")
}
t, err := parseTask(o.Items[0])
t, err := parseSidecarTask(o.Items[0])
if err != nil {
return nil, fmt.Errorf("sidecar_task, %v", err)
}
@ -228,6 +228,75 @@ func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) {
return &sidecar, nil
}
func parseSidecarTask(item *ast.ObjectItem) (*api.SidecarTask, error) {
// We need this later
var listVal *ast.ObjectList
if ot, ok := item.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return nil, fmt.Errorf("should be an object")
}
// Check for invalid keys
valid := []string{
"config",
"driver",
"env",
"kill_timeout",
"logs",
"meta",
"resources",
"shutdown_delay",
"user",
"kill_signal",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return nil, err
}
task, err := parseTask(item)
if err != nil {
return nil, err
}
sidecarTask := &api.SidecarTask{
Name: task.Name,
Driver: task.Driver,
User: task.User,
Config: task.Config,
Env: task.Env,
Resources: task.Resources,
Meta: task.Meta,
KillTimeout: task.KillTimeout,
LogConfig: task.LogConfig,
KillSignal: task.KillSignal,
}
// Parse ShutdownDelay seperatly to get pointer
var m map[string]interface{}
if err := hcl.DecodeObject(&m, item.Val); err != nil {
return nil, err
}
m = map[string]interface{}{
"shutdown_delay": m["shutdown_delay"],
}
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: sidecarTask,
})
if err != nil {
return nil, err
}
if err := dec.Decode(m); err != nil {
return nil, err
}
return sidecarTask, nil
}
func parseProxy(o *ast.ObjectItem) (*api.ConsulProxy, error) {
valid := []string{
"upstreams",

View File

@ -925,7 +925,7 @@ func TestParse(t *testing.T) {
},
},
},
SidecarTask: &api.Task{
SidecarTask: &api.SidecarTask{
Resources: &api.Resources{
CPU: helper.IntToPtr(500),
MemoryMB: helper.IntToPtr(1024),
@ -933,6 +933,7 @@ func TestParse(t *testing.T) {
Env: map[string]string{
"FOO": "abc",
},
ShutdownDelay: helper.TimeToPtr(5 * time.Second),
},
},
},

View File

@ -34,6 +34,8 @@ job "foo" {
env {
FOO = "abc"
}
shutdown_delay = "5s"
}
}
}

View File

@ -9,21 +9,35 @@ import (
)
var (
// connectSidecarResources is the set of resources used by default for the
// Consul Connect sidecar task
connectSidecarResources = &structs.Resources{
CPU: 250,
MemoryMB: 128,
// connectSidecarResources returns the set of resources used by default for
// the Consul Connect sidecar task
connectSidecarResources = func() *structs.Resources {
return &structs.Resources{
CPU: 250,
MemoryMB: 128,
}
}
// connectDriverConfig is the driver configuration used by the injected
// connect proxy sidecar task
connectDriverConfig = map[string]interface{}{
"image": "${meta.connect.sidecar_image}",
"args": []interface{}{
"-c", "${NOMAD_TASK_DIR}/bootstrap.json",
"-l", "${meta.connect.log_level}",
},
}
// connectVersionConstraint is used when building the sidecar task to ensure
// the proper Consul version is used that supports the nessicary Connect
// features. This includes bootstraping envoy with a unix socket for Consul's
// grpc xDS api.
connectVersionConstraint = &structs.Constraint{
LTarget: "${attr.consul.version}",
RTarget: ">= 1.6.0beta1",
Operand: "version",
connectVersionConstraint = func() *structs.Constraint {
return &structs.Constraint{
LTarget: "${attr.consul.version}",
RTarget: ">= 1.6.0beta1",
Operand: "version",
}
}
)
@ -94,7 +108,9 @@ func groupConnectHook(g *structs.TaskGroup) error {
g.Tasks = append(g.Tasks, task)
}
//TODO merge in sidecar_task overrides
if service.Connect.SidecarTask != nil {
service.Connect.SidecarTask.MergeIntoTask(task)
}
// port to be added for the sidecar task's proxy port
port := structs.Port{
@ -126,24 +142,18 @@ func groupConnectHook(g *structs.TaskGroup) error {
func newConnectTask(service *structs.Service) *structs.Task {
task := &structs.Task{
// Name is used in container name so must start with '[A-Za-z0-9]'
Name: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, service.Name),
Kind: structs.TaskKind(fmt.Sprintf("%s:%s", structs.ConnectProxyPrefix, service.Name)),
Driver: "docker",
Config: map[string]interface{}{
"image": "${meta.connect.sidecar_image}",
"args": []string{
"-c", "${NOMAD_TASK_DIR}/bootstrap.json",
"-l", "${meta.connect.log_level}",
},
},
Name: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, service.Name),
Kind: structs.TaskKind(fmt.Sprintf("%s:%s", structs.ConnectProxyPrefix, service.Name)),
Driver: "docker",
Config: connectDriverConfig,
ShutdownDelay: 5 * time.Second,
LogConfig: &structs.LogConfig{
MaxFiles: 2,
MaxFileSizeMB: 2,
},
Resources: connectSidecarResources,
Resources: connectSidecarResources(),
Constraints: structs.Constraints{
connectVersionConstraint,
connectVersionConstraint(),
},
}

View File

@ -176,6 +176,101 @@ func TestJobEndpoint_Register_Connect(t *testing.T) {
}
func TestJobEndpoint_Register_ConnectWithSidecarTask(t *testing.T) {
require := require.New(t)
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.TaskGroups[0].Networks = structs.Networks{
{
Mode: "bridge",
},
}
job.TaskGroups[0].Services = []*structs.Service{
{
Name: "backend",
PortLabel: "8080",
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{},
SidecarTask: &structs.SidecarTask{
Meta: map[string]string{
"source": "test",
},
Resources: &structs.Resources{
CPU: 500,
},
Config: map[string]interface{}{
"labels": map[string]string{
"foo": "bar",
},
},
},
},
},
}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
var resp structs.JobRegisterResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
// Check for the node in the FSM
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.Namespace, job.ID)
require.NoError(err)
require.NotNil(out)
require.Equal(resp.JobModifyIndex, out.CreateIndex)
// Check that the sidecar task was injected
require.Len(out.TaskGroups[0].Tasks, 2)
sidecarTask := out.TaskGroups[0].Tasks[1]
require.Equal("connect-proxy-backend", sidecarTask.Name)
require.Equal("connect-proxy:backend", string(sidecarTask.Kind))
require.Equal("connect-proxy-backend", out.TaskGroups[0].Networks[0].DynamicPorts[0].Label)
// Check that the correct fields were overridden from the sidecar_task stanza
require.Equal("test", sidecarTask.Meta["source"])
require.Equal(500, sidecarTask.Resources.CPU)
require.Equal(connectSidecarResources().MemoryMB, sidecarTask.Resources.MemoryMB)
cfg := connectDriverConfig
cfg["labels"] = map[string]interface{}{
"foo": "bar",
}
require.Equal(cfg, sidecarTask.Config)
// Check that round tripping the job doesn't change the sidecarTask
out.Meta["test"] = "abc"
req.Job = out
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
// Check for the new node in the FSM
state = s1.fsm.State()
ws = memdb.NewWatchSet()
out, err = state.JobByID(ws, job.Namespace, job.ID)
require.NoError(err)
require.NotNil(out)
require.Equal(resp.JobModifyIndex, out.CreateIndex)
require.Len(out.TaskGroups[0].Tasks, 2)
require.Exactly(sidecarTask, out.TaskGroups[0].Tasks[1])
}
func TestJobEndpoint_Register_ACL(t *testing.T) {
t.Parallel()

View File

@ -15,6 +15,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/args"
"github.com/mitchellh/copystructure"
)
const (
@ -517,7 +518,7 @@ type ConsulConnect struct {
SidecarService *ConsulSidecarService
// SidecarTask is non-nil if sidecar overrides are set
SidecarTask *Task
SidecarTask *SidecarTask
}
// Copy the stanza recursively. Returns nil if nil.
@ -600,6 +601,148 @@ func (s *ConsulSidecarService) Equals(o *ConsulSidecarService) bool {
return s.Proxy.Equals(o.Proxy)
}
// SidecarTask represents a subset of Task fields that are able to be overridden
// from the sidecar_task stanza
type SidecarTask struct {
// Name of the task
Name string
// Driver is used to control which driver is used
Driver string
// User is used to determine which user will run the task. It defaults to
// the same user the Nomad client is being run as.
User string
// Config is provided to the driver to initialize
Config map[string]interface{}
// Map of environment variables to be used by the driver
Env map[string]string
// Resources is the resources needed by this task
Resources *Resources
// Meta is used to associate arbitrary metadata with this
// task. This is opaque to Nomad.
Meta map[string]string
// KillTimeout is the time between signaling a task that it will be
// killed and killing it.
KillTimeout *time.Duration
// LogConfig provides configuration for log rotation
LogConfig *LogConfig
// ShutdownDelay is the duration of the delay between deregistering a
// task from Consul and sending it a signal to shutdown. See #2441
ShutdownDelay *time.Duration
// KillSignal is the kill signal to use for the task. This is an optional
// specification and defaults to SIGINT
KillSignal string
}
func (t *SidecarTask) Copy() *SidecarTask {
if t == nil {
return nil
}
nt := new(SidecarTask)
*nt = *t
nt.Env = helper.CopyMapStringString(nt.Env)
nt.Resources = nt.Resources.Copy()
nt.LogConfig = nt.LogConfig.Copy()
nt.Meta = helper.CopyMapStringString(nt.Meta)
if i, err := copystructure.Copy(nt.Config); err != nil {
panic(err.Error())
} else {
nt.Config = i.(map[string]interface{})
}
if t.KillTimeout != nil {
nt.KillTimeout = helper.TimeToPtr(*t.KillTimeout)
}
if t.ShutdownDelay != nil {
nt.ShutdownDelay = helper.TimeToPtr(*t.ShutdownDelay)
}
return nt
}
// MergeIntoTask merges the SidecarTask fields over the given task
func (t *SidecarTask) MergeIntoTask(task *Task) {
if t.Name != "" {
task.Name = t.Name
}
// If the driver changes then the driver config can be overwritten.
// Otherwise we'll merge the driver config together
if t.Driver != "" && t.Driver != task.Driver {
task.Driver = t.Driver
task.Config = t.Config
} else {
for k, v := range t.Config {
task.Config[k] = v
}
}
if t.User != "" {
task.User = t.User
}
if t.Env != nil {
if task.Env == nil {
task.Env = t.Env
} else {
for k, v := range t.Env {
task.Env[k] = v
}
}
}
if t.Resources != nil {
task.Resources.Merge(t.Resources)
}
if t.Meta != nil {
if task.Meta == nil {
task.Meta = t.Meta
} else {
for k, v := range t.Meta {
task.Meta[k] = v
}
}
}
if t.KillTimeout != nil {
task.KillTimeout = *t.KillTimeout
}
if t.LogConfig != nil {
if task.LogConfig == nil {
task.LogConfig = t.LogConfig
} else {
if t.LogConfig.MaxFiles > 0 {
task.LogConfig.MaxFiles = t.LogConfig.MaxFiles
}
if t.LogConfig.MaxFileSizeMB > 0 {
task.LogConfig.MaxFileSizeMB = t.LogConfig.MaxFileSizeMB
}
}
}
if t.ShutdownDelay != nil {
task.ShutdownDelay = *t.ShutdownDelay
}
if t.KillSignal != "" {
task.KillSignal = t.KillSignal
}
}
// ConsulProxy represents a Consul Connect sidecar proxy jobspec stanza.
type ConsulProxy struct {
// Upstreams configures the upstream services this service intends to

View File

@ -2,7 +2,9 @@ package structs
import (
"testing"
"time"
"github.com/hashicorp/nomad/helper"
"github.com/stretchr/testify/require"
)
@ -60,3 +62,59 @@ func TestConsulConnect_CopyEquals(t *testing.T) {
o.SidecarService.Proxy.Upstreams = nil
require.False(t, c.Equals(o))
}
func TestSidecarTask_MergeIntoTask(t *testing.T) {
task := MockJob().TaskGroups[0].Tasks[0]
sTask := &SidecarTask{
Name: "sidecar",
Driver: "sidecar",
User: "test",
Config: map[string]interface{}{
"foo": "bar",
},
Resources: &Resources{
CPU: 10000,
MemoryMB: 10000,
},
Env: map[string]string{
"sidecar": "proxy",
},
Meta: map[string]string{
"abc": "123",
},
KillTimeout: helper.TimeToPtr(15 * time.Second),
LogConfig: &LogConfig{
MaxFiles: 3,
},
ShutdownDelay: helper.TimeToPtr(5 * time.Second),
KillSignal: "SIGABRT",
}
expected := task.Copy()
expected.Name = "sidecar"
expected.Driver = "sidecar"
expected.User = "test"
expected.Config = map[string]interface{}{
"foo": "bar",
}
expected.Resources.CPU = 10000
expected.Resources.MemoryMB = 10000
expected.Env["sidecar"] = "proxy"
expected.Meta["abc"] = "123"
expected.KillTimeout = 15 * time.Second
expected.LogConfig.MaxFiles = 3
expected.ShutdownDelay = 5 * time.Second
expected.KillSignal = "SIGABRT"
sTask.MergeIntoTask(task)
require.Exactly(t, expected, task)
// Check that changing just driver config doesn't replace map
sTask.Config["abc"] = 123
expected.Config["abc"] = 123
sTask.MergeIntoTask(task)
require.Exactly(t, expected, task)
}

View File

@ -5106,6 +5106,16 @@ type LogConfig struct {
MaxFileSizeMB int
}
func (l *LogConfig) Copy() *LogConfig {
if l == nil {
return nil
}
return &LogConfig{
MaxFiles: l.MaxFiles,
MaxFileSizeMB: l.MaxFileSizeMB,
}
}
// DefaultLogConfig returns the default LogConfig values.
func DefaultLogConfig() *LogConfig {
return &LogConfig{
@ -5229,6 +5239,7 @@ func (t *Task) Copy() *Task {
nt.Vault = nt.Vault.Copy()
nt.Resources = nt.Resources.Copy()
nt.LogConfig = nt.LogConfig.Copy()
nt.Meta = helper.CopyMapStringString(nt.Meta)
nt.DispatchPayload = nt.DispatchPayload.Copy()