Merge pull request #7106 from hashicorp/f-ctag-override
client: enable configuring enable_tag_override for services
This commit is contained in:
commit
543354aabe
|
@ -97,17 +97,18 @@ type ServiceCheck struct {
|
|||
// Service represents a Consul service definition.
|
||||
type Service struct {
|
||||
//FIXME Id is unused. Remove?
|
||||
Id string
|
||||
Name string
|
||||
Tags []string
|
||||
CanaryTags []string `mapstructure:"canary_tags"`
|
||||
PortLabel string `mapstructure:"port"`
|
||||
AddressMode string `mapstructure:"address_mode"`
|
||||
Checks []ServiceCheck
|
||||
CheckRestart *CheckRestart `mapstructure:"check_restart"`
|
||||
Connect *ConsulConnect
|
||||
Meta map[string]string
|
||||
CanaryMeta map[string]string
|
||||
Id string
|
||||
Name string
|
||||
Tags []string
|
||||
CanaryTags []string `mapstructure:"canary_tags"`
|
||||
EnableTagOverride bool `mapstructure:"enable_tag_override"`
|
||||
PortLabel string `mapstructure:"port"`
|
||||
AddressMode string `mapstructure:"address_mode"`
|
||||
Checks []ServiceCheck
|
||||
CheckRestart *CheckRestart `mapstructure:"check_restart"`
|
||||
Connect *ConsulConnect
|
||||
Meta map[string]string
|
||||
CanaryMeta map[string]string
|
||||
}
|
||||
|
||||
// Canonicalize the Service by ensuring its name and address mode are set. Task
|
||||
|
|
|
@ -5,11 +5,14 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestService_CheckRestart asserts Service.CheckRestart settings are properly
|
||||
// inherited by Checks.
|
||||
func TestService_CheckRestart(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
job := &Job{Name: stringToPtr("job")}
|
||||
tg := &TaskGroup{Name: stringToPtr("group")}
|
||||
task := &Task{Name: "task"}
|
||||
|
@ -58,6 +61,8 @@ func TestService_CheckRestart(t *testing.T) {
|
|||
// TestService_Connect asserts Service.Connect settings are properly
|
||||
// inherited by Checks.
|
||||
func TestService_Connect(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
job := &Job{Name: stringToPtr("job")}
|
||||
tg := &TaskGroup{Name: stringToPtr("group")}
|
||||
task := &Task{Name: "task"}
|
||||
|
@ -83,3 +88,23 @@ func TestService_Connect(t *testing.T) {
|
|||
assert.Equal(t, proxy.Upstreams[0].DestinationName, "upstream")
|
||||
assert.Equal(t, proxy.LocalServicePort, 8000)
|
||||
}
|
||||
|
||||
func TestService_Tags(t *testing.T) {
|
||||
t.Parallel()
|
||||
r := require.New(t)
|
||||
|
||||
// canonicalize does not modify eto or tags
|
||||
job := &Job{Name: stringToPtr("job")}
|
||||
tg := &TaskGroup{Name: stringToPtr("group")}
|
||||
task := &Task{Name: "task"}
|
||||
service := &Service{
|
||||
Tags: []string{"a", "b"},
|
||||
CanaryTags: []string{"c", "d"},
|
||||
EnableTagOverride: true,
|
||||
}
|
||||
|
||||
service.Canonicalize(task, tg, job)
|
||||
r.True(service.EnableTagOverride)
|
||||
r.Equal([]string{"a", "b"}, service.Tags)
|
||||
r.Equal([]string{"c", "d"}, service.CanaryTags)
|
||||
}
|
||||
|
|
|
@ -97,6 +97,7 @@ func (h *groupServiceHook) Prerun() error {
|
|||
func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
oldWorkloadServices := h.getWorkloadServices()
|
||||
|
||||
// Store new updated values out of request
|
||||
|
|
|
@ -204,3 +204,17 @@ func (c *MockAgent) UpdateTTL(id string, output string, status string) error {
|
|||
c.checkTTLs[id]++
|
||||
return nil
|
||||
}
|
||||
|
||||
// a convenience method for looking up a registered service by name
|
||||
func (c *MockAgent) lookupService(name string) []*api.AgentServiceRegistration {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
var services []*api.AgentServiceRegistration
|
||||
for _, service := range c.services {
|
||||
if service.Name == name {
|
||||
services = append(services, service)
|
||||
}
|
||||
}
|
||||
return services
|
||||
}
|
||||
|
|
|
@ -106,14 +106,56 @@ type ACLsAPI interface {
|
|||
TokenList(q *api.QueryOptions) ([]*api.ACLTokenListEntry, *api.QueryMeta, error)
|
||||
}
|
||||
|
||||
func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool {
|
||||
return !(reg.Kind == svc.Kind &&
|
||||
reg.ID == svc.ID &&
|
||||
reg.Port == svc.Port &&
|
||||
reg.Address == svc.Address &&
|
||||
reg.Name == svc.Service &&
|
||||
reflect.DeepEqual(reg.Tags, svc.Tags) &&
|
||||
reflect.DeepEqual(reg.Meta, svc.Meta))
|
||||
// agentServiceUpdateRequired checks if any critical fields in Nomad's version
|
||||
// of a service definition are different from the existing service definition as
|
||||
// known by Consul.
|
||||
func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
|
||||
switch reason {
|
||||
case syncPeriodic:
|
||||
// In a periodic sync with Consul, we need to respect the value of
|
||||
// the enable_tag_override field so that we maintain the illusion that the
|
||||
// user is in control of the Consul tags, as they may be externally edited
|
||||
// via the Consul catalog API (e.g. a user manually sets them).
|
||||
//
|
||||
// As Consul does by disabling anti-entropy for the tags field, Nomad will
|
||||
// ignore differences in the tags field during the periodic syncs with
|
||||
// the Consul agent API.
|
||||
//
|
||||
// We do so by over-writing the nomad service registration by the value
|
||||
// of the tags that Consul contains, if enable_tag_override = true.
|
||||
maybeTweakTags(wanted, existing)
|
||||
return different(wanted, existing)
|
||||
|
||||
default:
|
||||
// A non-periodic sync with Consul indicates an operation has been set
|
||||
// on the queue. This happens when service has been added / removed / modified
|
||||
// and implies the Consul agent should be sync'd with nomad, because
|
||||
// nomad is the ultimate source of truth for the service definition.
|
||||
return different(wanted, existing)
|
||||
}
|
||||
}
|
||||
|
||||
// maybeTweakTags will override wanted.Tags with a copy of existing.Tags only if
|
||||
// EnableTagOverride is true. Otherwise the wanted service registration is left
|
||||
// unchanged.
|
||||
func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService) {
|
||||
if wanted.EnableTagOverride {
|
||||
wanted.Tags = helper.CopySliceString(existing.Tags)
|
||||
}
|
||||
}
|
||||
|
||||
// different compares the wanted state of the service registration with the actual
|
||||
// (cached) state of the service registration reported by Consul. If any of the
|
||||
// critical fields are not deeply equal, they considered different.
|
||||
func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
|
||||
return !(wanted.Kind == existing.Kind &&
|
||||
wanted.ID == existing.ID &&
|
||||
wanted.Port == existing.Port &&
|
||||
wanted.Address == existing.Address &&
|
||||
wanted.Name == existing.Service &&
|
||||
wanted.EnableTagOverride == existing.EnableTagOverride &&
|
||||
reflect.DeepEqual(wanted.Meta, existing.Meta) &&
|
||||
reflect.DeepEqual(wanted.Tags, existing.Tags))
|
||||
}
|
||||
|
||||
// operations are submitted to the main loop via commit() for synchronizing
|
||||
|
@ -320,6 +362,18 @@ func (c *ServiceClient) hasSeen() bool {
|
|||
return atomic.LoadInt32(&c.seen) == seen
|
||||
}
|
||||
|
||||
// syncReason indicates why a sync operation with consul is about to happen.
|
||||
//
|
||||
// The trigger for a sync may have implications on the behavior of the sync itself.
|
||||
// In particular, if a service is defined with enable_tag_override=true
|
||||
type syncReason byte
|
||||
|
||||
const (
|
||||
syncPeriodic = iota
|
||||
syncShutdown
|
||||
syncNewOps
|
||||
)
|
||||
|
||||
// Run the Consul main loop which retries operations against Consul. It should
|
||||
// be called exactly once.
|
||||
func (c *ServiceClient) Run() {
|
||||
|
@ -357,16 +411,23 @@ INIT:
|
|||
|
||||
failures := 0
|
||||
for {
|
||||
// On every iteration take note of what the trigger for the next sync
|
||||
// was, so that it may be referenced during the sync itself.
|
||||
var reasonForSync syncReason
|
||||
|
||||
select {
|
||||
case <-retryTimer.C:
|
||||
reasonForSync = syncPeriodic
|
||||
case <-c.shutdownCh:
|
||||
reasonForSync = syncShutdown
|
||||
// Cancel check watcher but sync one last time
|
||||
cancel()
|
||||
case ops := <-c.opCh:
|
||||
reasonForSync = syncNewOps
|
||||
c.merge(ops)
|
||||
}
|
||||
|
||||
if err := c.sync(); err != nil {
|
||||
if err := c.sync(reasonForSync); err != nil {
|
||||
if failures == 0 {
|
||||
// Log on the first failure
|
||||
c.logger.Warn("failed to update services in Consul", "error", err)
|
||||
|
@ -460,7 +521,7 @@ func (c *ServiceClient) merge(ops *operations) {
|
|||
}
|
||||
|
||||
// sync enqueued operations.
|
||||
func (c *ServiceClient) sync() error {
|
||||
func (c *ServiceClient) sync(reason syncReason) error {
|
||||
sreg, creg, sdereg, cdereg := 0, 0, 0, 0
|
||||
|
||||
consulServices, err := c.client.Services()
|
||||
|
@ -518,20 +579,20 @@ func (c *ServiceClient) sync() error {
|
|||
}
|
||||
|
||||
// Add Nomad services missing from Consul, or where the service has been updated.
|
||||
for id, locals := range c.services {
|
||||
for id, local := range c.services {
|
||||
existingSvc, ok := consulServices[id]
|
||||
|
||||
if ok {
|
||||
// There is an existing registration of this service in Consul, so here
|
||||
// we validate to see if the service has been invalidated to see if it
|
||||
// should be updated.
|
||||
if !agentServiceUpdateRequired(locals, existingSvc) {
|
||||
if !agentServiceUpdateRequired(reason, local, existingSvc) {
|
||||
// No Need to update services that have not changed
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if err = c.client.ServiceRegister(locals); err != nil {
|
||||
if err = c.client.ServiceRegister(local); err != nil {
|
||||
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
||||
return err
|
||||
}
|
||||
|
@ -746,13 +807,14 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
|
|||
|
||||
// Build the Consul Service registration request
|
||||
serviceReg := &api.AgentServiceRegistration{
|
||||
ID: id,
|
||||
Name: service.Name,
|
||||
Tags: tags,
|
||||
Address: ip,
|
||||
Port: port,
|
||||
Meta: meta,
|
||||
Connect: connect, // will be nil if no Connect stanza
|
||||
ID: id,
|
||||
Name: service.Name,
|
||||
Tags: tags,
|
||||
EnableTagOverride: service.EnableTagOverride,
|
||||
Address: ip,
|
||||
Port: port,
|
||||
Meta: meta,
|
||||
Connect: connect, // will be nil if no Connect stanza
|
||||
}
|
||||
ops.regServices = append(ops.regServices, serviceReg)
|
||||
|
||||
|
@ -868,8 +930,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
|
|||
//
|
||||
// DriverNetwork must not change between invocations for the same allocation.
|
||||
func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error {
|
||||
ops := &operations{}
|
||||
|
||||
ops := new(operations)
|
||||
regs := new(ServiceRegistrations)
|
||||
regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services))
|
||||
|
||||
|
@ -984,6 +1045,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
160
command/agent/consul/client_test.go
Normal file
160
command/agent/consul/client_test.go
Normal file
|
@ -0,0 +1,160 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
wanted := api.AgentServiceRegistration{
|
||||
Kind: "service",
|
||||
ID: "_id",
|
||||
Name: "name",
|
||||
Tags: []string{"a", "b"},
|
||||
Port: 9000,
|
||||
Address: "1.1.1.1",
|
||||
EnableTagOverride: true,
|
||||
Meta: map[string]string{"foo": "1"},
|
||||
}
|
||||
|
||||
existing := &api.AgentService{
|
||||
Kind: "service",
|
||||
ID: "_id",
|
||||
Service: "name",
|
||||
Tags: []string{"a", "b"},
|
||||
Port: 9000,
|
||||
Address: "1.1.1.1",
|
||||
EnableTagOverride: true,
|
||||
Meta: map[string]string{"foo": "1"},
|
||||
}
|
||||
|
||||
// By default wanted and existing match. Each test should modify wanted in
|
||||
// 1 way, and / or configure the type of sync operation that is being
|
||||
// considered, then evaluate the result of the update-required algebra.
|
||||
|
||||
type asr = api.AgentServiceRegistration
|
||||
type tweaker func(w asr) *asr // create a conveniently modifiable copy
|
||||
|
||||
try := func(
|
||||
t *testing.T,
|
||||
exp bool,
|
||||
reason syncReason,
|
||||
tweak tweaker) {
|
||||
result := agentServiceUpdateRequired(reason, tweak(wanted), existing)
|
||||
require.Equal(t, exp, result)
|
||||
}
|
||||
|
||||
t.Run("matching", func(t *testing.T) {
|
||||
try(t, false, syncNewOps, func(w asr) *asr {
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different kind", func(t *testing.T) {
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.Kind = "other"
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different id", func(t *testing.T) {
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.ID = "_other"
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different port", func(t *testing.T) {
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.Port = 9001
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different address", func(t *testing.T) {
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.Address = "2.2.2.2"
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different name", func(t *testing.T) {
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.Name = "bob"
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different enable_tag_override", func(t *testing.T) {
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.EnableTagOverride = false
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different meta", func(t *testing.T) {
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.Meta = map[string]string{"foo": "2"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different tags syncNewOps eto->true", func(t *testing.T) {
|
||||
// sync is required even though eto=true, because NewOps indicates the
|
||||
// service definition in nomad has changed (e.g. job run a modified job)
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.Tags = []string{"other", "tags"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different tags syncPeriodic eto->true", func(t *testing.T) {
|
||||
// sync is not required since eto=true and this is a periodic sync
|
||||
// with consul - in which case we keep Consul's definition of the tags
|
||||
try(t, false, syncPeriodic, func(w asr) *asr {
|
||||
w.Tags = []string{"other", "tags"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
// for remaining tests, EnableTagOverride = false
|
||||
wanted.EnableTagOverride = false
|
||||
existing.EnableTagOverride = false
|
||||
|
||||
t.Run("different tags : syncPeriodic : eto->false", func(t *testing.T) {
|
||||
// sync is required because eto=false and the tags do not match
|
||||
try(t, true, syncPeriodic, func(w asr) *asr {
|
||||
w.Tags = []string{"other", "tags"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different tags : syncNewOps : eto->false", func(t *testing.T) {
|
||||
// sync is required because it was triggered by NewOps and the tags
|
||||
// do not match
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.Tags = []string{"other", "tags"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestSyncLogic_maybeTweakTags(t *testing.T) {
|
||||
t.Parallel()
|
||||
r := require.New(t)
|
||||
|
||||
wanted := &api.AgentServiceRegistration{Tags: []string{"original"}}
|
||||
existing := &api.AgentService{Tags: []string{"other"}}
|
||||
maybeTweakTags(wanted, existing)
|
||||
r.Equal([]string{"original"}, wanted.Tags)
|
||||
|
||||
wantedETO := &api.AgentServiceRegistration{Tags: []string{"original"}, EnableTagOverride: true}
|
||||
existingETO := &api.AgentService{Tags: []string{"other"}, EnableTagOverride: true}
|
||||
maybeTweakTags(wantedETO, existingETO)
|
||||
r.Equal(existingETO.Tags, wantedETO.Tags)
|
||||
r.False(&(existingETO.Tags) == &(wantedETO.Tags))
|
||||
}
|
|
@ -2,6 +2,7 @@ package consul
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
@ -71,18 +72,34 @@ var errNoOps = fmt.Errorf("testing error: no pending operations")
|
|||
|
||||
// syncOps simulates one iteration of the ServiceClient.Run loop and returns
|
||||
// any errors returned by sync() or errNoOps if no pending operations.
|
||||
func (t *testFakeCtx) syncOnce() error {
|
||||
select {
|
||||
case ops := <-t.ServiceClient.opCh:
|
||||
t.ServiceClient.merge(ops)
|
||||
err := t.ServiceClient.sync()
|
||||
func (t *testFakeCtx) syncOnce(reason syncReason) error {
|
||||
switch reason {
|
||||
|
||||
case syncPeriodic:
|
||||
err := t.ServiceClient.sync(syncPeriodic)
|
||||
if err == nil {
|
||||
t.ServiceClient.clearExplicitlyDeregistered()
|
||||
}
|
||||
return err
|
||||
default:
|
||||
return errNoOps
|
||||
|
||||
case syncNewOps:
|
||||
select {
|
||||
case ops := <-t.ServiceClient.opCh:
|
||||
t.ServiceClient.merge(ops)
|
||||
err := t.ServiceClient.sync(syncNewOps)
|
||||
if err == nil {
|
||||
t.ServiceClient.clearExplicitlyDeregistered()
|
||||
}
|
||||
return err
|
||||
default:
|
||||
return errNoOps
|
||||
}
|
||||
|
||||
case syncShutdown:
|
||||
return errors.New("no test for sync due to shutdown")
|
||||
}
|
||||
|
||||
return errors.New("bad sync reason")
|
||||
}
|
||||
|
||||
// setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul.
|
||||
|
@ -103,40 +120,83 @@ func setupFake(t *testing.T) *testFakeCtx {
|
|||
}
|
||||
|
||||
func TestConsul_ChangeTags(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := setupFake(t)
|
||||
require := require.New(t)
|
||||
r := require.New(t)
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
r.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
r.NoError(ctx.syncOnce(syncNewOps))
|
||||
r.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
|
||||
// Validate the alloc registration
|
||||
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
|
||||
require.NoError(err)
|
||||
require.NotNil(reg1, "Unexpected nil alloc registration")
|
||||
require.Equal(1, reg1.NumServices())
|
||||
require.Equal(0, reg1.NumChecks())
|
||||
r.NoError(err)
|
||||
r.NotNil(reg1, "Unexpected nil alloc registration")
|
||||
r.Equal(1, reg1.NumServices())
|
||||
r.Equal(0, reg1.NumChecks())
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
require.Equal(v.Name, ctx.Workload.Services[0].Name)
|
||||
require.Equal(v.Tags, ctx.Workload.Services[0].Tags)
|
||||
}
|
||||
serviceBefore := ctx.FakeConsul.lookupService("taskname-service")[0]
|
||||
r.Equal(serviceBefore.Name, ctx.Workload.Services[0].Name)
|
||||
r.Equal(serviceBefore.Tags, ctx.Workload.Services[0].Tags)
|
||||
|
||||
// Update the task definition
|
||||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Services[0].Tags[0] = "newtag"
|
||||
ctx.Workload.Services[0].Tags[0] = "new-tag"
|
||||
|
||||
// Register and sync the update
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
r.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
r.NoError(ctx.syncOnce(syncNewOps))
|
||||
r.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
|
||||
// Validate the metadata changed
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
require.Equal(v.Name, ctx.Workload.Services[0].Name)
|
||||
require.Equal(v.Tags, ctx.Workload.Services[0].Tags)
|
||||
require.Equal("newtag", v.Tags[0])
|
||||
}
|
||||
// Validate the consul service definition changed
|
||||
serviceAfter := ctx.FakeConsul.lookupService("taskname-service")[0]
|
||||
r.Equal(serviceAfter.Name, ctx.Workload.Services[0].Name)
|
||||
r.Equal(serviceAfter.Tags, ctx.Workload.Services[0].Tags)
|
||||
r.Equal("new-tag", serviceAfter.Tags[0])
|
||||
}
|
||||
|
||||
func TestConsul_EnableTagOverride_Syncs(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := setupFake(t)
|
||||
r := require.New(t)
|
||||
|
||||
// Configure our test service to set EnableTagOverride = true
|
||||
ctx.Workload.Services[0].EnableTagOverride = true
|
||||
|
||||
r.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
r.NoError(ctx.syncOnce(syncNewOps))
|
||||
r.Equal(1, len(ctx.FakeConsul.services))
|
||||
|
||||
// Validate the alloc registration
|
||||
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
|
||||
r.NoError(err)
|
||||
r.NotNil(reg1)
|
||||
r.Equal(1, reg1.NumServices())
|
||||
r.Equal(0, reg1.NumChecks())
|
||||
|
||||
const service = "taskname-service"
|
||||
|
||||
// sanity check things are what we expect
|
||||
consulServiceDefBefore := ctx.FakeConsul.lookupService(service)[0]
|
||||
r.Equal(ctx.Workload.Services[0].Name, consulServiceDefBefore.Name)
|
||||
r.Equal([]string{"tag1", "tag2"}, consulServiceDefBefore.Tags)
|
||||
r.True(consulServiceDefBefore.EnableTagOverride)
|
||||
|
||||
// manually set the tags in consul
|
||||
ctx.FakeConsul.lookupService(service)[0].Tags = []string{"new", "tags"}
|
||||
|
||||
// do a periodic sync (which will respect EnableTagOverride)
|
||||
r.NoError(ctx.syncOnce(syncPeriodic))
|
||||
r.Equal(1, len(ctx.FakeConsul.services))
|
||||
consulServiceDefAfter := ctx.FakeConsul.lookupService(service)[0]
|
||||
r.Equal([]string{"new", "tags"}, consulServiceDefAfter.Tags) // manually set tags should still be there
|
||||
|
||||
// now do a new-ops sync (which will override EnableTagOverride)
|
||||
r.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
r.NoError(ctx.syncOnce(syncNewOps))
|
||||
r.Equal(1, len(ctx.FakeConsul.services))
|
||||
consulServiceDefUpdated := ctx.FakeConsul.lookupService(service)[0]
|
||||
r.Equal([]string{"tag1", "tag2"}, consulServiceDefUpdated.Tags) // jobspec tags should be set now
|
||||
}
|
||||
|
||||
// TestConsul_ChangePorts asserts that changing the ports on a service updates
|
||||
|
@ -172,7 +232,7 @@ func TestConsul_ChangePorts(t *testing.T) {
|
|||
}
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
|
@ -234,7 +294,7 @@ func TestConsul_ChangePorts(t *testing.T) {
|
|||
}
|
||||
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
|
@ -284,7 +344,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
|||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
if err := ctx.syncOnce(syncNewOps); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
|
@ -380,7 +440,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
|||
c1ID, upd.remove, upd.checkID)
|
||||
}
|
||||
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
if err := ctx.syncOnce(syncNewOps); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
|
@ -474,7 +534,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
|||
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
if err := ctx.syncOnce(syncNewOps); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
|
@ -518,7 +578,7 @@ func TestConsul_RegServices(t *testing.T) {
|
|||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
if err := ctx.syncOnce(syncNewOps); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
|
@ -582,7 +642,7 @@ func TestConsul_RegServices(t *testing.T) {
|
|||
}
|
||||
|
||||
// Now sync() and re-check for the applied updates
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
if err := ctx.syncOnce(syncNewOps); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
if n := len(ctx.FakeConsul.services); n != 2 {
|
||||
|
@ -606,7 +666,7 @@ func TestConsul_RegServices(t *testing.T) {
|
|||
|
||||
// Remove the new task
|
||||
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
if err := ctx.syncOnce(syncNewOps); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
if n := len(ctx.FakeConsul.services); n != 1 {
|
||||
|
@ -799,7 +859,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
|||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
if err := ctx.syncOnce(syncNewOps); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
|
@ -903,7 +963,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
|
|||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
if err := ctx.syncOnce(syncNewOps); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
|
@ -964,7 +1024,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
|
|||
}
|
||||
|
||||
syncAndAssertPort := func(port int) {
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
if err := ctx.syncOnce(syncNewOps); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
|
@ -1024,7 +1084,7 @@ func TestConsul_CanaryTags(t *testing.T) {
|
|||
ctx.Workload.Services[0].CanaryTags = canaryTags
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.Equal(canaryTags, service.Tags)
|
||||
|
@ -1034,14 +1094,14 @@ func TestConsul_CanaryTags(t *testing.T) {
|
|||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Canary = false
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.NotEqual(canaryTags, service.Tags)
|
||||
}
|
||||
|
||||
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 0)
|
||||
}
|
||||
|
||||
|
@ -1057,7 +1117,7 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) {
|
|||
ctx.Workload.Services[0].Tags = tags
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.Equal(tags, service.Tags)
|
||||
|
@ -1067,14 +1127,14 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) {
|
|||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Canary = false
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.Equal(tags, service.Tags)
|
||||
}
|
||||
|
||||
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 0)
|
||||
}
|
||||
|
||||
|
@ -1090,7 +1150,7 @@ func TestConsul_CanaryMeta(t *testing.T) {
|
|||
ctx.Workload.Services[0].CanaryMeta = canaryMeta
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.Equal(canaryMeta, service.Meta)
|
||||
|
@ -1100,14 +1160,14 @@ func TestConsul_CanaryMeta(t *testing.T) {
|
|||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Canary = false
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.NotEqual(canaryMeta, service.Meta)
|
||||
}
|
||||
|
||||
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 0)
|
||||
}
|
||||
|
||||
|
@ -1124,7 +1184,7 @@ func TestConsul_CanaryMeta_NoMeta(t *testing.T) {
|
|||
ctx.Workload.Services[0].Meta = meta
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.Equal(meta, service.Meta)
|
||||
|
@ -1134,14 +1194,14 @@ func TestConsul_CanaryMeta_NoMeta(t *testing.T) {
|
|||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Canary = false
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.Equal(meta, service.Meta)
|
||||
}
|
||||
|
||||
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 0)
|
||||
}
|
||||
|
||||
|
@ -1509,7 +1569,7 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) {
|
|||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
|
||||
require.Len(ctx.FakeConsul.services, 3)
|
||||
|
||||
|
@ -1554,7 +1614,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
|||
remainingWorkload.Name(), remainingWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
|
@ -1578,7 +1638,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
|||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
|
||||
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 2)
|
||||
require.Len(ctx.FakeConsul.checks, 2)
|
||||
|
||||
|
@ -1602,7 +1662,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
|||
outofbandWorkload.Name(), outofbandWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
|
||||
require.Len(ctx.FakeConsul.services, 3)
|
||||
|
||||
|
@ -1619,8 +1679,8 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
|||
// Sync and ensure that explicitly removed service as well as outofbandWorkload were removed
|
||||
|
||||
ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.ServiceClient.sync())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.NoError(ctx.ServiceClient.sync(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
|
@ -1663,7 +1723,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
|||
remainingWorkload.Name(), remainingWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
|
@ -1687,7 +1747,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
|||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
|
||||
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 2)
|
||||
require.Len(ctx.FakeConsul.checks, 2)
|
||||
|
||||
|
@ -1711,7 +1771,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
|||
outofbandWorkload.Name(), outofbandWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
|
||||
require.Len(ctx.FakeConsul.services, 3)
|
||||
|
||||
|
@ -1728,8 +1788,8 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
|||
// Sync and ensure that explicitly removed service was removed, but outofbandWorkload remains
|
||||
|
||||
ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.ServiceClient.sync())
|
||||
require.NoError(ctx.syncOnce(syncNewOps))
|
||||
require.NoError(ctx.ServiceClient.sync(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 2)
|
||||
require.Len(ctx.FakeConsul.checks, 2)
|
||||
|
||||
|
@ -1744,7 +1804,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
|||
// after probation, outofband services and checks are removed
|
||||
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
|
||||
|
||||
require.NoError(ctx.ServiceClient.sync())
|
||||
require.NoError(ctx.ServiceClient.sync(syncNewOps))
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
|
|
|
@ -829,13 +829,14 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
|
|||
structsTask.Services = make([]*structs.Service, l)
|
||||
for i, service := range apiTask.Services {
|
||||
structsTask.Services[i] = &structs.Service{
|
||||
Name: service.Name,
|
||||
PortLabel: service.PortLabel,
|
||||
Tags: service.Tags,
|
||||
CanaryTags: service.CanaryTags,
|
||||
AddressMode: service.AddressMode,
|
||||
Meta: helper.CopyMapStringString(service.Meta),
|
||||
CanaryMeta: helper.CopyMapStringString(service.CanaryMeta),
|
||||
Name: service.Name,
|
||||
PortLabel: service.PortLabel,
|
||||
Tags: service.Tags,
|
||||
CanaryTags: service.CanaryTags,
|
||||
EnableTagOverride: service.EnableTagOverride,
|
||||
AddressMode: service.AddressMode,
|
||||
Meta: helper.CopyMapStringString(service.Meta),
|
||||
CanaryMeta: helper.CopyMapStringString(service.CanaryMeta),
|
||||
}
|
||||
|
||||
if l := len(service.Checks); l != 0 {
|
||||
|
@ -1008,13 +1009,14 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service {
|
|||
out := make([]*structs.Service, len(in))
|
||||
for i, s := range in {
|
||||
out[i] = &structs.Service{
|
||||
Name: s.Name,
|
||||
PortLabel: s.PortLabel,
|
||||
Tags: s.Tags,
|
||||
CanaryTags: s.CanaryTags,
|
||||
AddressMode: s.AddressMode,
|
||||
Meta: helper.CopyMapStringString(s.Meta),
|
||||
CanaryMeta: helper.CopyMapStringString(s.CanaryMeta),
|
||||
Name: s.Name,
|
||||
PortLabel: s.PortLabel,
|
||||
Tags: s.Tags,
|
||||
CanaryTags: s.CanaryTags,
|
||||
EnableTagOverride: s.EnableTagOverride,
|
||||
AddressMode: s.AddressMode,
|
||||
Meta: helper.CopyMapStringString(s.Meta),
|
||||
CanaryMeta: helper.CopyMapStringString(s.CanaryMeta),
|
||||
}
|
||||
|
||||
if l := len(s.Checks); l != 0 {
|
||||
|
|
|
@ -1503,10 +1503,11 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
|
|||
},
|
||||
Services: []*api.Service{
|
||||
{
|
||||
Name: "groupserviceA",
|
||||
Tags: []string{"a", "b"},
|
||||
CanaryTags: []string{"d", "e"},
|
||||
PortLabel: "1234",
|
||||
Name: "groupserviceA",
|
||||
Tags: []string{"a", "b"},
|
||||
CanaryTags: []string{"d", "e"},
|
||||
EnableTagOverride: true,
|
||||
PortLabel: "1234",
|
||||
Meta: map[string]string{
|
||||
"servicemeta": "foobar",
|
||||
},
|
||||
|
@ -1576,11 +1577,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
|
|||
|
||||
Services: []*api.Service{
|
||||
{
|
||||
Id: "id",
|
||||
Name: "serviceA",
|
||||
Tags: []string{"1", "2"},
|
||||
CanaryTags: []string{"3", "4"},
|
||||
PortLabel: "foo",
|
||||
Id: "id",
|
||||
Name: "serviceA",
|
||||
Tags: []string{"1", "2"},
|
||||
CanaryTags: []string{"3", "4"},
|
||||
EnableTagOverride: true,
|
||||
PortLabel: "foo",
|
||||
Meta: map[string]string{
|
||||
"servicemeta": "foobar",
|
||||
},
|
||||
|
@ -1854,11 +1856,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
|
|||
},
|
||||
Services: []*structs.Service{
|
||||
{
|
||||
Name: "groupserviceA",
|
||||
Tags: []string{"a", "b"},
|
||||
CanaryTags: []string{"d", "e"},
|
||||
PortLabel: "1234",
|
||||
AddressMode: "auto",
|
||||
Name: "groupserviceA",
|
||||
Tags: []string{"a", "b"},
|
||||
CanaryTags: []string{"d", "e"},
|
||||
EnableTagOverride: true,
|
||||
PortLabel: "1234",
|
||||
AddressMode: "auto",
|
||||
Meta: map[string]string{
|
||||
"servicemeta": "foobar",
|
||||
},
|
||||
|
@ -1923,11 +1926,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
|
|||
},
|
||||
Services: []*structs.Service{
|
||||
{
|
||||
Name: "serviceA",
|
||||
Tags: []string{"1", "2"},
|
||||
CanaryTags: []string{"3", "4"},
|
||||
PortLabel: "foo",
|
||||
AddressMode: "auto",
|
||||
Name: "serviceA",
|
||||
Tags: []string{"1", "2"},
|
||||
CanaryTags: []string{"3", "4"},
|
||||
EnableTagOverride: true,
|
||||
PortLabel: "foo",
|
||||
AddressMode: "auto",
|
||||
Meta: map[string]string{
|
||||
"servicemeta": "foobar",
|
||||
},
|
||||
|
|
|
@ -41,6 +41,7 @@ func parseService(o *ast.ObjectItem) (*api.Service, error) {
|
|||
"name",
|
||||
"tags",
|
||||
"canary_tags",
|
||||
"enable_tag_override",
|
||||
"port",
|
||||
"check",
|
||||
"address_mode",
|
||||
|
|
|
@ -823,6 +823,25 @@ func TestParse(t *testing.T) {
|
|||
},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"service-enable-tag-override.hcl",
|
||||
&api.Job{
|
||||
ID: helper.StringToPtr("service_eto"),
|
||||
Name: helper.StringToPtr("service_eto"),
|
||||
Type: helper.StringToPtr("service"),
|
||||
TaskGroups: []*api.TaskGroup{{
|
||||
Name: helper.StringToPtr("group"),
|
||||
Tasks: []*api.Task{{
|
||||
Name: "task",
|
||||
Services: []*api.Service{{
|
||||
Name: "example",
|
||||
EnableTagOverride: true,
|
||||
}},
|
||||
}},
|
||||
}},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"reschedule-job.hcl",
|
||||
&api.Job{
|
||||
|
@ -1046,6 +1065,21 @@ func TestParse(t *testing.T) {
|
|||
},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"tg-service-enable-tag-override.hcl",
|
||||
&api.Job{
|
||||
ID: helper.StringToPtr("group_service_eto"),
|
||||
Name: helper.StringToPtr("group_service_eto"),
|
||||
TaskGroups: []*api.TaskGroup{{
|
||||
Name: helper.StringToPtr("group"),
|
||||
Services: []*api.Service{{
|
||||
Name: "example",
|
||||
EnableTagOverride: true,
|
||||
}},
|
||||
}},
|
||||
},
|
||||
false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
|
|
11
jobspec/test-fixtures/service-enable-tag-override.hcl
Normal file
11
jobspec/test-fixtures/service-enable-tag-override.hcl
Normal file
|
@ -0,0 +1,11 @@
|
|||
job "service_eto" {
|
||||
type = "service"
|
||||
group "group" {
|
||||
task "task" {
|
||||
service {
|
||||
name = "example"
|
||||
enable_tag_override = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
8
jobspec/test-fixtures/tg-service-enable-tag-override.hcl
Normal file
8
jobspec/test-fixtures/tg-service-enable-tag-override.hcl
Normal file
|
@ -0,0 +1,8 @@
|
|||
job "group_service_eto" {
|
||||
group "group" {
|
||||
service {
|
||||
name = "example"
|
||||
enable_tag_override = true
|
||||
}
|
||||
}
|
||||
}
|
|
@ -4,6 +4,8 @@ import (
|
|||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestJobDiff(t *testing.T) {
|
||||
|
@ -1201,20 +1203,22 @@ func TestJobDiff(t *testing.T) {
|
|||
|
||||
func TestTaskGroupDiff(t *testing.T) {
|
||||
cases := []struct {
|
||||
TestCase string
|
||||
Old, New *TaskGroup
|
||||
Expected *TaskGroupDiff
|
||||
Error bool
|
||||
ExpErr bool
|
||||
Contextual bool
|
||||
}{
|
||||
{
|
||||
Old: nil,
|
||||
New: nil,
|
||||
TestCase: "Empty",
|
||||
Old: nil,
|
||||
New: nil,
|
||||
Expected: &TaskGroupDiff{
|
||||
Type: DiffTypeNone,
|
||||
},
|
||||
},
|
||||
{
|
||||
// Primitive only that has different names
|
||||
TestCase: "Primitive only that has different names",
|
||||
Old: &TaskGroup{
|
||||
Name: "foo",
|
||||
Count: 10,
|
||||
|
@ -1229,10 +1233,10 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Error: true,
|
||||
ExpErr: true,
|
||||
},
|
||||
{
|
||||
// Primitive only that is the same
|
||||
TestCase: "Primitive only that is the same",
|
||||
Old: &TaskGroup{
|
||||
Name: "foo",
|
||||
Count: 10,
|
||||
|
@ -1253,7 +1257,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Primitive only that has diffs
|
||||
TestCase: "Primitive only that has diffs",
|
||||
Old: &TaskGroup{
|
||||
Name: "foo",
|
||||
Count: 10,
|
||||
|
@ -1288,7 +1292,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Map diff
|
||||
TestCase: "Map diff",
|
||||
Old: &TaskGroup{
|
||||
Meta: map[string]string{
|
||||
"foo": "foo",
|
||||
|
@ -1320,7 +1324,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Constraints edited
|
||||
TestCase: "Constraints edited",
|
||||
Old: &TaskGroup{
|
||||
Constraints: []*Constraint{
|
||||
{
|
||||
|
@ -1408,7 +1412,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Affinities edited
|
||||
TestCase: "Affinities edited",
|
||||
Old: &TaskGroup{
|
||||
Affinities: []*Affinity{
|
||||
{
|
||||
|
@ -1512,8 +1516,8 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// RestartPolicy added
|
||||
Old: &TaskGroup{},
|
||||
TestCase: "RestartPolicy added",
|
||||
Old: &TaskGroup{},
|
||||
New: &TaskGroup{
|
||||
RestartPolicy: &RestartPolicy{
|
||||
Attempts: 1,
|
||||
|
@ -1559,7 +1563,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// RestartPolicy deleted
|
||||
TestCase: "RestartPolicy deleted",
|
||||
Old: &TaskGroup{
|
||||
RestartPolicy: &RestartPolicy{
|
||||
Attempts: 1,
|
||||
|
@ -1606,7 +1610,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// RestartPolicy edited
|
||||
TestCase: "RestartPolicy edited",
|
||||
Old: &TaskGroup{
|
||||
RestartPolicy: &RestartPolicy{
|
||||
Attempts: 1,
|
||||
|
@ -1660,7 +1664,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// RestartPolicy edited with context
|
||||
TestCase: "RestartPolicy edited with context",
|
||||
Contextual: true,
|
||||
Old: &TaskGroup{
|
||||
RestartPolicy: &RestartPolicy{
|
||||
|
@ -1715,8 +1719,8 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// ReschedulePolicy added
|
||||
Old: &TaskGroup{},
|
||||
TestCase: "ReschedulePolicy added",
|
||||
Old: &TaskGroup{},
|
||||
New: &TaskGroup{
|
||||
ReschedulePolicy: &ReschedulePolicy{
|
||||
Attempts: 1,
|
||||
|
@ -1776,7 +1780,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// ReschedulePolicy deleted
|
||||
TestCase: "ReschedulePolicy deleted",
|
||||
Old: &TaskGroup{
|
||||
ReschedulePolicy: &ReschedulePolicy{
|
||||
Attempts: 1,
|
||||
|
@ -1837,7 +1841,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// ReschedulePolicy edited
|
||||
TestCase: "ReschedulePolicy edited",
|
||||
Old: &TaskGroup{
|
||||
ReschedulePolicy: &ReschedulePolicy{
|
||||
Attempts: 1,
|
||||
|
@ -1899,8 +1903,9 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
}, {
|
||||
// ReschedulePolicy edited with context
|
||||
},
|
||||
{
|
||||
TestCase: "ReschedulePolicy edited with context",
|
||||
Contextual: true,
|
||||
Old: &TaskGroup{
|
||||
ReschedulePolicy: &ReschedulePolicy{
|
||||
|
@ -1963,7 +1968,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Update strategy deleted
|
||||
TestCase: "Update strategy deleted",
|
||||
Old: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
AutoRevert: true,
|
||||
|
@ -2025,8 +2030,8 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Update strategy added
|
||||
Old: &TaskGroup{},
|
||||
TestCase: "Update strategy added",
|
||||
Old: &TaskGroup{},
|
||||
New: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
AutoRevert: true,
|
||||
|
@ -2087,7 +2092,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Update strategy edited
|
||||
TestCase: "Update strategy edited",
|
||||
Old: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 5,
|
||||
|
@ -2173,7 +2178,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Update strategy edited with context
|
||||
TestCase: "Update strategy edited with context",
|
||||
Contextual: true,
|
||||
Old: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
|
@ -2260,8 +2265,8 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// EphemeralDisk added
|
||||
Old: &TaskGroup{},
|
||||
TestCase: "EphemeralDisk added",
|
||||
Old: &TaskGroup{},
|
||||
New: &TaskGroup{
|
||||
EphemeralDisk: &EphemeralDisk{
|
||||
Migrate: true,
|
||||
|
@ -2300,7 +2305,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// EphemeralDisk deleted
|
||||
TestCase: "EphemeralDisk deleted",
|
||||
Old: &TaskGroup{
|
||||
EphemeralDisk: &EphemeralDisk{
|
||||
Migrate: true,
|
||||
|
@ -2340,7 +2345,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// EphemeralDisk edited
|
||||
TestCase: "EphemeralDisk edited",
|
||||
Old: &TaskGroup{
|
||||
EphemeralDisk: &EphemeralDisk{
|
||||
Migrate: true,
|
||||
|
@ -2387,7 +2392,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// EphemeralDisk edited with context
|
||||
TestCase: "EphemeralDisk edited with context",
|
||||
Contextual: true,
|
||||
Old: &TaskGroup{
|
||||
EphemeralDisk: &EphemeralDisk{
|
||||
|
@ -2433,14 +2438,14 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
// TaskGroup Services edited
|
||||
TestCase: "TaskGroup Services edited",
|
||||
Contextual: true,
|
||||
Old: &TaskGroup{
|
||||
Services: []*Service{
|
||||
{
|
||||
Name: "foo",
|
||||
Name: "foo",
|
||||
EnableTagOverride: false,
|
||||
Checks: []*ServiceCheck{
|
||||
{
|
||||
Name: "foo",
|
||||
|
@ -2472,7 +2477,8 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
New: &TaskGroup{
|
||||
Services: []*Service{
|
||||
{
|
||||
Name: "foo",
|
||||
Name: "foo",
|
||||
EnableTagOverride: true,
|
||||
Checks: []*ServiceCheck{
|
||||
{
|
||||
Name: "foo",
|
||||
|
@ -2522,6 +2528,12 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
Old: "",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "EnableTagOverride",
|
||||
Old: "false",
|
||||
New: "true",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "Name",
|
||||
|
@ -2771,7 +2783,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// TaskGroup Networks edited
|
||||
TestCase: "TaskGroup Networks edited",
|
||||
Contextual: true,
|
||||
Old: &TaskGroup{
|
||||
Networks: Networks{
|
||||
|
@ -2894,7 +2906,7 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Tasks edited
|
||||
TestCase: "Tasks edited",
|
||||
Old: &TaskGroup{
|
||||
Tasks: []*Task{
|
||||
{
|
||||
|
@ -3012,21 +3024,19 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, c := range cases {
|
||||
actual, err := c.Old.Diff(c.New, c.Contextual)
|
||||
if c.Error && err == nil {
|
||||
t.Fatalf("case %d: expected errored", i+1)
|
||||
} else if err != nil {
|
||||
if !c.Error {
|
||||
t.Fatalf("case %d: errored %#v", i+1, err)
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
require.NotEmpty(t, c.TestCase, "case #%d needs a name", i+1)
|
||||
|
||||
if !reflect.DeepEqual(actual, c.Expected) {
|
||||
t.Fatalf("case %d: got:\n%#v\n want:\n%#v\n",
|
||||
i+1, actual, c.Expected)
|
||||
}
|
||||
t.Run(c.TestCase, func(t *testing.T) {
|
||||
result, err := c.Old.Diff(c.New, c.Contextual)
|
||||
switch c.ExpErr {
|
||||
case true:
|
||||
require.Error(t, err, "case %q expected error", c.TestCase)
|
||||
case false:
|
||||
require.NoError(t, err, "case %q expected no error", c.TestCase)
|
||||
require.True(t, reflect.DeepEqual(result, c.Expected),
|
||||
"case %q got\n%#v\nwant:\n%#v\n", c.TestCase, result, c.Expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4441,6 +4451,12 @@ func TestTaskDiff(t *testing.T) {
|
|||
Type: DiffTypeAdded,
|
||||
Name: "Service",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "EnableTagOverride",
|
||||
Old: "",
|
||||
New: "false",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "Name",
|
||||
|
@ -4459,6 +4475,12 @@ func TestTaskDiff(t *testing.T) {
|
|||
Type: DiffTypeDeleted,
|
||||
Name: "Service",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "EnableTagOverride",
|
||||
Old: "false",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "Name",
|
||||
|
@ -4506,8 +4528,15 @@ func TestTaskDiff(t *testing.T) {
|
|||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "AddressMode",
|
||||
Old: "",
|
||||
New: "driver",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "EnableTagOverride",
|
||||
Old: "false",
|
||||
New: "false",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "Name",
|
||||
|
@ -4525,6 +4554,37 @@ func TestTaskDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "Service EnableTagOverride edited no context",
|
||||
Contextual: false,
|
||||
Old: &Task{
|
||||
Services: []*Service{{
|
||||
EnableTagOverride: false,
|
||||
}},
|
||||
},
|
||||
New: &Task{
|
||||
Services: []*Service{{
|
||||
EnableTagOverride: true,
|
||||
}},
|
||||
},
|
||||
Expected: &TaskDiff{
|
||||
Type: DiffTypeEdited,
|
||||
Objects: []*ObjectDiff{
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Service",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "EnableTagOverride",
|
||||
Old: "false",
|
||||
New: "true",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "Services tags edited (no checks) with context",
|
||||
Contextual: true,
|
||||
|
@ -4605,6 +4665,12 @@ func TestTaskDiff(t *testing.T) {
|
|||
Type: DiffTypeNone,
|
||||
Name: "AddressMode",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "EnableTagOverride",
|
||||
Old: "false",
|
||||
New: "false",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "Name",
|
||||
|
@ -4990,6 +5056,12 @@ func TestTaskDiff(t *testing.T) {
|
|||
Old: "",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "EnableTagOverride",
|
||||
Old: "false",
|
||||
New: "false",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "Name",
|
||||
|
|
|
@ -326,6 +326,14 @@ type Service struct {
|
|||
// this service.
|
||||
AddressMode string
|
||||
|
||||
// EnableTagOverride will disable Consul's anti-entropy mechanism for the
|
||||
// tags of this service. External updates to the service definition via
|
||||
// Consul will not be corrected to match the service definition set in the
|
||||
// Nomad job specification.
|
||||
//
|
||||
// https://www.consul.io/docs/agent/services.html#service-definition
|
||||
EnableTagOverride bool
|
||||
|
||||
Tags []string // List of tags for the service
|
||||
CanaryTags []string // List of tags for the service when it is a canary
|
||||
Checks []*ServiceCheck // List of checks associated with the service
|
||||
|
@ -388,7 +396,7 @@ func (s *Service) Canonicalize(job string, taskGroup string, task string) {
|
|||
}
|
||||
}
|
||||
|
||||
// Validate checks if the Check definition is valid
|
||||
// Validate checks if the Service definition is valid
|
||||
func (s *Service) Validate() error {
|
||||
var mErr multierror.Error
|
||||
|
||||
|
@ -436,7 +444,7 @@ func (s *Service) Validate() error {
|
|||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// ValidateName checks if the services Name is valid and should be called after
|
||||
// ValidateName checks if the service Name is valid and should be called after
|
||||
// the name has been interpolated
|
||||
func (s *Service) ValidateName(name string) error {
|
||||
// Ensure the service name is valid per RFC-952 §1
|
||||
|
@ -471,6 +479,7 @@ func (s *Service) Hash(allocID, taskName string, canary bool) string {
|
|||
if len(s.CanaryMeta) > 0 {
|
||||
fmt.Fprintf(h, "%v", s.CanaryMeta)
|
||||
}
|
||||
fmt.Fprintf(h, "%t", s.EnableTagOverride)
|
||||
|
||||
// Vary ID on whether or not CanaryTags will be used
|
||||
if canary {
|
||||
|
@ -539,6 +548,10 @@ OUTER:
|
|||
return false
|
||||
}
|
||||
|
||||
if s.EnableTagOverride != o.EnableTagOverride {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
|
@ -2621,6 +2621,9 @@ func TestService_Equals(t *testing.T) {
|
|||
|
||||
o.Connect = &ConsulConnect{Native: true}
|
||||
assertDiff()
|
||||
|
||||
o.EnableTagOverride = true
|
||||
assertDiff()
|
||||
}
|
||||
|
||||
func TestJob_ExpandServiceNames(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue