Add option to register services and their checks idempotently (#4905)

This commit is contained in:
Aestek 2019-09-02 17:38:29 +02:00 committed by Freddy
parent 2254633d93
commit 19c4459d19
5 changed files with 187 additions and 13 deletions

View file

@ -1984,18 +1984,27 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
return nil return nil
} }
// AddServiceAndReplaceChecks is used to add a service entry and its check. Any check for this service missing from chkTypes will be deleted.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddServiceAndReplaceChecks(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addServiceLocked(service, chkTypes, persist, token, true, source)
}
// AddService is used to add a service entry. // AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock() a.stateLock.Lock()
defer a.stateLock.Unlock() defer a.stateLock.Unlock()
return a.addServiceLocked(service, chkTypes, persist, token, source) return a.addServiceLocked(service, chkTypes, persist, token, false, source)
} }
// addServiceLocked adds a service entry to the service manager if enabled, or directly // addServiceLocked adds a service entry to the service manager if enabled, or directly
// to the local state if it is not. This function assumes the state lock is already held. // to the local state if it is not. This function assumes the state lock is already held.
func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error {
if err := a.validateService(service, chkTypes); err != nil { if err := a.validateService(service, chkTypes); err != nil {
return err return err
} }
@ -2004,11 +2013,11 @@ func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*struc
return a.serviceManager.AddService(service, chkTypes, persist, token, source) return a.serviceManager.AddService(service, chkTypes, persist, token, source)
} }
return a.addServiceInternal(service, chkTypes, persist, token, source) return a.addServiceInternal(service, chkTypes, persist, token, replaceExistingChecks, source)
} }
// addServiceInternal adds the given service and checks to the local state. // addServiceInternal adds the given service and checks to the local state.
func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error {
// Pause the service syncs during modification // Pause the service syncs during modification
a.PauseSync() a.PauseSync()
defer a.ResumeSync() defer a.ResumeSync()
@ -2020,8 +2029,17 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str
var checks []*structs.HealthCheck var checks []*structs.HealthCheck
existingChecks := map[types.CheckID]bool{}
for _, check := range a.State.Checks() {
if check.ServiceID == service.ID {
existingChecks[check.CheckID] = false
}
}
// Create an associated health check // Create an associated health check
for i, chkType := range chkTypes { for i, chkType := range chkTypes {
existingChecks[chkType.CheckID] = true
checkID := string(chkType.CheckID) checkID := string(chkType.CheckID)
if checkID == "" { if checkID == "" {
checkID = fmt.Sprintf("service:%s", service.ID) checkID = fmt.Sprintf("service:%s", service.ID)
@ -2102,6 +2120,14 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str
} }
} }
if replaceExistingChecks {
for checkID, keep := range existingChecks {
if !keep {
a.removeCheckLocked(checkID, persist)
}
}
}
return nil return nil
} }
@ -2845,13 +2871,13 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
// syntax sugar and shouldn't be persisted in local or server state. // syntax sugar and shouldn't be persisted in local or server state.
ns.Connect.SidecarService = nil ns.Connect.SidecarService = nil
if err := a.addServiceLocked(ns, chkTypes, false, service.Token, ConfigSourceLocal); err != nil { if err := a.addServiceLocked(ns, chkTypes, false, service.Token, false, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register service %q: %v", service.Name, err) return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
} }
// If there is a sidecar service, register that too. // If there is a sidecar service, register that too.
if sidecar != nil { if sidecar != nil {
if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, ConfigSourceLocal); err != nil { if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, false, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err) return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
} }
} }
@ -2914,7 +2940,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
} else { } else {
a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", a.logger.Printf("[DEBUG] agent: restored service definition %q from %q",
serviceID, file) serviceID, file)
if err := a.addServiceLocked(p.Service, nil, false, p.Token, ConfigSourceLocal); err != nil { if err := a.addServiceLocked(p.Service, nil, false, p.Token, false, ConfigSourceLocal); err != nil {
return fmt.Errorf("failed adding service %q: %s", serviceID, err) return fmt.Errorf("failed adding service %q: %s", serviceID, err)
} }
} }

View file

@ -915,8 +915,21 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
} }
// Add the service. // Add the service.
if err := s.agent.AddService(ns, chkTypes, true, token, ConfigSourceRemote); err != nil { replaceExistingChecks := false
return nil, err
query := req.URL.Query()
if len(query["replace-existing-checks"]) > 0 && (query.Get("replace-existing-checks") == "" || query.Get("replace-existing-checks") == "true") {
replaceExistingChecks = true
}
if replaceExistingChecks {
if err := s.agent.AddServiceAndReplaceChecks(ns, chkTypes, true, token, ConfigSourceRemote); err != nil {
return nil, err
}
} else {
if err := s.agent.AddService(ns, chkTypes, true, token, ConfigSourceRemote); err != nil {
return nil, err
}
} }
// Add sidecar. // Add sidecar.
if sidecar != nil { if sidecar != nil {

View file

@ -13,6 +13,7 @@ import (
"net/url" "net/url"
"os" "os"
"reflect" "reflect"
"sort"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -2414,6 +2415,136 @@ func TestAgent_RegisterService(t *testing.T) {
} }
} }
func TestAgent_RegisterService_ReRegister(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
args := &structs.ServiceDefinition{
Name: "test",
Meta: map[string]string{"hello": "world"},
Tags: []string{"master"},
Port: 8000,
Checks: []*structs.CheckType{
&structs.CheckType{
CheckID: types.CheckID("check_1"),
TTL: 20 * time.Second,
},
&structs.CheckType{
CheckID: types.CheckID("check_2"),
TTL: 30 * time.Second,
},
},
Weights: &structs.Weights{
Passing: 100,
Warning: 3,
},
}
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args))
_, err := a.srv.AgentRegisterService(nil, req)
require.NoError(t, err)
args = &structs.ServiceDefinition{
Name: "test",
Meta: map[string]string{"hello": "world"},
Tags: []string{"master"},
Port: 8000,
Checks: []*structs.CheckType{
&structs.CheckType{
CheckID: types.CheckID("check_1"),
TTL: 20 * time.Second,
},
&structs.CheckType{
CheckID: types.CheckID("check_3"),
TTL: 30 * time.Second,
},
},
Weights: &structs.Weights{
Passing: 100,
Warning: 3,
},
}
req, _ = http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args))
_, err = a.srv.AgentRegisterService(nil, req)
require.NoError(t, err)
checks := a.State.Checks()
require.Equal(t, 3, len(checks))
checkIDs := []string{}
for id := range checks {
checkIDs = append(checkIDs, string(id))
}
sort.Strings(checkIDs)
require.Equal(t, []string{"check_1", "check_2", "check_3"}, checkIDs)
}
func TestAgent_RegisterService_ReRegister_ReplaceExistingChecks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
args := &structs.ServiceDefinition{
Name: "test",
Meta: map[string]string{"hello": "world"},
Tags: []string{"master"},
Port: 8000,
Checks: []*structs.CheckType{
&structs.CheckType{
CheckID: types.CheckID("check_1"),
TTL: 20 * time.Second,
},
&structs.CheckType{
CheckID: types.CheckID("check_2"),
TTL: 30 * time.Second,
},
},
Weights: &structs.Weights{
Passing: 100,
Warning: 3,
},
}
req, _ := http.NewRequest("PUT", "/v1/agent/service/register?replace-existing-checks", jsonReader(args))
_, err := a.srv.AgentRegisterService(nil, req)
require.NoError(t, err)
args = &structs.ServiceDefinition{
Name: "test",
Meta: map[string]string{"hello": "world"},
Tags: []string{"master"},
Port: 8000,
Checks: []*structs.CheckType{
&structs.CheckType{
CheckID: types.CheckID("check_1"),
TTL: 20 * time.Second,
},
&structs.CheckType{
CheckID: types.CheckID("check_3"),
TTL: 30 * time.Second,
},
},
Weights: &structs.Weights{
Passing: 100,
Warning: 3,
},
}
req, _ = http.NewRequest("PUT", "/v1/agent/service/register?replace-existing-checks", jsonReader(args))
_, err = a.srv.AgentRegisterService(nil, req)
require.NoError(t, err)
checks := a.State.Checks()
require.Equal(t, 2, len(checks))
checkIDs := []string{}
for id := range checks {
checkIDs = append(checkIDs, string(id))
}
sort.Strings(checkIDs)
require.Equal(t, []string{"check_1", "check_3"}, checkIDs)
}
func TestAgent_RegisterService_TranslateKeys(t *testing.T) { func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
t.Parallel() t.Parallel()
tests := []struct { tests := []struct {

View file

@ -39,7 +39,7 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st
// For now only sidecar proxies have anything that can be configured // For now only sidecar proxies have anything that can be configured
// centrally. So bypass the whole manager for regular services. // centrally. So bypass the whole manager for regular services.
if !service.IsSidecarProxy() && !service.IsMeshGateway() { if !service.IsSidecarProxy() && !service.IsMeshGateway() {
return s.agent.addServiceInternal(service, chkTypes, persist, token, source) return s.agent.addServiceInternal(service, chkTypes, persist, token, false, source)
} }
s.lock.Lock() s.lock.Lock()
@ -263,7 +263,7 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, first
// updateAgentRegistration updates the service (and its sidecar, if applicable) in the // updateAgentRegistration updates the service (and its sidecar, if applicable) in the
// local state. // local state.
func (s *serviceConfigWatch) updateAgentRegistration(ns *structs.NodeService) error { func (s *serviceConfigWatch) updateAgentRegistration(ns *structs.NodeService) error {
return s.agent.addServiceInternal(ns, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) return s.agent.addServiceInternal(ns, s.registration.chkTypes, s.registration.persist, s.registration.token, false, s.registration.source)
} }
// ensureConfigWatch starts a cache.Notify goroutine to run a continuous // ensureConfigWatch starts a cache.Notify goroutine to run a continuous

View file

@ -461,7 +461,7 @@ Parameters and response format are the same as
## Register Service ## Register Service
This endpoint adds a new service, with an optional health check, to the local This endpoint adds a new service, with optional health checks, to the local
agent. agent.
The agent is responsible for managing the status of its local services, and for The agent is responsible for managing the status of its local services, and for
@ -485,6 +485,10 @@ The table below shows this endpoint's support for
| ---------------- | ----------------- | ------------- | --------------- | | ---------------- | ----------------- | ------------- | --------------- |
| `NO` | `none` | `none` | `service:write` | | `NO` | `none` | `none` | `service:write` |
### Query string parameters
- `replace-existing-checks` - Missing healthchecks from the request will be deleted from the agent. Using this parameter allows to idempotently register a service and its checks whithout having to manually deregister checks.
### Parameters ### Parameters
Note that this endpoint, unlike most also [supports `snake_case`](/docs/agent/services.html#service-definition-parameter-case) Note that this endpoint, unlike most also [supports `snake_case`](/docs/agent/services.html#service-definition-parameter-case)
@ -623,7 +627,7 @@ For the `Connect` field, the parameters are:
$ curl \ $ curl \
--request PUT \ --request PUT \
--data @payload.json \ --data @payload.json \
http://127.0.0.1:8500/v1/agent/service/register http://127.0.0.1:8500/v1/agent/service/register?replace-existing-checks=1
``` ```
## Deregister Service ## Deregister Service