Merge pull request #366 from hashicorp/f-blocking

Support blocking queries
Ryan Uber 2015-11-04 11:18:50 -08:00
commit 38b812178b
13 changed files with 1593 additions and 430 deletions
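
In short, every read endpoint touched by this change moves its state lookup into a run closure and hands it to blockingRPC together with the watch items that should wake the query. A condensed sketch of that pattern, using the names from the Alloc.List hunk below (an illustration of the diff, not extra code in the commit):

func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListResponse) error {
	// Setup the blocking query
	opts := blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		// Wake the query whenever the allocs table changes.
		watch: watch.NewItems(watch.Item{Table: "allocs"}),
		run: func() error {
			// Re-run the lookup against a fresh snapshot on every wake-up.
			snap, err := a.srv.fsm.State().Snapshot()
			if err != nil {
				return err
			}
			// ... build the reply from the snapshot and set reply.Index ...
			a.srv.setQueryMeta(&reply.QueryMeta)
			return nil
		},
	}
	return a.srv.blockingRPC(&opts)
}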

@ -5,6 +5,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)
// Alloc endpoint is used for manipulating allocations
@ -19,35 +20,45 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now())
// Capture all the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Allocs()
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "allocs"}),
run: func() error {
// Capture all the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Allocs()
if err != nil {
return err
}
for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
var allocs []*structs.AllocListStub
for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
allocs = append(allocs, alloc.Stub())
}
reply.Allocations = allocs
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return a.srv.blockingRPC(&opts)
}
// GetAlloc is used to lookup a particular allocation
@ -58,30 +69,38 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now())
// Lookup the allocation
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.AllocByID(args.AllocID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Alloc: args.AllocID}),
run: func() error {
// Lookup the allocation
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.AllocByID(args.AllocID)
if err != nil {
return err
}
// Setup the output
if out != nil {
reply.Alloc = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
}
// Setup the output
reply.Alloc = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return a.srv.blockingRPC(&opts)
}

@ -3,6 +3,7 @@ package nomad
import (
"reflect"
"testing"
"time"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/mock"
@ -44,6 +45,74 @@ func TestAllocEndpoint_List(t *testing.T) {
}
}
func TestAllocEndpoint_List_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the alloc
alloc := mock.Alloc()
// Upsert alloc triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertAllocs(2, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.AllocListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
},
}
start := time.Now()
var resp structs.AllocListResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
t.Fatalf("Bad index: %d %d", resp.Index, 2)
}
if len(resp.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID {
t.Fatalf("bad: %#v", resp.Allocations)
}
// Client updates trigger watches
alloc2 := mock.Alloc()
alloc2.ID = alloc.ID
alloc2.ClientStatus = structs.AllocClientStatusRunning
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpdateAllocFromClient(3, alloc2); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 2
start = time.Now()
var resp2 structs.AllocListResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
}
if len(resp2.Allocations) != 1 || resp2.Allocations[0].ID != alloc.ID ||
resp2.Allocations[0].ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", resp2.Allocations)
}
}
func TestAllocEndpoint_GetAlloc(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@ -75,3 +144,55 @@ func TestAllocEndpoint_GetAlloc(t *testing.T) {
t.Fatalf("bad: %#v", resp.Alloc)
}
}
func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the allocs
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
// First create an unrelated alloc
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Create the alloc we are watching later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Lookup the alloc
get := &structs.AllocSpecificRequest{
AllocID: alloc2.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
var resp structs.SingleAllocResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if resp.Alloc == nil || resp.Alloc.ID != alloc2.ID {
t.Fatalf("bad: %#v", resp.Alloc)
}
}

@ -6,6 +6,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)
const (
@ -26,32 +27,40 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "eval", "get_eval"}, time.Now())
// Look for the eval
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.EvalByID(args.EvalID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Eval: args.EvalID}),
run: func() error {
// Look for the eval
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.EvalByID(args.EvalID)
if err != nil {
return err
}
// Setup the output
if out != nil {
reply.Eval = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the evals table
index, err := snap.Index("evals")
if err != nil {
return err
}
reply.Index = index
}
// Setup the output
reply.Eval = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the evals table
index, err := snap.Index("evals")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return e.srv.blockingRPC(&opts)
}
// Dequeue is used to dequeue a pending evaluation
@ -219,35 +228,45 @@ func (e *Eval) List(args *structs.EvalListRequest,
}
defer metrics.MeasureSince([]string{"nomad", "eval", "list"}, time.Now())
// Scan all the evaluations
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Evals()
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "evals"}),
run: func() error {
// Scan all the evaluations
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Evals()
if err != nil {
return err
}
for {
raw := iter.Next()
if raw == nil {
break
}
eval := raw.(*structs.Evaluation)
reply.Evaluations = append(reply.Evaluations, eval)
}
var evals []*structs.Evaluation
for {
raw := iter.Next()
if raw == nil {
break
}
eval := raw.(*structs.Evaluation)
evals = append(evals, eval)
}
reply.Evaluations = evals
// Use the last index that affected the evals table
index, err := snap.Index("evals")
if err != nil {
return err
}
reply.Index = index
// Use the last index that affected the evals table
index, err := snap.Index("evals")
if err != nil {
return err
}
reply.Index = index
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return e.srv.blockingRPC(&opts)
}
// Allocations is used to list the allocations for an evaluation
@ -258,32 +277,40 @@ func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "eval", "allocations"}, time.Now())
// Capture the allocations
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByEval(args.EvalID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{AllocEval: args.EvalID}),
run: func() error {
// Capture the allocations
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByEval(args.EvalID)
if err != nil {
return err
}
// Convert to a stub
if len(allocs) > 0 {
reply.Allocations = make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
}
// Convert to a stub
if len(allocs) > 0 {
reply.Allocations = make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
}
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return e.srv.blockingRPC(&opts)
}

@ -51,6 +51,83 @@ func TestEvalEndpoint_GetEval(t *testing.T) {
}
}
func TestEvalEndpoint_GetEval_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the evals
eval1 := mock.Eval()
eval2 := mock.Eval()
// First create an unrelated eval
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertEvals(100, []*structs.Evaluation{eval1})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Upsert the eval we are watching later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertEvals(200, []*structs.Evaluation{eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Lookup the eval
req := &structs.EvalSpecificRequest{
EvalID: eval2.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
var resp structs.SingleEvalResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if resp.Eval == nil || resp.Eval.ID != eval2.ID {
t.Fatalf("bad: %#v", resp.Eval)
}
// Eval delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
err := state.DeleteEval(300, []string{eval2.ID}, []string{})
if err != nil {
t.Fatalf("err: %v", err)
}
})
req.QueryOptions.MinQueryIndex = 250
var resp2 structs.SingleEvalResponse
start = time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 300 {
t.Fatalf("Bad index: %d %d", resp2.Index, 300)
}
if resp2.Eval != nil {
t.Fatalf("bad: %#v", resp2.Eval)
}
}
func TestEvalEndpoint_Dequeue(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
@ -334,6 +411,70 @@ func TestEvalEndpoint_List(t *testing.T) {
}
}
func TestEvalEndpoint_List_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the eval
eval := mock.Eval()
// Upsert eval triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertEvals(2, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
},
}
start := time.Now()
var resp structs.EvalListResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
t.Fatalf("Bad index: %d %d", resp.Index, 2)
}
if len(resp.Evaluations) != 1 || resp.Evaluations[0].ID != eval.ID {
t.Fatalf("bad: %#v", resp.Evaluations)
}
// Eval deletion triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteEval(3, []string{eval.ID}, nil); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 2
start = time.Now()
var resp2 structs.EvalListResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
}
if len(resp2.Evaluations) != 0 {
t.Fatalf("bad: %#v", resp2.Evaluations)
}
}
func TestEvalEndpoint_Allocations(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@ -368,3 +509,55 @@ func TestEvalEndpoint_Allocations(t *testing.T) {
t.Fatalf("bad: %#v", resp.Allocations)
}
}
func TestEvalEndpoint_Allocations_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the allocs
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
// Upsert an unrelated alloc first
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Upsert an alloc which will trigger the watch later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Lookup the eval
get := &structs.EvalSpecificRequest{
EvalID: alloc2.EvalID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
var resp structs.EvalAllocationsResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if len(resp.Allocations) != 1 || resp.Allocations[0].ID != alloc2.ID {
t.Fatalf("bad: %#v", resp.Allocations)
}
}

@ -6,6 +6,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)
// Job endpoint is used for job interactions
@ -180,32 +181,41 @@ func (j *Job) GetJob(args *structs.JobSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "job", "get_job"}, time.Now())
// Look for the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Job: args.JobID}),
run: func() error {
// Setup the output
if out != nil {
reply.Job = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the jobs table
index, err := snap.Index("jobs")
if err != nil {
return err
}
reply.Index = index
}
// Look for the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Setup the output
reply.Job = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the jobs table
index, err := snap.Index("jobs")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// List is used to list the jobs registered in the system
@ -216,35 +226,45 @@ func (j *Job) List(args *structs.JobListRequest,
}
defer metrics.MeasureSince([]string{"nomad", "job", "list"}, time.Now())
// Capture all the jobs
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Jobs()
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "jobs"}),
run: func() error {
// Capture all the jobs
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Jobs()
if err != nil {
return err
}
for {
raw := iter.Next()
if raw == nil {
break
}
job := raw.(*structs.Job)
reply.Jobs = append(reply.Jobs, job.Stub())
}
var jobs []*structs.JobListStub
for {
raw := iter.Next()
if raw == nil {
break
}
job := raw.(*structs.Job)
jobs = append(jobs, job.Stub())
}
reply.Jobs = jobs
// Use the last index that affected the jobs table
index, err := snap.Index("jobs")
if err != nil {
return err
}
reply.Index = index
// Use the last index that affected the jobs table
index, err := snap.Index("jobs")
if err != nil {
return err
}
reply.Index = index
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// Allocations is used to list the allocations for a job
@ -255,34 +275,43 @@ func (j *Job) Allocations(args *structs.JobSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "job", "allocations"}, time.Now())
// Capture the allocations
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByJob(args.JobID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{AllocJob: args.JobID}),
run: func() error {
// Capture the allocations
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByJob(args.JobID)
if err != nil {
return err
}
// Convert to stubs
if len(allocs) > 0 {
reply.Allocations = make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
}
// Convert to stubs
if len(allocs) > 0 {
reply.Allocations = make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
}
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// Evaluations is used to list the evaluations for a job

@ -3,6 +3,7 @@ package nomad
import (
"reflect"
"testing"
"time"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/mock"
@ -363,6 +364,80 @@ func TestJobEndpoint_GetJob(t *testing.T) {
}
}
func TestJobEndpoint_GetJob_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the jobs
job1 := mock.Job()
job2 := mock.Job()
// Upsert a job we are not interested in first.
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertJob(100, job1); err != nil {
t.Fatalf("err: %v", err)
}
})
// Upsert another job later which should trigger the watch.
time.AfterFunc(200*time.Millisecond, func() {
if err := state.UpsertJob(200, job2); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.JobSpecificRequest{
JobID: job2.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
start := time.Now()
var resp structs.SingleJobResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if resp.Job == nil || resp.Job.ID != job2.ID {
t.Fatalf("bad: %#v", resp.Job)
}
// Job delete fires watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteJob(300, job2.ID); err != nil {
t.Fatalf("err: %v", err)
}
})
req.QueryOptions.MinQueryIndex = 250
start = time.Now()
var resp2 structs.SingleJobResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 300 {
t.Fatalf("Bad index: %d %d", resp2.Index, 300)
}
if resp2.Job != nil {
t.Fatalf("bad: %#v", resp2.Job)
}
}
func TestJobEndpoint_ListJobs(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@ -397,6 +472,70 @@ func TestJobEndpoint_ListJobs(t *testing.T) {
}
}
func TestJobEndpoint_ListJobs_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the job
job := mock.Job()
// Upsert job triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertJob(100, job); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.JobListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
start := time.Now()
var resp structs.JobListResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 100 {
t.Fatalf("Bad index: %d %d", resp.Index, 100)
}
if len(resp.Jobs) != 1 || resp.Jobs[0].ID != job.ID {
t.Fatalf("bad: %#v", resp.Jobs)
}
// Job deletion triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteJob(200, job.ID); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 150
start = time.Now()
var resp2 structs.JobListResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 200 {
t.Fatalf("Bad index: %d %d", resp2.Index, 200)
}
if len(resp2.Jobs) != 0 {
t.Fatalf("bad: %#v", resp2.Jobs)
}
}
func TestJobEndpoint_Allocations(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@ -432,6 +571,59 @@ func TestJobEndpoint_Allocations(t *testing.T) {
}
}
func TestJobEndpoint_Allocations_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the allocs
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc2.JobID = "job1"
state := s1.fsm.State()
// First upsert an unrelated alloc
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Upsert an alloc for the job we are interested in later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Lookup the job's allocations
get := &structs.JobSpecificRequest{
JobID: "job1",
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
var resp structs.JobAllocationsResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Job.Allocations", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if len(resp.Allocations) != 1 || resp.Allocations[0].JobID != "job1" {
t.Fatalf("bad: %#v", resp.Allocations)
}
}
func TestJobEndpoint_Evaluations(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()

@ -6,6 +6,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)
// Node endpoint is used for client interactions
@ -282,37 +283,45 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "client", "get_node"}, time.Now())
// Verify the arguments
if args.NodeID == "" {
return fmt.Errorf("missing node ID")
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Node: args.NodeID}),
run: func() error {
// Verify the arguments
if args.NodeID == "" {
return fmt.Errorf("missing node ID")
}
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.NodeByID(args.NodeID)
if err != nil {
return err
}
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.NodeByID(args.NodeID)
if err != nil {
return err
}
// Setup the output
if out != nil {
reply.Node = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("nodes")
if err != nil {
return err
}
reply.Index = index
}
// Setup the output
reply.Node = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("nodes")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
n.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
n.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return n.srv.blockingRPC(&opts)
}
// GetAllocs is used to request allocations for a specific node
@ -330,9 +339,9 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
allocWatch: args.NodeID,
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{AllocNode: args.NodeID}),
run: func() error {
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
@ -404,35 +413,45 @@ func (n *Node) List(args *structs.NodeListRequest,
}
defer metrics.MeasureSince([]string{"nomad", "client", "list"}, time.Now())
// Capture all the nodes
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Nodes()
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "nodes"}),
run: func() error {
// Capture all the nodes
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Nodes()
if err != nil {
return err
}
for {
raw := iter.Next()
if raw == nil {
break
}
node := raw.(*structs.Node)
reply.Nodes = append(reply.Nodes, node.Stub())
}
var nodes []*structs.NodeListStub
for {
raw := iter.Next()
if raw == nil {
break
}
node := raw.(*structs.Node)
nodes = append(nodes, node.Stub())
}
reply.Nodes = nodes
// Use the last index that affected the nodes table
index, err := snap.Index("nodes")
if err != nil {
return err
}
reply.Index = index
// Use the last index that affected the nodes table
index, err := snap.Index("nodes")
if err != nil {
return err
}
reply.Index = index
// Set the query response
n.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
n.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return n.srv.blockingRPC(&opts)
}
// createNodeEvals is used to create evaluations for each alloc on a node.

@ -371,6 +371,107 @@ func TestClientEndpoint_GetNode(t *testing.T) {
}
}
func TestClientEndpoint_GetNode_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the nodes
node1 := mock.Node()
node2 := mock.Node()
// First create an unrelated node.
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertNode(100, node1); err != nil {
t.Fatalf("err: %v", err)
}
})
// Upsert the node we are watching later
time.AfterFunc(200*time.Millisecond, func() {
if err := state.UpsertNode(200, node2); err != nil {
t.Fatalf("err: %v", err)
}
})
// Lookup the node
req := &structs.NodeSpecificRequest{
NodeID: node2.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
var resp structs.SingleNodeResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if resp.Node == nil || resp.Node.ID != node2.ID {
t.Fatalf("bad: %#v", resp.Node)
}
// Node update triggers watches
time.AfterFunc(100*time.Millisecond, func() {
nodeUpdate := mock.Node()
nodeUpdate.ID = node2.ID
nodeUpdate.Status = structs.NodeStatusDown
if err := state.UpsertNode(300, nodeUpdate); err != nil {
t.Fatalf("err: %v", err)
}
})
req.QueryOptions.MinQueryIndex = 250
var resp2 structs.SingleNodeResponse
start = time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp2.Index != 300 {
t.Fatalf("Bad index: %d %d", resp2.Index, 300)
}
if resp2.Node == nil || resp2.Node.Status != structs.NodeStatusDown {
t.Fatalf("bad: %#v", resp2.Node)
}
// Node delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteNode(400, node2.ID); err != nil {
t.Fatalf("err: %v", err)
}
})
req.QueryOptions.MinQueryIndex = 350
var resp3 structs.SingleNodeResponse
start = time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp3); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp3.Index != 400 {
t.Fatalf("Bad index: %d %d", resp2.Index, 400)
}
if resp3.Node != nil {
t.Fatalf("bad: %#v", resp3.Node)
}
}
func TestClientEndpoint_GetAllocs(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@ -457,16 +558,15 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
alloc.NodeID = node.ID
state := s1.fsm.State()
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertAllocs(100, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
}()
})
// Lookup the allocs in a blocking query
get := &structs.NodeSpecificRequest{
req := &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
@ -475,7 +575,7 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
},
}
var resp2 structs.NodeAllocsResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", get, &resp2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
@ -491,6 +591,34 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
if len(resp2.Allocs) != 1 || resp2.Allocs[0].ID != alloc.ID {
t.Fatalf("bad: %#v", resp2.Allocs)
}
// Alloc updates fire watches
time.AfterFunc(100*time.Millisecond, func() {
allocUpdate := mock.Alloc()
allocUpdate.NodeID = alloc.NodeID
allocUpdate.ID = alloc.ID
allocUpdate.ClientStatus = structs.AllocClientStatusRunning
err := state.UpdateAllocFromClient(200, allocUpdate)
if err != nil {
t.Fatalf("err: %v", err)
}
})
req.QueryOptions.MinQueryIndex = 150
var resp3 structs.NodeAllocsResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp3); err != nil {
t.Fatalf("err: %v", err)
}
if time.Since(start) < 100*time.Millisecond {
t.Fatalf("too fast")
}
if resp3.Index != 200 {
t.Fatalf("Bad index: %d %d", resp3.Index, 200)
}
if len(resp3.Allocs) != 1 || resp3.Allocs[0].ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", resp3.Allocs[0])
}
}
func TestClientEndpoint_UpdateAlloc(t *testing.T) {
@ -752,3 +880,115 @@ func TestClientEndpoint_ListNodes(t *testing.T) {
t.Fatalf("bad: %#v", resp2.Nodes[0])
}
}
func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the node
node := mock.Node()
// Node upsert triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertNode(2, node); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.NodeListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
},
}
start := time.Now()
var resp structs.NodeListResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
t.Fatalf("Bad index: %d %d", resp.Index, 2)
}
if len(resp.Nodes) != 1 || resp.Nodes[0].ID != node.ID {
t.Fatalf("bad: %#v", resp.Nodes)
}
// Node drain updates trigger watches.
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpdateNodeDrain(3, node.ID, true); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 2
var resp2 structs.NodeListResponse
start = time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
}
if len(resp2.Nodes) != 1 || !resp2.Nodes[0].Drain {
t.Fatalf("bad: %#v", resp2.Nodes)
}
// Node status update triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpdateNodeStatus(4, node.ID, structs.NodeStatusDown); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 3
var resp3 structs.NodeListResponse
start = time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp3); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp3)
}
if resp3.Index != 4 {
t.Fatalf("Bad index: %d %d", resp3.Index, 4)
}
if len(resp3.Nodes) != 1 || resp3.Nodes[0].Status != structs.NodeStatusDown {
t.Fatalf("bad: %#v", resp3.Nodes)
}
// Node delete triggers watches.
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteNode(5, node.ID); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 4
var resp4 structs.NodeListResponse
start = time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp4); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp4)
}
if resp4.Index != 5 {
t.Fatalf("Bad index: %d %d", resp4.Index, 5)
}
if len(resp4.Nodes) != 0 {
t.Fatalf("bad: %#v", resp4.Nodes)
}
}

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/raft"
"github.com/hashicorp/yamux"
)
@ -268,10 +269,10 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
// blockingOptions is used to parameterize blockingRPC
type blockingOptions struct {
queryOpts *structs.QueryOptions
queryMeta *structs.QueryMeta
allocWatch string
run func() error
queryOpts *structs.QueryOptions
queryMeta *structs.QueryMeta
watch watch.Items
run func() error
}
// blockingRPC is used for queries that need to wait for a
@ -306,17 +307,13 @@ func (s *Server) blockingRPC(opts *blockingOptions) error {
state = s.fsm.State()
defer func() {
timeout.Stop()
if opts.allocWatch != "" {
state.StopWatchAllocs(opts.allocWatch, notifyCh)
}
state.StopWatch(opts.watch, notifyCh)
}()
REGISTER_NOTIFY:
// Register the notification channel. This may be done
// multiple times if we have not reached the target wait index.
if opts.allocWatch != "" {
state.WatchAllocs(opts.allocWatch, notifyCh)
}
state.Watch(opts.watch, notifyCh)
RUN_QUERY:
// Update the query meta data
@ -327,7 +324,7 @@ RUN_QUERY:
err := opts.run()
// Check for minimum query time
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
if err == nil && opts.queryOpts.MinQueryIndex > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto REGISTER_NOTIFY
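
For context, a minimal sketch of how a caller might consume one of these blocking endpoints, mirroring the msgpackrpc calls used in the tests above; the codec setup is assumed to match the test helpers, and handleNodes is a hypothetical callback, not part of this commit:

import (
	"net/rpc"

	"github.com/hashicorp/net-rpc-msgpackrpc"
	"github.com/hashicorp/nomad/nomad/structs"
)

// pollNodes long-polls Node.List, re-arming MinQueryIndex at the index
// returned by each response so the next call blocks until newer data exists.
func pollNodes(codec rpc.ClientCodec, handleNodes func([]*structs.NodeListStub)) error {
	req := &structs.NodeListRequest{
		QueryOptions: structs.QueryOptions{Region: "global"},
	}
	for {
		var resp structs.NodeListResponse
		// Blocks server-side until a "nodes" watch item fires past
		// MinQueryIndex or the maximum query time elapses.
		if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp); err != nil {
			return err
		}
		handleNodes(resp.Nodes)
		req.MinQueryIndex = resp.Index
	}
}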

@ -8,8 +8,16 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)
// IndexEntry is used with the "index" table
// for managing the latest Raft index affecting a table.
type IndexEntry struct {
Key string
Value uint64
}
// The StateStore is responsible for maintaining all the Nomad
// state. It is manipulated by the FSM which maintains consistency
// through the use of Raft. The goals of the StateStore are to provide
@ -23,45 +31,6 @@ type StateStore struct {
watch *stateWatch
}
// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore
}
// StateRestore is used to optimize the performance when
// restoring state by only using a single large transaction
// instead of thousands of sub transactions
type StateRestore struct {
txn *memdb.Txn
watch *stateWatch
allocNodes map[string]struct{}
}
// Abort is used to abort the restore operation
func (s *StateRestore) Abort() {
s.txn.Abort()
}
// Commit is used to commit the restore operation
func (s *StateRestore) Commit() {
s.txn.Defer(func() { s.watch.notifyAllocs(s.allocNodes) })
s.txn.Commit()
}
// IndexEntry is used with the "index" table
// for managing the latest Raft index affecting a table.
type IndexEntry struct {
Key string
Value uint64
}
// stateWatch holds shared state for watching updates. This is
// outside of StateStore so it can be shared with snapshots.
type stateWatch struct {
allocs map[string]*NotifyGroup
allocLock sync.Mutex
}
// NewStateStore is used to create a new state store
func NewStateStore(logOutput io.Writer) (*StateStore, error) {
// Create the MemDB
@ -70,16 +39,11 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) {
return nil, fmt.Errorf("state store setup failed: %v", err)
}
// Create the watch entry
watch := &stateWatch{
allocs: make(map[string]*NotifyGroup),
}
// Create the state store
s := &StateStore{
logger: log.New(logOutput, "", log.LstdFlags),
db: db,
watch: watch,
watch: newStateWatch(),
}
return s, nil
}
@ -104,55 +68,21 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
func (s *StateStore) Restore() (*StateRestore, error) {
txn := s.db.Txn(true)
r := &StateRestore{
txn: txn,
watch: s.watch,
allocNodes: make(map[string]struct{}),
txn: txn,
watch: s.watch,
items: watch.NewItems(),
}
return r, nil
}
// WatchAllocs is used to subscribe a channel to changes in allocations for a node
func (s *StateStore) WatchAllocs(node string, notify chan struct{}) {
s.watch.allocLock.Lock()
defer s.watch.allocLock.Unlock()
// Check for an existing notify group
if grp, ok := s.watch.allocs[node]; ok {
grp.Wait(notify)
return
}
// Create new notify group
grp := &NotifyGroup{}
grp.Wait(notify)
s.watch.allocs[node] = grp
// Watch subscribes a channel to a set of watch items.
func (s *StateStore) Watch(items watch.Items, notify chan struct{}) {
s.watch.watch(items, notify)
}
// StopWatchAllocs is used to unsubscribe a channel from changes in allocations
func (s *StateStore) StopWatchAllocs(node string, notify chan struct{}) {
s.watch.allocLock.Lock()
defer s.watch.allocLock.Unlock()
// Check for an existing notify group
if grp, ok := s.watch.allocs[node]; ok {
grp.Clear(notify)
if grp.Empty() {
delete(s.watch.allocs, node)
}
}
}
// notifyAllocs is used to notify any node alloc listeners of a change
func (w *stateWatch) notifyAllocs(nodes map[string]struct{}) {
w.allocLock.Lock()
defer w.allocLock.Unlock()
for node := range nodes {
if grp, ok := w.allocs[node]; ok {
grp.Notify()
delete(w.allocs, node)
}
}
// StopWatch unsubscribes a channel from a set of watch items.
func (s *StateStore) StopWatch(items watch.Items, notify chan struct{}) {
s.watch.stopWatch(items, notify)
}
// UpsertNode is used to register a node or update a node definition
@ -162,6 +92,10 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "nodes"})
watcher.Add(watch.Item{Node: node.ID})
// Check if the node already exists
existing, err := txn.First("nodes", "id", node.ID)
if err != nil {
@ -187,6 +121,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -196,6 +131,10 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "nodes"})
watcher.Add(watch.Item{Node: nodeID})
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@ -213,6 +152,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -222,6 +162,10 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "nodes"})
watcher.Add(watch.Item{Node: nodeID})
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@ -248,6 +192,7 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -257,6 +202,10 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "nodes"})
watcher.Add(watch.Item{Node: nodeID})
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@ -283,6 +232,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -319,6 +269,10 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: job.ID})
// Check if the job already exists
existing, err := txn.First("jobs", "id", job.ID)
if err != nil {
@ -342,6 +296,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -351,6 +306,10 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: jobID})
// Lookup the node
existing, err := txn.First("jobs", "id", jobID)
if err != nil {
@ -368,6 +327,7 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -417,13 +377,18 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "evals"})
// Do a nested upsert
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
return err
}
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -459,7 +424,9 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
txn := s.db.Txn(true)
defer txn.Abort()
nodes := make(map[string]struct{})
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "evals"})
watcher.Add(watch.Item{Table: "allocs"})
for _, eval := range evals {
existing, err := txn.First("evals", "id", eval)
@ -472,6 +439,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
if err := txn.Delete("evals", existing); err != nil {
return fmt.Errorf("eval delete failed: %v", err)
}
watcher.Add(watch.Item{Eval: eval})
}
for _, alloc := range allocs {
@ -482,10 +450,14 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
if existing == nil {
continue
}
nodes[existing.(*structs.Allocation).NodeID] = struct{}{}
if err := txn.Delete("allocs", existing); err != nil {
return fmt.Errorf("alloc delete failed: %v", err)
}
realAlloc := existing.(*structs.Allocation)
watcher.Add(watch.Item{Alloc: realAlloc.ID})
watcher.Add(watch.Item{AllocEval: realAlloc.EvalID})
watcher.Add(watch.Item{AllocJob: realAlloc.JobID})
watcher.Add(watch.Item{AllocNode: realAlloc.NodeID})
}
// Update the indexes
@ -495,7 +467,8 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notifyAllocs(nodes) })
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -557,6 +530,13 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "allocs"})
watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: alloc.EvalID})
watcher.Add(watch.Item{AllocJob: alloc.JobID})
watcher.Add(watch.Item{AllocNode: alloc.NodeID})
// Look for existing alloc
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
@ -590,8 +570,7 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
return fmt.Errorf("index update failed: %v", err)
}
nodes := map[string]struct{}{alloc.NodeID: struct{}{}}
txn.Defer(func() { s.watch.notifyAllocs(nodes) })
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -601,7 +580,9 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) error {
txn := s.db.Txn(true)
defer txn.Abort()
nodes := make(map[string]struct{})
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "allocs"})
// Handle the allocations
for _, alloc := range allocs {
@ -620,10 +601,14 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
alloc.ClientStatus = exist.ClientStatus
alloc.ClientDescription = exist.ClientDescription
}
nodes[alloc.NodeID] = struct{}{}
if err := txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: alloc.EvalID})
watcher.Add(watch.Item{AllocJob: alloc.JobID})
watcher.Add(watch.Item{AllocNode: alloc.NodeID})
}
// Update the indexes
@ -631,7 +616,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notifyAllocs(nodes) })
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@ -753,8 +738,35 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
return iter, nil
}
// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore
}
// StateRestore is used to optimize the performance when
// restoring state by only using a single large transaction
// instead of thousands of sub transactions
type StateRestore struct {
txn *memdb.Txn
watch *stateWatch
items watch.Items
}
// Abort is used to abort the restore operation
func (s *StateRestore) Abort() {
s.txn.Abort()
}
// Commit is used to commit the restore operation
func (s *StateRestore) Commit() {
s.txn.Defer(func() { s.watch.notify(s.items) })
s.txn.Commit()
}
// NodeRestore is used to restore a node
func (r *StateRestore) NodeRestore(node *structs.Node) error {
r.items.Add(watch.Item{Table: "nodes"})
r.items.Add(watch.Item{Node: node.ID})
if err := r.txn.Insert("nodes", node); err != nil {
return fmt.Errorf("node insert failed: %v", err)
}
@ -763,6 +775,8 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error {
// JobRestore is used to restore a job
func (r *StateRestore) JobRestore(job *structs.Job) error {
r.items.Add(watch.Item{Table: "jobs"})
r.items.Add(watch.Item{Job: job.ID})
if err := r.txn.Insert("jobs", job); err != nil {
return fmt.Errorf("job insert failed: %v", err)
}
@ -771,6 +785,8 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
// EvalRestore is used to restore an evaluation
func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
r.items.Add(watch.Item{Table: "evals"})
r.items.Add(watch.Item{Eval: eval.ID})
if err := r.txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
}
@ -779,7 +795,11 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
// AllocRestore is used to restore an allocation
func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
r.allocNodes[alloc.NodeID] = struct{}{}
r.items.Add(watch.Item{Table: "allocs"})
r.items.Add(watch.Item{Alloc: alloc.ID})
r.items.Add(watch.Item{AllocEval: alloc.EvalID})
r.items.Add(watch.Item{AllocJob: alloc.JobID})
r.items.Add(watch.Item{AllocNode: alloc.NodeID})
if err := r.txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
@ -793,3 +813,59 @@ func (r *StateRestore) IndexRestore(idx *IndexEntry) error {
}
return nil
}
// stateWatch holds shared state for watching updates. This is
// outside of StateStore so it can be shared with snapshots.
type stateWatch struct {
items map[watch.Item]*NotifyGroup
l sync.Mutex
}
// newStateWatch creates a new stateWatch for change notification.
func newStateWatch() *stateWatch {
return &stateWatch{
items: make(map[watch.Item]*NotifyGroup),
}
}
// watch subscribes a channel to the given watch items.
func (w *stateWatch) watch(items watch.Items, ch chan struct{}) {
w.l.Lock()
defer w.l.Unlock()
for item := range items {
grp, ok := w.items[item]
if !ok {
grp = new(NotifyGroup)
w.items[item] = grp
}
grp.Wait(ch)
}
}
// stopWatch unsubscribes a channel from the given watch items.
func (w *stateWatch) stopWatch(items watch.Items, ch chan struct{}) {
w.l.Lock()
defer w.l.Unlock()
for item := range items {
if grp, ok := w.items[item]; ok {
grp.Clear(ch)
if grp.Empty() {
delete(w.items, item)
}
}
}
}
// notify is used to fire notifications on the given watch items.
func (w *stateWatch) notify(items watch.Items) {
w.l.Lock()
defer w.l.Unlock()
for wi := range items {
if grp, ok := w.items[wi]; ok {
grp.Notify()
}
}
}
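
The nomad/nomad/watch package itself is not shown in this diff; the following is a sketch of the Item/Items shape implied by how it is used above (watch.NewItems, Items.Add, and ranging over Items as a map keyed by Item) — an assumption for illustration, not the package's verbatim source:

package watch

// Item describes a single thing a blocking query can wait on. Callers set
// exactly one field; the struct has only comparable fields so it can be
// used directly as a map key.
type Item struct {
	Alloc     string
	AllocEval string
	AllocJob  string
	AllocNode string
	Eval      string
	Job       string
	Node      string
	Table     string
}

// Items is a set of watch items.
type Items map[Item]struct{}

// NewItems creates a set from any initial items.
func NewItems(items ...Item) Items {
	wi := make(Items)
	for _, item := range items {
		wi.Add(item)
	}
	return wi
}

// Add inserts an item into the set; duplicates collapse into one entry.
func (wi Items) Add(item Item) {
	wi[item] = struct{}{}
}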

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)
func testStateStore(t *testing.T) *StateStore {
@ -25,6 +26,11 @@ func TestStateStore_UpsertNode_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
@ -46,12 +52,19 @@ func TestStateStore_UpsertNode_Node(t *testing.T) {
if index != 1000 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_DeleteNode_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
@ -78,12 +91,19 @@ func TestStateStore_DeleteNode_Node(t *testing.T) {
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
@ -113,12 +133,19 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
@ -148,6 +175,8 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_Nodes(t *testing.T) {
@ -188,18 +217,22 @@ func TestStateStore_Nodes(t *testing.T) {
func TestStateStore_RestoreNode(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
notify := setupNotifyTest(
state,
watch.Item{Table: "nodes"},
watch.Item{Node: node.ID})
restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}
node := mock.Node()
err = restore.NodeRestore(node)
if err != nil {
t.Fatalf("err: %v", err)
}
restore.Commit()
out, err := state.NodeByID(node.ID)
@ -210,12 +243,19 @@ func TestStateStore_RestoreNode(t *testing.T) {
if !reflect.DeepEqual(out, node) {
t.Fatalf("Bad: %#v %#v", out, node)
}
notify.verify(t)
}
func TestStateStore_UpsertJob_Job(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
notify := setupNotifyTest(
state,
watch.Item{Table: "jobs"},
watch.Item{Job: job.ID})
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
@ -237,12 +277,19 @@ func TestStateStore_UpsertJob_Job(t *testing.T) {
if index != 1000 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
notify := setupNotifyTest(
state,
watch.Item{Table: "jobs"},
watch.Item{Job: job.ID})
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
@ -278,12 +325,19 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_DeleteJob_Job(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
notify := setupNotifyTest(
state,
watch.Item{Table: "jobs"},
watch.Item{Job: job.ID})
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
@ -310,6 +364,8 @@ func TestStateStore_DeleteJob_Job(t *testing.T) {
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_Jobs(t *testing.T) {
@ -417,18 +473,22 @@ func TestStateStore_JobsByScheduler(t *testing.T) {
func TestStateStore_RestoreJob(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
notify := setupNotifyTest(
state,
watch.Item{Table: "jobs"},
watch.Item{Job: job.ID})
restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}
job := mock.Job()
err = restore.JobRestore(job)
if err != nil {
t.Fatalf("err: %v", err)
}
restore.Commit()
out, err := state.JobByID(job.ID)
@ -439,6 +499,8 @@ func TestStateStore_RestoreJob(t *testing.T) {
if !reflect.DeepEqual(out, job) {
t.Fatalf("Bad: %#v %#v", out, job)
}
notify.verify(t)
}
func TestStateStore_Indexes(t *testing.T) {
@ -503,6 +565,11 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) {
state := testStateStore(t)
eval := mock.Eval()
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
@ -524,6 +591,8 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) {
if index != 1000 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
@ -535,6 +604,11 @@ func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
t.Fatalf("err: %v", err)
}
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
eval2 := mock.Eval()
eval2.ID = eval.ID
err = state.UpsertEvals(1001, []*structs.Evaluation{eval2})
@ -565,40 +639,54 @@ func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_DeleteEval_Eval(t *testing.T) {
state := testStateStore(t)
eval := mock.Eval()
eval1 := mock.Eval()
eval2 := mock.Eval()
alloc := mock.Alloc()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
err := state.UpsertEvals(1000, []*structs.Evaluation{eval, eval2})
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Table: "allocs"},
watch.Item{Eval: eval1.ID},
watch.Item{Eval: eval2.ID},
watch.Item{Alloc: alloc1.ID},
watch.Item{Alloc: alloc2.ID},
watch.Item{AllocEval: alloc1.EvalID},
watch.Item{AllocEval: alloc2.EvalID},
watch.Item{AllocJob: alloc1.JobID},
watch.Item{AllocJob: alloc2.JobID},
watch.Item{AllocNode: alloc1.NodeID},
watch.Item{AllocNode: alloc2.NodeID})
err := state.UpsertEvals(1000, []*structs.Evaluation{eval1, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc, alloc2})
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
notify1 := make(chan struct{}, 1)
state.WatchAllocs(alloc.NodeID, notify1)
err = state.DeleteEval(1002, []string{eval.ID, eval2.ID}, []string{alloc.ID, alloc2.ID})
err = state.DeleteEval(1002, []string{eval1.ID, eval2.ID}, []string{alloc1.ID, alloc2.ID})
if err != nil {
t.Fatalf("err: %v", err)
}
out, err := state.EvalByID(eval.ID)
out, err := state.EvalByID(eval1.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %#v %#v", eval, out)
t.Fatalf("bad: %#v %#v", eval1, out)
}
out, err = state.EvalByID(eval2.ID)
@ -607,16 +695,16 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
}
if out != nil {
t.Fatalf("bad: %#v %#v", eval, out)
t.Fatalf("bad: %#v %#v", eval1, out)
}
outA, err := state.AllocByID(alloc.ID)
outA, err := state.AllocByID(alloc1.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %#v %#v", alloc, outA)
t.Fatalf("bad: %#v %#v", alloc1, outA)
}
outA, err = state.AllocByID(alloc2.ID)
@ -625,7 +713,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
}
if out != nil {
t.Fatalf("bad: %#v %#v", alloc, outA)
t.Fatalf("bad: %#v %#v", alloc1, outA)
}
index, err := state.Index("evals")
@ -644,11 +732,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
t.Fatalf("bad: %d", index)
}
select {
case <-notify1:
default:
t.Fatalf("should be notified")
}
notify.verify(t)
}
func TestStateStore_EvalsByJob(t *testing.T) {
@ -720,34 +804,48 @@ func TestStateStore_Evals(t *testing.T) {
func TestStateStore_RestoreEval(t *testing.T) {
state := testStateStore(t)
eval := mock.Eval()
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}
job := mock.Eval()
err = restore.EvalRestore(job)
err = restore.EvalRestore(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
restore.Commit()
out, err := state.EvalByID(job.ID)
out, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(out, job) {
t.Fatalf("Bad: %#v %#v", out, job)
if !reflect.DeepEqual(out, eval) {
t.Fatalf("Bad: %#v %#v", out, eval)
}
notify.verify(t)
}
func TestStateStore_UpdateAllocFromClient(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
notify := setupNotifyTest(
state,
watch.Item{Table: "allocs"},
watch.Item{Alloc: alloc.ID},
watch.Item{AllocEval: alloc.EvalID},
watch.Item{AllocJob: alloc.JobID},
watch.Item{AllocNode: alloc.NodeID})
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
@ -779,12 +877,22 @@ func TestStateStore_UpdateAllocFromClient(t *testing.T) {
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
notify := setupNotifyTest(
state,
watch.Item{Table: "allocs"},
watch.Item{Alloc: alloc.ID},
watch.Item{AllocEval: alloc.EvalID},
watch.Item{AllocJob: alloc.JobID},
watch.Item{AllocNode: alloc.NodeID})
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
@ -806,35 +914,8 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
if index != 1000 {
t.Fatalf("bad: %d", index)
}
}
func TestStateStore_WatchAllocs(t *testing.T) {
state := testStateStore(t)
notify1 := make(chan struct{}, 1)
notify2 := make(chan struct{}, 1)
state.WatchAllocs("foo", notify1)
state.WatchAllocs("foo", notify2)
state.StopWatchAllocs("foo", notify2)
alloc := mock.Alloc()
alloc.NodeID = "foo"
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
select {
case <-notify1:
default:
t.Fatalf("should be notified")
}
select {
case <-notify2:
t.Fatalf("should not be notified")
default:
}
notify.verify(t)
}
func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
@ -849,6 +930,15 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
alloc2 := mock.Alloc()
alloc2.ID = alloc.ID
alloc2.NodeID = alloc.NodeID + ".new"
notify := setupNotifyTest(
state,
watch.Item{Table: "allocs"},
watch.Item{Alloc: alloc2.ID},
watch.Item{AllocEval: alloc2.EvalID},
watch.Item{AllocJob: alloc2.JobID},
watch.Item{AllocNode: alloc2.NodeID})
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
@ -877,6 +967,8 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_EvictAlloc_Alloc(t *testing.T) {
@ -1008,13 +1100,21 @@ func TestStateStore_Allocs(t *testing.T) {
func TestStateStore_RestoreAlloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
notify := setupNotifyTest(
state,
watch.Item{Table: "allocs"},
watch.Item{Alloc: alloc.ID},
watch.Item{AllocEval: alloc.EvalID},
watch.Item{AllocJob: alloc.JobID},
watch.Item{AllocNode: alloc.NodeID})
restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}
alloc := mock.Alloc()
err = restore.AllocRestore(alloc)
if err != nil {
t.Fatalf("err: %v", err)
@ -1030,6 +1130,87 @@ func TestStateStore_RestoreAlloc(t *testing.T) {
if !reflect.DeepEqual(out, alloc) {
t.Fatalf("Bad: %#v %#v", out, alloc)
}
notify.verify(t)
}
func TestStateWatch_watch(t *testing.T) {
sw := newStateWatch()
notify1 := make(chan struct{}, 1)
notify2 := make(chan struct{}, 1)
notify3 := make(chan struct{}, 1)
// Notifications trigger subscribed channels
sw.watch(watch.NewItems(watch.Item{Table: "foo"}), notify1)
sw.watch(watch.NewItems(watch.Item{Table: "bar"}), notify2)
sw.watch(watch.NewItems(watch.Item{Table: "baz"}), notify3)
items := watch.NewItems()
items.Add(watch.Item{Table: "foo"})
items.Add(watch.Item{Table: "bar"})
sw.notify(items)
if len(notify1) != 1 {
t.Fatalf("should notify")
}
if len(notify2) != 1 {
t.Fatalf("should notify")
}
if len(notify3) != 0 {
t.Fatalf("should not notify")
}
}
func TestStateWatch_stopWatch(t *testing.T) {
sw := newStateWatch()
notify := make(chan struct{})
// First subscribe
sw.watch(watch.NewItems(watch.Item{Table: "foo"}), notify)
// Unsubscribe to stop notifications
sw.stopWatch(watch.NewItems(watch.Item{Table: "foo"}), notify)
// Check that the group was removed
if _, ok := sw.items[watch.Item{Table: "foo"}]; ok {
t.Fatalf("should remove group")
}
// Check that we are not notified
sw.notify(watch.NewItems(watch.Item{Table: "foo"}))
if len(notify) != 0 {
t.Fatalf("should not notify")
}
}
// setupNotifyTest takes a state store and a set of watch items, then creates
// and subscribes a notification channel for each item.
func setupNotifyTest(state *StateStore, items ...watch.Item) notifyTest {
var n notifyTest
for _, item := range items {
ch := make(chan struct{}, 1)
state.Watch(watch.NewItems(item), ch)
n = append(n, &notifyTestCase{item, ch})
}
return n
}
// notifyTestCase is used to set up and verify watch triggers.
type notifyTestCase struct {
item watch.Item
ch chan struct{}
}
// notifyTest is a suite of notifyTestCases.
type notifyTest []*notifyTestCase
// verify ensures that each channel received a notification.
func (n notifyTest) verify(t *testing.T) {
for _, tcase := range n {
if len(tcase.ch) != 1 {
t.Fatalf("should notify %#v", tcase.item)
}
}
}
// NodeIDSort is used to sort nodes by ID

nomad/watch/watch.go Normal file

@ -0,0 +1,38 @@
package watch
// The watch package provides a means of describing a watch for a blocking
// query. It is exported so it may be shared between Nomad's RPC layer and
// the underlying state store.
// Item describes the scope of a watch. It is used to provide a uniform
// input for subscribe/unsubscribe and notification firing. Specifying
// multiple fields does not place a watch on multiple items. Each Item
// describes exactly one scoped watch.
type Item struct {
Alloc string
AllocEval string
AllocJob string
AllocNode string
Eval string
Job string
Node string
Table string
}
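To make the one-scope-per-Item rule concrete, here is a short illustrative fragment (not part of this change), assuming it sits inside the watch package; the allocation ID is a placeholder. Watching both the allocs table and one specific allocation takes two Items, while setting both fields on a single Item still describes exactly one watch, not a watch on each field.

var (
	tableWatch = Item{Table: "allocs"}          // one watch: the whole table
	allocWatch = Item{Alloc: "example-alloc-id"} // one watch: a single allocation (placeholder ID)
)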
// Items is a helper used to construct a set of watch items. It deduplicates
// the items as they are added using map keys.
type Items map[Item]struct{}
// NewItems creates a new Items set and adds the given items.
func NewItems(items ...Item) Items {
wi := make(Items)
for _, item := range items {
wi.Add(item)
}
return wi
}
// Add adds an item to the watch set.
func (wi Items) Add(i Item) {
wi[i] = struct{}{}
}
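As a usage sketch (not part of this change): the state store exposes a Watch method that takes Items and a channel, as exercised by setupNotifyTest in the tests above. Assuming this sketch lives in the state store's package, with a *StateStore value obtained from that package, wiring a watch looks roughly like the following; the job ID is a placeholder and error handling is omitted.

func sketchBlockingWatch(state *StateStore) {
	ch := make(chan struct{}, 1)
	items := watch.NewItems(
		watch.Item{Table: "jobs"},
		watch.Item{Job: "example-job-id"}, // placeholder job ID
	)
	state.Watch(items, ch)

	// Later, check without blocking whether any watched item was touched.
	select {
	case <-ch:
		// a write affected the jobs table or this specific job
	default:
		// nothing relevant has changed yet
	}
}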

nomad/watch/watch_test.go Normal file

@ -0,0 +1,31 @@
package watch
import (
"testing"
)
func TestWatchItems(t *testing.T) {
// Creates an empty set of items
wi := NewItems()
if len(wi) != 0 {
t.Fatalf("expect 0 items, got: %#v", wi)
}
// Creates a new set of supplied items
wi = NewItems(Item{Table: "foo"})
if len(wi) != 1 {
t.Fatalf("expected 1 item, got: %#v", wi)
}
// Adding items works
wi.Add(Item{Node: "bar"})
if len(wi) != 2 {
t.Fatalf("expected 2 items, got: %#v", wi)
}
// Adding duplicates auto-dedupes
wi.Add(Item{Table: "foo"})
if len(wi) != 2 {
t.Fatalf("expected 2 items, got: %#v", wi)
}
}