2019-09-03 15:43:38 +00:00
|
|
|
package allocrunner
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net"
|
|
|
|
"os"
|
|
|
|
"path/filepath"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
hclog "github.com/hashicorp/go-hclog"
|
|
|
|
"github.com/hashicorp/nomad/client/allocdir"
|
|
|
|
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
"github.com/hashicorp/nomad/nomad/structs/config"
|
|
|
|
)
|
|
|
|
|
|
|
|
// consulSockHook creates Unix sockets to allow communication from inside a
|
|
|
|
// netns to Consul.
|
|
|
|
//
|
|
|
|
// Noop for allocations without a group Connect stanza.
|
|
|
|
type consulSockHook struct {
|
|
|
|
alloc *structs.Allocation
|
|
|
|
|
|
|
|
proxy *sockProxy
|
|
|
|
|
|
|
|
// mu synchronizes group & cancel as they may be mutated and accessed
|
|
|
|
// concurrently via Prerun, Update, Postrun.
|
|
|
|
mu sync.Mutex
|
|
|
|
|
|
|
|
logger hclog.Logger
|
|
|
|
}
|
|
|
|
|
|
|
|
func newConsulSockHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulSockHook {
|
|
|
|
h := &consulSockHook{
|
|
|
|
alloc: alloc,
|
|
|
|
proxy: newSockProxy(logger, allocDir, config),
|
|
|
|
}
|
|
|
|
h.logger = logger.Named(h.Name())
|
|
|
|
return h
|
|
|
|
}
|
|
|
|
|
|
|
|
func (*consulSockHook) Name() string {
|
|
|
|
return "consul_socket"
|
|
|
|
}
|
|
|
|
|
|
|
|
// shouldRun returns true if the Unix socket should be created and proxied.
|
|
|
|
// Requires the mutex to be held.
|
|
|
|
func (h *consulSockHook) shouldRun() bool {
|
|
|
|
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
|
|
|
|
for _, s := range tg.Services {
|
|
|
|
if s.Connect != nil {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *consulSockHook) Prerun() error {
|
|
|
|
h.mu.Lock()
|
|
|
|
defer h.mu.Unlock()
|
|
|
|
|
|
|
|
if !h.shouldRun() {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return h.proxy.run(h.alloc)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Update creates a gRPC socket file and proxy if there are any Connect
|
|
|
|
// services.
|
|
|
|
func (h *consulSockHook) Update(req *interfaces.RunnerUpdateRequest) error {
|
|
|
|
h.mu.Lock()
|
|
|
|
defer h.mu.Unlock()
|
|
|
|
|
|
|
|
h.alloc = req.Alloc
|
|
|
|
|
|
|
|
if !h.shouldRun() {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return h.proxy.run(h.alloc)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *consulSockHook) Postrun() error {
|
|
|
|
h.mu.Lock()
|
|
|
|
defer h.mu.Unlock()
|
|
|
|
|
|
|
|
if err := h.proxy.stop(); err != nil {
|
|
|
|
// Only log failures to stop proxies. Worst case scenario is a
|
|
|
|
// small goroutine leak.
|
|
|
|
h.logger.Debug("error stopping Consul proxy", "error", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type sockProxy struct {
|
|
|
|
allocDir *allocdir.AllocDir
|
|
|
|
config *config.ConsulConfig
|
|
|
|
|
|
|
|
ctx context.Context
|
|
|
|
cancel func()
|
|
|
|
doneCh chan struct{}
|
|
|
|
runOnce bool
|
|
|
|
|
|
|
|
logger hclog.Logger
|
|
|
|
}
|
|
|
|
|
|
|
|
func newSockProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *sockProxy {
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &sockProxy{
|
|
|
|
allocDir: allocDir,
|
|
|
|
config: config,
|
|
|
|
ctx: ctx,
|
|
|
|
cancel: cancel,
|
|
|
|
doneCh: make(chan struct{}),
|
|
|
|
logger: logger,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// run socket proxy if allocation requires it, it isn't already running, and it
|
|
|
|
// hasn't been told to stop.
|
|
|
|
//
|
|
|
|
// NOT safe for concurrent use.
|
|
|
|
func (s *sockProxy) run(alloc *structs.Allocation) error {
|
|
|
|
// Only run once.
|
|
|
|
if s.runOnce {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Only run once. Never restart.
|
|
|
|
select {
|
|
|
|
case <-s.doneCh:
|
|
|
|
s.logger.Trace("socket proxy already shutdown; exiting")
|
|
|
|
return nil
|
|
|
|
case <-s.ctx.Done():
|
|
|
|
s.logger.Trace("socket proxy already done; exiting")
|
|
|
|
return nil
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
destAddr := s.config.GRPCAddr
|
|
|
|
if destAddr == "" {
|
|
|
|
// No GRPCAddr defined. Use Addr but replace port with the gRPC
|
|
|
|
// default of 8502.
|
|
|
|
host, _, err := net.SplitHostPort(s.config.Addr)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error parsing Consul address %q: %v",
|
|
|
|
s.config.Addr, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
destAddr = net.JoinHostPort(host, "8502")
|
|
|
|
}
|
|
|
|
|
|
|
|
hostGRPCSockPath := filepath.Join(s.allocDir.AllocDir, allocdir.AllocGRPCSocket)
|
2019-09-19 18:37:20 +00:00
|
|
|
|
|
|
|
// if the socket already exists we'll try to remove it, but if not then any
|
|
|
|
// other errors will bubble up to the caller here or when we try to listen
|
|
|
|
_, err := os.Stat(hostGRPCSockPath)
|
|
|
|
if err == nil {
|
|
|
|
err := os.Remove(hostGRPCSockPath)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf(
|
|
|
|
"unable to remove existing unix socket for Consul gRPC endpoint: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-09-03 15:43:38 +00:00
|
|
|
listener, err := net.Listen("unix", hostGRPCSockPath)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("unable to create unix socket for Consul gRPC endpoint: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// The gRPC socket should be usable by all users in case a task is
|
|
|
|
// running as an unprivileged user. Unix does not allow setting domain
|
|
|
|
// socket permissions when creating the file, so we must manually call
|
|
|
|
// chmod afterwards.
|
|
|
|
// https://github.com/golang/go/issues/11822
|
|
|
|
if err := os.Chmod(hostGRPCSockPath, os.ModePerm); err != nil {
|
|
|
|
return fmt.Errorf("unable to set permissions on unix socket for Consul gRPC endpoint: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
proxy(s.ctx, s.logger, destAddr, listener)
|
|
|
|
s.cancel()
|
|
|
|
close(s.doneCh)
|
|
|
|
}()
|
|
|
|
|
|
|
|
s.runOnce = true
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// stop the proxy and blocks until the proxy has stopped. Returns an error if
|
|
|
|
// the proxy does not exit in a timely fashion.
|
|
|
|
func (s *sockProxy) stop() error {
|
|
|
|
s.cancel()
|
|
|
|
|
|
|
|
// If proxy was never run, don't wait for anything to shutdown.
|
|
|
|
if !s.runOnce {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-s.doneCh:
|
|
|
|
return nil
|
|
|
|
case <-time.After(3 * time.Second):
|
|
|
|
return fmt.Errorf("timed out waiting for proxy to exit")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Proxy between a listener and dest
|
|
|
|
func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener) {
|
|
|
|
// Wait for all connections to be done before exiting to prevent
|
|
|
|
// goroutine leaks.
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer func() {
|
|
|
|
// Must cancel context and close listener before waiting
|
|
|
|
cancel()
|
|
|
|
l.Close()
|
|
|
|
wg.Wait()
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Close Accept() when context is cancelled
|
|
|
|
go func() {
|
|
|
|
<-ctx.Done()
|
|
|
|
l.Close()
|
|
|
|
}()
|
|
|
|
|
|
|
|
for ctx.Err() == nil {
|
|
|
|
conn, err := l.Accept()
|
|
|
|
if err != nil {
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
// Accept errors during shutdown are to be expected
|
|
|
|
return
|
|
|
|
}
|
|
|
|
logger.Error("error in grpc proxy; shutting down proxy", "error", err, "dest", dest)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
proxyConn(ctx, logger, dest, conn)
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// proxyConn proxies between an existing net.Conn and a destination address. If
|
|
|
|
// the destAddr starts with "unix://" it is treated as a path to a unix socket.
|
|
|
|
// Otherwise it is treated as a host for a TCP connection.
|
|
|
|
//
|
|
|
|
// When the context is cancelled proxyConn blocks until all goroutines shutdown
|
|
|
|
// to prevent leaks.
|
|
|
|
func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn net.Conn) {
|
|
|
|
// Close the connection when we're done with it.
|
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
// Detect unix sockets
|
|
|
|
network := "tcp"
|
|
|
|
const unixPrefix = "unix://"
|
|
|
|
if strings.HasPrefix(destAddr, unixPrefix) {
|
|
|
|
network = "unix"
|
|
|
|
destAddr = destAddr[len(unixPrefix):]
|
|
|
|
}
|
|
|
|
|
|
|
|
dialer := &net.Dialer{}
|
|
|
|
dest, err := dialer.DialContext(ctx, network, destAddr)
|
|
|
|
if err == context.Canceled || err == context.DeadlineExceeded {
|
|
|
|
logger.Trace("proxy exiting gracefully", "error", err, "dest", destAddr,
|
|
|
|
"src_local", conn.LocalAddr(), "src_remote", conn.RemoteAddr())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
logger.Error("error connecting to grpc", "error", err, "dest", destAddr)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait for goroutines to exit before exiting to prevent leaking.
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
defer wg.Wait()
|
|
|
|
|
|
|
|
// socket -> gRPC
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
defer cancel()
|
|
|
|
n, err := io.Copy(dest, conn)
|
|
|
|
if ctx.Err() == nil && err != nil {
|
|
|
|
logger.Warn("error proxying to Consul", "error", err, "dest", destAddr,
|
|
|
|
"src_local", conn.LocalAddr(), "src_remote", conn.RemoteAddr(),
|
|
|
|
"bytes", n,
|
|
|
|
)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
logger.Trace("proxy to Consul complete",
|
|
|
|
"src_local", conn.LocalAddr(), "src_remote", conn.RemoteAddr(),
|
|
|
|
"bytes", n,
|
|
|
|
)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// gRPC -> socket
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
defer cancel()
|
|
|
|
n, err := io.Copy(conn, dest)
|
|
|
|
if ctx.Err() == nil && err != nil {
|
|
|
|
logger.Warn("error proxying from Consul", "error", err, "dest", destAddr,
|
|
|
|
"src_local", conn.LocalAddr(), "src_remote", conn.RemoteAddr(),
|
|
|
|
"bytes", n,
|
|
|
|
)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
logger.Trace("proxy from Consul complete",
|
|
|
|
"src_local", conn.LocalAddr(), "src_remote", conn.RemoteAddr(),
|
|
|
|
"bytes", n,
|
|
|
|
)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// When cancelled close connections to break out of copies goroutines.
|
|
|
|
<-ctx.Done()
|
|
|
|
conn.Close()
|
|
|
|
dest.Close()
|
|
|
|
}
|