From f2211d2489771fb17616a0b5746145b9bf47dfe7 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Fri, 24 Mar 2017 17:15:20 -0700 Subject: [PATCH] Keeps the service and check tokens around for deregistration. We fixed a few related issues while we were in here. We now only let services register checks with a matching token, and we also close out service and check delete operations if the catalog deregister claims it doesn't know about the ID of the service or check being deleted. --- command/agent/agent.go | 6 +- command/agent/local.go | 31 ++- command/agent/local_test.go | 401 ++++++++++++++++++++++++++++++++---- 3 files changed, 382 insertions(+), 56 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index dddf054f8..0594201dc 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1192,11 +1192,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { } // Remove service immediately - err := a.state.RemoveService(serviceID) - - // TODO: Return the error instead of just logging here in Consul 0.8 - // For now, keep the current idempotent behavior on deleting a nonexistent service - if err != nil { + if err := a.state.RemoveService(serviceID); err != nil { a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err) return nil } diff --git a/command/agent/local.go b/command/agent/local.go index 5ce92b48f..7f4b9ff0b 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -177,7 +177,8 @@ func (l *localState) RemoveService(serviceID string) error { if _, ok := l.services[serviceID]; ok { delete(l.services, serviceID) - delete(l.serviceTokens, serviceID) + // Leave the service token around, if any, until we successfully + // delete the service. l.serviceStatus[serviceID] = syncStatus{inSync: false} l.changeMade() } else { @@ -241,7 +242,8 @@ func (l *localState) RemoveCheck(checkID types.CheckID) { defer l.Unlock() delete(l.checks, checkID) - delete(l.checkTokens, checkID) + // Leave the check token around, if any, until we successfully delete + // the check. delete(l.checkCriticalTime, checkID) l.checkStatus[checkID] = syncStatus{inSync: false} l.changeMade() @@ -602,9 +604,15 @@ func (l *localState) deleteService(id string) error { } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) - if err == nil { + if err == nil || strings.Contains(err.Error(), "Unknown service") { delete(l.serviceStatus, id) + delete(l.serviceTokens, id) l.logger.Printf("[INFO] agent: Deregistered service '%s'", id) + return nil + } else if strings.Contains(err.Error(), permissionDenied) { + l.serviceStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id) + return nil } return err } @@ -623,9 +631,15 @@ func (l *localState) deleteCheck(id types.CheckID) error { } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) - if err == nil { + if err == nil || strings.Contains(err.Error(), "Unknown check") { delete(l.checkStatus, id) + delete(l.checkTokens, id) l.logger.Printf("[INFO] agent: Deregistered check '%s'", id) + return nil + } else if strings.Contains(err.Error(), permissionDenied) { + l.checkStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id) + return nil } return err } @@ -645,10 +659,13 @@ func (l *localState) syncService(id string) error { // If the service has associated checks that are out of sync, // piggyback them on the service sync so they are part of the - // same transaction and are registered atomically. + // same transaction and are registered atomically. We only let + // checks ride on service registrations with the same token, + // otherwise we need to register them separately so they don't + // pick up privileges from the service token. var checks structs.HealthChecks for _, check := range l.checks { - if check.ServiceID == id { + if check.ServiceID == id && (l.serviceToken(id) == l.checkToken(check.CheckID)) { if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync { checks = append(checks, check) } @@ -711,7 +728,7 @@ func (l *localState) syncCheck(id types.CheckID) error { if err == nil { l.checkStatus[id] = syncStatus{inSync: true} // Given how the register API works, this info is also updated - // every time we sync a service. + // every time we sync a check. l.nodeInfoInSync = true l.logger.Printf("[INFO] agent: Synced check '%s'", id) } else if strings.Contains(err.Error(), permissionDenied) { diff --git a/command/agent/local_test.go b/command/agent/local_test.go index e1e8d5796..56d001491 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -476,9 +476,17 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { } var testRegisterRules = ` +node "" { + policy = "write" +} + service "api" { policy = "write" } + +service "consul" { + policy = "write" +} ` func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { @@ -486,6 +494,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { conf.ACLDatacenter = "dc1" conf.ACLMasterToken = "root" conf.ACLDefaultPolicy = "deny" + conf.ACLEnforceVersion8 = Bool(true) dir, agent := makeAgent(t, conf) defer os.RemoveAll(dir) defer agent.Shutdown() @@ -501,81 +510,142 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { Type: structs.ACLTypeClient, Rules: testRegisterRules, }, - WriteRequest: structs.WriteRequest{Token: "root"}, + WriteRequest: structs.WriteRequest{ + Token: "root", + }, } - var out string - if err := agent.RPC("ACL.Apply", &arg, &out); err != nil { + var token string + if err := agent.RPC("ACL.Apply", &arg, &token); err != nil { t.Fatalf("err: %v", err) } // Update the agent ACL token, resume sync - conf.ACLToken = out + conf.ACLAgentToken = token + agent.StartSync() + time.Sleep(200 * time.Millisecond) - // Create service (Allowed) + // Create service (disallowed) srv1 := &structs.NodeService{ ID: "mysql", Service: "mysql", Tags: []string{"master"}, Port: 5000, } - agent.state.AddService(srv1, "") + agent.state.AddService(srv1, token) - // Create service (Disallowed) + // Create service (allowed) srv2 := &structs.NodeService{ ID: "api", Service: "api", Tags: []string{"foo"}, Port: 5001, } - agent.state.AddService(srv2, "") + agent.state.AddService(srv2, token) // Trigger anti-entropy run and wait agent.StartSync() time.Sleep(200 * time.Millisecond) // Verify that we are in sync - req := structs.NodeSpecificRequest{ - Datacenter: "dc1", - Node: agent.config.NodeName, - QueryOptions: structs.QueryOptions{Token: out}, - } - var services structs.IndexedNodeServices - if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { - t.Fatalf("err: %v", err) - } + { + req := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + QueryOptions: structs.QueryOptions{ + Token: "root", + }, + } + var services structs.IndexedNodeServices + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + t.Fatalf("err: %v", err) + } - // We should have 2 services (consul included) - if len(services.NodeServices.Services) != 2 { - t.Fatalf("bad: %v", services.NodeServices.Services) - } + // We should have 2 services (consul included) + if len(services.NodeServices.Services) != 2 { + t.Fatalf("bad: %v", services.NodeServices.Services) + } - // All the services should match - for id, serv := range services.NodeServices.Services { - serv.CreateIndex, serv.ModifyIndex = 0, 0 - switch id { - case "mysql": - t.Fatalf("should not be permitted") - case "api": - if !reflect.DeepEqual(serv, srv2) { - t.Fatalf("bad: %#v %#v", serv, srv2) + // All the services should match + for id, serv := range services.NodeServices.Services { + serv.CreateIndex, serv.ModifyIndex = 0, 0 + switch id { + case "mysql": + t.Fatalf("should not be permitted") + case "api": + if !reflect.DeepEqual(serv, srv2) { + t.Fatalf("bad: %#v %#v", serv, srv2) + } + case "consul": + // ignore + default: + t.Fatalf("unexpected service: %v", id) + } + } + + // Check the local state + if len(agent.state.services) != 3 { + t.Fatalf("bad: %v", agent.state.services) + } + if len(agent.state.serviceStatus) != 3 { + t.Fatalf("bad: %v", agent.state.serviceStatus) + } + for name, status := range agent.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) } - case "consul": - // ignore - default: - t.Fatalf("unexpected service: %v", id) } } - // Check the local state - if len(agent.state.services) != 3 { - t.Fatalf("bad: %v", agent.state.services) - } - if len(agent.state.serviceStatus) != 3 { - t.Fatalf("bad: %v", agent.state.serviceStatus) - } - for name, status := range agent.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // Now remove the service and re-sync + agent.state.RemoveService("api") + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that we are in sync + { + req := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + QueryOptions: structs.QueryOptions{ + Token: "root", + }, + } + var services structs.IndexedNodeServices + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + t.Fatalf("err: %v", err) + } + + // We should have 1 service (just consul) + if len(services.NodeServices.Services) != 1 { + t.Fatalf("bad: %v", services.NodeServices.Services) + } + + // All the services should match + for id, serv := range services.NodeServices.Services { + serv.CreateIndex, serv.ModifyIndex = 0, 0 + switch id { + case "mysql": + t.Fatalf("should not be permitted") + case "api": + t.Fatalf("should be deleted") + case "consul": + // ignore + default: + t.Fatalf("unexpected service: %v", id) + } + } + + // Check the local state + if len(agent.state.services) != 2 { + t.Fatalf("bad: %v", agent.state.services) + } + if len(agent.state.serviceStatus) != 2 { + t.Fatalf("bad: %v", agent.state.serviceStatus) + } + for name, status := range agent.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } } } } @@ -800,6 +870,249 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } } +func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { + conf := nextConfig() + conf.ACLDatacenter = "dc1" + conf.ACLMasterToken = "root" + conf.ACLDefaultPolicy = "deny" + conf.ACLEnforceVersion8 = Bool(true) + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + testutil.WaitForLeader(t, agent.RPC, "dc1") + + // Create the ACL + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testRegisterRules, + }, + WriteRequest: structs.WriteRequest{ + Token: "root", + }, + } + var token string + if err := agent.RPC("ACL.Apply", &arg, &token); err != nil { + t.Fatalf("err: %v", err) + } + + // Update the agent ACL token, resume sync + conf.ACLAgentToken = token + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Create services using the root token + srv1 := &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Tags: []string{"master"}, + Port: 5000, + } + agent.state.AddService(srv1, "root") + srv2 := &structs.NodeService{ + ID: "api", + Service: "api", + Tags: []string{"foo"}, + Port: 5001, + } + agent.state.AddService(srv2, "root") + + // Trigger anti-entropy run and wait + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that we are in sync + { + req := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + QueryOptions: structs.QueryOptions{ + Token: "root", + }, + } + var services structs.IndexedNodeServices + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + t.Fatalf("err: %v", err) + } + + // We should have 3 services (consul included) + if len(services.NodeServices.Services) != 3 { + t.Fatalf("bad: %v", services.NodeServices.Services) + } + + // All the services should match + for id, serv := range services.NodeServices.Services { + serv.CreateIndex, serv.ModifyIndex = 0, 0 + switch id { + case "mysql": + if !reflect.DeepEqual(serv, srv1) { + t.Fatalf("bad: %#v %#v", serv, srv1) + } + case "api": + if !reflect.DeepEqual(serv, srv2) { + t.Fatalf("bad: %#v %#v", serv, srv2) + } + case "consul": + // ignore + default: + t.Fatalf("unexpected service: %v", id) + } + } + + // Check the local state + if len(agent.state.services) != 3 { + t.Fatalf("bad: %v", agent.state.services) + } + if len(agent.state.serviceStatus) != 3 { + t.Fatalf("bad: %v", agent.state.serviceStatus) + } + for name, status := range agent.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + } + + // This check won't be allowed. + chk1 := &structs.HealthCheck{ + Node: agent.config.NodeName, + ServiceID: "mysql", + ServiceName: "mysql", + CheckID: "mysql-check", + Name: "mysql", + Status: structs.HealthPassing, + } + agent.state.AddCheck(chk1, token) + + // This one will be allowed. + chk2 := &structs.HealthCheck{ + Node: agent.config.NodeName, + ServiceID: "api", + ServiceName: "api", + CheckID: "api-check", + Name: "api", + Status: structs.HealthPassing, + } + agent.state.AddCheck(chk2, token) + + // Trigger anti-entropy run and wait. + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that we are in sync + if err := testutil.WaitForResult(func() (bool, error) { + req := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + QueryOptions: structs.QueryOptions{ + Token: "root", + }, + } + var checks structs.IndexedHealthChecks + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + return false, fmt.Errorf("err: %v", err) + } + + // We should have 2 checks (serf included) + if len(checks.HealthChecks) != 2 { + return false, fmt.Errorf("bad: %v", checks) + } + + // All the checks should match + for _, chk := range checks.HealthChecks { + chk.CreateIndex, chk.ModifyIndex = 0, 0 + switch chk.CheckID { + case "mysql-check": + t.Fatalf("should not be permitted") + case "api-check": + if !reflect.DeepEqual(chk, chk2) { + return false, fmt.Errorf("bad: %v %v", chk, chk2) + } + case "serfHealth": + // ignore + default: + return false, fmt.Errorf("unexpected check: %v", chk) + } + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + // Check the local state. + if len(agent.state.checks) != 2 { + t.Fatalf("bad: %v", agent.state.checks) + } + if len(agent.state.checkStatus) != 2 { + t.Fatalf("bad: %v", agent.state.checkStatus) + } + for name, status := range agent.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + + // Now delete the check and wait for sync. + agent.state.RemoveCheck("api-check") + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that we are in sync + if err := testutil.WaitForResult(func() (bool, error) { + req := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + QueryOptions: structs.QueryOptions{ + Token: "root", + }, + } + var checks structs.IndexedHealthChecks + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + return false, fmt.Errorf("err: %v", err) + } + + // We should have 1 check (just serf) + if len(checks.HealthChecks) != 1 { + return false, fmt.Errorf("bad: %v", checks) + } + + // All the checks should match + for _, chk := range checks.HealthChecks { + chk.CreateIndex, chk.ModifyIndex = 0, 0 + switch chk.CheckID { + case "mysql-check": + t.Fatalf("should not be permitted") + case "api-check": + t.Fatalf("should be deleted") + case "serfHealth": + // ignore + default: + return false, fmt.Errorf("unexpected check: %v", chk) + } + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + // Check the local state. + if len(agent.state.checks) != 1 { + t.Fatalf("bad: %v", agent.state.checks) + } + if len(agent.state.checkStatus) != 1 { + t.Fatalf("bad: %v", agent.state.checkStatus) + } + for name, status := range agent.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } +} + func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { conf := nextConfig() conf.CheckUpdateInterval = 500 * time.Millisecond