Add the service registration manager to the agent

This commit is contained in:
Kyle Havlovitz 2019-04-17 21:35:19 -07:00
parent d51fd740bf
commit 6aa022c1cd
5 changed files with 336 additions and 0 deletions

View File

@ -243,6 +243,8 @@ type Agent struct {
// directly.
proxyConfig *proxycfg.Manager
serviceManager *ServiceManager
// xdsServer is the Server instance that serves xDS gRPC API.
xdsServer *xds.Server
@ -473,6 +475,9 @@ func (a *Agent) Start() error {
}
}()
// Start the service registration manager.
a.serviceManager = NewServiceManager(a)
// Start watching for critical services to deregister, based on their
// checks.
go a.reapServices()
@ -1892,6 +1897,7 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
a.serviceManager.AddService(service, chkTypes, persist, token, source)
return a.addServiceLocked(service, chkTypes, persist, token, source)
}
@ -2055,6 +2061,7 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check
func (a *Agent) RemoveService(serviceID string, persist bool) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
a.serviceManager.RemoveService(serviceID)
return a.removeServiceLocked(serviceID, persist)
}

View File

@ -0,0 +1,52 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const ResolvedServiceConfigName = "resolved-service-config"
// ResolvedServiceConfig supports fetching the config for a service resolved from
// the global proxy defaults and the centrally registered service config.
type ResolvedServiceConfig struct {
RPC RPC
}
func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a ServiceConfigRequest.
reqReal, ok := req.(*structs.ServiceConfigRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Allways allow stale - there's no point in hitting leader if the request is
// going to be served from cache and endup arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply structs.ServiceConfigResponse
if err := c.RPC.RPC("ConfigEntry.ResolveServiceConfig", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *ResolvedServiceConfig) SupportsBlocking() bool {
return true
}

View File

@ -0,0 +1,67 @@
package cachetype
import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestResolvedServiceConfig(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &ResolvedServiceConfig{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.ServiceConfigResponse
rpc.On("RPC", "ConfigEntry.ResolveServiceConfig", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceConfigRequest)
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("foo", req.Name)
require.True(req.AllowStale)
reply := args.Get(2).(*structs.ServiceConfigResponse)
reply.Definition = structs.ServiceDefinition{
ID: "1234",
Name: "foo",
}
reply.QueryMeta.Index = 48
resp = reply
})
// Fetch
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceConfigRequest{
Datacenter: "dc1",
Name: "foo",
})
require.NoError(err)
require.Equal(cache.FetchResult{
Value: resp,
Index: 48,
}, resultA)
}
func TestResolvedServiceConfig_badReqType(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &ResolvedServiceConfig{RPC: rpc}
// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(err)
require.Contains(err.Error(), "wrong type")
}

183
agent/service_manager.go Normal file
View File

@ -0,0 +1,183 @@
package agent
import (
"fmt"
"sync"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
"golang.org/x/net/context"
)
type ServiceManager struct {
services map[string]*serviceConfigWatch
agent *Agent
sync.Mutex
}
func NewServiceManager(agent *Agent) *ServiceManager {
return &ServiceManager{
services: make(map[string]*serviceConfigWatch),
agent: agent,
}
}
func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) {
s.Lock()
defer s.Unlock()
reg := serviceRegistration{
service: service,
chkTypes: chkTypes,
persist: persist,
token: token,
source: source,
}
// If a service watch already exists, update the registration. Otherwise,
// start a new config watcher.
watch, ok := s.services[service.ID]
if ok {
watch.updateRegistration(&reg)
} else {
watch := &serviceConfigWatch{
registration: &reg,
updateCh: make(chan cache.UpdateEvent, 1),
agent: s.agent,
}
s.services[service.ID] = watch
watch.Start()
}
}
func (s *ServiceManager) RemoveService(serviceID string) {
s.Lock()
defer s.Unlock()
serviceWatch, ok := s.services[serviceID]
if !ok {
return
}
serviceWatch.Stop()
delete(s.services, serviceID)
}
type serviceRegistration struct {
service *structs.NodeService
chkTypes []*structs.CheckType
persist bool
token string
source configSource
}
type serviceConfigWatch struct {
registration *serviceRegistration
config *structs.ServiceDefinition
agent *Agent
updateCh chan cache.UpdateEvent
ctx context.Context
cancelFunc func()
sync.RWMutex
}
func (s *serviceConfigWatch) Start() error {
s.ctx, s.cancelFunc = context.WithCancel(context.Background())
if err := s.startConfigWatch(); err != nil {
return err
}
go s.runWatch()
return nil
}
func (s *serviceConfigWatch) runWatch() {
for {
select {
case <-s.ctx.Done():
return
case event := <-s.updateCh:
s.handleUpdate(event)
}
}
}
func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) {
switch event.Result.(type) {
case serviceRegistration:
s.Lock()
s.registration = event.Result.(*serviceRegistration)
s.Unlock()
case structs.ServiceConfigResponse:
s.Lock()
s.config = &event.Result.(*structs.ServiceConfigResponse).Definition
s.Unlock()
default:
s.agent.logger.Printf("[ERR] unknown update event type: %T", event)
}
service := s.mergeServiceConfig()
s.agent.logger.Printf("[INFO] updating service registration: %v, %v", service.ID, service.Meta)
/*err := s.agent.AddService(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source)
if err != nil {
s.agent.logger.Printf("[ERR] error updating service registration: %v", err)
}*/
}
func (s *serviceConfigWatch) startConfigWatch() error {
s.RLock()
name := s.registration.service.Service
s.RUnlock()
req := &structs.ServiceConfigRequest{
Name: name,
Datacenter: s.agent.config.Datacenter,
}
err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh)
return err
}
func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) {
s.updateCh <- cache.UpdateEvent{
Result: registration,
}
}
func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService {
return nil
}
func (s *serviceConfigWatch) Stop() {
s.cancelFunc()
}
/*
// Construct the service config request. This will be re-used with an updated
// index to watch for changes in the effective service config.
req := structs.ServiceConfigRequest{
Name: s.registration.service.Service,
Datacenter: s.agent.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.agent.tokens.AgentToken()},
}
consul.RetryLoopBackoff(s.shutdownCh, func() error {
var reply structs.ServiceConfigResponse
if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &req, &reply); err != nil {
return err
}
s.updateConfig(&reply.Definition)
req.QueryOptions.MinQueryIndex = reply.QueryMeta.Index
return nil
}, func(err error) {
s.agent.logger.Printf("[ERR] Error getting service config: %v", err)
})
*/

View File

@ -2,10 +2,13 @@ package structs
import (
"fmt"
"strconv"
"strings"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/go-msgpack/codec"
"github.com/mitchellh/hashstructure"
)
const (
@ -265,6 +268,30 @@ func (s *ServiceConfigRequest) RequestDatacenter() string {
return s.Datacenter
}
func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo {
info := cache.RequestInfo{
Token: r.Token,
Datacenter: r.Datacenter,
MinIndex: r.MinQueryIndex,
Timeout: r.MaxQueryTime,
MaxAge: r.MaxAge,
MustRevalidate: r.MustRevalidate,
}
// To calculate the cache key we only hash the service name. The
// datacenter is handled by the cache framework. The other fields are
// not, but should not be used in any cache types.
v, err := hashstructure.Hash(r.Name, nil)
if err == nil {
// If there is an error, we don't set the key. A blank key forces
// no cache for this request so the request is forwarded directly
// to the server.
info.Key = strconv.FormatUint(v, 10)
}
return info
}
type ServiceConfigResponse struct {
Definition ServiceDefinition