Event Stream: Track ACL changes, unsubscribe on invalidating changes (#9447)

* upsertaclpolicies

* delete acl policies msgtype

* upsert acl policies msgtype

* delete acl tokens msgtype

* acl bootstrap msgtype

wip unsubscribe on token delete

test that subscriptions are closed after an ACL token has been deleted

Start writing policyupdated test

* update test to use before/after policy

* add SubscribeWithACLCheck to run acl checks on subscribe

* update rpc endpoint to use broker acl check

* Add and use subscriptions.closeSubscriptionFunc

This fixes the issue of not being able to defer unlocking the mutex on
the event broker in the for loop.

handle acl policy updates

* rpc endpoint test for terminating acl change

* add comments

Co-authored-by: Kris Hicks <khicks@hashicorp.com>
This commit is contained in:
Drew Bailey 2020-12-01 11:11:34 -05:00 committed by GitHub
parent 70ae7ec621
commit 9adca240f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 987 additions and 219 deletions

View File

@ -31,9 +31,9 @@ func TestClient_ACL_resolveTokenValue(t *testing.T) {
token2 := mock.ACLToken()
token2.Type = structs.ACLManagementToken
token2.Policies = nil
err := s1.State().UpsertACLPolicies(100, []*structs.ACLPolicy{policy, policy2})
err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{policy, policy2})
assert.Nil(t, err)
err = s1.State().UpsertACLTokens(110, []*structs.ACLToken{token, token2})
err = s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token, token2})
assert.Nil(t, err)
// Test the client resolution
@ -80,9 +80,9 @@ func TestClient_ACL_resolvePolicies(t *testing.T) {
token2 := mock.ACLToken()
token2.Type = structs.ACLManagementToken
token2.Policies = nil
err := s1.State().UpsertACLPolicies(100, []*structs.ACLPolicy{policy, policy2})
err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{policy, policy2})
assert.Nil(t, err)
err = s1.State().UpsertACLTokens(110, []*structs.ACLToken{token, token2})
err = s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token, token2})
assert.Nil(t, err)
// Test the client resolution
@ -136,9 +136,9 @@ func TestClient_ACL_ResolveToken(t *testing.T) {
token2 := mock.ACLToken()
token2.Type = structs.ACLManagementToken
token2.Policies = nil
err := s1.State().UpsertACLPolicies(100, []*structs.ACLPolicy{policy, policy2})
err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{policy, policy2})
assert.Nil(t, err)
err = s1.State().UpsertACLTokens(110, []*structs.ACLToken{token, token2})
err = s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token, token2})
assert.Nil(t, err)
// Test the client resolution
@ -181,7 +181,7 @@ func TestClient_ACL_ResolveSecretToken(t *testing.T) {
token := mock.ACLToken()
err := s1.State().UpsertACLTokens(110, []*structs.ACLToken{token})
err := s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token})
assert.Nil(t, err)
respToken, err := c1.ResolveSecretToken(token.SecretID)

View File

@ -34,7 +34,7 @@ func TestACLPolicyDeleteCommand(t *testing.T) {
Rules: acl.PolicyWrite,
}
policy.SetHash()
assert.Nil(state.UpsertACLPolicies(1000, []*structs.ACLPolicy{policy}))
assert.Nil(state.UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{policy}))
ui := cli.NewMockUi()
cmd := &ACLPolicyDeleteCommand{Meta: Meta{Ui: ui, flagAddress: url}}

View File

@ -32,7 +32,7 @@ func TestACLPolicyInfoCommand(t *testing.T) {
Rules: "node { policy = \"read\" }",
}
policy.SetHash()
assert.Nil(state.UpsertACLPolicies(1000, []*structs.ACLPolicy{policy}))
assert.Nil(state.UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{policy}))
ui := cli.NewMockUi()
cmd := &ACLPolicyInfoCommand{Meta: Meta{Ui: ui, flagAddress: url}}

View File

@ -33,7 +33,7 @@ func TestACLPolicyListCommand(t *testing.T) {
Rules: acl.PolicyWrite,
}
policy.SetHash()
assert.Nil(state.UpsertACLPolicies(1000, []*structs.ACLPolicy{policy}))
assert.Nil(state.UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{policy}))
ui := cli.NewMockUi()
cmd := &ACLPolicyListCommand{Meta: Meta{Ui: ui, flagAddress: url}}

View File

@ -35,7 +35,7 @@ func TestACLTokenDeleteCommand_ViaEnvVariable(t *testing.T) {
mockToken := mock.ACLToken()
mockToken.Policies = []string{acl.PolicyWrite}
mockToken.SetHash()
assert.Nil(state.UpsertACLTokens(1000, []*structs.ACLToken{mockToken}))
assert.Nil(state.UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{mockToken}))
// Attempt to delete a token without providing a valid token with delete
// permissions

View File

@ -37,7 +37,7 @@ func TestACLTokenInfoCommand_ViaEnvVar(t *testing.T) {
mockToken := mock.ACLToken()
mockToken.Policies = []string{acl.PolicyWrite}
mockToken.SetHash()
assert.Nil(state.UpsertACLTokens(1000, []*structs.ACLToken{mockToken}))
assert.Nil(state.UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{mockToken}))
// Attempt to fetch info on a token without providing a valid management
// token

View File

@ -31,7 +31,7 @@ func TestACLTokenListCommand(t *testing.T) {
mockToken := mock.ACLToken()
mockToken.Policies = []string{acl.PolicyWrite}
mockToken.SetHash()
assert.Nil(state.UpsertACLTokens(1000, []*structs.ACLToken{mockToken}))
assert.Nil(state.UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{mockToken}))
ui := cli.NewMockUi()
cmd := &ACLTokenListCommand{Meta: Meta{Ui: ui, flagAddress: url}}

View File

@ -37,7 +37,7 @@ func TestACLTokenSelfCommand_ViaEnvVar(t *testing.T) {
mockToken := mock.ACLToken()
mockToken.Policies = []string{acl.PolicyWrite}
mockToken.SetHash()
assert.Nil(state.UpsertACLTokens(1000, []*structs.ACLToken{mockToken}))
assert.Nil(state.UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{mockToken}))
// Attempt to fetch info on a token without providing a valid management
// token

View File

@ -33,7 +33,7 @@ func TestACLTokenUpdateCommand(t *testing.T) {
mockToken := mock.ACLToken()
mockToken.Policies = []string{acl.PolicyWrite}
mockToken.SetHash()
assert.Nil(state.UpsertACLTokens(1000, []*structs.ACLToken{mockToken}))
assert.Nil(state.UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{mockToken}))
// Request to update a new token without providing a valid management token
invalidToken := mock.ACLToken()

View File

@ -221,7 +221,7 @@ RETRY:
if a.Config.ACL.Enabled && a.Config.Server.Enabled && a.Config.ACL.PolicyTTL != 0 {
a.RootToken = mock.ACLManagementToken()
state := a.Agent.server.State()
if err := state.BootstrapACLTokens(1, 0, a.RootToken); err != nil {
if err := state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, a.RootToken); err != nil {
a.T.Fatalf("token bootstrap failed: %v", err)
}
}

View File

@ -28,16 +28,16 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
// Create the register request
policy := mock.ACLPolicy()
s1.fsm.State().UpsertACLPolicies(1000, []*structs.ACLPolicy{policy})
s1.fsm.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{policy})
anonymousPolicy := mock.ACLPolicy()
anonymousPolicy.Name = "anonymous"
s1.fsm.State().UpsertACLPolicies(1001, []*structs.ACLPolicy{anonymousPolicy})
s1.fsm.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 1001, []*structs.ACLPolicy{anonymousPolicy})
// Create a token with one the policy
token := mock.ACLToken()
token.Policies = []string{policy.Name}
s1.fsm.State().UpsertACLTokens(1002, []*structs.ACLToken{token})
s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1002, []*structs.ACLToken{token})
// Lookup the policy
get := &structs.ACLPolicySpecificRequest{
@ -119,7 +119,7 @@ func TestACLEndpoint_GetPolicy_Blocking(t *testing.T) {
// First create an unrelated policy
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertACLPolicies(100, []*structs.ACLPolicy{p1})
err := state.UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{p1})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -127,7 +127,7 @@ func TestACLEndpoint_GetPolicy_Blocking(t *testing.T) {
// Upsert the policy we are watching later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertACLPolicies(200, []*structs.ACLPolicy{p2})
err := state.UpsertACLPolicies(structs.MsgTypeTestSetup, 200, []*structs.ACLPolicy{p2})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -160,7 +160,7 @@ func TestACLEndpoint_GetPolicy_Blocking(t *testing.T) {
// Eval delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
err := state.DeleteACLPolicies(300, []string{p2.Name})
err := state.DeleteACLPolicies(structs.MsgTypeTestSetup, 300, []string{p2.Name})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -195,7 +195,7 @@ func TestACLEndpoint_GetPolicies(t *testing.T) {
// Create the register request
policy := mock.ACLPolicy()
policy2 := mock.ACLPolicy()
s1.fsm.State().UpsertACLPolicies(1000, []*structs.ACLPolicy{policy, policy2})
s1.fsm.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{policy, policy2})
// Lookup the policy
get := &structs.ACLPolicySetRequest{
@ -235,11 +235,11 @@ func TestACLEndpoint_GetPolicies_TokenSubset(t *testing.T) {
// Create the register request
policy := mock.ACLPolicy()
policy2 := mock.ACLPolicy()
s1.fsm.State().UpsertACLPolicies(1000, []*structs.ACLPolicy{policy, policy2})
s1.fsm.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{policy, policy2})
token := mock.ACLToken()
token.Policies = []string{policy.Name}
s1.fsm.State().UpsertACLTokens(1000, []*structs.ACLToken{token})
s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{token})
// Lookup the policy which is a subset of our tokens
get := &structs.ACLPolicySetRequest{
@ -280,7 +280,7 @@ func TestACLEndpoint_GetPolicies_Blocking(t *testing.T) {
// First create an unrelated policy
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertACLPolicies(100, []*structs.ACLPolicy{p1})
err := state.UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{p1})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -288,7 +288,7 @@ func TestACLEndpoint_GetPolicies_Blocking(t *testing.T) {
// Upsert the policy we are watching later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertACLPolicies(200, []*structs.ACLPolicy{p2})
err := state.UpsertACLPolicies(structs.MsgTypeTestSetup, 200, []*structs.ACLPolicy{p2})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -321,7 +321,7 @@ func TestACLEndpoint_GetPolicies_Blocking(t *testing.T) {
// Eval delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
err := state.DeleteACLPolicies(300, []string{p2.Name})
err := state.DeleteACLPolicies(structs.MsgTypeTestSetup, 300, []string{p2.Name})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -360,12 +360,12 @@ func TestACLEndpoint_ListPolicies(t *testing.T) {
p1.Name = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"
p2.Name = "aaaabbbb-3350-4b4b-d185-0e1992ed43e9"
s1.fsm.State().UpsertACLPolicies(1000, []*structs.ACLPolicy{p1, p2})
s1.fsm.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{p1, p2})
// Create a token with one of those policies
token := mock.ACLToken()
token.Policies = []string{p1.Name}
s1.fsm.State().UpsertACLTokens(1001, []*structs.ACLToken{token})
s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1001, []*structs.ACLToken{token})
// Lookup the policies
get := &structs.ACLPolicyListRequest{
@ -442,7 +442,7 @@ func TestACLEndpoint_ListPolicies_Unauthenticated(t *testing.T) {
p1 := mock.ACLPolicy()
p1.Name = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"
s1.fsm.State().UpsertACLPolicies(1000, []*structs.ACLPolicy{p1})
s1.fsm.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{p1})
t.Run("no anonymous policy", func(t *testing.T) {
resp, err := listPolicies()
@ -454,7 +454,7 @@ func TestACLEndpoint_ListPolicies_Unauthenticated(t *testing.T) {
// now try with anonymous policy
p2 := mock.ACLPolicy()
p2.Name = "anonymous"
s1.fsm.State().UpsertACLPolicies(1001, []*structs.ACLPolicy{p2})
s1.fsm.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 1001, []*structs.ACLPolicy{p2})
t.Run("with anonymous policy", func(t *testing.T) {
resp, err := listPolicies()
@ -479,7 +479,7 @@ func TestACLEndpoint_ListPolicies_Blocking(t *testing.T) {
// Upsert eval triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertACLPolicies(2, []*structs.ACLPolicy{policy}); err != nil {
if err := state.UpsertACLPolicies(structs.MsgTypeTestSetup, 2, []*structs.ACLPolicy{policy}); err != nil {
t.Fatalf("err: %v", err)
}
})
@ -507,7 +507,7 @@ func TestACLEndpoint_ListPolicies_Blocking(t *testing.T) {
// Eval deletion triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteACLPolicies(3, []string{policy.Name}); err != nil {
if err := state.DeleteACLPolicies(structs.MsgTypeTestSetup, 3, []string{policy.Name}); err != nil {
t.Fatalf("err: %v", err)
}
})
@ -536,7 +536,7 @@ func TestACLEndpoint_DeletePolicies(t *testing.T) {
// Create the register request
p1 := mock.ACLPolicy()
s1.fsm.State().UpsertACLPolicies(1000, []*structs.ACLPolicy{p1})
s1.fsm.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{p1})
// Lookup the policies
req := &structs.ACLPolicyDeleteRequest{
@ -622,7 +622,7 @@ func TestACLEndpoint_GetToken(t *testing.T) {
// Create the register request
token := mock.ACLToken()
s1.fsm.State().UpsertACLTokens(1000, []*structs.ACLToken{token})
s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{token})
// Lookup the token
get := &structs.ACLTokenSpecificRequest{
@ -673,7 +673,7 @@ func TestACLEndpoint_GetToken_Blocking(t *testing.T) {
// First create an unrelated token
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertACLTokens(100, []*structs.ACLToken{p1})
err := state.UpsertACLTokens(structs.MsgTypeTestSetup, 100, []*structs.ACLToken{p1})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -681,7 +681,7 @@ func TestACLEndpoint_GetToken_Blocking(t *testing.T) {
// Upsert the token we are watching later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertACLTokens(200, []*structs.ACLToken{p2})
err := state.UpsertACLTokens(structs.MsgTypeTestSetup, 200, []*structs.ACLToken{p2})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -714,7 +714,7 @@ func TestACLEndpoint_GetToken_Blocking(t *testing.T) {
// Eval delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
err := state.DeleteACLTokens(300, []string{p2.AccessorID})
err := state.DeleteACLTokens(structs.MsgTypeTestSetup, 300, []string{p2.AccessorID})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -749,7 +749,7 @@ func TestACLEndpoint_GetTokens(t *testing.T) {
// Create the register request
token := mock.ACLToken()
token2 := mock.ACLToken()
s1.fsm.State().UpsertACLTokens(1000, []*structs.ACLToken{token, token2})
s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{token, token2})
// Lookup the token
get := &structs.ACLTokenSetRequest{
@ -792,7 +792,7 @@ func TestACLEndpoint_GetTokens_Blocking(t *testing.T) {
// First create an unrelated token
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertACLTokens(100, []*structs.ACLToken{p1})
err := state.UpsertACLTokens(structs.MsgTypeTestSetup, 100, []*structs.ACLToken{p1})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -800,7 +800,7 @@ func TestACLEndpoint_GetTokens_Blocking(t *testing.T) {
// Upsert the token we are watching later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertACLTokens(200, []*structs.ACLToken{p2})
err := state.UpsertACLTokens(structs.MsgTypeTestSetup, 200, []*structs.ACLToken{p2})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -833,7 +833,7 @@ func TestACLEndpoint_GetTokens_Blocking(t *testing.T) {
// Eval delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
err := state.DeleteACLTokens(300, []string{p2.AccessorID})
err := state.DeleteACLTokens(structs.MsgTypeTestSetup, 300, []string{p2.AccessorID})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -872,7 +872,7 @@ func TestACLEndpoint_ListTokens(t *testing.T) {
p1.AccessorID = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"
p2.AccessorID = "aaaabbbb-3350-4b4b-d185-0e1992ed43e9"
s1.fsm.State().UpsertACLTokens(1000, []*structs.ACLToken{p1, p2})
s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{p1, p2})
// Lookup the tokens
get := &structs.ACLTokenListRequest{
@ -933,7 +933,7 @@ func TestACLEndpoint_ListTokens_Blocking(t *testing.T) {
// Upsert eval triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertACLTokens(3, []*structs.ACLToken{token}); err != nil {
if err := state.UpsertACLTokens(structs.MsgTypeTestSetup, 3, []*structs.ACLToken{token}); err != nil {
t.Fatalf("err: %v", err)
}
})
@ -961,7 +961,7 @@ func TestACLEndpoint_ListTokens_Blocking(t *testing.T) {
// Eval deletion triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteACLTokens(4, []string{token.AccessorID}); err != nil {
if err := state.DeleteACLTokens(structs.MsgTypeTestSetup, 4, []string{token.AccessorID}); err != nil {
t.Fatalf("err: %v", err)
}
})
@ -990,7 +990,7 @@ func TestACLEndpoint_DeleteTokens(t *testing.T) {
// Create the register request
p1 := mock.ACLToken()
s1.fsm.State().UpsertACLTokens(1000, []*structs.ACLToken{p1})
s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{p1})
// Lookup the tokens
req := &structs.ACLTokenDeleteRequest{
@ -1225,7 +1225,7 @@ func TestACLEndpoint_ResolveToken(t *testing.T) {
// Create the register request
token := mock.ACLToken()
s1.fsm.State().UpsertACLTokens(1000, []*structs.ACLToken{token})
s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{token})
// Lookup the token
get := &structs.ResolveACLTokenRequest{

View File

@ -29,9 +29,9 @@ func TestResolveACLToken(t *testing.T) {
token2 := mock.ACLToken()
token2.Type = structs.ACLManagementToken
token2.Policies = nil
err = state.UpsertACLPolicies(100, []*structs.ACLPolicy{policy, policy2})
err = state.UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{policy, policy2})
assert.Nil(t, err)
err = state.UpsertACLTokens(110, []*structs.ACLToken{token, token2})
err = state.UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token, token2})
assert.Nil(t, err)
snap, err := state.Snapshot()
@ -78,7 +78,7 @@ func TestResolveACLToken(t *testing.T) {
}
// Bust the cache by upserting the policy
err = state.UpsertACLPolicies(120, []*structs.ACLPolicy{policy})
err = state.UpsertACLPolicies(structs.MsgTypeTestSetup, 120, []*structs.ACLPolicy{policy})
assert.Nil(t, err)
snap, err = state.Snapshot()
assert.Nil(t, err)
@ -121,7 +121,7 @@ func TestResolveSecretToken(t *testing.T) {
token := mock.ACLToken()
err := state.UpsertACLTokens(110, []*structs.ACLToken{token})
err := state.UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token})
assert.Nil(t, err)
respToken, err := s1.ResolveSecretToken(token.SecretID)

View File

@ -70,7 +70,7 @@ func TestCSIVolumeEndpoint_Get_ACL(t *testing.T) {
ns := structs.DefaultNamespace
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, mock.ACLManagementToken())
srv.config.ACLEnabled = true
policy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIReadVolume})
validToken := mock.CreatePolicyAndToken(t, state, 1001, "csi-access", policy)
@ -372,7 +372,7 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
ns := structs.DefaultNamespace
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, mock.ACLManagementToken())
policy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIMountVolume}) +
mock.PluginPolicy("read")
@ -455,7 +455,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
index := uint64(1000)
ns := structs.DefaultNamespace
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, mock.ACLManagementToken())
policy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIMountVolume}) +
mock.PluginPolicy("read")
@ -584,7 +584,7 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
testutil.WaitForLeader(t, srv.RPC)
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, mock.ACLManagementToken())
srv.config.ACLEnabled = true
codec := rpcClient(t, srv)
@ -665,7 +665,7 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
defer deleteNodes()
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, mock.ACLManagementToken())
srv.config.ACLEnabled = true
codec := rpcClient(t, srv)
@ -814,7 +814,7 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) {
defer deleteNodes()
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, mock.ACLManagementToken())
srv.config.ACLEnabled = true
codec := rpcClient(t, srv)

View File

@ -2,13 +2,11 @@ package nomad
import (
"context"
"fmt"
"io"
"io/ioutil"
"time"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
@ -43,12 +41,7 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
return
}
aclObj, err := e.srv.ResolveToken(args.AuthToken)
if err != nil {
handleJsonResultError(err, nil, encoder)
return
}
// Generate the subscription request
subReq := &stream.SubscribeRequest{
Token: args.AuthToken,
Topics: args.Topics,
@ -56,14 +49,6 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
Namespace: args.Namespace,
}
// Check required ACL permissions for requested Topics
if aclObj != nil {
if err := aclCheckForEvents(subReq, aclObj); err != nil {
handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
return
}
}
// Get the servers broker and subscribe
publisher, err := e.srv.State().EventBroker()
if err != nil {
@ -71,27 +56,31 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// start subscription to publisher
subscription, err := publisher.Subscribe(subReq)
if err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
var subscription *stream.Subscription
var subErr error
// Check required ACL permissions for requested Topics
if e.srv.config.ACLEnabled {
subscription, subErr = publisher.SubscribeWithACLCheck(subReq)
} else {
subscription, subErr = publisher.Subscribe(subReq)
}
if subErr != nil {
handleJsonResultError(subErr, helper.Int64ToPtr(500), encoder)
return
}
defer subscription.Unsubscribe()
errCh := make(chan error)
jsonStream := stream.NewJsonStream(ctx, 30*time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// goroutine to detect remote side closing
go func() {
io.Copy(ioutil.Discard, conn)
cancel()
}()
jsonStream := stream.NewJsonStream(ctx, 30*time.Second)
errCh := make(chan error)
go func() {
defer cancel()
for {
@ -196,47 +185,3 @@ func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
Error: structs.NewRpcError(err, code),
})
}
func aclCheckForEvents(subReq *stream.SubscribeRequest, aclObj *acl.ACL) error {
if len(subReq.Topics) == 0 {
return fmt.Errorf("invalid topic request")
}
reqPolicies := make(map[string]struct{})
var required = struct{}{}
for topic := range subReq.Topics {
switch topic {
case structs.TopicDeployment, structs.TopicEval,
structs.TopicAlloc, structs.TopicJob:
if _, ok := reqPolicies[acl.NamespaceCapabilityReadJob]; !ok {
reqPolicies[acl.NamespaceCapabilityReadJob] = required
}
case structs.TopicNode:
reqPolicies["node-read"] = required
case structs.TopicAll:
reqPolicies["management"] = required
default:
return fmt.Errorf("unknown topic %s", topic)
}
}
for checks := range reqPolicies {
switch checks {
case acl.NamespaceCapabilityReadJob:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return structs.ErrPermissionDenied
}
case "node-read":
if ok := aclObj.AllowNodeRead(); !ok {
return structs.ErrPermissionDenied
}
case "management":
if ok := aclObj.IsManagement(); !ok {
return structs.ErrPermissionDenied
}
}
}
return nil
}

View File

@ -1,6 +1,7 @@
package nomad
import (
"context"
"encoding/json"
"fmt"
"io"
@ -10,6 +11,7 @@ import (
"time"
"github.com/hashicorp/go-msgpack/codec"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
@ -308,7 +310,7 @@ func TestEventStream_ACL(t *testing.T) {
require := require.New(t)
// start server
s, root, cleanupS := TestACLServer(t, nil)
s, _, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
@ -346,14 +348,6 @@ func TestEventStream_ACL(t *testing.T) {
},
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "root token",
Token: root.SecretID,
Topics: map[structs.Topic][]string{
"*": {"*"},
},
ExpectedErr: "subscription closed by server",
},
{
Name: "job namespace token - correct ns",
Token: tokenNsFoo.SecretID,
@ -508,3 +502,124 @@ func TestEventStream_ACL(t *testing.T) {
})
}
}
// TestEventStream_ACL_Update_Close_Stream asserts that an active subscription
// is closed after the token is no longer valid
func TestEventStream_ACL_Update_Close_Stream(t *testing.T) {
t.Parallel()
// start server
s1, root, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s1.RPC)
policyNsGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})
tokenNsFoo := mock.CreatePolicyAndToken(t, s1.State(), 1006, "valid", policyNsGood)
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"Job": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: tokenNsFoo.SecretID,
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
job := mock.Job()
jobEvent := structs.JobEvent{
Job: job,
}
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
// RPC to delete token
aclDelReq := &structs.ACLTokenDeleteRequest{
AccessorIDs: []string{tokenNsFoo.AccessorID},
WriteRequest: structs.WriteRequest{
Region: s1.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: root.SecretID,
},
}
var aclResp structs.GenericResponse
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
codec := rpcClient(t, s1)
errChStream := make(chan error, 1)
go func() {
for {
select {
case <-ctx.Done():
errChStream <- ctx.Err()
return
case err := <-errCh:
errChStream <- err
return
case msg := <-streamMsg:
if msg.Error == nil {
// received a valid event, make RPC to delete token
// continue trying for error
continue
}
errChStream <- msg.Error
return
}
}
}()
// Delete the token used to create the stream
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.DeleteTokens", aclDelReq, &aclResp))
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case err := <-errChStream:
// Success
require.Contains(t, err.Error(), stream.ErrSubscriptionClosed.Error())
break OUTER
}
}
}

View File

@ -246,15 +246,15 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.JobStabilityRequestType:
return n.applyJobStability(buf[1:], log.Index)
case structs.ACLPolicyUpsertRequestType:
return n.applyACLPolicyUpsert(buf[1:], log.Index)
return n.applyACLPolicyUpsert(msgType, buf[1:], log.Index)
case structs.ACLPolicyDeleteRequestType:
return n.applyACLPolicyDelete(buf[1:], log.Index)
return n.applyACLPolicyDelete(msgType, buf[1:], log.Index)
case structs.ACLTokenUpsertRequestType:
return n.applyACLTokenUpsert(buf[1:], log.Index)
return n.applyACLTokenUpsert(msgType, buf[1:], log.Index)
case structs.ACLTokenDeleteRequestType:
return n.applyACLTokenDelete(buf[1:], log.Index)
return n.applyACLTokenDelete(msgType, buf[1:], log.Index)
case structs.ACLTokenBootstrapRequestType:
return n.applyACLTokenBootstrap(buf[1:], log.Index)
return n.applyACLTokenBootstrap(msgType, buf[1:], log.Index)
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
@ -1071,14 +1071,14 @@ func (n *nomadFSM) applyJobStability(buf []byte, index uint64) interface{} {
}
// applyACLPolicyUpsert is used to upsert a set of policies
func (n *nomadFSM) applyACLPolicyUpsert(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyACLPolicyUpsert(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_upsert"}, time.Now())
var req structs.ACLPolicyUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertACLPolicies(index, req.Policies); err != nil {
if err := n.state.UpsertACLPolicies(msgType, index, req.Policies); err != nil {
n.logger.Error("UpsertACLPolicies failed", "error", err)
return err
}
@ -1086,14 +1086,14 @@ func (n *nomadFSM) applyACLPolicyUpsert(buf []byte, index uint64) interface{} {
}
// applyACLPolicyDelete is used to delete a set of policies
func (n *nomadFSM) applyACLPolicyDelete(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyACLPolicyDelete(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_delete"}, time.Now())
var req structs.ACLPolicyDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteACLPolicies(index, req.Names); err != nil {
if err := n.state.DeleteACLPolicies(msgType, index, req.Names); err != nil {
n.logger.Error("DeleteACLPolicies failed", "error", err)
return err
}
@ -1101,14 +1101,14 @@ func (n *nomadFSM) applyACLPolicyDelete(buf []byte, index uint64) interface{} {
}
// applyACLTokenUpsert is used to upsert a set of policies
func (n *nomadFSM) applyACLTokenUpsert(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyACLTokenUpsert(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_token_upsert"}, time.Now())
var req structs.ACLTokenUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertACLTokens(index, req.Tokens); err != nil {
if err := n.state.UpsertACLTokens(msgType, index, req.Tokens); err != nil {
n.logger.Error("UpsertACLTokens failed", "error", err)
return err
}
@ -1116,14 +1116,14 @@ func (n *nomadFSM) applyACLTokenUpsert(buf []byte, index uint64) interface{} {
}
// applyACLTokenDelete is used to delete a set of policies
func (n *nomadFSM) applyACLTokenDelete(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyACLTokenDelete(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_token_delete"}, time.Now())
var req structs.ACLTokenDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteACLTokens(index, req.AccessorIDs); err != nil {
if err := n.state.DeleteACLTokens(msgType, index, req.AccessorIDs); err != nil {
n.logger.Error("DeleteACLTokens failed", "error", err)
return err
}
@ -1131,14 +1131,14 @@ func (n *nomadFSM) applyACLTokenDelete(buf []byte, index uint64) interface{} {
}
// applyACLTokenBootstrap is used to bootstrap an ACL token
func (n *nomadFSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyACLTokenBootstrap(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_token_bootstrap"}, time.Now())
var req structs.ACLTokenBootstrapRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.BootstrapACLTokens(index, req.ResetIndex, req.Token); err != nil {
if err := n.state.BootstrapACLTokens(msgType, index, req.ResetIndex, req.Token); err != nil {
n.logger.Error("BootstrapACLToken failed", "error", err)
return err
}

View File

@ -2328,7 +2328,7 @@ func TestFSM_DeleteACLPolicies(t *testing.T) {
fsm := testFSM(t)
policy := mock.ACLPolicy()
err := fsm.State().UpsertACLPolicies(1000, []*structs.ACLPolicy{policy})
err := fsm.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{policy})
assert.Nil(t, err)
req := structs.ACLPolicyDeleteRequest{
@ -2426,7 +2426,7 @@ func TestFSM_DeleteACLTokens(t *testing.T) {
fsm := testFSM(t)
token := mock.ACLToken()
err := fsm.State().UpsertACLTokens(1000, []*structs.ACLToken{token})
err := fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{token})
assert.Nil(t, err)
req := structs.ACLTokenDeleteRequest{
@ -2810,7 +2810,7 @@ func TestFSM_SnapshotRestore_ACLPolicy(t *testing.T) {
state := fsm.State()
p1 := mock.ACLPolicy()
p2 := mock.ACLPolicy()
state.UpsertACLPolicies(1000, []*structs.ACLPolicy{p1, p2})
state.UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{p1, p2})
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
@ -2829,7 +2829,7 @@ func TestFSM_SnapshotRestore_ACLTokens(t *testing.T) {
state := fsm.State()
tk1 := mock.ACLToken()
tk2 := mock.ACLToken()
state.UpsertACLTokens(1000, []*structs.ACLToken{tk1, tk2})
state.UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{tk1, tk2})
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)

View File

@ -2167,10 +2167,10 @@ func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) {
// Upsert policy and token
token := mock.ACLToken()
token.Policies = []string{policy.Name}
err := s1.State().UpsertACLPolicies(100, []*structs.ACLPolicy{policy})
err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{policy})
assert.Nil(err)
err = s1.State().UpsertACLTokens(110, []*structs.ACLToken{token})
err = s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token})
assert.Nil(err)
// Upsert namespace

View File

@ -886,7 +886,7 @@ func TestLeader_ReplicateACLPolicies(t *testing.T) {
// Write a policy to the authoritative region
p1 := mock.ACLPolicy()
if err := s1.State().UpsertACLPolicies(100, []*structs.ACLPolicy{p1}); err != nil {
if err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{p1}); err != nil {
t.Fatalf("bad: %v", err)
}
@ -909,7 +909,7 @@ func TestLeader_DiffACLPolicies(t *testing.T) {
p1 := mock.ACLPolicy()
p2 := mock.ACLPolicy()
p3 := mock.ACLPolicy()
assert.Nil(t, state.UpsertACLPolicies(100, []*structs.ACLPolicy{p1, p2, p3}))
assert.Nil(t, state.UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{p1, p2, p3}))
// Simulate a remote list
p2Stub := p2.Stub()
@ -956,7 +956,7 @@ func TestLeader_ReplicateACLTokens(t *testing.T) {
// Write a token to the authoritative region
p1 := mock.ACLToken()
p1.Global = true
if err := s1.State().UpsertACLTokens(100, []*structs.ACLToken{p1}); err != nil {
if err := s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 100, []*structs.ACLToken{p1}); err != nil {
t.Fatalf("bad: %v", err)
}
@ -983,7 +983,7 @@ func TestLeader_DiffACLTokens(t *testing.T) {
p2.Global = true
p3 := mock.ACLToken()
p3.Global = true
assert.Nil(t, state.UpsertACLTokens(100, []*structs.ACLToken{p0, p1, p2, p3}))
assert.Nil(t, state.UpsertACLTokens(structs.MsgTypeTestSetup, 100, []*structs.ACLToken{p0, p1, p2, p3}))
// Simulate a remote list
p2Stub := p2.Stub()

View File

@ -14,8 +14,8 @@ import (
// StateStore defines the methods required from state.StateStore but avoids a
// circular dependency.
type StateStore interface {
UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error
UpsertACLTokens(index uint64, tokens []*structs.ACLToken) error
UpsertACLPolicies(msgType structs.MessageType, index uint64, policies []*structs.ACLPolicy) error
UpsertACLTokens(msgType structs.MessageType, index uint64, tokens []*structs.ACLToken) error
}
// NamespacePolicy is a helper for generating the policy hcl for a given
@ -88,7 +88,7 @@ func CreatePolicy(t testing.T, state StateStore, index uint64, name, rule string
Rules: rule,
}
policy.SetHash()
assert.Nil(t, state.UpsertACLPolicies(index, []*structs.ACLPolicy{policy}))
assert.Nil(t, state.UpsertACLPolicies(structs.MsgTypeTestSetup, index, []*structs.ACLPolicy{policy}))
}
// CreateToken creates a local, client token for the given policies
@ -99,7 +99,7 @@ func CreateToken(t testing.T, state StateStore, index uint64, policies []string)
token := ACLToken()
token.Policies = policies
token.SetHash()
assert.Nil(t, state.UpsertACLTokens(index, []*structs.ACLToken{token}))
assert.Nil(t, state.UpsertACLTokens(structs.MsgTypeTestSetup, index, []*structs.ACLToken{token}))
return token
}

View File

@ -24,6 +24,10 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.DeploymentPromoteRequestType: structs.TypeDeploymentPromotion,
structs.DeploymentAllocHealthRequestType: structs.TypeDeploymentAllocHealth,
structs.ApplyPlanResultsRequestType: structs.TypePlanResult,
structs.ACLTokenDeleteRequestType: structs.TypeACLTokenDeleted,
structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted,
structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted,
structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted,
}
func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
@ -47,6 +51,22 @@ func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
func eventFromChange(change memdb.Change) (structs.Event, bool) {
if change.Deleted() {
switch before := change.Before.(type) {
case *structs.ACLToken:
return structs.Event{
Topic: structs.TopicACLToken,
Key: before.AccessorID,
Payload: structs.ACLTokenEvent{
ACLToken: before,
},
}, true
case *structs.ACLPolicy:
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: before.Name,
Payload: structs.ACLPolicyEvent{
ACLPolicy: before,
},
}, true
case *structs.Node:
return structs.Event{
Topic: structs.TopicNode,
@ -61,6 +81,22 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
}
switch after := change.After.(type) {
case *structs.ACLToken:
return structs.Event{
Topic: structs.TopicACLToken,
Key: after.AccessorID,
Payload: &structs.ACLTokenEvent{
ACLToken: after,
},
}, true
case *structs.ACLPolicy:
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: after.Name,
Payload: &structs.ACLPolicyEvent{
ACLPolicy: after,
},
}, true
case *structs.Evaluation:
return structs.Event{
Topic: structs.TopicEval,
@ -74,7 +110,6 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Eval: after,
},
}, true
case *structs.Allocation:
alloc := after.Copy()
@ -95,7 +130,6 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Alloc: alloc,
},
}, true
case *structs.Job:
return structs.Event{
Topic: structs.TopicJob,
@ -105,7 +139,6 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Job: after,
},
}, true
case *structs.Node:
return structs.Event{
Topic: structs.TopicNode,
@ -114,7 +147,6 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Node: after,
},
}, true
case *structs.Deployment:
return structs.Event{
Topic: structs.TopicDeployment,

View File

@ -77,6 +77,15 @@ type StateStore struct {
stopEventBroker func()
}
type streamACLDelegate struct {
s *StateStore
}
func (a *streamACLDelegate) TokenProvider() stream.ACLTokenProvider {
resolver, _ := a.s.Snapshot()
return resolver
}
// NewStateStore is used to create a new state store
func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
// Create the MemDB
@ -96,10 +105,13 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
if config.EnablePublisher {
// Create new event publisher using provided config
broker := stream.NewEventBroker(ctx, stream.EventBrokerCfg{
broker, err := stream.NewEventBroker(ctx, &streamACLDelegate{s}, stream.EventBrokerCfg{
EventBufferSize: config.EventBufferSize,
Logger: config.Logger,
})
if err != nil {
return nil, fmt.Errorf("creating state store event broker %w", err)
}
s.db = NewChangeTrackerDB(db, broker, eventsFromChanges)
} else {
s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges)
@ -5015,7 +5027,7 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J
}
// UpsertACLPolicies is used to create or update a set of ACL policies
func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error {
func (s *StateStore) UpsertACLPolicies(msgType structs.MessageType, index uint64, policies []*structs.ACLPolicy) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()
@ -5056,8 +5068,8 @@ func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPoli
}
// DeleteACLPolicies deletes the policies with the given names
func (s *StateStore) DeleteACLPolicies(index uint64, names []string) error {
txn := s.db.WriteTxn(index)
func (s *StateStore) DeleteACLPolicies(msgType structs.MessageType, index uint64, names []string) error {
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
// Delete the policy
@ -5115,7 +5127,7 @@ func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error
}
// UpsertACLTokens is used to create or update a set of ACL tokens
func (s *StateStore) UpsertACLTokens(index uint64, tokens []*structs.ACLToken) error {
func (s *StateStore) UpsertACLTokens(msgType structs.MessageType, index uint64, tokens []*structs.ACLToken) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()
@ -5161,8 +5173,8 @@ func (s *StateStore) UpsertACLTokens(index uint64, tokens []*structs.ACLToken) e
}
// DeleteACLTokens deletes the tokens with the given accessor ids
func (s *StateStore) DeleteACLTokens(index uint64, ids []string) error {
txn := s.db.WriteTxn(index)
func (s *StateStore) DeleteACLTokens(msgType structs.MessageType, index uint64, ids []string) error {
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
// Delete the tokens
@ -5275,8 +5287,8 @@ func (s *StateStore) CanBootstrapACLToken() (bool, uint64, error) {
}
// BootstrapACLToken is used to create an initial ACL token
func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs.ACLToken) error {
txn := s.db.WriteTxn(index)
func (s *StateStore) BootstrapACLTokens(msgType structs.MessageType, index uint64, resetIndex uint64, token *structs.ACLToken) error {
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
// Check if we have already done a bootstrap

View File

@ -8006,8 +8006,7 @@ func TestStateStore_UpsertACLPolicy(t *testing.T) {
t.Fatalf("err: %v", err)
}
if err := state.UpsertACLPolicies(1000,
[]*structs.ACLPolicy{policy, policy2}); err != nil {
if err := state.UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{policy, policy2}); err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
@ -8062,8 +8061,7 @@ func TestStateStore_DeleteACLPolicy(t *testing.T) {
policy2 := mock.ACLPolicy()
// Create the policy
if err := state.UpsertACLPolicies(1000,
[]*structs.ACLPolicy{policy, policy2}); err != nil {
if err := state.UpsertACLPolicies(structs.MsgTypeTestSetup, 1000, []*structs.ACLPolicy{policy, policy2}); err != nil {
t.Fatalf("err: %v", err)
}
@ -8074,8 +8072,7 @@ func TestStateStore_DeleteACLPolicy(t *testing.T) {
}
// Delete the policy
if err := state.DeleteACLPolicies(1001,
[]string{policy.Name, policy2.Name}); err != nil {
if err := state.DeleteACLPolicies(structs.MsgTypeTestSetup, 1001, []string{policy.Name, policy2.Name}); err != nil {
t.Fatalf("err: %v", err)
}
@ -8140,7 +8137,7 @@ func TestStateStore_ACLPolicyByNamePrefix(t *testing.T) {
for _, name := range names {
p := mock.ACLPolicy()
p.Name = name
if err := state.UpsertACLPolicies(baseIndex, []*structs.ACLPolicy{p}); err != nil {
if err := state.UpsertACLPolicies(structs.MsgTypeTestSetup, baseIndex, []*structs.ACLPolicy{p}); err != nil {
t.Fatalf("err: %v", err)
}
baseIndex++
@ -8184,7 +8181,7 @@ func TestStateStore_BootstrapACLTokens(t *testing.T) {
assert.Equal(t, true, ok)
assert.EqualValues(t, 0, resetIdx)
if err := state.BootstrapACLTokens(1000, 0, tk1); err != nil {
if err := state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1000, 0, tk1); err != nil {
t.Fatalf("err: %v", err)
}
@ -8197,7 +8194,7 @@ func TestStateStore_BootstrapACLTokens(t *testing.T) {
assert.Equal(t, false, ok)
assert.EqualValues(t, 1000, resetIdx)
if err := state.BootstrapACLTokens(1001, 0, tk2); err == nil {
if err := state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1001, 0, tk2); err == nil {
t.Fatalf("expected error")
}
@ -8235,7 +8232,7 @@ func TestStateStore_BootstrapACLTokens(t *testing.T) {
}
// Should allow bootstrap with reset index
if err := state.BootstrapACLTokens(1001, 1000, tk2); err != nil {
if err := state.BootstrapACLTokens(structs.MsgTypeTestSetup, 1001, 1000, tk2); err != nil {
t.Fatalf("err %v", err)
}
@ -8271,8 +8268,7 @@ func TestStateStore_UpsertACLTokens(t *testing.T) {
t.Fatalf("err: %v", err)
}
if err := state.UpsertACLTokens(1000,
[]*structs.ACLToken{tk1, tk2}); err != nil {
if err := state.UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{tk1, tk2}); err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
@ -8335,8 +8331,7 @@ func TestStateStore_DeleteACLTokens(t *testing.T) {
tk2 := mock.ACLToken()
// Create the tokens
if err := state.UpsertACLTokens(1000,
[]*structs.ACLToken{tk1, tk2}); err != nil {
if err := state.UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{tk1, tk2}); err != nil {
t.Fatalf("err: %v", err)
}
@ -8347,8 +8342,7 @@ func TestStateStore_DeleteACLTokens(t *testing.T) {
}
// Delete the token
if err := state.DeleteACLTokens(1001,
[]string{tk1.AccessorID, tk2.AccessorID}); err != nil {
if err := state.DeleteACLTokens(structs.MsgTypeTestSetup, 1001, []string{tk1.AccessorID, tk2.AccessorID}); err != nil {
t.Fatalf("err: %v", err)
}
@ -8413,7 +8407,7 @@ func TestStateStore_ACLTokenByAccessorIDPrefix(t *testing.T) {
for _, prefix := range prefixes {
tk := mock.ACLToken()
tk.AccessorID = prefix + tk.AccessorID[4:]
if err := state.UpsertACLTokens(baseIndex, []*structs.ACLToken{tk}); err != nil {
if err := state.UpsertACLTokens(structs.MsgTypeTestSetup, baseIndex, []*structs.ACLToken{tk}); err != nil {
t.Fatalf("err: %v", err)
}
baseIndex++
@ -8480,8 +8474,7 @@ func TestStateStore_ACLTokensByGlobal(t *testing.T) {
tk4 := mock.ACLToken()
tk3.Global = true
if err := state.UpsertACLTokens(1000,
[]*structs.ACLToken{tk1, tk2, tk3, tk4}); err != nil {
if err := state.UpsertACLTokens(structs.MsgTypeTestSetup, 1000, []*structs.ACLToken{tk1, tk2, tk3, tk4}); err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -2,18 +2,24 @@ package stream
import (
"context"
"errors"
"fmt"
"sync"
"time"
"sync/atomic"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/go-hclog"
)
const (
DefaultTTL = 1 * time.Hour
ACLCheckNodeRead = "node-read"
ACLCheckManagement = "management"
aclCacheSize = 32
)
type EventBrokerCfg struct {
@ -35,6 +41,11 @@ type EventBroker struct {
// the Commit call in the FSM hot path.
publishCh chan *structs.Events
aclDelegate ACLDelegate
aclCache *lru.TwoQueueCache
aclCh chan *structs.Event
logger hclog.Logger
}
@ -42,7 +53,7 @@ type EventBroker struct {
// A goroutine is run in the background to publish events to an event buffer.
// Cancelling the context will shutdown the goroutine to free resources, and stop
// all publishing.
func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) *EventBroker {
func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBrokerCfg) (*EventBroker, error) {
if cfg.Logger == nil {
cfg.Logger = hclog.NewNullLogger()
}
@ -52,19 +63,28 @@ func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) *EventBroker {
cfg.EventBufferSize = 100
}
aclCache, err := lru.New2Q(aclCacheSize)
if err != nil {
return nil, err
}
buffer := newEventBuffer(cfg.EventBufferSize)
e := &EventBroker{
logger: cfg.Logger.Named("event_broker"),
eventBuf: buffer,
publishCh: make(chan *structs.Events, 64),
logger: cfg.Logger.Named("event_broker"),
eventBuf: buffer,
publishCh: make(chan *structs.Events, 64),
aclCh: make(chan *structs.Event, 10),
aclDelegate: aclDelegate,
aclCache: aclCache,
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
}
go e.handleUpdates(ctx)
go e.handleACLUpdates(ctx)
return e
return e, nil
}
// Returns the current length of the event buffer
@ -74,9 +94,34 @@ func (e *EventBroker) Len() int {
// Publish events to all subscribers of the event Topic.
func (e *EventBroker) Publish(events *structs.Events) {
if len(events.Events) > 0 {
e.publishCh <- events
if len(events.Events) == 0 {
return
}
// Notify the broker to check running subscriptions against potentially
// updated ACL Token or Policy
for _, event := range events.Events {
if event.Topic == structs.TopicACLToken || event.Topic == structs.TopicACLPolicy {
e.aclCh <- &event
}
}
e.publishCh <- events
}
// SubscribeWithACLCheck validates the SubscribeRequest's token and requested Topics
// to ensure that the tokens privileges are sufficient enough.
func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, error) {
aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token)
if err != nil {
return nil, structs.ErrPermissionDenied
}
if allowed := aclAllowsSubscription(aclObj, req); !allowed {
return nil, structs.ErrPermissionDenied
}
return e.Subscribe(req)
}
// Subscribe returns a new Subscription for a given request. A Subscription
@ -90,7 +135,7 @@ func (e *EventBroker) Publish(events *structs.Events) {
// will be returned.
//
// When a caller is finished with the subscription it must call Subscription.Unsubscribe
// to free ACL tracking resources. TODO(drew) ACL tracking
// to free ACL tracking resources.
func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error) {
e.mu.Lock()
defer e.mu.Unlock()
@ -137,6 +182,143 @@ func (e *EventBroker) handleUpdates(ctx context.Context) {
}
}
func (e *EventBroker) handleACLUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case update := <-e.aclCh:
switch payload := update.Payload.(type) {
case structs.ACLTokenEvent:
tokenSecretID := payload.ACLToken.SecretID
// Token was deleted
if update.Type == structs.TypeACLTokenDeleted {
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
// If broker cannot fetch state there is nothing more to do
if e.aclDelegate == nil {
continue
}
aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID)
if err != nil || aclObj == nil {
e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
return !aclAllowsSubscription(aclObj, sub.req)
})
case structs.ACLPolicyEvent:
// Re-evaluate each subscriptions permissions since a policy
// change may or may not affect the subscription
e.checkSubscriptionsAgainstPolicyChange()
}
}
}
}
// checkSubscriptionsAgainstPolicyChange iterates over the brokers
// subscriptions and evaluates whether the token used for the subscription is
// still valid. If it is not valid it closes the subscriptions belonging to the
// token.
//
// A lock must be held to iterate over the map of subscriptions.
func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() {
e.mu.Lock()
defer e.mu.Unlock()
// If broker cannot fetch state there is nothing more to do
if e.aclDelegate == nil {
return
}
aclSnapshot := e.aclDelegate.TokenProvider()
for tokenSecretID := range e.subscriptions.byToken {
aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID)
if err != nil || aclObj == nil {
e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
return !aclAllowsSubscription(aclObj, sub.req)
})
}
}
func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) {
aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID)
if err != nil {
return nil, err
}
if aclToken == nil {
return nil, errors.New("no token for secret ID")
}
// Check if this is a management token
if aclToken.Type == structs.ACLManagementToken {
return acl.ManagementACL, nil
}
aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies))
for _, policyName := range aclToken.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyName)
if err != nil || policy == nil {
return nil, errors.New("error finding acl policy")
}
aclPolicies = append(aclPolicies, policy)
}
return structs.CompileACLObject(aclCache, aclPolicies)
}
type ACLTokenProvider interface {
ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error)
ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error)
}
type ACLDelegate interface {
TokenProvider() ACLTokenProvider
}
func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
for topic := range subReq.Topics {
switch topic {
case structs.TopicDeployment,
structs.TopicEval,
structs.TopicAlloc,
structs.TopicJob:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return false
}
case structs.TopicNode:
if ok := aclObj.AllowNodeRead(); !ok {
return false
}
default:
if ok := aclObj.IsManagement(); !ok {
return false
}
}
}
return true
}
func (s *Subscription) forceClose() {
if atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed) {
close(s.forceClosed)
}
}
type subscriptions struct {
// mu for byToken. If both subscription.mu and EventBroker.mu need
// to be held, EventBroker mutex MUST always be acquired first.
@ -175,6 +357,17 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
}
}
func (s *subscriptions) closeSubscriptionFunc(tokenSecretID string, fn func(*Subscription) bool) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, sub := range s.byToken[tokenSecretID] {
if fn(sub) {
sub.forceClose()
}
}
}
// unsubscribeFn returns a function that the subscription will call to remove
// itself from the subsByToken.
// This function is returned as a closure so that the caller doesn't need to keep

View File

@ -6,6 +6,9 @@ import (
"testing"
"time"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
@ -20,7 +23,9 @@ func TestEventBroker_PublishChangesAndSubscribe(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventBroker(ctx, EventBrokerCfg{EventBufferSize: 100})
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{EventBufferSize: 100})
require.NoError(t, err)
sub, err := publisher.Subscribe(subscription)
require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub)
@ -64,7 +69,8 @@ func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
publisher := NewEventBroker(ctx, EventBrokerCfg{})
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
require.NoError(t, err)
sub1, err := publisher.Subscribe(&SubscribeRequest{})
require.NoError(t, err)
@ -92,7 +98,8 @@ func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
publisher := NewEventBroker(ctx, EventBrokerCfg{})
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
require.NoError(t, err)
// first subscription, empty token
sub1, err := publisher.Subscribe(&SubscribeRequest{})
@ -109,6 +116,469 @@ func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) {
require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state))
}
func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
require.NoError(t, err)
sub1, err := publisher.Subscribe(&SubscribeRequest{
Topics: map[structs.Topic][]string{
"*": {"*"},
},
Token: "foo",
})
require.NoError(t, err)
defer sub1.Unsubscribe()
aclEvent := structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenDeleted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: "foo",
},
},
}
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}})
for {
_, err := sub1.Next(ctx)
if err == ErrSubscriptionClosed {
break
}
}
out, err := sub1.Next(ctx)
require.Error(t, err)
require.Equal(t, ErrSubscriptionClosed, err)
require.Equal(t, structs.Events{}, out)
}
type fakeACLDelegate struct {
tokenProvider ACLTokenProvider
}
func (d *fakeACLDelegate) TokenProvider() ACLTokenProvider {
return d.tokenProvider
}
type fakeACLTokenProvider struct {
policy *structs.ACLPolicy
policyErr error
token *structs.ACLToken
tokenErr error
}
func (p *fakeACLTokenProvider) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) {
return p.token, p.tokenErr
}
func (p *fakeACLTokenProvider) ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error) {
return p.policy, p.policyErr
}
func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
secretID := "some-secret-id"
cases := []struct {
policyBeforeRules string
policyAfterRules string
topics map[structs.Topic][]string
desc string
event structs.Event
policyEvent structs.Event
shouldUnsubscribe bool
initialSubErr bool
}{
{
desc: "subscribed to deployments and removed access",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicDeployment,
Type: structs.TypeDeploymentUpdate,
Payload: structs.DeploymentEvent{
Deployment: &structs.Deployment{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
},
},
{
desc: "subscribed to evals and removed access",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicEval,
Type: structs.TypeEvalUpdated,
Payload: structs.EvalEvent{
Eval: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
},
},
{
desc: "subscribed to allocs and removed access",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicAlloc,
Type: structs.TypeAllocUpdated,
Payload: structs.AllocEvent{
Alloc: &structs.Allocation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
},
},
{
desc: "subscribed to nodes and removed access",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyDeny),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
},
},
{
desc: "subscribed to deployments and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicDeployment,
Type: structs.TypeDeploymentUpdate,
Payload: structs.DeploymentEvent{
Deployment: &structs.Deployment{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
},
},
{
desc: "subscribed to evals and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicEval,
Type: structs.TypeEvalUpdated,
Payload: structs.EvalEvent{
Eval: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
},
},
{
desc: "subscribed to allocs and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicAlloc,
Type: structs.TypeAllocUpdated,
Payload: structs.AllocEvent{
Alloc: &structs.Allocation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
},
},
{
desc: "subscribed to nodes and no access change",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyRead),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
},
},
{
desc: "initial token insufficient privileges",
initialSubErr: true,
policyBeforeRules: mock.NodePolicy(acl.PolicyDeny),
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
},
},
{
desc: "subscribed to nodes and policy change no change",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyWrite),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyUpserted,
Payload: structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
},
},
},
{
desc: "subscribed to nodes and policy change no access",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyDeny),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyUpserted,
Payload: structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
},
},
},
{
desc: "subscribed to nodes policy deleted",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: "",
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyDeleted,
Payload: structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
policy := &structs.ACLPolicy{
Name: "some-policy",
Rules: tc.policyBeforeRules,
}
policy.SetHash()
tokenProvider := &fakeACLTokenProvider{
policy: policy,
token: &structs.ACLToken{
SecretID: secretID,
Policies: []string{policy.Name},
},
}
aclDelegate := &fakeACLDelegate{
tokenProvider: tokenProvider,
}
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
require.NoError(t, err)
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{
tc.event.Topic: {"*"},
},
Namespace: structs.DefaultNamespace,
Token: secretID,
})
if tc.initialSubErr {
require.Error(t, err)
require.Nil(t, sub)
return
} else {
require.NoError(t, err)
}
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}})
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
_, err = sub.Next(ctx)
require.NoError(t, err)
// Update the mock provider to use the after rules
policyAfter := &structs.ACLPolicy{
Name: "some-new-policy",
Rules: tc.policyAfterRules,
ModifyIndex: 101, // The ModifyIndex is used to caclulate the acl cache key
}
policyAfter.SetHash()
tokenProvider.policy = policyAfter
// Publish ACL event triggering subscription re-evaluation
publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}})
// Publish another event
publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}})
// If we are expecting to unsubscribe consume the subscription
// until the expected error occurs.
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
if tc.shouldUnsubscribe {
for {
_, err = sub.Next(ctx)
if err != nil {
if err == context.DeadlineExceeded {
require.Fail(t, err.Error())
}
if err == ErrSubscriptionClosed {
break
}
}
}
} else {
_, err = sub.Next(ctx)
require.NoError(t, err)
}
publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}})
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
_, err = sub.Next(ctx)
if tc.shouldUnsubscribe {
require.Equal(t, ErrSubscriptionClosed, err)
} else {
require.NoError(t, err)
}
})
}
}
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
go func() {

View File

@ -22,6 +22,7 @@ const (
// ErrSubscriptionClosed is a error signalling the subscription has been
// closed. The client should Unsubscribe, then re-Subscribe.
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
var ErrACLInvalid = errors.New("Provided ACL token is invalid for requested topics")
type Subscription struct {
// state must be accessed atomically 0 means open, 1 means closed with reload
@ -110,13 +111,6 @@ func (s *Subscription) NextNoBlock() ([]structs.Event, error) {
}
}
func (s *Subscription) forceClose() {
swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed)
if swapped {
close(s.forceClosed)
}
}
func (s *Subscription) Unsubscribe() {
s.unsub()
}

View File

@ -21,6 +21,8 @@ const (
TopicAlloc Topic = "Alloc"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicACLPolicy Topic = "ACLToken"
TopicACLToken Topic = "ACLPolicy"
TopicAll Topic = "*"
TypeNodeRegistration = "NodeRegistration"
@ -39,6 +41,10 @@ const (
TypeJobDeregistered = "JobDeregistered"
TypeJobBatchDeregistered = "JobBatchDeregistered"
TypePlanResult = "PlanResult"
TypeACLTokenDeleted = "ACLTokenDeleted"
TypeACLTokenUpserted = "ACLTokenUpserted"
TypeACLPolicyDeleted = "ACLPolicyDeleted"
TypeACLPolicyUpserted = "ACLPolicyUpserted"
)
// Event represents a change in Nomads state.
@ -111,3 +117,11 @@ type DeploymentEvent struct {
type NodeStreamEvent struct {
Node *Node
}
type ACLTokenEvent struct {
ACLToken *ACLToken
}
type ACLPolicyEvent struct {
ACLPolicy *ACLPolicy
}

View File

@ -34,7 +34,7 @@ func TestACLServer(t testing.T, cb func(*Config)) (*Server, *structs.ACLToken, f
}
})
token := mock.ACLManagementToken()
err := server.State().BootstrapACLTokens(1, 0, token)
err := server.State().BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, token)
if err != nil {
t.Fatalf("failed to bootstrap ACL token: %v", err)
}