package agent import ( "fmt" "io" "log" "net" "os" "strconv" "sync" "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/serf" ) /* The agent is the long running process that is run on every machine. It exposes an RPC interface that is used by the CLI to control the agent. The agent runs the query interfaces like HTTP, DNS, and RPC. However, it can run in either a client, or server mode. In server mode, it runs a full Consul server. In client-only mode, it only forwards requests to other Consul servers. */ type Agent struct { config *Config // Used for writing our logs logger *log.Logger // Output sink for logs logOutput io.Writer // We have one of a client or a server, depending // on our configuration server *consul.Server client *consul.Client // state stores a local representation of the node, // services and checks. Used for anti-entropy. state localState // checkMonitors maps the check ID to an associated monitor // checkTTLs maps the check ID to an associated check TTL // checkLock protects updates to either checkMonitors map[string]*CheckMonitor checkTTLs map[string]*CheckTTL checkLock sync.Mutex // eventCh is used to receive user events eventCh chan serf.UserEvent // eventBuf stores the most recent events in a ring buffer // using eventIndex as the next index to insert into. This // is guarded by eventLock. When an insert happens, the // eventNotify group is notified. eventBuf []*UserEvent eventIndex int eventLock sync.RWMutex eventNotify consul.NotifyGroup shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex } // Create is used to create a new Agent. Returns // the agent or potentially an error. func Create(config *Config, logOutput io.Writer) (*Agent, error) { // Ensure we have a log sink if logOutput == nil { logOutput = os.Stderr } // Validate the config if config.Datacenter == "" { return nil, fmt.Errorf("Must configure a Datacenter") } if config.DataDir == "" { return nil, fmt.Errorf("Must configure a DataDir") } // Try to get an advertise address if config.AdvertiseAddr != "" { if ip := net.ParseIP(config.AdvertiseAddr); ip == nil { return nil, fmt.Errorf("Failed to parse advertise address: %v", config.AdvertiseAddr) } } else if config.BindAddr != "0.0.0.0" && config.BindAddr != "" { config.AdvertiseAddr = config.BindAddr } else { ip, err := consul.GetPrivateIP() if err != nil { return nil, fmt.Errorf("Failed to get advertise address: %v", err) } config.AdvertiseAddr = ip.String() } agent := &Agent{ config: config, logger: log.New(logOutput, "", log.LstdFlags), logOutput: logOutput, checkMonitors: make(map[string]*CheckMonitor), checkTTLs: make(map[string]*CheckTTL), eventCh: make(chan serf.UserEvent, 1024), eventBuf: make([]*UserEvent, 256), shutdownCh: make(chan struct{}), } // Initialize the local state agent.state.Init(config, agent.logger) // Setup either the client or the server var err error if config.Server { err = agent.setupServer() agent.state.SetIface(agent.server) // Automatically register the "consul" service on server nodes consulService := structs.NodeService{ Service: consul.ConsulServiceName, ID: consul.ConsulServiceID, Port: agent.config.Ports.Server, Tags: []string{}, } agent.state.AddService(&consulService) } else { err = agent.setupClient() agent.state.SetIface(agent.client) } if err != nil { return nil, err } // Start handling events go agent.handleEvents() // Write out the PID file if necessary err = agent.storePid() if err != nil { return nil, err } return agent, nil } // consulConfig is used to return a consul configuration func (a *Agent) consulConfig() *consul.Config { // Start with the provided config or default config var base *consul.Config if a.config.ConsulConfig != nil { base = a.config.ConsulConfig } else { base = consul.DefaultConfig() } // Override with our config if a.config.Datacenter != "" { base.Datacenter = a.config.Datacenter } if a.config.DataDir != "" { base.DataDir = a.config.DataDir } if a.config.EncryptKey != "" { key, _ := a.config.EncryptBytes() base.SerfLANConfig.MemberlistConfig.SecretKey = key base.SerfWANConfig.MemberlistConfig.SecretKey = key } if a.config.NodeName != "" { base.NodeName = a.config.NodeName } if a.config.BindAddr != "" { base.SerfLANConfig.MemberlistConfig.BindAddr = a.config.BindAddr base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.BindAddr } if a.config.Ports.SerfLan != 0 { base.SerfLANConfig.MemberlistConfig.BindPort = a.config.Ports.SerfLan base.SerfLANConfig.MemberlistConfig.AdvertisePort = a.config.Ports.SerfLan } if a.config.Ports.SerfWan != 0 { base.SerfWANConfig.MemberlistConfig.BindPort = a.config.Ports.SerfWan base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.Ports.SerfWan } if a.config.BindAddr != "" { bindAddr := &net.TCPAddr{ IP: net.ParseIP(a.config.BindAddr), Port: a.config.Ports.Server, } base.RPCAddr = bindAddr } if a.config.AdvertiseAddr != "" { base.SerfLANConfig.MemberlistConfig.AdvertiseAddr = a.config.AdvertiseAddr base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.AdvertiseAddr base.RPCAdvertise = &net.TCPAddr{ IP: net.ParseIP(a.config.AdvertiseAddr), Port: a.config.Ports.Server, } } if a.config.Bootstrap { base.Bootstrap = true } if a.config.RejoinAfterLeave { base.RejoinAfterLeave = true } if a.config.BootstrapExpect != 0 { base.BootstrapExpect = a.config.BootstrapExpect } if a.config.Protocol > 0 { base.ProtocolVersion = uint8(a.config.Protocol) } if a.config.ACLToken != "" { base.ACLToken = a.config.ACLToken } if a.config.ACLMasterToken != "" { base.ACLMasterToken = a.config.ACLMasterToken } if a.config.ACLDatacenter != "" { base.ACLDatacenter = a.config.ACLDatacenter } if a.config.ACLTTLRaw != "" { base.ACLTTL = a.config.ACLTTL } if a.config.ACLDefaultPolicy != "" { base.ACLDefaultPolicy = a.config.ACLDefaultPolicy } if a.config.ACLDownPolicy != "" { base.ACLDownPolicy = a.config.ACLDownPolicy } // Format the build string revision := a.config.Revision if len(revision) > 8 { revision = revision[:8] } base.Build = fmt.Sprintf("%s%s:%s", a.config.Version, a.config.VersionPrerelease, revision) // Copy the TLS configuration base.VerifyIncoming = a.config.VerifyIncoming base.VerifyOutgoing = a.config.VerifyOutgoing base.CAFile = a.config.CAFile base.CertFile = a.config.CertFile base.KeyFile = a.config.KeyFile base.ServerName = a.config.ServerName // Setup the ServerUp callback base.ServerUp = a.state.ConsulServerUp // Setup the user event callback base.UserEventHandler = func(e serf.UserEvent) { select { case a.eventCh <- e: case <-a.shutdownCh: } } // Setup the loggers base.LogOutput = a.logOutput return base } // setupServer is used to initialize the Consul server func (a *Agent) setupServer() error { server, err := consul.NewServer(a.consulConfig()) if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } a.server = server return nil } // setupClient is used to initialize the Consul client func (a *Agent) setupClient() error { client, err := consul.NewClient(a.consulConfig()) if err != nil { return fmt.Errorf("Failed to start Consul client: %v", err) } a.client = client return nil } // RPC is used to make an RPC call to the Consul servers // This allows the agent to implement the Consul.Interface func (a *Agent) RPC(method string, args interface{}, reply interface{}) error { if a.server != nil { return a.server.RPC(method, args, reply) } return a.client.RPC(method, args, reply) } // Leave is used to prepare the agent for a graceful shutdown func (a *Agent) Leave() error { if a.server != nil { return a.server.Leave() } else { return a.client.Leave() } } // Shutdown is used to hard stop the agent. Should be // preceeded by a call to Leave to do it gracefully. func (a *Agent) Shutdown() error { a.shutdownLock.Lock() defer a.shutdownLock.Unlock() if a.shutdown { return nil } // Stop all the checks a.checkLock.Lock() defer a.checkLock.Unlock() for _, chk := range a.checkMonitors { chk.Stop() } for _, chk := range a.checkTTLs { chk.Stop() } a.logger.Println("[INFO] agent: requesting shutdown") var err error if a.server != nil { err = a.server.Shutdown() } else { err = a.client.Shutdown() } pidErr := a.deletePid() if pidErr != nil { a.logger.Println("[WARN] agent: could not delete pid file ", pidErr) } a.logger.Println("[INFO] agent: shutdown complete") a.shutdown = true close(a.shutdownCh) return err } // ShutdownCh is used to return a channel that can be // selected to wait for the agent to perform a shutdown. func (a *Agent) ShutdownCh() <-chan struct{} { return a.shutdownCh } // JoinLAN is used to have the agent join a LAN cluster func (a *Agent) JoinLAN(addrs []string) (n int, err error) { a.logger.Printf("[INFO] agent: (LAN) joining: %v", addrs) if a.server != nil { n, err = a.server.JoinLAN(addrs) } else { n, err = a.client.JoinLAN(addrs) } a.logger.Printf("[INFO] agent: (LAN) joined: %d Err: %v", n, err) return } // JoinWAN is used to have the agent join a WAN cluster func (a *Agent) JoinWAN(addrs []string) (n int, err error) { a.logger.Printf("[INFO] agent: (WAN) joining: %v", addrs) if a.server != nil { n, err = a.server.JoinWAN(addrs) } else { err = fmt.Errorf("Must be a server to join WAN cluster") } a.logger.Printf("[INFO] agent: (WAN) joined: %d Err: %v", n, err) return } // ForceLeave is used to remove a failed node from the cluster func (a *Agent) ForceLeave(node string) (err error) { a.logger.Printf("[INFO] Force leaving node: %v", node) if a.server != nil { err = a.server.RemoveFailedNode(node) } else { err = a.client.RemoveFailedNode(node) } if err != nil { a.logger.Printf("[WARN] Failed to remove node: %v", err) } return err } // LocalMember is used to return the local node func (a *Agent) LocalMember() serf.Member { if a.server != nil { return a.server.LocalMember() } else { return a.client.LocalMember() } } // LANMembers is used to retrieve the LAN members func (a *Agent) LANMembers() []serf.Member { if a.server != nil { return a.server.LANMembers() } else { return a.client.LANMembers() } } // WANMembers is used to retrieve the WAN members func (a *Agent) WANMembers() []serf.Member { if a.server != nil { return a.server.WANMembers() } else { return nil } } // StartSync is called once Services and Checks are registered. // This is called to prevent a race between clients and the anti-entropy routines func (a *Agent) StartSync() { // Start the anti entropy routine go a.state.antiEntropy(a.shutdownCh) } // PauseSync is used to pause anti-entropy while bulk changes are make func (a *Agent) PauseSync() { a.state.Pause() } // ResumeSync is used to unpause anti-entropy after bulk changes are make func (a *Agent) ResumeSync() { a.state.Resume() } // AddService is used to add a service entry. // This entry is persistent and the agent will make a best effort to // ensure it is registered func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) error { if service.Service == "" { return fmt.Errorf("Service name missing") } if service.ID == "" && service.Service != "" { service.ID = service.Service } if chkType != nil && !chkType.Valid() { return fmt.Errorf("Check type is not valid") } // Add the service a.state.AddService(service) // Create an associated health check if chkType != nil { check := &structs.HealthCheck{ Node: a.config.NodeName, CheckID: fmt.Sprintf("service:%s", service.ID), Name: fmt.Sprintf("Service '%s' check", service.Service), Status: structs.HealthCritical, Notes: chkType.Notes, ServiceID: service.ID, ServiceName: service.Service, } if err := a.AddCheck(check, chkType); err != nil { return err } } return nil } // RemoveService is used to remove a service entry. // The agent will make a best effort to ensure it is deregistered func (a *Agent) RemoveService(serviceID string) error { // Protect "consul" service from deletion by a user if a.server != nil && serviceID == consul.ConsulServiceID { return fmt.Errorf( "Deregistering the %s service is not allowed", consul.ConsulServiceID) } // Remove service immeidately a.state.RemoveService(serviceID) // Deregister any associated health checks checkID := fmt.Sprintf("service:%s", serviceID) return a.RemoveCheck(checkID) } // AddCheck is used to add a health check to the agent. // This entry is persistent and the agent will make a best effort to // ensure it is registered. The Check may include a CheckType which // is used to automatically update the check status func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType) error { if check.CheckID == "" { return fmt.Errorf("CheckID missing") } if chkType != nil && !chkType.Valid() { return fmt.Errorf("Check type is not valid") } a.checkLock.Lock() defer a.checkLock.Unlock() // Check if already registered if chkType != nil { if chkType.IsTTL() { if existing, ok := a.checkTTLs[check.CheckID]; ok { existing.Stop() } ttl := &CheckTTL{ Notify: &a.state, CheckID: check.CheckID, TTL: chkType.TTL, Logger: a.logger, } ttl.Start() a.checkTTLs[check.CheckID] = ttl } else { if existing, ok := a.checkMonitors[check.CheckID]; ok { existing.Stop() } if chkType.Interval < MinInterval { a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v", check.CheckID, MinInterval)) chkType.Interval = MinInterval } monitor := &CheckMonitor{ Notify: &a.state, CheckID: check.CheckID, Script: chkType.Script, Interval: chkType.Interval, Logger: a.logger, } monitor.Start() a.checkMonitors[check.CheckID] = monitor } } // Add to the local state for anti-entropy a.state.AddCheck(check) return nil } // RemoveCheck is used to remove a health check. // The agent will make a best effort to ensure it is deregistered func (a *Agent) RemoveCheck(checkID string) error { // Add to the local state for anti-entropy a.state.RemoveCheck(checkID) a.checkLock.Lock() defer a.checkLock.Unlock() // Stop any monitors if check, ok := a.checkMonitors[checkID]; ok { check.Stop() delete(a.checkMonitors, checkID) } if check, ok := a.checkTTLs[checkID]; ok { check.Stop() delete(a.checkTTLs, checkID) } return nil } // UpdateCheck is used to update the status of a check. // This can only be used with checks of the TTL type. func (a *Agent) UpdateCheck(checkID, status, output string) error { a.checkLock.Lock() defer a.checkLock.Unlock() check, ok := a.checkTTLs[checkID] if !ok { return fmt.Errorf("CheckID does not have associated TTL") } // Set the status through CheckTTL to reset the TTL check.SetStatus(status, output) return nil } // Stats is used to get various debugging state from the sub-systems func (a *Agent) Stats() map[string]map[string]string { toString := func(v uint64) string { return strconv.FormatUint(v, 10) } var stats map[string]map[string]string if a.server != nil { stats = a.server.Stats() } else { stats = a.client.Stats() } stats["agent"] = map[string]string{ "check_monitors": toString(uint64(len(a.checkMonitors))), "check_ttls": toString(uint64(len(a.checkTTLs))), "checks": toString(uint64(len(a.state.checks))), "services": toString(uint64(len(a.state.services))), } revision := a.config.Revision if len(revision) > 8 { revision = revision[:8] } stats["build"] = map[string]string{ "revision": revision, "version": a.config.Version, "prerelease": a.config.VersionPrerelease, } return stats } // storePid is used to write out our PID to a file if necessary func (a *Agent) storePid() error { // Quit fast if no pidfile pidPath := a.config.PidFile if pidPath == "" { return nil } // Open the PID file pidFile, err := os.OpenFile(pidPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err != nil { return fmt.Errorf("Could not open pid file: %v", err) } defer pidFile.Close() // Write out the PID pid := os.Getpid() _, err = pidFile.WriteString(fmt.Sprintf("%d", pid)) if err != nil { return fmt.Errorf("Could not write to pid file: %s", err) } return nil } // deletePid is used to delete our PID on exit func (a *Agent) deletePid() error { // Quit fast if no pidfile pidPath := a.config.PidFile if pidPath == "" { return nil } stat, err := os.Stat(pidPath) if err != nil { return fmt.Errorf("Could not remove pid file: %s", err) } if stat.IsDir() { return fmt.Errorf("Specified pid file path is directory") } err = os.Remove(pidPath) if err != nil { return fmt.Errorf("Could not remove pid file: %s", err) } return nil }