open-consul/command/agent/local.go

323 lines
8.3 KiB
Go
Raw Normal View History

package agent
import (
"github.com/hashicorp/consul/consul/structs"
"reflect"
"sync"
"time"
)
const (
// syncRetryIntv is the base interval passed to aeScale (together with
// the LAN member count) to compute how long antiEntropy waits before
// retrying after setSyncState fails.
syncRetryIntv = 30 * time.Second
// maxDelaySync is the longest antiEntropy waits on delaySync before it
// logs a warning and proceeds with the initial sync anyway.
maxDelaySync = 30 * time.Second
)
// syncStatus is used to represent the difference between
// the local and remote state, and if action needs to be taken.
// The zero value means "not in sync and not pending deletion", which
// causes syncChanges to register the entry with the server.
type syncStatus struct {
remoteDelete bool // Should this be deleted from the server; syncChanges checks this before inSync
inSync bool // Is this in sync with the server; only consulted when remoteDelete is false
}
// localState is used to represent the node's services
// and checks. It is used to perform anti-entropy with the
// catalog representation.
type localState struct {
// Embedded mutex; the Agent methods in this file lock it around
// access to the maps below.
sync.Mutex
// delaySync is used to delay the initial sync until
// the client has registered its services and checks.
// RegistrationDone sends on it; antiEntropy receives from it.
delaySync chan struct{}
// services tracks the local services, keyed by service ID.
// serviceStatus holds the sync status per service ID; it may also
// contain IDs that are not in services and are pending deregistration.
services map[string]*structs.NodeService
serviceStatus map[string]syncStatus
// checks tracks the local checks, keyed by check ID.
// checkStatus holds the sync status per check ID; it may also
// contain IDs that are not in checks and are pending deregistration.
checks map[string]*structs.HealthCheck
checkStatus map[string]syncStatus
// triggerCh is used to inform of a change to local state
// that requires anti-entropy with the server
triggerCh chan struct{}
}
// changeMade notifies the anti-entropy loop that local state was modified.
// The send on triggerCh is non-blocking: if the channel cannot accept the
// value right now, the notification is dropped rather than stalling the
// caller.
func (l *localState) changeMade() {
	var signal struct{}
	select {
	case l.triggerCh <- signal:
	default:
		// The channel cannot take the signal at the moment; skip it.
	}
}
// RegistrationDone is meant to be invoked by the Agent client after its
// base Services and Checks have been registered. It signals delaySync so
// that the anti-entropy routine can stop waiting and begin its first sync,
// avoiding a race between client registration and anti-entropy.
// The signal is sent without blocking; if delaySync cannot accept it,
// the call does nothing.
func (a *Agent) RegistrationDone() {
	var done struct{}
	select {
	case a.state.delaySync <- done:
	default:
		// delaySync cannot accept the signal right now; nothing to do.
	}
}
// AddService records a service in the local state, keyed by its ID,
// replacing any previous entry with the same ID. The entry is persistent
// and the agent makes a best effort to ensure it is registered: its sync
// status is reset so it counts as out of sync, and an anti-entropy run is
// triggered.
func (a *Agent) AddService(service *structs.NodeService) {
	a.state.Lock()
	defer a.state.Unlock()

	id := service.ID
	// The zero syncStatus means "not in sync, not pending deletion".
	a.state.serviceStatus[id] = syncStatus{}
	a.state.services[id] = service
	a.state.changeMade()
}
// RemoveService drops a service from the local state and flags its ID for
// remote deletion, then triggers an anti-entropy run. The agent makes a
// best effort to ensure the service is deregistered.
func (a *Agent) RemoveService(serviceID string) {
	a.state.Lock()
	defer a.state.Unlock()

	// Keep a status entry behind so syncChanges knows to deregister it.
	pendingDelete := syncStatus{remoteDelete: true}
	a.state.serviceStatus[serviceID] = pendingDelete
	delete(a.state.services, serviceID)
	a.state.changeMade()
}
// AddCheck records a health check in the local state, keyed by its
// CheckID, replacing any previous entry with the same ID. The entry is
// persistent and the agent makes a best effort to ensure it is registered:
// its sync status is reset so it counts as out of sync, and an
// anti-entropy run is triggered.
func (a *Agent) AddCheck(check *structs.HealthCheck) {
	a.state.Lock()
	defer a.state.Unlock()

	id := check.CheckID
	// The zero syncStatus means "not in sync, not pending deletion".
	a.state.checkStatus[id] = syncStatus{}
	a.state.checks[id] = check
	a.state.changeMade()
}
// RemoveCheck drops a health check from the local state and flags its ID
// for remote deletion, then triggers an anti-entropy run. The agent makes
// a best effort to ensure the check is deregistered.
func (a *Agent) RemoveCheck(checkID string) {
	a.state.Lock()
	defer a.state.Unlock()

	// Keep a status entry behind so syncChanges knows to deregister it.
	pendingDelete := syncStatus{remoteDelete: true}
	a.state.checkStatus[checkID] = pendingDelete
	delete(a.state.checks, checkID)
	a.state.changeMade()
}
// UpdateCheck sets the status of a locally known check. Unknown check IDs
// and updates that would not change the current status are ignored.
// Otherwise the check is marked out of sync and an anti-entropy run is
// triggered.
func (a *Agent) UpdateCheck(checkID, status string) {
	a.state.Lock()
	defer a.state.Unlock()

	current, found := a.state.checks[checkID]
	// Nothing to do for unknown checks or idempotent updates.
	if !found || current.Status == status {
		return
	}

	current.Status = status
	a.state.checkStatus[checkID] = syncStatus{inSync: false}
	a.state.changeMade()
}
// antiEntropy is a long running method used to perform anti-entropy
// between local and remote state.
//
// It first waits for RegistrationDone (via delaySync), for maxDelaySync to
// elapse, or for shutdown. It then repeats the following cycle until
// shutdownCh fires:
//
//  1. Run setSyncState until it succeeds, waiting a scaled syncRetryIntv
//     between attempts.
//  2. Force a syncChanges pass by calling changeMade.
//  3. Serve triggerCh events with syncChanges until the staggered full-sync
//     timer expires, then start over at step 1.
//
// Every wait in this method also selects on shutdownCh, so a shutdown is
// not held up by the retry back-off.
func (a *Agent) antiEntropy() {
	// Delay the initial sync until client has a chance to register
	select {
	case <-a.state.delaySync:
	case <-time.After(maxDelaySync):
		a.logger.Printf("[WARN] Client failed to call RegistrationDone within %v", maxDelaySync)
	case <-a.shutdownCh:
		return
	}

SYNC:
	// Sync our state with the servers
	for !a.shutdown {
		if err := a.setSyncState(); err != nil {
			a.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
			// Back off before retrying, but stay responsive to shutdown
			// instead of sleeping through it.
			select {
			case <-time.After(aeScale(syncRetryIntv, len(a.LANMembers()))):
				continue
			case <-a.shutdownCh:
				return
			}
		}
		break
	}

	// Force-trigger AE to pickup any changes
	a.state.changeMade()

	// Schedule the next full sync, with a random stagger
	aeIntv := aeScale(a.config.AEInterval, len(a.LANMembers()))
	aeIntv = aeIntv + randomStagger(aeIntv)
	aeTimer := time.After(aeIntv)

	// Wait for sync events
	for {
		select {
		case <-aeTimer:
			goto SYNC
		case <-a.state.triggerCh:
			// A failed incremental sync is only logged; the entries stay
			// marked out of sync and are retried on the next trigger.
			if err := a.syncChanges(); err != nil {
				a.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
			}
		case <-a.shutdownCh:
			return
		}
	}
}
// setSyncState does a read of the server state, and updates
// the local syncStatus as appropriate.
//
// It issues the "Catalog.NodeServices" and "Health.NodeChecks" RPCs for
// this node before taking the state lock. If either RPC fails, its error
// is returned unchanged and no status is modified. Otherwise, for every
// service and check returned by the servers:
//
//   - if it is unknown locally, it is flagged for remote deletion;
//   - else it is marked in sync only when the local and remote definitions
//     are deeply equal.
//
// NOTE(review): entries that exist locally but were not returned by the
// servers keep whatever status they already had; confirm whether they
// should be forced out of sync here.
func (a *Agent) setSyncState() error {
	req := structs.NodeSpecificRequest{
		Datacenter: a.config.Datacenter,
		Node:       a.config.NodeName,
	}
	var services structs.NodeServices
	var checks structs.HealthChecks
	if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
		return err
	}
	if err := a.RPC("Health.NodeChecks", &req, &checks); err != nil {
		return err
	}

	a.state.Lock()
	defer a.state.Unlock()

	for id, service := range services.Services {
		// If we don't have the service locally, deregister it
		existing, ok := a.state.services[id]
		if !ok {
			a.state.serviceStatus[id] = syncStatus{remoteDelete: true}
			continue
		}

		// In sync only when the definitions match; a differing definition
		// must be pushed to the server by syncChanges.
		equal := reflect.DeepEqual(existing, service)
		a.state.serviceStatus[id] = syncStatus{inSync: equal}
	}

	for _, check := range checks {
		// If we don't have the check locally, deregister it
		id := check.CheckID
		existing, ok := a.state.checks[id]
		if !ok {
			a.state.checkStatus[id] = syncStatus{remoteDelete: true}
			continue
		}

		// In sync only when the definitions match; a differing definition
		// must be pushed to the server by syncChanges.
		equal := reflect.DeepEqual(existing, check)
		a.state.checkStatus[id] = syncStatus{inSync: equal}
	}
	return nil
}
// syncChanges walks the recorded status of every local service and check
// and brings the server up to date: entries flagged for remote deletion
// are deregistered, and entries that are not in sync are registered.
// Services are processed before checks. The state lock is held for the
// whole pass, including the RPCs issued by the helpers. The first error
// aborts the pass and is returned; remaining entries are left untouched.
func (a *Agent) syncChanges() error {
	a.state.Lock()
	defer a.state.Unlock()

	// Services first.
	for serviceID, st := range a.state.serviceStatus {
		var err error
		switch {
		case st.remoteDelete:
			err = a.deleteService(serviceID)
		case !st.inSync:
			err = a.syncService(serviceID)
		}
		if err != nil {
			return err
		}
	}

	// Then checks.
	for checkID, st := range a.state.checkStatus {
		var err error
		switch {
		case st.remoteDelete:
			err = a.deleteCheck(checkID)
		case !st.inSync:
			err = a.syncCheck(checkID)
		}
		if err != nil {
			return err
		}
	}
	return nil
}
// deleteService deregisters the service with the given ID from the server
// via the "Catalog.Deregister" RPC. On success the service's status entry
// is dropped and the deregistration is logged; on failure the RPC error is
// returned and the status entry is kept. It modifies serviceStatus without
// locking; its caller in this file, syncChanges, holds the state lock.
func (a *Agent) deleteService(id string) error {
	var (
		reply struct{}
		args  = structs.DeregisterRequest{
			Datacenter: a.config.Datacenter,
			Node:       a.config.NodeName,
			ServiceID:  id,
		}
	)
	if err := a.RPC("Catalog.Deregister", &args, &reply); err != nil {
		return err
	}

	delete(a.state.serviceStatus, id)
	a.logger.Printf("[INFO] Deregistered service '%s'", id)
	return nil
}
// deleteCheck deregisters the check with the given ID from the server via
// the "Catalog.Deregister" RPC. On success the check's status entry is
// dropped and the deregistration is logged; on failure the RPC error is
// returned and the status entry is kept. It modifies checkStatus without
// locking; its caller in this file, syncChanges, holds the state lock.
func (a *Agent) deleteCheck(id string) error {
	var (
		reply struct{}
		args  = structs.DeregisterRequest{
			Datacenter: a.config.Datacenter,
			Node:       a.config.NodeName,
			CheckID:    id,
		}
	)
	if err := a.RPC("Catalog.Deregister", &args, &reply); err != nil {
		return err
	}

	delete(a.state.checkStatus, id)
	a.logger.Printf("[INFO] Deregistered check '%s'", id)
	return nil
}
// syncService registers the local definition of the service with the given
// ID on the server via the "Catalog.Register" RPC. On success the service
// is marked in sync and the sync is logged; on failure the RPC error is
// returned and the status is left as it was. It reads services and writes
// serviceStatus without locking; its caller in this file, syncChanges,
// holds the state lock.
func (a *Agent) syncService(id string) error {
	args := structs.RegisterRequest{
		Datacenter: a.config.Datacenter,
		Node:       a.config.NodeName,
		Address:    a.config.AdvertiseAddr,
		Service:    a.state.services[id],
	}
	var reply struct{}
	if err := a.RPC("Catalog.Register", &args, &reply); err != nil {
		return err
	}

	a.state.serviceStatus[id] = syncStatus{inSync: true}
	a.logger.Printf("[INFO] Synced service '%s'", id)
	return nil
}
// syncCheck registers the local definition of the check with the given ID
// on the server via the "Catalog.Register" RPC. On success the check is
// marked in sync and the sync is logged; on failure the RPC error is
// returned and the status is left as it was. It reads checks and writes
// checkStatus without locking; its caller in this file, syncChanges, holds
// the state lock.
func (a *Agent) syncCheck(id string) error {
	args := structs.RegisterRequest{
		Datacenter: a.config.Datacenter,
		Node:       a.config.NodeName,
		Address:    a.config.AdvertiseAddr,
		Check:      a.state.checks[id],
	}
	var reply struct{}
	if err := a.RPC("Catalog.Register", &args, &reply); err != nil {
		return err
	}

	a.state.checkStatus[id] = syncStatus{inSync: true}
	a.logger.Printf("[INFO] Synced check '%s'", id)
	return nil
}