Keeps the service and check tokens around for deregistration.
We fixed a few related issues while we were in here. We now only let services register checks with a matching token, and we also close out service and check delete operations if the catalog deregister claims it doesn't know about the ID of the service or check being deleted.
parent c553e1d93a
commit f2211d2489
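Before the diff, here is the gist of the new delete path as a minimal, stand-alone Go sketch (simplified stand-in types, not the committed code; the real deleteService and deleteCheck below also handle ACL permission-denied errors):

```go
// Sketch of the new delete behavior: the deregistration token is kept
// until the catalog either deletes the service or reports it never knew
// the ID, and only then forgotten.
package main

import (
	"fmt"
	"strings"
)

type syncStatus struct{ inSync bool }

// localState is a stand-in for the agent's real local state.
type localState struct {
	serviceTokens map[string]string
	serviceStatus map[string]syncStatus
}

func (l *localState) deleteService(id string, deregister func(id, token string) error) error {
	// Use the token we kept around from registration time.
	err := deregister(id, l.serviceTokens[id])
	if err == nil || strings.Contains(err.Error(), "Unknown service") {
		// Success, or the catalog has no such ID: either way the delete is
		// complete, so it is finally safe to drop the status and the token.
		delete(l.serviceStatus, id)
		delete(l.serviceTokens, id)
		return nil
	}
	return err // keep the token; we'll retry on the next anti-entropy pass
}

func main() {
	l := &localState{
		serviceTokens: map[string]string{"api": "secret-token"},
		serviceStatus: map[string]syncStatus{"api": {}},
	}
	// The catalog claims it doesn't know the ID; the delete still closes out.
	notFound := func(id, token string) error { return fmt.Errorf("Unknown service %q", id) }
	fmt.Println(l.deleteService("api", notFound)) // <nil>
	fmt.Println(len(l.serviceTokens))             // 0
}
```

The same pattern applies to checks, keyed off the "Unknown check" message.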
@@ -1192,11 +1192,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
 	}
 
 	// Remove service immediately
-	err := a.state.RemoveService(serviceID)
-
-	// TODO: Return the error instead of just logging here in Consul 0.8
-	// For now, keep the current idempotent behavior on deleting a nonexistent service
-	if err != nil {
+	if err := a.state.RemoveService(serviceID); err != nil {
 		a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err)
 		return nil
 	}
@@ -177,7 +177,8 @@ func (l *localState) RemoveService(serviceID string) error {
 
 	if _, ok := l.services[serviceID]; ok {
 		delete(l.services, serviceID)
-		delete(l.serviceTokens, serviceID)
+		// Leave the service token around, if any, until we successfully
+		// delete the service.
 		l.serviceStatus[serviceID] = syncStatus{inSync: false}
 		l.changeMade()
 	} else {
@@ -241,7 +242,8 @@ func (l *localState) RemoveCheck(checkID types.CheckID) {
 	defer l.Unlock()
 
 	delete(l.checks, checkID)
-	delete(l.checkTokens, checkID)
+	// Leave the check token around, if any, until we successfully delete
+	// the check.
 	delete(l.checkCriticalTime, checkID)
 	l.checkStatus[checkID] = syncStatus{inSync: false}
 	l.changeMade()
@@ -602,9 +604,15 @@ func (l *localState) deleteService(id string) error {
 	}
 	var out struct{}
 	err := l.iface.RPC("Catalog.Deregister", &req, &out)
-	if err == nil {
+	if err == nil || strings.Contains(err.Error(), "Unknown service") {
 		delete(l.serviceStatus, id)
+		delete(l.serviceTokens, id)
 		l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
+		return nil
+	} else if strings.Contains(err.Error(), permissionDenied) {
+		l.serviceStatus[id] = syncStatus{inSync: true}
+		l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
+		return nil
 	}
 	return err
 }
@@ -623,9 +631,15 @@ func (l *localState) deleteCheck(id types.CheckID) error {
 	}
 	var out struct{}
 	err := l.iface.RPC("Catalog.Deregister", &req, &out)
-	if err == nil {
+	if err == nil || strings.Contains(err.Error(), "Unknown check") {
 		delete(l.checkStatus, id)
+		delete(l.checkTokens, id)
 		l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
+		return nil
+	} else if strings.Contains(err.Error(), permissionDenied) {
+		l.checkStatus[id] = syncStatus{inSync: true}
+		l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
+		return nil
 	}
 	return err
 }
@@ -645,10 +659,13 @@ func (l *localState) syncService(id string) error {
 
 	// If the service has associated checks that are out of sync,
 	// piggyback them on the service sync so they are part of the
-	// same transaction and are registered atomically.
+	// same transaction and are registered atomically. We only let
+	// checks ride on service registrations with the same token,
+	// otherwise we need to register them separately so they don't
+	// pick up privileges from the service token.
 	var checks structs.HealthChecks
 	for _, check := range l.checks {
-		if check.ServiceID == id {
+		if check.ServiceID == id && (l.serviceToken(id) == l.checkToken(check.CheckID)) {
 			if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync {
 				checks = append(checks, check)
 			}
@@ -711,7 +728,7 @@ func (l *localState) syncCheck(id types.CheckID) error {
 	if err == nil {
 		l.checkStatus[id] = syncStatus{inSync: true}
 		// Given how the register API works, this info is also updated
-		// every time we sync a service.
+		// every time we sync a check.
 		l.nodeInfoInSync = true
 		l.logger.Printf("[INFO] agent: Synced check '%s'", id)
 	} else if strings.Contains(err.Error(), permissionDenied) {
@@ -476,9 +476,17 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
 }
 
 var testRegisterRules = `
+node "" {
+	policy = "write"
+}
+
 service "api" {
 	policy = "write"
 }
+
+service "consul" {
+	policy = "write"
+}
 `
 
 func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
@@ -486,6 +494,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
 	conf.ACLDatacenter = "dc1"
 	conf.ACLMasterToken = "root"
 	conf.ACLDefaultPolicy = "deny"
+	conf.ACLEnforceVersion8 = Bool(true)
 	dir, agent := makeAgent(t, conf)
 	defer os.RemoveAll(dir)
 	defer agent.Shutdown()
@@ -501,81 +510,142 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
 			Type:  structs.ACLTypeClient,
 			Rules: testRegisterRules,
 		},
-		WriteRequest: structs.WriteRequest{Token: "root"},
+		WriteRequest: structs.WriteRequest{
+			Token: "root",
+		},
 	}
-	var out string
-	if err := agent.RPC("ACL.Apply", &arg, &out); err != nil {
+	var token string
+	if err := agent.RPC("ACL.Apply", &arg, &token); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 
 	// Update the agent ACL token, resume sync
-	conf.ACLToken = out
+	conf.ACLAgentToken = token
 	agent.StartSync()
 	time.Sleep(200 * time.Millisecond)
 
-	// Create service (Allowed)
+	// Create service (disallowed)
 	srv1 := &structs.NodeService{
 		ID:      "mysql",
 		Service: "mysql",
 		Tags:    []string{"master"},
 		Port:    5000,
 	}
-	agent.state.AddService(srv1, "")
+	agent.state.AddService(srv1, token)
 
-	// Create service (Disallowed)
+	// Create service (allowed)
 	srv2 := &structs.NodeService{
 		ID:      "api",
 		Service: "api",
 		Tags:    []string{"foo"},
 		Port:    5001,
 	}
-	agent.state.AddService(srv2, "")
+	agent.state.AddService(srv2, token)
 
 	// Trigger anti-entropy run and wait
 	agent.StartSync()
 	time.Sleep(200 * time.Millisecond)
 
 	// Verify that we are in sync
-	req := structs.NodeSpecificRequest{
-		Datacenter: "dc1",
-		Node:       agent.config.NodeName,
-		QueryOptions: structs.QueryOptions{Token: out},
-	}
-	var services structs.IndexedNodeServices
-	if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	{
+		req := structs.NodeSpecificRequest{
+			Datacenter: "dc1",
+			Node:       agent.config.NodeName,
+			QueryOptions: structs.QueryOptions{
+				Token: "root",
+			},
+		}
+		var services structs.IndexedNodeServices
+		if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
+			t.Fatalf("err: %v", err)
+		}
 
-	// We should have 2 services (consul included)
-	if len(services.NodeServices.Services) != 2 {
-		t.Fatalf("bad: %v", services.NodeServices.Services)
-	}
+		// We should have 2 services (consul included)
+		if len(services.NodeServices.Services) != 2 {
+			t.Fatalf("bad: %v", services.NodeServices.Services)
+		}
 
-	// All the services should match
-	for id, serv := range services.NodeServices.Services {
-		serv.CreateIndex, serv.ModifyIndex = 0, 0
-		switch id {
-		case "mysql":
-			t.Fatalf("should not be permitted")
-		case "api":
-			if !reflect.DeepEqual(serv, srv2) {
-				t.Fatalf("bad: %#v %#v", serv, srv2)
+		// All the services should match
+		for id, serv := range services.NodeServices.Services {
+			serv.CreateIndex, serv.ModifyIndex = 0, 0
+			switch id {
+			case "mysql":
+				t.Fatalf("should not be permitted")
+			case "api":
+				if !reflect.DeepEqual(serv, srv2) {
+					t.Fatalf("bad: %#v %#v", serv, srv2)
+				}
+			case "consul":
+				// ignore
+			default:
+				t.Fatalf("unexpected service: %v", id)
 			}
-		case "consul":
-			// ignore
-		default:
-			t.Fatalf("unexpected service: %v", id)
 		}
-	}
 
-	// Check the local state
-	if len(agent.state.services) != 3 {
-		t.Fatalf("bad: %v", agent.state.services)
-	}
-	if len(agent.state.serviceStatus) != 3 {
-		t.Fatalf("bad: %v", agent.state.serviceStatus)
-	}
-	for name, status := range agent.state.serviceStatus {
-		if !status.inSync {
-			t.Fatalf("should be in sync: %v %v", name, status)
+		// Check the local state
+		if len(agent.state.services) != 3 {
+			t.Fatalf("bad: %v", agent.state.services)
+		}
+		if len(agent.state.serviceStatus) != 3 {
+			t.Fatalf("bad: %v", agent.state.serviceStatus)
+		}
+		for name, status := range agent.state.serviceStatus {
+			if !status.inSync {
+				t.Fatalf("should be in sync: %v %v", name, status)
+			}
+		}
+	}
+
+	// Now remove the service and re-sync
+	agent.state.RemoveService("api")
+	agent.StartSync()
+	time.Sleep(200 * time.Millisecond)
+
+	// Verify that we are in sync
+	{
+		req := structs.NodeSpecificRequest{
+			Datacenter: "dc1",
+			Node:       agent.config.NodeName,
+			QueryOptions: structs.QueryOptions{
+				Token: "root",
+			},
+		}
+		var services structs.IndexedNodeServices
+		if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		// We should have 1 service (just consul)
+		if len(services.NodeServices.Services) != 1 {
+			t.Fatalf("bad: %v", services.NodeServices.Services)
+		}
+
+		// All the services should match
+		for id, serv := range services.NodeServices.Services {
+			serv.CreateIndex, serv.ModifyIndex = 0, 0
+			switch id {
+			case "mysql":
+				t.Fatalf("should not be permitted")
+			case "api":
+				t.Fatalf("should be deleted")
+			case "consul":
+				// ignore
+			default:
+				t.Fatalf("unexpected service: %v", id)
+			}
+		}
+
+		// Check the local state
+		if len(agent.state.services) != 2 {
+			t.Fatalf("bad: %v", agent.state.services)
+		}
+		if len(agent.state.serviceStatus) != 2 {
+			t.Fatalf("bad: %v", agent.state.serviceStatus)
+		}
+		for name, status := range agent.state.serviceStatus {
+			if !status.inSync {
+				t.Fatalf("should be in sync: %v %v", name, status)
+			}
 		}
 	}
 }
@@ -800,6 +870,249 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
 	}
 }
 
+func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
+	conf := nextConfig()
+	conf.ACLDatacenter = "dc1"
+	conf.ACLMasterToken = "root"
+	conf.ACLDefaultPolicy = "deny"
+	conf.ACLEnforceVersion8 = Bool(true)
+	dir, agent := makeAgent(t, conf)
+	defer os.RemoveAll(dir)
+	defer agent.Shutdown()
+
+	testutil.WaitForLeader(t, agent.RPC, "dc1")
+
+	// Create the ACL
+	arg := structs.ACLRequest{
+		Datacenter: "dc1",
+		Op:         structs.ACLSet,
+		ACL: structs.ACL{
+			Name:  "User token",
+			Type:  structs.ACLTypeClient,
+			Rules: testRegisterRules,
+		},
+		WriteRequest: structs.WriteRequest{
+			Token: "root",
+		},
+	}
+	var token string
+	if err := agent.RPC("ACL.Apply", &arg, &token); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Update the agent ACL token, resume sync
+	conf.ACLAgentToken = token
+	agent.StartSync()
+	time.Sleep(200 * time.Millisecond)
+
+	// Create services using the root token
+	srv1 := &structs.NodeService{
+		ID:      "mysql",
+		Service: "mysql",
+		Tags:    []string{"master"},
+		Port:    5000,
+	}
+	agent.state.AddService(srv1, "root")
+	srv2 := &structs.NodeService{
+		ID:      "api",
+		Service: "api",
+		Tags:    []string{"foo"},
+		Port:    5001,
+	}
+	agent.state.AddService(srv2, "root")
+
+	// Trigger anti-entropy run and wait
+	agent.StartSync()
+	time.Sleep(200 * time.Millisecond)
+
+	// Verify that we are in sync
+	{
+		req := structs.NodeSpecificRequest{
+			Datacenter: "dc1",
+			Node:       agent.config.NodeName,
+			QueryOptions: structs.QueryOptions{
+				Token: "root",
+			},
+		}
+		var services structs.IndexedNodeServices
+		if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		// We should have 3 services (consul included)
+		if len(services.NodeServices.Services) != 3 {
+			t.Fatalf("bad: %v", services.NodeServices.Services)
+		}
+
+		// All the services should match
+		for id, serv := range services.NodeServices.Services {
+			serv.CreateIndex, serv.ModifyIndex = 0, 0
+			switch id {
+			case "mysql":
+				if !reflect.DeepEqual(serv, srv1) {
+					t.Fatalf("bad: %#v %#v", serv, srv1)
+				}
+			case "api":
+				if !reflect.DeepEqual(serv, srv2) {
+					t.Fatalf("bad: %#v %#v", serv, srv2)
+				}
+			case "consul":
+				// ignore
+			default:
+				t.Fatalf("unexpected service: %v", id)
+			}
+		}
+
+		// Check the local state
+		if len(agent.state.services) != 3 {
+			t.Fatalf("bad: %v", agent.state.services)
+		}
+		if len(agent.state.serviceStatus) != 3 {
+			t.Fatalf("bad: %v", agent.state.serviceStatus)
+		}
+		for name, status := range agent.state.serviceStatus {
+			if !status.inSync {
+				t.Fatalf("should be in sync: %v %v", name, status)
+			}
+		}
+	}
+
+	// This check won't be allowed.
+	chk1 := &structs.HealthCheck{
+		Node:        agent.config.NodeName,
+		ServiceID:   "mysql",
+		ServiceName: "mysql",
+		CheckID:     "mysql-check",
+		Name:        "mysql",
+		Status:      structs.HealthPassing,
+	}
+	agent.state.AddCheck(chk1, token)
+
+	// This one will be allowed.
+	chk2 := &structs.HealthCheck{
+		Node:        agent.config.NodeName,
+		ServiceID:   "api",
+		ServiceName: "api",
+		CheckID:     "api-check",
+		Name:        "api",
+		Status:      structs.HealthPassing,
+	}
+	agent.state.AddCheck(chk2, token)
+
+	// Trigger anti-entropy run and wait.
+	agent.StartSync()
+	time.Sleep(200 * time.Millisecond)
+
+	// Verify that we are in sync
+	if err := testutil.WaitForResult(func() (bool, error) {
+		req := structs.NodeSpecificRequest{
+			Datacenter: "dc1",
+			Node:       agent.config.NodeName,
+			QueryOptions: structs.QueryOptions{
+				Token: "root",
+			},
+		}
+		var checks structs.IndexedHealthChecks
+		if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
+			return false, fmt.Errorf("err: %v", err)
+		}
+
+		// We should have 2 checks (serf included)
+		if len(checks.HealthChecks) != 2 {
+			return false, fmt.Errorf("bad: %v", checks)
+		}
+
+		// All the checks should match
+		for _, chk := range checks.HealthChecks {
+			chk.CreateIndex, chk.ModifyIndex = 0, 0
+			switch chk.CheckID {
+			case "mysql-check":
+				t.Fatalf("should not be permitted")
+			case "api-check":
+				if !reflect.DeepEqual(chk, chk2) {
+					return false, fmt.Errorf("bad: %v %v", chk, chk2)
+				}
+			case "serfHealth":
+				// ignore
+			default:
+				return false, fmt.Errorf("unexpected check: %v", chk)
+			}
+		}
+		return true, nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Check the local state.
+	if len(agent.state.checks) != 2 {
+		t.Fatalf("bad: %v", agent.state.checks)
+	}
+	if len(agent.state.checkStatus) != 2 {
+		t.Fatalf("bad: %v", agent.state.checkStatus)
+	}
+	for name, status := range agent.state.checkStatus {
+		if !status.inSync {
+			t.Fatalf("should be in sync: %v %v", name, status)
+		}
+	}
+
+	// Now delete the check and wait for sync.
+	agent.state.RemoveCheck("api-check")
+	agent.StartSync()
+	time.Sleep(200 * time.Millisecond)
+
+	// Verify that we are in sync
+	if err := testutil.WaitForResult(func() (bool, error) {
+		req := structs.NodeSpecificRequest{
+			Datacenter: "dc1",
+			Node:       agent.config.NodeName,
+			QueryOptions: structs.QueryOptions{
+				Token: "root",
+			},
+		}
+		var checks structs.IndexedHealthChecks
+		if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
+			return false, fmt.Errorf("err: %v", err)
+		}
+
+		// We should have 1 check (just serf)
+		if len(checks.HealthChecks) != 1 {
+			return false, fmt.Errorf("bad: %v", checks)
+		}
+
+		// All the checks should match
+		for _, chk := range checks.HealthChecks {
+			chk.CreateIndex, chk.ModifyIndex = 0, 0
+			switch chk.CheckID {
+			case "mysql-check":
+				t.Fatalf("should not be permitted")
+			case "api-check":
+				t.Fatalf("should be deleted")
+			case "serfHealth":
+				// ignore
+			default:
+				return false, fmt.Errorf("unexpected check: %v", chk)
+			}
+		}
+		return true, nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Check the local state.
+	if len(agent.state.checks) != 1 {
+		t.Fatalf("bad: %v", agent.state.checks)
+	}
+	if len(agent.state.checkStatus) != 1 {
+		t.Fatalf("bad: %v", agent.state.checkStatus)
+	}
+	for name, status := range agent.state.checkStatus {
+		if !status.inSync {
+			t.Fatalf("should be in sync: %v %v", name, status)
+		}
+	}
+}
+
 func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
 	conf := nextConfig()
 	conf.CheckUpdateInterval = 500 * time.Millisecond
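For reference, the token-matching rule that the syncService hunk above introduces, reduced to a runnable sketch (the check struct here is hypothetical; in the committed code the comparison is l.serviceToken(id) == l.checkToken(check.CheckID)):

```go
package main

import "fmt"

// Hypothetical flattened view of a locally registered check.
type check struct {
	CheckID   string
	ServiceID string
	Token     string
}

// piggyback mirrors the new syncService filter: a check may ride along on
// its service's catalog registration only if it was registered with the
// same token; otherwise it is synced separately so it can't borrow the
// service token's privileges.
func piggyback(serviceID, serviceToken string, checks []check) []check {
	var out []check
	for _, c := range checks {
		if c.ServiceID == serviceID && c.Token == serviceToken {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	checks := []check{
		{CheckID: "api-check", ServiceID: "api", Token: "token-a"},
		{CheckID: "api-probe", ServiceID: "api", Token: "token-b"}, // registered separately
	}
	fmt.Println(piggyback("api", "token-a", checks)) // [{api-check api token-a}]
}
```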