open-nomad/nomad/alloc_endpoint.go

184 lines
4.8 KiB
Go
Raw Normal View History

2015-09-06 22:34:28 +00:00
package nomad
import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
2017-09-15 00:24:51 +00:00
"github.com/hashicorp/nomad/acl"
2017-02-08 04:31:23 +00:00
"github.com/hashicorp/nomad/nomad/state"
2015-09-06 22:34:28 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
// Alloc endpoint is used for manipulating allocations
type Alloc struct {
srv *Server
}
// List is used to list the allocations in the system
func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListResponse) error {
if done, err := a.srv.forward("Alloc.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now())
2017-09-15 00:24:51 +00:00
// Check namespace read-job permissions
if aclObj, err := a.srv.resolveToken(args.SecretID); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture all the allocations
2017-02-08 04:31:23 +00:00
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
2017-09-07 23:56:15 +00:00
iter, err = state.AllocsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else {
2017-09-07 23:56:15 +00:00
iter, err = state.AllocsByNamespace(ws, args.RequestNamespace())
}
if err != nil {
return err
}
2015-09-06 22:34:28 +00:00
var allocs []*structs.AllocListStub
for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
allocs = append(allocs, alloc.Stub())
}
reply.Allocations = allocs
2015-09-06 22:34:28 +00:00
// Use the last index that affected the jobs table
2017-02-08 04:31:23 +00:00
index, err := state.Index("allocs")
if err != nil {
return err
}
reply.Index = index
2015-09-06 22:34:28 +00:00
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return a.srv.blockingRPC(&opts)
2015-09-06 22:34:28 +00:00
}
2015-09-06 22:46:45 +00:00
// GetAlloc is used to lookup a particular allocation
func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
reply *structs.SingleAllocResponse) error {
if done, err := a.srv.forward("Alloc.GetAlloc", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now())
2017-09-15 00:43:27 +00:00
// Check namespace read-job permissions
if aclObj, err := a.srv.resolveToken(args.SecretID); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Lookup the allocation
2017-02-08 04:31:23 +00:00
out, err := state.AllocByID(ws, args.AllocID)
if err != nil {
return err
}
2015-09-06 22:46:45 +00:00
// Setup the output
2015-10-30 15:27:47 +00:00
reply.Alloc = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the allocs table
2017-02-08 04:31:23 +00:00
index, err := state.Index("allocs")
if err != nil {
return err
}
reply.Index = index
}
2015-09-06 22:46:45 +00:00
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return a.srv.blockingRPC(&opts)
2015-09-06 22:46:45 +00:00
}
// GetAllocs is used to lookup a set of allocations
func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
reply *structs.AllocsGetResponse) error {
if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())
allocs := make([]*structs.Allocation, len(args.AllocIDs))
// Setup the blocking query. We wait for at least one of the requested
// allocations to be above the min query index. This guarantees that the
// server has received that index.
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Lookup the allocation
thresholdMet := false
maxIndex := uint64(0)
for i, alloc := range args.AllocIDs {
2017-02-08 04:31:23 +00:00
out, err := state.AllocByID(ws, alloc)
if err != nil {
return err
}
if out == nil {
// We don't have the alloc yet
thresholdMet = false
break
}
// Store the pointer
allocs[i] = out
// Check if we have passed the minimum index
if out.ModifyIndex > args.QueryOptions.MinQueryIndex {
thresholdMet = true
}
if maxIndex < out.ModifyIndex {
maxIndex = out.ModifyIndex
}
}
// Setup the output
if thresholdMet {
reply.Allocs = allocs
reply.Index = maxIndex
} else {
// Use the last index that affected the nodes table
2017-02-08 04:31:23 +00:00
index, err := state.Index("allocs")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
},
}
return a.srv.blockingRPC(&opts)
}