API for `Eval.Count` (#15147)
Add a new `Eval.Count` RPC and associated HTTP API endpoints. This API is designed to support interactive use in the `nomad eval delete` command to get a count of evals expected to be deleted before doing so. The state store operations to do this sort of thing are somewhat expensive, but it's cheaper than serializing a big list of evals to JSON. Note that although it seems like this could be done as an extra parameter and response field on `Eval.List`, having it as its own endpoint avoids having to change the response body shape and lets us avoid handling the legacy filter params supported by `Eval.List`.
This commit is contained in:
parent
7dc1a66630
commit
9e1c0b46d8
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
api: Added an API for counting evaluations that match a filter
|
||||
```
|
|
@ -30,6 +30,16 @@ func (e *Evaluations) PrefixList(prefix string) ([]*Evaluation, *QueryMeta, erro
|
|||
return e.List(&QueryOptions{Prefix: prefix})
|
||||
}
|
||||
|
||||
// Count is used to get a count of evaluations.
|
||||
func (e *Evaluations) Count(q *QueryOptions) (*EvalCountResponse, *QueryMeta, error) {
|
||||
var resp *EvalCountResponse
|
||||
qm, err := e.client.query("/v1/evaluations/count", &resp, q)
|
||||
if err != nil {
|
||||
return resp, nil, err
|
||||
}
|
||||
return resp, qm, nil
|
||||
}
|
||||
|
||||
// Info is used to query a single evaluation by its ID.
|
||||
func (e *Evaluations) Info(evalID string, q *QueryOptions) (*Evaluation, *QueryMeta, error) {
|
||||
var resp Evaluation
|
||||
|
@ -133,6 +143,11 @@ type EvalDeleteRequest struct {
|
|||
WriteRequest
|
||||
}
|
||||
|
||||
type EvalCountResponse struct {
|
||||
Count int
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// EvalIndexSort is a wrapper to sort evaluations by CreateIndex.
|
||||
// We reverse the test so that we get the highest index first.
|
||||
type EvalIndexSort []*Evaluation
|
||||
|
|
|
@ -138,3 +138,22 @@ func (s *HTTPServer) evalQuery(resp http.ResponseWriter, req *http.Request, eval
|
|||
}
|
||||
return out.Eval, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) EvalsCountRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
if req.Method != http.MethodGet {
|
||||
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
|
||||
}
|
||||
|
||||
args := structs.EvalCountRequest{}
|
||||
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var out structs.EvalCountResponse
|
||||
if err := s.agent.RPC("Eval.Count", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setMeta(resp, &out.QueryMeta)
|
||||
return &out, nil
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/api"
|
||||
|
@ -11,6 +12,7 @@ import (
|
|||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/shoenig/test/must"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -372,3 +374,45 @@ func TestHTTP_EvalQueryWithRelated(t *testing.T) {
|
|||
require.Equal(t, expected, e.RelatedEvals)
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_EvalCount(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
// Directly manipulate the state
|
||||
state := s.Agent.server.State()
|
||||
eval1 := mock.Eval()
|
||||
eval2 := mock.Eval()
|
||||
err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2})
|
||||
must.NoError(t, err)
|
||||
|
||||
// simple count request
|
||||
req, err := http.NewRequest("GET", "/v1/evaluations/count", nil)
|
||||
must.NoError(t, err)
|
||||
respW := httptest.NewRecorder()
|
||||
obj, err := s.Server.EvalsCountRequest(respW, req)
|
||||
must.NoError(t, err)
|
||||
|
||||
// check headers and response body
|
||||
must.NotEq(t, "", respW.Result().Header.Get("X-Nomad-Index"),
|
||||
must.Sprint("missing index"))
|
||||
must.Eq(t, "true", respW.Result().Header.Get("X-Nomad-KnownLeader"),
|
||||
must.Sprint("missing known leader"))
|
||||
must.NotEq(t, "", respW.Result().Header.Get("X-Nomad-LastContact"),
|
||||
must.Sprint("missing last contact"))
|
||||
|
||||
resp := obj.(*structs.EvalCountResponse)
|
||||
must.Eq(t, resp.Count, 2)
|
||||
|
||||
// filtered count request
|
||||
v := url.Values{}
|
||||
v.Add("filter", fmt.Sprintf("JobID==\"%s\"", eval2.JobID))
|
||||
req, err = http.NewRequest("GET", "/v1/evaluations/count?"+v.Encode(), nil)
|
||||
must.NoError(t, err)
|
||||
respW = httptest.NewRecorder()
|
||||
obj, err = s.Server.EvalsCountRequest(respW, req)
|
||||
must.NoError(t, err)
|
||||
resp = obj.(*structs.EvalCountResponse)
|
||||
must.Eq(t, resp.Count, 1)
|
||||
|
||||
})
|
||||
}
|
||||
|
|
|
@ -359,6 +359,7 @@ func (s HTTPServer) registerHandlers(enableDebug bool) {
|
|||
s.mux.HandleFunc("/v1/allocation/", s.wrap(s.AllocSpecificRequest))
|
||||
|
||||
s.mux.HandleFunc("/v1/evaluations", s.wrap(s.EvalsRequest))
|
||||
s.mux.HandleFunc("/v1/evaluations/count", s.wrap(s.EvalsCountRequest))
|
||||
s.mux.HandleFunc("/v1/evaluation/", s.wrap(s.EvalSpecificRequest))
|
||||
|
||||
s.mux.HandleFunc("/v1/deployments", s.wrap(s.DeploymentsRequest))
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
|
@ -611,6 +612,104 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon
|
|||
return e.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
// Count is used to get a list of the evaluations in the system
|
||||
func (e *Eval) Count(args *structs.EvalCountRequest, reply *structs.EvalCountResponse) error {
|
||||
if done, err := e.srv.forward("Eval.Count", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "count"}, time.Now())
|
||||
namespace := args.RequestNamespace()
|
||||
|
||||
// Check for read-job permissions
|
||||
aclObj, err := e.srv.ResolveToken(args.AuthToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
allow := aclObj.AllowNsOpFunc(acl.NamespaceCapabilityReadJob)
|
||||
|
||||
var filter *bexpr.Evaluator
|
||||
if args.Filter != "" {
|
||||
filter, err = bexpr.CreateEvaluator(args.Filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Setup the blocking query. This is only superficially like Eval.List,
|
||||
// because we don't any concerns about pagination, sorting, and legacy
|
||||
// filter fields.
|
||||
opts := blockingOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
run: func(ws memdb.WatchSet, store *state.StateStore) error {
|
||||
// Scan all the evaluations
|
||||
var err error
|
||||
var iter memdb.ResultIterator
|
||||
|
||||
// Get the namespaces the user is allowed to access.
|
||||
allowableNamespaces, err := allowedNSes(aclObj, store, allow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if prefix := args.QueryOptions.Prefix; prefix != "" {
|
||||
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix, state.SortDefault)
|
||||
} else if namespace != structs.AllNamespacesSentinel {
|
||||
iter, err = store.EvalsByNamespace(ws, namespace)
|
||||
} else {
|
||||
iter, err = store.Evals(ws, state.SortDefault)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
count := 0
|
||||
|
||||
iter = memdb.NewFilterIterator(iter, func(raw interface{}) bool {
|
||||
if raw == nil {
|
||||
return true
|
||||
}
|
||||
eval := raw.(*structs.Evaluation)
|
||||
if allowableNamespaces != nil && !allowableNamespaces[eval.Namespace] {
|
||||
return true
|
||||
}
|
||||
if filter != nil {
|
||||
ok, err := filter.Evaluate(eval)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
return !ok
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
// Use the last index that affected the jobs table
|
||||
index, err := store.Index("evals")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reply.Index = index
|
||||
reply.Count = count
|
||||
|
||||
// Set the query response
|
||||
e.srv.setQueryMeta(&reply.QueryMeta)
|
||||
return nil
|
||||
}}
|
||||
|
||||
return e.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
// Allocations is used to list the allocations for an evaluation
|
||||
func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
|
||||
reply *structs.EvalAllocationsResponse) error {
|
||||
|
|
|
@ -1548,6 +1548,124 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
|
|||
require.Equal(t, tc.expectedNextToken, resp.QueryMeta.NextToken, "unexpected NextToken")
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Count(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
s1, _, cleanupS1 := TestACLServer(t, nil)
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
index := uint64(100)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
store := s1.fsm.State()
|
||||
|
||||
// Create non-default namespace
|
||||
nondefaultNS := mock.Namespace()
|
||||
nondefaultNS.Name = "non-default"
|
||||
err := store.UpsertNamespaces(index, []*structs.Namespace{nondefaultNS})
|
||||
must.NoError(t, err)
|
||||
|
||||
// create a set of evals and field values to filter on.
|
||||
mocks := []struct {
|
||||
namespace string
|
||||
status string
|
||||
}{
|
||||
{namespace: structs.DefaultNamespace, status: structs.EvalStatusPending},
|
||||
{namespace: structs.DefaultNamespace, status: structs.EvalStatusPending},
|
||||
{namespace: structs.DefaultNamespace, status: structs.EvalStatusPending},
|
||||
{namespace: nondefaultNS.Name, status: structs.EvalStatusPending},
|
||||
{namespace: structs.DefaultNamespace, status: structs.EvalStatusComplete},
|
||||
{namespace: nondefaultNS.Name, status: structs.EvalStatusComplete},
|
||||
}
|
||||
|
||||
evals := []*structs.Evaluation{}
|
||||
for i, m := range mocks {
|
||||
eval := mock.Eval()
|
||||
eval.ID = fmt.Sprintf("%d", i) + uuid.Generate()[1:] // sorted for prefix count tests
|
||||
eval.Namespace = m.namespace
|
||||
eval.Status = m.status
|
||||
evals = append(evals, eval)
|
||||
}
|
||||
|
||||
index++
|
||||
require.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, index, evals))
|
||||
|
||||
index++
|
||||
aclToken := mock.CreatePolicyAndToken(t, store, index, "test-read-any",
|
||||
mock.NamespacePolicy("*", "read", nil)).SecretID
|
||||
|
||||
limitedACLToken := mock.CreatePolicyAndToken(t, store, index, "test-read-limited",
|
||||
mock.NamespacePolicy("default", "read", nil)).SecretID
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
namespace string
|
||||
prefix string
|
||||
filter string
|
||||
token string
|
||||
expectedCount int
|
||||
}{
|
||||
{
|
||||
name: "count wildcard namespace with read-any ACL",
|
||||
namespace: "*",
|
||||
token: aclToken,
|
||||
expectedCount: 6,
|
||||
},
|
||||
{
|
||||
name: "count wildcard namespace with limited-read ACL",
|
||||
namespace: "*",
|
||||
token: limitedACLToken,
|
||||
expectedCount: 4,
|
||||
},
|
||||
{
|
||||
name: "count wildcard namespace with prefix",
|
||||
namespace: "*",
|
||||
prefix: evals[2].ID[:2],
|
||||
token: aclToken,
|
||||
expectedCount: 1,
|
||||
},
|
||||
{
|
||||
name: "count default namespace with filter",
|
||||
namespace: structs.DefaultNamespace,
|
||||
filter: "Status == \"pending\"",
|
||||
token: aclToken,
|
||||
expectedCount: 3,
|
||||
},
|
||||
{
|
||||
name: "count nondefault namespace with filter",
|
||||
namespace: "non-default",
|
||||
filter: "Status == \"complete\"",
|
||||
token: aclToken,
|
||||
expectedCount: 1,
|
||||
},
|
||||
{
|
||||
name: "count no results",
|
||||
namespace: "non-default",
|
||||
filter: "Status == \"never\"",
|
||||
token: aclToken,
|
||||
expectedCount: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
req := &structs.EvalCountRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
Namespace: tc.namespace,
|
||||
Prefix: tc.prefix,
|
||||
Filter: tc.filter,
|
||||
},
|
||||
}
|
||||
req.AuthToken = tc.token
|
||||
var resp structs.EvalCountResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Eval.Count", req, &resp)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, tc.expectedCount, resp.Count)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Allocations(t *testing.T) {
|
||||
|
|
|
@ -899,6 +899,11 @@ func (req *EvalListRequest) ShouldBeFiltered(e *Evaluation) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// EvalCountRequest is used to count evaluations
|
||||
type EvalCountRequest struct {
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
// PlanRequest is used to submit an allocation plan to the leader
|
||||
type PlanRequest struct {
|
||||
Plan *Plan
|
||||
|
@ -1599,6 +1604,12 @@ type EvalListResponse struct {
|
|||
QueryMeta
|
||||
}
|
||||
|
||||
// EvalCountResponse is used for a count request
|
||||
type EvalCountResponse struct {
|
||||
Count int
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// EvalAllocationsResponse is used to return the allocations for an evaluation
|
||||
type EvalAllocationsResponse struct {
|
||||
Allocations []*AllocListStub
|
||||
|
|
|
@ -382,4 +382,65 @@ $ curl \
|
|||
]
|
||||
```
|
||||
|
||||
## Count Evaluations
|
||||
|
||||
This endpoint counts evaluations. Note that Nomad's state store architecture
|
||||
makes calculating this count unexpectedly expensive (similar in cost to the List
|
||||
API), and this API was designed for use during recovery operations with the
|
||||
`nomad eval delete` command. It is not recommended to use this API for
|
||||
monitoring. The `nomad.nomad.broker.*` metrics are better for that use
|
||||
case. See the [metrics reference][] for details.
|
||||
|
||||
|
||||
| Method | Path | Produces |
|
||||
|--------|-------------------------|--------------------|
|
||||
| `GET` | `/v1/evaluations/count` | `application/json` |
|
||||
|
||||
The table below shows this endpoint's support for
|
||||
[blocking queries](/api-docs#blocking-queries) and
|
||||
[required ACLs](/api-docs#acls).
|
||||
|
||||
| Blocking Queries | ACL Required |
|
||||
| ---------------- | -------------------- |
|
||||
| `YES` | `namespace:read-job` |
|
||||
|
||||
### Parameters
|
||||
|
||||
- `prefix` `(string: "")`- Specifies a string to filter evaluations based on an
|
||||
ID prefix. Because the value is decoded to bytes, the prefix must have an even
|
||||
number of hexadecimal characters (0-9a-f). This is specified as a query string
|
||||
parameter and is used before any `filter` expression is applied.
|
||||
|
||||
- `filter` `(string: "")` - Specifies the [expression](/api-docs#filtering) used
|
||||
to filter the results.
|
||||
|
||||
- `namespace` `(string: "default")` - Specifies the target namespace.
|
||||
Specifying `*` will return all evaluations across all authorized namespaces.
|
||||
This parameter is used before any `filter` expression is applied.
|
||||
|
||||
### Sample Request
|
||||
|
||||
```shell-session
|
||||
$ curl \
|
||||
https://localhost:4646/v1/evaluations/count
|
||||
```
|
||||
|
||||
```shell-session
|
||||
$ curl \
|
||||
https://localhost:4646/v1/evaluations/count?prefix=25ba81
|
||||
```
|
||||
|
||||
### Sample Response
|
||||
|
||||
```json
|
||||
{
|
||||
"Count": 36
|
||||
"Index": 133,
|
||||
"KnownLeader": true,
|
||||
"LastContact": 0,
|
||||
"NextToken": ""
|
||||
}
|
||||
```
|
||||
|
||||
[update_scheduler_configuration]: /api-docs/operator/scheduler#update-scheduler-configuration
|
||||
[metrics reference]: /docs/operations/metrics-reference
|
||||
|
|
Loading…
Reference in New Issue