From 83fed62a0a67a8698e0462fd8a13957085bd8b77 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 11 May 2016 15:24:37 -0700 Subject: [PATCH] Implemented registering client and server services --- client/client.go | 10 ++++- client/config/config.go | 6 +++ command/agent/agent.go | 89 ++++++++++++++++++++++++++++++++++++++++ command/agent/command.go | 9 ++-- command/agent/config.go | 51 +++++++++++++++++++++++ 5 files changed, 160 insertions(+), 5 deletions(-) diff --git a/client/client.go b/client/client.go index 6fb3ad34e..0d1b4cbcd 100644 --- a/client/client.go +++ b/client/client.go @@ -1186,7 +1186,7 @@ func (c *Client) setupConsulClient() error { return err } -// syncConsul removes services of tasks which are no longer in running state +// syncConsul removes services of tasks which are no longer in running state and func (c *Client) syncConsul() { sync := time.NewTicker(consulSyncInterval) for { @@ -1222,6 +1222,14 @@ func (c *Client) syncConsul() { } } } + + // Add the client service + clientService := &structs.Service{ + Name: c.config.ClientServiceName, + PortLabel: "clienthttpaddr", + } + services[clientService.ID("agent", "client")] = struct{}{} + if err := c.consulService.KeepServices(services); err != nil { c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err) } diff --git a/client/config/config.go b/client/config/config.go index f5a0a6b0b..64a1dd53d 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/nomad/structs" ) @@ -108,6 +109,11 @@ type Config struct { // Revision is the commit number of the Nomad client Revision string + + ClientServiceName string + + // ConsulConfig is the configuration to connect with Consul Agent + ConsulConfig *consul.ConsulConfig } func (c *Config) Copy() *Config { diff --git a/command/agent/agent.go b/command/agent/agent.go index db54da287..5932d1124 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -8,11 +8,13 @@ import ( "os" "path/filepath" "runtime" + "strconv" "sync" "time" "github.com/hashicorp/nomad/client" clientconfig "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" ) @@ -27,6 +29,11 @@ type Agent struct { logger *log.Logger logOutput io.Writer + consulService *consul.ConsulService + consulConfig *consul.ConsulConfig + serverHTTPAddr string + clientHTTPAddr string + server *nomad.Server client *client.Client @@ -49,6 +56,8 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) { shutdownCh: make(chan struct{}), } + a.createConsulConfig() + if err := a.setupServer(); err != nil { return nil, err } @@ -58,6 +67,11 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) { if a.client == nil && a.server == nil { return nil, fmt.Errorf("must have at least client or server mode enabled") } + if err := a.syncAgentServicesWithConsul(a.serverHTTPAddr, a.clientHTTPAddr); err != nil { + a.logger.Printf("[ERR] agent: unable to sync agent services with consul: %v", err) + } else { + go a.consulService.PeriodicSync() + } return a, nil } @@ -140,6 +154,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) { if port := a.config.Ports.Serf; port != 0 { conf.SerfConfig.MemberlistConfig.BindPort = port } + a.serverHTTPAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP) if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" { dur, err := time.ParseDuration(gcThreshold) @@ -226,6 +241,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) { httpAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port) } conf.Node.HTTPAddr = httpAddr + a.clientHTTPAddr = httpAddr // Reserve resources on the node. r := conf.Node.Reserved @@ -242,6 +258,8 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) { conf.Version = fmt.Sprintf("%s%s", a.config.Version, a.config.VersionPrerelease) conf.Revision = a.config.Revision + conf.ConsulConfig = a.consulConfig + return conf, nil } @@ -403,6 +421,12 @@ func (a *Agent) Shutdown() error { } } + if a.consulService != nil { + if err := a.consulService.Shutdown(); err != nil { + a.logger.Printf("[ERR] agent: shutting down consul service failed: %v", err) + } + } + a.logger.Println("[INFO] agent: shutdown complete") a.shutdown = true close(a.shutdownCh) @@ -445,3 +469,68 @@ func (a *Agent) Stats() map[string]map[string]string { } return stats } + +func (a *Agent) createConsulConfig() { + cfg := &consul.ConsulConfig{ + Addr: a.config.ConsulConfig.Addr, + Token: a.config.ConsulConfig.Token, + Auth: a.config.ConsulConfig.Auth, + EnableSSL: a.config.ConsulConfig.EnableSSL, + VerifySSL: a.config.ConsulConfig.VerifySSL, + CAFile: a.config.ConsulConfig.CAFile, + CertFile: a.config.ConsulConfig.CertFile, + KeyFile: a.config.ConsulConfig.KeyFile, + } + a.consulConfig = cfg +} + +func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAddr string) error { + cs, err := consul.NewConsulService(a.consulConfig, a.logger) + if err != nil { + return err + } + a.consulService = cs + var services []*structs.Service + addrs := make(map[string]string) + if a.client != nil && a.config.ConsulConfig.ClientServiceName != "" { + if err != nil { + return err + } + clientService := &structs.Service{ + Name: a.config.ConsulConfig.ClientServiceName, + PortLabel: "clienthttpaddr", + } + addrs["clienthttpaddr"] = clientHttpAddr + services = append(services, clientService) + cs.SetTaskName("client") + } + if a.server != nil && a.config.ConsulConfig.ServerServiceName != "" { + serverService := &structs.Service{ + Name: a.config.ConsulConfig.ServerServiceName, + PortLabel: "serverhttpaddr", + } + addrs["serverhttpaddr"] = serverHttpAddr + services = append(services, serverService) + cs.SetTaskName("server") + } + + cs.SetAddrFinder(func(portLabel string) (string, int) { + addr := addrs[portLabel] + if addr == "" { + return "", 0 + } + host, port, err := net.SplitHostPort(addr) + if err != nil { + return "", 0 + } + p, err := strconv.Atoi(port) + if err != nil { + return "", 0 + } + return host, p + }) + + cs.SetAllocID("agent") + + return cs.SyncServices(services) +} diff --git a/command/agent/command.go b/command/agent/command.go index 1ce19a933..d414808fc 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -58,10 +58,11 @@ func (c *Command) readConfig() *Config { // Make a new, empty config. cmdConfig := &Config{ - Atlas: &AtlasConfig{}, - Client: &ClientConfig{}, - Ports: &Ports{}, - Server: &ServerConfig{}, + Atlas: &AtlasConfig{}, + ConsulConfig: &ConsulConfig{}, + Client: &ClientConfig{}, + Ports: &Ports{}, + Server: &ServerConfig{}, } flags := flag.NewFlagSet("agent", flag.ContinueOnError) diff --git a/command/agent/config.go b/command/agent/config.go index 76079483d..c58345b5c 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -563,6 +563,14 @@ func (c *Config) Merge(b *Config) *Config { result.Atlas = result.Atlas.Merge(b.Atlas) } + // Apply the Consul Configuration + if result.ConsulConfig == nil && b.ConsulConfig != nil { + consulConfig := *b.ConsulConfig + result.ConsulConfig = &consulConfig + } else if b.ConsulConfig != nil { + result.ConsulConfig = result.ConsulConfig.Merge(b.ConsulConfig) + } + // Merge config files lists result.Files = append(result.Files, b.Files...) @@ -761,6 +769,49 @@ func (a *AtlasConfig) Merge(b *AtlasConfig) *AtlasConfig { return &result } +// Merge merges two Consul Configurations together. +func (a *ConsulConfig) Merge(b *ConsulConfig) *ConsulConfig { + result := *a + + if b.ServerServiceName != "" { + result.ServerServiceName = b.ServerServiceName + } + if b.ClientServiceName != "" { + result.ClientServiceName = b.ClientServiceName + } + if b.Addr != "" { + result.Addr = b.Addr + } + if b.Token != "" { + result.Token = b.Token + } + if b.Auth != "" { + result.Auth = b.Auth + } + if b.EnableSSL { + result.EnableSSL = true + } + if b.VerifySSL { + result.VerifySSL = true + } + if b.CAFile != "" { + result.CAFile = b.CAFile + } + if b.CertFile != "" { + result.CertFile = b.CertFile + } + if b.KeyFile != "" { + result.KeyFile = b.KeyFile + } + if b.ServerAutoJoin { + result.ServerAutoJoin = true + } + if b.ClientAutoJoin { + result.ClientAutoJoin = true + } + return &result +} + func (r *Resources) Merge(b *Resources) *Resources { result := *r if b.CPU != 0 {