Merge pull request #9300 from hashicorp/dnephin/add-service
agent: reduce AddService
This commit is contained in:
commit
f3e2e3545c
139
agent/agent.go
139
agent/agent.go
|
@ -1890,54 +1890,25 @@ func (a *Agent) readPersistedServiceConfigs() (map[structs.ServiceID]*structs.Se
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddServiceAndReplaceChecks is used to add a service entry and its check. Any check for this service missing from chkTypes will be deleted.
|
// AddService is used to add a service entry and its check. Any check for this service missing from chkTypes will be deleted.
|
||||||
// This entry is persistent and the agent will make a best effort to
|
// This entry is persistent and the agent will make a best effort to
|
||||||
// ensure it is registered
|
// ensure it is registered
|
||||||
func (a *Agent) AddServiceAndReplaceChecks(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
|
func (a *Agent) AddService(req AddServiceRequest) error {
|
||||||
|
req.waitForCentralConfig = true
|
||||||
|
req.persistServiceConfig = true
|
||||||
a.stateLock.Lock()
|
a.stateLock.Lock()
|
||||||
defer a.stateLock.Unlock()
|
defer a.stateLock.Unlock()
|
||||||
return a.addServiceLocked(&addServiceRequest{
|
|
||||||
service: service,
|
|
||||||
chkTypes: chkTypes,
|
|
||||||
previousDefaults: nil,
|
|
||||||
waitForCentralConfig: true,
|
|
||||||
persist: persist,
|
|
||||||
persistServiceConfig: true,
|
|
||||||
token: token,
|
|
||||||
replaceExistingChecks: true,
|
|
||||||
source: source,
|
|
||||||
snap: a.snapshotCheckState(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddService is used to add a service entry.
|
req.snap = a.State.Checks(structs.WildcardEnterpriseMeta())
|
||||||
// This entry is persistent and the agent will make a best effort to
|
return a.addServiceLocked(req)
|
||||||
// ensure it is registered
|
|
||||||
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
|
|
||||||
a.stateLock.Lock()
|
|
||||||
defer a.stateLock.Unlock()
|
|
||||||
return a.addServiceLocked(&addServiceRequest{
|
|
||||||
service: service,
|
|
||||||
chkTypes: chkTypes,
|
|
||||||
previousDefaults: nil,
|
|
||||||
waitForCentralConfig: true,
|
|
||||||
persist: persist,
|
|
||||||
persistServiceConfig: true,
|
|
||||||
token: token,
|
|
||||||
replaceExistingChecks: false,
|
|
||||||
source: source,
|
|
||||||
snap: a.snapshotCheckState(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// addServiceLocked adds a service entry to the service manager if enabled, or directly
|
// addServiceLocked adds a service entry to the service manager if enabled, or directly
|
||||||
// to the local state if it is not. This function assumes the state lock is already held.
|
// to the local state if it is not. This function assumes the state lock is already held.
|
||||||
func (a *Agent) addServiceLocked(req *addServiceRequest) error {
|
func (a *Agent) addServiceLocked(req AddServiceRequest) error {
|
||||||
req.fixupForAddServiceLocked()
|
req.Service.EnterpriseMeta.Normalize()
|
||||||
|
|
||||||
req.service.EnterpriseMeta.Normalize()
|
if err := a.validateService(req.Service, req.chkTypes); err != nil {
|
||||||
|
|
||||||
if err := a.validateService(req.service, req.chkTypes); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1945,15 +1916,11 @@ func (a *Agent) addServiceLocked(req *addServiceRequest) error {
|
||||||
return a.serviceManager.AddService(req)
|
return a.serviceManager.AddService(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
// previousDefaults are ignored here because they are only relevant for central config.
|
|
||||||
req.persistService = nil
|
|
||||||
req.persistDefaults = nil
|
|
||||||
req.persistServiceConfig = false
|
req.persistServiceConfig = false
|
||||||
|
return a.addServiceInternal(addServiceInternalRequest{AddServiceRequest: req})
|
||||||
return a.addServiceInternal(req)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// addServiceRequest is the union of arguments for calling both
|
// AddServiceRequest is the union of arguments for calling both
|
||||||
// addServiceLocked and addServiceInternal. The overlap was significant enough
|
// addServiceLocked and addServiceInternal. The overlap was significant enough
|
||||||
// to warrant merging them and indicating which fields are meant to be set only
|
// to warrant merging them and indicating which fields are meant to be set only
|
||||||
// in one of the two contexts.
|
// in one of the two contexts.
|
||||||
|
@ -1963,46 +1930,28 @@ func (a *Agent) addServiceLocked(req *addServiceRequest) error {
|
||||||
//
|
//
|
||||||
// The ServiceManager.AddService signature is largely just a passthrough for
|
// The ServiceManager.AddService signature is largely just a passthrough for
|
||||||
// addServiceLocked and should be treated as such.
|
// addServiceLocked and should be treated as such.
|
||||||
type addServiceRequest struct {
|
type AddServiceRequest struct {
|
||||||
service *structs.NodeService
|
Service *structs.NodeService
|
||||||
chkTypes []*structs.CheckType
|
chkTypes []*structs.CheckType
|
||||||
previousDefaults *structs.ServiceConfigResponse // just for: addServiceLocked
|
previousDefaults *structs.ServiceConfigResponse // just for: addServiceLocked
|
||||||
waitForCentralConfig bool // just for: addServiceLocked
|
waitForCentralConfig bool // just for: addServiceLocked
|
||||||
persistService *structs.NodeService // just for: addServiceInternal
|
|
||||||
persistDefaults *structs.ServiceConfigResponse // just for: addServiceInternal
|
|
||||||
persist bool
|
persist bool
|
||||||
persistServiceConfig bool
|
persistServiceConfig bool
|
||||||
token string
|
token string
|
||||||
replaceExistingChecks bool
|
replaceExistingChecks bool
|
||||||
source configSource
|
Source configSource
|
||||||
snap map[structs.CheckID]*structs.HealthCheck
|
snap map[structs.CheckID]*structs.HealthCheck
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *addServiceRequest) fixupForAddServiceLocked() {
|
type addServiceInternalRequest struct {
|
||||||
r.persistService = nil
|
AddServiceRequest
|
||||||
r.persistDefaults = nil
|
persistService *structs.NodeService
|
||||||
}
|
persistDefaults *structs.ServiceConfigResponse
|
||||||
|
|
||||||
func (r *addServiceRequest) fixupForAddServiceInternal() {
|
|
||||||
r.previousDefaults = nil
|
|
||||||
r.waitForCentralConfig = false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// addServiceInternal adds the given service and checks to the local state.
|
// addServiceInternal adds the given service and checks to the local state.
|
||||||
func (a *Agent) addServiceInternal(req *addServiceRequest) error {
|
func (a *Agent) addServiceInternal(req addServiceInternalRequest) error {
|
||||||
req.fixupForAddServiceInternal()
|
service := req.Service
|
||||||
var (
|
|
||||||
service = req.service
|
|
||||||
chkTypes = req.chkTypes
|
|
||||||
persistService = req.persistService
|
|
||||||
persistDefaults = req.persistDefaults
|
|
||||||
persist = req.persist
|
|
||||||
persistServiceConfig = req.persistServiceConfig
|
|
||||||
token = req.token
|
|
||||||
replaceExistingChecks = req.replaceExistingChecks
|
|
||||||
source = req.source
|
|
||||||
snap = req.snap
|
|
||||||
)
|
|
||||||
|
|
||||||
// Pause the service syncs during modification
|
// Pause the service syncs during modification
|
||||||
a.PauseSync()
|
a.PauseSync()
|
||||||
|
@ -2038,11 +1987,11 @@ func (a *Agent) addServiceInternal(req *addServiceRequest) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create an associated health check
|
// Create an associated health check
|
||||||
for i, chkType := range chkTypes {
|
for i, chkType := range req.chkTypes {
|
||||||
checkID := string(chkType.CheckID)
|
checkID := string(chkType.CheckID)
|
||||||
if checkID == "" {
|
if checkID == "" {
|
||||||
checkID = fmt.Sprintf("service:%s", service.ID)
|
checkID = fmt.Sprintf("service:%s", service.ID)
|
||||||
if len(chkTypes) > 1 {
|
if len(req.chkTypes) > 1 {
|
||||||
checkID += fmt.Sprintf(":%d", i+1)
|
checkID += fmt.Sprintf(":%d", i+1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2071,7 +2020,7 @@ func (a *Agent) addServiceInternal(req *addServiceRequest) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restore the fields from the snapshot.
|
// Restore the fields from the snapshot.
|
||||||
prev, ok := snap[cid]
|
prev, ok := req.snap[cid]
|
||||||
if ok {
|
if ok {
|
||||||
check.Output = prev.Output
|
check.Output = prev.Output
|
||||||
check.Status = prev.Status
|
check.Status = prev.Status
|
||||||
|
@ -2098,20 +2047,22 @@ func (a *Agent) addServiceInternal(req *addServiceRequest) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := a.State.AddServiceWithChecks(service, checks, token)
|
err := a.State.AddServiceWithChecks(service, checks, req.token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.cleanupRegistration(cleanupServices, cleanupChecks)
|
a.cleanupRegistration(cleanupServices, cleanupChecks)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
source := req.Source
|
||||||
|
persist := req.persist
|
||||||
for i := range checks {
|
for i := range checks {
|
||||||
if err := a.addCheck(checks[i], chkTypes[i], service, token, source); err != nil {
|
if err := a.addCheck(checks[i], req.chkTypes[i], service, req.token, source); err != nil {
|
||||||
a.cleanupRegistration(cleanupServices, cleanupChecks)
|
a.cleanupRegistration(cleanupServices, cleanupChecks)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if persist && a.config.DataDir != "" {
|
if persist && a.config.DataDir != "" {
|
||||||
if err := a.persistCheck(checks[i], chkTypes[i], source); err != nil {
|
if err := a.persistCheck(checks[i], req.chkTypes[i], source); err != nil {
|
||||||
a.cleanupRegistration(cleanupServices, cleanupChecks)
|
a.cleanupRegistration(cleanupServices, cleanupChecks)
|
||||||
return err
|
return err
|
||||||
|
|
||||||
|
@ -2134,10 +2085,10 @@ func (a *Agent) addServiceInternal(req *addServiceRequest) error {
|
||||||
a.resetExposedChecks(psid)
|
a.resetExposedChecks(psid)
|
||||||
}
|
}
|
||||||
|
|
||||||
if persistServiceConfig && a.config.DataDir != "" {
|
if req.persistServiceConfig && a.config.DataDir != "" {
|
||||||
var err error
|
var err error
|
||||||
if persistDefaults != nil {
|
if req.persistDefaults != nil {
|
||||||
err = a.persistServiceConfig(service.CompoundServiceID(), persistDefaults)
|
err = a.persistServiceConfig(service.CompoundServiceID(), req.persistDefaults)
|
||||||
} else {
|
} else {
|
||||||
err = a.purgeServiceConfig(service.CompoundServiceID())
|
err = a.purgeServiceConfig(service.CompoundServiceID())
|
||||||
}
|
}
|
||||||
|
@ -2150,17 +2101,17 @@ func (a *Agent) addServiceInternal(req *addServiceRequest) error {
|
||||||
|
|
||||||
// Persist the service to a file
|
// Persist the service to a file
|
||||||
if persist && a.config.DataDir != "" {
|
if persist && a.config.DataDir != "" {
|
||||||
if persistService == nil {
|
if req.persistService == nil {
|
||||||
persistService = service
|
req.persistService = service
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := a.persistService(persistService, source); err != nil {
|
if err := a.persistService(req.persistService, source); err != nil {
|
||||||
a.cleanupRegistration(cleanupServices, cleanupChecks)
|
a.cleanupRegistration(cleanupServices, cleanupChecks)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if replaceExistingChecks {
|
if req.replaceExistingChecks {
|
||||||
for checkID, keep := range existingChecks {
|
for checkID, keep := range existingChecks {
|
||||||
if !keep {
|
if !keep {
|
||||||
a.removeCheckLocked(checkID, persist)
|
a.removeCheckLocked(checkID, persist)
|
||||||
|
@ -2372,7 +2323,7 @@ func (a *Agent) removeServiceLocked(serviceID structs.ServiceID, persist bool) e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) removeServiceSidecars(serviceID structs.ServiceID, persist bool) error {
|
func (a *Agent) removeServiceSidecars(serviceID structs.ServiceID, persist bool) error {
|
||||||
sidecarSID := structs.NewServiceID(a.sidecarServiceID(serviceID.ID), &serviceID.EnterpriseMeta)
|
sidecarSID := structs.NewServiceID(sidecarServiceID(serviceID.ID), &serviceID.EnterpriseMeta)
|
||||||
if sidecar := a.State.Service(sidecarSID); sidecar != nil {
|
if sidecar := a.State.Service(sidecarSID); sidecar != nil {
|
||||||
// Double check that it's not just an ID collision and we actually added
|
// Double check that it's not just an ID collision and we actually added
|
||||||
// this from a sidecar.
|
// this from a sidecar.
|
||||||
|
@ -3126,8 +3077,8 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||||
ns.Connect.SidecarService = nil
|
ns.Connect.SidecarService = nil
|
||||||
|
|
||||||
sid := ns.CompoundServiceID()
|
sid := ns.CompoundServiceID()
|
||||||
err = a.addServiceLocked(&addServiceRequest{
|
err = a.addServiceLocked(AddServiceRequest{
|
||||||
service: ns,
|
Service: ns,
|
||||||
chkTypes: chkTypes,
|
chkTypes: chkTypes,
|
||||||
previousDefaults: persistedServiceConfigs[sid],
|
previousDefaults: persistedServiceConfigs[sid],
|
||||||
waitForCentralConfig: false, // exclusively use cached values
|
waitForCentralConfig: false, // exclusively use cached values
|
||||||
|
@ -3135,7 +3086,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||||
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
||||||
token: service.Token,
|
token: service.Token,
|
||||||
replaceExistingChecks: false, // do default behavior
|
replaceExistingChecks: false, // do default behavior
|
||||||
source: ConfigSourceLocal,
|
Source: ConfigSourceLocal,
|
||||||
snap: snap,
|
snap: snap,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -3145,8 +3096,8 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||||
// If there is a sidecar service, register that too.
|
// If there is a sidecar service, register that too.
|
||||||
if sidecar != nil {
|
if sidecar != nil {
|
||||||
sidecarServiceID := sidecar.CompoundServiceID()
|
sidecarServiceID := sidecar.CompoundServiceID()
|
||||||
err = a.addServiceLocked(&addServiceRequest{
|
err = a.addServiceLocked(AddServiceRequest{
|
||||||
service: sidecar,
|
Service: sidecar,
|
||||||
chkTypes: sidecarChecks,
|
chkTypes: sidecarChecks,
|
||||||
previousDefaults: persistedServiceConfigs[sidecarServiceID],
|
previousDefaults: persistedServiceConfigs[sidecarServiceID],
|
||||||
waitForCentralConfig: false, // exclusively use cached values
|
waitForCentralConfig: false, // exclusively use cached values
|
||||||
|
@ -3154,7 +3105,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||||
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
||||||
token: sidecarToken,
|
token: sidecarToken,
|
||||||
replaceExistingChecks: false, // do default behavior
|
replaceExistingChecks: false, // do default behavior
|
||||||
source: ConfigSourceLocal,
|
Source: ConfigSourceLocal,
|
||||||
snap: snap,
|
snap: snap,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -3242,8 +3193,8 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||||
"service", serviceID.String(),
|
"service", serviceID.String(),
|
||||||
"file", file,
|
"file", file,
|
||||||
)
|
)
|
||||||
err = a.addServiceLocked(&addServiceRequest{
|
err = a.addServiceLocked(AddServiceRequest{
|
||||||
service: p.Service,
|
Service: p.Service,
|
||||||
chkTypes: nil,
|
chkTypes: nil,
|
||||||
previousDefaults: persistedServiceConfigs[serviceID],
|
previousDefaults: persistedServiceConfigs[serviceID],
|
||||||
waitForCentralConfig: false, // exclusively use cached values
|
waitForCentralConfig: false, // exclusively use cached values
|
||||||
|
@ -3251,7 +3202,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||||
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
||||||
token: p.Token,
|
token: p.Token,
|
||||||
replaceExistingChecks: false, // do default behavior
|
replaceExistingChecks: false, // do default behavior
|
||||||
source: source,
|
Source: source,
|
||||||
snap: snap,
|
snap: snap,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -10,6 +10,12 @@ import (
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/mitchellh/hashstructure"
|
"github.com/mitchellh/hashstructure"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-bexpr"
|
||||||
|
"github.com/hashicorp/serf/coordinate"
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
"github.com/hashicorp/consul/agent/debug"
|
"github.com/hashicorp/consul/agent/debug"
|
||||||
|
@ -22,11 +28,6 @@ import (
|
||||||
"github.com/hashicorp/consul/logging"
|
"github.com/hashicorp/consul/logging"
|
||||||
"github.com/hashicorp/consul/logging/monitor"
|
"github.com/hashicorp/consul/logging/monitor"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-bexpr"
|
|
||||||
"github.com/hashicorp/serf/coordinate"
|
|
||||||
"github.com/hashicorp/serf/serf"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Self struct {
|
type Self struct {
|
||||||
|
@ -991,25 +992,29 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
|
||||||
replaceExistingChecks = true
|
replaceExistingChecks = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if replaceExistingChecks {
|
addReq := AddServiceRequest{
|
||||||
if err := s.agent.AddServiceAndReplaceChecks(ns, chkTypes, true, token, ConfigSourceRemote); err != nil {
|
Service: ns,
|
||||||
return nil, err
|
chkTypes: chkTypes,
|
||||||
}
|
persist: true,
|
||||||
} else {
|
token: token,
|
||||||
if err := s.agent.AddService(ns, chkTypes, true, token, ConfigSourceRemote); err != nil {
|
Source: ConfigSourceRemote,
|
||||||
return nil, err
|
replaceExistingChecks: replaceExistingChecks,
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Add sidecar.
|
if err := s.agent.AddService(addReq); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if sidecar != nil {
|
if sidecar != nil {
|
||||||
if replaceExistingChecks {
|
addReq := AddServiceRequest{
|
||||||
if err := s.agent.AddServiceAndReplaceChecks(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil {
|
Service: sidecar,
|
||||||
return nil, err
|
chkTypes: sidecarChecks,
|
||||||
}
|
persist: true,
|
||||||
} else {
|
token: sidecarToken,
|
||||||
if err := s.agent.AddService(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil {
|
Source: ConfigSourceRemote,
|
||||||
return nil, err
|
replaceExistingChecks: replaceExistingChecks,
|
||||||
}
|
}
|
||||||
|
if err := s.agent.AddService(addReq); err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.syncChanges()
|
s.syncChanges()
|
||||||
|
|
|
@ -18,6 +18,12 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-uuid"
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/connect"
|
"github.com/hashicorp/consul/agent/connect"
|
||||||
|
@ -34,11 +40,6 @@ import (
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-uuid"
|
|
||||||
"github.com/hashicorp/serf/serf"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"golang.org/x/time/rate"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func makeReadOnlyAgentACL(t *testing.T, srv *HTTPHandlers) string {
|
func makeReadOnlyAgentACL(t *testing.T, srv *HTTPHandlers) string {
|
||||||
|
@ -735,21 +736,21 @@ func TestAgent_HealthServiceByID(t *testing.T) {
|
||||||
ID: "mysql",
|
ID: "mysql",
|
||||||
Service: "mysql",
|
Service: "mysql",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: "mysql2",
|
ID: "mysql2",
|
||||||
Service: "mysql2",
|
Service: "mysql2",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: "mysql3",
|
ID: "mysql3",
|
||||||
Service: "mysql3",
|
Service: "mysql3",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -933,42 +934,42 @@ func TestAgent_HealthServiceByName(t *testing.T) {
|
||||||
ID: "mysql1",
|
ID: "mysql1",
|
||||||
Service: "mysql-pool-r",
|
Service: "mysql-pool-r",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: "mysql2",
|
ID: "mysql2",
|
||||||
Service: "mysql-pool-r",
|
Service: "mysql-pool-r",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: "mysql3",
|
ID: "mysql3",
|
||||||
Service: "mysql-pool-rw",
|
Service: "mysql-pool-rw",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: "mysql4",
|
ID: "mysql4",
|
||||||
Service: "mysql-pool-rw",
|
Service: "mysql-pool-rw",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: "httpd1",
|
ID: "httpd1",
|
||||||
Service: "httpd",
|
Service: "httpd",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: "httpd2",
|
ID: "httpd2",
|
||||||
Service: "httpd",
|
Service: "httpd",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1180,13 +1181,13 @@ func TestAgent_HealthServicesACLEnforcement(t *testing.T) {
|
||||||
ID: "mysql1",
|
ID: "mysql1",
|
||||||
Service: "mysql",
|
Service: "mysql",
|
||||||
}
|
}
|
||||||
require.NoError(t, a.AddService(service, nil, false, "", ConfigSourceLocal))
|
require.NoError(t, a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal))
|
||||||
|
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: "foo1",
|
ID: "foo1",
|
||||||
Service: "foo",
|
Service: "foo",
|
||||||
}
|
}
|
||||||
require.NoError(t, a.AddService(service, nil, false, "", ConfigSourceLocal))
|
require.NoError(t, a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal))
|
||||||
|
|
||||||
// no token
|
// no token
|
||||||
t.Run("no-token-health-by-id", func(t *testing.T) {
|
t.Run("no-token-health-by-id", func(t *testing.T) {
|
||||||
|
@ -4014,10 +4015,10 @@ func testAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T, extraHCL s
|
||||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
if tt.preRegister != nil {
|
if tt.preRegister != nil {
|
||||||
require.NoError(a.AddService(tt.preRegister, nil, false, "", ConfigSourceLocal))
|
require.NoError(a.addServiceFromSource(tt.preRegister, nil, false, "", ConfigSourceLocal))
|
||||||
}
|
}
|
||||||
if tt.preRegister2 != nil {
|
if tt.preRegister2 != nil {
|
||||||
require.NoError(a.AddService(tt.preRegister2, nil, false, "", ConfigSourceLocal))
|
require.NoError(a.addServiceFromSource(tt.preRegister2, nil, false, "", ConfigSourceLocal))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create an ACL token with require policy
|
// Create an ACL token with require policy
|
||||||
|
@ -4319,7 +4320,7 @@ func TestAgent_DeregisterService(t *testing.T) {
|
||||||
ID: "test",
|
ID: "test",
|
||||||
Service: "test",
|
Service: "test",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4351,7 +4352,7 @@ func TestAgent_DeregisterService_ACLDeny(t *testing.T) {
|
||||||
ID: "test",
|
ID: "test",
|
||||||
Service: "test",
|
Service: "test",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4429,7 +4430,7 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) {
|
||||||
ID: "test",
|
ID: "test",
|
||||||
Service: "test",
|
Service: "test",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4476,7 +4477,7 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) {
|
||||||
ID: "test",
|
ID: "test",
|
||||||
Service: "test",
|
Service: "test",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4517,7 +4518,7 @@ func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) {
|
||||||
ID: "test",
|
ID: "test",
|
||||||
Service: "test",
|
Service: "test",
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(service, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -529,7 +529,7 @@ func testAgent_AddService(t *testing.T, extraHCL string) {
|
||||||
t.Run(tt.desc, func(t *testing.T) {
|
t.Run(tt.desc, func(t *testing.T) {
|
||||||
// check the service registration
|
// check the service registration
|
||||||
t.Run(tt.srv.ID, func(t *testing.T) {
|
t.Run(tt.srv.ID, func(t *testing.T) {
|
||||||
err := a.AddService(tt.srv, tt.chkTypes, false, "", ConfigSourceLocal)
|
err := a.addServiceFromSource(tt.srv, tt.chkTypes, false, "", ConfigSourceLocal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -572,6 +572,20 @@ func testAgent_AddService(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// addServiceFromSource is a test helper that exists to maintain an old function
|
||||||
|
// signature that was used in many tests.
|
||||||
|
// Deprecated: use AddService
|
||||||
|
func (a *Agent) addServiceFromSource(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
|
||||||
|
return a.AddService(AddServiceRequest{
|
||||||
|
Service: service,
|
||||||
|
chkTypes: chkTypes,
|
||||||
|
persist: persist,
|
||||||
|
token: token,
|
||||||
|
replaceExistingChecks: false,
|
||||||
|
Source: source,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T) {
|
func TestAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
@ -638,7 +652,7 @@ func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL st
|
||||||
chkTypes, err := service.CheckTypes()
|
chkTypes, err := service.CheckTypes()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, a.AddService(ns, chkTypes, false, service.Token, ConfigSourceLocal))
|
require.NoError(t, a.addServiceFromSource(ns, chkTypes, false, service.Token, ConfigSourceLocal))
|
||||||
}
|
}
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
@ -665,7 +679,7 @@ func test_createAlias(t *testing.T, agent *TestAgent, chk *structs.CheckType, ex
|
||||||
if chk.CheckID == "" {
|
if chk.CheckID == "" {
|
||||||
chk.CheckID = types.CheckID(fmt.Sprintf("check-%d", serviceNum))
|
chk.CheckID = types.CheckID(fmt.Sprintf("check-%d", serviceNum))
|
||||||
}
|
}
|
||||||
err := agent.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal)
|
err := agent.addServiceFromSource(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
return func(r *retry.R) {
|
return func(r *retry.R) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
@ -712,7 +726,7 @@ func TestAgent_CheckAliasRPC(t *testing.T) {
|
||||||
// We ensure to not block and update Agent's index
|
// We ensure to not block and update Agent's index
|
||||||
srv.Tags = []string{fmt.Sprintf("tag-%s", time.Now())}
|
srv.Tags = []string{fmt.Sprintf("tag-%s", time.Now())}
|
||||||
assert.NoError(t, a.waitForUp())
|
assert.NoError(t, a.waitForUp())
|
||||||
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal)
|
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
shutdownAgent := func() {
|
shutdownAgent := func() {
|
||||||
|
@ -727,7 +741,7 @@ func TestAgent_CheckAliasRPC(t *testing.T) {
|
||||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
assert.NoError(t, a.waitForUp())
|
assert.NoError(t, a.waitForUp())
|
||||||
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal)
|
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
@ -832,12 +846,12 @@ func testAgent_AddServiceNoExec(t *testing.T, extraHCL string) {
|
||||||
Interval: 15 * time.Second,
|
Interval: 15 * time.Second,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := a.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal)
|
err := a.addServiceFromSource(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal)
|
||||||
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
|
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = a.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceRemote)
|
err = a.addServiceFromSource(srv, []*structs.CheckType{chk}, false, "", ConfigSourceRemote)
|
||||||
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
|
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -879,7 +893,7 @@ func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
|
||||||
Interval: 15 * time.Second,
|
Interval: 15 * time.Second,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := a.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceRemote)
|
err := a.addServiceFromSource(srv, []*structs.CheckType{chk}, false, "", ConfigSourceRemote)
|
||||||
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
|
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -932,7 +946,7 @@ func TestCacheRateLimit(t *testing.T) {
|
||||||
Address: fmt.Sprintf("10.0.1.%d", i%255),
|
Address: fmt.Sprintf("10.0.1.%d", i%255),
|
||||||
}
|
}
|
||||||
|
|
||||||
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1007,7 +1021,7 @@ func TestAddServiceIPv4TaggedDefault(t *testing.T) {
|
||||||
Address: "10.0.1.2",
|
Address: "10.0.1.2",
|
||||||
}
|
}
|
||||||
|
|
||||||
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
|
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
|
||||||
|
@ -1040,7 +1054,7 @@ func TestAddServiceIPv6TaggedDefault(t *testing.T) {
|
||||||
Address: "::5",
|
Address: "::5",
|
||||||
}
|
}
|
||||||
|
|
||||||
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
|
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
|
||||||
|
@ -1079,7 +1093,7 @@ func TestAddServiceIPv4TaggedSet(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
|
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
|
||||||
|
@ -1118,7 +1132,7 @@ func TestAddServiceIPv6TaggedSet(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
|
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
|
||||||
|
@ -1173,7 +1187,7 @@ func testAgent_RemoveService(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
chkTypes := []*structs.CheckType{{TTL: time.Minute}}
|
chkTypes := []*structs.CheckType{{TTL: time.Minute}}
|
||||||
|
|
||||||
if err := a.AddService(srv, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(srv, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1208,7 +1222,7 @@ func testAgent_RemoveService(t *testing.T, extraHCL string) {
|
||||||
{TTL: time.Minute},
|
{TTL: time.Minute},
|
||||||
{TTL: 30 * time.Second},
|
{TTL: 30 * time.Second},
|
||||||
}
|
}
|
||||||
if err := a.AddService(srv, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(srv, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1222,7 +1236,7 @@ func testAgent_RemoveService(t *testing.T, extraHCL string) {
|
||||||
{TTL: time.Minute},
|
{TTL: time.Minute},
|
||||||
{TTL: 30 * time.Second},
|
{TTL: 30 * time.Second},
|
||||||
}
|
}
|
||||||
if err := a.AddService(srv, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(srv, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1294,7 +1308,7 @@ func testAgent_RemoveServiceRemovesAllChecks(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// register service with chk1
|
// register service with chk1
|
||||||
if err := a.AddService(svc, []*structs.CheckType{chk1}, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, []*structs.CheckType{chk1}, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatal("Failed to register service", err)
|
t.Fatal("Failed to register service", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1302,7 +1316,7 @@ func testAgent_RemoveServiceRemovesAllChecks(t *testing.T, extraHCL string) {
|
||||||
requireCheckExists(t, a, "chk1")
|
requireCheckExists(t, a, "chk1")
|
||||||
|
|
||||||
// update the service with chk2
|
// update the service with chk2
|
||||||
if err := a.AddService(svc, []*structs.CheckType{chk2}, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, []*structs.CheckType{chk2}, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatal("Failed to update service", err)
|
t.Fatal("Failed to update service", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1359,7 +1373,7 @@ func verifyIndexChurn(t *testing.T, tags []string) {
|
||||||
Tags: tags,
|
Tags: tags,
|
||||||
Weights: weights,
|
Weights: weights,
|
||||||
}
|
}
|
||||||
if err := a.AddService(svc, nil, true, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, true, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1767,7 +1781,7 @@ func TestAgent_RestoreServiceWithAliasCheck(t *testing.T) {
|
||||||
|
|
||||||
registerServicesAndChecks := func(t *testing.T, a *TestAgent) {
|
registerServicesAndChecks := func(t *testing.T, a *TestAgent) {
|
||||||
// add one persistent service with a simple check
|
// add one persistent service with a simple check
|
||||||
require.NoError(t, a.AddService(
|
require.NoError(t, a.addServiceFromSource(
|
||||||
&structs.NodeService{
|
&structs.NodeService{
|
||||||
ID: "ping",
|
ID: "ping",
|
||||||
Service: "ping",
|
Service: "ping",
|
||||||
|
@ -1786,7 +1800,7 @@ func TestAgent_RestoreServiceWithAliasCheck(t *testing.T) {
|
||||||
|
|
||||||
// add one persistent sidecar service with an alias check in the manner
|
// add one persistent sidecar service with an alias check in the manner
|
||||||
// of how sidecar_service would add it
|
// of how sidecar_service would add it
|
||||||
require.NoError(t, a.AddService(
|
require.NoError(t, a.addServiceFromSource(
|
||||||
&structs.NodeService{
|
&structs.NodeService{
|
||||||
ID: "ping-sidecar-proxy",
|
ID: "ping-sidecar-proxy",
|
||||||
Service: "ping-sidecar-proxy",
|
Service: "ping-sidecar-proxy",
|
||||||
|
@ -2276,7 +2290,7 @@ func testAgent_PersistService(t *testing.T, extraHCL string) {
|
||||||
file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID))
|
file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID))
|
||||||
|
|
||||||
// Check is not persisted unless requested
|
// Check is not persisted unless requested
|
||||||
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
if _, err := os.Stat(file); err == nil {
|
if _, err := os.Stat(file); err == nil {
|
||||||
|
@ -2284,7 +2298,7 @@ func testAgent_PersistService(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Persists to file if requested
|
// Persists to file if requested
|
||||||
if err := a.AddService(svc, nil, true, "mytoken", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
if _, err := os.Stat(file); err != nil {
|
if _, err := os.Stat(file); err != nil {
|
||||||
|
@ -2308,7 +2322,7 @@ func testAgent_PersistService(t *testing.T, extraHCL string) {
|
||||||
|
|
||||||
// Updates service definition on disk
|
// Updates service definition on disk
|
||||||
svc.Port = 8001
|
svc.Port = 8001
|
||||||
if err := a.AddService(svc, nil, true, "mytoken", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
expected, err = json.Marshal(persistedService{
|
expected, err = json.Marshal(persistedService{
|
||||||
|
@ -2431,7 +2445,7 @@ func testAgent_PurgeService(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID))
|
file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID))
|
||||||
if err := a.AddService(svc, nil, true, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, true, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
// Exists
|
// Exists
|
||||||
|
@ -2448,7 +2462,7 @@ func testAgent_PurgeService(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-add the service
|
// Re-add the service
|
||||||
if err := a.AddService(svc, nil, true, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, true, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2494,7 +2508,7 @@ func testAgent_PurgeServiceOnDuplicate(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// First persist the service
|
// First persist the service
|
||||||
require.NoError(t, a.AddService(svc1, nil, true, "", ConfigSourceLocal))
|
require.NoError(t, a.addServiceFromSource(svc1, nil, true, "", ConfigSourceLocal))
|
||||||
a.Shutdown()
|
a.Shutdown()
|
||||||
|
|
||||||
// Try bringing the agent back up with the service already
|
// Try bringing the agent back up with the service already
|
||||||
|
@ -2742,9 +2756,9 @@ func TestAgent_DeregisterPersistedSidecarAfterRestart(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// First persist the check
|
// First persist the check
|
||||||
err = a.AddService(srv, nil, true, "", ConfigSourceLocal)
|
err = a.addServiceFromSource(srv, nil, true, "", ConfigSourceLocal)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = a.AddService(connectSrv, nil, true, "", ConfigSourceLocal)
|
err = a.addServiceFromSource(connectSrv, nil, true, "", ConfigSourceLocal)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// check both services were registered
|
// check both services were registered
|
||||||
|
@ -2814,7 +2828,7 @@ func TestAgent_unloadChecks(t *testing.T) {
|
||||||
Tags: []string{"foo"},
|
Tags: []string{"foo"},
|
||||||
Port: 8000,
|
Port: 8000,
|
||||||
}
|
}
|
||||||
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3093,7 +3107,7 @@ func testAgent_unloadServices(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register the service
|
// Register the service
|
||||||
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3125,7 +3139,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register the service
|
// Register the service
|
||||||
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3206,7 +3220,7 @@ func TestAgent_Service_Reap(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register the service.
|
// Register the service.
|
||||||
if err := a.AddService(svc, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3263,7 +3277,7 @@ func TestAgent_Service_NoReap(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register the service.
|
// Register the service.
|
||||||
if err := a.AddService(svc, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, chkTypes, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3310,7 +3324,7 @@ func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) {
|
||||||
Tags: []string{"foo"},
|
Tags: []string{"foo"},
|
||||||
Port: 8000,
|
Port: 8000,
|
||||||
}
|
}
|
||||||
require.NoError(t, a.AddService(svc, nil, false, "", ConfigSourceLocal))
|
require.NoError(t, a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal))
|
||||||
|
|
||||||
// Register a check
|
// Register a check
|
||||||
check1 := &structs.HealthCheck{
|
check1 := &structs.HealthCheck{
|
||||||
|
@ -3325,7 +3339,7 @@ func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) {
|
||||||
|
|
||||||
// Re-registering the service preserves the state of the check
|
// Re-registering the service preserves the state of the check
|
||||||
chkTypes := []*structs.CheckType{{TTL: 30 * time.Second}}
|
chkTypes := []*structs.CheckType{{TTL: 30 * time.Second}}
|
||||||
require.NoError(t, a.AddService(svc, chkTypes, false, "", ConfigSourceLocal))
|
require.NoError(t, a.addServiceFromSource(svc, chkTypes, false, "", ConfigSourceLocal))
|
||||||
check := requireCheckExists(t, a, "service:redis")
|
check := requireCheckExists(t, a, "service:redis")
|
||||||
require.Equal(t, api.HealthPassing, check.Status)
|
require.Equal(t, api.HealthPassing, check.Status)
|
||||||
}
|
}
|
||||||
|
@ -3346,7 +3360,7 @@ func TestAgent_AddCheck_restoresSnapshot(t *testing.T) {
|
||||||
Tags: []string{"foo"},
|
Tags: []string{"foo"},
|
||||||
Port: 8000,
|
Port: 8000,
|
||||||
}
|
}
|
||||||
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3431,7 +3445,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) {
|
||||||
Tags: []string{"foo"},
|
Tags: []string{"foo"},
|
||||||
Port: 8000,
|
Port: 8000,
|
||||||
}
|
}
|
||||||
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4250,7 +4264,7 @@ func TestAgent_RerouteExistingHTTPChecks(t *testing.T) {
|
||||||
TLSSkipVerify: true,
|
TLSSkipVerify: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := a.AddService(svc, chks, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, chks, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("failed to add svc: %v", err)
|
t.Fatalf("failed to add svc: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4273,7 +4287,7 @@ func TestAgent_RerouteExistingHTTPChecks(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := a.AddService(proxy, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(proxy, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("failed to add svc: %v", err)
|
t.Fatalf("failed to add svc: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4326,7 +4340,7 @@ func TestAgent_RerouteExistingHTTPChecks(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := a.AddService(proxy, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(proxy, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("failed to add svc: %v", err)
|
t.Fatalf("failed to add svc: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4369,7 +4383,7 @@ func TestAgent_RerouteNewHTTPChecks(t *testing.T) {
|
||||||
Address: "localhost",
|
Address: "localhost",
|
||||||
Port: 8080,
|
Port: 8080,
|
||||||
}
|
}
|
||||||
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("failed to add svc: %v", err)
|
t.Fatalf("failed to add svc: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4391,7 +4405,7 @@ func TestAgent_RerouteNewHTTPChecks(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := a.AddService(proxy, nil, false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(proxy, nil, false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("failed to add svc: %v", err)
|
t.Fatalf("failed to add svc: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,12 +5,13 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Integration test for ServiceHTTPBasedChecks cache-type
|
// Integration test for ServiceHTTPBasedChecks cache-type
|
||||||
|
@ -62,7 +63,7 @@ func TestAgent_ServiceHTTPChecksNotification(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Adding TTL type should lead to a timeout, since only HTTP-based checks are watched
|
// Adding TTL type should lead to a timeout, since only HTTP-based checks are watched
|
||||||
if err := a.AddService(&service, chkTypes[2:], false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(&service, chkTypes[2:], false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("failed to add service: %v", err)
|
t.Fatalf("failed to add service: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +75,7 @@ func TestAgent_ServiceHTTPChecksNotification(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adding service with HTTP checks should lead notification for them
|
// Adding service with HTTP checks should lead notification for them
|
||||||
if err := a.AddService(&service, chkTypes[0:2], false, "", ConfigSourceLocal); err != nil {
|
if err := a.addServiceFromSource(&service, chkTypes[0:2], false, "", ConfigSourceLocal); err != nil {
|
||||||
t.Fatalf("failed to add service: %v", err)
|
t.Fatalf("failed to add service: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,12 +4,13 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/imdario/mergo"
|
"github.com/imdario/mergo"
|
||||||
"github.com/mitchellh/copystructure"
|
"github.com/mitchellh/copystructure"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The ServiceManager is a layer for service registration in between the agent
|
// The ServiceManager is a layer for service registration in between the agent
|
||||||
|
@ -83,7 +84,7 @@ func (s *ServiceManager) Start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// runOnce will process a single registration request
|
// runOnce will process a single registration request
|
||||||
func (s *ServiceManager) registerOnce(args *addServiceRequest) error {
|
func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
|
||||||
s.agent.stateLock.Lock()
|
s.agent.stateLock.Lock()
|
||||||
defer s.agent.stateLock.Unlock()
|
defer s.agent.stateLock.Unlock()
|
||||||
|
|
||||||
|
@ -120,46 +121,30 @@ func (s *ServiceManager) registerOnce(args *addServiceRequest) error {
|
||||||
// merged with the global defaults before registration.
|
// merged with the global defaults before registration.
|
||||||
//
|
//
|
||||||
// NOTE: the caller must hold the Agent.stateLock!
|
// NOTE: the caller must hold the Agent.stateLock!
|
||||||
func (s *ServiceManager) AddService(req *addServiceRequest) error {
|
func (s *ServiceManager) AddService(req AddServiceRequest) error {
|
||||||
req.fixupForAddServiceLocked()
|
req.Service.EnterpriseMeta.Normalize()
|
||||||
|
|
||||||
req.service.EnterpriseMeta.Normalize()
|
|
||||||
|
|
||||||
// For now only proxies have anything that can be configured
|
// For now only proxies have anything that can be configured
|
||||||
// centrally. So bypass the whole manager for regular services.
|
// centrally. So bypass the whole manager for regular services.
|
||||||
if !req.service.IsSidecarProxy() && !req.service.IsGateway() {
|
if !req.Service.IsSidecarProxy() && !req.Service.IsGateway() {
|
||||||
// previousDefaults are ignored here because they are only relevant for central config.
|
|
||||||
req.persistService = nil
|
|
||||||
req.persistDefaults = nil
|
|
||||||
req.persistServiceConfig = false
|
req.persistServiceConfig = false
|
||||||
return s.agent.addServiceInternal(req)
|
return s.agent.addServiceInternal(addServiceInternalRequest{AddServiceRequest: req})
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
// TODO: replace serviceRegistration with AddServiceRequest
|
||||||
service = req.service
|
|
||||||
chkTypes = req.chkTypes
|
|
||||||
previousDefaults = req.previousDefaults
|
|
||||||
waitForCentralConfig = req.waitForCentralConfig
|
|
||||||
persist = req.persist
|
|
||||||
persistServiceConfig = req.persistServiceConfig
|
|
||||||
token = req.token
|
|
||||||
replaceExistingChecks = req.replaceExistingChecks
|
|
||||||
source = req.source
|
|
||||||
)
|
|
||||||
|
|
||||||
reg := &serviceRegistration{
|
reg := &serviceRegistration{
|
||||||
service: service,
|
service: req.Service,
|
||||||
chkTypes: chkTypes,
|
chkTypes: req.chkTypes,
|
||||||
persist: persist,
|
persist: req.persist,
|
||||||
token: token,
|
token: req.token,
|
||||||
replaceExistingChecks: replaceExistingChecks,
|
replaceExistingChecks: req.replaceExistingChecks,
|
||||||
source: source,
|
source: req.Source,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.servicesLock.Lock()
|
s.servicesLock.Lock()
|
||||||
defer s.servicesLock.Unlock()
|
defer s.servicesLock.Unlock()
|
||||||
|
|
||||||
sid := service.CompoundServiceID()
|
sid := req.Service.CompoundServiceID()
|
||||||
|
|
||||||
// If a service watch already exists, shut it down and replace it.
|
// If a service watch already exists, shut it down and replace it.
|
||||||
oldWatch, updating := s.services[sid]
|
oldWatch, updating := s.services[sid]
|
||||||
|
@ -178,9 +163,9 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error {
|
||||||
|
|
||||||
err := watch.RegisterAndStart(
|
err := watch.RegisterAndStart(
|
||||||
s.ctx,
|
s.ctx,
|
||||||
previousDefaults,
|
req.previousDefaults,
|
||||||
waitForCentralConfig,
|
req.waitForCentralConfig,
|
||||||
persistServiceConfig,
|
req.persistServiceConfig,
|
||||||
&s.running,
|
&s.running,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -190,9 +175,9 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error {
|
||||||
s.services[sid] = watch
|
s.services[sid] = watch
|
||||||
|
|
||||||
if updating {
|
if updating {
|
||||||
s.agent.logger.Debug("updated local registration for service", "service", service.ID)
|
s.agent.logger.Debug("updated local registration for service", "service", req.Service.ID)
|
||||||
} else {
|
} else {
|
||||||
s.agent.logger.Debug("added local registration for service", "service", service.ID)
|
s.agent.logger.Debug("added local registration for service", "service", req.Service.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -267,17 +252,19 @@ func (w *serviceConfigWatch) RegisterAndStart(
|
||||||
// The first time we do this interactively, we need to know if it
|
// The first time we do this interactively, we need to know if it
|
||||||
// failed for validation reasons which we only get back from the
|
// failed for validation reasons which we only get back from the
|
||||||
// initial underlying add service call.
|
// initial underlying add service call.
|
||||||
err = w.agent.addServiceInternal(&addServiceRequest{
|
err = w.agent.addServiceInternal(addServiceInternalRequest{
|
||||||
service: merged,
|
AddServiceRequest: AddServiceRequest{
|
||||||
chkTypes: w.registration.chkTypes,
|
Service: merged,
|
||||||
persistService: w.registration.service,
|
chkTypes: w.registration.chkTypes,
|
||||||
persistDefaults: serviceDefaults,
|
persist: w.registration.persist,
|
||||||
persist: w.registration.persist,
|
persistServiceConfig: persistServiceConfig,
|
||||||
persistServiceConfig: persistServiceConfig,
|
token: w.registration.token,
|
||||||
token: w.registration.token,
|
replaceExistingChecks: w.registration.replaceExistingChecks,
|
||||||
replaceExistingChecks: w.registration.replaceExistingChecks,
|
Source: w.registration.source,
|
||||||
source: w.registration.source,
|
snap: w.agent.snapshotCheckState(),
|
||||||
snap: w.agent.snapshotCheckState(),
|
},
|
||||||
|
persistService: w.registration.service,
|
||||||
|
persistDefaults: serviceDefaults,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error updating service registration: %v", err)
|
return fmt.Errorf("error updating service registration: %v", err)
|
||||||
|
@ -408,16 +395,18 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
|
||||||
}
|
}
|
||||||
|
|
||||||
registerReq := &asyncRegisterRequest{
|
registerReq := &asyncRegisterRequest{
|
||||||
Args: &addServiceRequest{
|
Args: addServiceInternalRequest{
|
||||||
service: merged,
|
AddServiceRequest: AddServiceRequest{
|
||||||
chkTypes: w.registration.chkTypes,
|
Service: merged,
|
||||||
persistService: w.registration.service,
|
chkTypes: w.registration.chkTypes,
|
||||||
persistDefaults: serviceDefaults,
|
persist: w.registration.persist,
|
||||||
persist: w.registration.persist,
|
persistServiceConfig: true,
|
||||||
persistServiceConfig: true,
|
token: w.registration.token,
|
||||||
token: w.registration.token,
|
replaceExistingChecks: w.registration.replaceExistingChecks,
|
||||||
replaceExistingChecks: w.registration.replaceExistingChecks,
|
Source: w.registration.source,
|
||||||
source: w.registration.source,
|
},
|
||||||
|
persistService: w.registration.service,
|
||||||
|
persistDefaults: serviceDefaults,
|
||||||
},
|
},
|
||||||
Reply: make(chan error, 1),
|
Reply: make(chan error, 1),
|
||||||
}
|
}
|
||||||
|
@ -441,7 +430,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
|
||||||
}
|
}
|
||||||
|
|
||||||
type asyncRegisterRequest struct {
|
type asyncRegisterRequest struct {
|
||||||
Args *addServiceRequest
|
Args addServiceInternalRequest
|
||||||
Reply chan error
|
Reply chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,10 +8,11 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestServiceManager_RegisterService(t *testing.T) {
|
func TestServiceManager_RegisterService(t *testing.T) {
|
||||||
|
@ -47,7 +48,7 @@ func TestServiceManager_RegisterService(t *testing.T) {
|
||||||
Port: 8000,
|
Port: 8000,
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||||
}
|
}
|
||||||
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal))
|
require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal))
|
||||||
|
|
||||||
// Verify both the service and sidecar.
|
// Verify both the service and sidecar.
|
||||||
redisService := a.State.Service(structs.NewServiceID("redis", nil))
|
redisService := a.State.Service(structs.NewServiceID("redis", nil))
|
||||||
|
@ -118,7 +119,7 @@ func TestServiceManager_RegisterSidecar(t *testing.T) {
|
||||||
},
|
},
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||||
}
|
}
|
||||||
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal))
|
require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal))
|
||||||
|
|
||||||
// Verify sidecar got global config loaded
|
// Verify sidecar got global config loaded
|
||||||
sidecarService := a.State.Service(structs.NewServiceID("web-sidecar-proxy", nil))
|
sidecarService := a.State.Service(structs.NewServiceID("web-sidecar-proxy", nil))
|
||||||
|
@ -191,7 +192,7 @@ func TestServiceManager_RegisterMeshGateway(t *testing.T) {
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal))
|
require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal))
|
||||||
|
|
||||||
// Verify gateway got global config loaded
|
// Verify gateway got global config loaded
|
||||||
gateway := a.State.Service(structs.NewServiceID("mesh-gateway", nil))
|
gateway := a.State.Service(structs.NewServiceID("mesh-gateway", nil))
|
||||||
|
@ -251,7 +252,7 @@ func TestServiceManager_RegisterTerminatingGateway(t *testing.T) {
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal))
|
require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal))
|
||||||
|
|
||||||
// Verify gateway got global config loaded
|
// Verify gateway got global config loaded
|
||||||
gateway := a.State.Service(structs.NewServiceID("terminating-gateway", nil))
|
gateway := a.State.Service(structs.NewServiceID("terminating-gateway", nil))
|
||||||
|
@ -386,12 +387,12 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
||||||
configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, svcID.StringHash())
|
configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, svcID.StringHash())
|
||||||
|
|
||||||
// Service is not persisted unless requested, but we always persist service configs.
|
// Service is not persisted unless requested, but we always persist service configs.
|
||||||
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceRemote))
|
require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceRemote))
|
||||||
requireFileIsAbsent(t, svcFile)
|
requireFileIsAbsent(t, svcFile)
|
||||||
requireFileIsPresent(t, configFile)
|
requireFileIsPresent(t, configFile)
|
||||||
|
|
||||||
// Persists to file if requested
|
// Persists to file if requested
|
||||||
require.NoError(a.AddService(svc, nil, true, "mytoken", ConfigSourceRemote))
|
require.NoError(a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceRemote))
|
||||||
requireFileIsPresent(t, svcFile)
|
requireFileIsPresent(t, svcFile)
|
||||||
requireFileIsPresent(t, configFile)
|
requireFileIsPresent(t, configFile)
|
||||||
|
|
||||||
|
@ -432,7 +433,7 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
||||||
|
|
||||||
// Updates service definition on disk
|
// Updates service definition on disk
|
||||||
svc.Proxy.LocalServicePort = 8001
|
svc.Proxy.LocalServicePort = 8001
|
||||||
require.NoError(a.AddService(svc, nil, true, "mytoken", ConfigSourceRemote))
|
require.NoError(a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceRemote))
|
||||||
requireFileIsPresent(t, svcFile)
|
requireFileIsPresent(t, svcFile)
|
||||||
requireFileIsPresent(t, configFile)
|
requireFileIsPresent(t, configFile)
|
||||||
|
|
||||||
|
@ -720,7 +721,7 @@ func TestServiceManager_Disabled(t *testing.T) {
|
||||||
},
|
},
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||||
}
|
}
|
||||||
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal))
|
require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal))
|
||||||
|
|
||||||
// Verify sidecar got global config loaded
|
// Verify sidecar got global config loaded
|
||||||
sidecarService := a.State.Service(structs.NewServiceID("web-sidecar-proxy", nil))
|
sidecarService := a.State.Service(structs.NewServiceID("web-sidecar-proxy", nil))
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (a *Agent) sidecarServiceID(serviceID string) string {
|
func sidecarServiceID(serviceID string) string {
|
||||||
return serviceID + "-sidecar-proxy"
|
return serviceID + "-sidecar-proxy"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@ func (a *Agent) sidecarServiceID(serviceID string) string {
|
||||||
// The third return argument is the effective Token to use for the sidecar
|
// The third return argument is the effective Token to use for the sidecar
|
||||||
// registration. This will be the same as the token parameter passed unless the
|
// registration. This will be the same as the token parameter passed unless the
|
||||||
// SidecarService definition contains a distinct one.
|
// SidecarService definition contains a distinct one.
|
||||||
|
// TODO: return AddServiceRequest
|
||||||
func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token string) (*structs.NodeService, []*structs.CheckType, string, error) {
|
func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token string) (*structs.NodeService, []*structs.CheckType, string, error) {
|
||||||
if ns.Connect.SidecarService == nil {
|
if ns.Connect.SidecarService == nil {
|
||||||
return nil, nil, "", nil
|
return nil, nil, "", nil
|
||||||
|
@ -39,7 +40,7 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str
|
||||||
|
|
||||||
// Override the ID which must always be consistent for a given outer service
|
// Override the ID which must always be consistent for a given outer service
|
||||||
// ID. We rely on this for lifecycle management of the nested definition.
|
// ID. We rely on this for lifecycle management of the nested definition.
|
||||||
sidecar.ID = a.sidecarServiceID(ns.ID)
|
sidecar.ID = sidecarServiceID(ns.ID)
|
||||||
|
|
||||||
// for now at least these must be identical
|
// for now at least these must be identical
|
||||||
sidecar.EnterpriseMeta = ns.EnterpriseMeta
|
sidecar.EnterpriseMeta = ns.EnterpriseMeta
|
||||||
|
|
|
@ -5,8 +5,9 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
|
func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
|
||||||
|
@ -333,7 +334,7 @@ func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
if tt.preRegister != nil {
|
if tt.preRegister != nil {
|
||||||
err := a.AddService(tt.preRegister.NodeService(), nil, false, "", ConfigSourceLocal)
|
err := a.addServiceFromSource(tt.preRegister.NodeService(), nil, false, "", ConfigSourceLocal)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,9 +4,10 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/mitchellh/cli"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent"
|
"github.com/hashicorp/consul/agent"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/mitchellh/cli"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMaintCommand_noTabs(t *testing.T) {
|
func TestMaintCommand_noTabs(t *testing.T) {
|
||||||
|
@ -49,11 +50,14 @@ func TestMaintCommand_NoArgs(t *testing.T) {
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
// Register the service and put it into maintenance mode
|
// Register the service and put it into maintenance mode
|
||||||
service := &structs.NodeService{
|
addReq := agent.AddServiceRequest{
|
||||||
ID: "test",
|
Service: &structs.NodeService{
|
||||||
Service: "test",
|
ID: "test",
|
||||||
|
Service: "test",
|
||||||
|
},
|
||||||
|
Source: agent.ConfigSourceLocal,
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", agent.ConfigSourceLocal); err != nil {
|
if err := a.AddService(addReq); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
if err := a.EnableServiceMaintenance(structs.NewServiceID("test", nil), "broken 1", ""); err != nil {
|
if err := a.EnableServiceMaintenance(structs.NewServiceID("test", nil), "broken 1", ""); err != nil {
|
||||||
|
@ -157,11 +161,14 @@ func TestMaintCommand_EnableServiceMaintenance(t *testing.T) {
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
// Register the service
|
// Register the service
|
||||||
service := &structs.NodeService{
|
addReq := agent.AddServiceRequest{
|
||||||
ID: "test",
|
Service: &structs.NodeService{
|
||||||
Service: "test",
|
ID: "test",
|
||||||
|
Service: "test",
|
||||||
|
},
|
||||||
|
Source: agent.ConfigSourceLocal,
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", agent.ConfigSourceLocal); err != nil {
|
if err := a.AddService(addReq); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,11 +202,14 @@ func TestMaintCommand_DisableServiceMaintenance(t *testing.T) {
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
// Register the service
|
// Register the service
|
||||||
service := &structs.NodeService{
|
addReq := agent.AddServiceRequest{
|
||||||
ID: "test",
|
Service: &structs.NodeService{
|
||||||
Service: "test",
|
ID: "test",
|
||||||
|
Service: "test",
|
||||||
|
},
|
||||||
|
Source: agent.ConfigSourceLocal,
|
||||||
}
|
}
|
||||||
if err := a.AddService(service, nil, false, "", agent.ConfigSourceLocal); err != nil {
|
if err := a.AddService(addReq); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue