Merge pull request #821 from hashicorp/f-client-set-receive
Client stores when it receives a task
This commit is contained in:
commit
fef9b70a9e
|
@ -8,7 +8,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/go-multierror"
|
|
||||||
"github.com/hashicorp/nomad/client/allocdir"
|
"github.com/hashicorp/nomad/client/allocdir"
|
||||||
"github.com/hashicorp/nomad/client/config"
|
"github.com/hashicorp/nomad/client/config"
|
||||||
"github.com/hashicorp/nomad/client/driver"
|
"github.com/hashicorp/nomad/client/driver"
|
||||||
|
@ -19,6 +18,14 @@ const (
|
||||||
// allocSyncRetryIntv is the interval on which we retry updating
|
// allocSyncRetryIntv is the interval on which we retry updating
|
||||||
// the status of the allocation
|
// the status of the allocation
|
||||||
allocSyncRetryIntv = 15 * time.Second
|
allocSyncRetryIntv = 15 * time.Second
|
||||||
|
|
||||||
|
// taskReceivedSyncLimit is how long the client will wait before sending
|
||||||
|
// that a task was received to the server. The client does not immediately
|
||||||
|
// send that the task was received to the server because another transistion
|
||||||
|
// to running or failed is likely to occur immediately after and a single
|
||||||
|
// update will transfer all past state information. If not other transistion
|
||||||
|
// has occured up to this limit, we will send to the server.
|
||||||
|
taskReceivedSyncLimit = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// AllocStateUpdater is used to update the status of an allocation
|
// AllocStateUpdater is used to update the status of an allocation
|
||||||
|
@ -45,6 +52,11 @@ type AllocRunner struct {
|
||||||
restored map[string]struct{}
|
restored map[string]struct{}
|
||||||
taskLock sync.RWMutex
|
taskLock sync.RWMutex
|
||||||
|
|
||||||
|
// taskReceivedTimer is used to mitigate updates sent to the server because
|
||||||
|
// we expect that shortly after receiving an alloc it will transistion
|
||||||
|
// state. We use a timer to send the update if this hasn't happened after a
|
||||||
|
// reasonable time.
|
||||||
|
taskReceivedTimer *time.Timer
|
||||||
taskStatusLock sync.RWMutex
|
taskStatusLock sync.RWMutex
|
||||||
|
|
||||||
updateCh chan *structs.Allocation
|
updateCh chan *structs.Allocation
|
||||||
|
@ -126,7 +138,8 @@ func (r *AllocRunner) RestoreState() error {
|
||||||
if err := tr.RestoreState(); err != nil {
|
if err := tr.RestoreState(); err != nil {
|
||||||
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
|
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
|
||||||
mErr.Errors = append(mErr.Errors, err)
|
mErr.Errors = append(mErr.Errors, err)
|
||||||
} else {
|
} else if !r.alloc.TerminalStatus() {
|
||||||
|
// Only start if the alloc isn't in a terminal status.
|
||||||
go tr.Run()
|
go tr.Run()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -323,6 +336,24 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
|
||||||
taskState.State = state
|
taskState.State = state
|
||||||
r.appendTaskEvent(taskState, event)
|
r.appendTaskEvent(taskState, event)
|
||||||
|
|
||||||
|
// We don't immediately mark ourselves as dirty, since in most cases there
|
||||||
|
// will immediately be another state transistion. This reduces traffic to
|
||||||
|
// the server.
|
||||||
|
if event != nil && event.Type == structs.TaskReceived {
|
||||||
|
if r.taskReceivedTimer == nil {
|
||||||
|
r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() {
|
||||||
|
// Send a dirty signal to sync our state.
|
||||||
|
r.dirtyCh <- struct{}{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel any existing received state timer.
|
||||||
|
if r.taskReceivedTimer != nil {
|
||||||
|
r.taskReceivedTimer.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case r.dirtyCh <- struct{}{}:
|
case r.dirtyCh <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -95,6 +95,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
|
||||||
destroyCh: make(chan struct{}),
|
destroyCh: make(chan struct{}),
|
||||||
waitCh: make(chan struct{}),
|
waitCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set the state to pending.
|
||||||
|
tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
|
||||||
return tc
|
return tc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -68,20 +68,24 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
|
||||||
t.Fatalf("timeout")
|
t.Fatalf("timeout")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(upd.events) != 2 {
|
if len(upd.events) != 3 {
|
||||||
t.Fatalf("should have 2 updates: %#v", upd.events)
|
t.Fatalf("should have 3 updates: %#v", upd.events)
|
||||||
}
|
}
|
||||||
|
|
||||||
if upd.state != structs.TaskStateDead {
|
if upd.state != structs.TaskStateDead {
|
||||||
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
|
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
|
||||||
}
|
}
|
||||||
|
|
||||||
if upd.events[0].Type != structs.TaskStarted {
|
if upd.events[0].Type != structs.TaskReceived {
|
||||||
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
|
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||||
}
|
}
|
||||||
|
|
||||||
if upd.events[1].Type != structs.TaskTerminated {
|
if upd.events[1].Type != structs.TaskStarted {
|
||||||
t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskTerminated)
|
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||||
|
}
|
||||||
|
|
||||||
|
if upd.events[2].Type != structs.TaskTerminated {
|
||||||
|
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,20 +111,24 @@ func TestTaskRunner_Destroy(t *testing.T) {
|
||||||
t.Fatalf("timeout")
|
t.Fatalf("timeout")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(upd.events) != 2 {
|
if len(upd.events) != 3 {
|
||||||
t.Fatalf("should have 2 updates: %#v", upd.events)
|
t.Fatalf("should have 3 updates: %#v", upd.events)
|
||||||
}
|
}
|
||||||
|
|
||||||
if upd.state != structs.TaskStateDead {
|
if upd.state != structs.TaskStateDead {
|
||||||
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
|
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
|
||||||
}
|
}
|
||||||
|
|
||||||
if upd.events[0].Type != structs.TaskStarted {
|
if upd.events[0].Type != structs.TaskReceived {
|
||||||
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
|
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||||
}
|
}
|
||||||
|
|
||||||
if upd.events[1].Type != structs.TaskKilled {
|
if upd.events[1].Type != structs.TaskStarted {
|
||||||
t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskKilled)
|
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||||
|
}
|
||||||
|
|
||||||
|
if upd.events[2].Type != structs.TaskKilled {
|
||||||
|
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilled)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1687,6 +1687,10 @@ const (
|
||||||
// failure in the driver.
|
// failure in the driver.
|
||||||
TaskDriverFailure = "Driver Failure"
|
TaskDriverFailure = "Driver Failure"
|
||||||
|
|
||||||
|
// Task Received signals that the task has been pulled by the client at the
|
||||||
|
// given timestamp.
|
||||||
|
TaskReceived = "Received"
|
||||||
|
|
||||||
// Task Started signals that the task was started and its timestamp can be
|
// Task Started signals that the task was started and its timestamp can be
|
||||||
// used to determine the running length of the task.
|
// used to determine the running length of the task.
|
||||||
TaskStarted = "Started"
|
TaskStarted = "Started"
|
||||||
|
|
Loading…
Reference in a new issue