client: refactor common service registration objects from Consul.

This commit performs refactoring to pull out common service
registration objects into a new `client/serviceregistration`
package. This new package will form the base point for all
client specific service registration functionality.

The Consul specific implementation is not moved as it also
includes non-service registration implementations; this reduces
the blast radius of the changes as well.
This commit is contained in:
James Rasell 2022-03-15 09:38:30 +01:00
parent 018a3e9f25
commit 7cd28a6fb6
No known key found for this signature in database
GPG Key ID: AA7D460F5C8377AA
38 changed files with 1245 additions and 987 deletions

View File

@ -9,9 +9,8 @@ import (
"github.com/hashicorp/consul/api"
hclog "github.com/hashicorp/go-hclog"
cconsul "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/serviceregistration"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -56,7 +55,7 @@ type Tracker struct {
allocUpdates *cstructs.AllocListener
// consulClient is used to look up the state of the task's checks
consulClient cconsul.ConsulServiceAPI
consulClient serviceregistration.Handler
// healthy is used to signal whether we have determined the allocation to be
// healthy or unhealthy
@ -93,7 +92,7 @@ type Tracker struct {
// listener and consul API object are given so that the watcher can detect
// health changes.
func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.Allocation,
allocUpdates *cstructs.AllocListener, consulClient cconsul.ConsulServiceAPI,
allocUpdates *cstructs.AllocListener, consulClient serviceregistration.Handler,
minHealthyTime time.Duration, useChecks bool) *Tracker {
// Do not create a named sub-logger as the hook controlling
@ -377,7 +376,7 @@ func (t *Tracker) watchConsulEvents() {
consulChecksErr := false
// allocReg are the registered objects in Consul for the allocation
var allocReg *consul.AllocRegistration
var allocReg *serviceregistration.AllocRegistration
OUTER:
for {
@ -482,7 +481,7 @@ OUTER:
type taskHealthState struct {
task *structs.Task
state *structs.TaskState
taskRegistrations *consul.ServiceRegistrations
taskRegistrations *serviceregistration.ServiceRegistrations
}
// event takes the deadline time for the allocation to be healthy and the update

View File

@ -8,9 +8,9 @@ import (
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/serviceregistration"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
cstructs "github.com/hashicorp/nomad/client/structs"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@ -39,9 +39,9 @@ func TestTracker_Checks_Healthy(t *testing.T) {
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
@ -59,13 +59,13 @@ func TestTracker_Checks_Healthy(t *testing.T) {
// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
consul := regMock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &agentconsul.AllocRegistration{
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}
@ -111,7 +111,7 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
consul := consul.NewMockConsulServiceClient(t, logger)
consul := regMock.NewServiceRegistrationHandler(logger)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
@ -152,7 +152,7 @@ func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) {
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
consul := consul.NewMockConsulServiceClient(t, logger)
consul := regMock.NewServiceRegistrationHandler(logger)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
@ -199,9 +199,9 @@ func TestTracker_Checks_Unhealthy(t *testing.T) {
Name: task.Services[0].Checks[1].Name,
Status: consulapi.HealthCritical,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
@ -219,13 +219,13 @@ func TestTracker_Checks_Unhealthy(t *testing.T) {
// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
consul := regMock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &agentconsul.AllocRegistration{
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}
@ -341,9 +341,9 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
@ -361,13 +361,13 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {
// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
consul := regMock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &agentconsul.AllocRegistration{
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}
@ -480,9 +480,9 @@ func TestTracker_Checks_OnUpdate(t *testing.T) {
Name: task.Services[0].Checks[0].Name,
Status: tc.consulResp,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
@ -503,13 +503,13 @@ func TestTracker_Checks_OnUpdate(t *testing.T) {
// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
consul := regMock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &agentconsul.AllocRegistration{
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}

View File

@ -22,6 +22,7 @@ import (
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
@ -63,7 +64,7 @@ type allocRunner struct {
// consulClient is the client used by the consul service hook for
// registering services and checks
consulClient consul.ConsulServiceAPI
consulClient serviceregistration.Handler
// consulProxiesClient is the client used by the envoy version hook for
// looking up supported envoy versions of the consul agent.

View File

@ -11,9 +11,9 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/allochealth"
"github.com/hashicorp/nomad/client/allocwatcher"
cconsul "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/serviceregistration"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@ -577,9 +577,9 @@ func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
})
// Get consul client operations
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulClient := conf.Consul.(*regMock.ServiceRegistrationHandler)
consulOpts := consulClient.GetOps()
var groupRemoveOp cconsul.MockConsulOp
var groupRemoveOp regMock.Operation
for _, op := range consulOpts {
// Grab the first deregistration request
if op.Op == "remove" && op.Name == "group-web" {
@ -1030,12 +1030,12 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
defer cleanup()
// Only return the check as healthy after a duration
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
return &consul.AllocRegistration{
Tasks: map[string]*consul.ServiceRegistrations{
consulClient := conf.Consul.(*regMock.ServiceRegistrationHandler)
consulClient.AllocRegistrationsFn = func(allocID string) (*serviceregistration.AllocRegistration, error) {
return &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*consul.ServiceRegistration{
Services: map[string]*serviceregistration.ServiceRegistration{
"123": {
Service: &api.AgentService{Service: "fakeservice"},
Checks: []*api.AgentCheck{checkUnhealthy},
@ -1352,12 +1352,12 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
return &consul.AllocRegistration{
Tasks: map[string]*consul.ServiceRegistrations{
consulClient := conf.Consul.(*regMock.ServiceRegistrationHandler)
consulClient.AllocRegistrationsFn = func(allocID string) (*serviceregistration.AllocRegistration, error) {
return &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*consul.ServiceRegistration{
Services: map[string]*serviceregistration.ServiceRegistration{
"123": {
Service: &api.AgentService{Service: "fakeservice"},
Checks: []*api.AgentCheck{checkHealthy},

View File

@ -11,7 +11,7 @@ import (
"testing"
"time"
"github.com/hashicorp/nomad/client/consul"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@ -125,7 +125,7 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
// - removal during exited is de-duped due to prekill
// - removal during stop is de-duped due to prekill
// 1 removal group during stop
consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps()
consulOps := conf2.Consul.(*regMock.ServiceRegistrationHandler).GetOps()
require.Len(t, consulOps, 2)
for _, op := range consulOps {
require.Equal(t, "remove", op.Op)

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
cstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
@ -31,7 +32,7 @@ type Config struct {
StateDB cstate.StateDB
// Consul is the Consul client used to register task services and checks
Consul consul.ConsulServiceAPI
Consul serviceregistration.Handler
// ConsulProxies is the Consul client used to lookup supported envoy versions
// of the Consul agent.

View File

@ -7,7 +7,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
@ -27,7 +27,7 @@ type groupServiceHook struct {
allocID string
group string
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
consulClient serviceregistration.Handler
consulNamespace string
prerun bool
deregistered bool
@ -51,7 +51,7 @@ type groupServiceHook struct {
type groupServiceHookConfig struct {
alloc *structs.Allocation
consul consul.ConsulServiceAPI
consul serviceregistration.Handler
consulNamespace string
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
@ -217,7 +217,7 @@ func (h *groupServiceHook) deregister() {
}
}
func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadServices {
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
@ -227,15 +227,15 @@ func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
}
// Create task services struct with request's driver metadata
return &agentconsul.WorkloadServices{
AllocID: h.allocID,
Group: h.group,
ConsulNamespace: h.consulNamespace,
Restarter: h.restarter,
Services: interpolatedServices,
Networks: h.networks,
NetworkStatus: netStatus,
Ports: h.ports,
Canary: h.canary,
return &serviceregistration.WorkloadServices{
AllocID: h.allocID,
Group: h.group,
Namespace: h.consulNamespace,
Restarter: h.restarter,
Services: interpolatedServices,
Networks: h.networks,
NetworkStatus: netStatus,
Ports: h.ports,
Canary: h.canary,
}
}

View File

@ -8,7 +8,7 @@ import (
consulapi "github.com/hashicorp/consul/api"
ctestutil "github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
@ -35,7 +35,7 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
PortLabel: "9999",
}}
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
consulClient := regMock.NewServiceRegistrationHandler(logger)
h := newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
@ -71,7 +71,7 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
alloc.Job.TaskGroups[0].ShutdownDelay = helper.TimeToPtr(10 * time.Second)
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
consulClient := regMock.NewServiceRegistrationHandler(logger)
h := newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
@ -106,7 +106,7 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
alloc := mock.ConnectAlloc()
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
consulClient := regMock.NewServiceRegistrationHandler(logger)
h := newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
@ -152,7 +152,7 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
}
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
consulClient := regMock.NewServiceRegistrationHandler(logger)
h := newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
@ -196,7 +196,7 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
}
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
consulClient := regMock.NewServiceRegistrationHandler(logger)
h := newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,

View File

@ -9,7 +9,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allochealth"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/serviceregistration"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -33,7 +33,7 @@ type allocHealthWatcherHook struct {
healthSetter healthSetter
// consul client used to monitor health checks
consul consul.ConsulServiceAPI
consul serviceregistration.Handler
// listener is given to trackers to listen for alloc updates and closed
// when the alloc is destroyed.
@ -68,7 +68,7 @@ type allocHealthWatcherHook struct {
}
func newAllocHealthWatcherHook(logger log.Logger, alloc *structs.Allocation, hs healthSetter,
listener *cstructs.AllocListener, consul consul.ConsulServiceAPI) interfaces.RunnerHook {
listener *cstructs.AllocListener, consul serviceregistration.Handler) interfaces.RunnerHook {
// Neither deployments nor migrations care about the health of
// non-service jobs so never watch their health

View File

@ -7,9 +7,9 @@ import (
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/serviceregistration"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
cstructs "github.com/hashicorp/nomad/client/structs"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@ -92,7 +92,7 @@ func TestHealthHook_PrerunPostrun(t *testing.T) {
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
consul := consul.NewMockConsulServiceClient(t, logger)
consul := regMock.NewServiceRegistrationHandler(logger)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul)
@ -130,7 +130,7 @@ func TestHealthHook_PrerunUpdatePostrun(t *testing.T) {
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
consul := consul.NewMockConsulServiceClient(t, logger)
consul := regMock.NewServiceRegistrationHandler(logger)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
@ -169,7 +169,7 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) {
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
consul := consul.NewMockConsulServiceClient(t, logger)
consul := regMock.NewServiceRegistrationHandler(logger)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
@ -210,7 +210,7 @@ func TestHealthHook_Postrun(t *testing.T) {
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
consul := consul.NewMockConsulServiceClient(t, logger)
consul := regMock.NewServiceRegistrationHandler(logger)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
@ -243,9 +243,9 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) {
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
@ -263,14 +263,14 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) {
// Don't reply on the first call
called := false
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
consul := regMock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if !called {
called = true
return nil, nil
}
reg := &agentconsul.AllocRegistration{
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}
@ -331,9 +331,9 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
Name: task.Services[0].Checks[1].Name,
Status: consulapi.HealthCritical,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
@ -351,14 +351,14 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
// Don't reply on the first call
called := false
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
consul := regMock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if !called {
called = true
return nil, nil
}
reg := &agentconsul.AllocRegistration{
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}

View File

@ -17,8 +17,8 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
ifs "github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
@ -451,9 +451,9 @@ func (h *envoyBootstrapHook) grpcAddress(env map[string]string) string {
}
func (h *envoyBootstrapHook) proxyServiceID(group string, service *structs.Service) string {
// Note, it is critical the ID here matches what is actually registered in Consul.
// See: WorkloadServices.Name in structs.go
return agentconsul.MakeAllocServiceID(h.alloc.ID, "group-"+group, service)
// Note, it is critical the ID here matches what is actually registered in
// Consul. See: WorkloadServices.Name in serviceregistration/workload.go.
return serviceregistration.MakeAllocServiceID(h.alloc.ID, "group-"+group, service)
}
// newEnvoyBootstrapArgs is used to prepare for the invocation of the

View File

@ -10,7 +10,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
@ -26,7 +26,7 @@ const defaultShutdownWait = time.Minute
type scriptCheckHookConfig struct {
alloc *structs.Allocation
task *structs.Task
consul consul.ConsulServiceAPI
consul serviceregistration.Handler
logger log.Logger
shutdownWait time.Duration
}
@ -34,7 +34,7 @@ type scriptCheckHookConfig struct {
// scriptCheckHook implements a task runner hook for running script
// checks in the context of a task
type scriptCheckHook struct {
consul consul.ConsulServiceAPI
consul serviceregistration.Handler
consulNamespace string
alloc *structs.Allocation
task *structs.Task
@ -182,7 +182,7 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck {
if check.Type != structs.ServiceCheckScript {
continue
}
serviceID := agentconsul.MakeAllocServiceID(
serviceID := serviceregistration.MakeAllocServiceID(
h.alloc.ID, h.task.Name, service)
sc := newScriptCheck(&scriptCheckConfig{
consulNamespace: h.consulNamespace,
@ -222,7 +222,7 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck {
continue
}
groupTaskName := "group-" + tg.Name
serviceID := agentconsul.MakeAllocServiceID(
serviceID := serviceregistration.MakeAllocServiceID(
h.alloc.ID, groupTaskName, service)
sc := newScriptCheck(&scriptCheckConfig{
consulNamespace: h.consulNamespace,

View File

@ -10,7 +10,8 @@ import (
"github.com/hashicorp/consul/api"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/serviceregistration"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
@ -226,7 +227,7 @@ func TestScript_Exec_Codes(t *testing.T) {
func TestScript_TaskEnvInterpolation(t *testing.T) {
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
consulClient := regMock.NewServiceRegistrationHandler(logger)
exec, cancel := newBlockingScriptExec()
defer cancel()
@ -262,7 +263,7 @@ func TestScript_TaskEnvInterpolation(t *testing.T) {
scHook.driverExec = exec
expectedSvc := svcHook.getWorkloadServices().Services[0]
expected := agentconsul.MakeCheckID(agentconsul.MakeAllocServiceID(
expected := agentconsul.MakeCheckID(serviceregistration.MakeAllocServiceID(
alloc.ID, task.Name, expectedSvc), expectedSvc.Checks[0])
actual := scHook.newScriptChecks()
@ -278,7 +279,7 @@ func TestScript_TaskEnvInterpolation(t *testing.T) {
svcHook.taskEnv = env
expectedSvc = svcHook.getWorkloadServices().Services[0]
expected = agentconsul.MakeCheckID(agentconsul.MakeAllocServiceID(
expected = agentconsul.MakeCheckID(serviceregistration.MakeAllocServiceID(
alloc.ID, task.Name, expectedSvc), expectedSvc.Checks[0])
actual = scHook.newScriptChecks()

View File

@ -8,7 +8,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
@ -24,7 +24,7 @@ var _ interfaces.TaskUpdateHook = &serviceHook{}
type serviceHookConfig struct {
alloc *structs.Allocation
task *structs.Task
consulServices consul.ConsulServiceAPI
consulServices serviceregistration.Handler
consulNamespace string
// Restarter is a subset of the TaskLifecycle interface
@ -37,7 +37,7 @@ type serviceHook struct {
allocID string
taskName string
consulNamespace string
consulServices consul.ConsulServiceAPI
consulServices serviceregistration.Handler
restarter agentconsul.WorkloadRestarter
logger log.Logger
@ -193,21 +193,21 @@ func (h *serviceHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest,
return nil
}
func (h *serviceHook) getWorkloadServices() *agentconsul.WorkloadServices {
func (h *serviceHook) getWorkloadServices() *serviceregistration.WorkloadServices {
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services)
// Create task services struct with request's driver metadata
return &agentconsul.WorkloadServices{
AllocID: h.allocID,
Task: h.taskName,
ConsulNamespace: h.consulNamespace,
Restarter: h.restarter,
Services: interpolatedServices,
DriverExec: h.driverExec,
DriverNetwork: h.driverNet,
Networks: h.networks,
Canary: h.canary,
Ports: h.ports,
return &serviceregistration.WorkloadServices{
AllocID: h.allocID,
Task: h.taskName,
Namespace: h.consulNamespace,
Restarter: h.restarter,
Services: interpolatedServices,
DriverExec: h.driverExec,
DriverNetwork: h.driverNet,
Networks: h.networks,
Canary: h.canary,
Ports: h.ports,
}
}

View File

@ -5,7 +5,7 @@ import (
"testing"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/stretchr/testify/require"
@ -20,7 +20,7 @@ var _ interfaces.TaskUpdateHook = (*serviceHook)(nil)
func TestUpdate_beforePoststart(t *testing.T) {
alloc := mock.Alloc()
logger := testlog.HCLogger(t)
c := consul.NewMockConsulServiceClient(t, logger)
c := regMock.NewServiceRegistrationHandler(logger)
hook := newServiceHook(serviceHookConfig{
alloc: alloc,
@ -56,7 +56,7 @@ func Test_serviceHook_multipleDeRegisterCall(t *testing.T) {
alloc := mock.Alloc()
logger := testlog.HCLogger(t)
c := consul.NewMockConsulServiceClient(t, logger)
c := regMock.NewServiceRegistrationHandler(logger)
hook := newServiceHook(serviceHookConfig{
alloc: alloc,

View File

@ -25,6 +25,7 @@ import (
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
@ -166,7 +167,7 @@ type TaskRunner struct {
// consulClient is the client used by the consul service hook for
// registering services and checks
consulServiceClient consul.ConsulServiceAPI
consulServiceClient serviceregistration.Handler
// consulProxiesClient is the client used by the envoy version hook for
// asking consul what version of envoy nomad should inject into the connect
@ -248,7 +249,7 @@ type Config struct {
Logger log.Logger
// Consul is the client to use for managing Consul service registrations
Consul consul.ConsulServiceAPI
Consul serviceregistration.Handler
// ConsulProxies is the client to use for looking up supported envoy versions
// from Consul.

View File

@ -24,6 +24,7 @@ import (
consulapi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
cstate "github.com/hashicorp/nomad/client/state"
ctestutil "github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/client/vaultclient"
@ -109,7 +110,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
Consul: regMock.NewServiceRegistrationHandler(logger),
ConsulSI: consulapi.NewMockServiceIdentitiesClient(),
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
@ -939,7 +940,7 @@ func TestTaskRunner_ShutdownDelay(t *testing.T) {
tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()
mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient)
mockConsul := conf.Consul.(*regMock.ServiceRegistrationHandler)
// Wait for the task to start
testWaitForTaskToStart(t, tr)
@ -1027,7 +1028,7 @@ func TestTaskRunner_NoShutdownDelay(t *testing.T) {
tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()
mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient)
mockConsul := conf.Consul.(*regMock.ServiceRegistrationHandler)
testWaitForTaskToStart(t, tr)
@ -2479,7 +2480,7 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
consul := conf.Consul.(*consulapi.MockConsulServiceClient)
consul := conf.Consul.(*regMock.ServiceRegistrationHandler)
consulOps := consul.GetOps()
require.Len(t, consulOps, 4)

View File

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
@ -62,7 +63,7 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: state.NoopDB{},
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
Consul: mock.NewServiceRegistrationHandler(clientConf.Logger),
ConsulSI: consul.NewMockServiceIdentitiesClient(),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},

View File

@ -39,6 +39,7 @@ import (
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
@ -227,7 +228,7 @@ type Client struct {
// consulService is Nomad's custom Consul client for managing services
// and checks.
consulService consulApi.ConsulServiceAPI
consulService serviceregistration.Handler
// consulProxies is Nomad's custom Consul client for looking up supported
// envoy versions
@ -322,7 +323,7 @@ var (
// registered via https://golang.org/pkg/net/rpc/#Server.RegisterName in place
// of the client's normal RPC handlers. This allows server tests to override
// the behavior of the client.
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxies consulApi.SupportedProxiesAPI, consulService consulApi.ConsulServiceAPI, rpcs map[string]interface{}) (*Client, error) {
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxies consulApi.SupportedProxiesAPI, consulService serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) {
// Create the tls wrapper
var tlsWrap tlsutil.RegionWrapper
if cfg.TLSConfig.EnableRPC {

View File

@ -14,8 +14,9 @@ import (
memdb "github.com/hashicorp/go-memdb"
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/fingerprint"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
cstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
@ -29,8 +30,6 @@ import (
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
cstate "github.com/hashicorp/nomad/client/state"
"github.com/stretchr/testify/require"
)
@ -615,7 +614,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
logger := testlog.HCLogger(t)
c1.config.Logger = logger
consulCatalog := consul.NewMockCatalog(logger)
mockService := consulApi.NewMockConsulServiceClient(t, logger)
mockService := regMock.NewServiceRegistrationHandler(logger)
// ensure we use non-shutdown driver instances
c1.config.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", c1.config.Options, nil)

View File

@ -1,33 +1,9 @@
package consul
import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
//
// ACL requirements
// - service:write
type ConsulServiceAPI interface {
// RegisterWorkload with Consul. Adds all service entries and checks to Consul.
RegisterWorkload(*consul.WorkloadServices) error
// RemoveWorkload from Consul. Removes all service entries and checks.
RemoveWorkload(*consul.WorkloadServices)
// UpdateWorkload in Consul. Does not alter the service if only checks have
// changed.
UpdateWorkload(old, newTask *consul.WorkloadServices) error
// AllocRegistrations returns the registrations for the given allocation.
AllocRegistrations(allocID string) (*consul.AllocRegistration, error)
// UpdateTTL is used to update the TTL of a check.
UpdateTTL(id, namespace, output, status string) error
}
// TokenDeriverFunc takes an allocation and a set of tasks and derives a
// service identity token for each. Requests go through nomad server.
type TokenDeriverFunc func(*structs.Allocation, []string) (map[string]string, error)

View File

@ -1,113 +0,0 @@
package consul
import (
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/command/agent/consul"
testing "github.com/mitchellh/go-testing-interface"
)
// MockConsulOp represents the register/deregister operations.
type MockConsulOp struct {
Op string // add, remove, or update
AllocID string
Name string // task or group name
OccurredAt time.Time
}
func NewMockConsulOp(op, allocID, name string) MockConsulOp {
switch op {
case "add", "remove", "update", "alloc_registrations",
"add_group", "remove_group", "update_group", "update_ttl":
default:
panic(fmt.Errorf("invalid consul op: %s", op))
}
return MockConsulOp{
Op: op,
AllocID: allocID,
Name: name,
OccurredAt: time.Now(),
}
}
// MockConsulServiceClient implements the ConsulServiceAPI interface to record
// and log task registration/deregistration.
type MockConsulServiceClient struct {
ops []MockConsulOp
mu sync.Mutex
logger log.Logger
// AllocRegistrationsFn allows injecting return values for the
// AllocRegistrations function.
AllocRegistrationsFn func(allocID string) (*consul.AllocRegistration, error)
}
func NewMockConsulServiceClient(t testing.T, logger log.Logger) *MockConsulServiceClient {
logger = logger.Named("mock_consul")
m := MockConsulServiceClient{
ops: make([]MockConsulOp, 0, 20),
logger: logger,
}
return &m
}
func (m *MockConsulServiceClient) UpdateWorkload(old, newSvcs *consul.WorkloadServices) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Trace("UpdateWorkload", "alloc_id", newSvcs.AllocID, "name", newSvcs.Name(),
"old_services", len(old.Services), "new_services", len(newSvcs.Services),
)
m.ops = append(m.ops, NewMockConsulOp("update", newSvcs.AllocID, newSvcs.Name()))
return nil
}
func (m *MockConsulServiceClient) RegisterWorkload(svcs *consul.WorkloadServices) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Trace("RegisterWorkload", "alloc_id", svcs.AllocID, "name", svcs.Name(),
"services", len(svcs.Services),
)
m.ops = append(m.ops, NewMockConsulOp("add", svcs.AllocID, svcs.Name()))
return nil
}
func (m *MockConsulServiceClient) RemoveWorkload(svcs *consul.WorkloadServices) {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Trace("RemoveWorkload", "alloc_id", svcs.AllocID, "name", svcs.Name(),
"services", len(svcs.Services),
)
m.ops = append(m.ops, NewMockConsulOp("remove", svcs.AllocID, svcs.Name()))
}
func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Trace("AllocRegistrations", "alloc_id", allocID)
m.ops = append(m.ops, NewMockConsulOp("alloc_registrations", allocID, ""))
if m.AllocRegistrationsFn != nil {
return m.AllocRegistrationsFn(allocID)
}
return nil, nil
}
func (m *MockConsulServiceClient) UpdateTTL(checkID, namespace, output, status string) error {
// TODO(tgross): this method is here so we can implement the
// interface but the locking we need for testing creates a lot
// of opportunities for deadlocks in testing that will never
// appear in live code.
m.logger.Trace("UpdateTTL", "check_id", checkID, "namespace", namespace, "status", status)
return nil
}
func (m *MockConsulServiceClient) GetOps() []MockConsulOp {
m.mu.Lock()
defer m.mu.Unlock()
return m.ops
}

View File

@ -0,0 +1,136 @@
package serviceregistration
import (
"fmt"
"strconv"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
// GetAddress returns the IP and port to use for a service or check. If no port
// label is specified (an empty value), zero values are returned because no
// address could be resolved.
func GetAddress(
addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork,
ports structs.AllocatedPorts, netStatus *structs.AllocNetworkStatus) (string, int, error) {
switch addrMode {
case structs.AddressModeAuto:
if driverNet.Advertise() {
addrMode = structs.AddressModeDriver
} else {
addrMode = structs.AddressModeHost
}
return GetAddress(addrMode, portLabel, networks, driverNet, ports, netStatus)
case structs.AddressModeHost:
if portLabel == "" {
if len(networks) != 1 {
// If no networks are specified return zero
// values. Consul will advertise the host IP
// with no port. This is the pre-0.7.1 behavior
// some people rely on.
return "", 0, nil
}
return networks[0].IP, 0, nil
}
// Default path: use host ip:port
// Try finding port in the AllocatedPorts struct first
// Check in Networks struct for backwards compatibility if not found
mapping, ok := ports.Get(portLabel)
if !ok {
mapping = networks.Port(portLabel)
if mapping.Value > 0 {
return mapping.HostIP, mapping.Value, nil
}
// If port isn't a label, try to parse it as a literal port number
port, err := strconv.Atoi(portLabel)
if err != nil {
// Don't include Atoi error message as user likely
// never intended it to be a numeric and it creates a
// confusing error message
return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
// A number was given which will use the Consul agent's address and the given port
// Returning a blank string as an address will use the Consul agent's address
return "", port, nil
}
return mapping.HostIP, mapping.Value, nil
case structs.AddressModeDriver:
// Require a driver network if driver address mode is used
if driverNet == nil {
return "", 0, fmt.Errorf(`cannot use address_mode="driver": no driver network exists`)
}
// If no port label is specified just return the IP
if portLabel == "" {
return driverNet.IP, 0, nil
}
// If the port is a label, use the driver's port (not the host's)
if port, ok := ports.Get(portLabel); ok {
return driverNet.IP, port.To, nil
}
// Check if old style driver portmap is used
if port, ok := driverNet.PortMap[portLabel]; ok {
return driverNet.IP, port, nil
}
// If port isn't a label, try to parse it as a literal port number
port, err := strconv.Atoi(portLabel)
if err != nil {
// Don't include Atoi error message as user likely
// never intended it to be a numeric and it creates a
// confusing error message
return "", 0, fmt.Errorf("invalid port label %q: port labels in driver address_mode must be numeric or in the driver's port map", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
return driverNet.IP, port, nil
case structs.AddressModeAlloc:
if netStatus == nil {
return "", 0, fmt.Errorf(`cannot use address_mode="alloc": no allocation network status reported`)
}
// If no port label is specified just return the IP
if portLabel == "" {
return netStatus.Address, 0, nil
}
// If port is a label and is found then return it
if port, ok := ports.Get(portLabel); ok {
// Use port.To value unless not set
if port.To > 0 {
return netStatus.Address, port.To, nil
}
return netStatus.Address, port.Value, nil
}
// Check if port is a literal number
port, err := strconv.Atoi(portLabel)
if err != nil {
// User likely specified wrong port label here
return "", 0, fmt.Errorf("invalid port %q: port label not found or is not numeric", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
return netStatus.Address, port, nil
default:
// Shouldn't happen due to validation, but enforce invariants
return "", 0, fmt.Errorf("invalid address mode %q", addrMode)
}
}

View File

@ -0,0 +1,361 @@
package serviceregistration
import (
"testing"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/stretchr/testify/require"
)
func Test_GetAddress(t *testing.T) {
const HostIP = "127.0.0.1"
testCases := []struct {
name string
// Parameters
mode string
portLabel string
host map[string]int // will be converted to structs.Networks
driver *drivers.DriverNetwork
ports structs.AllocatedPorts
status *structs.AllocNetworkStatus
// Results
expectedIP string
expectedPort int
expectedErr string
}{
// Valid Configurations
{
name: "ExampleService",
mode: structs.AddressModeAuto,
portLabel: "db",
host: map[string]int{"db": 12435},
driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
expectedIP: HostIP,
expectedPort: 12435,
},
{
name: "host",
mode: structs.AddressModeHost,
portLabel: "db",
host: map[string]int{"db": 12345},
driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
expectedIP: HostIP,
expectedPort: 12345,
},
{
name: "driver",
mode: structs.AddressModeDriver,
portLabel: "db",
host: map[string]int{"db": 12345},
driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
expectedIP: "10.1.2.3",
expectedPort: 6379,
},
{
name: "AutoDriver",
mode: structs.AddressModeAuto,
portLabel: "db",
host: map[string]int{"db": 12345},
driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
AutoAdvertise: true,
},
expectedIP: "10.1.2.3",
expectedPort: 6379,
},
{
name: "DriverCustomPort",
mode: structs.AddressModeDriver,
portLabel: "7890",
host: map[string]int{"db": 12345},
driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
expectedIP: "10.1.2.3",
expectedPort: 7890,
},
// Invalid Configurations
{
name: "DriverWithoutNetwork",
mode: structs.AddressModeDriver,
portLabel: "db",
host: map[string]int{"db": 12345},
driver: nil,
expectedErr: "no driver network exists",
},
{
name: "DriverBadPort",
mode: structs.AddressModeDriver,
portLabel: "bad-port-label",
host: map[string]int{"db": 12345},
driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
expectedErr: "invalid port",
},
{
name: "DriverZeroPort",
mode: structs.AddressModeDriver,
portLabel: "0",
driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
expectedErr: "invalid port",
},
{
name: "HostBadPort",
mode: structs.AddressModeHost,
portLabel: "bad-port-label",
expectedErr: "invalid port",
},
{
name: "InvalidMode",
mode: "invalid-mode",
portLabel: "80",
expectedErr: "invalid address mode",
},
{
name: "NoPort_AutoMode",
mode: structs.AddressModeAuto,
expectedIP: HostIP,
},
{
name: "NoPort_HostMode",
mode: structs.AddressModeHost,
expectedIP: HostIP,
},
{
name: "NoPort_DriverMode",
mode: structs.AddressModeDriver,
driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
expectedIP: "10.1.2.3",
},
// Scenarios using port 0.12 networking fields (NetworkStatus, AllocatedPortMapping)
{
name: "ExampleServer_withAllocatedPorts",
mode: structs.AddressModeAuto,
portLabel: "db",
ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12435,
To: 6379,
HostIP: HostIP,
},
},
status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
expectedIP: HostIP,
expectedPort: 12435,
},
{
name: "Host_withAllocatedPorts",
mode: structs.AddressModeHost,
portLabel: "db",
ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
expectedIP: HostIP,
expectedPort: 12345,
},
{
name: "Driver_withAllocatedPorts",
mode: structs.AddressModeDriver,
portLabel: "db",
ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
expectedIP: "10.1.2.3",
expectedPort: 6379,
},
{
name: "AutoDriver_withAllocatedPorts",
mode: structs.AddressModeAuto,
portLabel: "db",
ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
AutoAdvertise: true,
},
status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
expectedIP: "10.1.2.3",
expectedPort: 6379,
},
{
name: "DriverCustomPort_withAllocatedPorts",
mode: structs.AddressModeDriver,
portLabel: "7890",
ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
expectedIP: "10.1.2.3",
expectedPort: 7890,
},
{
name: "Host_MultiHostInterface",
mode: structs.AddressModeAuto,
portLabel: "db",
ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: "127.0.0.100",
},
},
status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
expectedIP: "127.0.0.100",
expectedPort: 12345,
},
{
name: "Alloc",
mode: structs.AddressModeAlloc,
portLabel: "db",
ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
expectedIP: "172.26.0.1",
expectedPort: 6379,
},
{
name: "Alloc no to value",
mode: structs.AddressModeAlloc,
portLabel: "db",
ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
HostIP: HostIP,
},
},
status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
expectedIP: "172.26.0.1",
expectedPort: 12345,
},
{
name: "AllocCustomPort",
mode: structs.AddressModeAlloc,
portLabel: "6379",
status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
expectedIP: "172.26.0.1",
expectedPort: 6379,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Convert host port map into a structs.Networks.
networks := []*structs.NetworkResource{
{
IP: HostIP,
ReservedPorts: make([]structs.Port, len(tc.host)),
},
}
i := 0
for label, port := range tc.host {
networks[0].ReservedPorts[i].Label = label
networks[0].ReservedPorts[i].Value = port
i++
}
// Run the GetAddress function.
actualIP, actualPort, actualErr := GetAddress(
tc.mode, tc.portLabel, networks, tc.driver, tc.ports, tc.status)
// Assert the results
require.Equal(t, tc.expectedIP, actualIP, "IP mismatch")
require.Equal(t, tc.expectedPort, actualPort, "Port mismatch")
if tc.expectedErr == "" {
require.Nil(t, actualErr)
} else {
require.Error(t, actualErr)
require.Contains(t, actualErr.Error(), tc.expectedErr)
}
})
}
}

View File

@ -0,0 +1,27 @@
package serviceregistration
import (
"fmt"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// nomadServicePrefix is the prefix that scopes all Nomad registered
// services (both agent and task entries).
nomadServicePrefix = "_nomad"
// nomadTaskPrefix is the prefix that scopes Nomad registered services
// for tasks.
nomadTaskPrefix = nomadServicePrefix + "-task-"
)
// MakeAllocServiceID creates a unique ID for identifying an alloc service in
// a service registration provider. Both Nomad and Consul solutions use the
// same ID format to provide consistency.
//
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http
func MakeAllocServiceID(allocID, taskName string, service *structs.Service) string {
return fmt.Sprintf("%s%s-%s-%s-%s",
nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel)
}

View File

@ -0,0 +1,36 @@
package serviceregistration
import (
"testing"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func Test_MakeAllocServiceID(t *testing.T) {
testCases := []struct {
inputAllocID string
inputTaskName string
inputService *structs.Service
expectedOutput string
name string
}{
{
inputAllocID: "7ac7c672-1824-6f06-644c-4c249e1578b9",
inputTaskName: "cache",
inputService: &structs.Service{
Name: "redis",
PortLabel: "db",
},
expectedOutput: "_nomad-task-7ac7c672-1824-6f06-644c-4c249e1578b9-cache-redis-db",
name: "generic 1",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualOutput := MakeAllocServiceID(tc.inputAllocID, tc.inputTaskName, tc.inputService)
require.Equal(t, tc.expectedOutput, actualOutput)
})
}
}

View File

@ -0,0 +1,125 @@
package mock
import (
"fmt"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/serviceregistration"
)
// Ensure that the mock handler implements the service registration handler
// interface.
var _ serviceregistration.Handler = (*ServiceRegistrationHandler)(nil)
// ServiceRegistrationHandler is the mock implementation of the
// serviceregistration.Handler interface and can be used for testing.
type ServiceRegistrationHandler struct {
log hclog.Logger
// ops tracks the requested operations by the caller during the entire
// lifecycle of the ServiceRegistrationHandler. The mutex should be used
// whenever interacting with this.
mu sync.Mutex
ops []Operation
// AllocRegistrationsFn allows injecting return values for the
// AllocRegistrations function.
AllocRegistrationsFn func(allocID string) (*serviceregistration.AllocRegistration, error)
}
// NewServiceRegistrationHandler returns a ready to use
// ServiceRegistrationHandler for testing.
func NewServiceRegistrationHandler(log hclog.Logger) *ServiceRegistrationHandler {
return &ServiceRegistrationHandler{
ops: make([]Operation, 0, 20),
log: log.Named("mock_service_registration"),
}
}
func (h *ServiceRegistrationHandler) RegisterWorkload(services *serviceregistration.WorkloadServices) error {
h.mu.Lock()
defer h.mu.Unlock()
h.log.Trace("RegisterWorkload", "alloc_id", services.AllocID,
"name", services.Name(), "services", len(services.Services))
h.ops = append(h.ops, newOperation("add", services.AllocID, services.Name()))
return nil
}
func (h *ServiceRegistrationHandler) RemoveWorkload(services *serviceregistration.WorkloadServices) {
h.mu.Lock()
defer h.mu.Unlock()
h.log.Trace("RemoveWorkload", "alloc_id", services.AllocID,
"name", services.Name(), "services", len(services.Services))
h.ops = append(h.ops, newOperation("remove", services.AllocID, services.Name()))
}
func (h *ServiceRegistrationHandler) UpdateWorkload(old, newServices *serviceregistration.WorkloadServices) error {
h.mu.Lock()
defer h.mu.Unlock()
h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocID, "name", newServices.Name(),
"old_services", len(old.Services), "new_services", len(newServices.Services))
h.ops = append(h.ops, newOperation("update", newServices.AllocID, newServices.Name()))
return nil
}
func (h *ServiceRegistrationHandler) AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error) {
h.mu.Lock()
defer h.mu.Unlock()
h.log.Trace("AllocRegistrations", "alloc_id", allocID)
h.ops = append(h.ops, newOperation("alloc_registrations", allocID, ""))
if h.AllocRegistrationsFn != nil {
return h.AllocRegistrationsFn(allocID)
}
return nil, nil
}
func (h *ServiceRegistrationHandler) UpdateTTL(checkID, namespace, output, status string) error {
// TODO(tgross): this method is here so we can implement the
// interface but the locking we need for testing creates a lot
// of opportunities for deadlocks in testing that will never
// appear in live code.
h.log.Trace("UpdateTTL", "check_id", checkID, "namespace", namespace, "status", status)
return nil
}
// GetOps returns all stored operations within the handler.
func (h *ServiceRegistrationHandler) GetOps() []Operation {
h.mu.Lock()
defer h.mu.Unlock()
return h.ops
}
// Operation represents the register/deregister operations.
type Operation struct {
Op string // add, remove, or update
AllocID string
Name string // task or group name
OccurredAt time.Time
}
// newOperation generates a new Operation for the given parameters.
func newOperation(op, allocID, name string) Operation {
switch op {
case "add", "remove", "update", "alloc_registrations",
"add_group", "remove_group", "update_group", "update_ttl":
default:
panic(fmt.Errorf("invalid consul op: %s", op))
}
return Operation{
Op: op,
AllocID: allocID,
Name: name,
OccurredAt: time.Now(),
}
}

View File

@ -0,0 +1,157 @@
package serviceregistration
import (
"context"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
// Handler is the interface the Nomad Client uses to register, update and
// remove services and checks from service registration providers. Currently,
// Consul and Nomad are supported providers.
//
// When utilising Consul, the ACL "service:write" is required. It supports all
// functionality and is the OG/GOAT.
//
// When utilising Nomad, the client secret ID is used for authorisation. It
// currently supports service registrations only.
type Handler interface {
// RegisterWorkload adds all service entries and checks to the backend
// provider. Whilst callers attempt to ensure WorkloadServices.Services is
// not empty before calling this function, implementations should also
// perform this.
RegisterWorkload(workload *WorkloadServices) error
// RemoveWorkload all service entries and checks from the backend provider
// that are found within the passed WorkloadServices object. Whilst callers
// attempt to ensure WorkloadServices.Services is not empty before calling
// this function, implementations should also perform this.
RemoveWorkload(workload *WorkloadServices)
// UpdateWorkload removes workload as specified by the old parameter, and
// adds workload as specified by the new parameter. Callers do not perform
// any deduplication on both objects, it is therefore the responsibility of
// the implementation.
UpdateWorkload(old, newTask *WorkloadServices) error
// AllocRegistrations returns the registrations for the given allocation.
AllocRegistrations(allocID string) (*AllocRegistration, error)
// UpdateTTL is used to update the TTL of an individual service
// registration check.
UpdateTTL(id, namespace, output, status string) error
}
// WorkloadRestarter allows the checkWatcher to restart tasks or entire task
// groups.
type WorkloadRestarter interface {
Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
}
// AllocRegistration holds the status of services registered for a particular
// allocations by task.
type AllocRegistration struct {
// Tasks maps the name of a task to its registered services and checks.
Tasks map[string]*ServiceRegistrations
}
// Copy performs a deep copy of the AllocRegistration object.
func (a *AllocRegistration) Copy() *AllocRegistration {
c := &AllocRegistration{
Tasks: make(map[string]*ServiceRegistrations, len(a.Tasks)),
}
for k, v := range a.Tasks {
c.Tasks[k] = v.copy()
}
return c
}
// NumServices returns the number of registered services.
func (a *AllocRegistration) NumServices() int {
if a == nil {
return 0
}
total := 0
for _, treg := range a.Tasks {
for _, sreg := range treg.Services {
if sreg.Service != nil {
total++
}
}
}
return total
}
// NumChecks returns the number of registered checks.
func (a *AllocRegistration) NumChecks() int {
if a == nil {
return 0
}
total := 0
for _, treg := range a.Tasks {
for _, sreg := range treg.Services {
total += len(sreg.Checks)
}
}
return total
}
// ServiceRegistrations holds the status of services registered for a
// particular task or task group.
type ServiceRegistrations struct {
Services map[string]*ServiceRegistration
}
func (t *ServiceRegistrations) copy() *ServiceRegistrations {
c := &ServiceRegistrations{
Services: make(map[string]*ServiceRegistration, len(t.Services)),
}
for k, v := range t.Services {
c.Services[k] = v.copy()
}
return c
}
// ServiceRegistration holds the status of a registered Consul Service and its
// Checks.
type ServiceRegistration struct {
// serviceID and checkIDs are internal fields that track just the IDs of the
// services/checks registered in Consul. It is used to materialize the other
// fields when queried.
ServiceID string
CheckIDs map[string]struct{}
// CheckOnUpdate is a map of checkIDs and the associated OnUpdate value
// from the ServiceCheck It is used to determine how a reported checks
// status should be evaluated.
CheckOnUpdate map[string]string
// Service is the AgentService registered in Consul.
Service *api.AgentService
// Checks is the status of the registered checks.
Checks []*api.AgentCheck
}
func (s *ServiceRegistration) copy() *ServiceRegistration {
// Copy does not copy the external fields but only the internal fields.
// This is so that the caller of AllocRegistrations can not access the
// internal fields and that method uses these fields to populate the
// external fields.
return &ServiceRegistration{
ServiceID: s.ServiceID,
CheckIDs: helper.CopyMapStringStruct(s.CheckIDs),
CheckOnUpdate: helper.CopyMapStringString(s.CheckOnUpdate),
}
}

View File

@ -0,0 +1,53 @@
package serviceregistration
import (
"testing"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestAllocRegistration_Copy(t *testing.T) {
testCases := []struct {
inputAllocRegistration *AllocRegistration
name string
}{
{
inputAllocRegistration: &AllocRegistration{
Tasks: map[string]*ServiceRegistrations{},
},
name: "empty tasks map",
},
{
inputAllocRegistration: &AllocRegistration{
Tasks: map[string]*ServiceRegistrations{
"cache": {
Services: map[string]*ServiceRegistration{
"redis-db": {
ServiceID: "service-id-1",
CheckIDs: map[string]struct{}{
"check-id-1": {},
"check-id-2": {},
"check-id-3": {},
},
CheckOnUpdate: map[string]string{
"check-id-1": structs.OnUpdateIgnore,
"check-id-2": structs.OnUpdateRequireHealthy,
"check-id-3": structs.OnUpdateIgnoreWarn,
},
},
},
},
},
},
name: "non-empty tasks map",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualOutput := tc.inputAllocRegistration.Copy()
require.Equal(t, tc.inputAllocRegistration, actualOutput)
})
}
}

View File

@ -0,0 +1,95 @@
package serviceregistration
import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
// WorkloadServices describes services defined in either a Task or TaskGroup
// that need to be syncronized with a service registration provider.
type WorkloadServices struct {
AllocID string
// Group in which the service belongs for a group-level service, or the
// group in which task belongs for a task-level service.
Group string
// Task in which the service belongs for task-level service. Will be empty
// for a group-level service.
Task string
// JobID provides additional context for providers regarding which job
// caused this registration.
JobID string
// Canary indicates whether, or not the allocation is a canary. This is
// used to build the correct tags mapping.
Canary bool
// Namespace is the provider namespace in which services will be
// registered, if the provider supports this functionality.
Namespace string
// Restarter allows restarting the task or task group depending on the
// check_restart stanzas.
Restarter WorkloadRestarter
// Services and checks to register for the task.
Services []*structs.Service
// Networks from the task's resources stanza.
// TODO: remove and use Ports
Networks structs.Networks
// NetworkStatus from alloc if network namespace is created.
// Can be nil.
NetworkStatus *structs.AllocNetworkStatus
// AllocatedPorts is the list of port mappings.
Ports structs.AllocatedPorts
// DriverExec is the script executor for the task's driver. For group
// services this is nil and script execution is managed by a tasklet in the
// taskrunner script_check_hook.
DriverExec interfaces.ScriptExecutor
// DriverNetwork is the network specified by the driver and may be nil.
DriverNetwork *drivers.DriverNetwork
}
// RegistrationProvider identifies the service registration provider for the
// WorkloadServices.
func (ws *WorkloadServices) RegistrationProvider() string {
// Protect against an empty array; it would be embarrassing to panic here.
if len(ws.Services) == 0 {
return ""
}
// Note(jrasell): a Nomad task group can only currently utilise a single
// service provider for all services included within it. In the event we
// remove this restriction, this will need to change along which a lot of
// other logic.
return ws.Services[0].Provider
}
// Copy method for easing tests.
func (ws *WorkloadServices) Copy() *WorkloadServices {
newTS := new(WorkloadServices)
*newTS = *ws
// Deep copy Services
newTS.Services = make([]*structs.Service, len(ws.Services))
for i := range ws.Services {
newTS.Services[i] = ws.Services[i].Copy()
}
return newTS
}
func (ws *WorkloadServices) Name() string {
if ws.Task != "" {
return ws.Task
}
return "group-" + ws.Group
}

View File

@ -0,0 +1,49 @@
package serviceregistration
import (
"testing"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestWorkloadServices_RegistrationProvider(t *testing.T) {
testCases := []struct {
inputWorkloadServices *WorkloadServices
expectedOutput string
name string
}{
{
inputWorkloadServices: &WorkloadServices{
Services: nil,
},
expectedOutput: "",
name: "nil panic check",
},
{
inputWorkloadServices: &WorkloadServices{
Services: []*structs.Service{
{Provider: structs.ServiceProviderNomad},
},
},
expectedOutput: "nomad",
name: "nomad provider",
},
{
inputWorkloadServices: &WorkloadServices{
Services: []*structs.Service{
{Provider: structs.ServiceProviderConsul},
},
},
expectedOutput: "consul",
name: "consul provider",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualOutput := tc.inputWorkloadServices.RegistrationProvider()
require.Equal(t, tc.expectedOutput, actualOutput)
})
}
}

View File

@ -14,10 +14,10 @@ import (
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/devicemanager"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
. "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/boltdd"
@ -206,7 +206,7 @@ func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Al
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: db,
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
Consul: regMock.NewServiceRegistrationHandler(clientConf.Logger),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &allocrunner.MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},

View File

@ -7,9 +7,9 @@ import (
"time"
"github.com/hashicorp/nomad/client/config"
consulapi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/client/serviceregistration/mock"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
@ -53,7 +53,7 @@ func TestClientWithRPCs(t testing.T, cb func(c *config.Config), rpcs map[string]
conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, conf.PluginLoader)
}
mockCatalog := agentconsul.NewMockCatalog(logger)
mockService := consulapi.NewMockConsulServiceClient(t, logger)
mockService := mock.NewServiceRegistrationHandler(logger)
client, err := NewClient(conf, mockCatalog, nil, mockService, rpcs)
if err != nil {
cleanup()

View File

@ -7,6 +7,7 @@ import (
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@ -92,7 +93,7 @@ func TestConsul_Connect(t *testing.T) {
require.NoError(t, err)
require.Len(t, services, 2)
serviceID := MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
serviceID := serviceregistration.MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
connectID := serviceID + "-sidecar-proxy"
require.Contains(t, services, serviceID)

View File

@ -14,14 +14,13 @@ import (
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper/envoy"
"github.com/pkg/errors"
"github.com/hashicorp/consul/api"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/envoy"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/pkg/errors"
)
const (
@ -370,109 +369,6 @@ func (o operations) String() string {
return fmt.Sprintf("<%d, %d, %d, %d>", len(o.regServices), len(o.regChecks), len(o.deregServices), len(o.deregChecks))
}
// AllocRegistration holds the status of services registered for a particular
// allocations by task.
type AllocRegistration struct {
// Tasks maps the name of a task to its registered services and checks
Tasks map[string]*ServiceRegistrations
}
func (a *AllocRegistration) copy() *AllocRegistration {
c := &AllocRegistration{
Tasks: make(map[string]*ServiceRegistrations, len(a.Tasks)),
}
for k, v := range a.Tasks {
c.Tasks[k] = v.copy()
}
return c
}
// NumServices returns the number of registered services
func (a *AllocRegistration) NumServices() int {
if a == nil {
return 0
}
total := 0
for _, treg := range a.Tasks {
for _, sreg := range treg.Services {
if sreg.Service != nil {
total++
}
}
}
return total
}
// NumChecks returns the number of registered checks
func (a *AllocRegistration) NumChecks() int {
if a == nil {
return 0
}
total := 0
for _, treg := range a.Tasks {
for _, sreg := range treg.Services {
total += len(sreg.Checks)
}
}
return total
}
// ServiceRegistrations holds the status of services registered for a particular
// task or task group.
type ServiceRegistrations struct {
Services map[string]*ServiceRegistration
}
func (t *ServiceRegistrations) copy() *ServiceRegistrations {
c := &ServiceRegistrations{
Services: make(map[string]*ServiceRegistration, len(t.Services)),
}
for k, v := range t.Services {
c.Services[k] = v.copy()
}
return c
}
// ServiceRegistration holds the status of a registered Consul Service and its
// Checks.
type ServiceRegistration struct {
// serviceID and checkIDs are internal fields that track just the IDs of the
// services/checks registered in Consul. It is used to materialize the other
// fields when queried.
serviceID string
checkIDs map[string]struct{}
// CheckOnUpdate is a map of checkIDs and the associated OnUpdate value
// from the ServiceCheck It is used to determine how a reported checks
// status should be evaluated.
CheckOnUpdate map[string]string
// Service is the AgentService registered in Consul.
Service *api.AgentService
// Checks is the status of the registered checks.
Checks []*api.AgentCheck
}
func (s *ServiceRegistration) copy() *ServiceRegistration {
// Copy does not copy the external fields but only the internal fields. This
// is so that the caller of AllocRegistrations can not access the internal
// fields and that method uses these fields to populate the external fields.
return &ServiceRegistration{
serviceID: s.serviceID,
checkIDs: helper.CopyMapStringStruct(s.checkIDs),
CheckOnUpdate: helper.CopyMapStringString(s.CheckOnUpdate),
}
}
// ServiceClient handles task and agent service registration with Consul.
type ServiceClient struct {
agentAPI AgentAPI
@ -503,7 +399,7 @@ type ServiceClient struct {
// allocRegistrations stores the services and checks that are registered
// with Consul by allocation ID.
allocRegistrations map[string]*AllocRegistration
allocRegistrations map[string]*serviceregistration.AllocRegistration
allocRegistrationsLock sync.RWMutex
// Nomad agent services and checks that are recorded so they can be removed
@ -550,7 +446,7 @@ func NewServiceClient(agentAPI AgentAPI, namespacesClient *NamespacesClient, log
checks: make(map[string]*api.AgentCheckRegistration),
explicitlyDeregisteredServices: make(map[string]bool),
explicitlyDeregisteredChecks: make(map[string]bool),
allocRegistrations: make(map[string]*AllocRegistration),
allocRegistrations: make(map[string]*serviceregistration.AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, agentAPI, namespacesClient),
@ -1033,14 +929,15 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
// serviceRegs creates service registrations, check registrations, and script
// checks from a service. It returns a service registration object with the
// service and check IDs populated.
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, workload *WorkloadServices) (
*ServiceRegistration, error) {
func (c *ServiceClient) serviceRegs(
ops *operations, service *structs.Service, workload *serviceregistration.WorkloadServices) (
*serviceregistration.ServiceRegistration, error) {
// Get the services ID
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
sreg := &ServiceRegistration{
serviceID: id,
checkIDs: make(map[string]struct{}, len(service.Checks)),
id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service)
sreg := &serviceregistration.ServiceRegistration{
ServiceID: id,
CheckIDs: make(map[string]struct{}, len(service.Checks)),
CheckOnUpdate: make(map[string]string, len(service.Checks)),
}
@ -1051,7 +948,8 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
}
// Determine the address to advertise based on the mode
ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
ip, port, err := serviceregistration.GetAddress(
addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
if err != nil {
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
}
@ -1135,7 +1033,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
Kind: kind,
ID: id,
Name: service.Name,
Namespace: workload.ConsulNamespace,
Namespace: workload.Namespace,
Tags: tags,
EnableTagOverride: service.EnableTagOverride,
Address: ip,
@ -1152,7 +1050,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
return nil, err
}
for _, registration := range checkRegs {
sreg.checkIDs[registration.ID] = struct{}{}
sreg.CheckIDs[registration.ID] = struct{}{}
ops.regChecks = append(ops.regChecks, registration)
}
@ -1161,7 +1059,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
// checkRegs creates check registrations for the given service
func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
workload *WorkloadServices, sreg *ServiceRegistration) ([]*api.AgentCheckRegistration, error) {
workload *serviceregistration.WorkloadServices, sreg *serviceregistration.ServiceRegistration) ([]*api.AgentCheckRegistration, error) {
registrations := make([]*api.AgentCheckRegistration, 0, len(service.Checks))
for _, check := range service.Checks {
@ -1181,14 +1079,15 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
}
var err error
ip, port, err = getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
ip, port, err = serviceregistration.GetAddress(
addrMode, portLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
if err != nil {
return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err)
}
}
checkID := MakeCheckID(serviceID, check)
registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.ConsulNamespace)
registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.Namespace)
if err != nil {
return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
@ -1205,15 +1104,15 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
// Checks will always use the IP from the Task struct (host's IP).
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
func (c *ServiceClient) RegisterWorkload(workload *serviceregistration.WorkloadServices) error {
// Fast path
numServices := len(workload.Services)
if numServices == 0 {
return nil
}
t := new(ServiceRegistrations)
t.Services = make(map[string]*ServiceRegistration, numServices)
t := new(serviceregistration.ServiceRegistrations)
t.Services = make(map[string]*serviceregistration.ServiceRegistration, numServices)
ops := &operations{}
for _, service := range workload.Services {
@ -1221,7 +1120,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
if err != nil {
return err
}
t.Services[sreg.serviceID] = sreg
t.Services[sreg.ServiceID] = sreg
}
// Add the workload to the allocation's registration
@ -1232,7 +1131,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range workload.Services {
serviceID := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
serviceID := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := MakeCheckID(serviceID, check)
@ -1247,19 +1146,19 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
// changed.
//
// DriverNetwork must not change between invocations for the same allocation.
func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error {
func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.WorkloadServices) error {
ops := new(operations)
regs := new(ServiceRegistrations)
regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services))
regs := new(serviceregistration.ServiceRegistrations)
regs.Services = make(map[string]*serviceregistration.ServiceRegistration, len(newWorkload.Services))
newIDs := make(map[string]*structs.Service, len(newWorkload.Services))
for _, s := range newWorkload.Services {
newIDs[MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s
newIDs[serviceregistration.MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s
}
// Loop over existing Services to see if they have been removed
for _, existingSvc := range old.Services {
existingID := MakeAllocServiceID(old.AllocID, old.Name(), existingSvc)
existingID := serviceregistration.MakeAllocServiceID(old.AllocID, old.Name(), existingSvc)
newSvc, ok := newIDs[existingID]
if !ok {
@ -1285,9 +1184,9 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
}
// Service still exists so add it to the task's registration
sreg := &ServiceRegistration{
serviceID: existingID,
checkIDs: make(map[string]struct{}, len(newSvc.Checks)),
sreg := &serviceregistration.ServiceRegistration{
ServiceID: existingID,
CheckIDs: make(map[string]struct{}, len(newSvc.Checks)),
CheckOnUpdate: make(map[string]string, len(newSvc.Checks)),
}
regs.Services[existingID] = sreg
@ -1305,7 +1204,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
// Check is still required. Remove it from the map so it doesn't get
// deleted later.
delete(existingChecks, checkID)
sreg.checkIDs[checkID] = struct{}{}
sreg.CheckIDs[checkID] = struct{}{}
sreg.CheckOnUpdate[checkID] = check.OnUpdate
}
@ -1316,7 +1215,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
}
for _, registration := range checkRegs {
sreg.checkIDs[registration.ID] = struct{}{}
sreg.CheckIDs[registration.ID] = struct{}{}
sreg.CheckOnUpdate[registration.ID] = check.OnUpdate
ops.regChecks = append(ops.regChecks, registration)
}
@ -1345,7 +1244,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
return err
}
regs.Services[sreg.serviceID] = sreg
regs.Services[sreg.ServiceID] = sreg
}
// Add the task to the allocation's registration
@ -1370,11 +1269,11 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
// RemoveWorkload from Consul. Removes all service entries and checks.
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RemoveWorkload(workload *WorkloadServices) {
func (c *ServiceClient) RemoveWorkload(workload *serviceregistration.WorkloadServices) {
ops := operations{}
for _, service := range workload.Services {
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service)
ops.deregServices = append(ops.deregServices, id)
for _, check := range service.Checks {
@ -1406,7 +1305,7 @@ func normalizeNamespace(namespace string) string {
// AllocRegistrations returns the registrations for the given allocation. If the
// allocation has no registrations, the response is a nil object.
func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, error) {
func (c *ServiceClient) AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error) {
// Get the internal struct using the lock
c.allocRegistrationsLock.RLock()
regInternal, ok := c.allocRegistrations[allocID]
@ -1416,7 +1315,7 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration,
}
// Copy so we don't expose internal structs
reg := regInternal.copy()
reg := regInternal.Copy()
c.allocRegistrationsLock.RUnlock()
// Get the list of all namespaces created so we can iterate them.
@ -1451,7 +1350,7 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration,
for _, treg := range reg.Tasks {
for serviceID, sreg := range treg.Services {
sreg.Service = services[serviceID]
for checkID := range sreg.checkIDs {
for checkID := range sreg.CheckIDs {
if check, ok := checks[checkID]; ok {
sreg.Checks = append(sreg.Checks, check)
}
@ -1547,14 +1446,14 @@ func (c *ServiceClient) Shutdown() error {
}
// addRegistration adds the service registrations for the given allocation.
func (c *ServiceClient) addRegistrations(allocID, taskName string, reg *ServiceRegistrations) {
func (c *ServiceClient) addRegistrations(allocID, taskName string, reg *serviceregistration.ServiceRegistrations) {
c.allocRegistrationsLock.Lock()
defer c.allocRegistrationsLock.Unlock()
alloc, ok := c.allocRegistrations[allocID]
if !ok {
alloc = &AllocRegistration{
Tasks: make(map[string]*ServiceRegistrations),
alloc = &serviceregistration.AllocRegistration{
Tasks: make(map[string]*serviceregistration.ServiceRegistrations),
}
c.allocRegistrations[allocID] = alloc
}
@ -1592,14 +1491,6 @@ func makeAgentServiceID(role string, service *structs.Service) string {
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false))
}
// MakeAllocServiceID creates a unique ID for identifying an alloc service in
// Consul.
//
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http
func MakeAllocServiceID(allocID, taskName string, service *structs.Service) string {
return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel)
}
// MakeCheckID creates a unique ID for a check.
//
// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d
@ -1768,127 +1659,3 @@ func getNomadSidecar(id string, services map[string]*api.AgentService) *api.Agen
sidecarID := id + sidecarSuffix
return services[sidecarID]
}
// getAddress returns the IP and port to use for a service or check. If no port
// label is specified (an empty value), zero values are returned because no
// address could be resolved.
func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork, ports structs.AllocatedPorts, netStatus *structs.AllocNetworkStatus) (string, int, error) {
switch addrMode {
case structs.AddressModeAuto:
if driverNet.Advertise() {
addrMode = structs.AddressModeDriver
} else {
addrMode = structs.AddressModeHost
}
return getAddress(addrMode, portLabel, networks, driverNet, ports, netStatus)
case structs.AddressModeHost:
if portLabel == "" {
if len(networks) != 1 {
// If no networks are specified return zero
// values. Consul will advertise the host IP
// with no port. This is the pre-0.7.1 behavior
// some people rely on.
return "", 0, nil
}
return networks[0].IP, 0, nil
}
// Default path: use host ip:port
// Try finding port in the AllocatedPorts struct first
// Check in Networks struct for backwards compatibility if not found
mapping, ok := ports.Get(portLabel)
if !ok {
mapping = networks.Port(portLabel)
if mapping.Value > 0 {
return mapping.HostIP, mapping.Value, nil
}
// If port isn't a label, try to parse it as a literal port number
port, err := strconv.Atoi(portLabel)
if err != nil {
// Don't include Atoi error message as user likely
// never intended it to be a numeric and it creates a
// confusing error message
return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
// A number was given which will use the Consul agent's address and the given port
// Returning a blank string as an address will use the Consul agent's address
return "", port, nil
}
return mapping.HostIP, mapping.Value, nil
case structs.AddressModeDriver:
// Require a driver network if driver address mode is used
if driverNet == nil {
return "", 0, fmt.Errorf(`cannot use address_mode="driver": no driver network exists`)
}
// If no port label is specified just return the IP
if portLabel == "" {
return driverNet.IP, 0, nil
}
// If the port is a label, use the driver's port (not the host's)
if port, ok := ports.Get(portLabel); ok {
return driverNet.IP, port.To, nil
}
// Check if old style driver portmap is used
if port, ok := driverNet.PortMap[portLabel]; ok {
return driverNet.IP, port, nil
}
// If port isn't a label, try to parse it as a literal port number
port, err := strconv.Atoi(portLabel)
if err != nil {
// Don't include Atoi error message as user likely
// never intended it to be a numeric and it creates a
// confusing error message
return "", 0, fmt.Errorf("invalid port label %q: port labels in driver address_mode must be numeric or in the driver's port map", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
return driverNet.IP, port, nil
case structs.AddressModeAlloc:
if netStatus == nil {
return "", 0, fmt.Errorf(`cannot use address_mode="alloc": no allocation network status reported`)
}
// If no port label is specified just return the IP
if portLabel == "" {
return netStatus.Address, 0, nil
}
// If port is a label and is found then return it
if port, ok := ports.Get(portLabel); ok {
// Use port.To value unless not set
if port.To > 0 {
return netStatus.Address, port.To, nil
}
return netStatus.Address, port.Value, nil
}
// Check if port is a literal number
port, err := strconv.Atoi(portLabel)
if err != nil {
// User likely specified wrong port label here
return "", 0, fmt.Errorf("invalid port %q: port label not found or is not numeric", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
return netStatus.Address, port, nil
default:
// Shouldn't happen due to validation, but enforce invariants
return "", 0, fmt.Errorf("invalid address mode %q", addrMode)
}
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
@ -393,7 +394,7 @@ func TestServiceRegistration_CheckOnUpdate(t *testing.T) {
sc := NewServiceClient(mockAgent, namespacesClient, logger, true)
allocID := uuid.Generate()
ws := &WorkloadServices{
ws := &serviceregistration.WorkloadServices{
AllocID: allocID,
Task: "taskname",
Restarter: &restartRecorder{},
@ -444,7 +445,7 @@ func TestServiceRegistration_CheckOnUpdate(t *testing.T) {
}
// Update
wsUpdate := new(WorkloadServices)
wsUpdate := new(serviceregistration.WorkloadServices)
*wsUpdate = *ws
wsUpdate.Services[0].Checks[0].OnUpdate = structs.OnUpdateRequireHealthy

View File

@ -1,64 +1,22 @@
package consul
import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
// WorkloadServices describes services defined in either a Task or TaskGroup
// that need to be syncronized with Consul.
type WorkloadServices struct {
AllocID string
// Name of the task and task group the services are defined for. For
// group based services, Task will be empty.
Task string
Group string
// Canary indicates whether or not the allocation is a canary.
Canary bool
// ConsulNamespace is the consul namespace in which services will be registered.
ConsulNamespace string
// Restarter allows restarting the task or task group depending on the
// check_restart stanzas.
Restarter WorkloadRestarter
// Services and checks to register for the task.
Services []*structs.Service
// Networks from the task's resources stanza.
// TODO: remove and use Ports
Networks structs.Networks
// NetworkStatus from alloc if network namespace is created.
// Can be nil.
NetworkStatus *structs.AllocNetworkStatus
// AllocatedPorts is the list of port mappings.
Ports structs.AllocatedPorts
// DriverExec is the script executor for the task's driver.
// For group services this is nil and script execution is managed by
// a tasklet in the taskrunner script_check_hook.
DriverExec interfaces.ScriptExecutor
// DriverNetwork is the network specified by the driver and may be nil.
DriverNetwork *drivers.DriverNetwork
}
func BuildAllocServices(node *structs.Node, alloc *structs.Allocation, restarter WorkloadRestarter) *WorkloadServices {
func BuildAllocServices(
node *structs.Node, alloc *structs.Allocation, restarter WorkloadRestarter) *serviceregistration.WorkloadServices {
//TODO(schmichael) only support one network for now
net := alloc.AllocatedResources.Shared.Networks[0]
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
ws := &WorkloadServices{
ws := &serviceregistration.WorkloadServices{
AllocID: alloc.ID,
Group: alloc.TaskGroup,
Services: taskenv.InterpolateServices(taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build(), tg.Services),
@ -82,24 +40,3 @@ func BuildAllocServices(node *structs.Node, alloc *structs.Allocation, restarter
return ws
}
// Copy method for easing tests
func (ws *WorkloadServices) Copy() *WorkloadServices {
newTS := new(WorkloadServices)
*newTS = *ws
// Deep copy Services
newTS.Services = make([]*structs.Service, len(ws.Services))
for i := range ws.Services {
newTS.Services[i] = ws.Services[i].Copy()
}
return newTS
}
func (ws *WorkloadServices) Name() string {
if ws.Task != "" {
return ws.Task
}
return "group-" + ws.Group
}

View File

@ -11,12 +11,12 @@ import (
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -26,8 +26,8 @@ const (
yPort = 1235
)
func testWorkload() *WorkloadServices {
return &WorkloadServices{
func testWorkload() *serviceregistration.WorkloadServices {
return &serviceregistration.WorkloadServices{
AllocID: uuid.Generate(),
Task: "taskname",
Restarter: &restartRecorder{},
@ -65,7 +65,7 @@ func (r *restartRecorder) Restart(ctx context.Context, event *structs.TaskEvent,
type testFakeCtx struct {
ServiceClient *ServiceClient
FakeConsul *MockAgent
Workload *WorkloadServices
Workload *serviceregistration.WorkloadServices
}
var errNoOps = fmt.Errorf("testing error: no pending operations")
@ -502,8 +502,8 @@ func TestConsul_ChangeChecks(t *testing.T) {
t.Fatalf("service ID changed")
}
for newID := range sreg.checkIDs {
if _, ok := otherServiceReg.checkIDs[newID]; ok {
for newID := range sreg.CheckIDs {
if _, ok := otherServiceReg.CheckIDs[newID]; ok {
t.Fatalf("check IDs should change")
}
}
@ -1349,361 +1349,6 @@ func TestCreateCheckReg_GRPC(t *testing.T) {
require.Equal(t, expected, actual)
}
// TestGetAddress asserts Nomad uses the correct ip and port for services and
// checks depending on port labels, driver networks, and address mode.
func TestGetAddress(t *testing.T) {
const HostIP = "127.0.0.1"
cases := []struct {
Name string
// Parameters
Mode string
PortLabel string
Host map[string]int // will be converted to structs.Networks
Driver *drivers.DriverNetwork
Ports structs.AllocatedPorts
Status *structs.AllocNetworkStatus
// Results
ExpectedIP string
ExpectedPort int
ExpectedErr string
}{
// Valid Configurations
{
Name: "ExampleService",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Host: map[string]int{"db": 12435},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedIP: HostIP,
ExpectedPort: 12435,
},
{
Name: "Host",
Mode: structs.AddressModeHost,
PortLabel: "db",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedIP: HostIP,
ExpectedPort: 12345,
},
{
Name: "Driver",
Mode: structs.AddressModeDriver,
PortLabel: "db",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 6379,
},
{
Name: "AutoDriver",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
AutoAdvertise: true,
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 6379,
},
{
Name: "DriverCustomPort",
Mode: structs.AddressModeDriver,
PortLabel: "7890",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 7890,
},
// Invalid Configurations
{
Name: "DriverWithoutNetwork",
Mode: structs.AddressModeDriver,
PortLabel: "db",
Host: map[string]int{"db": 12345},
Driver: nil,
ExpectedErr: "no driver network exists",
},
{
Name: "DriverBadPort",
Mode: structs.AddressModeDriver,
PortLabel: "bad-port-label",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedErr: "invalid port",
},
{
Name: "DriverZeroPort",
Mode: structs.AddressModeDriver,
PortLabel: "0",
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
ExpectedErr: "invalid port",
},
{
Name: "HostBadPort",
Mode: structs.AddressModeHost,
PortLabel: "bad-port-label",
ExpectedErr: "invalid port",
},
{
Name: "InvalidMode",
Mode: "invalid-mode",
PortLabel: "80",
ExpectedErr: "invalid address mode",
},
{
Name: "NoPort_AutoMode",
Mode: structs.AddressModeAuto,
ExpectedIP: HostIP,
},
{
Name: "NoPort_HostMode",
Mode: structs.AddressModeHost,
ExpectedIP: HostIP,
},
{
Name: "NoPort_DriverMode",
Mode: structs.AddressModeDriver,
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
ExpectedIP: "10.1.2.3",
},
// Scenarios using port 0.12 networking fields (NetworkStatus, AllocatedPortMapping)
{
Name: "ExampleServer_withAllocatedPorts",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12435,
To: 6379,
HostIP: HostIP,
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: HostIP,
ExpectedPort: 12435,
},
{
Name: "Host_withAllocatedPorts",
Mode: structs.AddressModeHost,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: HostIP,
ExpectedPort: 12345,
},
{
Name: "Driver_withAllocatedPorts",
Mode: structs.AddressModeDriver,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 6379,
},
{
Name: "AutoDriver_withAllocatedPorts",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
AutoAdvertise: true,
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 6379,
},
{
Name: "DriverCustomPort_withAllocatedPorts",
Mode: structs.AddressModeDriver,
PortLabel: "7890",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 7890,
},
{
Name: "Host_MultiHostInterface",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: "127.0.0.100",
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "127.0.0.100",
ExpectedPort: 12345,
},
{
Name: "Alloc",
Mode: structs.AddressModeAlloc,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "172.26.0.1",
ExpectedPort: 6379,
},
{
Name: "Alloc no to value",
Mode: structs.AddressModeAlloc,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
HostIP: HostIP,
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "172.26.0.1",
ExpectedPort: 12345,
},
{
Name: "AllocCustomPort",
Mode: structs.AddressModeAlloc,
PortLabel: "6379",
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "172.26.0.1",
ExpectedPort: 6379,
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
// convert host port map into a structs.Networks
networks := []*structs.NetworkResource{
{
IP: HostIP,
ReservedPorts: make([]structs.Port, len(tc.Host)),
},
}
i := 0
for label, port := range tc.Host {
networks[0].ReservedPorts[i].Label = label
networks[0].ReservedPorts[i].Value = port
i++
}
// Run getAddress
ip, port, err := getAddress(tc.Mode, tc.PortLabel, networks, tc.Driver, tc.Ports, tc.Status)
// Assert the results
assert.Equal(t, tc.ExpectedIP, ip, "IP mismatch")
assert.Equal(t, tc.ExpectedPort, port, "Port mismatch")
if tc.ExpectedErr == "" {
assert.Nil(t, err)
} else {
if err == nil {
t.Fatalf("expected error containing %q but err=nil", tc.ExpectedErr)
} else {
assert.Contains(t, err.Error(), tc.ExpectedErr)
}
}
})
}
}
func TestConsul_ServiceName_Duplicates(t *testing.T) {
t.Parallel()
ctx := setupFake(t)
@ -1789,7 +1434,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkload.Name(), remainingWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
@ -1812,7 +1457,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
@ -1837,7 +1482,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkload.Name(), outofbandWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
@ -1898,7 +1543,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkload.Name(), remainingWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
@ -1921,7 +1566,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
@ -1946,7 +1591,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkload.Name(), outofbandWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))