Implemented registering client and server services

This commit is contained in:
Diptanu Choudhury 2016-05-11 15:24:37 -07:00
parent d9497632ed
commit 83fed62a0a
5 changed files with 160 additions and 5 deletions

View File

@ -1186,7 +1186,7 @@ func (c *Client) setupConsulClient() error {
return err
}
// syncConsul removes services of tasks which are no longer in running state
// syncConsul removes services of tasks which are no longer in running state and
func (c *Client) syncConsul() {
sync := time.NewTicker(consulSyncInterval)
for {
@ -1222,6 +1222,14 @@ func (c *Client) syncConsul() {
}
}
}
// Add the client service
clientService := &structs.Service{
Name: c.config.ClientServiceName,
PortLabel: "clienthttpaddr",
}
services[clientService.ID("agent", "client")] = struct{}{}
if err := c.consulService.KeepServices(services); err != nil {
c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
}

View File

@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -108,6 +109,11 @@ type Config struct {
// Revision is the commit number of the Nomad client
Revision string
ClientServiceName string
// ConsulConfig is the configuration to connect with Consul Agent
ConsulConfig *consul.ConsulConfig
}
func (c *Config) Copy() *Config {

View File

@ -8,11 +8,13 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"sync"
"time"
"github.com/hashicorp/nomad/client"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -27,6 +29,11 @@ type Agent struct {
logger *log.Logger
logOutput io.Writer
consulService *consul.ConsulService
consulConfig *consul.ConsulConfig
serverHTTPAddr string
clientHTTPAddr string
server *nomad.Server
client *client.Client
@ -49,6 +56,8 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
shutdownCh: make(chan struct{}),
}
a.createConsulConfig()
if err := a.setupServer(); err != nil {
return nil, err
}
@ -58,6 +67,11 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
if a.client == nil && a.server == nil {
return nil, fmt.Errorf("must have at least client or server mode enabled")
}
if err := a.syncAgentServicesWithConsul(a.serverHTTPAddr, a.clientHTTPAddr); err != nil {
a.logger.Printf("[ERR] agent: unable to sync agent services with consul: %v", err)
} else {
go a.consulService.PeriodicSync()
}
return a, nil
}
@ -140,6 +154,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
if port := a.config.Ports.Serf; port != 0 {
conf.SerfConfig.MemberlistConfig.BindPort = port
}
a.serverHTTPAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP)
if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
@ -226,6 +241,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
httpAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
}
conf.Node.HTTPAddr = httpAddr
a.clientHTTPAddr = httpAddr
// Reserve resources on the node.
r := conf.Node.Reserved
@ -242,6 +258,8 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
conf.Version = fmt.Sprintf("%s%s", a.config.Version, a.config.VersionPrerelease)
conf.Revision = a.config.Revision
conf.ConsulConfig = a.consulConfig
return conf, nil
}
@ -403,6 +421,12 @@ func (a *Agent) Shutdown() error {
}
}
if a.consulService != nil {
if err := a.consulService.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: shutting down consul service failed: %v", err)
}
}
a.logger.Println("[INFO] agent: shutdown complete")
a.shutdown = true
close(a.shutdownCh)
@ -445,3 +469,68 @@ func (a *Agent) Stats() map[string]map[string]string {
}
return stats
}
func (a *Agent) createConsulConfig() {
cfg := &consul.ConsulConfig{
Addr: a.config.ConsulConfig.Addr,
Token: a.config.ConsulConfig.Token,
Auth: a.config.ConsulConfig.Auth,
EnableSSL: a.config.ConsulConfig.EnableSSL,
VerifySSL: a.config.ConsulConfig.VerifySSL,
CAFile: a.config.ConsulConfig.CAFile,
CertFile: a.config.ConsulConfig.CertFile,
KeyFile: a.config.ConsulConfig.KeyFile,
}
a.consulConfig = cfg
}
func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAddr string) error {
cs, err := consul.NewConsulService(a.consulConfig, a.logger)
if err != nil {
return err
}
a.consulService = cs
var services []*structs.Service
addrs := make(map[string]string)
if a.client != nil && a.config.ConsulConfig.ClientServiceName != "" {
if err != nil {
return err
}
clientService := &structs.Service{
Name: a.config.ConsulConfig.ClientServiceName,
PortLabel: "clienthttpaddr",
}
addrs["clienthttpaddr"] = clientHttpAddr
services = append(services, clientService)
cs.SetTaskName("client")
}
if a.server != nil && a.config.ConsulConfig.ServerServiceName != "" {
serverService := &structs.Service{
Name: a.config.ConsulConfig.ServerServiceName,
PortLabel: "serverhttpaddr",
}
addrs["serverhttpaddr"] = serverHttpAddr
services = append(services, serverService)
cs.SetTaskName("server")
}
cs.SetAddrFinder(func(portLabel string) (string, int) {
addr := addrs[portLabel]
if addr == "" {
return "", 0
}
host, port, err := net.SplitHostPort(addr)
if err != nil {
return "", 0
}
p, err := strconv.Atoi(port)
if err != nil {
return "", 0
}
return host, p
})
cs.SetAllocID("agent")
return cs.SyncServices(services)
}

View File

@ -59,6 +59,7 @@ func (c *Command) readConfig() *Config {
// Make a new, empty config.
cmdConfig := &Config{
Atlas: &AtlasConfig{},
ConsulConfig: &ConsulConfig{},
Client: &ClientConfig{},
Ports: &Ports{},
Server: &ServerConfig{},

View File

@ -563,6 +563,14 @@ func (c *Config) Merge(b *Config) *Config {
result.Atlas = result.Atlas.Merge(b.Atlas)
}
// Apply the Consul Configuration
if result.ConsulConfig == nil && b.ConsulConfig != nil {
consulConfig := *b.ConsulConfig
result.ConsulConfig = &consulConfig
} else if b.ConsulConfig != nil {
result.ConsulConfig = result.ConsulConfig.Merge(b.ConsulConfig)
}
// Merge config files lists
result.Files = append(result.Files, b.Files...)
@ -761,6 +769,49 @@ func (a *AtlasConfig) Merge(b *AtlasConfig) *AtlasConfig {
return &result
}
// Merge merges two Consul Configurations together.
func (a *ConsulConfig) Merge(b *ConsulConfig) *ConsulConfig {
result := *a
if b.ServerServiceName != "" {
result.ServerServiceName = b.ServerServiceName
}
if b.ClientServiceName != "" {
result.ClientServiceName = b.ClientServiceName
}
if b.Addr != "" {
result.Addr = b.Addr
}
if b.Token != "" {
result.Token = b.Token
}
if b.Auth != "" {
result.Auth = b.Auth
}
if b.EnableSSL {
result.EnableSSL = true
}
if b.VerifySSL {
result.VerifySSL = true
}
if b.CAFile != "" {
result.CAFile = b.CAFile
}
if b.CertFile != "" {
result.CertFile = b.CertFile
}
if b.KeyFile != "" {
result.KeyFile = b.KeyFile
}
if b.ServerAutoJoin {
result.ServerAutoJoin = true
}
if b.ClientAutoJoin {
result.ClientAutoJoin = true
}
return &result
}
func (r *Resources) Merge(b *Resources) *Resources {
result := *r
if b.CPU != 0 {