package agent import ( "fmt" "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/serf" "io" "log" "net" "os" "sync" ) /* The agent is the long running process that is run on every machine. It exposes an RPC interface that is used by the CLI to control the agent. The agent runs the query interfaces like HTTP, DNS, and RPC. However, it can run in either a client, or server mode. In server mode, it runs a full Consul server. In client-only mode, it only forwards requests to other Consul servers. */ type Agent struct { config *Config // Used for writing our logs logger *log.Logger // Output sink for logs logOutput io.Writer // We have one of a client or a server, depending // on our configuration server *consul.Server client *consul.Client shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex // state stores a local representation of the node, // services and checks. Used for anti-entropy. state localState } // Create is used to create a new Agent. Returns // the agent or potentially an error. func Create(config *Config, logOutput io.Writer) (*Agent, error) { // Ensure we have a log sink if logOutput == nil { logOutput = os.Stderr } // Validate the config if config.Datacenter == "" { return nil, fmt.Errorf("Must configure a Datacenter") } if config.DataDir == "" { return nil, fmt.Errorf("Must configure a DataDir") } // Ensure the RPC Addr is sane if _, err := net.ResolveTCPAddr("tcp", config.ServerAddr); err != nil { return nil, fmt.Errorf("Bad server address: %v", err) } // Try to get an advertise address if config.AdvertiseAddr != "" { if ip := net.ParseIP(config.AdvertiseAddr); ip == nil { return nil, fmt.Errorf("Failed to parse advertise address: %v", config.AdvertiseAddr) } } else { ip, err := consul.GetPrivateIP() if err != nil { return nil, fmt.Errorf("Failed to get advertise address: %v", err) } config.AdvertiseAddr = ip.IP.String() } agent := &Agent{ config: config, logger: log.New(logOutput, "", log.LstdFlags), logOutput: logOutput, shutdownCh: make(chan struct{}), state: localState{ delaySync: make(chan struct{}, 1), services: make(map[string]*structs.NodeService), serviceStatus: make(map[string]syncStatus), checks: make(map[string]*structs.HealthCheck), checkStatus: make(map[string]syncStatus), triggerCh: make(chan struct{}, 1), }, } // Setup either the client or the server var err error if config.Server { err = agent.setupServer() } else { err = agent.setupClient() } if err != nil { return nil, err } // Start the anti entropy routine go agent.antiEntropy() return agent, nil } // consulConfig is used to return a consul configuration func (a *Agent) consulConfig() *consul.Config { // Start with the provided config or default config var base *consul.Config if a.config.ConsulConfig != nil { base = a.config.ConsulConfig } else { base = consul.DefaultConfig() } // Override with our config if a.config.Datacenter != "" { base.Datacenter = a.config.Datacenter } if a.config.DataDir != "" { base.DataDir = a.config.DataDir } if a.config.EncryptKey != "" { key, _ := a.config.EncryptBytes() base.SerfLANConfig.MemberlistConfig.SecretKey = key base.SerfWANConfig.MemberlistConfig.SecretKey = key } if a.config.NodeName != "" { base.NodeName = a.config.NodeName } if a.config.SerfBindAddr != "" { base.SerfLANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddr base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddr } if a.config.SerfLanPort != 0 { base.SerfLANConfig.MemberlistConfig.BindPort = a.config.SerfLanPort base.SerfLANConfig.MemberlistConfig.AdvertisePort = a.config.SerfLanPort } if a.config.SerfWanPort != 0 { base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfWanPort base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfWanPort } if a.config.ServerAddr != "" { addr, _ := net.ResolveTCPAddr("tcp", a.config.ServerAddr) base.RPCAddr = addr } if a.config.AdvertiseAddr != "" { base.SerfLANConfig.MemberlistConfig.AdvertiseAddr = a.config.AdvertiseAddr base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.AdvertiseAddr base.RPCAdvertise = &net.TCPAddr{ IP: net.ParseIP(a.config.AdvertiseAddr), Port: base.RPCAddr.Port, } } if a.config.Bootstrap { base.Bootstrap = true } // Setup the loggers base.LogOutput = a.logOutput return base } // setupServer is used to initialize the Consul server func (a *Agent) setupServer() error { server, err := consul.NewServer(a.consulConfig()) if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } a.server = server return nil } // setupClient is used to initialize the Consul client func (a *Agent) setupClient() error { client, err := consul.NewClient(a.consulConfig()) if err != nil { return fmt.Errorf("Failed to start Consul client: %v", err) } a.client = client return nil } // RPC is used to make an RPC call to the Consul servers // This allows the agent to implement the Consul.Interface func (a *Agent) RPC(method string, args interface{}, reply interface{}) error { if a.server != nil { return a.server.RPC(method, args, reply) } return a.client.RPC(method, args, reply) } // Leave prepares the agent for a graceful shutdown func (a *Agent) Leave() error { if a.server != nil { return a.server.Leave() } else { return a.client.Leave() } } // Shutdown is used to hard stop the agent. Should be preceeded // by a call to Leave to do it gracefully. func (a *Agent) Shutdown() error { a.shutdownLock.Lock() defer a.shutdownLock.Unlock() if a.shutdown { return nil } a.logger.Println("[INFO] agent: requesting shutdown") var err error if a.server != nil { err = a.server.Shutdown() } else { err = a.client.Shutdown() } a.logger.Println("[INFO] agent: shutdown complete") a.shutdown = true close(a.shutdownCh) return err } // ShutdownCh returns a channel that can be selected to wait // for the agent to perform a shutdown. func (a *Agent) ShutdownCh() <-chan struct{} { return a.shutdownCh } // JoinLAN is used to have the agent join a LAN cluster func (a *Agent) JoinLAN(addrs []string) (n int, err error) { a.logger.Printf("[INFO] agent: (LAN) joining: %v", addrs) if a.server != nil { n, err = a.server.JoinLAN(addrs) } else { n, err = a.client.JoinLAN(addrs) } a.logger.Printf("[INFO] agent: (LAN) joined: %d Err: %v", n, err) return } // JoinWAN is used to have the agent join a WAN cluster func (a *Agent) JoinWAN(addrs []string) (n int, err error) { a.logger.Printf("[INFO] agent: (WAN) joining: %v", addrs) if a.server != nil { n, err = a.server.JoinWAN(addrs) } else { err = fmt.Errorf("Must be a server to join WAN cluster") } a.logger.Printf("[INFO] agent: (WAN) joined: %d Err: %v", n, err) return } // ForceLeave is used to remove a failed node from the cluster func (a *Agent) ForceLeave(node string) (err error) { a.logger.Printf("[INFO] Force leaving node: %v", node) if a.server != nil { err = a.server.RemoveFailedNode(node) } else { err = a.client.RemoveFailedNode(node) } if err != nil { a.logger.Printf("[WARN] Failed to remove node: %v", err) } return err } // Used to retrieve the LAN members func (a *Agent) LANMembers() []serf.Member { if a.server != nil { return a.server.LANMembers() } else { return a.client.LANMembers() } } // Used to retrieve the WAN members func (a *Agent) WANMembers() []serf.Member { if a.server != nil { return a.server.WANMembers() } else { return nil } }