open-nomad/nomad/alloc_endpoint.go
Alex Dadgar c19985244a GetAllocs uses a blocking query
This PR makes GetAllocs use a blocking query and adds a sanity
check to the client's watchAllocation code to ensure it gets the correct
allocations.

This PR fixes https://github.com/hashicorp/nomad/issues/2119 and
https://github.com/hashicorp/nomad/issues/2153.

The issue was that the client was talking to two different servers, one
to check which allocations to pull and the other to pull those
allocations.  However, the latter call did not use a blocking query, and
thus the client would not retrieve the allocations it requested.

The logging has been improved to make the problem more clear as well.
2017-01-10 13:30:35 -08:00

190 lines
4.6 KiB
Go

package nomad
import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)
// Alloc is the endpoint type whose methods serve allocation requests. The
// methods visible in this file (List, GetAlloc, GetAllocs) are all lookups
// that go through the server's forward and blockingRPC helpers.
type Alloc struct {
// srv is the server whose state store and RPC helpers the endpoint
// methods use.
srv *Server
}
// List fills reply.Allocations with a stub for each allocation in the state
// store. When args.QueryOptions.Prefix is non-empty, only allocations returned
// by the ID-prefix lookup are included.
//
// The request is first offered to a.srv.forward; if that reports the request
// as done, its error is returned as-is. Otherwise the lookup is executed
// through a.srv.blockingRPC with a watch on the "allocs" table, and reply.Index
// is set to the index recorded for that table.
func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListResponse) error {
	done, fwdErr := a.srv.forward("Alloc.List", args, args, reply)
	if done {
		return fwdErr
	}
	start := time.Now()
	defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, start)

	// collect is the body of the blocking query: it reads one snapshot and
	// fills in the reply from it.
	collect := func() error {
		snapshot, err := a.srv.fsm.State().Snapshot()
		if err != nil {
			return err
		}

		// Pick the iterator: everything, or only IDs matching the prefix.
		var it memdb.ResultIterator
		if p := args.QueryOptions.Prefix; p == "" {
			it, err = snapshot.Allocs()
		} else {
			it, err = snapshot.AllocsByIDPrefix(p)
		}
		if err != nil {
			return err
		}

		// Drain the iterator, converting each allocation to its stub form.
		var stubs []*structs.AllocListStub
		for item := it.Next(); item != nil; item = it.Next() {
			stubs = append(stubs, item.(*structs.Allocation).Stub())
		}
		reply.Allocations = stubs

		// Report the last index that affected the allocs table.
		idx, err := snapshot.Index("allocs")
		if err != nil {
			return err
		}
		reply.Index = idx

		// Fill in the remaining query metadata.
		a.srv.setQueryMeta(&reply.QueryMeta)
		return nil
	}

	opts := blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		watch:     watch.NewItems(watch.Item{Table: "allocs"}),
		run:       collect,
	}
	return a.srv.blockingRPC(&opts)
}
// GetAlloc looks up the single allocation identified by args.AllocID and
// stores it in reply.Alloc (nil when no such allocation exists).
//
// reply.Index is the allocation's ModifyIndex when it was found, and the index
// recorded for the "allocs" table otherwise. The request is first offered to
// a.srv.forward; if that reports the request as done, its error is returned.
// Otherwise the lookup runs through a.srv.blockingRPC, watching the requested
// allocation.
func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
	reply *structs.SingleAllocResponse) error {
	if done, err := a.srv.forward("Alloc.GetAlloc", args, args, reply); done {
		return err
	}
	began := time.Now()
	defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, began)

	// lookup is the body of the blocking query.
	lookup := func() error {
		snapshot, err := a.srv.fsm.State().Snapshot()
		if err != nil {
			return err
		}

		found, err := snapshot.AllocByID(args.AllocID)
		if err != nil {
			return err
		}
		reply.Alloc = found

		if found == nil {
			// Nothing to report an index for, so fall back to the last
			// index that affected the allocs table.
			tableIdx, err := snapshot.Index("allocs")
			if err != nil {
				return err
			}
			reply.Index = tableIdx
		} else {
			reply.Index = found.ModifyIndex
		}

		// Fill in the remaining query metadata.
		a.srv.setQueryMeta(&reply.QueryMeta)
		return nil
	}

	opts := blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		watch:     watch.NewItems(watch.Item{Alloc: args.AllocID}),
		run:       lookup,
	}
	return a.srv.blockingRPC(&opts)
}
// GetAllocs looks up the allocations named in args.AllocIDs.
//
// reply.Allocs is populated (in the same order as args.AllocIDs) only when
// every requested allocation exists in the snapshot and at least one of them
// has a ModifyIndex greater than args.QueryOptions.MinQueryIndex; reply.Index
// is then the largest ModifyIndex among them. In every other case reply.Allocs
// is left untouched and reply.Index is the index recorded for the "allocs"
// table.
//
// The request is first offered to a.srv.forward; if that reports the request
// as done, its error is returned. Otherwise the lookup runs through
// a.srv.blockingRPC, watching each requested allocation.
func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
	reply *structs.AllocsGetResponse) error {
	if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())

	// One watch item per requested allocation.
	count := len(args.AllocIDs)
	watched := make([]watch.Item, count)
	for i, id := range args.AllocIDs {
		watched[i] = watch.Item{Alloc: id}
	}

	// Result slots, index-aligned with args.AllocIDs. The slice is shared by
	// every execution of the query body below; it is only handed to the
	// reply after a pass that has written every slot.
	found := make([]*structs.Allocation, count)

	// fetch is the body of the blocking query. We wait for at least one of
	// the requested allocations to be above the min query index, which
	// guarantees that the server has received that index.
	fetch := func() error {
		snapshot, err := a.srv.fsm.State().Snapshot()
		if err != nil {
			return err
		}

		var (
			pastMin bool   // some allocation is newer than MinQueryIndex
			highest uint64 // largest ModifyIndex seen so far
		)
		for i, id := range args.AllocIDs {
			alloc, err := snapshot.AllocByID(id)
			if err != nil {
				return err
			}
			if alloc == nil {
				// This server does not have the allocation yet, so the
				// whole result is treated as not ready.
				pastMin = false
				break
			}

			found[i] = alloc
			pastMin = pastMin || alloc.ModifyIndex > args.QueryOptions.MinQueryIndex
			if alloc.ModifyIndex > highest {
				highest = alloc.ModifyIndex
			}
		}

		if !pastMin {
			// Not ready: report the last index that affected the allocs
			// table instead of returning allocations.
			tableIdx, err := snapshot.Index("allocs")
			if err != nil {
				return err
			}
			reply.Index = tableIdx
		} else {
			reply.Allocs = found
			reply.Index = highest
		}

		// Fill in the remaining query metadata.
		a.srv.setQueryMeta(&reply.QueryMeta)
		return nil
	}

	opts := blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		watch:     watch.NewItems(watched...),
		run:       fetch,
	}
	return a.srv.blockingRPC(&opts)
}