open-nomad/nomad/variables_endpoint_test.go
Tim Gross 9d906d4632 variables: fix filter on List RPC
The List RPC correctly authorized against the prefix argument. But when
filtering results underneath the prefix, it only checked authorization for
standard ACL tokens and not Workload Identity. This results in WI tokens being
able to read List results (metadata only: variable paths and timestamps) for
variables under the `nomad/` prefix that belong to other jobs in the same
namespace.

Fixes the filtering and splits the `handleMixedAuthEndpoint` function into
separate authentication and authorization steps so that we don't need to
re-verify the claim token on each filtered object.

Also includes:
* update semgrep rule for mixed auth endpoints
* variables: List returns empty set when all results are filtered
2022-10-27 13:08:05 -04:00

873 lines
24 KiB
Go

package nomad
import (
"encoding/json"
"fmt"
"math/rand"
"strings"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
// TestVariablesEndpoint_auth exercises handleMixedAuthEndpoint with workload
// identity (WI) tokens and a standard ACL token, asserting which
// capability/path combinations succeed and which return ErrPermissionDenied.
func TestVariablesEndpoint_auth(t *testing.T) {
	ci.Parallel(t)

	srv, _, shutdown := TestACLServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	const ns = "nondefault-namespace"

	// alloc1 starts in a terminal client status; it is flipped to running
	// further down, once the terminal-alloc case has been checked.
	alloc1 := mock.Alloc()
	alloc1.ClientStatus = structs.AllocClientStatusFailed
	alloc1.Job.Namespace = ns
	alloc1.Namespace = ns
	jobID := alloc1.JobID

	// create an alloc that will have no access to variables we create
	alloc2 := mock.Alloc()
	alloc2.Job.TaskGroups[0].Name = "other-no-permissions"
	alloc2.TaskGroup = "other-no-permissions"
	alloc2.ClientStatus = structs.AllocClientStatusRunning
	alloc2.Job.Namespace = ns
	alloc2.Namespace = ns

	// alloc3's job names alloc1's job as its parent; its token backs the
	// dispatch-job cases below.
	alloc3 := mock.Alloc()
	alloc3.ClientStatus = structs.AllocClientStatusRunning
	alloc3.Job.Namespace = ns
	alloc3.Namespace = ns
	alloc3.Job.ParentID = jobID

	// alloc4 backs the wiOnlyToken used by the "WI token ..." cases below.
	alloc4 := mock.Alloc()
	alloc4.ClientStatus = structs.AllocClientStatusRunning
	alloc4.Job.Namespace = ns
	alloc4.Namespace = ns

	store := srv.fsm.State()
	must.NoError(t, store.UpsertNamespaces(1000, []*structs.Namespace{{Name: ns}}))
	must.NoError(t, store.UpsertAllocs(
		structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc1, alloc2, alloc3, alloc4}))

	claims1 := alloc1.ToTaskIdentityClaims(nil, "web")
	idToken, err := srv.encrypter.SignClaims(claims1)
	must.NoError(t, err)

	claims2 := alloc2.ToTaskIdentityClaims(nil, "web")
	noPermissionsToken, err := srv.encrypter.SignClaims(claims2)
	must.NoError(t, err)

	claims3 := alloc3.ToTaskIdentityClaims(alloc3.Job, "web")
	idDispatchToken, err := srv.encrypter.SignClaims(claims3)
	must.NoError(t, err)

	// corrupt the signature of the token
	idTokenParts := strings.Split(idToken, ".")
	must.Len(t, 3, idTokenParts)
	sig := strings.Split(idTokenParts[2], "")
	rand.Shuffle(len(sig), func(i, j int) {
		sig[i], sig[j] = sig[j], sig[i]
	})
	idTokenParts[2] = strings.Join(sig, "")
	invalidIDToken := strings.Join(idTokenParts, ".")

	claims4 := alloc4.ToTaskIdentityClaims(alloc4.Job, "web")
	wiOnlyToken, err := srv.encrypter.SignClaims(claims4)
	must.NoError(t, err)

	// The policy is attached to alloc1's job and task group through JobACL,
	// and is also referenced by name from the ACL token created below.
	policy := mock.ACLPolicy()
	policy.Rules = `namespace "nondefault-namespace" {
variables {
path "nomad/jobs/*" { capabilities = ["list"] }
path "other/path" { capabilities = ["read"] }
}}`
	policy.JobACL = &structs.JobACL{
		Namespace: ns,
		JobID:     jobID,
		Group:     alloc1.TaskGroup,
	}
	policy.SetHash()
	err = store.UpsertACLPolicies(structs.MsgTypeTestSetup, 1100, []*structs.ACLPolicy{policy})
	must.NoError(t, err)

	aclToken := mock.ACLToken()
	aclToken.Policies = []string{policy.Name}
	err = store.UpsertACLTokens(structs.MsgTypeTestSetup, 1150, []*structs.ACLToken{aclToken})
	must.NoError(t, err)

	t.Run("terminal alloc should be denied", func(t *testing.T) {
		_, _, err := srv.staticEndpoints.Variables.handleMixedAuthEndpoint(
			structs.QueryOptions{AuthToken: idToken, Namespace: ns}, acl.PolicyList,
			fmt.Sprintf("nomad/jobs/%s/web/web", jobID))
		must.EqError(t, err, structs.ErrPermissionDenied.Error())
	})

	// make alloc non-terminal
	alloc1.ClientStatus = structs.AllocClientStatusRunning
	must.NoError(t, store.UpsertAllocs(
		structs.MsgTypeTestSetup, 1200, []*structs.Allocation{alloc1}))

	t.Run("wrong namespace should be denied", func(t *testing.T) {
		_, _, err := srv.staticEndpoints.Variables.handleMixedAuthEndpoint(
			structs.QueryOptions{AuthToken: idToken, Namespace: structs.DefaultNamespace}, acl.PolicyList,
			fmt.Sprintf("nomad/jobs/%s/web/web", jobID))
		must.EqError(t, err, structs.ErrPermissionDenied.Error())
	})

	// Every remaining case is run against the test namespace; a nil
	// expectedErr means the call must succeed.
	testCases := []struct {
		name        string
		token       string
		cap         string
		path        string
		expectedErr error
	}{
		{
			name:        "valid claim for path with task secret",
			token:       idToken,
			cap:         acl.PolicyRead,
			path:        fmt.Sprintf("nomad/jobs/%s/web/web", jobID),
			expectedErr: nil,
		},
		{
			name:        "valid claim for path with group secret",
			token:       idToken,
			cap:         acl.PolicyRead,
			path:        fmt.Sprintf("nomad/jobs/%s/web", jobID),
			expectedErr: nil,
		},
		{
			name:        "valid claim for path with job secret",
			token:       idToken,
			cap:         acl.PolicyRead,
			path:        fmt.Sprintf("nomad/jobs/%s", jobID),
			expectedErr: nil,
		},
		{
			name:        "valid claim for path with dispatch job secret",
			token:       idDispatchToken,
			cap:         acl.PolicyRead,
			path:        fmt.Sprintf("nomad/jobs/%s", jobID),
			expectedErr: nil,
		},
		{
			name:        "valid claim for path with namespace secret",
			token:       idToken,
			cap:         acl.PolicyRead,
			path:        "nomad/jobs",
			expectedErr: nil,
		},
		{
			name:        "valid claim for job-attached policy",
			token:       idToken,
			cap:         acl.PolicyRead,
			path:        "other/path",
			expectedErr: nil,
		},
		{
			name:        "valid claim for job-attached policy path denied",
			token:       idToken,
			cap:         acl.PolicyRead,
			path:        "other/not-allowed",
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "valid claim for job-attached policy capability denied",
			token:       idToken,
			cap:         acl.PolicyWrite,
			path:        "other/path",
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "valid claim for job-attached policy capability with cross-job access",
			token:       idToken,
			cap:         acl.PolicyList,
			path:        "nomad/jobs/some-other",
			expectedErr: nil,
		},
		{
			name:        "valid claim with no permissions denied by path",
			token:       noPermissionsToken,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/w", jobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "valid claim with no permissions allowed by namespace",
			token:       noPermissionsToken,
			cap:         acl.PolicyList,
			path:        "nomad/jobs",
			expectedErr: nil,
		},
		{
			name:        "valid claim with no permissions denied by capability",
			token:       noPermissionsToken,
			cap:         acl.PolicyRead,
			path:        fmt.Sprintf("nomad/jobs/%s/w", jobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "missing auth token is denied",
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/web/web", jobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "invalid signature is denied",
			token:       invalidIDToken,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/web/web", jobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "invalid claim for dispatched ID",
			token:       idDispatchToken,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s", alloc3.JobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "acl token read policy is allowed to list",
			token:       aclToken.SecretID,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/web/web", jobID),
			expectedErr: nil,
		},
		{
			name:        "acl token read policy is not allowed to write",
			token:       aclToken.SecretID,
			cap:         acl.PolicyWrite,
			path:        fmt.Sprintf("nomad/jobs/%s/web/web", jobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "WI token can read own task",
			token:       wiOnlyToken,
			cap:         acl.PolicyRead,
			path:        fmt.Sprintf("nomad/jobs/%s/web/web", alloc4.JobID),
			expectedErr: nil,
		},
		{
			name:        "WI token can list own task",
			token:       wiOnlyToken,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/web/web", alloc4.JobID),
			expectedErr: nil,
		},
		{
			name:        "WI token can read own group",
			token:       wiOnlyToken,
			cap:         acl.PolicyRead,
			path:        fmt.Sprintf("nomad/jobs/%s/web", alloc4.JobID),
			expectedErr: nil,
		},
		{
			name:        "WI token can list own group",
			token:       wiOnlyToken,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/web", alloc4.JobID),
			expectedErr: nil,
		},
		{
			name:        "WI token cannot read another task in group",
			token:       wiOnlyToken,
			cap:         acl.PolicyRead,
			path:        fmt.Sprintf("nomad/jobs/%s/web/other", alloc4.JobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "WI token cannot list another task in group",
			token:       wiOnlyToken,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/web/other", alloc4.JobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			// Uses the list capability so that the case matches its name and
			// is distinct from the read case that follows.
			name:        "WI token cannot list a task in another group",
			token:       wiOnlyToken,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/other/web", alloc4.JobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "WI token cannot read a task in another group",
			token:       wiOnlyToken,
			cap:         acl.PolicyRead,
			path:        fmt.Sprintf("nomad/jobs/%s/other/web", alloc4.JobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "WI token cannot read a group in another job",
			token:       wiOnlyToken,
			cap:         acl.PolicyRead,
			path:        "nomad/jobs/other/web/web",
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "WI token cannot list a group in another job",
			token:       wiOnlyToken,
			cap:         acl.PolicyList,
			path:        "nomad/jobs/other/web/web",
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "WI token extra trailing slash is denied",
			token:       wiOnlyToken,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/web/", alloc4.JobID),
			expectedErr: structs.ErrPermissionDenied,
		},
		{
			name:        "WI token invalid prefix is denied",
			token:       wiOnlyToken,
			cap:         acl.PolicyList,
			path:        fmt.Sprintf("nomad/jobs/%s/w", alloc4.JobID),
			expectedErr: structs.ErrPermissionDenied,
		},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			_, _, err := srv.staticEndpoints.Variables.handleMixedAuthEndpoint(
				structs.QueryOptions{AuthToken: tc.token, Namespace: ns}, tc.cap, tc.path)
			if tc.expectedErr == nil {
				must.NoError(t, err)
			} else {
				must.EqError(t, err, tc.expectedErr.Error())
			}
		})
	}
}
// TestVariablesEndpoint_Apply_ACL drives the Variables apply RPC without a
// token, with the management token, and with a token whose policy grants only
// "write" on dropbox/*, checking the result codes and returned payloads.
//
// The subtests after the "no token" loop are order-dependent: each one that
// succeeds stores its output in svHold for the following subtest to build on.
func TestVariablesEndpoint_Apply_ACL(t *testing.T) {
	ci.Parallel(t)

	srv, rootToken, shutdown := TestACLServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer shutdown()
	testutil.WaitForLeader(t, srv.RPC)
	codec := rpcClient(t, srv)
	state := srv.fsm.State()

	pol := mock.NamespacePolicyWithVariables(
		structs.DefaultNamespace, "", []string{"list-jobs"},
		map[string][]string{
			"dropbox/*": {"write"},
		})
	writeToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", pol)

	sv1 := mock.Variable()
	sv1.ModifyIndex = 0

	// svHold carries the most recent successful output between subtests.
	var svHold *structs.VariableDecrypted

	opMap := map[string]structs.VarOp{
		"set":        structs.VarOpSet,
		"cas":        structs.VarOpCAS,
		"delete":     structs.VarOpDelete,
		"delete-cas": structs.VarOpDeleteCAS,
	}

	// Every operation must be rejected when the request carries no token.
	for name, op := range opMap {
		t.Run(name+"/no token", func(t *testing.T) {
			applyReq := structs.VariablesApplyRequest{
				Op:           op,
				Var:          sv1,
				WriteRequest: structs.WriteRequest{Region: "global"},
			}
			applyResp := new(structs.VariablesApplyResponse)
			err := msgpackrpc.CallWithCodec(codec, structs.VariablesApplyRPCMethod, &applyReq, applyResp)
			must.EqError(t, err, structs.ErrPermissionDenied.Error())
		})
	}

	t.Run("cas/management token/new", func(t *testing.T) {
		applyReq := structs.VariablesApplyRequest{
			Op:  structs.VarOpCAS,
			Var: sv1,
			WriteRequest: structs.WriteRequest{
				Region:    "global",
				AuthToken: rootToken.SecretID,
			},
		}
		applyResp := new(structs.VariablesApplyResponse)
		err := msgpackrpc.CallWithCodec(codec, structs.VariablesApplyRPCMethod, &applyReq, applyResp)
		must.NoError(t, err)
		must.Eq(t, structs.VarOpResultOk, applyResp.Result)
		must.Eq(t, sv1.Items, applyResp.Output.Items)
		svHold = applyResp.Output
	})

	t.Run("cas with current", func(t *testing.T) {
		must.NotNil(t, svHold)
		sv := svHold
		sv.Items["new"] = "newVal"
		applyReq := structs.VariablesApplyRequest{
			Op:  structs.VarOpCAS,
			Var: sv,
			WriteRequest: structs.WriteRequest{
				Region:    "global",
				AuthToken: rootToken.SecretID,
			},
		}
		applyResp := new(structs.VariablesApplyResponse)
		err := msgpackrpc.CallWithCodec(codec, structs.VariablesApplyRPCMethod, &applyReq, applyResp)
		must.NoError(t, err)
		must.Eq(t, structs.VarOpResultOk, applyResp.Result)
		must.Eq(t, sv.Items, applyResp.Output.Items)
		svHold = applyResp.Output
	})

	t.Run("cas with stale", func(t *testing.T) {
		must.NotNil(t, sv1) // TODO: query these directly
		must.NotNil(t, svHold)
		// sv1 still has ModifyIndex 0, so the check-and-set is expected to
		// conflict with the variable held in svHold.
		applyReq := structs.VariablesApplyRequest{
			Op:  structs.VarOpCAS,
			Var: sv1,
			WriteRequest: structs.WriteRequest{
				Region:    "global",
				AuthToken: rootToken.SecretID,
			},
		}
		applyResp := new(structs.VariablesApplyResponse)
		err := msgpackrpc.CallWithCodec(codec, structs.VariablesApplyRPCMethod, &applyReq, applyResp)
		must.NoError(t, err)
		must.Eq(t, structs.VarOpResultConflict, applyResp.Result)
		must.Eq(t, svHold.VariableMetadata, applyResp.Conflict.VariableMetadata)
		must.Eq(t, svHold.Items, applyResp.Conflict.Items)
	})

	sv3 := mock.Variable()
	sv3.Path = "dropbox/a"
	sv3.ModifyIndex = 0

	t.Run("cas/write-only/read own new", func(t *testing.T) {
		applyReq := structs.VariablesApplyRequest{
			Op:  structs.VarOpCAS,
			Var: sv3,
			WriteRequest: structs.WriteRequest{
				Region:    "global",
				AuthToken: writeToken.SecretID,
			},
		}
		applyResp := new(structs.VariablesApplyResponse)
		err := msgpackrpc.CallWithCodec(codec, structs.VariablesApplyRPCMethod, &applyReq, applyResp)
		must.NoError(t, err)
		must.Eq(t, structs.VarOpResultOk, applyResp.Result)
		must.Eq(t, sv3.Items, applyResp.Output.Items)
		svHold = applyResp.Output
	})

	t.Run("cas/write only/conflict redacted", func(t *testing.T) {
		must.NotNil(t, sv3)
		must.NotNil(t, svHold)
		applyReq := structs.VariablesApplyRequest{
			Op:  structs.VarOpCAS,
			Var: sv3,
			WriteRequest: structs.WriteRequest{
				Region:    "global",
				AuthToken: writeToken.SecretID,
			},
		}
		applyResp := new(structs.VariablesApplyResponse)
		err := msgpackrpc.CallWithCodec(codec, structs.VariablesApplyRPCMethod, &applyReq, applyResp)
		must.NoError(t, err)
		// The conflict carries metadata only; the items must be withheld.
		must.Eq(t, structs.VarOpResultRedacted, applyResp.Result)
		must.Eq(t, svHold.VariableMetadata, applyResp.Conflict.VariableMetadata)
		must.Nil(t, applyResp.Conflict.Items)
	})

	t.Run("cas/write only/read own upsert", func(t *testing.T) {
		must.NotNil(t, svHold)
		sv := svHold
		sv.Items["upsert"] = "read"
		applyReq := structs.VariablesApplyRequest{
			Op:  structs.VarOpCAS,
			Var: sv,
			WriteRequest: structs.WriteRequest{
				Region:    "global",
				AuthToken: writeToken.SecretID,
			},
		}
		applyResp := new(structs.VariablesApplyResponse)
		err := msgpackrpc.CallWithCodec(codec, structs.VariablesApplyRPCMethod, &applyReq, applyResp)
		must.NoError(t, err)
		must.Eq(t, structs.VarOpResultOk, applyResp.Result)
		must.Eq(t, sv.Items, applyResp.Output.Items)
	})
}
// TestVariablesEndpoint_ListFiltering lists variables under the "nomad"
// prefix with a workload identity token and checks that only the job, group
// and task paths matching the token's allocation come back.
func TestVariablesEndpoint_ListFiltering(t *testing.T) {
	ci.Parallel(t)

	srv, _, shutdown := TestACLServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer shutdown()
	testutil.WaitForLeader(t, srv.RPC)
	codec := rpcClient(t, srv)

	namespace := "nondefault-namespace"
	raftIndex := uint64(1000)

	// A running allocation for job "job1", task group "group".
	alloc := mock.Alloc()
	alloc.Job.ID = "job1"
	alloc.JobID = "job1"
	alloc.TaskGroup = "group"
	alloc.Job.TaskGroups[0].Name = "group"
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.Job.Namespace = namespace
	alloc.Namespace = namespace

	store := srv.fsm.State()
	must.NoError(t, store.UpsertNamespaces(raftIndex, []*structs.Namespace{{Name: namespace}}))
	raftIndex++
	must.NoError(t, store.UpsertAllocs(
		structs.MsgTypeTestSetup, raftIndex, []*structs.Allocation{alloc}))

	token, err := srv.encrypter.SignClaims(alloc.ToTaskIdentityClaims(alloc.Job, "web"))
	must.NoError(t, err)

	// Store one encrypted mock variable per path, each at its own index.
	seedPaths := []string{
		"nomad/jobs/job1/group/web",
		"nomad/jobs/job1/group",
		"nomad/jobs/job1",
		"nomad/jobs/job1/group/other",
		"nomad/jobs/job1/other/web",
		"nomad/jobs/job2/group/web",
	}
	for _, path := range seedPaths {
		raftIndex++
		encrypted := mock.VariableEncrypted()
		encrypted.Namespace = namespace
		encrypted.Path = path
		setResp := store.VarSet(raftIndex, &structs.VarApplyStateRequest{
			Op:  structs.VarOpSet,
			Var: encrypted,
		})
		must.NoError(t, setResp.Error)
	}

	listReq := &structs.VariablesListRequest{
		QueryOptions: structs.QueryOptions{
			Namespace: namespace,
			Prefix:    "nomad",
			AuthToken: token,
			Region:    "global",
		},
	}
	var listResp structs.VariablesListResponse
	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Variables.List", listReq, &listResp))

	// Collect the returned paths in response order for comparison.
	found := make([]string, 0, len(listResp.Data))
	for i := range listResp.Data {
		found = append(found, listResp.Data[i].Path)
	}

	must.Eq(t, []string{
		"nomad/jobs/job1",
		"nomad/jobs/job1/group",
		"nomad/jobs/job1/group/web",
	}, found)
}
// TestVariablesEndpoint_ComplexACLPolicies seeds variables in three
// namespaces and verifies how many entries Variables.List returns for a
// token whose policy mixes wildcard, deny and empty namespace rules.
func TestVariablesEndpoint_ComplexACLPolicies(t *testing.T) {
	ci.Parallel(t)

	srv, _, shutdown := TestACLServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer shutdown()
	testutil.WaitForLeader(t, srv.RPC)
	codec := rpcClient(t, srv)

	raftIndex := uint64(1000)

	policyRules := `
namespace "dev" {
variables {
path "*" { capabilities = ["list", "read"] }
path "system/*" { capabilities = ["deny"] }
path "config/system/*" { capabilities = ["deny"] }
}
}
namespace "prod" {
variables {
path "*" {
capabilities = ["list"]
}
}
}
namespace "*" {}
`

	store := srv.fsm.State()
	must.NoError(t, store.UpsertNamespaces(raftIndex, []*structs.Namespace{
		{Name: "dev"}, {Name: "prod"}, {Name: "other"}}))
	raftIndex++
	token := mock.CreatePolicyAndToken(t, store, raftIndex, "developer", policyRules)

	// Variables to seed; the path suffix records the expected visibility.
	seeds := []struct{ ns, path string }{
		{"dev", "system/never-list"},
		{"dev", "config/system/never-list"},
		{"dev", "config/can-read"},
		{"dev", "project/can-read"},

		{"prod", "system/can-list"},
		{"prod", "config/system/can-list"},
		{"prod", "config/can-list"},
		{"prod", "project/can-list"},

		{"other", "system/never-list"},
		{"other", "config/system/never-list"},
		{"other", "config/never-list"},
		{"other", "project/never-list"},
	}
	for _, seed := range seeds {
		raftIndex++
		encrypted := mock.VariableEncrypted()
		encrypted.Namespace = seed.ns
		encrypted.Path = seed.path
		setResp := store.VarSet(raftIndex, &structs.VarApplyStateRequest{
			Op:  structs.VarOpSet,
			Var: encrypted,
		})
		must.NoError(t, setResp.Error)
	}

	// listAndCount runs one subtest that lists ns/prefix with the developer
	// token and checks either the returned error or the number of results.
	listAndCount := func(ns, prefix string, expectedCount int, expectErr error) {
		t.Run(fmt.Sprintf("ns=%s-prefix=%s", ns, prefix), func(t *testing.T) {
			listReq := &structs.VariablesListRequest{
				QueryOptions: structs.QueryOptions{
					Namespace: ns,
					Prefix:    prefix,
					AuthToken: token.SecretID,
					Region:    "global",
				},
			}
			var listResp structs.VariablesListResponse
			callErr := msgpackrpc.CallWithCodec(codec, "Variables.List", listReq, &listResp)
			if expectErr != nil {
				must.EqError(t, callErr, expectErr.Error())
				return
			}
			must.NoError(t, callErr)

			// Render what was returned so a count mismatch is easy to debug.
			var listing strings.Builder
			listing.WriteString("found:\n")
			for _, sv := range listResp.Data {
				fmt.Fprintf(&listing, " ns=%s path=%s\n", sv.Namespace, sv.Path)
			}
			must.Len(t, expectedCount, listResp.Data, test.Sprintf("%s", listing.String()))
		})
	}

	expectations := []struct {
		ns, prefix string
		count      int
	}{
		{"dev", "system", 0},
		{"dev", "config/system", 0},
		{"dev", "config", 1},
		{"dev", "project", 1},
		{"dev", "", 2},

		{"prod", "system", 1},
		{"prod", "config/system", 1},
		{"prod", "config", 2},
		{"prod", "project", 1},
		{"prod", "", 4},

		// list gives empty but no error!
		{"other", "system", 0},
		{"other", "config/system", 0},
		{"other", "config", 0},
		{"other", "project", 0},
		{"other", "", 0},

		{"*", "system", 1},
		{"*", "config/system", 1},
		{"*", "config", 3},
		{"*", "project", 2},
		{"*", "", 6},
	}
	for _, want := range expectations {
		listAndCount(want.ns, want.prefix, want.count, nil)
	}
}
// TestVariablesEndpoint_GetVariable_Blocking checks that Variables.Read
// blocks on MinQueryIndex until the watched path is created, updated and
// deleted, and that it is not woken by a write to an unrelated path.
func TestVariablesEndpoint_GetVariable_Blocking(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	state := s1.fsm.State()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// First create an unrelated variable.
	delay := 100 * time.Millisecond
	time.AfterFunc(delay, func() {
		writeVar(t, s1, 100, "default", "aaa")
	})

	// Upsert the variable we are watching later
	delay = 200 * time.Millisecond
	time.AfterFunc(delay, func() {
		writeVar(t, s1, 200, "default", "bbb")
	})

	// Lookup the variable
	req := &structs.VariablesReadRequest{
		Path: "bbb",
		QueryOptions: structs.QueryOptions{
			Region:        "global",
			MinQueryIndex: 150,
			MaxQueryTime:  500 * time.Millisecond,
		},
	}
	var resp structs.VariablesReadResponse

	start := time.Now()
	if err := msgpackrpc.CallWithCodec(codec, "Variables.Read", req, &resp); err != nil {
		t.Fatalf("err: %v", err)
	}
	elapsed := time.Since(start)

	if elapsed < delay {
		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
	}
	if elapsed > req.MaxQueryTime {
		t.Fatalf("blocking query timed out %#v", resp)
	}
	if resp.Index != 200 {
		t.Fatalf("Bad index: %d %d", resp.Index, 200)
	}
	if resp.Data == nil || resp.Data.Path != "bbb" {
		t.Fatalf("bad: %#v", resp.Data)
	}

	// Variable update triggers watches
	delay = 100 * time.Millisecond
	time.AfterFunc(delay, func() {
		writeVar(t, s1, 300, "default", "bbb")
	})

	req.QueryOptions.MinQueryIndex = 250
	var resp2 structs.VariablesReadResponse

	start = time.Now()
	if err := msgpackrpc.CallWithCodec(codec, "Variables.Read", req, &resp2); err != nil {
		t.Fatalf("err: %v", err)
	}
	elapsed = time.Since(start)

	if elapsed < delay {
		t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
	}
	if elapsed > req.MaxQueryTime {
		t.Fatal("blocking query timed out")
	}
	if resp2.Index != 300 {
		t.Fatalf("Bad index: %d %d", resp2.Index, 300)
	}
	if resp2.Data == nil || resp2.Data.Path != "bbb" {
		t.Fatalf("bad: %#v", resp2.Data)
	}

	// Variable delete triggers watches
	delay = 100 * time.Millisecond
	time.AfterFunc(delay, func() {
		sv := mock.VariableEncrypted()
		sv.Path = "bbb"
		delResp := state.VarDelete(400, &structs.VarApplyStateRequest{Op: structs.VarOpDelete, Var: sv})
		if !delResp.IsOk() {
			// This callback runs on a timer goroutine, where FailNow (and
			// therefore Fatalf) must not be called; record the failure
			// with Errorf instead.
			t.Errorf("err: %v", delResp.Error)
		}
	})

	req.QueryOptions.MinQueryIndex = 350
	var resp3 structs.VariablesReadResponse

	start = time.Now()
	if err := msgpackrpc.CallWithCodec(codec, "Variables.Read", req, &resp3); err != nil {
		t.Fatalf("err: %v", err)
	}
	elapsed = time.Since(start)

	if elapsed < delay {
		t.Fatalf("should block (returned in %s) %#v", elapsed, resp3)
	}
	if elapsed > req.MaxQueryTime {
		t.Fatal("blocking query timed out")
	}
	if resp3.Index != 400 {
		t.Fatalf("Bad index: %d %d", resp3.Index, 400)
	}
	// After the delete the read must return no data.
	if resp3.Data != nil {
		t.Fatalf("bad: %#v", resp3.Data)
	}
}
// writeVar is a test helper that stores a mock variable at the given
// namespace and path in the server's state store at Raft index idx. The
// variable's items are JSON-encoded and encrypted with the server's encrypter
// before being written. Any error fails the test.
func writeVar(t *testing.T, s *Server, idx uint64, ns, path string) {
	// Attribute failures to the caller rather than to this helper.
	t.Helper()

	store := s.fsm.State()

	sv := mock.Variable()
	sv.Namespace = ns
	sv.Path = path

	bPlain, err := json.Marshal(sv.Items)
	must.NoError(t, err)
	bEnc, kID, err := s.encrypter.Encrypt(bPlain)
	must.NoError(t, err)

	sve := &structs.VariableEncrypted{
		VariableMetadata: sv.VariableMetadata,
		VariableData: structs.VariableData{
			Data:  bEnc,
			KeyID: kID,
		},
	}
	resp := store.VarSet(idx, &structs.VarApplyStateRequest{
		Op:  structs.VarOpSet,
		Var: sve,
	})
	must.NoError(t, resp.Error)
}