Fix double pull with introduction of AllocModifyIndex

This commit is contained in:
Alex Dadgar 2016-02-01 13:57:35 -08:00
parent 45a733600a
commit 2d98c0eadd
18 changed files with 247 additions and 207 deletions

View File

@ -61,18 +61,6 @@ func (n *Nodes) Allocations(nodeID string, q *QueryOptions) ([]*Allocation, *Que
return resp, qm, nil
}
// ClientAllocations is used to return a lightweight list of allocations associated with a node.
// It is primarily used by the client in order to determine which allocations actually need
// an update.
func (n *Nodes) ClientAllocations(nodeID string, q *QueryOptions) (map[string]uint64, *QueryMeta, error) {
var resp map[string]uint64
qm, err := n.client.query("/v1/node/"+nodeID+"/clientallocations", &resp, q)
if err != nil {
return nil, nil, err
}
return resp, qm, nil
}
// ForceEvaluate is used to force-evaluate an existing node.
func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp nodeEvalResponse

View File

@ -207,24 +207,6 @@ func TestNodes_Allocations(t *testing.T) {
}
}
func TestNodes_ClientAllocations(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
nodes := c.Nodes()
// Looking up by a non-existent node returns nothing. We
// don't check the index here because it's possible the node
// has already registered, in which case we will get a non-
// zero result anyways.
allocs, _, err := nodes.ClientAllocations("nope", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if n := len(allocs); n != 0 {
t.Fatalf("expected 0 allocs, got: %d", n)
}
}
func TestNodes_ForceEvaluate(t *testing.T) {
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
c.DevMode = true

View File

@ -38,7 +38,8 @@ type AllocRunner struct {
logger *log.Logger
consulService *ConsulService
alloc *structs.Allocation
alloc *structs.Allocation
allocLock sync.Mutex
dirtyCh chan struct{}
@ -184,6 +185,8 @@ func (r *AllocRunner) DestroyContext() error {
// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Lock()
defer r.allocLock.Unlock()
return r.alloc
}
@ -336,6 +339,11 @@ OUTER:
for {
select {
case update := <-r.updateCh:
// Store the updated allocation.
r.allocLock.Lock()
r.alloc = update
r.allocLock.Unlock()
// Check if we're in a terminal status
if update.TerminalStatus() {
break OUTER
@ -408,6 +416,14 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
}
}
// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
// checks if the current running allocation is behind and should be updated.
func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {
r.allocLock.Lock()
defer r.allocLock.Unlock()
return r.alloc.AllocModifyIndex < serverIndex
}
// Destroy is used to indicate that the allocation context should be destroyed
func (r *AllocRunner) Destroy() {
r.destroyLock.Lock()

View File

@ -91,7 +91,7 @@ func TestAllocRunner_Destroy(t *testing.T) {
func TestAllocRunner_Update(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
_, ar := testAllocRunner(false)
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
@ -99,27 +99,20 @@ func TestAllocRunner_Update(t *testing.T) {
task.Config["args"] = []string{"10"}
go ar.Run()
defer ar.Destroy()
start := time.Now()
// Update the alloc definition
newAlloc := new(structs.Allocation)
*newAlloc = *ar.alloc
newAlloc.DesiredStatus = structs.AllocDesiredStatusStop
newAlloc.Name = "FOO"
newAlloc.AllocModifyIndex++
ar.Update(newAlloc)
// Check the alloc runner stores the update allocation.
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
return ar.Alloc().Name == "FOO", nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
t.Fatalf("err: %v %#v", err, ar.Alloc())
})
if time.Since(start) > 15*time.Second {
t.Fatalf("took too long to terminate")
}
}
func TestAllocRunner_SaveRestoreState(t *testing.T) {

View File

@ -627,7 +627,7 @@ func (c *Client) run() {
}
// Watch for changes in allocations
allocUpdates := make(chan []*structs.Allocation, 1)
allocUpdates := make(chan *allocUpdates, 1)
go c.watchAllocations(allocUpdates)
// Create a snapshot timer
@ -642,8 +642,8 @@ func (c *Client) run() {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}
case allocs := <-allocUpdates:
c.runAllocs(allocs)
case update := <-allocUpdates:
c.runAllocs(update)
case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
@ -722,8 +722,22 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) error {
return nil
}
// allocUpdates holds the results of receiving updated allocations from the
// servers.
type allocUpdates struct {
// pulled is the set of allocations that were downloaded from the servers.
pulled map[string]*structs.Allocation
// filtered is the set of allocations that were not pulled because their
// AllocModifyIndex didn't change.
filtered map[string]struct{}
}
// watchAllocations is used to scan for updates to allocations
func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
func (c *Client) watchAllocations(updates chan *allocUpdates) {
// The request and response for getting the map of allocations that should
// be running on the Node to their AllocModifyIndex which is incremented
// when the allocation is updated by the servers.
req := structs.NodeSpecificRequest{
NodeID: c.Node().ID,
QueryOptions: structs.QueryOptions{
@ -731,12 +745,24 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
AllowStale: true,
},
}
var resp structs.NodeAllocsResponse
var resp structs.NodeClientAllocsResponse
// The request and response for pulling down the set of allocations that are
// new, or updated server side.
allocsReq := structs.AllocsGetRequest{
QueryOptions: structs.QueryOptions{
Region: c.config.Region,
AllowStale: true,
},
}
var allocsResp structs.AllocsGetResponse
for {
// Get the allocations, blocking for updates
resp = structs.NodeAllocsResponse{}
err := c.RPC("Node.GetAllocs", &req, &resp)
// Get the allocation modify index map, blocking for updates. We will
// use this to determine exactly what allocations need to be downloaded
// in full.
resp = structs.NodeClientAllocsResponse{}
err := c.RPC("Node.GetClientAllocs", &req, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
@ -755,16 +781,69 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
default:
}
// Check for updates
// Filter all allocations whose AllocModifyIndex was not incremented.
// These are the allocations who have either not been updated, or whose
// updates are a result of the client sending an update for the alloc.
// This lets us reduce the network traffic to the server as we don't
// need to pull all the allocations.
var pull []string
filtered := make(map[string]struct{})
c.allocLock.Lock()
for allocID, modifyIndex := range resp.Allocs {
// Pull the allocation if we don't have an alloc runner for the
// allocation or if the alloc runner requires an updated allocation.
runner, ok := c.allocs[allocID]
if !ok || runner.shouldUpdate(modifyIndex) {
pull = append(pull, allocID)
}
filtered[allocID] = struct{}{}
}
c.allocLock.Unlock()
c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)",
resp.Index, len(pull), len(filtered))
// Pull the allocations that passed filtering.
allocsResp.Allocs = nil
if len(pull) != 0 {
// Pull the allocations that need to be updated.
allocsReq.AllocIDs = pull
allocsResp = structs.AllocsGetResponse{}
if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil {
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-time.After(retry):
continue
case <-c.shutdownCh:
return
}
}
// Check for shutdown
select {
case <-c.shutdownCh:
return
default:
}
}
// Update the query index.
if resp.Index <= req.MinQueryIndex {
continue
}
req.MinQueryIndex = resp.Index
c.logger.Printf("[DEBUG] client: updated allocations at index %d (%d allocs)", resp.Index, len(resp.Allocs))
// Push the updates
// Push the updates.
pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulled[alloc.ID] = alloc
}
update := &allocUpdates{
filtered: filtered,
pulled: pulled,
}
select {
case allocUpdates <- resp.Allocs:
case updates <- update:
case <-c.shutdownCh:
return
}
@ -772,7 +851,7 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
}
// runAllocs is invoked when we get an updated set of allocations
func (c *Client) runAllocs(updated []*structs.Allocation) {
func (c *Client) runAllocs(update *allocUpdates) {
// Get the existing allocs
c.allocLock.RLock()
exist := make([]*structs.Allocation, 0, len(c.allocs))
@ -782,7 +861,7 @@ func (c *Client) runAllocs(updated []*structs.Allocation) {
c.allocLock.RUnlock()
// Diff the existing and updated allocations
diff := diffAllocs(exist, updated)
diff := diffAllocs(exist, update)
c.logger.Printf("[DEBUG] client: %#v", diff)
// Remove the old allocations

View File

@ -355,10 +355,14 @@ func TestClient_WatchAllocs(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Update the other allocation
alloc2.DesiredStatus = structs.AllocDesiredStatusStop
// Update the other allocation. Have to make a copy because the allocs are
// shared in memory in the test and the modify index would be updated in the
// alloc runner.
alloc2_2 := new(structs.Allocation)
*alloc2_2 = *alloc2
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(102,
[]*structs.Allocation{alloc2})
[]*structs.Allocation{alloc2_2})
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -31,33 +31,28 @@ func (d *diffResult) GoString() string {
// diffAllocs is used to diff the existing and updated allocations
// to see what has happened.
func diffAllocs(existing, updated []*structs.Allocation) *diffResult {
result := &diffResult{}
// Index the updated allocations by id
idx := make(map[string]*structs.Allocation)
for _, update := range updated {
idx[update.ID] = update
}
func diffAllocs(existing []*structs.Allocation, allocs *allocUpdates) *diffResult {
// Scan the existing allocations
result := &diffResult{}
existIdx := make(map[string]struct{})
for _, exist := range existing {
// Mark this as existing
existIdx[exist.ID] = struct{}{}
// Check for presence in the new set
update, ok := idx[exist.ID]
// Check if the alloc was updated or filtered because an update wasn't
// needed.
alloc, pulled := allocs.pulled[exist.ID]
_, filtered := allocs.filtered[exist.ID]
// If not present, removed
if !ok {
// If not updated or filtered, removed
if !pulled && !filtered {
result.removed = append(result.removed, exist)
continue
}
// Check for an update
if update.ModifyIndex > exist.ModifyIndex {
result.updated = append(result.updated, allocTuple{exist, update})
if pulled && alloc.AllocModifyIndex > exist.AllocModifyIndex {
result.updated = append(result.updated, allocTuple{exist, alloc})
continue
}
@ -66,9 +61,9 @@ func diffAllocs(existing, updated []*structs.Allocation) *diffResult {
}
// Scan the updated allocations for any that are new
for _, update := range updated {
if _, ok := existIdx[update.ID]; !ok {
result.added = append(result.added, update)
for id, pulled := range allocs.pulled {
if _, ok := existIdx[id]; !ok {
result.added = append(result.added, pulled)
}
}
return result

View File

@ -17,7 +17,7 @@ func TestDiffAllocs(t *testing.T) {
alloc2 := mock.Alloc() // Update
alloc2u := new(structs.Allocation)
*alloc2u = *alloc2
alloc2u.ModifyIndex += 1
alloc2u.AllocModifyIndex += 1
alloc3 := mock.Alloc() // Remove
alloc4 := mock.Alloc() // Add
@ -26,13 +26,17 @@ func TestDiffAllocs(t *testing.T) {
alloc2,
alloc3,
}
updated := []*structs.Allocation{
alloc1,
alloc2u,
alloc4,
update := &allocUpdates{
pulled: map[string]*structs.Allocation{
alloc2u.ID: alloc2u,
alloc4.ID: alloc4,
},
filtered: map[string]struct{}{
alloc1.ID: struct{}{},
},
}
result := diffAllocs(exist, updated)
result := diffAllocs(exist, update)
if len(result.ignore) != 1 || result.ignore[0] != alloc1 {
t.Fatalf("Bad: %#v", result.ignore)

View File

@ -36,9 +36,6 @@ func (s *HTTPServer) NodeSpecificRequest(resp http.ResponseWriter, req *http.Req
case strings.HasSuffix(path, "/evaluate"):
nodeName := strings.TrimSuffix(path, "/evaluate")
return s.nodeForceEvaluate(resp, req, nodeName)
case strings.HasSuffix(path, "/clientallocations"):
nodeName := strings.TrimSuffix(path, "/clientallocations")
return s.nodeClientAllocations(resp, req, nodeName)
case strings.HasSuffix(path, "/allocations"):
nodeName := strings.TrimSuffix(path, "/allocations")
return s.nodeAllocations(resp, req, nodeName)
@ -92,27 +89,6 @@ func (s *HTTPServer) nodeAllocations(resp http.ResponseWriter, req *http.Request
return out.Allocs, nil
}
func (s *HTTPServer) nodeClientAllocations(resp http.ResponseWriter, req *http.Request,
nodeID string) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.NodeSpecificRequest{
NodeID: nodeID,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
var out structs.NodeClientAllocsResponse
if err := s.agent.RPC("Node.GetClientAllocs", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out.Allocs, nil
}
func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request,
nodeID string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {

View File

@ -214,60 +214,6 @@ func TestHTTP_NodeAllocations(t *testing.T) {
})
}
func TestHTTP_NodeClientAllocations(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Create the job
node := mock.Node()
args := structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.NodeUpdateResponse
if err := s.Agent.RPC("Node.Register", &args, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Directly manipulate the state
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/node/"+node.ID+"/clientallocations", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.NodeSpecificRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.HeaderMap.Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}
// Check the node
allocs := obj.(map[string]uint64)
if len(allocs) != 1 || allocs[alloc1.ID] != 1000 {
t.Fatalf("bad: %#v", allocs)
}
})
}
func TestHTTP_NodeDrain(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Create the node

View File

@ -1,6 +1,7 @@
package nomad
import (
"fmt"
"time"
"github.com/armon/go-metrics"
@ -110,3 +111,39 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
}}
return a.srv.blockingRPC(&opts)
}
// GetAllocs is used to lookup a set of allocations
func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
reply *structs.AllocsGetResponse) error {
if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now())
// Lookup the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs := make([]*structs.Allocation, len(args.AllocIDs))
for i, alloc := range args.AllocIDs {
out, err := snap.AllocByID(alloc)
if err != nil {
return err
}
if out == nil {
return fmt.Errorf("unknown alloc id %q", alloc)
}
allocs[i] = out
if reply.Index < out.ModifyIndex {
reply.Index = out.ModifyIndex
}
}
// Set the response
a.srv.setQueryMeta(&reply.QueryMeta)
reply.Allocs = allocs
return nil
}

View File

@ -192,7 +192,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {
}
})
// Lookup the jobs
// Lookup the allocs
get := &structs.AllocSpecificRequest{
AllocID: alloc2.ID,
QueryOptions: structs.QueryOptions{
@ -216,3 +216,45 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {
t.Fatalf("bad: %#v", resp.Alloc)
}
}
func TestAllocEndpoint_GetAllocs(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
alloc := mock.Alloc()
alloc2 := mock.Alloc()
state := s1.fsm.State()
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Lookup the allocs
get := &structs.AllocsGetRequest{
AllocIDs: []string{alloc.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp structs.AllocsGetResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
}
if len(resp.Allocs) != 2 {
t.Fatalf("bad: %#v", resp.Allocs)
}
// Lookup non-existent allocs.
get = &structs.AllocsGetRequest{
AllocIDs: []string{"foo"},
QueryOptions: structs.QueryOptions{Region: "global"},
}
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err == nil {
t.Fatalf("expect error")
}
}

View File

@ -421,6 +421,7 @@ func TestFSM_UpsertAllocs(t *testing.T) {
}
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}

View File

@ -387,7 +387,7 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
return n.srv.blockingRPC(&opts)
}
// GetClientAllocs is used to request a lightweight list of modify indexes
// GetClientAllocs is used to request a lightweight list of alloc modify indexes
// per allocation.
func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
reply *structs.NodeClientAllocsResponse) error {
@ -421,7 +421,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
// Setup the output
if len(allocs) != 0 {
for _, alloc := range allocs {
reply.Allocs[alloc.ID] = alloc.ModifyIndex
reply.Allocs[alloc.ID] = alloc.AllocModifyIndex
reply.Index = maxUint64(reply.Index, alloc.ModifyIndex)
}
} else {

View File

@ -666,7 +666,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
allocUpdate.NodeID = alloc.NodeID
allocUpdate.ID = alloc.ID
allocUpdate.ClientStatus = structs.AllocClientStatusRunning
err := state.UpdateAllocFromClient(200, allocUpdate)
err := state.UpsertAllocs(200, []*structs.Allocation{allocUpdate})
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -793,10 +793,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
if existing == nil {
alloc.CreateIndex = index
alloc.ModifyIndex = index
alloc.AllocModifyIndex = index
} else {
exist := existing.(*structs.Allocation)
alloc.CreateIndex = exist.CreateIndex
alloc.ModifyIndex = index
alloc.AllocModifyIndex = index
alloc.ClientStatus = exist.ClientStatus
alloc.ClientDescription = exist.ClientDescription
}

View File

@ -272,6 +272,12 @@ type AllocSpecificRequest struct {
QueryOptions
}
// AllocsGetcRequest is used to query a set of allocations
type AllocsGetRequest struct {
AllocIDs []string
QueryOptions
}
// PeriodicForceReqeuest is used to force a specific periodic job.
type PeriodicForceRequest struct {
JobID string
@ -378,6 +384,12 @@ type SingleAllocResponse struct {
QueryMeta
}
// AllocsGetResponse is used to return a set of allocations
type AllocsGetResponse struct {
Allocs []*Allocation
QueryMeta
}
// JobAllocationsResponse is used to return the allocations for a job
type JobAllocationsResponse struct {
Allocations []*AllocListStub
@ -1647,8 +1659,9 @@ type Allocation struct {
TaskStates map[string]*TaskState
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
CreateIndex uint64
ModifyIndex uint64
AllocModifyIndex uint64
}
// TerminalStatus returns if the desired or actual status is terminal and

View File

@ -311,44 +311,6 @@ be specified using the `?region=` query parameter.
</dd>
</dl>
<dl>
<dt>Description</dt>
<dd>
Query the allocations belonging to a single node. This endpoint only returns
a map from allocation id to its modify index and is primarily used by the client
to determine which allocations need to be updated.
</dd>
<dt>Method</dt>
<dd>GET</dd>
<dt>URL</dt>
<dd>`/v1/node/<id>/clientallocations`</dd>
<dt>Parameters</dt>
<dd>
None
</dd>
<dt>Blocking Queries</dt>
<dd>
[Supported](/docs/http/index.html#blocking-queries)
</dd>
<dt>Returns</dt>
<dd>
```javascript
{
"d66ea8d7-1d4c-119e-46b3-e23713a4ab72": 9,
"abf34c35-1d4c-119e-46b3-e23713a4ab72": 10
}
```
</dd>
</dl>
## PUT / POST
<dl>