connect: register group services with Consul

Fixes #6042

Add new task group service hook for registering group services like
Connect-enabled services.

Does not yet support checks.
This commit is contained in:
Michael Schurter 2019-08-14 15:02:00 -07:00
parent c02a493008
commit b008fd1724
9 changed files with 542 additions and 6 deletions

View File

@ -121,6 +121,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newAllocHealthWatcherHook(hookLogger, ar.Alloc(), hs, ar.Listener(), ar.consulClient),
newNetworkHook(hookLogger, ns, ar.Alloc(), nm, nc),
newGroupServiceHook(hookLogger, ar.Alloc(), ar.consulClient),
}
return nil

View File

@ -117,11 +117,13 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
// 2 removals (canary+noncanary) during prekill
// 2 removals (canary+noncanary) during exited
// 2 removals (canary+noncanary) during stop
// 1 remove group during stop
consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps()
require.Len(t, consulOps, 6)
for _, op := range consulOps {
require.Len(t, consulOps, 7)
for _, op := range consulOps[:6] {
require.Equal(t, "remove", op.Op)
}
require.Equal(t, "remove_group", consulOps[6].Op)
// Assert terminated task event was emitted
events := ar2.AllocState().TaskStates[task.Name].Events

View File

@ -0,0 +1,66 @@
package allocrunner
import (
"sync"
hclog "github.com/hashicorp/go-hclog"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
// groupServiceHook manages task group Consul service registration and
// deregistration.
//
// It registers the allocation's group services in Prerun, re-syncs them in
// Update, and removes them in Postrun, delegating each step to consulClient.
type groupServiceHook struct {
// alloc is the allocation most recently seen by the hook; Update replaces it.
alloc *structs.Allocation
// consulClient performs the group register, update, and remove calls.
consulClient consul.ConsulServiceAPI
// prerun is set once Prerun has run; until then Update only records the new
// allocation without calling Consul.
prerun bool
// mu is held for the duration of Prerun, Update, and Postrun.
mu sync.Mutex
// logger is the hook-named logger derived in newGroupServiceHook.
logger log.Logger
}
// newGroupServiceHook builds a group service hook for alloc that talks to
// Consul through consulClient. The supplied logger is scoped to the hook's
// name before being stored.
func newGroupServiceHook(logger hclog.Logger, alloc *structs.Allocation, consulClient consul.ConsulServiceAPI) *groupServiceHook {
	hook := new(groupServiceHook)
	hook.alloc = alloc
	hook.consulClient = consulClient
	hook.logger = logger.Named(hook.Name())
	return hook
}
// Name returns the identifier of this hook; it is also used to name the
// hook's logger.
func (*groupServiceHook) Name() string {
	const hookName = "group_services"
	return hookName
}
// Prerun registers the allocation's group services with Consul. Whatever the
// outcome of the registration, the hook is flagged as having run so that
// later Update calls are forwarded to Consul.
func (h *groupServiceHook) Prerun() error {
	h.mu.Lock()
	// Deferred calls run last-in-first-out: the flag is set first, then the
	// lock is released.
	defer h.mu.Unlock()
	defer func() {
		// Mark prerun as true to unblock Updates
		h.prerun = true
	}()

	return h.consulClient.RegisterGroup(h.alloc)
}
// Update records the allocation carried by req and, once Prerun has run,
// asks Consul to sync the group services from the previous allocation to the
// new one.
func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	previous, current := h.alloc, req.Alloc
	h.alloc = current

	if h.prerun {
		return h.consulClient.UpdateGroup(previous, current)
	}

	// Update called before Prerun. The new alloc has been stored, so Prerun
	// will perform the initial registration with it.
	return nil
}
// Postrun removes the group services of the most recently seen allocation
// from Consul.
func (h *groupServiceHook) Postrun() error {
	h.mu.Lock()
	defer h.mu.Unlock()

	if err := h.consulClient.RemoveGroup(h.alloc); err != nil {
		return err
	}
	return nil
}

View File

@ -0,0 +1,119 @@
package allocrunner
import (
"testing"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// Compile-time checks that groupServiceHook implements every runner hook
// interface it is expected to satisfy.
var (
	_ interfaces.RunnerPrerunHook  = (*groupServiceHook)(nil)
	_ interfaces.RunnerUpdateHook  = (*groupServiceHook)(nil)
	_ interfaces.RunnerPostrunHook = (*groupServiceHook)(nil)
)
// TestGroupServiceHook_NoGroupServices asserts calling group service hooks
// without group services does not error.
func TestGroupServiceHook_NoGroupServices(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
h := newGroupServiceHook(logger, alloc, consulClient)
require.NoError(t, h.Prerun())
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
require.NoError(t, h.Update(req))
require.NoError(t, h.Postrun())
ops := consulClient.GetOps()
require.Len(t, ops, 3)
require.Equal(t, "add_group", ops[0].Op)
require.Equal(t, "update_group", ops[1].Op)
require.Equal(t, "remove_group", ops[2].Op)
}
// TestGroupServiceHook_GroupServices asserts group service hooks with group
// services does not error.
func TestGroupServiceHook_GroupServices(t *testing.T) {
	t.Parallel()

	a := mock.Alloc()

	// Give the allocation a bridge-mode group network exposing the Connect
	// proxy port for the "testconnect" service.
	proxyPort := structs.Port{
		Label: "connect-proxy-testconnect",
		Value: 9999,
		To:    9998,
	}
	groupNetwork := &structs.NetworkResource{
		Mode:         "bridge",
		IP:           "10.0.0.1",
		DynamicPorts: []structs.Port{proxyPort},
	}
	a.AllocatedResources.Shared.Networks = []*structs.NetworkResource{groupNetwork}

	// Attach a Connect-enabled service with a sidecar to the task group.
	connectService := &structs.Service{
		Name:      "testconnect",
		PortLabel: "9999",
		Connect: &structs.ConsulConnect{
			SidecarService: &structs.ConsulSidecarService{},
		},
	}
	group := a.Job.LookupTaskGroup(a.TaskGroup)
	group.Services = []*structs.Service{connectService}

	testLogger := testlog.HCLogger(t)
	mockConsul := consul.NewMockConsulServiceClient(t, testLogger)
	hook := newGroupServiceHook(testLogger, a, mockConsul)

	// Drive the full hook lifecycle against the mock client.
	require.NoError(t, hook.Prerun())
	require.NoError(t, hook.Update(&interfaces.RunnerUpdateRequest{Alloc: a}))
	require.NoError(t, hook.Postrun())

	// Each lifecycle step must have recorded exactly one group operation.
	wantOps := []string{"add_group", "update_group", "remove_group"}
	gotOps := mockConsul.GetOps()
	require.Len(t, gotOps, 3)
	for i, want := range wantOps {
		require.Equal(t, want, gotOps[i].Op)
	}
}
// TestGroupServiceHook_Error asserts group service hooks with group
// services but no group network returns an error.
func TestGroupServiceHook_Error(t *testing.T) {
	t.Parallel()

	a := mock.Alloc()

	// Attach a Connect-enabled service to the task group without adding any
	// group network to the allocation.
	connectService := &structs.Service{
		Name:      "testconnect",
		PortLabel: "9999",
		Connect: &structs.ConsulConnect{
			SidecarService: &structs.ConsulSidecarService{},
		},
	}
	group := a.Job.LookupTaskGroup(a.TaskGroup)
	group.Services = []*structs.Service{connectService}

	testLogger := testlog.HCLogger(t)

	// No need to set Consul client or call Run. This should fail before
	// attempting to register.
	serviceClient := agentconsul.NewServiceClient(nil, testLogger, false)
	hook := newGroupServiceHook(testLogger, a, serviceClient)

	require.Error(t, hook.Prerun())
	require.Error(t, hook.Update(&interfaces.RunnerUpdateRequest{Alloc: a}))
	require.Error(t, hook.Postrun())
}

View File

@ -2,11 +2,15 @@ package consul
import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterGroup(*structs.Allocation) error
RemoveGroup(*structs.Allocation) error
UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error
RegisterTask(*consul.TaskServices) error
RemoveTask(*consul.TaskServices)
UpdateTask(old, newTask *consul.TaskServices) error

View File

@ -7,6 +7,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
testing "github.com/mitchellh/go-testing-interface"
)
@ -14,17 +15,20 @@ import (
type MockConsulOp struct {
Op string // add, remove, or update
AllocID string
Task string
Name string // task or group name
}
func NewMockConsulOp(op, allocID, task string) MockConsulOp {
if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" {
func NewMockConsulOp(op, allocID, name string) MockConsulOp {
switch op {
case "add", "remove", "update", "alloc_registrations",
"add_group", "remove_group", "update_group":
default:
panic(fmt.Errorf("invalid consul op: %s", op))
}
return MockConsulOp{
Op: op,
AllocID: allocID,
Task: task,
Name: name,
}
}
@ -50,6 +54,33 @@ func NewMockConsulServiceClient(t testing.T, logger log.Logger) *MockConsulServi
return &m
}
// RegisterGroup records an "add_group" operation for the allocation's task
// group and always succeeds.
func (m *MockConsulServiceClient) RegisterGroup(alloc *structs.Allocation) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	group := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	numServices := len(group.Services)
	m.logger.Trace("RegisterGroup", "alloc_id", alloc.ID, "num_services", numServices)

	op := NewMockConsulOp("add_group", alloc.ID, alloc.TaskGroup)
	m.ops = append(m.ops, op)
	return nil
}
// UpdateGroup records an "update_group" operation for the new allocation's
// task group and always succeeds. The old allocation is ignored.
func (m *MockConsulServiceClient) UpdateGroup(_, alloc *structs.Allocation) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	group := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	numServices := len(group.Services)
	m.logger.Trace("UpdateGroup", "alloc_id", alloc.ID, "num_services", numServices)

	op := NewMockConsulOp("update_group", alloc.ID, alloc.TaskGroup)
	m.ops = append(m.ops, op)
	return nil
}
// RemoveGroup records a "remove_group" operation for the allocation's task
// group and always succeeds.
func (m *MockConsulServiceClient) RemoveGroup(alloc *structs.Allocation) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	group := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	numServices := len(group.Services)
	m.logger.Trace("RemoveGroup", "alloc_id", alloc.ID, "num_services", numServices)

	op := NewMockConsulOp("remove_group", alloc.ID, alloc.TaskGroup)
	m.ops = append(m.ops, op)
	return nil
}
func (m *MockConsulServiceClient) UpdateTask(old, newSvcs *consul.TaskServices) error {
m.mu.Lock()
defer m.mu.Unlock()

View File

@ -722,6 +722,12 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
copy(tags, service.Tags)
}
// newConnect returns (nil, nil) if there's no Connect-enabled service.
connect, err := newConnect(service.Name, service.Connect, task.Networks)
if err != nil {
return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err)
}
// Build the Consul Service registration request
serviceReg := &api.AgentServiceRegistration{
ID: id,
@ -733,6 +739,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
Meta: map[string]string{
"external-source": "nomad",
},
Connect: connect, // will be nil if no Connect stanza
}
ops.regServices = append(ops.regServices, serviceReg)
@ -807,6 +814,117 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
return checkIDs, nil
}
//TODO(schmichael) remove
//
// noopRestarter is a restarter whose Restart does nothing. It is used as the
// Restarter for group services, where restarts are unsupported.
type noopRestarter struct{}

// Restart ignores all of its arguments and reports success.
func (noopRestarter) Restart(_ context.Context, _ *structs.TaskEvent, _ bool) error {
	return nil
}
// makeAllocTaskServices creates a TaskServices struct for a group service.
//
// The group's services are advertised on the first network of the
// allocation's shared resources: its IP and port labels are exposed through
// a synthetic, auto-advertised driver network. An error is returned when the
// allocation has no group network, including when it carries no allocated
// resources at all.
//
//TODO(schmichael) rename TaskServices and refactor this into a New method
func makeAllocTaskServices(alloc *structs.Allocation, tg *structs.TaskGroup) (*TaskServices, error) {
	// Check the pointer before dereferencing it so an allocation without
	// allocated resources yields an error rather than a panic.
	if alloc.AllocatedResources == nil || len(alloc.AllocatedResources.Shared.Networks) == 0 {
		return nil, fmt.Errorf("unable to register a group service without a group network")
	}
	groupNetworks := alloc.AllocatedResources.Shared.Networks

	//TODO(schmichael) only support one network for now
	primary := groupNetworks[0]

	ts := &TaskServices{
		AllocID:  alloc.ID,
		Name:     "group-" + alloc.TaskGroup,
		Services: tg.Services,
		Networks: groupNetworks,

		//TODO(schmichael) there's probably a better way than hacking driver network
		DriverNetwork: &drivers.DriverNetwork{
			AutoAdvertise: true,
			IP:            primary.IP,
			// Copy PortLabels from group network
			PortMap: primary.PortLabels(),
		},

		// unsupported for group services
		Restarter:  noopRestarter{},
		DriverExec: nil,
	}

	if alloc.DeploymentStatus != nil {
		ts.Canary = alloc.DeploymentStatus.Canary
	}

	return ts, nil
}
// RegisterGroup services with Consul. Adds all task group-level service
// entries and checks to Consul. It is a no-op when the allocation's task
// group defines no services, and an error when the task group cannot be
// found in the allocation's job.
func (c *ServiceClient) RegisterGroup(alloc *structs.Allocation) error {
	group := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	switch {
	case group == nil:
		return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
	case len(group.Services) == 0:
		// noop
		return nil
	}

	groupServices, err := makeAllocTaskServices(alloc, group)
	if err != nil {
		return err
	}
	return c.RegisterTask(groupServices)
}
// UpdateGroup services with Consul. Updates all task group-level service
// entries and checks to Consul.
//
// Like RegisterGroup and RemoveGroup, it is a no-op when there are no group
// services involved: if neither the old nor the new task group defines any
// services, nothing is sent to Consul and no group network is required. An
// error is returned when either allocation's task group cannot be found in
// its job, or when group services exist but the allocation has no group
// network.
func (c *ServiceClient) UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error {
	oldTG := oldAlloc.Job.LookupTaskGroup(oldAlloc.TaskGroup)
	if oldTG == nil {
		return fmt.Errorf("task group %q not in old allocation", oldAlloc.TaskGroup)
	}

	newTG := newAlloc.Job.LookupTaskGroup(newAlloc.TaskGroup)
	if newTG == nil {
		return fmt.Errorf("task group %q not in new allocation", newAlloc.TaskGroup)
	}

	if len(oldTG.Services) == 0 && len(newTG.Services) == 0 {
		// noop: without this, every update of an allocation that has no
		// group services and no group network would fail below.
		return nil
	}

	oldServices, err := makeAllocTaskServices(oldAlloc, oldTG)
	if err != nil {
		return err
	}

	newServices, err := makeAllocTaskServices(newAlloc, newTG)
	if err != nil {
		return err
	}

	return c.UpdateTask(oldServices, newServices)
}
// RemoveGroup services with Consul. Removes all task group-level service
// entries and checks from Consul. It is a no-op when the allocation's task
// group defines no services, and an error when the task group cannot be
// found in the allocation's job.
func (c *ServiceClient) RemoveGroup(alloc *structs.Allocation) error {
	group := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	if group == nil {
		return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
	}

	if hasServices := len(group.Services) > 0; !hasServices {
		// noop
		return nil
	}

	groupServices, err := makeAllocTaskServices(alloc, group)
	if err == nil {
		c.RemoveTask(groupServices)
	}
	return err
}
// RegisterTask with Consul. Adds all service entries and checks to Consul. If
// exec is nil and a script check exists an error is returned.
//
@ -1314,3 +1432,81 @@ func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet
return "", 0, fmt.Errorf("invalid address mode %q", addrMode)
}
}
// newConnect creates a new Consul AgentServiceConnect struct based on a Nomad
// Connect struct. If the nomad Connect struct is nil, nil will be returned to
// disable Connect for this service.
//
// When a sidecar service is requested, the sidecar is advertised on the
// network IP and host port returned by getConnectPort, and its proxy is
// configured to bind to 0.0.0.0 on the port's To value. Any proxy config
// from the job is copied, never modified in place, before the bind settings
// are applied on top. An error is returned if the Connect port cannot be
// determined.
func newConnect(serviceName string, nc *structs.ConsulConnect, networks structs.Networks) (*api.AgentServiceConnect, error) {
	if nc == nil {
		// No Connect stanza, returning nil is fine
		return nil, nil
	}

	cc := &api.AgentServiceConnect{
		Native: nc.Native,
	}

	if nc.SidecarService == nil {
		return cc, nil
	}

	net, port, err := getConnectPort(serviceName, networks)
	if err != nil {
		return nil, err
	}

	// Bind to netns IP(s):port. Start from a copy of the configured proxy
	// settings: writing the bind keys into the job's own map would mutate
	// the shared job definition on every registration.
	proxyConfig := map[string]interface{}{}
	if proxy := nc.SidecarService.Proxy; proxy != nil {
		for key, value := range proxy.Config {
			proxyConfig[key] = value
		}
	}
	proxyConfig["bind_address"] = "0.0.0.0"
	proxyConfig["bind_port"] = port.To

	// Advertise host IP:port
	cc.SidecarService = &api.AgentServiceRegistration{
		Address: net.IP,
		Port:    port.Value,

		// Automatically configure the proxy to bind to all addresses
		// within the netns.
		Proxy: &api.AgentServiceConnectProxyConfig{
			Config: proxyConfig,
		},
	}

	// If no further proxy settings were explicitly configured, exit early
	if nc.SidecarService.Proxy == nil {
		return cc, nil
	}

	numUpstreams := len(nc.SidecarService.Proxy.Upstreams)
	if numUpstreams == 0 {
		return cc, nil
	}

	upstreams := make([]api.Upstream, numUpstreams)
	for i, nu := range nc.SidecarService.Proxy.Upstreams {
		upstreams[i].DestinationName = nu.DestinationName
		upstreams[i].LocalBindPort = nu.LocalBindPort
	}
	cc.SidecarService.Proxy.Upstreams = upstreams

	return cc, nil
}
// getConnectPort returns the network and port for the Connect proxy sidecar
// defined for this service. An error is returned if the network and port
// cannot be determined: either because there is not exactly one network, or
// because that network has no port for the service.
func getConnectPort(serviceName string, networks structs.Networks) (*structs.NetworkResource, structs.Port, error) {
	var noPort structs.Port

	count := len(networks)
	if count != 1 {
		return nil, noPort, fmt.Errorf("Connect only supported with exactly 1 network (found %d)", count)
	}

	network := networks[0]
	if port, found := network.PortForService(serviceName); found {
		return network, port, nil
	}
	return nil, noPort, fmt.Errorf("No Connect port defined for service %q", serviceName)
}

View File

@ -0,0 +1,99 @@
package consul
import (
"io/ioutil"
"testing"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// TestConsul_Connect registers a Connect-enabled group service against an
// embedded Consul test server and asserts that both the service and its
// sidecar proxy are registered with the expected addresses, ports, and proxy
// configuration.
func TestConsul_Connect(t *testing.T) {
	// Create an embedded Consul server
	testconsul, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
		// If -v wasn't specified squelch consul logging
		if !testing.Verbose() {
			c.Stdout = ioutil.Discard
			c.Stderr = ioutil.Discard
		}
	})
	if err != nil {
		t.Fatalf("error starting test consul server: %v", err)
	}
	defer testconsul.Stop()

	consulConfig := consulapi.DefaultConfig()
	consulConfig.Address = testconsul.HTTPAddr
	consulClient, err := consulapi.NewClient(consulConfig)
	require.NoError(t, err)
	serviceClient := NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
	go serviceClient.Run()

	alloc := mock.Alloc()
	alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{
		{
			Mode: "bridge",
			IP:   "10.0.0.1",
			DynamicPorts: []structs.Port{
				{
					Label: "connect-proxy-testconnect",
					Value: 9999,
					To:    9998,
				},
			},
		},
	}
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	tg.Services = []*structs.Service{
		{
			Name:      "testconnect",
			PortLabel: "9999",
			Connect: &structs.ConsulConnect{
				SidecarService: &structs.ConsulSidecarService{},
			},
		},
	}

	require.NoError(t, serviceClient.RegisterGroup(alloc))

	// Wait for both the service and its sidecar proxy to show up. The
	// condition runs outside the test goroutine, so it must not call require
	// (FailNow is only valid on the test goroutine); a failed query is simply
	// retried and, if it persists, is reported by the checks below.
	require.Eventually(t, func() bool {
		services, err := consulClient.Agent().Services()
		if err != nil {
			return false
		}
		return len(services) == 2
	}, 3*time.Second, 100*time.Millisecond)

	services, err := consulClient.Agent().Services()
	require.NoError(t, err)
	require.Len(t, services, 2)

	serviceID := makeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0], false)
	connectID := serviceID + "-sidecar-proxy"

	// The service itself is advertised on the group network's IP and port.
	require.Contains(t, services, serviceID)
	agentService := services[serviceID]
	require.Equal(t, "testconnect", agentService.Service)
	require.Equal(t, "10.0.0.1", agentService.Address)
	require.Equal(t, 9999, agentService.Port)
	require.Nil(t, agentService.Connect)
	require.Nil(t, agentService.Proxy)

	// The sidecar proxy points back at the service and binds inside the
	// network namespace on the port's To value.
	require.Contains(t, services, connectID)
	connectService := services[connectID]
	require.Equal(t, "testconnect-sidecar-proxy", connectService.Service)
	require.Equal(t, "10.0.0.1", connectService.Address)
	require.Equal(t, 9999, connectService.Port)
	require.Nil(t, connectService.Connect)
	require.Equal(t, "testconnect", connectService.Proxy.DestinationServiceName)
	require.Equal(t, serviceID, connectService.Proxy.DestinationServiceID)
	require.Equal(t, "127.0.0.1", connectService.Proxy.LocalServiceAddress)
	require.Equal(t, 9999, connectService.Proxy.LocalServicePort)
	require.Equal(t, map[string]interface{}{
		"bind_address": "0.0.0.0",
		"bind_port":    float64(9998),
	}, connectService.Proxy.Config)
}

View File

@ -2151,6 +2151,24 @@ func (n *NetworkResource) PortLabels() map[string]int {
return labelValues
}
// PortForService returns the Connect port for the given service. The port is
// looked up by its label, "<ConnectProxyPrefix>-<serviceName>", searching the
// reserved ports first and the dynamic ports second. Returns false if no port
// was found for a service with that name.
func (n *NetworkResource) PortForService(serviceName string) (Port, bool) {
	label := fmt.Sprintf("%s-%s", ConnectProxyPrefix, serviceName)

	for i := range n.ReservedPorts {
		if n.ReservedPorts[i].Label == label {
			return n.ReservedPorts[i], true
		}
	}

	for i := range n.DynamicPorts {
		if n.DynamicPorts[i].Label == label {
			return n.DynamicPorts[i], true
		}
	}

	return Port{}, false
}
// Networks defined for a task on the Resources struct.
//
// It is a slice of pointers to NetworkResource values.
type Networks []*NetworkResource